安装

CentOS 安装 kafka

  • Kafka : http://kafka.apache.org/downloads

  • ZooLeeper : https://zookeeper.apache.org/releases.html

下载并解压

# 下载,并解压
$ wget https://archive.apache.org/dist/kafka/2.1.1/kafka_2.12-2.1.1.tgz
$ tar -zxvf  kafka_2.12-2.1.1.tgz
$ mv kafka_2.12-2.1.1.tgz /data/kafka# 下载 zookeeper,解压
$ wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz
$ tar -zxvf apache-zookeeper-3.5.8-bin.tar.gz
$ mv apache-zookeeper-3.5.8-bin /data/zookeeper

启动 ZooKeeper

# 复制配置模版
$ cd /data/kafka/conf
$ cp zoo_sample.cfg zoo.cfg# 看看配置需不需要改
$ vim zoo.cfg# 命令
$ ./bin/zkServer.sh start    # 启动
$ ./bin/zkServer.sh status   # 状态
$ ./bin/zkServer.sh stop     # 停止
$ ./bin/zkServer.sh restart  # 重启# 使用客户端测试
$ ./bin/zkCli.sh -server localhost:2181
$ quit

启动 Kafka

# 备份配置
$ cd /data/kafka
$ cp config/server.properties config/server.properties_copy# 修改配置
$ vim /data/kafka/config/server.properties# 集群配置下,每个 broker 的 id 是必须不同的
# broker.id=0# 监听地址设置(内网)
# listeners=PLAINTEXT://ip:9092# 对外提供服务的IP、端口
# advertised.listeners=PLAINTEXT://106.75.84.97:9092# 修改每个topic的默认分区参数num.partitions,默认是1,具体合适的取值需要根据服务器配置进程确定,UCloud.ukafka = 3
# num.partitions=3# zookeeper 配置
# zookeeper.connect=localhost:2181# 通过配置启动 kafka
$  ./bin/kafka-server-start.sh  config/server.properties&# 状态查看
$ ps -ef|grep kafka
$ jps

docker下安装Kafka

docker pull wurstmeister/zookeeper
docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeperdocker pull wurstmeister/kafka
docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=192.168.1.111 --env KAFKA_ADVERTISED_PORT=9092 wurstmeister/kafka

介绍

  • Broker:消息中间件处理节点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群。

  • Topic:一类消息,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能够同时负责多个topic的分发。

  • Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。

  • Segment:partition物理上由多个segment组成,下面2.2和2.3有详细说明。

  • offset:每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息。

kafka partition 和 consumer 数目关系

  • 如果consumer比partition多是浪费,因为kafka的设计是在一个partition上是不允许并发的,所以consumer数不要大于partition数 。

  • 如果consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,否则会导致partition里面的数据被取的不均匀 。最好partiton数目是consumer数目的整数倍,所以partition数目很重要,比如取24,就很容易设定consumer数目 。

  • 如果consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不同

  • 增减consumer,broker,partition会导致rebalance,所以rebalance后consumer对应的partition会发生变化

快速开始

在 .NET Core 项目中安装组件

Install-Package Confluent.Kafka

开源地址:https://github.com/confluentinc/confluent-kafka-dotnet

添加IKafkaService服务接口

public interface IKafkaService
{/// <summary>/// 发送消息至指定主题/// </summary>/// <typeparam name="TMessage"></typeparam>/// <param name="topicName"></param>/// <param name="message"></param>/// <returns></returns>Task PublishAsync<TMessage>(string topicName, TMessage message) where TMessage : class;/// <summary>/// 从指定主题订阅消息/// </summary>/// <typeparam name="TMessage"></typeparam>/// <param name="topics"></param>/// <param name="messageFunc"></param>/// <param name="cancellationToken"></param>/// <returns></returns>Task SubscribeAsync<TMessage>(IEnumerable<string> topics, Action<TMessage> messageFunc, CancellationToken cancellationToken) where TMessage : class;
}

实现IKafkaService

public class KafkaService : IKafkaService
{public async Task PublishAsync<TMessage>(string topicName, TMessage message) where TMessage : class{var config = new ProducerConfig{BootstrapServers = "127.0.0.1:9092"};using var producer = new ProducerBuilder<string, string>(config).Build();await producer.ProduceAsync(topicName, new Message<string, string>{Key = Guid.NewGuid().ToString(),Value = message.SerializeToJson()});}public async Task SubscribeAsync<TMessage>(IEnumerable<string> topics, Action<TMessage> messageFunc, CancellationToken cancellationToken) where TMessage : class{var config = new ConsumerConfig{BootstrapServers = "127.0.0.1:9092",GroupId = "consumer",EnableAutoCommit = false,StatisticsIntervalMs = 5000,SessionTimeoutMs = 6000,AutoOffsetReset = AutoOffsetReset.Earliest,EnablePartitionEof = true};//const int commitPeriod = 5;using var consumer = new ConsumerBuilder<Ignore, string>(config).SetErrorHandler((_, e) =>{Console.WriteLine($"Error: {e.Reason}");}).SetStatisticsHandler((_, json) =>{Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > 消息监听中..");}).SetPartitionsAssignedHandler((c, partitions) =>{string partitionsStr = string.Join(", ", partitions);Console.WriteLine($" - 分配的 kafka 分区: {partitionsStr}");}).SetPartitionsRevokedHandler((c, partitions) =>{string partitionsStr = string.Join(", ", partitions);Console.WriteLine($" - 回收了 kafka 的分区: {partitionsStr}");}).Build();consumer.Subscribe(topics);try{while (true){try{var consumeResult = consumer.Consume(cancellationToken);Console.WriteLine($"Consumed message '{consumeResult.Message?.Value}' at: '{consumeResult?.TopicPartitionOffset}'.");if (consumeResult.IsPartitionEOF){Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} 已经到底了:{consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");continue;}TMessage messageResult = null;try{messageResult = JsonConvert.DeserializeObject<TMessage>(consumeResult.Message.Value);}catch (Exception ex){var errorMessage = $" - {DateTime.Now:yyyy-MM-dd HH:mm:ss}【Exception 消息反序列化失败,Value:{consumeResult.Message.Value}】 :{ex.StackTrace?.ToString()}";Console.WriteLine(errorMessage);messageResult = null;}if (messageResult != null/* && consumeResult.Offset % commitPeriod == 0*/){messageFunc(messageResult);try{consumer.Commit(consumeResult);}catch (KafkaException e){Console.WriteLine(e.Message);}}}catch (ConsumeException e){Console.WriteLine($"Consume error: {e.Error.Reason}");}}}catch (OperationCanceledException){Console.WriteLine("Closing consumer.");consumer.Close();}await Task.CompletedTask;}
}

注入IKafkaService,在需要使用的地方直接调用即可。

public class MessageService : IMessageService, ITransientDependency
{private readonly IKafkaService _kafkaService;public MessageService(IKafkaService kafkaService){_kafkaService = kafkaService;}public async Task RequestTraceAdded(XxxEventData eventData){await _kafkaService.PublishAsync(eventData.TopicName, eventData);}
}

以上相当于一个生产者,当我们消息队列发出后,还需一个消费者进行消费,所以可以使用一个控制台项目接收消息来处理业务。

var cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{e.Cancel = true;cts.Cancel();
};await kafkaService.SubscribeAsync<XxxEventData>(topics, async (eventData) =>
{// Your logicConsole.WriteLine($" - {eventData.EventTime:yyyy-MM-dd HH:mm:ss} 【{eventData.TopicName}】- > 已处理");
}, cts.Token);

IKafkaService中已经写了订阅消息的接口,这里也是注入后直接使用即可。

生产者消费者示例

生产者

static async Task Main(string[] args)
{if (args.Length != 2){Console.WriteLine("Usage: .. brokerList topicName");// 127.0.0.1:9092 helloTopicreturn;}var brokerList = args.First();var topicName = args.Last();var config = new ProducerConfig { BootstrapServers = brokerList };using var producer = new ProducerBuilder<string, string>(config).Build();Console.WriteLine("\n-----------------------------------------------------------------------");Console.WriteLine($"Producer {producer.Name} producing on topic {topicName}.");Console.WriteLine("-----------------------------------------------------------------------");Console.WriteLine("To create a kafka message with UTF-8 encoded key and value:");Console.WriteLine("> key value<Enter>");Console.WriteLine("To create a kafka message with a null key and UTF-8 encoded value:");Console.WriteLine("> value<enter>");Console.WriteLine("Ctrl-C to quit.\n");var cancelled = false;Console.CancelKeyPress += (_, e) =>{e.Cancel = true;cancelled = true;};while (!cancelled){Console.Write("> ");var text = string.Empty;try{text = Console.ReadLine();}catch (IOException){break;}if (string.IsNullOrWhiteSpace(text)){break;}var key = string.Empty;var val = text;var index = text.IndexOf(" ");if (index != -1){key = text.Substring(0, index);val = text.Substring(index + 1);}try{var deliveryResult = await producer.ProduceAsync(topicName, new Message<string, string>{Key = key,Value = val});Console.WriteLine($"delivered to: {deliveryResult.TopicPartitionOffset}");}catch (ProduceException<string, string> e){Console.WriteLine($"failed to deliver message: {e.Message} [{e.Error.Code}]");}}
}

消费者

static void Main(string[] args)
{if (args.Length != 2){Console.WriteLine("Usage: .. brokerList topicName");// 127.0.0.1:9092 helloTopicreturn;}var brokerList = args.First();var topicName = args.Last();Console.WriteLine($"Started consumer, Ctrl-C to stop consuming");var cts = new CancellationTokenSource();Console.CancelKeyPress += (_, e) =>{e.Cancel = true;cts.Cancel();};var config = new ConsumerConfig{BootstrapServers = brokerList,GroupId = "consumer",EnableAutoCommit = false,StatisticsIntervalMs = 5000,SessionTimeoutMs = 6000,AutoOffsetReset = AutoOffsetReset.Earliest,EnablePartitionEof = true};const int commitPeriod = 5;using var consumer = new ConsumerBuilder<Ignore, string>(config).SetErrorHandler((_, e) =>{Console.WriteLine($"Error: {e.Reason}");}).SetStatisticsHandler((_, json) =>{Console.WriteLine($" - {DateTime.Now:yyyy-MM-dd HH:mm:ss} > monitoring..");//Console.WriteLine($"Statistics: {json}");}).SetPartitionsAssignedHandler((c, partitions) =>{Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}]");}).SetPartitionsRevokedHandler((c, partitions) =>{Console.WriteLine($"Revoking assignment: [{string.Join(", ", partitions)}]");}).Build();consumer.Subscribe(topicName);try{while (true){try{var consumeResult = consumer.Consume(cts.Token);if (consumeResult.IsPartitionEOF){Console.WriteLine($"Reached end of topic {consumeResult.Topic}, partition {consumeResult.Partition}, offset {consumeResult.Offset}.");continue;}Console.WriteLine($"Received message at {consumeResult.TopicPartitionOffset}: {consumeResult.Message.Value}");if (consumeResult.Offset % commitPeriod == 0){try{consumer.Commit(consumeResult);}catch (KafkaException e){Console.WriteLine($"Commit error: {e.Error.Reason}");}}}catch (ConsumeException e){Console.WriteLine($"Consume error: {e.Error.Reason}");}}}catch (OperationCanceledException){Console.WriteLine("Closing consumer.");consumer.Close();}
}

.NET Core 下使用 Kafka相关推荐

  1. .Net Core下如何管理配置文件

    一.前言 根据该issues来看,System.Configuration在.net core中已经不存在了,那么取而代之的是由Microsoft.Extensions.Cnfiguration.XX ...

  2. 在asp.net core 下定义统一的入参和出参格式

    在使用.net core 开发Api的过程中,为了统一输入参数的格式,并增加一些全局必须含有的字段,比如:Code,Message,Lang等等,能采取的变通方式还是有几种的,然而都不够优雅,为了需求 ...

  3. 一个.NET Core下的开源插件框架Pluginfactory

    插件模式历史悠久,各种中大型软件基本上都会实现插件机制,以此支持功能扩展,从开发部署层面,插件机制也可实现功能解耦,对于并行开发.项目部署.功能定制等都有比较大的优势. 在.NET Core下,一般我 ...

  4. .NET Core 下的爬虫利器

    爬虫大家或多或少的都应该接触过的,爬虫有风险,抓数需谨慎. 本着研究学习的目的,记录一下在 .NET Core 下抓取数据的实际案例.爬虫代码一般具有时效性,当我们的目标发生改版升级,规则转换后我们写 ...

  5. 在Asp.Net Core中集成Kafka

    在我们的业务中,我们通常需要在自己的业务子系统之间相互发送消息,一端去发送消息另一端去消费当前消息,这就涉及到使用消息队列MQ的一些内容,消息队列成熟的框架有多种,这里你可以读这篇文章来了解这些MQ的 ...

  6. SeaweedFS在.net core下的实践方案

    一直对分布式的文件储存系统很感兴趣,最开始关注淘宝的TFS(Taobao File System),好像搁浅了,官方地址无法访问,github上面,各种编译问题,无意间发现了SeaweedFS 链接s ...

  7. Net Core下多种ORM框架特性及性能对比

    在.NET Framework下有许多ORM框架,最著名的无外乎是Entity Framework,它拥有悠久的历史以及便捷的语法,在占有率上一路领先.但随着Dapper的出现,它的地位受到了威胁,本 ...

  8. .Net Core下通过Proxy 模式 使用 WCF

    .NET Core下的WCF客户端也是开源的,这次发布.NET Core 2.0,同时也发布了 WCF for .NET Core 2.0.0, 本文介绍在.NET Core下如何通过Proxy 消费 ...

  9. .NET Core下使用gRpc公开服务(SSL/TLS)

    一.前言 前一阵子关于.NET的各大公众号都发表了关于gRpc的消息,而随之而来的就是一波关于.NET Core下如何使用的教程,但是在这众多的教程中基本都是泛泛而谈,难以实际在实际环境中使用,而该篇 ...

最新文章

  1. 阿里NASA计划:城市大脑成智能研究第一平台
  2. python基础之day1
  3. 安卓系统底层C语言算法之测试参数是几个long型的算法
  4. goahead如何使用cgi服务_北斗导航系统现已提供全球服务!你知道如何使用这个服务吗?...
  5. 获取html元素的位置,如何获取页面元素的位置
  6. Windows下的定时任务
  7. 云原生 - Istio可观察性之分布式跟踪(三)
  8. Codeforces Round #318 (Div. 2) B Bear and Three Musketeers (暴力)
  9. 浅谈MaxCompute资源规划管理及评估
  10. 解构变换矩阵:如何使变换矩阵分解为位移(T),旋转(R),缩放(S)矩阵
  11. 【前端】【labelme】labelme 保存 imageData 的 base64编码机制 —— python 源码探究与 js 实现
  12. hdu 1312深搜入门题
  13. 破解基础篇之第一部分
  14. windows系统镜像文件汇总
  15. python web开发实战pdf 百度网盘_python web开发实战 pdf
  16. 2022年申请亳州市发明专利材料,专利说明书摘要写作技巧
  17. ITK-SNAP自动分割应用示例:如何进行乳腺腺体脂肪体积测量
  18. 项目质量管理控制过程的新老七种工具速记法
  19. Mybatis关联查询的两种方式
  20. 笔记本计算机没有声音是怎么回事,笔记本电脑声音没了怎么恢复_笔记本怎么没有声音-win7之家...

热门文章

  1. GitHub项目管理维护实用教程
  2. Leetcode: Single Number
  3. Process Explorer 15.2:微软增强型任务管理器
  4. js 技巧杂引(转)
  5. 昨天订了一台FSC Lifebook S6220
  6. java发送gmail_如何在Gmail中轻松通过电子邮件发送人群
  7. drools简单应用
  8. 学习RUNOOB.COM进度一
  9. CentOS7安装EPEL源
  10. Error: package or namespace load failed for ‘rJava’: