关于kafka的环境搭建这里略过。

1. 正常流程

1.1 添加maven依赖

<dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.2.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.2.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-8_2.11</artifactId><version>2.2.0</version></dependency></dependencies>

1.2 编写测试代码

package com.demo;import kafka.serializer.StringDecoder;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;/****/
public class SparkStreanmingKafkaWordCount {public static void main(String[] args) throws Exception {/*** 第一步:配置sparkConf* */SparkConf sc = new SparkConf().setMaster("local[*]").setAppName("online count");/*** 第二步:创建SparkStreamingContext:* 这是个SparkStreaming应用程序所有功能的起点和程序调度的核心* SparkStreamingContext构建基于SparkConf参数,可以基于持久化的SparkStreamingContext内容         */JavaStreamingContext jsc = new JavaStreamingContext(sc,Durations.seconds(12));/** 第三步:创建Spark Streaming输入数据来源input Stream:* 数据输入来源可以基于File、HDFS、Flume、Kafka、Socket         */Map<String,String> kafkaParam = new HashMap<>();kafkaParam.put("metadata.broker.list","10.18.0.54:9092");HashSet<String> topic = new HashSet<>();topic.add("test");JavaPairInputDStream<String, String> line = KafkaUtils.createDirectStream(jsc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParam, topic);JavaDStream<String> flatLine = line.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {@Overridepublic Iterator<String> call(Tuple2<String, String> tuple2) throws Exception {return Arrays.asList(tuple2._2.split(" ")).iterator();}});JavaPairDStream<String, Integer> pair = flatLine.mapToPair(new PairFunction<String, String, Integer>() {@Overridepublic Tuple2<String, Integer> call(String s) throws Exception {return new Tuple2<String, Integer>(s, 1);}});JavaPairDStream<String, Integer> count = pair.reduceByKey(new Function2<Integer, Integer, Integer>() {@Overridepublic Integer call(Integer integer, Integer integer2) throws Exception {return integer + integer2;}});count.print();jsc.start();jsc.awaitTermination();jsc.stop();}
}

1.3 运行程序

看到定时输出类似下面的日志,表示程序启动成功。

1.4 启动kafka生产者

在kafka命令行终端输入字符

.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test

1.5 观察程序执行日志

看到类似如下的日志,表示执行成功。

22/01/14 13:31:24 INFO BlockManagerInfo: Removed broadcast_3_piece0 on 10.18.0.66:54253 in memory (size: 1875.0 B, free: 1989.6 MB)
-------------------------------------------
Time: 1642138284000 ms
-------------------------------------------
(,1)
(a,3)
(b,1)
(t,1)
(5,1)

2. 调试异常信息

2.1 连接服务失败异常

22/01/14 13:26:56 INFO SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException
Exception in thread "main" org.apache.spark.SparkException: java.nio.channels.ClosedChannelExceptionat org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:385)at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:385)at scala.util.Either.fold(Either.scala:98)at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:384)at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)at com.demo.SparkStreanmingKafkaWordCount.main(SparkStreanmingKafkaWordCount.java:72)
22/01/14 13:27:17 INFO SparkContext: Invoking stop() from shutdown hook

ip变化了导致连接连接失败。

java+maven+kafka开发spark streaming demo程序相关推荐

  1. 使用IntelliJ Idea开发Spark Streaming流应用程序

    使用IntelliJ Idea开发Spark Streaming流应用程序 一.实验目的 二.实验内容 三.实验原理 四.实验环境 五.实验步骤 5.1 启动IntelliJ Idea并创建spark ...

  2. 使用 Kafka 和 Spark Streaming 构建实时数据处理系统

    使用 Kafka 和 Spark Streaming 构建实时数据处理系统  来源:https://www.ibm.com/developerworks,这篇文章转载自微信里文章,正好解决了我项目中的 ...

  3. 基于大数据的Uber数据实时监控(Part 2:Kafka和Spark Streaming)

    导言 本文是系列文章的第二篇,我们将建立一个分析和监控Uber汽车GPS旅行数据的实时示例.在第一篇文章中讨论了使用Apache Spark的K-means算法创建机器学习模型,以根据位置聚类Uber ...

  4. spark1.6 maven java_Spark+ECLIPSE+JAVA+MAVEN windows开发环境搭建及入门实例【附详细代码】...

    前言 本文旨在记录初学Spark时,根据官网快速入门中的一段Java代码,在Maven上建立应用程序并实现执行. 首先推荐一个很好的入门文档库,就是CSDN的Spark知识库,里面有很多spark的从 ...

  5. JAVA maven Spring 开发 webservice 步骤

    首先新建web项目Eclipse-->File-->New-->Other-->弹出框搜索web-->选择Dynamic Web Project;依次填入项目名等等... ...

  6. Scala和Java二种方式实战Spark Streaming开发

    一.Java方式开发 1.开发前准备:假定您以搭建好了Spark集群. 2.开发环境采用eclipse maven工程,需要添加Spark Streaming依赖. 3.Spark streaming ...

  7. Kafka+Spark Streaming如何保证exactly once语义

    大数据技术与架构 点击右侧关注,大数据开发领域最强公众号! 暴走大数据 点击右侧关注,暴走大数据! 在Kafka.Storm.Flink.Spark Streaming等分布式流处理系统中(没错,Ka ...

  8. Spark中如何管理Spark Streaming消费Kafka的偏移量

    spark streaming 版本 2.1 kafka 版本0.9.0.0 在这之前,先重述下spark streaming里面管理偏移量的策略,默认的spark streaming它自带管理的of ...

  9. Spark Streaming 遇到 kafka

    Spark Streaming 遇到 kafka 站酷 | 插画 搭建流程略,主要讲一下如何更好的结合使用,看图说话. Kafka 结合 Spark Streaming 实现在线的准实时流分析,除了保 ...

最新文章

  1. 分布式存储系统的关键技术-针对应用和负载的存储优化技术
  2. applet操作本地文件
  3. 博弈论笔记:逆向选择与非对称信息
  4. class_create()函数
  5. idea将本地项目推送至远程仓库(图形化版本01)
  6. tensorflow精进之路(十七)——python3网络爬虫(上)
  7. server2008r2经常蓝屏或者自动重启
  8. gson将JSON字符串转成Java对象
  9. java实现活动安排问题_贪心算法-活动安排问题
  10. 当2000万多头猪联接上网,会发生什么
  11. 读研计算机技术与控制工程比较,电气工程与控制工程研究生考研就业的区别,哪个比较好...
  12. C语言二维数组及指针引用
  13. 徐有高:为你详细解读我国40省市新能源汽车补贴政策(转载)
  14. 2022熔化焊接与热切割复训题库模拟考试平台操作
  15. java小易——Servlet轻量级服务
  16. 第二弹!python爬虫批量下载高清大图
  17. CentOS 8 更新/etc/yum.repos.d
  18. OPC及OPC服务器的设计与实现
  19. 试题 算法提高 Monday-Saturday质因子
  20. 《每天五分钟冲击python基础之字符串练习题》(七)

热门文章

  1. 为什么Unity按钮点击失灵了
  2. 上海瞬渺光电成功举办自适应光学研讨会
  3. 报num_samples should be a positive integer value, but got num_samples=0错误
  4. 【模型修改的漫漫长路】经典VGG模型理解-这大概是目前最详细的讲解了【一】
  5. 电动自行车ce认证标准EN15194
  6. 网络传输中的交换机和路由器
  7. 史上最全面试题总结JavaScript
  8. python抽样不同花色纸牌_想要随机出5个不同花色和数字的扑克牌该怎么做?
  9. 2021备战金三银四血拼一波算法:字节+百度,java编程教程视频
  10. App应用接口版本兼容设计和使用原则