基于Confluent.Kafka实现的KafkaConsumer消费者类和KafkaProducer消息生产者类型
一、引言
研究Kafka有一段时间了,略有心得,基于此自己就写了一个Kafka的消费者的类和Kafka消息生产者的类,进行了单元测试和生产环境的测试,还是挺可靠的。
二、源码
话不多说,直接上代码,代码不是很难,注释很全,希望大家多多发表意见,继续提升。
1 /// <summary> 2 /// Kafka消息消费者接口 3 /// </summary> 4 public interface IKafkaConsumer 5 { 6 /// <summary> 7 /// 指定的组别的消费者开始消费指定主题的消息 8 /// </summary> 9 /// <param name="broker">Kafka消息服务器的地址</param> 10 /// <param name="topic">Kafka消息所属的主题</param> 11 /// <param name="groupID">Kafka消费者所属的组别</param> 12 /// <param name="action">可以对已经消费的消息进行相关处理</param> 13 void Consume(string broker, string topic, string groupID, Action<ConsumerResult> action = null); 14 }
以上类型是接口定义,这个类定义的抽象类,可以重复使用相关的代码定义其中,但是目前这两个方法没有使用。
1 /// <summary> 2 /// Kafka抽象基类,提供公共接口实现 3 /// </summary> 4 public abstract class KafkaBase 5 { 6 /// <summary> 7 /// 获取Kafka服务器地址 8 /// </summary> 9 /// <param name="brokerNameKey">配置文件中Broker服务器地址的key的名称</param> 10 /// <returns>返回获取到的Kafka服务器的地址明细</returns> 11 public string GetKafkaBroker(string brokerNameKey = "Broker") 12 { 13 string kafkaBroker = string.Empty; 14 15 if (!ConfigurationManager.AppSettings.AllKeys.Contains(brokerNameKey)) 16 { 17 kafkaBroker = "http://localhost:9092"; 18 } 19 else 20 { 21 kafkaBroker = ConfigurationManager.AppSettings[brokerNameKey]; 22 } 23 return kafkaBroker; 24 } 25 26 /// <summary> 27 /// 在配置文件中获取系统中已经生成的主题名称 28 /// </summary> 29 /// <param name="topicNameKey">配置文件中主题的key名称</param> 30 /// <returns>返回获取到的主题的具体值</returns> 31 public string GetTopicName(string topicNameKey = "Topic") 32 { 33 string topicName = string.Empty; 34 35 if (!ConfigurationManager.AppSettings.AllKeys.Contains(topicNameKey)) 36 { 37 throw new Exception("Key \"" + topicNameKey + "\" not found in Config file -> configuration/AppSettings"); 38 } 39 else 40 { 41 topicName = ConfigurationManager.AppSettings[topicNameKey]; 42 } 43 return topicName; 44 } 45 }
还有一个用于数据传递的工具类,代码如下:
1 /// <summary> 2 /// Kafka消息消费者设置对象,提供Kafka消费消息的参数对象(Consumer.Consum) 3 /// </summary> 4 public sealed class ConsumerSetting 5 { 6 /// <summary> 7 /// Kafka消息服务器的地址 8 /// </summary> 9 public string Broker { get; set; } 10 11 /// <summary> 12 /// Kafka消息所属的主题 13 /// </summary> 14 public string Topic { get; set; } 15 16 /// <summary> 17 /// Kafka消息消费者分组主键 18 /// </summary> 19 public string GroupID { get; set; } 20 21 /// <summary> 22 /// 消费消息后可以执行的方法 23 /// </summary> 24 public Action<ConsumerResult> Action { get; set; } 25 }
我们可以对消息进行消费,该类型用于对消息进行整理,代码如下:
1 /// <summary> 2 /// 已经消费的消息的详情信息 3 /// </summary> 4 public sealed class ConsumerResult 5 { 6 /// <summary> 7 /// Kafka消息服务器的地址 8 /// </summary> 9 public string Broker { get; set; } 10 11 /// <summary> 12 /// Kafka消息所属的主题 13 /// </summary> 14 public string Topic { get; set; } 15 16 /// <summary> 17 /// Kafka消息消费者分组主键 18 /// </summary> 19 public string GroupID { get; set; } 20 21 /// <summary> 22 /// 我们需要处理的消息具体的内容 23 /// </summary> 24 public string Message { get; set; } 25 26 /// <summary> 27 /// Kafka数据读取的当前位置 28 /// </summary> 29 public long Offset { get; set; } 30 31 /// <summary> 32 /// 消息所在的物理分区 33 /// </summary> 34 public int Partition { get; set; } 35 }
最后阶段了,该是消费者的代码了,代码如下:
1 /// <summary> 2 /// Kafka消息消费者 3 /// </summary> 4 public sealed class KafkaConsumer : KafkaBase, IKafkaConsumer 5 { 6 #region 私有字段 7 8 private bool isCancelled; 9 10 #endregion 11 12 #region 构造函数 13 14 /// <summary> 15 /// 构造函数,初始化IsCancelled属性 16 /// </summary> 17 public KafkaConsumer() 18 { 19 isCancelled = false; 20 } 21 22 #endregion 23 24 #region 属性 25 26 /// <summary> 27 /// 是否应该取消继续消费Kafka的消息,默认值是false,继续消费消息 28 /// </summary> 29 public bool IsCancelled 30 { 31 get { return isCancelled; } 32 set { isCancelled = value; } 33 } 34 35 #endregion 36 37 #region 同步版本 38 39 /// <summary> 40 /// 指定的组别的消费者开始消费指定主题的消息 41 /// </summary> 42 /// <param name="broker">Kafka消息服务器的地址</param> 43 /// <param name="topic">Kafka消息所属的主题</param> 44 /// <param name="groupID">Kafka消费者所属的组别</param> 45 /// <param name="action">可以对已经消费的消息进行相关处理</param> 46 public void Consume(string broker, string topic, string groupID, Action<ConsumerResult> action = null) 47 { 48 if (string.IsNullOrEmpty(broker) || string.IsNullOrWhiteSpace(broker) || broker.Length <= 0) 49 { 50 throw new ArgumentNullException("Kafka消息服务器的地址不能为空!"); 51 } 52 53 if (string.IsNullOrEmpty(topic) || string.IsNullOrWhiteSpace(topic) || topic.Length <= 0) 54 { 55 throw new ArgumentNullException("消息所属的主题不能为空!"); 56 } 57 58 if (string.IsNullOrEmpty(groupID) || string.IsNullOrWhiteSpace(groupID) || groupID.Length <= 0) 59 { 60 throw new ArgumentNullException("用户分组ID不能为空!"); 61 } 62 63 var config = new Dictionary<string, object> 64 { 65 { "bootstrap.servers", broker }, 66 { "group.id", groupID }, 67 { "enable.auto.commit", true }, // this is the default 68 { "auto.commit.interval.ms", 5000 }, 69 { "statistics.interval.ms", 60000 }, 70 { "session.timeout.ms", 6000 }, 71 { "auto.offset.reset", "smallest" } 72 }; 73 74 75 using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8))) 76 { 77 // Note: All event handlers are called on the main thread. 78 //consumer.OnMessage += (_, message) => Console.WriteLine("Topic:" + message.Topic + " Partition:" + message.Partition + " Offset:" + message.Offset + " " + message.Value); 79 //consumer.OnMessage += (_, message) => Console.WriteLine("Offset:【" + message.Offset + "】Message:【" + message.Value + "】"); 80 if (action != null) 81 { 82 consumer.OnMessage += (_, message) => { 83 ConsumerResult messageResult = new ConsumerResult(); 84 messageResult.Broker = broker; 85 messageResult.Topic = message.Topic; 86 messageResult.Partition = message.Partition; 87 messageResult.Offset = message.Offset.Value; 88 messageResult.Message = message.Value; 89 90 //执行外界自定义的方法 91 action(messageResult); 92 }; 93 } 94 95 consumer.OnPartitionEOF += (_, end) => Console.WriteLine("Reached end of topic " + end.Topic + " partition " + end.Partition + ", next message will be at offset " + end.Offset); 96 97 consumer.OnError += (_, error) => Console.WriteLine("Error:" + error); 98 99 //引发反序列化错误或消费消息出现错误!= NoError。 100 consumer.OnConsumeError += (_, message) => Console.WriteLine("Error consuming from topic/partition/offset " + message.Topic + "/" + message.Partition + "/" + message.Offset + ": " + message.Error); 101 102 consumer.OnOffsetsCommitted += (_, commit) => Console.WriteLine(commit.Error ? "Failed to commit offsets:" + commit.Error : "Successfully committed offsets:" + commit.Offsets); 103 104 // 当消费者被分配一组新的分区时引发。 105 consumer.OnPartitionsAssigned += (_, partitions) => 106 { 107 Console.WriteLine("Assigned Partitions:" + partitions + ", Member ID:" + consumer.MemberId); 108 //如果您未向OnPartitionsAssigned事件添加处理程序,则会自动执行以下.Assign调用。 如果你为它添加了事件处理程序,你必须明确地调用.Assign以便消费者开始消费消息。 109 consumer.Assign(partitions); 110 }; 111 112 // Raised when the consumer's current assignment set has been revoked. 113 //当消费者的当前任务集已被撤销时引发。 114 consumer.OnPartitionsRevoked += (_, partitions) => 115 { 116 Console.WriteLine("Revoked Partitions:" + partitions); 117 // If you don't add a handler to the OnPartitionsRevoked event,the below .Unassign call happens automatically. If you do, you must call .Unassign explicitly in order for the consumer to stop consuming messages from it's previously assigned partitions. 118 //如果您未向OnPartitionsRevoked事件添加处理程序,则下面的.Unassign调用会自动发生。 如果你为它增加了事件处理程序,你必须明确地调用.Usessign以便消费者停止从它先前分配的分区中消费消息。 119 consumer.Unassign(); 120 }; 121 122 //consumer.OnStatistics += (_, json) => Console.WriteLine("Statistics: " + json); 123 124 consumer.Subscribe(topic); 125 126 //Console.WriteLine("Subscribed to:" + consumer.Subscription); 127 128 while (!IsCancelled) 129 { 130 consumer.Poll(TimeSpan.FromMilliseconds(100)); 131 } 132 } 133 } 134 135 #endregion 136 137 #region 异步版本 138 139 /// <summary> 140 /// 指定的组别的消费者开始消费指定主题的消息 141 /// </summary> 142 /// <param name="broker">Kafka消息服务器的地址</param> 143 /// <param name="topic">Kafka消息所属的主题</param> 144 /// <param name="groupID">Kafka消费者所属的组别</param> 145 /// <param name="action">可以对已经消费的消息进行相关处理</param> 146 public void ConsumeAsync(string broker, string topic, string groupID, Action<ConsumerResult> action = null) 147 { 148 if (string.IsNullOrEmpty(broker) || string.IsNullOrWhiteSpace(broker) || broker.Length <= 0) 149 { 150 throw new ArgumentNullException("Kafka消息服务器的地址不能为空!"); 151 } 152 153 if (string.IsNullOrEmpty(topic) || string.IsNullOrWhiteSpace(topic) || topic.Length <= 0) 154 { 155 throw new ArgumentNullException("消息所属的主题不能为空!"); 156 } 157 158 if (string.IsNullOrEmpty(groupID) || string.IsNullOrWhiteSpace(groupID) || groupID.Length <= 0) 159 { 160 throw new ArgumentNullException("用户分组ID不能为空!"); 161 } 162 163 ThreadPool.QueueUserWorkItem(KafkaAutoCommittedOffsets, new ConsumerSetting() { Broker = broker, Topic = topic, GroupID = groupID, Action=action }); 164 } 165 166 #endregion 167 168 #region 两种提交Offsets的版本 169 170 /// <summary> 171 /// Kafka消息队列服务器自动提交offset 172 /// </summary> 173 /// <param name="state">消息消费者信息</param> 174 private void KafkaAutoCommittedOffsets(object state) 175 { 176 ConsumerSetting setting = state as ConsumerSetting; 177 178 var config = new Dictionary<string, object> 179 { 180 { "bootstrap.servers", setting.Broker }, 181 { "group.id", setting.GroupID }, 182 { "enable.auto.commit", true }, // this is the default 183 { "auto.commit.interval.ms", 5000 }, 184 { "statistics.interval.ms", 60000 }, 185 { "session.timeout.ms", 6000 }, 186 { "auto.offset.reset", "smallest" } 187 }; 188 189 using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8))) 190 { 191 if (setting.Action != null) 192 { 193 consumer.OnMessage += (_, message) => 194 { 195 ConsumerResult messageResult = new ConsumerResult(); 196 messageResult.Broker = setting.Broker; 197 messageResult.Topic = message.Topic; 198 messageResult.Partition = message.Partition; 199 messageResult.Offset = message.Offset.Value; 200 messageResult.Message = message.Value; 201 202 //执行外界自定义的方法 203 setting.Action(messageResult); 204 }; 205 } 206 207 //consumer.OnStatistics += (_, json)=> Console.WriteLine("Statistics: {json}"); 208 209 //可以写日志 210 //consumer.OnError += (_, error)=> Console.WriteLine("Error:"+error); 211 212 //可以写日志 213 //consumer.OnConsumeError += (_, msg) => Console.WriteLine("Error consuming from topic/partition/offset {msg.Topic}/{msg.Partition}/{msg.Offset}: {msg.Error}"); 214 215 consumer.Subscribe(setting.Topic); 216 217 while (!IsCancelled) 218 { 219 consumer.Poll(TimeSpan.FromMilliseconds(100)); 220 } 221 } 222 } 223 224 /// <summary> 225 /// Kafka消息队列服务器手动提交offset 226 /// </summary> 227 /// <param name="state">消息消费者信息</param> 228 private void KafkaManuallyCommittedOffsets(object state) 229 { 230 ConsumerSetting setting = state as ConsumerSetting; 231 232 var config = new Dictionary<string, object> 233 { 234 { "bootstrap.servers", setting.Broker }, 235 { "group.id", setting.GroupID }, 236 { "enable.auto.commit", false },//不是自动提交的 237 { "auto.commit.interval.ms", 5000 }, 238 { "statistics.interval.ms", 60000 }, 239 { "session.timeout.ms", 6000 }, 240 { "auto.offset.reset", "smallest" } 241 }; 242 243 using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8))) 244 { 245 //可以写日志 246 //consumer.OnError += (_, error) => Console.WriteLine("Error:"+error); 247 248 //可以写日志 249 // Raised on deserialization errors or when a consumed message has an error != NoError. 250 //consumer.OnConsumeError += (_, error)=> Console.WriteLine("Consume error:"+error); 251 252 consumer.Subscribe(setting.Topic); 253 254 Message<Ignore, string> message = null; 255 256 while (!isCancelled) 257 { 258 if (!consumer.Consume(out message, TimeSpan.FromMilliseconds(100))) 259 { 260 continue; 261 } 262 263 if (setting.Action != null) 264 { 265 ConsumerResult messageResult = new ConsumerResult(); 266 messageResult.Broker = setting.Broker; 267 messageResult.Topic = message.Topic; 268 messageResult.Partition = message.Partition; 269 messageResult.Offset = message.Offset.Value; 270 messageResult.Message = message.Value; 271 272 //执行外界自定义的方法 273 setting.Action(messageResult); 274 } 275 276 if (message.Offset % 5 == 0) 277 { 278 var committedOffsets = consumer.CommitAsync(message).Result; 279 //Console.WriteLine("Committed offset:"+committedOffsets); 280 } 281 } 282 } 283 } 284 285 #endregion 286 }
有了消息的消费者代码,那消息的生产者代码肯定是少不了的。代码如下:
1 /// <summary> 2 /// Kafka消息生产者 3 /// </summary> 4 public sealed class KafkaProducer : KafkaBase, IKafkaProducer 5 { 6 /// <summary> 7 /// 生产消息并发送消息 8 /// </summary> 9 /// <param name="broker">kafka的服务器地址</param> 10 /// <param name="topic">kafka的消息主题名称</param> 11 /// <param name="message">需要传送的消息</param> 12 public bool Produce(string broker, string topic, string message) 13 { 14 bool result = false; 15 if (string.IsNullOrEmpty(broker) || string.IsNullOrWhiteSpace(broker) || broker.Length <= 0) 16 { 17 throw new ArgumentNullException("Kafka消息服务器地址不能为空!"); 18 } 19 20 if (string.IsNullOrEmpty(topic) || string.IsNullOrWhiteSpace(topic) || topic.Length <= 0) 21 { 22 throw new ArgumentNullException("消息所属的主题不能为空!"); 23 } 24 25 if (string.IsNullOrEmpty(message) || string.IsNullOrWhiteSpace(message) || message.Length <= 0) 26 { 27 throw new ArgumentNullException("消息内容不能为空!"); 28 } 29 30 var config = new Dictionary<string, object> { { "bootstrap.servers", broker } }; 31 using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8))) 32 { 33 var deliveryReport = producer.ProduceAsync(topic, null, message); 34 deliveryReport.ContinueWith(task => 35 { 36 if (task.Result.Error.Code == ErrorCode.NoError) 37 { 38 result = true; 39 } 40 //可以在控制台使用以下语句 41 //Console.WriteLine("Producer:" + producer.Name + "\r\nTopic:" + topic + "\r\nPartition:" + task.Result.Partition + "\r\nOffset:" + task.Result.Offset + "\r\nMessage:" + task.Result.Value); 42 }); 43 44 producer.Flush(TimeSpan.FromSeconds(10)); 45 } 46 return result; 47 } 48 }
好了,以上就是全部代码,大家可以整理使用。
三、总结
继续学习,先使用,慢慢了解内部的细节,我已经迈出了第一步,不忘初心,继续努力。
转载于:https://www.cnblogs.com/PatrickLiu/p/9269116.html
基于Confluent.Kafka实现的KafkaConsumer消费者类和KafkaProducer消息生产者类型相关推荐
- 基于Confluent.Kafka实现的Kafka客户端操作类使用详解
一.引言 有段时间没有写东西了,当然不是没得写,还有MongoDB的系列没有写完呢,那个系列还要继续.今天正好是周末,有点时间,来写新东西吧.最近公司用了Kafka做为消息的中间件,最开始写的那个版本 ...
- 【AIS学习】09:B类AIS收发消息类型
B类AIS收发消息的类型如表所示 消息编号 消息名称 B类"SO" AIS B类"CS" AIS 发射 接收 发射 接收 1 船位报告 不允许 接收 不允许 接 ...
- .net core confluent kafka消费者
定义消费者 using KafkaHelper.Config; using Microsoft.Extensions.Options; using System; using System.Colle ...
- Kafka 0.9 新消费者API
kafka诞生之初,它自带一个基于scala的生产者和消费者客户端.但是慢慢的我们认识到这些API有很多限制.比如,消费者有一个"高级"API支持分组和异常控制,但是不支持很多更复 ...
- Hello Kafka(八)——Confluent Kafka简介
一.Confluent Kafka简介 1.Confluent Kafka简介 2014年,Kafka的创始人Jay Kreps.NahaNarkhede和饶军离开LinkedIn创立Confluen ...
- Kafka知识总结之消费者简单使用
本文简述 这篇文件主要是讲kafka消费者相关使用,诸如,offset的使用,消费者的相关配置,多线程消费模式和springboot整合.至于这些里面涉及到原理等相关深入的知识会放到下一篇文件kafk ...
- DataPipeline联合Confluent Kafka Meetup上海站
Confluent作为国际数据"流"处理技术领先者,提供实时数据处理解决方案,在市场上拥有大量企业客户,帮助企业轻松访问各类数据.DataPipeline作为国内首家原生支持Kaf ...
- 基于Confluent+Flink的实时数据分析最佳实践
简介:在实际业务使用中,需要经常实时做一些数据分析,包括实时PV和UV展示,实时销售数据,实时店铺UV以及实时推荐系统等,基于此类需求,Confluent+实时计算Flink版是一个高效的方案. 业务 ...
- java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/List
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明. 本文链接: Flink入门程序异常,记录一下跟大家分享. SLF4J: Failed to l ...
最新文章
- 从对ML一窍不通到斩获AT等special offer,拿下大厂算法岗就靠它了
- 对tmemorystream的一些改进_delphi教程
- 栅格数据的像素值保存问题
- linux命令之创建符号连接-ln
- 【测试】批量删除供应商配额(Quota )
- 关于理解Perl的fork函数的一个范例
- 后端返回number类型数据_Javascript基础教程之数据类型 (数值 Number)
- LeetCode算法总结-回溯法与深度优先搜索
- android第三方推送实现,Android--利用第三方推送实现APP伪保活(小米篇)
- wincc怎么做一个弹出画面_wincc怎样弹出确认窗口?
- C++自学-默认参数的函数
- 吴恩达神经网络和深度学习-学习笔记-25-定位数据不匹配
- a标签点击不跳转的几种方法
- 智能翻译软件—人人译视界 for Mac
- 腾讯视频转码,把qlv格式转换成mp4格式
- js排序的时间复杂度_各种排序算法时间复杂度
- 金蝶K3 如何添加其它出库单出库类型
- 高校GIS房地产管理系统
- 上位机使用C++通过ADS协议与倍福PLC通信例程-通过变量名方式读写浮点数
- JavaScript - 360浏览器默认使用极速模式打开网页