kafka 生产和消费信息入门
启动生产者
kafka-console-producer.sh \
--broker-list mypc01:9092,mypc02:9092,mypc03:9092 \
--topic pet
启动消费者
kafka-console-consumer.sh \
--bootstrap-server mypc01:9092,mypc02:9092,mypc03:9092 \
--topic pet
默认情况下,消费者获取的是启动后生产者生产的信息
,如果想要获取启动之前的所有消息,需要加参数--from-beginning
kafka-console-consumer.sh \
--bootstrap-server mypc01:9092,mypc02:9092,mypc03:9092 \
--topic pet \
--from-beginning
指定offset获取信息
如果想从指定位置
获取信息,就可以指定offset
信息.
kafka-console-consumer.sh \
--bootstrap-server mypc01:9092,mypc02:9092,mypc03:9092 \
--topic cat \
--partition 0 \
--group group1 \
--offset earliest
使用offset
参数必须同时指定partition
参数,否走会报错,如下所示
[root@mypc01 /]# kafka-console-consumer.sh \
> --bootstrap-server mypc01:9092,mypc02:9092,mypc03:9092 \
> --topic cat \
> --offset earliest
The partition is required when offset is specified.
在 Kafka
中,每当消费者组内的消费者查找不到所记录的消费位移或发生位移越界时,就会根据消费者客户端参数 auto.offset.reset
的配置来决定从何处开始进行消费,这个参数的默认值为 “latest
” 。
auto.offset.reset 的值可以为 earliest、latest 和 none 。
各含义在真实情况如下所示:
earliest :
当各分区下存在已提交的 offset 时,从提交的 offset 开始消费;
无提交的 offset 时,从头开始消费。
latest :
当各分区下存在已提交的 offset 时,从提交的 offset 开始消费;
无提交的 offset 时,消费该分区下新产生的数据。
none :
topic 各分区都存在已提交的 offset 时,从 offset 后开始消费;
只要有一个分区不存在已提交的offset,则抛出异常。
kafka从头开始消费所有信息
(1)使用一个全新的"group.id"(就是之前没有被任何消费者使用过);(2)指定"auto.offset.reset"参数的值为earliest;
比如
kafka-console-consumer.sh \
--bootstrap-server mypc01:9092,mypc02:9092,mypc03:9092 \
--topic cat \
--partition 0 \
--group group1 \
--offset earliest
或者也可以
kafka-console-consumer.sh \
--bootstrap-server mypc01:9092,mypc02:9092,mypc03:9092 \
--topic catt \
--partition 0 \
--group group1 \
--offset 0
或者
kafka-console-consumer.sh \
--bootstrap-server mypc01:9092,mypc02:9092,mypc03:9092 \
--topic cat \
--partition 0 \
--group group1 \
--from-beginning
以上的全部消息指的是某分区的全部消息
,并不是所有分区的全部消息,如果要消费所有分区的所以消息,可以用这个,不指定分区,然后用from-beginning
参数
kafka-console-consumer.sh \
--bootstrap-server mypc01:9092,mypc02:9092,mypc03:9092 \
--topic cat \
--from-beginning
查看某个消费组的消费情况
比如你想知道现在的offset是多少,就是消费到什么位置了.可以如下操作
[root@mypc01 /]# kafka-consumer-groups.sh \
> --bootstrap-server mypc01:9092,mypc02:9092,mypc03:9092 \
> --describe \
> --group group1
Note: This will not show information about old Zookeeper-based consumers.
Consumer group 'group1' has no active members.TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
pet 0 16474 16474 0 - - -
查看总得消费条数
ctrl+c
Processed a total of 16474 messages
消息信息的无序说明
就是你获取的信息和发送消息的顺序
是不一样的~
1 指的是分区间数据无序,分区内有序
2 可以使用–from-beginning来显示效果
获取所有分区的全部消息,不一定是先获取哪一个分区的数据.
消费者组与Partition
1 消费者是可以分组的
2 生产者的信息可以被多个组的消费者同时消费,组与组之间不受影响
3 同一个组内的消费者,一个消费者只能消费一个分区的数据. 前提 分区数==组内消费者数量;
如果消费者数量大于分区数量,多余的消费者闲着;
如果消费者的数量<分区数量,某些消费者可以消费多个分区的数据.
kafka消费者在消费数据的时候,都是分组别的。不同组的消费不受影响,相同组内的消费,需要注意,
- 如果partition有3个,消费者有3个,那么便是每一个消费者消费其中一个partition对应的数据;
- -如果有2个消费者,此时一个消费者消费其中一个partition数据,另一个消费者消费2个partition的数据。
- 如果有超过3个的消费者,同一时间只能最多有3个消费者能消费得到数据。
kafka-console-consumer.sh \
--bootstrap-server mypc01:9092,mypc02:9092,mypc03:9092 \
--group g1 \
--topic pet
kafka-console-consumer.sh \
--bootstrap-server mypc01:9092,mypc02:9092,mypc03:9092 \
--group g2 \
--topic pet
报错
如何报如下错误,说明有一台kafka
没有启动
WARN [Consumer clientId=consumer-1, groupId=console-consumer-98098] Connection to node -3 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
kafka 生产和消费信息入门相关推荐
- python使用kafka生产和消费案例
// confluent_kafka 使用案例 import json from confluent-kafka import Producertopic_name = "" co ...
- Kafka生产与消费脚本工具
创建主题 [root@localhost bin]# ./kafka-topics.sh --bootstrap-server 192.168.10.141:9092,192.168.10.142:9 ...
- Kafka 命令之查看topic生产消费数据查看组的消费信息
1.创建 topic: [root@node1 bin]# ./kafka-topics.sh --zookeeper node2:2181,node3:2181,node3:2181 --creat ...
- kafka java_Kafka 使用Java实现数据的生产和消费demo
前言 在上一篇中讲述如何搭建kafka集群,本篇则讲述如何简单的使用 kafka .不过在使用kafka的时候,还是应该简单的了解下kafka. Kafka的介绍 Kafka是一种高吞吐量的分布式发布 ...
- KAFKA 最新版 Shell API单机生产与消费
文章目录 一.KAFKA 启动与监控 二.KAFKA 主题创建于查看生产与消费 2.1. 查看主题列表 2.2. 创建主题 2.3. 查看主题信息 2.4. 主题信息分析 三.KAFKA 主题创建于查 ...
- kafka生产消费原理笔记
一.什么是kafka Kafka是最初由Linkedin公司开发,是一个分布式.支持分区的(partition).多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性 ...
- springboot 整合kafka 实现生产,消费数据
一 kafka集群的启动 1.1 机器说明 192.168.152.128 master 192.168.152.129 slaver01 192.168.152.130 slaver02 1.2 查 ...
- Kafka实现消息生产和消费
文章目录 一.Kafka测试消息生产与消费 二.Java程序进行Kafka收发消息 1.消息生产者 2.消息消费者 一.Kafka测试消息生产与消费 # 首先创建一个主题 [root@192 kafk ...
- kafka同一个group 消费两个topic吗_MQ: 一张图读懂kafka工作原理
1.关于kafka Kafka是由Apache软件基金会开发的一个开源消息队列,由Scala和Java编写. 相关文章参考: MQ: 消息队列常见应用场景及主流消息队列ActiveMQ.RabbitM ...
最新文章
- Numpy掩码数组masked arrays,这一篇就够了
- matlab与excel的交互
- Linux2.6--Linus电梯
- travis-cli 使用
- kafka消费报错:org.apache.kafka.common.errors.WakeupException: null
- ubuntu安装过程中遇到的一些问题及解决办法。
- node 版本管理器 之 nvm 安装与使用
- oracle创建用户、创建表空间、授权、建表
- html转换为pdf的笔顺,笔顺正确写法,很全面.pdf
- 度量衡计算工具_度量衡计量单位换算转换器
- 云加数支撑商业地产转型——无止境的商业想象力测试
- 狂野飙车8服务器在哪个文件夹,狂野飙车8数据包安装存放位置详解
- [.NET源码] asp.net中手机版和PC版识别
- 局域网文件共享的几种方法
- 哈罗数据分析(SQL)笔试
- 西语加计算机专业怎么样,如何在电脑上添加西班牙语输入法?
- 2021-2025年中国口服降糖药和胰岛素类似物行业市场供需与战略研究报告
- pta乙级1031查验身份证(AC)
- MetaQ中间件原理浅析
- Moonlight Clan
热门文章
- github使用指南_GitHub 上的 12 个骚操作
- j循环赋予li id_《Science》子刊:超强Li-S电池诞生!硫负载量高达15mg·cm-2
- MySQL中查询字段为空或者为null方法
- MySQL安装和修改密码
- getaway网关转发去前缀_蚂蚁金服 Service Mesh 大规模落地系列 - 网关篇
- linux内核中union,Linux上的Union mount
- python通过什么对象连接数据库步骤_python如何连接数据库
- informatica如何设置失败告警_智能运维中的关键一步——告警管理
- java微信刷卡支付demo_微信刷卡支付例子
- 本地更新github项目_GitHub开源项目20200627更新精选