环境:

net core 5

DotNetCore.CAP 6.0.1

RabbitMQ 3.8.5

Github

Cap官方文档

配置Cap

startup.cs:

 public void ConfigureServices(IServiceCollection services){services.AddControllers();//配置WebAPI服务services.AddTransient<ICapSubscribeService, CapSubscribeService>();
services.AddCap((option) =>{option.UseInMemoryStorage(); // 使用内存存储消息(消息发送失败处理)// 使用EntityFramework进行存储操作//option.UseEntityFramework<DatabaseContext>();// 使用sqlserver进行事务处理,防止推送MQ失败,会在指定数据库中自动生成以"cap."开头的表//option.UseSqlServer(Configuration.GetConnectionString("DefaultConnection"));//CAP支持 RabbitMQ、Kafka、AzureServiceBus 等作为MQ,进行事件中心处理,根据使用选择配置://option.UseRabbitMQ("MQConnectionStrings");//option.UseInMemoryStorage();//使用内存持久化消息option.UseRabbitMQ(rb =>{rb.HostName = "localhost";rb.UserName = "guest";rb.Password = "guest";rb.Port = 5672;rb.VirtualHost = "/";});//option.UseKafka("MQConnectionStrings");// 添加cap后台监控页面(人工处理);页面地址为“/cap”;如:http://localhost:5000/capoption.UseDashboard();//配置定时器重试策略option.FailedRetryInterval = 60; //重试间隔时间(秒),默认60s,使用默认的就可以,可不用配置option.FailedRetryCount = 5; //重试次数,默认值:50//设置处理成功的数据在数据库中保存的时间(秒),为保证系统新能,数据会定期清理。option.SucceedMessageExpiredAfter = 24 * 3600;//option.DefaultGroupName = "default-group-name";//指定默认消费者组名称//失败时的回调通知函数option.FailedThresholdCallback = (failedInfo) =>{Console.WriteLine($"失败回调:messageType:{failedInfo.MessageType};message:{failedInfo.Message}");};});}

控制器 CapPublishController:

发布:

[Route("api/[controller]")][ApiController]public class CapPublishController : ControllerBase{public readonly ICapPublisher _capPub;public CapPublishController(ICapPublisher capPub){_capPub = capPub;}[HttpPost]public ActionResult AddUser(){User user = new User{Age = 26,FirstName = "陈",LastName = "兆杰",};//简单发布:{_capPub.Publish("test.mq.adduser2", user);_capPub.Publish("test.mq.header", user);var header = new Dictionary<string, string>(){["my.header.first"] = "first",["my.header.second"] = "second"};_capPub.Publish("test.show.time", DateTime.Now, header);}//发布:订阅 + 回调{//发布+回调 ,test.mq.adduser1 订阅者消费成功后,会将消费函数的返回值作为,回调函数的入参_capPub.Publish("test.mq.adduser1", user, "CallBack1");_capPub.Publish("test.mq.adduser", user, "CallBack");}return base.Ok("添加成功");}[HttpPost][Route("~/ef/AddUser")]public IActionResult AddUser(User user){DatabaseContext dbContext = null;using (var trans = dbContext.Database.BeginTransaction(_capPub, autoCommit: true)){//业务代码_capPub.Publish("test.mq.adduser", DateTime.Now);//调用订阅 test.mq.adduser 的实现}return Ok();}[NonAction][CapSubscribe("test.mq.header", Group = "group.test2")]public void Subscriber(User user, [FromCap] CapHeader header){var id = header[Headers.MessageId];Console.WriteLine($@"{DateTime.Now} Subscriber invoked, Info: {user}");}}

订阅:

public interface ICapSubscribeService{/// <summary>/// 无需返回值,void或Task即可/// </summary>/// <param name="user"></param>User Show(User user);string Show1(User user);void Show2(User user);void CallBack1(string message);void CallBack(User user);}/// <summary>/// 如果你的订阅方法没有位于 Controller 中,则你订阅的类需要继承 ICapSubscribe /// /// 分组Group:/// CAP启动的时候,她将创建一个默认的消费者组,如果多个相同消费者组的消费者消费同一个Topic消息的时候,只会有一个消费者被执行。相反,如果消费者都位于不同的消费者组,则所有的消费者都会被执行/// </summary>public class CapSubscribeService : ICapSubscribeService, ICapSubscribe{//[CapSubscribe("test.mq.*")] // 一对多匹配[CapSubscribe("test.mq.adduser")] // 一对一匹配public User Show(User user){Console.WriteLine($"姓名:{user.FirstName + user.LastName} 年龄:{user.Age}");return user;}[CapSubscribe("test.mq.adduser1", Group = "group1")]public string Show1(User user){string message = $"姓名:{user.FirstName + user.LastName} 年龄:{user.Age}";Console.WriteLine(message);return message;}[CapSubscribe("test.mq.adduser2", Group = "group2")]public void Show2(User user){string message = $"姓名:{user.FirstName + user.LastName} 年龄:{user.Age}";Console.WriteLine(message);throw new NotImplementedException();}/// <summary>/// 回调:/// 可以用来判断订阅者是否处理成功,来处理发布的业务的回滚等等。。。/// message入参为发布函数的返回值,如:Show1 的返回值 /// 服务端处理来自客户端的订阅,即订阅回调topic: CallBack/// </summary>/// <param name="message"></param>[CapSubscribe("CallBack1", Group = "CallbackServer")]public void CallBack1(string message){Console.WriteLine($"接收到回调:{message}");}[CapSubscribe("CallBack", Group = "CallbackServer")]public void CallBack(User user){Console.WriteLine($"接收到回调:{user}");}}

Cap.DashBoard:

http://ip:port/cap

DotNetCore CAP相关推荐

  1. DotNetCore CAP(分布式事务最终一致性)框架

    前言 CAP用来处理分布式事务以及提供EventBus的功能,具有轻量级,高性能,易使用等特点. 安装包 Install-package DotNetCore.CAP Install-package ...

  2. DotNetCore.CAP 基础应用

    目录 1.准备工作 2.创建第一个项目 3.创建第二个项目 4.运行 1.准备工作 1.1 rabbitmq username:admin password:admin port:5672 1.2 m ...

  3. NetCore使用DotNetCore.CAP框架

    CAP 是一个EventBus,同时也是一个在微服务或者SOA系统中解决分布式事务问题的一个框架.它有助于创建可扩展,可靠并且易于更改的微服务系统. 这个项目中使用到SqlServer(数据库方面大同 ...

  4. 让DotnetCore.CAP和SignalR接力数据推送

    DotnetCore.CAP是一款用于实现数据最终一致性的开源方案,SignalR是微软ASP.NET/ASP.NET Core体系下的实时数据传输解决方案,两种技术看起来没什么交集,但具体的业务让两 ...

  5. DotNetCore.Cap分布式事务实现最终一致性

    项目演化过程中,微服务已经遍地开花.一个大项目下几十上百个微服务已经是常态.但衍生出另外一个问题就是跨微服务事务,跨库事务的分布式事务.市面了解过的有2CP.3CP.TCC等等分布式事务解决方案,各有 ...

  6. cap mysql_.NetCore关于Cap(RabbitMQ)结合MySql使用出现MySql相关类冲突问题解决办法

    问题还原 引用了 DotNetCore.CAP.MySql MySql.Data.EntityFrameworkCore 在使用MySql相关对象的时候会出现如下冲突,在命名空间加入伪空间名称是不能解 ...

  7. NCC CAP 6.0 发布 —— 新增支持 OpenTelemetry

    前言 今天,我们很高兴宣布 CAP 发布 6.0 版本正式版,在这个版本中,我们主要致力于对 OpenTelemetry 提供支持,以及更好的适配 .NET 6. 那么,接下来我们具体看一下吧. 总览 ...

  8. 分布式事务最终一致性-CAP框架轻松搞定

    前言 对于分布式事务,常用的解决方案根据一致性的程度可以进行如下划分: 强一致性(2PC.3PC):数据库层面的实现,通过锁定资源,牺牲可用性,保证数据的强一致性,效率相对比较低. 弱一致性(TCC) ...

  9. CAP 发布 5.0 版本正式发布

    前言 今天,我们很高兴宣布 CAP 发布 5.0 版本正式版.同时我们也很高兴的告诉你 CAP 已经有越来越多的用户并且变得越来越流行. 在 5.0 版本中,我们主要致力于更好的支持 .NET 5 以 ...

  10. 在 CAP 中使用 AOP ( Castle.DynamicProxy )

    简介 本篇文章主要介绍如何在 CAP 中集成使用 Castle.DynamicProxy,Castle DynamicProxy 是一个用于在运行时动态生成轻量级.NET代理的库.代理对象允许在不修改 ...

最新文章

  1. Android Market 链接的生成与分享
  2. iview select选中值取值_vue+iview 项目
  3. #一周五# VS2015 CTP6, TFS2015 CTP1更新,老衣的开发工具汇总,2015 MVP 社区巡讲
  4. Git-rebase 小筆記
  5. NULL和INITIAL的区别 and database interface
  6. windows同时安装python2和3编码_Windows同时安装多个版本,python2和python3,window
  7. *【HDU - 2473】Junk-Mail Filter (并查集--删点操作)
  8. ID3/C4.5/Gini Index
  9. 【转】测试用例编写(功能测试框架)
  10. vscode中配置cmake及debug使用
  11. 计算机软件故障实验报告,湖大选修实验报告计算机软硬件一般故障的排除.doc...
  12. 强连通分量-Trajan算法
  13. No bootable device
  14. 手机无线连接到电脑共享文件
  15. linux下使用LVM合并挂载硬盘以及扩容
  16. 双稳态电路的两个稳定状态是什么_单稳态电路与双稳态电路
  17. 小麦苗健康检查脚本说明(Oracle巡检脚本)
  18. [DP]JZOJ 5804 简单的序列
  19. 自制p站小姐姐图片返回api.
  20. 大胆预测一下《数据结构》期末机考题

热门文章

  1. 一度智信:电商平台商品定价策略
  2. 易语言教程数据库删除命令
  3. 【笔记】第2章 向量
  4. 对嵌入式开发方向的一些思考:在物联网方向
  5. WARNING: We noticed you're using the `useBuiltIns` option without declaring a core-js version.
  6. 2022年全球市场HTCC陶瓷封装总体规模、主要生产商、主要地区、产品和应用细分研究报告
  7. 二项分布,柏松分布和正态分布
  8. 应用程序正常初始化(0xc0000160)失败
  9. gatk过滤_快速入门GATK | Public Library of Bioinformatics
  10. 笔记本电脑键盘的禁用与恢复【亲测有效】