1. 创建.net 6 API
  2. 安装依赖包
  3. 创建kafka生产者
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using KafkaHelper.Config;
using Microsoft.Extensions.Options;namespace KafkaHelper
{public class KafkaProducer{public IOptionsMonitor<KafkaConfig> _kafkaconfig;public KafkaProducer(IOptionsMonitor<KafkaConfig> kafkaconfig){_kafkaconfig = kafkaconfig;}public void sendMessage(){}//创建topicpublic async Task<bool> createTopic(string topicName,short factorNum,int partitionNum){using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = $"{_kafkaconfig.CurrentValue.Host}:{_kafkaconfig.CurrentValue.Port}"}).Build()){try{await adminClient.CreateTopicsAsync(new TopicSpecification[] {new TopicSpecification { Name = topicName, ReplicationFactor = factorNum, NumPartitions = partitionNum }});return true;}catch (CreateTopicsException e){Console.WriteLine($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}");return false;}}}//删除topicpublic async Task<bool> deleteTopic(List<string> topicName){using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = $"{_kafkaconfig.CurrentValue.Host}:{_kafkaconfig.CurrentValue.Port}" }).Build()){try{await adminClient.DeleteTopicsAsync(topicName, null);return true;}catch (Exception){return false;}}}//判断topic存在与否public async Task<bool> checkTopic(string topicName){using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = $"{_kafkaconfig.CurrentValue.Host}:{_kafkaconfig.CurrentValue.Port}" }).Build()){var metadata = adminClient.GetMetadata(TimeSpan.FromSeconds(10));var topicsMetadata = metadata.Topics;var topicNames = metadata.Topics.Select(a => a.Topic).ToList();return topicNames.Contains(topicName);}}public static void handler(DeliveryReport<Null, string> deliveryReport){Console.WriteLine(!deliveryReport.Error.IsError? $"Delivered message to {deliveryReport.TopicPartitionOffset}": $"Delivery Error: {deliveryReport.Error.Reason}");}public bool sendMessage(string topicName){var producerConfig = new ProducerConfig{BootstrapServers = $"{_kafkaconfig.CurrentValue.Host}:{_kafkaconfig.CurrentValue.Port}",MessageSendMaxRetries = 2};try{using (var p = new ProducerBuilder<Null, string>(producerConfig).Build()){p.Produce(topicName, new Message<Null, string> { Value = $"my message" }, handler);p.Flush(TimeSpan.FromSeconds(10));}return true;}catch (Exception){return false;}}//自定义将消息发送到某个topic的分区中,以保证这个分区只存储某一个特定类型的数据public bool sendMessagePartition(string topicName){var producerConfig = new ProducerConfig{BootstrapServers = $"{_kafkaconfig.CurrentValue.Host}:{_kafkaconfig.CurrentValue.Port}",MessageSendMaxRetries = 2};try{using (var p = new ProducerBuilder<Null, string>(producerConfig).Build()){var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = $"{_kafkaconfig.CurrentValue.Host}:{_kafkaconfig.CurrentValue.Port}" }).Build();var meta = adminClient.GetMetadata(TimeSpan.FromSeconds(20));var topic = meta.Topics.SingleOrDefault(t => t.Topic == topicName);var topicPartitions = topic.Partitions;TopicPartition topicPartition = new TopicPartition(topicName, new Partition(1));p.Produce(topicPartition, new Message<Null, string> { Value = $"my message" }, handler);p.Flush(TimeSpan.FromSeconds(10));}return true;}catch (Exception){return false;}}//添加标头public bool sendMessageHeader(string topicName){var producerConfig = new ProducerConfig{BootstrapServers = $"{_kafkaconfig.CurrentValue.Host}:{_kafkaconfig.CurrentValue.Port}",MessageSendMaxRetries = 2};try{using (var p = new ProducerBuilder<Null, string>(producerConfig).Build()){var header = new Headers();header.Add("ellis", Encoding.UTF8.GetBytes("{\"ellis\":\"dalian\"}"));p.Produce(topicName, new Message<Null, string> { Value = $"my message",Headers=header }, handler);p.Flush(TimeSpan.FromSeconds(10));}return true;}catch (Exception){return false;}}public static void keyhandler(DeliveryReport<string, string> deliveryReport){Console.WriteLine(!deliveryReport.Error.IsError? $"Delivered message to {deliveryReport.TopicPartitionOffset}": $"Delivery Error: {deliveryReport.Error.Reason}");}//通过指定key,让kafka按照key的hash值进行message的分区选择,相同的key会发送到相同的分区public bool sendMessageKey(string topicName,string key){var producerConfig = new ProducerConfig{BootstrapServers = $"{_kafkaconfig.CurrentValue.Host}:{_kafkaconfig.CurrentValue.Port}",MessageSendMaxRetries = 2,CompressionType= CompressionType.Gzip,Acks= Acks.All,EnableIdempotence= true,};try{using (var p = new ProducerBuilder<string, string>(producerConfig).Build()){p.Produce(topicName, new Message<string, string> { Key=key, Value = $"my test" }, keyhandler);p.Flush(TimeSpan.FromSeconds(10));}return true;}catch (Exception){return false;}}}
}

p.Flush(TimeSpan.FromSeconds(10)),这里Flush函数的作用是等待所有回调函数执行完成,参数是超时时间,也就是最大的等待时间,这个操作无法被取消,所以应该设置较短的时间。还有需要注意的是,Flush函数的位置,不要让阻塞出现在循环中。

需要说明的是,kafka生产者在不指定key的时候,消息会均衡的分布在各个分区,我们可以指定消息的key,使得同一个key的消息发送到同一个分区。也可以指定消息发送的partition。

同一个分区消息是有序的。

关于kafka生产者的配置
https://docs.confluent.io/platform/current/clients/confluent-kafka-dotnet/_site/api/Confluent.Kafka.html

样例
https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/examples

本博客源码
https://github.com/xdqt/asp.net-core-efcore-jwt-middleware/tree/master/CoreKafka

.net core 中使用confluent kafka构建生产者相关推荐

  1. 详细讲解如何使用Java连接Kafka构建生产者和消费者(带测试样例)

    1 缘起 学习消息队列的过程中,先补习了RabbitMQ相关知识, 接着又重温了Kafka相关的知识, 发现,我并没有积累Java原生操作Kafka的文章, 只使用SpringBoot集成过Kafka ...

  2. 在Asp.Net Core中集成Kafka

    在我们的业务中,我们通常需要在自己的业务子系统之间相互发送消息,一端去发送消息另一端去消费当前消息,这就涉及到使用消息队列MQ的一些内容,消息队列成熟的框架有多种,这里你可以读这篇文章来了解这些MQ的 ...

  3. 基于Confluent.Kafka实现的KafkaConsumer消费者类和KafkaProducer消息生产者类型

    一.引言 研究Kafka有一段时间了,略有心得,基于此自己就写了一个Kafka的消费者的类和Kafka消息生产者的类,进行了单元测试和生产环境的测试,还是挺可靠的. 二.源码 话不多说,直接上代码,代 ...

  4. 如何在ASP.NET Core 中快速构建PDF文档

    比如我们需要ASP.NET Core 中需要通过PDF来进行某些简单的报表开发,随着这并不难,但还是会手忙脚乱的去搜索一些资料,那么恭喜您,这篇帖子会帮助到您,我们就不会再去浪费一些宝贵的时间. 在本 ...

  5. 在ASP.NET Core中使用的ML.NET模型构建器入门

    目录 介绍 背景 先决条件 使用代码 第1步-创建ASP.NET Core应用程序 步骤2:使用ML.NET Model Builder 数据 训练 评估 代码 步骤3:将ML.NET添加到ASP.N ...

  6. 如何在ASP.NET Core中使用SignalR构建与Angular通信的实时通信应用程序

    图片 假设我们要创建一个监视Web应用程序,该应用程序为用户提供了一个能够显示一系列信息的仪表板,这些信息会随着时间的推移而更新. 第一种方法是在定义的时间间隔(轮询)定期调用API 以更新仪表板上的 ...

  7. .net core confluent kafka消费者

    定义消费者 using KafkaHelper.Config; using Microsoft.Extensions.Options; using System; using System.Colle ...

  8. DataPipeline联合Confluent Kafka Meetup上海站

    Confluent作为国际数据"流"处理技术领先者,提供实时数据处理解决方案,在市场上拥有大量企业客户,帮助企业轻松访问各类数据.DataPipeline作为国内首家原生支持Kaf ...

  9. 基于Confluent.Kafka实现的Kafka客户端操作类使用详解

    一.引言 有段时间没有写东西了,当然不是没得写,还有MongoDB的系列没有写完呢,那个系列还要继续.今天正好是周末,有点时间,来写新东西吧.最近公司用了Kafka做为消息的中间件,最开始写的那个版本 ...

最新文章

  1. solidworks完全卸载安装高版本
  2. latex_Texstudio+Miktex+Bibtex(参考文献引用信息)
  3. #中regex的命名空间_Python命名空间实例解析
  4. redux-form(V7.4.2)笔记(二)
  5. iOS UILable高度自适应
  6. sass和less的优缺点
  7. 基于pt100温度计仿真_基于8pt网格的设计系统
  8. linux内核体系学习路径_Linux内核分析(一)linux体系简介|内核源码简介|内核配置编译安装...
  9. 一个合格程序员的标准
  10. 用PHP制作饼图调查表
  11. win7上的linux环境变量,java之环境变量配置win7andlinux.docx
  12. ssh传输越多越慢_Linux下分析网站访问慢原因
  13. SQL-SERVER2008登录错误233
  14. Debit credit problem
  15. 管理感悟:需要什么样的技术文档
  16. 稳压二极管型号对照表
  17. 带农历万年历C语言程序,c语言万年历程序代码
  18. CSR8811A12-ICXR-R双模数据传输模块 蓝牙芯片4.2
  19. 阿里云邮箱企业版和个人免费版之间的区别
  20. jsp显示中文文件名的图片 详细出处参考:http://www.jb51.net/article/37149.htm

热门文章

  1. L1-概率论中的10个基本概念:古典概率、联合概率、条件概率、生日问题等
  2. afrog 进阶篇(实战)
  3. echart地图隐藏南海诸岛
  4. 淘宝商家如何在得物做推广?得物推广有效果吗?
  5. 如何创建dblink
  6. R语言分析财收与税收的线性回归关系
  7. 自发和诱发电生理活动之间的动态关系
  8. CentOS7-samba文件共享服务
  9. 奥利给,redis项目中初体验
  10. 【wxPython】wxPython之窗口操作