.net core 中使用confluent kafka构建生产者
- 创建.net 6 API
- 安装依赖包
- 创建kafka生产者
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using KafkaHelper.Config;
using Microsoft.Extensions.Options;namespace KafkaHelper
{public class KafkaProducer{public IOptionsMonitor<KafkaConfig> _kafkaconfig;public KafkaProducer(IOptionsMonitor<KafkaConfig> kafkaconfig){_kafkaconfig = kafkaconfig;}public void sendMessage(){}//创建topicpublic async Task<bool> createTopic(string topicName,short factorNum,int partitionNum){using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = $"{_kafkaconfig.CurrentValue.Host}:{_kafkaconfig.CurrentValue.Port}"}).Build()){try{await adminClient.CreateTopicsAsync(new TopicSpecification[] {new TopicSpecification { Name = topicName, ReplicationFactor = factorNum, NumPartitions = partitionNum }});return true;}catch (CreateTopicsException e){Console.WriteLine($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}");return false;}}}//删除topicpublic async Task<bool> deleteTopic(List<string> topicName){using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = $"{_kafkaconfig.CurrentValue.Host}:{_kafkaconfig.CurrentValue.Port}" }).Build()){try{await adminClient.DeleteTopicsAsync(topicName, null);return true;}catch (Exception){return false;}}}//判断topic存在与否public async Task<bool> checkTopic(string topicName){using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = $"{_kafkaconfig.CurrentValue.Host}:{_kafkaconfig.CurrentValue.Port}" }).Build()){var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(10));var topicsMetadata = metadata.Topics;var topicNames = metadata.Topics.Select(a => a.Topic).ToList();return topicNames.Contains(topicName);}}public static void handler(DeliveryReport<Null, string> deliveryReport){Console.WriteLine(!deliveryReport.Error.IsError? $"Delivered message to {deliveryReport.TopicPartitionOffset}": $"Delivery Error: {deliveryReport.Error.Reason}");}public bool sendMessage(string topicName){var producerConfig = new ProducerConfig{BootstrapServers = $"{_kafkaconfig.CurrentValue.Host}:{_kafkaconfig.CurrentValue.Port}",MessageSendMaxRetries = 2};try{using (var p = new ProducerBuilder<Null, string>(producerConfig).Build()){p.Produce(topicName, new Message<Null, string> { Value = $"my message" }, handler);p.Flush(TimeSpan.FromSeconds(10));}return true;}catch (Exception){return false;}}//自定义将消息发送到某个topic的分区中,以保证这个分区只存储某一个特定类型的数据public bool sendMessagePartition(string topicName){var producerConfig = new ProducerConfig{BootstrapServers = $"{_kafkaconfig.CurrentValue.Host}:{_kafkaconfig.CurrentValue.Port}",MessageSendMaxRetries = 2};try{using (var p = new ProducerBuilder<Null, string>(producerConfig).Build()){var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = $"{_kafkaconfig.CurrentValue.Host}:{_kafkaconfig.CurrentValue.Port}" }).Build();var meta = adminClient.GetMetadata(TimeSpan.FromSeconds(20));var topic = meta.Topics.SingleOrDefault(t => t.Topic == topicName);var topicPartitions = topic.Partitions;TopicPartition topicPartition = new TopicPartition(topicName, new Partition(1));p.Produce(topicPartition, new Message<Null, string> { Value = $"my message" }, handler);p.Flush(TimeSpan.FromSeconds(10));}return true;}catch (Exception){return false;}}//添加标头public bool sendMessageHeader(string topicName){var producerConfig = new ProducerConfig{BootstrapServers = $"{_kafkaconfig.CurrentValue.Host}:{_kafkaconfig.CurrentValue.Port}",MessageSendMaxRetries = 2};try{using (var p = new ProducerBuilder<Null, string>(producerConfig).Build()){var header = new Headers();header.Add("ellis", Encoding.UTF8.GetBytes("{\"ellis\":\"dalian\"}"));p.Produce(topicName, new Message<Null, string> { Value = $"my message",Headers=header }, handler);p.Flush(TimeSpan.FromSeconds(10));}return true;}catch (Exception){return false;}}public static void keyhandler(DeliveryReport<string, string> deliveryReport){Console.WriteLine(!deliveryReport.Error.IsError? $"Delivered message to {deliveryReport.TopicPartitionOffset}": $"Delivery Error: {deliveryReport.Error.Reason}");}//通过指定key,让kafka按照key的hash值进行message的分区选择,相同的key会发送到相同的分区public bool sendMessageKey(string topicName,string key){var producerConfig = new ProducerConfig{BootstrapServers = $"{_kafkaconfig.CurrentValue.Host}:{_kafkaconfig.CurrentValue.Port}",MessageSendMaxRetries = 2,CompressionType= CompressionType.Gzip,Acks= Acks.All,EnableIdempotence= true,};try{using (var p = new ProducerBuilder<string, string>(producerConfig).Build()){p.Produce(topicName, new Message<string, string> { Key=key, Value = $"my test" }, keyhandler);p.Flush(TimeSpan.FromSeconds(10));}return true;}catch (Exception){return false;}}}
}
p.Flush(TimeSpan.FromSeconds(10)),这里Flush函数的作用是等待所有回调函数执行完成,参数是超时时间,也就是最大的等待时间,这个操作无法被取消,所以应该设置较短的时间。还有需要注意的是,Flush函数的位置,不要让阻塞出现在循环中。
需要说明的是,kafka生产者在不指定key的时候,消息会均衡的分布在各个分区,我们可以指定消息的key,使得同一个key的消息发送到同一个分区。也可以指定消息发送的partition。
同一个分区消息是有序的。
关于kafka生产者的配置
https://docs.confluent.io/platform/current/clients/confluent-kafka-dotnet/_site/api/Confluent.Kafka.html
样例
https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/examples
本博客源码
https://github.com/xdqt/asp.net-core-efcore-jwt-middleware/tree/master/CoreKafka
.net core 中使用confluent kafka构建生产者相关推荐
- 详细讲解如何使用Java连接Kafka构建生产者和消费者(带测试样例)
1 缘起 学习消息队列的过程中,先补习了RabbitMQ相关知识, 接着又重温了Kafka相关的知识, 发现,我并没有积累Java原生操作Kafka的文章, 只使用SpringBoot集成过Kafka ...
- 在Asp.Net Core中集成Kafka
在我们的业务中,我们通常需要在自己的业务子系统之间相互发送消息,一端去发送消息另一端去消费当前消息,这就涉及到使用消息队列MQ的一些内容,消息队列成熟的框架有多种,这里你可以读这篇文章来了解这些MQ的 ...
- 基于Confluent.Kafka实现的KafkaConsumer消费者类和KafkaProducer消息生产者类型
一.引言 研究Kafka有一段时间了,略有心得,基于此自己就写了一个Kafka的消费者的类和Kafka消息生产者的类,进行了单元测试和生产环境的测试,还是挺可靠的. 二.源码 话不多说,直接上代码,代 ...
- 如何在ASP.NET Core 中快速构建PDF文档
比如我们需要ASP.NET Core 中需要通过PDF来进行某些简单的报表开发,随着这并不难,但还是会手忙脚乱的去搜索一些资料,那么恭喜您,这篇帖子会帮助到您,我们就不会再去浪费一些宝贵的时间. 在本 ...
- 在ASP.NET Core中使用的ML.NET模型构建器入门
目录 介绍 背景 先决条件 使用代码 第1步-创建ASP.NET Core应用程序 步骤2:使用ML.NET Model Builder 数据 训练 评估 代码 步骤3:将ML.NET添加到ASP.N ...
- 如何在ASP.NET Core中使用SignalR构建与Angular通信的实时通信应用程序
图片 假设我们要创建一个监视Web应用程序,该应用程序为用户提供了一个能够显示一系列信息的仪表板,这些信息会随着时间的推移而更新. 第一种方法是在定义的时间间隔(轮询)定期调用API 以更新仪表板上的 ...
- .net core confluent kafka消费者
定义消费者 using KafkaHelper.Config; using Microsoft.Extensions.Options; using System; using System.Colle ...
- DataPipeline联合Confluent Kafka Meetup上海站
Confluent作为国际数据"流"处理技术领先者,提供实时数据处理解决方案,在市场上拥有大量企业客户,帮助企业轻松访问各类数据.DataPipeline作为国内首家原生支持Kaf ...
- 基于Confluent.Kafka实现的Kafka客户端操作类使用详解
一.引言 有段时间没有写东西了,当然不是没得写,还有MongoDB的系列没有写完呢,那个系列还要继续.今天正好是周末,有点时间,来写新东西吧.最近公司用了Kafka做为消息的中间件,最开始写的那个版本 ...
最新文章
- solidworks完全卸载安装高版本
- latex_Texstudio+Miktex+Bibtex(参考文献引用信息)
- #中regex的命名空间_Python命名空间实例解析
- redux-form(V7.4.2)笔记(二)
- iOS UILable高度自适应
- sass和less的优缺点
- 基于pt100温度计仿真_基于8pt网格的设计系统
- linux内核体系学习路径_Linux内核分析(一)linux体系简介|内核源码简介|内核配置编译安装...
- 一个合格程序员的标准
- 用PHP制作饼图调查表
- win7上的linux环境变量,java之环境变量配置win7andlinux.docx
- ssh传输越多越慢_Linux下分析网站访问慢原因
- SQL-SERVER2008登录错误233
- Debit credit problem
- 管理感悟:需要什么样的技术文档
- 稳压二极管型号对照表
- 带农历万年历C语言程序,c语言万年历程序代码
- CSR8811A12-ICXR-R双模数据传输模块 蓝牙芯片4.2
- 阿里云邮箱企业版和个人免费版之间的区别
- jsp显示中文文件名的图片 详细出处参考:http://www.jb51.net/article/37149.htm