文章目录

  • 1 Kafka的安装与配置
  • 2 Kafka命令行操作
    • 2.1 查看当前服务器中的所有topic
    • 2.2 创建topic
    • 2.3 删除topic
    • 2.4 发送消息
    • 2.5 消费消息
    • 2.6 查看某个topic的详情
  • 3 Kafka群起脚本
  • 4 项目经验——Kafka压力测试
    • 4.1 Kafka压测
    • 4.2 生产者压测 producer
    • 4.3 消费者压测 consumer
  • 5 项目经验——Kafka机器数量计算
  • 6 Hadoop、zookeeper、flume、kafka群起群停脚本
  • 7 hadoop部署相关的默认端口
    • 7.1 系统
    • 7.2 Web UI
    • 7.3 内部端口
    • 7.4 相关产品端口
    • 7.5 YARN(Hadoop 2.0)缺省端口
    • 7.6 第三方产品端口

欢迎访问笔者个人技术博客: http://rukihuang.xyz/
学习视频来源于尚硅谷,视频链接: 尚硅谷大数据项目数据仓库,电商数仓V1.2新版,Respect!

1 Kafka的安装与配置

  1. kafka_2.11-0.11.0.2.tgz上传至102机器/opt/software目录下,并解压安装至/opt/module,重命名为kafka
tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/
mv kafka_2.11-0.11.0.0/ kafka
  1. /opt/module/kafka创建logs文件夹
mkdir logs
  1. 修改配置文件/opt/module/kafka/config/server.properties

    1. 集群3个节点的broker.id必须不一样,hadoop102的为0,103的为1,104的为2
    2. 修改log.dirs=/opt/module/kafka/logs
    3. 放开delete.topic.enable=true,使topic可以被删除
#broker 的全局唯一编号,不能重复
broker.id=0
#删除topic 功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO 的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka 运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#topic 在当前broker 上的分区个数
num.partitions=1
#用来恢复和清理data 下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment 文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接Zookeeper 集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181
  1. 分发至其他机器,/opt/module/kafka/config/server.properties的broker.id必须得改
xsync /opt/module/kafka
  1. 配置环境变量,新增kafka环境变量(另外2个节点的也要手动修改)
vim /etc/profile.d/env.sh
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
#使配置文件生效
source /etc/profile.d/env.sh
  1. 单节点启动脚本(群起的话,在3个节点挨个执行)(/opt/module/kafka)
bin/kafka-server-start.sh config/server.properties &
  1. 单节点停止脚本(群停的话,在3个节点挨个之心)(同上)
bin/kafka-server-stop.sh stop

2 Kafka命令行操作

  • 均在/opt/module/kafka目录下执行

2.1 查看当前服务器中的所有topic

bin/kafka-topics.sh --zookeeper hadoop102:2181 --list

2.2 创建topic

#创建topic_start
bin/kafka-topics.sh --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181 --create --replication-factor 1 --partitions 1 --topic topic_start
#创建topic_event
bin/kafka-topics.sh --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181 --create --replication-factor 1 --partitions 1 --topic topic_event
  • 选项说明:

    • --topic 定义topic 名
    • --replication-factor定义副本数
    • --partitions 定义分区数

2.3 删除topic

bin/kafka-topics.sh --delete --zookeeper hadoop102:2181,hadoop103:2181,hadoop104:2181 --topic topic_stop
  • 需要server.properties中设置delete.topic.enable=true,否则只是标记删除或者直接重启。

2.4 发送消息

bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic topic_start

2.5 消费消息

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic topic_start
  • --from-beginning:会把该topic中以往所有的数据都读取出来。根据业务场景选择是否增加该配置。

2.6 查看某个topic的详情

bin/kafka-topics.sh --zookeeper hadoop102:2181 --describe --topic topic_start

3 Kafka群起脚本

  1. ~/bin目录下创建kf.sh
#!/bin/bashcase $1 in
"start"){for i in hadoop102 hadoop103 hadoop104doecho --------$i 启动kafka---------ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"done
};;
"stop"){for i in hadoop102 hadoop103 hadoop104doecho --------$i 停止kafka---------ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh stop"done
};;
esac
  1. 增加脚本执行权限
chmod 777 kf.sh
  1. kafka集群启动
kf.sh start
  1. kafka集群停止
kf.sh stop

4 项目经验——Kafka压力测试

4.1 Kafka压测

  • 用Kafka 官方自带的脚本,对Kafka 进行压测。Kafka 压测时,可以查看到哪个地方出现了瓶颈(CPU,内存,网络IO)。一般都是网络IO 达到瓶颈。生产和消费的脚本分别为:

    • kafka-consumer-perf-test.sh
    • kafka-producer-perf-test.sh

4.2 生产者压测 producer

  • /opt/module/kafka
bin/kafka-producer-perf-test.sh --topic test --record-size 100 --num-records 100000 --throughput -1 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
  • 说明

    • record-size是一条信息有多大,单位是字节。
    • num-records 是总共发送多少条信息。
    • throughput 是每秒多少条信息,设成-1,表示不限流,可测出生产者最大吞吐量。
  • 结果

4.3 消费者压测 consumer

  • /opt/module/kafka
bin/kafka-consumer-perf-test.sh --zookeeper hadoop102:2181 --topic test --fetch-size 10000 --messages 10000000 --threads 1
  • 说明

    • --zookeeper 指定zookeeper 的链接信息
    • --topic 指定topic 的名称
    • --fetch-size 指定每次fetch 的数据的大小
    • --messages总共要消费的消息个数
  • 结果

5 项目经验——Kafka机器数量计算

Kafka 机器数量(经验公式)=2* (峰值生产速度 * 副本数/100)+1
先拿到峰值生产速度,再根据设定的副本数,就能预估出需要部署Kafka 的数量。
比如我们的峰值生产速度是50M/s。副本数为2。
Kafka 机器数量=2* (50*2/100)+ 1=3 台

6 Hadoop、zookeeper、flume、kafka群起群停脚本

case $1 in
"start"){echo " -------- 启动集群-------"echo " -------- 启动hadoop 集群-------"/opt/module/hadoop-2.7.2/sbin/start-dfs.shssh hadoop103 "/opt/module/hadoop-2.7.2/sbin/start-yarn.sh"#启动Zookeeper 集群zk.sh startsleep 4s;#启动Flume 采集集群f1.sh start#启动Kafka 采集集群kf.sh startsleep 6s;#启动Flume 消费集群f2.sh start
};;
"stop"){echo " -------- 停止集群-------"#停止Flume 消费集群f2.sh stop#停止Kafka 采集集群kf.sh stopsleep 6s;#停止Flume 采集集群f1.sh stop#停止Zookeeper 集群zk.sh stopecho " -------- 停止hadoop 集群-------"ssh hadoop103 "/opt/module/hadoop-2.7.2/sbin/stop-yarn.sh" /opt/module/hadoop-2.7.2/sbin/stop-dfs.sh
};;
esac
  • 注意点:

    • 其中 kafka依赖zk,zk一挂,kafka就再也关不了了,需要手动Kill,或者加延时,flume之前也要加延时,避免提前消费导致消费不到数据
    • 使用kafka-server-stop脚本无法关闭kafka,解决方式参考https://blog.csdn.net/KingAnne/article/details/101034794

7 hadoop部署相关的默认端口

7.1 系统

  • 808080 :用于tomcat和apache的端口。

  • 22 :ssh的端口

7.2 Web UI

用于访问和监控Hadoop系统运行状态

Daemon 缺省端口 配置参数
HDFS Namenode 50070 dfs.http.address
Datanodes 50075 dfs.datanode.http.address
Secondarynamenode 50090 dfs.secondary.http.address
Backup/Checkpoint node* 50105 dfs.backup.http.address
MR Jobracker 50030 mapred.job.tracker.http.address
Tasktrackers 50060 mapred.task.tracker.http.address
HBase HMaster 60010 hbase.master.info.port
HRegionServer 60030 hbase.regionserver.info.port
** hadoop 0.21以后代替secondarynamenode*

7.3 内部端口

Daemon 缺省端口 配置参数 协议 用于
Namenode 9000 fs.default.name IPC: ClientProtocol Filesystem metadata operations.
Datanode 50010 dfs.datanode.address Custom Hadoop Xceiver: DataNodeand DFSClient DFS data transfer
Datanode 50020 dfs.datanode.ipc.address IPC:InterDatanodeProtocol,ClientDatanodeProtocol ClientProtocol Block metadata operations and recovery
Backupnode 50100 dfs.backup.address 同 namenode HDFS Metadata Operations
Jobtracker 9001 mapred.job.tracker IPC:JobSubmissionProtocol,InterTrackerProtocol Job submission, task tracker heartbeats.
Tasktracker 127.0.0.1:0* mapred.task.tracker.report.address IPC:TaskUmbilicalProtocol 和 child job 通信
** 绑定到未用本地端口*

7.4 相关产品端口

产品 服务 缺省端口 参数 范围 协议 说明
HBase Master 60000 hbase.master.port External TCP IPC
Master 60010 hbase.master.info.port External TCP HTTP
RegionServer 60020 hbase.regionserver.port External TCP IPC
RegionServer 60030 hbase.regionserver.info.port External TCP HTTP
HQuorumPeer 2181 hbase.zookeeper.property.clientPort TCP HBase-managed ZK mode
HQuorumPeer 2888 hbase.zookeeper.peerport TCP HBase-managed ZK mode
HQuorumPeer 3888 hbase.zookeeper.leaderport TCP HBase-managed ZK mode
REST Service 8080 hbase.rest.port External TCP
ThriftServer 9090 Pass -p<port> on CLI External TCP
Avro server 9090 Pass –port <port> on CLI External TCP
Hive Metastore 9083 External TCP
HiveServer 10000 External TCP
Sqoop Metastore 16000 sqoop.metastore.server.port External TCP
ZooKeeper Server 2181 clientPort External TCP Client port
Server 2888 X in server.N=host:X:Y Internal TCP Peer
Server 3888 Y in server.N=host:X:Y Internal TCP Peer
Server 3181 X in server.N=host:X:Y Internal TCP Peer
Server 4181 Y in server.N=host:X:Y Internal TCP Peer
Hue Server 8888 External TCP
Beeswax Server 8002 Internal
Beeswax Metastore 8003 Internal
Oozie Oozie Server 11000 OOZIE_HTTP_PORT in oozie-env.sh External TCP HTTP
Oozie Server 11001 OOZIE_ADMIN_PORT in oozie-env.sh localhost TCP Shutdown port

7.5 YARN(Hadoop 2.0)缺省端口

产品 服务 缺省端口 配置参数 协议
Hadoop YARN ResourceManager 8032 yarn.resourcemanager.address TCP
ResourceManager 8030 yarn.resourcemanager.scheduler.address TCP
ResourceManager 8031 yarn.resourcemanager.resource-tracker.address TCP
ResourceManager 8033 yarn.resourcemanager.admin.address TCP
ResourceManager 8088 yarn.resourcemanager.webapp.address TCP
NodeManager 8040 yarn.nodemanager.localizer.address TCP
NodeManager 8042 yarn.nodemanager.webapp.address TCP
NodeManager 8041 yarn.nodemanager.address TCP
MapReduce JobHistory Server 10020 mapreduce.jobhistory.address TCP
MapReduce JobHistory Server 19888 mapreduce.jobhistory.webapp.address TCP

7.6 第三方产品端口

ganglia用于监控Hadoop和Hbase运行情况。kerberos是一种网络认证协议,相应软件由麻省理工开发。

产品 服务 安全 缺省端口 协议 访问 配置
Ganglia ganglia-gmond 8649 UDP/TCP Internal
ganglia-web 80 TCP External 通过 Apache httpd
Kerberos KRB5 KDC Server Secure 88 UDP*/TCP External [kdcdefaults] 或 [realms]段下的kdc_ports 和 kdc_tcp_ports
KRB5 Admin Server Secure 749 TCP Internal Kdc.conf 文件:[realms]段kadmind_

数据仓库 — 07_Kafka的安装与部署(Kafka命令行操作指令、Kafka集群群起脚本、压力测试、节点数量计算、hadoop_zookeeper_flume_kafka群起脚本、默认端口总结)相关推荐

  1. 【kafka】三、kafka命令行操作

    kafka命令行操作 kafka的相关操作命令脚本文件在bin目录下 查看所有的topic kafka-topics.sh --zookeeper hll1:2181 --list 或 kafka-t ...

  2. kafka 命令行操作大全

    kafka 命令行操作大全 一.集群相关常用命令 二.topic相关常用命令 2.1 脚本&参数简介 2.2 举例 三.生产者命令行常用操作 3.1 脚本&参数简介 3.2 举例发送消 ...

  3. 简单的kafka命令行操作

    目录 一.主题topic命令行操作 1.查看操作主题的命令参数 2.连接kafka地址,创建名为kaf的主题,指定分区和副本数量 3.查看所有主题的名称 4.查看主题的详细信息 5.修改主题(修改分区 ...

  4. kafka命令行操作大全

    最近利用flink使用一个流式SQL处理平台,利用kafka, mysql, hive等组件比较多,命令行突然间需要操作一次记不住命令很麻烦,索性直接整理成笔记. 在 0.9.0.0 之后的 Kafk ...

  5. linux下zookeeper启动命令,For Linux Zookeeper客户端命令行操作指令

    目录 客户端命令行操作 1.启动客户端 2.停止客户端 3.显示所有操作命令 4.查看当前节点信息 ls ~ 详细信息  ls2 5.分别创建两个普通节点 6.获取节点的值 7.创建短暂节点 ~ cr ...

  6. kafka命令行操作

    [README] kafka集群 -- kafka集群 3台机器 centos201 192.168.163.201 centos202 192.168.163.202 centos203 192.1 ...

  7. Kafka 命令行操作

    1)查看当前服务器中的所有 topic bin/kafka-topics.sh --zookeeper backup01:2181 使用命令  bin/kafka-topics.sh --list  ...

  8. kafka命令行操作,topic相关命令

    查看当前服务器中的所有topic: bin/kafka-topics.sh --zookeeper 主机:2181 --list 创建topic: bin/kafka-topics.sh --zook ...

  9. 大数据6_03_Kafka命令行操作

    3 Kafka命令行操作 总结: # 和topic有关的命令:都使用 kafka-topics-sh --zookeeper hadoop102:2181 + 操作 # 和生产消息有关的:使用brok ...

最新文章

  1. React history.push 传递参数
  2. Java(CallableStatement)调用Oracle存储过程返回结果集(ResultSet)
  3. Eclipse快捷键-方便查找
  4. windows上telnet用法 测试端口号
  5. 洛谷P4867 Gty的二逼妹子序列(莫队+树状数组)
  6. 高德地图只显示某个省份
  7. 小程序如何上传代码到服务器,云服务器怎么上传小程序代码
  8. 网页测速 php,2020年8月更新 站长必备/测速工具网站推荐
  9. GD32F103使用串口DMA收+发 信息(无中断)
  10. 国际化地区语言码对照表(i18n)
  11. c语言中char的作用,c语言中char的用法简介
  12. 旷视科技(Face++)和孙剑博士近期一些研究工作总结
  13. 已成功与服务器建立连接,但是在登录过程中发生错误。
  14. mysql事务回滚是什么意思_Mysql事务提交及事务回滚是什么意思
  15. 【java期末复习题】第4章 面向对象基础
  16. C++简单输入输出-计算火车运行时间
  17. 数学建模 2013B碎纸片的拼接复原
  18. COOX培训材料 — PMT(5.物料配方工单)
  19. 2020-10-24周总结
  20. springboot+mybatis+Oauth2 +vue 框架实现登录认证

热门文章

  1. python实现远程ssh登录
  2. FlowLayout
  3. 一个免费、开源、好用的GIF动画制作工具
  4. JAVA发送邮件以及添加附件
  5. 申万宏源:破发股票投资机会研究
  6. python-绘图-matplotlib
  7. Javascript阻止冒泡方法
  8. 食饮品牌如何用Social Power抓住消费者心智?
  9. 苹果xsmax有高通基带吗_高通基带终于来了!彻底抛弃英特尔的苹果12:信号真的稳了吗?...
  10. 泛微签约佛山市翁开尔贸易有限公司