kafka Streams实例
本文主要描述kafka Streams的三个流实例
一. Pipe 二. line Split 三. word count
启动kafka服务
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties创建输入输出流
bin/kafka-topics.sh --create
--bootstrap-server localhost:9092
--replication-factor 1
--partitions 1
--topic streams-plaintext-inputbin/kafka-topics.sh --create
--bootstrap-server localhost:9092
--replication-factor 1
--partitions 1
--topic streams-wordcount-output
--config cleanup.policy=compact利用JAVA IDE工具Idea 或者Eclipse创建一个maven项目。引入jar包
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.2.0</version> </dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>2.2.0</version> </dependency> 复制代码
所用的客户端版本与kafka服务器版本保持一致,本文采用的服务器版本是kafka_2.12-2.2.0
在新建的maven项目中新增Pipe处理类
import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology;import java.util.Properties; import java.util.concurrent.CountDownLatch;public class Pipe {public static void main(String[] args) throws Exception {Properties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());final StreamsBuilder builder = new StreamsBuilder();builder.stream("streams-plaintext-input").to("streams-pipe-output");final Topology topology = builder.build();final KafkaStreams streams = new KafkaStreams(topology, props);final CountDownLatch latch = new CountDownLatch(1);// attach shutdown handler to catch control-cRuntime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {@Overridepublic void run() {streams.close();latch.countDown();}});try {streams.start();latch.await();} catch (Throwable e) {System.exit(1);}System.exit(0);} } 复制代码
启动该main方法,然后在输入流输入,输出流就有对应的输出。
在新增的maven的项目中新增LineSplit处理类
import org.apache.kafka.common.serialization.Serdes;import org.apache.kafka.streams.KafkaStreams;import org.apache.kafka.streams.StreamsBuilder;import org.apache.kafka.streams.StreamsConfig;import org.apache.kafka.streams.Topology;import org.apache.kafka.streams.kstream.KStream;import java.util.Arrays;import java.util.Properties;import java.util.concurrent.CountDownLatch;public class LineSplit {public static void main(String[] args) throws Exception {Properties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());final StreamsBuilder builder = new StreamsBuilder();KStream<String, String> source = builder.stream("streams-plaintext-input");source.flatMapValues(value -> Arrays.asList(value.split("\\W+"))).to("streams-linesplit-output");final Topology topology = builder.build();final KafkaStreams streams = new KafkaStreams(topology, props);final CountDownLatch latch = new CountDownLatch(1);// attach shutdown handler to catch control-cRuntime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {@Overridepublic void run() {streams.close();latch.countDown();}});try {streams.start();latch.await();} catch (Throwable e) {System.exit(1);}System.exit(0);} 复制代码
}
启动该main方法, 输出流结果为
6.在新增的maven项目中新增word count处理类
import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.common.serialization.Serdes;import org.apache.kafka.streams.KafkaStreams;import org.apache.kafka.streams.StreamsBuilder;import org.apache.kafka.streams.StreamsConfig;import org.apache.kafka.streams.kstream.KStream;import org.apache.kafka.streams.kstream.KTable;import org.apache.kafka.streams.kstream.Produced;import java.util.Arrays;import java.util.Locale;import java.util.Properties;import java.util.concurrent.CountDownLatch;public final class WordCount {public static void main(final String[] args) {final Properties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data// Note: To re-run the demo, you need to use the offset reset tool:// https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Toolprops.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");final StreamsBuilder builder = new StreamsBuilder();final KStream<String, String> source = builder.stream("streams-plaintext-input");final KTable<String, Long> counts = source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "))).groupBy((key, value) -> value).count();// need to override value serde to Long typecounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));final KafkaStreams streams = new KafkaStreams(builder.build(), props);final CountDownLatch latch = new CountDownLatch(1);// attach shutdown handler to catch control-cRuntime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {@Overridepublic void run() {streams.close();latch.countDown();}});try {streams.start();latch.await();} catch (final Throwable e) {System.exit(1);}System.exit(0);}}
复制代码
启动该main方法,查看输出结果命令: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092
--topic streams-wordcount-output
--from-beginning
--formatter kafka.tools.DefaultMessageFormatter
--property print.key=true
--property print.value=true
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
kafka Streams实例相关推荐
- kafka streams学习笔记
流式处理 流式处理是利用连续计算来处理无限数据流的能力,因为数据流是流动的.所以无须收集或存储数据以对其进行操作 这个弹珠图是流式处理的一个简单表示.图中每个圆圈代表某一特定时间点的某些信息或发生的事 ...
- Kafka Streams简介: 让流处理变得更简单
Introducing Kafka Streams: Stream Processing Made Simple 这是Jay Kreps在三月写的一篇文章,用来介绍Kafka Streams.当时Ka ...
- Kafka Streams的容错机制
Kafka Streams构建于Kafka本地集成的容错功能上.kafka分区具有高可用性和复制,因此当流数据持久保存到Kafka时,即使应用程序失败并需要重新处理时也可用.Kafka Streams ...
- kafka Streams
目录 一.简介 1.概述 2.批处理和流计算 3.Kafka Streams介绍 特点 概念介绍 二.Kafka Streams示例 1.单词统计 2.求和 3.窗口操作 一.简介 1.概述 Kafk ...
- Kafka Streams开发者指南
Kafka Streams 1.1 概述 Kafka Streams是一个客户端程序库,用于处理和分析存储在Kafka中的数据,并将得到的数据写回Kafka或发送到外部系统.Kafka Stream基 ...
- 大数据Hadoop之——Kafka Streams原理介绍与简单应用示例
文章目录 一.Kafka Streams概述 1)Kafka Streams是什么 2)流式计算与批量计算区别 3)Kafka Streams特点 二.Kafka Streams流处理拓扑 1)相关概 ...
- Kafka Streams 核心讲解
Kafka Stream 的特点如下: •Kafka Stream 提供了一个非常简单而轻量的 Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署•除了 Kafka 外, ...
- Kafka Streams(三十)
Kafka Streams Kafka 一直被认为是一个强大的消息中间件,它实现了高吞吐.高可用和低延时的消息传输能力,这让它成为流式处理系统中完美的数据来源.目前通用的一些流式处理框架如 Apach ...
- Kafka Streams流式原理解析
前言 本篇文章会从Kafka的核心流式计算原理进行分析,Kafka Streams Low-level processor API 和 核心概念,以及常见的应用场景分析 流式计算 通过业务场景去分析流 ...
最新文章
- log4j2配置文件log4j2.xml详解
- Oracle中过程/函数返回结果集
- 解决mac osx下pip安装ipython权限的问题
- spring cloud 微服务调用--ribbon和feign调用
- 温故而知新!这篇文章可以满足你80%日常工作!面试真题解析
- 编程体系结构(06):Java面向对象
- Zookeeper——入门介绍(相关原理、安装启动及使用操作)
- C语言的构造函数与析构函数
- 统计网站 同一IP访问的次数及IP地址
- 阿里云轻量服务器使用
- vue 调用虚拟键盘
- 老电脑装linux系统能变流畅吗,老电脑非常卡如何变流畅?高手教你把电脑变流畅的五种方法...
- IDEA使用Git大全
- 如何刷机:iphone8锁屏密码错误多次,手机停用,连接iTunes,磁盘已满,双重认证弊端!
- 用思维导图和孩子们一起了解“什么是春节”
- pyqt5 制作壁纸切换工具实例 第一章
- VMOS-Pro一款虚拟机app。
- 品牌与商家如何做电商直播运营的呢?
- 电脑诊断出策略服务器未运行,IE浏览器打不开,诊断策略服务未运行
- [程序员学英语]英语国际音标