本文主要描述kafka Streams的三个流实例
一. Pipe 二. line Split 三. word count

  1. 启动kafka服务
    bin/zookeeper-server-start.sh config/zookeeper.properties
    bin/kafka-server-start.sh config/server.properties

  2. 创建输入输出流

    bin/kafka-topics.sh --create
    --bootstrap-server localhost:9092
    --replication-factor 1
    --partitions 1
    --topic streams-plaintext-input

    bin/kafka-topics.sh --create
    --bootstrap-server localhost:9092
    --replication-factor 1
    --partitions 1
    --topic streams-wordcount-output
    --config cleanup.policy=compact

  3. 利用JAVA IDE工具Idea 或者Eclipse创建一个maven项目。引入jar包

    <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.2.0</version>
    </dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>2.2.0</version>
    </dependency>
    复制代码

所用的客户端版本与kafka服务器版本保持一致,本文采用的服务器版本是kafka_2.12-2.2.0

  1. 在新建的maven项目中新增Pipe处理类

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.Topology;import java.util.Properties;
    import java.util.concurrent.CountDownLatch;public class Pipe {public static void main(String[] args) throws Exception {Properties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());final StreamsBuilder builder = new StreamsBuilder();builder.stream("streams-plaintext-input").to("streams-pipe-output");final Topology topology = builder.build();final KafkaStreams streams = new KafkaStreams(topology, props);final CountDownLatch latch = new CountDownLatch(1);// attach shutdown handler to catch control-cRuntime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {@Overridepublic void run() {streams.close();latch.countDown();}});try {streams.start();latch.await();} catch (Throwable e) {System.exit(1);}System.exit(0);}
    }
    复制代码

    启动该main方法,然后在输入流输入,输出流就有对应的输出。

  1. 在新增的maven的项目中新增LineSplit处理类

     import org.apache.kafka.common.serialization.Serdes;import org.apache.kafka.streams.KafkaStreams;import org.apache.kafka.streams.StreamsBuilder;import org.apache.kafka.streams.StreamsConfig;import org.apache.kafka.streams.Topology;import org.apache.kafka.streams.kstream.KStream;import java.util.Arrays;import java.util.Properties;import java.util.concurrent.CountDownLatch;public class LineSplit {public static void main(String[] args) throws Exception {Properties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());final StreamsBuilder builder = new StreamsBuilder();KStream<String, String> source = builder.stream("streams-plaintext-input");source.flatMapValues(value -> Arrays.asList(value.split("\\W+"))).to("streams-linesplit-output");final Topology topology = builder.build();final KafkaStreams streams = new KafkaStreams(topology, props);final CountDownLatch latch = new CountDownLatch(1);// attach shutdown handler to catch control-cRuntime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {@Overridepublic void run() {streams.close();latch.countDown();}});try {streams.start();latch.await();} catch (Throwable e) {System.exit(1);}System.exit(0);}
    复制代码

    }

启动该main方法, 输出流结果为

6.在新增的maven项目中新增word count处理类

    import org.apache.kafka.clients.consumer.ConsumerConfig;import org.apache.kafka.common.serialization.Serdes;import org.apache.kafka.streams.KafkaStreams;import org.apache.kafka.streams.StreamsBuilder;import org.apache.kafka.streams.StreamsConfig;import org.apache.kafka.streams.kstream.KStream;import org.apache.kafka.streams.kstream.KTable;import org.apache.kafka.streams.kstream.Produced;import java.util.Arrays;import java.util.Locale;import java.util.Properties;import java.util.concurrent.CountDownLatch;public final class WordCount {public static void main(final String[] args) {final Properties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());// setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data// Note: To re-run the demo, you need to use the offset reset tool:// https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Application+Reset+Toolprops.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");final StreamsBuilder builder = new StreamsBuilder();final KStream<String, String> source = builder.stream("streams-plaintext-input");final KTable<String, Long> counts = source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "))).groupBy((key, value) -> value).count();// need to override value serde to Long typecounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));final KafkaStreams streams = new KafkaStreams(builder.build(), props);final CountDownLatch latch = new CountDownLatch(1);// attach shutdown handler to catch control-cRuntime.getRuntime().addShutdownHook(new Thread("streams-wordcount-shutdown-hook") {@Overridepublic void run() {streams.close();latch.countDown();}});try {streams.start();latch.await();} catch (final Throwable e) {System.exit(1);}System.exit(0);}}
复制代码

启动该main方法,查看输出结果命令: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092
--topic streams-wordcount-output
--from-beginning
--formatter kafka.tools.DefaultMessageFormatter
--property print.key=true
--property print.value=true
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

kafka Streams实例相关推荐

  1. kafka streams学习笔记

    流式处理 流式处理是利用连续计算来处理无限数据流的能力,因为数据流是流动的.所以无须收集或存储数据以对其进行操作 这个弹珠图是流式处理的一个简单表示.图中每个圆圈代表某一特定时间点的某些信息或发生的事 ...

  2. Kafka Streams简介: 让流处理变得更简单

    Introducing Kafka Streams: Stream Processing Made Simple 这是Jay Kreps在三月写的一篇文章,用来介绍Kafka Streams.当时Ka ...

  3. Kafka Streams的容错机制

    Kafka Streams构建于Kafka本地集成的容错功能上.kafka分区具有高可用性和复制,因此当流数据持久保存到Kafka时,即使应用程序失败并需要重新处理时也可用.Kafka Streams ...

  4. kafka Streams

    目录 一.简介 1.概述 2.批处理和流计算 3.Kafka Streams介绍 特点 概念介绍 二.Kafka Streams示例 1.单词统计 2.求和 3.窗口操作 一.简介 1.概述 Kafk ...

  5. Kafka Streams开发者指南

    Kafka Streams 1.1 概述 Kafka Streams是一个客户端程序库,用于处理和分析存储在Kafka中的数据,并将得到的数据写回Kafka或发送到外部系统.Kafka Stream基 ...

  6. 大数据Hadoop之——Kafka Streams原理介绍与简单应用示例

    文章目录 一.Kafka Streams概述 1)Kafka Streams是什么 2)流式计算与批量计算区别 3)Kafka Streams特点 二.Kafka Streams流处理拓扑 1)相关概 ...

  7. Kafka Streams 核心讲解

    Kafka Stream 的特点如下: •Kafka Stream 提供了一个非常简单而轻量的 Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署•除了 Kafka 外, ...

  8. Kafka Streams(三十)

    Kafka Streams Kafka 一直被认为是一个强大的消息中间件,它实现了高吞吐.高可用和低延时的消息传输能力,这让它成为流式处理系统中完美的数据来源.目前通用的一些流式处理框架如 Apach ...

  9. Kafka Streams流式原理解析

    前言 本篇文章会从Kafka的核心流式计算原理进行分析,Kafka Streams Low-level processor API 和 核心概念,以及常见的应用场景分析 流式计算 通过业务场景去分析流 ...

最新文章

  1. log4j2配置文件log4j2.xml详解
  2. Oracle中过程/函数返回结果集
  3. 解决mac osx下pip安装ipython权限的问题
  4. spring cloud 微服务调用--ribbon和feign调用
  5. 温故而知新!这篇文章可以满足你80%日常工作!面试真题解析
  6. 编程体系结构(06):Java面向对象
  7. Zookeeper——入门介绍(相关原理、安装启动及使用操作)
  8. C语言的构造函数与析构函数
  9. 统计网站 同一IP访问的次数及IP地址
  10. 阿里云轻量服务器使用
  11. vue 调用虚拟键盘
  12. 老电脑装linux系统能变流畅吗,老电脑非常卡如何变流畅?高手教你把电脑变流畅的五种方法...
  13. IDEA使用Git大全
  14. 如何刷机:iphone8锁屏密码错误多次,手机停用,连接iTunes,磁盘已满,双重认证弊端!
  15. 用思维导图和孩子们一起了解“什么是春节”
  16. pyqt5 制作壁纸切换工具实例 第一章
  17. VMOS-Pro一款虚拟机app。
  18. 品牌与商家如何做电商直播运营的呢?
  19. 电脑诊断出策略服务器未运行,IE浏览器打不开,诊断策略服务未运行
  20. [程序员学英语]英语国际音标

热门文章

  1. 操作系统习题自用(一)
  2. 嵌入式计算机答辩标准,秦云川答辩公告
  3. 华为HMS的“生态雪球”,滚动在万物智联的新跑道
  4. SUPPORTS, REQUIRED ,REQUIRES_NEW的区别
  5. 六一新玩法!AI涂鸦秒变精美艺术画
  6. MetersPhere 试用
  7. 我搞自由职业怎么样了
  8. 赵雷_成都(入门吉他谱)D调简易版,纵玩乐器制谱
  9. python作业程序设计_Python程序设计基础【实境编程】高校邦作业课后答案
  10. GLAD:布里渊散射散斑现象聚焦几何模拟