流式计算之kafka Stream

概念

一般流式计算会与批量计算相比较。在流式计算模型中,输入是持续的,可以认为在时间上是无界的,也就意味着,永远拿不到全量数据去做计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。流式计算一般对实时性要求较高,同时一般是先定义目标计算,然后数据到来之后将计算逻辑应用于数据。同时为了提高计算效率,往往尽可能采用增量计算代替全量计算,是可以源源不断的产生数据,源源不断的接收数据,没有边界。
Kafka Stream的特点如下:

kafka Stream

  • Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
  • 除了Kafka外,无任何外部依赖
  • 充分利用Kafka分区机制实现水平扩展和顺序性保证
  • 通过可容错的state store实现高效的状态操作(如windowed join和aggregation)
  • 支持正好一次处理语义
  • 提供记录级的处理能力,从而实现毫秒级的低延迟
  • 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)
  • 同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)

1.搭建环境

因为Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper

此处我使用的是linux系统,云服务器,Docker进行的一个安装

Docker安装zookeeper:
下载镜像:
docker pull zookeeper:3.4.14
创建容器
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14

Docker安装kafka:
创建容器
docker pull wurstmeister/kafka:2.12-2.3.1
创建容器
docker run -d --name kafka
–env KAFKA_ADVERTISED_HOST_NAME=ip地址
–env KAFKA_ZOOKEEPER_CONNECT=ip地址
–env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://ip地址:9092
–env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
–env KAFKA_HEAP_OPTS=“-Xmx256M -Xms256M”
-p 9092:9092 wurstmeister/kafka:2.12-2.3.1
注:因为此处我使用的是云服务器,所以是-p 9092:9092 wurstmeister/kafka:2.12-2.3.1,如果是本地的虚拟机则改为:–net=host wurstmeister/kafka:2.12-2.3.1

2. 导入pom依赖

<!-- kafkfa --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId><exclusions><exclusion><artifactId>connect-json</artifactId><groupId>org.apache.kafka</groupId></exclusion></exclusions></dependency>

3.测试案例

生产者

package com.wcl.demo;import org.apache.kafka.clients.producer.*;import java.util.Properties;
import java.util.concurrent.ExecutionException;/*** 生产者*/
public class ProducerQuickStart {public static void main(String[] args) throws ExecutionException, InterruptedException {//2.添加配置Properties properties = new Properties();//kafka的连接地址properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"ip地址:9092");//发送失败,失败的重试次数properties.put(ProducerConfig.RETRIES_CONFIG,5);//消息key的序列化器properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//消息value的序列化properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");//1.创建生产者对象KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);//3.封装发送的消息ProducerRecord<String, String> record = new ProducerRecord<String, String>("topic_input","kafka001","hello,world");//4.发送消息//4.1 同步发送   使用send()方法发送,它会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功RecordMetadata recordMetadata = producer.send(record).get();System.out.println(recordMetadata);//5.关闭消息通道,否则发不出去producer.close();}
}

消费者

package com.wcl.demo;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;/*** 消费者*/
public class ConsumerQuickStart {public static void main(String[] args) {//2.添加kafka的配置信息Properties properties = new Properties();//kafka的连接地址properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "ip地址:9092");//消费者组properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group2");//消息的反序列化器properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//1.创建消费者对象KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);//3.订阅主题consumer.subscribe(Collections.singletonList("topic_out"));//4.获取消息,while循环,一直处于接收状态while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));for (ConsumerRecord<String, String> record : records) {System.out.println(record.key());System.out.println(record.value());}}}
}

流式就是 kafka stream

package com.wcl.demo;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;public class KafkaStreamQuickStart {public static void main(String[] args) {//kafka配置中心Properties properties = new Properties();properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"ip地址:9092");properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,Serdes.String().getClass());properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());properties.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-quickstart");//stream构造器StreamsBuilder streamsBuilder = new StreamsBuilder();//流式计算//指定从那个topic中接收消息KStream<String, String> stream = streamsBuilder.stream("topic_input");// 处理接收到的消息:valuestream.flatMapValues(new ValueMapper<String, Iterable<String>>() {@Overridepublic Iterable<String> apply(String s) {return Arrays.asList(s.split(" "));}})//按照value进行聚合处理.groupBy((key, value) -> value)//时间窗口 此处为10秒.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//统计单词的个数.count()//转换为kStream.toStream().map(new KeyValueMapper<Windowed<String>, Long, KeyValue<?, ?>>() {@Overridepublic KeyValue<?, ?> apply(Windowed<String> stringWindowed, Long aLong) {System.out.println("key:====>"+stringWindowed+", value:=====>"+aLong);System.out.println("key测试:====>"+stringWindowed.key()+", value:=====>"+aLong.toString());return new KeyValue<>(stringWindowed.key().toString(),aLong.toString());}})//发送消息.to("topic_out");//创建kafkaStream对象KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);//开启流式计算kafkaStreams.start();}
}

结果:

  • 使用生产者在topic为:topic_input中发送多条消息

  • 使用消费者接收topic为:topic_out

  • 通过流式计算,会把生产者的多条消息汇总成一条发送到消费者中输出

SpringBoot集成Kafka Stream

(1)自定配置参数

package com.wcl.demo.config;import lombok.Getter;
import lombok.Setter;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;import java.util.HashMap;
import java.util.Map;@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {//    private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;private String hosts;private String group;@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {Map<String, Object> props = new HashMap<>();//连接信息props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);//设置一个组props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");//设置一个应用名称props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");//重试测试props.put(StreamsConfig.RETRIES_CONFIG, 10);//keyprops.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());//valueprops.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());return new KafkaStreamsConfiguration(props);}
}

(2)新增配置类,创建KStream对象,进行聚合

package com.wcl.demo.config;import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.time.Duration;
import java.util.Arrays;@Configuration
@Slf4j
public class KafkaStreamHelloListener {@Beanpublic KStream<String,String> kStream(StreamsBuilder streamsBuilder){//创建kstream对象,同时指定从那个topic中接收消息KStream<String, String> stream = streamsBuilder.stream("topic_input");stream.flatMapValues(new ValueMapper<String, Iterable<String>>() {@Overridepublic Iterable<String> apply(String value) {return Arrays.asList(value.split(" "));}})//根据value进行聚合分组.groupBy((key,value)->value)//聚合计算时间间隔.windowedBy(TimeWindows.of(Duration.ofSeconds(10)))//求单词的个数.count().toStream()//处理后的结果转换为string字符串.map((key,value)->{System.out.println("key:"+key+",value:"+value);return new KeyValue<>(key.key().toString(),value.toString());})//发送消息.to("topic_out");return stream;}
}

(3)修改application.yml文件

spring:application:name: kafkaStreamkafka:hosts: 122.112.249.147:9092group: ${spring.application.name}

测试:启动服务,和消费者,在通过生产者发送消息,完成测试

流式计算之kafka Stream相关推荐

  1. 流式计算新贵Kafka Stream设计详解--转

    原文地址:https://mp.weixin.qq.com/s?__biz=MzA5NzkxMzg1Nw==&mid=2653162822&idx=1&sn=8c4611436 ...

  2. mysql流式计算,Stream流式计算

    Stream流式计算 什么是Stream流式计算 数据处理中不可缺少的两部分:存储 + 计算 集合.MySQL本质就是存储数据的,计算都应该交给流来操作! package com.kuang.stre ...

  3. java1.8 流式计算:利用接口的函数式编程 + 链式编程

    java1.8 流式计算:利用接口的函数式编程 + 链式编程 文章目录 java1.8 流式计算:利用接口的函数式编程 + 链式编程 1.流式计算 1)ArrayList和Stream关联的底层解析( ...

  4. Kafka设计解析(七)- 流式计算的新贵 Kafka Stream

    http://www.infoq.com/cn/articles/kafka-analysis-part-7 Kafka Stream背景 Kafka Stream是什么 Kafka Stream是A ...

  5. spark kafka java api_java实现spark streaming与kafka集成进行流式计算

    java实现spark streaming与kafka集成进行流式计算 2017/6/26补充:接手了搜索系统,这半年有了很多新的心得,懒改这篇粗鄙之文,大家看综合看这篇新博文来理解下面的粗鄙代码吧, ...

  6. Stream流式计算

    一.什么是Stream流式计算(学习此之前,要先知道四大函数式接口) 1.常用的集合是为了存储数集,而对于集合数据的一些处理(像筛选集合数据等)可以使用Stream流来处理         2.jav ...

  7. 专访阿里云高级技术专家吴威:Kafka、Spark和Flink类支持流式计算的软件会越来越流行...

    杭州·云栖大会将于2016年10月13-16日在云栖小镇举办,在这场标签为互联网.创新.创业的云计算盛宴上,众多行业精英都将在这几天里分享超过450个演讲主题. 为了帮助大家进一步了解这场全球前言技术 ...

  8. [编程] Java8 Stream(流式计算) 常见的一些用法汇总

    前提:以下基于 List<Student> 列表进行举例,大家实际使用进行举一反三即可.大同小异,Java8 的流式计算功能很强大,需要大家实际应用中逐渐挖掘更高级的用法. Student ...

  9. java基础知识——流式计算Stream API

    文章目录 一.基本概念 二.创建流 三.中间操作 3.1 filter 3.2 map 3.3 distinct 3.4 sorted 3.5 limit 3.6 skip 3.7 flatMap 四 ...

最新文章

  1. Java8 stream操作
  2. python socket select 错误 Filedescriptor out of range in select 解决方法
  3. linux resin 自动启动不了,Resin 安装-配置-自启动-Linux
  4. 用idea新建springboot项目遇到的@Restcontroller不能导入的问题
  5. Robolectric 探索之路
  6. centos 7 redis-4.0.11 主从
  7. java jni 生成_利用javah技术生成jni接口的详细步骤 | 学步园
  8. oracle卸载重新安装失败,Oracle卸载重新安装——实战
  9. 使用subs和evals函数对sympy中的符号进行赋值并且设置数值位数
  10. 32位与64位到底什么区别?
  11. IBM Spectrum LSF RTM
  12. java计算机毕业设计教师管理系统源码+mysql数据库+系统+lw文档+部署
  13. 好吃易做的简单菜谱家常菜做法
  14. 腾讯X5浏览器简单使用
  15. 单片机数字滤波c语言程序,单片机系统中数字滤波的算法【C程序整理】
  16. 克转换成千克怎么算python_如何在Python中将磅转换为千克
  17. 阿里云1M带宽够不够用?可以支持多少访问量
  18. 机器人总动员片尾曲歌词_机器人瓦力 主题曲 很感人的那首歌 叫什么名字
  19. 拖放(DragDrop)
  20. java怎么表示正无穷大_如何在Java中实现无穷大?

热门文章

  1. c语言while循环小于0判断为true还是false?
  2. 集成sleuth_Spring Cloud Sleuth整合zipkin过程解析
  3. iPhone 13定了,定了,定了,发布时间曝光
  4. 移动APP后端网络处理一些问题记录
  5. 使用python爬取淘宝商品信息
  6. 水果店行业前景分析,水果店好干吗
  7. sws_scale函数
  8. 【ABAQUS之二次开发】如何利用坐标表达式选择网格节点
  9. vue项目中点击button下载文件到浏览器
  10. 幸福手机,给爸妈的高端大气上档次的手机