1. 定义消费者
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using KafkaHelper.Config;
using Microsoft.Extensions.Options;

namespace KafkaHelper
{
    public class KafkaConsumer
    {
        // NOTE(review): kept public for backward compatibility with the original,
        // but this is only read internally — prefer `private readonly`.
        public IOptionsMonitor<KafkaConfig> _kafkaconfig;

        public KafkaConsumer(IOptionsMonitor<KafkaConfig> kafkaconfig)
        {
            _kafkaconfig = kafkaconfig;
        }

        /// <summary>
        /// Subscribes to <paramref name="topics"/> and consumes messages in a blocking loop
        /// until <paramref name="cancellationToken"/> is cancelled. Offsets are committed
        /// synchronously after each message is processed (at-least-once semantics).
        /// </summary>
        /// <param name="topics">Topics to subscribe to as part of the consumer group.</param>
        /// <param name="cancellationToken">Cancelling this token closes the consumer cleanly.</param>
        public void consumerMessage(List<string> topics, CancellationToken cancellationToken)
        {
            var consumerConfig = new ConsumerConfig()
            {
                BootstrapServers = $"{_kafkaconfig.CurrentValue.Host}:{_kafkaconfig.CurrentValue.Port}",
                EnableAutoCommit = false,                     // offsets are committed explicitly below
                SessionTimeoutMs = 10000,
                GroupId = "csharp-consumer",
                AutoOffsetReset = AutoOffsetReset.Earliest,
                // FIX: the assigned/revoked handlers below (and their comments) assume
                // incremental (cooperative) rebalancing; the original never configured it,
                // so the default eager strategy was silently used. Make it explicit.
                PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky,
                HeartbeatIntervalMs = 1000,  // heartbeat tells the broker this member is alive, avoiding a rebalance
                MaxPollIntervalMs = 100000   // if no poll happens within this window the member is considered dead and a rebalance is triggered
            };

            using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig)
                // Note: All handlers are called on the main .Consume thread.
                .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
                .SetStatisticsHandler((_, json) => Console.WriteLine($"Statistics: {json}"))
                .SetPartitionsAssignedHandler((c, partitions) =>
                {
                    // Called whenever partitions are assigned to this member.
                    // Since a cooperative assignor (CooperativeSticky) has been configured, the
                    // partition assignment is incremental (adds partitions to any existing assignment).
                    Console.WriteLine(
                        "Partitions incrementally assigned: [" +
                        string.Join(',', partitions.Select(p => p.Partition.Value)) +
                        "], all: [" +
                        string.Join(',', c.Assignment.Concat(partitions).Select(p => p.Partition.Value)) +
                        "]");

                    // Possibly manually specify start offsets by returning a list of topic/partition/offsets
                    // to assign to, e.g.:
                    // return partitions.Select(tp => new TopicPartitionOffset(tp, externalOffsets[tp]));
                })
                .SetPartitionsRevokedHandler((c, partitions) =>
                {
                    // Called when partitions are taken away from this member.
                    // Since a cooperative assignor (CooperativeSticky) has been configured, the revoked
                    // assignment is incremental (may remove only some partitions of the current assignment).
                    var remaining = c.Assignment.Where(atp => !partitions.Any(rtp => rtp.TopicPartition == atp));
                    Console.WriteLine(
                        "Partitions incrementally revoked: [" +
                        string.Join(',', partitions.Select(p => p.Partition.Value)) +
                        "], remaining: [" +
                        string.Join(',', remaining.Select(p => p.Partition.Value)) +
                        "]");
                })
                .SetPartitionsLostHandler((c, partitions) =>
                {
                    // The lost partitions handler is called when the consumer detects that it has lost
                    // ownership of its assignment (fallen out of the group).
                    Console.WriteLine($"Partitions were lost: [{string.Join(", ", partitions)}]");
                })
                .Build())
            {
                consumer.Subscribe(topics);
                try
                {
                    while (true)
                    {
                        try
                        {
                            var consumeResult = consumer.Consume(cancellationToken);

                            // NOTE(review): EnablePartitionEof is not set in the config above, so EOF
                            // results are never produced and this branch is effectively dead; it is kept
                            // for parity with the Confluent example this code is based on.
                            if (consumeResult.IsPartitionEOF)
                            {
                                Console.WriteLine($"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");
                                continue;
                            }

                            Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Message.Value}");
                            try
                            {
                                // Committing synchronously after every message is the simplest way to get
                                // at-least-once semantics, at the cost of one broker round-trip per message.
                                // Alternative: StoreOffset(consumeResult) + EnableAutoCommit, which batches
                                // commits on a background thread (offsets stored are consumeResult offset + 1,
                                // since committed offsets name the next message to consume).
                                //consumer.StoreOffset(consumeResult);
                                consumer.Commit(consumeResult);
                            }
                            catch (KafkaException e)
                            {
                                // FIX: original message said "Store Offset error" but the code commits.
                                Console.WriteLine($"Commit error: {e.Error.Reason}");
                            }
                        }
                        catch (ConsumeException e)
                        {
                            Console.WriteLine($"Consume error: {e.Error.Reason}");
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    // Close() commits final offsets (none pending here) and leaves the group cleanly.
                    Console.WriteLine("Closing consumer.");
                    consumer.Close();
                }
            }
        }

        /// <summary>
        /// Manually assigns partition 0 of each topic (no consumer-group subscription)
        /// and consumes from the beginning until the token is cancelled.
        /// </summary>
        /// <param name="brokerList">Bootstrap servers, e.g. "host1:9092,host2:9092".</param>
        /// <param name="topics">Topics whose partition 0 will be assigned directly.</param>
        /// <param name="cancellationToken">Cancelling this token closes the consumer cleanly.</param>
        public void Run_ManualAssign(string brokerList, List<string> topics, CancellationToken cancellationToken)
        {
            var config = new ConsumerConfig
            {
                // the group.id property must be specified when creating a consumer, even
                // if you do not intend to use any consumer group functionality.
                GroupId = "groupid-not-used-but-mandatory",
                BootstrapServers = brokerList,
                // partition offsets can be committed to a group even by consumers not
                // subscribed to the group. in this example, auto commit is disabled
                // to prevent this from occurring.
                EnableAutoCommit = false
            };

            using (var consumer =
                new ConsumerBuilder<Ignore, string>(config)
                    .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
                    .Build())
            {
                consumer.Assign(topics.Select(topic => new TopicPartitionOffset(topic, 0, Offset.Beginning)).ToList());
                try
                {
                    while (true)
                    {
                        try
                        {
                            var consumeResult = consumer.Consume(cancellationToken);
                            // Note: End of partition notification has not been enabled, so
                            // it is guaranteed that the ConsumeResult instance corresponds
                            // to a Message, and not a PartitionEOF event.
                            // FIX: original wrote "${...}" which printed a stray literal '$'.
                            Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Message.Value}");
                        }
                        catch (ConsumeException e)
                        {
                            Console.WriteLine($"Consume error: {e.Error.Reason}");
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    Console.WriteLine("Closing consumer.");
                    consumer.Close();
                }
            }
        }
    }
}

Run_ManualAssign 方法是指定消费者消费的分区,另外一个则是正常的订阅
2. 定义 HostedService

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

namespace KafkaHelper
{
    /// <summary>
    /// Hosted service that runs the Kafka consumer loop for the "corekafka" topic
    /// on a background thread for the lifetime of the host.
    /// </summary>
    public class Service : BackgroundService
    {
        private readonly IServiceScopeFactory _scopeFactory;
        private readonly IServiceScope _serviceScope;

        public Service(IServiceScopeFactory scopeFactory)
        {
            _scopeFactory = scopeFactory;
            // One scope for the lifetime of the hosted service; disposed in Dispose().
            _serviceScope = _scopeFactory.CreateScope();
        }

        protected override Task ExecuteAsync(CancellationToken stoppingToken)
        {
            // FIX: GetRequiredService throws a descriptive exception if KafkaConsumer was
            // never registered, instead of returning null and failing later with an NRE.
            KafkaConsumer consumer = _serviceScope.ServiceProvider.GetRequiredService<KafkaConsumer>();

            // FIX: the original discarded the Task.Run task (fire-and-forget), so any
            // exception thrown by the consumer loop was unobserved. Returning the task
            // lets the host observe failures and wait for the loop during shutdown.
            // consumerMessage blocks, so it must run off the caller's thread.
            return Task.Run(() => consumer.consumerMessage(new List<string>() { "corekafka" }, stoppingToken), stoppingToken);
        }

        public override void Dispose()
        {
            base.Dispose();
            _serviceScope.Dispose();
        }
    }
}
  3. 注册 Service
// Bind the "KafkaConfig" configuration section, register the Kafka producer/consumer
// as singletons, and start the background consumer via the hosted service.
builder.Services.Configure<KafkaConfig>(builder.Configuration.GetSection("KafkaConfig"));
builder.Services.AddSingleton<KafkaProducer>();
builder.Services.AddSingleton<KafkaConsumer>();
builder.Services.AddHostedService<Service>();

源码
https://github.com/xdqt/asp.net-core-efcore-jwt-middleware/tree/master/CoreKafka

https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/examples/Consumer/Program.cs

.net core confluent kafka消费者相关推荐

  1. 基于Confluent.Kafka实现的KafkaConsumer消费者类和KafkaProducer消息生产者类型

    一.引言 研究Kafka有一段时间了,略有心得,基于此自己就写了一个Kafka的消费者的类和Kafka消息生产者的类,进行了单元测试和生产环境的测试,还是挺可靠的. 二.源码 话不多说,直接上代码,代 ...

  2. .net core 中使用confluent kafka构建生产者

    创建.net 6 API 安装依赖包 创建kafka生产者 using System; using System.Collections.Generic; using System.Linq; usi ...

  3. Kafka学习征途:.NET Core操作Kafka

    [Kafka]| 总结/Edison Zhou 1可用的Kafka .NET客户端 作为一个.NET Developer,自然想要在.NET项目中集成Kafka实现发布订阅功能.那么,目前可用的Kaf ...

  4. 基于Confluent.Kafka实现的Kafka客户端操作类使用详解

    一.引言 有段时间没有写东西了,当然不是没得写,还有MongoDB的系列没有写完呢,那个系列还要继续.今天正好是周末,有点时间,来写新东西吧.最近公司用了Kafka做为消息的中间件,最开始写的那个版本 ...

  5. .Net Core 集成 Kafka

    最近维护的一个系统并发有点高,所以想引入一个消息队列来进行削峰.考察了一些产品,最终决定使用kafka来当做消息队列.以下是关于kafka的一些知识的整理笔记. kafka kafka 是分布式流式平 ...

  6. kafka消费者接收分区测试

    [README] 本文演示了当有新消费者加入组后,其他消费者接收分区情况: 本文还模拟了 broker 宕机的情况: 本文使用的是最新的 kafka3.0.0 : 本文测试案例,来源于 消费者接收分区 ...

  7. (转)Kafka 消费者 Java 实现

    转自: Kafka 消费者 Java 实现 - 简书应用程序使用 KafkaConsumer向 Kafka 订阅 Topic 接收消息,首先理解 Kafka 中消费者(consumer)和消费者组(c ...

  8. apache kafka_Apache Kafka消费者再平衡

    apache kafka 消费者重新平衡决定哪个消费者负责某个主题的所有可用分区的哪个子集. 例如,您可能有一个包含20个分区和10个使用者的主题. 在重新平衡结束时,您可能希望每个使用者都从2个分区 ...

  9. kafka基础篇(四)——kafka消费者客户端

    一.入门程序 先上代码,从代码入手,讲解kafka消费者客户端的细节. public class HelloKafkaConsumer {public static void main(String[ ...

最新文章

  1. 如何用node开发自己的cli工具
  2. RecyclerView横向滑动与ViewPager冲突问题
  3. 方向对了?MIT新研究:GPT-3和人类大脑处理语言的方式惊人相似
  4. python请求url非阻塞_Tornado请求中的非阻塞/异步URL获取
  5. Imageloader5-ImageLoader的变量初始化
  6. Windows Tftpd32 DHCP服务器 使用
  7. Marketing Cloud的语音输入功能
  8. Java基础知识编写一个HelloWorld案例
  9. Hibernate初次搭建与第一个测试例子
  10. bzoj 2761 平衡树
  11. wireshark常用选项与功能总结【10分钟成为抓包大师】
  12. quartz java 实现_Quartz使用-入门使用(java定时任务实现)
  13. Arcgis 如何将大量数据转换成csv导出
  14. MyBatis的常见面试题
  15. html 外联 变 内联,Html 内联元素、外联元素 和 可变元素
  16. 电脑接收文件被360安全卫士错当成木马病毒怎么恢复
  17. 单调队列优化dp--bzoj5185: [Usaco2018 Jan]Lifeguards
  18. 国外的码农是什么状态?硅谷程序员:不加班,不穿女装
  19. 网易云 -微信小程序-vue
  20. Hbase 热点问题3种解决方案

热门文章

  1. S2S (sequence to sequence) 算法理解
  2. 百度easydl代码集成移植到安卓
  3. 智能红外额温枪芯片PCBA解决方案
  4. (五)Landat_5 TM 遥感影像计算NDVI、MNDWI、NDBI以及地表温度反演
  5. 新安装WIN10网卡未驱动,驱动人生网卡版等软件不能成功安装网卡的解决办法...
  6. ADI的CCES在WINDOWS10下的注册
  7. 中国知名企业ERP失败案例分析
  8. opencv Grabcut-抠图
  9. Java 实现人脸识别登录、注册等功能【附源码】
  10. storybook使用教程_如何使用Storybook构建React开发游乐场