目录

  • 一、简介
    • 1、概述
    • 2、批处理和流计算
    • 3、Kafka Streams介绍
      • 特点
      • 概念介绍
  • 二、Kafka Streams示例
    • 1、单词统计
    • 2、求和
    • 3、窗口操作

一、简介

1、概述

Kafka Streams是一套处理分析Kafka中存储数据的客户端类库,处理完的数据可以写回Kafka,或者发送给外部系统。它构建在一些重要的流处理概念之上:区分事件时间和处理时间、开窗的支持、简单有效的状态管理等。

2、批处理和流计算

  • 批计算
    批计算是在计算之前将这次计算的源数据一次性到位,按数据块来处理数据,每一个task接收一定大小的数据块,然后经过批计算在这次计算的结果一次性返还给调用者。
    批计算的处理的对象是有限数据(bound data),得到的结果也是一个有限结果集,因此批量计算中的每个任务都是短任务,任务在处理完其负责的数据后关闭。
  • 流计算
    流计算与批处理相反,流计算处理的对象是无限数据,流式计算的上游算子处理完一条数据后,会立马发送给下游算子,所以一条数据从进入流式系统到输出结果的时间间隔较短,经过流计算得到的结果也是无限的结果集。
    流式计算往往是长任务,每个work一直运行,持续接受数据源传过来的数据。

3、Kafka Streams介绍

特点

  • Kafka Streams是一个Java 客户端库,不是框架,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
  • 除了Kafka外,无任何外部依赖
  • 支持stateful(有状态的处理),如:join、aggregation等操作
  • 提供记录级的处理能力,从而实现毫秒级的低延迟
  • 支持基于事件时间的窗口操作,并且可处理晚到的数据

概念介绍

  • 数据流(stream)
    Stream是Kafka Streams中最重要的概念,代表大小没有限制且不断更新的数据集,一个Stream是一个有序的,允许重复的不可变的数据集,被定义为一个容错的键值对。

  • 流处理器(processor)
    流处理器代表了处理拓扑中的不同步骤,并完成相应的数据转换。
    一个流处理器从它所在的拓扑上游接收数据,通过Kafka Streams提供的流处理的基本方法,如map()、filter()、join()以及聚合等方法,对数据进行处理,然后将处理之后的一个或者多个输出结果发送给下游流处理器。一个拓扑中的流处理器有Source和Sink处理器连个特殊的流处理器
    Source处理器:该处理器没有任何上游处理器
    Sink处理器:该处理器没有任何下游处理器。该处理器将从上游处理器接受到的任何数据发送到指定的主题当中

  • 处理拓扑(processor topology)
    处理拓扑是整个流处理的运算逻辑,可以理解为一个图(graph)结构,其中的顶点是各个流处理器(stream processor),数据流(stream)则构成了边

  • 无状态和有状态
    无状态(stateless):数据转换的结果仅仅取决于你目前正在处理的数据,如:map、filter等操作
    有状态(stateful):数据转换的结果依赖于一个外部的状态(state),比如一个外部的表格。如:join、count、aggregation等操作

  • 时间
    在流式数据处理中,时间是数据的一个非常重要的属性。从Kafka 0.10开始,每条记录除了Key和Value外,还增加了timestamp属性。目前Kafka Stream支持三种时间:事件发生时间、消息接收时间、消息处理时间。
    事件发生时间:事件发生的时间,包含在数据记录中。发生时间由Producer在构造ProducerRecord时指定。并且需要Broker或者Topic将message.timestamp.type设置为CreateTime(默认值)才能生效。
    消息接收时间:即消息存入Broker的时间。当Broker或Topic将message.timestamp.type设置为LogAppendTime时生效。此时Broker会在接收到消息后,存入磁盘前,将其timestamp属性值设置为当前机器时间。一般消息接收时间比较接近于事件发生时间,部分场景下可代替事件发生时间。
    消息处理时间:即Kafka Stream处理消息时的时间。

  • 窗口
    流式数据是在时间上无界的数据,而聚合操作只能作用在特定的数据集,即有界的数据集上。因此需要通过某种方式从无界的数据集上按特定的语义选取出有界的数据。窗口是一种非常常用的设定计算边界的方式。kafka支持的窗口类型如下:
    Hopping Time Window :它有两个属性,一个是Window size,一个是Advance interval。Window size指定了窗口的大小,也即每次计算的数据集的大小。而Advance interval定义输出的时间间隔。一个典型的应用场景是,每隔5秒钟输出一次过去1个小时内网站的PV或者UV。
    Tumbling Time Window:可以认为它是Hopping Time Window的一种特例,即Window size和Advance interval相等。它的特点是各个Window之间完全不相交。
    Sliding Window:该窗口只用于2个KStream进行Join计算时。该窗口的大小定义了Join两侧KStream的数据记录被认为在同一个窗口的最大时间差。假设该窗口的大小为5秒,则参与Join的2个KStream中,记录时间差小于5的记录被认为在同一个窗口中,可以进行Join计算。
    Session Window:该窗口用于对Key做Group后的聚合操作中。它需要对Key做分组,然后对组内的数据根据业务需求定义一个窗口的起始点和结束点。一个典型的案例是,希望通过Session Window计算某个用户访问网站的时间。对于一个特定的用户(用Key表示)而言,当发生登录操作时,该用户(Key)的窗口即开始,当发生退出操作或者超时时,该用户(Key)的窗口即结束。窗口结束时,可计算该用户的访问时间或者点击次数等

  • KTable和KStream
    KStream和KTable是Kafka Streams里内建的两个最重要的抽象,分别对应数据流和数据库。Kafka Streams很好地将存储状态的表(table)和作为记录的流(stream)无缝地结合在了一起。
    KStream
    数据流(data stream),即是一段顺序的,可以无限长,不断更新的数据集。数据流中比较常记录的是事件(stream of events),这些事件可以是一次鼠标点击(click),一次交易,或是传感器记录的位置数据。KStream负责抽象的,就是数据流。与Kafka自身topic中的数据一样,类似日志,每一次操作都是向其中 插入(insert) 新数据。
    KTable
    传统数据库,包含了各种存储了大量状态(state)的表格。KTable负责抽象的,就是表状数据。每一次操作,都是更新(upsert) 操作
    如下图,假设有一个KStream和KTable,基于同一个Topic创建

    若是对k分组,v求和,KStream得到的结果是:<A,4>,<B,2>,<C,9>,而KTable得到的结果为:<A,3>,<B,2>,<C,5>

二、Kafka Streams示例

1、单词统计

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.KTable;import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;public class WordCountStream {public static void main(String[] args) {Properties prop = new Properties();// application_id 唯一标识Streamsprop.put(StreamsConfig.APPLICATION_ID_CONFIG,"wordCount");// 与kafka建立连接的地址和端口号prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.233.133:9092");// 提交的时间间隔prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,3000);// 下面两句分别设置Consumer读取消息的配置,非必须prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");// 设置key的序列化器prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());// 设置value的序列化器prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());// 定义一个拓扑构建器StreamsBuilder builder = new StreamsBuilder();KTable<String, Long> count = builder.stream("wordcount-input")  // 从kafka中一条一条取数据.flatMapValues(            // 遍历value,返回压扁后的数据(value) -> {        // 对数据进行按空格切割String[] split = value.toString().split(" "); // 对数据进行按空格切割List<String> strings = Arrays.asList(split); // flatMapValues需要返回一个可迭代的类型,因为List有个iterator()方法,故将其转换成List类型return strings;}).map((k, v) -> { // 此时k为空,将键值对转换成<v,"1">的形式return new KeyValue<String,String>(v,"1");}).groupByKey() // 分组.count();  // 求出单词个数count.toStream().foreach((k,v)->{ // 遍历输出k,v 方便理解System.out.println("key:"+k+"   value"+v);});count.toStream().map((x,y)->{return new KeyValue<String,String>(x,y.toString());}).to("wordcount-output" ); // 输入到另一个topic// 构建Topology对象final Topology topo = builder.build();// 构建 kafka流 API实例,将算子以及操作的服务器配置到kafka流final KafkaStreams streams = new KafkaStreams(topo,prop);final CountDownLatch latch = new CountDownLatch(1);Runtime.getRuntime().addShutdownHook(new Thread("stream"){@Overridepublic void run() {streams.close();latch.countDown(); //当前线程调用此方法,则计数减一}});try {streams.start();latch.await(); // 阻塞当前线程,直到计数器的值为0} catch (InterruptedException e) {e.printStackTrace();}System.exit(0);}
}

2、求和

public class SumStream {public static void main(String[] args) {Properties prop = new Properties();prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "sum");prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.233.133:9092");prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 3000);prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());StreamsBuilder builder = new StreamsBuilder();KTable<String, String> reduce = builder.stream("sum-input").flatMapValues((values) -> {String[] s = values.toString().split(" ");List<String> strings = Arrays.asList(s);return strings;}).map((k, v) -> {return new KeyValue<String, String>("0", v);}).groupByKey().reduce(new Reducer<String>() {  // reduce用于求和@Overridepublic String apply(String value1, String value2) {int i = Integer.parseInt(value1);int i1 = Integer.parseInt(value2);return Integer.toString(i + i1);}}); // 输出便于查看reduce.toStream().foreach((k, v) -> {System.out.println("num; " + k + "    sum: " + v);});// 将结果写入另一个topicreduce.toStream().map((x, y) -> {return new KeyValue<String, String>(x, y);}).to("sum-output");final Topology topo = builder.build();final KafkaStreams streams = new KafkaStreams(topo, prop);final CountDownLatch latch = new CountDownLatch(1);Runtime.getRuntime().addShutdownHook(new Thread("stream") {@Overridepublic void run() {streams.close();latch.countDown();}});try {streams.start();latch.await();} catch (InterruptedException e) {e.printStackTrace();}System.exit(0);}
}

3、窗口操作

public class WindowStream {public static void main(String[] args) {Properties prop = new Properties();prop.put(StreamsConfig.APPLICATION_ID_CONFIG, "sessionwindow");prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.233.133:9092");prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");prop.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 3000);prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());StreamsBuilder builder = new StreamsBuilder();KStream<Object, Object> source = builder.stream("windowdemo");source.flatMapValues(value-> Arrays.asList(value.toString().split("\\s+"))).map((x,y)->{return new KeyValue<String,String>(y,"1");}).groupByKey()/*Tumbling Time Window,该窗口的窗口大小和滑动大小相等,即数据没有重复.windowedBy(TimeWindows.of(Duration.ofSeconds(5).toMillis()))*//*Session Window,在一个会话中,数据全都有效.windowedBy(SessionWindows.with(Duration.ofSeconds(15).toMillis()))*/// Hopping Time Window,需要分别指定窗口大小和滑动时间.windowedBy(TimeWindows.of(Duration.ofSeconds(5).toMillis()).advanceBy(Duration.ofSeconds(2).toMillis())).count().toStream().foreach((x,y)->{System.out.println("x:"+x+" y:"+y);});final Topology topo = builder.build();final KafkaStreams streams = new KafkaStreams(topo,prop);final CountDownLatch latch = new CountDownLatch(1);Runtime.getRuntime().addShutdownHook(new Thread("stream"){@Overridepublic void run() {streams.close();latch.countDown();}});try {streams.start();latch.await();} catch (InterruptedException e) {e.printStackTrace();}System.exit(0);}
}

kafka Streams相关推荐

  1. kafka streams_如何使用Kafka Streams实施更改数据捕获

    kafka streams Change Data Capture (CDC) involves observing the changes happening in a database and m ...

  2. 云计算情报局预告|告别 Kafka Streams,让轻量级流处理更加简单

    作者:不周 关键词:Kafka ETL,高弹性.免运维.低成本 阿里云消息队列 Kafka 版提供兼容 Apache Kafka 生态的全托管服务,彻底解决开源产品长期的痛点,是大数据生态中不可或缺的 ...

  3. Confluent Platform 3.0支持使用Kafka Streams实现实时的数据处理(最新版已经是3.1了,支持kafka0.10了)...

    来自 Confluent 的 Confluent Platform 3.0 消息系统支持使用 Kafka Streams 实现实时的数据处理,这家公司也是在背后支撑 Apache Kafka 消息框架 ...

  4. 【kafka】在 Kafka Streams 中启用 Exactly-Once

    文章目录 1.概述 2. 什么是Kafka Streams ? 3.Exactly-Once 作为 Kafka Streams 中的单个配置 4.什么是流处理的 Exactly-Once? 5.Exa ...

  5. 【Kafka】Kafka Streams简介

    1.概述 Kafka在0.10.0.0版本以前的定位是分布式,分区化的,带备份机制的日志提交服务.而kafka在这之前也没有提供数据处理的顾服务.大家的流处理计算主要是还是依赖于Storm,Spark ...

  6. Kafka Streams简介: 让流处理变得更简单

    Introducing Kafka Streams: Stream Processing Made Simple 这是Jay Kreps在三月写的一篇文章,用来介绍Kafka Streams.当时Ka ...

  7. java kafkastream_手把手教你写Kafka Streams程序

    一. 设置Maven项目 我们将使用Kafka Streams Maven Archetype来创建Streams项目结构: mvn archetype:generate \ -DarchetypeG ...

  8. Kafka Streams 剖析

    1.概述 Kafka Streams 是一个用来处理流式数据的库,属于Java类库,它并不是一个流处理框架,和Storm,Spark Streaming这类流处理框架是明显不一样的.那这样一个库是做什 ...

  9. Kafka Streams开发单词计数应用

    pom.xml <?xml version="1.0" encoding="UTF-8"?> <project xmlns="htt ...

  10. Kafka Streams的容错机制

    Kafka Streams构建于Kafka本地集成的容错功能上.kafka分区具有高可用性和复制,因此当流数据持久保存到Kafka时,即使应用程序失败并需要重新处理时也可用.Kafka Streams ...

最新文章

  1. Eclipse 最常用的 10 组快捷键,个个牛逼!
  2. ts连接mysql数据库_各种数据库的连接方法
  3. 机器学习算法基础——逻辑回归
  4. 笔记-中项案例题-2019年下-信息系统安全管理
  5. Word中轻松插入本地视频
  6. C#中Trim()、TrimStart()、TrimEnd()的错误认识
  7. php mysql增改删_PHP分享:如何实现MySQL的增加删除修改查看
  8. 音视频技术开发周刊 | 135
  9. 电脑可以开机但是黑屏_电脑开机后黑屏如何解决 电脑开机后黑屏解决方法【详解】...
  10. ubuntu下sublime中修改字体
  11. .net html5 框架,ASP.NET - 介绍 ASP.NET Web 窗体框架的导航 | Microsoft Docs
  12. RESTful JSON Web服务最佳实践
  13. deeplearning中卷积后尺寸的变化
  14. Cannot get a connection, pool exhausted, cause: ValidateObject failed
  15. 鲁棒性的获得 —— 测试的架构
  16. ubantu 终端屏幕查找字符串
  17. 沙扬娜拉一首——赠日本女郎(徐志摩)
  18. C++编译错误提示 [Error] name lookup of 'i' changed for ISO 'for' scoping
  19. 域用户绑定计算机,域批量绑定用户帐户与计算机帐户
  20. MATLAB绘制“问题儿童表情包”动图2

热门文章

  1. 前几年系列3:无间道2(由人物到剧情)
  2. ios 解决file not found问题
  3. 修改 dokuwiki(适合开发类的最好的wiki) 支持 editor.md(国人做的最好的markdown)编辑
  4. 心之所向,素履以往——有许多困难,我们不得不独自面对
  5. HTML常用鼠标指针样式设置
  6. Unity添加可拉伸的图片
  7. 如何写好一个系列专栏
  8. 关于duplicate symbols for architecture i386 问题
  9. C++中accumulate函数的使用
  10. 领域驱动DDD在签到场景落地案例之架构模式(二)