使用 JAVA 语言自己动手来写一个MQ (类似ActiveMQ,RabbitMQ)

主要角色

首先我们必须需要搞明白 MQ (消息队列) 中的三个基本角色

ProducerBrokerConsumer

整体架构如下所示

自定义协议

首先从上一篇中介绍了协议的相关信息,具体厂商的 MQ(消息队列) 需要遵循某种协议或者自定义协议 , 消息的 生产者和消费者需要遵循其协议(约定)才能后成功地生产消息和生产消息 ,所以在这里我们自定义一个协议如下.

消息处理中心 : 如果接收到的信息包含"SEND"字符串,即视为生产者发送的消息,消息处理中心需要将此信息存储等待消费者消费

消息处理中心 : 如果接受到的信息为CONSUME,既视为消费者发送消费请求,需要将存储的消息队列头部的信息转发给消费者,然后将此消息从队列中移除

消息处理中心 : 如果消息处理中心存储的消息满3条仍然没有消费者进行消费,则不再接受生产者的生产请求

消息生产者:需要遵循协议将生产的消息头部增加"SEND:" 表示生产消息

消息消费者:需要遵循协议向消息处理中心发送"CONSUME"字符串表示消费消息

流程顺序

项目构建流程

下面将整个MQ的构建流程过一遍

新建一个 Broker 类,内部维护一个 ArrayBlockingQueue 队列,提供生产消息和消费消息的方法, 仅仅具备存储服务功能

新建一个 BrokerServer 类,将 Broker 发布为服务到本地9999端口,监听本地9999端口的 Socket 链接,在接受的信息中进行我们的协议校验, 这里 仅仅具备接受消息,校验协议,转发消息功能;

新建一个 MqClient 类,此类提供与本地端口9999的Socket链接 , 仅仅具备生产消息和消费消息的方法

测试:新建两个 MyClient 类对象,分别执行其生产方法和消费方法

具体使用流程

生产消息:客户端执行生产消息方法,传入需要生产的信息,该信息需要遵循我们自定义的协议,消息处理中心服务在接受到消息会根据自定义的协议校验该消息是否合法,如果合法如果合法就会将该消息存储到Broker内部维护的 ArrayBlockingQueue 队列中.如果 ArrayBlockingQueue 队列没有达到我们协议中的最大长度将将消息添加到队列中,否则输出生产消息失败.

消息消息:客户端执行消费消息方法, Broker服务 会校验请求的信息的信息是否等于 CONSUME ,如果验证成功则从Broker内部维护的 ArrayBlockingQueue 队列的 Poll 出一个消息返回给客户端

代码演示

消息处理中心 Broker

/*** * 消息处理中心* */
public class Broker {// 队列存储消息的最大数量private final static int MAX_SIZE = 3;// 保存消息数据的容器private static ArrayBlockingQueue messageQueue = new ArrayBlockingQueue(MAX_SIZE);// 生产消息public static void produce(String msg) {if (messageQueue.offer(msg)) {System.out.println("成功向消息处理中心投递消息:" + msg + ",当前暂存的消息数量是:" + messageQueue.size());} else {System.out.println("消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!");}System.out.println("=======================");}// 消费消息public static String consume() { String msg = messageQueue.poll();if(msg !=null) {// 消费条件满足情况,从消息容器中取出一条消息System.out.println("已经消费消息:"+ msg +",当前暂存的消息数量是:"+ messageQueue.size());   }else{            System.out.println("消息处理中心内没有消息可供消费!");        }   System.out.println("=======================");returnmsg; }
}}

消息处理中心服务 BrokerServer

客户端 MqClient


/*** * 用于启动消息处理中心* */
public class BrokerServer implements Runnable {public static int SERVICE_PORT = 9999;private final Socket socket;public BrokerServer(Socket socket) {this.socket = socket;}@Overridepublic void run() {try (BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));PrintWriter out = new PrintWriter(socket.getOutputStream())) {while (true) {String str = in.readLine();if (str == null) {continue;}System.out.println("接收到原始数据:" + str);if (str.equals("CONSUME")) {// CONSUME 表示要消费一条消息//从消息队列中消费一条消息String message = Broker.consume();out.println(message);out.flush();} else if (str.contains("SEND:")) {// 接受到的请求包含SEND:字符串 表示生产消息放到消息队列中Broker.produce(str);} else {System.out.println("原始数据:" + str + "没有遵循协议,不提供相关服务");}}} catch (Exception e) {e.printStackTrace();}}public static void main(String[] args) throws Exception {ServerSocket server = new ServerSocket(SERVICE_PORT);while (true) {BrokerServer brokerServer = new BrokerServer(server.accept());new Thread(brokerServer).start();}}
}

测试MQ

public class ProduceClient {public static void main(String[] args) throws Exception {MqClient client = newMqClient();client.produce("SEND:Hello World");}
}public class ConsumeClient {public static void main(String[] args) throws Exception {MqClient client = newMqClient();String message = client.consume();System.out.println("获取的消息为:" + message);}
}

我们多执行几次客户端的生产方法和消费方法就可以看到一个完整的MQ的通讯过程,下面是我执行了几次的一些日志

接收到原始数据:SEND:Hello World成功向消息处理中心投递消息:SEND:Hello World,当前暂存的消息数量是:

1=======================接收到原始数据:SEND:Hello World成功向消息处理中心投递消息:SEND:Hello World,当前暂存的消息数量是:
2=======================接收到原始数据:SEND:Hello World成功向消息处理中心投递消息:SEND:Hello World,当前暂存的消息数量是:
3=======================接收到原始数据:SEND:Hello World消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!
=======================接收到原始数据:Hello World原始数据:Hello World没有遵循协议,不提供相关服务接收到原始数据:CONSUME已经消费消息:SEND:Hello World,当前暂存的消息数量是:
2=======================接收到原始数据:CONSUME已经消费消息:SEND:Hello World,当前暂存的消息数量是:
1=======================接收到原始数据:CONSUME已经消费消息:SEND:Hello World,当前暂存的消息数量是:
0=======================接收到原始数据:CONSUME消息处理中心内没有消息可供消费!=======================

小结

本章示例代码主要源自分布式消息中间件实践一书 , 这里我们自己使用Java语言写了一个MQ消息队列 , 通过这个消息队列我们对MQ中的几个角色 “生产者,消费者,消费处理中心,协议” 有了更深的理解 ; 那么下一章节我们就来一块学习具体厂商的MQ RabbitMQ

Java实现消息队列服务相关推荐

  1. java 结合redis队列_在 Java 中使用 redis 的消息队列服务

    前言 关于 redis 我们前面已经讨论过了缓存.分布式锁.分布式唯一标识.LBS服务的用法,这里我们来谈谈利用 redis 来实现一个消息服务. 典型的消息服务是一个生产者和消费者模式的服务.一般是 ...

  2. Java常用消息队列原理介绍及性能对比

    消息队列使用场景 为什么会需要消息队列(MQ)? 解耦  在项目启动之初来预测将来项目会碰到什么需求,是极其困难的.消息系统在处理过程中间插入了一个隐含的.基于数据的接口层,两边的处理过程都要实现这一 ...

  3. 理论修炼之RabbitMQ,消息队列服务的稳健者

    ????欢迎点赞 :???? 收藏 ⭐留言 ???? 如有错误敬请指正,赐人玫瑰,手留余香! ????本文作者:由webmote 原创,首发于 [掘金] ????作者格言:生活在于折腾,当你不折腾生活 ...

  4. 滴滴出行基于RocketMQ构建企业级消息队列服务的实践

    \n 本文整理自滴滴出行消息队列负责人 江海挺 在Apache RocketMQ开发者沙龙北京站的分享. \n \n 滴滴出行的消息技术选型 \n 历史 \n 初期,公司内部没有专门的团队维护消息队列 ...

  5. (四)RabbitMQ消息队列-服务详细配置与日常监控管理

    (四)RabbitMQ消息队列-服务详细配置与日常监控管理 原文:(四)RabbitMQ消息队列-服务详细配置与日常监控管理 RabbitMQ服务管理 启动服务:rabbitmq-server -de ...

  6. Amazon SQS 消息队列服务

    Amazon sqs是亚马逊提供的线上消息队列服务, 可以实现应用程序解耦,以及可靠性保证. sqs提供了两种消息队列, 一种是标准消息队列, 一种是先进先出队列(FIFO), 其区别是FIFO是严格 ...

  7. C#中使用消息队列服务

    C#中使用Windows消息队列服务 http://www.cnblogs.com/xinhaijulan/archive/2010/08/22/1805768.html http://h2appy. ...

  8. amazon sqs java_Amazon SQS 消息队列服务

    Amazon sqs是亚马逊提供的线上消息队列服务, 可以实现应用程序解耦,以及可靠性保证. sqs提供了两种消息队列, 一种是标准消息队列, 一种是先进先出队列(FIFO), 其区别是FIFO是严格 ...

  9. 快速搭建基于beanstalk的php消息队列服务

    本项目实现基于beanstalk的php消息队列服务,包括生产与消费消息案例 一.beanstalk介绍与安装:http://kr.github.io/beanstalkd/ 二.php消息队列处理, ...

  10. java 消息队列服务_ActiveMQ 消息队列服务

    1 ActiveMQ简介 1.1 ActiveMQ是什么 ActiveMQ是一个消息队列应用服务器(推送服务器).支持JMS规范. 1.1.1 JMS概述 全称:Java Message Servic ...

最新文章

  1. svm通俗讲解_机器学习算法:SVM
  2. 浅析NSTimer CADisplayLink内存泄露
  3. 递归算法题解析:设m,n均为自然数,m可表示为一些不超过n的自然数之和,f(m,n)为这种表示方式的数目
  4. Activiti学习(二)数据表结构
  5. 再度吐槽,PHP在centos7的安装方式稍不注意可能就打击你的积极性
  6. java实现log4j_log4j在java中实现
  7. python表示数字6_Python3 数字Number(六)
  8. Generator函数的基本概念
  9. ps -eo 用户自定义格式显示
  10. NPAPI插件:不要使用malloc,使用NPN_MemAlloc
  11. Head First设计模式(中文版)PDF
  12. 在linux下比较好用的chm阅读器和飞信软件
  13. Vue 的最大的优势是什么?
  14. 【Python】解决使用 plt.savefig 保存图片时一片空白
  15. 他妈的 Python(1):怎么发起一个同步的 HTTP 请求
  16. Endnote无法正确识别引文
  17. 做移动端电子签名发现canvas的 一些坑
  18. Android集成友盟推送服务
  19. 3D Human Pose Estimation with Spatial and Temporal Transformers论文笔记
  20. 倚天剑与屠龙刀java_菜鸟入门 java语言学习六大要点

热门文章

  1. Ubuntu查找文件
  2. 洛谷 [P3110] 驮运
  3. android 获取手机内存及SD卡内存可用空间
  4. 单片机课设中期报告_基于单片机的火灾自动报警系统 中期报告
  5. 读《彼得林奇教你理财》有感
  6. usb keyboard找不到驱动程序_让台式机也能用上蓝牙,毕亚兹USB蓝牙适配器体验
  7. bib文件引用参考文献方法--心得
  8. 下拉框输入模糊查询_高考英语听力考试查询、网上填报志愿时间、诈骗陷阱提防!全在这里了...
  9. 同窗情【之一】(词21首)
  10. 如果局域网当中两台电脑互相ping不通