spark streaming 接收 kafka 数据java代码WordCount示例
1. 首先启动zookeeper
2. 启动kafka
3. 核心代码
- 生产者生产消息的java代码,生成要统计的单词
package streaming;import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class MyProducer { public static void main(String[] args) { Properties props = new Properties(); props.setProperty("metadata.broker.list","localhost:9092"); props.setProperty("serializer.class","kafka.serializer.StringEncoder"); props.put("request.required.acks","1"); ProducerConfig config = new ProducerConfig(props); //创建生产这对象Producer<String, String> producer = new Producer<String, String>(config);//生成消息KeyedMessage<String, String> data1 = new KeyedMessage<String, String>("top1","test kafka");KeyedMessage<String, String> data2 = new KeyedMessage<String, String>("top2","hello world");try { int i =1; while(i < 100){ //发送消息 producer.send(data1); producer.send(data2);i++;Thread.sleep(1000);} } catch (Exception e) { e.printStackTrace(); } producer.close(); } }
- 在SparkStreaming中接收指定话题的数据,对单词进行统计
package streaming; import java.util.HashMap; import java.util.Map; import java.util.regex.Pattern;import org.apache.spark.*; import org.apache.spark.api.java.function.*; import org.apache.spark.streaming.*; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka.KafkaUtils;import scala.Tuple2;import com.google.common.collect.Lists; public class KafkaStreamingWordCount {public static void main(String[] args) {//设置匹配模式,以空格分隔final Pattern SPACE = Pattern.compile(" ");//接收数据的地址和端口String zkQuorum = "localhost:2181";//话题所在的组String group = "1";//话题名称以“,”分隔String topics = "top1,top2";//每个话题的分片数int numThreads = 2; SparkConf sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]");JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000)); // jssc.checkpoint("checkpoint"); //设置检查点//存放话题跟分片的映射关系Map<String, Integer> topicmap = new HashMap<>();String[] topicsArr = topics.split(",");int n = topicsArr.length;for(int i=0;i<n;i++){topicmap.put(topicsArr[i], numThreads);}//从Kafka中获取数据转换成RDDJavaPairReceiverInputDStream<String, String> lines = KafkaUtils.createStream(jssc, zkQuorum, group, topicmap);//从话题中过滤所需数据JavaDStream<String> words = lines.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {@Overridepublic Iterable<String> call(Tuple2<String, String> arg0)throws Exception {return Lists.newArrayList(SPACE.split(arg0._2));}});//对其中的单词进行统计JavaPairDStream<String, Integer> wordCounts = words.mapToPair(new PairFunction<String, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(String s) {return new Tuple2<String, Integer>(s, 1);}}).reduceByKey(new Function2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer i1, Integer i2) {return i1 + i2;}});//打印结果 wordCounts.print();jssc.start();jssc.awaitTermination();}}
spark streaming 接收 kafka 数据java代码WordCount示例相关推荐
- sparkstreaming监听hdfs目录_大数据系列之Spark Streaming接入Kafka数据
Spark Streaming官方提供Receiver-based和Direct Approach两种方法接入Kafka数据,本文简单介绍两种方式的pyspark实现. 1.Spark Streami ...
- DStream实战之Spark Streaming接收socket数据实现WordCount 31
需求 现在想要通过socket发送数据, 然后Spark Streaming接收数据并且统计socket发送的每个单词的个数. 1. 架构图 2. 实现流程 安装并启动生产者 首先在linux服务器上 ...
- Spark Streaming读取Kafka数据的两种方式
Kafka在0.8和0.10之间引入了一种新的消费者API,因此,Spark Streaming与Kafka集成,有两种包可以选择: spark-streaming-kafka-0-8与spark-s ...
- sparkstreaming监听hdfs目录_Spark Streaming消费Kafka数据的两种方案
下午的时候翻微信看到大家在讨论Spark消费Kafka的方式,官网中就有答案,只不过是英文的,当然很多博客也都做了介绍,正好我的收藏夹中有一篇文章供大家参考.文章写的通俗易懂,搭配代码,供大家参考. ...
- spark kafka java api_java实现spark streaming与kafka集成进行流式计算
java实现spark streaming与kafka集成进行流式计算 2017/6/26补充:接手了搜索系统,这半年有了很多新的心得,懒改这篇粗鄙之文,大家看综合看这篇新博文来理解下面的粗鄙代码吧, ...
- java读写德卡数据_Spark Streaming 读取Kafka数据写入ES
简介: 目前项目中已有多个渠道到Kafka的数据处理,本文主要记录通过Spark Streaming 读取Kafka中的数据,写入到Elasticsearch,达到一个实时(严格来说,是近实时,刷新时 ...
- 使用Spark Streaming从kafka中读取数据把数据写入到mysql 实例
文章目录 一. 题目 题目和数据 二. pom依赖 三.建表语句 四. 连接kafka配置类 五. 自定义分区类 六. 读取数据并发送数据 七. 消费数据,把数据存储到mysql 一. 题目 题目和数 ...
- Spark Streaming使用Kafka保证数据零丢失
为什么80%的码农都做不了架构师?>>> 源文件放在github,随着理解的深入,不断更新,如有谬误之处,欢迎指正.原文链接https://github.com/jacksu/ ...
- spark streaming运行kafka数据源
一.Kafka准备工作 Kafka的安装,请看另外一文,一定要选择和自己电脑上已经安装的scala版本号一致才可以,本教程安装的Spark版本号是1.6.2,scala版本号是2.10,所以,一定要选 ...
最新文章
- Visual Studio® 2010 Web Deployment Projects站点编译生成bin同时发表插件
- Disruptor-net
- CRM Mock framework debug
- 阿里DIN模型(深度兴趣网络)详解及理解
- ios退款 怎么定位到是哪个用户_哪个浏览器兼容性最好用?看看用户都是怎么评价的吧...
- 转载:margin外边距合并问题以及解决方式
- SilverLight中的基本图形(转)
- 从本科到研究生,看大疆工程师给你定制的机器人学习计划
- Oink:类似Foursquare Radar的排名应用
- Hololens学习(三)打包编译安装HoloLens2应用
- switch删除用户显示无法连接服务器,switch无法连接互联网怎么办 NS无法联机联网详细解决办法...
- Renesas CS+ for ca cx入门(一)
- 认识黑客常用的入侵方法
- Facebook和Heroku结成伙伴关系
- 这篇3万字的Java后端面试总结,面试官看了瑟瑟发抖(汇总)
- hiho 挑战赛16 A.王胖浩与三角形
- python星星排列_python中怎么实现星星排列
- 三星存储新品首销情况揭秘
- 数据结构与算法之数组: Leetcode 914. 卡牌分组 (Typescript版)
- Latex中宏包总结
热门文章
- 如何将一个集合里的对象进行计算再排序
- numpy中计算矩阵数值的核心函数
- 苏联最早批量生产的晶体三极管
- 2021年春季学期-信号与系统-第十一次作业参考答案-第七小题
- 第十六届全国大学生智能汽车竞赛-航天智慧物流创意组 线下选拔赛赛题发布!
- 给树莓派增加微型摄像头 Raspberry Mode4
- 初学者的深度学习训练与部署
- 电动双联电位器ZW1613
- win10 hao123劫持html文件,Win10 edge主页被hao123劫持如何解决|edge主页被hao123劫持的解决方法...
- python selenium 进入新标签页_Python 爬虫 | 用selenium实现批改网的自动翻译