broker的目的

相对于XPub/XSub模式,我们很容易想到Pub/Sub模式,即订阅发布模式。当我们使用ZeroMQ创建一个包含订阅发布模式的系统时,我们通常创建一个消息的发布者,即Publisher,和若干个消息订阅者(Subscriber)。消息的发布者绑定端口,订阅者通过发布者的IP和端口连接发布者,并且注册消息主题(Topic),然后进行接收匹配主题的消息。整体结构如下图:

在订阅发布模式下,订阅者可以动态加入,随时连接消息的发布者,然后接收消息。但是,在这种结构中,如果有新的Publisher加入,那么所有订阅者都需要连接到这个Publisher上。如果系统中有成百上千的订阅者,每一个新的Publisher的加入都会给系统造成很大的操作成本,这显然限制了系统规模。
要解决这个问题,也很简单,就像只有一个发布者情况,所有的订阅者都只与这一个消息发布者交互,不管是Publisher内部发生什么变化,Subscriber都可以动态感知这种变化。所以很容易我们可以想到创建一个中间件来解耦Publishers和Subscribers,所有Subscriber都只与这一个中间件交互,换句话说,这个中间件从很多个Publisher那里接收消息,然后转发给Subscibers。事实上,有了这个中间件,我们可以做很多Pub/Sub模式做不了的事情,比如说对传送过程中的消息进行管理,重构,或者对系统进行负载均衡等等。我们把这个中间件称为Broker,上面说的这种模式,我们称之为XPub/XSub模式。

XPub/XSub

在XPub/XSub模式中,对Publisher来说由Pub/Sub模式中的bind操作变成了connect操作,connect的对象为Broker中的XSub端口。对Subscriber而言,和Publisher的操作一样,只不过connect的是Broker的XPub端口。在Broker中我们绑定XSub和XPub这两个端口。Proxy的作用即为中转消息,在ZMQ的API中提供了zmq.proxy方法来中转消息,其实Proxy就是一个代码块,在这个代码块中可以做任何我们想做的操作。后面会介绍一个简单的例子。从XPub/XSub这个模式中,我们可以发现,不管是Publisher还是Subscriber,它们的加入和离开都可以被系统动态发现。

从上图可以看出,在增加了broker(XSUB_XPUB)之后,Publish和Subscriber都可以动态的加入和离开,并且双方都是透明的,互不影响。这就达到了动态扩展和横向水平扩容的目的。

示例代码

publisher

/*** @file pub.cpp* @brief pub demo* @author shlian* @version 1.0* @date 2020-11-06*/#include <chrono>
#include <iostream>
#include <string>#include <gflags/gflags.h>#include <zmqpp/context.hpp>
#include <zmqpp/context_options.hpp>
#include <zmqpp/loop.hpp>
#include <zmqpp/message.hpp>
#include <zmqpp/socket_types.hpp>
#include <zmqpp/zmqpp.hpp>#include "../include/common.h"DEFINE_string(broker_endpoint,"tcp://127.0.0.1:15556","the broker backend endpoint that connected by pub server");
DEFINE_int32(pub_interval,1000,"publish interval,ms");
DEFINE_int32(io_thread_count,1,"the io_thread count of the zeromq context");
DEFINE_string(id,"pub","the server id");bool pub_message(zmqpp::socket &socket);int main(int argc, char *argv[])
{gflags::SetUsageMessage("Usage");gflags::ParseCommandLineFlags(&argc,&argv,true);zmqpp::context context;context.set(zmqpp::context_option::io_threads,FLAGS_io_thread_count);zmqpp::socket pub_socket(context,zmqpp::socket_type::pub);pub_socket.connect(FLAGS_broker_endpoint);zmqpp::loop looper;looper.add(std::chrono::milliseconds(FLAGS_pub_interval),0,std::bind(pub_message,std::ref(pub_socket)));looper.start();return 0;
}bool pub_message(zmqpp::socket &socket)
{static unsigned long long index=0;zmqpp::message msg;if(index++%2==0){msg.add("even");}else{msg.add("odd");}msg.add(FLAGS_id+":"+common::format_time());LOG_INFO(FLAGS_id<<" pub["<<msg.get(0)<<"],["<<msg.get(1)<<"]");auto res=socket.send(msg);return res;
}

broker

/*** @file broker.cpp* @brief broker demo* @author shlian* @version 1.0* @date 2020-11-06*/
#include <gflags/gflags.h>#include <zmqpp/context.hpp>
#include <zmqpp/context_options.hpp>
#include <zmqpp/loop.hpp>
#include <zmqpp/socket_options.hpp>
#include <zmqpp/socket_types.hpp>
#include <zmqpp/zmqpp.hpp>#include "../include/common.h"DEFINE_string(front_endpoint,"tcp://*:15555","the endpoint of the broker that connected by sub client");
DEFINE_string(backend_endpoint,"tcp://*:15556","the endpoint of the broker that connected by pub server");
DEFINE_int32(io_thread_count,1,"the io_thread count of the zeromq context");bool front_proxy(zmqpp::socket &xpub_socket,zmqpp::socket &xsub_scoket);
bool backend_proxy(zmqpp::socket &xsub_scoket,zmqpp::socket &xpub_socket);int main(int argc, char *argv[])
{gflags::SetUsageMessage("Usage");gflags::ParseCommandLineFlags(&argc,&argv,true);zmqpp::context context;context.set(zmqpp::context_option::io_threads,FLAGS_io_thread_count);//bind front endpointzmqpp::socket xpub_socket(context,zmqpp::socket_type::xpub);xpub_socket.set(zmqpp::socket_option::xpub_verbose,1);xpub_socket.bind(FLAGS_front_endpoint);//bind backend endpointzmqpp::socket xsub_socket(context,zmqpp::socket_type::xsub);xsub_socket.bind(FLAGS_backend_endpoint);xsub_socket.send(std::string(1,0x01));//start event loopzmqpp::loop looper;looper.add(xsub_socket,std::bind(backend_proxy,std::ref(xsub_socket),std::ref(xpub_socket)));looper.add(xpub_socket,std::bind(front_proxy,std::ref(xpub_socket),std::ref(xsub_socket)));looper.start();return 0;
}unsigned long long forward_topic=0;
bool front_proxy(zmqpp::socket &xpub_socket,zmqpp::socket &xsub_socket)
{zmqpp::message msg;bool res=xpub_socket.receive(msg);if(res){++forward_topic;//std::string topic=msg.get(0);//must manage the topics and process subscribe and unsubscribe topic,because socket receives only topic part and do not known if subscribe or unsubscribeLOG_INFO("handle subscribe topic:["<<msg.get(0)<<"],parts="<<msg.parts());std::string topic(1,0x01);topic.append(msg.get(0));res=xsub_socket.send(topic);}return res;
}unsigned long long forward_data_msg=0;
bool backend_proxy(zmqpp::socket &xsub_socket,zmqpp::socket &xpub_socket)
{zmqpp::message msg;bool res=xsub_socket.receive(msg);if(res){res=xpub_socket.send(msg);if((res)&&(forward_data_msg++%100==0)){LOG_INFO("forward:"<<forward_data_msg<<" data messages");}}return res;
}

subscriber

/*** @file sub.cpp* @brief sub demo* @author shlian* @version 1.0* @date 2020-11-06*/#include <gflags/gflags.h>#include <zmqpp/context.hpp>
#include <zmqpp/context_options.hpp>
#include <zmqpp/socket.hpp>
#include <zmqpp/socket_types.hpp>
#include <zmqpp/zmqpp.hpp>#include "../include/common.h"DEFINE_string(broker_endpoint,"tcp://127.0.0.1:15555","the broker endpoint that connected by sub client");
DEFINE_string(topic,"","the subscribing topics");
DEFINE_int32(io_thread_count,1,"the io_thread count of the zeromq context");bool handle_message(zmqpp::socket &socket);int main(int argc, char *argv[])
{gflags::SetUsageMessage("Usage");gflags::ParseCommandLineFlags(&argc,&argv,true);zmqpp::context context;context.set(zmqpp::context_option::io_threads,FLAGS_io_thread_count);zmqpp::socket sub_socket(context,zmqpp::socket_type::sub);sub_socket.connect(FLAGS_broker_endpoint);sub_socket.subscribe(FLAGS_topic);LOG_INFO("subscribe:"<<FLAGS_topic);zmqpp::loop looper;looper.add(sub_socket,std::bind(handle_message,std::ref(sub_socket)));looper.start();return 0;
}bool handle_message(zmqpp::socket &socket)
{zmqpp::message msg;auto res=socket.receive(msg);if(res){LOG_INFO("recv ["<<msg.get(0)<<"]"<<"["<<msg.get(1)<<"]");}return res;
}

CMakeLists

cmake_minimum_required( VERSION 3.8 FATAL_ERROR)
project(broker LANGUAGES CXX)#set dirs
set(PROJECT_ROOT ${CMAKE_CURRENT_LIST_DIR})
message("project dir:${PROJECT_ROOT}")SET(BIN_DESTINATION ${PROJECT_SOURCE_DIR}/bin)
SET(CMAKE_ARCHIVE_OUTPUT_DIRECTORY ${BIN_DESTINATION})
SET(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${BIN_DESTINATION})
SET(CMAKE_RUNTIME_OUTPUT_DIRECTORY ${BIN_DESTINATION})#include cmake files
include(${PROJECT_ROOT}/../version.cmake)#set compile flags
#add_definitions(-std=c++11 -g -rdynamic)
set(CMAKE_CXX_FLAGS "-g3 -rdynamic -std=c++11")
set(CMAKE_CXX_FLAGS_DEBUG "-g3 -O0 ")#-fsanitize=address -fno-omit-frame-pointer -fsanitize=leak")
set(CMAKE_CXX_FLAGS_RELEASE "-O3 -DNDEBUG")#include dirs
include_directories(./ ../include/)#link dirs
link_directories(${BIN_DESTINATION})#execute
SET(SRC_MAIN broker.cpp ../include/common.cpp)
add_executable( ${PROJECT_NAME} ${SRC_MAIN})
set_target_properties(${PROJECT_NAME} PROPERTIES VERSION ${PROJECT_VERSION})
target_link_libraries(${PROJECT_NAME} pthread zmq zmqpp gflags)add_executable(pub pub.cpp ../include/common.cpp)
set_target_properties(pub PROPERTIES VERSION ${PROJECT_VERSION})
target_link_libraries(pub pthread zmq zmqpp gflags)add_executable(sub sub.cpp ../include/common.cpp)
set_target_properties(sub PROPERTIES VERSION ${PROJECT_VERSION})
target_link_libraries(sub pthread zmq zmqpp gflags)

运行效果截图

跟我一起学习ZeroMQ(8):带broker的发布订阅模式:ZMQ_PUB、broker(ZMQ_XPUB和ZMQ_XSUB)、ZMQ_SUB相关推荐

  1. Service Broker实现发布-订阅(Publish-Subscribe)框架(3)

    Service Broker实现发布-订阅(Publish-Subscribe)框架(3) 这一主题前面相关的文章如下:Service Broker实现发布-订阅(Publish-Subscribe) ...

  2. Service Broker实现发布-订阅(Publish-Subscribe)框架

    Service Broker实现发布-订阅(Publish-Subscribe)框架 Service Broker 实现一套完整的发布-订阅方案,其中author 发送Service Broker M ...

  3. 跟我一起学习ZeroMQ(10):Exclusive pair模式——ZMQ_PAIR

    Exclusive pair模式简介 The exclusive pair pattern is used to connect a peer to precisely one other peer. ...

  4. RabbitMQ入门学习系列(四) 发布订阅模式

    什么时发布订阅模式 把消息发送给多个订阅者.也就是有多个消费端都完整的接收生产者的消息 换句话说 把消息广播给多个消费者 消息模型的核心 RabbitMQ不发送消息给队列,生产者也不知道消息发送到队列 ...

  5. Vue组件学习之事件总线和消息发布订阅

    简介 主要介绍事件总线的定义和编写方法和Vue是如何实现消息的订阅与发布的. 事件总线 事件总线是组件间通信的一种方式,适用于任意组件间的通信,比如毫不相干的两个组件.父子组件间.后代组件等等,都能通 ...

  6. 每日学习一个设计模式--观察者模式(发布-订阅模式)

    定义 观察者(Observer)模式的定义:指多个对象间存在一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并被自动更新.这种模式有时又称作发布-订阅模式.模型-视图模式,它 ...

  7. java自带的发布订阅模式

    1. 继承ApplicationEvent,定义基础事件 public class BaseEvent extends ApplicationEvent implements Serializable ...

  8. vue源码深入解读MVVM(视图模板引擎),你真的了解双向绑定(v-model),数据劫持(observe),发布订阅模式吗?带你手鲁mvvm引擎。源码奉上(详细注释)!

    文章目录 #1.vue的强大之处不必细说,vue的核心v-model的实现原理,网上都有很多.但是真正自己实现双向绑定,mvvm的源码却几乎没见过. #1.2本人根据源码的解读,理解,以及借鉴网上的视 ...

  9. Spring Boot基础学习笔记25:RabbitMQ - 发布/订阅工作模式

    文章目录 零.学习目标 一.准备工作 (一)创建Spring Boot项目 - PublishSubscribeDemo (二)在应用属性文件里配置RabbitMQ 二.基于API进行消息发布和订阅 ...

  10. Node 学习六、核心模块 events之 01 events 和 EventEmitter 类、发布订阅、EventEmitter 类源码解析和模拟实现

    events 事件模块 events 与 EventEmitter node.js 是基于事件驱动的异步操作架构,内置 events 模块 events 模块提供了 EventEmitter 类 这个 ...

最新文章

  1. 面向对象数据库NDatabase_初识
  2. bootsrtap h5 移动版页面 在苹果手机ios滑动上下拉动滚动卡顿问题解决方法
  3. 1088 最长回文子串
  4. php -q poller.php --force,cacti 安装后没有图像
  5. 《Python Cookbook 3rd》笔记(1.12):序列中出现次数最多的元素
  6. LINUX内核下跑单片机按键,S3C2440下linux按键驱动编写及测试程序
  7. 视频号、抖音、海外Tiktok到底该选择那个平台更好
  8. 单链表插入、删除操作单步解析(十三)
  9. $(document).ready()方法和window.onload()方法
  10. 阿里矢量图三种在线引用方法
  11. php 判断是否是拼音,php汉字转拼音的示例
  12. 关于Android的方向传感器
  13. html里覆写css样式,!important覆写css行内样式
  14. 服务器微信了早上好,每天问候早上好的微信话语
  15. 国内10大技术网站,你最爱和哪个玩耍?
  16. 小度霸屏头部综艺,智能音箱迎来国民品牌
  17. 计算机二级报名江南大学,江南大学2017年上半年计算机二级报名时间
  18. 直流电机/步进电机/伺服电机 简介
  19. MATLAB中 Command Window 常用的命令及功能
  20. 190528每日一句,在最黑暗的时刻,我们必须集中精神寻找光明

热门文章

  1. 2022年,雅迪电动车还能保持销量神话吗?
  2. JDK8绿色安装详细步骤
  3. 怎样设置CorelDRAW中的网格参数
  4. mysql 数组_mysql怎么存数组
  5. 生物信息分析服务器平台,高通量数据生物信息分析平台
  6. 赛道和资本的玩儿法已经过气,SaaS公司活下去还能靠什么?
  7. 正版授权| iObit Uninstaller 12 Pro 专业卸载器工具
  8. 最大约数(秋季每日一题 34)
  9. 企业估值研究到底从何处着手?
  10. B站的经典封面制作方法