一、消费者位移管理数据准备
### --- 准备数据
~~~     # 生成消息文件
[root@hadoop ~]# for i in `seq 60`; do echo "hello yanqi $i" >> nm.txt; done

~~~     # 创建主题,三个分区,每个分区一个副本
[root@hadoop ~]# kafka-topics.sh --zookeeper localhost:2181/myKafka --create \
--topic tp_demo_01 --partitions 3 --replication-factor 1

~~~     # 查看创建的主题
[root@hadoop ~]# kafka-topics.sh --zookeeper localhost:2181/myKafka --list
tp_demo_01

~~~     # 将消息生产到主题中
[root@hadoop ~]# kafka-console-producer.sh --broker-list localhost:9092 --topic \
tp_demo_01 < nm.txt
~~~输出参数
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>

~~~     # 查看里面的消息
[root@hadoop ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic tp_demo_01 --from-beginning
hello yanqi 2
~~~省略部分输出
hello yanqi 60

二、创建一个maven项目:demo-01-consumerOffsetMgr
### --- 添加pom.xml依赖
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.0.2</version>
    </dependency>
</dependencies>

三、消费者位移管理API实现
### --- API实战:MyConsumer
package com.yanqi.kafka.demo;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

/**
 * Subscribes to topic {@code tp_demo_01} and prints every record it receives,
 * polling forever. Used here to observe the effect of the offset changes made
 * by {@code MyOffsetManager}.
 */
public class MyConsumer {

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // group.id matters: committed offsets are tracked per consumer group.
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "mygrp1");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(Arrays.asList("tp_demo_01"));

        while (true) {
            // poll(long) is the kafka-clients 1.0.2 API; the argument is a
            // timeout in milliseconds.
            final ConsumerRecords<String, String> records = consumer.poll(1_000);
            records.forEach(System.out::println);
        }
    }
}

### --- MyOffsetManager
package com.yanqi.kafka.demo;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.*;
import java.util.function.BiConsumer;

/**
 * Demonstrates manual partition assignment and consumer offset management:
 * {@code assign}, {@code position}, {@code seek}, {@code seekToBeginning},
 * {@code seekToEnd}. Commented-out sections are earlier steps of the demo,
 * kept for reference.
 */
public class MyOffsetManager {

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // group.id matters: committed offsets are tracked per consumer group.
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "mygrp1");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);

//        consumer.subscribe(Collections.singleton("tp_demo_01"));

        // How do we assign partitions to this consumer by hand?
        // Step 1: discover which topics (and partitions) this consumer can
        // access and consume. listTopics() returns partition info per topic.
//        final Map<String, List<PartitionInfo>> stringListMap = consumer.listTopics();
//
//        stringListMap.forEach(new BiConsumer<String, List<PartitionInfo>>() {
//            @Override
//            public void accept(String topicName, List<PartitionInfo> partitionInfos) {
//                System.out.println("topic name: " + topicName);
//                for (PartitionInfo partitionInfo : partitionInfos) {
//                    System.out.println(partitionInfo);
//                }
//            }
//        });

        // Before assign() is called, the assignment set is empty.
//        final Set<TopicPartition> assignment1 = consumer.assignment();
//
//        for (TopicPartition partition : assignment1) {
//            System.out.println(partition);
//        }
//        System.out.println("----------------------------");

        // Step 2: assign the specified topic partitions to this consumer.
        consumer.assign(Arrays.asList(
                new TopicPartition("tp_demo_01", 0),
                new TopicPartition("tp_demo_01", 1),
                new TopicPartition("tp_demo_01", 2)));

        // Inspect the topic partitions now assigned to this consumer.
//        final Set<TopicPartition> assignment = consumer.assignment();
//
//        for (TopicPartition partition : assignment) {
//            System.out.println(partition);
//        }

        // Check this consumer's current offset on a given topic partition.
//        final long offset0 = consumer.position(new TopicPartition("tp_demo_01", 0));
//
//        System.out.println("offset on partition 0 of this topic: " + offset0);

//        consumer.seekToBeginning(Arrays.asList(
//                new TopicPartition("tp_demo_01", 0),
//                new TopicPartition("tp_demo_01", 2)
//        ));

        long offset0 = consumer.position(new TopicPartition("tp_demo_01", 0));
        long offset1 = consumer.position(new TopicPartition("tp_demo_01", 1));
        long offset2 = consumer.position(new TopicPartition("tp_demo_01", 2));
        System.out.println(offset0);
        System.out.println(offset1);
        System.out.println(offset2);

//        consumer.seekToEnd(Arrays.asList(new TopicPartition("tp_demo_01", 2)));

        // Rewind partition 2 to offset 14; the next poll starts there.
        consumer.seek(new TopicPartition("tp_demo_01", 2), 14);

        offset0 = consumer.position(new TopicPartition("tp_demo_01", 0));
        offset1 = consumer.position(new TopicPartition("tp_demo_01", 1));
        offset2 = consumer.position(new TopicPartition("tp_demo_01", 2));
        System.out.println(offset0);
        System.out.println(offset1);
        System.out.println(offset2);

        consumer.close();
    }
}

四、编译打印
### --- 编译打印MyOffsetManagerD:\JAVA\jdk1.8.0_231\bin\java.exe "-javaagent:D:\IntelliJIDEA\IntelliJ IDEA 2019.3.3\lib\idea_rt.jar=57956:D:\IntelliJIDEA\IntelliJ IDEA 2019.3.3\bin" -Dfile.encoding=UTF-8 -classpath D:\JAVA\jdk1.8.0_231\jre\lib\charsets.jar;D:\JAVA\jdk1.8.0_231\jre\lib\deploy.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\access-bridge-64.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\cldrdata.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\dnsns.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\jaccess.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\jfxrt.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\localedata.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\nashorn.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\sunec.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\sunjce_provider.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\sunmscapi.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\sunpkcs11.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\zipfs.jar;D:\JAVA\jdk1.8.0_231\jre\lib\javaws.jar;D:\JAVA\jdk1.8.0_231\jre\lib\jce.jar;D:\JAVA\jdk1.8.0_231\jre\lib\jfr.jar;D:\JAVA\jdk1.8.0_231\jre\lib\jfxswt.jar;D:\JAVA\jdk1.8.0_231\jre\lib\jsse.jar;D:\JAVA\jdk1.8.0_231\jre\lib\management-agent.jar;D:\JAVA\jdk1.8.0_231\jre\lib\plugin.jar;D:\JAVA\jdk1.8.0_231\jre\lib\resources.jar;D:\JAVA\jdk1.8.0_231\jre\lib\rt.jar;E:\NO.Z.10000——javaproject\NO.Z.00002.Hadoop\kafka_demo\demo-01-consumerOffsetMgr\target\classes;C:\Users\Administrator\.m2\repository\org\apache\kafka\kafka-clients\1.0.2\kafka-clients-1.0.2.jar;C:\Users\Administrator\.m2\repository\org\lz4\lz4-java\1.4\lz4-java-1.4.jar;C:\Users\Administrator\.m2\repository\org\xerial\snappy\snappy-java\1.1.4\snappy-java-1.1.4.jar;C:\Users\Administrator\.m2\repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar com.yanqi.kafka.demo.MyOffsetManager
~~~     # seek 之前三个分区的位移为 20、20、20;seek(分区2, 14) 之后为 20、20、14
20
20
20
20
20
14

### --- 获取某个分区的所有消息:MyConsumerD:\JAVA\jdk1.8.0_231\bin\java.exe "-javaagent:D:\IntelliJIDEA\IntelliJ IDEA 2019.3.3\lib\idea_rt.jar=63391:D:\IntelliJIDEA\IntelliJ IDEA 2019.3.3\bin" -Dfile.encoding=UTF-8 -classpath D:\JAVA\jdk1.8.0_231\jre\lib\charsets.jar;D:\JAVA\jdk1.8.0_231\jre\lib\deploy.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\access-bridge-64.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\cldrdata.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\dnsns.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\jaccess.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\jfxrt.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\localedata.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\nashorn.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\sunec.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\sunjce_provider.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\sunmscapi.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\sunpkcs11.jar;D:\JAVA\jdk1.8.0_231\jre\lib\ext\zipfs.jar;D:\JAVA\jdk1.8.0_231\jre\lib\javaws.jar;D:\JAVA\jdk1.8.0_231\jre\lib\jce.jar;D:\JAVA\jdk1.8.0_231\jre\lib\jfr.jar;D:\JAVA\jdk1.8.0_231\jre\lib\jfxswt.jar;D:\JAVA\jdk1.8.0_231\jre\lib\jsse.jar;D:\JAVA\jdk1.8.0_231\jre\lib\management-agent.jar;D:\JAVA\jdk1.8.0_231\jre\lib\plugin.jar;D:\JAVA\jdk1.8.0_231\jre\lib\resources.jar;D:\JAVA\jdk1.8.0_231\jre\lib\rt.jar;E:\NO.Z.10000——javaproject\NO.Z.00002.Hadoop\kafka_demo\demo-01-consumerOffsetMgr\target\classes;C:\Users\Administrator\.m2\repository\org\apache\kafka\kafka-clients\1.0.2\kafka-clients-1.0.2.jar;C:\Users\Administrator\.m2\repository\org\lz4\lz4-java\1.4\lz4-java-1.4.jar;C:\Users\Administrator\.m2\repository\org\xerial\snappy\snappy-java\1.1.4\snappy-java-1.1.4.jar;C:\Users\Administrator\.m2\repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar com.yanqi.kafka.demo.MyConsumer
ConsumerRecord(topic = tp_demo_01, partition = 2, offset = 14, CreateTime = 1632169885451, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello yanqi 45)
ConsumerRecord(topic = tp_demo_01, partition = 2, offset = 15, CreateTime = 1632169885452, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello yanqi 48)
ConsumerRecord(topic = tp_demo_01, partition = 2, offset = 16, CreateTime = 1632169885452, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello yanqi 51)
ConsumerRecord(topic = tp_demo_01, partition = 2, offset = 17, CreateTime = 1632169885452, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello yanqi 54)
ConsumerRecord(topic = tp_demo_01, partition = 2, offset = 18, CreateTime = 1632169885453, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello yanqi 57)
ConsumerRecord(topic = tp_demo_01, partition = 2, offset = 19, CreateTime = 1632169885453, serialized key size = -1, serialized value size = 14, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello yanqi 60)

CC00034.kafka——|Hadoopkafka.V19|——|kafka.v19|消费者位移管理.v02|相关推荐

  1. CC00038.kafka——|Hadoopkafka.V23|——|kafka.v23|消费者拦截器参数配置|

    一.消费者拦截器参数配置:消费者参数配置补充 配置项 说明 bootstrap.servers 建立到Kafka集群的初始连接用到的host/port列表. 客户端会使用这里指定的所有的host/po ...

  2. CC00073.kafka——|Hadoopkafka.V58|——|kafka.v58|稳定性|事务操作|

    一.事务操作 ### --- 事务操作~~~ # 在Kafka事务中,一个原子性操作,根据操作类型可以分为3种情况.情况如下: ~~~ 只有Producer生产消息,这种场景需要事务的介入: ~~~ ...

  3. CC00042.kafka——|Hadoopkafka.V27|——|kafka.v27|主题管理.v02|

    一.修改主题 ### --- 为topic_x加入segment.bytes配置[root@hadoop ~]# kafka-topics.sh --zookeeper localhost:2181/ ...

  4. CC00065.kafka——|Hadoopkafka.V50|——|kafka.v50|日志清理|

    一.日志压缩策略 ### --- 概念~~~ 日志压缩是Kafka的一种机制,可以提供较为细粒度的记录保留, ~~~ 而不是基于粗粒度的基于时间的保留. ~~~ 对于具有相同的Key,而数据不同,只保 ...

  5. CC00060.kafka——|Hadoopkafka.V45|——|kafka.v45|日志存储概述|

    一.日志存储概述 ### --- 日志存储概述~~~ Kafka 消息是以主题为单位进行归类,各个主题之间是彼此独立的,互不影响. ~~~ 每个主题又可以分为一个或多个分区. ~~~ 每个分区各自存在 ...

  6. 7.【kafka运维】 kafka-consumer-groups.sh消费者组管理

    文章目录 消费者组管理 kafka-consumer-groups.sh 1. 查看消费者列表`--list` 2. 查看消费者组详情`--describe` 3. 删除消费者组`--delete` ...

  7. Kafka 的 Java 消费者如何管理 TCP 连接?

    何时创建 TCP 连接? 和生产者不同的是,构建 KafkaConsumer 实例时是不会创建任何 TCP 连接的,TCP 连接是在调用 KafkaConsumer.poll 方法时被创建的.再细粒度 ...

  8. kafka partition java,kafka中partition数量与消费者对应关系以及Java实践

    kafka中partition数量与消费者对应关系以及Java实践 kafka中partition数量与消费者对应关系以及Java实践 kafka是由Apache软件基金会开发的一个开源流处理平台.k ...

  9. Kafka系列 - 14 Kafka消费者|分区的分配策略及再平衡|Range|RoundRobin|Sticky|CooperativeSticky

    文章目录 1. 分区的分配以及再平衡 2. Range 分区分配以及再平衡 3. RoundRobin 分区分配以及再平衡 4. Sticky 分区分配以及再平衡 1. 分区的分配以及再平衡 一个co ...

最新文章

  1. C#里partial关键字的作用(转摘)
  2. 为什么 C++ 中提倡尽量避免使用宏 #define(转)
  3. 医疗器械软件网络安全法规和标准概述(附所有标准)
  4. 性能优化CPU、内存、磁盘I/O、网络性能相关命令
  5. vim学习(2)小幅提升
  6. java controller json_Controller 获取 JSON
  7. -le: unary operator expected_【AFM+STM-LE】超经典:研究单分子化学反应引起的光发射ACS Nano...
  8. ubuntu中实践操作系统第二章系统调用与课件不同之处
  9. PAT (Basic Level) Practice (中文)答案合集
  10. 搭建LINUX BIND实现DNS解析
  11. 8086汇编语言显示一串字符串中ASCII码最大的一个字符
  12. R爬虫小白入门:Rvest爬链家网+分析(一)
  13. 中国人工智能(AI)发展历程、AI产业重点发展区域、重点发展城市及中国AI产业地区发展总结及展望
  14. 2014 usnews 计算机科学 排名 天道留学,2014年USNews美国公立大学排名
  15. DaSiamRPN、SiamRPN++论文阅读
  16. 更改itunes备份路径【windows备份iphone数据】
  17. K8S报error: You must be logged in to the server错误
  18. java的serialization_Java序列化(Serialization) 机制
  19. 微信做音乐相册html5,如何制作微信音乐相册 微信音乐相册制作软件的精品教程...
  20. 黑马培训---分享点干货二 IOS面试非技术性问题

热门文章

  1. 目标检测模型YOLO-V1损失函数详解
  2. PixiJS - HTML5 创作引擎
  3. 【数据库】数据库设计
  4. C语言之二分法解非线性方程
  5. 当企业衡量和评估私有云和公有云时,需要考虑哪些核心因素?
  6. Vector3的使用
  7. 原生h5+css3 实现简单视频播放器组件
  8. 拷贝数变异(Copy number variation, CNV)分析简介
  9. Linux安装Nexus(图文解说详细版)
  10. 【软件工程基础复习整理】第五章概要设计(3) 面向数据流图的软件结构设计