java使用RabbitMQ,学习了解
1、MQ的基本概念
2、RabbitMQ的安装和配置
RabbitMq官网地址:http://www.rabbitmq.com/
安装所需要的三个包,百度网盘下载链接:
https://pan.baidu.com/s/1-AD8NrZa2N9JO6yTR1h3Yg
提取码:7ccy
# 上传三个软件包到/opt/
erlang-22.3.4.3-1.el7.x86_64.rpm
rabbitmq-server-3.8.11-1.el7.noarch.rpm
socat-1.7.3.2-2.el7.x86_64.rpm
# 安装依赖环境
yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
# 执行安装命令
rpm -ivh erlang-22.3.4.3-1.el7.x86_64.rpm
rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm
rpm -ivh rabbitmq-server-3.8.11-1.el7.noarch.rpm
# 安装管理界面
rabbitmq-plugins enable rabbitmq_management
# 服务器开放端口
firewall-cmd --zone=public --add-port=15672/tcp --permanent
# 重启防火墙
systemctl restart firewalld.service 或 firewall-cmd --reload
# 最后如下图,默认登陆账号密码都是:guest# 添加并编辑配置
vim /etc/rabbitmq/rabbitmq.config
# 加入下面配置代码,注意后面有个点,guest不是必要的,可以去掉
[{rabbit, [{loopback_users, [guest]}]}].
# 重启rabbitmq服务,重新登录即可
systemctl restart rabbitmq-server.service 或 systemctl restart rabbitmq-server# rabbitmq安装路径
cd /usr/share/doc/rabbitmq-server-3.8.11/
RabbitMQ常用命令
# 添加用户
rabbitmqctl add_user <username> <password> # 删除用户
rabbitmqctl delete_user <username> # 修改用户密码
rabbitmqctl change_password <username> <newpassword> # 清除用户密码(该用户将不能使用密码登陆,但是可以通过SASL登陆如果配置了SASL认证)
rabbitmqctl clear_password <username> # 设置用户tags(相当于角色,包含administrator,monitoring,policymaker,management)
rabbitmqctl set_user_tags <username> <tag># 列出所有用户
rabbitmqctl list_users # 创建一个vhosts
rabbitmqctl add_vhost <vhostpath> # 删除一个vhosts
rabbitmqctl delete_vhost <vhostpath> # 列出vhosts
rabbitmqctl list_vhosts [<vhostinfoitem> ...] # 针对一个vhosts给用户赋予相关权限;
rabbitmqctl set_permissions [-p <vhostpath>] <user> <conf> <write> <read> # 清除一个用户对vhosts的权限;
rabbitmqctl clear_permissions [-p <vhostpath>] <username> # 列出哪些用户可以访问该vhosts;
rabbitmqctl list_permissions [-p <vhostpath>] # 列出用户访问权限;
rabbitmqctl list_user_permissions <username>
添加用户,分配虚拟机权限
3、RabbitMQ快速入门
php想使用RabbitMQ的话,需要安装扩展,参考我的文章:
https://blog.csdn.net/cxhblog/article/details/114370062
生产者代码
package com.itheima.producer;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer_HelloWorld {public static void main(String[] args) throws Exception {//1、创建连接工厂ConnectionFactory conn = new ConnectionFactory();//2、设置连接参数conn.setVirtualHost("/");conn.setPort(5672);conn.setHost("106.14.36.65");conn.setUsername("guest");conn.setPassword("guest");//3、创建连接Connection connection = conn.newConnection();//4、创建channel管道Channel channel = connection.createChannel();//5、创建队列Queue/*** String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments* queue:队列名称* durable:是否持久化,当mq重启之后,还在* exclusive:是否独占,只能有一个消费者监听队列,当connection关闭时,是否删除队列* autoDelete:是否自动删除,当没有consumer时,自动删除掉* arguments:参数*///如果没有hello_world队列,则会创建该队列,有就不会创建channel.queueDeclare("hello_world",true,false,false,null);//发送消息/*** String exchange, String routingKey, BasicProperties props, byte[] body* exchange:交换机名称,简单模式下,默认""* routingKey:路由名称* props:配置信息* body:消息数据*/String body = "hello rabbitmq......";channel.basicPublish("","hello_world",null,body.getBytes());//释放资源channel.close();connection.close();}
}
消费者代码
package com.itheima.consumer;import com.rabbitmq.client.*;import java.io.IOException;public class Consumer_HelloWorld {public static void main(String[] args) throws Exception {//1、创建连接工厂ConnectionFactory conn = new ConnectionFactory();//2、设置连接参数conn.setVirtualHost("/");conn.setPort(5672);conn.setHost("106.14.36.65");conn.setUsername("guest");conn.setPassword("guest");//3、创建连接Connection connection = conn.newConnection();//4、创建channel管道Channel channel = connection.createChannel();//5、创建队列Queue/*** String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments* queue:队列名称* durable:是否持久化,当mq重启之后,还在* exclusive:是否独占,只能有一个消费者监听队列,当connection关闭时,是否删除队列* autoDelete:是否自动删除,当没有consumer时,自动删除掉* arguments:参数*///如果没有hello_world队列,则会创建该队列,有就不会创建channel.queueDeclare("hello_world",true,false,false,null);//接收消息/*** String queue, boolean autoAck, Consumer callback* queue:队列名称,跟生产者一致* autoAck:是否自动确认* callback:回调对象*/Consumer consumer = new DefaultConsumer(channel){/*** 回调方法,当收到消息后,会自动执行该方法* @param consumerTag:标识* @param envelope:获取一些信息,交换机,路由键。。。* @param properties:配置信息* @param body:数据*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("consumerTag:" + consumerTag);System.out.println("envelope:" + envelope.getExchange());System.out.println("properties:" + properties);System.out.println("body:" + new String(body));}};channel.basicConsume("hello_world",true,consumer);}
}
RabbitMQ 的工作模式
1、Work queues工作队列模式
生产者代码
package com.itheima.producer;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer_WorkQueues {public static void main(String[] args) throws Exception {//1、创建连接工厂ConnectionFactory conn = new ConnectionFactory();//2、设置连接参数conn.setVirtualHost("/");conn.setPort(5672);conn.setHost("106.14.36.65");conn.setUsername("guest");conn.setPassword("guest");//3、创建连接Connection connection = conn.newConnection();//4、创建channel管道Channel channel = connection.createChannel();//5、创建队列Queue/*** String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments* queue:队列名称* durable:是否持久化,当mq重启之后,还在* exclusive:是否独占,只能有一个消费者监听队列,当connection关闭时,是否删除队列* autoDelete:是否自动删除,当没有consumer时,自动删除掉* arguments:参数*///如果没有hello_world队列,则会创建该队列,有就不会创建channel.queueDeclare("work_queues",true,false,false,null);//发送消息/*** String exchange, String routingKey, BasicProperties props, byte[] body* exchange:交换机名称,简单模式下,默认""* routingKey:路由名称* props:配置信息* body:消息数据*/for (int i = 1; i <= 10; i++){String body = i + "hello rabbitmq......";channel.basicPublish("","work_queues",null,body.getBytes());}//释放资源channel.close();connection.close();}
}
两份消费者代码
package com.itheima.consumer;import com.rabbitmq.client.*;import java.io.IOException;public class Consumer_WorkQueues1 {public static void main(String[] args) throws Exception {//1、创建连接工厂ConnectionFactory conn = new ConnectionFactory();//2、设置连接参数conn.setVirtualHost("/");conn.setPort(5672);conn.setHost("106.14.36.65");conn.setUsername("guest");conn.setPassword("guest");//3、创建连接Connection connection = conn.newConnection();//4、创建channel管道Channel channel = connection.createChannel();//5、创建队列Queue/*** String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments* queue:队列名称* durable:是否持久化,当mq重启之后,还在* exclusive:是否独占,只能有一个消费者监听队列,当connection关闭时,是否删除队列* autoDelete:是否自动删除,当没有consumer时,自动删除掉* arguments:参数*///如果没有hello_world队列,则会创建该队列,有就不会创建channel.queueDeclare("work_queues",true,false,false,null);//接收消息/*** String queue, boolean autoAck, Consumer callback* queue:队列名称,跟生产者一致* autoAck:是否自动确认* callback:回调对象*/Consumer consumer = new DefaultConsumer(channel){/*** 回调方法,当收到消息后,会自动执行该方法* @param consumerTag:标识* @param envelope:获取一些信息,交换机,路由键。。。* @param properties:配置信息* @param body:数据*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:" + new String(body));}};channel.basicConsume("work_queues",true,consumer);}
}
2、Pub/Sub订阅模式
生产者代码
package com.itheima.producer;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer_PubSub {public static void main(String[] args) throws Exception {//1、创建连接工厂ConnectionFactory conn = new ConnectionFactory();//2、设置连接参数conn.setVirtualHost("/");conn.setPort(5672);conn.setHost("106.14.36.65");conn.setUsername("guest");conn.setPassword("guest");//3、创建连接Connection connection = conn.newConnection();//4、创建channel管道Channel channel = connection.createChannel();//5、创建交换机/*** String exchange,交换机名称* BuiltinExchangeType type,交换机类型(4种)* DIRECT("direct"), 定向* FANOUT("fanout"), 扇形(广播),发送消息到每一个与之绑定队列* TOPIC("topic"),通配符的方式* HEADERS("headers"):参数匹配** boolean durable,是否持久化* boolean autoDelete,是否自动删除* boolean internal,内部使用,一般false* Map<String, Object> arguments:参数*/String exchangeName = "test_fanout";channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);//6、创建队列String queueName1 = "test_fanout_queue1";String queueName2 = "test_fanout_queue2";channel.queueDeclare(queueName1,true,false,false,null);channel.queueDeclare(queueName2,true,false,false,null);//7、绑定队列和交换机/*** String queue, 队列名称* String exchange,交换机* String routingKey:路由键,绑定规则,如果交换机的类型为fanout,routingKey设置为""*/channel.queueBind(queueName1,exchangeName,"");channel.queueBind(queueName2,exchangeName,"");//8、发送消息String body = "日志信息:张三调用了findAll方法。。。日志级别:info。。。";channel.basicPublish(exchangeName,"",null,body.getBytes());//9、释放资源channel.close();connection.close();}
}
两份消费者代码
package com.itheima.consumer;import com.rabbitmq.client.*;import java.io.IOException;public class Consumer_PubSub1 {public static void main(String[] args) throws Exception {//1、创建连接工厂ConnectionFactory conn = new ConnectionFactory();//2、设置连接参数conn.setVirtualHost("/");conn.setPort(5672);conn.setHost("106.14.36.65");conn.setUsername("guest");conn.setPassword("guest");//3、创建连接Connection connection = conn.newConnection();//4、创建channel管道Channel channel = connection.createChannel();//接收消息/*** String queue, boolean autoAck, Consumer callback* queue:队列名称,跟生产者一致* autoAck:是否自动确认* callback:回调对象*/Consumer consumer = new DefaultConsumer(channel){/*** 回调方法,当收到消息后,会自动执行该方法* @param consumerTag:标识* @param envelope:获取一些信息,交换机,路由键。。。* @param properties:配置信息* @param body:数据*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:" + new String(body));System.out.println("将日志信息打印到控制台.....");}};String queueName = "test_fanout_queue1";channel.basicConsume(queueName,true,consumer);}
}
Routing路由模式
生产者代码
package com.itheima.producer;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer_Routing {public static void main(String[] args) throws Exception {//1、创建连接工厂ConnectionFactory conn = new ConnectionFactory();//2、设置连接参数conn.setVirtualHost("/");conn.setPort(5672);conn.setHost("106.14.36.65");conn.setUsername("guest");conn.setPassword("guest");//3、创建连接Connection connection = conn.newConnection();//4、创建channel管道Channel channel = connection.createChannel();//5、创建交换机/*** String exchange,交换机名称* BuiltinExchangeType type,交换机类型(4种)* DIRECT("direct"), 定向* FANOUT("fanout"), 扇形(广播),发送消息到每一个与之绑定队列* TOPIC("topic"),通配符的方式* HEADERS("headers"):参数匹配** boolean durable,是否持久化* boolean autoDelete,是否自动删除* boolean internal,内部使用,一般false* Map<String, Object> arguments:参数*/String exchangeName = "test_direct";channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);//6、创建队列String queueName1 = "test_direct_queue1";String queueName2 = "test_direct_queue2";channel.queueDeclare(queueName1,true,false,false,null);channel.queueDeclare(queueName2,true,false,false,null);//7、绑定队列和交换机/*** String queue, 队列名称* String exchange,交换机* String routingKey:路由键,绑定规则,如果交换机的类型为fanout,routingKey设置为""*///队列1channel.queueBind(queueName1,exchangeName,"error");//队列2channel.queueBind(queueName2,exchangeName,"error");channel.queueBind(queueName2,exchangeName,"info");channel.queueBind(queueName2,exchangeName,"warning");//8、发送消息String body = "日志信息:张三调用了findAll方法。。。日志级别:info。。。";channel.basicPublish(exchangeName,"info",null,body.getBytes());//9、释放资源channel.close();connection.close();}
}
两份消费者代码
package com.itheima.producer;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer_Routing {public static void main(String[] args) throws Exception {//1、创建连接工厂ConnectionFactory conn = new ConnectionFactory();//2、设置连接参数conn.setVirtualHost("/");conn.setPort(5672);conn.setHost("106.14.36.65");conn.setUsername("guest");conn.setPassword("guest");//3、创建连接Connection connection = conn.newConnection();//4、创建channel管道Channel channel = connection.createChannel();//5、创建交换机/*** String exchange,交换机名称* BuiltinExchangeType type,交换机类型(4种)* DIRECT("direct"), 定向* FANOUT("fanout"), 扇形(广播),发送消息到每一个与之绑定队列* TOPIC("topic"),通配符的方式* HEADERS("headers"):参数匹配** boolean durable,是否持久化* boolean autoDelete,是否自动删除* boolean internal,内部使用,一般false* Map<String, Object> arguments:参数*/String exchangeName = "test_direct";channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);//6、创建队列String queueName1 = "test_direct_queue1";String queueName2 = "test_direct_queue2";channel.queueDeclare(queueName1,true,false,false,null);channel.queueDeclare(queueName2,true,false,false,null);//7、绑定队列和交换机/*** String queue, 队列名称* String exchange,交换机* String routingKey:路由键,绑定规则,如果交换机的类型为fanout,routingKey设置为""*///队列1channel.queueBind(queueName1,exchangeName,"error");//队列2channel.queueBind(queueName2,exchangeName,"error");channel.queueBind(queueName2,exchangeName,"info");channel.queueBind(queueName2,exchangeName,"warning");//8、发送消息String body = "日志信息:张三调用了findAll方法。。。日志级别:info。。。";channel.basicPublish(exchangeName,"info",null,body.getBytes());//9、释放资源channel.close();connection.close();}
}
Topics通配符模式
*:代表一个单词,#:代码0或多个单词
生产者代码
package com.itheima.producer;import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class Producer_Topics {public static void main(String[] args) throws Exception {//1、创建连接工厂ConnectionFactory conn = new ConnectionFactory();//2、设置连接参数conn.setVirtualHost("/");conn.setPort(5672);conn.setHost("106.14.36.65");conn.setUsername("guest");conn.setPassword("guest");//3、创建连接Connection connection = conn.newConnection();//4、创建channel管道Channel channel = connection.createChannel();//5、创建交换机/*** String exchange,交换机名称* BuiltinExchangeType type,交换机类型(4种)* DIRECT("direct"), 定向* FANOUT("fanout"), 扇形(广播),发送消息到每一个与之绑定队列* TOPIC("topic"),通配符的方式* HEADERS("headers"):参数匹配** boolean durable,是否持久化* boolean autoDelete,是否自动删除* boolean internal,内部使用,一般false* Map<String, Object> arguments:参数*/String exchangeName = "test_topics";channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);//6、创建队列String queueName1 = "test_topics_queue1";String queueName2 = "test_topics_queue2";channel.queueDeclare(queueName1,true,false,false,null);channel.queueDeclare(queueName2,true,false,false,null);//7、绑定队列和交换机/*** String queue, 队列名称* String exchange,交换机* String routingKey:路由键,绑定规则,如果交换机的类型为fanout,routingKey设置为""*///系统的名称.日志的级别//需求:所有error级别的日志存入数据库,所有order系统的日志存入数据库String routingKey = "";channel.queueBind(queueName1,exchangeName,"#.error");channel.queueBind(queueName1,exchangeName,"order.*");channel.queueBind(queueName2,exchangeName,"*.*");//8、发送消息String body = "日志信息:张三调用了findAll方法。。。日志级别:info。。。";channel.basicPublish(exchangeName,"order.info",null,body.getBytes());//9、释放资源channel.close();connection.close();}
}
两份消费者代码
package com.itheima.consumer;import com.rabbitmq.client.*;import java.io.IOException;public class Consumer_Topic1 {public static void main(String[] args) throws Exception {//1、创建连接工厂ConnectionFactory conn = new ConnectionFactory();//2、设置连接参数conn.setVirtualHost("/");conn.setPort(5672);conn.setHost("106.14.36.65");conn.setUsername("guest");conn.setPassword("guest");//3、创建连接Connection connection = conn.newConnection();//4、创建channel管道Channel channel = connection.createChannel();//接收消息/*** String queue, boolean autoAck, Consumer callback* queue:队列名称,跟生产者一致* autoAck:是否自动确认* callback:回调对象*/Consumer consumer = new DefaultConsumer(channel){/*** 回调方法,当收到消息后,会自动执行该方法* @param consumerTag:标识* @param envelope:获取一些信息,交换机,路由键。。。* @param properties:配置信息* @param body:数据*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:" + new String(body));System.out.println("将日志信息保存到数据库.....");}};String queueName = "test_topic_queue1";channel.basicConsume(queueName,true,consumer);}
}
java使用RabbitMQ,学习了解相关推荐
- RabbitMQ学习笔记(3)----RabbitMQ Worker的使用
1. Woker队列结构图 这里表示一个生产者生产了消息发送到队列中,但是确有两个消费者在消费同一个队列中的消息. 2. 创建一个生产者 Producer如下: package com.wangx.r ...
- RabbitMQ学习
RabbitMQ学习 1.概述 用于进程通信的中间件. 优势: 劣势: 1.应用解耦:提高了系统容错性和可维护性 1.系统依赖越多不能保证MQ的高可用 2.异步提速:提升用户体验和系统吞吐量 2.复杂 ...
- Java初学者的学习路线建议
网络上看到过很多的java工程师的学习路线,内容很多,对于java初级人员来说,这种学习路线看的多了,就很容易混淆,最后并不能给自己一个深刻的认识和理解,我将一些重点,干货整理出来. 一.基础 Jav ...
- RabbitMQ 学习笔记
RabbitMQ 学习笔记 RabbitMQ 学习笔记 1. 中间件 1.1 什么是中间件 1.2 为什么要使用消息中间件 1.3 中间件特点 1.4 在项目中什么时候使用中间件技术 2. 中间件技术 ...
- Rabbitmq学习笔记(尚硅谷2021)
Rabbitmq学习笔记 (尚硅谷) 1.MQ 的概念 1.1 什么是 MQ? 1.2 为什么要用 MQ? 削峰 解耦 异步 1.3 MQ 的分类 ActiveMQ Kafka RocketMQ Ra ...
- 系统自学Java语言(学习视频整理)
以下分享的视频教程 99% 来源于B站(哔哩哔哩),其余来自于慕课网,希望这些视频能帮助你系统全面地自学 Java 语言. 目录 一.Java基础 二.数据结构与算法 三.图解Java设计模式 四.J ...
- 乐行学院RabbitMQ学习教程 第一章 RabbitMQ介绍(可供技术选型时使用)
乐行学院RabbitMQ学习教程 第一章 RabbitMQ介绍 RabbitMQ介绍 1.RabbitMQ技术简介 2.RabbitMQ其他扩展插件 2.1监控工具rabbitmq-managemen ...
- RabbitMQ学习(十五):消极确认(Negative Acknowledgements)
说明 在之前的一篇博文<RabbitMQ学习(十三):死信交换机 (Dead Letter Exchanges)>中我们了解到,消息变为死信有三个原因,其中就有因为消费者的消极确认(neg ...
- 【Java开发】2021最新最全的Java开发工程师学习路线
2021最新最全的Java开发学习路线 阶段一 (夯实基础) 一.Java基础语法 二.Java面向对象编程 三.Java核心类库 四.XML与JSON 五.算法与数据结构 六.数据库 七.JDBC技 ...
- Java 架构师学习路线
Java 架构师学习路线 一. 框架源码专题 1. 应用框架Spring 1.1. Spring IOC源码剖析 1.2. Spring AOP 源码剖析 1.3. Spring MVC 源码剖析 1 ...
最新文章
- MATLAB 只是冰山一角!一个海外资深程序员聊被卡脖子……
- 给妹子讲python-S01E23初识异常处理
- 服务器数据恢复难题--操作系统恢复的方法和思路
- [译]在CUDA C/C++中如何衡量代码性能
- LeetCode 406. 根据身高重建队列(排序)
- Acitivity创建与配置
- node.js学习笔记之模拟路由
- vue多语言插件vue-i18n
- 操作系统面试知识复习
- 51单片机——蓝牙远程点灯
- leap通过掌心或手指的某一关节做一条射线
- 分别使用御剑工具和dirsearch工具(需要在kali下进行安装)对http://159.75.16.25进行扫描, 扫描出敏感文件,敏感文件内有flag值
- python selenium 键盘操作 常用
- python处理xps文件_xps/pdf/png/json转换
- ps怎么更改背景图层大小_ps怎么修改图层大小
- 如何在微信窗口使用计算机,电脑微信窗口太大怎么办
- React-Native笔记--Debugger and device times had drifted by more than 60s.
- 华为交换机配置consol密码及vty密码
- 2021-04-06
- 第十四届蓝桥杯. 接龙数列(线性DP)