一、简介

EQueue是一个参照RocketMQ实现的开源消息队列中间件,具体可以参看作者的文章《分享一个c#写的开源分布式消息队列equeue》。项目开源地址:https://github.com/tangxuehua/equeue,项目中包含了队列的全部源代码以及如何使用的示例。

二、安装EQueue

Producer、Consumer、Broker支持分布式部署,安装EQueue需要.NET 4, Visual Studio 2010/2012/2013. 目前EQueue是个类库,需要自己实现Broker的宿主,可以参照QuickStart,创建一个QuickStart.BrokerServer项目,通过Visual Studio的Nuget 查找equeue

using System;
using System.Text;
using ECommon.Autofac;
using ECommon.Configurations;
using ECommon.JsonNet;
using ECommon.Log4Net;
using EQueue.Broker;
using EQueue.Configurations;
using EQueue.Protocols;namespace QuickStart.BrokerServer
{class Program{static void Main(string[] args){InitializeEQueue();var setting = new BrokerSetting();setting.NotifyWhenMessageArrived = false;setting.DeleteMessageInterval = 1000;new BrokerController(setting).Initialize().Start();Console.ReadLine();}static void InitializeEQueue(){Configuration.Create().UseAutofac().RegisterCommonComponents().UseLog4Net().UseJsonNet().RegisterEQueueComponents();}}
}

InitializeEQueue方法初始化EQueue的环境,使用了Autofac作为IOC容器,使用log4Net记录日志, 我们看一下RegisterEQueueComponents方法:

   public static class ConfigurationExtensions{public static Configuration RegisterEQueueComponents(this Configuration configuration){configuration.SetDefault<IAllocateMessageQueueStrategy, AverageAllocateMessageQueueStrategy>();configuration.SetDefault<IQueueSelector, QueueHashSelector>();configuration.SetDefault<ILocalOffsetStore, DefaultLocalOffsetStore>();configuration.SetDefault<IMessageStore, InMemoryMessageStore>();configuration.SetDefault<IMessageService, MessageService>();configuration.SetDefault<IOffsetManager, InMemoryOffsetManager>();return configuration;}}

代码中涉及到6个组件:

  • IAllocateMessageQueueStrategy
  • IQueueSelector
  • ILocalOffsetStore
  • IMessageStore
  • IMessageService
  • IOffsetManager

DeleteMessageInterval 这个属性是用来设置equeue的定时删除间隔,单位为毫秒,默认值是一个小时。另外还有ProducerSocketSetting 和 ConsumerSocketSetting 分别用于设置Producer连接Broker和Consumer连接Broker的IP和端口,默认端口是5000和5001。

 public class BrokerSetting{public SocketSetting ProducerSocketSetting { get; set; }public SocketSetting ConsumerSocketSetting { get; set; }public bool NotifyWhenMessageArrived { get; set; }public int DeleteMessageInterval { get; set; }public BrokerSetting(){ProducerSocketSetting = new SocketSetting { Address = SocketUtils.GetLocalIPV4().ToString(), Port = 5000, Backlog = 5000 };ConsumerSocketSetting = new SocketSetting { Address = SocketUtils.GetLocalIPV4().ToString(), Port = 5001, Backlog = 5000 };NotifyWhenMessageArrived = true;DeleteMessageInterval = 1000 * 60 * 60;}}

运行项目,如果显示下面类似内容,说明Broker启动成功:

2014-03-23 20:10:30,255  INFO BrokerController - Broker started, producer:[169.254.80.80:5000], consumer:[169.254.80.80:5001]

三、在Visual Studio中开发测试

1.创建一个VS项目 QuickStart.ProducerClient,通过Nuget引用EQueue,编写下面Producer代码

 using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using ECommon.Autofac;
using ECommon.Configurations;
using ECommon.IoC;
using ECommon.JsonNet;
using ECommon.Log4Net;
using ECommon.Scheduling;
using EQueue.Clients.Producers;
using EQueue.Configurations;
using EQueue.Protocols;namespace QuickStart.ProducerClient
{class Program{static void Main(string[] args){InitializeEQueue();var scheduleService = ObjectContainer.Resolve<IScheduleService>();var producer = new Producer().Start();var total = 1000;var parallelCount = 10;var finished = 0;var messageIndex = 0;var watch = Stopwatch.StartNew();var action = new Action(() =>{for (var index = 1; index <= total; index++){var message = "message" + Interlocked.Increment(ref messageIndex);producer.SendAsync(new Message("SampleTopic", Encoding.UTF8.GetBytes(message)), index.ToString()).ContinueWith(sendTask =>{var finishedCount = Interlocked.Increment(ref finished);if (finishedCount % 1000 == 0){Console.WriteLine(string.Format("Sent {0} messages, time spent:{1}", finishedCount, watch.ElapsedMilliseconds));}});}});var actions = new List<Action>();for (var index = 0; index < parallelCount; index++){actions.Add(action);}Parallel.Invoke(actions.ToArray());Console.ReadLine();}static void InitializeEQueue(){Configuration.Create().UseAutofac().RegisterCommonComponents().UseLog4Net().UseJsonNet().RegisterEQueueComponents();}}
}

Producer对象在使用之前必须要调用Start初始化,初始化一次即可, 注意:切记不可以在每次发送消息时,都调用Start方法。Producer 默认连接本机的5000端口,可以通过ProducerSetting 进行设置,可以参看下面的代码:

 public class ProducerSetting{public string BrokerAddress { get; set; }public int BrokerPort { get; set; }public int SendMessageTimeoutMilliseconds { get; set; }public int UpdateTopicQueueCountInterval { get; set; }public ProducerSetting(){BrokerAddress = SocketUtils.GetLocalIPV4().ToString();BrokerPort = 5000;SendMessageTimeoutMilliseconds = 1000 * 10;UpdateTopicQueueCountInterval = 1000 * 5;}

2、创建一个VS项目 QuickStart.ConsumerClient,通过Nuget引用EQueue,编写下面Consumer代码

using System;
using System.Linq;
using System.Text;
using System.Threading;
using ECommon.Autofac;
using ECommon.Configurations;
using ECommon.IoC;
using ECommon.JsonNet;
using ECommon.Log4Net;
using ECommon.Scheduling;
using EQueue.Broker;
using EQueue.Clients.Consumers;
using EQueue.Configurations;
using EQueue.Protocols;namespace QuickStart.ConsumerClient
{class Program{static void Main(string[] args){InitializeEQueue();var messageHandler = new MessageHandler();var consumer1 = new Consumer("Consumer1", "group1").Subscribe("SampleTopic").Start(messageHandler);var consumer2 = new Consumer("Consumer2", "group1").Subscribe("SampleTopic").Start(messageHandler);var consumer3 = new Consumer("Consumer3", "group1").Subscribe("SampleTopic").Start(messageHandler);var consumer4 = new Consumer("Consumer4", "group1").Subscribe("SampleTopic").Start(messageHandler);Console.WriteLine("Start consumer load balance, please wait for a moment.");var scheduleService = ObjectContainer.Resolve<IScheduleService>();var waitHandle = new ManualResetEvent(false);var taskId = scheduleService.ScheduleTask(() =>{var c1AllocatedQueueIds = consumer1.GetCurrentQueues().Select(x => x.QueueId);var c2AllocatedQueueIds = consumer2.GetCurrentQueues().Select(x => x.QueueId);var c3AllocatedQueueIds = consumer3.GetCurrentQueues().Select(x => x.QueueId);var c4AllocatedQueueIds = consumer4.GetCurrentQueues().Select(x => x.QueueId);if (c1AllocatedQueueIds.Count() == 1 && c2AllocatedQueueIds.Count() == 1 && c3AllocatedQueueIds.Count() == 1 && c4AllocatedQueueIds.Count() == 1){Console.WriteLine(string.Format("Consumer load balance finished. Queue allocation result: c1:{0}, c2:{1}, c3:{2}, c4:{3}",string.Join(",", c1AllocatedQueueIds),string.Join(",", c2AllocatedQueueIds),string.Join(",", c3AllocatedQueueIds),string.Join(",", c4AllocatedQueueIds)));waitHandle.Set();}}, 1000, 1000);waitHandle.WaitOne();scheduleService.ShutdownTask(taskId);Console.ReadLine();}static void InitializeEQueue(){Configuration.Create().UseAutofac().RegisterCommonComponents().UseLog4Net().UseJsonNet().RegisterEQueueComponents();}}class MessageHandler : IMessageHandler{private int _handledCount;public void Handle(QueueMessage message, IMessageContext context){var count = Interlocked.Increment(ref _handledCount);if (count % 1000 == 0){Console.WriteLine("Total handled {0} messages.", count);}context.OnMessageHandled(message);}}
}

使用方式给用户感觉是消息从EQueue服务器推到了应用客户端。 但是实际Consumer内部是使用长轮询Pull方式从EQueue服务器拉消息,然后再回调用户Listener方法。Consumer默认连接本机的5001端口,可以通过ConsumerSetting 进行设置,可以参看下面的代码:

    public class ConsumerSetting{public string BrokerAddress { get; set; }public int BrokerPort { get; set; }public int RebalanceInterval { get; set; }public int UpdateTopicQueueCountInterval { get; set; }public int HeartbeatBrokerInterval { get; set; }public int PersistConsumerOffsetInterval { get; set; }public PullRequestSetting PullRequestSetting { get; set; }public MessageModel MessageModel { get; set; }public MessageHandleMode MessageHandleMode { get; set; }public ConsumerSetting(){BrokerAddress = SocketUtils.GetLocalIPV4().ToString();BrokerPort = 5001;RebalanceInterval = 1000 * 5;HeartbeatBrokerInterval = 1000 * 5;UpdateTopicQueueCountInterval = 1000 * 5;PersistConsumerOffsetInterval = 1000 * 5;PullRequestSetting = new PullRequestSetting();MessageModel = MessageModel.Clustering;MessageHandleMode = MessageHandleMode.Parallel;}

本文转自 张善友 51CTO博客,原文链接:http://blog.51cto.com/shanyou/1381850,如需转载请自行联系原作者

c#开源消息队列中间件EQueue 教程相关推荐

  1. 消息队列中间件 Message Queue 简称:MQ

    一.什么是消息队列? 消息队列,一般我们会简称它为MQ(Message Queue) 队列是一种先进先出的数据结构. 现有常用的开源消息中间件有Kafka.CMQ.JBoss Messaging.JO ...

  2. rabbitmq实战:高效部署分布式消息队列_一文看懂消息队列中间件--AMQ及部署介绍...

    概述 最近有个小项目用到了AMQ来做消息队列,之前介绍的主要是rabbitmq,所以今天主要提一下AMQ,也简单介绍下两者的区别~ 消息队列中间件 消息队列中间件(简称消息中间件)是指利用高效可靠的消 ...

  3. 基于硬件的消息队列中间件 Solace 简介之二

    小短篇介绍关于Solace https://blog.csdn.net/aqudgv83/article/details/79495489 . 前面简单介绍了Solace来自于哪家公司, 主要能做哪些 ...

  4. 消息队列中间件之RabbitMQ(上)

    文章目录 1.MQ引言 1.1 什么是MQ 1.2 主流MQ以及其特点 ActiveMQ Kafka RocketMQ RabbitMQ 1.3 MQ的作用 2.RabbitMQ 的引言 2.1 Ra ...

  5. 浅谈消息队列及常见的分布式消息队列中间件

    背景 分布式消息队列中间件是是大型分布式系统不可缺少的中间件,通过消息队列,应用程序可以在不知道彼此位置的情况下独立处理消息,或者在处理消息前不需要等待接收此消息.所以消息队列主要解决应用耦合.异步消 ...

  6. python消息队列中间件_常见的消息队列中间件介绍

    题目 为什么使用消息队列? 消息队列有什么优点和缺点? Kafka.ActiveMQ.RabbitMQ.RocketMQ 都有什么区别,以及适合哪些场景? 什么是消息队列 在正式介绍和对比Kafka. ...

  7. 消息中间件系列(七):如何从0到1设计一个消息队列中间件

    消息队列作为系统解耦,流量控制的利器,成为分布式系统核心组件之一. 如果你对消息队列背后的实现原理关注不多,其实了解消息队列背后的实现非常重要. 不仅知其然还要知其所以然,这才是一个优秀的工程师需要具 ...

  8. 使用kafka消息队列中间件实现跨进程,跨服务器的高并发消息通讯

    作者 | 陈屹       责编 | 欧阳姝黎 近来工作上接收到一项任务,实现c++后台服务器程序,要求它能承载千万级别的DAU读写请求.目前实现千万级高并发海量数据请求的服务器设计在"套路 ...

  9. ActiveMQ RabbitMQ RokcetMQ Kafka实战 消息队列中间件视频教程

    附上消息队列中间件百度网盘连接: 链接: https://pan.baidu.com/s/1FFZQ5w17e1TlLDSF7yhzmA 密码: hr63 转载于:https://www.cnblog ...

最新文章

  1. 数据蒋堂 | SQL是描述性语言?
  2. raid0、raid1、raid5、raid10 flash
  3. 第十天2017/04/21(3、泛型编程:STL)
  4. linux kernel的spinlock代码导读和分析
  5. 机器学习之聚类算法的原理推导及相关知识总结
  6. dom4j创建、解析xml文件(增删改查)
  7. poj 2965 The Pilots Brothers' refrigerator
  8. Python——集合字典解析的对象无序和无副本特性
  9. 用C#新建XML文件
  10. 天梯—输出GPLT(C语言)
  11. Vue之$options
  12. 幕墙计算软件_案例BIM在玻璃幕墙参数化设计的应用
  13. gcc和g++的区别 (很详细的描述)
  14. 使用SSE指令集优化memcpy
  15. java多态 -- 猫狗案列
  16. 用Qt实现Q-M算法化简逻辑表达式及Qt项目打包发布方法
  17. Linux学习笔记-exec族函数,system函数,popen函数的用法
  18. 三极管的经典之作,你知道吗?
  19. 任天堂(Switch)游戏机底座带网口功能方案
  20. java.sql.SQLException: Subquery returns more than 1 row

热门文章

  1. 蓝桥杯 ALGO-150 算法训练 6-1 递归求二项式系数值
  2. 【软件测试】单元测试不属于动态测试
  3. java trylock超时_老师,死锁设置超时这个我知道意思,但是您能举个例子么
  4. java 串口 rxtx_【Java】基于RXTX的Java串口通信
  5. html中灰色怎么写,css如何实现置灰不可点
  6. react style: 二级菜单
  7. Centos 7.4版本升级内核3.10+ 到4+过程
  8. AGG第四十二课 Blitting an image over another with transparency
  9. idea中spark项目Scala语言读取properties文件
  10. mysql的半同步复制