Kafka分布式消费学习
Kafka分布式消费学习
目录:
1、Logstash input Kafka配置参数解析:
2、Kafka的Topic命令查看:
3、单机多进程实现Kafka的at least once分布式消费:
4、多机多进程实现Kafka分布式消费:
1、Logstash input Kafka配置参数解析:
kafka{bootstrap_servers => "hdp1.example.com:9092"security_protocol => "SASL_PLAINTEXT"sasl_kerberos_service_name => "kafka"jaas_path => "/tmp/kafka_jaas.conf.demouser"kerberos_config => "/etc/krb5.conf"topics => ["remoa3"]consumer_threads => 6decorate_events => truegroup_id => "remoa3"
}
l topics:
官网描述:Value type is array
Default value is ["logstash"]
A list of topics to subscribe to, defaults to ["logstash"].
翻译:值类型是数组,默认值为[“logstash”]。
要订阅的主题列表,默认为[“logstash”]。
l consumer_threads:
官网描述:Value type is number
Default value is 1
Ideally you should have as many threads as the number of partitions for a perfect balance — more threads than partitions means that some threads will be idle
翻译:消费者线程:
值类型是数字。默认值为1。
理想情况下,你应该拥有与完美平衡的分区数一样多的线程数,比分区更多的线程意味着有些线程将空闲。
l decorate_events
官网描述:Value type is boolean
Default value is false
Option to add Kafka metadata like topic, message size to the event. This will add a field named kafka to the logstash event containing the following attributes: topic: The topic this message is associated with consumer_group: The consumer group used to read in this event partition: The partition this message is associated with offset: The offset from the partition this message is associated with key: A ByteBuffer containing the message key
翻译:装饰事件:
值类型是布尔型,默认值为false。
可以将Kafka元数据(如主题,消息大小)添加到事件中。这将添加一个名为kafka的字段到包含以下属性的logstash事件:
主题
该消息与consumer_group相关联的主题
用于读取这个事件分区的消费者组
此消息与偏移量相关联的分区
此消息与密钥相关联的分区偏移量
包含消息密钥的BtyeBuffer
即设置为True时在输出消息时会输出自身的信息,如Topic来源,消费消息的大小,消费者的group信息等等。
l group_id:
官网描述:Value type is string
Default value is "logstash"
The identifier of the group this consumer belongs to. Consumer group is a single logical subscriber that happens to be made up of multiple processors. Messages in a topic will be distributed to all Logstash instances with the same group_id
翻译:group_id:
值类型是字符串。默认值为”logstash”
该消费者所属的组的标识符。消费者组是单个逻辑用户,是由多个处理器组成。主题中的消息将分发给具有相同group_id的所有的Logstash实例。
即不同的组之间消费是互补影响的,相互隔离的。
2、Kafka的Topic命令查看:
(1)创建Topic:
执行:bash kafka-topics.sh --zookeeper hdp1.example.com:2181/kafka_010 --create --topic remoa2 --partitions 3 --replication-factor 1
创建名为remoa2的Topic,使用3个分区分别存放数据,复制因子为1,数据备份共1份。
图2.1 截图1
(2)显示名为remoa2的Topic的详细信息:
执行:bash /opt/package/kafka_2.10-0.10.1.0/bin/kafka-topics.sh --zookeeper hdp1.example.com:2181/kafka_010 --topic remoa2 --describe
图2.2 截图2
第一行列出了该Topic的总体情况,包括Topic名称为remoa2,分区数量为3,副本数量为1。
Partition:分区
Leader:负责读写指定分区的节点
Replicas:复制该分区日志的节点列表
Isr:”in-sync replicas”,当前活跃的副本列表
(3)显示所有的Topic的详细信息:
执行:bash /opt/package/kafka_2.10-0.10.1.0/bin/kafka-topics.sh --zookeeper hdp1.example.com:2181/kafka_010 --describe
图2.3 截图3
(4)删除Topic:
执行:bash /opt/package/kafka_2.10-0.10.1.0/bin/kafka-topics.sh --zookeeper hdp1.example.com:2181/kafka_010 --delete --topic topic1,topic2,topic3,topic4
删除名为topic1、topic2、topic3、topic4的Topic。
图2.4 截图4
(5)查看已存在Topic列表:
pwd所在目录为:/opt/package/kafka_2.10-0.10.1.0/bin
执行:bash kafka-topics.sh --zookeeper hdp1.example.com:2181/kafka_010 --list
图2.5 截图5
(6)修改Topic分区数量:
执行:bash kafka-topics.sh --alter --zookeeper hdp1.example.com:2181/kafka_010 --topic remoaindex --partitions 4
将名称为remoaindex的Topic的分区修改为4。
图2.6 截图6
执行--describe参数可以查看到remoaindex的Topic分区数量修改为了4。
图2.7 截图7
3、单机多进程实现Kafka的at least once分布式消费:
(1)注意:
A)使用多个Logstash端协同消费同一个Topic的话,需要把两个或是多个Logstash消费端配置成相同的group_id和topic_id,同时需要把相应的Topic分成多个分区,多个消费者消费是无法保证消息的消费顺序性的。
B)Kafka的消息模型是对Topic分区以达到分布式效果。每个Topic下的不同的partitions只能有一个Owner去消费。所以只有多个分区后才能启动多个消费者,对应不同的区去消费。其中协调消费部分是由Server端协调而成。消息的消费是无序的。
C)若要保证消息的顺序,则使用一个partition。Kafka的每个partition只能同时被同一个group中的consumer消费。
(2)试验原理说明:
A)Kafka保证同一个consumer group中只有一个consumer会消费某条消息。Kafka保证在稳定状态下每一个consumer实例只会消费某一个或多个特定partition的数据,而某个partition的数据只会被某一个特定的consumer实例所消费。
B)三种情况:
①如果某个consumer group中consumer数量少于partition数量,则至少有一个consumer会消费多个partition的数据。
②如果某个consumer group中consumer数量等于partition数量,则正好一个consumer消费一个partition的数据。
③如果某个consumer group中consumer数量多于partition数量,会有部分的consumer无法消费该Topic下任何一条消息。
C)at least once:消息绝不会丢,但可能会重复传输
D)at most once:消息可能会丢,但绝不会重复传输
E)Exactly once:每条消息肯定会被传输一次且仅传输一次。
试验测试第一种情况,consumer数量为2,partition数量为3。
(3)producer.conf脚本及consumer.conf脚本内容如下:
A)producer.conf脚本内容:
input {beats{port => 5044}
}filter{if "beats_input_codec_plain_applied" in [tags]{mutate{remove_tag => ["beats_input_codec_plain_applied"]}}grok{patterns_dir => "/opt/package/logstash-5.2.2/config/patterns/filter1pattern"match => {"message" => "<%{INT:systemType}>%{MYTIME:logTime}\s*%{MYCOMMAND:command}\[%{INT:pid}\]:%{MYOPERATE:operate}"}}
}output {stdout{codec => rubydebug}kafka{topic_id => "remoa2"bootstrap_servers => "hdp1.example.com:9092"security_protocol => "SASL_PLAINTEXT"sasl_kerberos_service_name => "kafka"jaas_path => "/tmp/kafka_jaas.conf.demouser"kerberos_config => "/etc/krb5.conf"compression_type => "none"acks => "1"}
}
B)consumer1.conf及consumer2.conf脚本内容:
input{kafka{bootstrap_servers => "hdp1.example.com:9092"security_protocol => "SASL_PLAINTEXT"sasl_kerberos_service_name => "kafka"jaas_path => "/tmp/kafka_jaas.conf.demouser"kerberos_config => "/etc/krb5.conf"topics => ["remoa2"]consumer_threads => 3decorate_events => truegroup_id => "remoa2"}
}filter{if "beats_input_codec_plain_applied" in [tags]{mutate{remove_tag => ["beats_input_codec_plain_applied"]}}grok{patterns_dir => "/opt/package/logstash-5.2.2/config/patterns/filter1pattern"match => {"message" => "<%{INT:systemType}>%{MYTIME:logTime}\s*%{MYCOMMAND:command}\[%{INT:pid}\]:%{MYOPERATE:operate}"}}
}output{stdout{codec => rubydebug}elasticsearch{hosts => ["kdc1.example.com:9200","kdc2.example.com:9200"]user => logstashpassword => logstashaction => "index"index => "logstash-kafka1-%{+YYYY.MM.dd}"truststore => "/opt/package/logstash-5.2.2/config/keys/truststore.jks"truststore_password => whoamissl => truessl_certificate_verification => truecodec => "json"}
}
(4)在单机中启动Filebeat,然后执行两个相同的Logstash消费者脚本,其Logstash消费端配置成相同的group_id和topic_id。
执行:service filebeat start
bash ../../bin/logstash -f producer.conf
bash ../../bin/logstash -f consumer2.conf
bash ../../bin/logstash -f consumer1.conf
(5)查看到producer.conf的Logstash output Kafka中的标准输出:
图3.1 截图8
(6)查看到consumer1.conf的标准输出:
图3.2 截图9
图3.3 截图10
通过上下滑动查看标准输出,查看到consumer1其消费了两个分区的数据,为分区1和分区2,标准输出结果里分区1及分区2的输出结果随机交替出现。
(7)查看到consumer2.conf的标准输出:
图3.4 截图11
查看到consumer2消费的分区为分区0。
(8)查看名为remoa2的Topic的具体信息:
bash /opt/package/kafka_2.10-0.10.1.0/bin/kafka-topics.sh --zookeeper hdp1.example.com:2181/kafka_010 --topic remoa2 --describe
图3.5 截图12
(9)在Kibana中查看到Elasticsearch中对应index名为logstash-kafka1-2017.09.13的具体内容:
GET logstash-kafka1-2017.09.13/_search
图3.6 截图13
图3.7 截图14
4、多机多进程实现Kafka分布式消费:
(1)创建名为remoa3的Topic,使用6个分区,数据备份一份。
bash kafka-topics.sh --zookeeper hdp1.example.com:2181/kafka_010 --create --topic remoa3 --partitions 6 --replication-factor 1
(2)在两台机子分别启动三个消费者进程,两台机的logstash版本分别为5.2.2及5.2.1,实测版本兼容不影响测试。
conf脚本文件如下:
input{kafka{bootstrap_servers => "hdp1.example.com:9092"security_protocol => "SASL_PLAINTEXT"sasl_kerberos_service_name => "kafka"jaas_path => "/tmp/kafka_jaas.conf.demouser"kerberos_config => "/etc/krb5.conf"topics => ["remoa3"]consumer_threads => 6decorate_events => truegroup_id => "remoa3"}
}filter{if "beats_input_codec_plain_applied" in [tags]{mutate{remove_tag => ["beats_input_codec_plain_applied"]}}grok{patterns_dir => "/opt/package/logstash-5.2.2/config/patterns/filter1pattern"match => {"message" => "<%{INT:systemType}>%{MYTIME:logTime}\s*%{MYCOMMAND:command}\[%{INT:pid}\]:%{MYOPERATE:operate}"}}
}output{stdout{codec => rubydebug}elasticsearch{hosts => ["kdc1.example.com:9200","kdc2.example.com:9200"]user => logstashpassword => logstashaction => "index"index => "logstash-kafka3-%{+YYYY.MM.dd}"truststore => "/opt/package/logstash-5.2.2/config/keys/truststore.jks"truststore_password => whoamissl => truessl_certificate_verification => truecodec => "json"}
}
(3)当启动filebeat进行日志采集后,可以查看到两台主机中的三个logstash进程都进行了消费。
图4.1 截图15
A)查看hdp1机中的三个logstash进程消费情况:
第一个进程消费了partition1中的数据,共消费数据15条。
图4.2 截图16
第二个进程消费了partition3中的数据,共消费数据16条。
图4.3 截图17
第三个进程消费了partition4中的数据,共消费数据15条。
图4.4 截图18
B)查看hdp2机中的三个logstash进程消费情况:
第一个进程消费了partition5中的数据,共消费数据15条。
图4.5 截图19
第二个进程消费了partition0中的数据,共消费数据16条。
图4.6 截图20
第三个进程消费了partition2中的数据,共消费数据16条。
图4.7 截图21
可以查看到六个消费者进程六个分区,则每个消费者消费一个分区的数据。16 + 16 + 15 + 15 + 16 + 15 = 93条数据,与测试文本中数据总量保持一致,实现了多机多进程分布式消费。
(4)查看名为remoa3的Topic的详细信息:
bash /opt/package/kafka_2.10-0.10.1.0/bin/kafka-topics.sh --zookeeper hdp1.example.com:2181/kafka_010 --topic remoa3 --describe
图4.8 截图22
(5)当然,相应的在Kibana也能够查看到结果。
GET _cat/indices
GET logstash-kafka3-2017.09.13/_search
图4.9 截图23
图4.10 截图24
Kafka分布式消费学习相关推荐
- (十七)java版spring cloud+spring boot 社交电子商务平台-spring+springmvc+kafka分布式消息中间件集成方案...
电子商务平台源码请加企鹅求求:一零三八七七四六二六.kafka消息平台使用spring+kafka的集成方案,详情如下: 使用最高版本2.1.0.RELEASE集成jar包:spring-integr ...
- Kafka入门篇学习笔记整理
Kafka入门篇学习笔记整理 Kafka是什么 Kafka的特性 应用场景 Kafka的安装 单机版部署 集群部署环境准备 Kafka 2.x集群部署 Kafka 3.x集群部署 监听器和内外网络 K ...
- Kafka 顺序消费方案
欢迎关注方志朋的博客,回复"666"获面试宝典 来源:blog.csdn.net/qq_38245668/article/ details/105900011 前言 本文针对解决K ...
- 搭建高吞吐量 Kafka 分布式发布订阅消息 集群
搭建高吞吐量 Kafka 分布式发布订阅消息 集群 简介 Kafka 是一种高吞吐的分布式发布订阅消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区. ...
- 正确处理kafka多线程消费的姿势
最近项目开发过程使用kafka作为项目模块间负载转发器,实现实时接收不同产品线消息,分布式准实时消费产品线消息.通过kafka作为模块间的转换器,不仅有MQ的几大好处:异步. 解耦. 削峰等几大好处, ...
- kafka生产消费原理笔记
一.什么是kafka Kafka是最初由Linkedin公司开发,是一个分布式.支持分区的(partition).多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性 ...
- Kafka 分布式消息队列介绍
Kafka 分布式消息队列 类似产品有JBoss.MQ 一.由Linkedln 开源,使用scala开发,有如下几个特点: (1)高吞吐 (2)分布式 (3)支持多语言客户端 (C++.Java) 二 ...
- spring+springmvc+kafka分布式消息中间件集成方案
Honghu的消息服务平台已经抛弃了之前的ActiveMQ,改用高吞吐量比较大的Kafka分布式消息中间件方案: kafka消息平台使用spring+kafka的集成方案,详情如下: 1. 使用最高版 ...
- 答读者问:Kafka顺序消费吞吐量下降该如何优化?
大家好,我是威哥,<RocketMQ技术内幕>一书作者,荣获RocketMQ官方社区优秀布道师.CSDN2020博客执之星Top2等荣誉称号.目前担任中通快递技术平台部资深架构师,主要负责 ...
最新文章
- 算法工程师的必备学习资料,《AI算法工程师手册》正式开源了
- hdu 1544(求回文子串的个数)
- 窗口刷新 Invalidate UpdateWindow RedrawWindow
- 主板电源开关接口图解_组装电脑时主板跳线如何接?DIY装机主板接线教程
- 使用live555制作rtsp客户端,捕获h264等解码
- 微软为 Windows Terminal 推出全新 logo
- 鸿蒙不如安卓PPT,被吐槽为PPT、“哄蒙”、安卓套壳,华为鸿蒙一路走来真不容易...
- Spark内核解析之二:Spark 部署模式
- Java程序员必会的工具库,让你的代码量减少90%!
- Python 上传文件到阿里云OSS
- linux中孚软件,中孚主机监控与审计系统
- 基于LED恒流驱动芯片芯鼎盛TX6122设计的DC-DC降压恒流DEMO
- linux在gpt分区装系统,linux安装到GPT分区
- linux tac文件最后五行,tac命令以及各种linux文件查看命令
- Could not set property ‘XXX‘ of ‘class com.entity.XXX‘
- 写给新的一年(2015)
- pytorch Vgg网络模型
- 【Markdown】Typora中文手册
- 六大学习趋势正重塑在线教育产业-网络线上教学
- Linux下集群的搭建