熟悉FPGA硬件和实时软件开发的团队,一般对消息队列等中间件不是很熟悉。在以前的各类应用中,主要只涉及FPGA/DSP和上位机之间通信,团队积累了不少“点到点”高速吞吐+双向水位控制代码。因此,即使遇到PC之间的交互,也习惯使用自定义的TCP、UDP协议搞定一切。

但此单合同涉及的后续处理流程十分复杂,需要涉及多个独立的可伸缩节点,以及适应未来部署后用户的二次扩展应用。为了避免用户接触过多的底层协议,并降低伸缩节点带来的配置文件修改维护耦合度, 我们新进的软件部负责人建议公司尝试用消息队列取代传统的UDP/TCP协议。经过初步测试,我们认为以Kafka为代表的具备持久化和按需保序功能的消息队列,可取代私有TCP协议完成FPGA输出数据的灵活流转。

根据甲方的要求,所有的生产环境服务器均在windows Server2012下运行,这为项目的推进造成了意想不到的困扰。尤其是Kafka与windows环境、Visual C++、Qt环境的结合,一度牵制了项目的进度。经过不断尝试,工程师们最终找到了在windows C++环境中引入Kafka最为简便的方法。参考代码仓库
https://gitcode.net/coloreaglestdio/qtcpp_demo/-/tree/master/kafka/rdkafka_qt

本文不再探讨消息队列本身,以及Kafka的知识。相关知识建议参考朱忠华先生的著作《深入理解Kafka:核心设计与实践原理》,从简单的搭建到内部原理介绍的比较详细。

1. Kafka在Windows下更名失败崩溃问题

目前,windows并不是Kafka推荐的生产环境。尽管通过官网获取的kafka_2.13-3.1.0版本,在windows下有成套的批处理脚本,但存在严重问题。主要是Topic 手工删除、过期删除会导致进程抛出异常,导致服务崩溃。我们采用Always Up,或者编写批处理文件,进行重启。节点崩溃主要因为改名行为与windows的文件系统不契合,导致zookeeper、kafka文件夹下一些状态文件无法更名。

1.1 临时方法:自动重启

通过重启,可解决此问题。
启动Zookeeper的脚本:

cd /d %~dp0
:LOOP
CALL bin\windows\zookeeper-server-start.bat  config\zookeeper.properties
GOTO :LOOP

启动Kafka的脚本:

cd /d %~dp0
:LOOP
CALL bin\windows\kafka-server-start.bat  config\server.properties
GOTO :LOOP

如此处理后,各个节点即使崩溃,经过几次重启,可以恢复工作。但这种方法带来的坏处是每次清理一周以前的数据时,实时进程都会暂时被打断半分钟左右,非常不优雅。尤其是在大数据流量情况下,1GB文件的时间跨度很小,导致重启不可接受。

1.2 使用虚拟化技术

项目通过Oracle VirtualBox + Centos的方式,解决了该问题。使用VirtualBox 虚拟机,比Docker更为灵活。工作在16TB SSD 阵列上的虚拟机,宿主有64GB内存加持,根本不在乎OS的开销。与Docker相比,VM的重量级虚拟化维护更加傻瓜化,甲方经过稍微培训,即可完成操作。

需要注意的是,网络最好使用桥接。桥接保证了zookeeper和kafka的端口可达性。同时,在一台物理计算机上,不同盘阵中运行多个Kafka隔离器(broker实例), 在降低集群成本的同时,起到一定的备份容灾的作用。

2. 在MSYS2下取得librdkafka库

C++环境使用Kafka,一般用librdkafka。但我们决定只使用C接口。鉴于不同的团队(甲方后续会二次开发)可能使用的编译器不同, C++接口的库要为所有的可能编译器准备特定的二进制版本,维护性很差。要在VC2010,VC2013,VC2017,VC2019、 Mingw32, Mingw64,LLVM,Intel C++等编译器下进行适配,只有C接口是可行的。

librdkafka在windows下,编译也有点讨厌。有没有编译好的库,一股脑解决所有编译器的适配问题呢?答案是肯定的,当然就是 MSYS2环境了。

2.1 获取librdkafka

在MSYS2下,执行

$pacman -S mingw32/mingw-w64-i686-librdkafka mingw64/mingw-w64-x86_64-librdkafka

即可获得二进制库、头文件和动态链接库。

2.2 提取librdkafka文件

使用ldd查看DLL的依赖:

 $ ldd librdkafka.dll | grep "mingw64"libzstd.dll => /mingw64/bin/libzstd.dll (0x7fff99520000)libgcc_s_seh-1.dll => /mingw64/bin/libgcc_s_seh-1.dll (0x7fffc71d0000)liblz4.dll => /mingw64/bin/liblz4.dll (0x7fffc7490000)libcrypto-1_1-x64.dll => /mingw64/bin/libcrypto-1_1-x64.dll (0x7fffac400000)libwinpthread-1.dll => /mingw64/bin/libwinpthread-1.dll (0x7fffc1f80000)zlib1.dll => /mingw64/bin/zlib1.dll (0x7fffbb690000)libssl-1_1-x64.dll => /mingw64/bin/libssl-1_1-x64.dll (0x7fffc5230000)

拷贝文件到文件夹librdkafka, 文件夹结构:

D:\librdkafka
├─bin32
│      libcrypto-1_1.dll
│      libgcc_s_dw2-1.dll
│      liblz4.dll
│      librdkafka.dll
│      libssl-1_1.dll
│      libwinpthread-1.dll
│      libzstd.dll
│      zlib1.dll
│
├─bin64
│      libcrypto-1_1-x64.dll
│      libgcc_s_seh-1.dll
│      liblz4.dll
│      librdkafka.dll
│      libssl-1_1-x64.dll
│      libwinpthread-1.dll
│      libzstd.dll
│      zlib1.dll
│
├─include
│  └─librdkafka
│          rdkafka.h
│          rdkafka_mock.h
│
├─lib32
│      librdkafka.dll.a
│
└─lib64librdkafka.dll.a

3. 应用

一旦得到了完整的库文件,即可在多个编译器下使用了。要注意的是,使用C接口,Visual C++也可以直接使用.a 格式的LIB,不需要额外的操作变成.lib

3.1 使用C++从examples直接封装C接口

尽管使用C接口的librdkafka库,我们还是希望避免冗长的初始化和状态维护,做一些简单的封装。Kafka的官方produce.c, consumer.c是非常直观的例子,我们从这两个文件开始,进行封装。

(1)生产者

namespace KafkaClient {class kafka_producer{public:explicit kafka_producer(const std::string & brokers,const std::string & topic);virtual ~kafka_producer();public:bool write(const char * data,const int len,const int maxTry = 10,const char * key = nullptr,const int keylen = 0);protected:bool init();bool exit();protected:rd_kafka_t *rk = nullptr;       /* Producer instance handle */std::string m_topic;std::string m_brokers;};

(2)消费者

namespace KafkaClient {class kafka_consumer{public:explicit kafka_consumer(const std::string & brokers,const std::vector<std::string> topics,const std::string & group);virtual ~kafka_consumer();protected:bool init();bool exit();public:bool stop();bool run(std::function<void (rd_kafka_message_t *)> cb );protected:rd_kafka_t *rk = nullptr;       /* Producer instance handle */std::atomic<bool> m_stop;std::vector<std::string> m_topics;std::string m_brokers;std::string m_group;};
}

注意的是,我们希望用户在一个独立的线程里进行消费,因此设置了run()函数传入一个回调。

3.2 为图形界面适配printf

官方例子里,大量使用stderr作为状态输出。这对于控制台程序没有问题,对GUI程序就讨厌了。但通过可变参数回调,可以直接替换printf,官方的控制台例子妥妥变成GUI:

 //Default printf Callbackvoid csprintf (const char * format,...){va_list args;va_start(args, format);vfprintf(stderr,format, args);va_end(args);}//GUI printfvoid uiprintf (const char * format,...){va_list args;va_start(args, format);if (instance){char buf[1024];vsnprintf(buf,1024,format,args);buf[1023] = 0;GUI()->shootMsg(QString(buf));}elsevfprintf(stderr,format, args);va_end(args);}//选择Console 或者GUIvoid (*cbprintf) (const char *,...) = csprintf;

在处理GUI消息时,牵扯到多线程问题。我们对多线程进行了优化,详细情况参考代码库。

3.3 在独立的线程消费

在现代C++下,直接初始化线程进行消费. C++现代标准中的Lambada表达式捕获,显著降低了多线程编程的代码量。

 consumer = new kafka_consumer(ui->lineEdit_brokers->text().toStdString(),topics,ui->lineEdit_group->text().toStdString());m_runthread = new std::thread([&]()->void{consumer->run([&](rd_kafka_message_t * rkm)->void{if (rkm){/* Proper message. */cbprintf("Message on %s [%" PRId32 "] at offset %" PRId64 ":\n",rd_kafka_topic_name(rkm->rkt), rkm->partition,rkm->offset);/* Print the message key. */if (rkm->key )cbprintf(" Key: %.*s\n", (int)rkm->key_len,(const char *)rkm->key);/* Print the message value/payload. */if (rkm->payload)cbprintf(" Value: (%d bytes)\n", (int)rkm->len);}});});

4 范例工程

参考代码仓库
https://gitcode.net/coloreaglestdio/qtcpp_demo/-/tree/master/kafka/rdkafka_qt
该仓库中的范例在 Manjaro 和 Win10 MSYS2 / VC2022下编译通过。

MSYS2显著简化Kafka在windows C++下的使用门槛相关推荐

  1. kafka在Windows系统下的安装出现的错误汇总:

    kafka在Windows系统下的安装出现的部分奇奇怪怪的错误: 假设你渡过了Zookeeper版本和JDK版本问题,出现了如下问题. ①出现了'wmic' 不是内部或外部命令,也不是可运行的程序 或 ...

  2. Windows系统下安装ROS系统

    Windows上安装ROS系统 请参照以下链接和说明,完成Windows上安装ROS系统. Installation/Windows - ROS Wiki 注意事项: Windows版本:64-bit ...

  3. kafka在windows下单机版搭建

    kafka在windows下单机版搭建 1.win本地单机版搭建 1.1安装zookeeper环境 下载地址 Apache Downloads 添加配置文件 在config目录下复制一份zoo_sam ...

  4. DeepLearning:windows环境下C++环境实现Tensorflow编译部署

    [写在前面] 都说深度学习的这条大船上来了就应该不惧风雨,可是在配置环境这条路上的坑真是刚出旧坑又入新坑,2021年最后一天了.想想rensorflow的源代码在windows 环境下的编译历程,就忍 ...

  5. Android 开发之Windows环境下Android Studio安装和使用教程(图文详细步骤)

    鉴于谷歌最新推出的Android Studio备受开发者的推崇,所以也跟着体验一下. 一.介绍Android Studio  Android Studio 是一个Android开发环境,基于Intel ...

  6. Windows环境下Dapr入门

    Dapr是Distributed Application Runtime(分布式应用运行时)的缩写.它是一个可移植的.事件驱动的运行时. 下面介绍如何在Windows平台安装Dapr: 安装Dapr ...

  7. Windows 7下实现×××连接自动创建

    在完成了Windows XP下×××自动创建并配置连接后,其实我们还需要对Windows 7下×××自动配置做一个介绍,因为目前实际上很多企业也已经开始进行了大批量的Windows 7的使用,而且很多 ...

  8. windows环境下安装RabbitMQ(超详细)

    windows环境下安装RabbitMQ(超详细) 记录RabbitMQ安装过程,欢迎大家和我交流.(安装过程中遇到的问题也有总结哈,请查看) RabbitMq简介 安装准备工具 安装步骤(图文) 安 ...

  9. kafka安装(windows版)

    一.安装JDK 过程比较简单,就不在详细说明 二.安装zookeeper # 1.下载安装包 https://zookeeper.apache.org/releases.html#download # ...

最新文章

  1. 不能打游戏的汽车不是好电影院!特斯拉面向国内推送V10.0系统,能辅助变道还能看爱奇艺...
  2. python降序排列说true不存在_Python数据类型串讲(中)
  3. jmeter正则表达式提取器多模块相互调用
  4. Jenkins 创建一个freestyle的Job
  5. 学计算机之路写一篇作文,写我的学艺之路作文
  6. Linux 一句话 命令
  7. quill鼠标悬浮 出现提示_html实现鼠标悬停显示气泡文字内容
  8. 路科验证示例_角度形式验证示例
  9. 为VMware虚拟机内安装的Ubuntu 16.04设置静态IP地址
  10. java常见的异常_Java常见的10个异常
  11. Nodejs安装教程
  12. 工程课系列-Level3-Web应用课
  13. 聚观早报 | 羊了个羊幕后推手月流水曾破亿;雷军卸任小米董事长
  14. FPGA实现的SPI协议(一)----SPI驱动
  15. 模仿百度首页的图片轮播
  16. ccc-sklearn-13-朴素贝叶斯(1)
  17. Learning Attribute Representations with Localization for Flexible Fashion Search
  18. k8s集群reset恢复重置
  19. (Model-Contrastive Federated Learning)模型对比联邦学习
  20. html标签语义化的好处,(2019)[前端]面试题[9]:HTML5语义化标签和新特性

热门文章

  1. OpenStack OVS GRE/VXLAN
  2. MAC OS X10 10上Android开发环境搭建
  3. 能在学生用计算机的面上画的图画,中小学生电脑绘画作品图片
  4. 高阶低通无源滤波器的设计
  5. 宽字节 多字节 单字节 的问题
  6. 积水问题几种解决方案
  7. [转] Xcode 高级调试技巧
  8. shell编程-06-判断语句和循环语句
  9. Understand Tensor Deeply
  10. 浅谈5G 与4G的区别