一、Kafka准备工作

Kafka的安装,请看另外一文,一定要选择和自己电脑上已经安装的scala版本号一致才可以,本教程安装的Spark版本号是1.6.2,scala版本号是2.10,所以,一定要选择Kafka版本号是2.10开头的。比如,到Kafka官网中,可以下载安装文件Kafka_2.10-0.10.1.0,前面的2.10就是支持的scala版本号,后面的0.10.1.0是Kafka自身的版本号。

下面,我们启动Kafka。
请登录Linux系统(本教程统一使用hadoop用户登录),打开一个终端,输入下面命令启动Zookeeper服务:

cd /usr/local/kafka
./bin/zookeeper-server-start.sh config/zookeeper.properties

注意,执行上面命令以后,终端窗口会返回一堆信息,然后就停住不动了,是Zookeeper服务器启动了,正在处于服务状态。所以,千万不要关闭这个终端窗口,一旦关闭,zookeeper服务就停止了,所以,不能关闭这个终端窗口。

cd /usr/local/kafka
bin/kafka-server-start.sh config/server.properties

同样,执行上面命令以后,终端窗口会返回一堆信息,然后就停住不动了,是Kafka服务器启动了,正在处于服务状态。所以,千万不要关闭这个终端窗口,一旦关闭,Kafka服务就停止了,所以,不能关闭这个终端窗口。

下面先测试一下Kafka是否可以正常使用。再另外打开第三个终端,然后输入下面命令创建一个自定义名称为“wordsendertest”的topic(关于什么是topic,请参考《Ubuntu中Kafka的安装》):

cd /usr/local/kafka
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wordsendertest
//这个topic叫wordsendertest,2181是zookeeper默认的端口号,partition是topic里面的分区数,replication-factor是备份的数量,在kafka集群中使用,这里单机版就不用备份了
//可以用list列出所有创建的topics,来查看上面创建的topic是否存在
./bin/kafka-topics.sh --list --zookeeper localhost:2181

这个名称为“wordsendertest”的topic,就是专门负责采集发送一些单词的。
下面,我们需要用producer来产生一些数据,请在当前终端内继续输入下面命令:

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic wordsendertest

上面命令执行后,你就可以在当前终端内用键盘输入一些英文单词,比如我们可以输入:

hello world
hello teach
hello spark

这些单词就是数据源,这些单词会被Kafka捕捉到以后发送给消费者。我们现在可以启动一个消费者,来查看刚才producer产生的数据。请另外打开第四个终端,输入下面命令:

cd /usr/local/kafka
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic wordsendertest --from-beginning

可以看到,屏幕上会显示出如下结果,也就是刚才你在另外一个终端里面输入的内容:

hello world
hello teach
hello spark

到这里,与Kafka相关的准备工作就顺利结束了。注意,所有这些终端窗口都不要关闭,要继续留着后面使用。

二、Spark准备工作

Kafka和Flume等高级输入源,需要依赖独立的库(jar文件)。按照我们前面安装好的Spark版本(spark1.6.2安装),这些jar包都不在里面,为了证明这一点,我们现在可以测试一下。请打开一个新的终端,然后启动spark-shell:

cd /usr/local/spark
./bin/spark-shell

启动成功后,在spark-shell中执行下面import语句:

scala> import org.apache.spark.streaming.kafka._
<console>:25: error: object kafka is not a member of package org.apache.spark.streamingimport org.apache.spark.streaming.kafka._^

你可以看到,马上会报错,因为找不到相关的jar包。所以,现在我们就需要下载spark-streaming-kafka_2.10.jar。
现在请在Linux系统中,打开一个火狐浏览器,请点击这里访问Spark官网,里面有提供spark-streaming-kafka_2.10.jar文件的下载,其中,2.10表示scala的版本。这个下载页面会列出spark-streaming-kafka_2.10.jar的很多版本,我们这里选择1.6.2版本(因为本教程安装的Spark版本是1.6.2),你可以点击1.6.2版本的按钮,进入spark-streaming-kafka_2.10-1.6.2.jar的下载页面,点击下载。下载后的文件会被默认保存在当前Linux登录用户的下载目录下,本教程统一使用hadoop用户名登录Linux系统,所以,文件下载后会被保存到“/home/hadoop/下载”目录下面。现在,我们就把这个文件复制到Spark目录的lib目录下。请新打开一个终端,输入下面命令:

cd /usr/local/spark/lib
mkdir kafka
cd ~
cd 下载
cp ./spark-streaming-kafka_2.10-1.6.2.jar /usr/local/spark/lib/kafka

这样,我们就把spark-streaming-kafka_2.10-1.6.2.jar文件拷贝到了“/usr/local/spark/lib/kafka”目录下。
下面开始的过程,让笔者消耗了2个白天和2个黑夜,测试了网络上各种解决方案,反复失败,最终,只能自己进行“蛮力测试”,下载各种版本进行尝试,最终,得以解决,解决的秘诀是,第一个秘诀是,凡是遇到“java.lang.NoSuchMethodError”这种错误,一定是由版本不一致导致的,也就是spark、scala、spark streaming、Kafka这几个软件的版本不一致,所以,必须保证它们版本的一致性。比如,笔者电脑安装了Spark1.6.2版本,它包含的scala版本是2.10.5,那么,与之对应的straming的版本也必须是spark-streaming_2.10-1.6.2.jar,对应的Kafka也必须是spark-streaming-kafka_2.10-1.6.2.jar。第二个秘诀是,凡是遇到“Class Not Found”这种错误,一般是由缺少jar包引起的。到底缺少什么jar包,笔者在网络上找不到解决方案,也只能靠自己“蒙”,所以,下面笔者测试通过的方法,只是自己反复测试两天后“蒙”对了,但是,也说不出是什么道理。请你按照下面操作即可。

还需要在Linux系统中,打开火狐浏览器,到网络上下载另外一个文件spark-streaming_2.10-1.6.2.jar(下载地址),其中,2.10是scala版本号,1.6.2是Spark版本号。
spark-streaming_2.10-1.6.1.jar下载成功以后是被放到当前Linux登录用户的下载目录下,本教程统一使用hadoop用户名登录Linux系统,所以,文件下载后会被保存到“/home/hadoop/下载”目录下面。现在,我们就把这个文件复制到Spark目录的lib目录下。请新打开一个终端,输入下面命令:

cd /usr/local/spark/lib
cd ~
cd 下载
cp ./spark-streaming_2.10-1.6.1.jar /usr/local/spark/lib/kafka

还没完,下面还要继续把Kafka安装目录的lib目录下的所有jar文件复制到“/usr/local/spark/lib/kafka”目录下,请在终端中执行下面命令:

cd /usr/local/kafka/libs
ls
cp ./* /usr/local/spark/lib/kafka

为什么要拷贝所有的jar过来,因为笔者遇到错误以后,知道是因为缺少jar包,但是,实在不知道是缺少什么jar包,所以,就全部拷贝过来了。但是,全部拷贝过来以后,有些jar包会和spark中已经有的jar包发生冲突,程序一运行就会出现一堆的错误,根据错误提示信息,笔者大概猜测是因为冲突引起的,所以,就删除一些可能引起冲突的jar包,删除一些后,再测试程序,再出错,再删除一批,最后测试通过了,就发现,需要把下面这些jar包删除掉,这样才不会出现错误:

cd /usr/local/kafka/libs
ls
rm log4j*
rm jackson*

上面这些方法纯粹是靠蛮力反复测试才成功的,没有找到问题的本质,也没有网络资料可以帮笔者解决。但是,起码通过这种方式,程序可以顺利运行了。

现在,我们需要配置spark-env.sh文件,让spark能够在启动的时候找到spark-streaming-kafka_2.10-1.6.2.jar等5个jar文件。命令如下:

cd /usr/local/spark/conf
vim spark-env.sh

使用vim编辑器打开了spark-env.sh文件,因为这个文件之前已经反复修改过,目前里面的前面几行的内容应该是这样的:

export SPARK_CLASSPATH=$SPARK_CLASSPATH:/usr/local/spark/lib/hbase/*
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)

我们只要简单修改一下,把“/usr/local/spark/lib/kafka/*“增加进去,修改后的内容如下:

export SPARK_CLASSPATH=$SPARK_CLASSPATH:/usr/local/spark/lib/hbase/*:/usr/local/spark/lib/kafka/*
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)

保存该文件后,退出vim编辑器。然后,就可以启动spark-shell:

cd /usr/local/spark
./bin/spark-shell

启动成功后,再次执行命令:

scala> import org.apache.spark.streaming.kafka._
//会显示下面信息
import org.apache.spark.streaming.kafka._

说明导入成功了。这样,我们就已经准备好了Spark环境,它可以支持kafka相关编程了。

编写Spark程序使用Kafka数据源

下面,我们就可以进行程序编写了。请新打开一个终端,然后,执行命令创建代码目录:

cd /usr/local/spark/mycode
mkdir kafka
cd kafka
mkdir -p src/main/scala
cd src/main/scala
vim KafkaWordProducer.scala

使用vim编辑器新建了KafkaWordProducer.scala,它是产生一系列字符串的程序,会产生随机的整数序列,每个整数被当做一个单词,提供给KafkaWordCount程序去进行词频统计。请在KafkaWordProducer.scala中输入以下代码:

import java.util.HashMap
import org.apache.kafka.clients.producer.{ProducerConfig, KafkaProducer, ProducerRecord}
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConfobject KafkaWordProducer {def main(args: Array[String]) {if (args.length < 4) {System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +"<messagesPerSec> <wordsPerMessage>")System.exit(1)}val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args// Zookeeper connection propertiesval props = new HashMap[String, Object]()props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")val producer = new KafkaProducer[String, String](props)// Send some messageswhile(true) {(1 to messagesPerSec.toInt).foreach { messageNum =>val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString).mkString(" ")val message = new ProducerRecord[String, String](topic, null, str)producer.send(message)}Thread.sleep(1000)}}
}

保存后退出vim编辑器。然后,继续在当前目录下创建KafkaWordCount.scala代码文件:

vim KafkaWordCount.scala

KafkaWordCount.scala是用于单词词频统计,它会把KafkaWordProducer发送过来的单词进行词频统计,代码内容如下:

import org.apache.spark._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtilsobject KafkaWordCount{def main(args:Array[String]){StreamingExamples.setStreamingLogLevels()
val sc = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
val ssc = new StreamingContext(sc,Seconds(10))
ssc.checkpoint("file:///usr/local/spark/mycode/kafka/checkpoint") //设置检查点,如果存放在HDFS上面,则写成类似ssc.checkpoint("/user/hadoop/checkpoint")这种形式,但是,要启动hadoop
val zkQuorum = "localhost:2181" //Zookeeper服务器地址
val group = "1"  //topic所在的group,可以设置为自己想要的名称,比如不用1,而是val group = "test-consumer-group"
val topics = "wordsender"  //topics的名称
val numThreads = 1  //每个topic的分区数
val topicMap =topics.split(",").map((_,numThreads.toInt)).toMap
val lineMap = KafkaUtils.createStream(ssc,zkQuorum,group,topicMap)
val lines = lineMap.map(_._2)
val words = lines.flatMap(_.split(" "))
val pair = words.map(x => (x,1))
val wordCounts = pair.reduceByKeyAndWindow(_ + _,_ - _,Minutes(2),Seconds(10),2) //这行代码的含义在下一节的窗口转换操作中会有介绍
wordCounts.print
ssc.start
ssc.awaitTermination
}
}

保存后退出vim编辑器。然后,继续在当前目录下创建StreamingExamples.scala代码文件:

vim StreamingExamples.scala

下面是StreamingExamples.scala的代码,用于设置log4j:

import org.apache.spark.Logging
import org.apache.log4j.{Level, Logger}
/** Utility functions for Spark Streaming examples. */
object StreamingExamples extends Logging {/** Set reasonable logging levels for streaming if the user has not configured log4j. */def setStreamingLogLevels() {val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElementsif (!log4jInitialized) {// We first log something to initialize Spark's default logging, then we override the// logging level.logInfo("Setting log level to [WARN] for streaming example." +" To override add a custom log4j.properties to the classpath.")Logger.getRootLogger.setLevel(Level.WARN)}}
}

这样,我们在“/usr/local/spark/mycode/kafka/src/main/scala”目录下,就有了如下三个代码文件:

然后,请执行下面命令:

cd /usr/local/spark/mycode/kafka/
vim simple.sbt

在simple.sbt中输入以下代码:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.2"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.2"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka_2.10" % "1.6.2"

保存文件退出vim编辑器。然后执行下面命令,进行打包编译:

cd /usr/local/spark/mycode/kafka/
/usr/local/sbt/sbt package


打包成功后,就可以执行程序测试效果了。
首先,请启动hadoop(有可能你前面采用了ssc.checkpoint(“/user/hadoop/checkpoint”)这种形式,写入HDFS):

cd /usr/local/hadoop
./sbin/start-all.sh

启动hadoop成功以后,就可以测试我们刚才生成的词频统计程序了。
要注意,我们之前已经启动了zookeeper服务,启动了kafka服务,因为我们之前那些终端窗口都没有关闭,所以,这些服务都在运行。如果你不小心关闭了之前的终端窗口,那就请回到本文前面,再启动zookeeper服务,启动kafka服务。

首先,请新打开一个终端,执行如下命令,运行“KafkaWordProducer”程序,生成一些单词(是一堆整数形式的单词):

cd /usr/local/spark
/usr/local/spark/bin/spark-submit --class "KafkaWordProducer" /usr/local/spark/mycode/kafka/target/scala-2.10/simple-project_2.10-1.0.jar localhost:9092 wordsender 3 5

注意,上面命令中,”localhost:9092 wordsender 3 5″是提供给KafkaWordProducer程序的4个输入参数,第1个参数localhost:9092是Kafka的broker的地址,第2个参数wordsender是topic的名称,我们在KafkaWordCount.scala代码中已经把topic名称写死掉,所以,KafkaWordCount程序只能接收名称为”wordsender”的topic。第3个参数“3”表示每秒发送3条消息,第4个参数“5”表示,每条消息包含5个单词(实际上就是5个整数)。
执行上面命令后,屏幕上会不断滚动出现新的单词,如下:

这个终端窗口就放在这里,不要关闭,千万不要关闭,就让它一直不断发送单词。

然后,请新打开一个终端,执行下面命令,运行KafkaWordCount程序,执行词频统计:

cd /usr/local/spark
/usr/local/spark/bin/spark-submit --class "KafkaWordCount"
/usr/local/spark/mycode/kafka/target/scala-2.10/simple-project_2.10-1.0.jar

Spark Streaming和Kafka的集成完成!!!

---------------------------------------------------------------------提示---------------------------------------------------------------------
如果你在部署过程中,遇到什么问题,可以通过评论区加我微信,我们相互讨论。共同成长!!!

---------------------------------------------------------------------提示---------------------------------------------------------------------

spark streaming运行kafka数据源相关推荐

  1. spark streaming 消费 kafka入门采坑解决过程

    kafka 服务相关的命令 # 开启kafka的服务器 bin/kafka-server-start.sh -daemon config/server.properties & # 创建top ...

  2. Spark Streaming 实战案例(五) Spark Streaming与Kafka

    主要内容 Spark Streaming与Kafka版的WordCount示例(一) Spark Streaming与Kafka版的WordCount示例(二) 1. Spark Streaming与 ...

  3. sparkstreaming监听hdfs目录_大数据系列之Spark Streaming接入Kafka数据

    Spark Streaming官方提供Receiver-based和Direct Approach两种方法接入Kafka数据,本文简单介绍两种方式的pyspark实现. 1.Spark Streami ...

  4. Spark中如何管理Spark Streaming消费Kafka的偏移量

    spark streaming 版本 2.1 kafka 版本0.9.0.0 在这之前,先重述下spark streaming里面管理偏移量的策略,默认的spark streaming它自带管理的of ...

  5. Flume 以twitter为source,kafka为channel,hdfs为sink,再用spark streaming 读kafka topic

    Flume 以twitter为source,kafka为channel,hdfs为sink,再用spark streaming 读kafka topic Flume的配置文件: kafka_twitt ...

  6. 根据官网文档看Spark Streaming对接Kafka的两种方式, 以及如何实现Exactly Once语义

    注: 本文算是本人的学习记录, 中间可能有些知识点并不成熟, 不能保证正确性. 只能算是对官网文档作了个翻译和解读, 随时有可能回来更新和纠错 上一篇文章讨论了Spark Streaming的WAL( ...

  7. Spark Streaming使用Kafka保证数据零丢失

    为什么80%的码农都做不了架构师?>>>    源文件放在github,随着理解的深入,不断更新,如有谬误之处,欢迎指正.原文链接https://github.com/jacksu/ ...

  8. 通过案例对 spark streaming 透彻理解三板斧之三:spark streaming运行机制与架构

    本期内容: 1. Spark Streaming Job架构与运行机制 2. Spark Streaming 容错架构与运行机制 事实上时间是不存在的,是由人的感官系统感觉时间的存在而已,是一种虚幻的 ...

  9. Spark Streaming 遇到 kafka

    Spark Streaming 遇到 kafka 站酷 | 插画 搭建流程略,主要讲一下如何更好的结合使用,看图说话. Kafka 结合 Spark Streaming 实现在线的准实时流分析,除了保 ...

最新文章

  1. 使用 Eigen 库写第一个程序
  2. 【c++】14.编译proto和proto相关用法
  3. typora图床教程-阿里云版
  4. Java Mail+Thymeleaf模板引擎实现发送HTML格式邮件
  5. Kafka创建Topic时如何将分区放置到不同的Broker中
  6. python用xpath爬取10页网站图片
  7. socket buffer套接字缓存
  8. php root权限执行命令,如何使用PHP执行需要root权限的系统命令
  9. WIN10 64位 JDK的安装
  10. java随机数函数生成指定区间的,意外的惊喜
  11. PHP 缓存 内存,php - 一个大型数组变量的APC内存缓存(22MB)
  12. SQL server 2008 安装教程
  13. (十一)可编辑表格EditorGridPanel
  14. 对称矩阵的逆矩阵也是对称矩阵吗
  15. 强化学习(8):Asynchronous Advantage Actor-Critic(A3C)算法
  16. Latex之复杂距离、自定义章节样式、自定义目录样式
  17. Day54.XML解析(DOM4J)、Tomcat服务器、HTML协议简介: 请求、响应报文、响应码
  18. android代码 发警报音,Android 8中的警报重复
  19. Binomial Coefficient(二项式系数)的计算
  20. GGSN - SCP 业务控制点

热门文章

  1. 适合婚礼唱的流行歌_流行的婚礼歌曲被重新想象成数据即纸杯蛋糕
  2. 【中学教师资格证-教育知识与能力】简答题
  3. 社区说|Android 13 新特性 EROFS-只读文件系统解析
  4. 经验整理-win10安装ubuntu18.04.2双系统(NVIDIA Geforce GTX 1060独显)
  5. Golff Lend 借贷重磅上线
  6. NOIP提高组初赛[选择题知识点汇总]
  7. 人脸识别选这几家就对了!
  8. [cocos2d-x]捕鱼达人炮台射击角度的旋转实现
  9. String.split()方法介绍
  10. 【MATLAB】关于matlab的table数据使用