RabbitMQ 学习开发笔记
基本概念
ConnectionFactory、Connection、Channel
ConnectionFactory、Connection、Channel,这三个都是RabbitMQ对外提供的API中最基本的对象。不管是服务器端还是客户端都会首先创建这三类对象。
ConnectionFactory为Connection的制造工厂。
Connection是与RabbitMQ服务器的socket链接,它封装了socket协议及身份验证相关部分逻辑。
Channel是我们与RabbitMQ打交道的最重要的一个接口,大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。
Queue
Queue(队列)是RabbitMQ的内部对象,用于存储消息
RabbitMQ中的消息都只能存储在Queue中,生产者生产消息并最终投递到Queue中,消费者可以从Queue中获取消息并消费。
队列是有Channel声明的,而且这个操作是幂等的。同名的队列多次声明也只会创建一次。我们发送消息就是想这个声明的队列里发送消息
工作队列的主要思想是不用等待资源密集型的任务处理完成,
为了确保消息或者任务不会丢失,rabbitmq 支持消息确信 ACK。ACK机制是消费者端从rabbitmq收到消息并处理完成后,反馈给rabbitmq,rabbitmq收到反馈信息后将消息从队列中删除
如果rabbitmq向消费者改善消息时,消费者服务器挂了,消息也不会超时,即使一个消息需要非常长的时间处理,也不会导致消息超时,永远不会从rabbitmq中删除,
忘记通过basicAck返回确认信息是个严重的错误
rabbitmq不允许重新定义一个已有的队列信息
QueueingConsumer
队列消费者,用于监听队列中的消息。调用nextDelivery方法时,内部实现就是调用队列的take方法。该方法的作用:获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。说白了就是如果没有消息,就处于阻塞状态。
RabbitMQ 笔记-Exchanges
Procuder Publish的Message进入了Exchange。接着通过“routing keys”, RabbitMQ会找到应该把这个Message放到哪个queue里。queue也是通过这个routing keys来做的绑定。
有三种类型的Exchanges:direct, fanout,topic。 每个实现了不同的路由算法(routing algorithm)。
Direct exchange: 如果 routing key 匹配, 那么Message就会被传递到相应的queue中。其实在queue创建时,它会自动的以queue的名字作为routing key来绑定那个exchange。
生产者: // 声明交换器channel.exchangeDeclare(EXCHANGE_NAME, "direct");// 发送消息for (String severity : routingKeys) {String message = "Send the message level: " + severity;channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());System.out.println(" [x] Sent '" + severity + "':'" + message + "'");}消费者// 声明交换器channel.exchangeDeclare(EXCHANGE_NAME, "direct");// 获取匿名队列名称String queueName = channel.queueDeclare().getQueue();// 根据路由关键字进行多重绑定for (String severity : routingKeys) {channel.queueBind(queueName, EXCHANGE_NAME, severity);System.out.println("ReceiveLogsDirect1 exchange:"+EXCHANGE_NAME+", queue:"+queueName+", BindRoutingKey:" + severity);}
Fanout exchange: 会向响应的queue广播。
生产者channel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 分发消息for (int i=0; i<5; i++) {String message = "Hello World!" + i;channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}消费者channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName, EXCHANGE_NAME, "");
Topic exchange: 对key进行模式匹配,比如ab*可以传递到所有ab*的queue。
生产者// 声明一个匹配模式的交换器channel.exchangeDeclare(EXCHANGE_NAME, "topic");// 待发送的消息String routingKeys[] = new String[]{"quick.orange.rabbit","lazy.orange.elephant","quick.orange.fox","lazy.brown.fox","quick.brown.fox","quick.orange.male.rabbit","lazy.orange.male.rabbit"};// 发送消息for (String severity : routingKeys) {String message = "From "+severity+" routingKey' s message!";channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());System.out.println("TopicSend [x] Sent '" + severity + "':'" + message + "'");}消费者// 声明一个匹配模式的交换器channel.exchangeDeclare(EXCHANGE_NAME, "topic");String queueName = channel.queueDeclare().getQueue();// 路由关键字String routingKeys[] = new String[] {"*.orange.*"};// 绑定路由关键字for (String bindingKey : routingKeys) {channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);System.out.println("ReceiveLogsTopic1 exchange:"+EXCHANGE_NAME+", queue:"+queueName+", BindRoutingKey:" + bindingKey);}
匿名: 直接发送到queue。
生产者for (int i=0; i<5; i++) {String message = "hello world! " + i;channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}消费者channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
RabbitMQ 笔记-RPC
RabbitMQ中实现RPC的机制是:
- 客户端发送请求(消息)时,在消息的属性(MessageProperties,在AMQP协议中定义了14中properties,这些属性会随着消息一起发送)中设置两个值replyTo(一个Queue名称,用于告诉服务器处理完成后将通知我的消息发送到这个Queue中)和correlationId(此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根据这个id了解哪条请求被成功执行了或执行失败)
- 服务器端收到消息并处理
- 服务器端处理完消息后,将生成一条应答消息到replyTo指定的Queue,同时带上correlationId属性
- 客户端之前已订阅replyTo指定的Queue,从中收到服务器的应答消息后,根据其中的correlationId属性分析哪条请求被执行了,根据执行结果进行后续业务处理
public class RPCClient {private static final String RPC_QUEUE_NAME = "rpc_queue";private Connection connection;private Channel channel;private String replyQueueName;private QueueingConsumer consumer;public RPCClient() throws Exception {ConnectionFactory factory = new ConnectionFactory();// 设置MabbitMQ所在主机ip或者主机名factory.setHost("192.168.65.136");factory.setUsername("rabbitmq");factory.setPassword("123456");// 创建一个连接connection = factory.newConnection();// 创建一个频道channel = connection.createChannel();//声明队列channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);//为每一个客户端获取一个随机的回调队列replyQueueName = channel.queueDeclare().getQueue();//为每一个客户端创建一个消费者(用于监听回调队列,获取结果)consumer = new QueueingConsumer(channel);//消费者与队列关联channel.basicConsume(replyQueueName, true, consumer);}/*** 获取斐波列其数列的值** @param message* @return* @throws Exception*/public String call(String message) throws Exception{String response = null;String corrId = java.util.UUID.randomUUID().toString();//设置replyTo和correlationId属性值BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();//发送消息到rpc_queue队列channel.basicPublish("", RPC_QUEUE_NAME, props, message.getBytes());while (true) {System.out.println("OK?");QueueingConsumer.Delivery delivery = consumer.nextDelivery();System.out.println("OK");if (delivery.getProperties().getCorrelationId().equals(corrId)) {response = new String(delivery.getBody(),"UTF-8");break;}}return response;}public static void main(String[] args) throws Exception {RPCClient fibonacciRpc = new RPCClient();String result = fibonacciRpc.call("4");System.out.println( "fib(4) is " + result);} }
public class RPCServer {private static final String RPC_QUEUE_NAME = "rpc_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();// 设置MabbitMQ所在主机ip或者主机名factory.setHost("192.168.65.136");factory.setUsername("rabbitmq");factory.setPassword("123456");// 创建一个连接Connection connection = factory.newConnection();// 创建一个频道Channel channel = connection.createChannel();//声明队列channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);//限制:每次最多给一个消费者发送1条消息channel.basicQos(1);//为rpc_queue队列创建消费者,用于处理请求QueueingConsumer consumer = new QueueingConsumer(channel);channel.basicConsume(RPC_QUEUE_NAME, false, consumer);System.out.println(" [x] Awaiting RPC requests");while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();//获取请求中的correlationId属性值,并将其设置到结果消息的correlationId属性中BasicProperties props = delivery.getProperties();BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId()).build();//获取回调队列名字String callQueueName = props.getReplyTo();String message = new String(delivery.getBody(),"UTF-8");System.out.println(" [.] fib(" + message + ")");//获取结果String response = "" + fib(Integer.parseInt(message));//先发送回调结果channel.basicPublish("", callQueueName, replyProps,response.getBytes());//后手动发送消息反馈channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}/*** 计算斐波列其数列的第n项** @param n* @return* @throws Exception*/private static int fib(int n) throws Exception {if (n < 0)throw new Exception("参数错误,n必须大于等于0");if (n == 0)return 0;if (n == 1)return 1;return fib(n - 1) + fib(n - 2);} }
转载于:https://www.cnblogs.com/m2492565210/p/7931972.html
RabbitMQ 学习开发笔记相关推荐
- Android Studio学习开发笔记--基础
关于进阶项目篇,点击这里 文章目录 前言 构建首个应用 运行 第一次可能会出现的问题 想要重新下载安装Android Studio 文件在哪里编辑 android基础--控件 基础属性 带阴影的Tex ...
- QT学习开发笔记(项目实战之智能家居物联 UI 界面开发 )
智能家居物联 UI 界面开发 项目路径为 4/01_smarthome/01_smarthome/01_smarthome.pro,先看项目界面.项目界面如 下,采用暗黑主题设计,结合黄色作为亮色,让 ...
- QT学习开发笔记(UDP通信)
UDP 通信 11.3.1 UDP 简介 UDP(User Datagram Protocol 即用户数据报协议)是一个轻量级的,不可靠的,面向数据 报的无连接协议.我们日常生活中使用的 QQ,其聊天 ...
- RabbitMQ 学习笔记
RabbitMQ 学习笔记 RabbitMQ 学习笔记 1. 中间件 1.1 什么是中间件 1.2 为什么要使用消息中间件 1.3 中间件特点 1.4 在项目中什么时候使用中间件技术 2. 中间件技术 ...
- Rabbitmq学习笔记(尚硅谷2021)
Rabbitmq学习笔记 (尚硅谷) 1.MQ 的概念 1.1 什么是 MQ? 1.2 为什么要用 MQ? 削峰 解耦 异步 1.3 MQ 的分类 ActiveMQ Kafka RocketMQ Ra ...
- RabbitMQ学习笔记(高级篇)
RabbitMQ学习笔记(高级篇) 文章目录 RabbitMQ学习笔记(高级篇) RabbitMQ的高级特性 消息的可靠投递 生产者确认 -- confirm确认模式 生产者确认 -- return确 ...
- Rabbitmq学习笔记教程-尚硅谷
Rabbitmq学习笔记 (尚硅谷) 尚硅谷 rabbitmq 教程 1.MQ 的概念 1.1 什么是 MQ? 存放消息的队列,互联网架构中常见的一种服务与服务之间通信的方式. 1.2 为什么要用 M ...
- RabbitMQ学习笔记四:RabbitMQ命令(附疑难问题解决)
RabbitMQ学习笔记四:RabbitMQ命令(附疑难问题解决) 参考文章: (1)RabbitMQ学习笔记四:RabbitMQ命令(附疑难问题解决) (2)https://www.cnblogs. ...
- Android初级开发笔记-- activity启动模式的学习(1)
第一次学习Android中一个很重要的概念,启动模式.文章记录的也只是一些入门知识,随着学习的深入还会有activity启动模式的学习(2)和(3). 下面分三个小点说一下对启动模式的理解区别以及如何 ...
- RabbitMQ学习笔记(3)----RabbitMQ Worker的使用
1. Woker队列结构图 这里表示一个生产者生产了消息发送到队列中,但是确有两个消费者在消费同一个队列中的消息. 2. 创建一个生产者 Producer如下: package com.wangx.r ...
最新文章
- 24点游戏c语言链表做法,C语言实现24点程序(示例代码)
- matlab看fft帮助,日记 [2009年06月02日] MATLAB FFT HELP 帮助文档及我的翻译
- 意念实时转语音!Facebook的非植入式脑机接口,解码准确率达到76%
- Swagger3.0
- count sort, radix sort, bucket sort
- js遍历Object所有属性
- spirng mvc 中使用验证码
- mathmagic pro mac使用教程|快速地创建任何方程
- Query Layer介绍
- stm32产生100k时钟信号_stm32f105/107系统时钟变慢
- 工程选择LibGdx--开发环境搭建Strut2教程-java教程
- Call to your teacher(深度搜索)
- ThinkPHP商城系统与外部系统用户互通,集成UCenter
- 以云服务器产品为例,深度分析比对华为云、阿里云、腾讯云
- nginx: [emerg] CreateFile() “D:\项目资料\nginx-1.12.2/conf/nginx.conf“ failed (1113: No mapping for t
- “囍”博物馆与Interesting 有点意思
- 分类算法SVM(支持向量机)
- 二叉搜索树(BST)——基本概念及基本实现代码
- A2F-轻量级SISR网络 | Lightweight Single-Image Super-Resolution Network with Attentive Auxiliary Feature
- mysql.data.dll 位置_MySqlData.dll,下载,简介,描述,修复,等相关问题一站搞定_DLL之家...
热门文章
- Android getReadableDatabase() 和 getWritableDatabase()
- hb:一个简单的 http/web bench 工具
- Oracle常用SQL总结
- 百度地图出现网格,不显示地图
- C#中获取本机IP地址,子网掩码,网关地址
- [转]Android学习系列(1)--为App签名(为apk签名)
- .net中模拟键盘和鼠标操作
- STM32CubeMX使用(七)之通用定时器和系统定时器
- linux下source filename,./filename,. filename,......
- Zabbix Windos agent 安装