CC00034.kafka——|Hadoopkafka.V19|——|kafka.v19|消费者位移管理.v02|
### --- 准备数据~~~ # 生成消息文件
[root@hadoop ~]# for i in `seq 60`; do echo "hello yanqi $i" >> nm.txt; done
~~~ # 创建主题,三个分区,每个分区一个副本
[root@hadoop ~]# kafka-topics.sh --zookeeper localhost:2181/myKafka --create \
--topic tp_demo_01 --partitions 3 --replication-factor 1~~~ # 查看创建的主题
[root@hadoop ~]# kafka-topics.sh --zookeeper localhost:2181/myKafka --list
tp_demo_01
~~~ # 将消息生产到主题中
[root@hadoop ~]# kafka-console-producer.sh --broker-list localhost:9092 --topic \
tp_demo_01 < nm.txt
~~~输出参数
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>~~~ # 查看里面的消息
[root@hadoop ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic tp_demo_01 --from-beginning
hello yanqi 2
~~~省略部分参数
hello yanqi 60
### --- 添加pom.xml依赖<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>1.0.2</version></dependency></dependencies>
### --- API实战:MyConsumerpackage com.yanqi.kafka.demo;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;public class MyConsumer {public static void main(String[] args) {Map<String, Object> configs = new HashMap<>();configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);// group.id很重要configs.put(ConsumerConfig.GROUP_ID_CONFIG, "mygrp1");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);consumer.subscribe(Arrays.asList("tp_demo_01"));while (true) {final ConsumerRecords<String, String> records = consumer.poll(1_000);records.forEach(new Consumer<ConsumerRecord<String, String>>() {@Overridepublic void accept(ConsumerRecord<String, String> record) {System.out.println(record);}});}}
}
### --- MyOffsetManagerpackage com.yanqi.kafka.demo;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.util.*;
import java.util.function.BiConsumer;public class MyOffsetManager {public static void main(String[] args) {Map<String, Object> configs = new HashMap<>();configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);// group.id很重要configs.put(ConsumerConfig.GROUP_ID_CONFIG, "mygrp1");KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(configs);// consumer.subscribe(Collections.singleton("tp_demo_01"));// 如何手动给消费者分配分区?// 1、需要知道有哪些主题可以访问,和消费// 获取当前消费者可以访问和消费的主题以及它们的分区信息
// final Map<String, List<PartitionInfo>> stringListMap = consumer.listTopics();
//
// stringListMap.forEach(new BiConsumer<String, List<PartitionInfo>>() {
// @Override
// public void accept(String topicName, List<PartitionInfo> partitionInfos) {
// System.out.println("主题名称:" + topicName);
// for (PartitionInfo partitionInfo : partitionInfos) {
// System.out.println(partitionInfo);
// }
// }
// });// final Set<TopicPartition> assignment1 = consumer.assignment();
//
// for (TopicPartition partition : assignment1) {
// System.out.println(partition);
// }
// System.out.println("----------------------------");// 给当前消费者分配指定的主题分区consumer.assign(Arrays.asList(new TopicPartition("tp_demo_01", 0),new TopicPartition("tp_demo_01", 1),new TopicPartition("tp_demo_01", 2)));// 获取给当前消费者分配的主题分区信息
// final Set<TopicPartition> assignment = consumer.assignment();
//
// for (TopicPartition partition : assignment) {
// System.out.println(partition);
// }// 查看当前消费者在指定主题的分区上的消费者偏移量
// final long offset0 = consumer.position(new TopicPartition("tp_demo_01", 0));
//
// System.out.println("当前主题在0号分区上的位移:" + offset0);// consumer.seekToBeginning(Arrays.asList(
// new TopicPartition("tp_demo_01", 0),
// new TopicPartition("tp_demo_01", 2)
// ));long offset0 = consumer.position(new TopicPartition("tp_demo_01", 0));long offset1 = consumer.position(new TopicPartition("tp_demo_01", 1));long offset2 = consumer.position(new TopicPartition("tp_demo_01", 2));System.out.println(offset0);System.out.println(offset1);System.out.println(offset2);// consumer.seekToEnd(Arrays.asList(new TopicPartition("tp_demo_01", 2)));consumer.seek(new TopicPartition("tp_demo_01", 2), 14);offset0 = consumer.position(new TopicPartition("tp_demo_01", 0));offset1 = consumer.position(new TopicPartition("tp_demo_01", 1));offset2 = consumer.position(new TopicPartition("tp_demo_01", 2));System.out.println(offset0);System.out.println(offset1);System.out.println(offset2);consumer.close();}
}
### --- 编译打印MyOffsetManagerD:\JAVA\jdk1.8.0_231\bin\java.exe "-javaagent:D:\IntelliJIDEA\IntelliJ IDEA 2019.3.3\lib\idea_rt.jar=57956:D:\IntelliJIDEA\IntelliJ IDEA 2019.3.3\bin" -Dfile.encoding=UTF-8 -classpath D:\JAVA\jdk1.8.0_231\jre\lib\charsets.jar;D:\JAVA\jdk1.8.0_231\jre\lib\deploy.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\access-bridge-64.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\cldrdata.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\dnsns.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\jaccess.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\jfxrt.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\localedata.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\nashorn.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\sunec.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\sunjce_provider.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\sunmscapi.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\sunpkcs11.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\zipfs.jar;D:\JAVA\jdk1.8.0_231\jre\lib\javaws.jar;D:\JAVA\jdk1.8.0_231\jre\lib\jce.jar;D:\JAVA\jdk1.8.0_231\jre\lib\jfr.jar;D:\JAVA\jdk1.8.0_231\jre\lib\jfxswt.jar;D:\JAVA\jdk1.8.0_231\jre\lib\jsse.jar;D:\JAVA\jdk1.8.0_231\jre\lib\management-agent.jar;D:\JAVA\jdk1.8.0_231\jre\lib\plugin.jar;D:\JAVA\jdk1.8.0_231\jre\lib\resources.jar;D:\JAVA\jdk1.8.0_231\jre\lib\rt.jar;E:\NO.Z.10000——javaproject\NO.Z.00002.Hadoop\kafka_demo\demo-01-consumerOffsetMgr\target\classes;C:\Users\Administrator\.m2\repository\org\apache\kafka\kafka-clients\1.0.2\kafka-clients-1.0.2.jar;C:\Users\Administrator\.m2\repository\org\lz4\lz4-java\1.4\lz4-java-1.4.jar;C:\Users\Administrator\.m2\repository\org\xerial\snappy\snappy-java\1.1.4\snappy-java-1.1.4.jar;C:\Users\Administrator\.m2\repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar com.yanqi.kafka.demo.MyOffsetManager
# 给当前消费者分配指定的主题分区
20
20
20
20
20
14
### --- 获取某个分区的所有消息:MyConsumerD:\JAVA\jdk1.8.0_231\bin\java.exe "-javaagent:D:\IntelliJIDEA\IntelliJ IDEA 2019.3.3\lib\idea_rt.jar=63391:D:\IntelliJIDEA\IntelliJ IDEA 2019.3.3\bin" -Dfile.encoding=UTF-8 -classpath D:\JAVA\jdk1.8.0_231\jre\lib\charsets.jar;D:\JAVA\jdk1.8.0_231\jre\lib\deploy.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\access-bridge-64.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\cldrdata.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\dnsns.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\jaccess.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\jfxrt.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\localedata.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\nashorn.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\sunec.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\sunjce_provider.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\sunmscapi.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\sunpkcs11.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\zipfs.jar;D:\JAVA\jdk1.8.0_231\jre\lib\javaws.jar;D:\JAVA\jdk1.8.0_231\jre\lib\jce.jar;D:\JAVA\jdk1.8.0_231\jre\lib\jfr.jar;D:\JAVA\jdk1.8.0_231\jre\lib\jfxswt.jar;D:\JAVA\jdk1.8.0_231\jre\lib\jsse.jar;D:\JAVA\jdk1.8.0_231\jre\lib\management-agent.jar;D:\JAVA\jdk1.8.0_231\jre\lib\plugin.jar;D:\JAVA\jdk1.8.0_231\jre\lib\resources.jar;D:\JAVA\jdk1.8.0_231\jre\lib\rt.jar;E:\NO.Z.10000——javaproject\NO.Z.00002.Hadoop\kafka_demo\demo-01-consumerOffsetMgr\target\classes;C:\Users\Administrator\.m2\repository\org\apache\kafka\kafka-clients\1.0.2\kafka-clients-1.0.2.jar;C:\Users\Administrator\.m2\repository\org\lz4\lz4-java\1.4\lz4-java-1.4.jar;C:\Users\Administrator\.m2\repository\org\xerial\snappy\snappy-java\1.1.4\snappy-java-1.1.4.jar;C:\Users\Administrator\.m2\repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar com.yanqi.kafka.demo.MyConsumer
ConsumerRecord(topic = tp_demo_01, partition = 2, offset = 14, CreateTime = 1632169885451, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello yanqi 45)
ConsumerRecord(topic = tp_demo_01, partition = 2, offset = 15, CreateTime = 1632169885452, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello yanqi 48)
ConsumerRecord(topic = tp_demo_01, partition = 2, offset = 16, CreateTime = 1632169885452, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello yanqi 51)
ConsumerRecord(topic = tp_demo_01, partition = 2, offset = 17, CreateTime = 1632169885452, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello yanqi 54)
ConsumerRecord(topic = tp_demo_01, partition = 2, offset = 18, CreateTime = 1632169885453, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello yanqi 57)
ConsumerRecord(topic = tp_demo_01, partition = 2, offset = 19, CreateTime = 1632169885453, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello yanqi 60)
CC00034.kafka——|Hadoopkafka.V19|——|kafka.v19|消费者位移管理.v02|相关推荐
- CC00038.kafka——|Hadoopkafka.V23|——|kafka.v23|消费者拦截器参数配置|
一.消费者拦截器参数配置:消费者参数配置补充 配置项 说明 bootstrap.servers 建立到Kafka集群的初始连接用到的host/port列表. 客户端会使用这里指定的所有的host/po ...
- CC00073.kafka——|Hadoopkafka.V58|——|kafka.v58|稳定性|事务操作|
一.事务操作 ### --- 事务操作~~~ # 在Kafka事务中,一个原子性操作,根据操作类型可以分为3种情况.情况如下: ~~~ 只有Producer生产消息,这种场景需要事务的介入: ~~~ ...
- CC00042.kafka——|Hadoopkafka.V27|——|kafka.v27|主题管理.v02|
一.修改主题 ### --- 为topic_x加入segment.bytes配置[root@hadoop ~]# kafka-topics.sh --zookeeper localhost:2181/ ...
- CC00065.kafka——|Hadoopkafka.V50|——|kafka.v50|日志清理|
一.日志压缩策略 ### --- 概念~~~ 日志压缩是Kafka的一种机制,可以提供较为细粒度的记录保留, ~~~ 而不是基于粗粒度的基于时间的保留. ~~~ 对于具有相同的Key,而数据不同,只保 ...
- CC00060.kafka——|Hadoopkafka.V45|——|kafka.v45|日志存储概述|
一.日志存储概述 ### --- 日志存储概述~~~ Kafka 消息是以主题为单位进行归类,各个主题之间是彼此独立的,互不影响. ~~~ 每个主题又可以分为一个或多个分区. ~~~ 每个分区各自存在 ...
- 7.【kafka运维】 kafka-consumer-groups.sh消费者组管理
文章目录 消费者组管理 kafka-consumer-groups.sh 1. 查看消费者列表`--list` 2. 查看消费者组详情`--describe` 3. 删除消费者组`--delete` ...
- Kafka 的 Java 消费者如何管理 TCP 连接?
何时创建 TCP 连接? 和生产者不同的是,构建 KafkaConsumer 实例时是不会创建任何 TCP 连接的,TCP 连接是在调用 KafkaConsumer.poll 方法时被创建的.再细粒度 ...
- kafka partition java,kafka中partition数量与消费者对应关系以及Java实践
kafka中partition数量与消费者对应关系以及Java实践 kafka中partition数量与消费者对应关系以及Java实践 kafka是由Apache软件基金会开发的一个开源流处理平台.k ...
- Kafka系列 - 14 Kafka消费者|分区的分配策略及再平衡|Range|RoundRobin|Sticky|CooperativeSticky
文章目录 1. 分区的分配以及再平衡 2. Range 分区分配以及再平衡 3. RoundRobin 分区分配以及再平衡 4. Sticky 分区分配以及再平衡 1. 分区的分配以及再平衡 一个co ...
最新文章
- C#里partial关键字的作用(转摘)
- 为什么 C++ 中提倡尽量避免使用宏 #define(转)
- 医疗器械软件网络安全法规和标准概述(附所有标准)
- 性能优化CPU、内存、磁盘I/O、网络性能相关命令
- vim学习(2)小幅提升
- java controller json_Controller 获取 JSON
- -le: unary operator expected_【AFM+STM-LE】超经典:研究单分子化学反应引起的光发射ACS Nano...
- ubuntu中实践操作系统第二章系统调用与课件不同之处
- PAT (Basic Level) Practice (中文)答案合集
- 搭建LINUX BIND实现DNS解析
- 8086汇编语言显示一串字符串中ASCII码最大的一个字符
- R爬虫小白入门:Rvest爬链家网+分析(一)
- 中国人工智能(AI)发展历程、AI产业重点发展区域、重点发展城市及中国AI产业地区发展总结及展望
- 2014 usnews 计算机科学 排名 天道留学,2014年USNews美国公立大学排名
- DaSiamRPN、SiamRPN++论文阅读
- 更改itunes备份路径【windows备份iphone数据】
- K8S报error: You must be logged in to the server错误
- java的serialization_Java序列化(Serialization) 机制
- 微信做音乐相册html5,如何制作微信音乐相册 微信音乐相册制作软件的精品教程...
- 黑马培训---分享点干货二 IOS面试非技术性问题