2019独角兽企业重金招聘Python工程师标准>>>

public class KafkaConsumer2 {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "127.0.0.1:9092");props.put("group.id", "one");props.put("enable.auto.commit", true);props.put("auto.commit.interval.ms", 5000);props.put("session.timeout.ms", 50000);props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//设置使用最开始的offset偏移量为该group.id的最早。如果不设置,则会是latest即该topic最新一个消息的offset//如果采用latest,消费者只能得道其启动后,生产者生产的消息
//        props.put("auto.offset.reset", "earliest");props.put("auto.offset.reset", "latest");KafkaConsumer<String, String> consumer =  new KafkaConsumer(props);try {String topicName = "mykafka";//重置offset,可同时设置多个topic和partitionMap<TopicPartition, OffsetAndMetadata> hashMaps = new HashMap<TopicPartition, OffsetAndMetadata>();hashMaps.put(new TopicPartition(topicName, 0), new OffsetAndMetadata(2));consumer.commitSync(hashMaps);//启动订阅consumer.subscribe(Arrays.asList(topicName));while (true) {ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);for (ConsumerRecord<String, String> record : records) {String value = record.value();long offset = record.offset();int partition = record.partition();String topic = record.topic();String f="-------->>>topic=%s,offset=%s,partition=%s,value=%s";System.out.println(String.format(f,topic,offset,partition,value));}consumer.commitSync();}} catch (Exception e) {e.printStackTrace();}finally {if(consumer != null){consumer.close();}}}
}
public class KafkaProudcer2 {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "127.0.0.1:9092");props.put("retries", 3);props.put("linger.ms", 1);props.put("key.serializer",   "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer",  "org.apache.kafka.common.serialization.StringSerializer");KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);while (true){String topicName = "mykafka";ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName, System.currentTimeMillis()+"", "今天天气不错哟yoyo=======>");producer.send(record, new Callback() {public void onCompletion(RecordMetadata metadata, Exception e) {if (e != null)System.out.println("the producer has a error:" + e.getMessage());else {String f="----------->>>partition=%s,offset=%s";System.out.println(String.format(f,metadata.partition(),metadata.offset()));}}});try {Thread.sleep(1000);} catch (InterruptedException e1) {e1.printStackTrace();}}// producer.close();}
}

转载于:https://my.oschina.net/u/1159254/blog/1936613

Kafka消费者重置offset读取数据相关推荐

  1. Flink使用KafkaSource从Kafka消息队列中读取数据

    Flink使用KafkaSource从Kafka消息队列中读取数据 使用KafkaSource从Kafka消息队列中读取数据 1.KafkaSource创建的DataStream是一个并行的DataS ...

  2. Structured Streaming从Kafka 0.8中读取数据的问题

    众所周知,Structured Streaming默认支持Kafka 0.10,没有提供针对Kafka 0.8的Connector,但这对高手来说不是事儿,于是有个Hortonworks的邵大牛(前段 ...

  3. kafka实际应用—>读取数据,并用java实现业务逻辑“行转列”

    kafka实际应用--读取数据,并用java实现业务逻辑"行转列" 一.业务需求 二.业务实现 2.1 kafka中创建topic: event_attendees_raw 2.2 ...

  4. 实用 | 从Apache Kafka到Apache Spark安全读取数据

    引言 随着在CDH平台上物联网(IoT)使用案例的不断增加,针对这些工作负载的安全性显得至关重要.本篇博文对如何以安全的方式在Spark中使用来自Kafka的数据,以及针对物联网(IoT)使用案例的两 ...

  5. kafka监听topic消费_Kafka消费者-从Kafka读取数据

    (1)Customer和Customer Group (1)两种常用的消息模型 队列模型(queuing)和发布-订阅模型(publish-subscribe). 队列的处理方式是一组消费者从服务器读 ...

  6. kafka消费者参数详解 java读取不到消费者数据

    程序运行中,生产者可以成功生产数据,消费者却一直拿不到存储的数据,运行消费者命令:kafka-console-consumer --bootstrap-server 127.0.0.1:9092 -- ...

  7. Kafka消费者——从 Kafka读取数据

    应用程序使用 KafkaConsumer向 Kafka 订阅主题,并从订阅的主题上接收消息 . 从 Kafka 读取数据不同于从其他悄息系统读取数据,它涉及一些独特的概念和想法.如果不先理解 这些概念 ...

  8. 大数据技术之kafka (第 3 章 Kafka 架构深入 ) offset讲解

    新版的 Kafka 使用一个选举出来的 controller 来监听 zookeeper,其他 node 再去和 controller 通信,这么做的目的是为了减少 zookeeper 的压力.boo ...

  9. 大数据技术之 Kafka (第 3 章 Kafka 架构深入 ) Kafka 消费者

    3.3.1 消费方式 consumer 采用 pull(拉)模式从 broker 中读取数据.  push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的. 它的目标 ...

最新文章

  1. 达达真人漫画秀,微博演绎男版杜拉拉生存记
  2. 网站推广专员浅析网站推广运营如何提升企业网站转化率?
  3. python判断是不是整数的命令_介绍python判断一个数是不是正小数和整数的方法
  4. 10、自学——Linux的学习进度与任务【用户和用户组相关操作】
  5. 原来医生的处方不是随便乱写的...
  6. HDFS查看文件的前几行-后几行-行数
  7. Process finished with exit code 0 报错解决方法
  8. 模板(范型)的安全数组C++代码
  9. explain mysql执行顺序_面试前必须知道的MySQL命令【explain】
  10. HDU 4791 Alice's Print Service
  11. symbol是c语言标识符,symbol的理解
  12. MPEG-2压缩编码的视频基本流
  13. 喜欢计算机专业的理由英语作文,计算机专业英文自我评价范文
  14. 刷脸支付互联网巨头纷纷从线上走到线下
  15. Android——待办事项(ToDoList)
  16. HTTP协议详解以及URL具体访问过程
  17. Inversion of Java Interview - 计算机网络篇
  18. java75-GUL文本框和标签
  19. ThinkSNS安装手记
  20. 双耳节拍 枕头_枕头自行运行

热门文章

  1. python读写json文件
  2. SAP常用BASIS技巧整理
  3. 大数据-07-Spark之流数据
  4. Python学习笔记一简介及安装配置
  5. 《响应式Web设计:HTML5和CSS3实践指南》——2.9节基于位置伪类的交替行样式
  6. 电影天堂React Native 客户端
  7. 如何做好网站开发项目需求分析(转)
  8. JNI/NDK开发指南(八)——调用构造方法和父类实例方法
  9. (转载)一种根据纠偏数据对火星坐标进行完美拟合的方法
  10. native2ascii用法