一. 设置Maven项目

我们将使用Kafka Streams Maven Archetype来创建Streams项目结构:

mvn archetype:generate \

-DarchetypeGroupId=org.apache.kafka \

-DarchetypeArtifactId=streams-quickstart-java \

-DarchetypeVersion=1.1.0 \

-DgroupId=streams.examples \

-DartifactId=streams.examples \

-Dversion=0.1 \

-Dpackage=myapps

如果你需要,您可以为groupId,artifactId和package设置不同的值。假设您使用上述参数值,该命令将创建一个如下所示的项目结构:

> tree streams.examples

streams-quickstart

|-- pom.xml

|-- src

|-- main

|-- java

| |-- myapps

| |-- LineSplit.java

| |-- Pipe.java

| |-- WordCount.java

|-- resources

|-- log4j.properties

项目中包含的pom.xml文件已经定义了Streams依赖项,并且在src/main/java已经有几个Streams示例程序。 既然我们要从头开始编写这样的程序,现在我们先删除这些例子:

> cd streams-quickstart

> rm src/main/java/myapps/*.java

二. 编写第一个Streams应用程序:Pipe

It's coding time now! Feel free to open your favorite IDE and import this Maven project, or simply open a text editor and create a java file under src/main/java. Let's name it Pipe.java:

现在是编码时间! 随意打开你最喜欢的IDE并导入这个Maven项目,或者直接打开一个文本编辑器并在src/main/java下创建一个java文件。 我们将其命名为Pipe.java:

package myapps;

public class Pipe {

public static void main(String[] args) throws Exception {

}

}

我们在main中来编写这个pipe程序。请注意,由于IDE通常可以自动添加导入语句,因此我们不会列出导入语句。但是,如果您使用的是文本编辑器,则需要手动添加导入,并且在本节末尾,我们将为您显示带有导入语句的完整代码段。

编写Streams应用程序的第一步是创建一个java.util.Properties映射来指定StreamsConfig中定义的不同Streams执行配置值。 需要设置的几个重要配置值:StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,它指定用于建立初始连接到Kafka集群的host/port列表,以及StreamsConfig.APPLICATION_ID_CONFIG,它提供了Streams的唯一标识符应用程序与其他应用程序进行区分:

Properties props = new Properties();

props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");

props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assuming that the Kafka broker

假设这个应用程序和集群在同一台机器运行。

另外,你也可以自定义其他配置,例如设置消息key-value对的默认序列化和反序列:

props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());

props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

有关Kafka Streams的完整配置列表,请参阅这里。

接下来我们将定义Streams应用程序的计算逻辑。在Kafka Streams中,这种计算逻辑被定义为连接处理器节点的拓扑结构。我们可以使用拓扑构建器来构建这样的拓扑,

final StreamsBuilder builder = new StreamsBuilder();

然后使用此拓扑构建器,创建主题为streams-plaintext-input的源流(ps:就是数据的来源):

KStream source = builder.stream("streams-plaintext-input");

现在我们得到一个KStream,它不断的从来源主题streams-plaintext-input获取消息。消息是String类型的key-value对。我们可以用这个流做的最简单的事情就是将它写入另一个Kafka主题streams-pipe-output中:

source.to("streams-pipe-output");

请注意,我们也可以将上面两行连接成一行,如下所示:

builder.stream("streams-plaintext-input").to("streams-pipe-output");

我们可以通过执行以下操作来检查此构建器创建的拓扑结构类型:

final Topology topology = builder.build();

将描述输出:

System.out.println(topology.describe());

如果我们现在编译并运行程序,它会输出以下信息:

> mvn clean package

> mvn exec:java -Dexec.mainClass=myapps.Pipe

Sub-topologies:

Sub-topology: 0

Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-SINK-0000000001

Sink: KSTREAM-SINK-0000000001(topic: streams-pipe-output)

Global Stores:

none

如上所示,它说明构建的拓扑有两个处理器节点,源节点KSTREAM-SOURCE-0000000000和sink节点KSTREAM-SINK-0000000001。KSTREAM-SOURCE-0000000000连续读取Kafka主题streams-plaintext-input的消息,并将它们传送到其下游节点KSTREAM-SINK-0000000001; KSTREAM-SINK-0000000001会将其接收到的每条消息写入另一个Kafka主题streams-pipe-output中( -->和

请注意,我们总是可以像在上面那样在任何给定点上描述拓扑,而我们正在代码中构建它,因此作为用户,您可以交互式地“尝试并品尝”拓扑中定义的计算逻辑,直到你满意为止。假设我们已经完成了这个简单的拓扑结构,它只是以一种无尽的流式方式将数据从一个Kafka主题管道传输到另一个主题,我们现在可以使用我们刚刚构建的两个组件构建Streams客户端:配置map和拓扑对象(也可以从props map构造一个StreamsConfig对象,然后将该对象传递给构造函数,可以重载KafkaStreams构造函数来实现任一类型)。

final KafkaStreams streams = new KafkaStreams(topology, props);

通过调用它的start()函数,我们可以触发这个客户端的执行。在此客户端上调用close()之前,执行不会停止。 例如,我们可以添加一个带有倒计时的shutdown hook来捕获用户中断,并在终止该程序时关闭客户端:

final CountDownLatch latch = new CountDownLatch(1);

// attach shutdown handler to catch control-c

Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {

@Override

public void run() {

streams.close();

latch.countDown();

}

});

try {

streams.start();

latch.await();

} catch (Throwable e) {

System.exit(1);

}

System.exit(0);

到目前为止,完整的代码如下所示:

package myapps;

import org.apache.kafka.common.serialization.Serdes;

import org.apache.kafka.streams.KafkaStreams;

import org.apache.kafka.streams.StreamsBuilder;

import org.apache.kafka.streams.StreamsConfig;

import org.apache.kafka.streams.Topology;

import java.util.Properties;

import java.util.concurrent.CountDownLatch;

public class Pipe {

public static void main(String[] args) throws Exception {

Properties props = new Properties();

props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");

props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());

props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

final StreamsBuilder builder = new StreamsBuilder();

builder.stream("streams-plaintext-input").to("streams-pipe-output");

final Topology topology = builder.build();

final KafkaStreams streams = new KafkaStreams(topology, props);

final CountDownLatch latch = new CountDownLatch(1);

// attach shutdown handler to catch control-c

Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {

@Override

public void run() {

streams.close();

latch.countDown();

}

});

try {

streams.start();

latch.await();

} catch (Throwable e) {

System.exit(1);

}

System.exit(0);

}

}

如果您已经在localhost:9092上运行了Kafka,并且创建了主题streams-plaintext-input和streams-pipe-output,则可以在IDE或命令行上使用Maven运行此代码:

> mvn clean package

> mvn exec:java -Dexec.mainClass=myapps.Pipe

有关如何运行Streams应用程序并观察计算结果的详细说明,请阅读Play with a Streams部分。本节的其余部分我们不会谈论这一点。

三. 编写第二个Streams应用程序:Line Split

我们已经学会了如何构建Streams客户端及其两个关键组件:StreamsConfig和Topology。 现在让我们继续通过增加当前拓扑来添加一些实际的处理逻辑。我们可以首先复制现有的Pipe.java类来创建另一个程序:

> cp src/main/java/myapps/Pipe.java src/main/java/myapps/LineSplit.java

并更改其类名以及应用程序ID配置以,与之前的程序区分开来:

public class LineSplit {

public static void main(String[] args) throws Exception {

Properties props = new Properties();

props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");

// ...

}

}

由于每个源流的消息都是一个字符串类型的键值对,因此让我们将值字符串视为文本行,并使用FlatMapValues运算符将其分成单词:

KStream source = builder.stream("streams-plaintext-input");

KStream words = source.flatMapValues(new ValueMapper>() {

@Override

public Iterable apply(String value) {

return Arrays.asList(value.split("\\W+"));

}

});

操作员将把源流作为输入,并通过按顺序处理源流中的每条消息并将其值字符串分解为一个单词列表,并生成每个单词作为输出的新消息,从而生成一个名为单词的新流。这是一个无状态的操作,无需跟踪以前收到的消息或处理结果。请注意,如果您使用的是JDK 8,则可以使用lambda表达式并简化上面的代码:

KStream source = builder.stream("streams-plaintext-input");

KStream words = source.flatMapValues(value -> Arrays.asList(value.split("\\W+")));

最后,我们可以将单词流写回另一个Kafka主题,比如说stream-linesplit-output。 再次,这两个步骤可以如下所示连接(假设使用lambda表达式):

KStream source = builder.stream("streams-plaintext-input");

source.flatMapValues(value -> Arrays.asList(value.split("\\W+")))

.to("streams-linesplit-output");

如果我们现在将此扩展拓扑描述打印出来System.out.println(topology.describe()),我们将得到以下结果:

> mvn clean package

> mvn exec:java -Dexec.mainClass=myapps.LineSplit

Sub-topologies:

Sub-topology: 0

Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-FLATMAPVALUES-0000000001

Processor: KSTREAM-FLATMAPVALUES-0000000001(stores: []) --> KSTREAM-SINK-0000000002

Sink: KSTREAM-SINK-0000000002(topic: streams-linesplit-output)

Global Stores:

none

正如我们上面看到的,一个新的处理器节点KSTREAM-FLATMAPVALUES-0000000001被注入到原始源节点和sink节点之间的拓扑中。 它将源节点作为其父节点,将sink节点作为其子节点。换句话说,源节点获取的每个消息,将首先遍历新加入的KSTREAM-FLATMAPVALUES-0000000001节点进行处理,并且结果将生成一个或多个新消息。它们将继续往下走到sink节点回写给kafka。注意这个处理器节点是“无状态的”,因为它不与任何仓库相关联(即(stores:[]))。

完整的代码如下所示(假设使用lambda表达式):

package myapps;

import org.apache.kafka.common.serialization.Serdes;

import org.apache.kafka.streams.KafkaStreams;

import org.apache.kafka.streams.StreamsBuilder;

import org.apache.kafka.streams.StreamsConfig;

import org.apache.kafka.streams.Topology;

import org.apache.kafka.streams.kstream.KStream;

import java.util.Arrays;

import java.util.Properties;

import java.util.concurrent.CountDownLatch;

public class LineSplit {

public static void main(String[] args) throws Exception {

Properties props = new Properties();

props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");

props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());

props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

final StreamsBuilder builder = new StreamsBuilder();

KStream source = builder.stream("streams-plaintext-input");

source.flatMapValues(value -> Arrays.asList(value.split("\\W+")))

.to("streams-linesplit-output");

final Topology topology = builder.build();

final KafkaStreams streams = new KafkaStreams(topology, props);

final CountDownLatch latch = new CountDownLatch(1);

// ... same as Pipe.java above

}

}

四. 编写第三个Streams应用程序:Wordcount

现在让我们进一步通过计算源文本流中单词的出现,来向拓扑中添加一些“有状态”计算。按照类似的步骤,我们创建另一个基于LineSplit.java类的程序:

public class WordCount {

public static void main(String[] args) throws Exception {

Properties props = new Properties();

props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");

// ...

}

}

为了计算单词,我们可以首先修改flatMapValues,将它们全部作为小写字母(假设使用lambda表达式):

source.flatMapValues(new ValueMapper>() {

@Override

public Iterable apply(String value) {

return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));

}

});

我们必须首先指定我们要关键流的字符串value,即小写单词,用groupBy操作。该运算符生成一个新的分组流,然后可以由一个计数操作员汇总,该操作员可以在每个分组键上生成一个运行计数:

KTable counts =

source.flatMapValues(new ValueMapper>() {

@Override

public Iterable apply(String value) {

return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));

}

})

.groupBy(new KeyValueMapper() {

@Override

public String apply(String key, String value) {

return value;

}

})

// Materialize the result into a KeyValueStore named "counts-store".

// The Materialized store is always of type as this is the format of the inner most store.

.count(Materialized.> as("counts-store"));

请注意,count运算符有Materialized参数,该参数指定运行计数应存储在名为counts-store的状态存储中。 此Counts仓库可以实时查询,详情请参阅开发者手册。

请注意,为了从主题streams-wordcount-output读取changelog流,需要将值反序列化设置为org.apache.kafka.common.serialization.LongDeserializer。假设可以使用JDK 8的lambda表达式,上面的代码可以简化为:

KStream source = builder.stream("streams-plaintext-input");

source.flatMapValues(value-> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))

.groupBy((key, value) -> value)

.count(Materialized.>as("counts-store"))

.toStream()

.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

如果我们再次将这种扩展拓扑描述为System.out.println(topology.describe()),我们将得到以下结果:

> mvn clean package

> mvn exec:java -Dexec.mainClass=myapps.WordCount

Sub-topologies:

Sub-topology: 0

Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-FLATMAPVALUES-0000000001

Processor: KSTREAM-FLATMAPVALUES-0000000001(stores: []) --> KSTREAM-KEY-SELECT-0000000002

Processor: KSTREAM-KEY-SELECT-0000000002(stores: []) --> KSTREAM-FILTER-0000000005

Processor: KSTREAM-FILTER-0000000005(stores: []) --> KSTREAM-SINK-0000000004

Sink: KSTREAM-SINK-0000000004(topic: Counts-repartition)

Sub-topology: 1

Source: KSTREAM-SOURCE-0000000006(topics: Counts-repartition) --> KSTREAM-AGGREGATE-0000000003

Processor: KSTREAM-AGGREGATE-0000000003(stores: [Counts]) --> KTABLE-TOSTREAM-0000000007

Processor: KTABLE-TOSTREAM-0000000007(stores: []) --> KSTREAM-SINK-0000000008

Sink: KSTREAM-SINK-0000000008(topic: streams-wordcount-output)

Global Stores:

none

如上所述,拓扑现在包含两个断开的子拓扑。第一个子拓扑的接收节点KSTREAM-SINK-0000000004将写入一个重新分区主题Counts-repartition,它将由第二个子拓扑的源节点KSTREAM-SOURCE-0000000006读取。重分区topic通过使用聚合键“shuffle”的源流,在这种情况下,聚合键为值字符串。此外,在第一个子拓扑结构内部,在分组KSTREAM-KEY-SELECT-0000000002节点和sink节点之间注入无状态的KSTREAM-FILTER-0000000005节点,以过滤出聚合key为空的任何中间记录。

在第二个子拓扑中,聚合节点KSTREAM-AGGREGATE-0000000003与名为Counts的状态存储相关联(名称由用户在count运算符中指定)。在即将到来的流源节点接收到每个消息时,聚合处理器将首先查询其关联的Counts存储以获得该密钥的当前计数,并将其增加1,然后将新计数写回仓库。将每个更新的key计数传送到KTABLE-TOSTREAM-0000000007节点,KTABLE-TOSTREAM-0000000007节点将该更新流解释为消息流,然后再传输到汇聚节点KSTREAM-SINK-0000000008以写回Kafka。

完整的代码如下所示(假设使用lambda表达式):

package myapps;

import org.apache.kafka.common.serialization.Serdes;

import org.apache.kafka.streams.KafkaStreams;

import org.apache.kafka.streams.StreamsBuilder;

import org.apache.kafka.streams.StreamsConfig;

import org.apache.kafka.streams.Topology;

import org.apache.kafka.streams.kstream.KStream;

import java.util.Arrays;

import java.util.Locale;

import java.util.Properties;

import java.util.concurrent.CountDownLatch;

public class WordCount {

public static void main(String[] args) throws Exception {

Properties props = new Properties();

props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");

props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());

props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

final StreamsBuilder builder = new StreamsBuilder();

KStream source = builder.stream("streams-plaintext-input");

source.flatMapValues(value-> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))

.groupBy((key, value) -> value)

.count(Materialized.>as("counts-store"))

.toStream()

.to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long());

final Topology topology = builder.build();

final KafkaStreams streams = new KafkaStreams(topology, props);

final CountDownLatch latch = new CountDownLatch(1);

// ... same as Pipe.java above

}

}

本文转发自 http://orchome.com/957

在本指南中,我们将从头开始帮助你搭建自己的Kafka Streams流处理程序。 强烈建议您首先阅读快速入门,了解如何运行使用Kafka Streams编写的Streams应用程序(如果尚未这样做)。

关于Kafka深入学习视频, 如Kafka领导选举, offset管理, Streams接口, 高性能之道, 监控运维, 性能测试等,

请关注个人微信公众号: 求学之旅, 发送Kafka, 即可收获Kafka学习视频大礼包一枚。

java kafkastream_手把手教你写Kafka Streams程序相关推荐

  1. 手把手教你写Kafka Streams程序

    个人名片: 因为云计算成为了监控工程师

  2. 【Golang项目实战】手把手教你写一个备忘录程序|附源码——建议收藏

    博主简介:努力学习的大一在校计算机专业学生,热爱学习和创作.目前在学习和分享:数据结构.Go,Java等相关知识. 博主主页: @是瑶瑶子啦 所属专栏: Go语言核心编程 近期目标:写好专栏的每一篇文 ...

  3. 手把手教你写个小程序定时器管理库

    背景 凹凸曼是个小程序开发者,他要在小程序实现秒杀倒计时.于是他不假思索,写了以下代码: Page({init: function () {clearInterval(this.timer)this. ...

  4. qt定时器暂停与重新开始_手把手教你写个小程序定时器管理库

    背景 凹凸曼是个小程序开发者,他要在小程序实现秒杀倒计时.于是他不假思索,写了以下代码: Page({init: function () { clearInterval(this.timer)this ...

  5. 手把手教你写个扫雷程序自己玩

    目录 前言 一.扫雷是什么? 二.代码详解(模块化) 1.创建文件 2.test.c 3.game.c 4.game.h 总结 test.c game.c game.h 前言 相信大家过年玩游戏玩王者 ...

  6. 手把手教你写一个spring IOC容器

    本文分享自华为云社区<手把手教你写一个spring IOC容器>,原文作者:技术火炬手. spring框架的基础核心和起点毫无疑问就是IOC,IOC作为spring容器提供的核心技术,成功 ...

  7. socket 长链接linux,手把手教你写 Socket 长连接

    原标题:手把手教你写 Socket 长连接 8点43分打卡 就是真爱 本文转载自公众号 玉刚说,由玉刚说写作平台[1]提供写作赞助 原作者:水晶虾饺[2] 版权声明:本文版权归微信公众号玉刚说所有,未 ...

  8. 手把手教你写需求之代码实现pdf转jpg

    前言 初入公司,很多朋友如果碰到一些莫名其妙的需求,没有做过就会很慌张.不要慌张,本文通过一个小案例手把手教你写需求. 场景 一个阳光明媚的下午,需求小姐姐向你款款走来,娇滴滴的寻求你帮忙,她需要你把 ...

  9. Android 开发之手把手教你写 ButterKnife 框架(三)

    系列文章目录导读: Android开发之手把手教你写ButterKnife框架(一) Android开发之手把手教你写ButterKnife框架(二) Android开发之手把手教你写ButterKn ...

最新文章

  1. feign session 调用_springboot使用feign调用session传递失效解决方案
  2. c++ extern “C”
  3. 【干货】常见的40个知识模型:学习力、思考力、创造力、共情力......
  4. 谈谈Web Workers
  5. jpype,jpython调用jar包中jdk的问题.
  6. rust怎么造双层_DIY双层电路板 制作详解
  7. 编译安装的mysql如何更改文件路径
  8. PPC手机QQ2008 最新版下载
  9. 计算机病毒的入侵路径,[浅谈VBS脚本病毒入侵计算机的途径与防治] 计算机病毒是指...
  10. 计算机三级网络技术-----DHCP报文分析
  11. PenTesters框架(PTF)
  12. redhat红帽官方软件仓库同步方案
  13. Python实战小项目—绘制玫瑰花送给女朋友叭
  14. 【C Primer Plus 编程题】里程和耗油量的测量方案
  15. Searching for MobileNetV3翻译
  16. 《数字图像处理》学习总结及感悟:第二章数字图像基础(1)人眼结构、感知和错觉
  17. mt7601驱动使用(二)
  18. 不同分布所表示的物理含义
  19. perl 常用模块使用例子
  20. OpenCV-Python 中文教程10——图像阈值

热门文章

  1. TestNG介绍 - 1
  2. 3.9 限制root远程登录
  3. 计蒜客——学生成绩查找系统
  4. 2021-03-06JAVA大数据Week1
  5. 《深入理解Nginx:模块开发与架构解析》一3.3 如何将自己的HTTP模块编译进Nginx...
  6. 用matlab怎么画频率特性,(matlab)频率特性仿真.pdf
  7. 强化学习离轨策略:从失败中获得成功经验 - 以追女孩为例 | 采样率的数学意义
  8. Atitit.识别损坏的图像
  9. 【操作系统/OS笔记13】信号量、PV操作、管程、条件变量、生产者消费者问题
  10. linux下bus、devices和platform的基础模型 【转】