Kafka分布式消费学习

目录:

1、Logstash input Kafka配置参数解析:

2、Kafka的Topic命令查看:

3、单机多进程实现Kafka的at least once分布式消费:

4、多机多进程实现Kafka分布式消费:


1、Logstash input Kafka配置参数解析:

kafka{bootstrap_servers => "hdp1.example.com:9092"security_protocol => "SASL_PLAINTEXT"sasl_kerberos_service_name => "kafka"jaas_path => "/tmp/kafka_jaas.conf.demouser"kerberos_config => "/etc/krb5.conf"topics => ["remoa3"]consumer_threads => 6decorate_events => truegroup_id => "remoa3"
}

l topics:

官网描述:Value type is array

Default value is ["logstash"]

A list of topics to subscribe to, defaults to ["logstash"].

翻译:值类型是数组,默认值为[“logstash”]。

要订阅的主题列表,默认为[“logstash”]。

l consumer_threads:

官网描述:Value type is number

Default value is 1

Ideally you should have as many threads as the number of partitions for a perfect balance — more threads than partitions means that some threads will be idle

翻译:消费者线程:

值类型是数字。默认值为1。

理想情况下,你应该拥有与完美平衡的分区数一样多的线程数,比分区更多的线程意味着有些线程将空闲。

l decorate_events

官网描述:Value type is boolean

Default value is false

Option to add Kafka metadata like topic, message size to the event. This will add a field named kafka to the logstash event containing the following attributes: topic: The topic this message is associated with consumer_group: The consumer group used to read in this event partition: The partition this message is associated with offset: The offset from the partition this message is associated with key: A ByteBuffer containing the message key

翻译:装饰事件:

值类型是布尔型,默认值为false。

可以将Kafka元数据(如主题,消息大小)添加到事件中。这将添加一个名为kafka的字段到包含以下属性的logstash事件:

主题

该消息与consumer_group相关联的主题

用于读取这个事件分区的消费者组

此消息与偏移量相关联的分区

此消息与密钥相关联的分区偏移量

包含消息密钥的BtyeBuffer

即设置为True时在输出消息时会输出自身的信息,如Topic来源,消费消息的大小,消费者的group信息等等。

l group_id:

官网描述:Value type is string

Default value is "logstash"

The identifier of the group this consumer belongs to. Consumer group is a single logical subscriber that happens to be made up of multiple processors. Messages in a topic will be distributed to all Logstash instances with the same group_id

翻译:group_id:

值类型是字符串。默认值为”logstash”

该消费者所属的组的标识符。消费者组是单个逻辑用户,是由多个处理器组成。主题中的消息将分发给具有相同group_id的所有的Logstash实例。

即不同的组之间消费是互补影响的,相互隔离的。

2、Kafka的Topic命令查看:

(1)创建Topic:

执行:bash kafka-topics.sh --zookeeper hdp1.example.com:2181/kafka_010 --create --topic remoa2 --partitions 3 --replication-factor 1

创建名为remoa2的Topic,使用3个分区分别存放数据,复制因子为1,数据备份共1份。

图2.1 截图1

(2)显示名为remoa2的Topic的详细信息:

执行:bash /opt/package/kafka_2.10-0.10.1.0/bin/kafka-topics.sh --zookeeper hdp1.example.com:2181/kafka_010 --topic remoa2 --describe

图2.2 截图2

第一行列出了该Topic的总体情况,包括Topic名称为remoa2,分区数量为3,副本数量为1。

Partition:分区

Leader:负责读写指定分区的节点

Replicas:复制该分区日志的节点列表

Isr:”in-sync replicas”,当前活跃的副本列表

(3)显示所有的Topic的详细信息:

执行:bash /opt/package/kafka_2.10-0.10.1.0/bin/kafka-topics.sh --zookeeper hdp1.example.com:2181/kafka_010 --describe

图2.3 截图3

(4)删除Topic:

执行:bash /opt/package/kafka_2.10-0.10.1.0/bin/kafka-topics.sh --zookeeper hdp1.example.com:2181/kafka_010 --delete --topic topic1,topic2,topic3,topic4

删除名为topic1、topic2、topic3、topic4的Topic。

图2.4 截图4

(5)查看已存在Topic列表:

pwd所在目录为:/opt/package/kafka_2.10-0.10.1.0/bin

执行:bash kafka-topics.sh --zookeeper hdp1.example.com:2181/kafka_010 --list

图2.5 截图5

(6)修改Topic分区数量:

执行:bash kafka-topics.sh --alter --zookeeper hdp1.example.com:2181/kafka_010 --topic remoaindex --partitions 4

将名称为remoaindex的Topic的分区修改为4。

图2.6 截图6

执行--describe参数可以查看到remoaindex的Topic分区数量修改为了4。

图2.7 截图7

3、单机多进程实现Kafka的at least once分布式消费:

(1)注意:

A)使用多个Logstash端协同消费同一个Topic的话,需要把两个或是多个Logstash消费端配置成相同的group_id和topic_id,同时需要把相应的Topic分成多个分区,多个消费者消费是无法保证消息的消费顺序性的。

B)Kafka的消息模型是对Topic分区以达到分布式效果。每个Topic下的不同的partitions只能有一个Owner去消费。所以只有多个分区后才能启动多个消费者,对应不同的区去消费。其中协调消费部分是由Server端协调而成。消息的消费是无序的。

C)若要保证消息的顺序,则使用一个partition。Kafka的每个partition只能同时被同一个group中的consumer消费。

(2)试验原理说明:

A)Kafka保证同一个consumer group中只有一个consumer会消费某条消息。Kafka保证在稳定状态下每一个consumer实例只会消费某一个或多个特定partition的数据,而某个partition的数据只会被某一个特定的consumer实例所消费。

B)三种情况:

①如果某个consumer group中consumer数量少于partition数量,则至少有一个consumer会消费多个partition的数据。

②如果某个consumer group中consumer数量等于partition数量,则正好一个consumer消费一个partition的数据。

③如果某个consumer group中consumer数量多于partition数量,会有部分的consumer无法消费该Topic下任何一条消息。

C)at least once:消息绝不会丢,但可能会重复传输

D)at most once:消息可能会丢,但绝不会重复传输

E)Exactly once:每条消息肯定会被传输一次且仅传输一次。

试验测试第一种情况,consumer数量为2,partition数量为3。

(3)producer.conf脚本及consumer.conf脚本内容如下:

A)producer.conf脚本内容:

input {beats{port => 5044}
}filter{if "beats_input_codec_plain_applied" in [tags]{mutate{remove_tag => ["beats_input_codec_plain_applied"]}}grok{patterns_dir => "/opt/package/logstash-5.2.2/config/patterns/filter1pattern"match => {"message" => "<%{INT:systemType}>%{MYTIME:logTime}\s*%{MYCOMMAND:command}\[%{INT:pid}\]:%{MYOPERATE:operate}"}}
}output {stdout{codec => rubydebug}kafka{topic_id => "remoa2"bootstrap_servers => "hdp1.example.com:9092"security_protocol => "SASL_PLAINTEXT"sasl_kerberos_service_name => "kafka"jaas_path => "/tmp/kafka_jaas.conf.demouser"kerberos_config => "/etc/krb5.conf"compression_type => "none"acks => "1"}
}

B)consumer1.conf及consumer2.conf脚本内容:

input{kafka{bootstrap_servers => "hdp1.example.com:9092"security_protocol => "SASL_PLAINTEXT"sasl_kerberos_service_name => "kafka"jaas_path => "/tmp/kafka_jaas.conf.demouser"kerberos_config => "/etc/krb5.conf"topics => ["remoa2"]consumer_threads => 3decorate_events => truegroup_id => "remoa2"}
}filter{if "beats_input_codec_plain_applied" in [tags]{mutate{remove_tag => ["beats_input_codec_plain_applied"]}}grok{patterns_dir => "/opt/package/logstash-5.2.2/config/patterns/filter1pattern"match => {"message" => "<%{INT:systemType}>%{MYTIME:logTime}\s*%{MYCOMMAND:command}\[%{INT:pid}\]:%{MYOPERATE:operate}"}}
}output{stdout{codec => rubydebug}elasticsearch{hosts => ["kdc1.example.com:9200","kdc2.example.com:9200"]user => logstashpassword => logstashaction => "index"index => "logstash-kafka1-%{+YYYY.MM.dd}"truststore => "/opt/package/logstash-5.2.2/config/keys/truststore.jks"truststore_password => whoamissl => truessl_certificate_verification => truecodec => "json"}
}

(4)在单机中启动Filebeat,然后执行两个相同的Logstash消费者脚本,其Logstash消费端配置成相同的group_id和topic_id。

执行:service filebeat start

bash ../../bin/logstash -f producer.conf

bash ../../bin/logstash -f consumer2.conf

bash ../../bin/logstash -f consumer1.conf

(5)查看到producer.conf的Logstash output Kafka中的标准输出:

图3.1 截图8

(6)查看到consumer1.conf的标准输出:

图3.2 截图9

图3.3 截图10

通过上下滑动查看标准输出,查看到consumer1其消费了两个分区的数据,为分区1和分区2,标准输出结果里分区1及分区2的输出结果随机交替出现。

(7)查看到consumer2.conf的标准输出:

图3.4 截图11

查看到consumer2消费的分区为分区0。

(8)查看名为remoa2的Topic的具体信息:

bash /opt/package/kafka_2.10-0.10.1.0/bin/kafka-topics.sh --zookeeper hdp1.example.com:2181/kafka_010 --topic remoa2 --describe

图3.5 截图12

(9)在Kibana中查看到Elasticsearch中对应index名为logstash-kafka1-2017.09.13的具体内容:

GET logstash-kafka1-2017.09.13/_search

图3.6 截图13

图3.7 截图14

4、多机多进程实现Kafka分布式消费:

(1)创建名为remoa3的Topic,使用6个分区,数据备份一份。

bash kafka-topics.sh --zookeeper hdp1.example.com:2181/kafka_010 --create --topic remoa3 --partitions 6 --replication-factor 1

(2)在两台机子分别启动三个消费者进程,两台机的logstash版本分别为5.2.2及5.2.1,实测版本兼容不影响测试。

conf脚本文件如下:

input{kafka{bootstrap_servers => "hdp1.example.com:9092"security_protocol => "SASL_PLAINTEXT"sasl_kerberos_service_name => "kafka"jaas_path => "/tmp/kafka_jaas.conf.demouser"kerberos_config => "/etc/krb5.conf"topics => ["remoa3"]consumer_threads => 6decorate_events => truegroup_id => "remoa3"}
}filter{if "beats_input_codec_plain_applied" in [tags]{mutate{remove_tag => ["beats_input_codec_plain_applied"]}}grok{patterns_dir => "/opt/package/logstash-5.2.2/config/patterns/filter1pattern"match => {"message" => "<%{INT:systemType}>%{MYTIME:logTime}\s*%{MYCOMMAND:command}\[%{INT:pid}\]:%{MYOPERATE:operate}"}}
}output{stdout{codec => rubydebug}elasticsearch{hosts => ["kdc1.example.com:9200","kdc2.example.com:9200"]user => logstashpassword => logstashaction => "index"index => "logstash-kafka3-%{+YYYY.MM.dd}"truststore => "/opt/package/logstash-5.2.2/config/keys/truststore.jks"truststore_password => whoamissl => truessl_certificate_verification => truecodec => "json"}
}

(3)当启动filebeat进行日志采集后,可以查看到两台主机中的三个logstash进程都进行了消费。

图4.1 截图15

A)查看hdp1机中的三个logstash进程消费情况:

第一个进程消费了partition1中的数据,共消费数据15条。

图4.2 截图16

第二个进程消费了partition3中的数据,共消费数据16条。

图4.3 截图17

第三个进程消费了partition4中的数据,共消费数据15条。

图4.4 截图18

B)查看hdp2机中的三个logstash进程消费情况:

第一个进程消费了partition5中的数据,共消费数据15条。

图4.5 截图19

第二个进程消费了partition0中的数据,共消费数据16条。

图4.6 截图20

第三个进程消费了partition2中的数据,共消费数据16条。

图4.7 截图21

可以查看到六个消费者进程六个分区,则每个消费者消费一个分区的数据。16 + 16 + 15 + 15 + 16 + 15 = 93条数据,与测试文本中数据总量保持一致,实现了多机多进程分布式消费。

(4)查看名为remoa3的Topic的详细信息:

bash /opt/package/kafka_2.10-0.10.1.0/bin/kafka-topics.sh --zookeeper hdp1.example.com:2181/kafka_010 --topic remoa3 --describe

图4.8 截图22

(5)当然,相应的在Kibana也能够查看到结果。

GET _cat/indices

GET logstash-kafka3-2017.09.13/_search

图4.9 截图23

图4.10 截图24

Kafka分布式消费学习相关推荐

  1. (十七)java版spring cloud+spring boot 社交电子商务平台-spring+springmvc+kafka分布式消息中间件集成方案...

    电子商务平台源码请加企鹅求求:一零三八七七四六二六.kafka消息平台使用spring+kafka的集成方案,详情如下: 使用最高版本2.1.0.RELEASE集成jar包:spring-integr ...

  2. Kafka入门篇学习笔记整理

    Kafka入门篇学习笔记整理 Kafka是什么 Kafka的特性 应用场景 Kafka的安装 单机版部署 集群部署环境准备 Kafka 2.x集群部署 Kafka 3.x集群部署 监听器和内外网络 K ...

  3. Kafka 顺序消费方案

    欢迎关注方志朋的博客,回复"666"获面试宝典 来源:blog.csdn.net/qq_38245668/article/ details/105900011 前言 本文针对解决K ...

  4. 搭建高吞吐量 Kafka 分布式发布订阅消息 集群

    搭建高吞吐量 Kafka 分布式发布订阅消息 集群 简介 Kafka 是一种高吞吐的分布式发布订阅消息系统,能够替代传统的消息队列用于解耦合数据处理,缓存未处理消息等,同时具有更高的吞吐率,支持分区. ...

  5. 正确处理kafka多线程消费的姿势

    最近项目开发过程使用kafka作为项目模块间负载转发器,实现实时接收不同产品线消息,分布式准实时消费产品线消息.通过kafka作为模块间的转换器,不仅有MQ的几大好处:异步. 解耦. 削峰等几大好处, ...

  6. kafka生产消费原理笔记

    一.什么是kafka Kafka是最初由Linkedin公司开发,是一个分布式.支持分区的(partition).多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性 ...

  7. Kafka 分布式消息队列介绍

    Kafka 分布式消息队列 类似产品有JBoss.MQ 一.由Linkedln 开源,使用scala开发,有如下几个特点: (1)高吞吐 (2)分布式 (3)支持多语言客户端 (C++.Java) 二 ...

  8. spring+springmvc+kafka分布式消息中间件集成方案

    Honghu的消息服务平台已经抛弃了之前的ActiveMQ,改用高吞吐量比较大的Kafka分布式消息中间件方案: kafka消息平台使用spring+kafka的集成方案,详情如下: 1. 使用最高版 ...

  9. 答读者问:Kafka顺序消费吞吐量下降该如何优化?

    大家好,我是威哥,<RocketMQ技术内幕>一书作者,荣获RocketMQ官方社区优秀布道师.CSDN2020博客执之星Top2等荣誉称号.目前担任中通快递技术平台部资深架构师,主要负责 ...

最新文章

  1. 算法工程师的必备学习资料,《AI算法工程师手册》正式开源了
  2. hdu 1544(求回文子串的个数)
  3. 窗口刷新 Invalidate UpdateWindow RedrawWindow
  4. 主板电源开关接口图解_组装电脑时主板跳线如何接?DIY装机主板接线教程
  5. 使用live555制作rtsp客户端,捕获h264等解码
  6. 微软为 Windows Terminal 推出全新 logo
  7. 鸿蒙不如安卓PPT,被吐槽为PPT、“哄蒙”、安卓套壳,华为鸿蒙一路走来真不容易...
  8. Spark内核解析之二:Spark 部署模式
  9. Java程序员必会的工具库,让你的代码量减少90%!
  10. Python 上传文件到阿里云OSS
  11. linux中孚软件,中孚主机监控与审计系统
  12. 基于LED恒流驱动芯片芯鼎盛TX6122设计的DC-DC降压恒流DEMO
  13. linux在gpt分区装系统,linux安装到GPT分区
  14. linux tac文件最后五行,tac命令以及各种linux文件查看命令
  15. Could not set property ‘XXX‘ of ‘class com.entity.XXX‘
  16. 写给新的一年(2015)
  17. pytorch Vgg网络模型
  18. 【Markdown】Typora中文手册
  19. 六大学习趋势正重塑在线教育产业-网络线上教学
  20. Linux下集群的搭建

热门文章

  1. JavaScript 异步操作之回调函数
  2. 有 3 个候选人,每个选民只能投票选一人,要求编一个统计选票的程序,先后输入被选人的名字,最后输出各人得票结果
  3. Python视频学习(八、MySQL)
  4. php实现Stripe支付
  5. 多楼层室内环境下的三维几何重建
  6. 阿里云RDS——产品系列概述
  7. Ecshop支付宝插件SQL注入及漏洞利用(exp)
  8. CSS实现文本溢出显示省略号
  9. ogg怎么转mp3格式?三个方法学习起来!
  10. 极其精简的PHP框架WJW