本文作者:倪泽,Apache RocketMQ committer、RSQLDB/RocketMQ Streams Maintainer

01 背景

RocketMQ Streams是一款基于RocketMQ为基础的轻量级流计算引擎,具有资源消耗少、部署简单、功能全面的特点,目前已经在社区开源。RocketMQ Streams在阿里云内部被使用在对资源比较敏感,同时又强烈需要流计算的场景,比如在自建机房的云安全场景下。

自RocketMQ Streams开源以来,吸引了大量用户调研和试用。但是也存在一些问题,在RocketMQ Streams 1.1.0中,主要针对以下问题做出了改进和优化。

1、面向用户API不够友好,不能使用泛型,不支持自定义序列化/反序列化;

2、代码冗余,在RocketMQ Streams中存在将流处理拓扑序列化反序列化模块,RocketMQ Streams作为轻量级流处理SDK,构建好流处理节点之后应该可以直接处理数据,不存在将流处理拓扑图本地保存或者网络传输需求。

3、流处理过程不容易理解,含有大量缓存、刷新逻辑;

4、存在大量支持SQL的代码,这部分和SDK方式运行流处理任务的逻辑无关;

在RocketMQ Streams 1.1.0中,对上述问题做出了改进,期望能带来更好的使用体验。同时,重新设计了流处理拓扑构建过程、去掉冗余代码,使得代码更容易被理解。

从今天起,将推出系列文章介绍RocketMQ Streams 1.1.0版本,本次文章主要介绍RocketMQ Streams 1.1.0的API如何使用,如何利用RocketMQ Streams快速构建流处理应用。

02 典型使用示例

本地运行下列示例的步骤:

1、部署RocketMQ 5.0;

2、使用mqAdmin创建topic;

3、构建示例工程,添加依赖,启动示例。RocketMQ Streams 坐标:

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-streams</artifactId><version>1.1.0</version>
</dependency>

4、向topic中写入相应数据,并观察结果。

更详细文档请参考:GitHub - apache/rocketmq-streams: Apache rocketmq

WordCount

public class WordCount {public static void main(String[] args) {StreamBuilder builder = new StreamBuilder("wordCount");builder.source("sourceTopic", total -> {String value = new String(total, StandardCharsets.UTF_8);return new Pair<>(null, value);}).flatMap((ValueMapperAction<String, List<String>>) value -> {String[] splits = value.toLowerCase().split("\W+");return Arrays.asList(splits);}).keyBy(value -> value).count().toRStream().print();TopologyBuilder topologyBuilder = builder.build();Properties properties = new Properties();properties.put(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties);final CountDownLatch latch = new CountDownLatch(1);Runtime.getRuntime().addShutdownHook(new Thread("wordcount-shutdown-hook") {@Overridepublic void run() {rocketMQStream.stop();latch.countDown();}});try {rocketMQStream.start();latch.await();} catch (final Throwable e) {System.exit(1);}System.exit(0);}
}

WordCount示例要点:

1、JobId wordCount唯一标识流处理任务;

2、自定义的反序列化;

3、一对多转化;

4、lambda形式从数据中指定Key;

5、支持有状态计算;

窗口聚合

public class WindowCount {public static void main(String[] args) {StreamBuilder builder = new StreamBuilder("windowCountUser");AggregateAction<String, User, Num> aggregateAction = (key, value, accumulator) -> new Num(value.getName(), 100);builder.source("user", source -> {User user1 = JSON.parseObject(source, User.class);return new Pair<>(null, user1);}).selectTimestamp(User::getTimestamp).filter(value -> value.getAge() > 0).keyBy(value -> "key").window(WindowBuilder.tumblingWindow(Time.seconds(15))).aggregate(aggregateAction).toRStream().print();TopologyBuilder topologyBuilder = builder.build();Properties properties = new Properties();properties.putIfAbsent(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");properties.put(Constant.TIME_TYPE, TimeType.EVENT_TIME);properties.put(Constant.ALLOW_LATENESS_MILLISECOND, 2000);RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties);rocketMQStream.start();}
}

窗口聚合示例要点:

1、支持指定时间字段;

2、支持滑动、滚动、会话多种类型window;

3、支持自定义UDAF类型聚合;

4、支持自定义时间类型和数据最大迟到时间;

双流JOIN

public class JoinWindow {public static void main(String[] args) {StreamBuilder builder = new StreamBuilder("joinWindow");//左流RStream<User> user = builder.source("user", total -> {User user1 = JSON.parseObject(total, User.class);return new Pair<>(null, user1);});//右流RStream<Num> num = builder.source("num", source -> {Num user12 = JSON.parseObject(source, Num.class);return new Pair<>(null, user12);});//自定义join后的运算ValueJoinAction<User, Num, Union> action = new ValueJoinAction<User, Num, Union>() {@Overridepublic Union apply(User value1, Num value2) {...}};user.join(num).where(User::getName).equalTo(Num::getName).window(WindowBuilder.tumblingWindow(Time.seconds(30))).apply(action).print();TopologyBuilder topologyBuilder = builder.build();Properties properties = new Properties();properties.put(MixAll.NAMESRV_ADDR_PROPERTY, "127.0.0.1:9876");RocketMQStream rocketMQStream = new RocketMQStream(topologyBuilder, properties);rocketMQStream.start();}
}

双流聚合示例要点:

1、支持window join和非window join,对于非window join,只需要在上述及连表达式中去掉window即可;

2、支持多种窗口类型的window join;

3、支持对join后数据自定义操作;

03 参与贡献

RocketMQ Streams是Apache RocketMQ的子项目,已经在社区开源,参与RocketMQ Streams相关工作,请参考以下资源:

1、试用RocketMQ Streams,并阅读相关文档以了解更多信息;

maven仓库坐标:

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-streams</artifactId><version>1.1.0</version>
</dependency>

RocketMQ Streams文档:

https://rocketmq.apache.org/zh/docs/streams/30RocketMQ%20Streams%20Overview

2、参与贡献:如果你有任何功能请求或错误报告,请随时提交 Pull Request 来分享你的反馈和想法;

社区仓库:

https://github.com/apache/rocketmq-streams

3、联系我们:可以在 GitHub上创建 Issue,向 RocketMQ 邮件列表发送电子邮件,或在 RocketMQ Streams SIG 交流群与专家共同探讨,RocketMQ Streams SIG加入方式:添加“小火箭”微信,回复RocketMQ Streams。

邮件列表:

https://lists.apache.org/list.html?dev@rocketmq.apache.org

RocketMQ Streams 1.1.0: 轻量级流处理再出发相关推荐

  1. RocketMQ Streams拓扑构建与数据处理过程

    本文作者:倪泽,Apache RocketMQ committer.RSQLDB/RocketMQ Streams Maintainer 01 背景 RocketMQ Streams 1.1.0版本已 ...

  2. RocketMQ Streams在云安全及 IoT 场景下的大规模最佳实践

    本文作者:袁小栋,Apache RocketMQ Committer,RocketMQ Streams Cofonder,阿里云安全智能计算引擎负责人 RocketMQ Streams简介 Rocke ...

  3. mysql mgr_MySQL 8.0 MGR 流控功能

    MGR(MySQL Group Replication)是MySQL集群技术的一个重大改进,而流控(Flow Control)在MGR组复制的性能和数据完整性上起着非常关键的作用,本文将解释MGR中的 ...

  4. 空指针:从 0 到 NULL,再到 nullptr

    nullptr 空指针:从 0 到 NULL,再到 nullptr NULL 是一个宏定义: #undef NULL #if defined(__cplusplus) #define NULL 0 # ...

  5. 云计算情报局预告|告别 Kafka Streams,让轻量级流处理更加简单

    作者:不周 关键词:Kafka ETL,高弹性.免运维.低成本 阿里云消息队列 Kafka 版提供兼容 Apache Kafka 生态的全托管服务,彻底解决开源产品长期的痛点,是大数据生态中不可或缺的 ...

  6. 再见 2020!Apache RocketMQ 发布 4.8.0,DLedger 模式全面提升!

    作者 | RocketMQ社区 来源|阿里巴巴云原生公众号 "童年的雨天最是泥泞,却是记忆里最干净的曾经.凛冬散尽,星河长明,新的一年,万事顺遂,再见,2020!" 走过这个岁末, ...

  7. 利用EVC快速开发WINCE5.0的流驱动(转载)

    WinCE5.0提供了一个标准的流驱动格式,大大方便了设备驱动程序的开发工作.但是传统的开发方式往往效率很低.方法如下: 1.在Platform Builder下建立一个流驱动的dll工程. 2.为流 ...

  8. Apache RocketMQ 发布 v4.4.0,新添权限控制和消息轨迹特性

    近日,Apache RocketMQ 发布了 v4.4.0,该版本主要增加了权限控制(ACL)和消息轨迹(Message Trace)两大特性,并做了8项优化,和修复了4处bug. 权限控制(ACL) ...

  9. (连载)Android 8.0 : 系统启动流程之Linux内核

    这是一个连载的博文系列,我将持续为大家提供尽可能透彻的Android源码分析 github连载地址 前言 Android本质上就是一个基于Linux内核的操作系统,与Ubuntu Linux.Fedo ...

最新文章

  1. 拒绝了我们的连接请求_职场上,我们该如何巧妙而优雅的拒绝同事忙的请求呢?...
  2. Asp.net MVC JsonResult 忽略属性
  3. 单选项选择时,即时更新至数据库
  4. LeetCode 986. 区间列表的交集
  5. linux teaming状态命令,Linux 网卡Teaming
  6. 单目深度估计方法:现状与前瞻
  7. public/protected/private简介
  8. mapper同时添加数据只能添加一条_神器之通用mapper的使用
  9. android office转pdf插件,office word转pdf插件-Office自带Word转PDF插件下载__飞翔下载
  10. PS-如何用ps软件看psd文件中的字体大小
  11. 索尼1a dac插电脑用什么驱动。在哪下载,求助
  12. 服务器主机密码忘记了怎么破解?
  13. 关于试用期的四大认知误区,千万别被渣公司坑了!
  14. 多旋翼无人机技术详解
  15. Windows Shell编程-第七章.侵入Shell
  16. str和repr显示格式
  17. vmware部署优麒麟系统
  18. 共享单车智能管控技术手段探讨
  19. 基于springboot的中国国家图书馆管理系统项目(管理功能)
  20. php实现图片的翻滚,css实现图片滚动 - 我是希希呀的个人空间 - OSCHINA - 中文开源技术交流社区...

热门文章

  1. 运用tkinter做一个钟表(动态)
  2. 最强组合HuggingFace+ChatGPT=「贾维斯」现在开放demo了!
  3. partprobe分区报错
  4. 查询oracle表序列,Oracle查询全部序列
  5. 魅族16Android版本,魅族16/16 Plus再曝光:大小都有骁龙845版本
  6. 大都会系统MetLife小记
  7. A 3. 笔记 - 精通scrapy网络爬虫 - 刘硕(18年7月)
  8. 第二类斯特林数与自然数幂和
  9. 计算机开机后反复上电无法启动,电脑无法开机一直重启显示器也不亮——用排除法一样来,拔...
  10. vista怎么看计算机配置,查看电脑配置软件_无需软件 三方法查看电脑配置