RabbitMq队列 queue
目录
RabbitMq队列
消息确认机制
负载均衡
生产者代码
消费者1
消费者2
RabbitMq队列
在上篇文章中讲了mq的队列,这篇用代码实现。在例子中存在一个生产者,和两个消费者。生产者将生产的消息传递给队列(queue),由消费者一、消费者二区消费。
消息确认机制
在处理消息的过程中,消费者由于服务器、网络、网卡等原因出现故障不能接受消息,那可能这条正在处理的消息或者任务就没有完成,就会失去这个消息和任务。 rabbitmq为了确保消息或者任务不会丢失,RabbitMQ提供了消息确认机制ACK。
ACK是消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除。但是如果消费者由于网络不稳定、服务器异常等原因在处理消息时挂掉,那么他就不会有ACK确认反馈,RabbitMQ会认为这个消息没有正常消费,会将此消息重新放入队列中。如果有其他消费者同时在线,RabbitMQ会立即将这个消息推送给这个在线的消费者。这种机制保证了在消费者服务器故障的时候,能不丢失任何消息和任务。
消息的ACK确认机制默认是打开的。在上面的代码中,我们显示返回autoAck=true 这个标签。
负载均衡
在正常情况下,队列是将消息随机分配给每一个消费者,这时候就有可能出现分配不均的问题。这时候mq不会负责调度消息,不会根据确认机制来分析哪一个消费者确认慢。这时候为了解决这个问题可以在代码中设置 prefetchcount = 1。这个设置告诉RabbitMQ,不要一次将多个消息发送给一个消费者。这样做的好处是只有当消费者处理完成当前消息并反馈后,才会收到另外一条消息或任务。这样就避免了负载不均衡的事情了
注意:如果服务器所有消费者负载都很高,你的队列很可能会被塞满。这时我们就要考虑增加更多的消费者或者其他方案进行解决
channel.queueDeclare(QUEUE_NAME, true, false, false, null);//确定获取数量channel.basicQos(1);
生产者代码
package com.ll.mq.hellomq.queue;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;/*** * @author ll 生产者**/
public class Producer {public final static String QUENE_NAME = "hello";// 定义队列名称public static void main(String[] args) {try {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();// RabbitMQ地址factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("kysc");factory.setPassword("123456");// 创建一个连接Connection connection = factory.newConnection();// 创建一个频道Channel channel = connection.createChannel();//设置为持久化channel.queueDeclare(QUENE_NAME, true, false, false, null);// 发送消息到队列中for(int i = 0 ; i < 6; i++){String message = "Hello mq! " + i;channel.basicPublish("", QUENE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());System.out.println(" [Producer] Sent '" + message + "'");}// 关闭频道和连接channel.close();connection.close();} catch (Exception e) {e.printStackTrace();} }}
消费者1
package com.ll.mq.hellomq.queue;import java.io.IOException;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;/*** * @author ll ConsumerOne **/
public class ConsumerOne {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();
// 设置RabbitMQ地址factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("kysc");factory.setPassword("123456");
// 创建一个连接Connection connection = factory.newConnection();
// 创建一个频道final Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, true, false, false, null);// DefaultConsumer类实现了Consumer接口,通过传入一个频道,告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDeliveryDefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("ConsumerOne [x] Received '" + message + "'");try {work(message);} finally {System.out.println("ConsumerOne [x] Done");// 消息处理完成确认channel.basicAck(envelope.getDeliveryTag(), false);}}};
// 自动回复队列应答channel.basicConsume(QUEUE_NAME, false, consumer);}//睡眠public static void work(String task) {try {System.out.println("task++++========"+task);Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();}}
}
运行结果:
ConsumerOne [x] Received 'Hello mq! 1'
task++++========Hello mq! 1
ConsumerOne [x] Done
ConsumerOne [x] Received 'Hello mq! 4'
task++++========Hello mq! 4
ConsumerOne [x] Done
消费者2
package com.ll.mq.hellomq.queue;import java.io.IOException;import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
/*** * @author ll ConsumerTwo **/
public class ConsumerTwo {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {// 创建连接工厂ConnectionFactory factory = new ConnectionFactory();
// 设置RabbitMQ地址factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("kysc");factory.setPassword("123456");
// 创建一个连接Connection connection = factory.newConnection();
// 创建一个频道final Channel channel = connection.createChannel();
// 声明要关注的队列 -- 在RabbitMQ中,队列声明是幂等性的(一个幂等操作的特点是其任意多次执行所产生的影响均与一次执行的影响相同),也就是说,如果不存在,就创建,如果存在,不会对已经存在的队列产生任何影响。channel.queueDeclare(QUEUE_NAME, true, false, false, null);// DefaultConsumer类实现了Consumer接口,通过传入一个频道,告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDeliveryDefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("ConsumerTwo [x] Received '" + message + "'");try {work(message);} finally {System.out.println("ConsumerTwo [x] Done");// 消息处理完成确认channel.basicAck(envelope.getDeliveryTag(), false);}}};
// 自动回复队列应答channel.basicConsume(QUEUE_NAME, false, consumer);}//睡眠public static void work(String task) {try {System.out.println("task++++========"+task);Thread.sleep(1000);} catch (Exception e) {e.printStackTrace();}}
}
运行结果
ConsumerTwo [x] Received 'Hello mq! 2'
task++++========Hello mq! 2
ConsumerTwo [x] Done
ConsumerTwo [x] Received 'Hello mq! 5'
task++++========Hello mq! 5
ConsumerTwo [x] Done
参考 https://www.rabbitmq.com/api-guide.html
下一篇 发布订阅 https://blog.csdn.net/lilongwangyamin/article/details/105112696
RabbitMq队列 queue相关推荐
- Day10-Python3基础-协程、异步IO、redis缓存、rabbitMQ队列
内容目录: Gevent协程 Select\Poll\Epoll异步IO与事件驱动 Python连接Mysql数据库操作 RabbitMQ队列 Redis\Memcached缓存 Paramiko S ...
- python rabitmq_python RabbitMQ队列使用
原博文 2019-01-17 21:17 − python RabbitMQ队列使用 关于python的queue介绍 关于python的队列,内置的有两种,一种是线程queue,另一种是进程queu ...
- RabbitMQ 队列消息持久化
参考链接: https://www.cnblogs.com/Keep-Ambition/p/8044752.html 假如消息队列test里面还有消息等待消费者(consumers)去接收,但是这个时 ...
- RabbitMQ 声明Queue时的参数们的Power
RabbitMQ 声明Queue时的参数们的Power 参数们的Power 在声明队列的时候会有很多的参数 public static QueueDeclareOk QueueDeclare(this ...
- rabbitmq队列中消息过期配置
最近公司某个行情推送的rabbitmq服务器由于客户端异常导致rabbitmq队列中消息快速堆积,还曾导致过内存积压导致rabbitmq客户端被block的情况.考虑到行情信息从业务上来说可以丢失部分 ...
- Python开发【十一章】:RabbitMQ队列
RabbitMQ队列 rabbitMQ是消息队列:想想之前的我们学过队列queue:threading queue(线程queue,多个线程之间进行数据交互).进程queue(父进程与子进程进行交互或 ...
- 消息队列Queue大全
消息队列Queue大全 (http://queues.io/) 作业队列,消息队列和其他队列.几乎所有你能想到的都在这. 关于 那里有很多排队系统.他们每个人都不同,是为解决某些问题而创建的.这个页面 ...
- rabbitmq队列模式以及交换机模式
常用命令 ## rabbitmq 常用命令 进入到sbin目录 启用管理插件:rabbitmq-plugins enable rabbitmq_management 启动服务: net start R ...
- 七RabbitMQ队列、Redis
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们.消息传递指的是程序之间 ...
最新文章
- 哈希一致性、哈希取余、自定义轮询分片的比较
- 利用CSS实现文本省略效果
- jekyll网站上传服务器,jekyll 高效搭建个人博客之完整流程
- 无招胜有招之Java进阶JVM(四)内存模型plus
- Eclipse export导出war包报错(Module name is invalid.)
- python支持按指定字符串分割成数组_python – 如何切割numpy数组字符串的每个元素?...
- 带你认识4种设计模式:代理模式、装饰模式、外观模式和享元模式
- python3中解码base64(线下base64解码模板)
- EPOLL AND Nonblocking I/O
- 一博商业进销存管理系统 v2008 怎么用
- Securing Big Data Provenance for Auditors: The Big Data Provenance Black Box as Reliable Evidence
- matlab中min()函数,matlab基本函数min
- redisRDB持久化中dir路径配置问题
- watchOS7.2新增“心适能功能” 监测和分类心肺适能水平
- 西部数据移动硬盘识别不了
- Unity Hub和Unity项目的关系
- 面试时,可以问面试官问题总结
- 物联网网线POE供电主控设计方案
- 足球相关的英文专业术语(持续更新中...Ctrl+F可直接进行搜索)
- JVM学习笔记07-垃圾回收
热门文章
- postgresql模糊匹配正则表达式性能问题
- sudo: Cannot execute /usr/local/bin/zsh: No such file or directory 问题
- SpringMVC启动过程详解(li)
- 好友消息和群消息区别
- 入门monkeyrunner7-monkeyrunner demo3 EasyMonkeyDevice+hierarchyviewer +monkeyrunner+截图对比
- [转]CPoint+CSize+CRect学习大纲
- [转帖]ISE与Modelsim联合观察中间信号
- 如何修改Vs2008环境变量
- vsphere client中部署OVF项目后为项目分配IP
- Mysql 外键创建失败原因