一、Kafka集群环境

1、环境版本

版本:kafka2.11,zookeeper3.4

注意:这里zookeeper3.4也是基于集群模式部署。

2、解压重命名

tar -zxvf kafka_2.11-0.11.0.0.tgzmv kafka_2.11-0.11.0.0 kafka2.11

创建日志目录

[root@en-master kafka2.11]# mkdir logs

注意:以上操作需要同步到集群下其他服务上。

3、添加环境变量

vim /etc/profileexport KAFKA_HOME=/opt/kafka2.11export PATH=$PATH:$KAFKA_HOME/binsource /etc/profile

4、修改核心配置

[root@en-master /opt/kafka2.11/config]# vim server.properties-- 核心修改如下# 唯一编号broker.id=0# 开启topic删除delete.topic.enable=true# 日志地址log.dirs=/opt/kafka2.11/logs# zk集群zookeeper.connect=zk01:2181,zk02:2181,zk03:2181

注意:broker.id安装集群服务个数编排即可,集群下不能重复。

5、启动kafka集群

# 启动命令[root@node02 kafka2.11]# bin/kafka-server-start.sh -daemon config/server.properties# 停止命令[root@node02 kafka2.11]# bin/kafka-server-stop.sh# 进程查看[root@node02 kafka2.11]# jps

注意:这里默认启动了zookeeper集群服务,并且集群下的kafka分别启动。

6、基础管理命令

创建topic

bin/kafka-topics.sh --zookeeper zk01:2181 --create --replication-factor 3 --partitions 1 --topic one-topic

参数说明:

  • replication-factor 定义副本个数
  • partitions 定义分区个数
  • topic:定义topic名称

查看topic列表

bin/kafka-topics.sh --zookeeper zk01:2181 --list

修改topic分区

bin/kafka-topics.sh --zookeeper zk01:2181 --alter --topic one-topic --partitions 5

查看topic

bin/kafka-topics.sh --zookeeper zk01:2181 --describe --topic one-topic

发送消息

bin/kafka-console-producer.sh --broker-list 192.168.72.133:9092 --topic one-topic

消费消息

bin/kafka-console-consumer.sh --bootstrap-server 192.168.72.133:9092 --from-beginning --topic one-topic

删除topic

bin/kafka-topics.sh --zookeeper zk01:2181 --delete --topic first

7、Zk集群用处

Kafka集群中有一个broker会被选举为Controller,Controller依赖Zookeeper环境,管理集群broker的上下线,所有topic的分区副本分配和leader选举等工作。

二、消息拦截案例

1、拦截器简介

Kafka中间件的Producer拦截器主要用于实现消息发送的自定义控制逻辑。用户可以在消息发送前以及回调逻辑执行前有机会对消息做一些自定义,比如消息修改等,发送状态监控等,用户可以指定多个拦截器按顺序执行拦截。

核心方法

  • configure:获取配置信息和初始化数据时调用;
  • onSend:消息被序列化以及和计算分区前调用该方法,可以对消息做操作;
  • onAcknowledgement:消息发送到Broker之后,或发送过程失败时调用;
  • close:关闭拦截器调用,执行一些资源清理工作;

注意:这里说的拦截器是针对消息发送流程。

2、自定义拦截

定义方式:实现ProducerInterceptor接口即可。

拦截器一:在onSend方法中,对拦截的消息进行修改。

@Componentpublic class SendStartInterceptor implements ProducerInterceptor {    private final Logger LOGGER = LoggerFactory.getLogger("SendStartInterceptor");    @Override    public void configure(Map configs) {        LOGGER.info("configs...");    }    @Override    public ProducerRecord onSend(ProducerRecord record) {        // 修改消息内容        return new ProducerRecord<>(record.topic(), record.partition(),                                    record.timestamp(), record.key(),                              "onSend:{" + record.value()+"}");    }    @Override    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {        LOGGER.info("onAcknowledgement...");    }    @Override    public void close() {        LOGGER.info("SendStart close...");    }}

拦截器二:在onAcknowledgement方法中,判断消息是否发送成功。

@Componentpublic class SendOverInterceptor implements ProducerInterceptor {    private final Logger LOGGER = LoggerFactory.getLogger("SendOverInterceptor");    @Override    public void configure(Map configs) {        LOGGER.info("configs...");    }    @Override    public ProducerRecord onSend(ProducerRecord record) {        LOGGER.info("record...{}", record.value());        return record ;    }    @Override    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {        if (exception != null){            LOGGER.info("Send Fail...exe-msg",exception.getMessage());        }        LOGGER.info("Send success...");    }    @Override    public void close() {        LOGGER.info("SendOver close...");    }}

加载拦截器:基于一个KafkaProducer配置Bean,加入拦截器。

@Configurationpublic class KafkaConfig {    @Bean    public Producer producer (){        Properties props = new Properties();        // 省略其他配置...        // 添加拦截器        List interceptors = new ArrayList<>();        interceptors.add("com.kafka.cluster.interceptor.SendStartInterceptor");        interceptors.add("com.kafka.cluster.interceptor.SendOverInterceptor");        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);        return new KafkaProducer<>(props) ;    }}

3、代码案例

@RestControllerpublic class SendMsgWeb {    @Resource    private KafkaProducer producer ;    @GetMapping("/sendMsg")    public String sendMsg (){        producer.send(new ProducerRecord<>("one-topic", "msgKey", "msgValue"));        return "success" ;    }}

基于上述自定义Bean类型,进行消息发送,关注拦截器中打印日志信息。

三、Kafka存储分析

说明:该过程基于上述案例producer.send方法追踪的源码执行流程,源码中的过程相对清楚,涉及的核心流程如下。

1、消息生成过程

Producer发送消息采用的是异步发送的方式,消息发送过程如下:

  • Producer发送消息之后,经过拦截器,序列化,事务判断;
  • 流程执行后,消息内容放入容器中;
  • 容器在指定时间内如果装满(size),会唤醒Sender线程;
  • 容器如果在指定时间内没有装满,也会执行一次Sender线程唤醒;
  • 唤醒Sender线程之后,把容器数据拉取到topic中;

絮叨一句:读这些中间件的源码,不仅能开阔思维,也会让自己意识到平时写的代码可能真的叫搬砖。

2、存储机制

Kafka中消息是以topic进行标识分类,生产者面向topic生产消息,topic分区(partition)是物理上的存储,基于消息日志文件的方式。

  • 每个partition对应于一个log文件,发送的消息不断追加到该log文件末端;
  • log文件中存储的就是producer生产的消息数据,采用分片和索引机制;
  • partition分为多个segment。每个segment对应两个(.index)和(.log)文件;
  • index文件类型存储的索引信息;
  • log文件存储消息的数据;
  • 索引文件中的元数据指向对应数据文件中message的物理偏移地址;
  • 消费者组中的每个消费者,都会实时记录消费的消息offset位置;
  • 当然消息消费出错时,恢复是从上次的记录位置继续消费;

3、事务控制机制

Kafka支持消息的事务控制

Producer事务

跨分区跨会话的事务原理,引入全局唯一的TransactionID,并将Producer获得的PID和TransactionID绑定。Producer重启后可以通过正在进行的TransactionID获得原来的PID。 Kafka基于TransactionCoordinator组件管理Transaction,Producer通过和TransactionCoordinator交互获得TransactionID对应的任务状态。TransactionCoordinator将事务状态写入Kafka的内部Topic,即使整个服务重启,进行中的事务状态可以得到恢复。

Consumer事务

Consumer消息消费,事务的保证强度很低,无法保证消息被精确消费,因为同一事务的消息可能会出现重启后已经被删除的情况。


推荐关联阅读:


推荐阅读:源码 -> GitHub || GitEE

数据源管理 | 基于JDBC模式,适配和管理动态数据源

数据源管理 | 基于DataX组件,同步数据和源码分析

数据源管理 | OLAP查询引擎,ClickHouse集群化管理

数据源管理 | 主从库动态路由,AOP模式读写分离

数据源管理 | 关系型分库分表,列式库分布式计算

融云发送自定义消息_数据源管理 | Kafka集群环境搭建,消息存储机制详解相关推荐

  1. 数据源管理 | Kafka集群环境搭建,消息存储机制详解

    本文源码:GitHub·点这里 || GitEE·点这里 一.Kafka集群环境 1.环境版本 版本:kafka2.11,zookeeper3.4 注意:这里zookeeper3.4也是基于集群模式部 ...

  2. 高吞吐消息中间件Kafka集群环境搭建(3台kafka,3台zookeeper)

    高吞吐消息中间件Kafka集群环境搭建(3台kafka,3台zookeeper) 一.集群搭建要求 1.搭建设计 2.分配六台Linux,用于安装拥有三个节点的Kafka集群和三个节点的Zookeep ...

  3. mysql集群多管理节点_项目进阶 之 集群环境搭建(三)多管理节点MySQL集群

    多管理节点MySQL的配置很easy,仅须要改动之前的博文中提高的三种节点的三个地方. 1)改动管理节点配置 打开管理节点C:\mysql\bin下的config.ini文件,将当中ndb_mgmd的 ...

  4. 大数据 -- zookeeper和kafka集群环境搭建

    一 运行环境 从阿里云申请三台云服务器,这里我使用了两个不同的阿里云账号去申请云服务器.我们配置三台主机名分别为zy1,zy2,zy3. 我们通过阿里云可以获取主机的公网ip地址,如下: 通过secu ...

  5. Kafka集群环境搭建

    首先准备至少三台虚拟机. 每台虚拟机解压下载好的kafka压缩包并重命名 cd /usr/local wget http://mirror.bit.edu.cn/apache/kafka/1.0.0/ ...

  6. 09 Confluent_Kafka权威指南 第九章:管理kafka集群

    文章目录 CHAPTER 9 Administering Kafka 管理kafka Topic Operations 主题操作 Creating a New Topic 创建新的topic Spec ...

  7. php连接kafka集群,Kafka集群环境配置

    Kafka集群环境配置 1 环境准备 1.1 集群规划 Node02 Node03 Node04 zk zk zk kafka kafka kafka 1.2 jar包下载 安装包:kafka_2.1 ...

  8. 搭建Kafka集群环境

    计划使用三台主机:11.12.112.206.11.12.112.207.11.12.112.208搭建Kafka集群环境, 使用的zookeeper集群为:11.12.112.215:2181,11 ...

  9. Kafka:ZK+Kafka+Spark Streaming集群环境搭建(九)安装kafka_2.11-1.1.0

    如何搭建配置centos虚拟机请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(一)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网.& ...

最新文章

  1. 分布式系统理论之两阶段提交协议
  2. Python生产环境部署(fastcgi,uwsgi)
  3. Santa Cruz 上手测试,动作控制器自由度惊人
  4. 禁用微信 webview 调整字体大小
  5. [OS复习]操作系统综述2
  6. boost::math模块两个 Lambert W 函数的最基本调用示例
  7. HDU1506(天然的笛卡尔树)
  8. PHP----练习-----新闻管理----增删改查
  9. python定时任务是异步的吗_python异步实现定时任务和周期任务的方法
  10. 【错误分析】NX error status: 32
  11. CCF NOI1121 逆波兰表达式
  12. 技术干货 | 六分钟学会使用 HBuilder 引入构建 mPaaS 小程序
  13. 自己动手写网络爬虫-----(1)
  14. 配置cts performance最优范围
  15. java throw throwable_异常处理:throw,throws和Throwable
  16. GSoC: GitHub Checks API 项目第一阶段总结
  17. 敏俊物联MJIOT-AMB-03 RTL8710BN 高性能wifi模块
  18. 阿里云ECS每天一件事D1:配置SSH
  19. matlab相机标定Options选项解析
  20. -5.5V~-1.4V,DC-DC电源芯片,电荷泵芯片,负压芯片。负压稳定,纹波小,电路简单。

热门文章

  1. 文件怎么上传远程服务器,怎么上传文件到远程服务器
  2. r语言kendall协和系数_数据挖掘|R相关性分析及检验
  3. java 内部类 抽象类_Java 内部类、匿名内部类、抽象类
  4. npm WARN saveError ENOENT: no such file or directory, open ‘/Users/....../package.json‘的解决办法
  5. Python工具包werkzeug
  6. shell编程中的 ${ }强大功能
  7. 在html表单中设置数组的方法是什么,js如何处理表单中的数组方式?
  8. 什么是负边沿触发_晶闸管的导通条件是什么 晶闸管(可控硅)检测方法
  9. Java没有panel_java panel 问题
  10. 微型计算机总线不包括,微型计算机总线不包括( )。