基于Confluent.Kafka实现的Kafka客户端操作类使用详解
一、引言
有段时间没有写东西了,当然不是没得写,还有MongoDB的系列没有写完呢,那个系列还要继续。今天正好是周末,有点时间,来写新东西吧。最近公司用了Kafka做为消息的中间件,最开始写的那个版本不是很好,我就要来优化它,所以就抽了一些时间来研究Kafka。很多概念性的东西就不写了,今天主要是上干货,主要是代码,今天就把Kafka的消费者和生产者的代码贴出来,以供大家参考,当然这个是代码样板,最后我也会把地址贴出来。以后有时间我会把我自己实现的Kafka消息的生产者和消费者的代码贴出来。好了,话不多说,言归正传。
说明一点,如果想调试这里的代码,必须引入Confluent.Kafka这个dll才可以,直接在Visual Studio 项目的 Nuget 里面可以查找,直接安装就可以了。
二、消息的生产者(Kafka消息的Producer)
大多数的消息中间件都包含三个部分,一个是消息的生产者,一个是存放消息的队列,另外一个就是消息的消费者,我们就按着这个顺序,我就先把消息生产者的代码写出来。直接上代码,其实不是很难,里面有很多备注,只要有基本的概念理解起来还是很容易的。
第一个版本,同步版本!
1 using System; 2 using System.IO; 3 using System.Text; 4 using System.Collections.Generic; 5 using Confluent.Kafka; 6 using Confluent.Kafka.Serialization; 7 8 9 namespace Confluent.Kafka.Examples.Producer 10 { 11 public class Program 12 { 13 public static void Main(string[] args) 14 { 15 if (args.Length != 2) 16 { 17 Console.WriteLine("Usage: .. brokerList topicName"); 18 return; 19 } 20 21 string brokerList = args[0]; 22 string topicName = args[1]; 23 24 var config = new Dictionary<string, object> { { "bootstrap.servers", brokerList } }; 25 26 using (var producer = new Producer<string, string>(config, new StringSerializer(Encoding.UTF8), new StringSerializer(Encoding.UTF8))) 27 { 28 var cancelled = false; 29 Console.CancelKeyPress += (_, e) => { 30 e.Cancel = true; // 阻止进程退出 31 cancelled = true; 32 }; 33 34 while (!cancelled) 35 { 36 Console.Write("> "); 37 38 string text; 39 try 40 { 41 text = Console.ReadLine(); 42 } 43 catch (IOException) 44 { 45 // IO 异常抛出的时候设置此值ConsoleCancelEventArgs.Cancel == true. 46 break; 47 } 48 if (text == null) 49 { 50 break; 51 } 52 53 string key = null; 54 string val = text; 55 56 // 如果指定了键和值,则拆分行. 57 int index = text.IndexOf(" "); 58 if (index != -1) 59 { 60 key = text.Substring(0, index); 61 val = text.Substring(index + 1); 62 } 63 64 // 在下面的异步生产请求上调用.Result会导致它阻塞,直到它完成。 通常,您应该避免同步生成,因为这会对吞吐量产生巨大影响。对于这个交互式控制台的例子,这是我们想要的。 65 var deliveryReport = producer.ProduceAsync(topicName, key, val).Result; 66 Console.WriteLine( 67 deliveryReport.Error.Code == ErrorCode.NoError 68 ? "delivered to: "+deliveryReport.TopicPartitionOffset 69 : "failed to deliver message: "+deliveryReport.Error.Reason 70 ); 71 } 72 73 // 由于我们是同步的生产消息,此时不会有消息在传输并且也不需要等待消息到达的确认应答, 销毁生产者之前我们是不需要调用 producer.Flush 方法, 就像正常使用一样。 74 } 75 } 76 } 77 }
第二个版本,异步版本,推荐使用
1 using System; 2 using System.IO; 3 using System.Text; 4 using System.Collections.Generic; 5 using Confluent.Kafka; 6 using Confluent.Kafka.Serialization; 7 8 9 namespace Confluent.Kafka.Examples.Producer 10 { 11 public class Program 12 { 13 public static void Main(string[] args) 14 { 15 if (args.Length != 2) 16 { 17 Console.WriteLine("Usage: .. brokerList topicName"); 18 return; 19 } 20 21 string brokerList = args[0]; 22 string topicName = args[1]; 23 string message="我就是要传输的消息内容"; 24 25 //这是以异步方式生产消息的代码实例 26 var config = new Dictionary<string, object> { { "bootstrap.servers", brokerList } }; 27 using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8))) 28 { 29 var deliveryReport = producer.ProduceAsync(topicName, null, message); 30 deliveryReport.ContinueWith(task => 31 { 32 Console.WriteLine("Producer: "+producer.Name+"\r\nTopic: "+topicName+"\r\nPartition: "+task.Result.Partition+"\r\nOffset: "+task.Result.Offset); 33 }); 34 35 producer.Flush(TimeSpan.FromSeconds(10)); 36 } 37 } 38 } 39 }
好了,上面给出了两个版本的消息生产者的代码,一个是同步版本,第二个是异步版本的,推荐使用异步版本的代码实现。
三、消息的消费者(Kafka消息的Consumer)
在消息的生产者中已经说明了消息中间件的三个部分,第一个是消息的生产者,没有消息的生产者,就没有消息的消费者了,巧妇难为无米之炊吧。在上一节我们已经写了消息生产者的代码,这一节,我们主要来贴出消息消费者的代码。代码同样很简单,注释也很全。
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using Confluent.Kafka.Serialization; 6 7 8 /// <summary> 9 /// 演示如何使用Consumer客户端. 10 /// </summary> 11 namespace Confluent.Kafka.Examples.Consumer 12 { 13 public class Program 14 { 15 /// <summary> 16 // 在这个例子中: 17 /// - offsets 是自动提交的。 18 /// - consumer.Poll / OnMessage 是用于消息消费的。 19 /// - 没有为轮询循环创建(Poll)二外的线程,当然可以创建 20 /// </summary> 21 public static void Run_Poll(string brokerList, List<string> topics) 22 { 23 var config = new Dictionary<string, object> 24 { 25 { "bootstrap.servers", brokerList }, 26 { "group.id", "csharp-consumer" }, 27 { "enable.auto.commit", true }, // 默认值 28 { "auto.commit.interval.ms", 5000 }, 29 { "statistics.interval.ms", 60000 }, 30 { "session.timeout.ms", 6000 }, 31 { "auto.offset.reset", "smallest" } 32 }; 33 34 using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8))) 35 { 36 // 注意: 所有事件处理程序的执行都是在主线程中执行的,就是同步的。 37 38 //当成功消费了消息就会触发该事件 39 consumer.OnMessage += (_, msg) => Console.WriteLine("Topic: "+msg.Topic+" Partition: "+msg.Partition+" Offset: "+msg.Offset+" "+msg.Value); 40 41 consumer.OnPartitionEOF += (_, end) => Console.WriteLine("Reached end of topic "+end.Topic+" partition "+end.Partition+", next message will be at offset "+end.Offset); 42 43 //当然发生了严重错误,比如,连接丢失或者Kafka服务器无效就会触发该事件 44 consumer.OnError += (_, error) => Console.WriteLine("Error: "+error); 45 46 //当反序列化有错误,或者消费的过程中发生了错误,即error != NoError,就会触发该事件 47 consumer.OnConsumeError += (_, msg) 48 => Console.WriteLine("Error consuming from topic/partition/offset "+msg.Topic+"/"+msg.Partition+"/"+msg.Offset+": "+msg.Error); 49 50 //成功提交了Offsets会触发该事件 51 consumer.OnOffsetsCommitted += (_, commit) => Console.WriteLine(commit.Error ? "Failed to commit offsets: "+commit.Error : "Successfully committed offsets: "+commit.Offsets); 52 53 // 当消费者被分配一组新的分区时触发该事件 54 consumer.OnPartitionsAssigned += (_, partitions) => 55 { 56 Console.WriteLine("Assigned partitions:"+partitions+" "+member id: "+consumer.MemberId); 57 // 如果您未向OnPartitionsAssigned事件添加处理程序,则会自动执行以下.Assign调用。 如果你这样做,你必须明确地调用.Assign以便消费者开始消费消息。 58 //开始从分区中消息消息 59 consumer.Assign(partitions); 60 }; 61 62 // 当消费者的当前分区集已被撤销时引发该事件。 63 consumer.OnPartitionsRevoked += (_, partitions) => 64 { 65 Console.WriteLine("Revoked partitions:"+partitions); 66 // 如果您未向OnPartitionsRevoked事件添加处理程序,则下面的.Unassign调用会自动发生。 如果你这样做了,你必须明确地调用.Usessign以便消费者停止从它先前分配的分区中消费消息。 67 68 //停止从分区中消费消息 69 consumer.Unassign(); 70 }; 71 72 consumer.OnStatistics += (_, json) => Console.WriteLine("Statistics: "+json); 73 74 consumer.Subscribe(topics); 75 76 Console.WriteLine("Subscribed to:"+consumer.Subscription); 77 78 var cancelled = false; 79 Console.CancelKeyPress += (_, e) => { 80 e.Cancel = true; // 组织进程退出 81 cancelled = true; 82 }; 83 84 Console.WriteLine("Ctrl-C to exit."); 85 while (!cancelled) 86 { 87 consumer.Poll(TimeSpan.FromMilliseconds(100)); 88 } 89 } 90 } 91 92 /// <summary> 93 /// 在这实例中 94 /// - offsets 是手动提交的。 95 /// - consumer.Consume方法用于消费消息 96 /// (所有其他事件仍由事件处理程序处理) 97 /// -没有为了 轮询(消耗)循环 创建额外的线程。 98 /// </summary> 99 public static void Run_Consume(string brokerList, List<string> topics) 100 { 101 var config = new Dictionary<string, object> 102 { 103 { "bootstrap.servers", brokerList }, 104 { "group.id", "csharp-consumer" }, 105 { "enable.auto.commit", false }, 106 { "statistics.interval.ms", 60000 }, 107 { "session.timeout.ms", 6000 }, 108 { "auto.offset.reset", "smallest" } 109 }; 110 111 using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8))) 112 { 113 // 注意:所有事件处理都是在主线程中处理的,也就是说同步的 114 115 consumer.OnPartitionEOF += (_, end) 116 => Console.WriteLine("Reached end of topic "+end.Topic+" partition "+end.Partition+", next message will be at offset "+end.Offset); 117 118 consumer.OnError += (_, error)=> Console.WriteLine("Error: "+error); 119 120 // 当反序列化有错误,或者消费的过程中发生了错误,即error != NoError,就会触发该事件 121 consumer.OnConsumeError += (_, error)=> Console.WriteLine("Consume error: "+error); 122 123 // 当消费者被分配一组新的分区时触发该事件 124 consumer.OnPartitionsAssigned += (_, partitions) => 125 { 126 Console.WriteLine("Assigned partitions:"+partitions+" "+member id: "+consumer.MemberId); 127 // 如果您未向OnPartitionsAssigned事件添加处理程序,则会自动执行以下.Assign调用。 如果你这样做,你必须明确地调用.Assign以便消费者开始消费消息。 128 //开始从分区中消息消息 129 consumer.Assign(partitions); 130 }; 131 132 // 当消费者的当前分区集已被撤销时引发该事件。 133 consumer.OnPartitionsRevoked += (_, partitions) => 134 { 135 Console.WriteLine("Revoked partitions:"+partitions); 136 // 如果您未向OnPartitionsRevoked事件添加处理程序,则下面的.Unassign调用会自动发生。 如果你这样做了,你必须明确地调用.Usessign以便消费者停止从它先前分配的分区中消费消息。 137 138 //停止从分区中消费消息 139 consumer.Unassign(); 140 }; 141 142 consumer.OnStatistics += (_, json) => Console.WriteLine("Statistics: "+json); 143 144 consumer.Subscribe(topics); 145 146 Console.WriteLine("Started consumer, Ctrl-C to stop consuming"); 147 148 var cancelled = false; 149 Console.CancelKeyPress += (_, e) => { 150 e.Cancel = true; // 防止进程退出 151 cancelled = true; 152 }; 153 154 while (!cancelled) 155 { 156 if (!consumer.Consume(out Message<Ignore, string> msg, TimeSpan.FromMilliseconds(100))) 157 { 158 continue; 159 } 160 161 Console.WriteLine("Topic: "+msg.Topic+" Partition: "+msg.Partition+" Offset: "+msg.Offset+" "+msg.Value); 162 163 if (msg.Offset % 5 == 0) 164 { 165 var committedOffsets = consumer.CommitAsync(msg).Result; 166 Console.WriteLine("Committed offset: "+committedOffsets); 167 } 168 } 169 } 170 } 171 172 /// <summary> 173 /// 在这个例子中 174 /// - 消费者组功能(即.Subscribe +offset提交)不被使用。 175 /// - 将消费者手动分配给分区,并始终从特定偏移量(0)开始消耗。 176 /// </summary> 177 public static void Run_ManualAssign(string brokerList, List<string> topics) 178 { 179 var config = new Dictionary<string, object> 180 { 181 // 即使您不打算使用任何使用者组功能,也必须在创建使用者时指定group.id属性。 182 { "group.id", new Guid().ToString() }, 183 { "bootstrap.servers", brokerList }, 184 // 即使消费者没有订阅该组,也可以将分区偏移量提交给一个组。 在这个例子中,自动提交被禁用以防止发生这种情况。 185 { "enable.auto.commit", false } 186 }; 187 188 using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8))) 189 { 190 //总是从0开始消费 191 consumer.Assign(topics.Select(topic => new TopicPartitionOffset(topic, 0, Offset.Beginning)).ToList()); 192 193 // 引发严重错误,例如 连接失败或所有Kafka服务器失效。 194 consumer.OnError += (_, error) => Console.WriteLine("Error: "+error); 195 196 // 这个事件是由于在反序列化出现错误,或者在消息消息的时候出现错误,也就是 error != NoError 的时候引发该事件 197 consumer.OnConsumeError += (_, error) => Console.WriteLine("Consume error: "+error); 198 199 while (true) 200 { 201 if (consumer.Consume(out Message<Ignore, string> msg, TimeSpan.FromSeconds(1))) 202 { 203 Console.WriteLine("Topic: "+msg.Topic+" Partition: "+msg.Partition+" Offset: "+msg.Offset+" "+msg.Value); 204 } 205 } 206 } 207 } 208 209 private static void PrintUsage()=> Console.WriteLine("Usage: .. <poll|consume|manual> <broker,broker,..> <topic> [topic..]"); 210 211 public static void Main(string[] args) 212 { 213 if (args.Length < 3) 214 { 215 PrintUsage(); 216 return; 217 } 218 219 var mode = args[0]; 220 var brokerList = args[1]; 221 var topics = args.Skip(2).ToList(); 222 223 switch (mode) 224 { 225 case "poll": 226 Run_Poll(brokerList, topics); 227 break; 228 case "consume": 229 Run_Consume(brokerList, topics); 230 break; 231 case "manual": 232 Run_ManualAssign(brokerList, topics); 233 break; 234 default: 235 PrintUsage(); 236 break; 237 } 238 } 239 } 240 }
以上代码也有两个版本,第一个版本是自动提交Offset,第二个版本是人工提交Offset,但是代码没有分开写,只是不同的版本用了不同的方法。
四、结束
好了,今天就写到这里了,这是一个引子,所有代码都是真实有效的,我已经全部测试过,所以大家可以放心使用或者改造成自己的消息的生产者和消息消费者的实现。原文的地址如下,https://github.com/confluentinc/confluent-kafka-dotnet/tree/master/examples ,内容差不多。不忘初心,继续努力吧。
转载于:https://www.cnblogs.com/PatrickLiu/p/9247601.html
基于Confluent.Kafka实现的Kafka客户端操作类使用详解相关推荐
- Java File类(文件操作类)详解
在 Java 中,File 类是 java.io 包中唯一代表磁盘文件本身的对象,也就是说,如果希望在程序中操作文件和目录,则都可以通过 File 类来完成.File 类定义了一些方法来操作文件,如新 ...
- PHP数据库操作类ADODB 详解
2019独角兽企业重金招聘Python工程师标准>>> 17.4.4 ADODB的查询方法 ADODB的查询方法如下. 1.直接查询 Excute()方法通过连接句柄执行SQL查询, ...
- php node 目录,node.js基于fs模块对系统文件及目录进行读写操作的方法详解
本文主要介绍了node.js基于fs模块对系统文件及目录进行读写操作的方法,结合实例形式分析了nodejs使用fs模块针对文件与目录的读写.创建.删除等相关操作技巧,需要的朋友可以参考下. 如果要用这 ...
- pdo mysql 绑定查询_php mysql PDO 查询操作的实例详解
php mysql PDO 查询操作的实例详解 php mysql PDO 查询操作的实例详解 这篇文章主要介绍了php mysql PDO 查询操作的实例详解的相关资料,希望通过本文能帮助到大家,需 ...
- 搬砖:新一代基于UDP的低延时网络传输层协议——QUIC详解
技术扫盲:新一代基于UDP的低延时网络传输层协议--QUIC详解 本文来自腾讯资深研发工程师罗成的技术分享,主要介绍 QUIC 协议产生的背景和核心特性等. 1.写在前面 如果你的 App,在不需要任 ...
- MySQL操作mysqldump命令详解
MySQL操作mysqldump命令详解 基本命令: # 数据库备份-->mysqldump命令默认做锁表操作 mysqldump -uroot -ppassword --all-databas ...
- C# ManagementObjectSearcher操作window案例详解
C# ManagementObjectSearcher操作window案例详解* 前言: 我们在很多情况下想要获得计算机的硬件或操作系统的信息,比如获得CPU序列号给自己的软件添加机器码锁绑定指定电脑 ...
- Android客户端实现注册/登录详解(一)
前言 我们在开发安卓App时难免要与服务器打交道,尤其是对于用户账号信息的注册与登录更是每个android开发人员必须掌握的技能,本文将对客户端的注册/登录功能的实现进行分析,不到之处还请指出. 在这 ...
- matlab对图像操作函数的详解(笔记1)
matlab对图像操作函数的详解 一. 读写图像文件 1. imread imread函数用于读入各种图像文件,如:a=imread('e:\w01.tif') 注:计算机E盘上要有w01相应的.ti ...
最新文章
- 单片机音频节奏灯_用C51写的单片机音乐彩灯程序
- 软件测试方法单元测试例子,service单元测试例子
- 0基础学python看什么书-0基础学Python入门书籍应该看什么?
- 隐藏UITableView当没有数据或数据不够的时候出现的分割线.
- 单通道图像保存_3D图像展示篇结构更清晰
- 软件测试——检查代码
- svn server启动报错:The HTTP service failed to start
- 日常折腾日记:手动配置UG二次开发环境——使用NX Open C++和Visual Studio
- dropbox无法访问后国内网盘对比选择
- css所有缩写属性,CSS常见属性缩写与全写对比
- Photoshop菜单中英文对照表
- Flink/Hbase 异常 - 4.Sink 背压100% 与 hbase.util.RetryCounter.sleepUntilNextRetry 异常分析与排查
- 川普无法结束俄罗斯对美国的软件战争
- 使用Charles进行手机抓包
- 天生变态狂:TED心理学家的脑犯罪之旅
- matlab 创建一个X矩阵,里面的变量从x1、x2...xn
- foxmail 发不出邮件,被电脑管家云查杀引擎检测出带有病毒:Win32.Trojan.Agent.hryf
- ES集群搭建主机规划
- IINA 1.1.0beta1中文版 - Mac最强万能视频播放器
- pixhawk 自定义uart的应用
热门文章
- php session 域,PHP session 跨子域问题总结
- 重磅!公开基于“内心对话”的EEG脑机接口数据集,助力语音意念控制研究
- 机器学习,满足人类情感:如何帮助电脑监控你的精神状态
- UE4全景插件Nvidia Ansel Photography
- oculus rift 开发入门
- 基于Android移动终端的微型餐饮管理系统的设计与实现2-侧滑菜单
- matlab蚁群算法 路径规划,基于蚁群算法的机器人路径规划MATLAB源码
- 这款AI语音模型让派大星承认自己是钢铁侠,造假小扎对口型,火到挤爆服务器|在线可玩...
- 波士顿动力机器狗测评来了!售价堪比豪车,避障、导航、舞蹈样样都行,买不起还能租...
- android--------Socket的简单了解