(C++)librdkafka的producer样例
近期在工程上需要推送信息给kafka,即简单实现kafka的producer功能,基于C++代码,过程介绍如下:
(1)librdkafka的安装:
在centos7下较为简单,配置好源的情况下,直接运行:
yum install librdkafka librdkafka-devel
为了在基于cmake2.8的工程用使用,在CMakeLists.txt加上:
include_directories(ProjectDemo /usr/include)
target_link_library(ProjectDeme /usr/lib64/librdkafka.so)
在windows下,则建议使用vcpkg工具。安装好,在vcpkg.exe的目录用用命令行运行:
#安装64位版本的librdkafka
.\vcpkg.exe install librdkafka:x64-windows
#为了能在visual studio中直接使用,因为安装64位的librdkafa,配置管理器一栏也要设施64位。
.\vcpkg.exe integrate install
(2) librdkafka的producer的样例代码(参考example中produce.cpp):
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <cstring>#if _AIX
#include <unistd.h>
#endif#include <librdkafka/rdkafkacpp.h>static volatile sig_atomic_t run = 1;static void sigterm(int sig) {run = 0;
}//该类实现消息回显(callback)功能
class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {public:void dr_cb(RdKafka::Message& message) {/* If message.err() is non-zero the message delivery failed permanently* for the message. */if (message.err())std::cerr << "% Message delivery failed: " << message.errstr() << std::endl;elsestd::cerr << "% Message delivered to topic " << message.topic_name() <<" [" << message.partition() << "] at offset " <<message.offset() << std::endl;}
};int main(int argc, char** argv) {if (argc != 3) {std::cerr << "Usage: " << argv[0] << " <brokers> <topic>\n";exit(1);}//kafka核心参数是brokers和top//borkers可以是一系列地址和端口,例如:127.0.0.1:12345;127.0.0.2:123456std::string brokers = argv[1];//kafka的topic,例如:testtopicstd::string topic = argv[2];printf(brokers.c_str());printf(topic.c_str());RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);std::string errstr;if (conf->set("bootstrap.servers", brokers, errstr) !=RdKafka::Conf::CONF_OK) {std::cerr << errstr << std::endl;exit(1);}signal(SIGINT, sigterm);signal(SIGTERM, sigterm);//设定消息回显功能ExampleDeliveryReportCb ex_dr_cb;if (conf->set("dr_cb", &ex_dr_cb, errstr) != RdKafka::Conf::CONF_OK) {std::cerr << errstr << std::endl;exit(1);}//在此建立producer实例,conf可以删除了,避免内存泄漏。RdKafka::Producer* producer = RdKafka::Producer::create(conf, errstr);if (!producer) {std::cerr << "Failed to create producer: " << errstr << std::endl;exit(1);}delete conf;//从键盘输入消息,发送给producerstd::cout << "% Type message value and hit enter " <<"to produce message." << std::endl;for (std::string line; run && std::getline(std::cin, line);) {if (line.empty()) {producer->poll(0);continue;}//消息发送,此次采用的是异步模式retry:RdKafka::ErrorCode err =producer->produce(// Topic 参数topic,// Partition 参数: 用于根据key参数分配topic;如果无key参数,则为随机。RdKafka::Topic::PARTITION_UA,// 复制一遍要发送的值RdKafka::Producer::RK_MSG_COPY /* Copy payload */,// 要发送的数据,根据需要在此修改const_cast<char*>(line.c_str()), line.size(),// key 参数NULL, 0,// 时间戳,末日当前时间0,// 消息头NULL,// 每条消息的不透明值输出到报告中NULL);if (err != RdKafka::ERR_NO_ERROR) {std::cerr << "% Failed to produce to topic " << topic << ": " <<RdKafka::err2str(err) << std::endl;if (err == RdKafka::ERR__QUEUE_FULL) {//中间消息队列满了,再重发producer->poll(1000/*block for max 1000ms*/);goto retry;}}else {std::cerr << "% Enqueued message (" << line.size() << " bytes) " <<"for topic " << topic << std::endl;}//调用poll函数发送报告producer->poll(0);}//flush函数 功能是等待所有消息发送完std::cerr << "% Flushing final messages..." << std::endl;producer->flush(10 * 1000 /* wait for max 10 seconds */);if (producer->outq_len() > 0)std::cerr << "% " << producer->outq_len() <<" message(s) were not delivered" << std::endl;delete producer;return 0;
}
(C++)librdkafka的producer样例相关推荐
- 详细讲解如何使用Java连接Kafka构建生产者和消费者(带测试样例)
1 缘起 学习消息队列的过程中,先补习了RabbitMQ相关知识, 接着又重温了Kafka相关的知识, 发现,我并没有积累Java原生操作Kafka的文章, 只使用SpringBoot集成过Kafka ...
- YOLOv4 资源环境配置和测试样例效果
YOLOv4 资源环境配置和测试样例效果 基本环境:cuda=10.0,cudnn>=7.0, opencv>=2.4 一.下载yolov4 git clone https://githu ...
- 2021年大数据常用语言Scala(三十二):scala高级用法 样例类
目录 样例类 定义样例类 样例类方法 样例对象 样例类 样例类是一种特殊类,它可以用来快速定义一个用于保存数据的类(类似于Java POJO类),而且它会自动生成apply方法,允许我们快速地创建样例 ...
- 在Ubuntu下构建Bullet以及执行Bullet的样例程序
在Ubuntu下构建Bullet以及执行Bullet的样例程序 1.找到Bullet的下载页,地址是:https://code.google.com/p/bullet/downloads/list 2 ...
- JDBC连接MySQL数据库及演示样例
JDBC是Sun公司制定的一个能够用Java语言连接数据库的技术. 一.JDBC基础知识 JDBC(Java Data Base Connectivity,java数据库连接)是一种用 ...
- Oracle简单脚本演示样例
Oracle简单脚本演示样例 1.添加表 --改动日期:2014.09.21 --改动人:易小群 --改动内容:新增採购支付情况表 DECLARE VC_STR VARCHAR2( ...
- 【ZooKeeper Notes 3】ZooKeeper Java API 使用样例
查看PDF版本 转载请注明:@ni掌柜 nileader@gmail.com ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务框架,包含一组简单的原语集合.通过这些原语言的组合使用, ...
- ACMNO.24 C语言-转置矩阵 写一个函数,使给定的一个二维数组(3×3)转置,即行列互换。 输入 一个3x3的矩阵 输出 转置后的矩阵 样例
题目描述 写一个函数,使给定的一个二维数组(3×3)转置,即行列互换. 输入 一个3x3的矩阵 输出 转置后的矩阵 样例输入 1 2 3 4 5 6 7 8 9 样例输出 1 4 7 2 5 8 3 ...
- ACMNO.21 C语言-逆序输出 输入10个数字,然后逆序输出。 输入 十个整数 输出 逆序输出,空格分开 样例输入 1 2 3 4 5 6 7 8 9 0
题目描述 输入10个数字,然后逆序输出. 输入 十个整数 输出 逆序输出,空格分开 样例输入 1 2 3 4 5 6 7 8 9 0 样例输出 0 9 8 7 6 5 4 3 2 1 提示 数组?堆栈 ...
最新文章
- .NET经典资源站点汇总
- php的toast,使用toast组件实现提示用户忘记输入用户名或密码功能
- 判断 iframe 是否加载完成的完美方法(转)
- springmvc ajaxjson处理
- Focal loss及其实现
- 树莓派之安装docker
- 工作流实现自定义表单
- ITIL 4: 培训与认证
- 如何将自己的电脑做成服务器
- 快速开发~Rafy框架的初步认识
- Go sync.Pool 浅析
- 「解析」正则化 DropPath
- [Wc2008]游览计划 斯坦纳树
- 操作系统_逻辑地址转换为物理地址
- Ajax + $ajax
- [人脸对齐--综述] Facial Landmark Detection: a Literature Survey(2018)
- Google Pay支付遇到的问题
- JS实现腾讯地图搜索信息下拉以及定位...
- xp故障恢复控制台和它的命令 1
- php实现留言板功能代码,php实现留言板功能(会话控制)
热门文章
- Python:1-3、注释
- Linux-Samba的使用
- Bean named '' is expected to be of type '' but was actually of type 'com.sun.proxy.$Proxy30' 异常解决
- 流媒体之DirectShow——视频采集
- 遥测终端机RTU的特点以及应用领域
- 利用VS(Visual Studio)自带的工具查看dll/lib文件
- jasig cas java示例_单点登录cas jasig学习笔记
- STM32.UART5无法进入中断,HardFault
- 基于python的网上订餐系统论文模板
- 志愿者批量自动登录加团操作