val helloStream: FlinkKafkaConsumer011[String] = new FlinkKafkaConsumer011[String]("hello", valueDeserializer, kafkaProps)  //  指定消费策略  helloStream.setStartFromEarliest() // - 从最早的记录开始;  helloStream.setStartFromLatest() //- 从最新记录开始;  helloStream.setStartFromTimestamp(null); // 从指定的epoch时间戳(毫秒)开始;  helloStream.setStartFromGroupOffsets(); // 默认行为,从上次消费的偏移量进行继续消费。

  import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition  val specificStartOffsets = new mutable.HashMap[KafkaTopicPartition,Long]()  specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L) // 第一个分区从23L开始  specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L) // 第二个分区从31L开始  specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L) // 第三个分区从43L开始  helloStream.setStartFromSpecificOffsets(specificStartOffsets)

//  Kafka支持Topic自动发现,也就是用正则的方式创建FlinkKafkaConsumer

转载于:https://www.cnblogs.com/maoxiangyi/p/10912274.html

Flink Kafka consumer的消费策略配置相关推荐

  1. kafka consumer 停止消费topic

    现象 在kafka consumer (以 kafka1.0.0为例)消费 topic 时,常常会出现程序还在运行,但是已经不消费消息了(kafka producer正常生产消息),使用kafka命令 ...

  2. kafka消费者分区消费策略

    前言 在上一篇,我们谈到了从生产者一端,kafka是基于何种策略,将消息推送到集群下topic的不同分区中,可以使用kafka自带的分区策略,也可以根据自身的业务定制消息推送的分区策略 而从消费者一端 ...

  3. kafka 消费者的消费策略以及再平衡

    一 kafka的消费策略 1 .一个 consumer group 中有多个 consumer 组成,一个 topic 有多个 partition 组成,现在的问题是, 到底由哪个 consumer ...

  4. Kafka Consumer多线程消费

    概述 OrdinaryConsumer类 ConsumerWorker.java MultiThreadedConsumer.java MultiThreadedRebalanceListener.j ...

  5. java kafka consumer不消费,报错marking the coordinator (id rack null) dead for group

    问题描述:在linux系统,通过 kafka 命令行客户端测试消费正常,但通过Java consumer客户端无法正常接收队列消息,启动后输出如下日志信息: 15:21:34.864 [concurr ...

  6. Flink Kafka

    1.Flink读取kafka策略 读取kafka策略有 org.apache.kafka.clients.consumer.RangeAssignor org.apache.kafka.clients ...

  7. flink kafka addSource(comsumer ) 源码学习笔记

    addsource 其中function存的是FlinkKafkaConsumer对象 public <OUT> DataStreamSource<OUT> addSource ...

  8. kafka控制台模拟消费_Flink初试——对接Kafka

    本篇文章我们用 Flink Kafka Connector对接Kafka,实现一个简单的报警业务.我们暂时不去谈论理论,先上手实现这个简单的需求. flink-connector-kafka是 fli ...

  9. Apache Kafka Consumer 消费者集

    1.目标 在我们的上一篇文章中,我们讨论了Kafka Producer.今天,我们将讨论Kafka Consumer.首先,我们将看到什么是Kafka Consumer和Kafka Consumer的 ...

最新文章

  1. ubuntu下codeblocks起步
  2. python字典实现关键字检索_如何实现搜索框的关键词提示功能
  3. Hacked VisualSVN Server by PHP to allow user change password
  4. 【MyBatis】MyBatis对Log4J的支持、MyBatis实现新增、删除、修改、查询
  5. 线程并发编程之线程锁
  6. win10无限重启_win10系统安装无限循环如何解决_win10教程
  7. mysql索引的增删_mysql索引的增删改查怎么实现?
  8. 动画分析步骤“三步曲”
  9. 数据结构学习笔记:变位词侦测案例
  10. 微课|中学生可以这样学Python(例7.2):三维向量类
  11. tensorflow测量工具的使用
  12. 使用selenium搭建网站自动化测试框架及selenium简介
  13. 四种模式、五大架构 规划企业物联网蓝图
  14. OpenCV总结——convertTo函数与浮点数类型
  15. 小红书引流软件有哪些
  16. ios服务器需要开启ipv6的支持,关于ios苹果APP审核 支持IPv6的问题解答
  17. 传统武式太极拳练习五阶段
  18. 那一年,他还在为了今日的成就而奋力打拼
  19. 详解准确率acc、精确率p、准确率acc、F1 score
  20. 关于Cookie和Session的一些疑惑和猜测

热门文章

  1. pip sintall pyspider 报错:ERROR: Command errored out with exit status 10
  2. REVERSE-PRACTICE-BUUCTF-26
  3. 【Tyvj - 1305】最大子序和(单调队列优化dp)
  4. 【51Nod - 1344】走格子 (思维)
  5. linux远程打开windows程序,为新手讲解Linux和Windows系统的远程桌面访问知识
  6. bash: pcre-config: 未找到命令..._Docker 常用操作命令
  7. 40029错误{“errcode“:40029,“errmsg“:“invalid code, rid: 623bbdcd-3c97f4af-5a2c06d6“}
  8. 获取数组中元素值为偶数的累加和与元素值为奇数的累加和,并计算他们之间的差值
  9. html bootstrap复选框全选,javascript+bootstrap+html实现层级多选框全层全选和多选功能代码实例...
  10. php位值,php中,如何取得一个整型值的低位和高位值?