当使用KTable时,当实例/使用者数等于分区数时,Kafka流不允许实例从特定主题的多个分区中读取.我尝试使用GlobalKTable来实现这一点,但问题是数据将被覆盖,并且聚合也无法应用于其上.

假设我有一个名为“ data_in”的主题,具有3个分区(P1,P2,P3).当我运行Kafka流应用程序的3个实例(I1,I2,I3)时,我希望每个实例都从“ data_in”的所有分区中读取数据.我的意思是,I1可以从P1,P2和P3读取,I2可以从P1,P2和P3,I2以及其他方式读取.

编辑:请记住,生产者可以将两个相似的ID发布到“ data_in”中的两个不同分区中.因此,当运行两个不同的实例时,GlobalKtable将被覆盖.

拜托,如何实现这一目标?这是我代码的一部分

private KTable globalStream() {

// KStream of records from data-in topic using String and theDataSerde deserializers

KStream trashStream = getBuilder().stream("data_in",Consumed.with(Serdes.String(), SerDes.theDataSerde));

// Apply an aggregation operation on the original KStream records using an intermediate representation of a KStream (KGroupedStream)

KGroupedStream KGS = trashStream.groupByKey();

Materialized> materialized = Materialized.as("agg-stream-store");

materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);

// Return a KTable

return KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {

if (!value.getValideData())

aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());

else

aggregate.getList().add(value);

return aggregate;

}, materialized);

}

解决方法:

将输入主题“ data_in”的分区数更改为1个分区,或者使用GlobalKtable从主题中所有分区获取数据,然后可以将其加入流.这样一来,您的应用实例将不再需要位于不同的使用者组中.

该代码将如下所示:

private GlobalKTable globalStream() {

// KStream of records from data-in topic using String and theDataSerde deserializers

KStream trashStream = getBuilder().stream("data_in", Consumed.with(Serdes.String(), SerDes.theDataSerde));

thrashStream.to("new_data_in"); // by sending to an other topic you're forcing a repartition on that topic

KStream newTrashStream = getBuilder().stream("new_data_in", Consumed.with(Serdes.String(), SerDes.theDataSerde));

// Apply an aggregation operation on the original KStream records using an intermediate representation of a KStream (KGroupedStream)

KGroupedStream KGS = newTrashStream.groupByKey();

Materialized> materialized = Materialized.as("agg-stream-store");

materialized = materialized.withValueSerde(SerDes.theDataDataListSerde);

// Return a KTable

KGS.aggregate(() -> new theDataList(), (key, value, aggregate) -> {

if (!value.getValideData())

aggregate.getList().removeIf((t) -> t.getTimestamp() <= value.getTimestamp());

else

aggregate.getList().add(value);

return aggregate;

}, materialized)

.to("agg_data_in");

return getBuilder().globalTable("agg_data_in");

}

编辑:我编辑了上面的代码,以强制对名为“ new_data_in”的主题进行重新分区.

标签:apache-kafka-streams,java,apache-kafka,partitioning

来源: https://codeday.me/bug/20191009/1877602.html

kafka读写 java_java-Kafka流:从应用程序的每个实例中的所有分区读取相关推荐

  1. kafka读写速度快的原因

    KAFKA是分布式发布-订阅消息系统,是一个分布式的,可划分的,冗余备份的持久性的日志服务.它主要用于处理活跃的流式数据. 现在被广泛地应用于构建实时数据管道和流应用的场景中,具有横向扩展,容错,快等 ...

  2. Kafka Streams简介: 让流处理变得更简单

    Introducing Kafka Streams: Stream Processing Made Simple 这是Jay Kreps在三月写的一篇文章,用来介绍Kafka Streams.当时Ka ...

  3. java 连接oracle_「事件驱动架构」使用GoldenGate创建从Oracle到Kafka的CDC事件流

    我们通过GoldenGate技术在Oracle DB和Kafka代理之间创建集成,该技术实时发布Kafka中的CDC事件流. Oracle在其Oracle GoldenGate for Big Dat ...

  4. 「事件驱动架构」使用GoldenGate创建从Oracle到Kafka的CDC事件流

    我们通过GoldenGate技术在Oracle DB和Kafka代理之间创建集成,该技术实时发布Kafka中的CDC事件流. Oracle在其Oracle GoldenGate for Big Dat ...

  5. oracle一列中间加一个字_「首席看架构」用GoldenGate创建从Oracle到Kafka的CDC事件流...

    我们通过GoldenGate技术在Oracle DB和Kafka代理之间创建集成,该技术实时发布Kafka中的CDC事件流. Oracle在其Oracle GoldenGate for Big Dat ...

  6. Kafka 3.0 源码笔记(3)-Kafka 消费者的核心流程源码分析

    文章目录 前言 1. 消费者负载均衡的实现 2. 源码分析 2.1 KafkaConsumer 的初始化 2.2 KafkaConsumer 的消息拉取 2.2.1 消息拉取的准备及入口 2.2.2 ...

  7. Kafka读写机制深度剖析

    目录 01. 概述 02. Kafka架构 2.1 Topic & partition 03. 高可靠性分析 3.1. Kafka文件存储机制 3.2 复制原理和同步方式 3.3 ISR 3. ...

  8. Flink读写系列之-读Kafka并写入Kafka

    读写Kafka比较简单,官方提供了connector,也提供了例子可以参看,官网例子的GitHub地址: https://github.com/apache/flink/tree/master/fli ...

  9. Kafka学习笔记: Kafka 百惑梳理

    1. 消息经常堆积起来,不能消费了,重启服务就能继续消费了. 消息堆积可能原因如下:1. 生产速度大于消费速度,这样可以适当增加分区,增加consumer数量,提升消费TPS:2. consumer消 ...

最新文章

  1. git生成ssh keys步骤与使用
  2. Latex使用简单总结
  3. 志澄观察:卫星互联网——太空经济新动力
  4. 微软重新开源 MS-DOS 1.25/2.0:已诞生 36 年
  5. DL之NN:NN算法(本地数据集50000张训练集图片)进阶优化之三种参数改进,进一步提高手写数字图片识别的准确率
  6. VIDEOIO ERROR: V4L: can't open camera by index 0
  7. Android 取消返回键返回事件,返回桌面,再按一次退出程序,双击事件
  8. 190729知识笔记
  9. HDU 5752 Sqrt Bo【枚举,大水题】
  10. 影响 5000 万开发者,GitHub 与 CSDN 掌舵人对话技术社区未来
  11. safari 浏览器提示添加到主屏幕_Safari浏览器的秘密技能
  12. kvm虚拟机vnc和spice配置
  13. Java代码:调用外部接口(使用Json格式传递参数)的方法
  14. Python操作IHTMLDocument2用于自动化测试
  15. Monkey 命令 基本参数介绍
  16. 多多计算机分屏版本怎么使用,电脑双屏幕怎么设置_电脑分屏怎么设置方法
  17. Beetl的基本用法
  18. js利用点击事件更换皮肤
  19. windows mysql 开启非3306端口
  20. WiFi底层通信接口@Netlink

热门文章

  1. android onpreviewframe保存mp4_无需第三方工具!教你如何保存抖音完整版视频
  2. ceil和floor
  3. 问题 F: 分盒子(经典)
  4. 细说双 11 直播背后的压测保障技术
  5. 拔得头筹 | 阿里云混合云荣膺IPv6最佳实践奖
  6. 一次.net托管内存泄露分析
  7. ​Spring事务的传播行为案例分析
  8. 过于自嗨的《紫塞秋风》,怎么就成了行业教科书?
  9. 游戏编程中的数学——随机数字生成(RNG)的黑暗秘密
  10. 百思不得其解,一个钻石玩家可以短时间上王者?因为猎游?