1、首先需要编译rdkafka  编译kafka

2、编译好kafka 我们只需要用到 librdkafka.lib librdkafkacpp.lib librdkafka.dll librdkafka.dll (本人编译的是 windows下的release x64位 版本)这四个文件

3、在Qt 中将kafka消费者封装在线程中

头文件.h

#ifndef UDPCLIENT_H
#define UDPCLIENT_H
#include <QThread>
#include "rdkafkacpp.h"class udpclient : public QThread
{Q_OBJECTpublic:explicit udpclient(std::string, QObject *parent = nullptr);~udpclient();public:void msg_consume(RdKafka::Message* message, void* opaque);void setRun();private://重写线程执行函数void run();std::string m_kafkaIp;signals:void valueUpdate(const QString& str, const std::string& str1);};#endif // UDPCLIENT_H

源文件.cpp

#include "udpclient.h"
#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <signal.h>static bool m_run = true;
static bool exit_eof = true;
static int eof_cnt = 0;
static int partition_cnt = 0;
static int verbosity = 1;
static long msg_cnt = 0;
static int64_t msg_bytes = 0;static long msg_cnt1 = 0;
static int64_t msg_bytes1 = 0;static void sigterm (int sig) {m_run = false;
}class ExampleEventCb : public RdKafka::EventCb {
public:void event_cb (RdKafka::Event &event) {switch (event.type()){case RdKafka::Event::EVENT_ERROR:std::cerr<<"ERROR (" << RdKafka::err2str(event.err()) << "): " <<event.str();if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)m_run = false;break;case RdKafka::Event::EVENT_STATS:std::cerr << "\"STATS\": " << event.str() << std::endl;break;case RdKafka::Event::EVENT_LOG:fprintf(stderr, "LOG-%i-%s: %s\n",event.severity(), event.fac().c_str(), event.str().c_str());break;case RdKafka::Event::EVENT_THROTTLE:std::cerr << "THROTTLED: " << event.throttle_time() << "ms by " <<event.broker_name() << " id " << (int)event.broker_id() << std::endl;break;default:std::cerr << "EVENT " << event.type() <<" (" << RdKafka::err2str(event.err()) << "): " <<event.str() << std::endl;break;}}
};class ExampleConsumeCb : public RdKafka::ConsumeCb {
public:void msg_consume(RdKafka::Message* message, void* opaque) {}void consume_cb (RdKafka::Message &msg, void *opaque) {
//        msg_consume(&msg, opaque);}
};udpclient::udpclient(std::string ip, QObject *parent):QThread(parent)
{m_kafkaIp = ip;
}udpclient::~udpclient()
{}void udpclient::msg_consume(RdKafka::Message* message, void* opaque) {switch (message->err()) {case RdKafka::ERR__TIMED_OUT://std::cerr << "RdKafka::ERR__TIMED_OUT"<<std::endl;break;case RdKafka::ERR_NO_ERROR:/* Real message */msg_cnt++;msg_bytes += message->len();if (verbosity >= 3)std::cerr << "Read msg at offset " << message->offset() << std::endl;RdKafka::MessageTimestamp ts;ts = message->timestamp();if (verbosity >= 2 &&ts.type != RdKafka::MessageTimestamp::MSG_TIMESTAMP_NOT_AVAILABLE) {std::string tsname = "?";if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_CREATE_TIME)tsname = "create time";else if (ts.type == RdKafka::MessageTimestamp::MSG_TIMESTAMP_LOG_APPEND_TIME)tsname = "log append time";std::cout << "Timestamp: " << tsname << " " << ts.timestamp << std::endl;}if (verbosity >= 2 && message->key()) {std::cout << "Key: " << *message->key() << std::endl;}if (verbosity >= 1) {//数据在这里可以进行处理/*处理代码*/emit valueUpdate(static_cast<const char *>(message->payload()), *message->key());}break;case RdKafka::ERR__PARTITION_EOF:/* Last message */if (exit_eof && ++eof_cnt == partition_cnt) {std::cerr << "%% EOF reached for all " << partition_cnt <<" partition(s)" << std::endl;m_run = false;}break;case RdKafka::ERR__UNKNOWN_TOPIC:case RdKafka::ERR__UNKNOWN_PARTITION:std::cerr << "Consume failed: " << message->errstr() << std::endl;m_run = false;break;default:/* Errors */std::cerr << "Consume failed: " << message->errstr() << std::endl;m_run = false;}
}void udpclient::setRun()
{m_run = false;
}//重写线程执行函数
void udpclient::run()
{std::string brokers = m_kafkaIp;std::string errstr;std::string topic_str="主题名";
//  std::string topic_str1 = "xxx";
//  std::string topic_str2 = "xxx";
//  std::string topic_str3 = "xxxs";std::vector<std::string> topics, topics1;std::string group_id="2221";RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);//group.id必须设置if (conf->set("group.id", group_id, errstr) != RdKafka::Conf::CONF_OK) {std::cerr << errstr << std::endl;exit(1);}topics.push_back(topic_str);
//   topics.push_back(topic_str2);
//   topics1.push_back(topic_str1);
//   topics1.push_back(topic_str3);//bootstrap.servers可以替换为metadata.broker.listconf->set("bootstrap.servers", brokers, errstr);ExampleConsumeCb ex_consume_cb;conf->set("consume_cb", &ex_consume_cb, errstr);ExampleEventCb ex_event_cb;conf->set("event_cb", &ex_event_cb, errstr);conf->set("default_topic_conf", tconf, errstr);signal(SIGINT, sigterm);signal(SIGTERM, sigterm);RdKafka::KafkaConsumer *consumer = RdKafka::KafkaConsumer::create(conf, errstr);if (!consumer) {std::cerr << "Failed to create consumer: " << errstr << std::endl;exit(1);}std::cout << "% Created consumer " << consumer->name() << std::endl;RdKafka::ErrorCode err = consumer->subscribe(topics);if (err) {std::cerr << "Failed to subscribe to " << topics.size() << " topics: "<< RdKafka::err2str(err) << std::endl;exit(1);}while (m_run){//0毫秒未订阅到消息,触发RdKafka::ERR__TIMED_OUTRdKafka::Message *msg = consumer->consume(0);consumer->commitSync();msg_consume(msg, NULL);delete msg;msleep(100);}consumer->close();delete conf;delete tconf;delete consumer;std::cerr << "% Consumed " << msg_cnt << " messages ("<< msg_bytes << " bytes)" << std::endl;//应用退出之前等待rdkafka清理资源RdKafka::wait_destroyed(1000);quit();}

4、使用前需要将librdkafka.lib 和 librdkafkacpp.lib库文件添加到你自己的项目中

5、添加完库后,编译你自己的项目的时候需要注意编译器的选择需要和 librdkafka.lib库文件配套。比如你编译完的librdkafka.lib 是release x64位的,那么选择编译器来编译项目的时候也需要选择 release x64的

6、项目如果编译没有报错,但是运行的时候直接奔溃的话,只需要把librdkafka.dll librdkafka.dll这两个dll 放到你编译输出的xx.exe同级目录下即可

Qt 中使用librdkafka librdkafka++ 创建消费者相关推荐

  1. Qt中另一种创建线程的方式

    文章目录 1 Qt中另一种创建线程的方式 1.1 另一种创建线程的方式 1.2 同步型线程的设计 1.3 异步型线程的设计 1 Qt中另一种创建线程的方式 1.1 另一种创建线程的方式 历史的痕迹: ...

  2. 在Qt中使用C++代码创建界面

    好儿郎~志在四方 Qt视频教程地址:http://space.bilibili.com/84360636/#!/index 目录视图 摘要视图 订阅 图灵赠书--程序员11月书单    [思考]Pyt ...

  3. qt中opengl窗口的创建

    该笔记借鉴自 : "懂deeee珍惜"的 现代OpenGL+Qt学习笔记之二:程序框架 "爱种鱼的猫"的 QT中使用OpenGL(0)--创建一个窗口 引用引自 ...

  4. 在Qt中如何使用QtDesigner创建的UI文件(一) (转)

    使用Qt有一些时间了,一直在IDE环境(qtcreator和VS2003+集成器)中使用,自然少了很多麻烦的步骤.但是在享受这种便利的同时,我们也失去了理解更多知识背后的点滴.在IDE中,如果我们要开 ...

  5. Qt中多个动态创建的按钮同时绑定一个槽函数,判断被点击的是哪个按钮

    当动态创建按钮,每一个创建的按钮都与同一个槽函数绑定,点击按钮的时候获取被点击的按钮的文本. 代码如下: QString getClickedBtn() {outPut<<"ge ...

  6. qt中的数据库可以创建在主函数中吗_在qt中怎么建立数据库

    {"moduleinfo":{"card_count":[{"count_phone":1,"count":1}],&q ...

  7. 【Qt】在Qt中使用opencv,不要使用opencv创建窗口

    问题描述 在ubuntu14.04.5 Qt5.6中使用opencv创建窗口显示摄像头时,报错: (:1103): Gtk-WARNING **: gtk_disable_setlocale() mu ...

  8. Qt中创建excel文件

    1.用Qt自带的实现创建excel文件 QFile file(filename); file.open(QIODevice::ReadWrite); file.close(); 创建是创建了excel ...

  9. linux创建自定义组件qt,QT中的元对象系统:创建自定义的QT类型

    原创文章,转载请注明出处,谢谢! 作者:清林,博客名:飞空静渡 QVariant可以表示QT中的大部分类型,它和pascal中的variant类型或c中的void类型有点相似,不过它的使用和c中的un ...

  10. QT中创建条形统计图的方法

    QT中创建条形统计图的方法 .pro中 QT += charts 头文件 #include <QtCharts/QChartView> #include <QtCharts/QBar ...

最新文章

  1. 系统服务器巡查表,服务器操作系统巡检表
  2. 给Java初学者福利——Java语法基础
  3. java 18 - 6 TreeMap嵌套使用
  4. linux标准I/O——按行输入和输出
  5. 面试官最常问的问题总结(一)
  6. oracle10.2.0.4 dbca,在rhel5上oracle 10.2.0.4 上dbca silent删除数据库
  7. 基于MDC的SOA方案
  8. 00-自测3. 数组元素循环右移问题
  9. python go rpc_Python RPC 之 gRPC
  10. [转载]Qt之模型/视图(实时更新数据)
  11. java三角函数计算器_编程实现一个科学计算器,能够实现加减乘除,三角函数计算等。用户界面自己设计...
  12. 计算机操作系统|汤小丹|第四版|习题答案
  13. 04Selenium剩余部分及练习:爬取京东商品信息
  14. linux程序休眠,Linux 休眠原理与实现
  15. TML5期末大作业:咖啡网站设计——咖啡网站pc端带轮播js (5页) 学生酒水网页作业, 生鲜水果网页作业成品, 零食小吃网页作 美食网页业模板
  16. java虚拟机 for win7 64位_最新win7 64位旗舰版安装版下载(64位iso镜像)
  17. 4PCS、super4PCS粗配准算法理解
  18. JAVA并发编程:悲观锁与乐观锁
  19. 启益电商:商品详情页排版布局怎么做
  20. Qt多线程http下载器之一:仿百度网盘的http下载器

热门文章

  1. java list转成map对象_在Java 8中将List转换为Map对象方法
  2. 比特率与波特率的区别
  3. pycharm2016破解方法
  4. Fanuc机器人从控制柜导出GSD文件
  5. 基于JAVA+SpringBoot+Mybatis+MYSQL的应急值班值守管理系统
  6. STM32F1移植到STM32F407 (LD3320)
  7. ps制作计算机考试证件照,如何通过PS制作一寸证件照(超详细流程)?
  8. 超图openlayers
  9. 学会这些Sketchup技巧,工作效率提高一半
  10. java excel 导入试题