SparkStreaming之Offset管理、胖包和瘦包提交
1、Offset管理
Kafka从0.10.x开始Offset偏移量就自从维护在Kafka内部中,看下面代码。
注意,我们使用的是earliest从头开始消费,也就是说如果你的SparkStreaming刚开始启动,那么会从Kafka对应的Topic从第一条数据开始消费到当前。
下面模拟,第一次消费后DStream停止了,但是Kafka依然在生产数据,再次启动DStream会从什么位置消费。
package com.ruozedata.sparkimport org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}object DirectKafkaApp {def main(args: Array[String]): Unit = {val sparkConf=new SparkConf().setMaster("local[2]").setAppName("SocketWCApp")val ssc=new StreamingContext(sparkConf,Seconds(10))val kafkaParams = Map[String, Object]("bootstrap.servers" -> "vm01:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "use_a_separate_group_id_for_each_stream","auto.offset.reset" -> "earliest", // latest 最新的 earliest从头开始消费"enable.auto.commit" -> (false: java.lang.Boolean))val topics = Array("g6spark")val stream = KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,//PreferBrokers,Subscribe[String, String](topics, kafkaParams))stream.map(record => record.value).flatMap(_.split(",")).map((_,1)).reduceByKey(_+_).print()// offset偏移量管理stream.foreachRDD{ rdd=>val offsetRanges=rdd.asInstanceOf[HasOffsetRanges].offsetRangesrdd.foreachPartition{ iter =>val o: OffsetRange=offsetRanges(TaskContext.get.partitionId())//打印topic,所在分区,offset开始位置,offset结束位置println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")}stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) //异步提交offset}ssc.start()ssc.awaitTermination()}}
启动Kafka,看博客:https://blog.csdn.net/greenplum_xiaofan/article/details/99224269
然后启动上面的DSstream应用程序,第一次消费Kafka的数据
结果如下,可以看到topic=>g6spark,分区=>0,offset读取位置=>22,offset最后位置=>27
因为我的topic里面有数据啊,所以第一次启动,肯定读取了以前的数据,这个没关系。
停止DStream程序,Kafka继续生产数据
[hadoop@vm01 bin]$ ./kafka-console-producer.sh \
> --broker-list vm01:9092 \
> --topic g6spark
>hello,spark
>hello,hadoop
>hello,flume
>hello,spark
然后再次启动DStream程序,我们可以看到是从offset=27这个位置开始读取的。
2、瘦包和胖包提交
2.1 SparkStreaming执行代码
瘦包:只有源码,没有带依赖的。
胖包:除了源码,还附加开发环境依赖的包。
实际工作中我们肯定是要用瘦包提交的,试想一下如果开发环境和生产环境依赖的包不同,程序肯定会报错;当然也有特殊情况,在缺少小部分依赖的情况下,可以带上这些依赖,或者直接上传过去就行。
package com.ruozedata.sparkimport org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribeimport org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}object DirectKafkaApp02 {def main(args: Array[String]): Unit = {//传递三个参数,做判断if(args.length !=3){System.err.println("Usage:DirectKafkaApp <brokers> <topic> <groupid>")System.exit(1)}val Array(brokers,topic,groupid)=args//注释部分,肯定是在spark-submit的时候设置val sparkConf=new SparkConf()//.setMaster("local[2]").setAppName("SocketWCApp")val ssc=new StreamingContext(sparkConf,Seconds(10))val kafkaParams = Map[String, Object]("bootstrap.servers" -> brokers,"key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> groupid,"auto.offset.reset" -> "latest","enable.auto.commit" -> (false: java.lang.Boolean))//如果多个topic,就使用逗号分隔val topics = topic.split(",")val stream = KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,Subscribe[String, String](topics, kafkaParams))stream.map(record => record.value).flatMap(_.split(",")).map((_,1)).reduceByKey(_+_).print()ssc.start()ssc.awaitTermination()}}
2.2 先本地验证ok,再提交
先开放这一段,//.setMaster(“local[2]”).setAppName(“SocketWCApp”)
#参数
vm01:9092 g6spark use_a_separate_group_id_for_each_stream
启动执行,ok
然后再注释刚才那段,进行下面的操作
2.3 首先说下瘦包如何打包和提交
打包:运行后,会生成在你的Project工作空间下。
提交:
[hadoop@vm01 bin]$ cd $SPARK_HOME
[hadoop@vm01 spark-2.4.2-bin-2.6.0-cdh5.7.0]$ cd bin
[hadoop@vm01 bin]$ ./spark-submit \
--master local[2] \
--name kafkaStreaming \
--class com.ruozedata.spark.DirectKafkaApp02 \
--packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0 \
/home/hadoop/lib/spark-train-1.0.jar \
vm01:9092 g6spark use_a_separate_group_id_for_each_stream
说明:
–name kafkaStreaming \ 名字随便取
–class com.ruozedata.spark.DirectKafkaApp02 \ 要运行的类,全路径的
–packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0 \ 这个因为集群里面没有Kafka依赖包,所以提交的时候要告诉集群,在哪里下载
/home/hadoop/lib/spark-train-1.0.jar \ 包存房目录
vm01:9092 g6spark use_a_separate_group_id_for_each_stream 传递的三个参数
Kafka生产数据
[hadoop@vm01 bin]$ ./kafka-console-producer.sh \
> --broker-list vm01:9092 \
> --topic g6spark
>hello,spark
>hello,hadoop
>hello,flume
>hello,spark
Spark提交端消费到的数据
2.4 胖包如何打包和提交
这里有个概念:provided
在pom.xml标记为provided,那么在打包的时候,被标记的依赖是不会打包的,被标记的说明在集群上存在这些依赖。
比如,这个就需要你自己好好去标记了。
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>${spark.version}</version><scope>provided</scope></dependency>
然后还需要添加一段插件,添加在<build>
下
<pluginManagement><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration></plugin></plugins>
</pluginManagement>
然后开始打包
如果你的没有看到Maven,左上角“+”添加,name随便取一个
然后运行,打出来的jar包是 spark-train-1.0-jar-with-dependencies.jar
打包之后,就附带上了所有需要的依赖
接下来就是提交,测试和上面的一样了。
[hadoop@vm01 bin]$ ./spark-submit \
--master local[2] \
--name kafkaStreaming \
--class com.ruozedata.spark.DirectKafkaApp02 \
/home/hadoop/lib/spark-train-1.0-jar-with-dependencies.jar \
vm01:9092 g6spark use_a_separate_group_id_for_each_stream
SparkStreaming之Offset管理、胖包和瘦包提交相关推荐
- maven打包之胖包与瘦包
IDEA的Maven打包步骤 胖包和瘦包的区别: 胖包:将maven项目中的依赖包和项目打为一个包 瘦包:直接打包,不打包依赖包,仅打包出项目中的代码到JAR包中. maven打胖包 胖包的意识就是可 ...
- Maven中打胖包和瘦包
1.把如下代码复制到porm.xml中的path,如下 <build><!--jar包的首名称--><finalName>myduaf</finalName& ...
- 如何快速打胖包和瘦包
目录 1.胖瘦包区别 2.简单手动打胖瘦包 2.1 无依赖瘦包 2.2 胖包 3.修改配置文件快速打胖瘦包 1.胖瘦包区别 胖包和瘦包有一定的区别,胖包是连同写的项目和其所依赖的包全部打包,方便在任何 ...
- maven-compiler-plugin如何快速打胖包和瘦包
maven打jar包 胖瘦包区别 简单手动打胖瘦包 无依赖瘦包 胖包 修改配置文件快速打胖瘦包 胖瘦包区别 胖包和瘦包有一定的区别,胖包是连同写的项目和其所依赖的包全部打包,方便在任何环境下运行,瘦包 ...
- idea同时打胖包和瘦包的方法
在pom包中改写build可实现同时打胖包和瘦包. 代码如下: <build><plugins><plugin><artifactId>maven-co ...
- Maven中的打胖包瘦包
maven工程中的胖包与瘦包的区别在于:一.文件大小,胖包往往是瘦包的很多很多倍:二.胖包中自带依赖,瘦包中没有,所以胖包到哪里都能用,而要使用瘦包必须引用的工程中自带依赖才行 第一步:在maven的 ...
- IDEA中如何正确快速打jar包(包括瘦包、胖包)
第一种普通java工程下: 1.打开idea 如图所示:点击箭头所指的位置: 2.选择你自己要打包的available element 后 如图所示 逐步点击即可 进入如下页面 点击ok 打包完成 来 ...
- 无线路由器、AP、胖AP、瘦AP的概念区别
在学校里这几个概念一直迷迷糊糊,如今刚刚走上工作岗位,对这几个概念进行梳理一下.如果感觉不对,请帮忙指正,谢谢! 一.无线路由器与无线AP(Access Point 无线接入点)的区别 成本:无线路由 ...
- 组网胖模式_胖AP和瘦AP的区别,组网优缺点分析
满意答案 fengzhu30 2013.04.11 采纳率:47% 等级:9 已帮助:617人 无线AP是access point(无线接入点)的意思,是WLAN网络中的重要组成部分,其作用是把 ...
最新文章
- C++ - emplace_back 和 push_back 的区别
- 【Linux】Linux简单操作之管道与重定向
- JQuery:JQuery 中的CSS()方法
- 程序员总结:帮助你早些明白一些道理
- linux命令行安装使用KVM
- es6中新增对象的特性和方法
- 洛谷P1035 [NOIP2002 普及组] 级数求和
- 做技术支持的工作心得
- KOFLive Beta Daily-Scrum 2
- 黑马博客——详细步骤(九)项目功能的实现之mongoDB数据库添加账号
- ip复原Java_Java实现 LeetCode 93 复原IP地址
- 新年快乐@2008!
- 2.aop原理:@EnableAspectJAutoProxy
- 【转载】C# 开源库大全非常好
- 2、那智机器人时序基板的TBEX1、TBEX2连接
- chrome插件离线安装包(.crx)下载
- stack Error: EACCES: permission denied
- 现阶段人工智能应用涉及到哪些行业?
- linux命令显示文件内容行号|linux将内容以行号显示出来
- spring-cloud-starter-bus-kafka利用kafka消息总线实现动态刷新配置
热门文章
- SPI速度最快,其次UART,IIC最慢。UART转成485通讯距离最长,其他两个应该差不多
- bzoj2733 永无乡
- 日常电脑操作小技能篇(生活无处不精彩)
- SpringBoot学习(六)——springboot整合后台模板
- aistudio使用py检测深度学习是否训练完成并发送短信提示
- 排序模型进阶-WideDeepWDL模型导出
- Android高工:细说 Android 多线程,211本硕如何通过字节跳动、百度、美团Android面试
- CTFHub Bypass disable_function系列(已完结)
- leetcode 大礼包
- Wavelets: Seeing the forest and the trees