近期在工程上需要推送信息给kafka,即简单实现kafka的producer功能,基于C++代码,过程介绍如下:
(1)librdkafka的安装:
在centos7下较为简单,配置好源的情况下,直接运行:

yum install librdkafka librdkafka-devel

为了在基于cmake2.8的工程用使用,在CMakeLists.txt加上:

include_directories(ProjectDemo /usr/include)
target_link_library(ProjectDeme /usr/lib64/librdkafka.so)

在windows下,则建议使用vcpkg工具。安装好,在vcpkg.exe的目录用用命令行运行:

#安装64位版本的librdkafka
.\vcpkg.exe install librdkafka:x64-windows
#为了能在visual studio中直接使用,因为安装64位的librdkafa,配置管理器一栏也要设施64位。
.\vcpkg.exe integrate install

(2) librdkafka的producer的样例代码(参考example中produce.cpp):

#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <cstring>#if _AIX
#include <unistd.h>
#endif#include <librdkafka/rdkafkacpp.h>static volatile sig_atomic_t run = 1;static void sigterm(int sig) {run = 0;
}//该类实现消息回显(callback)功能
class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {public:void dr_cb(RdKafka::Message& message) {/* If message.err() is non-zero the message delivery failed permanently* for the message. */if (message.err())std::cerr << "% Message delivery failed: " << message.errstr() << std::endl;elsestd::cerr << "% Message delivered to topic " << message.topic_name() <<" [" << message.partition() << "] at offset " <<message.offset() << std::endl;}
};int main(int argc, char** argv) {if (argc != 3) {std::cerr << "Usage: " << argv[0] << " <brokers> <topic>\n";exit(1);}//kafka核心参数是brokers和top//borkers可以是一系列地址和端口,例如:127.0.0.1:12345;127.0.0.2:123456std::string brokers = argv[1];//kafka的topic,例如:testtopicstd::string topic = argv[2];printf(brokers.c_str());printf(topic.c_str());RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);std::string errstr;if (conf->set("bootstrap.servers", brokers, errstr) !=RdKafka::Conf::CONF_OK) {std::cerr << errstr << std::endl;exit(1);}signal(SIGINT, sigterm);signal(SIGTERM, sigterm);//设定消息回显功能ExampleDeliveryReportCb ex_dr_cb;if (conf->set("dr_cb", &ex_dr_cb, errstr) != RdKafka::Conf::CONF_OK) {std::cerr << errstr << std::endl;exit(1);}//在此建立producer实例,conf可以删除了,避免内存泄漏。RdKafka::Producer* producer = RdKafka::Producer::create(conf, errstr);if (!producer) {std::cerr << "Failed to create producer: " << errstr << std::endl;exit(1);}delete conf;//从键盘输入消息,发送给producerstd::cout << "% Type message value and hit enter " <<"to produce message." << std::endl;for (std::string line; run && std::getline(std::cin, line);) {if (line.empty()) {producer->poll(0);continue;}//消息发送,此次采用的是异步模式retry:RdKafka::ErrorCode err =producer->produce(// Topic 参数topic,// Partition 参数: 用于根据key参数分配topic;如果无key参数,则为随机。RdKafka::Topic::PARTITION_UA,// 复制一遍要发送的值RdKafka::Producer::RK_MSG_COPY /* Copy payload */,// 要发送的数据,根据需要在此修改const_cast<char*>(line.c_str()), line.size(),// key 参数NULL, 0,// 时间戳,末日当前时间0,// 消息头NULL,// 每条消息的不透明值输出到报告中NULL);if (err != RdKafka::ERR_NO_ERROR) {std::cerr << "% Failed to produce to topic " << topic << ": " <<RdKafka::err2str(err) << std::endl;if (err == RdKafka::ERR__QUEUE_FULL) {//中间消息队列满了,再重发producer->poll(1000/*block for max 1000ms*/);goto retry;}}else {std::cerr << "% Enqueued message (" << line.size() << " bytes) " <<"for topic " << topic << std::endl;}//调用poll函数发送报告producer->poll(0);}//flush函数 功能是等待所有消息发送完std::cerr << "% Flushing final messages..." << std::endl;producer->flush(10 * 1000 /* wait for max 10 seconds */);if (producer->outq_len() > 0)std::cerr << "% " << producer->outq_len() <<" message(s) were not delivered" << std::endl;delete producer;return 0;
}

(C++)librdkafka的producer样例相关推荐

  1. 详细讲解如何使用Java连接Kafka构建生产者和消费者(带测试样例)

    1 缘起 学习消息队列的过程中,先补习了RabbitMQ相关知识, 接着又重温了Kafka相关的知识, 发现,我并没有积累Java原生操作Kafka的文章, 只使用SpringBoot集成过Kafka ...

  2. YOLOv4 资源环境配置和测试样例效果

    YOLOv4 资源环境配置和测试样例效果 基本环境:cuda=10.0,cudnn>=7.0, opencv>=2.4 一.下载yolov4 git clone https://githu ...

  3. 2021年大数据常用语言Scala(三十二):scala高级用法 样例类

    目录 样例类 定义样例类 样例类方法 样例对象 样例类 样例类是一种特殊类,它可以用来快速定义一个用于保存数据的类(类似于Java POJO类),而且它会自动生成apply方法,允许我们快速地创建样例 ...

  4. 在Ubuntu下构建Bullet以及执行Bullet的样例程序

    在Ubuntu下构建Bullet以及执行Bullet的样例程序 1.找到Bullet的下载页,地址是:https://code.google.com/p/bullet/downloads/list 2 ...

  5. JDBC连接MySQL数据库及演示样例

    JDBC是Sun公司制定的一个能够用Java语言连接数据库的技术. 一.JDBC基础知识         JDBC(Java Data Base Connectivity,java数据库连接)是一种用 ...

  6. Oracle简单脚本演示样例

    Oracle简单脚本演示样例 1.添加表 --改动日期:2014.09.21 --改动人:易小群 --改动内容:新增採购支付情况表 DECLARE VC_STR           VARCHAR2( ...

  7. 【ZooKeeper Notes 3】ZooKeeper Java API 使用样例

    查看PDF版本 转载请注明:@ni掌柜 nileader@gmail.com ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务框架,包含一组简单的原语集合.通过这些原语言的组合使用, ...

  8. ACMNO.24 C语言-转置矩阵 写一个函数,使给定的一个二维数组(3×3)转置,即行列互换。 输入 一个3x3的矩阵 输出 转置后的矩阵 样例

    题目描述 写一个函数,使给定的一个二维数组(3×3)转置,即行列互换. 输入 一个3x3的矩阵 输出 转置后的矩阵 样例输入 1 2 3 4 5 6 7 8 9 样例输出 1 4 7 2 5 8 3 ...

  9. ACMNO.21 C语言-逆序输出 输入10个数字,然后逆序输出。 输入 十个整数 输出 逆序输出,空格分开 样例输入 1 2 3 4 5 6 7 8 9 0

    题目描述 输入10个数字,然后逆序输出. 输入 十个整数 输出 逆序输出,空格分开 样例输入 1 2 3 4 5 6 7 8 9 0 样例输出 0 9 8 7 6 5 4 3 2 1 提示 数组?堆栈 ...

最新文章

  1. .NET经典资源站点汇总
  2. php的toast,使用toast组件实现提示用户忘记输入用户名或密码功能
  3. 判断 iframe 是否加载完成的完美方法(转)
  4. springmvc ajaxjson处理
  5. Focal loss及其实现
  6. 树莓派之安装docker
  7. 工作流实现自定义表单
  8. ITIL 4: 培训与认证
  9. 如何将自己的电脑做成服务器
  10. 快速开发~Rafy框架的初步认识
  11. Go sync.Pool 浅析
  12. 「解析」正则化 DropPath
  13. [Wc2008]游览计划 斯坦纳树
  14. 操作系统_逻辑地址转换为物理地址
  15. Ajax + $ajax
  16. [人脸对齐--综述] Facial Landmark Detection: a Literature Survey(2018)
  17. Google Pay支付遇到的问题
  18. JS实现腾讯地图搜索信息下拉以及定位...
  19. xp故障恢复控制台和它的命令 1
  20. php实现留言板功能代码,php实现留言板功能(会话控制)

热门文章

  1. Python:1-3、注释
  2. Linux-Samba的使用
  3. Bean named '' is expected to be of type '' but was actually of type 'com.sun.proxy.$Proxy30' 异常解决
  4. 流媒体之DirectShow——视频采集
  5. 遥测终端机RTU的特点以及应用领域
  6. 利用VS(Visual Studio)自带的工具查看dll/lib文件
  7. jasig cas java示例_单点登录cas jasig学习笔记
  8. STM32.UART5无法进入中断,HardFault
  9. 基于python的网上订餐系统论文模板
  10. 志愿者批量自动登录加团操作