基本概念

ConnectionFactory、Connection、Channel

ConnectionFactory、Connection、Channel,这三个都是RabbitMQ对外提供的API中最基本的对象。不管是服务器端还是客户端都会首先创建这三类对象。
       ConnectionFactory为Connection的制造工厂。

Connection是与RabbitMQ服务器的socket链接,它封装了socket协议及身份验证相关部分逻辑。

Channel是我们与RabbitMQ打交道的最重要的一个接口,大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。

Queue

Queue(队列)是RabbitMQ的内部对象,用于存储消息

  RabbitMQ中的消息都只能存储在Queue中,生产者生产消息并最终投递到Queue中,消费者可以从Queue中获取消息并消费。

  队列是有Channel声明的,而且这个操作是幂等的。同名的队列多次声明也只会创建一次。我们发送消息就是想这个声明的队列里发送消息

  工作队列的主要思想是不用等待资源密集型的任务处理完成,

为了确保消息或者任务不会丢失,rabbitmq 支持消息确信 ACK。ACK机制是消费者端从rabbitmq收到消息并处理完成后,反馈给rabbitmq,rabbitmq收到反馈信息后将消息从队列中删除

  如果rabbitmq向消费者改善消息时,消费者服务器挂了,消息也不会超时,即使一个消息需要非常长的时间处理,也不会导致消息超时,永远不会从rabbitmq中删除,

忘记通过basicAck返回确认信息是个严重的错误

rabbitmq不允许重新定义一个已有的队列信息

QueueingConsumer

队列消费者,用于监听队列中的消息。调用nextDelivery方法时,内部实现就是调用队列的take方法。该方法的作用:获取并移除此队列的头部,在元素变得可用之前一直等待(如果有必要)。说白了就是如果没有消息,就处于阻塞状态。

RabbitMQ 笔记-Exchanges

  Procuder Publish的Message进入了Exchange。接着通过“routing keys”, RabbitMQ会找到应该把这个Message放到哪个queue里。queue也是通过这个routing keys来做的绑定。

有三种类型的Exchanges:direct, fanout,topic。 每个实现了不同的路由算法(routing algorithm)。

  Direct exchange: 如果 routing key 匹配, 那么Message就会被传递到相应的queue中。其实在queue创建时,它会自动的以queue的名字作为routing key来绑定那个exchange。

生产者:     // 声明交换器channel.exchangeDeclare(EXCHANGE_NAME, "direct");// 发送消息for (String severity : routingKeys) {String message = "Send the message level: " + severity;channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());System.out.println(" [x] Sent '" + severity + "':'" + message + "'");}消费者// 声明交换器channel.exchangeDeclare(EXCHANGE_NAME, "direct");// 获取匿名队列名称String queueName = channel.queueDeclare().getQueue();// 根据路由关键字进行多重绑定for (String severity : routingKeys) {channel.queueBind(queueName, EXCHANGE_NAME, severity);System.out.println("ReceiveLogsDirect1 exchange:"+EXCHANGE_NAME+", queue:"+queueName+", BindRoutingKey:" + severity);}

  Fanout exchange: 会向响应的queue广播。

生产者channel.exchangeDeclare(EXCHANGE_NAME, "fanout");// 分发消息for (int i=0; i<5; i++) {String message = "Hello World!" + i;channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}消费者channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String queueName = channel.queueDeclare().getQueue();channel.queueBind(queueName, EXCHANGE_NAME, "");

  Topic exchange: 对key进行模式匹配,比如ab*可以传递到所有ab*的queue。

生产者// 声明一个匹配模式的交换器channel.exchangeDeclare(EXCHANGE_NAME, "topic");// 待发送的消息String routingKeys[] = new String[]{"quick.orange.rabbit","lazy.orange.elephant","quick.orange.fox","lazy.brown.fox","quick.brown.fox","quick.orange.male.rabbit","lazy.orange.male.rabbit"};// 发送消息for (String severity : routingKeys) {String message = "From "+severity+" routingKey' s message!";channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());System.out.println("TopicSend [x] Sent '" + severity + "':'" + message + "'");}消费者// 声明一个匹配模式的交换器channel.exchangeDeclare(EXCHANGE_NAME, "topic");String queueName = channel.queueDeclare().getQueue();// 路由关键字String routingKeys[] = new String[] {"*.orange.*"};// 绑定路由关键字for (String bindingKey : routingKeys) {channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);System.out.println("ReceiveLogsTopic1 exchange:"+EXCHANGE_NAME+", queue:"+queueName+", BindRoutingKey:" + bindingKey);}

  匿名: 直接发送到queue。

生产者for (int i=0; i<5; i++) {String message = "hello world! " + i;channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}消费者channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

RabbitMQ 笔记-RPC

RabbitMQ中实现RPC的机制是:

  • 客户端发送请求(消息)时,在消息的属性(MessageProperties,在AMQP协议中定义了14中properties,这些属性会随着消息一起发送)中设置两个值replyTo(一个Queue名称,用于告诉服务器处理完成后将通知我的消息发送到这个Queue中)和correlationId(此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根据这个id了解哪条请求被成功执行了或执行失败)
  • 服务器端收到消息并处理
  • 服务器端处理完消息后,将生成一条应答消息到replyTo指定的Queue,同时带上correlationId属性
  • 客户端之前已订阅replyTo指定的Queue,从中收到服务器的应答消息后,根据其中的correlationId属性分析哪条请求被执行了,根据执行结果进行后续业务处理
public class RPCClient {private static final String RPC_QUEUE_NAME = "rpc_queue";private Connection connection;private Channel channel;private String replyQueueName;private QueueingConsumer consumer;public RPCClient() throws Exception {ConnectionFactory factory = new ConnectionFactory();// 设置MabbitMQ所在主机ip或者主机名factory.setHost("192.168.65.136");factory.setUsername("rabbitmq");factory.setPassword("123456");// 创建一个连接connection = factory.newConnection();// 创建一个频道channel = connection.createChannel();//声明队列channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);//为每一个客户端获取一个随机的回调队列replyQueueName = channel.queueDeclare().getQueue();//为每一个客户端创建一个消费者(用于监听回调队列,获取结果)consumer = new QueueingConsumer(channel);//消费者与队列关联channel.basicConsume(replyQueueName, true, consumer);}/*** 获取斐波列其数列的值** @param message* @return* @throws Exception*/public String call(String message) throws Exception{String response = null;String corrId = java.util.UUID.randomUUID().toString();//设置replyTo和correlationId属性值BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();//发送消息到rpc_queue队列channel.basicPublish("", RPC_QUEUE_NAME, props, message.getBytes());while (true) {System.out.println("OK?");QueueingConsumer.Delivery delivery = consumer.nextDelivery();System.out.println("OK");if (delivery.getProperties().getCorrelationId().equals(corrId)) {response = new String(delivery.getBody(),"UTF-8");break;}}return response;}public static void main(String[] args) throws Exception {RPCClient fibonacciRpc = new RPCClient();String result = fibonacciRpc.call("4");System.out.println( "fib(4) is " + result);}
}

public class RPCServer {private static final String RPC_QUEUE_NAME = "rpc_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();// 设置MabbitMQ所在主机ip或者主机名factory.setHost("192.168.65.136");factory.setUsername("rabbitmq");factory.setPassword("123456");// 创建一个连接Connection connection = factory.newConnection();// 创建一个频道Channel channel = connection.createChannel();//声明队列channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);//限制:每次最多给一个消费者发送1条消息channel.basicQos(1);//为rpc_queue队列创建消费者,用于处理请求QueueingConsumer consumer = new QueueingConsumer(channel);channel.basicConsume(RPC_QUEUE_NAME, false, consumer);System.out.println(" [x] Awaiting RPC requests");while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();//获取请求中的correlationId属性值,并将其设置到结果消息的correlationId属性中BasicProperties props = delivery.getProperties();BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId()).build();//获取回调队列名字String callQueueName = props.getReplyTo();String message = new String(delivery.getBody(),"UTF-8");System.out.println(" [.] fib(" + message + ")");//获取结果String response = "" + fib(Integer.parseInt(message));//先发送回调结果channel.basicPublish("", callQueueName, replyProps,response.getBytes());//后手动发送消息反馈channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}/*** 计算斐波列其数列的第n项** @param n* @return* @throws Exception*/private static int fib(int n) throws Exception {if (n < 0)throw new Exception("参数错误,n必须大于等于0");if (n == 0)return 0;if (n == 1)return 1;return fib(n - 1) + fib(n - 2);}
}

转载于:https://www.cnblogs.com/m2492565210/p/7931972.html

RabbitMQ 学习开发笔记相关推荐

  1. Android Studio学习开发笔记--基础

    关于进阶项目篇,点击这里 文章目录 前言 构建首个应用 运行 第一次可能会出现的问题 想要重新下载安装Android Studio 文件在哪里编辑 android基础--控件 基础属性 带阴影的Tex ...

  2. QT学习开发笔记(项目实战之智能家居物联 UI 界面开发 )

    智能家居物联 UI 界面开发 项目路径为 4/01_smarthome/01_smarthome/01_smarthome.pro,先看项目界面.项目界面如 下,采用暗黑主题设计,结合黄色作为亮色,让 ...

  3. QT学习开发笔记(UDP通信)

    UDP 通信 11.3.1 UDP 简介 UDP(User Datagram Protocol 即用户数据报协议)是一个轻量级的,不可靠的,面向数据 报的无连接协议.我们日常生活中使用的 QQ,其聊天 ...

  4. RabbitMQ 学习笔记

    RabbitMQ 学习笔记 RabbitMQ 学习笔记 1. 中间件 1.1 什么是中间件 1.2 为什么要使用消息中间件 1.3 中间件特点 1.4 在项目中什么时候使用中间件技术 2. 中间件技术 ...

  5. Rabbitmq学习笔记(尚硅谷2021)

    Rabbitmq学习笔记 (尚硅谷) 1.MQ 的概念 1.1 什么是 MQ? 1.2 为什么要用 MQ? 削峰 解耦 异步 1.3 MQ 的分类 ActiveMQ Kafka RocketMQ Ra ...

  6. RabbitMQ学习笔记(高级篇)

    RabbitMQ学习笔记(高级篇) 文章目录 RabbitMQ学习笔记(高级篇) RabbitMQ的高级特性 消息的可靠投递 生产者确认 -- confirm确认模式 生产者确认 -- return确 ...

  7. Rabbitmq学习笔记教程-尚硅谷

    Rabbitmq学习笔记 (尚硅谷) 尚硅谷 rabbitmq 教程 1.MQ 的概念 1.1 什么是 MQ? 存放消息的队列,互联网架构中常见的一种服务与服务之间通信的方式. 1.2 为什么要用 M ...

  8. RabbitMQ学习笔记四:RabbitMQ命令(附疑难问题解决)

    RabbitMQ学习笔记四:RabbitMQ命令(附疑难问题解决) 参考文章: (1)RabbitMQ学习笔记四:RabbitMQ命令(附疑难问题解决) (2)https://www.cnblogs. ...

  9. Android初级开发笔记-- activity启动模式的学习(1)

    第一次学习Android中一个很重要的概念,启动模式.文章记录的也只是一些入门知识,随着学习的深入还会有activity启动模式的学习(2)和(3). 下面分三个小点说一下对启动模式的理解区别以及如何 ...

  10. RabbitMQ学习笔记(3)----RabbitMQ Worker的使用

    1. Woker队列结构图 这里表示一个生产者生产了消息发送到队列中,但是确有两个消费者在消费同一个队列中的消息. 2. 创建一个生产者 Producer如下: package com.wangx.r ...

最新文章

  1. 24点游戏c语言链表做法,C语言实现24点程序(示例代码)
  2. matlab看fft帮助,日记 [2009年06月02日] MATLAB FFT HELP 帮助文档及我的翻译
  3. 意念实时转语音!Facebook的非植入式脑机接口,解码准确率达到76%
  4. Swagger3.0
  5. count sort, radix sort, bucket sort
  6. js遍历Object所有属性
  7. spirng mvc 中使用验证码
  8. mathmagic pro mac使用教程|快速地创建任何方程
  9. Query Layer介绍
  10. stm32产生100k时钟信号_stm32f105/107系统时钟变慢
  11. 工程选择LibGdx--开发环境搭建Strut2教程-java教程
  12. Call to your teacher(深度搜索)
  13. ThinkPHP商城系统与外部系统用户互通,集成UCenter
  14. 以云服务器产品为例,深度分析比对华为云、阿里云、腾讯云
  15. nginx: [emerg] CreateFile() “D:\项目资料\nginx-1.12.2/conf/nginx.conf“ failed (1113: No mapping for t
  16. “囍”博物馆与Interesting 有点意思
  17. 分类算法SVM(支持向量机)
  18. 二叉搜索树(BST)——基本概念及基本实现代码
  19. A2F-轻量级SISR网络 | Lightweight Single-Image Super-Resolution Network with Attentive Auxiliary Feature
  20. mysql.data.dll 位置_MySqlData.dll,下载,简介,描述,修复,等相关问题一站搞定_DLL之家...

热门文章

  1. Android getReadableDatabase() 和 getWritableDatabase()
  2. hb:一个简单的 http/web bench 工具
  3. Oracle常用SQL总结
  4. 百度地图出现网格,不显示地图
  5. C#中获取本机IP地址,子网掩码,网关地址
  6. [转]Android学习系列(1)--为App签名(为apk签名)
  7. .net中模拟键盘和鼠标操作
  8. STM32CubeMX使用(七)之通用定时器和系统定时器
  9. linux下source filename,./filename,. filename,......
  10. Zabbix Windos agent 安装