目录

  • 1 实时流式计算
    • 1.1 概念
    • 1.2 应用场景
    • 1.3 技术方案选型
  • 2 Kafka Stream
    • 2.1 概述
    • 2.2 Kafka Streams的关键概念
    • 2.3 KStream&KTable
    • 2.4 Kafka Stream入门案例编写
    • 2.5 SpringBoot集成Kafka Stream

1 实时流式计算

1.1 概念

一般流式计算会与批量计算相比较。在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算代替全量计算。

流式计算就相当于上图的右侧扶梯,是可以源源不断的产生数据,源源不断的接收数据,没有边界。

1.2 应用场景

  • 日志分析

网站的用户访问日志进行实时的分析,计算访问量,用户画像,留存率等等,实时的进行数据分析,帮助企业进行决策

  • 大屏看板统计

可以实时的查看网站注册数量,订单数量,购买数量,金额等。

  • 公交实时数据

可以随时更新公交车方位,计算多久到达站牌等

  • 实时文章分值计算

头条类文章的分值计算,通过用户的行为实时文章的分值,分值越高就越被推荐。

1.3 技术方案选型

  • Hadoop

  • Apche Storm

Storm 是一个分布式实时大数据处理系统,可以帮助我们方便地处理海量数据,具有高可靠、高容错、高扩展的特点。是流式框架,有很高的数据吞吐能力。

  • Kafka Stream

可以轻松地将其嵌入任何Java应用程序中,并与用户为其流应用程序所拥有的任何现有打包,部署和操作工具集成。

2 Kafka Stream

2.1 概述

Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。

Kafka Stream的特点如下:

  • Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
  • 除了Kafka外,无任何外部依赖
  • 充分利用Kafka分区机制实现水平扩展和顺序性保证
  • 通过可容错的state store实现高效的状态操作(如windowed join和aggregation)
  • 支持正好一次处理语义
  • 提供记录级的处理能力,从而实现毫秒级的低延迟
  • 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)
  • 同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)

2.2 Kafka Streams的关键概念

(1)Stream处理拓扑

  • 是Kafka Stream提出的最重要的抽象概念:它表示一个无限的,不断更新的数据集。流是一个有序的,可重放(反复的使用),不可变的容错序列,数据记录的格式是键值对(key-value)。
  • 通过Kafka Streams编写一个或多个的计算逻辑的处理器拓扑。其中处理器拓扑是一个由流(边缘)连接的流处理(节点)的图。
  • 流处理器处理器拓扑中的一个节点;它表示一个处理的步骤,用来转换流中的数据(从拓扑中的上游处理器一次接受一个输入消息,并且随后产生一个或多个输出消息到其下游处理器中)。

(2)在拓扑中有两个特别的处理器:

  • 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。

  • Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题

2.3 KStream&KTable

(1)数据结构类似于map,如下图,key-value键值对

(2)KStream

KStream数据流(data stream),即是一段顺序的,可以无限长,不断更新的数据集。
数据流中比较常记录的是事件,这些事件可以是一次鼠标点击(click),一次交易,或是传感器记录的位置数据。

KStream负责抽象的,就是数据流。与Kafka自身topic中的数据一样,类似日志,每一次操作都是向其中插入(insert)新数据。

为了说明这一点,让我们想象一下以下两个数据记录正在发送到流中:

(“ alice”,1)->("alice“,3)

如果您的流处理应用是要总结每个用户的价值,它将返回4alice。为什么?因为第二条数据记录将不被视为先前记录的更新。(insert)新数据

(3)KTable

KTable传统数据库,包含了各种存储了大量状态(state)的表格。KTable负责抽象的,就是表状数据。每一次操作,都是更新插入(update)

为了说明这一点,让我们想象一下以下两个数据记录正在发送到流中:

(“ alice”,1)->(“” alice“,3)

如果您的流处理应用是要总结每个用户的价值,它将返回3alice。为什么?因为第二条数据记录将被视为先前记录的更新。

KStream - 每个新数据都包含了部分信息。

KTable - 每次更新都合并到原记录上。

2.4 Kafka Stream入门案例编写

(1)引入依赖

在之前的kafka-demo工程的pom文件中引入

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><version>${kafka.client.version}</version><exclusions><exclusion><artifactId>connect-json</artifactId><groupId>org.apache.kafka</groupId></exclusion><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions>
</dependency>

(2)创建类

package com.oldlu.kafka.simple;import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;import java.util.Arrays;
import java.util.Properties;/*** 需求:*  接收kafka消息内容并计算消息内单词的个数*  如:*      hello kafka stareams*      hello oldlu kafka*      hello beijing oldlu kafka**  结果:*      hello  3*      kafka 3*      streams 1*      oldlu 2*      beijing 1*/
public class KafkaStreamFastStart {public static void main(String[] args) {//kafka配置信息Properties prop = new Properties();prop.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");prop.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());prop.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-faststart");//stream构建器StreamsBuilder builder = new StreamsBuilder();//流式计算group(builder);//创建kafkaStreamKafkaStreams kafkaStreams = new KafkaStreams(builder.build(),prop);//开启kafka流计算kafkaStreams.start();}/*** 实时流式计算* @param builder*/private static void group(StreamsBuilder builder) {//接收上游处理器的消息KStream<String, String> stream = builder.stream("input_topic");KStream<String, String> map = stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {/*** 把消息中的词组,转换为一个一个的单词放到集合中* @param value* @return*/@Overridepublic Iterable<String> apply(String value) {return Arrays.asList(value.split(" "));}}).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {/*** 把消息的key,重新赋值,目前消息的key,就是一个个的单词,把单词作为key进行聚合* @param key* @param value* @return*/@Overridepublic KeyValue<String, String> apply(String key, String value) {return new KeyValue<>(value, value);}})//根据key进行分组  目前的key 就是value,就是一个个的单词.groupByKey()//聚合的时间窗口  多久聚合一次.windowedBy(TimeWindows.of(10000))//聚合  求单词的个数,调用count后,消息的vlaue是聚合单词后的统计数值  是一个long类型//Materialized.as("count-article-num-001")  是当前消息的状态值,不重复即可.count(Materialized.as("count-article-num-001"))//转换成 Kstream.toStream()//把处理后的key和value转成string.map((key, value) -> {return new KeyValue<>(key.key().toString(), value.toString());});//处理后的结果,发送给下游处理器map.to("out_topic");}
}

(3)测试

准备

  • 使用生产者在topic为:input_topic中发送多条消息
  • 使用消费者接收topic为:out_topic

①生产者代码修改ProducerFastStart

package com.oldlu.kafka.simple;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.protocol.types.Field;import java.util.Properties;/*** 消息生产者*/
public class ProducerFastStart {private static final String INPUT_TOPIC="input_topic";public static void main(String[] args) {//添加kafka的配置信息Properties properties = new Properties();//配置broker信息properties.put("bootstrap.servers","192.168.200.130:9092");properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.RETRIES_CONFIG,10);//生产者对象KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);try {//封装消息for (int i = 0; i < 10; i++) {if (i % 2 == 0) {ProducerRecord<String,String> record =new ProducerRecord<String, String>(INPUT_TOPIC,i+"","hello shanghai kafka stream hello");//发送消息producer.send(record);System.out.println("发送消息:"+record);}else {ProducerRecord<String,String> record =new ProducerRecord<String, String>(INPUT_TOPIC,i+"","helloworld kafka stream");//发送消息producer.send(record);System.out.println("发送消息:"+record);}}}catch (Exception e){e.printStackTrace();}//关系消息通道producer.close();}
}

②消费者ConsumerFastStart

package com.oldlu.kafka.simple;import com.sun.scenario.effect.Offset;
import jdk.nashorn.internal.runtime.logging.Logger;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;/*** 消息消费者*/
public class ConsumerFastStart {private static final String OUT_TOPIC="out_topic";public static void main(String[] args) {//添加配置信息Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.200.130:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//设置分组properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");//创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);//订阅主题consumer.subscribe(Collections.singletonList(OUT_TOPIC));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.key()+":"+record.value());/*try {//手动提交偏移量consumer.commitSync();}catch (CommitFailedException e){e.printStackTrace();System.out.println("记录错误信息为:"+e);}*/}consumer.commitAsync(new OffsetCommitCallback() {@Overridepublic void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {if(e!=null){System.out.println("记录当前错误信息时提交的偏移量"+map+",异常信息为:"+e);}}});}}
}

结果:

  • 通过流式计算,会把生产者的多条消息汇总成一条发送到消费者中输出

2.5 SpringBoot集成Kafka Stream

从资料文件夹中把提供好的4个类拷贝到项目的config目录下

当前kafka-demo项目需要添加lombok的依赖包

<properties><lombok.version>1.18.8</lombok.version>
</properties><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>${lombok.version}</version><scope>provided</scope>
</dependency>

(1)自定配置参数

/*** 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数*/
@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;private String hosts;private String group;/*** 重新定义默认的KafkaStreams配置属性,包括:* 1、服务器地址* 2、应用ID* 3、流消息的副本数等配置* @return*/@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {Map<String, Object> props = new HashMap<>();props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");props.put(StreamsConfig.RETRIES_CONFIG, 10);props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());// 消息副本数量props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 5_000);props.put(StreamsConfig.SEND_BUFFER_CONFIG, 3*MAX_MESSAGE_SIZE);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, Topology.AutoOffsetReset.EARLIEST.name().toLowerCase());return new KafkaStreamsConfiguration(props);}
}

修改application.yml文件,在最下方添加自定义配置

kafka:hosts: 192.168.200.130:9092group: ${spring.application.name}

(2)定义监听接口

/*** 流数据的监听消费者实现的接口类,系统自动会通过* KafkaStreamListenerFactory类扫描项目中实现该接口的类,* 并注册为流数据的消费端。** 其中泛型可是KStream或KTable* @param <T>*/
public interface KafkaStreamListener<T> {// 监听的类型String listenerTopic();// 处理结果发送的类String sendTopic();// 对象处理逻辑T getService(T stream);}

(3)KafkaStream自动处理包装类

/*** KafkaStream自动处理包装类*/
public class KafkaStreamProcessor {// 流构建器StreamsBuilder streamsBuilder;private String type;KafkaStreamListener listener;public KafkaStreamProcessor(StreamsBuilder streamsBuilder,KafkaStreamListener kafkaStreamListener){this.streamsBuilder = streamsBuilder;this.listener = kafkaStreamListener;this.parseType();Assert.notNull(this.type,"Kafka Stream 监听器只支持kstream、ktable,当前类型是"+this.type);}/*** 通过泛型类型自动注册对应类型的流处理器对象* 支持KStream、KTable* @return*/public Object doAction(){if("kstream".equals(this.type)) {KStream<?, ?> stream = streamsBuilder.stream(listener.listenerTopic(), Consumed.with(Topology.AutoOffsetReset.LATEST));stream=(KStream)listener.getService(stream);stream.to(listener.sendTopic());return stream;}else{KTable<?, ?> table = streamsBuilder.table(listener.listenerTopic(), Consumed.with(Topology.AutoOffsetReset.LATEST));table = (KTable)listener.getService(table);table.toStream().to(listener.sendTopic());return table;}}/*** 解析传入listener类的泛型类*/private void parseType(){Type[] types = listener.getClass().getGenericInterfaces();if(types!=null){for (int i = 0; i < types.length; i++) {if( types[i] instanceof ParameterizedType){ParameterizedType t = (ParameterizedType)types[i];String name = t.getActualTypeArguments()[0].getTypeName().toLowerCase();if(name.contains("org.apache.kafka.streams.kstream.kstream")||name.contains("org.apache.kafka.streams.kstream.ktable")){this.type = name.substring(0,name.indexOf('<')).replace("org.apache.kafka.streams.kstream.","").trim();break;}}}}}
}

(4)KafkaStreamListener扫描和实例化成KafkaStreamProcessor.doAction的返回类,完成监听器实际注册的过程

@Component
public class KafkaStreamListenerFactory implements InitializingBean {Logger logger = LoggerFactory.getLogger(KafkaStreamListenerFactory.class);@AutowiredDefaultListableBeanFactory defaultListableBeanFactory;/*** 初始化完成后自动调用*/@Overridepublic void afterPropertiesSet() {Map<String, KafkaStreamListener> map = defaultListableBeanFactory.getBeansOfType(KafkaStreamListener.class);for (String key : map.keySet()) {KafkaStreamListener k = map.get(key);KafkaStreamProcessor processor = new KafkaStreamProcessor(defaultListableBeanFactory.getBean(StreamsBuilder.class),k);String beanName = k.getClass().getSimpleName()+"AutoProcessor" ;//注册baen,并且执行doAction方法defaultListableBeanFactory.registerSingleton(beanName,processor.doAction());logger.info("add kafka stream auto listener [{}]",beanName);}}
}

(5)手动创建监听器

1,该类需要实现KafkaStreamListener接口

2,listenerTopic方法返回需要监听的topic

3,sendTopic方法返回需要处理完后发送的topic

4,getService方法,主要处理流数据

package com.oldlu.kafka.stream.listerer;import com.oldlu.kafka.config.KafkaStreamListener;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.*;
import org.springframework.stereotype.Component;import java.util.Arrays;/*** KafkaStreamListener的泛型是固定的,有两种泛型可以选择* KTable* KStream*/
@Component
public class StreamHandlerListener implements KafkaStreamListener<KStream<String,String>> {/*** 在哪里接收消息  源处理器* @return*/@Overridepublic String listenerTopic() {return "input_topic";}/*** 计算完成后的结果发送到什么位置  下游处理器* @return*/@Overridepublic String sendTopic() {return "out_topic";}@Overridepublic KStream<String, String> getService(KStream<String, String> stream) {//计算return stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {/*** 把消息中的词组,转换为一个一个的单词放到集合中* @param value* @return*/@Overridepublic Iterable<String> apply(String value) {System.out.println("消息的value:"+value);//hello kafka stareamsString[] strings = value.split(" ");return Arrays.asList(strings);}}).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {/*** 把消息的key,重新赋值,目前消息的key就是单词* @param key* @param value* @return*/@Overridepublic KeyValue<String, String> apply(String key, String value) {return new KeyValue<>(value,value);}}).groupByKey()//时间聚合窗口.windowedBy(TimeWindows.of(5000))//消息的value就是聚合单词后的统计数值,long类型.count(Materialized.as("count-word-num-0001"))//转换为Kstream.toStream()//把处理后的key和value转换String.map((key,value)->{return new KeyValue<>(key.key().toString(),value.toString());});}
}

测试:

启动微服务,正常发送消息,可以正常接收到消息

kafkaStream处理实时流式计算相关推荐

  1. 高大上的介绍实时流式计算!

    实时流式计算,也就是RealTime,Streaming,Analyse,在不同的领域有不同的定义,这里我们说的是大数据领域的实时流式计算. 实时流式计算,或者是实时计算,流式计算,在大数据领域都是差 ...

  2. Oceanus的实时流式计算实践与优化

    导语 | 随着互联网场景的不断深化发展,业务实时化趋势越来越强,要求也越来越高.特别是在广告推荐.实时大屏监控.实时风控.实时数仓等各业务领域,实时计算已经成为了不可或缺的一环.在大数据技术的不断发展 ...

  3. JStorm—实时流式计算框架入门介绍

    JStorm介绍   JStorm是参考storm基于Java语言重写的实时流式计算系统框架,做了很多改进.如解决了之前的Storm nimbus节点的单点问题.   JStorm类似于Hadoop ...

  4. 为什么阿里会选择 Flink 作为新一代流式计算引擎?

    本文由 [AI前线]原创,ID:ai-front,原文链接:t.cn/ROISIr3 [AI前线导读]2017 年 10 月 19日,阿里巴巴的高级技术专家王绍翾(花名"大沙")将 ...

  5. Flink系列-1、流式计算简介

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明. 大数据系列文章目录 官方网址:https://flink.apache.org/ 学习资料:h ...

  6. 流式计算strom,Strom解决的问题,实现实时计算系统要解决那些问题,离线计算是什么,流式计算什么,离线和实时计算区别,strom应用场景,Strorm架构图和编程模型(来自学习资料)

    1.背景-流式计算与storm 2011年在海量数据处理领域,Hadoop是人们津津乐道的技术,Hadoop不仅可以用来存储海量数据,还以用来计算海量数据.因为其高吞吐.高可靠等特点,很多互联网公司都 ...

  7. 大数据workshop:《在线用户行为分析:基于流式计算的数据处理及应用》之《实时数据分析:海量日志数据多维透视》篇...

    实验背景介绍 了解更多2017云栖大会·成都峰会 TechInsight & Workshop. 本手册为云栖大会Workshop之<在线用户行为分析:基于流式计算的数据处理及应用> ...

  8. 流式计算、实时计算和离线计算

    流式计算和批处理计算 实时计算和离线计算 以水为例,Hadoop可以看作是纯净水,一桶桶地搬:而Storm是用水管,预先接好(Topology),然后打开水龙头,水就源源不断地流出来了. 1.流式计算 ...

  9. Flink - 批量、流式计算和离线、实时计算

    在了解Flink之前,我们需要先简单了解批量.流式计算和离线.实时计算. 首先需要明确的一点是,批量.流式计算和离线.实时计算是按照不同维度划分的两套数据处理方式. (1)批量.流式计算体现在数据计算 ...

最新文章

  1. 任艳频 | 竞赛12年纪念文集--后记
  2. OpenJudge 2757 最长上升子序列 / Poj 2533 Longest Ordered Subsequence
  3. ubuntu linux下使用vscode切换python虚拟环境
  4. 目前市场上用于个人计算机的硬盘尺寸是,第5章-硬盘(计算机组装与维护).docx
  5. SpringFox Swagger2注解基本用法
  6. python培训班靠谱吗-什么样的python培训机构靠谱?
  7. 达梦数据库存储过程调用
  8. 计算机关闭提示音,即将发布:如何关闭Apple计算机启动提示音
  9. 统计学(贾俊平《第七版》)知识总结
  10. springbootBBS问答社区系统的设计与实现毕业设计源码121007
  11. 怎样把已经做好的网页传到网上去?
  12. 数字电路中的锁存器(latch)和各种触发器(flip-flop)
  13. Apache反向代理配置
  14. ctfshow萌新赛web
  15. iOS视频转Gif(附example code)
  16. 是真正的发现,还是可耻的堕落?
  17. 桌面文件删除了怎么恢复
  18. 多线程使用场景(经典必看)
  19. 计算多边形(polygon)面积的算法原理和python实现
  20. 【JZOJ 省选模拟】多项式(poly)

热门文章

  1. 淘宝API接口 upload_img - 上传图片到淘宝
  2. 黑客就在你身边 - 通过名片怎么黑掉你
  3. 《网络安全工程师笔记》 第十二章:域
  4. 普通人应该如何找到,一个适合自己的创业项目?开一家赚钱的小店?
  5. 学计算机买笔记本是i5 i7,买电脑时怎么选处理器呢?i7就一定强过i5吗?
  6. r7 4800h安装linux,华硕天选(R7-4800H) u盘pe如何重装win7系统
  7. springmvc上床多个文件简单记录
  8. 【万字长文,Java常用算法面试题
  9. 魔众文库系统 v2.1.0 页面SEO优化,系统升级调整
  10. linux安装i219网卡驱动下载,Linux redhat 6.5 安装网卡I219-LM 驱动