1. 首先启动zookeeper

2. 启动kafka

3. 核心代码

  • 生产者生产消息的java代码,生成要统计的单词
package streaming;import java.util.Properties; import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig; public class MyProducer {   public static void main(String[] args) {   Properties props = new Properties();   props.setProperty("metadata.broker.list","localhost:9092");   props.setProperty("serializer.class","kafka.serializer.StringEncoder");   props.put("request.required.acks","1");   ProducerConfig config = new ProducerConfig(props);   //创建生产这对象Producer<String, String> producer = new Producer<String, String>(config);//生成消息KeyedMessage<String, String> data1 = new KeyedMessage<String, String>("top1","test kafka");KeyedMessage<String, String> data2 = new KeyedMessage<String, String>("top2","hello world");try {   int i =1; while(i < 100){    //发送消息
                    producer.send(data1);   producer.send(data2);i++;Thread.sleep(1000);} } catch (Exception e) {   e.printStackTrace();   }   producer.close();   }
}

  • 在SparkStreaming中接收指定话题的数据,对单词进行统计
package streaming;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;import scala.Tuple2;import com.google.common.collect.Lists;
public class KafkaStreamingWordCount {public static void main(String[] args) {//设置匹配模式,以空格分隔final Pattern SPACE = Pattern.compile(" ");//接收数据的地址和端口String zkQuorum = "localhost:2181";//话题所在的组String group = "1";//话题名称以“,”分隔String topics = "top1,top2";//每个话题的分片数int numThreads = 2;        SparkConf sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]");JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));
//        jssc.checkpoint("checkpoint"); //设置检查点//存放话题跟分片的映射关系Map<String, Integer> topicmap = new HashMap<>();String[] topicsArr = topics.split(",");int n = topicsArr.length;for(int i=0;i<n;i++){topicmap.put(topicsArr[i], numThreads);}//从Kafka中获取数据转换成RDDJavaPairReceiverInputDStream<String, String> lines = KafkaUtils.createStream(jssc, zkQuorum, group, topicmap);//从话题中过滤所需数据JavaDStream<String> words = lines.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {@Overridepublic Iterable<String> call(Tuple2<String, String> arg0)throws Exception {return Lists.newArrayList(SPACE.split(arg0._2));}});//对其中的单词进行统计JavaPairDStream<String, Integer> wordCounts = words.mapToPair(new PairFunction<String, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(String s) {return new Tuple2<String, Integer>(s, 1);}}).reduceByKey(new Function2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer i1, Integer i2) {return i1 + i2;}});//打印结果
        wordCounts.print();jssc.start();jssc.awaitTermination();}}

spark streaming 接收 kafka 数据java代码WordCount示例相关推荐

  1. sparkstreaming监听hdfs目录_大数据系列之Spark Streaming接入Kafka数据

    Spark Streaming官方提供Receiver-based和Direct Approach两种方法接入Kafka数据,本文简单介绍两种方式的pyspark实现. 1.Spark Streami ...

  2. DStream实战之Spark Streaming接收socket数据实现WordCount 31

    需求 现在想要通过socket发送数据, 然后Spark Streaming接收数据并且统计socket发送的每个单词的个数. 1. 架构图 2. 实现流程 安装并启动生产者 首先在linux服务器上 ...

  3. Spark Streaming读取Kafka数据的两种方式

    Kafka在0.8和0.10之间引入了一种新的消费者API,因此,Spark Streaming与Kafka集成,有两种包可以选择: spark-streaming-kafka-0-8与spark-s ...

  4. sparkstreaming监听hdfs目录_Spark Streaming消费Kafka数据的两种方案

    下午的时候翻微信看到大家在讨论Spark消费Kafka的方式,官网中就有答案,只不过是英文的,当然很多博客也都做了介绍,正好我的收藏夹中有一篇文章供大家参考.文章写的通俗易懂,搭配代码,供大家参考. ...

  5. spark kafka java api_java实现spark streaming与kafka集成进行流式计算

    java实现spark streaming与kafka集成进行流式计算 2017/6/26补充:接手了搜索系统,这半年有了很多新的心得,懒改这篇粗鄙之文,大家看综合看这篇新博文来理解下面的粗鄙代码吧, ...

  6. java读写德卡数据_Spark Streaming 读取Kafka数据写入ES

    简介: 目前项目中已有多个渠道到Kafka的数据处理,本文主要记录通过Spark Streaming 读取Kafka中的数据,写入到Elasticsearch,达到一个实时(严格来说,是近实时,刷新时 ...

  7. 使用Spark Streaming从kafka中读取数据把数据写入到mysql 实例

    文章目录 一. 题目 题目和数据 二. pom依赖 三.建表语句 四. 连接kafka配置类 五. 自定义分区类 六. 读取数据并发送数据 七. 消费数据,把数据存储到mysql 一. 题目 题目和数 ...

  8. Spark Streaming使用Kafka保证数据零丢失

    为什么80%的码农都做不了架构师?>>>    源文件放在github,随着理解的深入,不断更新,如有谬误之处,欢迎指正.原文链接https://github.com/jacksu/ ...

  9. spark streaming运行kafka数据源

    一.Kafka准备工作 Kafka的安装,请看另外一文,一定要选择和自己电脑上已经安装的scala版本号一致才可以,本教程安装的Spark版本号是1.6.2,scala版本号是2.10,所以,一定要选 ...

最新文章

  1. Visual Studio® 2010 Web Deployment Projects站点编译生成bin同时发表插件
  2. Disruptor-net
  3. CRM Mock framework debug
  4. 阿里DIN模型(深度兴趣网络)详解及理解
  5. ios退款 怎么定位到是哪个用户_哪个浏览器兼容性最好用?看看用户都是怎么评价的吧...
  6. 转载:margin外边距合并问题以及解决方式
  7. SilverLight中的基本图形(转)
  8. 从本科到研究生,看大疆工程师给你定制的机器人学习计划
  9. Oink:类似Foursquare Radar的排名应用
  10. Hololens学习(三)打包编译安装HoloLens2应用
  11. switch删除用户显示无法连接服务器,switch无法连接互联网怎么办 NS无法联机联网详细解决办法...
  12. Renesas CS+ for ca cx入门(一)
  13. 认识黑客常用的入侵方法
  14. Facebook和Heroku结成伙伴关系
  15. 这篇3万字的Java后端面试总结,面试官看了瑟瑟发抖(汇总)
  16. hiho 挑战赛16 A.王胖浩与三角形
  17. python星星排列_python中怎么实现星星排列
  18. 三星存储新品首销情况揭秘
  19. 数据结构与算法之数组: Leetcode 914. 卡牌分组 (Typescript版)
  20. Latex中宏包总结

热门文章

  1. 如何将一个集合里的对象进行计算再排序
  2. numpy中计算矩阵数值的核心函数
  3. 苏联最早批量生产的晶体三极管
  4. 2021年春季学期-信号与系统-第十一次作业参考答案-第七小题
  5. 第十六届全国大学生智能汽车竞赛-航天智慧物流创意组 线下选拔赛赛题发布!
  6. 给树莓派增加微型摄像头 Raspberry Mode4
  7. 初学者的深度学习训练与部署
  8. 电动双联电位器ZW1613
  9. win10 hao123劫持html文件,Win10 edge主页被hao123劫持如何解决|edge主页被hao123劫持的解决方法...
  10. python selenium 进入新标签页_Python 爬虫 | 用selenium实现批改网的自动翻译