1、Offset管理

Kafka从0.10.x开始Offset偏移量就自从维护在Kafka内部中,看下面代码。
注意,我们使用的是earliest从头开始消费,也就是说如果你的SparkStreaming刚开始启动,那么会从Kafka对应的Topic从第一条数据开始消费到当前。
下面模拟,第一次消费后DStream停止了,但是Kafka依然在生产数据,再次启动DStream会从什么位置消费。

package com.ruozedata.sparkimport org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}object DirectKafkaApp {def main(args: Array[String]): Unit = {val sparkConf=new SparkConf().setMaster("local[2]").setAppName("SocketWCApp")val ssc=new StreamingContext(sparkConf,Seconds(10))val kafkaParams = Map[String, Object]("bootstrap.servers" -> "vm01:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "use_a_separate_group_id_for_each_stream","auto.offset.reset" -> "earliest",  // latest 最新的  earliest从头开始消费"enable.auto.commit" -> (false: java.lang.Boolean))val topics = Array("g6spark")val stream = KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,//PreferBrokers,Subscribe[String, String](topics, kafkaParams))stream.map(record => record.value).flatMap(_.split(",")).map((_,1)).reduceByKey(_+_).print()// offset偏移量管理stream.foreachRDD{ rdd=>val offsetRanges=rdd.asInstanceOf[HasOffsetRanges].offsetRangesrdd.foreachPartition{ iter =>val o: OffsetRange=offsetRanges(TaskContext.get.partitionId())//打印topic,所在分区,offset开始位置,offset结束位置println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")}stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)  //异步提交offset}ssc.start()ssc.awaitTermination()}}

启动Kafka,看博客:https://blog.csdn.net/greenplum_xiaofan/article/details/99224269
然后启动上面的DSstream应用程序,第一次消费Kafka的数据
结果如下,可以看到topic=>g6spark,分区=>0,offset读取位置=>22,offset最后位置=>27
因为我的topic里面有数据啊,所以第一次启动,肯定读取了以前的数据,这个没关系。

停止DStream程序,Kafka继续生产数据

[hadoop@vm01 bin]$  ./kafka-console-producer.sh \
> --broker-list vm01:9092 \
> --topic g6spark
>hello,spark
>hello,hadoop
>hello,flume
>hello,spark

然后再次启动DStream程序,我们可以看到是从offset=27这个位置开始读取的。

2、瘦包和胖包提交

2.1 SparkStreaming执行代码

瘦包:只有源码,没有带依赖的。
胖包:除了源码,还附加开发环境依赖的包。
实际工作中我们肯定是要用瘦包提交的,试想一下如果开发环境和生产环境依赖的包不同,程序肯定会报错;当然也有特殊情况,在缺少小部分依赖的情况下,可以带上这些依赖,或者直接上传过去就行。

package com.ruozedata.sparkimport org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribeimport org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}object DirectKafkaApp02 {def main(args: Array[String]): Unit = {//传递三个参数,做判断if(args.length !=3){System.err.println("Usage:DirectKafkaApp <brokers> <topic> <groupid>")System.exit(1)}val Array(brokers,topic,groupid)=args//注释部分,肯定是在spark-submit的时候设置val sparkConf=new SparkConf()//.setMaster("local[2]").setAppName("SocketWCApp")val ssc=new StreamingContext(sparkConf,Seconds(10))val kafkaParams = Map[String, Object]("bootstrap.servers" -> brokers,"key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> groupid,"auto.offset.reset" -> "latest","enable.auto.commit" -> (false: java.lang.Boolean))//如果多个topic,就使用逗号分隔val topics = topic.split(",")val stream = KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,Subscribe[String, String](topics, kafkaParams))stream.map(record => record.value).flatMap(_.split(",")).map((_,1)).reduceByKey(_+_).print()ssc.start()ssc.awaitTermination()}}

2.2 先本地验证ok,再提交

先开放这一段,//.setMaster(“local[2]”).setAppName(“SocketWCApp”)

#参数
vm01:9092 g6spark use_a_separate_group_id_for_each_stream


启动执行,ok

然后再注释刚才那段,进行下面的操作

2.3 首先说下瘦包如何打包和提交

打包:运行后,会生成在你的Project工作空间下。


提交:

[hadoop@vm01 bin]$ cd $SPARK_HOME
[hadoop@vm01 spark-2.4.2-bin-2.6.0-cdh5.7.0]$ cd bin
[hadoop@vm01 bin]$ ./spark-submit \
--master local[2] \
--name kafkaStreaming \
--class com.ruozedata.spark.DirectKafkaApp02 \
--packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0 \
/home/hadoop/lib/spark-train-1.0.jar \
vm01:9092 g6spark use_a_separate_group_id_for_each_stream

说明:
–name kafkaStreaming \ 名字随便取
–class com.ruozedata.spark.DirectKafkaApp02 \ 要运行的类,全路径的

–packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0 \ 这个因为集群里面没有Kafka依赖包,所以提交的时候要告诉集群,在哪里下载
/home/hadoop/lib/spark-train-1.0.jar \ 包存房目录
vm01:9092 g6spark use_a_separate_group_id_for_each_stream 传递的三个参数

Kafka生产数据

[hadoop@vm01 bin]$  ./kafka-console-producer.sh \
> --broker-list vm01:9092 \
> --topic g6spark
>hello,spark
>hello,hadoop
>hello,flume
>hello,spark

Spark提交端消费到的数据

2.4 胖包如何打包和提交

这里有个概念:provided
在pom.xml标记为provided,那么在打包的时候,被标记的依赖是不会打包的,被标记的说明在集群上存在这些依赖。
比如,这个就需要你自己好好去标记了。

    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>${spark.version}</version><scope>provided</scope></dependency>

然后还需要添加一段插件,添加在<build>

<pluginManagement><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration></plugin></plugins>
</pluginManagement>

然后开始打包

如果你的没有看到Maven,左上角“+”添加,name随便取一个

然后运行,打出来的jar包是 spark-train-1.0-jar-with-dependencies.jar

打包之后,就附带上了所有需要的依赖
接下来就是提交,测试和上面的一样了。

[hadoop@vm01 bin]$ ./spark-submit \
--master local[2] \
--name kafkaStreaming \
--class com.ruozedata.spark.DirectKafkaApp02 \
/home/hadoop/lib/spark-train-1.0-jar-with-dependencies.jar \
vm01:9092 g6spark use_a_separate_group_id_for_each_stream

SparkStreaming之Offset管理、胖包和瘦包提交相关推荐

  1. maven打包之胖包与瘦包

    IDEA的Maven打包步骤 胖包和瘦包的区别: 胖包:将maven项目中的依赖包和项目打为一个包 瘦包:直接打包,不打包依赖包,仅打包出项目中的代码到JAR包中. maven打胖包 胖包的意识就是可 ...

  2. Maven中打胖包和瘦包

    1.把如下代码复制到porm.xml中的path,如下 <build><!--jar包的首名称--><finalName>myduaf</finalName& ...

  3. 如何快速打胖包和瘦包

    目录 1.胖瘦包区别 2.简单手动打胖瘦包 2.1 无依赖瘦包 2.2 胖包 3.修改配置文件快速打胖瘦包 1.胖瘦包区别 胖包和瘦包有一定的区别,胖包是连同写的项目和其所依赖的包全部打包,方便在任何 ...

  4. maven-compiler-plugin如何快速打胖包和瘦包

    maven打jar包 胖瘦包区别 简单手动打胖瘦包 无依赖瘦包 胖包 修改配置文件快速打胖瘦包 胖瘦包区别 胖包和瘦包有一定的区别,胖包是连同写的项目和其所依赖的包全部打包,方便在任何环境下运行,瘦包 ...

  5. idea同时打胖包和瘦包的方法

    在pom包中改写build可实现同时打胖包和瘦包. 代码如下: <build><plugins><plugin><artifactId>maven-co ...

  6. Maven中的打胖包瘦包

    maven工程中的胖包与瘦包的区别在于:一.文件大小,胖包往往是瘦包的很多很多倍:二.胖包中自带依赖,瘦包中没有,所以胖包到哪里都能用,而要使用瘦包必须引用的工程中自带依赖才行 第一步:在maven的 ...

  7. IDEA中如何正确快速打jar包(包括瘦包、胖包)

    第一种普通java工程下: 1.打开idea 如图所示:点击箭头所指的位置: 2.选择你自己要打包的available element 后 如图所示 逐步点击即可 进入如下页面 点击ok 打包完成 来 ...

  8. 无线路由器、AP、胖AP、瘦AP的概念区别

    在学校里这几个概念一直迷迷糊糊,如今刚刚走上工作岗位,对这几个概念进行梳理一下.如果感觉不对,请帮忙指正,谢谢! 一.无线路由器与无线AP(Access Point 无线接入点)的区别 成本:无线路由 ...

  9. 组网胖模式_胖AP和瘦AP的区别,组网优缺点分析

    满意答案 fengzhu30 2013.04.11 采纳率:47%    等级:9 已帮助:617人 无线AP是access point(无线接入点)的意思,是WLAN网络中的重要组成部分,其作用是把 ...

最新文章

  1. C++ - emplace_back 和 push_back 的区别
  2. 【Linux】Linux简单操作之管道与重定向
  3. JQuery:JQuery 中的CSS()方法
  4. 程序员总结:帮助你早些明白一些道理
  5. linux命令行安装使用KVM
  6. es6中新增对象的特性和方法
  7. 洛谷P1035 [NOIP2002 普及组] 级数求和
  8. 做技术支持的工作心得
  9. KOFLive Beta Daily-Scrum 2
  10. 黑马博客——详细步骤(九)项目功能的实现之mongoDB数据库添加账号
  11. ip复原Java_Java实现 LeetCode 93 复原IP地址
  12. 新年快乐@2008!
  13. 2.aop原理:@EnableAspectJAutoProxy
  14. 【转载】C# 开源库大全非常好
  15. 2、那智机器人时序基板的TBEX1、TBEX2连接
  16. chrome插件离线安装包(.crx)下载
  17. stack Error: EACCES: permission denied
  18. 现阶段人工智能应用涉及到哪些行业?
  19. linux命令显示文件内容行号|linux将内容以行号显示出来
  20. spring-cloud-starter-bus-kafka利用kafka消息总线实现动态刷新配置

热门文章

  1. SPI速度最快,其次UART,IIC最慢。UART转成485通讯距离最长,其他两个应该差不多
  2. bzoj2733 永无乡
  3. 日常电脑操作小技能篇(生活无处不精彩)
  4. SpringBoot学习(六)——springboot整合后台模板
  5. aistudio使用py检测深度学习是否训练完成并发送短信提示
  6. 排序模型进阶-WideDeepWDL模型导出
  7. Android高工:细说 Android 多线程,211本硕如何通过字节跳动、百度、美团Android面试
  8. CTFHub Bypass disable_function系列(已完结)
  9. leetcode 大礼包
  10. Wavelets: Seeing the forest and the trees