大数据处理工具Kafka、Zk、Spark
搭建kafka和zk集群环境
安装环境
MAC操作系统VMware Fusion虚拟机3个centos7服务器
安装虚拟机 飞机票 安装虚拟机Centos系统并安装Docker过程记录
安装包下载
https://kafka.apache.org/downloads.html
服务器环境准备
安装文件上传工具
yum install lrzsz
查看服务器ip
ip addr show
centos-1 192.168.84.128 centos-2 192.168.84.129centos-3 192.168.84.130
通过ssh工具连接
这个工具很好用
先安装下centos-1服务器环境
上传kafka安装包
mkdir /opt/kafka通过rz将压缩包上传kafka_2.10-0.8.2.1.tgz解压tar xvf kafka_2.10-0.8.2.1.tgz
创建zk目录
创建zk数据目录 并设定服务器编号mkdir /opt/zk_datacd /opt/zk_datavi myid该文件内容为1、2、3分别对应centos-1、centos-2、centos-3
配置zk
kafka安装包中内置zk服务
配置zookeeper.properties
vi /opt/kafka/kafka_2.10-0.8.2.1/config/zookeeper.properties
# zk服务器之间的心跳时间间隔 以毫秒为单位tickTime=2000# zk 数据保存目录 zk服务器的ID文件也保存到这个目录下dataDir=/opt/zk_data/# zk服务器监听这个端口 然后等待客户端连接clientPort=2181# zk集群中follower服务器和leader服务器之间建立# 初始连接时所能容忍的心跳次数的极限值initLimit=5# zk集群中follower服务器和leader服务器之间请求和应答过程中所能容忍的心跳次数的极限值syncLimit=2# server.N N代表zk集群服务器的编号# 服务器IP地址:该服务器于leader服务器的数据交换端口:选举leader服务器时用到的通信端口server.1=192.168.84.128:2888:3888server.2=192.168.84.129:2888:3888server.3=192.168.84.130:2888:3888
配置kafka
配置kafka broker
mkdir /opt/kafka/kafka-logs
vi /opt/kafka/kafka_2.10-0.8.2.1/config/server.properties
#kafka broker的唯一标识 集群中不能重复broker.id=0# broker监听端口 用于监听producer或者consumer的连接port=9092# 当前broker服务器 ip地址或机器名host.name=192.168.84.128#broker作为zk的client 可以连接的zk的地址信息zookeeper.contact=192.168.84.128:2181,192.168.84.129:2181,192.168.84.130:2181# 日志保存目录log.dirs=/opt/kafka/kafka-logs
配置broker地址列表
vi /opt/kafka/kafka_2.10-0.8.2.1/config/producer.properties
# 集群中的broker地址列表broker.list=192.168.84.128:9092,192.168.84.128:9092,192.168.84.128:9092# Producer类型 async 异步生产者 sync 同步生产者producer.type=async
配置consumer
vi /opt/kafka/kafka_2.10-0.8.2.1/config/consumer.properties
# consumer可以连接的zk服务器地址列表zookeeper.contact=192.168.84.128:2181,192.168.84.128:2181,192.168.84.128:2181
打包配置好的kafka安装包并上传到其他服务器
tar cvf kafka_2.10-0.8.2.1.tar ./kafka_2.10-0.8.2.1
得到kafka_2.10-0.8.2.1.tar
scp ./kafka_2.10-0.8.2.1.tar root@192.168.84.129:/opt/kafkascp ./kafka_2.10-0.8.2.1.tar root@192.168.84.130:/opt/kafka
传到centos-2和centos-3之后
分别操作
解压tar xvf kafka_2.10-0.8.2.1.tar
vi /opt/kafka/kafka_2.10-0.8.2.1/config/server.properties
文件中的 broker.id 和 host.name
broker.id,可以分别复制 1 和 2host.name 需要改成当前机器的 IP
安装jdk1.8
每个服务器都需要安装java环境
切换阿里云源
mv /etc/yum.repos.d/CentOS-Base.repo /etc/yum.repos.d/CentOS-Base.repo.backup
wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo
或者
curl -o /etc/yum.repos.d/CentOS-Base.repo http://mirrors.aliyun.com/repo/Centos-7.repo
清除缓存
yum makecache
备注
OpenJDK Development Environment:开发版本带JDK
不要下载 Open JDK runtime Environment只有JRE
安装jdk1.8
yum -y install java-1.8.0-openjdk-devel.x86_64
安装路径/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.272.b10-1.el7_9.x86_64
全局环境变量配置
vi /etc/profile
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.272.b10-1.el7_9.x86_64export PATH=$JAVA_HOME/bin:$PATH export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
. /etc/profile
关闭防火墙
报错信息:
2020-09-14 03:28:23,562 [myid:0] - WARN [WorkerSender[myid=0]:QuorumCnxManager@588] - Cannot open channel to 3 at election address h6/192.168.1.16:3888java.net.ConnectException: 拒绝连接 (Connection refused)
此时需要关闭防火墙
sudo systemctl stop firewalld #临时关闭sudo systemctl disable firewalld #然后reboot 永久关闭sudo systemctl status firewalld #查看防火墙状态
分别启动zk
mkdir /opt/kafka/runmkdir /opt/kafka/run/kafkamkdir /opt/kafka/run/zkcd /opt/kafka/run/zk
nohup /opt/kafka/kafka_2.10-0.8.2.1/bin/zookeeper-server-start.sh /opt/kafka/kafka_2.10-0.8.2.1/config/zookeeper.properties &
分别启动kafka
cd /opt/kafka/run/kafka
nohup /opt/kafka/kafka_2.10-0.8.2.1/bin/kafka-server-start.sh /opt/kafka/kafka_2.10-0.8.2.1/config/server.properties &
查看kafka和zk进程是否启动
ps -ef | grep kafka
验证kafka、zk环境是否可用
创建消息主题
/opt/kafka/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --create \--replication-factor 3 \--partition 3 \--topic user-behavior-topic \--zookeeper 192.168.84.128:2181,192.168.84.129:2181,192.168.84.130:2181
通过console producer生产消息
启动console producer
/opt/kafka/kafka_2.10-0.8.2.1/bin/kafka-console-producer.sh --broker-list 192.168.84.128:9092 --topic user-behavior-topic
通过console consumer消费消息
在另一台机器打开consumer
/opt/kafka/kafka_2.10-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper 192.168.84.129:2181 --topic user-behavior-topic --from-beginning
如果在producer console输入一条消息 能从consumer console看到这条消息就代表安装是成功的
centos-1 生产消息
centos-2 消费消息
说明kafka和zk集群环境是可用的
Spark
安装包下载
https://www.apache.org/dyn/closer.lua/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz
tar xvf spark-3.0.1-bin-hadoop2.7.tgz
依赖条件
jdk1.8python 2.7.5
部署方式
1、 底层资源调度可以依赖外部的资源调度框架:相对稳定的Mesos、Hadoop YARN模式
2、使用spark内建的standalone模式
Local[N]本地模式 使用N个线程
Local Cluster[Worker,core,Memory]
伪分布式 可以配置所需要启动的虚拟工作节点的数量
以及每个工作节点所管理的CPU数量和内存尺寸
Spark://hostname:port:Standalone模式
Yarn client
主程序运行在本地 具体任务运行在yarn集群
YARN standalone/Yarn cluster
主程序逻辑和任务都运行在YARN集群中
需要部署Spark到相关节点
URL为Spark Master主机地址和端口
Mesos://hostname:port:Mesos 模式
需要部署Spark和Mesos到相关节点
URL为Mesos主机地址和端口
上面的部署方式:实际应用中spark应用程序的运行模式取决于传递给 sparkcontext的master环境变量的值
个别模式还需要依赖辅助程序接口来配合使用
示例代码
examples/src/main
运行脚本
bin/run-example[params]
计算PI
spark-3.0.1-bin-hadoop2.7/bin/run-example SparkPi 10 > Sparkpilog.txt
日志包含两部分
一部分是通用日志信息由一系列脚本及程序产生(计算机信息、spark信息)
另一部分是运行程序的输出结果
计算词数
假设有一个数据文件wordcountdata.txt统计该文件单词出现的个数spark-3.0.1-bin-hadoop2.7/bin/run-example JavaWordCount ./wordcountdata.txt
RDD
一个spark的任务对应一个RDD
RDD是弹性分布式数据集即一个RDD代表一个被分区的只读数据集
一个RDD生成
可以来自于内存集合和外部系统也可通过转换操作来自于其他RDD map filter join
脚本的调用过程
Run-example.sh->load-spark-env.sh->lib 目录下的 jar 包文件->spark-submit.sh->spark-class
Scala
1、scala最终启动的是jvm线程所以它可以访问java的库文件 例如java.io.File
2、通过Main函数的方式启动了一个JVM进程随后针对该进程又托管了一系列线程级别的操作
3、scala 简单 轻巧 相对java 非常适合并行计算框架的编写
运行过程
函数
map
根据现有数据集返回一个新的分布式数据集
由于每个原元素经过func函数转换后组成
flatMap
每一个输入函数 会被影射为0到多个输出函数
返回值是一个Seq 而不是单一元素
reduceByKey
在一个(K,V)对的数据集上使用返回一个(K,V)对的数据集
Key相同都会被指定的reduce聚合在一起
总体工作流程
无论本地模式还是分布式模式
内部程序逻辑结构都是类似的只是其中部分模块有所简化
本地模式中 集群管理模块被简化为进程内部的线程池
spark环境部署
使用docker部署 spark集群
飞机票安装SBT环境运行Scala项目
结尾
下篇文章通过一个实际案例来介绍下如何使用 spark streaming
案例描述:
假设某论坛需要根据用户对站内网页的点击量,停留时间,以及是否点赞,来近实时的计算网页热度,进而动态的更新网站的今日热点模块,把最热话题的链接显示其中。
大数据处理工具Kafka、Zk、Spark相关推荐
- [转]开源大数据处理工具汇总
查询引擎 一.Phoenix 贡献者::Salesforce 简介:这是一个Java中间层,可以让开发者在Apache HBase上执行SQL查询.Phoenix完全使用Java编写,代码位于GitH ...
- 一共81个,开源大数据处理工具汇总
作者:大数据女神-诺蓝(微信公号:dashujunvshen).本文是36大数据专稿,转载必须标明来源36大数据. 本文一共分为上下两部分.我们将针对大数据开源工具不同的用处来进行分类,并且附上了官网 ...
- 一共81个,开源大数据处理工具汇总(下)转
作者:大数据女神-诺蓝(微信公号:dashujunvshen).本文是36大数据专稿,转载必须标明来源36大数据. 接上一部分:一共81个,开源大数据处理工具汇总(上),第二部分主要收集整理的内容主要 ...
- 大数据处理工具有哪些?
大数据工具可以帮助大数据工作人员进行日常的大数据工作,以下是大数据工作中常用的工具: 1. Hivemall Hivemall结合了面向Hive的多种机器学习算法.它包括诸多高度扩展性算法,可用于数据 ...
- (转)一共81个,开源大数据处理工具汇总
[思路网注] 本文一共分为上下两部分.我们将针对大数据开源工具不同的用处来进行分类,并且附上了官网和部分下载链接,希望能给做大数据的朋友做个参考. 本文一共分为上下两部分.我们将针对大数据开源工具不同 ...
- 有哪些大数据处理工具?
简介:近几年里,大数据行业发展势头迅猛,故而相应的分布式产品和架构层出不穷,本文分享作者在大数据系统实践过程中接触过的一些工具及使用感受,抛砖引玉,和同学们一起构建一个分布式产品的全景图. 下图是由著 ...
- 盘点 | 有哪些大数据处理工具?
导读:近几年里,大数据行业发展势头迅猛,故而相应的分布式产品和架构层出不穷,本文分享作者在大数据系统实践过程中接触过的一些工具及使用感受,抛砖引玉,和同学们一起构建一个分布式产品的全景图. 下图是由著 ...
- 开源大数据处理工具汇总
类别 名称 官网 备注 查询引擎 Phoenix http://phoenix.incubator.apache.org/ Salesforce公司出品,Apache HBase之上的一个SQL中间层 ...
- 大数据工具kafka
最新文章
- easyScholar——文献数据库插件
- 星空下的痕迹 Jenkins学习(四)----------windows下Publish over FTP插件应用
- 终于给cs来了一次小整容
- 8)Thymeleaf 基本对象表达式
- 【三维路径规划】基于matlab改进粒子滤波无人机三维路径规划【含Matlab源码 1269期】
- 活动报名场地预约自定义表单小程序开发
- 全国(全球)快递查询物流查询API,物流信息追踪接口
- catia中的螺旋伞齿轮画法_聚焦:螺旋伞齿轮画法要领
- 威纶通触摸屏锁机程序模板 系统共设置有12期分期付款,可以每期设置需要分期付款的时间
- 2021年CCPC网络预选赛重赛补题
- Day06_动态组件_插槽_自定义指令_tabbar案例
- php人民币小写转大写函数
- 云游戏能成为5G第一个杀手级应用吗?
- 几个摄像头和雷达融合的目标检测方法
- 机器人d435建图全是障碍物的原因分析
- 微型计算机原理与接口技术ppt,单片机原理与接口技术课件ppt
- 动态文字闪图怎么做?手把手教你在线做动态闪图
- ISO9001标准文档模版-测试计划
- break能跳出几层
- 10.1登录成功提示、加载提示
热门文章
- 数据结构(二)——堆
- 松原哪家计算机学校好,松原高中学校排名2021最新排名,松原高中排名前十
- 启动修复可以尝试将您的计算机还原到,如何修复Windows Bootloader问题(如果您的计算机不启动) | MOS86...
- bootstrap bootstraptable 固定列_BootStrapTable分页
- docker 安装wordpress
- 基础 | numpy ndarray 之内功心法,理解高维操作!
- 网络事件触发自己主动登录
- groovy学习笔记 - 目录
- 我的第一个wp8小程序
- WebBrowser一点心得,如果在Javascript和Winform代码之间实现双向通信