Flink提供了FlinkKafkaConsumer08,使用Kafka的High-level接口,从Kafka中读取指定Topic的数据,如果要从多个Topic读取数据,可以如下操作:

1.application.conf中配置

如果使用了配置管理库typesafe.config,可以在其application.conf按如下方式配置List类型的元素:

myToicList:["t1","t2","t3"]

2.读取配置文件

object MyFlinkConfig {import com.typesafe.config.{ Config, ConfigFactory }import net.ceedubs.ficus.Ficus._def apply(): MyFlinkConfig = apply(ConfigFactory.load)def apply(applicationConfig: Config): MyFlinkConfig = {val config = applicationConfig.getConfig("MyFlinkConfig")new MyFlinkConfig (config.as[List[String]]("myTopicList"))}
}case class MyFlinkConfig (myTopicList: List[String]) extends Serializable {}

3.读取多个Topic

因为FlinkKafkaConsumer08使用Java实现的,而MyFlinkConfig 中的List是Scala的List,所以要将Scala的List转为Java的List

val config =MyFlinkConfig()
import scala.collection.JavaConversions._
val kafkaConsumer=new FlinkKafkaConsumer08[MonitorDataRecord](config.myTopicList, new SimpleStringSchema(), kafkaProps)

4.遇到的问题

4.1 如果要读取的Topic不存在,则应用程序直接报错,因此Topic在配置文件中配置时一定要正确

4.2 如果要读取的Topic列表中,其中一个在Kafka中没有数据,而你又基于Event Time提取Timestamp并且设置Watermark,会导致整个Topic列表都没法基于时间窗口触发操作,解决方案:

先rebalance,然后再设置水位:

    val monitorSampling = env
      .addSource(kafkaConsumer).rebalance.assignTimestampsAndWatermarks(new MyWatermarkGenerator[MyRecord](Time.seconds(config.latencyDuration)))

转载于:https://www.cnblogs.com/liugh/p/7479515.html

Flink从Kafka 0.8中读取多个Topic时的问题相关推荐

  1. Structured Streaming从Kafka 0.8中读取数据的问题

    众所周知,Structured Streaming默认支持Kafka 0.10,没有提供针对Kafka 0.8的Connector,但这对高手来说不是事儿,于是有个Hortonworks的邵大牛(前段 ...

  2. Flink使用KafkaSource从Kafka消息队列中读取数据

    Flink使用KafkaSource从Kafka消息队列中读取数据 使用KafkaSource从Kafka消息队列中读取数据 1.KafkaSource创建的DataStream是一个并行的DataS ...

  3. sublime python3中读取和写入文件时如何解决编码问题

    sublime python3中读取和写入文件时如何解决编码问题 参考文章: (1)sublime python3中读取和写入文件时如何解决编码问题 (2)https://www.cnblogs.co ...

  4. python读文件缺key_在Python中读取Twitter json文件时出现KeyErrors

    我试图用我从twitter收集的数据分析一个json文件,但是当我试图搜索一个关键字时,它说找不到,但是我可以看到它在那里.我试了两种不同的方法.我会把它们贴在下面.任何建议都很好.在 尝试1:imp ...

  5. mysql 8.0.11 中使用 grant ... identified by 时 error 1064 near 'identified by '密码'' at line 1

    1 问题: 当使用 grant 权限列表 on 数据库 to '用户名'@'访问主机' identified by '密码'; 时会出现"......near 'identified by ...

  6. 5.Flink对接Kafka入门

    Flink Connector Kafka 1. Kafka 1.1. [Kafka官网](http://kafka.apache.org/) 1.2. Kafka 简述 1.3. Kafka特性 1 ...

  7. c++怎么可以在二进制文件中读取带string的数据_文件处理 | csv文件读写

    欢迎关注公众号 学习资料不会少 文件处理 在我们做自动化测试的过程中,常常会将数据文件存放在csv或者Excel文件里边.这一章节内容将给大家介绍,如何使用python进行csv和Excel文件的处理 ...

  8. .net Kafka.Client多个Consumer Group对Topic消费不能完全覆盖研究总结(一)

    我们知道Kafka支持Consumer Group的功能,但是最近在应用Consumer Group时发现了一个Topic 的Partition不能100%覆盖的问题. 程序部署后,发现Kafka在p ...

  9. 使用Flink时从Kafka中读取Array[Byte]类型的Schema

    使用Flink时,如果从Kafka中读取输入流,默认提供的是String类型的Schema: val myConsumer = new FlinkKafkaConsumer08[String](&qu ...

最新文章

  1. 2022-2028年中国锂电材料产业投资分析及前景预测报告
  2. mysql利用merge存储引擎分表的方法
  3. 开发日记-20190918 关键词 努力
  4. JSON对象和JSON 字符串之间的相互转换
  5. css 竖行进度图_前端学习--汇集了大量 CSS 的使用和学习的示例代码
  6. matlab 大括号
  7. Android学习之调用系统相机实现拍照功能
  8. oracle创建目录的命令,使用create database命令手工创建Oracle数据库
  9. 执​行​o​r​a​c​l​e​函​数​的​四​种​方​法
  10. JNI新旧两种方式不冲突,可以共存
  11. NETAPP存储常用巡检命令
  12. BZOJ 1984: 月下“毛景树” [树链剖分 边权]
  13. 设置低电平有效,即取反
  14. i标签和em标签的区别
  15. 300万数据导入导出优化方案,从80s优化到8s(实测)
  16. Finder教程|如何自定义访达工具栏?
  17. bandit game
  18. Linux vim/vi下backspace(退格键)出现^? 或^H
  19. chatgpt的一些思考
  20. 金仓监控软件kmonitor安装成功后打开监控界面,监控不到实例

热门文章

  1. oracle数据库实现不同数据库之间的表格数据定期同步
  2. IntelliJ IDEA常用统一设置(Linux/Mac/Windows)
  3. 人脸识别如何在大型银行中大规模商用?
  4. 2016年光伏系统成本将持续下降
  5. 《Photoshop Lab修色圣典(修订版)》—第1课1.4节逐步校正峡谷图像
  6. VC 下 64bit 整数的显示和读取格式化字串
  7. 实现基于Spring框架应用的权限控制系统(转)
  8. ISA2006标准版配置导入企业版
  9. php中空心字体怎么打,php打印一个边长为N的实心和空心菱型的方法
  10. 5.3.1 TCP协议特点和TCP报文段格式