65.9K
CodeProject 正在变化。 阅读更多。
Home

MongoDB 和 System.Transactions

starIconstarIconstarIconstarIcon
emptyStarIcon
starIcon

4.67/5 (13投票s)

2016 年 6 月 6 日

CPOL

9分钟阅读

viewsIcon

32640

如何在 MongoDB 操作中使用 TransactionScope。

引言

MongoDB 是一个开源的面向文档的 NoSQL 数据库,它使用类似 JSON 的文档和动态模式进行工作——正如其维基百科页面所描述的那样。本文档的目的不是提供 MongoDB 的高层概述或入门教程——但如果您还不了解这个出色软件的来龙去脉,那么最好的起点就是其官方网站 https://mongodb.ac.cn/what-is-mongodb

本文档既不是 C# 操作 MongoDB 的入门教程,尽管它会在一定程度上涉及使用 .NET 方式连接和组织 CRUD 操作的要点。

本文档将描述如何构建自己的资源管理器,以便为现有的 MongoDB 数据操作集成可靠的事务机制。我自己在许多项目中都缺少这个功能,一段时间后,我决定自己实现它。我决定该解决方案应满足 3 个核心的、不容争辩的标准:

  1. 该解决方案不应具有侵入性。MongoDB 的设计初衷就是如此,为了可伸缩性和部署问题,这些问题超出了本文的讨论范围。在这种情况下,任何解决方案都不应以可能违反 MongoDB 架构原则的方式行事。因此,我需要一种包装器式的解决方案,将操作提升为事务,充当外骨骼(sic),而不是插件。
  2. 它必须简单,并且采用一种可接受、经过测试、可靠且适合企业需求的方法。没有人想学习一个新的自定义库,或者更糟的是将其嵌入到生产环境中;尤其是在涉及事务和故障恢复等关键问题时。自定义事务框架解决方案被排除在外。
  3. 我们需要将我们的数据操作提升为事务,而不是从头开始构建一个事务框架。

鉴于这些边界条件,显而易见的答案是利用 Microsoft 自 2.0 版本以来在 System.Transactions 命名空间中提供的强大且可扩展的功能。

背景

什么是资源管理器?

任何参与事务的资源都必须由资源管理器进行管理。资源管理器负责跟踪其资源的数值变化,并根据事务的结果,允许数值更改提交或回滚到原始值。Microsoft SQL Server、消息队列、Oracle 数据库、自定义内存数据结构都可以作为资源管理器。

资源管理器处理持久或易失数据。这种区别在于资源管理器是否支持故障恢复。如果资源管理器支持持久性,它会将数据持久化到持久存储,这样在系统发生故障、崩溃、中断等情况下,它可以在恢复后重新执行事务,并执行所有必要的操作以完成中断的事务。另一方面,易失资源管理器在内存数据结构等易失资源中处理其数据,这意味着在发生故障时,恢复中断的事务不再是可选项。本文将介绍的资源管理器样板代码主要用于处理易失资源。然而,构建一个支持故障恢复的资源管理器,将不会远离本文的结论。这个粗略描述的动作序列由事务管理器协调。

什么是事务管理器?

事务管理器是协调所有资源管理器并确保事务无论其状态如何(提交或回滚)都能到达终点的组件。幸运的是,我们无需开发任何东西,因为 Microsoft 在开箱即用的产品中提供了多种选择(记住决定标准 #3)。

  1. 轻量级事务管理器 (LTM):这是一个相当高效的事务管理器,它使用轻量级事务协议(超出本文范围——请自行搜索)来管理本地事务(单个应用程序域)。
  2. 内核事务管理器 (KTM):管理事务型操作系统资源,如现代 Windows 版本(Vista 及更高版本)的文件系统和注册表资源管理器。与我们的上下文完全无关。
  3. 分布式事务协调器 (DTC):支持跨任何执行边界的事务。

当我们从 System.Transactions 启动一个新事务时,它总是最初由 LTM 处理。当我们的已注册资源管理器只处理易失数据或最多一个单阶段通知资源(单个域,无硬盘交互)时,整个过程都由 LTM 监督。如果我们有更多持久资源,或者至少有一个资源不支持单阶段通知,或者资源跨域边界,那么 DTC 会自动接管。

它们是如何组合的?

要使资源管理器成为事务的一部分,它必须在事务中注册。在 .NET 的 System.Transactions 命名空间中,有一个名为 Transactions 的类,它提供了一些以“Enlist”开头的方​​法,这就是让它成为事务一部分所需的全部。正如我们所说,资源管理器可以有两种注册方式:持久和易失。因此,在根据我们的资源管理器持久性支持选择使用哪种方法后,我们最终将我们的资源注册为两阶段提交的一部分。额外的要求是我们的类(将作为资源管理器)必须实现 IEnlistmentNotification 接口。(实现细节可以在下一段找到)。通过实现此接口(当然也要注册我们的资源管理器),我们确保事务管理器将回调我们的资源管理器,以便在事务提交或中止时执行必要的操作。

注册后,事情就变得相当直接了。持久或易失资源管理器正在准备其资源信息(通过在内存或持久存储中保存版本/快照),并等待事务管理器启动两阶段提交阶段。事务管理器将要求每个已注册的资源管理器报告它们是否准备好提交。资源管理器必须准备并投票表示其打算提交还是中止事务。如果任何参与的资源管理器声明失败,事务管理器将指示所有资源管理器回滚。

MongoDB 的基本 CRUD 操作

虽然我在引言中已经说过,这不是一个 C# 操作 MongoDB 的入门教程,但我们需要提及一些内容作为后续使用的基础。

我一直喜欢在设计系统时构建一个强大的对象层次结构。因此,我们将存储在 MongoDB 数据库中的每个实体文档都将继承自一个单一的父类,该父类包含的信息不多,但如下所示:

    public class MongoEntity : IEntity
    {
        [BsonId]
        public string Id { get; set; }
    }

对于数据库中的每个集合,我们将创建一个相应的辅助类,该类允许我们(基于 Repository Pattern)针对数据库执行所有 CRUD 操作。

    public interface IEntityRepository<T> where T : IEntity
    {
        IEnumerable<T> All();
        IQueryable<T> All(int page, int pageSize);
        T Get(string id);
        IQueryable<T> GetFunc(Expression<Func<T, bool>> expression);
        T Add(T entity);
        int Add(IEnumerable<T> entities);
        void Remove(T entity);
        bool Remove(string id);
        bool RemoveAll();
        int Remove(Expression<Func<T, bool>> expression);
        T Update(T updatedEntity);
    }

然后,我们构造一个抽象类,作为我们所有未来存储库的基类。

    public abstract class EntityRepositoryBase<T> : IEntityRepository<T> where T : IEntity
    {
        private MongoServer m_Server;
        private MongoDatabase m_Index;

        private MongoCollection<T> m_Entities;
        public MongoCollection<T> Entities
        {
            get
            {
                return m_Entities;
            }
        }

        private string m_Collection;

        public EntityRepositoryBase(string collection) : this(collection, null, null)
        {

        }

        public EntityRepositoryBase(string collection, string connectionString, string database)
        {
            m_Collection = collection;
            m_Server = new MongoClient(connectionString).GetServer();
            m_Index = m_Server.GetDatabase(database);
            m_Entities = m_Index.GetCollection<T>(m_Collection);
        }

        public IEnumerable<T> All()
        {
            return this.m_Entities.AsQueryable<T>().ToList();
        }

        public IQueryable<T> All(int page, int pageSize)
        {
            //Out of the scope of this article
        }

        public IEnumerable<D> AllAs<D>()
        {
            return m_Entities.AsQueryable<T>().OfType<D>().ToList();
        }

        public T Get(string id)
        {
            IMongoQuery query = Query.EQ("_id", id);
            return this.m_Entities.Find(query).FirstOrDefault();
        }

        public IQueryable<T> GetFunc(System.Linq.Expressions.Expression<Func<T, bool>> expression)
        {
            return this.m_Entities.AsQueryable<T>().Where(expression);
        }

        public IQueryable<T> GetFunc(System.Linq.Expressions.Expression<Func<T, bool>> expression, int page, int pageSize)
        {
            return this.m_Entities.AsQueryable<T>().Where(expression).Skip((page - 1) * pageSize).Take(pageSize);
        }

        public IQueryable<T> GetAs<D>(System.Linq.Expressions.Expression<Func<T, bool>> expression)
        {
            return m_Entities.FindAllAs<D>().Cast<T>().ToList().AsQueryable().Where(expression);
        }

        public virtual T Add(T entity)
        {
            try
            {
                IEntity oEntity = (entity as IEntity);

                oEntity.Id = String.IsNullOrEmpty(oEntity.Id) ?
                    ObjectId.GenerateNewId().ToString() :
                    oEntity.Id;

                m_Entities.Insert(entity);

                return entity;
            }
            catch (Exception mongoException)
            {
                if (mongoException.HResult == -2146233088)
                {
                    throw new MongoEntityUniqueIndexException("Unique Index violation", mongoException);
                }
                else
                {
                    throw mongoException;
                }
            }

            return default(T);
        }

        public virtual int Add(IEnumerable<T> entities)
        {
            int addCount = 0;

            entities.ToList().ForEach(entity =>
            {
                if (Add(entity) != null)
                {
                    addCount++;
                }
            });

            return addCount;
        }

        public virtual void AddBatch(IEnumerable<T> entities)
        {
            int addCount = 0;

            entities.ToList().ForEach(entity =>
            {
                IEntity oEntity = (entity as IEntity);

                oEntity.Id = String.IsNullOrEmpty(oEntity.Id) ?
                    ObjectId.GenerateNewId().ToString() :
                    oEntity.Id;

                oEntity.Created = timeStamp;
                oEntity.LastModified = timeStamp;
            });

            try
            {
                m_Entities.InsertBatch(entities);
            }
            catch (Exception addBatchException)
            {

            }
        }

        public virtual void Remove(T entity)
        {
            Remove(entity.Id);
        }

        public virtual bool Remove(string id)
        {
            try
            {
                IMongoQuery query = Query.EQ("_id", id);
                var result = m_Entities.Remove(query);
                return result.DocumentsAffected == 1;
            }
            catch (Exception mongoException)
            {
            }

            return false;
        }

        public virtual bool RemoveAll()
        {
            try
            {
                var result = m_Entities.RemoveAll();
                return result.DocumentsAffected == 1;
            }
            catch (Exception mongoException)
            {

            }

            return false;
        }

        public virtual int Remove(System.Linq.Expressions.Expression<Func<T, bool>> expression)
        {
            int removeCount = 0;
            List<T> entitiesToRemove = this.m_Entities.AsQueryable<T>().Where(expression).ToList();

            entitiesToRemove.ForEach(entity =>
            {
                if (Remove((entity as IEntity).Id))
                {
                    removeCount++;
                }
            });

            return removeCount;
        }

        public virtual T Update(T updatedEntity)
        {
            return Update(updatedEntity);
        }

    }

我们到目前为止取得的成就:我们已经创建了一种通用的方式来启动与 MongoDB 数据库的连接并执行基本关键数据操作(不完整的代码片段如下)。

    public EntityRepositoryBase(string collection) : this(collection, null, null)
    
    public EntityRepositoryBase(string collection, string connectionString, string database)

    public virtual T Add(T entity)

    public virtual void Remove(T entity)

    public virtual T Update(T updatedEntity)

 

构建事务支持

正如我们之前提到的,需要注册的核心组件是为了参与事务的资源管理器。它必须满足两个要求。首先,它必须能够保存资源的数值及其变化,并实现 IEnlistmentNotification 接口。决定是构建一个单独的类来保存资源的值,该类还将包含提交和回滚功能。这个类将是 TransactionEntity<T>

    public class TransactionalEntity<T> where T : IEntity
    {
        private T m_Original;
        public T Original
        {
          get { return m_Original; }
        }

        private T m_Current;
        public T Current
        {
            get { return m_Current; }
        }

        private TransactionalRepositoryBase<T> m_Repository;

        public TransactionalRepositoryBase<T> Repository
        {
            get { return m_Repository; }
        }

        private bool m_CommitWithSuccess = false;

        public bool CommitWithSuccess
        {
            get { return m_CommitWithSuccess; }
        }

        private bool m_RollbackWithSuccess = false;

        public bool RollbackWithSuccess
        {
            get { return m_RollbackWithSuccess; }
        }

        private bool m_Prepared = false;

        public bool Prepared
        {
            get { return m_Prepared; }
        }

        private EntityRepositoryCommandsEnum m_Command;

        public EntityRepositoryCommandsEnum Command
        {
            get { return m_Command; }
        }

        public TransactionalEntity(T original, T current, TransactionalRepositoryBase<T> repository, EntityRepositoryCommandsEnum command)
        {
            m_Original = original;
            m_Current = current;
            m_Repository = repository;
            m_Command = command;
        }

        public bool Commit()
        {
            // if it reached that far it means that all are OK, need just to inform
            // resource manager that he can vote for commit

            m_CommitWithSuccess = true;
            return m_CommitWithSuccess;
        }

        public bool Rollback()
        {
            if (m_Command == EntityRepositoryCommandsEnum.Update)
            {
                m_Repository.NonTxUpdate(this.m_Original);
            }

            if (m_Command == EntityRepositoryCommandsEnum.Add)
            {
                m_Repository.NonTxRemove(this.m_Current);
            }

            if (m_Command == EntityRepositoryCommandsEnum.Remove)
            {
                m_Repository.NonTxAdd(this.m_Original);
            }

            m_RollbackWithSuccess = true;
            return m_RollbackWithSuccess;
        }

        public T Add()
        {
            T result = m_Repository.NonTxAdd(this.m_Current);
            m_Prepared = true;

            return result;
        }

        public void Remove()
        {
            m_Repository.NonTxRemove(this.Original);
            m_Prepared = true;
        }

        public T Update()
        {
            T result =  m_Repository.NonTxUpdate(this.m_Current);
            m_Prepared = true;

            return result;
        }
    }

TransactionEntity<T> 将是一个泛型类,以 IEntity 作为参数。它将在 respective 属性中保存 CurrentOriginal 值,并且它将知道它必须使用的存储库类,以便针对数据库执行操作。此外,通过 command 属性,它将自我感知要执行哪个命令,并相应地决定在失败时要执行的补偿命令。

    public T Original
 
    public T Current
    
    public TransactionalEntity(T original, T current, TransactionalRepositoryBase<T> repository, EntityRepositoryCommandsEnum command)
    
    public EntityRepositoryCommandsEnum Command

命令是有限的,并且是枚举的一部分。

    public enum EntityRepositoryCommandsEnum
    {
        Add,
        Remove,
        Update
    }

此外,它还包含 5 个重要方法。CommitRollback,它们将由资源管理器代表资源执行(在所有投票都已投出并且事务管理器已决定该事务的结果之后)。根据针对数据库的命令,Rollback 方法决定了补偿对策会是什么。

        public bool Commit()
        {
            m_CommitWithSuccess = true;
            return m_CommitWithSuccess;
        }

        public bool Rollback()
        {
            if (m_Command == EntityRepositoryCommandsEnum.Update)
            {
                m_Repository.NonTxUpdate(this.m_Original);
            }

            if (m_Command == EntityRepositoryCommandsEnum.Add)
            {
                m_Repository.NonTxRemove(this.m_Current);
            }

            if (m_Command == EntityRepositoryCommandsEnum.Remove)
            {
                m_Repository.NonTxAdd(this.m_Original);
            }

            m_RollbackWithSuccess = true;
            return m_RollbackWithSuccess;
        }

剩下的 3 个方法是 Add、Update 和 Remove,它们通过执行单阶段提交(这将是我们的准备操作)来实际执行工作。它们调用我们将在本文后面讨论的关联存储库类的某些委托。

        public T Add()
        {
            T result = m_Repository.NonTxAdd(this.m_Current);
            m_Prepared = true;

            return result;
        }

        public void Remove()
        {
            m_Repository.NonTxRemove(this.Original);
            m_Prepared = true;
        }

        public T Update()
        {
            T result =  m_Repository.NonTxUpdate(this.m_Current);
            m_Prepared = true;

            return result;
        }

下一个出现的类是最终期待已久的资源管理器。当然,它是一个泛型类,像我们家族树中的所有其他类一样,以 IEntity 作为参数。CommitInDoubtRollbackPrepare 方法是事务管理器实际的回调。它利用 TransactionalEntity<T> 来准备、提交、回滚数据,并通知其关于事务结果的投票的决定。

    public class MongoResourceManager<T> : IEnlistmentNotification where T : IEntity 
    {
        private TransactionalEntity<T> m_TxEntity;

        public MongoResourceManager(TransactionalEntity<T> txEntity)
        {
            m_TxEntity = txEntity;
        }

        public MongoResourceManager(T entity, TransactionalRepositoryBase<T> repository, EntityRepositoryCommandsEnum command)
        {
            T current = entity;
            T original = repository.Get(entity.Id);

            TransactionalEntity<T> txEntity = new TransactionalEntity<T>(original, current, repository, command);

            m_TxEntity = txEntity;
        }

        public void Commit(Enlistment enlistment)
        {
            bool success = this.m_TxEntity.Commit();

            if (success)
            {
                enlistment.Done();
            }
        }

        public void InDoubt(Enlistment enlistment)
        {
            Rollback(enlistment);
        }

        public void Prepare(PreparingEnlistment preparingEnlistment)
        {
            if (this.m_TxEntity.Prepared)
            {
                preparingEnlistment.Prepared();
            }
        }

        public void Rollback(Enlistment enlistment)
        {
            bool success = this.m_TxEntity.Rollback();

            if (success)
            {
                enlistment.Done();
            }
        }
    }

为了实现我们的目标,我们必须创建一个存储库类的新子类。这个新的子类 TransactionalRepositoryBase<T> 将能够识别发出的命令是否在事务(确切地说,在 TransactionScope 内)的上下文中发出,并创建新的 MongoResourceManager<T> 并注册它,或者执行正常的数据库操作。

    public abstract class TransactionalRepositoryBase<T> : EntityRepositoryBase<T> where T : IEntity
    {
        internal delegate T AddEntityHandler(T entity);
        internal delegate void RemoveEntityHandler(T entity);
        internal delegate T UpdateEntityHandler(T entity);

        internal AddEntityHandler NonTxAdd;
        internal RemoveEntityHandler NonTxRemove;
        internal UpdateEntityHandler NonTxUpdate;

        public TransactionalRepositoryBase(string collection) : this(collection, null, null)
        {
        }

        public TransactionalRepositoryBase(string collection, string connectionString, string database) : base(collection, connectionString, database)
        {
            NonTxAdd = new AddEntityHandler(base.Add);
            NonTxRemove = new RemoveEntityHandler(base.Remove);
            NonTxUpdate = new UpdateEntityHandler(base.Update);
        }

        public override T Add(T entity)
        {
            if (Transaction.Current != null)
            {
                TransactionalEntity<T> txEntity = new TransactionalEntity<T>(default(T), entity, this, EntityRepositoryCommandsEnum.Add);
                MongoResourceManager<T> txRm = new MongoResourceManager<T>(txEntity);

                Transaction.Current.EnlistVolatile(txRm, EnlistmentOptions.None);
                return txEntity.Add();
            }
            else 
            {
                return NonTxAdd(entity);
            }
        }

        public override void Remove(T entity)
        {
            if (Transaction.Current != null)
            {
                TransactionalEntity<T> txEntity = new TransactionalEntity<T>(entity, default(T), this, EntityRepositoryCommandsEnum.Remove);
                MongoResourceManager<T> txRm = new MongoResourceManager<T>(txEntity);

                Transaction.Current.EnlistVolatile(txRm, EnlistmentOptions.None);
                txEntity.Remove();
            }
            else
            {
                NonTxRemove(entity);
            }
        }

        public override T Update(T entity)
        {
            if (Transaction.Current != null)
            {
                T original = this.Get(entity.Id);
                TransactionalEntity<T> txEntity = new TransactionalEntity<T>(original, entity, this, EntityRepositoryCommandsEnum.Remove);
                MongoResourceManager<T> txRm = new MongoResourceManager<T>(txEntity);

                Transaction.Current.EnlistVolatile(txRm, EnlistmentOptions.None);
                return txEntity.Update();
            }
            else
            {
                return NonTxUpdate(entity);
            }
        }
    }

实现这一目标的关键是重写原始存储库抽象类的 AddUpdateRemove 方法,如下所示。通过检查 Transaction.Current 属性,我们可以确定我们是否在 TransactionScope 的上下文中。我们创建 TransactionalEntity<T> 和它的 MongoResourceManager<T>,然后将其注册为易失性,并执行 TransactionalEntity<T> 类提供的方​​法(AddRemoveUpdate)。否则,我们将执行我们之前讨论的其中一个委托。

        public override T Add(T entity)
        {
            if (Transaction.Current != null)
            {
                TransactionalEntity<T> txEntity = new TransactionalEntity<T>(default(T), entity, this, EntityRepositoryCommandsEnum.Add);
                MongoResourceManager<T> txRm = new MongoResourceManager<T>(txEntity);

                Transaction.Current.EnlistVolatile(txRm, EnlistmentOptions.None);
                return txEntity.Add();
            }
            else 
            {
                return NonTxAdd(entity);
            }
        }

这些委托指向基类(EntityRepositoryBase)的 AddRemoveUpdate 方法,并将以单阶段提交的方式执行数据库命令。它们将由 TransactionalRepositoryBase<T>TransactionalEntity<T> 重用。

        internal delegate T AddEntityHandler(T entity);         
        internal delegate void RemoveEntityHandler(T entity);         
        internal delegate T UpdateEntityHandler(T entity);  
       
        internal AddEntityHandler NonTxAdd;         
        internal RemoveEntityHandler NonTxRemove;         
        internal UpdateEntityHandler NonTxUpdate;  
       
        public TransactionalRepositoryBase(string collection, string connectionString, string database) : base(collection, connectionString, database)         
        {             
              NonTxAdd = new AddEntityHandler(base.Add);             
              NonTxRemove = new RemoveEntityHandler(base.Remove);             
              NonTxUpdate = new UpdateEntityHandler(base.Update);         
        }

 

示例和用法

让我们创建一个新的实体和一个新的存储库类。

    public class TestDocument : MongoEntity
    {
        public string DocumentId { get; set; }
    }

    public class TestDocumentsRepository : TransactionalRepositoryBase<TestDocument>
    {
        public TestDocumentsRepository()
            : base("test_documents", "mongodb://:27017", "tx_tests")
        {

        }
    }

然后,我们创建一个可以模拟随机成功和中止事务的情况。每次调用 repository.Add(document) 方法时,都会创建一个新的 MongoResourceManager<T> 并将其注册到为我们的 TransactionScope 范围创建的事务中。如果代码成功执行到 scope.Complete(),则事务将成功提交,否则它将自动回滚并从集合中删除所有数据。

        private void ExecuteInTx(object sender, EventArgs e)
        {
            TestDocumentsRepository repository = new TestDocumentsRepository();
            repository.RemoveAll();

            using (TransactionScope scope = new TransactionScope(TransactionScopeOption.RequiresNew))
            {
                try
                {
                    for (int docIdx = 0; docIdx < 5; docIdx++)
                    {
                        int poison = random.Next(1000);

                        if (poison != 0 && poison % 200 == 0)
                        {
                            throw new Exception("Poison killed my TX");
                        }

                        TestDocument document = new TestDocument();
                        document.DocumentId = Guid.NewGuid().ToString();

                        repository.Add(document);

                        Thread.Sleep(100);
                    }

                    scope.Complete();
                }
                catch (Exception)
                {

                }               
            }
        }

 

关注点

要监视 LTM 或 DTC 的活动,您只需打开 Windows 的组件服务 MMC 即可。

本文提供的代码绝不是供您在生产环境中使用的一次性复制代码。它是一个指南和基础,说明如何在 .NET 的通用方式中将事务集成到 MongoDB 数据库。

历史

-

© . All rights reserved.