1. 前言

librdkafka提供的异步的生产接口,异步的消费接口和同步的消息接口,没有同步的生产接口。

2. 缩略语

缩略语

缩略语全称

示例或说明

rd

Rapid Development

rd.h

rk

RdKafka

toppar

Topic Partition

struct rd_kafka_toppar_t { };

rep

Reply,

struct rd_kafka_t {   rd_kafka_q_t *rk_rep };

msgq

Message Queue

struct rd_kafka_msgq_t { };

rkb

RdKafka Broker

Kafka代理

rko

RdKafka Operation

Kafka操作

rkm

RdKafka Message

Kafka消息

payload

存在Kafka上的消息(或叫Log)

3. 配置和主题

3.1. 配置和主题结构

3.1.1. Conf

配置接口,配置分两种:全局的和主题的。

3.1.2. ConfImpl

配置的实现。

3.1.3. Topic

主题接口。

3.1.4. TopicImpl

主题的实现。

4. 线程

RdKafka编程涉及到三类线程:

1) 应用线程,业务代码的实现

2) Kafka Broker线程rd_kafka_broker_thread_main,负责与Broker通讯,多个

3) Kafka Handler线程rd_kafka_thread_main,每创建一个consumer或producer即会创建一个Handler线程。

5. 消费者

5.1. 消费者结构

5.1.1. Handle

定义了poll等接口,它的实现者为HandleImpl。

5.1.2. HandleImpl

实现了消费者和生产者均使用的poll等,其中poll的作用为:

1) 为生产者回调消息发送结果;

2) 为生产者和消费者回调事件。

class Handle {/*** @brief Polls the provided kafka handle for events.** Events will trigger application provided callbacks to be called.** The \p timeout_ms argument specifies the maximum amount of time* (in milliseconds) that the call will block waiting for events.* For non-blocking calls, provide 0 as \p timeout_ms.* To wait indefinately for events, provide -1.** Events:*   - delivery report callbacks (if an RdKafka::DeliveryCb is configured) [producer]*   - event callbacks (if an RdKafka::EventCb is configured) [producer & consumer]** @remark  An application should make sure to call poll() at regular*          intervals to serve any queued callbacks waiting to be called.** @warning This method MUST NOT be used with the RdKafka::KafkaConsumer,*          use its RdKafka::KafkaConsumer::consume() instead.** @returns the number of events served.*/virtual int poll(int timeout_ms) = 0;};

5.1.3. ConsumeCb

只针对消费者的Callback。

5.1.4. RebalanceCb

只针对消费者的Callback。

5.1.5. EventCb

消费者和生产者均可设置EventCb,如:_global_conf->set("event_cb", &_event_cb, errmsg);。


/*** @brief Event callback class** Events are a generic interface for propagating errors, statistics, logs, etc* from librdkafka to the application.** @sa RdKafka::Event*/class RD_EXPORT EventCb {public:/*** @brief Event callback** @sa RdKafka::Event*/virtual void event_cb (Event &event) = 0;virtual ~EventCb() { }};/*** @brief Event object class as passed to the EventCb callback.*/class RD_EXPORT Event {public:/** @brief Event type */enum Type {EVENT_ERROR,     /**< Event is an error condition */EVENT_STATS,     /**< Event is a statistics JSON document */EVENT_LOG,       /**< Event is a log message */EVENT_THROTTLE   /**< Event is a throttle level signaling from the broker */};};

5.1.6. Consumer

简单消息者,一般不使用,而是使用KafkaConsumer。

5.1.7. KafkaConsumer

消费者和生产者均采用多重继承方式,其中KafkaConsumer为消费者接口,KafkaConsumerImpl为消费者实现。

5.1.8. KafkaConsumerImpl

KafkaConsumerImpl为消费者实现。

5.1.9. rd_kafka_message_t

消息结构。

5.1.10. rd_kafka_msg_s

消息结构,但消息数据实际存储在rd_kafka_message_t,结构大致如下:

struct rd_kafka_msg_s{rd_kafka_message_t rkm_rkmessage;struct{rd_kafka_msg_s* tqe_next;rd_kafka_msg_s** tqe_prev;int64_t rkm_timestamp;rd_kafka_timestamp_type_t rkm_tstype;}rkm_link;};

5.1.11. rd_kafka_msgq_t

存储消息的消息队列,生产者生产的消息并不直接socket发送到brokers,而是放入了这个队列,结构大致如下:

struct rd_kafka_msgq_t{struct{rd_kafka_msg_s* tqh_first; // 队首rd_kafka_msg_s* tqh_last;  // 队尾};// 消息个数rd_atomic32_t rkmq_msg_cnt;// 所有消息加起来的字节数rd_atomic64_t rkmq_msg_bytes;};

5.1.12. rd_kafka_toppar_t

Topic-Partition队列,很复杂的一个结构,部分内容如下:


// Topic + Partition combinationtypedef struct rd_kafka_toppar_s{struct{rd_kafka_toppar_s* tqe_next;rd_kafka_toppar_s** tqe_prev;}rktp_rklink;struct{rd_kafka_toppar_s* tqe_next;rd_kafka_toppar_s** tqe_prev;}rktp_rkblink;struct{rd_kafka_toppar_s* cqe_next;rd_kafka_toppar_s* cqe_prev;}rktp_fetchlink;struct{rd_kafka_toppar_s* tqe_next;rd_kafka_toppar_s** tqe_prev;}rktp_rktlink;struct{rd_kafka_toppar_s* tqe_next;rd_kafka_toppar_s** tqe_prev;}rktp_cgrplink;rd_kafka_itopic_t* rktp_rkt;int32_t rktp_partition;int32_t rktp_leader_id;rd_kafka_broker_t* rktp_leader;rd_kafka_broker_t* rktp_next_leader;rd_refcnt_t rktp_refcnt;rd_kafka_msgq_t rktp_msgq; // application->rdkafka queue}rd_kafka_toppar_t;

6. 生产者

6.1. 生产者结构

6.1.1. DeliveryReportCb

消息已经成功递送到Broker时回调,只针对生产者有效。

6.1.2. PartitionerCb

计算分区号回调函数,只针对生产者有效。

6.1.3. Producer

Producer为生产者接口,它的实现者为ProducerImpl。

6.1.4. ProduceImpl

ProducerImpl为生产者的实现。

6.2. 生产者启动过程1

启动时会创建两组线程:一组Broker线程(rd_kafka_broker_thread_main,多个),实为与Broker间的网络IO线程;一组Handler线程(rd_kafka_thread_main,单个),每调用一次RdKafka::Producer::create或rd_kafka_new即创建一Handler线程。

Handler线程调用栈:


(gdb) t 17[Switching to thread 17 (Thread 0x7ff7059d3700 (LWP 16765))]#0  0x00007ff7091e6cf2 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0(gdb) bt#0  0x00007ff7091e6cf2 in pthread_cond_timedwait@@GLIBC_2.3.2 () from /lib64/libpthread.so.0#1  0x00000000005b4d2f in cnd_timedwait_ms (cnd=0x1517748, mtx=0x1517720, timeout_ms=898) at tinycthread.c:501#2  0x0000000000580e16 in rd_kafka_q_serve (rkq=0x1517720, timeout_ms=898, max_cnt=0, cb_type=RD_KAFKA_Q_CB_CALLBACK, callback=0x0, opaque=0x0) at rdkafka_queue.c:440#3  0x000000000054ee9b in rd_kafka_thread_main (arg=0x1516df0) at rdkafka.c:1227#4  0x00000000005b4e0f in _thrd_wrapper_function (aArg=0x15179d0) at tinycthread.c:624#5  0x00007ff7091e2e25 in start_thread () from /lib64/libpthread.so.0#6  0x00007ff7082d135d in clone () from /lib64/libc.so.6

6.3. 生产者启动过程2

创建网络IO线程,消费者启动过程类似,只是一个调用rd_kafka_broker_producer_serve(rkb),另一个调用rd_kafka_broker_consumer_serve(rkb)。

IO线程负责消息的收和发,发送底层调用的是sendmsg,收调用的是recvmsg(但MSVC平台调用send和recv)。

6.4. 生产者生产过程

生产者生产的消息并不直接socket发送到brokers,而是放入队列rd_kafka_msgq_t中。Broker线程(rd_kafka_broker_thread_main)消费这个队列。

Broker线程同时监控与Broker间的网络连接,又要监控队列中是否有数据,如何实现的?这个队列和管道绑定在一起的,绑定的是管道写端(rktp->rktp_msgq_wakeup_fd = rkb->rkb_toppar_wakeup_fd; rkb->rkb_toppar_wakeup_fd=rkb->rkb_wakeup_fd[1])。

这样Broker线程即可同时监听网络数据和管道数据。

// int rd_kafka_msg_partitioner(rd_kafka_itopic_t *rkt, rd_kafka_msg_t *rkm,int do_lock)(gdb) p *rkm$7 = {rkm_rkmessage = {err = RD_KAFKA_RESP_ERR_NO_ERROR, rkt = 0x1590c10, partition = 1, payload = 0x7f48c4001260, len = 203, key = 0x7f48c400132b, key_len = 14, offset = 0,_private = 0x0}, rkm_link = {tqe_next = 0x5b5d47554245445b, tqe_prev = 0x6361667265746e69}, rkm_flags = 196610, rkm_timestamp = 1524829399009,rkm_tstype = RD_KAFKA_TIMESTAMP_CREATE_TIME, rkm_u = {producer = {ts_timeout = 16074575505526, ts_enq = 16074275505526}}}(gdb) p rkm->rkm_rkmessage$8 = {err = RD_KAFKA_RESP_ERR_NO_ERROR, rkt = 0x1590c10, partition = 1, payload = 0x7f48c4001260, len = 203, key = 0x7f48c400132b, key_len = 14, offset = 0, _private = 0x0}(gdb) p rkm->rkm_rkmessage->payload$9 = (void *) 0x7f48c4001260(gdb) p (char*)rkm->rkm_rkmessage->payload$10 = 0x7f48c4001260 "{\"p\":\"f\",\"o\":1,\"d\":\"m\",\"d\":\"m\",\"i\":\"f2\",\"ip\":\"127.0.0.1\",\"pt\":2018,\"sc\":0,\"fc\":1,\"tc\":0,\"acc\":395,\"mcc\":395,\"cd\":\"test\",\"cmd\":\"tester\",\"cf\":\"main\",\"cp\":\"1.49.16.9"...

7. poll过程

poll的作用是触发回调,生产者即使不调用poll,消息也会发送出去,但是如果不通过poll触发回调,则不能确定消息发送状态(成功或失败等)。

消费队列rd_kafka_t->rk_rep,rk_rep为响应队列,类型为rd_kafka_q_t或rd_kafka_q_s:

Kafka C++客户端库librdkafka详解相关推荐

  1. Kafka C++客户端库librdkafka笔记

    目录 目录 1 1. 前言 2 2. 缩略语 2 3. 配置和主题 3 3.1. 配置和主题结构 3 3.1.1. Conf 3 3.1.2. ConfImpl 3 3.1.3. Topic 3 3. ...

  2. Python爬虫之selenium库使用详解

    Python爬虫之selenium库使用详解 本章内容如下: 什么是Selenium selenium基本使用 声明浏览器对象 访问页面 查找元素 多个元素查找 元素交互操作 交互动作 执行JavaS ...

  3. python时间函数详解_Python:Numpy库基础分析——详解datetime类型的处理

    原标题:Python:Numpy库基础分析--详解datetime类型的处理 Python:Numpy库基础分析--详解datetime类型的处理 关于时间的处理,Python中自带的处理时间的模块就 ...

  4. Zookeeper客户端Curator使用详解

    http://www.jianshu.com/p/70151fc0ef5d Zookeeper客户端Curator使用详解 简介 Curator是Netflix公司开源的一套zookeeper客户端框 ...

  5. linux mysql 静态库_Linux静态库与动态库实例详解

    Linux静态库与动态库实例详解 1. Linux 下静态链接库编译与使用 首先编写如下代码: // main.c #include "test.h" int main(){ te ...

  6. python可以处理多大的数据_科多大数据之Python基础教程之Excel处理库openpyxl详解...

    原标题:科多大数据之Python基础教程之Excel处理库openpyxl详解 科多大数据小课堂来啦~Python基础教程之Excel处理库openpyxl详解 openpyxl是一个第三方库,可以处 ...

  7. NodeMCU 之 U8G2 库使用详解

    NodeMCU 之 U8G2 库使用详解 1.指令 u8g2.clearDisplay(); // 清除显示数据及屏幕u8g2.clearBuffer(); // 清Buffer缓冲区的数据u8g2. ...

  8. cJSON库用法详解

    cJSON库用法详解_宁静致远2021的博客-CSDN博客_cjson cJSON库用法详解 问题和需要注意的地方 一.JSON.cJSON简介 1. JSON 简介 2. JSON 语法 3. 开源 ...

  9. STC8H开发(一): 在Keil5中配置和使用FwLib_STC8封装库(图文详解)

    目录 STC8H开发(一): 在Keil5中配置和使用FwLib_STC8封装库(图文详解) STC8H开发(二): 在Linux VSCode中配置和使用FwLib_STC8封装库(图文详解) ST ...

  10. python的excell库_扣丁学堂Python基础教程之Excel处理库openpyxl详解

    扣丁学堂Python基础教程之Excel处理库openpyxl详解 2018-05-04 09:49:49 3197浏览 openpyxl是一个第三方库,可以处理xlsx格式的Excel文件.pipi ...

最新文章

  1. linux vsftp的配置
  2. 服务端工程师入门与进阶 Java 版
  3. HarmonyOS之数据管理·关系型数据库的应用
  4. redirect java 配置_Java从后台重定向(redirect)到另一个项目的方法
  5. 获取结构体某成员偏移
  6. WPF界面设计中常用的一些代码片段及属性
  7. GB7714-1987文后参考文献著录规则
  8. IDEA Maven 使用教程
  9. Pyrene-PEG-Biotin,芘丁酸聚乙二醇生物素,Biotin-PEG-Pyrene
  10. 国家高新技术企业认定知识产权这样拿高分
  11. 网易云课堂吴恩达Andrew Ng深度学习笔记(二)
  12. 全球及中国智能家居市场十四五竞争形势及营销模式咨询报告2021-2027年
  13. Matlab数字信号处理的仿真系统(具有界面)
  14. 服务器维护后稀有怪刷新,北极稀有怪刷新规律 时光龙能100%取得?
  15. Error querying database. Cause: com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: You hav
  16. 牛客网试题+答案分析+大牛面试经验(12)
  17. 阿龙学堂-中缀-后缀表达式的计算
  18. 一次使用ffmpeg将多张图片合成视频的经历
  19. app是互联网信息服务器地址,app的服务器地址
  20. Android对话框(普通对话框、单选对话框、多选对话框、进度条对话框)

热门文章

  1. 【数据库】数据库的锁机制及原理
  2. 格雷码与二进制的转换
  3. Apache自带的ab压力测试工具用法详解
  4. java pdf 转tif_JAVA中 PDF文件转成TIFF文件的2种方式
  5. Vue select默认选中第一个
  6. windows捕获串口数据_如何下载和安装用于Windows数据包捕获的Npcap库?
  7. c语言的舞蹈机器人开题报告范文,程序设计开题报告
  8. CCNA考试题库中英文翻译版及答案16
  9. android直播刷礼物特效,Android直播送礼物发消息页面(仿印客直播)
  10. Android 直播礼物动画实现之SVGA动画