Kafka集群与运维

10.1 集群应用场景

10.1.1 消息传递

Kafka可以很好地替代传统邮件代理。消息代理的使用有多种原因(将处理与数据生产者分离,缓冲未处理的消息等)。与大多数邮件系统相比,Kafka具有更好的吞吐量,内置的分区,复制和容错功能,这使其成为大规模邮件处理应用程序的理想解决方案。根据我们的经验,消息传递的使用通常吞吐量较低,但是可能需要较低的端到端延迟,并且通常取决于Kafka提供的强大的持久性保证。

在这个领域,Kafka与ActiveMQ或 RabbitMQ等传统消息传递系统相当。

10.1.2 网站活动路由

Kafka最初的用例是能够将用户活动跟踪管道重建为一组实时的发布-订阅。这意味着将网站活动 (页面浏览,搜索或用户可能采取的其他操作)发布到中心主题,每种活动类型只有一个主题。这些提要可用于一系列用例的订阅,包括实时处理,实时监控,以及加载到Hadoop或脱机数据仓库系统中以进行脱机处理和报告。

活动跟踪通常量很大,因为每个用户页面视图都会生成许多活动消息。

10.1.3 监控指标

Kafka通常用于操作监控数据。这涉及汇总来自分布式应用程序的统计信息,以生成操作数据的集中。

10.1.4 日志汇总

许多人使用Kafka代替日志聚合解决方案。日志聚合通常从服务器收集物理日志文件,并将它们放在中央位置(也许是文件服务器或HDFS)以进行处理。Kafka提取文件的详细信息,并以日志流的形式更清晰地抽象日志或事件数据。这允许较低延迟的处理,并更容易支持多个数据源和分布式数据消耗。 与以日志为中心的系统(例如Scribe或Flume)相比,Kafka具有同样出色的性能,由于复制而提供的更强的耐用性保证以及更低的端到端延迟。.3

10.1.5 流处理

Kafka的许多用户在由多个阶段组成的处理管道中处理数据,其中原始输入数据从Kafka主题中使用,然后进行汇总,充实或以其他方式转换为新主题,以供进一步使用或后续处理。例如,用于推荐新闻文章的处理管道可能会从RSS提要中检索文章内容,并将其发布到“文章”主题中。进一步的处理可能会使该内容规范化或重复数据删除,并将清洗后的文章内容发布到新主题中;最后的处理阶段可能会尝试向用户推荐此内容。这样的处理管道基于各个主题创建实时数据流的图形。从0.10.0.0开始,一个轻量但功能强大的流处理库称为Kafka Streams 可以在Apache Kafka中使用来执行上述数据处理。除了Kafka Streams以外,其他开源流处理工具还包括Apache Storm和 Apache Samza。

10.1.6 活动采集

事件源是一种应用程序,其中状态更改以时间顺序记录。Kafka对大量存储的日志数据的支持使其成为以这种样式构建的应用程序的绝佳后端。

10.1.7 提交日志

Kafka可以用作分布式系统的一种外部提交日志。该日志有助于在节点之间复制数据,并充当故障节点恢复其数据的重新同步机制。Kafka中的日志压缩功能有助于支持此用法。在这种用法中,Kafka类似于Apache BookKeeper项目。

kafka集群的作用:

1. 横向扩展,提高Kafka的处理能力

2. 镜像,副本,提供高可用。

10.2 集群搭建

1.搭建设计

2. 分配三台Linux,用于安装拥有三个节点的Kafka集群。

  • node2(192.168.100.102)
  • node3(192.168.100.103)
  • node4(192.168.100.104)

以上三台主机的/etc/hosts配置:

192.168.100.101 node1
192.168.100.102 node2
192.168.100.103 node3
192.168.100.104 node4

10.2.1 zookeeper集群搭建

参考分布式协调服务框架——zookeeper集群搭建部分内容!

10.2.2 Kafka集群搭建

1.上传并解压kafka到/opt

# 解压到/opt
tar -zxf kafka_2.12-1.0.2.tgz -C /opt
# 拷贝到node3和node4
scp -r /opt/kafka_2.12-1.0.2/ node3:/opt
scp -r /opt/kafka_2.12-1.0.2/ node4:/opt

2.配置kafka

# 配置环境变量,三台Linux都要配置
vim /etc/profile
# 添加以下内容:
export KAFKA_HOME=/opt/kafka_2.12-1.0.2
export PATH=$PATH:$KAFKA_HOME/bin# 让配置生效
source /etc/profile# linux121配置
vim /opt/kafka_2.12-1.0.2/config/server.propertiesbroker.id=0
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://linux121:9092
log.dirs=/var/lagou/kafka/kafka-logs
zookeeper.connect=linux121:2181,linux122:2181,linux123:2181/myKafka
# 其他使用默认配置# linux122配置
vim /opt/kafka_2.12-1.0.2/config/server.propertiesbroker.id=1
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://linux122:9092
log.dirs=/var/lagou/kafka/kafka-logs
zookeeper.connect=linux121:2181,linux122:2181,linux123:2181/myKafka
# 其他使用默认配置# linux123配置
vim /opt/kafka_2.12-1.0.2/config/server.propertiesbroker.id=2
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://linux123:9092
log.dirs=/var/lagou/kafka/kafka-logs
zookeeper.connect=linux121:2181,linux122:2181,linux123:2181/myKafka
# 其他使用默认配置

3.启动kafka(先启动zookeeper集群)

[root@node2 ~]# kafka-server-start.sh /opt/kafka_2.12-1.0.2/config/server.properties
[root@node3 ~]# kafka-server-start.sh /opt/kafka_2.12-1.0.2/config/server.properties
[root@node4 ~]# kafka-server-start.sh /opt/kafka_2.12-1.0.2/config/server.properties
kafka-server-start.sh -daemon /opt/lagou/servers/kafka_2.12-1.0.2/config/server.properties

4.验证kafka

三个节点上的cluster ID都是一样的,表示三台节点在同一个集群。

  • Cluster Id是一个唯一的不可变的标志符,用于唯一标志一个Kafka集群。
  • 该Id最多可以有22个字符组成,字符对应于URL-safe Base64。
  • Kafka 0.10.1版本及之后的版本中,在集群第一次启动的时候,Broker从Zookeeper的<Kafka_ROOT>/cluster/id节点获取。如果该Id不存在,就自动生成一个新的。
zkCli.sh
# 查看每个Broker的信息
get /myKafka/brokers/ids/0
get /myKafka/brokers/ids/1
get /myKafka/brokers/ids/2

10.3 集群监控

10.3.1 监控度量指标

Kafka使用Yammer Metrics在服务器和Scala客户端中报告指标。Java客户端使用Kafka Metrics, 它是一个内置的度量标准注册表,可最大程度地减少拉入客户端应用程序的传递依赖项。两者都通过JMX公开指标,并且可以配置为使用可插拔的统计报告器报告统计信息,以连接到您的监视系统。

具体的监控指标可以查看官方文档。

10.3.1.1 JMX

10.3.1.1.1 Kafka开启Jmx端口

[root@node4 bin]# vim /opt/kafka_2.12-1.0.2/bin/kafka-server-start.sh

所有kafka机器添加一个JMX_PORT ,并重启kafka

10.3.1.1.2 验证JMX开启

首先打印9581端口占用的进程信息,然后使用进程编号对应到Kafka的进程号,搞定。

也可以查看Kafka启动日志,确定启动参数 -Dcom.sun.management.jmxremote.port=9581 存在即可

10.3.1.2 使用JConsole链接JMX端口

1. win/mac,找到jconsole工具并打开,在 Mac电脑可以直接命令行输入 jconsole

详细的监控指标

官方文档:http://kafka.apache.org/10/documentation.html#monitoring

这里列出常用的:

OS监控项

broker指标

producer以及topic指标

consumer指标

10.3.1.3 编程手段来获取监控指标

查看要监控哪个指标:

# 制造数据
for i in `seq 10000`;do echo "hello word $i" >> msg.txt;done# 创建主题
kafka-topic --zookeeper linux121:2181/mykafka --create --topic tp_demo_01 --partitions 3 --replication-factor 2# 把数据向主题传输
kafka-console-producer.sh --broker-list linux121:9092,linux122:9092 --topic tb_demo_01 < msg.txt

代码实现

package com.lagou.kafka.demo.monitor;import javax.management.*;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.io.IOException;
import java.util.Iterator;
import java.util.Set;public class JMXMonitorDemo {public static void main(String[] args) throws IOException, MalformedObjectNameException, AttributeNotFoundException, MBeanException, ReflectionException, InstanceNotFoundException {String jmxServiceURL = "service:jmx:rmi:///jndi/rmi://192.168.100.103:9581/jmxrmi";JMXServiceURL jmxURL = null;JMXConnector jmxc = null;MBeanServerConnection jmxs = null;ObjectName mbeanObjName = null;Iterator sampleIter = null;Set sampleSet = null;// 创建JMXServiceURL对象,参数是jmxURL = new JMXServiceURL(jmxServiceURL);// 建立到指定URL服务器的连接jmxc = JMXConnectorFactory.connect(jmxURL);// 返回代表远程MBean服务器的MBeanServerConnection对象jmxs = jmxc.getMBeanServerConnection();// 根据传入的字符串,创建ObjectName对象
//        mbeanObjName = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec");
//        mbeanObjName = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec,topic=tp_eagle_01");mbeanObjName = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec");// 获取指定ObjectName对应的MBeanssampleSet = jmxs.queryMBeans(null, mbeanObjName);// 迭代器sampleIter = sampleSet.iterator();if (sampleSet.isEmpty()) {} else {// 如果返回了,则打印信息while (sampleIter.hasNext()) {// Used to represent the object name of an MBean and its class name.// If the MBean is a Dynamic MBean the class name should be retrieved from the MBeanInfo it provides.// 用于表示MBean的ObjectName和ClassNameObjectInstance sampleObj = (ObjectInstance) sampleIter.next();ObjectName objectName = sampleObj.getObjectName();// 查看指定MBean指定属性的值String count = jmxs.getAttribute(objectName, "Count").toString();System.out.println(count);}}// 关闭jmxc.close();}
}

10.3.2 监控工具 Kafka Eagle

我们可以使用Kafka-eagle管理Kafka集群

核心模块:

  • 面板可视化
  • 主题管理,包含创建主题、删除主题、主题列举、主题配置、主题查询等
  • 消费者应用:对不同消费者应用进行监控,包含Kafka API、Flink API、Spark API、Storm API、Flume API、LogStash API等
  • 集群管理:包含对Kafka集群和Zookeeper集群的详情展示,其内容包含Kafka启动时间、 Kafka端口号、Zookeeper Leader角色等。同时,还有多集群切换管理,Zookeeper Client 操作入口
  • 集群监控:包含对Broker、Kafka核心指标、Zookeeper核心指标进行监控,并绘制历史趋势图
  • 告警功能:对消费者应用数据积压情况进行告警,以及对Kafka和Zookeeper监控度进行告警。同时,支持邮件、微信、钉钉告警通知
  • 系统管理:包含用户创建、用户角色分配、资源访问进行管理

架构:

  • 可视化:负责展示主题列表、集群健康、消费者应用等
  • 采集器:数据采集的来源包含Zookeeper、Kafka JMX & 内部Topic、Kafka API(Kafka 2.x以 后版本)
  • 数据存储:目前Kafka Eagle存储采用MySQL或SQLite,数据库和表的创建均是自动完成的, 按照官方文档进行配置好,启动Kafka Eagle就会自动创建,用来存储元数据和监控数据
  • 监控:负责见消费者应用消费情况、集群健康状态
  • 告警:对监控到的异常进行告警通知,支持邮件、微信、钉钉等方式
  • 权限管理:对访问用户进行权限管理,对于管理员、开发者、访问者等不同角色的用户,分配不同的访问权限

1.需要Kafka节点开启JMX。前面讲过了。

# 下载编译好的包
wget http://pkgs-linux.cvimer.com/kafka-eagle.zip
# 配置kafka-eagle
unzip kafka-eagle.zip
cd kafka-eagle/kafka-eagle-web/target
mkdir -p test
cp kafka-eagle-web-2.0.1-bin.tar.gz test/
tar xf kafka-eagle-web-2.0.1-bin.tar.gz
cd kafka-eagle-web-2.0.1

2.需要配置环境变量:

# KAKA_EAGLE_HOME
export KE_HOME=/opt/lagou/servers/kafka-eagle/kafka-eagle-web/target/test/kafka-eagle-web-2.0.1
export PATH=$PATH:$KE_HOME/bin

3.conf下的配置文件:system-config.properties

# 集群的别名,用于在kafka-eagle中进行区分。
# 可以配置监控多个集群,别名用逗号隔开
kafka.eagle.zk.cluster.alias=cluster1
# 配置当前集群的zookeeper地址,此处的值要与Kafka的server.properties中的 zookeeper.connect的值一致
# 此处的前缀就是集群的别名
cluster2.zk.list=linux121:2181,linux122:2181,linux123:2181/myKafka
# 修改url路径,并把路径创建出来
kafka.eagle.url=jdbc:sqlite:/root/hadoop/kafka-eagle/db/ke.db

4.也可以自行编译,https://github.com/smartloli/kafka-eagle 创建Eagel的存储目录: mkdir -p /hadoop/kafka-eagle

5.在三台机器都配置JMX_PORT=9581

vim /opt/lagou/servers/kafka_2.12-1.0.2/bin/kafka-run-class.shJMX_PORT=9581
 

6.重新启动zookeeper、kafka、kafka_eagle

# 启动kafka-eagle
./bin/ke.sh start

界面

输入默认的账号密码:admin/123456

可以在该界面操作、监控kafka集群。

代码实现

10 Kafka集群与运维相关推荐

  1. 4.2.5 Kafka集群与运维(集群的搭建、监控工具 Kafka Eagle)

    Kafka集群与运维 文章目录 Kafka集群与运维 1.集群的搭建 1.1 搭建zookeeper集群 1.1.1 上传JDK到linux,安装并配置JDK 1.1.2. Linux 安装Zooke ...

  2. 4.2.9 Kafka集群与运维, 应用场景, 集群搭建, 集群监控JMX(度量指标, JConsole, 编程获取, Kafka Eagle)

    目录 3.1 集群应用场景 1 消息传递 2 网站活动路由 3 监控指标 4 日志汇总 5 流处理 6 活动采集 7 提交日志 总结 3.2 集群搭建 3.2.1 Zookeeper集群搭建 3.2. ...

  3. rabbitmq基础5——集群节点类型、集群基础运维,集群管理命令,API接口工具

    文章目录 一.集群节点类型 1.1 内存节点 1.2 磁盘节点 二.集群基础运维 2.1 剔除单个节点 2.1.1 集群正常踢出正常节点 2.1.2 服务器异常宕机踢出节点 2.1.3 集群正常重置并 ...

  4. RocketMQ 集群平滑运维

    前言 在 RocketMQ 集群的运维实践中,无论线上 Broker 节点启动和关闭,还是集群的扩缩容,都希望是平滑的,业务无感知.正所谓 "随风潜入夜,润物细无声" ,本文以实际 ...

  5. 从400+节点ElasticSearch集群的运维中,我们总结了这些经验

    墨墨导读:国外一家舆情监控公司Meltwater每天处理的数据非常庞大--在高峰期需要索引大约300多万社论文章,和近1亿条社交帖子数据.其中社论数据长期保存以供检索(可回溯到2009年),社交帖子数 ...

  6. Hadoop集群日常运维

    (一)备份namenode的元数据 namenode中的元数据非常重要,如丢失或者损坏,则整个系统无法使用.因此应该经常对元数据进行备份,最好是异地备份. 1.将元数据复制到远程站点 (1)以下代码将 ...

  7. KingbaseES V8R6集群管理运维案例之---repmgr standby switchover故障

    案例说明: 在KingbaseES V8R6集群备库执行"repmgr standby switchover"时,切换失败,并且在执行过程中,伴随着"repmr stan ...

  8. ELK集群+Kafka集群+FileBeat——命运多舛的安装采坑之路

    欢迎大家关注我的公众号,添加我为好友! 开始的时候感觉日志监控是比较NB的技术,感觉很神奇,那么多日志,为什么一下子就能够找到自己想要的?后来初步了解到了ELK(ElasticSearch + Log ...

  9. 滴滴Logi-KafkaManager开源之路:一站式Kafka集群指标监控与运维管控平台

    导读 从2019年4月份计划开源到2021月1月14号完成开源,历时22个月终于修成正果,一路走来实属不易,没有前端.设计.产品,我们找实习生.合作方.外部资源支持,滴滴Kafka服务团队人员也几经调 ...

最新文章

  1. 学习Windows Phone手机开发:Tile的使用
  2. Oracle NVL函数的用法
  3. 入选 Forrester 领导者象限,阿里云 Serverless 产品能力全球第一
  4. CodeForces - 1537E2 Erase and Extend (Hard Version)(扩展KMP-比较两个前缀无限循环后的字典序大小)
  5. 代码小结:时区的时间问题
  6. 如何利用系统自带的小工具制作特殊字符
  7. 2.aop原理:@EnableAspectJAutoProxy
  8. 机器学习的环境搭建流程
  9. 苹果电脑忘记开机密码重设教程
  10. SAP顾问简历中常见的英文说法,可能你不太熟悉哦~~
  11. 盗QQ号的现在越来越牛B了,我差点被骗!大家要小心了
  12. uview ui与element ui的区别和用法
  13. Python语言程序设计 习题1
  14. 10 种跨域解决方案(附终极方案)
  15. 关于Anaconda(Miniconda)虚拟环境中的包的问题pkgs
  16. Python3脚本抢票
  17. 【地图】高德静态地图(页面展示和导出Word)
  18. Javascript-基础-学习笔记
  19. shineblink SYN6288语音输出
  20. 千万要找一个程序员谈恋爱!

热门文章

  1. C语言编程>第二十二周 ② 请补充fun函数,该函数的功能是:返回字符数组中指定字符的个数,指定字符从键盘输入。
  2. 用java下载apk解析包出错_教大家解析包时出现问题怎么解决
  3. pythonstdin_理解Python中的stdin stdout stderr - The Hard Way Is Easier
  4. 计算英文句子中有多少单词?
  5. 阿里云部署Java网站和微信开发调试心得技巧(下)
  6. 如何快速一键重装系统 一键重装系统图文教程
  7. 调用高德地图API接口,实现地铁站经纬度采集
  8. 【企业管理】管理学十大原理
  9. 高通MSM8916后面的0VV 1VV 3VV 等代表什么?
  10. 余弦窗cosine window