【Kafka】| 总结/Edison Zhou

1可用的Kafka .NET客户端

作为一个.NET Developer,自然想要在.NET项目中集成Kafka实现发布订阅功能。那么,目前可用的Kafka客户端有哪些呢?

目前.NET圈子主流使用的是 Confluent.Kafka

confluent-kafka-dotnet : https://github.com/confluentinc/confluent-kafka-dotnet

其他主流的客户端还有rdkafka-dotnet项目,但是其已经被并入confluent-kakfa-dotnet项目进行维护了。

因此,推荐使用confluent-kafka-dotnet,其配置友好,功能也更全面。

NCC千星项目CAP的Kafka扩展包(DotNetCore.CAP.Kafka)内部也是基于Confluent.Kafka来实现的:

接下来,本文就来在.NET Core项目下通过Confluent.Kafka和CAP两个主流开源项目来操作一下Kafka,实现一下发布订阅的功能。

2基于Confluent.Kafka的Sample

要完成本文示例,首先得有一个启动好的Kafka Broker服务。关于如何搭建Kafka,请参考上一篇:通过Docker部署Kafka集群。

安装相关组件

在.NET Core项目中新建一个类库,暂且命名为EDT.Kafka.Core,安装Confluent.Kafka组件:

PM>Install-Package Confluent.Kafka

编写KafkaService

编写IKafkaService接口:

namespace EDT.Kafka.Core
{public interface IKafkaService{Task PublishAsync<T>(string topicName, T message) where T : class;Task SubscribeAsync<T>(IEnumerable<string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) where T : class;}
}

编写KafkaService实现类:

namespace EDT.Kafka.Core
{public class KafkaService : IKafkaService{public static string KAFKA_SERVERS = "127.0.0.1:9091";public async Task PublishAsync<T>(string topicName, T message) where T : class{var config = new ProducerConfig { BootstrapServers = KAFKA_SERVERS,BatchSize = 16384, // 修改批次大小为16KLingerMs = 20 // 修改等待时间为20ms};using (var producer = new ProducerBuilder<string, string>(config).Build()){await producer.ProduceAsync(topicName, new Message<string, string>{Key = Guid.NewGuid().ToString(),Value = JsonConvert.SerializeObject(message)}); ;}}public async Task SubscribeAsync<T>(IEnumerable<string> topics, Action<T> messageFunc, CancellationToken cancellationToken = default) where T : class{var config = new ConsumerConfig{BootstrapServers = KAFKA_SERVERS,GroupId = "Consumer",EnableAutoCommit = false, // 禁止AutoCommitAcks = Acks.Leader, // 假设只需要Leader响应即可AutoOffsetReset = AutoOffsetReset.Earliest // 从最早的开始消费起};using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build()){consumer.Subscribe(topics);try{while (true){try{var consumeResult = consumer.Consume(cancellationToken);Console.WriteLine($"Consumed message '{consumeResult.Message?.Value}' at: '{consumeResult?.TopicPartitionOffset}'.");if (consumeResult.IsPartitionEOF){Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");continue;}T messageResult = null;try{messageResult = JsonConvert.DeserializeObject<T>(consumeResult.Message.Value);}catch (Exception ex){var errorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}【Exception 消息反序列化失败,Value:{consumeResult.Message.Value}】 :{ex.StackTrace?.ToString()}";Console.WriteLine(errorMessage);messageResult = null;}if (messageResult != null/* && consumeResult.Offset % commitPeriod == 0*/){messageFunc(messageResult);try{consumer.Commit(consumeResult);}catch (KafkaException e){Console.WriteLine(e.Message);}}}catch (ConsumeException e){Console.WriteLine($"Consume error: {e.Error.Reason}");}}}catch (OperationCanceledException){Console.WriteLine("Closing consumer.");consumer.Close();}}await Task.CompletedTask;}}
}

为了方便后续的演示,在此项目中再创建一个类 EventData:

public class EventData
{public string TopicName { get; set; }public string Message { get; set; }public DateTime EventTime { get; set; }
}

编写Producer

新建一个Console项目,暂且命名为:EDT.Kafka.Demo.Producer,其主体内容如下:

namespace EDT.Kafka.Demo.Producer
{public class Program{static async Task Main(string[] args){KafkaService.KAFKA_SERVERS = "kafka1:9091,kafka2:9092,kafka3:9093";var kafkaService = new KafkaService();for (int i = 0; i < 50; i++){var eventData = new EventData{TopicName = "testtopic",Message = $"This is a message from Producer, Index : {i + 1}",EventTime = DateTime.Now};await kafkaService.PublishAsync(eventData.TopicName, eventData);}Console.WriteLine("Publish Done!");Console.ReadKey();}}
}

编写Consumer

新建一个Console项目,暂且命名为:EDT.Kafka.Demo.Consumer,其主体内容如下:

namespace EDT.Kafka.Demo.Consumer
{public class Program{static async Task Main(string[] args){KafkaService.KAFKA_SERVERS = "kafka1:9091,kafka2:9092,kafka3:9093";var kafkaService = new KafkaService();var topics = new List<string> { "testtopic" };await kafkaService.SubscribeAsync<EventData>(topics, (eventData) => {Console.WriteLine($" - {eventData.EventTime: yyyy-MM-dd HH:mm:ss} 【{eventData.TopicName}】- > 已处理");});}}
}

测试Pub/Sub效果

将Producer和Consumer两个项目都启动起来,可以看到当Consumer消费完50条消息并一一确认之后,Producer这边就算发布结束。

3基于CAP项目的Sample

模拟场景说明

假设我们有两个微服务,一个是Catalog微服务,一个是Basket微服务,当Catalog微服务产生了Product价格更新的事件,就会将其发布到Kafka,Basket微服务作为消费者就会订阅这个消息然后更新购物车中对应商品的最新价格。

Catalog API

新建一个ASP.NET Core WebAPI项目,然后分别安装以下组件:

PM>Install Package DotNetCore.CAP
PM>Install Package DotNetCore.CAP.MongoDB
PM>Install Package DotNetCore.CAP.Kafka

在Startup中的ConfigureServices方法中注入CAP:

public void ConfigureServices(IServiceCollection services)
{......services.AddCap(x =>{x.UseMongoDB("mongodb://account:password@mongodb-server:27017/products?authSource=admin");x.UseKafka("kafka1:9091,kafka2:9092,kafka3:9093");});
}

新建一个ProductController,实现一个Update产品价格的接口,在其中通过CapPublisher完成发布消息到Kafka:

namespace EDT.Demo.Catalog.API.Controllers
{[ApiController][Route("[controller]")]public class ProductController : ControllerBase{private static readonly IList<Product> Products = new List<Product>{new Product { Id = "0001", Name = "电动牙刷A", Price = 99.90M,  Introduction = "暂无介绍" },new Product { Id = "0002", Name = "电动牙刷B", Price = 199.90M,  Introduction = "暂无介绍" },new Product { Id = "0003", Name = "洗衣机A", Price = 2999.90M,  Introduction = "暂无介绍" },new Product { Id = "0004", Name = "洗衣机B", Price = 3999.90M,  Introduction = "暂无介绍" },new Product { Id = "0005", Name = "电视机A", Price = 1899.90M,  Introduction = "暂无介绍" },};private readonly ICapPublisher _publisher;private readonly IMapper _mapper;public ProductController(ICapPublisher publisher, IMapper mapper){_publisher = publisher;_mapper = mapper;}[HttpGet]public IList<ProductDTO> Get(){return _mapper.Map<IList<ProductDTO>>(Products); ;}[HttpPut]public async Task<IActionResult> UpdatePrice(string id, decimal newPrice){// 业务代码var product = Products.FirstOrDefault(p => p.Id == id);product.Price = newPrice;// 发布消息await _publisher.PublishAsync("ProductPriceChanged", new ProductDTO { Id = product.Id, Name = product.Name, Price = product.Price});return NoContent();}}
}

Basket API

参照Catalog API项目创建ASP.NET Core WebAPI项目,并安装对应组件,在ConfigureServices方法中注入CAP。

新建一个BasketController,用于订阅Kafka对应Topic:ProductPriceChanged 的消息。

namespace EDT.Demo.Basket.API.Controllers
{[ApiController][Route("[controller]")]public class BasketController : ControllerBase{private static readonly IList<MyBasketDTO> Baskets = new List<MyBasketDTO>{new MyBasketDTO { UserId = "U001", Catalogs = new List<Catalog>{new Catalog { Product = new ProductDTO { Id = "0001", Name = "电动牙刷A", Price = 99.90M }, Count = 2 },new Catalog { Product = new ProductDTO { Id = "0005", Name = "电视机A", Price = 1899.90M }, Count = 1 },}    },new MyBasketDTO { UserId = "U002", Catalogs = new List<Catalog>{new Catalog { Product = new ProductDTO { Id = "0002", Name = "电动牙刷B", Price = 199.90M }, Count = 2 },new Catalog { Product = new ProductDTO { Id = "0004", Name = "洗衣机B", Price = 3999.90M }, Count = 1 },}}};[HttpGet]public IList<MyBasketDTO> Get(){return Baskets;}[NonAction][CapSubscribe("ProductPriceChanged")]public async Task RefreshBasketProductPrice(ProductDTO productDTO){if (productDTO == null)return;foreach (var basket in Baskets){foreach (var catalog in basket.Catalogs){if (catalog.Product.Id == productDTO.Id){catalog.Product.Price = productDTO.Price;break;}}}await Task.CompletedTask;}}
}

测试效果

同时启动Catalog API 和 Basket API两个项目。

首先,通过Swagger在Basket API中查看所有用户购物车中的商品的价格,可以看到,0002的商品是199.9元。

然后,通过Swagger在Catalog API中更新Id为0002的商品的价格至499.9元。

最后,通过Swagger在Basket API中查看所有用户购物车中的商品的价格,可以看到,0002的商品已更新至499.9元。

End总结

本文总结了.NET Core如何通过对应客户端操作Kafka,基于Confluent.Kafka项目和CAP项目可以方便的实现发布订阅的效果。

参考资料

阿星Plus,《.NET Core下使用Kafka》:https://blog.csdn.net/meowv/article/details/108675741

麦比乌斯皇,《.NET使用Kafka小结》:https://www.cnblogs.com/hsxian/p/12907542.html

Tony,《.NET Core事件总线解决方案:CAP基于Kafka》:https://www.cnblogs.com/Tony100/archive/2019/01/29/10333440.html

极客时间,胡夕《Kafka核心技术与实战》

B站,尚硅谷《Kafka 3.x入门到精通教程》

年终总结:Edison的2020年终总结

数字化转型:我在传统企业做数字化转型

C#刷题:C#刷剑指Offer算法题系列文章目录

.NET面试:.NET开发面试知识体系

.NET大会:2020年中国.NET开发者大会PDF资料

Kafka学习征途:.NET Core操作Kafka相关推荐

  1. Kafka学习笔记(3)----Kafka的数据复制(Replica)与Failover

    1. CAP理论 1.1 Cosistency(一致性) 通过某个节点的写操作结果对后面通过其他节点的读操作可见. 如果更新数据后,并发访问的情况下可立即感知该更新,称为强一致性 如果允许之后部分或全 ...

  2. Kafka学习笔记(八)Kafka消费者

    版权声明:本文为转载文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明. 原文链接:https://blog.csdn.net/weixin_39468305/articl ...

  3. python使用kafka原理详解_Python操作Kafka原理及使用详解

    Python操作Kafka原理及使用详解 一.什么是Kafka Kafka是一个分布式流处理系统,流处理系统使它可以像消息队列一样publish或者subscribe消息,分布式提供了容错性,并发处理 ...

  4. Kafka学习笔记-Java简单操作

    Maven依赖包: [plain] view plaincopy <dependency> <groupId>org.apache.kafka</groupId> ...

  5. Kafka学习总结(1)——Kafka入门简介

    Kafka是分布式发布-订阅消息系统.它最初由LinkedIn公司开发,之后成为Apache项目的一部分.Kafka是一个分布式的,可划分的,冗余备份的持久性的日志服务.它主要用于处理活跃的流式数据. ...

  6. kafka学习(六):kafka应用场景

    消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构.目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ ...

  7. kafka学习(一)初识kafka

    本文借鉴:再过半小时,你就能明白kafka的工作原理了(特此感谢!) 一.简介 定义:kafka是一个分布式,基于zookeeper协调的发布/订阅模式的消息系统,本质是一个MQ(消息队列Messag ...

  8. Kafka学习记录(三)——Broker

    Kafka学习记录(三)--Broker 目录 Kafka学习记录(三)--Broker 对应课程 Zookeeper存储的Kafka信息 Broker总体工作流程 Broker的服役和退役 Kafk ...

  9. Kafka学习记录(四)——消费者

    Kafka学习记录(四)--消费者 目录 Kafka学习记录(四)--消费者 对应课程 Kafka消费者工作流程 消费方式和流程 消费者组原理 消费者组初始化流程 消费者组详细消费流程 重要参数 ka ...

最新文章

  1. 抓住青春的尾巴再愤青一把
  2. ACMMM2017 | 电子科大斩获最佳论文!中科院自动化所多媒体计算组获得IEEE期刊最佳论文!
  3. 单身人数减少?平均年薪 15 万+?今年的程序员可不一般!
  4. bower解决js的依赖管理
  5. Oracle自治事务
  6. 在Azure上创建MYSQL服务
  7. Java中date和calendar的用法
  8. mysql线程异常中断事务_清理MySQL死锁事务线程
  9. 台式计算机攒机报告,计算机装机报告.doc
  10. 编写一个求和函数sum,使输入sum(2)(3)或输入sum(2,3),输出结果都为5
  11. redis 都有哪些数据类型?分别在哪些场景下使用比较合适?
  12. C#String.Split (string[], StringSplitOptions) 多参数分割得到数组
  13. 背靠福特的Argo无人车发生严重事故,两名乘客已送往医院
  14. python快速入门神器 知乎_Python爬虫偷懒神器!快速一键生成Python爬虫请求头
  15. FreeRTOS 教程指南 学习笔记 第三章 任务管理(二)
  16. c语言void调用不可作为,C语言的选择题.pdf
  17. 学习:网络接口RJ45
  18. 虎虎生威年,用Compose Canvas画只猛虎让大家 “虎躯一震” 吧
  19. 第075封“情书”:百撕不得其解Tearing Cloth<Entagma>Houdini 2018
  20. mysql盲注_Mysql 布尔型盲注手工注入详解

热门文章

  1. PRO-seq数据分析
  2. 中国工业管道过滤器市场深度研究分析报告
  3. 千锋 Vue 详细笔记整理
  4. OSChina 周四乱弹 ——印象开源中国:总是飙车就容易脱发
  5. OpenGL ES:绘制函数glDrawArrays 和 glDrawElements 的区别
  6. #9733;一名“标题党”自我修炼的10…
  7. 微信SEO优化搜索排名如何做
  8. 云计算机英语怎么说,云的英语怎么说
  9. Supermap iDesktop处理导入CAD文件存在线型风格显示缺失问题
  10. matlab 实现完整的正弦波信号发生器