Spark Streaming + Kafka + ZooKeeper: setting up a local debugging environment
Project on GitHub: bitcarmanlee/easy-algorithm-interview-and-practice
Stars and comments are welcome; let's learn and improve together.
1. Requirement
Our production system needs streaming data to feed CTR back in real time, so we want to use Spark Streaming to compute the relevant metrics. I had never configured a local Spark Streaming debugging environment before, so this post installs one locally and records the steps.
2. Components to install
Spark Streaming usually reads from a message queue as its data source; we use Kafka as the example, so Kafka has to be installed locally. Kafka in turn depends on ZooKeeper, so ZooKeeper needs to be installed as well.
3. Install ZooKeeper
3.1 Download ZooKeeper
First download the .bin.tar.gz package from the ZooKeeper website; the version I downloaded is apache-zookeeper-3.5.8-bin.tar.gz.
Unpack it once the download finishes.
3.2 Configure environment variables
Edit the ~/.bashrc file and add the environment variables
export ZOOKEEPER_HOME=/opt/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin
where ZOOKEEPER_HOME is the directory you unpacked ZooKeeper into.
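Reload the shell configuration so the variables take effect, and sanity-check them (a quick check, assuming a bash shell):
source ~/.bashrc
echo $ZOOKEEPER_HOME   # should print /opt/zookeeper
which zkServer.sh      # should resolve inside $ZOOKEEPER_HOME/bin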
3.3 Modify zoo.cfg
Rename zoo_sample.cfg in the $ZOOKEEPER_HOME/conf directory to zoo.cfg.
The default configuration is
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
Change it to
tickTime=2000
clientPort=2181
dataDir=/home/mi/wanglei/zkdata
dataLogDir=/home/mi/wanglei/zklogs
where dataDir and dataLogDir are directories of your own choosing.
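ZooKeeper will normally create these directories on startup, but creating them up front avoids permission surprises (the paths below simply match the config above):
mkdir -p /home/mi/wanglei/zkdata /home/mi/wanglei/zklogs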
3.4 Start ZooKeeper
sh zkServer.sh start
The output is
ZooKeeper JMX enabled by default
Using config: /home/mi/wanglei/soft/apache-zookeeper-3.5.8-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
which indicates ZooKeeper started successfully.
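You can also query the server state directly with the status subcommand; a single-node setup should report Mode: standalone:
sh zkServer.sh status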
3.5 Verify ZooKeeper
In another terminal, run
telnet 127.0.0.1 2181
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
Then type stat, which fails with
stat is not executed because it is not in the whitelist. connection closed by foreign host
To fix this, edit zkServer.sh. Locate the block that sets ZOOMAIN:
...
    ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=$JMXPORT -Dcom.sun.management.jmxremote.authenticate=$JMXAUTH -Dcom.sun.management.jmxremote.ssl=$JMXSSL -Dzookeeper.jmx.log4j.disable=$JMXLOG4J org.apache.zookeeper.server.quorum.QuorumPeerMain"
    fi
else
    echo "JMX disabled by user request" >&2
    ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain"
fi
Add one line right after it:
ZOOMAIN="-Dzookeeper.4lw.commands.whitelist=* ${ZOOMAIN}"
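This whitelists every four-letter-word command; restart ZooKeeper (sh zkServer.sh restart) so the change takes effect. As an alternative to patching the start script, my understanding is that recent ZooKeeper releases (3.4.10+/3.5.3+) also accept the whitelist as a property in zoo.cfg:
4lw.commands.whitelist=*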
Now run again
telnet 127.0.0.1 2181
and type stat:
Zookeeper version: 3.5.8-f439ca583e70862c3068a1f2a7d4d068eec33315, built on 05/04/2020 15:07 GMT
Clients:
 /127.0.0.1:47452[1](queued=0,recved=1992,sent=1995)
 /127.0.0.1:50060[0](queued=0,recved=1,sent=0)

Latency min/avg/max: 0/1/62
Received: 2081
Sent: 2087
Connections: 2
Outstanding: 0
Zxid: 0xa1
Mode: standalone
Node count: 134
Connection closed by foreign host.
4. Install Kafka
4.1 Download and unpack
Download Kafka from the official website and unpack it locally.
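The broker has to be running before any topics can be created. With ZooKeeper already up, start Kafka from the unpacked directory; the stock server.properties points at localhost:2181 and listens on port 9092:
bin/kafka-server-start.sh config/server.properties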
4.2 Create a topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
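To double-check that the topic exists, list the topics registered in ZooKeeper (same script, --list flag):
bin/kafka-topics.sh --list --zookeeper localhost:2181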
4.3 Start a producer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Note that Kafka depends on ZooKeeper, so make sure ZooKeeper is running properly.
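To confirm messages flow end to end before Spark enters the picture, you can attach a console consumer to the same topic in yet another terminal (assuming the default broker port):
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
Anything typed into the producer should now appear in the consumer.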
5. Streaming code
Finally we get to the Spark Streaming code. Create a new Maven project in the IDE; the full pom.xml configuration is shown below.
5.1 Configure pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>spark-streaming-local</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <scala.version>2.11.2</scala.version>
        <spark.version>2.1.0</spark.version>
        <spark.kafka.version>1.6.3</spark.kafka.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <gjson.version>2.8.0</gjson.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <fork>true</fork>
                    <verbose>true</verbose>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <configuration>
                    <scalaVersion>2.11.8</scalaVersion>
                </configuration>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
5.2 Write a word-count demo
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaWordCount {
  def main(args: Array[String]): Unit = {
    // Kafka bootstrap servers (the broker address, not the ZooKeeper quorum).
    val brokers = "xxx:9092"
    val topics = Array("test")
    // Use StringDeserializer for both key and value to match the
    // Subscribe[String, String] consumer strategy below.
    val kafkaMap = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "test",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "session.timeout.ms" -> "30000")

    val conf = new SparkConf()
      .setAppName(KafkaWordCount.getClass.getSimpleName)
      .setMaster("local[4]")
    // One micro-batch every 20 seconds.
    val ssc = new StreamingContext(conf, Seconds(20))

    // Direct stream: each record's value is one line of text from the topic.
    val consumer = ConsumerStrategies.Subscribe[String, String](topics, kafkaMap)
    val lines = KafkaUtils
      .createDirectStream(ssc, LocationStrategies.PreferConsistent, consumer)
      .map(_.value())

    // Classic word count over each batch.
    val wordCount = lines.flatMap(_.split(" "))
      .map(key => (key, 1L))
      .reduceByKey(_ + _)
    wordCount.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
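Spark is very chatty with its default log4j settings in local mode. If the INFO flood below gets in the way, a small optional addition (not part of the original demo) is to raise the log level right after creating the StreamingContext:
// Optional: keep only warnings so the printed word counts stand out.
ssc.sparkContext.setLogLevel("WARN")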
Launch it in the IDE and you will see output like
20/05/12 15:16:40 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
20/05/12 15:16:40 INFO TaskSetManager: Finished task 0.0 in stage 19.0 (TID 22) in 5 ms on localhost (executor driver) (1/3)
20/05/12 15:16:40 INFO TaskSetManager: Finished task 1.0 in stage 19.0 (TID 23) in 4 ms on localhost (executor driver) (2/3)
20/05/12 15:16:40 INFO Executor: Finished task 2.0 in stage 19.0 (TID 24). 1718 bytes result sent to driver
20/05/12 15:16:40 INFO TaskSetManager: Finished task 2.0 in stage 19.0 (TID 24) in 5 ms on localhost (executor driver) (3/3)
20/05/12 15:16:40 INFO TaskSchedulerImpl: Removed TaskSet 19.0, whose tasks have all completed, from pool
20/05/12 15:16:40 INFO DAGScheduler: ResultStage 19 (print at KafkaWordCount.scala:40) finished in 0.006 s
20/05/12 15:16:40 INFO DAGScheduler: Job 9 finished: print at KafkaWordCount.scala:40, took 0.011706 s
20/05/12 15:16:40 INFO JobScheduler: Finished job streaming job 1589267800000 ms.0 from job set of time 1589267800000 ms
20/05/12 15:16:40 INFO JobScheduler: Total delay: 0.088 s for time 1589267800000 ms (execution: 0.064 s)
20/05/12 15:16:40 INFO ShuffledRDD: Removing RDD 19 from persistence list
20/05/12 15:16:40 INFO BlockManager: Removing RDD 19
20/05/12 15:16:40 INFO MapPartitionsRDD: Removing RDD 18 from persistence list
20/05/12 15:16:40 INFO BlockManager: Removing RDD 18
20/05/12 15:16:40 INFO MapPartitionsRDD: Removing RDD 17 from persistence list
20/05/12 15:16:40 INFO BlockManager: Removing RDD 17
20/05/12 15:16:40 INFO MapPartitionsRDD: Removing RDD 16 from persistence list
20/05/12 15:16:40 INFO BlockManager: Removing RDD 16
20/05/12 15:16:40 INFO KafkaRDD: Removing RDD 15 from persistence list
20/05/12 15:16:40 INFO BlockManager: Removing RDD 15
20/05/12 15:16:40 INFO ReceivedBlockTracker: Deleting batches:
20/05/12 15:16:40 INFO InputInfoTracker: remove old batch metadata: 1589267760000 ms
-------------------------------------------
Time: 1589267800000 ms
-------------------------------------------
Then type the following into the Kafka console producer:
a b c ab a b c d e
The streaming job prints:
(d,1)
(e,1)
(a,2)
(ab,1)
(b,2)
(c,2)
The word count works!