ZMQ(ØMQ、ZeroMQ, 0MQ)看起来像是一套嵌入式的网络链接库,但工作起来更像是一个并发式的框架。它提供的套接字可以在多种协议中传输消息,如线程间、进程间、TCP、广播等。你可以使用套接字构建多对多的连接模式,如扇出、发布-订阅、任务分发、请求-应答等。ZMQ的快速足以胜任集群应用产品。它的异步I/O机制让你能够构建多核应用程序,完成异步消息处理任务。ZMQ有着多语言支持,并能在几乎所有的操作系统上运行。ZMQ是iMatix公司的产品,以LGPL开源协议发布。

本人在这次的系统中选择了这个框架,理由有:

  • 开源,在github中可以找到代码
  • 英文文档详细
  • 示例代码详细
  • 安装方便,使用vcpkg可以直接集成到visual studio 2017中。
  • 使用方便,隐藏了socket通信这部分,本人开发的时候只关注业务逻辑,基本不需要管socket这部分代码。
  • 轻量且快

本文主要记录如何使用ZMQ。

1 Windows下安装,使用Visual Studio2017开发

  • 安装vcpkg,一个第三方库管理器,能够很好和visual studio 集成。
git clone https://github.com/Microsoft/vcpkg.git
cd vcpkg
./bootstrap-vcpkg.sh
./vcpkg integrate install
  • 安装ZMQ的C语言版的包:czmq
vcpkg install czmq:x64-windows
  • 如果要使用c++,可以进一步安装cppzmq,它是用c++对czmq的封装。
vcpkg install cppzmq:x64-windows

默认情况下vcpkg安装的是32位的安装包,加上x64-windows之后,安装的是64位的。因此,这个在使用visual studio2017的时候要注意。可以两者都安装。

通过上面的步骤,已经安装好了ZMQ,可以使用visual studio 2017来开发了。下面张贴一个例子。

Reference:

中文介绍: https://github.com/anjuke/zguide-cn/blob/master/chapter1.md

  • ZMQ的代码:https://github.com/booksbyus/zguide
  • 文档:http://zguide.zeromq.org/page:all
  • 文档2:http://zguide.zeromq.org/

示例代码:

所有的示例代码可以在这个链接下找到,是linux版的。
https://github.com/booksbyus/zguide/tree/3d6d233f2db5258adde9d151e077fb52841f08eb/examples

下面的代码已经经过本人适当修改,可以直接在visual studio 2017上面运行。

建两个新项目,分别存放服务器端的代码和客户端的代码。

服务器端的代码如下:

server.cpp


#pragma warning(disable : 4996)//
//  Task ventilator in C++
//  Binds PUSH socket to tcp://localhost:5557
//  Sends batch of tasks to workers via that socket
//
//  Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com>
//
#include <zmq.hpp>
#include <stdlib.h>
#include <stdio.h>
#include <Windows.h>
#include <iostream>#include <thread>#include <time.h>
#include <chrono>
#include <iostream>
#include "zhelpers.hpp"using namespace std;
using namespace chrono;const string ADDR = "tcp://127.0.0.1";struct Msg_t {char type;unsigned int len;char data[];
};typedef struct Msg_t Msg;void processSink(zmq::context_t& context)
{//  Prepare our context and socket//zmq::context_t context(1);zmq::socket_t receiver(context, ZMQ_PULL);receiver.bind(ADDR + ":5558");//  Wait for start of batchzmq::message_t message;receiver.recv(&message);//  Start our clock now//struct timeval tstart;// gettimeofday (&tstart, NULL);auto startTime = high_resolution_clock::now();//  Process 100 confirmationsint task_nbr;int total_msec = 0;     //  Total calculated cost in msecsfor (task_nbr = 0; task_nbr < 100; task_nbr++) {receiver.recv(&message);std::string smessage(static_cast<char*>(message.data()), message.size());cout << smessage;}auto now = high_resolution_clock::now();//  Calculate and report duration of batchdouble totalElapsed = ((duration<double, std::milli>)(now - startTime)).count();std::cout << "\nTotal elapsed time: " << totalElapsed << "; " << total_msec << " msec\n" << std::endl;}#define within(num) (int) ((float) num * rand() / (RAND_MAX + 1.0))int main(int argc, char *argv[])
{zmq::context_t context(1);std::thread sinkThread(processSink, ref(context));sinkThread.detach();zmq::socket_t publisher(context, ZMQ_PUB);//  避免慢持久化订阅者消息溢出的问题int sndhwm = 110000;zmq_setsockopt(publisher, ZMQ_SNDHWM, &sndhwm, sizeof(int));// 指定交换区大小,单位:字节。//uint64_t swap = 100000000;//zmq_setsockopt(publisher, ZMQ_SWAP, &swap, sizeof(swap));publisher.bind(ADDR + ":5557");//  Socket to send messages on//zmq::socket_t sender(context, ZMQ_PUSH);//sender.bind("tcp://127.0.0.1:5557");std::cout << "Press Enter when the workers are ready: " << std::endl;getchar();std::cout << "Sending tasks to workers...\n" << std::endl;//  The first message is "0" and signals start of batchzmq::socket_t sink(context, ZMQ_PUSH);sink.connect(ADDR + ":5558");zmq::message_t message(2);memcpy(message.data(), "1", 1);sink.send(message);//  Send 100 tasks//int task_nbr;//int total_msec = 0;     //  Total expected cost in msecs//int zipcode, temperature, relhumidity;string data = string(10000, 'a');size_t totalLen_msg = sizeof(Msg) + data.length();Msg * msg = (Msg*)malloc(totalLen_msg);msg->type = '1';msg->len = data.length();memcpy(msg->data, data.c_str(), data.length());for (int i = 0; i < 100000; i++) {//  Get values that will fool the boss//zipcode = 10001;//temperature = within(215) - 80;//relhumidity = within(50) + 10;//  Send message to all subscribers//zmq::message_t message(20+data.length());//snprintf((char *)message.data(), 20+data.length(),// "%05d %d %d %s", zipcode, temperature, relhumidity, data.c_str());//publisher.send(message);//  Send message to all subscriberszmq::message_t message(totalLen_msg);memcpy((void*)message.data(), msg, totalLen_msg);publisher.send(message);if(i % 1000 == 0) cout << "sent " << (i + 1) << endl;}std::cout << "Total expected cost: " << 0 << " msec" << std::endl;//  Give 0MQ time to deliverSleep(-1);//system("pause");return 0;
}

下面客户端的代码:

client.cpp

//
//  Task worker in C++
//  Connects PULL socket to tcp://localhost:5557
//  Collects workloads from ventilator via that socket
//  Connects PUSH socket to tcp://localhost:5558
//  Sends results to sink via that socket
//
//  Olivier Chamoux <olivier.chamoux@fr.thalesgroup.com>
//#include <zmq.hpp>
#include "zhelpers.hpp"
#include <string>using namespace std;struct Msg_t {char type;unsigned int len;char data[];
};
typedef struct Msg_t Msg;int main(int argc, char *argv[])
{zmq::context_t context(1);//  Socket to talk to serverstd::cout << "Collecting updates from weather server...\n" << std::endl;zmq::socket_t subscriber(context, ZMQ_SUB);subscriber.connect("tcp://127.0.0.1:5557");int hwm = 100000;zmq_setsockopt(subscriber, ZMQ_RCVHWM, &hwm, sizeof(hwm));//  Subscribe to zipcode, default is NYC, 10001//const char *filter = (argc > 1) ? argv[1] : "10001 ";//subscriber.setsockopt(ZMQ_SUBSCRIBE, filter, strlen(filter));subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0);//  Socket to receive messages on//zmq::socket_t receiver(context, ZMQ_PULL);//receiver.connect("tcp://127.0.0.1:5557");//  Socket to send messages tozmq::socket_t sender(context, ZMQ_PUSH);sender.connect("tcp://127.0.0.1:5558");//  Process 100 updatesint update_nbr;long total_temp = 0;for (update_nbr = 0; update_nbr < 100000; update_nbr++) {zmq::message_t update;//int zipcode, temperature, relhumidity;subscriber.recv(&update);//std::istringstream iss(static_cast<char*>(update.data()));//iss >> zipcode >> temperature >> relhumidity >> data;Msg * msg = static_cast<Msg*>(update.data());//total_temp += temperature;if (update_nbr % 100 == 0) {cout << "rev:  = " << update_nbr  << ", "<< msg->type << ", "<< msg->len << ", "<< msg->data[msg->len - 1]<< "\n";}//  Do the work//s_sleep(workload);//  Simple progress indicator for the viewer//std::cout << "." << std::flush;//cout << zipcode << "\r";}int avg = (int)(total_temp / update_nbr);std::cout << "Average temperature for zipcode '" << "' was " << avg << "F" << std::endl;//  Send results to sinkstring avgS = to_string(avg);zmq::message_t reply(avgS.length());memcpy((void *)reply.data(), avgS.c_str(), avgS.length());sender.send(reply);return 0;
}

服务端和客户端的代码都用到了下面的工具头文件代码:

zhelpers.hpp

#pragma warning(disable : 4996)#ifndef __ZHELPERS_HPP_INCLUDED__
#define __ZHELPERS_HPP_INCLUDED__//  Include a bunch of headers that we will need in the examples#include <zmq.hpp> // https://github.com/zeromq/cppzmq#include <iostream>
#include <iomanip>
#include <string>
#include <sstream>#include <time.h>
#include <assert.h>
#include <stdlib.h>        // random()  RAND_MAX
#include <stdio.h>
#include <stdarg.h>
#include <signal.h>
#include <Windows.h>//  Bring Windows MSVC up to C99 scratch
#if (defined (WIN32))
typedef unsigned long ulong;
typedef unsigned int  uint;
typedef __int64 int64_t;
#endif//  On some version of Windows, POSIX subsystem is not installed by default.
//  So define srandom and random ourself.
//
#if (defined (WIN32))
#   define srandom srand
#   define random rand
#endif// Visual Studio versions below 2015 do not support sprintf properly. This is a workaround.
// Taken from http://stackoverflow.com/questions/2915672/snprintf-and-visual-studio-2010
#if defined(_MSC_VER) && _MSC_VER < 1900#define snprintf c99_snprintf
#define vsnprintf c99_vsnprintfinline int c99_vsnprintf(char *outBuf, size_t size, const char *format, va_list ap)
{int count = -1;if (size != 0)count = _vsnprintf_s(outBuf, size, _TRUNCATE, format, ap);if (count == -1)count = _vscprintf(format, ap);return count;
}inline int c99_snprintf(char *outBuf, size_t size, const char *format, ...)
{int count;va_list ap;va_start(ap, format);count = c99_vsnprintf(outBuf, size, format, ap);va_end(ap);return count;
}#endif//  Provide random number from 0..(num-1)
#define within(num) (int) ((float)((num) * random ()) / (RAND_MAX + 1.0))//  Receive 0MQ string from socket and convert into C string
//  Caller must free returned string.
inline static char *
s_recv(void *socket, int flags = 0) {zmq_msg_t message;zmq_msg_init(&message);int rc = zmq_msg_recv(&message, socket, flags);if (rc < 0)return nullptr;           //  Context terminated, exitsize_t size = zmq_msg_size(&message);char *string = (char*)malloc(size + 1);memcpy(string, zmq_msg_data(&message), size);zmq_msg_close(&message);string[size] = 0;return (string);
}//  Receive 0MQ string from socket and convert into string
inline static std::string
s_recv(zmq::socket_t & socket, int flags = 0) {zmq::message_t message;socket.recv(&message, flags);return std::string(static_cast<char*>(message.data()), message.size());
}inline static bool s_recv(zmq::socket_t & socket, std::string & ostring, int flags = 0)
{zmq::message_t message;bool rc = socket.recv(&message, flags);if (rc) {ostring = std::string(static_cast<char*>(message.data()), message.size());}return (rc);
}//  Convert C string to 0MQ string and send to socket
inline static int
s_send(void *socket, const char *string, int flags = 0) {int rc;zmq_msg_t message;zmq_msg_init_size(&message, strlen(string));memcpy(zmq_msg_data(&message), string, strlen(string));rc = zmq_msg_send(&message, socket, flags);assert(-1 != rc);zmq_msg_close(&message);return (rc);
}//  Convert string to 0MQ string and send to socket
inline static bool
s_send(zmq::socket_t & socket, const std::string & string, int flags = 0) {zmq::message_t message(string.size());memcpy(message.data(), string.data(), string.size());bool rc = socket.send(message, flags);return (rc);
}//  Sends string as 0MQ string, as multipart non-terminal
inline static int
s_sendmore(void *socket, char *string) {int rc;zmq_msg_t message;zmq_msg_init_size(&message, strlen(string));memcpy(zmq_msg_data(&message), string, strlen(string));//rc = zmq_send(socket, string, strlen(string), ZMQ_SNDMORE);rc = zmq_msg_send(&message, socket, ZMQ_SNDMORE);assert(-1 != rc);zmq_msg_close(&message);return (rc);
}//  Sends string as 0MQ string, as multipart non-terminal
inline static bool
s_sendmore(zmq::socket_t & socket, const std::string & string) {zmq::message_t message(string.size());memcpy(message.data(), string.data(), string.size());bool rc = socket.send(message, ZMQ_SNDMORE);return (rc);
}//  Receives all message parts from socket, prints neatly
//
inline static void
s_dump(zmq::socket_t & socket)
{std::cout << "----------------------------------------" << std::endl;while (1) {//  Process all parts of the messagezmq::message_t message;socket.recv(&message);//  Dump the message as text or binarysize_t size = message.size();std::string data(static_cast<char*>(message.data()), size);bool is_text = true;size_t char_nbr;unsigned char byte;for (char_nbr = 0; char_nbr < size; char_nbr++) {byte = data[char_nbr];if (byte < 32 || byte > 127)is_text = false;}std::cout << "[" << std::setfill('0') << std::setw(3) << size << "]";for (char_nbr = 0; char_nbr < size; char_nbr++) {if (is_text)std::cout << (char)data[char_nbr];elsestd::cout << std::setfill('0') << std::setw(2)<< std::hex << (unsigned int)data[char_nbr];}std::cout << std::endl;int more = 0;           //  Multipart detectionsize_t more_size = sizeof(more);socket.getsockopt(ZMQ_RCVMORE, &more, &more_size);if (!more)break;              //  Last message part}
}#if (!defined (WIN32))
//  Set simple random printable identity on socket
//  Caution:
//    DO NOT call this version of s_set_id from multiple threads on MS Windows
//    since s_set_id will call rand() on MS Windows. rand(), however, is not
//    reentrant or thread-safe. See issue #521.
inline std::string
s_set_id(zmq::socket_t & socket)
{std::stringstream ss;ss << std::hex << std::uppercase<< std::setw(4) << std::setfill('0') << within(0x10000) << "-"<< std::setw(4) << std::setfill('0') << within(0x10000);socket.setsockopt(ZMQ_IDENTITY, ss.str().c_str(), ss.str().length());return ss.str();
}
#else
// Fix #521
inline std::string
s_set_id(zmq::socket_t & socket, intptr_t id)
{std::stringstream ss;ss << std::hex << std::uppercase<< std::setw(4) << std::setfill('0') << id;socket.setsockopt(ZMQ_IDENTITY, ss.str().c_str(), ss.str().length());return ss.str();
}
#endif//  Report 0MQ version number
//
inline static void
s_version(void)
{int major, minor, patch;zmq_version(&major, &minor, &patch);std::cout << "Current 0MQ version is " << major << "." << minor << "." << patch << std::endl;
}inline static void
s_version_assert(int want_major, int want_minor)
{int major, minor, patch;zmq_version(&major, &minor, &patch);if (major < want_major|| (major == want_major && minor < want_minor)) {std::cout << "Current 0MQ version is " << major << "." << minor << std::endl;std::cout << "Application needs at least " << want_major << "." << want_minor<< " - cannot continue" << std::endl;exit(EXIT_FAILURE);}
}//  Return current system clock as milliseconds
inline static int64_t
s_clock(void)
{
#if (defined (WIN32))FILETIME fileTime;GetSystemTimeAsFileTime(&fileTime);unsigned __int64 largeInt = fileTime.dwHighDateTime;largeInt <<= 32;largeInt |= fileTime.dwLowDateTime;largeInt /= 10000; // FILETIME is in units of 100 nanosecondsreturn (int64_t)largeInt;
#elsestruct timeval tv;gettimeofday(&tv, NULL);return (int64_t)(tv.tv_sec * 1000 + tv.tv_usec / 1000);
#endif
}//  Sleep for a number of milliseconds
inline static void
s_sleep(int msecs)
{
#if (defined (WIN32))Sleep(msecs);
#elsestruct timespec t;t.tv_sec = msecs / 1000;t.tv_nsec = (msecs % 1000) * 1000000;nanosleep(&t, NULL);
#endif
}inline static void
s_console(const char *format, ...)
{time_t curtime = time(NULL);struct tm *loctime = localtime(&curtime);char *formatted = new char[20];strftime(formatted, 20, "%y-%m-%d %H:%M:%S ", loctime);printf("%s", formatted);delete[] formatted;va_list argptr;va_start(argptr, format);vprintf(format, argptr);va_end(argptr);printf("\n");
}//  ---------------------------------------------------------------------
//  Signal handling
//
//  Call s_catch_signals() in your application at startup, and then exit
//  your main loop if s_interrupted is ever 1. Works especially well with
//  zmq_poll.static int s_interrupted = 0;
inline static void s_signal_handler(int signal_value)
{s_interrupted = 1;
}inline static void s_catch_signals()
{
#if (!defined(WIN32))struct sigaction action;action.sa_handler = s_signal_handler;action.sa_flags = 0;sigemptyset(&action.sa_mask);sigaction(SIGINT, &action, NULL);sigaction(SIGTERM, &action, NULL);
#endif
}#endif

一个c/c++分布式框架ZMQ或者ZeroMQ, 介绍和win下安装使用方法相关推荐

  1. Python 并行分布式框架 Celery

    Celery 官网:http://www.celeryproject.org Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index. ...

  2. C++分布式框架研究(一)

    近期打算使用C++写一个简单的分布式框架,以便提高自己的技术水平.框架计划采用grpc作为通信底层,无中心管理服务的形式.

  3. 直播 | 清华大学王晨阳:轻量级Top-K推荐框架及相关论文介绍

    「PW Live」是 PaperWeekly 的学术直播间,旨在帮助更多的青年学者宣传其最新科研成果.我们一直认为,单向地输出知识并不是一个最好的方式,而有效地反馈和交流可能会让知识的传播更加有意义, ...

  4. windows7安装python框架_windows7下安装python3的scrapy框架

    强大的Anaconda和Spyder.不过如何在这个平台上安装Scrapy呢. 打开MS-DOS(win+R输入cmd回车) 然后输入: conda install -c scrapinghub sc ...

  5. 干货!如何设计实现一个通用的分布式事务框架?

    来源:https://www.bytesoft.org/ 一个TCC事务框架需要解决的当然是分布式事务的管理.关于TCC事务机制的介绍,可以参考TCC事务机制简介. TCC事务模型虽然说起来简单,然而 ...

  6. 如何设计实现一个通用的分布式事务框架?

    公众号后台回复"学习",获取作者独家秘制精品资料 扫描下方海报二维码,试听课程: 本文来源:https://www.bytesoft.org/ 一个TCC事务框架需要解决的当然是分 ...

  7. 我的面试标准:第一能干活,第二Java基础要好,第三最好熟悉些分布式框架!...

    点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试资料 作者:hsm_computer www.cnblogs.com/J ...

  8. 大数据时代,如何根据业务选择合适的分布式框架

    内容来源:2018 年 5 月 5 日,小米HBase研发工程师吴国泉在"ACMUG & CRUG 2018 成都站"进行<大数据时代系统体系架构和对比:存储与计算& ...

  9. 淘宝分布式框架Fourinone2.0正式版发布

    淘宝Fourinone2.0提供了一个4合1分布式框架和简单易用的编程api,实现对多台计算机cpu,内存,硬盘的统一利用,从而获取到强大计算能力去解决复杂问题.Fourinone框架提供了一系列并行 ...

最新文章

  1. python 获取用户ip_Python爬虫教程:你还在苦苦拉票吗?刷票小程序案例原理剖析!...
  2. NVIDIA Jetson Xavier NX分配磁盘空间
  3. AIDL注意细节 简单Demo
  4. Qt 调用 Windows 接口实现窗口置顶
  5. C#将运算字符串直接转换成表达式且计算结果
  6. 3.SFB标准版前端安装
  7. winform第三方控件wmp
  8. 编译器vs.代码 谁之过
  9. SDN精华问答 | SDN可以做什么?
  10. 03_SpringCloud整合Ribbon实现负载均衡
  11. 95-50-040-java.nio.channels-NIO-NIO之Buffer(缓冲区)
  12. 面试妥了!2020 爬虫面试题目合集
  13. jmeter+接口测试练习+接口关联+Json提取
  14. QT编程编程入门系列文章之一——QT编程简介
  15. 命令与征服3:泰伯利亚战争和红警:共和国之辉
  16. Python网络编程(OSI Socket)
  17. 第三章 系统的时域分析
  18. python 查找excel内容所在的单元格_python 读取excel中单元格的内容
  19. 2022年安徽最新食品安全管理员模拟考点及答案
  20. 万科为并购平台投资39亿,王石退路浮出水面

热门文章

  1. 汇编转c语言,如何把汇编语言转换成C语言
  2. python怎么输出文本_python输出语句怎么用
  3. 关于API文档浏览神器Dash
  4. 反向代理or后端nginx 生产用于缓存视频需要解决的问题
  5. bootstrap之div居中
  6. 【算法导论学习笔记】第3章:函数的增长
  7. 织梦后台不显示验证码的解决
  8. python连接mysql
  9. 如何在自己工程项目中使用TouchJSON框架
  10. Linux机器建立密钥信任