java+maven+kafka开发spark streaming demo程序
关于kafka的环境搭建这里略过。
1. 正常流程
1.1 添加maven依赖
<dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.2.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.2.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-8_2.11</artifactId><version>2.2.0</version></dependency></dependencies>
1.2 编写测试代码
package com.demo;import kafka.serializer.StringDecoder;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;/****/
public class SparkStreanmingKafkaWordCount {public static void main(String[] args) throws Exception {/*** 第一步:配置sparkConf* */SparkConf sc = new SparkConf().setMaster("local[*]").setAppName("online count");/*** 第二步:创建SparkStreamingContext:* 这是个SparkStreaming应用程序所有功能的起点和程序调度的核心* SparkStreamingContext构建基于SparkConf参数,可以基于持久化的SparkStreamingContext内容 */JavaStreamingContext jsc = new JavaStreamingContext(sc,Durations.seconds(12));/** 第三步:创建Spark Streaming输入数据来源input Stream:* 数据输入来源可以基于File、HDFS、Flume、Kafka、Socket */Map<String,String> kafkaParam = new HashMap<>();kafkaParam.put("metadata.broker.list","10.18.0.54:9092");HashSet<String> topic = new HashSet<>();topic.add("test");JavaPairInputDStream<String, String> line = KafkaUtils.createDirectStream(jsc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParam, topic);JavaDStream<String> flatLine = line.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {@Overridepublic Iterator<String> call(Tuple2<String, String> tuple2) throws Exception {return Arrays.asList(tuple2._2.split(" ")).iterator();}});JavaPairDStream<String, Integer> pair = flatLine.mapToPair(new PairFunction<String, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(String s) throws Exception {return new Tuple2<String, Integer>(s, 1);}});JavaPairDStream<String, Integer> count = pair.reduceByKey(new Function2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer integer, Integer integer2) throws Exception {return integer + integer2;}});count.print();jsc.start();jsc.awaitTermination();jsc.stop();}
}
1.3 运行程序
看到定时输出类似下面的日志,表示程序启动成功。
1.4 启动kafka生产者
在kafka命令行终端输入字符
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
1.5 观察程序执行日志
看到类似如下的日志,表示执行成功。
22/01/14 13:31:24 INFO BlockManagerInfo: Removed broadcast_3_piece0 on 10.18.0.66:54253 in memory (size: 1875.0 B, free: 1989.6 MB)
-------------------------------------------
Time: 1642138284000 ms
-------------------------------------------
(,1)
(a,3)
(b,1)
(t,1)
(5,1)
2. 调试异常信息
2.1 连接服务失败异常
22/01/14 13:26:56 INFO SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException
Exception in thread "main" org.apache.spark.SparkException: java.nio.channels.ClosedChannelExceptionat org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:385)at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:385)at scala.util.Either.fold(Either.scala:98)at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:384)at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)at com.demo.SparkStreanmingKafkaWordCount.main(SparkStreanmingKafkaWordCount.java:72)
22/01/14 13:27:17 INFO SparkContext: Invoking stop() from shutdown hook
ip变化了导致连接连接失败。
java+maven+kafka开发spark streaming demo程序相关推荐
- 使用IntelliJ Idea开发Spark Streaming流应用程序
使用IntelliJ Idea开发Spark Streaming流应用程序 一.实验目的 二.实验内容 三.实验原理 四.实验环境 五.实验步骤 5.1 启动IntelliJ Idea并创建spark ...
- 使用 Kafka 和 Spark Streaming 构建实时数据处理系统
使用 Kafka 和 Spark Streaming 构建实时数据处理系统 来源:https://www.ibm.com/developerworks,这篇文章转载自微信里文章,正好解决了我项目中的 ...
- 基于大数据的Uber数据实时监控(Part 2:Kafka和Spark Streaming)
导言 本文是系列文章的第二篇,我们将建立一个分析和监控Uber汽车GPS旅行数据的实时示例.在第一篇文章中讨论了使用Apache Spark的K-means算法创建机器学习模型,以根据位置聚类Uber ...
- spark1.6 maven java_Spark+ECLIPSE+JAVA+MAVEN windows开发环境搭建及入门实例【附详细代码】...
前言 本文旨在记录初学Spark时,根据官网快速入门中的一段Java代码,在Maven上建立应用程序并实现执行. 首先推荐一个很好的入门文档库,就是CSDN的Spark知识库,里面有很多spark的从 ...
- JAVA maven Spring 开发 webservice 步骤
首先新建web项目Eclipse-->File-->New-->Other-->弹出框搜索web-->选择Dynamic Web Project;依次填入项目名等等... ...
- Scala和Java二种方式实战Spark Streaming开发
一.Java方式开发 1.开发前准备:假定您以搭建好了Spark集群. 2.开发环境采用eclipse maven工程,需要添加Spark Streaming依赖. 3.Spark streaming ...
- Kafka+Spark Streaming如何保证exactly once语义
大数据技术与架构 点击右侧关注,大数据开发领域最强公众号! 暴走大数据 点击右侧关注,暴走大数据! 在Kafka.Storm.Flink.Spark Streaming等分布式流处理系统中(没错,Ka ...
- Spark中如何管理Spark Streaming消费Kafka的偏移量
spark streaming 版本 2.1 kafka 版本0.9.0.0 在这之前,先重述下spark streaming里面管理偏移量的策略,默认的spark streaming它自带管理的of ...
- Spark Streaming 遇到 kafka
Spark Streaming 遇到 kafka 站酷 | 插画 搭建流程略,主要讲一下如何更好的结合使用,看图说话. Kafka 结合 Spark Streaming 实现在线的准实时流分析,除了保 ...
最新文章
- 分布式存储系统的关键技术-针对应用和负载的存储优化技术
- applet操作本地文件
- 博弈论笔记:逆向选择与非对称信息
- class_create()函数
- idea将本地项目推送至远程仓库(图形化版本01)
- tensorflow精进之路(十七)——python3网络爬虫(上)
- server2008r2经常蓝屏或者自动重启
- gson将JSON字符串转成Java对象
- java实现活动安排问题_贪心算法-活动安排问题
- 当2000万多头猪联接上网,会发生什么
- 读研计算机技术与控制工程比较,电气工程与控制工程研究生考研就业的区别,哪个比较好...
- C语言二维数组及指针引用
- 徐有高:为你详细解读我国40省市新能源汽车补贴政策(转载)
- 2022熔化焊接与热切割复训题库模拟考试平台操作
- java小易——Servlet轻量级服务
- 第二弹!python爬虫批量下载高清大图
- CentOS 8 更新/etc/yum.repos.d
- OPC及OPC服务器的设计与实现
- 试题 算法提高 Monday-Saturday质因子
- 《每天五分钟冲击python基础之字符串练习题》(七)
热门文章
- 为什么Unity按钮点击失灵了
- 上海瞬渺光电成功举办自适应光学研讨会
- 报num_samples should be a positive integer value, but got num_samples=0错误
- 【模型修改的漫漫长路】经典VGG模型理解-这大概是目前最详细的讲解了【一】
- 电动自行车ce认证标准EN15194
- 网络传输中的交换机和路由器
- 史上最全面试题总结JavaScript
- python抽样不同花色纸牌_想要随机出5个不同花色和数字的扑克牌该怎么做?
- 2021备战金三银四血拼一波算法:字节+百度,java编程教程视频
- App应用接口版本兼容设计和使用原则