一:kafka介绍

kafka(官网地址:http://kafka.apache.org)是一种高吞吐量的分布式发布订阅的消息队列系统,具有高性能和高吞吐率。

1.1 术语介绍

Broker
Kafka集群包含一个或多个服务器,这种服务器被称为broker

Topic
主题:每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)

Partition
分区:Partition是物理上的概念,每个Topic包含一个或多个Partition.(一般为kafka节点数cpu的总核数)

Producer
生产者,负责发布消息到Kafka broker

Consumer
消费者:从Kafka broker读取消息的客户端。

Consumer Group
消费者组:每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
1.2 基本特性
可扩展性
在不需要下线的情况下进行扩容
数据流分区(partition)存储在多个机器上
高性能
单个broker就能服务上千客户端
单个broker每秒种读/写可达每秒几百兆字节
多个brokers组成的集群将达到非常强的吞吐能力
性能稳定,无论数据多大
Kafka在底层摒弃了Java堆缓存机制,采用了操作系统级别的页缓存,同时将随机写操作改为顺序写,再结合Zero-Copy的特性极大地改善了IO性能。
1.3 消息格式
一个topic对应一种消息格式,因此消息用topic分类
一个topic代表的消息有1个或者多个patition(s)组成
一个partition应该存放在一到多个server上,如果只有一个server,就没有冗余备份,是单机而不是集群;如果有多个server,一个server为leader(领导者),其他servers为followers(跟随者),leader需要接受读写请求,followers仅作冗余备份,leader出现故障,会自动选举一个follower作为leader,保证服务不中断;每个server都可能扮演一些partitions的leader和其它partitions的follower角色,这样整个集群就会达到负载均衡的效果
消息按顺序存放;消息顺序不可变;只能追加消息,不能插入;每个消息都有一个offset,用作消息ID, 在一个partition中唯一;offset有consumer保存和管理,因此读取顺序实际上是完全有consumer决定的,不一定是线性的;消息有超时日期,过期则删除
1.4 原理解析
producer创建一个topic时,可以指定该topic为几个partition(默认是1,配置num.partitions),然后会把partition分配到每个broker上,分配的算法是:a个broker,第b个partition分配到b%a的broker上,可以指定有每个partition有几分副本Replication,副本的分配策略为:第c个副本存储在第(b+c)%a的broker上。一个partition在每个broker上是一个文件夹,文件夹中文件的命名方式为:topic名称+有序序号。每个partition中文件是一个个的segment,segment file由.index和.log文件组成。两个文件的命名规则是,上一个segmentfile的最后一个offset。这样,可以快速的删除old文件。

producer往kafka里push数据,会自动的push到所有的分区上,消息是否push成功有几种情况:1,接收到partition的ack就算成功,2全部副本都写成功才算成功;数据可以存储多久,默认是两天;producer的数据会先存到缓存中,等大小或时间达到阈值时,flush到磁盘,consumer只能读到磁盘中的数据。

consumer从kafka里poll数据,poll到一定配置大小的数据放到内存中处理。每个group里的consumer共同消费全部的消息,不同group里的数据不能消费同样的数据,即每个group消费一组数据。

consumer的数量和partition的数量相等时消费的效率最高。这样,kafka可以横向的扩充broker数量和partitions;数据顺序写入磁盘;producer和consumer异步

二:环境搭建(windows)

2.1 安装zookeeper
kafka需要用到zookeeper,所以需要先安装zookeeper

到官网下载最新版zookeeper,http://www.apache.org/dyn/closer.cgi/zookeeper/
解压到指定路径
复制conf目录下zoo_sample.cfg,粘贴改名为zoo.cfg,修改zoo.cfg中的dataDir的值为E:/data/zookeeper,并添加一行dataLogDir=E:/log/zookeeper
修改系统环境变量,在Path后添加 ;E:\zookeeper\zookeeper-3.4.10\bin
运行cmd命令窗口,输入zkServer回车,启动
2.2 安装kafka
到官网下载最新版kafka,http://kafka.apache.org/downloads
解压到指定路径,如:E:\kafka_2.12-0.10.2.0
修改E:\kafka_2.12-0.10.2.0\config目录下的server.properties中 log.dirs的值为E:/log/kafka
添加系统环境变量,在Path后添加 ;E:\kafka_2.12-0.10.2.0\bin\windows
启动kafka,在cmd命令行用cd命令切换到kafka根目录E:\kafka_2.12-0.10.2.0,输入命令
.\bin\windows\kafka-server-start.bat .\config\server.properties
出现started (kafka.server.KafkaServer)字样表示启动成功
运行cmd命令行,创建一个topic,命令如下:
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
再打开一个cmd,创建一个Producer,命令如下:
kafka-console-producer.bat --broker-list localhost:9092 --topic test
再打开一个cmd,创建一个Customer,命令如下:
kafka-console-consumer.bat --zookeeper localhost:2181 --topic test
在Producer窗口下输入信息进行测试 ,每输入一行回车后消息马上就会出现在Customer中,表明kafka已经安装测试成功

三:基于.net的常用类库

基于.net实现kafka的消息队列应用,常用的类库有kafka-net,Confluent.Kafka,官网推荐使用Confluent.Kafka,本文也是基于该库的实现,使用版本预发行版1.0.0-beta,创建控制台应用程序。

四:应用–生产者

生产者将数据发布到指定的主题,一般生产环境下的负载均衡,服务代理会有多个,BootstrapServers属性则为以逗号隔开的多个代理地址

/// <summary>
/// 生产者
/// </summary>
public static void Produce()
{var config = new ProducerConfig { BootstrapServers = "localhost:9092" }Action<DeliveryReportResult<Null, string>> handler = r =>Console.WriteLine(!r.Error.IsError? $"Delivered message to {r.TopicPartitionOffset}": $"Delivery Error: {r.Error.Reason}");using (var producer = new Producer<Null, string>(config)){// 错误日志监视producer.OnError += (_, msg) => { Console.WriteLine($"Producer_Erro信息:Code:{msg.Code};Reason:{msg.Reason};IsError:{msg.IsError}"); };for (int i = 0; i < 5; i++){// 异步发送消息到主题producer.BeginProduce("MyTopic", new Message<Null, string> { Value = i.ToString() }, handler);}   // 3后 Flush到磁盘producer.Flush(TimeSpan.FromSeconds(3));}
}

五:应用–消费者

消费者使用消费者组名称标记自己,并且发布到主题的每个记录被传递到每个订阅消费者组中的一个消费者实例。消费者实例可以在单独的进程中,也可以在不同的机器

如果所有消费者实例具有相同的消费者组,则记录将有效地在消费者实例上进行负载平衡。

如果所有消费者实例具有不同的消费者组,则每个记录将广播到所有消费者进程

上图为两个服务器Kafka群集,托管四个分区(P0-P3),包含两个消费者组。消费者组A有两个消费者实例,B组有四个消费者实例。

默认EnableAutoCommit 是自动提交,只要从队列取出消息,偏移量自动移到后一位,无论消息后续处理成功与否,该条消息都会消失,所以为免除处理失败的数据丢失,消费者方可设置该属性为false,后面进行手动commint()提交偏移

<summary>/// 消费者/// </summary>public static void Consumer(){var conf = new ConsumerConfig{GroupId = "test-consumer-group",BootstrapServers = "localhost:9092",AutoOffsetReset = AutoOffsetResetType.Earliest,EnableAutoCommit = false  // 设置非自动偏移,业务逻辑完成后手动处理偏移,防止数据丢失};using (var consumer = new Consumer<Ignore, string>(conf)){// 订阅topicconsumer.Subscribe("MyTopic");// 错误日志监视 consumer.OnError += (_, msg) => { Console.WriteLine($"Consumer_Error信息:Code:{msg.Code};Reason:{msg.Reason};IsError:{msg.IsError}"); };while (true){try{var consume = consumer.Consume();string receiveMsg = consume.Value;Console.WriteLine($"Consumed message '{receiveMsg}' at: '{consume.TopicPartitionOffset}'.");// 开始我的业务逻辑...// 业务结束if(成功){consumer.Commit(new List<TopicPartitionOffset>() { consume.TopicPartitionOffset }); //手动提交偏移}}catch (ConsumeException e){Console.WriteLine($"Consumer_Error occured: {e.Error.Reason}");}}}}

常见数据问题处理

1.重复消费最常见的原因:re-balance问题,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session
timeout时间(0.10.x版本默认是30秒),那么就会re-balance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。
去重问题:消息可以使用唯一id标识
2.保证不丢失消息: 生产者(ack= -1 或 all 代表至少成功发送一次) 消费者
(offset手动提交,业务逻辑成功处理后,提交offset)
3.保证不重复消费:落表(主键或者唯一索引的方式,避免重复数据)
业务逻辑处理(选择唯一主键存储到Redis或者mongdb中,先查询是否存在,若存在则不处理;若不存在,先插入Redis或Mongdb,再进行业务逻辑处理)
Kafka 可视化调试

借助可视化客户端工具 kafka tool
具体使用可参考:https://www.cnblogs.com/frankdeng/p/9452982.html

C#使用kafka消息队列相关推荐

  1. kafka消息队列的概念理解

    kafka在大数据.分布式架构中都很流行.kafka可以进行流式计算,也可以做为日志系统,还可以用于消息队列. kafka作为消息队列的优点: 分布式的系统 高吞吐量.即使存储了许多TB的消息,它也保 ...

  2. 使用kafka消息队列中间件实现跨进程,跨服务器的高并发消息通讯

    作者 | 陈屹       责编 | 欧阳姝黎 近来工作上接收到一项任务,实现c++后台服务器程序,要求它能承载千万级别的DAU读写请求.目前实现千万级高并发海量数据请求的服务器设计在"套路 ...

  3. 19 kafka消息队列

    文章目录 19 kafka消息队列 一.kafka介绍 1.消息队列基本介绍 2.常用的消息队列介绍 3.消息队列的应用场景 4.消息队列的两种模式 5.kafka的基本介绍 6.kafka的架构介绍 ...

  4. kafka 消息队列

    kafka 消息队列 kafka 架构原理 大数据时代来临,如果你还不知道Kafka那就真的out了!据统计,有三分之一的世界财富500强企业正在使用Kafka,包括所有TOP10旅游公司,7家TOP ...

  5. Java+Kafka消息队列

    本文主要针对,Java端对Kafka消息队列的生产和消费.Kafka的安装部署,请看查看相关文章. 笔者最近所用的是Spring mvc,监听文件路径,然后将读取到的文件内容发送到消息队列中.由另外系 ...

  6. SpringBoot集成Kafka消息队列

    1.说明 Spring可以方便的集成使用 Kafka消息队列 , 只需要引入依赖包spring-kafka, 注意版本兼容问题, 本文详细介绍SpringBoot集成Kafka的方法, 以及生产者和消 ...

  7. kafka消息队列应用总结

    kafka官网: Apache Kafka 公司使用阿里云提供的kafka消息队列服务,分别为测试环境与生产环境,部署了多个集群. 使用场景:应用对外提供API接口调用,同时支持kafka增量消息推送 ...

  8. Flink使用KafkaSource从Kafka消息队列中读取数据

    Flink使用KafkaSource从Kafka消息队列中读取数据 使用KafkaSource从Kafka消息队列中读取数据 1.KafkaSource创建的DataStream是一个并行的DataS ...

  9. Kafka—消息队列

    Kafka-消息队列(理论部分) 一.Kafka概述 1.1.简介 kafka是一个分布式的基于发布/订阅模式的消息队列 主要应用场景:大数据实时处理领域 1.2.什么是消息队列? 消息队列 = 消息 ...

  10. kafka消息队列使用场景

    一.消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构.目前使用较多的消息队列有ActiveMQ,Rabbit ...

最新文章

  1. Java常用API(二)String
  2. vue-router 路由跳转
  3. 在线js拼接html代码,关于js拼接html元素?
  4. php图片颤抖,PHP 判断图片是否带点赞(以抖音为例)
  5. Flask 富文本编辑器
  6. 2010-09-11
  7. 跨境电商卖家如何选择ERP系统?
  8. 合肥一六八2021高考成绩查询,合肥高考成绩查询入口
  9. MongoDB Sharding 机制分析
  10. vue点击改变data_vue 中自定义指令改变data中的值
  11. 【译】Jep 文档(2)——基本用法(Basic Usage)
  12. Axure 9 实战案例,母版的应用 3,用母版绘制高逼格APP原型
  13. 【基因组学】系统发育分析-进化树的相关知识点
  14. Openwrt:创建编译IPK软件包
  15. 前端Echarts数据可视化
  16. 20种银河科幻风格ps字体样式
  17. Linux系统的应用软件流程图绘制软件yEd
  18. 百度网盘8种批量修改文件名称重命名的方法
  19. 计算机如何更新目录,wps怎么更新目录【具体阐明】
  20. 获取微信小程序的APPID及任意一个页面的路径信息

热门文章

  1. Node 非服务脚本调试
  2. js,jq表格/文本内容溢出,用三个点替代,鼠标悬停时显示全部内容
  3. 台式电脑右下角声音图标有红色×解决办法(打开GUI)
  4. 【Python】Python应用:8个水果随机分配给3个同学
  5. OpenCV图像边缘检测(Laplace算法)
  6. 用PS临摹悟空(黑神话)
  7. php诗词,古诗词停止或快速完成
  8. 黑客入门指南,学习黑客必须掌握的技术
  9. 【Blender教程】在Blender中制作森系少女(下)
  10. 藏獒经济崩盘 为何青藏高原会有如此之多的流浪藏獒