本文共 9579 字,大约阅读时间需要 31 分钟。
CAP 是一个基于 .NET Standard 的 C# 库,它是一种处理分布式事务的解决方案,同样具有 EventBus 的功能,它具有轻量级、易使用、高性能等特点。
https://github.com/dotnetcore/CAP
1.DotNetCore.CAP.MySql中引用 了如下类库.在Commit事务时,会调用 Flush方法推送消息
https://github.com/dotnetcore/CAP/blob/master/src/DotNetCore.CAP.MySql/ICapTransaction.MySql.cs
public class MySqlCapTransaction : CapTransactionBase { public MySqlCapTransaction( IDispatcher dispatcher) : base(dispatcher) { } public override void Commit() { Debug.Assert(DbTransaction != null); switch (DbTransaction) { case IDbTransaction dbTransaction: dbTransaction.Commit(); break; case IDbContextTransaction dbContextTransaction: dbContextTransaction.Commit(); break; } Flush(); } }
其中我们能看到,事务的提交,会调用父类CapTransactionBase中的方法Flush。他是protected类型的,并未开放出此接口。
protected virtual void Flush() { while (!_bufferList.IsEmpty) { _bufferList.TryDequeue(out var message); _dispatcher.EnqueueToPublish(message); } }
我们来看一下集成 的demo调用
[Route("~/adonet/transaction")] public IActionResult AdonetWithTransaction() { using (var connection = new MySqlConnection(AppDbContext.ConnectionString)) { using (var transaction = connection.BeginTransaction(_capBus, true)) { //your business code connection.Execute("insert into test(name) values('test')", transaction: (IDbTransaction)transaction.DbTransaction); //for (int i = 0; i < 5; i++) //{ _capBus.Publish("sample.rabbitmq.mysql", DateTime.Now); //} } } return Ok(); }
https://github.com/dotnetcore/CAP/blob/master/src/DotNetCore.CAP.MySql/ICapTransaction.MySql.cs
代码中通过扩展IDbConnection类,增加BeginTransaction方法,传递了注入的_capBus类,传了autoCommit
private readonly ICapPublisher _capBus;public PublishController(ICapPublisher capPublisher){ _capBus = capPublisher;}////// Start the CAP transaction/// /// The./// The ./// Whether the transaction is automatically committed when the message is published/// The public static ICapTransaction BeginTransaction(this IDbConnection dbConnection, ICapPublisher publisher, bool autoCommit = false){ if (dbConnection.State == ConnectionState.Closed) { dbConnection.Open(); } var dbTransaction = dbConnection.BeginTransaction(); publisher.Transaction.Value = publisher.ServiceProvider.GetServiceobject. (); return publisher.Transaction.Value.Begin(dbTransaction, autoCommit);}
autoCommit:false,(此属性会自动提交事务,集成其他ORM,不建议开启)因为,我们只要调用 了Publish,他会调用MySqlCapTransaction中的Commit(),并执行Flush,即消息 会发出去。IDbContextTransaction
这段代码是非常 重要的。
publisher.Transaction.Value = publisher.ServiceProvider.GetService();
从CapPublisher中可以看出,事务是通过AsyncLocal实现状态共享的。
internal class CapPublisher : ICapPublisher{ public AsyncLocalTransaction { get; }}
publisher.Transaction.Value的类型实现上才是ICapTransaction ,
CapTransactionExtensions.cs还有一个扩展方法,调用Begin,相当于给当前控制器上注入的ICapPublisher设置了new MySqlConnection(AppDbContext.ConnectionString).BeginTransaction()的值。
public static ICapTransaction Begin(this ICapTransaction transaction, IDbTransaction dbTransaction, bool autoCommit = false) { transaction.DbTransaction = dbTransaction; transaction.AutoCommit = autoCommit; return transaction; }
对于ADO.NET,我们只要传递transaction,就能保证发送消息和操作DB是一个事务了。。
同样,我们看扩展方法和使用方式
public static IDbContextTransaction BeginTransaction(this DatabaseFacade database, ICapPublisher publisher, bool autoCommit = false) { var trans = database.BeginTransaction(); publisher.Transaction.Value = publisher.ServiceProvider.GetService(); var capTrans = publisher.Transaction.Value.Begin(trans, autoCommit); return new CapEFDbTransaction(capTrans); }
dbContext.Database就是DatabaseFacade类型。直接能BeginTransaction事务。
[Route("~/ef/transaction")]public IActionResult EntityFrameworkWithTransaction([FromServices]AppDbContext dbContext){ using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: false)) { dbContext.Persons.Add(new Person() { Name = "ef.transaction" }); for (int i = 0; i < 1; i++) { _capBus.Publish("sample.rabbitmq.mysql", DateTime.Now); } dbContext.SaveChanges(); trans.Commit(); } return Ok();}
同样,还有一个Begin扩展方法,仅仅是给ICapTransaction赋下值。
public static ICapTransaction Begin(this ICapTransaction transaction, IDbContextTransaction dbTransaction, bool autoCommit = false){ transaction.DbTransaction = dbTransaction; transaction.AutoCommit = autoCommit; return transaction;}
在这个demo,上,autoCommit是false,因为dbContext有自己的SaveChanges(),如果发送不太合适。SaveChanges()要做好些操作,具体不太情况是什么。具体不详细研究。
但我们可以看下CapTransactionBase源码,DbTransaction是Object类型。
EF Core中的事务类型是IDbContextTransaction
ADO.NET实际是IDbTransaction类型。
public object DbTransaction { get; set; }
所以在最开始的那段代码,判断DbTransaction,是哪种类型,然后调用自身内部使用的事务进行Commit()。如果要集成其他ORM,但又想去掉EFCore的依赖,然后增加其他ORM,如下类似的处理,就是关键,比如CommitAsync,Commit,Roolback()
public override void Commit() { Debug.Assert(DbTransaction != null); switch (DbTransaction) { case IDbTransaction dbTransaction: dbTransaction.Commit(); break; case IDbContextTransaction dbContextTransaction: dbContextTransaction.Commit(); break; } Flush(); }
还有MySqlDataStorage.cs
https://github.com/dotnetcore/CAP/blob/master/src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs
判断dbTransaction的类型,然后获取当前事务,引用其他ORM,记得修改此处。
var dbTrans = dbTransaction as IDbTransaction; if (dbTrans == null && dbTransaction is IDbContextTransaction dbContextTrans) { dbTrans = dbContextTrans.GetDbTransaction(); }
https://github.com/luoyunchong/DotNetCore.CAP.Provider
维护就要保证与上层Dotnetcore/cap项目保持同步,这是一件困难的事。
还有一个重要的原因是:我们有更简单的方式。
关于此问题的想法
https://github.com/luoyunchong/DotNetCore.CAP.Provider/issues/1
我们还是引用各自官方的库
Install-Package DotNetCore.CAP.DashboardInstall-Package DotNetCore.CAP.MySqlInstall-Package DotNetCore.CAP.RabbitMQInstall-Package FreeSqlInstall-Package FreeSql.DbContextInstall-Package FreeSql.Provider.MySqlConnector
关于CAP集成的方式,配置项,这里不做详情,官方地址有中文:http://cap.dotnetcore.xyz/
代码参考。因为只有一个类,我们自行复制项目即可。
https://github.com/luoyunchong/lin-cms-dotnetcore/blob/master/src/LinCms.Application/CapUnitOfWorkExtensions.cs
重写扩展方法,BeginTransaction。是基于IUnitOfWork的扩展。
提交事务调用Commit(IUnitOfWork)时,内部再通过反射调用 ICapTransaction中protected类型的方法Flush。
public static class CapUnitOfWorkExtensions { public static void Flush(this ICapTransaction capTransaction) { capTransaction?.GetType().GetMethod("Flush", BindingFlags.Instance | BindingFlags.NonPublic)?.Invoke(capTransaction, null); } public static ICapTransaction BeginTransaction(this IUnitOfWork unitOfWork, ICapPublisher publisher, bool autoCommit = false) { publisher.Transaction.Value = (ICapTransaction)publisher.ServiceProvider.GetService(typeof(ICapTransaction)); return publisher.Transaction.Value.Begin(unitOfWork.GetOrBeginTransaction(), autoCommit); } public static void Commit(this ICapTransaction capTransaction, IUnitOfWork unitOfWork) { unitOfWork.Commit(); capTransaction.Flush(); } }
注入我们的FreeSql
public void ConfigureServices(IServiceCollection services) { IConfigurationSection configurationSection = Configuration.GetSection($"ConnectionStrings:MySql"); IFreeSql fsql = new FreeSqlBuilder() .UseConnectionString(DataType.MySql, configurationSection.Value); .UseNameConvert(NameConvertType.PascalCaseToUnderscoreWithLower) .UseAutoSyncStructure(true) .UseNoneCommandParameter(true) .UseMonitorCommand(cmd => { Trace.WriteLine(cmd.CommandText + ";"); } ) .Build(); services.AddSingleton(fsql); services.AddFreeRepository(); services.AddScoped();}
[HttpGet("~/freesql/unitofwork/{id}")] public DateTime UnitOfWorkManagerTransaction(int id, [FromServices] IBaseRepositoryrepo) { DateTime now = DateTime.Now; using (IUnitOfWork uow = _unitOfWorkManager.Begin()) { ICapTransaction trans = _unitOfWorkManager.Current.BeginTransaction(_capBus, false); repo.Insert(new Book() { Author = "luoyunchong", Summary = "2", Title = "122" }); _capBus.Publish("freesql.time", now); trans.Commit(uow); } return now; } [NonAction] [CapSubscribe("freesql.time")] public void GetTime(DateTime time) { Console.WriteLine($"time:{time}"); }
注意trans不需要using,freesql内部会释放资源。,也可using,但请更新到最新的freesql版本。
ICapTransaction trans = _unitOfWorkManager.Current.BeginTransaction(_capBus, false);
提交事务,也请调用扩展方法,否则事务无法正常。
trans.Commit(uow);
源码位置
https://github.com/luoyunchong/lin-cms-dotnetcore/blob/master/src/LinCms.Web/Controllers/v1/TestController.cs
转载地址:http://sakdi.baihongyu.com/