KafkaStreams

  • 介绍
  • 需求
  • 代码演示
    • 准备主题
    • 启动控制台生产者发送单词

介绍

  • Kafka在0.10.0.0版本以前的定位是分布式,分区化的,带备份机制的日志提交服务。
  • 在这之前kafka也没有提供数据处理的服务。
  • 大家的流处理计算主要是还是依赖于Spark Streaming,Flink等流式处理框架。
  • 但是他们都离不开Kafka的消息中转,所以Kafka于0.10.0.0版本推出了自己的流处理框架,Kafka Streams。
  • Kafka的定位也正式变成为了Apache Kafka® is a distributed streaming platform,分布式流处理平台
  • 目前KafkaStreams在实际中用的不多
  • 流式数据计算方面用的更多的还是SparkStreaming和Flink,他们更专业

需求

  • 从teststream主题接收数据,并做单词统计
  • 如:
    • 输入 kafka kafka spark spark spark ,
    • 得出kafka 2 spark 3

代码演示

准备主题

kafka-topics.sh --zookeeper node01:2181 --create --topic teststream --replication-factor 2 --partitions 3

启动控制台生产者发送单词

kafka-console-producer.sh --broker-list node01:9092 --topic teststream
package cn.hanjiaxiaozhi.stream;import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;import java.util.Arrays;
import java.util.Properties;/*** Author hanjiaxiaozhi* Date 2020/7/11 16:18* Desc* 从teststream主题接收数据,并做单词统计* 如:* 输入 kafka kafka spark spark spark ,* 得出kafka 2   spark 3*/
public class WordCount {public static void main(String[] args) {//1.准备参数Properties props = new Properties();props.put(StreamsConfig.APPLICATION_ID_CONFIG, "MyWordCount");props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092");props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());//2.进行流数据处理StreamsBuilder builder = new StreamsBuilder();//接收到一行行的数据,如kafka kafka spark spark sparkKStream<String, String> textLines = builder.stream("teststream");//对上面的数据按照单词进行切分KTable<String, Long> wordCounts = textLines.flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))//按照单词进行分组,这样相同的单词就到同一个组中了.groupBy((key, word) -> word)//对各个组内的单词进行计数,并存在counts-store中.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));//3.输出统计结果wordCounts.foreach((k,v)-> System.out.println(k+" : "+v));//4.创建并启动流程序KafkaStreams streams = new KafkaStreams(builder.build(), props);streams.start();}
}

KafkaStreams相关推荐

  1. kafka-streams api示例

    源码 import java.util.Properties;import org.apache.kafka.common.serialization.Serdes; import org.apach ...

  2. Java高级面试题!kafkastreams加时间窗口的count

    蚂蚁金服一面: 下午杭州的电话,问有没有空,果断有空,虽然感觉略显紧张,有点懵逼. 面试的题目: HashMap和Hashtable的区别 实现一个保证迭代顺序的HashMap 说一说排序算法,稳定性 ...

  3. kafka-2.11-2.3.0版本配置文件参数详解_Kafka版本特性总结

    截止2020.2.14,Kakfa最新版本为2.4.0,最早版本为0.7.0,最新版本为2.4.0.当前共有39个版本,总结的特性不包含Kafka Stream和Kafka connect. 版本介绍 ...

  4. Kafka核心设计与实践原理总结:进阶篇

    作者:未完成交响曲,资深Java工程师!目前在某一线互联网公司任职,架构师社区合伙人! kafka作为当前热门的分布式消息队列,具有高性能.持久化.多副本备份.横向扩展能力.我学习了<深入理解K ...

  5. 从安装Kafka服务到运行WordCount程序

    之所以写这篇文章,是因为Kafka初学的同学在了解了Kafka的基本原理之后,希望在自己的机器上面运行最简单的wordCount的时候,从开始安装Kafka到找到合适的example源码最后到成功运行 ...

  6. kafka java_Kafka 使用Java实现数据的生产和消费demo

    前言 在上一篇中讲述如何搭建kafka集群,本篇则讲述如何简单的使用 kafka .不过在使用kafka的时候,还是应该简单的了解下kafka. Kafka的介绍 Kafka是一种高吞吐量的分布式发布 ...

  7. spark context stop use with as

    调用方法: with session.SparkStreamingSession('CC_Traffic_Realtime', ssc_time_windown) as ss_session:kafk ...

  8. Apache Kafka:大数据的实时处理时代

    在过去几年,对于 Apache Kafka 的使用范畴已经远不仅是分布式的消息系统:我们可以将每一次用户点击,每一个数据库更改,每一条日志的生成,都转化成实时的结构化数据流,更早的存储和分析它们,并从 ...

  9. spark streaming 消费 kafka入门采坑解决过程

    kafka 服务相关的命令 # 开启kafka的服务器 bin/kafka-server-start.sh -daemon config/server.properties & # 创建top ...

最新文章

  1. R语言使用ggpubr包的ggline函数绘制各种漂亮形式的线图实战
  2. [ 1001] 动态开辟二维数组的说明
  3. python3 生成器的send_Python:生成器中send()的行为
  4. Oracel中连接的总结(一)
  5. ERP与SCM之区别
  6. Golang系列:打印命令行参数
  7. 【Python CheckiO 题解】House Password
  8. .NET框架程序设计
  9. 内部存储_Mongodb存储特性与内部原理
  10. linux 常见问题集 q,Linux新手几个常见问题集
  11. 【广度优先搜索】一个实例+两张动图彻底理解 BFS | 思路+代码详解 | 用 DFS 自动控制我们的小游戏
  12. 复函数图像怎么画_matlab复变函数画图形
  13. Echart资源网站
  14. 最易学和最难学编程语言排行榜!
  15. 【DZS-12CE/S DC220V型直流回路监视继电器】
  16. spark中RSS工具简介
  17. [jzoj 4742] 单峰 {快速幂}
  18. 公主连结显示服务器内部错误,公主连结Re:Dive无法连接服务器是什么原因
  19. 水平居中和垂直居中css_如何使用CSS将图像垂直和水平居中
  20. Ace,CodeMirror 和 Monaco:Web 代码编辑器的对比

热门文章

  1. php 中 t怎么打开,PHP中的流
  2. Html5开发 微信视频及夸克手机浏览器问题
  3. 互联网行业,哪些岗位越老越吃香?
  4. 解决Element UI 组件el-popover图片溢出屏幕可视区域问题(popover定位问题)
  5. 写给初学者:如何在keil里建立一个自己的库,以及如何调用
  6. ld: framework not found FileProvider
  7. 2019年武汉Web前端开发薪酬数据,可以了解一下!
  8. 无线传感器网络 与 OMNET++学习笔记(一)
  9. EasyCrawler-爬取某岛国的病例统计网
  10. 实现CEGUI中文汉字输入法光标跟随(C/C++源码)