一 运行环境

从阿里云申请三台云服务器,这里我使用了两个不同的阿里云账号去申请云服务器。我们配置三台主机名分别为zy1,zy2,zy3。

我们通过阿里云可以获取主机的公网ip地址,如下:

通过secureRCT连接主机106.15.74.155,运行ifconfig,可以查看其内网ip地址:

1、账号1申请了两台云服务器:

主机zy1的公网ip为:106.15.74.155,内网ip为172.19.182.67。

主机zy2的公网ip为:47.103.134.70,内网ip为172.19.14.178。

2、账号2申请了一台云服务器:

主机zy3的公网ip为:47.97.10.51,内网ip为172.16.229.255。

3、阿里云入规则配置

由于主机位于不同的局域网下,因此需要进行一个公网端口到内网端口的映射。在搭建zookeeper和kafka需要使用到2181,2888 ,3888,9092端口。需要在阿里云中配置入规则,具体可以参考阿里云官方收藏:同一个地域、不同账号下的实例实现内网互通 。

注意:如果7.103.134.70配置一个入端口3888,那么对该47.103.134.70:3888的访问会实际映射到172.19.14.178:3888下。如果是同一局域网下的两个主机,是不需要配置这个的,可以直接互4通。

如果想了解更多,可以参考以下博客:

通过SSH访问阿里云服务器的原理可以参考-用SSH访问内网主机的方法

云服务器主机内网ip和外网ip的区别

一台阿里云2台腾讯云服务器搭建Hadoop集群

4、配置/etc/hosts

以主机zy1为例:配置如下:

注意zy1对应的ip需要配置为内网ip,也就是本机ip:172.19.182.67。而zy2、zy3配置的都是公网ip。

二 JDK安装

在每个主机下执行以下操作:

1、安装之前先查看一下有无系统自带jdk

rpm -qa |grep javarpm -qa |grep jdkrpm -qa |grep gcj

如果有就使用批量卸载命令

rpm -qa | grep java | xargs rpm -e --nodeps 

2、直接yum安装1.8.0版本openjdk

yum install java-1.8.0-openjdk* -y

默认jre jdk 安装路径是/usr/lib/jvm 下面:

3、配置环境变量

vim /etc/profile
#set java environment   , appendexport JAVA_HOME=/usr/lib/jvm/javaexport CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/jre/lib/rt.jarexport PATH=$PATH:$JAVA_HOME/bin

使得配置生效

. /etc/profile

4、查看版本

echo $JAVA_HOME
echo $CLASSPATH
java -version

三 安装zookeeper

在主机zy1下面执行以下操作:

1、下载并解压

wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz

创建目录/opt/bigdata:

mkdir /opt/bigdata

解压文件到/opt/bigdata:

tar -zxvf  zookeeper-3.4.13.tar.gz  -C /opt/bigdata

跳转目录:

cd /opt/bigdata/zookeeper-3.4.13/

2、复制配置文件

cp conf/zoo_sample.cfg  conf/zoo.cfg

修改配置文件如下:

vim conf/zoo.cfg

其中部分参数意义如下:

  • server.1=zy1:2888:3888:server.1 这个1是服务器的标识也可以是其他的数字, 表示这个是第几号服务器,用来标识服务器,这个标识要写到快照目录下面myid文件里。第一个端口是master和slave之间的通信端口,默认是2888,第二个端口是leader选举的端口,集群刚启动的时候选举或者leader挂掉之后进行新的选举的端口默认是3888
  • dataDir:快照日志的存储路径。
  • dataLogDir:事物日志的存储路径,如果不配置这个那么事物日志会默认存储到dataDir制定的目录,这样会严重影响zk的性能,当zk吞吐量较大的时候,产生的事物日志、快照日志太多。
  • clientPort:这个端口就是客户端连接 zookeeper 服务器的端口,zookeeper 会监听这个端口,接受客户端的访问请求。

3、创建myid文件

创建/opt/bigdata/data/zookeeper/zkdata:

mkdir -vp /opt/bigdata/data/zookeeper/zkdata

创建myid文件:

echo 1 > /opt/bigdata/data/zookeeper/zkdata/myid

4、拷贝zookeeper到主机zy2、zy3

scp -r /opt/bigdata/zookeeper-3.4.13/ zy2:/opt/bigdata/
scp -r /opt/bigdata/zookeeper-3.4.13/ zy3:/opt/bigdata/

5、创建主机zy2、zy3的myid文件

zyx主机:

创建/opt/bigdata/data/zookeeper/zkdata:

mkdir -vp /opt/bigdata/data/zookeeper/zkdata

创建myid文件:

echo x > /opt/bigdata/data/zookeeper/zkdata/myid

注意:x表示主机的编号。

6、配置环境变量(每个主机都需要配置)

vim /etc/profile
#set java environment   , appendexport ZOOKEEPER_HOME=/opt/bigdata/zookeeper-3.4.13export PATH=$ZOOKEEPER_HOME/bin:$PATH

使得配置生效

. /etc/profile

7、启动服务并查看

进入到zookeeper目录下,在每个主机下分别执行

cd /opt/bigdata/zookeeper-3.4.13
bin/zkServer.sh start

检查服务状态

bin/zkServer.sh status

可以用“jps”查看zk的进程,这个是zk的整个工程的main

jps

注意:zk集群一般只有一个leader,多个follower,主一般是相应客户端的读写请求,而从主同步数据,当主挂掉之后就会从follower里投票选举一个leader出来。

8、客户端连接

zookeeper服务开启后,进入客户端的命令:

zkCli.sh

更多常用命令参考博客:Kafka在zookeeper中存储结构和查看方式。

9、出现错误常用排错手段

1、防火墙

防火墙没有关闭问题。解决方式参考:https://blog.csdn.net/weiyongle1996/article/details/73733228

2、端口没有开启

如果/etc/hosts全部配置为公网:在zy1运行zkServer.sh start,查看端口开启状态:

netstat -an | grep 3888

则会发现无法开启公网3888端口,我们应该打开的是内网机器对应的端口。
如果端口已经开启,可以通过telnet ip por判断该端口是否可以从外部访问。

四 安装kafka

在主机zy1下执行:

1、下载并解压

wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.1.1/kafka_2.12-2.1.1.tgz
tar -zxvf kafka_2.12-2.1.1.tgz  -C /opt/bigdata

重命名:

cd /opt/bigdata/
mv kafka_2.12-2.1.1 kafka

2、修改kafka配置文件

 在/opt/bigdata/kafka下:

vim config/server.properties

各个参数意义:

  • broker.id=1  #当前机器在集群中的唯一标识,和zookeeper的myid性质一样;
  • listeners=PLAINTEXT://主机:9092  #当前kafka对外提供服务的主机:端口(默认是9092);
  • num.network.threads=3  #这个是borker进行网络处理的线程数;
  • num.io.threads=8 #这个是borker进行I/O处理的线程数;
  • log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个;
  • socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能;
  • socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘;
  • socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小;
  • num.partitions=1 #默认的分区数,一个topic默认1个分区数;
  • log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天;
  • message.max.byte=5242880  #消息保存的最大值5M;
  • default.replication.factor=2  #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务;
  • replica.fetch.max.bytes=5242880  #取消息的最大直接数;
  • log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件;
  • log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除;
  • log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能;
  • zookeeper.connect=xx:12181,xx:12181,xx:12181#设置zookeeper的连接端口;

注意,这里如果希望在java中创建topic也是多个备份,需要添加一下属性

#default replication factors for automatically created topics,默认值1;

default.replication.factor=3

#When a producer sets acks to "all" (or "-1"), this configuration specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful.

#min.insync.replicas and acks allow you to enforce greater durability guarantees,默认值1;

min.insync.replicas=3

上面是参数的解释,实际的修改项为:

broker.id=1

listeners=PLAINTEXT://zy1:9092           #内网地址

advertised.listeners=PLAINTEXT://106.15.74.155:9092   #公网地址(不然远程客户端无法访问)

log.dirs=/opt/bigdata/kafka/kafka-logs

#此外,可以在log.retention.hours=168 下面新增下面三项:

message.max.byte=5242880

default.replication.factor=2

replica.fetch.max.bytes=5242880

#设置zookeeper的连接端口

zookeeper.connect=zy1:2181,zy2:2181,zy3:2181 

如果我们需要删除topic,还需要配置一下内容:

delete.topic.enable=true

具体参考博客:kafka安装及删除Topic,Kafka0.8.2.1删除topic逻辑。

3、复制kafka到zy2、zy3

scp -r /opt/bigdata/kafka zy2:/opt/bigdata/scp -r /opt/bigdata/kafka zy3:/opt/bigdata/

4、修改zy2、zy3的配置文件server.properties

拷贝文件过去的其他两个节点需要更改broker.id和listeners,以zy2为例:

5、启动kafka

我们可以根据Kafka内带的zk集群来启动,但是建议使用独立的zk集群:

zkServer.sh start

在/opt/bigdata/kafka下 ,三个节点分别执行如下命令,启动kafka集群:
bin/kafka-server-start.sh config/server.properties &

运行命令后服务确实后台启动了,但日志会打印在控制台,而且关掉命令行窗口,服务就会随之停止,这个让我挺困惑的。后来,参考了其他的启动脚本,通过测试和调试最终找到了完全满足要求的命令。

bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &

其中1>/dev/null 2>&1 是将命令产生的输入和错误都输入到空设备,也就是不输出的意思。/dev/null代表空设备。

注意:如果内存不足:打开kafka安装位置,在bin目录下找到kafka-server-start.sh文件,将export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"修改为export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"。

6、验证

思路:以下给出几条kafka指令。创建一个topic,一个节点作为生产者,两个节点作为消费者分别看看能否接收数据,进行验证:

创建及查看topic:

cd /opt/big/data/kafka
bin/kafka-topics.sh -list -zookeeper  zy1:2181
bin/kafka-topics.sh --create --zookeeper  zy1:2181 --replication-factor 3 --partitions 3  --topic zy-test

开启生产者:

bin/kafka-console-producer.sh --broker-list zy1:9092 --topic zy-test

开启消费者:

bin/kafka-console-consumer.sh --bootstrap-server zy2:9092 --topic zy-test --from-beginning 

节点zy1产生消息,如果消息没有清理,在节点zy2、zy3都可以接收到消息。

7、更多kafka命令

以下是kafka常用命令行总结:

查看topic的详细信息  

 bin/kafka-topics.sh -zookeeper zy1:2181 --describe --topic zy-test

可以看到topic包含3个复本,每个副本又分为三个partition。以zy-test:partition0为例,其leader保存在broker.id=1的主机上,副本保存在2、3节点上。其消息保存在配置参数log.dirs所指定的路径下:

为topic增加副本  

bin/kafka-reassign-partitions.sh --zookeeper zy1:2181 --reassignment-json-file  json/partitions-to-move.json -execute  

创建topic 

bin/kafka-topics.sh --create --zookeeper  zy1:2181 --replication-factor 3 --partitions 3  --topic zy-test

为topic增加partition  

bin/kafka-topics.sh –-zookeeper zy1:2181 –-alter –-partitions 3 –-topic  zy-test

kafka生产者客户端命令  

bin/kafka-console-producer.sh --broker-list zy1:9092 --topic zy-test

kafka消费者客户端命令  

bin/kafka-console-consumer.sh --bootstrap-server zy2:9092 --topic zy-test --from-beginning 

kafka服务启动  

bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &

删除topic  

 bin/kafka-topics.sh --zookeeper zy1:2181 --delete --topic  zy-test

8、关闭kafka

bin/kafka-server-stop.sh

五 consumer offsets

由于Zookeeper并不适合大批量的频繁写入操作,新版Kafka已推荐将consumer的位移信息保存在kafka内部的topic中,即__consumer_offsets topic,并且默认提供了kafka_consumer_groups.sh脚本供用户查看consumer信息。

1、获取消息在topic中的记录信息

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list zy1:9092 --topic zy-test

输出结果每个字段分别表示topic、partition、untilOffset(当前partition的最大偏移);

上面的输出结果表明kafka队列总有产生过4条消息(这并不代表kafka队列现在一定有4条消息,因为kafka有两种策略可以删除旧数据:基于时间、基于大小)。

由于我使用kafka-console-producer.sh生成了四条消息:zy、19941108、 liuyan、1。因此kafka消息队列中存在4条消息。

2、创建一个console consumer group

bin/kafka-console-consumer.sh --bootstrap-server zy1:9092 --consumer.config config/consumer.properties --topic zy-test  --from-beginning

再次建立消费者:

会发现获取不到数据。这是因为我们指定了消费组,第一次消费时从offset为0开始消费,把4条消息全部读出,此时offset移动到最后,当再次使用同一消费组读取数据,则会从上次的offset开始获取数据。

而使用:

bin/kafka-console-consumer.sh --bootstrap-server zy1:9092 --topic zy-test --from-beginning 

每次都会获取四条数据,这是因为每次都会创建一个新的消费者,这些消费者会被随机分配到一个不同的组,因此每次都是从offset为0开始消费。

参数解释:

3、 获取该consumer group的group id

bin/kafka-consumer-groups.sh --bootstrap-server zy1:9092 --list  

可以看到有三个消费组,前两个消费者没有指定消费组,随机产生一个console-consumer-***的group.ig。

第三个是我们刚刚在config/consumer.properties 中指定的消费组。

4、查看消费者组的offset

 bin/kafka-consumer-groups.sh --bootstrap-server zy1:9092 --describe --group test-consumer-group

如果此时再使用生产者客户端生成两条消息:

再次查看消费组test-consumer-group的消费情况:

5、KAFKA API指定位移消费

由于我们还没有介绍KAFKA的API,这块内容就不先介绍,具体参考博客:Kafka消费者 之 指定位移消费。

六 各个端口作用

  • 2888:zookeeper集群三台主机心跳端口;
  • 3888:zookeeper集群三台主机选取leader端口,防止其中一个宕机了;
  • 2181:zookeeper服务器监听端口,等待消费者连接,消费者可以从中获取topic分区以及消费offset等信息(高版本消费者offset已经保存在kafka内部的topic中了);
  • 9092:kafka服务器绑定端口,等待生产者和消费者连接;

因此上面介绍的kafka命令,与topic相关的使用--zookeeper zy1:2181,与生产者、消费者相关的使用 --bootstrap-server zy1:9092。

参考博客:

[1]kafka和zookeeper集群搭建详细步骤

[2]yum安装jdk环境变量配置

[3]使用命令读取kafka的内部topic:__consumer_offsets

[4]kafka学习笔记:知识点整理 - cyfonly - 博客园(推荐)

转载于:https://www.cnblogs.com/zyly/p/11327605.html

大数据 -- zookeeper和kafka集群环境搭建相关推荐

  1. 高吞吐消息中间件Kafka集群环境搭建(3台kafka,3台zookeeper)

    高吞吐消息中间件Kafka集群环境搭建(3台kafka,3台zookeeper) 一.集群搭建要求 1.搭建设计 2.分配六台Linux,用于安装拥有三个节点的Kafka集群和三个节点的Zookeep ...

  2. 融云发送自定义消息_数据源管理 | Kafka集群环境搭建,消息存储机制详解

    一.Kafka集群环境 1.环境版本 版本:kafka2.11,zookeeper3.4 注意:这里zookeeper3.4也是基于集群模式部署. 2.解压重命名 tar -zxvf kafka_2. ...

  3. 数据源管理 | Kafka集群环境搭建,消息存储机制详解

    本文源码:GitHub·点这里 || GitEE·点这里 一.Kafka集群环境 1.环境版本 版本:kafka2.11,zookeeper3.4 注意:这里zookeeper3.4也是基于集群模式部 ...

  4. Kafka集群环境搭建

    首先准备至少三台虚拟机. 每台虚拟机解压下载好的kafka压缩包并重命名 cd /usr/local wget http://mirror.bit.edu.cn/apache/kafka/1.0.0/ ...

  5. 大数据 -- Hadoop集群环境搭建

    首先我们来认识一下HDFS, HDFS(Hadoop Distributed File System )Hadoop分布式文件系统.它其实是将一个大文件分成若干块保存在不同服务器的多个节点中.通过联网 ...

  6. 大数据第一天 Hadoop01-入门集群环境搭建

    Hadoop01-入门&集群环境搭建 今日内容 ​ Hadoop的介绍 ​ 集群环境搭建准备工作 ​ Linux命令和Shell脚本增强 ​ 集群环境搭建 大数据概述 ​ 大数据: 就是对海量 ...

  7. 大数据介绍、集群环境搭建、Hadoop介绍、HDFS入门介绍

    大数据介绍.集群环境搭建.Hadoop介绍.HDFS入门介绍 文章目录 大数据介绍.集群环境搭建.Hadoop介绍.HDFS入门介绍 1.课前资料 2.课程整体介绍 3.大数据介绍 3.1 什么是大数 ...

  8. 王家林的“云计算分布式大数据Hadoop实战高手之路---从零开始”的第五讲Hadoop图文训练课程:解决典型Hadoop分布式集群环境搭建问题

    王家林的"云计算分布式大数据Hadoop实战高手之路---从零开始"的第五讲Hadoop图文训练课程:解决典型Hadoop分布式集群环境搭建问题 参考文章: (1)王家林的&quo ...

  9. Kafka:ZK+Kafka+Spark Streaming集群环境搭建(九)安装kafka_2.11-1.1.0

    如何搭建配置centos虚拟机请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(一)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网.& ...

最新文章

  1. WPF使用X:Static做多语言支持
  2. Android存储路径你了解多少?
  3. zing jvm_Zing加快了JVM应用程序的预热
  4. mysql 闪回到指定时间_oracle数据库表恢复到特定时间点
  5. linux开机到登陆的流程图,1-描述linux开机到登陆界面的启动过程.docx
  6. 一步一步写算法(之“数星星”)
  7. SAP License:SAP复制和默认快捷小贴士
  8. mac虚拟机linux性能测试,Veertu 1.0.4 – Mac上最轻便的虚拟机,可以运行Windows/Linux。...
  9. Go 单元测试从 0 到 1
  10. PowerShell Format-Table的细节(AutoSize和Wrap参数)
  11. har文件分析http_如何使用HAR文件分析一段时间内的性能
  12. 高等代数(第四版) 王萼芳、石生明编|高等教育出版社 大学课后习题答案
  13. Altium Designer使用-----智能pdf输出装配图
  14. MATLAB 插值与拟合
  15. 计算机英语二国家线,历年考研英语国家线一览表
  16. Python 使用itchat 获取微信好友信息并解析(性别 区域 头像 签名等)
  17. linux排行榜网站,DistroWatch网站搞的Linux国际排名为它赢得了不少的流量
  18. GitHub上README.md排版样式教程
  19. c语言常考的笔试题1
  20. html5什么网站知乎,老司机秒懂的5个资源网站,知乎超过10万人推荐,竟被我挖掘到了...

热门文章

  1. TypeScript的非空断言操作符(感叹号)
  2. jQuery.each() 和原生JS的for loop效率对比
  3. python selenium ide使用_第 2 章 Selenium IDE 的使用 Selenium 3+Python 3 自动化测试
  4. 带电插拔损坏设备原理_热水器不用时,要不要拔插座?看完才知道,天天拔比不拔更危险...
  5. php分页采集数据,php实现分页调取数据库记录
  6. tensorflow生成图片标签_Tensorboard高维向量可视化 + 解决标签和图片不显示BUG
  7. plsql导出表结构到excel_plsql 将表结构导出到excel中的两种方式
  8. conda常用命令汇总
  9. FIR设置过采样率 matlab,Xilinx FIR IP的介绍与仿真
  10. 计算机网络电信号误差,用0V~5V方式传输远方温度信号的弊端