RabbitMmq基础(三)入门应用
目录
Rabbitmq基础(一)Rabbitmq简介
Rabbitmq基础(二)安装与配置
1. RabbitMQ 的工作模式
1.1 Work queues 工作队列模式和简单模式
1.1.1 模式说明
- Work Queues:与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。但是逻辑是相同的。
- 应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度
1.1.2 代码实现
生产者:
/*** 发送消息*/
public class Producer_HelloWorld {public static void main(String[] args) throws IOException, TimeoutException {//1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();//2. 设置参数factory.setHost("192.168.1.1");//ip 默认值 localhostfactory.setPort(5672); //端口 默认值 5672factory.setVirtualHost("/demo");//虚拟机 默认值/factory.setUsername("yy");//用户名 默认 guestfactory.setPassword("abc123");//密码 默认值 guest//3. 创建连接 ConnectionConnection connection = factory.newConnection();//4. 创建ChannelChannel channel = connection.createChannel();//5. 创建队列Queue/*queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)参数:1. queue:队列名称2. durable:是否持久化,当mq重启之后,还在3. exclusive:* 是否独占。只能有一个消费者监听这队列* 当Connection关闭时,是否删除队列*4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉5. arguments:参数。*///如果没有一个名字叫hello_world的队列,则会创建该队列,如果有则不会创建channel.queueDeclare("hello_world",true,false,false,null);/*basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)参数:1. exchange:交换机名称。简单模式下交换机会使用默认的 ""2. routingKey:路由名称3. props:配置信息4. body:发送消息数据*/String body = "hello rabbitmq~~~";//6. 发送消息channel.basicPublish("","hello_world",null,body.getBytes());//7.释放资源channel.close();connection.close();}
}
消费者:
省略多余的代码
//连接参数同上Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare("hello_world",true,false,false,null);// 接收消息Consumer consumer = new DefaultConsumer(channel){/*回调方法,当收到消息后,会自动执行该方法1. consumerTag:标识2. envelope:获取一些信息,交换机,路由key...3. properties:配置信息4. body:数据*/@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:"+new String(body));}};channel.basicConsume("hello_world",true,consumer);//关闭资源?不要 因为消费者是监听功能,需要一直开启
1.2 Pub/Sub 订阅模式
1.2.1 模式说明
在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:
- P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
- C:消费者,消息的接收者,会一直等待消息到来
- Queue:消息队列,接收消息、缓存消息
- Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、
递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。 - Exchange有常见以下3种类型:
➢ Fanout:广播,将消息交给所有绑定到交换机的队列
➢ Direct:定向,把消息交给符合指定routing key 的队列
➢ Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合
路由规则的队列,那么消息会丢失!
代码实现:
生产者:
//连接参数同上//3. 创建连接 ConnectionConnection connection = factory.newConnection();//4. 创建ChannelChannel channel = connection.createChannel();String exchangeName = "test_fanout";//5. 创建交换机channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);//6. 创建队列String queue1Name = "test_fanout_queue1";String queue2Name = "test_fanout_queue2";channel.queueDeclare(queue1Name,true,false,false,null);channel.queueDeclare(queue2Name,true,false,false,null);//7. 绑定队列和交换机/*queueBind(String queue, String exchange, String routingKey)参数:1. queue:队列名称2. exchange:交换机名称3. routingKey:路由键,绑定规则如果交换机的类型为fanout ,routingKey设置为""*/channel.queueBind(queue1Name,exchangeName,"");channel.queueBind(queue2Name,exchangeName,"");String body = "日志信息:张三调用了findAll方法...日志级别:info...";//8. 发送消息channel.basicPublish(exchangeName,"",null,body.getBytes());//9. 释放资源channel.close();connection.close();
消费者:
需要多个消费者,这里只写一个
//连接参数同上//3. 创建连接 ConnectionConnection connection = factory.newConnection();//4. 创建ChannelChannel channel = connection.createChannel();String queue1Name = "test_fanout_queue1";String queue2Name = "test_fanout_queue2";// 接收消息Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("properties:"+properties);*/System.out.println("body:"+new String(body));System.out.println("将日志信息打印到控制台.....");}};channel.basicConsume(queue1Name,true,consumer);
1.3 Routing 路由模式
1.3.1 模式说明
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)
- 消息的发送方在向 Exchange 发送消息时,也必须指定消息的 RoutingKey
- Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列的Routingkey 与消息的 Routing key 完全一致,才会接收到消息
- P:生产者,向 Exchange 发送消息,发送消息时,会指定一个routing key
- X:Exchange(交换机),接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列
- C1:消费者,其所在队列指定了需要 routing key 为 error 的消息
- C2:消费者,其所在队列指定了需要 routing key 为 info、error、warning 的消息
生产者:
//连接参数同上//3. 创建连接 ConnectionConnection connection = factory.newConnection();//4. 创建ChannelChannel channel = connection.createChannel();/*exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)参数:1. exchange:交换机名称2. type:交换机类型DIRECT("direct"),:定向FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。TOPIC("topic"),通配符的方式HEADERS("headers");参数匹配3. durable:是否持久化4. autoDelete:自动删除5. internal:内部使用。 一般false6. arguments:参数*/String exchangeName = "test_direct";//5. 创建交换机channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);//6. 创建队列String queue1Name = "test_direct_queue1";String queue2Name = "test_direct_queue2";channel.queueDeclare(queue1Name,true,false,false,null);channel.queueDeclare(queue2Name,true,false,false,null);//7. 绑定队列和交换机/*queueBind(String queue, String exchange, String routingKey)参数:1. queue:队列名称2. exchange:交换机名称3. routingKey:路由键,绑定规则如果交换机的类型为fanout ,routingKey设置为""*///队列1绑定 errorchannel.queueBind(queue1Name,exchangeName,"error");//队列2绑定 info error warningchannel.queueBind(queue2Name,exchangeName,"info");channel.queueBind(queue2Name,exchangeName,"error");channel.queueBind(queue2Name,exchangeName,"warning");String body = "日志信息:张三调用了delete方法...出错误了。。。日志级别:error...";//8. 发送消息channel.basicPublish(exchangeName,"warning",null,body.getBytes());//9. 释放资源channel.close();connection.close();
消费者:
//连接参数同上//3. 创建连接 ConnectionConnection connection = factory.newConnection();//4. 创建ChannelChannel channel = connection.createChannel();String queue1Name = "test_direct_queue1";String queue2Name = "test_direct_queue2";// 接收消息Consumer consumer = new DefaultConsumer(channel){ @Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:"+new String(body));System.out.println("将日志信息打印到控制台.....");}};channel.basicConsume(queue2Name,true,consumer);
1.4 Topics 通配符模式
1.4.1 模式说明
- Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符!
- Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
- 通配符规则:# 匹配一个或多个词,* 匹配不多不少恰好1个词,例如:item.# 能够匹配 item.insert.abc 或者 item.insert,item.* 只能匹配 item.insert
图解:
红色 Queue:绑定的是 usa.# ,因此凡是以 usa. 开头的 routing key 都会被匹配到
黄色 Queue:绑定的是 #.news ,因此凡是以 .news 结尾的 routing key 都会被匹配
1.4.2 代码实现
生产者:
//设置参数同上
Connection connection = factory.newConnection();//4. 创建ChannelChannel channel = connection.createChannel();/*exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)参数:1. exchange:交换机名称2. type:交换机类型DIRECT("direct"),:定向FANOUT("fanout"),:扇形(广播),发送消息到每一个与之绑定队列。TOPIC("topic"),通配符的方式HEADERS("headers");参数匹配3. durable:是否持久化4. autoDelete:自动删除5. internal:内部使用。 一般false6. arguments:参数*/String exchangeName = "test_topic";//5. 创建交换机channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);//6. 创建队列String queue1Name = "test_topic_queue1";String queue2Name = "test_topic_queue2";channel.queueDeclare(queue1Name,true,false,false,null);channel.queueDeclare(queue2Name,true,false,false,null);//7. 绑定队列和交换机/*queueBind(String queue, String exchange, String routingKey)参数:1. queue:队列名称2. exchange:交换机名称3. routingKey:路由键,绑定规则如果交换机的类型为fanout ,routingKey设置为""*/// routing key 系统的名称.日志的级别。//=需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库channel.queueBind(queue1Name,exchangeName,"#.error");channel.queueBind(queue1Name,exchangeName,"order.*");channel.queueBind(queue2Name,exchangeName,"*.*");String body = "日志信息:张三调用了findAll方法...日志级别:info...";//8. 发送消息channel.basicPublish(exchangeName,"goods.error",null,body.getBytes());//9. 释放资源channel.close();connection.close();
消费者:
//连接参数同上//3. 创建连接 ConnectionConnection connection = factory.newConnection();//4. 创建ChannelChannel channel = connection.createChannel();String queue1Name = "test_topic_queue1";String queue2Name = "test_topic_queue2";// 接收消息Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("body:"+new String(body));System.out.println("将日志信息存入数据库.......");}};channel.basicConsume(queue1Name,true,consumer);
如果喜欢请点赞,收藏
RabbitMmq基础(三)入门应用相关推荐
- 零基础数据挖掘入门系列(三) - 数据清洗和转换技巧
思维导图:零基础入门数据挖掘的学习路径 1. 写在前面 零基础入门数据挖掘是记录自己在Datawhale举办的数据挖掘专题学习中的所学和所想, 该系列笔记使用理论结合实践的方式,整理数据挖掘相关知识, ...
- 零基础快速入门SpringBoot2.0教程 (三)
一.SpringBoot Starter讲解 简介:介绍什么是SpringBoot Starter和主要作用 1.官网地址:https://docs.spring.io/spring-boot/doc ...
- 零基础JavaScript入门(第三天)
一.JavaScript 流程控制-循环 1.循环 循环目的 :实际问题中,有许多具有规律性的重复操作,因此在程序中要完成这类操作就需要重复执行某些语句 JS 中的循环 在Js 中,主要有三种类型的循 ...
- 如何零基础学习python语言_零基础如何入门Python语言?有哪些学习建议?
众所周知,Python目前是最受欢迎的编程语言之一,尤其是对于零基础的初学者来说,Python语言更是十分的友好.因此,不少初学者常常会有这样一个共同的疑惑,零基础如何入门Python语言?本文就来给 ...
- ui设计培训需要什么基础?如何入门学习?
UI设计是一种直观面向用户的一个技术岗位,在互联网公司,UI设计岗位是不可或缺的,那么对于零基础想要学习UI设计的同学来说,ui设计培训需要什么基础?如何入门学习呢?我们来看看下面的详细介绍. ...
- 零基础AJAX入门(含Demo演示源文件)
零基础AJAX入门(含Demo演示源文件) 作者:一点一滴的Beer 个人主页:http://www.cnblogs.com/beer 摘要:因为笔者的大四毕业设计是做WebGIS系统,用过Web版 ...
- 零基础学python语言_零基础如何入门Python语言?有哪些学习建议?
众所周知,Python目前是最受欢迎的编程语言之一,尤其是对于零基础的初学者来说,Python语言更是十分的友好.因此,不少初学者常常会有这样一个共同的疑惑,零基础如何入门Python语言?本文就来给 ...
- 零基础自学python看什么书-零基础Python入门看哪本书好?这里有答案
原标题:零基础Python入门看哪本书好?这里有答案 Python入门看哪本书好呢?Python入门不知道该选哪本书?Python入门没有一本好书引导,会很难吗?你还在为这些问题困扰吗?今天小编就来解 ...
- python图形编程基础-Python从基础到入门系列教程
本教程集合了Python基础&系统管理,从基础到入门,带你走进Python世界!对Python有兴趣的可以学习一下哦基础系列:1.课程简介2.Python下载和安装3.IDLE使用简介4.第1 ...
- SQL基础使用入门(二): DML语句和DCL语句
SQL语句第二个类别--DML 语句 DML是数据操作语言的缩写,主要用来对数据表中数据记录实例对象进行操作,包括插入.删除.查找以及修改四大操作,这也是开发人员使用中最为频繁的操作. 1.插入记录 ...
最新文章
- 字节跳动的5条远程办公最佳实践
- 2017-2018 2 20179214《网络实践攻防》第三周作业(二)
- vue通过监听实现相同路径的视图重新加载
- php验证时区是否存在,php – 验证来自不同网站的时区名称?
- PolarDB-X 2.0:使用一个透明的分布式数据库是一种什么体验?
- aftool提示15天未更新_微信版本更新至7.0.15 视频号新增3种功能
- 江湖急诏令:腾讯数据库王者挑战赛赏金万两募英豪!
- 关于jsp页面显示的时间格式和Oracle数据库中的格式不一样的问题
- Lesson 1- exchange 2010 installing
- 修改mysql默认字符集为latin1_修改MYSQL默认编码为UTF8
- 手机网速正常电脑很慢_路由器WiFi速度正常,但电脑和手机上网速度慢怎么办?...
- IMDG产品功能扩展
- 三态门及其在I2C总线中的应用_普中_89C52单片机
- 微信域名防封,微信网址域名防封的几种办法
- ocx 访问 html,HTML 加载ocx VB编写的控件
- 《论语》原文及其全文翻译 学而篇8
- 【C语言进阶17——程序环境和预处理】
- 即将一起变革的区块链项目xx network
- 湖南大学.大学物理实验5:示波器的使用
- Windows Azure安全概述
热门文章
- 域名抢注代码_如何停止域名抢注攻击
- Spring Cloud Stream初窥
- Elasticsearch 父子关系
- 在境内服务器、虚拟主机上运行未备案域名方法,cf worker反向代理
- 系统优化怎么做-开篇
- 英特尔第二代神经计算棒(Intel Neural Compute Stick 2)相关测试
- windows linux 双系统启动,windows linux 双系统默认启动windows 的几种方法
- 用自己的设备跑各种VI-SLAM算法(1)——VINS/PL-VINS/ROVIO/MSCKF
- matlab 安装coder工具包,matlab coder 工具箱使用教程
- 域名备案和网站备案有什么区别?