消息队列 | java简单实现
1. 消息队列介绍:
消息队列
是MQ是一种系统间相互协作的通信机制
- Broker:消息处理中心,负责消息的接收、存储、转发等;
- Producer:消息生产者,负责产生和发送消息到消息处理中心;
- Consumer:消息消费者,负责从消息处理中心获取消息,并进行相应的处理。
2. java设计一个简单的消息队列
其结构如下所示:
2.1 消息处理中心
作为消息处理中心,至少有一个数据容器来保存接收到的消息。这里采用java中队列(Queue)的一个子类ArrayBockingQueue来实现。
如下是消息处理中心Broker的实现:
import java.util.concurrent.ArrayBlockingQueue;public class Broker {private final static int MAX_SIZE = 3;private static ArrayBlockingQueue<String> messageQueue = new ArrayBlockingQueue<>(MAX_SIZE);public static void produce(String msg){if(messageQueue.offer(msg)){System.out.println("成功向消息处理中心投递消息: " + msg + ",当前缓存的消息数量是:"+ messageQueue.size());} else{System.out.println("消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!");}System.out.println("==============================");}public static String consume(){String msg = messageQueue.poll();if(msg != null){System.out.println("已经消费消息:" + msg + ",当前暂存的消息数量是:" + messageQueue.size());} else {System.out.println("消息处理中心内没有消息可供消费!");}System.out.println("==============================");return msg;}
}
有了消息处理中心类后,需要将该类的功能暴露出去,这样别人才能够用它来发送和接收消息。我们定义了BrokerServer类用来对外提供Broker类的服务。
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;public class BrokerServer implements Runnable{public static int SERVICE_PORT = 9999;private final Socket socket;public BrokerServer(Socket socket){this.socket = socket;}@Overridepublic void run(){try(BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));PrintWriter out = new PrintWriter(socket.getOutputStream())){while (true){String str = in.readLine();if (str == null){continue;}System.out.println("接收到原始数据: " + str);if (str.equals("CONSUME")){String message = Broker.consume();out.println(message);out.flush();}else {Broker.produce(str);}}} catch (Exception e){e.printStackTrace();}}public static void main(String[] args) throws Exception{ServerSocket server = new ServerSocket(SERVICE_PORT);while(true){BrokerServer brokerServer = new BrokerServer(server.accept());new Thread(brokerServer).start();}}
}
在java中设计服务其功能的软件一般少不了套接字(Socket)和线程(Thread),需要通过线程的方式将应用启动起来,而服务器和应用的客户端需要用Socket进行网络通信。
2.2 客户端访问
有了消息处理中心服务器后,自然需要相应客户端来与之通信,来发送和接收消息。
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.Socket;public class MyClient {public static void produce(String message) throws Exception{Socket socket = new Socket(InetAddress.getLocalHost(),BrokerServer.SERVICE_PORT);try(PrintWriter out = new PrintWriter(socket.getOutputStream())){out.println(message);out.flush();}}public static String consume() throws Exception{Socket socket = new Socket(InetAddress.getLocalHost(),BrokerServer.SERVICE_PORT);try(BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));PrintWriter out = new PrintWriter(socket.getOutputStream())){out.println("CONSUME");out.flush();String message = in.readLine();return message;}}
}
以上是通用的客户端访问代码,接下来分别看一个生产消息和消费消息的示例:
生产消息:
public class ProduceClient {public static void main(String[] args) throws Exception{MyClient client = new MyClient();client.produce("hello World.");}
}
消费消息:
public class ConsumeClient {public static void main(String[] args) throws Exception{MyClient client = new MyClient();String message = client.consume();System.out.println("获得的消息为: " + message);}
}
2.3 运行效果
- 开启BrokerServer服务
- 生产消息:ProduceClient
- 消费消息:ConsumeClient
消息队列 | java简单实现相关推荐
- 消息队列--RabbitMQ简单使用
安装使用 地址:http://www.rabbitmq.com/ 需要根据不同的版本选择不同的erlang 安装erlang 安装RabbitMQ首先需要安装erlang环境,根据GitHub上erl ...
- RabbitMq--AMQP高级消息队列协议--简单了解
2019独角兽企业重金招聘Python工程师标准>>> 一.安装 Rabbitmq 是用 erlang 语言写的,所以使用还要安装 Erlang. 安装 erlang 还要安装 py ...
- 设计两个程序要求用消息队列实现简单的聊天功能linux,linux软件工程师(C语言)实用教程第7章.ppt...
第7章 进程间的通信 2 本章重点 进程通信中信号概念及信号处理进程间的管道通信编程进程间的内存共享编程 3 7 1 1信号及其使用 信号是在软件层次上对中断机制的一种模拟 是一种异步通信方式 信号可 ...
- 设计两个程序要求用消息队列实现简单的聊天功能linux,Linux C 消息队列实现简单的聊天功能...
消息队列是提供一种带有数据标识的特殊管道,使得每一段被写入的数据都变成带标识的消息,读取该段消息的进程只要指定这个标识就可以正确地读取,而不会受到其他消息的干扰,.一个带标识的消息队列,就像并存的管道 ...
- flask模拟集群实现消息队列和简单高并发支持
思路: 1.一个总端口实现服务的代理和分发--使用gevent做协程,解决高并发: 2.多个子端口实现集群构建业务层--使用make_server,构成消息队列: 3.总端口/子端口--增加延迟启动/ ...
- Java消息队列与JMS的诞生
Java 帝国之消息队列 Java帝国之JMS的诞生 原创: 刘欣 码农翻身 2017-02-06 张家村的历史 Java 帝国的张家村正在迎来一次重大的变革. 5年前网上购物兴起的时候, 帝国非常看 ...
- java redis延迟队列_基于redis实现的延迟消息队列
delay-queue redis实现延迟消息队列 需求背景 最近在做一个排队取号的系统 在用户预约时间到达前XX分钟发短信通知 在用户预约时间结束时要判断用户是否去取号了,不然就记录为爽约 在用户取 ...
- 第十二期:面试官问你什么是消息队列?把这篇甩给他!
消息队列不知道大家看到这个词的时候,会不会觉得它是一个比较高端的技术,反正我是觉得它好像是挺牛逼的. 一.什么是消息队列? 消息队列不知道大家看到这个词的时候,会不会觉得它是一个比较高端的技术,反正我 ...
- 消息队列的其他实现方式
在 Redis 5.0 之前消息队列的实现方式有很多种,比较常见的除了我们上文介绍的发布订阅模式,还有两种:List 和 ZSet 的实现方式. List 和 ZSet 的方式解决了发布订阅模式不能持 ...
最新文章
- spring原理案例-基本项目搭建 01 spring framework 下载 官网下载spring jar包
- Python3 命名规范
- JavaWeb--数据库添加
- 定义分销渠道(distribution channel)
- 多个输出用java怎么写_请问用java写程序怎么输出这两个图形
- java中线程的生命周期
- 张高兴的 .NET Core IoT 入门指南:(二)GPIO 的使用
- 基于Carbide.C++ 的Symbian开发环境搭建
- 千呼万唤始出来!诺基亚发预热海报:5摄手机真要来了
- java 递增 实现_Java编程实现递增排序链表的合并
- 在Docker上运行微服务
- 无人机通信与网络学习笔记
- Windows更改系统字体
- 写给非网工的CCNA教程(1)IP地址和MAC地址
- invalid operands to binary expression 二进制表达式的无效操作数
- 移动终端开发详解总结(二)(kotlin版)| CSDN创作打卡
- Python学习手册之数据类型
- TypeError: __init__() got an unexpected keyword argument ‘rate‘
- 谨慎使用达梦manger工具
- rtx3050显卡什么水平 rtx3050显卡相当于GTX什么级别