Kafka-Steam Branch介绍

通过Kafka Steam的Branch功能,可以实现对数据进行筛选,然后根据数据匹配规则对Steam进行切分。

应用场景

通过Branch功能,我们可以实现一个数据过滤/转发器。

例如该Steam收到了如下业务数据。

9:38:xiaojiu
1:25:xiaoming
4:34:xiaosi
7:75:xiaoqi
1:25:xiaoming
1:25:xiaoming
2:56:xiaoer
3:23:xiaosan

数据格式:

用户ID:年龄:姓名

有一个专门对用户ID为1的用户信息特殊处理的业务。例如用户ID为1的VIP用户有一个特定统计PV需求。

为了不影响原业务的处理逻辑和效率,项目里需要对数据源中的数据进行过滤,筛选出来特殊处理。

此时就可以通过定义一个Kafka-Steam的切分规则Predicate,对用户ID为1的数据进行拦截,生成特定的流,发送到另外单独创建的Topic中进行额外处理。

代码:

package com.hyr.kafka.demo.streams.high.dsl.operator;import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.Predicate;import java.util.HashMap;
import java.util.Map;/******************************************************************************** @date 2017-12-28 下午 5:28* @author: <a href=mailto:>黄跃然</a>* @Description: Branch 将stream按照规则进行切分多个stream。******************************************************************************/
public class BranchStreams {public static void main(String[] args) {Map<String, Object> props = new HashMap<String, Object>();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.133:9092");// 制定K-V 格式props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // Serdes : Data Types and Serializationprops.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // Serdes : Data Types and SerializationStreamsConfig config = new StreamsConfig(props);KStreamBuilder builder = new KStreamBuilder();KStream<String, String> kStream = builder.stream("my-input-topic");// Branch (or split) a KStream based on the supplied predicates into one or more KStream instances.// 根据规则将Stream进行切分Predicate<? super String, ? super String> predicate = new Predicate<String, String>() {@Overridepublic boolean test(String key, String value) {switch (value) {case "a":return true;case "b":return true;case "c":return true;case "d":return true;default:return false;}}};KStream<String, String>[] kStreams = kStream.branch(predicate);// TODO 根据规则,切分stream,分发到不通的topic中。for (KStream stream : kStreams) {stream.to("my-output-topic"); // stream.to("a"); stream.to("b"); stream.to("c"); stream.to("d");}KafkaStreams kafkaStreams = new KafkaStreams(builder, config);kafkaStreams.start();}}

整个KafkaDemo项目代码已提交到Github。

https://github.com/huangyueranbbc/KafkaDemo

Kafka-Steam Branch功能 对数据进行切分Steam,发送到指定的Topic相关推荐

  1. 利用OGG实现Oracle到Kafka到Greenplum的增量数据同步

    墨墨导读:本文来自墨天轮用户 肖杰 的投稿,介绍用OGG实现Oracle到Kafka到Greenplum的增量数据同步的全过程. 墨天轮主页:https://www.modb.pro/u/6722 背 ...

  2. mysql 数据表格切分_MySQL数据库垂直和水平切分

    replication的限制:一旦数据库过于庞大,尤其是当写入过于频繁,很难由一台主机支撑的时候,我们还是会面临到扩展瓶颈.数据切分(sharding):通过某种特定的条件,将我们存放在同一个数据库中 ...

  3. linux更换steam目录,如何在Linux上备份Steam游戏数据 | MOS86

    无论您是需要重新安装Linux操作系统,还是只是想确保游戏进度不会丢失数据,备份存储游戏数据就是答案. 您可以制作完整的磁盘映像,也可以使用专用工具. 无论出于什么原因备份游戏数据,您都可以选择以下三 ...

  4. 游戏玩家的计算机配置,游戏玩家的PC都爱用什么配置?Steam十一月硬件调查数据告诉你...

    [PConline 杂谈]很多人都会为了自己喜欢的游戏,配上一台主机或者升级一下配置,可你知道目前大部分玩游戏的人都用什么样的配置呢?作为目前全球最大的PC游戏游戏平台,Steam会通过对steam用 ...

  5. Flume+Kafka+Spark Streaming实现大数据实时流式数据采集

    近年来,随着企业信息化建设的飞速发展,大数据应用的问题越来越备受关注.很多企业投入大量的人力.物力和财力建设企业大数据平台,平台建设工作涵盖数据采集.数据处理.数据存储.数据服务.数据展示以及数据质量 ...

  6. AKHQ:用于Apache Kafka管理主题、主题数据、消费者组、模式注册表、连接等的Kafka GUI。。。

    参考文章:https://www.5axxw.com/wiki/content/q7nyiu AKHQ(以前称为KafkaHQ) 用于Apache Kafka管理主题.主题数据.消费者组.模式注册表. ...

  7. 热议:大脑功能磁共振数据不可靠?杜克大学教授对自己15年的工作提出质疑...

    来源:brainnews 作者:brainnew创作团队 杜克大学的研究人员对功能磁共振数据进行了重新测评,对自己15年的工作提出了质疑. 脑部图像显示了不同的两天完成3个任务所对应的功能磁共振成像. ...

  8. 使用SQLServer 2008的CDC功能实现数据变更捕获

    原文: 使用SQLServer 2008的CDC功能实现数据变更捕获 最近由于工作需要,研究了一下2008 CDC功能,觉得还不错,下面整理了一下研究过程,虽然比较粗略,但是基本上能用了,如果有补充请 ...

  9. kafka修改分区数_大数据技术:解析SparkStreaming和Kafka集成的两种方式

    Spark Streaming是基于微批处理的流式计算引擎,通常是利用Spark Core或者Spark Core与Spark Sql一起来处理数据.在企业实时处理架构中,通常将Spark Strea ...

最新文章

  1. 三公子论「财务自由」
  2. 潘在亮:给业务开发提供黑科技装备的“测试Q博士”(图灵访谈)
  3. BZOJ 3261 最大异或和 可持久化Trie树
  4. Ubuntu下使用Evernote
  5. 转自微信号:测试那点事
  6. Riot - 比 Facebook React 更轻量的 UI 库
  7. dom4j解析xml获取所有的子节点并放入map中
  8. 二下语文书电子课本_沪教版牛津英语小学五年级上册高清电子课本教材书(三年级起点)...
  9. 用C语言编写99乘法表
  10. 樱(桜) - 堀江由衣 (日语-中文-罗马)
  11. access操作mysql_Access数据库基础及应用(公选课) 第二章:Access2010数据库创建与操作...
  12. 【粒子动画tsParticles】
  13. Powerbi环比分析及其修正
  14. 利用audacity分析浊音、清音、爆破音的时域及频域特性
  15. postgresql端使用tds_fdw创建访问sqlserver的linked server的操作说明
  16. Web自动化之Pytest测试框架
  17. 史上最全的iOS面试题及答案
  18. Java - 多线程
  19. PT与PX,em(%)区别
  20. 八年级作文-流动的忧郁

热门文章

  1. inventor如何画心_Illustrator | 如何画一个心型图案
  2. Linux目录一个点.和两个点..的区别
  3. python 设置Pyplot的动态rc参数、绘图的填充
  4. 绝地求生渠道和用户画像分析
  5. 电脑音频没声音,静音
  6. Codeforces 954D. Fight Against Traffic
  7. 获取农历节日的公共方法
  8. slice,splice,split的区别,一开就懂
  9. 利用RecordRTC支持web端录制屏幕(vue写法)
  10. 今天给大家分享用scratch的画笔绘制彩色花瓣!