librdkafka uses C++11 and therefore cannot be built with VS2010; a newer version of Visual Studio is required. I use VS2017 here.

1. Build versions

Build environment: Windows, VS2017
OpenSSL version: openssl-1.0.2t (if you do not want to build it yourself, you can install Win32OpenSSL-1_0_2t.exe instead; in that case point the library path at …\lib\VC rather than …\lib\VC\static)
librdkafka version: librdkafka-1.2.1 (a tagged release, which is more stable than an arbitrary commit; I do not recommend the latest development snapshot)

2. Building OpenSSL

1) Install ActivePerl; a Perl interpreter is needed to run the Configure script.
Download the installer and click through it; no special settings are needed.
2) Open the VS2017 Developer Command Prompt and change into the unpacked OpenSSL directory.
3) Run the Configure script
Execute one of the following in the prompt:

#  release, 32-bit:
perl Configure VC-WIN32 no-asm --prefix=D:\openssl_win32
#  release, 64-bit:
perl Configure VC-WIN64A
#  debug, 32-bit:
perl Configure debug-VC-WIN32
#  debug, 64-bit:
perl Configure debug-VC-WIN64A

I have only used VC-WIN64A and VC-WIN32. --prefix sets the path where the generated headers and library files are installed; no-asm is needed because the build otherwise fails with the following error:

tmp32dll\sha1-586.asm(1432) : error A2070:invalid instruction operands
tmp32dll\sha1-586.asm(1576) : error A2070:invalid instruction operands
NMAKE : fatal error U1077: "E:\Visuol Studio 2012\VC\BIN\cl.EXE": return code "0x1"

There is another error that requires disabling IPv6. I did not run into it myself, but I record it here anyway:

#  workaround
perl Configure VC-WIN32 -DOPENSSL_USE_IPV6=0
#  error
tmp32dll\sha1-586.asm(1432) : error A2070:invalid instruction operands
tmp32dll\sha1-586.asm(1576) : error A2070:invalid instruction operands
NMAKE : fatal error U1077: "E:\Visuol Studio 2012\VC\BIN\cl.EXE": return code "0x2"

4) Generate the makefiles
Execute the following in the prompt:

#  generate the 32-bit makefile
ms\do_ms.bat
#  generate the 64-bit makefile
ms\do_win64a.bat

5) Run the build
In my builds, both 32-bit and 64-bit, building the dynamic libraries always failed with "LIBEAY32.def : error LNK2001: unresolved external symbol OPENSSL_rdtsc", while building the static libraries succeeded.
(1) Build the dynamic libraries:
Execute the following in the prompt:

#  build the DLLs
nmake -f ms\ntdll.mak
#  test the DLLs
nmake -f ms\ntdll.mak test

(2) Build the static libraries:
Execute the following in the prompt:

#  build the static libraries
nmake -f ms\nt.mak
#  test the static libraries
nmake -f ms\nt.mak test

6) Install the libraries

#  install the static libraries
nmake -f ms\nt.mak install
#  install the DLLs
nmake -f ms\ntdll.mak install

The headers and library files are then generated under the path specified by --prefix.

7) Clean the build

nmake -f ms\nt.mak clean
nmake -f ms\ntdll.mak clean

To rebuild, simply run the clean commands and then repeat steps 1)-6).
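Putting steps 3)-7) together, a complete 32-bit release build of the static libraries looks roughly like this (the --prefix path is only an example):

```shell
:: From a VS2017 Developer Command Prompt, inside the unpacked openssl-1.0.2t directory
perl Configure VC-WIN32 no-asm --prefix=D:\openssl_win32
ms\do_ms.bat
nmake -f ms\nt.mak
nmake -f ms\nt.mak test
nmake -f ms\nt.mak install
:: clean before any rebuild
nmake -f ms\nt.mak clean
```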

3. Building librdkafka

1) Fixing build errors
1. Building the project as-is fails with the following error:

Error: This project references NuGet package(s) that are missing on this computer. Use NuGet Package Restore to download them. For more information, see http://go.microsoft.com/fwlink/?LinkID=322105.

Fix:
Open the affected project file in a text editor and remove the following block:

<Target Name="EnsureNuGetPackageBuildImports" BeforeTargets="PrepareForBuild">
  <PropertyGroup>
    <ErrorText>This project references NuGet package(s) that are missing on this computer. Enable NuGet Package Restore to download them.  For more information, see http://go.microsoft.com/fwlink/?LinkID=322105. The missing file is {0}.</ErrorText>
  </PropertyGroup>
  <Error Condition="!Exists('$(SolutionDir)\.nuget\NuGet.targets')" Text="$([System.String]::Format('$(ErrorText)', '$(SolutionDir)\.nuget\NuGet.targets'))" />
</Target>

2. zlib.h not found
This appears to be a NuGet package-management issue (I am not very familiar with NuGet). The workaround I found is to take the zlib library and header files from the win32/packages/ directory and add them to the project's additional include and library directories.
2) Add the library and header files



3) Build librdkafkacpp
If you build only the librdkafka project, compiling your own code against it later fails with:

librdkafka.dll : fatal error LNK1107: invalid or corrupt file: cannot read at 0x3A8

Your own code should therefore link against the librdkafka.dll and librdkafkacpp.dll dynamic libraries produced by building librdkafkacpp.

4. Using librdkafka

The compiled librdkafka library still cannot be used from a VS2010 project: it compiles, but fails at runtime.
1) Create a project and add the library files
Headers: \src-cpp\rdkafkacpp.h and \src\rdkafka.h;
Libraries: configure the librdkafka.dll and librdkafkacpp.dll dynamic libraries (the kafka library files in the debug and release directories also require libzstd.dll and zlib.dll);
2) Producer code
KafkaProducerClient.h

#ifndef KAFKAPRODUCERCLIENT_H
#define KAFKAPRODUCERCLIENT_H

#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <list>
#include <vector>
#include <fstream>
#include <kafka/rdkafkacpp.h>

using namespace std;

class KafkaProducerDeliveryReportCallBack : public RdKafka::DeliveryReportCb {
public:
    void dr_cb(RdKafka::Message &message) {
        std::cout << "Message delivery for (" << message.len() << " bytes): "
                  << message.errstr() << std::endl;
        if (message.key())
            std::cout << "Key: " << *(message.key()) << ";" << std::endl;
    }
};

class KafkaProducerEventCallBack : public RdKafka::EventCb {
public:
    void event_cb(RdKafka::Event &event) {
        switch (event.type()) {
        case RdKafka::Event::EVENT_ERROR:
            std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): "
                      << event.str() << std::endl;
            if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
                break;
        case RdKafka::Event::EVENT_STATS:
            std::cerr << "\"STATS\": " << event.str() << std::endl;
            break;
        case RdKafka::Event::EVENT_LOG:
            fprintf(stderr, "LOG-%i-%s: %s\n",
                    event.severity(), event.fac().c_str(), event.str().c_str());
            break;
        default:
            std::cerr << "EVENT " << event.type()
                      << " (" << RdKafka::err2str(event.err()) << "): "
                      << event.str() << std::endl;
            break;
        }
    }
};

class KafkaProducerClient
{
public:
    KafkaProducerClient(const string &brokers, const string &topics, int nPpartition = 0);
    virtual ~KafkaProducerClient();

    bool Init();
    void Send(const string &msg);
    void Stop();

private:
    RdKafka::Producer *m_pProducer;
    RdKafka::Topic *m_pTopic;
    KafkaProducerDeliveryReportCallBack m_producerDeliveryReportCallBack;
    KafkaProducerEventCallBack m_producerEventCallBack;
    std::string m_strTopics;
    std::string m_strBroker;
    bool m_bRun;
    int m_nPpartition;
};

#endif // KAFKAPRODUCERCLIENT_H

KafkaProducerClient.cpp

#include "stdafx.h"
#include "KafkaProducerClient.h"

KafkaProducerClient::KafkaProducerClient(const string &brokers, const string &topics, int nPpartition /*= 1*/)
    : m_bRun(true), m_strTopics(topics), m_strBroker(brokers), m_nPpartition(nPpartition)
{
    m_pTopic = NULL;
    m_pProducer = NULL;
    m_nPpartition = 0;
}

KafkaProducerClient::~KafkaProducerClient()
{
    Stop();
}

bool KafkaProducerClient::Init()
{
    string errstr = "";

    /* Create configuration objects */
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

    /* Set configuration properties: broker list */
    if (conf->set("metadata.broker.list", m_strBroker, errstr) != RdKafka::Conf::CONF_OK) {
        std::cerr << "RdKafka conf set brokerlist failed: " << errstr.c_str() << endl;
    }

    /* Set delivery report and event callbacks */
    conf->set("dr_cb", &m_producerDeliveryReportCallBack, errstr);
    conf->set("event_cb", &m_producerEventCallBack, errstr);

    /* Create producer using accumulated global configuration. */
    m_pProducer = RdKafka::Producer::create(conf, errstr);
    if (!m_pProducer) {
        std::cerr << "Failed to create producer: " << errstr << std::endl;
        return false;
    }
    std::cout << "% Created producer " << m_pProducer->name() << std::endl;

    /* Create topic handle. */
    m_pTopic = RdKafka::Topic::create(m_pProducer, m_strTopics, tconf, errstr);
    if (!m_pTopic) {
        std::cerr << "Failed to create topic: " << errstr << std::endl;
        return false;
    }
    return true;
}

void KafkaProducerClient::Send(const string &msg)
{
    if (!m_bRun)
        return;

    /* Produce message */
    RdKafka::ErrorCode resp = m_pProducer->produce(m_pTopic, m_nPpartition,
                                                   RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
                                                   const_cast<char *>(msg.c_str()), msg.size(),
                                                   NULL, NULL);
    if (resp != RdKafka::ERR_NO_ERROR)
        std::cerr << "Produce failed: " << RdKafka::err2str(resp) << std::endl;
    else
        std::cerr << "Produced message (" << msg.size() << " bytes)" << std::endl;

    m_pProducer->poll(0);

    /* Wait for messages to be delivered */
    while (m_bRun && m_pProducer->outq_len() > 0) {
        std::cerr << "Waiting for " << m_pProducer->outq_len() << std::endl;
        m_pProducer->poll(100);
    }
}

void KafkaProducerClient::Stop()
{
    delete m_pTopic;
    delete m_pProducer;
}

KafkaProducer.cpp

#include "stdafx.h"
#include <iostream>
#include "KafkaProducerClient.h"

int _tmain(int argc, _TCHAR* argv[])
{
    KafkaProducerClient* KafkaprClient_ = new KafkaProducerClient("10.10.10.182:9092", "test", 1);
    KafkaprClient_->Init();

    char str_msg[] = "Hello Kafka!";
    while (fgets(str_msg, sizeof(str_msg), stdin))
    {
        size_t len = strlen(str_msg);
        if (str_msg[len - 1] == '\n')
        {
            str_msg[--len] = '\0';
        }
        if (strcmp(str_msg, "end") == 0)
        {
            break;
        }
        KafkaprClient_->Send(str_msg);
    }
    return 0;
}

3) Consumer code
KafkaConsumerClient.h

#ifndef KAFKACONSUMERCLIENT_H
#define KAFKACONSUMERCLIENT_H

#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <cstring>
#include <list>
#include <vector>
#include <fstream>
#include <kafka/rdkafkacpp.h>

using namespace std;

class KafkaConsumerClient {
public:
    KafkaConsumerClient(const std::string& brokers, const std::string& topics, std::string groupid,
                        int32_t nPartition = 0, int64_t offset = 0);
    virtual ~KafkaConsumerClient();

    // Initialize
    bool Init();
    // Start fetching messages
    void Start(int timeout_ms);
    // Stop
    void Stop();

private:
    void Msg_consume(RdKafka::Message* message, void* opaque);

private:
    std::string m_strBrokers;
    std::string m_strTopics;
    std::string m_strGroupid;
    int64_t m_nLastOffset;
    RdKafka::Consumer *m_pKafkaConsumer;
    RdKafka::Topic    *m_pTopic;
    int64_t           m_nCurrentOffset;
    int32_t           m_nPartition;
    bool m_bRun;
};

#endif // KAFKACONSUMERCLIENT_H

KafkaConsumerClient.cpp

#include "stdafx.h"
#include "KafkaConsumerClient.h"

KafkaConsumerClient::KafkaConsumerClient(const std::string& brokers, const std::string& topics,
                                         std::string groupid, int32_t nPartition /*= 0*/, int64_t offset /*= 0*/)
    : m_strBrokers(brokers),
      m_strTopics(topics),
      m_strGroupid(groupid),
      m_nPartition(nPartition),
      m_nCurrentOffset(offset)
{
    m_nLastOffset    = 0;
    m_pKafkaConsumer = NULL;
    m_pTopic         = NULL;
    m_nCurrentOffset = RdKafka::Topic::OFFSET_BEGINNING;
    m_nPartition     = 0;
    m_bRun = false;
}

KafkaConsumerClient::~KafkaConsumerClient()
{
    Stop();
}

bool KafkaConsumerClient::Init() {
    std::string errstr;

    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    if (!conf) {
        std::cerr << "RdKafka create global conf failed" << endl;
        return false;
    }

    /* Set broker list */
    if (conf->set("metadata.broker.list", m_strBrokers, errstr) != RdKafka::Conf::CONF_OK) {
        std::cerr << "RdKafka conf set brokerlist failed: " << errstr.c_str() << endl;
    }

    /* Set consumer group */
    if (conf->set("group.id", m_strGroupid, errstr) != RdKafka::Conf::CONF_OK) {
        std::cerr << "RdKafka conf set group.id failed: " << errstr.c_str() << endl;
    }

    /* Maximum amount of data fetched from a single partition at a time */
    std::string strfetch_num = "10240000";
    if (conf->set("max.partition.fetch.bytes", strfetch_num, errstr) != RdKafka::Conf::CONF_OK) {
        std::cerr << "RdKafka conf set max.partition failed: " << errstr.c_str() << endl;
    }

    /* Create consumer using accumulated global configuration. */
    m_pKafkaConsumer = RdKafka::Consumer::create(conf, errstr);
    if (!m_pKafkaConsumer) {
        std::cerr << "failed to create consumer" << endl;
    }
    std::cout << "% Created consumer " << m_pKafkaConsumer->name() << std::endl;
    delete conf;

    /* Create topic configuration */
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    if (!tconf) {
        std::cerr << "RdKafka create topic conf failed" << endl;
        return false;
    }
    if (tconf->set("auto.offset.reset", "smallest", errstr) != RdKafka::Conf::CONF_OK) {
        std::cerr << "RdKafka conf set auto.offset.reset failed: " << errstr.c_str() << endl;
    }

    /* Create topic handle. */
    m_pTopic = RdKafka::Topic::create(m_pKafkaConsumer, m_strTopics, tconf, errstr);
    if (!m_pTopic) {
        std::cerr << "RdKafka create topic failed: " << errstr.c_str() << endl;
    }
    delete tconf;

    /* Start consumer for topic+partition at start offset */
    RdKafka::ErrorCode resp = m_pKafkaConsumer->start(m_pTopic, m_nPartition, m_nCurrentOffset);
    if (resp != RdKafka::ERR_NO_ERROR) {
        std::cerr << "failed to start consumer: " << errstr.c_str() << endl;
    }
    return true;
}

void KafkaConsumerClient::Msg_consume(RdKafka::Message* message, void* opaque) {
    switch (message->err()) {
    case RdKafka::ERR__TIMED_OUT:
        break;
    case RdKafka::ERR_NO_ERROR:
        /* Real message */
        std::cout << "Read msg at offset " << message->offset() << std::endl;
        if (message->key()) {
            std::cout << "Key: " << *message->key() << std::endl;
        }
        printf("%.*s\n",
               static_cast<int>(message->len()),
               static_cast<const char *>(message->payload()));
        m_nLastOffset = message->offset();
        break;
    case RdKafka::ERR__PARTITION_EOF:
        /* Last message */
        cout << "Reached the end of the queue, offset: " << m_nLastOffset << endl;
        //Stop();
        break;
    case RdKafka::ERR__UNKNOWN_TOPIC:
    case RdKafka::ERR__UNKNOWN_PARTITION:
        std::cerr << "Consume failed: " << message->errstr() << std::endl;
        Stop();
        break;
    default:
        /* Errors */
        std::cerr << "Consume failed: " << message->errstr() << std::endl;
        Stop();
        break;
    }
}

void KafkaConsumerClient::Start(int timeout_ms) {
    RdKafka::Message *msg = NULL;
    m_bRun = true;
    while (m_bRun) {
        msg = m_pKafkaConsumer->consume(m_pTopic, m_nPartition, timeout_ms);
        Msg_consume(msg, NULL);
        delete msg;
        m_pKafkaConsumer->poll(0);
    }

    m_pKafkaConsumer->stop(m_pTopic, m_nPartition);
    m_pKafkaConsumer->poll(1000);

    if (m_pTopic) {
        delete m_pTopic;
        m_pTopic = NULL;
    }
    if (m_pKafkaConsumer) {
        delete m_pKafkaConsumer;
        m_pKafkaConsumer = NULL;
    }

    /* Destroy the kafka instance: wait for RdKafka to decommission. */
    RdKafka::wait_destroyed(5000);
}

void KafkaConsumerClient::Stop()
{
    m_bRun = false;
}

KafkaConsumer.cpp

#include "stdafx.h"
#include <iostream>
#include "KafkaConsumerClient.h"

int _tmain(int argc, _TCHAR* argv[])
{
    KafkaConsumerClient *KafkaConsumerClient_ = new KafkaConsumerClient("10.10.10.182:9092", "test", "0", 0,
        RdKafka::Topic::OFFSET_BEGINNING); // OFFSET_BEGINNING, OFFSET_END
    if (!KafkaConsumerClient_->Init()) {
        fprintf(stderr, "kafka server initialize error\n");
        return -1;
    }
    KafkaConsumerClient_->Start(1000);
    return 0;
}

5. Testing with Kafka

1) Download Kafka
Download Kafka and unpack it.
2) Start ZooKeeper
A real deployment needs a proper ZooKeeper installation; for a quick test, simply run the following from the unpacked Kafka directory:

.\bin\windows\zookeeper-server-start.bat config\zookeeper.properties

zookeeper.properties is the ZooKeeper configuration file; by default ZooKeeper listens on local port 2181.
3) Start Kafka by running:

.\bin\windows\kafka-server-start.bat config\server.properties
# the kafka distribution also ships console tools for test production and consumption
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
# consume messages
.\bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic test --from-beginning
# consumer startup for versions after 0.9:
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

4) Start the Kafka programs written above and verify that the library files behave correctly.
