ZeroMQ -- 四种模型简介
1. REQ/REP 请求响应模型
hwclient.c
//
// Hello World 客户端
// 连接REQ套接字至 tcp://localhost:5555
// 发送Hello给服务端,并接收World
//
// Hello World client
#include <zmq.h>
#include <string.h>
#include <stdio.h>
#include <unistd.h>
int main (void)
{printf ("Connecting to hello world server...\n");void *context = zmq_ctx_new ();// 连接至服务端的套接字void *requester = zmq_socket (context, ZMQ_REQ);zmq_connect (requester, "tcp://localhost:5555");int request_nbr;for (request_nbr = 0; request_nbr != 10; request_nbr++) {char buffer [10];printf ("正在发送 Hello %d...\n", request_nbr);zmq_send (requester, "Hello", 5, 0);zmq_recv (requester, buffer, 10, 0);printf ("接收到 World %d\n", request_nbr);}zmq_close (requester);zmq_ctx_destroy (context);return 0;
}
hwserver.c
//
// Hello World 服务端
// 绑定一个REP套接字至tcp://*:5555
// 从客户端接收Hello,并应答World
//#include <zmq.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <assert.h>
//gcc -o hwserver hwserver.c -lzmq
int main (void)
{// Socket to talk to clientsvoid *context = zmq_ctx_new ();// 与客户端通信的套接字void *responder = zmq_socket (context, ZMQ_REP);int rc = zmq_bind (responder, "tcp://*:5555");assert (rc == 0);while (1) {// 等待客户端请求char buffer [10];zmq_recv (responder, buffer, 10, 0);printf ("收到 Hello\n");sleep (1); // Do some 'work'// 返回应答zmq_send (responder, "World", 5, 0);}return 0;
}
2. PUB/SUB发布订阅模型
wuserver.c
// Weather update server
// Binds PUB socket to tcp://*:5556
// Publishes random weather updates#include "zhelpers.h"int main (void)
{// Prepare our context and publishervoid *context = zmq_ctx_new ();void *publisher = zmq_socket (context, ZMQ_PUB);int rc = zmq_bind (publisher, "tcp://*:5556");assert (rc == 0);// Initialize random number generatorsrandom ((unsigned) time (NULL));while (1) {// Get values that will fool the bossint zipcode, temperature, relhumidity;zipcode = randof (100000);temperature = randof (215) - 80;relhumidity = randof (50) + 10;// Send message to all subscriberschar update [20];sprintf (update, "%05d %d %d", zipcode, temperature, relhumidity);s_send (publisher, update);}zmq_close (publisher);zmq_ctx_destroy (context);return 0;
}
wuclient.c
// Weather update client
// Connects SUB socket to tcp://localhost:5556
// Collects weather updates and finds avg temp in zipcode#include "zhelpers.h"int main (int argc, char *argv [])
{// Socket to talk to serverprintf ("Collecting updates from weather server...\n");void *context = zmq_ctx_new ();void *subscriber = zmq_socket (context, ZMQ_SUB);int rc = zmq_connect (subscriber, "tcp://localhost:5556");assert (rc == 0);// Subscribe to zipcode, default is NYC, 10001const char *filter = (argc > 1)? argv [1]: "10001 ";rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,filter, strlen (filter));assert (rc == 0);// Process 100 updatesint update_nbr;long total_temp = 0;for (update_nbr = 0; update_nbr < 100; update_nbr++) {char *string = s_recv (subscriber);int zipcode, temperature, relhumidity;sscanf (string, "%d %d %d",&zipcode, &temperature, &relhumidity);total_temp += temperature;free (string);}printf ("Average temperature for zipcode '%s' was %dF\n",filter, (int) (total_temp / update_nbr));zmq_close (subscriber);zmq_ctx_destroy (context);return 0;
}
3. Push/Pull推拉模型
taskevent.c
// Task ventilator
// Binds PUSH socket to tcp://localhost:5557
// Sends batch of tasks to workers via that socket#include "zhelpers.h"int main (void)
{void *context = zmq_ctx_new ();// Socket to send messages onvoid *sender = zmq_socket (context, ZMQ_PUSH);zmq_bind (sender, "tcp://*:5557");// Socket to send start of batch message onvoid *sink = zmq_socket (context, ZMQ_PUSH);zmq_connect (sink, "tcp://localhost:5558");printf ("Press Enter when the workers are ready: ");getchar ();printf ("Sending tasks to workers...\n");// The first message is "0" and signals start of batchs_send (sink, "0");// Initialize random number generatorsrandom ((unsigned) time (NULL));// Send 100 tasksint task_nbr;int total_msec = 0; // Total expected cost in msecsfor (task_nbr = 0; task_nbr < 100; task_nbr++) {int workload;// Random workload from 1 to 100msecsworkload = randof (100) + 1;total_msec += workload;char string [10];sprintf (string, "%d", workload);s_send (sender, string);}printf ("Total expected cost: %d msec\n", total_msec);zmq_close (sink);zmq_close (sender);zmq_ctx_destroy (context);return 0;
}
taskwork.c
// Task worker
// Connects PULL socket to tcp://localhost:5557
// Collects workloads from ventilator via that socket
// Connects PUSH socket to tcp://localhost:5558
// Sends results to sink via that socket#include "zhelpers.h"int main (void)
{// Socket to receive messages onvoid *context = zmq_ctx_new ();void *receiver = zmq_socket (context, ZMQ_PULL);zmq_connect (receiver, "tcp://localhost:5557");// Socket to send messages tovoid *sender = zmq_socket (context, ZMQ_PUSH);zmq_connect (sender, "tcp://localhost:5558");// Process tasks foreverwhile (1) {char *string = s_recv (receiver);printf ("%s.", string); // Show progressfflush (stdout);s_sleep (atoi (string)); // Do the workfree (string);s_send (sender, ""); // Send results to sink}zmq_close (receiver);zmq_close (sender);zmq_ctx_destroy (context);return 0;
}
tasksink.c
// Task sink
// Binds PULL socket to tcp://localhost:5558
// Collects results from workers via that socket#include "zhelpers.h"int main (void)
{// Prepare our context and socketvoid *context = zmq_ctx_new ();void *receiver = zmq_socket (context, ZMQ_PULL);zmq_bind (receiver, "tcp://*:5558");// Wait for start of batchchar *string = s_recv (receiver);free (string);// Start our clock nowint64_t start_time = s_clock ();// Process 100 confirmationsint task_nbr;for (task_nbr = 0; task_nbr < 100; task_nbr++) {char *string = s_recv (receiver);free (string);if (task_nbr % 10 == 0)printf (":");elseprintf (".");fflush (stdout);}// Calculate and report duration of batchprintf ("Total elapsed time: %d msec\n", (int) (s_clock () - start_time));zmq_close (receiver);zmq_ctx_destroy (context);return 0;
}
4. Router/Dealer模型
rrbroker.c
// Simple request-reply broker#include "zhelpers.h"int main (void)
{// Prepare our context and socketsvoid *context = zmq_ctx_new ();void *frontend = zmq_socket (context, ZMQ_ROUTER);void *backend = zmq_socket (context, ZMQ_DEALER);zmq_bind (frontend, "tcp://*:5559");zmq_bind (backend, "tcp://*:5560");// Initialize poll setzmq_pollitem_t items [] = {{ frontend, 0, ZMQ_POLLIN, 0 },{ backend, 0, ZMQ_POLLIN, 0 }};// Switch messages between socketswhile (1) {zmq_msg_t message;zmq_poll (items, 2, -1);if (items [0].revents & ZMQ_POLLIN) {while (1) {// Process all parts of the messagezmq_msg_init (&message);zmq_msg_recv (&message, frontend, 0);int more = zmq_msg_more (&message);zmq_msg_send (&message, backend, more? ZMQ_SNDMORE: 0);zmq_msg_close (&message);if (!more)break; // Last message part}}if (items [1].revents & ZMQ_POLLIN) {while (1) {// Process all parts of the messagezmq_msg_init (&message);zmq_msg_recv (&message, backend, 0);int more = zmq_msg_more (&message);zmq_msg_send (&message, frontend, more? ZMQ_SNDMORE: 0);zmq_msg_close (&message);if (!more)break; // Last message part}}}// We never get here, but clean up anyhowzmq_close (frontend);zmq_close (backend);zmq_ctx_destroy (context);return 0;
}
rrwoker.c
// Hello World worker
// Connects REP socket to tcp://localhost:5560
// Expects "Hello" from client, replies with "World"#include "zhelpers.h"
#include <unistd.h>int main (void)
{void *context = zmq_ctx_new ();// Socket to talk to clientsvoid *responder = zmq_socket (context, ZMQ_REP);zmq_connect (responder, "tcp://localhost:5560");while (1) {// Wait for next request from clientchar *string = s_recv (responder);printf ("Received request: [%s]\n", string);free (string);// Do some 'work'sleep (1);// Send reply back to clients_send (responder, "World");}// We never get here, but clean up anyhowzmq_close (responder);zmq_ctx_destroy (context);return 0;
}
rrclient.c
// Hello World client
// Connects REQ socket to tcp://localhost:5559
// Sends "Hello" to server, expects "World" back#include "zhelpers.h"int main (void)
{void *context = zmq_ctx_new ();// Socket to talk to servervoid *requester = zmq_socket (context, ZMQ_REQ);zmq_connect (requester, "tcp://localhost:5559");int request_nbr;for (request_nbr = 0; request_nbr != 10; request_nbr++) {s_send (requester, "Hello");char *string = s_recv (requester);printf ("Received reply %d [%s]\n", request_nbr, string);free (string);}zmq_close (requester);zmq_ctx_destroy (context);return 0;
}
参考资料:
[1] -- www.0voice.com
[2] --
ZeroMQ -- 四种模型简介相关推荐
- Java更新XML的四种常用方法简介
http://bbs.chinaunix.net/forum.php?mod=viewthread&tid=393702 本文简要的讨论了Java语言编程中更新XML文档的四种常用方法,并且分 ...
- 7-4四种模型的解释_虚拟变量的设置以及交互项的解释
什么时候取对数 一.伍德里奇的取对数规则: 为了解决 (1)减弱数据的异方差性 (2)如果变量本身不符合正态分布,取 了对数后可能渐近服从正态分布 (3)模型形式的需要,让模型具有经济学意义. 采用四 ...
- Java四种引用简介
引语: 我们知道java相比C,C++中没有令人头痛的指针,但是却有和指针作用相似的引用对象(Reference),就是常说的引用,比如,Object obj = new Object():这个obj ...
- 人工神经网络模型有哪些,神经网络分类四种模型
有哪些深度神经网络模型 目前经常使用的深度神经网络模型主要有卷积神经网络(CNN).递归神经网络(RNN).深信度网络(DBN).深度自动编码器(AutoEncoder)和生成对抗网络(GAN)等. ...
- 神经网络分类四种模型,神经网络分类特点区别
神经网络有哪些主要分类规则并如何分类? 神经网络模型的分类人工神经网络的模型很多,可以按照不同的方法进行分类.其中,常见的两种分类方法是,按照网络连接的拓朴结构分类和按照网络内部的信息流向分类. 1按 ...
- rabbitMq第四种模型--direct
在Fanout模式中,一条消息,会被所有订阅的队列都消费. 但是,在某些场景下,我们希望不同的消息被不同的队列消费. 这时就要用到Direct类型的Exchange. 在Direct模型下:队列与交换 ...
- vs2013在使用ef6时,创建模型向导过程中,四种模型方式缺少2种
下载eftool,并安装 https://download.microsoft.com/download/2/C/F/2CF7AFAB-4068-4DAB-88C6-CEFD770FAECD/EFTo ...
- 已知空间中的三点 求三角形面积_【气宇轩昂】解三角形最值问题的四大模型尤其是第四种模型,简直不要太赞哦!!!...
点击上方蓝色字体"高中数学王晖"关注王晖老师,免费获取各种知识干货和学习经验~~~您的点赞转发是对老师的最大鼓舞~~~ 距高考还有262天 1 三角函数有界性 在三角函数中,正弦函 ...
- 语义解析(一) —— 概述(数据和模型简介)
一. 简介 语义解析是近几年发展起来的一个NLP的分支,主要目的是将自然语言的文本描述,自动转成机器语言(SQL)语句.也称Text-to-SQL, nl2SQL等.随着知识图谱的发展,也逐渐孵化出很 ...
- FLUENT中MRF模型简介及应用实例
本文主要介绍了FLUENT中的多重参考系(MRF)模型,并运用此模型以离心泵内部的流场为例,进行了数值模拟,得到了其压力分布.速度分布情况. 1.多重参考系(MRF)模型简介 FLUENT 可以进行整 ...
最新文章
- SQL Server存储过程输入参数使用表值
- 将文件名发送到服务器,将Paperclip路径文件名从服务器更新到s3(Updating Paperclip path file names from on server to s3)...
- android 通知显示时间,android:在特定时间显示通知?
- [第一章]一、面向对象思想的发展
- bzoj 2844: albus就是要第一个出场(线性基)
- android scrollview listview显示不全
- 微信小程序文本溢出的处理方法
- 工业机器人综合教学实训平台
- 中值滤波器和双边滤波器(python实现)
- 达梦数据库DCA培训总结
- window窗口切换快捷键
- Ubuntu调整缩放
- 禁用红蜘蛛自启的简单办法
- 在SVN服务器彻底删除文件
- 【JavaMap接口】HashMap源码解读实例
- 软件设计师:07-法律法规与标准化与多媒体基础
- Symantec赛门铁克官网下载地址
- php 计算一年中周数,php 计算出一年中每周的周一日期
- Python_Task09:文件与文件系统
- 【面试篇】前端点滴(css/css3)