Kafka集群部署

概述

之前的大数据集群主要是离线处理的方式对集群的数据进行开发处理。当前的集群数据量已经达到了PB级别了,离线数据获取主要是从数仓侧进行全量或者增量的方式导入大数据平台,部分是通过SFTP的方式解析进入大数据平台,少量数据是通过接口的方式准实时接入到大数据平台。随着业务的发展,对于实时数据的接入和应用显得越来越重要了,接下来的时间会一直更新整个时间数据接入和应用的分享。

我们都是知道kafak是一种分布式的,基于发布/订阅的消息系统。以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能、高吞吐率。这个是非常符合当前接入实时数据进行处理的一个组件。

为什么说kafka是非常适合我当前的场景呢?当前市场kafka的应用场景常见有:消息系统、跟踪网站活动、运营指标、日志聚合、流处理、采集日志、提交日志。kafka具有以下的以下优点(特性):

· 高吞吐、低延迟:kafka 每秒可以处理几十万条消息,它的延迟最低只有几毫秒。

· 可扩展性:kafka 集群支持热扩展

· 可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失

· 容错性:允许集群中节点失败

· 高并发:支持数千个客户端同时读写

即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输。支持Kafka Server间的消息分区,及分布式消费,同时保证每个Partition内的消息顺序传输。同时支持离线数据处理(hive、HBASE)和实时数据处理(spark、storm)。

话不多说,开启我实际安装过程。如下的安装步骤都是可以在虚拟机里面直接复制使用的。温馨提醒,如果是在实际的生产环境进行部署的话,请记得挂着磁盘。

希望对你们有帮助!!!其中标红的部分,新手要特别注意,大神们就忽略了。也欢迎大家留言指正!

一、集群规划

二、集群环境准备

以下步骤无特殊说明则各节点均需执行

1)关闭防火墙(三台)

# 需要用root用户执行

service iptables stop

chkconfig iptables off

2)关闭selinux(三台)

vim /etc/selinux/config

# 注释下面一行

#SELINUX=enforcing

# 添加下面一行

SELINUX=disabled

3)配置域名映射(三台)

vim /etc/hosts

# 在末尾添加以下几行,注意填写实际IP及hostname

IP01 hostname01 kafka01

IP02 hostname02 kafka02

IP03 hostname03 kafka03

重启机器

reboot -h now

4)配置免密登录

# 生成公钥与私钥, 执行以下命令后按三次回车键即可

ssh-keygen -t rsa

所有节点拷贝公钥到kafka01

ssh-copy-id kafka01

复制kafka01认证到其他机器

在kafka01执行

scp /root/.ssh/authorized_keys kafka02:/root/.ssh

scp /root/.ssh/authorized_keys kafka03:/root/.ssh

若中间出错,100%后面不为1212 (404X3,n个节点为404Xn),可以通过机器互ping验证

5)时间同步

在kafka01执行

确认是否安装ntpd服务

rpm -qa | grep ntp

若未安装,在线安装

yum install -y ntp

设置ntpd服务开机启动

chkconfig ntpd on

修改配置文件

vim /etc/ntp.conf

# 添加下面一行,注意修改子网IP,如192.168.11.0

restrict 子网IP mask 255.255.255.0 nomodify notrap

# 注释下面四行

#server 0.centos.pool.ntp.org

#server 1.centos.pool.ntp.org

#server 2.centos.pool.ntp.org

#server 3.centos.pool.ntp.org

# 取消注释或添加下面两行

server 127.127.1.0 # local clock

fudge 127.127.1.0 stratum 10

配置以下内容,保证BIOS与系统时间同步

vim /etc/sysconfig/ntpd

# 添加下面一行

SYNC_HWLOCK=yes

在其他节点执行

配置另外几台与kafka01时间同步

crontab -e

*/1 * * * * /usr/sbin/ntpdate kafka01

6)安装jdk(三台)

查看并卸载自带的openjdk

rpm -qa | grep java

rpm -e java-1.6.0-openjdk-1.6.0.41-1.13.13.1.el6_8.x86_64 tzdata-java-2016j-1.el6.noarch java-1.7.0-openjdk-1.7.0.131-2.6.9.0.el6_8.x86_64 --kafkaps

创建文件夹

# 用于存放安装包

mkdir -p /export/software/

# 用于安装软件

mkdir -p /export/servers/

上传到/export/software/并解压jdk

cd /export/software/

tar -zxvf jdk-8u141-linux-x64.tar.gz -C ../servers/

配置环境变量

vim /etc/profile

# JAVA_HOME

export JAVA_HOME=/export/servers/jdk1.8.0_141

export PATH=:$JAVA_HOME/bin:$PATH

source /etc/profile

验证

java -version

结果显示如下

java version "1.8.0_141"

Java(TM) SE Runtime Environment (build 1.8.0_141-b15)

Java HotSpot(TM) 64-Bit Server VM (build 25.141-b15, mixed mode)

三、Zookeeper集群安装

1)下载安装包

下载完成后上传到集群上

2)解压安装包

cd /export/software/

tar -zxvf zookeeper-3.4.14.tar.gz -C ../servers/

3)创建软链接

cd /export/servers/

ln -s zookeeper-3.4.14 zookeeper

4)修改配置文件

cd /export/servers/zookeeper/conf

cp zoo_sample.cfg zoo.cfg

mkdir -p /export/servers/zookeeper/zkdatas/

vim zoo.cfg

# 指定zk文件存储目录

dataDir=/export/servers/zookeeper/zkdatas

# 取消下面两行注释

autopurge.snapRetainCount=3

autopurge.purgeInterval=1

# 添加下面三行

server.1=kafka01:2888:3888

server.2=kafka02:2888:3888

server.3=kafka03:2888:3888

5)添加myid配置

echo 1 > /export/servers/zookeeper/zkdatas/myid

6)分发安装包

scp -r /export/servers/zookeeper-3.4.14/ kafka02:/export/servers/

scp -r /export/servers/zookeeper-3.4.14/ kafka03:/export/servers/

7)其他机器创建软链接及修改myid值

ln -s /export/servers/zookeeper-3.4.14/ /export/servers/zookeeper

kafka02

echo 2 > /export/servers/zookeeper/zkdatas/myid

kafka03

echo 3 > /export/servers/zookeeper/zkdatas/myid

8)启动zookeeper集群

启动脚本如下

zookeeper-start.sh

#!/bin/bash

echo "1.启动节点kafka01、kafka02和kafka03......"

for n in kafka01 kafka02 kafka03

do

ssh $n "source /etc/profile;/export/servers/zookeeper/bin/zkServer.sh start"

done

#休眠1秒

sleep 1

echo "2.查看集群各节点状态......"

for n in kafa01 kafka02 kafka03

do

ssh $n "source /etc/profile;/export/servers/zookeeper/bin/zkServer.sh status"

done

停止脚本如下

zookeeper-stop.sh

#!/bin/bash

echo "1.停止节点kafka01、kafka02和kafka03......"

for n in kafka01 kafka02 kafka03

do

ssh $n "source /etc/profile;/export/servers/zookeeper/bin/zkServer.sh stop"

done

#休眠1秒

sleep 1

echo "4. 查看集群各节点状态......"

for n in kafka01 kafka02 kafka03

do

ssh $n "source /etc/profile;/export/servers/zookeeper/bin/zkServer.sh status"

done

四、kafka集群安装

1)下载安装包

下载完成后上传到集群上

2)解压安装包

cd /export/software/

tar -zxvf kafka_2.11-2.2.1.tgz -C ../servers/

3)创建软链接

ln -s kafka_2.11-2.2.1/ kafka

4)创建logs文件夹

cd /export/servers/kafka

mkdir logs

5)备份并修改配置文件

cd /export/servers/kafka/config

cp server.properties server.properties.bak

修改内容

vim server.properties

############################# Server Basics #############################

#broker的全局唯一编号,不能重复

broker.id=0

############################# Socket Server Settings #############################

# 监听列表

listeners=PLAINTEXT://kafka01:9092

# 处理网络请求的线程数, 一般等于核心数

num.network.threads=3

# 用来处理磁盘IO的线程数

num.io.threads=8

# socket 发送缓冲区

socket.send.buffer.bytes=102400

# socket 接收缓冲区

socket.receive.buffer.bytes=102400

# socket请求最大数值,防止serverOOM

socket.request.max.bytes=104857600

############################# Log Basics #############################

# kafka运行日志存放的路径

log.dirs=/export/servers/kafka/logs

# 默认分区数 会被命令行参数覆盖

num.partitions=3

# 用来恢复和清理data下数据的线程数量

num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings #############################

# 内置主题 "__consumer_offsets" and "__transaction_state" 的副本数

offsets.topic.replication.factor=3

transaction.state.log.replication.factor=3

transaction.state.log.min.isr=3

############################# Log Flush Policy #############################

# The number of messages to accept before forcing a flush of data to disk

#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush

#log.flush.interval.ms=1000

############################## Log Retention Policy #############################

# topic数据默认保留时长, 10天

log.retention.hours=240

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining

# segments drop below log.retention.bytes. Functions independently of log.retention.hours.

#log.retention.bytes=1073741824

# 一个消息长度, 超过再创建一个

log.segment.bytes=1073741824

# 文件大小检查周期

log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# 配置连接Zookeeper集群地址

zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181

# zookeeper连接超时时间

zookeeper.connection.timeout.ms=6000

############################# Group Coordinator Settings #############################

group.initial.rebalance.delay.ms=0

############################# Other #############################

# 数据副本个数

default.replication.factor=3

# 单条消息最大长度4Mb

message.max.bytes=4000000

# broker可复制的消息的最大字节数。值应该比message.max.bytes大

replica.fetch.max.bytes=4194304

# 消费者能读取的最大消息,值应该大于或等于message.max.bytes

max.partition.fetch.bytes=4194304

# 允许删除主题

delete.topic.enable=true

6)各节点分别添加环境变量

vim /etc/profile

#KAFKA_HOME

export KAFKA_HOME=/export/servers/kafka/

export PATH=$PATH:$KAFKA_HOME/bin

source /etc/profile

7)分发安装包

注意:kafka02和kafka03

scp -r /export/servers/kafka_2.11-2.2.1/ kafka02:/export/servers/

scp -r /export/servers/kafka_2.11-2.2.1/ kafka03:/export/servers/

8)其他机器创建软链接及修改配置文件

ln -s /export/servers/kafka_2.11-2.2.1 /export/servers/kafka

kafka02

broker.id=1

listeners=PLAINTEXT://kafka02:9092

kafka03

broker.id=2

listeners=PLAINTEXT://kafka03:9092

9)启动kafka集群

启动脚本如下

kafka-start.sh

#!/bin/bash

echo "正在启动kafka集群......"

for n in kafka01 kafka02 kafka03

do

ssh $n "source /etc/profile;/export/servers/kafka/bin/kafka-server-start.sh -daemon /export/servers/kafka/config/server.properties"

done

echo "启动kafka集群完成......"

停止脚本如下

kafka-stop.sh

#!/bin/bash

echo "正在关闭kafka集群......"

for n in kafka01 kafka02 kafka03

do

ssh $n "source /etc/profile;/export/servers/kafka/bin/kafka-server-stop.sh"

done

echo "关闭kafka集群完成......"

10)kafka测试

# 创建topic

kafka-topics.sh --create --zookeeper kafka01:2181,kafka02:2181,kafka03:2181 --replication-factor 3 --partitions 3 --topic topic-test

# 查看topic详情

kafka-topics.sh --describe --zookeeper kafka01:2181,kafka02:2181,kafka03:2181 localhost:2181 --topic topic-test

# 开启消息生产者

kafka-console-producer.sh --broker-list kafka01:9092,kafka02:9092,kafka03:9092 --topic topic-test

# 开启消息消费者

kafka-console-consumer.sh --bootstrap-server kafka01:9092,kafka02:9092,kafka03:9092 --from-beginning --topic topic-test

在消息生产者发送消息消费者能收到即可

kafka安装完整步骤_还在寻找Kafka最新的安装教程吗?精细的安装步骤分享给大家...相关推荐

  1. kafka计算机专业读法_面试官:Kafka 为什么快?

    无论 kafka 作为 MQ 也好,作为存储层也罢,无非就是两个功能(好简单的样子),一是 Producer 生产的数据存到 broker,二是 Consumer 从 broker 读取数据.那 Ka ...

  2. 网件r6300安装mysql数据库_网件(NETGEAR)R6300 V1/V2路由器设置教程【图文】

    连接方式请参考上面,电话线上网时的连接. 网件(NETGEAR)R6300路由器 电话线/光钎接入上网时,NETGEAR R6300路由器的正确连接方式 第一步.设置电脑IP地址在设置网件(NETGE ...

  3. 三阶矩阵的lu分解详细步骤_数学 - 线性代数导论 - #4 矩阵分解之LU分解的意义、步骤和成立条件...

    线性代数导论 - #4 矩阵分解之LU分解的意义.步骤和成立条件 目前我们用于解线性方程组的方法依然是Gauss消元法.在Gauss消元法中,我们将右侧向量b与A写在一起作为一个增广矩阵进行同步的操作 ...

  4. kafka 同步提交 异步_极限MQ (5) Kafka 消费者

    要想知道如何从 Kafka 读取消息,需要先了解消费者和消费者群组的概念. 假设我们有一个应用程序需要从 Kafka 主题读取消息井验证这些消息,然后再把它们保存起来.应用程序需要创建一个消费者对象, ...

  5. django权限系统实现步骤_博主营地 | Unity红点系统如何实现?超全步骤分享

    「Unity博主营地第一期」于2019年11月开启,现已收到数百篇原创投稿.每周根据Unity Connect社区反馈,帮助大家发现最优质.最干货.最受欢迎的博文作品. 在使用Unity开发游戏的时候 ...

  6. win10默认安装路径修改_解放C盘!2种方法教你修改默认安装目录!

    C盘是什么风水宝地?怎么所有的应用都想往里装,每次都要手动修改安装位置,烦不胜烦. 不知道各位小伙伴有没有这样的困扰,为了不让c盘太臃肿,每次安装应用都要一遍遍的修改安装位置,但是有时候还是一不注意就 ...

  7. 文字台式计算机组装步骤,2017最新DIY主机教程 组装机图文+文字安装步骤

    最近京东618活动开始了,小编身边好几位同事都够买新主机了,不过对于喜欢动手的玩家们来说自己购买配件安装主机倒也是一种享受.因为是自己亲手组装配置的机子,会感觉备有成就感哦.好了,还不知道怎么DIY组 ...

  8. 查看宝塔面板账号密码命令_宝塔面板升级到最新版图文教程

    往期教程: 宝塔面板教程(1)基于云服务器搭建宝塔面板教程最全详解 宝塔面板教程(2)宝塔面板添加WordPress站点详细图文教程 宝塔面板教程(3)基于宝塔面板成功配置网站SSL安全证书 宝塔面板 ...

  9. 安装esp8266库_基于ESP8266,DIY低成本智能远程开关灯小装置

    废话不多说,直接开始 首先需要必备的环境arduino ide,Blinker app(应用商店均可下载)以及必备的一些库(aduino ide1.8.7及以上版本,esp8266拓展,blinker ...

最新文章

  1. php根据宽度显示html,我怎样才能动态地改变PHP的HTML div的宽度?
  2. Android 抽屉效果的导航菜单实现
  3. 1390: 队列问题(1)(vector模拟)
  4. map的详解及常见面试题
  5. 【已解决】请在位于当前 Web 应用程序根目录下的“web.config”配置文件中创建一个 <customErrors> 标记
  6. Android流量监控以及流量防火墙的概述
  7. UE使用EditorUtilityWidget完成简单的编辑器内工具
  8. 三大迷宫生成算法 (Maze generation algorithm) -- 深度优先,随机Prim,递归分割
  9. redis命令行查询某个key值
  10. 抓取中央气象台-台风报文数据
  11. 【应用安全】“我的QQ被盗,请大家不要相信任何消息.......”
  12. 关于内存、外存、磁盘、硬盘、软盘、光盘的区别
  13. linux can 总线socket接口测试使用
  14. Verilog分频器的设计(6分频和1.5分频)
  15. iPhone 13屏幕卡死黑屏、无法关机?如何解决
  16. [XSY 3147]子集计数
  17. 操作系统:文件的物理结构(文件分配方式)
  18. Xbox手柄转子马达的控制运用机制原理
  19. PTA 6-1 舞伴问题
  20. 微信论坛交流小程序系统毕业设计毕设(4)开题报告

热门文章

  1. vs中未能找到list.cs_意甲7-29 03:45国际米兰vs那不勒斯,国际米兰主场十拿九稳
  2. 从html富文本中提取纯文本
  3. [QA翻译]如何在Storm里拆分stream流?
  4. 彻底删除SQL Server2005(转)
  5. Android-JNI开发系列《三》-异常处理
  6. 用python绘制心形_如何利用Python绘制一个爱心
  7. PyCharm中文指南2.0
  8. C++初始化参数列表对成员函数初始化
  9. android MemeoryFile和Parcel操作文件描述符fd
  10. emacs for Mac命令