kafka学习笔记:知识点整理

一、为什么需要消息系统

1.解耦:允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
2.冗余:消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。
3.扩展性:因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。
4.灵活性 & 峰值处理能力:在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
5.可恢复性:系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
6.顺序保证:在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。(Kafka 保证一个 Partition 内的消息的有序性)
7.缓冲:有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
8.异步通信:很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

二、kafka 架构

2.1 拓扑结构

如下图:

图.1

2.2 相关概念

如图.1中,kafka 相关名词解释如下:

1.producer:消息生产者,发布消息到 kafka 集群的终端或服务。
2.broker:kafka 集群中包含的服务器。
3.topic:每条发布到 kafka 集群的消息属于的类别,即 kafka 是面向 topic 的。
4.partition:partition 是物理上的概念,每个 topic 包含一个或多个 partition。kafka 分配的单位是 partition。
5.consumer:从 kafka 集群中消费消息的终端或服务。
6.Consumer group:high-level consumer API 中,每个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。
7.replica:partition 的副本,保障 partition 的高可用。
8.leader:replica 中的一个角色, producer 和 consumer 只跟 leader 交互。
9.follower:replica 中的一个角色,从 leader 中复制数据。
10.controller:kafka 集群中的其中一个服务器,用来进行 leader election 以及 各种 failover。(失效备援 (为系统备援能力的一种,当系统中其中一项设备失效而无法运作时,另一项设备即可自动接手原失效系统所执行的工作);)
12.zookeeper:kafka 通过 zookeeper 来存储集群的 meta 信息。

2.3 zookeeper 节点

kafka 在 zookeeper 中的存储结构如下图所示:

图.2

三、producer 发布消息

3.1 写入方式

producer 采用 push 模式将消息发布到 broker,每条消息都被 append 到 patition 中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 kafka 吞吐率)。

3.2 消息路由

producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪一个 partition。其路由机制为:

1. 指定了 patition,则直接使用;
2. 未指定 patition 但指定 key,通过对 key 的 value 进行hash 选出一个 patition
3. patition 和 key 都未指定,使用轮询选出一个 patition。

附上 java 客户端分区源码,一目了然:

//创建消息实例
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {if (topic == null)throw new IllegalArgumentException("Topic cannot be null");if (timestamp != null && timestamp < 0)throw new IllegalArgumentException("Invalid timestamp " + timestamp);this.topic = topic;this.partition = partition;this.key = key;this.value = value;this.timestamp = timestamp;
}//计算 patition,如果指定了 patition 则直接使用,否则使用 key 计算
private int partition(ProducerRecord<K, V> record, byte[] serializedKey , byte[] serializedValue, Cluster cluster) {Integer partition = record.partition();if (partition != null) {List<PartitionInfo> partitions = cluster.partitionsForTopic(record.topic());int lastPartition = partitions.size() - 1;if (partition < 0 || partition > lastPartition) {throw new IllegalArgumentException(String.format("Invalid partition given with record: %d is not in the range [0...%d].", partition, lastPartition));}return partition;}return this.partitioner.partition(record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}// 使用 key 选取 patition
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);int numPartitions = partitions.size();if (keyBytes == null) {int nextValue = counter.getAndIncrement();List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);if (availablePartitions.size() > 0) {int part = DefaultPartitioner.toPositive(nextValue) % availablePartitions.size();return availablePartitions.get(part).partition();} else {return DefaultPartitioner.toPositive(nextValue) % numPartitions;}} else {//对 keyBytes 进行 hash 选出一个 patitionreturn DefaultPartitioner.toPositive(Utils.murmur2(keyBytes)) % numPartitions;}
}

3.3 写入流程

producer 写入消息序列图如下所示:

图.3

流程说明:

1. producer 先从 zookeeper 的 "/brokers/.../state" 节点找到该 partition 的 leader
2. producer 将消息发送给该 leader
3. leader 将消息写入本地 log
4. followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK
5. leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK

3.4 producer delivery guarantee

一般情况下存在三种情况:

1. At most once 消息可能会丢,但绝不会重复传输
2. At least one 消息绝不会丢,但可能会重复传输
3. Exactly once 每条消息肯定会被传输一次且仅传输一次

当 producer 向 broker 发送消息时,一旦这条消息被 commit,由于 replication 的存在,它就不会丢。但是如果 producer 发送数据给 broker 后,遇到网络问题而造成通信中断,那 Producer 就无法判断该条消息是否已经 commit。虽然 Kafka 无法确定网络故障期间发生了什么,但是 producer 可以生成一种类似于主键的东西,发生故障时幂等性的重试多次,这样就做到了 Exactly once,但目前还并未实现。所以目前默认情况下一条消息从 producer 到 broker 是确保了 At least once,可通过设置 producer 异步发送实现At most once。

四、broker 保存消息

4.1 存储方式

物理上把 topic 分成一个或多个 patition(对应 server.properties 中的 num.partitions=3 配置),每个 patition 物理上对应一个文件夹(该文件夹存储该 patition 的所有消息和索引文件),如下:

图.4

4.2 存储策略

无论消息是否被消费,kafka 都会保留所有消息。有两种策略可以删除旧数据:

1. 基于时间:log.retention.hours=168
2. 基于大小:log.retention.bytes=1073741824

需要注意的是,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高 Kafka 性能无关。

4.3 topic 创建与删除

4.3.1 创建 topic

创建 topic 的序列图如下所示:

图.5

流程说明:

1. controller 在 ZooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被创建,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配。
2. controller从 /brokers/ids 读取当前所有可用的 broker 列表,对于 set_p 中的每一个 partition:2.1 从分配给该 partition 的所有 replica(称为AR)中任选一个可用的 broker 作为新的 leader,并将AR设置为新的 ISR2.2 将新的 leader 和 ISR 写入 /brokers/topics/[topic]/partitions/[partition]/state
3. controller 通过 RPC 向相关的 broker 发送 LeaderAndISRRequest。

4.3.2 删除 topic

删除 topic 的序列图如下所示:

图.6

流程说明:

1. controller 在 zooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被删除,则 controller 会通过 watch 得到该 topic 的 partition/replica 分配。
2. 若 delete.topic.enable=false,结束;否则 controller 注册在 /admin/delete_topics 上的 watch 被 fire,controller 通过回调向对应的 broker 发送 StopReplicaRequest。

五、kafka HA

5.1 replication

如图.1所示,同一个 partition 可能会有多个 replica(对应 server.properties 配置中的 default.replication.factor=N)。没有 replica 的情况下,一旦 broker 宕机,其上所有 patition 的数据都不可被消费,同时 producer 也不能再将数据存于其上的 patition。引入replication 之后,同一个 partition 可能会有多个 replica,而这时需要在这些 replica 之间选出一个 leader,producer 和 consumer 只与这个 leader 交互,其它 replica 作为 follower 从 leader 中复制数据。

Kafka 分配 Replica 的算法如下:

1. 将所有 broker(假设共 n 个 broker)和待分配的 partition 排序
2. 将第 i 个 partition 分配到第(i mod n)个 broker 上
3. 将第 i 个 partition 的第 j 个 replica 分配到第((i + j) mode n)个 broker上

5.2 leader failover

当 partition 对应的 leader 宕机时,需要从 follower 中选举出新 leader。在选举新leader时,一个基本的原则是,新的 leader 必须拥有旧 leader commit 过的所有消息。

kafka 在 zookeeper 中(/brokers/.../state)动态维护了一个 ISR(in-sync replicas),由3.3节的写入流程可知 ISR 里面的所有 replica 都跟上了 leader,只有 ISR 里面的成员才能选为 leader。对于 f+1 个 replica,一个 partition 可以在容忍 f 个 replica 失效的情况下保证消息不丢失。

当所有 replica 都不工作时,有两种可行的方案:

1. 等待 ISR 中的任一个 replica 活过来,并选它作为 leader。可保障数据不丢失,但时间可能相对较长。
2. 选择第一个活过来的 replica(不一定是 ISR 成员)作为 leader。无法保障数据不丢失,但相对不可用时间较短。

kafka 0.8.* 使用第二种方式。

kafka 通过 Controller 来选举 leader,流程请参考5.3节。

5.3 broker failover

kafka broker failover 序列图如下所示:

图.7

流程说明:

1. controller 在 zookeeper 的 /brokers/ids/[brokerId] 节点注册 Watcher,当 broker 宕机时 zookeeper 会 fire watch
2. controller 从 /brokers/ids 节点读取可用broker
3. controller决定set_p,该集合包含宕机 broker 上的所有 partition
4. 对 set_p 中的每一个 partition4.1 从/brokers/topics/[topic]/partitions/[partition]/state 节点读取 ISR4.2 决定新 leader(如4.3节所描述)4.3 将新 leader、ISR、controller_epoch 和 leader_epoch 等信息写入 state 节点
5. 通过 RPC 向相关 broker 发送 leaderAndISRRequest 命令

5.4 controller failover

当 controller 宕机时会触发 controller failover。每个 broker 都会在 zookeeper 的 "/controller" 节点注册 watcher,当 controller 宕机时 zookeeper 中的临时节点消失,所有存活的 broker 收到 fire 的通知,每个 broker 都尝试创建新的 controller path,只有一个竞选成功并当选为 controller。

当新的 controller 当选时,会触发 KafkaController.onControllerFailover 方法,在该方法中完成如下操作:

1. 读取并增加 Controller Epoch。
2. 在 reassignedPartitions Patch(/admin/reassign_partitions) 上注册 watcher。
3. 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上注册 watcher。
4. 通过 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上注册 watcher。
5. 若 delete.topic.enable=true(默认值是 false),则 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注册 watcher。
6. 通过 replicaStateMachine在 Broker Ids Patch(/brokers/ids)上注册Watch。
7. 初始化 ControllerContext 对象,设置当前所有 topic,“活”着的 broker 列表,所有 partition 的 leader 及 ISR等。
8. 启动 replicaStateMachine 和 partitionStateMachine。
9. 将 brokerState 状态设置为 RunningAsController。
10. 将每个 partition 的 Leadership 信息发送给所有“活”着的 broker。
11. 若 auto.leader.rebalance.enable=true(默认值是true),则启动 partition-rebalance 线程。
12. 若 delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值,则删除相应的Topic。

6. consumer 消费消息

6.1 consumer API

kafka 提供了两套 consumer API:

1. The high-level Consumer API
2. The SimpleConsumer API

其中 high-level consumer API 提供了一个从 kafka 消费数据的高层抽象,而 SimpleConsumer API 则需要开发人员更多地关注细节。

6.1.1 The high-level consumer API

high-level consumer API 提供了 consumer group 的语义,一个消息只能被 group 内的一个 consumer 所消费,且 consumer 消费消息时不关注 offset,最后一个 offset 由 zookeeper 保存。

使用 high-level consumer API 可以是多线程的应用,应当注意:

1. 如果消费线程大于 patition 数量,则有些线程将收不到消息
2. 如果 patition 数量大于线程数,则有些线程多收到多个 patition 的消息
3. 如果一个线程消费多个 patition,则无法保证你收到的消息的顺序,而一个 patition 内的消息是有序的

6.1.2 The SimpleConsumer API

如果你想要对 patition 有更多的控制权,那就应该使用 SimpleConsumer API,比如:

1. 多次读取一个消息
2. 只消费一个 patition 中的部分消息
3. 使用事务来保证一个消息仅被消费一次

但是使用此 API 时,partition、offset、broker、leader 等对你不再透明,需要自己去管理。你需要做大量的额外工作:

1. 必须在应用程序中跟踪 offset,从而确定下一条应该消费哪条消息
2. 应用程序需要通过程序获知每个 Partition 的 leader 是谁
3. 需要处理 leader 的变更

使用 SimpleConsumer API 的一般流程如下:

1. 查找到一个“活着”的 broker,并且找出每个 partition 的 leader
2. 找出每个 partition 的 follower
3. 定义好请求,该请求应该能描述应用程序需要哪些数据
4. fetch 数据
5. 识别 leader 的变化,并对之作出必要的响应

以下针对 high-level Consumer API 进行说明。

6.2 consumer group

如 2.2 节所说, kafka 的分配单位是 patition。每个 consumer 都属于一个 group,一个 partition 只能被同一个 group 内的一个 consumer 所消费(也就保障了一个消息只能被 group 内的一个 consuemr 所消费),但是多个 group 可以同时消费这个 partition。

kafka 的设计目标之一就是同时实现离线处理和实时处理,根据这一特性,可以使用 spark/Storm 这些实时处理系统对消息在线处理,同时使用 Hadoop 批处理系统进行离线处理,还可以将数据备份到另一个数据中心,只需要保证这三者属于不同的 consumer group。如下图所示:

图.8

6.3 消费方式

consumer 采用 pull 模式从 broker 中读取数据。

push 模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。

对于 Kafka 而言,pull 模式更合适,它可简化 broker 的设计,consumer 可自主控制消费消息的速率,同时 consumer 可以自己控制消费方式——即可批量消费也可逐条消费,同时还能选择不同的提交方式从而实现不同的传输语义。

6.4 consumer delivery guarantee

如果将 consumer 设置为 autocommit,consumer 一旦读到数据立即自动 commit。如果只讨论这一读取消息的过程,那 Kafka 确保了 Exactly once。

但实际使用中应用程序并非在 consumer 读取完数据就结束了,而是要进行进一步处理,而数据处理与 commit 的顺序在很大程度上决定了consumer delivery guarantee:

1.读完消息先 commit 再处理消息。这种模式下,如果 consumer 在 commit 后还没来得及处理消息就 crash 了,下次重新开始工作后就无法读到刚刚已提交而未处理的消息,这就对应于 At most once
2.读完消息先处理再 commit。这种模式下,如果在处理完消息之后 commit 之前 consumer crash 了,下次重新开始工作时还会处理刚刚未 commit 的消息,实际上该消息已经被处理过了。这就对应于 At least once。
3.如果一定要做到 Exactly once,就需要协调 offset 和实际操作的输出。精典的做法是引入两阶段提交。如果能让 offset 和操作输入存在同一个地方,会更简洁和通用。这种方式可能更好,因为许多输出系统可能不支持两阶段提交。比如,consumer 拿到数据后可能把数据放到 HDFS,如果把最新的 offset 和数据本身一起写到 HDFS,那就可以保证数据的输出和 offset 的更新要么都完成,要么都不完成,间接实现 Exactly once。(目前就 high-level API而言,offset 是存于Zookeeper 中的,无法存于HDFS,而SimpleConsuemr API的 offset 是由自己去维护的,可以将之存于 HDFS 中)

总之,Kafka 默认保证 At least once,并且允许通过设置 producer 异步提交来实现 At most once(见文章《kafka consumer防止数据丢失》)。而 Exactly once 要求与外部存储系统协作,幸运的是 kafka 提供的 offset 可以非常直接非常容易得使用这种方式。

更多关于 kafka 传输语义的信息请参考《Message Delivery Semantics》。

6.5 consumer rebalance

当有 consumer 加入或退出、以及 partition 的改变(如 broker 加入或退出)时会触发 rebalance。consumer rebalance算法如下:

1. 将目标 topic 下的所有 partirtion 排序,存于PT
2. 对某 consumer group 下所有 consumer 排序,存于 CG,第 i 个consumer 记为 Ci
3. N=size(PT)/size(CG),向上取整
4. 解除 Ci 对原来分配的 partition 的消费权(i从0开始)
5. 将第i*N到(i+1)*N-1个 partition 分配给 Ci

在 0.8.*版本,每个 consumer 都只负责调整自己所消费的 partition,为了保证整个consumer group 的一致性,当一个 consumer 触发了 rebalance 时,该 consumer group 内的其它所有其它 consumer 也应该同时触发 rebalance。这会导致以下几个问题:

1.Herd effect任何 broker 或者 consumer 的增减都会触发所有的 consumer 的 rebalance
2.Split Brain每个 consumer 分别单独通过 zookeeper 判断哪些 broker 和 consumer 宕机了,那么不同 consumer 在同一时刻从 zookeeper 看到的 view 就可能不一样,这是由 zookeeper 的特性决定的,这就会造成不正确的 reblance 尝试。
3. 调整结果不可控所有的 consumer 都并不知道其它 consumer 的 rebalance 是否成功,这可能会导致 kafka 工作在一个不正确的状态。

基于以上问题,kafka 设计者考虑在0.9.*版本开始使用中心 coordinator 来控制 consumer rebalance,然后又从简便性和验证要求两方面考虑,计划在 consumer 客户端实现分配方案。(见文章《Kafka Detailed Consumer Coordinator Design》和《Kafka Client-side Assignment Proposal》),此处不再赘述。

七、注意事项

7.1 producer 无法发送消息的问题

最开始在本机搭建了kafka伪集群,本地 producer 客户端成功发布消息至 broker。随后在服务器上搭建了 kafka 集群,在本机连接该集群,producer 却无法发布消息到 broker(奇怪也没有抛错)。最开始怀疑是 iptables 没开放,于是开放端口,结果还不行(又开始是代码问题、版本问题等等,倒腾了很久)。最后没办法,一项一项查看 server.properties 配置,发现以下两个配置:

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = security_protocol://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://:9092

 # Hostname and port the broker will advertise to producers and consumers. If not set, 
 # it uses the value for "listeners" if configured. Otherwise, it will use the value
 # returned from java.net.InetAddress.getCanonicalHostName().
 #advertised.listeners=PLAINTEXT://your.host.name:9092

以上说的就是 advertised.listeners 是 broker 给 producer 和 consumer 连接使用的,如果没有设置,就使用 listeners,而如果 host_name 没有设置的话,就使用 java.net.InetAddress.getCanonicalHostName() 方法返回的主机名。

修改方法:

1. listeners=PLAINTEXT://121.10.26.XXX:9092
2. advertised.listeners=PLAINTEXT://121.10.26.XXX:9092

修改后重启服务,正常工作。关于更多 kafka 配置说明,见文章《Kafka学习整理三(borker(0.9.0及0.10.0)配置)》。

八、参考文章

1. 《Kafka剖析(一):Kafka背景及架构介绍》

2. 《Kafka设计解析(二):Kafka High Availability (上)》

3. 《Kafka设计解析(二):Kafka High Availability (下)》

4. 《Kafka设计解析(四):Kafka Consumer解析》

5. 《Kafka设计解析(五):Kafka Benchmark》

6. 《Kafka学习整理三(borker(0.9.0及0.10.0)配置)》

7. 《Using the High Level Consumer》

8. 《Using SimpleConsumer》

9. 《Consumer Client Re-Design》

10. 《Message Delivery Semantics》

11. 《Kafka Detailed Consumer Coordinator Design》

12. 《Kafka Client-side Assignment Proposal》

13. 《Kafka和DistributedLog技术对比》

14. 《kafka安装和启动》

15. 《kafka consumer防止数据丢失》

============================================================================================================================================================================================================================

Kafka介绍

Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一个分布式的,可划分的,冗余备份的持久性的日志服务。它主要用于处理活跃的流式数据。

在大数据系统中,常常会碰到一个问题,整个大数据是由各个子系统组成,数据需要在各个子系统中高性能,低延迟的不停流转。传统的企业消息系统并不是非常适合大规模的数据处理。为了已在同时搞定在线应用(消息)和离线应用(数据文件,日志)Kafka就出现了。Kafka可以起到两个作用:

  1. 降低系统组网复杂度。
  2. 降低编程复杂度,各个子系统不在是相互协商接口,各个子系统类似插口插在插座上,Kafka承担高速数据总线的作用。

Kafka主要特点:

(

ETL,是英文 Extract-Transform-Load 的缩写,用来描述将数据从来源端经过抽取(extract)、交互转换(transform)、加载(load)至目的端的过程。ETL一词较常用在数据仓库,但其对象并不限于数据仓库。

ETL是构建数据仓库的重要一环,用户从数据源抽取出所需的数据,经过数据清洗,最终按照预先定义好的数据仓库模型,将数据加载到数据仓库中去。

)

  1. 同时为发布和订阅提供高吞吐量。据了解,Kafka每秒可以生产约25万消息(50 MB),每秒处理55万消息(110 MB)。
  2. 可进行持久化操作。将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序。通过将数据持久化到硬盘以及replication防止数据丢失。
  3. 分布式系统,易于向外扩展。所有的producer、broker和consumer都会有多个,均为分布式的。无需停机即可扩展机器。
  4. 消息被处理的状态是在consumer端维护,而不是由server端维护。当失败时能自动平衡。
  5. 支持online和offline的场景。

Kafka的架构:

hadoop cluster

real-time monitoring

data warehouse

Kafka的整体架构非常简单,是显式分布式架构,producer、broker(kafka)和consumer都可以有多个。Producer,consumer实现Kafka注册的接口,数据从producer发送到broker,broker承担一个中间缓存和分发的作用。broker分发注册到系统中的consumer。broker的作用类似于缓存,即活跃的数据和离线处理系统之间的缓存。客户端和服务器端的通信,是基于简单,高性能,且与编程语言无关的TCP协议。几个基本概念:

  1. Topic:特指Kafka处理的消息源(feeds of messages)的不同分类。
  2. Partition:Topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。
  3. Message:消息,是通信的基本单位,每个producer可以向一个topic(主题)发布一些消息。
  4. Producers:消息和数据生产者,向Kafka的一个topic发布消息的过程叫做producers。
  5. Consumers:消息和数据消费者,订阅topics并处理其发布的消息的过程叫做consumers。
  6. Broker:缓存代理,Kafka集群中的一台或多台服务器统称为broker。

消息发送的流程:

  1. Producer根据指定的partition方法(round-robin、hash等),将消息发布到指定topic的partition里面
  2. kafka集群接收到Producer发过来的消息后,将其持久化到硬盘,并保留消息指定时长(可配置),而不关注消息是否被消费。
  3. Consumer从kafka集群pull数据,并控制获取消息的offset

Kafka的设计:

1、吞吐量

高吞吐是kafka需要实现的核心目标之一,为此kafka做了以下一些设计:

  1. 数据磁盘持久化:消息不在内存中cache,直接写入到磁盘,充分利用磁盘的顺序读写性能
  2. zero-copy:减少IO操作步骤
  3. 数据批量发送
  4. 数据压缩
  5. Topic划分为多个partition,提高parallelism

负载均衡

  1. producer根据用户指定的算法,将消息发送到指定的partition
  2. 存在多个partiiton,每个partition有自己的replica,每个replica分布在不同的Broker节点上
  3. 多个partition需要选取出lead partition,lead partition负责读写,并由zookeeper负责fail over
  4. 通过zookeeper管理broker与consumer的动态加入与离开

拉取系统

由于kafka broker会持久化数据,broker没有内存压力,因此,consumer非常适合采取pull的方式消费数据,具有以下几点好处:

  1. 简化kafka设计
  2. consumer根据消费能力自主控制消息拉取速度
  3. consumer根据自身情况自主选择消费模式,例如批量,重复消费,从尾端开始消费等

可扩展性

当需要增加broker结点时,新增的broker会向zookeeper注册,而producer及consumer会根据注册在zookeeper上的watcher感知这些变化,并及时作出调整。

Kafka的应用场景:

1.消息队列

比起大多数的消息系统来说,Kafka有更好的吞吐量,内置的分区,冗余及容错性,这让Kafka成为了一个很好的大规模消息处理应用的解决方案。消息系统一般吞吐量相对较低,但是需要更小的端到端延时,并常常依赖于Kafka提供的强大的持久性保障。在这个领域,Kafka足以媲美传统消息系统,如ActiveMR或RabbitMQ。

2.行为跟踪

Kafka的另一个应用场景是跟踪用户浏览页面、搜索及其他行为,以发布-订阅的模式实时记录到对应的topic里。那么这些结果被订阅者拿到后,就可以做进一步的实时处理,或实时监控,或放到hadoop/离线数据仓库里处理。

3.元信息监控

作为操作记录的监控模块来使用,即汇集记录一些操作信息,可以理解为运维性质的数据监控吧。

4.日志收集

日志收集方面,其实开源产品有很多,包括Scribe、Apache Flume。很多人使用Kafka代替日志聚合(log aggregation)。日志聚合一般来说是从服务器上收集日志文件,然后放到一个集中的位置(文件服务器或HDFS)进行处理。然而Kafka忽略掉文件的细节,将其更清晰地抽象成一个个日志或事件的消息流。这就让Kafka处理过程延迟更低,更容易支持多数据源和分布式数据处理。比起以日志为中心的系统比如Scribe或者Flume来说,Kafka提供同样高效的性能和因为复制导致的更高的耐用性保证,以及更低的端到端延迟。

5.流处理

这个场景可能比较多,也很好理解。保存收集流数据,以提供之后对接的Storm或其他流式计算框架进行处理。很多用户会将那些从原始topic来的数据进行阶段性处理,汇总,扩充或者以其他的方式转换到新的topic下再继续后面的处理。例如一个文章推荐的处理流程,可能是先从RSS数据源中抓取文章的内容,然后将其丢入一个叫做“文章”的topic中;后续操作可能是需要对这个内容进行清理,比如回复正常数据或者删除重复数据,最后再将内容匹配的结果返还给用户。这就在一个独立的topic之外,产生了一系列的实时数据处理的流程。Strom和Samza是非常著名的实现这种类型数据转换的框架。

6.事件源

事件源是一种应用程序设计的方式,该方式的状态转移被记录为按时间顺序排序的记录序列。Kafka可以存储大量的日志数据,这使得它成为一个对这种方式的应用来说绝佳的后台。比如动态汇总(News feed)。

7.持久性日志(commit log)

Kafka可以为一种外部的持久性日志的分布式系统提供服务。这种日志可以在节点间备份数据,并为故障节点数据回复提供一种重新同步的机制。Kafka中日志压缩功能为这种用法提供了条件。在这种用法中,Kafka类似于Apache BookKeeper项目。

Kafka的设计要点:

1、直接使用linux 文件系统的cache,来高效缓存数据。

2、采用linux Zero-Copy提高发送性能。传统的数据发送需要发送4次上下文切换,采用sendfile系统调用之后,数据直接在内核态交换,系统上下文切换减少为2次。根据测试结果,可以提高60%的数据发送性能。Zero-Copy详细的技术细节可以参考:https://www.ibm.com/developerworks/linux/library/j-zerocopy/

3、数据在磁盘上存取代价为O(1)。kafka以topic来进行消息管理,每个topic包含多个part(ition),每个part对应一个逻辑log,有多个segment组成。每个segment中存储多条消息(见下图),消息id由其逻辑位置决定,即从消息id可直接定位到消息的存储位置,避免id到位置的额外映射。每个part在内存中对应一个index,记录每个segment中的第一条消息偏移。发布者发到某个topic的消息会被均匀的分布到多个part上(随机或根据用户指定的回调函数进行分布),broker收到发布消息往对应part的最后一个segment上添加该消息,当某个segment上的消息条数达到配置值或消息发布时间超过阈值时,segment上的消息会被flush到磁盘,只有flush到磁盘上的消息订阅者才能订阅到,segment达到一定的大小后将不会再往该segment写数据,broker会创建新的segment。

4、显式分布式,即所有的producer、broker和consumer都会有多个,均为分布式的。Producer和broker之间没有负载均衡机制。broker和consumer之间利用zookeeper进行负载均衡。所有broker和consumer都会在zookeeper中进行注册,且zookeeper会保存他们的一些元数据信息。如果某个broker和consumer发生了变化,所有其他的broker和consumer都会得到通知。

参考资料:

  • Apache Kafka网站
  • 项目设计讨论
  • Github镜像
  • Morten Kjetland对Apache Kafka的介绍
  • Quora上与RabbitMQ的对比
  • Kafka: a Distributed Messaging System for Log Processing
  • Zero-copy原理
  • Kafka与Hadoop

================================================================================================================================================================================================================================================================================================================================================================================================

文章源地址:https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md

librdkafka 是Apache  Kafka  客户端C语言的高性能实现, 能够提供可靠并且表现优秀的客户端,同时它也提供比较初级的C++界面。

Contents

本文主要包含以下章节:

一、性能

-性能指标

-高吞吐量

-低延迟

-压缩

二、消息可靠性

三、用法

-文档介绍

-初始化

-配置

-线程和回调函数

-brokers

-producer API

-consumer API

四、其他

-测试细节

一、性能

librdkafka库是多线程的,给现在硬件系统设计,并尽力实现最小的内存拷贝。生产或消费的消息的payload在传输中没有拷贝,同时消息的尺寸没有任何限制。

“你可能需要高吞吐率或者低延迟,但你可以拥有这两个性能”。

librdkafka 允许你实现高吞吐率或者低延迟,这是通过可配置的属性设置实现的。

性能指标中最重要的两个配置属性是:

-batch.num.messages:在发送消息序列之前,本地消息队列中需要积累的最小消息数。

-queue.buffering.max.ms:等待batch.num.messags在本地队列中实现的时间长度。

1、性能指标

后面的测试都是使用以下性能配置指标:

-intel  Quad  Core i7  at 3.4GHz, 8GB 内存

-通过设置brokers 刷新的配置属性,采用简单的方式测试硬盘性能。

log.flush.interval.messages=10000000

log.flush.interval.ms=100000

-两个brokers和librdkafka都运行在同一台机器上

-每个topic有两个partitions

-每个broker只是一个partitions的leader

-使用子目录example下的rdkafka_performance进行测试

测试结果(注意,原文只有producer的测试)

Test1: 2 brokers, 2 partitions, required.acks=2, 100 byte messages: 850000 messages/second, 85 MB/second

Test2: 1 broker, 1 partition, required.acks=0, 100 byte messages: 710000 messages/second, 71 MB/second

Test3: 2 broker2, 2 partitions, required.acks=2, 100 byte messages, snappy compression: 300000 messages/second, 30 MB/second

Test4: 2 broker2, 2 partitions, required.acks=2, 100 byte messages, gzip compression: 230000 messages/second, 23 MB/second

注意:本文最后将描述详细的测试信息

注意:consumer的测试结果很快就会补上

2、高吞吐率

高吞吐率的关键是消息批处理实现--首先本地消息队列中会积累一定数量的消息,然后再将这些消息一块发送出去。这就减少了消息传递消耗并且减少了来回申请产生不良影响。

默认设置下: batch.num.messages=1000,queue.buffering.max.ms=1000,这有利于高吞吐率。默认设置使得librdkafka向broker发送累积消息之前,可以等待1000ms,消息最多累积1000条。这两个属性哪一个先满足,就停止消息累积并进行发送,无论另一个属性是否满足。

这些设置虽然是全局性的(在rd_kafka_conf_t结构中实现),但是却适用于每个top+partitions基本组成。

3、低延迟

当要求消息发送低延迟时,“queue.buffering.max.ms"应当符合producer-side延迟所允许的最大值。 queue.buffering.max.ms设置为0将使得消息尽可能快的发送。

3、压缩

Producer 消息压缩通过“compression.codec配置属性实现。

压缩是批处理本地队列中的消息的,批处理消息的数目越大,压缩率越高。本地批处理队列的容量取决于”batch.num.messages“”queue.buffering.max.ms"配置属性,这在上边高吞吐率章节讨论。

二、消息可靠性

消息可靠性是librdkafka重要指标----实际应用中可以通过两个特定设置(“reuqest.required.acks”和“message.send.max.retries”)保证消息可靠性。

如果topic配置属性"request.reuired.acks“(除了0之外的其他值,查看具体细节)设置用来等待brokers收到消息的确认回复,则librdkafka在收到所有预期acks之前会一直保存消息,这就可以很好的处理一下事件:

-brokers 连接失败

-topic leader发生变化

-brokers通知produce错误

这些都是librdkafka自动完成,具体应用中无需针对以上事件做任何处理。消息在收到失败反馈之前可以保有的时间为”message.send.max.retires"。

librdkafka使用回调函数应对不同发送报告,即应对不同的消息发送状态,它将在收到每条消息传送状态时调用响应的回调函数。

-如果error_code  非0,则消息发送失败,error_code指明失败原因(rd_kafka_resp_err_t enum)

-如果error_code 为0,则消息成功发送

更多细节需要查看Producer  API章节。

发送报告的回调函数是可选的。

三、用法

1、文档介绍

librdkafka API描述在rdkafka.h中,配置属性描述在CONFIGURATION.md。

2、初始化

实际应用中,需要创建一个top-level的对象 rd_kafka_t, 这个对象是基本的容器,它提供了全局性配置属性以及共享状态信息,它由rd_kafka_new()函数创建。

同时也需要创建一个或者多个topics对象rd_kafka_topic_t,给produer以及consumer使用。 topic对象具有topic特定的配置属性,同时还包含了所有可用partitions与leader  brokers映射关系。它通过调用rd_kafka_topic_new()函数创建。

这两个对象都包含了可配置的API。默认情况下将调用默认值,具体属性默认值描述在CONFIGURATION.md。

注意:实际应用中,可能会创建多个rd_kafka_t对象,它们并没有共享状态信息

注意:rd_kafka_topic_t对象只能由创建它的对象rd_kafka_t使用。

3、配置

为了简化与kafka的集成以及缩短学习曲线,librdkafka实现配置属性都可以在kafka官方客户端中找到。

在创建对象之前,需要使用rd_kafka_conf_set()以及rd_kafka_topic_conf_set()函数进行配置。

注意: rd_kafka.._conf_t对象们在rd_kafka.._new()函数使用过后是不能被再次使用的,而且在rd_kakfa.._new()函数调用之后,不需要释放配置资源。

例子:

 
  1. rd_kafka_conf_t * conf;

  2. char errstr[512];

  3. conf = rd_kafka_conf_new();

  4. rd_kafka_conf_set( conf, "compression.codec", "snappy", errstr, sizeof(errstr));

  5. rd_kafka_conf_set( conf, "batch.num.messages", "100", errstr, sizeof(errstr));

  6. rd_kafka_new(RD_KAFKA_PRODUCER,conf);

4、线程和回调函数

librdkafka 内部将会有多个线程,以充分利用硬件资源。API的实现是完全线程安全的,实际应用中可以在任何时候任何线程中调用任何API函数而不用担心线程安全。

一个以轮询为基础的API用来给实际应用提供信号反馈,实际应用应当按照固定时间间隔调用rd_kafka_poll()函数。这个轮询的API将会调用以下可的回调(都是可选的):

-消息发送报告回调:报告消息发送失败。这将允许实际应用采取措施应对发送失败,并释放消息发送过程中占有的资源。

-错误回调:报告错误;错误一般是信息化方面的,例如连接broker失败,实际应用通常不需要采取任何措施。错误的数据类型是通过rd_kafka_resp_err_t enum类型数据,可以描述本地错误和远程broker错误。

不是poll函数引起的可选回调函数,可能是由任意线程引发的:

-logging 回调:实际应用中,用于发送librdkafka产生的log消息。

-partitioner 回调:实际应用提供消息的partitioner。partitioner可能被任何线程任何时候调用,它可能由于同一个key而被调用多次。Partitioner 函数有以下限制:

一定不能调用rd_kafka_*()等函数

一定不能阻塞或延长执行

一定要返回一个0到partition_cnt-1之间的值,或者是在partitioning不能执行的时候返回特定RD_KAFKA_PARTITION_UA值,

5、Brokers

librdkafka 只需要一份最初的brokers列表(至少包含一个broker)。它将连接所有"metadata.broker.list"或者是rd_kafka_brokers_add()函数添加的brokers,然后向每个brokers申请一些元数据信息:包含brokers的完整列表、topic、partitions以及它们在Kafka 集群中的leaders broker信息。

Brokers名字的形式为:host:port; 其中port是可选的,默认是9092,host是任何一个可以解析的hostname或者ipv4或者ipv6地址。如果host是多个地址,librdkafka将会在每一次连接尝试中循环连接这些地址。包含所有broker 地址的DNS记录可以用来提供可靠的bootstrap broker。

6、Producer  API

在使用RD_KAFKA_PRODUCER设置完rd_kafka_t对象后,就可以创建一个或者多个rd_kafka_topic_t对象了,用来接受信息或者发送信息。

rd_kafka_produce()函数需要以下参数:

-rkt: topic, 由前面rd_kafka_topic_new()函数创建

-partition:partition,如果为RD_KAFKA_PARTITION_UA,则配置的partitioner函数将会选择目标partition

-payload,len: 消息主体

-msgflags:0或者以下数值之一:

RD_KAFKA_MSG_F_COPY: librdkafka 在发送前先将消息拷贝下来,以防消息主体所在的缓存不是长久使用的,例如堆栈。

RD_KAFKA_MSG_F_FREE: librdkafka 在使用完消息后,将释放消息缓存。

这两个标志是互斥的,只能设置一个,用来表示是拷贝还是释放。

如果没有设置RD_KAFKA_MSG_F_COPY标志,则没有数据拷贝,librdkafka将会占有消息payload指针直到消息发送完毕或者失败。发送报告回调函数将会在librdkafka使实际调用重新获得payload缓存控制权的时候被调用。在RD_KAFKA_MSG_F_FREE设置的时候,实际调用一定不能在发送报告回调函数中释放payload。

-key,keylen: 可选参数,消息关键字,可以用来分区。它将会传递到topic partitioner回调中,如果存在,则会添加到发向broker的消息中。

-msg_opaque:可选参数,每条消息的透明度指针,由消息传送回调所提供,使应用参考特定的消息。

rd_kafka_produce()是非阻塞的API, 它将使消息存储在内部队列中并立即返回。如果入队的消息数目超过了配置的“queue.buffering.max.messages"属性,则rd_kafka_produce()函数将会返回-1并将errno设置为ENOBUFS, 这样就提供了应对压力的机制。

注意:examples/rdkafka_performance.c提供了producer的实现。

7、Consumer  API

consumer API要比producer  API多一些状态。 在使用RD_KAFKA_CONSUMER类型创建rd_kafka_t 对象,然后创建rd_kakfa_topic_t对象之后,实际应用中必须调用rd_kafka_consumer_start()函数启动对给定partition的consumer。

rd_kafka_consume_start()函数的参数:

-rkt: 进行consume的topic, 由前面rd_kafka_topic_new()创建

-partition:进行consume的partition

-offset:开始consume的消息偏移。这个偏移可能是一个绝对消息偏移,或者是RD_KAKFA_OFFSET_STORED来使用存储的offset,也可能是两个特定偏移之一:RD_KAFKA_OFFSET_BEGINNING,从partition消息队列的开始进行consume;RD_KAFKA_OFFSET_END:从partition中的将要produce的下一条信息开始(忽略即当前所有的消息)。

在topic+partition的consumer启动之后,librdkafka将尝试使本地消息队列中的消息数目保持在queued.min.messages,一方反复的从broker获取消息。

本地消息队列将通过以下三种不同的consum  APIs进行consume:

-rd_kafka_consume():每次consume一条消息

-rd_kafka_consume_batch():批处理consume,一条或多条

-rd_kafka_consume_callback():consume本地消息队列中的所有消息,并调用回调函数处理每条消息

上述三种方式按照性能排列的,rd_kafka_consume()是最慢的,rd_kafka_consume_callback()最快。不同的需求可以选择不同的实现方式。

一条consumed消息,由每一个consume函数提供或返回,具体是由rd_kafka_messag_t类型对象保存。

rd_kafka_message_t对象成员:

-err:错误返回值。非0值表示出现错误,err是rd_kafka_resp_err_t类型数据。如果是0则表示进行了适当的消息抓取,并且payload中包含了message。

-rkt,partition:topic和partition信息

-payload,len:消息的payload数据或者错误的消息(err!=0)

-key,key_len:可选参数,主要是用来获取特定的消息。

-offset:消息的偏移地址

payload,key和消息一样,都是属于librdkafka,在rd_kafka_message_destroy()函数调用之后就不能再使用了。librdkafka将会使用相同的消息集接收缓存来存放消息消息集的playloads,这就避免过度拷贝,即意味着如果实际应用决定挂起某个单独的rd_kafka_message_t对象,这将会阻碍后面的缓存释放。

当实际应用完成consume消息,则应该调用rd_kafka_consume_stop()函数停止consumer。这将消除本地队列中中任何消息。

注意:examples/rdkafka_performance.c实现了consumer。

8、offset 管理

Offset管理可以通过本地offset保存文件完成,offset将会周期性的写入每个topic+partition的配置属性:

-auto.commit.enable

-auto.commit.interval.ms

-offset.store.path

-offset.store.sync.interval.ms

当前ZooKeeper还不支持offset管理。

9、Consumer  groups

当前还不支持consumer  groups, librdkafka consumer  API只编译了官方scala 简单版的Consumer。只有librdkafka能够支持这项应用,你才能拥有你的消费组。

10、Topics

Topic自动创建

topic自动创建是支持的。brokers需要使用”auto.create.topics.enable=true“进行配置。

四、其他:

测试细节:

Test1: Produce to two brokers, two partitions, required.acks=2, 100 byte messages

Each broker is leader for one of the two partitions. The random partitioner is used (default) and each broker and partition is assigned approximately 250000 messages each.

Command:

 
  1. # examples/rdkafka_performance -P -t test2 -s 100 -c 500000 -m "_____________Test1:TwoBrokers:500kmsgs:100bytes" -S 1 -a 2

  2. ....

  3. % 500000 messages and 50000000 bytes sent in 587ms: 851531 msgs/s and 85.15 Mb/s, 0 messages failed, no compression

Result:

Message transfer rate is approximately 850000 messages per second, 85 megabytes per second.

Test2: Produce to one broker, one partition, required.acks=0, 100 byte messages

Command:

 
  1. # examples/rdkafka_performance -P -t test2 -s 100 -c 500000 -m "_____________Test2:OneBrokers:500kmsgs:100bytes" -S 1 -a 0 -p 1

  2. ....

  3. % 500000 messages and 50000000 bytes sent in 698ms: 715994 msgs/s and 71.60 Mb/s, 0 messages failed, no compression

Result:

Message transfer rate is approximately 710000 messages per second, 71 megabytes per second.

Test3: Produce to two brokers, two partitions, required.acks=2, 100 byte messages, snappy compression

Command:

 
  1. # examples/rdkafka_performance -P -t test2 -s 100 -c 500000 -m "_____________Test3:TwoBrokers:500kmsgs:100bytes:snappy" -S 1 -a 2 -z snappy

  2. ....

  3. % 500000 messages and 50000000 bytes sent in 1672ms: 298915 msgs/s and 29.89 Mb/s, 0 messages failed, snappy compression

Result:

Message transfer rate is approximately 300000 messages per second, 30 megabytes per second.

Test4: Produce to two brokers, two partitions, required.acks=2, 100 byte messages, gzip compression

Command:

 
  1. # examples/rdkafka_performance -P -t test2 -s 100 -c 500000 -m "_____________Test3:TwoBrokers:500kmsgs:100bytes:gzip" -S 1 -a 2 -z gzip

  2. ....

  3. % 500000 messages and 50000000 bytes sent in 2111ms: 236812 msgs/s and 23.68 Mb/s, 0 messages failed, gzip compression

Result:

Message transfer rate is approximately 230000 messages per second, 23 megabytes per second.

================================================================================================================================================================================================================================================================================================

实际应用中,需要创建一个top-level的对象 rd_kafka_t, 这个对象是基本的容器,它提供了全局性配置属性以及共享状态信息,它由rd_kafka_new()函数创建。

================================================================================================================================================================================================================================================================================================================================================================================================

安装:

下载https://github.com/edenhill/librdkafka
预备环境:

The GNU toolchain
GNU make
pthreads
zlib (optional, for gzip compression support)
libssl-dev (optional, for SSL and SASL SCRAM support)
libsasl2-dev (optional, for SASL GSSAPI support)

编译和安装:

  ./configuremakesudo make install

server端开启

下载:https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz注意版本的选择,本文选择的是release版本。
http://kafka.apache.org/quickstart 中描述的是release版本。
开启ZooKeeper server :
bin/zookeeper-server-start.sh config/zookeeper.properties &
查看端口2181的使用:

说明此时的zookeeper已经启动。
开启Kafka server:
bin/kafka-server-start.sh config/server.properties &
从配置文件我们可以知道该服务占用的端口号是9092:

下面的使用当然可以用bin目录下的各个脚本,但是本文主要是介绍以lib的方式进行操作。

用法介绍:

Producer的使用方法:

创建kafka客户端配置占位符:
conf = rd_kafka_conf_new();即创建一个配置对象(rd_kafka_conf_t)。并通过rd_kafka_conf_set进行brokers的配置。

设置信息的回调:
用以反馈信息发送的成败。通过rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);实现。

创建producer实例:
1)初始化:
应用程序需要初始化一个顶层对象(rd_kafka_t)的基础容器,用于全局配置和共享状态。
通过调用rd_kafka_new()创建。创建之后,该实例就占有了conf对象,所以conf对象们在rd_kafka_new()调用之后是不能被再次使用的,而且在rd_kafka_new()调用之后也不需要释放配置资源的。
2)创建topic:
创建的topi对象是可以复用的(producer的实例化对象(rd_kafka_t)也是允许复用的,所以这两者就没有必要频繁创建)
实例化一个或多个 topic(rd_kafka_topic_t)用于生产或消费。
topic 对象保存 topic 级别的属性,并且维护一个映射,
该映射保存所有可用 partition 和他们的领导 broker 。
通过调用rd_kafka_topic_new()创建(rd_kafka_topic_new(rk, topic, NULL);)。
注:rd_kafka_trd_kafka_topic_t都源于可选的配置 API。
不使用该 API 将导致 librdkafka 使用列在文档CONFIGURATION.md中的默认配置。

3)Producer API:
通过调用RD_KAFKA_PRODUCER设置一个或多个rd_kafka_topic_t对象,就可以准备好接收消息,并组装和发送到 broker。
rd_kafka_produce()函数接受如下参数:
rkt : 待生产的topic,之前通过rd_kafka_topic_new()生成
partition : 生产的 partition。如果设置为RD_KAFKA_PARTITION_UA(未赋值的),则会根据builtin partitioner去选择一个确定 partition。kafka会回调partitioner进行均衡选取,partitioner方法需要自己实现。可以轮询或者传入key进行hash。未实现则采用默认的随机方法rd_kafka_msg_partitioner_random随机选择。
可以尝试通过partitioner来设计partition的取值。
msgflags : 0 或下面的值:
RD_KAFKA_MSG_F_COPY 表示librdkafka 在信息发送前立即从 payload 做一份拷贝。如果 payload 是不稳定存储,如栈,需要使用这个参数。这是以防消息主体所在的缓存不是长久使用的,才预先将信息进行拷贝。
RD_KAFKA_MSG_F_FREE 表示当 payload 使用完后,让 librdkafka 使用free(3)释放。 就是在使用完消息后,将释放消息缓存。
这两个标志互斥,如果都不设置,payload 既不会被拷贝也不会被 librdkafka 释放。
如果RD_KAFKA_MSG_F_COPY标志不设置,就不会有数据拷贝,librdkafka 将占用 payload 指针(消息主体)直到消息被发送或失败。librdkafka 处理完消息后,会调用发送报告回调函数,让应用程序重新获取 payload 的所有权。
如果设置了RD_KAFKA_MSG_F_FREE,应用程序就不要在发送报告回调函数中释放 payload。
payload,len : 消息 payload(message payload,即值),消息长度
key,keylen : 可选的消息键及其长度,用于分区。将会用于 topic 分区回调函数,如果有,会附加到消息中发送给 broker。
msg_opaque : 可选的,应用程序为每个消息提供的无类型指针,提供给消息发送回调函数,用于应用程序引用。

rd_kafka_produce() 是一个非阻塞 API,该函数会将消息塞入一个内部队列并立即返回
如果队列中的消息数超过queue.buffering.max.messages属性配置的值,rd_kafka_produce()通过返回 -1,并将errno设置为ENOBUFS这样的错误码来反馈错误。
提示: 见 examples/rdkafka_performance.c 获取生产者的使用。

Consumer的使用方法:

consumer API要比producer API多一些状态。 在使用RD_KAFKA_CONSUMER类型(调用rd_kafka_new时设置的函数参数)创建rd_kafka_t 对象,再通过调用rd_kafka_brokers_add对上述new出来的Kafka handle(rk)进行broker的添加(rd_kafka_brokers_add(rk, brokers)),
然后创建rd_kakfa_topic_t对象之后,

rd_kafka_query_watermark_offsets

创建topic:
rtk = rd_kafka_topic_new(rk, topic, topic_conf)

开始消费:
调用rd_kafka_consumer_start()函数(rd_kafka_consume_start(rkt, partition, start_offset))启动对给定partition的consumer。
调用rd_kafka_consumer_start需要的参数如下:
rkt : 要消费的 topic ,之前通过rd_kafka_topic_new()创建。
partition : 要消费的 partition。
offset : 消费开始的消息偏移量。可以是绝对的值或两种特殊的偏移量:
RD_KAFKA_OFFSET_BEGINNING 从该 partition 的队列的最开始消费(最早的消息)。
RD_KAFKA_OFFSET_END 从该 partition 产生的下一个消息开始消费。
RD_KAFKA_OFFSET_STORED 使用偏移量存储。

当一个 topic+partition 消费者被启动,librdkafka 不断尝试从 broker 批量获取消息来保持本地队列有queued.min.messages数量的消息。
本地消息队列通过 3 个不同的消费 API 向应用程序提供服务:

    rd_kafka_consume() - 消费一个消息rd_kafka_consume_batch() - 消费一个或多个消息rd_kafka_consume_callback() - 消费本地队列中的所有消息,且每一个都调用回调函数

这三个 API 的性能按照升序排列,rd_kafka_consume()最慢,rd_kafka_consume_callback()最快。不同的类型满足不同的应用需要。
被上述函数消费的消息返回rd_kafka_message_t类型。
rd_kafka_message_t的成员:

* err - 返回给应用程序的错误信号。如果该值不是零,payload字段应该是一个错误的消息,err是一个错误码(rd_kafka_resp_err_t)。
* rkt,partition - 消息的 topic 和 partition 或错误。
* payload,len - 消息的数据或错误的消息 (err!=0)。
* key,key_len - 可选的消息键,生产者指定。
* offset - 消息偏移量。

不管是payloadkey的内存,还是整个消息,都由 librdkafka 所拥有,且在rd_kafka_message_destroy()被调用后不要使用。
librdkafka 为了避免消息集的多余拷贝,会为所有从内存缓存中接收的消息共享同一个消息集,这意味着如果应用程序保留单个rd_kafka_message_t,将会阻止内存释放并用于同一个消息集的其他消息。
当应用程序从一个 topic+partition中消费完消息,应该调用rd_kafka_consume_stop()来结束消费。该函数同时会清空当前本地队列中的所有消息。
提示: 见 examples/rdkafka_performance.c 获取消费者的使用。

在Kafka broker中server.properties文件配置(参数log.dirs=/data2/logs/kafka/)使得写入到消息队列中的topic在该目录下对分区的形式进行存储。每个分区partition下是由segment file组成,而segment file包括2大部分:分别为index file和data file,此2个文件一一对应,成对出现,后缀”.index”和“.log”分别表示为segment索引文件、数据文件。
segment文件命名规则:partion全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。

具体示例:

本文所采用的是cpp方式,和上述介绍的只是函数使用上的不同,业务逻辑是一样的。
在producer过程中直接是使用PARTITION_UA 但是在消费的时候,不能够指定partition值为PARTITION_UA因为该值其实是-1,对于Consumer端来说,是无意义的。根据源码可以知道当不指定partitioner的时候,其实是有一个默认的partitioner,就是Consistent-Random partitioner所谓的一致性随机partitioner。一致性hash对关键字进行map映射之后到一个特定的partition。
函数原型:

rd_kafka_msg_partitioner_consistent_random (const rd_kafka_topic_t *rkt,const void *key, size_t keylen,int32_t partition_cnt,void *opaque, void *msg_opaque);

PARTITION_UA其实是Unassigned partition的意思,即是未赋值的分区。RD_KAFKA_PARTITION_UA (unassigned)其实是自动采用topic下的partitioner函数,当然也可以直接采用固定的值。
在配置文件config/server.properties中是可以设置partition的数量num.partitions

分配分区

在分配分区的时候,要注意。对于一个已经创建了分区的主题且已经指定了分区,那么之后的producer代码如果是直接修改partitioner部分的代码,直接引入key值进行分区的重新分配的话,是不行的,会继续按照之前的分区进行添加(之前的分区是分区0,只有一个)。此时如果在程序中查看partition_cnt我们是可以看到,该值并没有因为config/server.properties的修改而变化,这是因为此时的partition_cnt是针对该已经创建的主题topic的。
而如果尚自单纯修改代码中的partition_cnt在用于计算分区值时候:djb_hash(key->c_str(), key->size()) % 5 是会得到如下结果的:提示分区不存在。

我们可以通过rdkafka_example来查看某个topic下对应的partition数量的:
./rdkafka_example -L -t helloworld_kugou -b localhost:9092


从中我们可以看到helloworld_kugou主题只有一个partition,而helloworld_kugou1主题是有5个partition的,这个和我们预期的相符合。
我们可以对已经创建的主题修改其分区:
./bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --partition 5 --topic helloworld_kugou
修改完之后,我们可以看出,helloworld_kugou已经变为5个分区了。

具体示例:

创建topic为helloworld_kugou_test,5个partition。我们可以看到,在producer端进行输入之前,在预先设置好的log目录下是已经有5个partition:

producer端代码:

class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb
{public:void dr_cb (RdKafka::Message &message) {std::cout << "Message delivery for (" << message.len() << " bytes): " <<message.errstr() << std::endl;if (message.key())std::cout << "Key: " << *(message.key()) << ";" << std::endl;}
};class ExampleEventCb : public RdKafka::EventCb {public:void event_cb (RdKafka::Event &event) {switch (event.type()){case RdKafka::Event::EVENT_ERROR:std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<event.str() << std::endl;if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)run = false;break;case RdKafka::Event::EVENT_STATS:std::cerr << "\"STATS\": " << event.str() << std::endl;break;case RdKafka::Event::EVENT_LOG:fprintf(stderr, "LOG-%i-%s: %s\n",event.severity(), event.fac().c_str(), event.str().c_str());break;default:std::cerr << "EVENT " << event.type() <<" (" << RdKafka::err2str(event.err()) << "): " <<event.str() << std::endl;break;}}
};/* Use of this partitioner is pretty pointless since no key is provided* in the produce() call.so when you need input your key */
class MyHashPartitionerCb : public RdKafka::PartitionerCb {public:int32_t partitioner_cb (const RdKafka::Topic *topic, const std::string *key,int32_t partition_cnt, void *msg_opaque) {std::cout<<"partition_cnt="<<partition_cnt<<std::endl;return djb_hash(key->c_str(), key->size()) % partition_cnt;}private:static inline unsigned int djb_hash (const char *str, size_t len) {unsigned int hash = 5381;for (size_t i = 0 ; i < len ; i++)hash = ((hash << 5) + hash) + str[i];std::cout<<"hash1="<<hash<<std::endl;return hash;}
};void TestProducer()
{std::string brokers = "localhost";std::string errstr;std::string topic_str="helloworld_kugou_test";//自行制定主题topicMyHashPartitionerCb hash_partitioner;int32_t partition = RdKafka::Topic::PARTITION_UA;int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;bool do_conf_dump = false;int opt;int use_ccb = 0;//Create configuration objectsRdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);if (tconf->set("partitioner_cb", &hash_partitioner, errstr) != RdKafka::Conf::CONF_OK) {std::cerr << errstr << std::endl;exit(1);}/** Set configuration properties*/conf->set("metadata.broker.list", brokers, errstr);ExampleEventCb ex_event_cb;conf->set("event_cb", &ex_event_cb, errstr);ExampleDeliveryReportCb ex_dr_cb;/* Set delivery report callback */conf->set("dr_cb", &ex_dr_cb, errstr);/** Create producer using accumulated global configuration.*/RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);if (!producer) {std::cerr << "Failed to create producer: " << errstr << std::endl;exit(1);}std::cout << "% Created producer " << producer->name() << std::endl;/** Create topic handle.*/RdKafka::Topic *topic = RdKafka::Topic::create(producer, topic_str, tconf, errstr);if (!topic) {std::cerr << "Failed to create topic: " << errstr << std::endl;exit(1);}/** Read messages from stdin and produce to broker.*/for (std::string line; run && std::getline(std::cin, line);){if (line.empty()){producer->poll(0);continue;}/** Produce message// 1. topic// 2. partition// 3. flags// 4. payload// 5. payload len// 6. std::string key// 7. msg_opaque? NULL*/std::string key=line.substr(0,5);//根据line前5个字符串作为key值// int a = MyHashPartitionerCb::djb_hash(key.c_str(),key.size());// std::cout<<"hash="<<a<<std::endl;RdKafka::ErrorCode resp = producer->produce(topic, partition,RdKafka::Producer::RK_MSG_COPY /* Copy payload */,const_cast<char *>(line.c_str()), line.size(),key.c_str(), key.size(), NULL);//这里可以设计key值,因为会根据key值放在对应的partitionif (resp != RdKafka::ERR_NO_ERROR)std::cerr << "% Produce failed: " <<RdKafka::err2str(resp) << std::endl;elsestd::cerr << "% Produced message (" << line.size() << " bytes)" <<std::endl;producer->poll(0);//对于socket进行读写操作。poll方法才是做实际的IO操作的。return the number of events served}//run = true;while (run && producer->outq_len() > 0) {std::cerr << "Waiting for " << producer->outq_len() << std::endl;producer->poll(1000);}delete topic;delete producer;
}

在Consumer端进行验证的时候,可以发现不同的partition确实写入了不同的数据。结果如下:

Consumer端代码:


void msg_consume(RdKafka::Message* message, void* opaque)
{switch (message->err()){case RdKafka::ERR__TIMED_OUT:break;case RdKafka::ERR_NO_ERROR:/* Real message */std::cout << "Read msg at offset " << message->offset() << std::endl;if (message->key()){std::cout << "Key: " << *message->key() << std::endl;}printf("%.*s\n", static_cast<int>(message->len()),static_cast<const char *>(message->payload()));break;case RdKafka::ERR__PARTITION_EOF:/* Last message */if (exit_eof){run = false;}break;case RdKafka::ERR__UNKNOWN_TOPIC:case RdKafka::ERR__UNKNOWN_PARTITION:std::cerr << "Consume failed: " << message->errstr() << std::endl;run = false;break;default:/* Errors */std::cerr << "Consume failed: " << message->errstr() << std::endl;run = false;}
}
class ExampleConsumeCb : public RdKafka::ConsumeCb {public:void consume_cb (RdKafka::Message &msg, void *opaque){msg_consume(&msg, opaque);}
};
void TestConsumer()
{std::string brokers = "localhost";std::string errstr;std::string topic_str="helloworld_kugou_test";//helloworld_kugouMyHashPartitionerCb hash_partitioner;int32_t partition = RdKafka::Topic::PARTITION_UA;//为何不能用??在Consumer这里只能写0???无法自动吗???partition = 3;int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;bool do_conf_dump = false;int opt;int use_ccb = 0;//Create configuration objectsRdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);if (tconf->set("partitioner_cb", &hash_partitioner, errstr) != RdKafka::Conf::CONF_OK) {std::cerr << errstr << std::endl;exit(1);}/** Set configuration properties*/conf->set("metadata.broker.list", brokers, errstr);ExampleEventCb ex_event_cb;conf->set("event_cb", &ex_event_cb, errstr);ExampleDeliveryReportCb ex_dr_cb;/* Set delivery report callback */conf->set("dr_cb", &ex_dr_cb, errstr);/** Create consumer using accumulated global configuration.*/RdKafka::Consumer *consumer = RdKafka::Consumer::create(conf, errstr);if (!consumer) {std::cerr << "Failed to create consumer: " << errstr << std::endl;exit(1);}std::cout << "% Created consumer " << consumer->name() << std::endl;/** Create topic handle.*/RdKafka::Topic *topic = RdKafka::Topic::create(consumer, topic_str, tconf, errstr);if (!topic){std::cerr << "Failed to create topic: " << errstr << std::endl;exit(1);}/** Start consumer for topic+partition at start offset*/RdKafka::ErrorCode resp = consumer->start(topic, partition, start_offset);if (resp != RdKafka::ERR_NO_ERROR) {std::cerr << "Failed to start consumer: " << RdKafka::err2str(resp) << std::endl;exit(1);}ExampleConsumeCb ex_consume_cb;/** Consume messages*/while (run){if (use_ccb){consumer->consume_callback(topic, partition, 1000, &ex_consume_cb, &use_ccb);}else {RdKafka::Message *msg = consumer->consume(topic, partition, 1000);msg_consume(msg, NULL);delete msg;}consumer->poll(0);}/** Stop consumer*/consumer->stop(topic, partition);consumer->poll(1000);delete topic;delete consumer;
}

那么在producer端怎么根据key值获取具体是进入哪个partition的呢?是否有接口可以查看呢?这个有待补充。

================================================================================================================================================================================================================================================================================================================================================================================================

文章源地址:https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md

librdkafka 是Apache  Kafka  客户端C语言的高性能实现, 能够提供可靠并且表现优秀的客户端,同时它也提供比较初级的C++界面。

Contents

本文主要包含以下章节:

一、性能

-性能指标

-高吞吐量

-低延迟

-压缩

二、消息可靠性

三、用法

-文档介绍

-初始化

-配置

-线程和回调函数

-brokers

-producer API

-consumer API

四、其他

-测试细节

一、性能

librdkafka库是多线程的,给现在硬件系统设计,并尽力实现最小的内存拷贝。生产或消费的消息的payload在传输中没有拷贝,同时消息的尺寸没有任何限制。

“你可能需要高吞吐率或者低延迟,但你可以拥有这两个性能”。

librdkafka 允许你实现高吞吐率或者低延迟,这是通过可配置的属性设置实现的。

性能指标中最重要的两个配置属性是:

-batch.num.messages:在发送消息序列之前,本地消息队列中需要积累的最小消息数。

-queue.buffering.max.ms:等待batch.num.messags在本地队列中实现的时间长度。

1、性能指标

后面的测试都是使用以下性能配置指标:

-intel  Quad  Core i7  at 3.4GHz, 8GB 内存

-通过设置brokers 刷新的配置属性,采用简单的方式测试硬盘性能。

log.flush.interval.messages=10000000

log.flush.interval.ms=100000

-两个brokers和librdkafka都运行在同一台机器上

-每个topic有两个partitions

-每个broker只是一个partitions的leader

-使用子目录example下的rdkafka_performance进行测试

测试结果(注意,原文只有producer的测试)

Test1: 2 brokers, 2 partitions, required.acks=2, 100 byte messages: 850000 messages/second, 85 MB/second

Test2: 1 broker, 1 partition, required.acks=0, 100 byte messages: 710000 messages/second, 71 MB/second

Test3: 2 broker2, 2 partitions, required.acks=2, 100 byte messages, snappy compression: 300000 messages/second, 30 MB/second

Test4: 2 broker2, 2 partitions, required.acks=2, 100 byte messages, gzip compression: 230000 messages/second, 23 MB/second

注意:本文最后将描述详细的测试信息

注意:consumer的测试结果很快就会补上

2、高吞吐率

高吞吐率的关键是消息批处理实现--首先本地消息队列中会积累一定数量的消息,然后再将这些消息一块发送出去。这就减少了消息传递消耗并且减少了来回申请产生不良影响。

默认设置下: batch.num.messages=1000,queue.buffering.max.ms=1000,这有利于高吞吐率。默认设置使得librdkafka向broker发送累积消息之前,可以等待1000ms,消息最多累积1000条。这两个属性哪一个先满足,就停止消息累积并进行发送,无论另一个属性是否满足。

这些设置虽然是全局性的(在rd_kafka_conf_t结构中实现),但是却适用于每个top+partitions基本组成。

3、低延迟

当要求消息发送低延迟时,“queue.buffering.max.ms"应当符合producer-side延迟所允许的最大值。 queue.buffering.max.ms设置为0将使得消息尽可能快的发送。

3、压缩

Producer 消息压缩通过“compression.codec配置属性实现。

压缩是批处理本地队列中的消息的,批处理消息的数目越大,压缩率越高。本地批处理队列的容量取决于”batch.num.messages“”queue.buffering.max.ms"配置属性,这在上边高吞吐率章节讨论。

二、消息可靠性

消息可靠性是librdkafka重要指标----实际应用中可以通过两个特定设置(“reuqest.required.acks”和“message.send.max.retries”)保证消息可靠性。

如果topic配置属性"request.reuired.acks“(除了0之外的其他值,查看具体细节)设置用来等待brokers收到消息的确认回复,则librdkafka在收到所有预期acks之前会一直保存消息,这就可以很好的处理一下事件:

-brokers 连接失败

-topic leader发生变化

-brokers通知produce错误

这些都是librdkafka自动完成,具体应用中无需针对以上事件做任何处理。消息在收到失败反馈之前可以保有的时间为”message.send.max.retires"。

librdkafka使用回调函数应对不同发送报告,即应对不同的消息发送状态,它将在收到每条消息传送状态时调用响应的回调函数。

-如果error_code  非0,则消息发送失败,error_code指明失败原因(rd_kafka_resp_err_t enum)

-如果error_code 为0,则消息成功发送

更多细节需要查看Producer  API章节。

发送报告的回调函数是可选的。

三、用法

1、文档介绍

librdkafka API描述在rdkafka.h中,配置属性描述在CONFIGURATION.md。

2、初始化

实际应用中,需要创建一个top-level的对象 rd_kafka_t, 这个对象是基本的容器,它提供了全局性配置属性以及共享状态信息,它由rd_kafka_new()函数创建。

同时也需要创建一个或者多个topics对象rd_kafka_topic_t,给produer以及consumer使用。 topic对象具有topic特定的配置属性,同时还包含了所有可用partitions与leader  brokers映射关系。它通过调用rd_kafka_topic_new()函数创建。

这两个对象都包含了可配置的API。默认情况下将调用默认值,具体属性默认值描述在CONFIGURATION.md。

注意:实际应用中,可能会创建多个rd_kafka_t对象,它们并没有共享状态信息

注意:rd_kafka_topic_t对象只能由创建它的对象rd_kafka_t使用。

3、配置

为了简化与kafka的集成以及缩短学习曲线,librdkafka实现配置属性都可以在kafka官方客户端中找到。

在创建对象之前,需要使用rd_kafka_conf_set()以及rd_kafka_topic_conf_set()函数进行配置。

注意: rd_kafka.._conf_t对象们在rd_kafka.._new()函数使用过后是不能被再次使用的,而且在rd_kakfa.._new()函数调用之后,不需要释放配置资源。

例子:

[cpp] view plain copy print?

  1. rd_kafka_conf_t * conf;
  2. char errstr[512];
  3. conf = rd_kafka_conf_new();
  4. rd_kafka_conf_set( conf, "compression.codec", "snappy", errstr, sizeof(errstr));
  5. rd_kafka_conf_set( conf, "batch.num.messages", "100", errstr, sizeof(errstr));
  6. rd_kafka_new(RD_KAFKA_PRODUCER,conf);

4、线程和回调函数

librdkafka 内部将会有多个线程,以充分利用硬件资源。API的实现是完全线程安全的,实际应用中可以在任何时候任何线程中调用任何API函数而不用担心线程安全。

一个以轮询为基础的API用来给实际应用提供信号反馈,实际应用应当按照固定时间间隔调用rd_kafka_poll()函数。这个轮询的API将会调用以下可的回调(都是可选的):

-消息发送报告回调:报告消息发送失败。这将允许实际应用采取措施应对发送失败,并释放消息发送过程中占有的资源。

-错误回调:报告错误;错误一般是信息化方面的,例如连接broker失败,实际应用通常不需要采取任何措施。错误的数据类型是通过rd_kafka_resp_err_t enum类型数据,可以描述本地错误和远程broker错误。

不是poll函数引起的可选回调函数,可能是由任意线程引发的:

-logging 回调:实际应用中,用于发送librdkafka产生的log消息。

-partitioner 回调:实际应用提供消息的partitioner。partitioner可能被任何线程任何时候调用,它可能由于同一个key而被调用多次。Partitioner 函数有以下限制:

一定不能调用rd_kafka_*()等函数

一定不能阻塞或延长执行

一定要返回一个0到partition_cnt-1之间的值,或者是在partitioning不能执行的时候返回特定RD_KAFKA_PARTITION_UA值,

5、Brokers

librdkafka 只需要一份最初的brokers列表(至少包含一个broker)。它将连接所有"metadata.broker.list"或者是rd_kafka_brokers_add()函数添加的brokers,然后向每个brokers申请一些元数据信息:包含brokers的完整列表、topic、partitions以及它们在Kafka 集群中的leaders broker信息。

Brokers名字的形式为:host:port; 其中port是可选的,默认是9092,host是任何一个可以解析的hostname或者ipv4或者ipv6地址。如果host是多个地址,librdkafka将会在每一次连接尝试中循环连接这些地址。包含所有broker 地址的DNS记录可以用来提供可靠的bootstrap broker。

6、Producer  API

在使用RD_KAFKA_PRODUCER设置完rd_kafka_t对象后,就可以创建一个或者多个rd_kafka_topic_t对象了,用来接受信息或者发送信息。

rd_kafka_produce()函数需要以下参数:

-rkt: topic, 由前面rd_kafka_topic_new()函数创建

-partition:partition,如果为RD_KAFKA_PARTITION_UA,则配置的partitioner函数将会选择目标partition

-payload,len: 消息主体

-msgflags:0或者以下数值之一:

RD_KAFKA_MSG_F_COPY: librdkafka 在发送前先将消息拷贝下来,以防消息主体所在的缓存不是长久使用的,例如堆栈。

RD_KAFKA_MSG_F_FREE: librdkafka 在使用完消息后,将释放消息缓存。

这两个标志是互斥的,只能设置一个,用来表示是拷贝还是释放。

如果没有设置RD_KAFKA_MSG_F_COPY标志,则没有数据拷贝,librdkafka将会占有消息payload指针直到消息发送完毕或者失败。发送报告回调函数将会在librdkafka使实际调用重新获得payload缓存控制权的时候被调用。在RD_KAFKA_MSG_F_FREE设置的时候,实际调用一定不能在发送报告回调函数中释放payload。

-key,keylen: 可选参数,消息关键字,可以用来分区。它将会传递到topic partitioner回调中,如果存在,则会添加到发向broker的消息中。

-msg_opaque:可选参数,每条消息的透明度指针,由消息传送回调所提供,使应用参考特定的消息。

rd_kafka_produce()是非阻塞的API, 它将使消息存储在内部队列中并立即返回。如果入队的消息数目超过了配置的“queue.buffering.max.messages"属性,则rd_kafka_produce()函数将会返回-1并将errno设置为ENOBUFS, 这样就提供了应对压力的机制。

注意:examples/rdkafka_performance.c提供了producer的实现。

7、Consumer  API

consumer API要比producer  API多一些状态。 在使用RD_KAFKA_CONSUMER类型创建rd_kafka_t 对象,然后创建rd_kakfa_topic_t对象之后,实际应用中必须调用rd_kafka_consumer_start()函数启动对给定partition的consumer。

rd_kafka_consume_start()函数的参数:

-rkt: 进行consume的topic, 由前面rd_kafka_topic_new()创建

-partition:进行consume的partition

-offset:开始consume的消息偏移。这个偏移可能是一个绝对消息偏移,或者是RD_KAKFA_OFFSET_STORED来使用存储的offset,也可能是两个特定偏移之一:RD_KAFKA_OFFSET_BEGINNING,从partition消息队列的开始进行consume;RD_KAFKA_OFFSET_END:从partition中的将要produce的下一条信息开始(忽略即当前所有的消息)。

在topic+partition的consumer启动之后,librdkafka将尝试使本地消息队列中的消息数目保持在queued.min.messages,一方反复的从broker获取消息。

本地消息队列将通过以下三种不同的consum  APIs进行consume:

-rd_kafka_consume():每次consume一条消息

-rd_kafka_consume_batch():批处理consume,一条或多条

-rd_kafka_consume_callback():consume本地消息队列中的所有消息,并调用回调函数处理每条消息

上述三种方式按照性能排列的,rd_kafka_consume()是最慢的,rd_kafka_consume_callback()最快。不同的需求可以选择不同的实现方式。

一条consumed消息,由每一个consume函数提供或返回,具体是由rd_kafka_messag_t类型对象保存。

rd_kafka_message_t对象成员:

-err:错误返回值。非0值表示出现错误,err是rd_kafka_resp_err_t类型数据。如果是0则表示进行了适当的消息抓取,并且payload中包含了message。

-rkt,partition:topic和partition信息

-payload,len:消息的payload数据或者错误的消息(err!=0)

-key,key_len:可选参数,主要是用来获取特定的消息。

-offset:消息的偏移地址

payload,key和消息一样,都是属于librdkafka,在rd_kafka_message_destroy()函数调用之后就不能再使用了。librdkafka将会使用相同的消息集接收缓存来存放消息消息集的playloads,这就避免过度拷贝,即意味着如果实际应用决定挂起某个单独的rd_kafka_message_t对象,这将会阻碍后面的缓存释放。

当实际应用完成consume消息,则应该调用rd_kafka_consume_stop()函数停止consumer。这将消除本地队列中中任何消息。

注意:examples/rdkafka_performance.c实现了consumer。

8、offset 管理

Offset管理可以通过本地offset保存文件完成,offset将会周期性的写入每个topic+partition的配置属性:

-auto.commit.enable

-auto.commit.interval.ms

-offset.store.path

-offset.store.sync.interval.ms

当前ZooKeeper还不支持offset管理。

9、Consumer  groups

当前还不支持consumer  groups, librdkafka consumer  API只编译了官方scala 简单版的Consumer。只有librdkafka能够支持这项应用,你才能拥有你的消费组。

10、Topics

Topic自动创建

topic自动创建是支持的。brokers需要使用”auto.create.topics.enable=true“进行配置。

四、其他:

测试细节:

Test1: Produce to two brokers, two partitions, required.acks=2, 100 byte messages

Each broker is leader for one of the two partitions. The random partitioner is used (default) and each broker and partition is assigned approximately 250000 messages each.

Command:

# examples/rdkafka_performance -P -t test2 -s 100 -c 500000 -m "_____________Test1:TwoBrokers:500kmsgs:100bytes" -S 1 -a 2
....
% 500000 messages and 50000000 bytes sent in 587ms: 851531 msgs/s and 85.15 Mb/s, 0 messages failed, no compression

Result:

Message transfer rate is approximately 850000 messages per second, 85 megabytes per second.

Test2: Produce to one broker, one partition, required.acks=0, 100 byte messages

Command:

# examples/rdkafka_performance -P -t test2 -s 100 -c 500000 -m "_____________Test2:OneBrokers:500kmsgs:100bytes" -S 1 -a 0 -p 1
....
% 500000 messages and 50000000 bytes sent in 698ms: 715994 msgs/s and 71.60 Mb/s, 0 messages failed, no compression

Result:

Message transfer rate is approximately 710000 messages per second, 71 megabytes per second.

Test3: Produce to two brokers, two partitions, required.acks=2, 100 byte messages, snappy compression

Command:

# examples/rdkafka_performance -P -t test2 -s 100 -c 500000 -m "_____________Test3:TwoBrokers:500kmsgs:100bytes:snappy" -S 1 -a 2 -z snappy
....
% 500000 messages and 50000000 bytes sent in 1672ms: 298915 msgs/s and 29.89 Mb/s, 0 messages failed, snappy compression

Result:

Message transfer rate is approximately 300000 messages per second, 30 megabytes per second.

Test4: Produce to two brokers, two partitions, required.acks=2, 100 byte messages, gzip compression

Command:

# examples/rdkafka_performance -P -t test2 -s 100 -c 500000 -m "_____________Test3:TwoBrokers:500kmsgs:100bytes:gzip" -S 1 -a 2 -z gzip
....
% 500000 messages and 50000000 bytes sent in 2111ms: 236812 msgs/s and 23.68 Mb/s, 0 messages failed, gzip compression

Result:

Message transfer rate is approximately 230000 messages per second, 23 megabytes per second.

================================================================================================================================================================================================================================================================================================================================================================================================

c语言使用librdkafka库实现kafka的生产和消费实例(转)

关于librdkafka库的介绍,可以参考kafka的c/c++高性能客户端librdkafka简介,本文使用librdkafka库来进行kafka的简单的生产、消费

一、producer

librdkafka进行kafka生产操作的大致步骤如下:

1、创建kafka配置

[cpp] view plain copy

  1. rd_kafka_conf_t *rd_kafka_conf_new (void)

2、配置kafka各项参数

[cpp] view plain copy

  1. rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf,
  2. const char *name,
  3. const char *value,
  4. char *errstr, size_t errstr_size)

3、设置发送回调函数

[cpp] view plain copy

  1. void rd_kafka_conf_set_dr_msg_cb (rd_kafka_conf_t *conf,
  2. void (*dr_msg_cb) (rd_kafka_t *rk,
  3. const rd_kafka_message_t *
  4. rkmessage,
  5. void *opaque))

4、创建producer实例

[cpp] view plain copy

  1. rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *conf,char *errstr, size_t errstr_size)

5、实例化topic

[cpp] view plain copy

  1. rd_kafka_topic_t *rd_kafka_topic_new (rd_kafka_t *rk, const char *topic, rd_kafka_topic_conf_t *conf)

6、异步调用将消息发送到指定的topic

[cpp] view plain copy

  1. int rd_kafka_produce (rd_kafka_topic_t *rkt, int32_t partition,
  2. int msgflags,
  3. void *payload, size_t len,
  4. const void *key, size_t keylen,
  5. void *msg_opaque)

7、阻塞等待消息发送完成

[cpp] view plain copy

  1. int rd_kafka_poll (rd_kafka_t *rk, int timeout_ms)

8、等待完成producer请求完成

[cpp] view plain copy

  1. rd_kafka_resp_err_t rd_kafka_flush (rd_kafka_t *rk, int timeout_ms)

9、销毁topic

[cpp] view plain copy

  1. void rd_kafka_topic_destroy (rd_kafka_topic_t *app_rkt)

10、销毁producer实例

[cpp] view plain copy

  1. void rd_kafka_destroy (rd_kafka_t *rk)

完整代码如下my_producer.c:

[cpp] view plain copy

  1. #include <stdio.h>
  2. #include <signal.h>
  3. #include <string.h>
  4. #include "../src/rdkafka.h"
  5. static int run = 1;
  6. static void stop(int sig){
  7. run = 0;
  8. fclose(stdin);
  9. }
  10. /*
  11. 每条消息调用一次该回调函数,说明消息是传递成功(rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR)
  12. 还是传递失败(rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR)
  13. 该回调函数由rd_kafka_poll()触发,在应用程序的线程上执行
  14. */
  15. static void dr_msg_cb(rd_kafka_t *rk,
  16. const rd_kafka_message_t *rkmessage, void *opaque){
  17. if(rkmessage->err)
  18. fprintf(stderr, "%% Message delivery failed: %s\n",
  19. rd_kafka_err2str(rkmessage->err));
  20. else
  21. fprintf(stderr,
  22. "%% Message delivered (%zd bytes, "
  23. "partition %"PRId32")\n",
  24. rkmessage->len, rkmessage->partition);
  25. /* rkmessage被librdkafka自动销毁*/
  26. }
  27. int main(int argc, char **argv){
  28. rd_kafka_t *rk;            /*Producer instance handle*/
  29. rd_kafka_topic_t *rkt;     /*topic对象*/
  30. rd_kafka_conf_t *conf;     /*临时配置对象*/
  31. char errstr[512];
  32. char buf[512];
  33. const char *brokers;
  34. const char *topic;
  35. if(argc != 3){
  36. fprintf(stderr, "%% Usage: %s <broker> <topic>\n", argv[0]);
  37. return 1;
  38. }
  39. brokers = argv[1];
  40. topic = argv[2];
  41. /* 创建一个kafka配置占位 */
  42. conf = rd_kafka_conf_new();
  43. /*创建broker集群*/
  44. if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,
  45. sizeof(errstr)) != RD_KAFKA_CONF_OK){
  46. fprintf(stderr, "%s\n", errstr);
  47. return 1;
  48. }
  49. /*设置发送报告回调函数,rd_kafka_produce()接收的每条消息都会调用一次该回调函数
  50. *应用程序需要定期调用rd_kafka_poll()来服务排队的发送报告回调函数*/
  51. rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);
  52. /*创建producer实例
  53. rd_kafka_new()获取conf对象的所有权,应用程序在此调用之后不得再次引用它*/
  54. rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
  55. if(!rk){
  56. fprintf(stderr, "%% Failed to create new producer:%s\n", errstr);
  57. return 1;
  58. }
  59. /*实例化一个或多个topics(`rd_kafka_topic_t`)来提供生产或消费,topic
  60. 对象保存topic特定的配置,并在内部填充所有可用分区和leader brokers,*/
  61. rkt = rd_kafka_topic_new(rk, topic, NULL);
  62. if (!rkt){
  63. fprintf(stderr, "%% Failed to create topic object: %s\n",
  64. rd_kafka_err2str(rd_kafka_last_error()));
  65. rd_kafka_destroy(rk);
  66. return 1;
  67. }
  68. /*用于中断的信号*/
  69. signal(SIGINT, stop);
  70. fprintf(stderr,
  71. "%% Type some text and hit enter to produce message\n"
  72. "%% Or just hit enter to only serve delivery reports\n"
  73. "%% Press Ctrl-C or Ctrl-D to exit\n");
  74. while(run && fgets(buf, sizeof(buf), stdin)){
  75. size_t len = strlen(buf);
  76. if(buf[len-1] == '\n')
  77. buf[--len] = '\0';
  78. if(len == 0){
  79. /*轮询用于事件的kafka handle,
  80. 事件将导致应用程序提供的回调函数被调用
  81. 第二个参数是最大阻塞时间,如果设为0,将会是非阻塞的调用*/
  82. rd_kafka_poll(rk, 0);
  83. continue;
  84. }
  85. retry:
  86. /*Send/Produce message.
  87. 这是一个异步调用,在成功的情况下,只会将消息排入内部producer队列,
  88. 对broker的实际传递尝试由后台线程处理,之前注册的传递回调函数(dr_msg_cb)
  89. 用于在消息传递成功或失败时向应用程序发回信号*/
  90. if (rd_kafka_produce(
  91. /* Topic object */
  92. rkt,
  93. /*使用内置的分区来选择分区*/
  94. RD_KAFKA_PARTITION_UA,
  95. /*生成payload的副本*/
  96. RD_KAFKA_MSG_F_COPY,
  97. /*消息体和长度*/
  98. buf, len,
  99. /*可选键及其长度*/
  100. NULL, 0,
  101. NULL) == -1){
  102. fprintf(stderr,
  103. "%% Failed to produce to topic %s: %s\n",
  104. rd_kafka_topic_name(rkt),
  105. rd_kafka_err2str(rd_kafka_last_error()));
  106. if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL){
  107. /*如果内部队列满,等待消息传输完成并retry,
  108. 内部队列表示要发送的消息和已发送或失败的消息,
  109. 内部队列受限于queue.buffering.max.messages配置项*/
  110. rd_kafka_poll(rk, 1000);
  111. goto retry;
  112. }
  113. }else{
  114. fprintf(stderr, "%% Enqueued message (%zd bytes) for topic %s\n",
  115. len, rd_kafka_topic_name(rkt));
  116. }
  117. /*producer应用程序应不断地通过以频繁的间隔调用rd_kafka_poll()来为
  118. 传送报告队列提供服务。在没有生成消息以确定先前生成的消息已发送了其
  119. 发送报告回调函数(和其他注册过的回调函数)期间,要确保rd_kafka_poll()
  120. 仍然被调用*/
  121. rd_kafka_poll(rk, 0);
  122. }
  123. fprintf(stderr, "%% Flushing final message.. \n");
  124. /*rd_kafka_flush是rd_kafka_poll()的抽象化,
  125. 等待所有未完成的produce请求完成,通常在销毁producer实例前完成
  126. 以确保所有排列中和正在传输的produce请求在销毁前完成*/
  127. rd_kafka_flush(rk, 10*1000);
  128. /* Destroy topic object */
  129. rd_kafka_topic_destroy(rkt);
  130. /* Destroy the producer instance */
  131. rd_kafka_destroy(rk);
  132. return 0;
  133. }

二、consumer

librdkafka进行kafka消费操作的大致步骤如下:

1、创建kafka配置

[cpp] view plain copy

  1. rd_kafka_conf_t *rd_kafka_conf_new (void)

2、创建kafka topic的配置

[cpp] view plain copy

  1. rd_kafka_topic_conf_t *rd_kafka_topic_conf_new (void)

3、配置kafka各项参数

[cpp] view plain copy

  1. rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf,
  2. const char *name,
  3. const char *value,
  4. char *errstr, size_t errstr_size)

4、配置kafka topic各项参数

[cpp] view plain copy

  1. rd_kafka_conf_res_t rd_kafka_topic_conf_set (rd_kafka_topic_conf_t *conf,
  2. const char *name,
  3. const char *value,
  4. char *errstr, size_t errstr_size)

5、创建consumer实例

[cpp] view plain copy

  1. rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *conf, char *errstr, size_t errstr_size)

6、为consumer实例添加brokerlist

[cpp] view plain copy

  1. int rd_kafka_brokers_add (rd_kafka_t *rk, const char *brokerlist)

7、开启consumer订阅

[cpp] view plain copy

  1. rd_kafka_subscribe (rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *topics)

8、轮询消息或事件,并调用回调函数

[cpp] view plain copy

  1. rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk,int timeout_ms)

9、关闭consumer实例

[cpp] view plain copy

  1. rd_kafka_resp_err_t rd_kafka_consumer_close (rd_kafka_t *rk)

10、释放topic list资源

[cpp] view plain copy

  1. rd_kafka_topic_partition_list_destroy (rd_kafka_topic_partition_list_t *rktparlist)

11、销毁consumer实例

[cpp] view plain copy

  1. void rd_kafka_destroy (rd_kafka_t *rk)

12、等待consumer对象的销毁

[cpp] view plain copy

  1. int rd_kafka_wait_destroyed (int timeout_ms)

完整代码如下my_consumer.c

[cpp] view plain copy

  1. #include <string.h>
  2. #include <stdlib.h>
  3. #include <syslog.h>
  4. #include <signal.h>
  5. #include <error.h>
  6. #include <getopt.h>
  7. #include "../src/rdkafka.h"
  8. static int run = 1;
  9. //`rd_kafka_t`自带一个可选的配置API,如果没有调用API,Librdkafka将会使用CONFIGURATION.md中的默认配置。
  10. static rd_kafka_t *rk;
  11. static rd_kafka_topic_partition_list_t *topics;
  12. static void stop (int sig) {
  13. if (!run)
  14. exit(1);
  15. run = 0;
  16. fclose(stdin); /* abort fgets() */
  17. }
  18. static void sig_usr1 (int sig) {
  19. rd_kafka_dump(stdout, rk);
  20. }
  21. /**
  22. * 处理并打印已消费的消息
  23. */
  24. static void msg_consume (rd_kafka_message_t *rkmessage,
  25. void *opaque) {
  26. if (rkmessage->err) {
  27. if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {
  28. fprintf(stderr,
  29. "%% Consumer reached end of %s [%"PRId32"] "
  30. "message queue at offset %"PRId64"\n",
  31. rd_kafka_topic_name(rkmessage->rkt),
  32. rkmessage->partition, rkmessage->offset);
  33. return;
  34. }
  35. if (rkmessage->rkt)
  36. fprintf(stderr, "%% Consume error for "
  37. "topic \"%s\" [%"PRId32"] "
  38. "offset %"PRId64": %s\n",
  39. rd_kafka_topic_name(rkmessage->rkt),
  40. rkmessage->partition,
  41. rkmessage->offset,
  42. rd_kafka_message_errstr(rkmessage));
  43. else
  44. fprintf(stderr, "%% Consumer error: %s: %s\n",
  45. rd_kafka_err2str(rkmessage->err),
  46. rd_kafka_message_errstr(rkmessage));
  47. if (rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION ||
  48. rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)
  49. run = 0;
  50. return;
  51. }
  52. fprintf(stdout, "%% Message (topic %s [%"PRId32"], "
  53. "offset %"PRId64", %zd bytes):\n",
  54. rd_kafka_topic_name(rkmessage->rkt),
  55. rkmessage->partition,
  56. rkmessage->offset, rkmessage->len);
  57. if (rkmessage->key_len) {
  58. printf("Key: %.*s\n",
  59. (int)rkmessage->key_len, (char *)rkmessage->key);
  60. }
  61. printf("%.*s\n",
  62. (int)rkmessage->len, (char *)rkmessage->payload);
  63. }
  64. /*
  65. init all configuration of kafka
  66. */
  67. int initKafka(char *brokers, char *group,char *topic){
  68. rd_kafka_conf_t *conf;
  69. rd_kafka_topic_conf_t *topic_conf;
  70. rd_kafka_resp_err_t err;
  71. char tmp[16];
  72. char errstr[512];
  73. /* Kafka configuration */
  74. conf = rd_kafka_conf_new();
  75. //quick termination
  76. snprintf(tmp, sizeof(tmp), "%i", SIGIO);
  77. rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);
  78. //topic configuration
  79. topic_conf = rd_kafka_topic_conf_new();
  80. /* Consumer groups require a group id */
  81. if (!group)
  82. group = "rdkafka_consumer_example";
  83. if (rd_kafka_conf_set(conf, "group.id", group,
  84. errstr, sizeof(errstr)) !=
  85. RD_KAFKA_CONF_OK) {
  86. fprintf(stderr, "%% %s\n", errstr);
  87. return -1;
  88. }
  89. /* Consumer groups always use broker based offset storage */
  90. if (rd_kafka_topic_conf_set(topic_conf, "offset.store.method",
  91. "broker",
  92. errstr, sizeof(errstr)) !=
  93. RD_KAFKA_CONF_OK) {
  94. fprintf(stderr, "%% %s\n", errstr);
  95. return -1;
  96. }
  97. /* Set default topic config for pattern-matched topics. */
  98. rd_kafka_conf_set_default_topic_conf(conf, topic_conf);
  99. //实例化一个顶级对象rd_kafka_t作为基础容器,提供全局配置和共享状态
  100. rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
  101. if(!rk){
  102. fprintf(stderr, "%% Failed to create new consumer:%s\n", errstr);
  103. return -1;
  104. }
  105. //Librdkafka需要至少一个brokers的初始化list
  106. if (rd_kafka_brokers_add(rk, brokers) == 0){
  107. fprintf(stderr, "%% No valid brokers specified\n");
  108. return -1;
  109. }
  110. //重定向 rd_kafka_poll()队列到consumer_poll()队列
  111. rd_kafka_poll_set_consumer(rk);
  112. //创建一个Topic+Partition的存储空间(list/vector)
  113. topics = rd_kafka_topic_partition_list_new(1);
  114. //把Topic+Partition加入list
  115. rd_kafka_topic_partition_list_add(topics, topic, -1);
  116. //开启consumer订阅,匹配的topic将被添加到订阅列表中
  117. if((err = rd_kafka_subscribe(rk, topics))){
  118. fprintf(stderr, "%% Failed to start consuming topics: %s\n", rd_kafka_err2str(err));
  119. return -1;
  120. }
  121. return 1;
  122. }
  123. int main(int argc, char **argv){
  124. char *brokers = "localhost:9092";
  125. char *group = NULL;
  126. char *topic = NULL;
  127. int opt;
  128. rd_kafka_resp_err_t err;
  129. while ((opt = getopt(argc, argv, "g:b:t:qd:eX:As:DO")) != -1){
  130. switch (opt) {
  131. case 'b':
  132. brokers = optarg;
  133. break;
  134. case 'g':
  135. group = optarg;
  136. break;
  137. case 't':
  138. topic = optarg;
  139. break;
  140. default:
  141. break;
  142. }
  143. }
  144. signal(SIGINT, stop);
  145. signal(SIGUSR1, sig_usr1);
  146. if(!initKafka(brokers, group, topic)){
  147. fprintf(stderr, "kafka server initialize error\n");
  148. }else{
  149. while(run){
  150. rd_kafka_message_t *rkmessage;
  151. /*-轮询消费者的消息或事件,最多阻塞timeout_ms
  152. -应用程序应该定期调用consumer_poll(),即使没有预期的消息,以服务
  153. 所有排队等待的回调函数,当注册过rebalance_cb,该操作尤为重要,
  154. 因为它需要被正确地调用和处理以同步内部消费者状态 */
  155. rkmessage = rd_kafka_consumer_poll(rk, 1000);
  156. if(rkmessage){
  157. msg_consume(rkmessage, NULL);
  158. /*释放rkmessage的资源,并把所有权还给rdkafka*/
  159. rd_kafka_message_destroy(rkmessage);
  160. }
  161. }
  162. }
  163. done:
  164. /*此调用将会阻塞,直到consumer撤销其分配,调用rebalance_cb(如果已设置),
  165. commit offset到broker,并离开consumer group
  166. 最大阻塞时间被设置为session.timeout.ms
  167. */
  168. err = rd_kafka_consumer_close(rk);
  169. if(err){
  170. fprintf(stderr, "%% Failed to close consumer: %s\n", rd_kafka_err2str(err));
  171. }else{
  172. fprintf(stderr, "%% Consumer closed\n");
  173. }
  174. //释放topics list使用的所有资源和它自己
  175. rd_kafka_topic_partition_list_destroy(topics);
  176. //destroy kafka handle
  177. rd_kafka_destroy(rk);
  178. run = 5;
  179. //等待所有rd_kafka_t对象销毁,所有kafka对象被销毁,返回0,超时返回-1
  180. while(run-- > 0 && rd_kafka_wait_destroyed(1000) == -1){
  181. printf("Waiting for librdkafka to decommission\n");
  182. }
  183. if(run <= 0){
  184. //dump rdkafka内部状态到stdout流
  185. rd_kafka_dump(stdout, rk);
  186. }
  187. return 0;
  188. }

在linux下编译producer和consumer的代码:

[cpp] view plain copy

  1. gcc my_producer.c -o my_producer  -lrdkafka -lz -lpthread -lrt
  2. gcc my_consumer.c -o my_consumer  -lrdkafka -lz -lpthread -lrt

在运行my_producer或my_consumer时可能会报错"error while loading shared libraries xxx.so", 此时需要在/etc/ld.so.conf中加入xxx.so所在的目录

在本地启动一个简单的kafka服务,设置broker集群为localhost:9092并创建一个叫“test_topic”的topic
启动方式可参考 kafka0.8.2集群的环境搭建并实现基本的生产消费

启动consumer:

启动producer,并发送一条数据“hello world”:

consumer处成功收到producer发送的“hello world”:

http://orchome.com/5

https://github.com/edenhill/librdkafka

https://github.com/mfontanini/cppkafka

https://github.com/zengyuxing007/kafka_test_cpp

librdkafka相关推荐

  1. Kafka C++客户端库librdkafka笔记

    目录 目录 1 1. 前言 2 2. 缩略语 2 3. 配置和主题 3 3.1. 配置和主题结构 3 3.1.1. Conf 3 3.1.2. ConfImpl 3 3.1.3. Topic 3 3. ...

  2. librdkafka 安装

    1,Git clone git clone https://github.com/edenhill/librdkafka.git 2,cd librdkafka/ 3,./configure 4,ma ...

  3. 解决librdkafka 报WARN:Protocol read buffer underflow

    https://github.com/edenhill/librdkafka/issues/1660 RT,公司的kafka更换了版本, 用到librdkafka的场景就开始报这条警告 解决方式如下: ...

  4. librdkafka介绍文档

    ntroduction to librdkafka - the Apache Kafka C/C++ client library librdkafka 是一个C实现的高性能 Apache Kafka ...

  5. kafka的c/c++高性能客户端librdkafka简介/使用librdkafka的C++接口实现简单的生产者和消费者

    Librdkafka是c语言实现的apachekafka的高性能客户端,为生产和使用kafka提供高效可靠的客户端,并且提供了c++接口 性能: Librdkafka 是一款专为现代硬件使用而设计的高 ...

  6. 启动Nginx时报错:error while loading shared libraries: librdkafka.so.1: cannot open shared object file: No

    安装了Nginx之后,启动时报错: error while loading shared libraries: librdkafka.so.1: cannot open shared object f ...

  7. librdkafka开源库使用总结

    使用C/C++语言操作Kafka时,librdkafka是首选的开源库 使用librdkafka创建消费者客户端时,应配置如下属性 消费者会话组保持活动心跳间隔 自动提交偏移 自动重置偏移 自动重置偏 ...

  8. (C++)librdkafka的producer样例

    近期在工程上需要推送信息给kafka,即简单实现kafka的producer功能,基于C++代码,过程介绍如下: (1)librdkafka的安装: 在centos7下较为简单,配置好源的情况下,直接 ...

  9. 基于 librdkafka C API 的三种seek随机访问方法

    尽管Kafka一般意义上都是建议顺序的消费数据,但难免会遇到回滚.重新处理等需求.甚至有些应用希望把kafka当做一个缓存来用,比如保留1天内的近时的数据记录,并支持各个消费者通过拖拽进度条的方式来查 ...

  10. kafka的c/c++高性能客户端librdkafka简介

    Librdkafka是c语言实现的apachekafka的高性能客户端,为生产和使用kafka提供高效可靠的客户端,并且提供了c++接口 性能: Librdkafka 是一款专为现代硬件使用而设计的高 ...

最新文章

  1. 浅显易懂 Makefile 入门 (08)— 默认 shell (/bin/sh)、命令回显、make参数(-n 只显示命令但不执行,-s 禁止所有回显)、单行命令、多行命令、并发执行
  2. 8-14-Exercise
  3. 云计算设计模式(一)缓存预留模式
  4. java实现遍历树形菜单方法——struts.xml实现
  5. 实例讲解getopt()函数的使用
  6. 计算机二级1605错误,word 出现windows installer 1605错误
  7. 数据库每日一题 2020.05.07
  8. JavaScript Demo - so cool
  9. java堆栈有序无序,浅谈Java并发编程系列(四)—— 原子性、可见性与有序性
  10. linux mysql 端口 查看进程_Linux如何查看端口状态
  11. web元件库/常用web组件/常用表单/导航栏/边框/图标/日期时间选择器/评分组件/穿梭框/输入框/步骤条/计数器/输入框/Axure原型/axure元件库/rp原型/交互控件/五星评分器/导航框架
  12. mysql mha reference_MySQL MHA配置常见问题
  13. java反序列化的原理,java – 反序列化的工作原理?
  14. vlan 动态ospf综合网络配置
  15. JVM基础思维导图(持续更新中)
  16. DXGI 方式采集流程
  17. Web服务器性能/压力测试工具http_load、webbench、ab、Siege
  18. b5纸尺寸_画册设计一般多大 宣传册设计用多大尺寸比较合适
  19. 存储卡被格式化了咋恢复文件?
  20. 计算机思维在化工方面的应用,计算思维在化学上的应用.pdf

热门文章

  1. Spring Boot 2.1.8.RELEASE集成UReport2 (四) 添加Mysql存储器
  2. GDB调试 ORBSLAM3
  3. STM8L051低功耗实现
  4. 数据库的那些乱七八糟烦人的锁(数据库锁机制有这一篇就够了)
  5. 【数据库】数据库的锁机制及原理
  6. 格雷码与二进制码的转换
  7. DTU助力于智能配电房监控系统
  8. DSTE经营分析会(战略落地的核心抓手)
  9. matlab教程pdf,Matlab2010经典超强教程(清晰、版).pdf
  10. matlab2010b和7.0,Matlab的安装(以matlab2010b和matlab7.0的安装方法为例)