Scala Spark Streaming + Kafka + Zookeeper完成数据的发布和消费
一、Spark Streaming
Spark Streaming是核心Spark API的扩展,可实现实时数据流的可扩展,高吞吐量,容错流处理。数据可以从许多来源(如Kafka,Flume,Kinesis或TCP sockets)中提取,并且可以使用以高级函数表示的复杂算法进行处理map
,例如reduce
,join
和window
。最后,处理后的数据可以推送到文件系统,数据库和实时仪表板。
二、SparkStreaming实现
Kafka和Zookeeper事先装过,没有先安装Zookeeper,则无法运行Kafka服务。但是,已经为CloudKarafka群集安装并配置了Zookeeper。
我搭建的是 Scala 的 maven 项目,项目和环境都在单机上运行。
1、先看我的 pom.xml 配置:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.sparkstream</groupId><artifactId>LyhSparkStreaming</artifactId><version>1.0-SNAPSHOT</version><properties><spark.version>2.3.3</spark.version><scala.version>2.11</scala.version></properties><dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_${scala.version}</artifactId><version>${spark.version}</version><exclusions><exclusion><groupId>commons-beanutils</groupId><artifactId>commons-beanutils-core</artifactId></exclusion></exclusions></dependency><!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka_2.11</artifactId><version>1.6.3</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_${scala.version}</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_${scala.version}</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_${scala.version}</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-mllib_${scala.version}</artifactId><version>${spark.version}</version></dependency></dependencies><build><plugins><plugin><groupId>org.scala-tools</groupId><artifactId>maven-scala-plugin</artifactId><version>2.15.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals></execution></executions></plugin><plugin><artifactId>maven-compiler-plugin</artifactId><version>3.6.0</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.19</version><configuration><skip>true</skip></configuration></plugin></plugins></build></project>
我用的Scala版本是 2.11.8。
2、我的 Producer,代码读取 text1.txt文件中的内容,然后把每行数据发送到 Kafka的名叫 Hunter 的 topic 中,这个名字可以自己改,如果Kafka中不存在这个topic的话,系统会自动创建。text1.txt文件自己创建一个,一行一行的数据就可以,并没什么要求,文件的路径改成自己的。
package KafkaAndStreamingimport java.io.{BufferedReader, FileInputStream, FileNotFoundException, IOException, InputStreamReader} import java.util.Propertiesimport org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.clients.producer.{Callback, KafkaProducer, Producer, ProducerRecord, RecordMetadata}object TestKafkaProducer {type Key = Stringtype Val = Stringdef getProducerCnfig():Properties={/*** 对于kafka producer的相关配置文件项* 还有其他的属性,自己去查一下**/val props:Properties = new Properties()// Kafka 的 urlprops.put("bootstrap.servers", "localhost:9092")// group.id 随便写props.put("group.id", "producer-group")props.put("replication.factor", "min.insync.replicas")// 备份数量props.put("min.insync.replicas", "3")props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")props}def main(args: Array[String])={// 获取配置文件val props:Properties = getProducerCnfig()// 创建生产者var producer = new KafkaProducer[String, String](props)try{//读取保存的文件val fis:FileInputStream = new FileInputStream("/Users/hunter/text1.txt")val isr:InputStreamReader = new InputStreamReader(fis, "UTF-8")val br:BufferedReader = new BufferedReader(isr)var line: String = ""line = br.readLine()var i: Int=0while (line != null) {producer.send(toMessage(line.toString, Option(i.toString), Option("Hunter")),new Callback {override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {println(s"""Message $i, send to: """ + recordMetadata.topic())}})i += 1line = br.readLine()}producer.close()br.close()isr.close()fis.close()} catch {case ex: FileNotFoundException =>{println("Missing file exception")}case ex: IOException => {println("IO Exception")}case _ =>{println("Have other Error")}} // dealWithData}//把我的消息包装成了ProducerRecordprivate def toMessage(value: String, key: Option[Key] = None, topic: Option[String] = None): ProducerRecord[Key, Val] = {val t = topic.getOrElse(Option("test").getOrElse(throw new IllegalArgumentException("Must provide topic or default topic")))require(!t.isEmpty, "Topic must not be empty")key match {case Some(k) => new ProducerRecord(t, k, value)case _ => new ProducerRecord(t, value)}} }
3、我的 Consumer端,这里主函数启动需要参数,参数为 localhost:9092 Hunter
代码如下:
package KafkaAndStreamingimport kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata import kafka.serializer.StringDecoder import org.apache.spark.{SparkConf, TaskContext} import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange} import org.apache.spark.streaming.{Seconds, StreamingContext}object KafkaAndPrintInSpark {//判断设置的时输入参数,是否包含brokers 和 topic 至少参数的长度为2,即单机运行一个test的topic: broker=localhost:9092 topic=testdef main(args: Array[String]) {if (args.length < 2) {System.err.println(s"""|Usage: DirectKafkaWordCount <brokers> <topics>| <brokers> is a list of one or more Kafka brokers| <topics> is a list of one or more kafka topics to consume from""".stripMargin)System.exit(1)}//将参数args读入到数组中val Array(brokers, topics) = args// 用2秒批间隔创建上下文val sparkConf = new SparkConf().setMaster("local[2]").setAppName("DirectKafkaWordCount")val ssc = new StreamingContext(sparkConf, Seconds(5))// 创建kafka流与brokers和topicval topicsSet = topics.split(",").toSetval kafkaParams = Map[String, String]("metadata.broker.list" -> brokers,"bootstrap.servers" -> "localhost:9092", // "auto.offset.reset" -> "smallest","key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer","value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer")val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)// 此处注释部分是自定义偏移量 5L代表从 5开始读取。默认是读取最新的,offset从上一次读取结束的位置开始 // val offsetList = List(("Hunter", 0, 5L)) // val fromOffsets = setFromOffsets(offsetList)//对List进行处理,变成需要的格式,即Map[TopicAndPartition, Long] // val messageHandler = (mam: MessageAndMetadata[String, String]) => (mam.topic, mam.message()) //构建MessageAndMetadata,这个所有使用情况都是一样的,就这么写 // val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)]( // ssc, kafkaParams, fromOffsets, messageHandler)messages.foreachRDD( rdd => { // if(rdd.count()>0) {rdd.foreach( records => {println("_1: " + records._1)println("_2: " + records._2)})val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRangesrdd.foreachPartition { iter =>val o: OffsetRange = offsetRanges(TaskContext.get.partitionId())println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")} // }})// 开始计算ssc.start()ssc.awaitTermination()}def setFromOffsets(list: List[(String, Int, Long)]): Map[TopicAndPartition, Long] = {var fromOffsets: Map[TopicAndPartition, Long] = Map()for (offset <- list) {val tp = TopicAndPartition(offset._1, offset._2)//topic和分区数fromOffsets += (tp -> offset._3) // offset位置}fromOffsets} }
三、最后结果
1、Producer
2、Consumer
转载于:https://www.cnblogs.com/Lyh1997/p/11458614.html
Scala Spark Streaming + Kafka + Zookeeper完成数据的发布和消费相关推荐
- spark streaming kafka Couldn't find leader
问题描述: 使用spark streaming接受kafka数据(使用direct方式)报错 Couldn't find leader offsets for Set([test,0], [test, ...
- spark spark streaming + kafka receiver方式消费消息
2019独角兽企业重金招聘Python工程师标准>>> kafka + spark streaming 集群 前提: spark 安装成功,spark 1.6.0 zookeeper ...
- 使用spark.streaming.kafka.consumer.poll.ms和reconnect.backoff.ms解决spark streaming消费kafka时任务不稳定的问题
问题描述 在用spark streaming程序消费kafka的数据时,遇到了一个神奇的现象:同样的数据量.相似的数据,在消费时,有些批次的数据在做map操作时神奇的多了40多秒,具体看下面的数据:在 ...
- DStream实战之Spark Streaming整合fulme实战, Flume向Spark Streaming中push推数据 36
前言 本文所需要的安装包&Flume配置文件,博主都已上传,链接为本文涉及安装包&Flume配置文件本文涉及的安装包&Flume配置文件,请自行下载~ flume作为日志实时采 ...
- Spark Streaming概述_大数据培训
Spark Streaming是什么 Spark Streaming用于流式数据的处理.Spark Streaming支持的数据输入源很多,例如:Kafka.Flume.Twitter.ZeroMQ和 ...
- Spark Streaming 新手指南(原始文章已经发布表在IBM Developworks)
插个小广告:本人的<大话Java性能优化>一书已经在亚马逊.当当.京东.天猫出售,提前谢谢大家的支持. 亚马逊地址:https://www.amazon.cn/%E5%A4%A7%E8%A ...
- pythonspark实践_基于Python的Spark Streaming Kafka编程实践
版权声明:本文为CSDN博主原创文章,未经博主允许不得转载. 说明 Spark Streaming的原理说明的文章很多,这里不做介绍.本文主要介绍使用Kafka作为数据源的编程模型,编码实践,以及一些 ...
- Spark Streaming的容错和数据无丢失机制(WAL机制)
实时的流式处理系统必须是7*24运行的,同时可以从各种各样的系统错误中恢复,在设计之处,Spark Streaing就支持driver和worker节点的错误恢复.然后,在使用某些数据源的时候,错误恢 ...
- Kafka不丢失数据与不重复消费数据
文章目录 一.不丢失数据 1.生产者数据不丢失 2.消费者数据不丢失 二.不重复消费数据 一.不丢失数据 1.生产者数据不丢失 同步模式:配置=1(只有Leader收到,-1所有副本成功,0不等待). ...
最新文章
- 99%网工都会遇到的10道经典面试问题
- 基于mysql数据库binlog的增量订阅消费
- dnn中个性化服务的使用
- 计算机技能鉴定操作试题,计算机操作员中级操作技能考核试卷职业技能鉴定国家题库...
- Python多态、鸭子类型
- 不再需要词典了,现在,AI通过无监督学习学会了双语翻译
- How to make .dmg install for Mac
- npm安装项目所有依赖包
- 小程序电子名片 制作
- 天正电气插入图块非常小与比例不符合
- 小学班级计算机社团活动章程,西华小学速算社团活动章程.doc
- 内存卡坏了怎么修复?内存卡恢复也不难
- Java中Scanner的进阶---求和与求平均数
- SLT学习(一)——STL组成介绍
- Unity模型制作导出规范
- 胖虎技术群Java后端的良师
- php网页制作过程,网页制作步骤
- 使用D3D8实现2D图形显示技术
- QII中的几个Warning的解决方法
- 电路中滤波电容和退耦电容_电容的多种作用,定时,耦合,滤波,去耦,微分,分频...
热门文章
- Java 压缩字符串
- [转载]在中文Windows环境下,控制台窗口中也可以用特殊符号拼出漂亮的表格来。...
- Niblack二值化方法的实现
- 利用ashx和ajax实现表格的异步填充
- 基于Ajax的Web框架Echo2 2.0发布
- python语言程序设计基础第二版第六章答案-python语言程序设计基础(第二版)第五章答案随笔...
- python入门编程-对没有编程基础的人来说,直接学Python入门IT合适吗?
- python资料书-《Python数据分析与应用》——图书配套资料下载
- python怎么写文件-python 头文件怎么写
- python详细安装教程环境配置-[Python] 安装及环境配置