基于 librdkafka C API 的三种seek随机访问方法
尽管Kafka一般意义上都是建议顺序的消费数据,但难免会遇到回滚、重新处理等需求。甚至有些应用希望把kafka当做一个缓存来用,比如保留1天内的近时的数据记录,并支持各个消费者通过拖拽进度条的方式来查看。这个时候,当然就需要进行seek操作。
在librdkafka的新C语言API中,建议使用函数 rd_kafka_subscribe 在一个线程中处理多个 topic 的请求,并把以前的逐topic的seek函数 rd_kafka_seek 标记为“deprecated”,提示使用新的rd_kafka_seek_partitions来处理。 librdkafka 这种批量消费主题的策略,可以在重平衡时,用正则式匹配新创建的专题。但这种高级的功能,是对底层请求API进行了更深的封装,使得我们“摸到”真正的 topic:partition绑定的时机更难把握,搞不好就没有效果。
本文介绍最常见的三个seek的时机和方法,希望对初学者有所帮助。感谢我学弟的帮助,对Kafka的各类seek行为进行了深入的测试。
1. 消费前指定偏移
在消费前,创建专题分区描述表时,可以指定偏移。在官方的例子里,使用的是 rd_kafka_subscribe函数来登记topics,这样,是无需指定具体消费什么分区的。因此,用 RD_KAFKA_PARTITION_UA 作为分区的标记,参考下面代码:
rd_kafka_t *rk; /* Consumer instance handle */rd_kafka_topic_partition_list_t *subscription; /* Subscribed topics *///...rd_kafka_poll_set_consumer(rk);/* Convert the list of topics to a format suitable for librdkafka */subscription = rd_kafka_topic_partition_list_new(topic_cnt);for (i = 0; i < topic_cnt; i++){rd_kafka_topic_partition_list_add(subscription, topics[i],/* the partition is ignored by subscribe() */RD_KAFKA_PARTITION_UA);}rd_kafka_subscribe(rk, subscription);
本方法,需要提前知道自己想消费的分区的ID,并显式指定,并用分配函数 rd_kafka_assign 取代 rd_kafka_subscribe,使得server能够立刻把确切的topic-partition绑定到这个消费者. 同时,还有必要通过rd_kafka_query_watermark_offsets提前获取offset的合理范围。代码如下:
rd_kafka_t *rk; /* Consumer instance handle */rd_kafka_topic_partition_list_t *subscription; /* Subscribed topics *///...rd_kafka_poll_set_consumer(rk);/* Convert the list of topics to a format suitable for librdkafka */subscription = rd_kafka_topic_partition_list_new(topic_cnt);for (i = 0; i < topic_cnt; i++){int nPartID = my_get_part_id(topics[i]);/*second place can seek a partion. query water mark and seek*/int64_t low, high;rd_kafka_query_watermark_offsets(rk,topics[i], 0, &low, &high, 5000);rd_kafka_topic_partition_list_add(subscription, topics[i],nPartID)->offset = (low+high)/2;}rd_kafka_assign(rk, subscription);
这种指定方式的特点:
- 可以立刻从offset消费
- 即使上次已经消费到末尾,也能马上回到特定的offset
- 在当前rk的生命周期内只能设置一次。设置多次可以用更宏观的生命周期包裹rk.
- 必须指明分区号。一般通过特定的策略人为指定。
2. 在 rebalance回调中分配
在重新平衡时,可以有机会触摸到当前的专题分区表。
void myrebalance(rd_kafka_t *rk,rd_kafka_resp_err_t err,rd_kafka_topic_partition_list_t *partitions,void *opaque)
{//first place can seek a partion, when using rd_kafka_subscribefor (int i=0;i<partitions->cnt;++i){if(!is_my_topic(partitions->elems[i].topic))continue;int64_t low, high;rd_kafka_query_watermark_offsets(rk,partitions->elems[i].topic, partitions->elems[i].partition, &low, &high, 5000); partitions->elems[i].offset = (low + high)/2;}auto e = rd_kafka_assign(rk,partitions);puts(rd_kafka_err2str(e));
}int consumer(...)
{//...rd_kafka_t *rk; /* Consumer instance handle */rd_kafka_topic_partition_list_t *subscription; /* Subscribed topics *///设置回调!rd_kafka_conf_set_rebalance_cb(conf,myrebalance);rd_kafka_poll_set_consumer(rk);/* Convert the list of topics to a format suitable for librdkafka */subscription = rd_kafka_topic_partition_list_new(topic_cnt);for (i = 0; i < topic_cnt; i++){rd_kafka_topic_partition_list_add(subscription, topics[i],/* the partition is ignored by subscribe() */RD_KAFKA_PARTITION_UA);}rd_kafka_subscribe(rk, subscription);
}
这种指定方式的特点:
- 无法立刻从offset消费。只有等到服务方回调才能设置
- 即使上次已经消费到末尾,也能回到特定的offset,但要等待时机
- 在rk的生命周期内可以发挥多次作用。
- 无需指明分区号,分区号会携带给调用者。
3. 在消费时指定
第三种方法,就是在消费的过程中指定。因为即使 rd_kafka_subscribe 没有指定分区,在消费时,也会在元数据中携带确切的分区。
while (run) {rd_kafka_message_t *rkm;rkm = rd_kafka_consumer_poll(rk, 100);if (!rkm)continue; /* Timeout: */if (needSeek){rd_kafka_seek(rkm->rkt,rkm->partition,getNewOffset(),100);}//...}
这种指定方式的特点:
- 在消费活跃时,可以立即设置offset,并在下一包回到offset
- 如果已经消费到末尾,则无法设置(运行不到这里)。
- 可以发挥多次作用(除非消费到末尾)。
- 无需指明分区号,分区号会携带给调用者。
4. 总结
可见,kafka seek的行为,是由它的原理决定的。
- 只有消费者已经assign到确切的partition,才能修改offset
- assign partition的行为可以是手动的(第一种),也可以是自动的(2、3)
- 当自动assign时,必须选取一个时机,获取当前的分区号,才能设置offset
- 最佳的方法是联合使用1、2、3这三种策略。
基于 librdkafka C API 的三种seek随机访问方法相关推荐
- 基于STM32F407最小系统板三种矩阵键盘实现方法
这里采用的八个端口为PA0-PA7. 此处先给出矩阵键盘的原理图: 一.八个端口采用开漏输出,配置上拉电阻,实现同51一样的双向IO口功能. //按键初始化函数 void KEY_Init(void) ...
- Action访问Servlet API的三种方法
一.为什么要访问Servlet API ? Struts2的Action并未与Servlet API进行耦合,这是Struts2 的一个改良,从而方便了单独对Action进行测试.但是对于Web控制器 ...
- ML之FE:数据处理—特征工程之特征选择常用方法之基于搜索策略的三种分类、基于评价准则划分的三种分类(Filter/Wrapper/Embedded)及其代码实现
ML之FE:数据处理-特征工程之特征选择常用方法之基于搜索策略的三种分类.基于评价准则划分的三种分类(Filter/Wrapper/Embedded)及其代码实现 目录 Wrapper包裹式/封装式- ...
- matlab win8打不开了,win8打不开防火墙的三种原因和解决方法
win8系统自带有防火墙功能,开启防火墙可以过滤不安全的服务而减低风险极大地提高内部网络的安全性.不过有win8系统用户说防火墙打不开,也不知道哪里出现问题了,其实导致win8打不开防火墙的原因有很多 ...
- vue三种调用接口的方法
注:此博客仅用于学习,自己还处于菜鸟阶段,希望给相同处境的人提供一个可参考的博客.如果您觉得不合理,您的指导,非常欢迎,但请不要否定别人的努力,谢谢您了! vue三种调用接口的方法 1. this.$ ...
- 三种权重的初始化方法
总结了三种权重的初始化方法,前两种比较常见,后一种是最新的. 1. Gaussian Weights are randomly drawn from Gaussian distributions wi ...
- day01 js三种导入html的方法、js书写规范、变量的基本使用、变量提升
昨天是初学js的第一天,为什么今天才写,我觉得这样可以帮助我复习昨天的知识,加深对js的理解. 我之前学过java的,昨天转入js的学习,对js略有些体会和大家分享下,js刚入门感觉js相对于java ...
- (转)Putty server refused our key的三种原因和解决方法
(转)Putty server refused our key的三种原因和解决方法 参考文章: (1)(转)Putty server refused our key的三种原因和解决方法 (2)http ...
- tomcat中三种部署项目的方法(转)
tomcat中三种部署项目的方法 第一种方法:在tomcat中的conf目录中,在server.xml中的,<host/>节点中添加: <Context path="/h ...
最新文章
- docker实战部署Javaweb项目
- emiya-canvas.js 解决ios下拍照倾斜与canvas高清屏下绘图模糊问题 1
- SetNoneScaleMutableGraph
- 点钞机语音怎么打开_微信语音怎么转发?原来方法这么简单,你还不知道吗
- linux usb驱动u盘启动不了,Linux环境下USB的原理、驱动和配置(4)
- 方舟编译器的安装和编译Helloword
- QT笔记之VS2010 Qt中导入qrc资源文件
- Linux编程训练网站,OK6410汇编程序练习
- java ajax文字搜素,JAVA-WEB AJAX 搜索条自动提示
- 常见的SAS接口类型、接口连接器外观详细解读
- 360笔试题-字符置换
- 拓端tecdat|在R语言中使用概率分布:dnorm,pnorm,qnorm和rnorm
- Silverlight 4常用StringFormat格式总结(转)
- c语言求区间内素数个数_C语言题目
- Selenium + C# 实现模拟百度贴吧签到 1
- ubuntu18.04使用网易云音乐 ubuntu网易云音乐打不开怎么办? ubuntu安装网易云音乐
- 基于VUE + Echarts 实现可视化数据大屏智慧校园可视化
- Git Commit Message校验踩坑指南
- 解决 SQLite报错:OperationalError: row value misused
- CF546C. Soldier and Cards(队列+模拟)