Kafka-Steam Branch功能 对数据进行切分Steam,发送到指定的Topic
Kafka-Steam Branch介绍
通过Kafka Steam的Branch功能,可以实现对数据进行筛选,然后根据数据匹配规则对Steam进行切分。
应用场景
通过Branch功能,我们可以实现一个数据过滤/转发器。
例如该Steam收到了如下业务数据。
9:38:xiaojiu |
1:25:xiaoming |
4:34:xiaosi |
7:75:xiaoqi |
1:25:xiaoming |
1:25:xiaoming |
2:56:xiaoer |
3:23:xiaosan |
数据格式:
用户ID:年龄:姓名
有一个专门对用户ID为1的用户信息特殊处理的业务。例如用户ID为1的VIP用户有一个特定统计PV需求。
为了不影响原业务的处理逻辑和效率,项目里需要对数据源中的数据进行过滤,筛选出来特殊处理。
此时就可以通过定义一个Kafka-Steam的切分规则Predicate,对用户ID为1的数据进行拦截,生成特定的流,发送到另外单独创建的Topic中进行额外处理。
代码:
package com.hyr.kafka.demo.streams.high.dsl.operator;import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.Predicate;import java.util.HashMap;
import java.util.Map;/******************************************************************************** @date 2017-12-28 下午 5:28* @author: <a href=mailto:>黄跃然</a>* @Description: Branch 将stream按照规则进行切分多个stream。******************************************************************************/
public class BranchStreams {public static void main(String[] args) {Map<String, Object> props = new HashMap<String, Object>();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.133:9092");// 制定K-V 格式props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // Serdes : Data Types and Serializationprops.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // Serdes : Data Types and SerializationStreamsConfig config = new StreamsConfig(props);KStreamBuilder builder = new KStreamBuilder();KStream<String, String> kStream = builder.stream("my-input-topic");// Branch (or split) a KStream based on the supplied predicates into one or more KStream instances.// 根据规则将Stream进行切分Predicate<? super String, ? super String> predicate = new Predicate<String, String>() {@Overridepublic boolean test(String key, String value) {switch (value) {case "a":return true;case "b":return true;case "c":return true;case "d":return true;default:return false;}}};KStream<String, String>[] kStreams = kStream.branch(predicate);// TODO 根据规则,切分stream,分发到不通的topic中。for (KStream stream : kStreams) {stream.to("my-output-topic"); // stream.to("a"); stream.to("b"); stream.to("c"); stream.to("d");}KafkaStreams kafkaStreams = new KafkaStreams(builder, config);kafkaStreams.start();}}
整个KafkaDemo项目代码已提交到Github。
https://github.com/huangyueranbbc/KafkaDemo
Kafka-Steam Branch功能 对数据进行切分Steam,发送到指定的Topic相关推荐
- 利用OGG实现Oracle到Kafka到Greenplum的增量数据同步
墨墨导读:本文来自墨天轮用户 肖杰 的投稿,介绍用OGG实现Oracle到Kafka到Greenplum的增量数据同步的全过程. 墨天轮主页:https://www.modb.pro/u/6722 背 ...
- mysql 数据表格切分_MySQL数据库垂直和水平切分
replication的限制:一旦数据库过于庞大,尤其是当写入过于频繁,很难由一台主机支撑的时候,我们还是会面临到扩展瓶颈.数据切分(sharding):通过某种特定的条件,将我们存放在同一个数据库中 ...
- linux更换steam目录,如何在Linux上备份Steam游戏数据 | MOS86
无论您是需要重新安装Linux操作系统,还是只是想确保游戏进度不会丢失数据,备份存储游戏数据就是答案. 您可以制作完整的磁盘映像,也可以使用专用工具. 无论出于什么原因备份游戏数据,您都可以选择以下三 ...
- 游戏玩家的计算机配置,游戏玩家的PC都爱用什么配置?Steam十一月硬件调查数据告诉你...
[PConline 杂谈]很多人都会为了自己喜欢的游戏,配上一台主机或者升级一下配置,可你知道目前大部分玩游戏的人都用什么样的配置呢?作为目前全球最大的PC游戏游戏平台,Steam会通过对steam用 ...
- Flume+Kafka+Spark Streaming实现大数据实时流式数据采集
近年来,随着企业信息化建设的飞速发展,大数据应用的问题越来越备受关注.很多企业投入大量的人力.物力和财力建设企业大数据平台,平台建设工作涵盖数据采集.数据处理.数据存储.数据服务.数据展示以及数据质量 ...
- AKHQ:用于Apache Kafka管理主题、主题数据、消费者组、模式注册表、连接等的Kafka GUI。。。
参考文章:https://www.5axxw.com/wiki/content/q7nyiu AKHQ(以前称为KafkaHQ) 用于Apache Kafka管理主题.主题数据.消费者组.模式注册表. ...
- 热议:大脑功能磁共振数据不可靠?杜克大学教授对自己15年的工作提出质疑...
来源:brainnews 作者:brainnew创作团队 杜克大学的研究人员对功能磁共振数据进行了重新测评,对自己15年的工作提出了质疑. 脑部图像显示了不同的两天完成3个任务所对应的功能磁共振成像. ...
- 使用SQLServer 2008的CDC功能实现数据变更捕获
原文: 使用SQLServer 2008的CDC功能实现数据变更捕获 最近由于工作需要,研究了一下2008 CDC功能,觉得还不错,下面整理了一下研究过程,虽然比较粗略,但是基本上能用了,如果有补充请 ...
- kafka修改分区数_大数据技术:解析SparkStreaming和Kafka集成的两种方式
Spark Streaming是基于微批处理的流式计算引擎,通常是利用Spark Core或者Spark Core与Spark Sql一起来处理数据.在企业实时处理架构中,通常将Spark Strea ...
最新文章
- 三公子论「财务自由」
- 潘在亮:给业务开发提供黑科技装备的“测试Q博士”(图灵访谈)
- BZOJ 3261 最大异或和 可持久化Trie树
- Ubuntu下使用Evernote
- 转自微信号:测试那点事
- Riot - 比 Facebook React 更轻量的 UI 库
- dom4j解析xml获取所有的子节点并放入map中
- 二下语文书电子课本_沪教版牛津英语小学五年级上册高清电子课本教材书(三年级起点)...
- 用C语言编写99乘法表
- 樱(桜) - 堀江由衣 (日语-中文-罗马)
- access操作mysql_Access数据库基础及应用(公选课) 第二章:Access2010数据库创建与操作...
- 【粒子动画tsParticles】
- Powerbi环比分析及其修正
- 利用audacity分析浊音、清音、爆破音的时域及频域特性
- postgresql端使用tds_fdw创建访问sqlserver的linked server的操作说明
- Web自动化之Pytest测试框架
- 史上最全的iOS面试题及答案
- Java - 多线程
- PT与PX,em(%)区别
- 八年级作文-流动的忧郁