一、Spark Streaming

  Spark Streaming是核心Spark API的扩展,可实现实时数据流的可扩展,高吞吐量,容错流处理。数据可以从许多来源(如Kafka,Flume,Kinesis或TCP sockets)中提取,并且可以使用以高级函数表示的复杂算法进行处理map,例如reducejoinwindow。最后,处理后的数据可以推送到文件系统,数据库和实时仪表板。

二、SparkStreaming实现

  Kafka和Zookeeper事先装过,没有先安装Zookeeper,则无法运行Kafka服务。但是,已经为CloudKarafka群集安装并配置了Zookeeper。

  我搭建的是 Scala 的 maven 项目,项目和环境都在单机上运行。

  1、先看我的 pom.xml 配置:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.sparkstream</groupId><artifactId>LyhSparkStreaming</artifactId><version>1.0-SNAPSHOT</version><properties><spark.version>2.3.3</spark.version><scala.version>2.11</scala.version></properties><dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_${scala.version}</artifactId><version>${spark.version}</version><exclusions><exclusion><groupId>commons-beanutils</groupId><artifactId>commons-beanutils-core</artifactId></exclusion></exclusions></dependency><!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka --><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka_2.11</artifactId><version>1.6.3</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_${scala.version}</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_${scala.version}</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_${scala.version}</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-mllib_${scala.version}</artifactId><version>${spark.version}</version></dependency></dependencies><build><plugins><plugin><groupId>org.scala-tools</groupId><artifactId>maven-scala-plugin</artifactId><version>2.15.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals></execution></executions></plugin><plugin><artifactId>maven-compiler-plugin</artifactId><version>3.6.0</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-surefire-plugin</artifactId><version>2.19</version><configuration><skip>true</skip></configuration></plugin></plugins></build></project>

  我用的Scala版本是 2.11.8。

  2、我的 Producer,代码读取 text1.txt文件中的内容,然后把每行数据发送到 Kafka的名叫 Hunter 的 topic 中,这个名字可以自己改,如果Kafka中不存在这个topic的话,系统会自动创建。text1.txt文件自己创建一个,一行一行的数据就可以,并没什么要求,文件的路径改成自己的。

package KafkaAndStreamingimport java.io.{BufferedReader, FileInputStream, FileNotFoundException, IOException, InputStreamReader}
import java.util.Propertiesimport org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, Producer, ProducerRecord, RecordMetadata}object TestKafkaProducer {type Key = Stringtype Val = Stringdef getProducerCnfig():Properties={/*** 对于kafka producer的相关配置文件项* 还有其他的属性,自己去查一下**/val props:Properties = new Properties()// Kafka 的 urlprops.put("bootstrap.servers", "localhost:9092")// group.id 随便写props.put("group.id", "producer-group")props.put("replication.factor", "min.insync.replicas")// 备份数量props.put("min.insync.replicas", "3")props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")props}def main(args: Array[String])={// 获取配置文件val props:Properties = getProducerCnfig()// 创建生产者var producer = new KafkaProducer[String, String](props)try{//读取保存的文件val fis:FileInputStream = new FileInputStream("/Users/hunter/text1.txt")val isr:InputStreamReader = new InputStreamReader(fis, "UTF-8")val br:BufferedReader = new BufferedReader(isr)var line: String = ""line = br.readLine()var i: Int=0while (line != null) {producer.send(toMessage(line.toString, Option(i.toString), Option("Hunter")),new Callback {override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {println(s"""Message $i, send to: """ + recordMetadata.topic())}})i += 1line = br.readLine()}producer.close()br.close()isr.close()fis.close()} catch {case ex: FileNotFoundException =>{println("Missing file exception")}case ex: IOException => {println("IO Exception")}case _ =>{println("Have other Error")}}
//    dealWithData}//把我的消息包装成了ProducerRecordprivate def toMessage(value: String, key: Option[Key] = None, topic: Option[String] = None): ProducerRecord[Key, Val] = {val t = topic.getOrElse(Option("test").getOrElse(throw new IllegalArgumentException("Must provide topic or default topic")))require(!t.isEmpty, "Topic must not be empty")key match {case Some(k) => new ProducerRecord(t, k, value)case _ => new ProducerRecord(t, value)}}
}

  3、我的 Consumer端,这里主函数启动需要参数,参数为 localhost:9092 Hunter

  代码如下:

package KafkaAndStreamingimport kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}object KafkaAndPrintInSpark {//判断设置的时输入参数,是否包含brokers 和 topic 至少参数的长度为2,即单机运行一个test的topic:   broker=localhost:9092 topic=testdef main(args: Array[String]) {if (args.length < 2) {System.err.println(s"""|Usage: DirectKafkaWordCount <brokers> <topics>|  <brokers> is a list of one or more Kafka brokers|  <topics> is a list of one or more kafka topics to consume from""".stripMargin)System.exit(1)}//将参数args读入到数组中val Array(brokers, topics) = args// 用2秒批间隔创建上下文val sparkConf = new SparkConf().setMaster("local[2]").setAppName("DirectKafkaWordCount")val ssc = new StreamingContext(sparkConf, Seconds(5))// 创建kafka流与brokers和topicval topicsSet = topics.split(",").toSetval kafkaParams = Map[String, String]("metadata.broker.list" -> brokers,"bootstrap.servers" -> "localhost:9092",
//      "auto.offset.reset" -> "smallest","key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer","value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer")val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)// 此处注释部分是自定义偏移量 5L代表从 5开始读取。默认是读取最新的,offset从上一次读取结束的位置开始
//    val offsetList = List(("Hunter", 0, 5L))
//    val fromOffsets = setFromOffsets(offsetList)//对List进行处理,变成需要的格式,即Map[TopicAndPartition, Long]
//    val messageHandler = (mam: MessageAndMetadata[String, String]) => (mam.topic, mam.message()) //构建MessageAndMetadata,这个所有使用情况都是一样的,就这么写
//    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
//      ssc, kafkaParams, fromOffsets, messageHandler)messages.foreachRDD( rdd => {
//      if(rdd.count()>0) {rdd.foreach( records => {println("_1: " + records._1)println("_2: " + records._2)})val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRangesrdd.foreachPartition { iter =>val o: OffsetRange = offsetRanges(TaskContext.get.partitionId())println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")}
//      }})// 开始计算ssc.start()ssc.awaitTermination()}def setFromOffsets(list: List[(String, Int, Long)]): Map[TopicAndPartition, Long] = {var fromOffsets: Map[TopicAndPartition, Long] = Map()for (offset <- list) {val tp = TopicAndPartition(offset._1, offset._2)//topic和分区数fromOffsets += (tp -> offset._3)           // offset位置}fromOffsets}
}

  三、最后结果

    1、Producer

       

    2、Consumer

   

转载于:https://www.cnblogs.com/Lyh1997/p/11458614.html

Scala Spark Streaming + Kafka + Zookeeper完成数据的发布和消费相关推荐

  1. spark streaming kafka Couldn't find leader

    问题描述: 使用spark streaming接受kafka数据(使用direct方式)报错 Couldn't find leader offsets for Set([test,0], [test, ...

  2. spark spark streaming + kafka receiver方式消费消息

    2019独角兽企业重金招聘Python工程师标准>>> kafka + spark streaming 集群 前提: spark 安装成功,spark 1.6.0 zookeeper ...

  3. 使用spark.streaming.kafka.consumer.poll.ms和reconnect.backoff.ms解决spark streaming消费kafka时任务不稳定的问题

    问题描述 在用spark streaming程序消费kafka的数据时,遇到了一个神奇的现象:同样的数据量.相似的数据,在消费时,有些批次的数据在做map操作时神奇的多了40多秒,具体看下面的数据:在 ...

  4. DStream实战之Spark Streaming整合fulme实战, Flume向Spark Streaming中push推数据 36

    前言 本文所需要的安装包&Flume配置文件,博主都已上传,链接为本文涉及安装包&Flume配置文件本文涉及的安装包&Flume配置文件,请自行下载~ flume作为日志实时采 ...

  5. Spark Streaming概述_大数据培训

    Spark Streaming是什么 Spark Streaming用于流式数据的处理.Spark Streaming支持的数据输入源很多,例如:Kafka.Flume.Twitter.ZeroMQ和 ...

  6. Spark Streaming 新手指南(原始文章已经发布表在IBM Developworks)

    插个小广告:本人的<大话Java性能优化>一书已经在亚马逊.当当.京东.天猫出售,提前谢谢大家的支持. 亚马逊地址:https://www.amazon.cn/%E5%A4%A7%E8%A ...

  7. pythonspark实践_基于Python的Spark Streaming Kafka编程实践

    版权声明:本文为CSDN博主原创文章,未经博主允许不得转载. 说明 Spark Streaming的原理说明的文章很多,这里不做介绍.本文主要介绍使用Kafka作为数据源的编程模型,编码实践,以及一些 ...

  8. Spark Streaming的容错和数据无丢失机制(WAL机制)

    实时的流式处理系统必须是7*24运行的,同时可以从各种各样的系统错误中恢复,在设计之处,Spark Streaing就支持driver和worker节点的错误恢复.然后,在使用某些数据源的时候,错误恢 ...

  9. Kafka不丢失数据与不重复消费数据

    文章目录 一.不丢失数据 1.生产者数据不丢失 2.消费者数据不丢失 二.不重复消费数据 一.不丢失数据 1.生产者数据不丢失 同步模式:配置=1(只有Leader收到,-1所有副本成功,0不等待). ...

最新文章

  1. 99%网工都会遇到的10道经典面试问题
  2. 基于mysql数据库binlog的增量订阅消费
  3. dnn中个性化服务的使用
  4. 计算机技能鉴定操作试题,计算机操作员中级操作技能考核试卷职业技能鉴定国家题库...
  5. Python多态、鸭子类型
  6. 不再需要词典了,现在,AI通过无监督学习学会了双语翻译
  7. How to make .dmg install for Mac
  8. npm安装项目所有依赖包
  9. 小程序电子名片 制作
  10. 天正电气插入图块非常小与比例不符合
  11. 小学班级计算机社团活动章程,西华小学速算社团活动章程.doc
  12. 内存卡坏了怎么修复?内存卡恢复也不难
  13. Java中Scanner的进阶---求和与求平均数
  14. SLT学习(一)——STL组成介绍
  15. Unity模型制作导出规范
  16. 胖虎技术群Java后端的良师
  17. php网页制作过程,网页制作步骤
  18. 使用D3D8实现2D图形显示技术
  19. QII中的几个Warning的解决方法
  20. 电路中滤波电容和退耦电容_电容的多种作用,定时,耦合,滤波,去耦,微分,分频...

热门文章

  1. Java 压缩字符串
  2. [转载]在中文Windows环境下,控制台窗口中也可以用特殊符号拼出漂亮的表格来。...
  3. Niblack二值化方法的实现
  4. 利用ashx和ajax实现表格的异步填充
  5. 基于Ajax的Web框架Echo2 2.0发布
  6. python语言程序设计基础第二版第六章答案-python语言程序设计基础(第二版)第五章答案随笔...
  7. python入门编程-对没有编程基础的人来说,直接学Python入门IT合适吗?
  8. python资料书-《Python数据分析与应用》——图书配套资料下载
  9. python怎么写文件-python 头文件怎么写
  10. python详细安装教程环境配置-[Python] 安装及环境配置