1 概况

Kafka Streams是一套处理分析Kafka中存储数据的客户端类库,处理完的数据或者写回Kafka,或者发送给外部系统。它构建在一些重要的流处理概念之上:区分事件时间和处理时间、开窗的支持、简单有效的状态管理等。Kafka Streams入门的门槛很低:很容易编写单机的示例程序,然后通过在多台机器上运行多个实例即可水平扩展从而达到高吞吐量。Kafka Streams利用Kafka的并发模型以实现透明的负载均衡。

一些亮点:

·            设计成简单和轻量级的客户端类库,可以和现有Java应用、部署工具轻松整合。

·            除了Kafka自身外不依赖其他外部系统。利用Kafka的分区模型来实现水平扩展并保证有序处理。

·            支持容错的本地状态,这使得快速高效处理一些有状态的操作(如连接和开窗聚合)成为可能。

·            支持一次一条记录的处理方式以实现低延迟,也支持基于事件时间的开窗操作。

·            提供了两套流处理原语:高层的流DSL和低层的处理器API

2 开发

2.1 核心概念

流处理拓扑

·            “流”是Kafka Streams最重要的抽象,代表了一个无边界的、持续更新的数据集。流是一种有序的、可回放的、容错的、不可变的数据记录序列,“数据记录”指一个键值对。

·            一个流处理应用程序通过一或多个“处理器拓扑”来定义其计算逻辑,一个处理器拓扑就是一张以流处理器(节点)和流(边)构成的图。(实际为DAG,太熟悉了吧)

·            “流处理器”是处理器拓扑中的节点,表示一个转换流中数据的处理步骤,它从上游处理器一次接受一条输入记录,操作记录,然后输出一或多条输出记录到下游处理器。

Kafka Streams提供两种定义流处理拓扑的方式(上面已提到):流DSL提供最常用的数据变换操作如map和filter;低层的处理器API允许随意连接自定义处理器并与“状态仓库”交互。

时间

时间的概念在流处理中很关键,比如开窗这种操作就是根据时间边界来定义的。

上面也提到过两个常见概念:

·            事件时间:事件或数据记录发生的时刻。

·            处理时间:事件或数据记录被流处理应用开始处理的时刻,比如记录开始被消费。处理时间可能比事件时间晚几毫秒到几天不等。

Kafka Streams通过TimestampExtractor接口给每个数据记录赋一个时间戳。可以根据不同的需要来确定时间戳的实现,如使用数据记录的内置时间戳来实现事件时间的语义,或者打上处理器开始消费的时间来实现处理时间的语义。开发者可以根据业务需求来选择一种。

状态

一些流处理应用不需要管理状态,这意味着一条消息和另一条消息是独立的。(如Storm,不过需要区分Acker中的“状态”,那个是用来确保单条消息exactly once语义的而不是消息间的)如果管理状态的话可以提供很多比较复杂的流处理应用:如在流中连接、分组或聚合数据记录等。大量有状态的操作方法在流DSL中提供。

Kafka Streams提供了一种“状态仓库”,可被流处理应用用来存储和查询状态数据。这对实现有状态的操作很关键。每个Kafka Streams使用一或多个状态仓库,可通过API来存取数据。这种状态仓库可以是持久化的键值对、内存中的hashmap、或其他各类数据结构。Kafka Streams对本地状态仓库提供了容错和自动恢复。

2.2 低层处理器API

处理器

可通过实现Processor接口来自定义处理逻辑,该接口有两个主要方法,process方法会被作用于每条收到的记录,punctuate方法基于时间的流逝周期性地执行。另外,处理器可使用init方法中创建的ProcessorContext实例来维护当前上下文,并使用上下文来调用周期性任务(context().schedule),或将修改的、新的键值对推送给下游处理器(context().forward),或提交当前的处理进度(context().commit),等等。

1.  public class MyProcessor extends Processor {

2.          private ProcessorContext context;

3.          private KeyValueStore kvStore;

4.

5.          @Override

6.          @SuppressWarnings("unchecked")

7.          public void init(ProcessorContext context) {

8.              this.context = context;

9.              this.context.schedule(1000);

10.             this.kvStore = (KeyValueStore) context.getStateStore("Counts");

11.         }

12.

13.         @Override

14.         public void process(String dummy, String line) {

15.             String[] words = line.toLowerCase().split(" ");

16.

17.             for (String word : words) {

18.                 Integer oldValue = this.kvStore.get(word);

19.

20.                 if (oldValue == null) {

21.                     this.kvStore.put(word, 1);

22.                 } else {

23.                     this.kvStore.put(word, oldValue + 1);

24.                 }

25.             }

26.         }

27.

28.         @Override

29.         public void punctuate(long timestamp) {

30.             KeyValueIterator iter = this.kvStore.all();

31.

32.             while (iter.hasNext()) {

33.                 KeyValue entry = iter.next();

34.                 context.forward(entry.key, entry.value.toString());

35.             }

36.

37.             iter.close();

38.             context.commit();

39.         }

40.

41.         @Override

42.         public void close() {

43.             this.kvStore.close();

44.         }

45.     };

处理器拓扑

有了在处理器API中自定义的处理器,然后就可以使用TopologyBuilder来将处理器连接到一起从而构建处理器拓扑:

1.  TopologyBuilder builder = new TopologyBuilder();

2.

3.      builder.addSource("SOURCE", "src-topic")

4.

5.          .addProcessor("PROCESS1", MyProcessor1::new /* the ProcessorSupplier that can generate MyProcessor1 */, "SOURCE")

6.          .addProcessor("PROCESS2", MyProcessor2::new /* the ProcessorSupplier that can generate MyProcessor2 */, "PROCESS1")

7.          .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")

8.

9.          .addSink("SINK1", "sink-topic1", "PROCESS1")

10.         .addSink("SINK2", "sink-topic2", "PROCESS2")

11.         .addSink("SINK3", "sink-topic3", "PROCESS3");

以上代码中包含了这些步骤:

·        首先一个名为“SOURCE”的源节点被加入拓扑,使用addSource方法,且实用“src-topic”这一Kafka topic来提供数据。

·            随后使用addProcessor方法加入三个处理器节点。这里第一个处理器是“SOURCE”节点的子节点,且是后两个节点的父节点。

·            最后使用addSink方法加入三个sink节点,每个都从一个父处理器节点中获取数据病写到一个topic中。

本地状态仓库

处理器API不仅可以处理当前到达的记录,也可以管理本地状态仓库以使得已到达的记录都可用于有状态的处理操作中(如聚合或开窗连接)。为利用本地状态仓库的优势,可使用TopologyBuilder.addStateStore方法以便在创建处理器拓扑时创建一个相应的本地状态仓库;或将一个已创建的本地状态仓库与现有处理器节点连接,通过TopologyBuilder.connectProcessorAndStateStores方法。

1.  TopologyBuilder builder = new TopologyBuilder();

2.

3.  builder.addSource("SOURCE", "src-topic")

4.

5.      .addProcessor("PROCESS1", MyProcessor1::new, "SOURCE")

6.      // create the in-memory state store "COUNTS" associated with processor "PROCESS1"

7.      .addStateStore(Stores.create("COUNTS").withStringKeys().withStringValues().inMemory().build(), "PROCESS1")

8.      .addProcessor("PROCESS2", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")

9.      .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")

10.

11.     // connect the state store "COUNTS" with processor "PROCESS2"

12.     .connectProcessorAndStateStores("PROCESS2", "COUNTS");

13.

14.     .addSink("SINK1", "sink-topic1", "PROCESS1")

15.     .addSink("SINK2", "sink-topic2", "PROCESS2")

16.     .addSink("SINK3", "sink-topic3", "PROCESS3");

高层流DSL

为使用流DSL来创建处理器拓扑,可使用KStreamBuilder类,其扩展自TopologyBuilder类。Kafka的源代码中在streams/examples包中提供了一个示例。

从Kafka创建源端流

KafkaStreams为高层流定义了两种基本抽象:记录流(定义为KStream)可从一或多个Kafka topic源来创建,更新日志流(定义为KTable)可从一个Kafka topic源来创建。两者的区别是,前者更像是传统意义上的流,每一个键值对可以看成独立的,而后者更接近Map的概念,同一个key输入两次的话,后者会将前者覆盖。

1.  KStreamBuilder builder = new KStreamBuilder();

2.

3.  KStream source1 = builder.stream("topic1", "topic2");

4.  KTable source2 = builder.table("topic3");

5.      转换一个流

6.        KStream和KTable相应地都提供了一系列转换操作。每个操作可产生一或多个KStream和KTable对象,可被翻译成一或多个相连的处理器。所有这些转换方法连接在一起形成一个复杂的处理器拓扑。因为KStream和KTable是强类型的,这些转换操作都被定义为通用函数,使得用户可指定输入和输出数据类型。

7.        这些转换中,filter、map、mapValues等是无状态的,可用于KStream和KTable两者,通常用户会传一个自定义函数给这些函数作为参数,例如Predicate给filter,KeyValueMapper给map等:

8.  // written in Java 8+, using lambda expressions

9.  KStream mapped = source1.mapValue(record -> record.get("category"));

无状态的转换不依赖于处理的状态,因此不需要状态仓库。有状态的转换则需要存取相应状态以处理和生成结果。例如,在join和aggregate操作里,一个窗口状态用于保存当前预定义窗口中收到的记录。于是转换可以获取状态仓库中累积的记录,并执行计算。

1.  // written in Java 8+, using lambda expressions

2.  KTable, Long> counts = source1.aggregateByKey(

3.      () -> 0L,  // initial value

4.      (aggKey, value, aggregate) -> aggregate + 1L,   // aggregating value

5.      HoppingWindows.of("counts").with(5000L).every(1000L), // intervals in milliseconds

6.  );

7.

8.  KStream joined = source1.leftJoin(source2,

9.      (record1, record2) -> record1.get("user") + "-" + record2.get("region");

10. );

11.  将流写回Kafka

12.      在处理的最后,用户可选择连续地将最终结果写回某个Kafka topic,通过KStream.to或KTable.to:

13. joined.to("topic4");

如果应用需要在记录被物化到topic中继续读和处理它们,你可能会想到创建一个新的流从这个输出topic中读取。Kafka Streams提供了一个方便的函数称为through:

1.  // equivalent to

2.  //

3.  // joined.to("topic4");

4.  // materialized = builder.stream("topic4");

5.  KStream materialized = joined.through("topic4");

配置参数

所有参数见下表

名称

描述

类型

默认值

application.id

流处理应用的标识,对同一个应用需要一致,因为它是作为消费的group_id的

string

bootstrap.servers

host1:port1,host2:port2 这样的列表,是用来发现所有Kafka节点的种子,因此不需要配上所有的Kafka节点

list

client.id

应用的一个客户端的逻辑名称,设定后可以区分是哪个客户端在请求

string

“"

zookeeper.connect

zookeeper连接串

string

“"

key.serde

键的序列化/反序列化类

class

class org.apache.kafka.common.serialization.Serdes$ByteArraySerde

partition.grouper

用于分区组织的类,需要实现PartitionGrouper接口

class

class org.apache.kafka.streams.processor.DefaultPartitionGrouper

replication.factor

流处理应用会创建change log topic和repartition topic用于管理内部状态,这个参数设定这些topic的副本数

int

1

state.dir

状态仓库的存储路径

string

/tmp/kafka-streams

timestamp.extractor

时间戳抽取类,需要实现TimestampExtractor接口

class

class org.apache.kafka.streams.processor.ConsumerRecordTimestampExtractor

value.serde

值的序列化/反序列化类

class

class org.apache.kafka.common.serialization.Serdes$ByteArraySerde

buffered.records.per.partition

每个分区缓存的最大记录数

int

1000

commit.interval.ms

存储处理器当前位置的间隔毫秒数

long

30000

metric.reporters

用于性能报告的类列表。需要实现MetricReporter接口。JmxReporter会永远开启不需要指定

list

[]

metric.num.samples

计算性能需要的采样数

int

2

metric.sample.window.ms

性能采样的时间间隔

long

30000

num.standby.replicas

每个任务的后备副本数

int

0

num.stream.threads

执行流处理的线程数

int

1

poll.ms

等待输入的毫秒数

long

100

state.cleanup.delay.ms

一个分区迁移后,在删除状态前等待的毫秒数

long

60000

3 总结

综上,Kafka Streams的价值体现在以下几点,首先它提供了两套轻量且易用的API有效降低了Kafka数据流处理的开发成本,在这之前可以使用SparkStreaming(不支持单条消费)、Storm(必须使用Trident才支持时间窗),或者自己写consumer(以前高层API还好,低层API是初学者的噩梦,最欢乐的是官方将低层API称为“Simple API”),现在至少又多了一种选择。其次用它开发的应用支持跑在Yarn、Mesos、Docker或者纯Java应用内,比较灵活。再次是数据流的两种抽象比较有意思,目前我还没有深入研究,但觉得用来处理不去重/去重的场景简直太方便了。当然缺点也有,首先目前不支持异步操作,这就需要开发者小心在处理方法中不能有高开销动作,否则整个处理线程阻塞。另外如果需要SQL接口或者ML能力,那还是去找SparkStreaming吧。

kafka Steams详解相关推荐

  1. Kafka 原理详解

    Kafka 原理详解 1 kakfa基础概念说明 Broker:消息服务器,就是我们部署的一个kafka服务 Partition:消息的水平分区,一个Topic可以有多个分区,这样实现了消息的无限量存 ...

  2. python使用kafka原理详解_Python操作Kafka原理及使用详解

    Python操作Kafka原理及使用详解 一.什么是Kafka Kafka是一个分布式流处理系统,流处理系统使它可以像消息队列一样publish或者subscribe消息,分布式提供了容错性,并发处理 ...

  3. kafka实战教程(python操作kafka),kafka配置文件详解

    全栈工程师开发手册 (作者:栾鹏) 架构系列文章 应用往Kafka写数据的原因有很多:用户行为分析.日志存储.异步通信等.多样化的使用场景带来了多样化的需求:消息是否能丢失?是否容忍重复?消息的吞吐量 ...

  4. Kafka 安装详解

    注意:确保有JDK1.8版本及以上 官方文档:https://kafka.apache.org/quickstart 清华镜像下载:https://mirrors.tuna.tsinghua.edu. ...

  5. 日志文件和mysql同步到kafka_logstash_output_kafka:Mysql同步Kafka深入详解

    0.题记 实际业务场景中,会遇到基础数据存在Mysql中,实时写入数据量比较大的情景.迁移至kafka是一种比较好的业务选型方案. 而mysql写入kafka的选型方案有: 方案一:logstash_ ...

  6. python使用kafka原理详解真实完整版_转:Kafka史上最详细原理总结 ----看完绝对不后悔...

    消息队列的性能好坏,其文件存储机制设计是衡量一个消息队列服务技术水平和最关键指标之一.下面将从Kafka文件存储机制和物理结构角度,分析Kafka是如何实现高效文件存储,及实际应用效果. 1.1  K ...

  7. Kafka参数详解及调优--生产者

    在实际的kafka开发中,我们会发现,无论是生产者还是消费者,都需要构建一个Properties对象,里面设置了很多参数.对于很多初学者来说,会看不懂这些参数分别代表什么含义. 在本篇文章我们就来详细 ...

  8. Kafka消费者详解

    一.基本概念 1.消费者和消费组 Kafka消费者是消费组的一部分,当多个消费者形成一个消费组来消费主题时,每个消费者会收到不同分区的消息.假设有一个T1主题,该主题有4个分区:同时我们有一个消费组G ...

  9. Kafka生产者详解

    一.消息发送 1.java客户端数据生产流程解析 ① 首先要构造一个 ProducerRecord 对象,该对象可以声明主题Topic.分区Partition.键 Key以及值 Value,主题和值是 ...

最新文章

  1. maven项目update报错
  2. Java课程主观题作业_JAVA课程作业01
  3. 坐标1-based和0-based
  4. new一个xssfworkbook时出错_java用poi操作excel的时候,new XSSFWorkbook,总是报错?
  5. html灵活响应 图片设置,jQuery轻量级响应式图片轮播插件ResponsiveSlides.js(仅1kb)
  6. Vue学习之路---No.7(分享心得,欢迎批评指正)
  7. PowerShell~文件操作和对象遍历
  8. Docker解析及轻量级PaaS平台演练(一)--Docker简介与安装
  9. thinkphp 3 升到 thinkphp 5 或更高 ( 资料收集)
  10. C语言入门——内功心法
  11. 超声乳化设备行业调研报告 - 市场现状分析与发展前景预测(2021-2027年)
  12. 01 按指定格式读写文件出现乱码
  13. MTK:DrvGen驱动的使用
  14. 电脑ip地址设置_关于路由器动态IP如何设置教程
  15. 开源中国社区(OsChina.NET) 8月第3周 精彩回顾
  16. 微信防撤回补丁来了!更新也不会失效
  17. 数据开发常用的几种数据预处理和数据整理方法
  18. 为什么我朋友的移动手机打不开我的网站却能打开www.ip138.com?
  19. LNBP11L_LNB电源和控制电压调节器——科时进商城
  20. 我们距离构建一个逼真的虚拟世界还有多远

热门文章

  1. 2017华为软件精英挑战赛参赛过程回顾与心得
  2. 跟着鬼哥学so改动,一,准备篇
  3. 1695. 果壳游戏
  4. 一个农村小姑娘的麻辣作文
  5. java mysql的一次报错,Public Key Retrieval is not allowed
  6. 哪些机型适配了android11,ColorOS11适配机型有哪些 ColorOS11支持升级机型汇总
  7. 坐上回到过去的「时光机」:2023 互联网怀旧指南
  8. Ruby 正则表达式
  9. 【评价模型】对相关系数矩阵做因子分析
  10. c volaitle