博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
FreeSql接入CAP的实践
阅读量:4033 次
发布时间:2019-05-24

本文共 9579 字,大约阅读时间需要 31 分钟。

CAP

CAP 是一个基于 .NET Standard 的 C# 库,它是一种处理分布式事务的解决方案,同样具有 EventBus 的功能,它具有轻量级、易使用、高性能等特点。

  • https://github.com/dotnetcore/CAP

ADO.NET事务

1.DotNetCore.CAP.MySql中引用 了如下类库.在Commit事务时,会调用 Flush方法推送消息

  • https://github.com/dotnetcore/CAP/blob/master/src/DotNetCore.CAP.MySql/ICapTransaction.MySql.cs

    public class MySqlCapTransaction : CapTransactionBase    {        public MySqlCapTransaction(            IDispatcher dispatcher) : base(dispatcher)        {        }        public override void Commit()        {            Debug.Assert(DbTransaction != null);            switch (DbTransaction)            {                case IDbTransaction dbTransaction:                    dbTransaction.Commit();                    break;                case IDbContextTransaction dbContextTransaction:                    dbContextTransaction.Commit();                    break;            }            Flush();        }    }

其中我们能看到,事务的提交,会调用父类CapTransactionBase中的方法Flush。他是protected类型的,并未开放出此接口。

       protected virtual void Flush()        {            while (!_bufferList.IsEmpty)            {                _bufferList.TryDequeue(out var message);                _dispatcher.EnqueueToPublish(message);            }        }

我们来看一下集成 的demo调用

    [Route("~/adonet/transaction")]    public IActionResult AdonetWithTransaction()    {        using (var connection = new MySqlConnection(AppDbContext.ConnectionString))        {            using (var transaction = connection.BeginTransaction(_capBus, true))            {                //your business code                connection.Execute("insert into test(name) values('test')", transaction: (IDbTransaction)transaction.DbTransaction);                //for (int i = 0; i < 5; i++)                //{                _capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);                //}            }        }        return Ok();    }
  • https://github.com/dotnetcore/CAP/blob/master/src/DotNetCore.CAP.MySql/ICapTransaction.MySql.cs

代码中通过扩展IDbConnection类,增加BeginTransaction方法,传递了注入的_capBus类,传了autoCommit

private readonly ICapPublisher _capBus;public PublishController(ICapPublisher capPublisher){    _capBus = capPublisher;}/// /// Start the CAP transaction/// /// 
The 
./// 
The 
./// 
Whether the transaction is automatically committed when the message is published/// 
The 
 object.
public static ICapTransaction BeginTransaction(this IDbConnection dbConnection,    ICapPublisher publisher, bool autoCommit = false){    if (dbConnection.State == ConnectionState.Closed)    {        dbConnection.Open();    }    var dbTransaction = dbConnection.BeginTransaction();    publisher.Transaction.Value = publisher.ServiceProvider.GetService
();    return publisher.Transaction.Value.Begin(dbTransaction, autoCommit);}

autoCommit:false,(此属性会自动提交事务,集成其他ORM,不建议开启)因为,我们只要调用 了Publish,他会调用MySqlCapTransaction中的Commit(),并执行Flush,即消息 会发出去。IDbContextTransaction

这段代码是非常 重要的。

    publisher.Transaction.Value = publisher.ServiceProvider.GetService
();

从CapPublisher中可以看出,事务是通过AsyncLocal实现状态共享的。

internal class CapPublisher : ICapPublisher{     public AsyncLocal
 Transaction { get; }}

publisher.Transaction.Value的类型实现上才是ICapTransaction ,

CapTransactionExtensions.cs还有一个扩展方法,调用Begin,相当于给当前控制器上注入的ICapPublisher设置了new MySqlConnection(AppDbContext.ConnectionString).BeginTransaction()的值。

      public static ICapTransaction Begin(this ICapTransaction transaction,            IDbTransaction dbTransaction, bool autoCommit = false)        {            transaction.DbTransaction = dbTransaction;            transaction.AutoCommit = autoCommit;            return transaction;        }

对于ADO.NET,我们只要传递transaction,就能保证发送消息和操作DB是一个事务了。。

EF Core事务

同样,我们看扩展方法和使用方式

    public static IDbContextTransaction BeginTransaction(this DatabaseFacade database,        ICapPublisher publisher, bool autoCommit = false)    {        var trans = database.BeginTransaction();        publisher.Transaction.Value = publisher.ServiceProvider.GetService
();        var capTrans = publisher.Transaction.Value.Begin(trans, autoCommit);        return new CapEFDbTransaction(capTrans);    }

dbContext.Database就是DatabaseFacade类型。直接能BeginTransaction事务。

[Route("~/ef/transaction")]public IActionResult EntityFrameworkWithTransaction([FromServices]AppDbContext dbContext){    using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: false))    {        dbContext.Persons.Add(new Person() { Name = "ef.transaction" });        for (int i = 0; i < 1; i++)        {            _capBus.Publish("sample.rabbitmq.mysql", DateTime.Now);        }        dbContext.SaveChanges();        trans.Commit();    }    return Ok();}

同样,还有一个Begin扩展方法,仅仅是给ICapTransaction赋下值。

public static ICapTransaction Begin(this ICapTransaction transaction,    IDbContextTransaction dbTransaction, bool autoCommit = false){    transaction.DbTransaction = dbTransaction;    transaction.AutoCommit = autoCommit;    return transaction;}

在这个demo,上,autoCommit是false,因为dbContext有自己的SaveChanges(),如果发送不太合适。SaveChanges()要做好些操作,具体不太情况是什么。具体不详细研究。

但我们可以看下CapTransactionBase源码,DbTransaction是Object类型。

EF Core中的事务类型是IDbContextTransaction

ADO.NET实际是IDbTransaction类型。

 public object DbTransaction { get; set; }

所以在最开始的那段代码,判断DbTransaction,是哪种类型,然后调用自身内部使用的事务进行Commit()。如果要集成其他ORM,但又想去掉EFCore的依赖,然后增加其他ORM,如下类似的处理,就是关键,比如CommitAsync,Commit,Roolback()

    public override void Commit()    {        Debug.Assert(DbTransaction != null);        switch (DbTransaction)        {            case IDbTransaction dbTransaction:                dbTransaction.Commit();                break;            case IDbContextTransaction dbContextTransaction:                dbContextTransaction.Commit();                break;        }        Flush();    }

还有MySqlDataStorage.cs

  • https://github.com/dotnetcore/CAP/blob/master/src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs

判断dbTransaction的类型,然后获取当前事务,引用其他ORM,记得修改此处。

    var dbTrans = dbTransaction as IDbTransaction;    if (dbTrans == null && dbTransaction is IDbContextTransaction dbContextTrans)    {        dbTrans = dbContextTrans.GetDbTransaction();    }

参考项目(不打算维护)

  • https://github.com/luoyunchong/DotNetCore.CAP.Provider

  • 维护就要保证与上层Dotnetcore/cap项目保持同步,这是一件困难的事。

  • 还有一个重要的原因是:我们有更简单的方式。

FreeSql接入CAP(最简单的方式)

关于此问题的想法

  • https://github.com/luoyunchong/DotNetCore.CAP.Provider/issues/1

我们还是引用各自官方的库

Install-Package DotNetCore.CAP.DashboardInstall-Package DotNetCore.CAP.MySqlInstall-Package DotNetCore.CAP.RabbitMQInstall-Package FreeSqlInstall-Package FreeSql.DbContextInstall-Package FreeSql.Provider.MySqlConnector

关于CAP集成的方式,配置项,这里不做详情,官方地址有中文:http://cap.dotnetcore.xyz/

  • 代码参考。因为只有一个类,我们自行复制项目即可。

  • https://github.com/luoyunchong/lin-cms-dotnetcore/blob/master/src/LinCms.Application/CapUnitOfWorkExtensions.cs

重写扩展方法,BeginTransaction。是基于IUnitOfWork的扩展。

提交事务调用Commit(IUnitOfWork)时,内部再通过反射调用 ICapTransaction中protected类型的方法Flush。

public static class CapUnitOfWorkExtensions    {        public static void Flush(this ICapTransaction capTransaction)        {            capTransaction?.GetType().GetMethod("Flush", BindingFlags.Instance | BindingFlags.NonPublic)?.Invoke(capTransaction, null);        }               public static ICapTransaction BeginTransaction(this IUnitOfWork unitOfWork, ICapPublisher publisher, bool autoCommit = false)        {            publisher.Transaction.Value = (ICapTransaction)publisher.ServiceProvider.GetService(typeof(ICapTransaction));            return publisher.Transaction.Value.Begin(unitOfWork.GetOrBeginTransaction(), autoCommit);        }        public static void Commit(this ICapTransaction capTransaction, IUnitOfWork unitOfWork)        {            unitOfWork.Commit();            capTransaction.Flush();        }    }

注入我们的FreeSql

public void ConfigureServices(IServiceCollection services) {    IConfigurationSection configurationSection = Configuration.GetSection($"ConnectionStrings:MySql");    IFreeSql fsql = new FreeSqlBuilder()           .UseConnectionString(DataType.MySql, configurationSection.Value);           .UseNameConvert(NameConvertType.PascalCaseToUnderscoreWithLower)           .UseAutoSyncStructure(true)           .UseNoneCommandParameter(true)           .UseMonitorCommand(cmd =>           {               Trace.WriteLine(cmd.CommandText + ";");           }           )           .Build();    services.AddSingleton(fsql);    services.AddFreeRepository();    services.AddScoped
();}

示例

    [HttpGet("~/freesql/unitofwork/{id}")]    public DateTime UnitOfWorkManagerTransaction(int id, [FromServices] IBaseRepository
 repo)    {        DateTime now = DateTime.Now;        using (IUnitOfWork uow = _unitOfWorkManager.Begin())        {            ICapTransaction trans = _unitOfWorkManager.Current.BeginTransaction(_capBus, false);            repo.Insert(new Book()            {                Author = "luoyunchong",                Summary = "2",                Title = "122"            });            _capBus.Publish("freesql.time", now);            trans.Commit(uow);        }        return now;    }        [NonAction]    [CapSubscribe("freesql.time")]    public void GetTime(DateTime time)    {        Console.WriteLine($"time:{time}");    }

注意trans不需要using,freesql内部会释放资源。,也可using,但请更新到最新的freesql版本。

ICapTransaction trans = _unitOfWorkManager.Current.BeginTransaction(_capBus, false);

提交事务,也请调用扩展方法,否则事务无法正常。

trans.Commit(uow);

源码位置

  • https://github.com/luoyunchong/lin-cms-dotnetcore/blob/master/src/LinCms.Web/Controllers/v1/TestController.cs

转载地址:http://sakdi.baihongyu.com/

你可能感兴趣的文章
Selenium-ActionChains Api接口详解
查看>>
Selenium-Switch与SelectApi接口详解
查看>>
Selenium-Css Selector使用方法
查看>>
Linux常用统计命令之wc
查看>>
测试必会之 Linux 三剑客之 sed
查看>>
Socket请求XML客户端程序
查看>>
Java中数字转大写货币(支持到千亿)
查看>>
Java.nio
查看>>
函数模版类模版和偏特化泛化的总结
查看>>
VMware Workstation Pro虚拟机不可用解决方法
查看>>
最简单的使用redis自带程序实现c程序远程访问redis服务
查看>>
redis学习总结-- 内部数据 字符串 链表 字典 跳跃表
查看>>
iOS 对象序列化与反序列化
查看>>
iOS 序列化与反序列化(runtime) 01
查看>>
iOS AFN 3.0版本前后区别 01
查看>>
iOS ASI和AFN有什么区别
查看>>
iOS QQ侧滑菜单(高仿)
查看>>
iOS 扫一扫功能开发
查看>>
iOS app之间的跳转以及传参数
查看>>
iOS __block和__weak的区别
查看>>