消息中间件之RabbitMQ使用
目录
- 简介
- RabbitMQ
- 生产者封装
- 消费者封装
- 父类
- 消费者类
声明: 在本文以及本系列文中, 不会涉及公司内部相关内容,旨在能帮助到努力前进的人
本系列会依次涉及到ActiveMQ
,RabbitMQ
,kafka
,RocketMQ
简介
什么是消息中间件?
一个进行消息传递的组件, 好比一个快递
, 当你上网购买了一个东西, 传统方式是店家--------->顾客
,也就是店家直接把东西给你送过来. 那么消息中间件就是店家------>快递------>顾客
这种方式.
消息中间件的优势?
数据传递, 系统间解耦, 错峰 …
我们在设计一套系统的时候, 有些时候会面临一些例如业务处理逻辑耗时长
, 并发请求压力大
等等问题. 有些时候我们通过优化代码或者横向扩展等等方式也可以解决一些问题. 那么我们除了采用上面的方式之外,还可以使用消息中间件
的优势来优雅的帮助我们解决一些问题.
本文涉及到的是使用比较广泛的消息中间件之一RabbitMQ
RabbitMQ
开发语言:Erlang – 面向并发的编程语言
支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗.
运行模式
- 简单队列
- work模式
- 发布\订阅模式
- 路由模式
- 主题模式
以上便是它所支持的五种消息传递模式, 除此之外,他还有一种支持RPC
调用的方式.
关于它的一些基础知识, 我们就介绍到这里了, 如果你还想具体的了解其他的概念或者详细的信息, 可以通过官网查看, 笔者这里不再赘述.
生产者封装
基本概念介绍完了, 我们接下来要对生产者和消费者进行一个封装, 大致的逻辑如下:
- 首先连接上MQ
- 然后封装一个将Object转字节码的方法
- 最后根据想要的发送数据的模式, 进行发送数据即可.
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** @Title RabbitMQProducer* @author GuoQ.yin*/
public class RabbitMQProducer {private static Logger logger = LoggerFactory.getLogger(RabbitMQProducer.class);private Connection connection = null;private Channel channel = null;private String rabbitHost; //rabbitMQ地址private String userName; //用户名private String passWord; //密码public RabbitMQProducer(String rabbitHost, String userName, String passWord) {this.rabbitHost = rabbitHost;this.userName = userName;this.passWord = passWord;getConnection();}/** 获取连接* */private void getConnection() {if(connection == null) {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(rabbitHost);connectionFactory.setUsername(userName);connectionFactory.setPassword(passWord);connectionFactory.setAutomaticRecoveryEnabled(true);try {connection = connectionFactory.newConnection();channel = connection.createChannel();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}}/*** 简单模式发送消息* @param String queueName 队列名称* @param Object sendData 发送数据* * */public void sendForSimpleQueue(String queueName, Object sendData) throws IOException, TimeoutException {channel.queueDeclare(queueName,false,false,false,null);channel.basicPublish("",queueName,null, RabbitMQProducer.toByteArray(sendData));logger.debug("send----->>> " + String.valueOf(sendData));
// channel.close();
// connection.close();}/*** 订阅模式发送消息* @param String exchangeName 交换机名称* @param Object sendData 发送数据* * */public void sendForSubScribe(String exchangeName, Object sendData) throws IOException {channel.exchangeDeclare(exchangeName,"fanout", true,true,null);channel.basicPublish(exchangeName,"",null,String.valueOf(sendData).getBytes());logger.debug("send----->>> " + String.valueOf(sendData));}/*** 路由模式发送消息* @param String exchangeName 交换机名称* @param String routingKey 路由键* @param Object sendData 发送数据* * */public void SendForRouting(String exchangeName, String routingKey, Object sendData) throws IOException {channel.exchangeDeclare(exchangeName, "direct");channel.basicPublish(exchangeName, routingKey, null, String.valueOf(sendData).getBytes());logger.debug("send----->>> " + String.valueOf(sendData));}/*** topics模式模式发送消息* @param String exchangeName 交换机名称* @param String routingKey 路由键(# 表示匹配一个或多个词;(lazy.a.b.c), 表示匹配一个词;(a.orange.b))* @param Object sendData 发送数据* * */public void SendForTopics(String exchangeName, String routingKey, Object sendData) throws IOException {channel.exchangeDeclare(exchangeName,"topic");channel.basicPublish(exchangeName,routingKey,false,false,null,String.valueOf(sendData).getBytes());logger.debug("send----->>> " + String.valueOf(sendData));}/** * 对象转数组 * @param obj * @return */ public static byte[] toByteArray (Object obj) { byte[] bytes = null; ByteArrayOutputStream bos = new ByteArrayOutputStream(); try { ObjectOutputStream oos = new ObjectOutputStream(bos); oos.writeObject(obj); oos.flush(); bytes = bos.toByteArray (); oos.close(); bos.close(); } catch (IOException ex) { ex.printStackTrace(); } return bytes; } //调用示例public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {RabbitMQProducer RMQ = new RabbitMQProducer("192.168.1.162", "test", "test");List<Map<String, Object>> sendList = new ArrayList<Map<String,Object>>();Map<String, Object> sendMap = new HashMap<String, Object>();sendMap.put("hello", "world");sendList.add(sendMap);RMQ.sendForSimpleQueue("haha", sendList);}
}
消费者封装
因为要封装一个工具类, 所以要通用. 在rabbitmq
接收数据的时候, 代码会一直停留在他获取数据的那一块, 所以我们会先封装一个父类(声明一个队列
), 然后让消费者的封装类,继承父类, 然后每次我们消费者获取到放到我们自己的队列里, 然后我们在从队列里面获取数据, 这样也不会阻塞原有的数据接收.
父类
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;public abstract class AbstractReceiver implements Runnable {private static Logger logger = LoggerFactory.getLogger(AbstractReceiver.class);private int MAX_QUEUE_SIZE = 100000;protected BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>(MAX_QUEUE_SIZE);public Object getData() {try {Object data = queue.take();logger.debug("take que, queue size=" + queue.size());return data;} catch (Exception exp) {logger.error("getData error", exp);return null;}}protected void putData(Object data) {try {queue.put(data); logger.debug("put que, queue size=" + queue.size());} catch (Exception exp) {logger.error("putData error", exp);}}public int getDataSize(){int size = queue.size();return size;}
}
消费者类
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.concurrent.TimeoutException;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.AMQP.BasicProperties;/*** @Title RabbitMQReceiver* @author GuoQ.yin*/
public class RabbitMqReceiver extends AbstractReceiver{private static Logger logger = LoggerFactory.getLogger(RabbitMqReceiver.class);private static Connection connection = null; //连接private static Channel channel = null; //通道private static RabbitMqReceiver rabbitMq = null;private static boolean ackFlag = true;private static String rabbitHost; //rabbitMQ地址private static String userName; //用户名private static String passWord; //密码private static String queueName; //队列名称private static String exchangeName; //交换机名称private static String routingKey; //路由键private static String routingMode; //路由模式 fanout direct topic /**简单模式* @param rabbitHost rabbitMQ地址* @param userName 用户名* @param passWord 密码* @param queueName 队列名称* */public static RabbitMqReceiver getInstance(String rabbitHost, String userName, String passWord, String queueName) {if(rabbitMq == null) {rabbitMq = new RabbitMqReceiver(rabbitHost, userName, passWord, queueName);}getConnection();return rabbitMq;}/**订阅模式* @param rabbitHost rabbitMQ地址* @param userName 用户名* @param passWord 密码* @param queueName 队列名称* @param exchangeName 交换机名称* */public static RabbitMqReceiver getInstance(String rabbitHost, String userName, String passWord, String queueName, String exchangeName) {if(rabbitMq == null) {rabbitMq = new RabbitMqReceiver(rabbitHost, userName, passWord, queueName, exchangeName);}getConnection();return rabbitMq;}/**路由模式* @param rabbitHost rabbitMQ地址* @param userName 用户名* @param passWord 密码* @param queueName 队列名称* @param exchangeName 交换机名称* @param routingKey 路由键(在topic模式下, #:表示匹配一个或多个词;(lazy.a.b.c) *:表示匹配一个词;(a.orange.b))* @param routingMode 路由模式 支持 direct直接路由, topic通配符 两种模式* */public static RabbitMqReceiver getInstance(String rabbitHost, String userName, String passWord, String queueName, String exchangeName, String routingKey, String routingMode) {if(rabbitMq == null) {rabbitMq = new RabbitMqReceiver(rabbitHost, userName, passWord, queueName, exchangeName, routingKey, routingMode);}getConnection();return rabbitMq;}/**简单模式* @param rabbitHost rabbitMQ地址* @param userName 用户名* @param passWord 密码* @param queueName 队列名称* */@SuppressWarnings("static-access")public RabbitMqReceiver(String rabbitHost, String userName, String passWord, String queueName) {this.rabbitHost = rabbitHost;this.userName = userName;this.passWord = passWord;this.queueName = queueName;}/**订阅模式* @param rabbitHost rabbitMQ地址* @param userName 用户名* @param passWord 密码* @param queueName 队列名称* @param exchangeName 交换机名称* */@SuppressWarnings("static-access")public RabbitMqReceiver(String rabbitHost, String userName, String passWord, String queueName, String exchangeName) {this.rabbitHost = rabbitHost;this.userName = userName;this.passWord = passWord;this.queueName = queueName;this.exchangeName = exchangeName;}/**路由模式* @param rabbitHost rabbitMQ地址* @param userName 用户名* @param passWord 密码* @param queueName 队列名称* @param exchangeName 交换机名称* @param routingKey 路由键(在topic模式下, #:表示匹配一个或多个词;(lazy.a.b.c) *:表示匹配一个词;(a.orange.b))* @param routingMode 路由模式 支持 direct直接路由, topic通配符 两种模式* */@SuppressWarnings("static-access")public RabbitMqReceiver(String rabbitHost, String userName, String passWord, String queueName, String exchangeName, String routingKey, String routingMode) {this.rabbitHost = rabbitHost;this.userName = userName;this.passWord = passWord;this.queueName = queueName;this.exchangeName = exchangeName;this.routingKey = routingKey;this.routingMode = routingMode;}public RabbitMqReceiver() {}@Overridepublic void run() {try {//单队列模式channel.basicQos(1);//能者多劳channel.queueDeclare(queueName,false,false,false,null); //声明队列if(exchangeName != null) {if(routingMode!=null && routingKey!=null) {//路由模式 或者 Topics模式 #:表示匹配一个或多个词;(lazy.a.b.c) *:表示匹配一个词;(a.orange.b)channel.exchangeDeclare(exchangeName, routingMode);channel.queueBind(queueName, exchangeName, routingKey);} else {//订阅模式channel.queueBind(queueName,exchangeName,"");ackFlag = false;}}DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);logger.debug("RabbitMqReceiver------->>>" + new String(body,"UTF-8"));rabbitMq.putData(RabbitMqReceiver.toObject(body));if(ackFlag == false) {channel.basicAck(envelope.getDeliveryTag(),false);}}};channel.basicConsume(queueName,ackFlag,consumer);logger.info("RabbitMqReceiver BootUp Success!!");} catch (IOException e) {e.printStackTrace();logger.error("RabbitMqReceiver Consumer: ",e);}}/** 获取连接* */private static void getConnection() {if(connection == null) {ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost(rabbitHost);connectionFactory.setUsername(userName);connectionFactory.setPassword(passWord);connectionFactory.setAutomaticRecoveryEnabled(true);try {connection = connectionFactory.newConnection();channel = connection.createChannel();} catch (IOException e) {e.printStackTrace();} catch (TimeoutException e) {e.printStackTrace();}}}/** * 数组转对象 * @param bytes * @return */ public static Object toObject (byte[] bytes) { Object obj = null; try { ByteArrayInputStream bis = new ByteArrayInputStream (bytes); ObjectInputStream ois = new ObjectInputStream (bis); obj = ois.readObject(); ois.close(); bis.close(); } catch (IOException ex) { ex.printStackTrace(); } catch (ClassNotFoundException ex) { ex.printStackTrace(); } return obj; }//调用示例public static void main(String[] args) throws IOException {// RabbitMqReceiver RabbitMqReceiver = getInstance("192.168.1.162", "test", "test", "haha");
// Thread t = new Thread(RabbitMqReceiver);
// t.start();
//
// while(true){// Object data = RabbitMqReceiver.getData();
// System.out.println("getdata>>>"+((List<Map<String, Object>>) data).size());
// }}
}
以上便是针对RabbitMQ的生产者和消费者的封装, 可能有些地方还需要琢磨一下, 等有后续有时间了优化之后,笔者会第一时间更新.
最后和大家说一声: 加油~
消息中间件之RabbitMQ使用相关推荐
- 分布式消息中间件之RabbitMQ学习笔记[一]
写在前面 嗯,陆续的整理一些中间件的笔记 今天和小伙伴们分享RabbitMQ 相关笔记 博文偏理论,内容涉及: RabbitMQ的简单介绍 AMQP协议标准介绍 RabbitMQ Demo 食用方式: ...
- 【消息中间件】RabbitMQ 高级特性与应用问题
消息的可靠投递 在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景.RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式. confirm 确认模式 r ...
- 消息中间件之RabbitMq
消息中间件是系统间异步交互的重要手段之一,目前常用的消息中间件很多,包括Rabbitmq.Activemq.Rocketmq.IBM MQ.kafka等,这些都是成熟的技术体系,理论上,只要是可以存储 ...
- 消息中间件系列 - RabbitMQ
前言 本内容仅用于个人学习笔记,如有侵扰,联系删除 学习视频:https://www.bilibili.com/video/BV1cb4y1o7zz?spm_id_from=333.337.searc ...
- 【手把手】教你玩转消息中间件之RabbitMQ
1.微服务下现存的各种问题 服务调用问题 当两个服务调用时,可以通过传统的HTTP方式,让服务A直接去调用服务B的接口,但是这种方式是同步的方式,虽然可以采用SpringBoot提供的@Async注解 ...
- 消息中间件之RabbitMq学习
消息中间件的理论 消息中间件的3大作用 应用解耦:多服务之间互不关联,即便接收方服务挂掉也不影响发送方服务对客户交互 异步处理:类似解耦,提高响应速度 流量控制:也叫削峰,是高并发情况的解决方法之一, ...
- Spring Cloud Stream消息中间件通过RabbitMQ实现消息推送
一.Spring Cloud Stream是什么 Spring Cloud Stream 是一个构建消息驱动微服务的框架. Spring Cloud Stream解决了开发人员无感知的使用消息中间件的 ...
- mfc 消息消息队列概念_必看入门秘籍——解密原理:消息中间件之RabbitMQ
一.序言 RabbitMQ目前在开发中被广泛应用,这篇文章用尽可能浅显的语言来解释RabbitMQ的入门知识,希望能够帮助更多的人花费更好的时间入门. 如果你想更深入的了解RabbitMQ,可以继续关 ...
- 消息中间件之rabbitMQ实战-死信队列
该篇文章内容较多,包括有rabbitMq相关的一些简单理论介绍,集成spring Boot,provider消息推送实例,consumer消息消费实例,Direct(直连类型交换机).Fanout(广 ...
最新文章
- SCCM 2012系列4 配置SCCM2012 Endpoint Protection上
- 图的存储结构与操作--C语言实现
- 商用机型和家用机型区别
- DataSource--DBCP--C3P0--DBUtils
- 前端MVC Vue2学习总结(四)——条件渲染、列表渲染、事件处理器
- Linux网络编程 之 IO复用epoll(十)
- java 监控 native 内存_JVM NativeMemoryTracking 分析堆外内存泄露
- wordpress category.php,wordpress自定义分类目录模板
- oracle客观题题库,oralce题库及答案.doc
- 基于Rainbond开发Python云原生应用
- 二叉树遍历的一些非递归算法
- opencv4找不到opencv2/core/core.hpp: No such file or directory
- 金蝶K3 WISE所有单据数据库内码及描述对照表
- 5G学习-3GPP协议入门
- virtualbox 搭建opnsense 防火墙
- 算法 - 程序的灵魂
- 复权、前复权和后复权
- openGauss数据库开发调试工具指导
- 做PO难,难于上青天
- [编程入门]猴子吃桃的问题(JAVA解法)