Project GitHub address: bitcarmanlee easy-algorithm-interview-and-practice
Feel free to star and leave comments; let's learn and improve together.

1. Requirements

Production needs real-time CTR feedback computed from streaming data, so we want to use Spark Streaming for the computation. I had never set up a local Streaming debugging environment before, so this post records how to install and configure one locally.

2. Components to install

Spark Streaming usually consumes from a message queue as its data source. Taking Kafka as the example, Kafka needs to be installed locally. Kafka in turn depends on ZooKeeper, so ZooKeeper has to be installed as well.

3. Install ZooKeeper

3.1 Download ZooKeeper

First download the .bin.tar.gz package from the ZooKeeper official website; the version I used is apache-zookeeper-3.5.8-bin.tar.gz.

After the download finishes, extract the archive.

3.2 Configure environment variables

Edit the ~/.bashrc file and add the following environment variables

export ZOOKEEPER_HOME=/opt/zookeeper
export PATH=$PATH:$ZOOKEEPER_HOME/bin

where ZOOKEEPER_HOME is the directory the archive was extracted to.
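Then reload ~/.bashrc so the variables take effect in the current shell:

source ~/.bashrc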

3.3 Modify zoo.cfg

Rename zoo_sample.cfg in the $ZOOKEEPER_HOME/conf directory to zoo.cfg.
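For example, from the command line:

cd $ZOOKEEPER_HOME/conf
cp zoo_sample.cfg zoo.cfg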
The default configuration is

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

Modify it to

tickTime=2000
clientPort=2181
dataDir=/home/mi/wanglei/zkdata
dataLogDir=/home/mi/wanglei/zklogs

where dataDir and dataLogDir are set to directories of your own choosing.
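If the two directories do not exist yet, create them first (using the paths configured above):

mkdir -p /home/mi/wanglei/zkdata /home/mi/wanglei/zklogs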

3.4 Start ZooKeeper

sh zkServer.sh start

The output is

ZooKeeper JMX enabled by default
Using config: /home/mi/wanglei/soft/apache-zookeeper-3.5.8-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

which indicates the startup succeeded.
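You can also ask the bundled script for the server state:

sh zkServer.sh status

In a single-node setup it should report Mode: standalone.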

3.5 Verify the ZooKeeper status

In another terminal, run

telnet 127.0.0.1 2181
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.

Then type stat; an error is reported:

stat is not executed because it is not in the whitelist.
Connection closed by foreign host.

This can be fixed by modifying zkServer.sh. The relevant section originally reads

...
    ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=$JMXPORT -Dcom.sun.management.jmxremote.authenticate=$JMXAUTH -Dcom.sun.management.jmxremote.ssl=$JMXSSL -Dzookeeper.jmx.log4j.disable=$JMXLOG4J org.apache.zookeeper.server.quorum.QuorumPeerMain"
  fi
else
    echo "JMX disabled by user request" >&2
    ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain"
fi

Append one line after this block

ZOOMAIN="-Dzookeeper.4lw.commands.whitelist=* ${ZOOMAIN}"
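An equivalent approach, documented by ZooKeeper itself, is to put the whitelist into zoo.cfg instead of editing the startup script (restart ZooKeeper after either change):

4lw.commands.whitelist=*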

Now run the following again

telnet 127.0.0.1 2181

and type stat

Zookeeper version: 3.5.8-f439ca583e70862c3068a1f2a7d4d068eec33315, built on 05/04/2020 15:07 GMT
Clients:
 /127.0.0.1:47452[1](queued=0,recved=1992,sent=1995)
 /127.0.0.1:50060[0](queued=0,recved=1,sent=0)

Latency min/avg/max: 0/1/62
Received: 2081
Sent: 2087
Connections: 2
Outstanding: 0
Zxid: 0xa1
Mode: standalone
Node count: 134
Connection closed by foreign host.

4. Install Kafka

4.1 Download and extract

Download Kafka from the official website and extract it locally.
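Before creating a topic, start the Kafka broker with the default configuration shipped in the Kafka package (run from the Kafka installation directory):

bin/kafka-server-start.sh config/server.properties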

4.2 Create a topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

4.3 Start a producer

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

Note that Kafka depends on ZooKeeper, so make sure ZooKeeper is running properly.
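To confirm that messages really land in the topic, you can also open a console consumer in yet another terminal:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning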

5. Streaming code

Finally, we get to writing the Streaming code. Create a new Maven project in the IDE; the pom.xml is configured as follows.

5.1 Configure pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>spark-streaming-local</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <scala.version>2.11.2</scala.version>
        <spark.version>2.1.0</spark.version>
        <spark.kafka.version>1.6.3</spark.kafka.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <gjson.version>2.8.0</gjson.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <fork>true</fork>
                    <verbose>true</verbose>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <configuration>
                    <scalaVersion>2.11.8</scalaVersion>
                </configuration>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
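With this pom.xml in place, the project can also be built from the command line with a standard Maven invocation (assuming the Scala sources live under src/main/scala, the maven-scala-plugin default):

mvn clean package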

5.2 Write a WordCount demo

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaWordCount {
  def main(args: Array[String]): Unit = {
    // Kafka broker address (port 9092), despite the variable name
    val zkQuorum = "xxx:9092"
    val topics = Array("test")
    // Kafka consumer parameters; keys and values are plain strings here,
    // so StringDeserializer is used for both
    val kafkaMap = Map[String, Object](
      "bootstrap.servers" -> zkQuorum,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "test",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "session.timeout.ms" -> "30000"
    )

    val conf = new SparkConf()
      .setAppName(KafkaWordCount.getClass.getSimpleName)
      .setMaster("local[4]")
    // one micro-batch every 20 seconds
    val ssc = new StreamingContext(conf, Seconds(20))

    val consumer = ConsumerStrategies.Subscribe[String, String](topics, kafkaMap)
    val lines = KafkaUtils
      .createDirectStream(ssc, LocationStrategies.PreferConsistent, consumer)
      .map(_.value())

    // classic word count on each batch
    val wordCount = lines
      .flatMap(_.split(" "))
      .map(key => (key, 1L))
      .reduceByKey(_ + _)
    wordCount.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Start it in the IDE and you will see output like the following in the console

20/05/12 15:16:40 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
20/05/12 15:16:40 INFO TaskSetManager: Finished task 0.0 in stage 19.0 (TID 22) in 5 ms on localhost (executor driver) (1/3)
20/05/12 15:16:40 INFO TaskSetManager: Finished task 1.0 in stage 19.0 (TID 23) in 4 ms on localhost (executor driver) (2/3)
20/05/12 15:16:40 INFO Executor: Finished task 2.0 in stage 19.0 (TID 24). 1718 bytes result sent to driver
20/05/12 15:16:40 INFO TaskSetManager: Finished task 2.0 in stage 19.0 (TID 24) in 5 ms on localhost (executor driver) (3/3)
20/05/12 15:16:40 INFO TaskSchedulerImpl: Removed TaskSet 19.0, whose tasks have all completed, from pool
20/05/12 15:16:40 INFO DAGScheduler: ResultStage 19 (print at KafkaWordCount.scala:40) finished in 0.006 s
20/05/12 15:16:40 INFO DAGScheduler: Job 9 finished: print at KafkaWordCount.scala:40, took 0.011706 s
20/05/12 15:16:40 INFO JobScheduler: Finished job streaming job 1589267800000 ms.0 from job set of time 1589267800000 ms
20/05/12 15:16:40 INFO JobScheduler: Total delay: 0.088 s for time 1589267800000 ms (execution: 0.064 s)
20/05/12 15:16:40 INFO ShuffledRDD: Removing RDD 19 from persistence list
20/05/12 15:16:40 INFO BlockManager: Removing RDD 19
20/05/12 15:16:40 INFO MapPartitionsRDD: Removing RDD 18 from persistence list
20/05/12 15:16:40 INFO BlockManager: Removing RDD 18
20/05/12 15:16:40 INFO MapPartitionsRDD: Removing RDD 17 from persistence list
20/05/12 15:16:40 INFO BlockManager: Removing RDD 17
20/05/12 15:16:40 INFO MapPartitionsRDD: Removing RDD 16 from persistence list
20/05/12 15:16:40 INFO BlockManager: Removing RDD 16
20/05/12 15:16:40 INFO KafkaRDD: Removing RDD 15 from persistence list
20/05/12 15:16:40 INFO BlockManager: Removing RDD 15
20/05/12 15:16:40 INFO ReceivedBlockTracker: Deleting batches:
20/05/12 15:16:40 INFO InputInfoTracker: remove old batch metadata: 1589267760000 ms
-------------------------------------------
Time: 1589267800000 ms
-------------------------------------------
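The INFO logs above are quite verbose. If they drown out the print() results, one simple option (an extra line, not part of the original demo) is to lower the Spark log level right after the StreamingContext is created:

ssc.sparkContext.setLogLevel("WARN")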

Then type the following into the Kafka producer console

a b c ab a b c d e

Output:

(d,1)
(e,1)
(a,2)
(ab,1)
(b,2)
(c,2)

The word count works as expected!
