rabbitmq注意事项
发送各种类型的消息
如果是以字节数组的形式包装成Message,那么接收的时候是用byte[]的形式接收,之后再转换成对应的类型
以byte[]形式发送:
// String类型Message msg=MessageBuilder.withBody(objectMapper.writeValueAsBytes(message)).build();rabbitTemplate.send(msg);// pojoMessage msg=MessageBuilder.withBody(objectMapper.writeValueAsBytes(user)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();rabbitTemplate.convertAndSend(msg);// Map类型Message message=MessageBuilder.withBody(objectMapper.writeValueAsBytes(dataMap)).build();rabbitTemplate.convertAndSend(message);
以byte[]形式接收
@RabbitListener(queues = "${basic.info.mq.queue.name}",containerFactory = "singleListenerContainer")public void consumeMessage(@Payload byte[] message){try {//TODO:接收StringString result=new String(message,"UTF-8");log.info("接收到消息: {} ",result);//TODO:接收对象User user=objectMapper.readValue(message, User.class);log.info("接收到消息: {} ",user);//TODO:接收多类型字段信息Map<String,Object> resMap=objectMapper.readValue(message,Map.class);log.info("接收到消息: {} ",resMap);}catch (Exception e){log.error("监听消费消息 发生异常: ",e.fillInStackTrace());}}
如果发送的时候不是发送的message,而是发送的对象,那么接收的时候直接用相应的对象接收就可以。
以对象的形式发送
// 直接发送字符串格式rabbitTemplate.convertAndSend(str);// 直接发送pojorabbitTemplate.convertAndSend(user);// 直接发送maprabbitTemplate.convertAndSend(message);
以对象的形式接收
@RabbitListener(queues = "${basic.info.mq.queue.name}",containerFactory = "singleListenerContainer")public void consumeMessage(Map<String,Object> resMap){try {//Map<String,Object> resMap=objectMapper.readValue(message,Map.class);log.info("接收到消息: {} ",resMap);}catch (Exception e){log.error("监听消费消息 发生异常: ",e.fillInStackTrace());}}
@RabbitListener(queues = "${basic.info.mq.queue.name}",containerFactory = "singleListenerContainer")public void consumeMessage(@Payload User user){try {log.info("接收到实体消息: {} ",user);}catch (Exception e){log.error("监听消费消息 发生异常: ",e.fillInStackTrace());}}
注意:此时接收的都是消息体的数据,如果想要接收消息头中的信息
- 可以直接接收Message对象,从中取body和Properties.
//支持自动声明绑定,声明之后自动监听队列的队列,此时@RabbitListener注解的queue和bindings不能同时指定,否则报错@RabbitListener(bindings ={@QueueBinding(value = @Queue(value = "q5",durable = "true"),exchange =@Exchange(value = "zhihao.miao.exchange",durable = "true"),key = "welcome")})public void handleMessage(Message message){System.out.println("====消费消息"+message.getMessageProperties().getConsumerQueue()+"===handleMessage");System.out.println(message.getMessageProperties());System.out.println(new String(message.getBody()));}
- 使用注解@Payload取的是消息体的信息, @Header取的是消息头的信息
//获取消息的头属性和body属性@RabbitListener(queues = "zhihao.miao.order")public void handleMessage(@Payload String body, @Headers Map<String,Object> headers){System.out.println("====消费消息===handleMessage");System.out.println(headers);System.out.println(body);}
//获取特定的消息@RabbitListener(queues = "zhihao.miao.order")public void handleMessage(@Payload String body,@Header String token){System.out.println("====消费消息===handleMessage");System.out.println(token);System.out.println(body);}
关于消息转换的问题可以参考RabbitMQ笔记十三:使用@RabbitListener注解消费消息
发送消息的回馈
发送消息的消息反馈可以参考RabbitMQ:消息发送确认 与 消息接收确认(ACK)
@Beanpublic RabbitTemplate rabbitTemplate(){connectionFactory.setPublisherConfirms(true);connectionFactory.setPublisherReturns(true);RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMandatory(true);rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);}});rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);}});return rabbitTemplate;}
如果Mandatory为false,找不到路由的队列,会直接丢弃消息,除了设置为true,如果路由失败,会将消息返回的方式外,还有一个方式是给目的队列设置为一个备份交换机,没有路由的消息直接发送给这个备份的就可以,详细参考:springboot整合rabbirmq(3.7.9)中使用mandatory参数获取匹配失败的消息以及存入rabbitmq备份交换器中!
多线程处理消息
默认情况下,消费者一次只能消费一个消息,为了提高性能,一个消费者可以同时消费多条消息,setConcurrentConsumers,setMaxConcurrentConsumers,setPrefetchCount,设置三个属性,
@Bean(name = "singleListenerContainer")public SimpleRabbitListenerContainerFactory listenerContainer(){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());factory.setConcurrentConsumers(1);factory.setMaxConcurrentConsumers(1);factory.setPrefetchCount(1);factory.setTxSize(1);factory.setAcknowledgeMode(AcknowledgeMode.AUTO);return factory;}
@Bean(name = "multiListenerContainer")public SimpleRabbitListenerContainerFactory multiListenerContainer(){SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factoryConfigurer.configure(factory,connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());factory.setAcknowledgeMode(AcknowledgeMode.NONE);factory.setConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.concurrency",int.class));factory.setMaxConcurrentConsumers(env.getProperty("spring.rabbitmq.listener.max-concurrency",int.class));factory.setPrefetchCount(env.getProperty("spring.rabbitmq.listener.prefetch",int.class));return factory;}
接收消息的时候,设置使用的factory,设置多线程的containerFactory。
@RabbitListener(queues = "${mail.queue.name}",containerFactory = "multiListenerContainer")public void consumeMailQueue(@Payload byte[] message){try {log.info("监听消费邮件发送 监听到消息: {} ",new String(message,"UTF-8"));mailService.sendEmail();}catch (Exception e){e.printStackTrace();}}
消息的确认机制
可以参考文章RabbitMQ:消息发送确认 与 消息接收确认(ACK)
项目实战
本文参考SpringBoot整合RabbitMQ之 典型应用场景实战一
rabbitmq注意事项相关推荐
- ABP vNext分布式事件总线RabbitMQ注意事项
[https://docs.abp.io/zh-Hans/abp/latest/Distributed-Event-Bus-RabbitMQ-Integration](ABP vNext官方文档链接) ...
- windows安装RabbitMQ注意事项
1.首先下载好ERLANG.RabbitMQ安装包,先安装erlang,设置好环境变量,然后再去安装MQ; 2.别人有两个报错: 一:RabbitMQ安装目录中不允许有空格; 二:安装rabbitmq ...
- RabbitMQ基础知识介绍、RabbitMQ的安装
RabbitMQ基础知识介绍 官方解释:MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法.应用程序通过 读写出入队列的消息 ...
- 阿里云服务器配置(一)基础
目录 卸载阿里云盾监控 安装docker 安装docker-compose 有网环境安装redis docker安装mysql5.7.35 第三步:拷贝容器配置文件 第四步:删除容器,重新启动新容器 ...
- Windows下RabbitMQ安装及注意事项
Windows下RabbitMQ安装及注意事项 简介 背景 1. RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue)的开源实现. Rabbit ...
- python 连接 rabbitMQ以及rabbitMQssl注意事项,password
pip3 install pika==1.1.0 官方对于pika有如下介绍# Since threads aren't appropriate to every situation, it does ...
- linux erlang安装教程,linux(CentOS7)中安装erlang(20.3)以及rabbitmq(3.6.15)的步骤以及一些注意事项...
标签: 首先下载安装包,之后先安装erlang,安装erlang需要很多依赖,所以一步步来: 首先 wxWidgets会报错,这个不是必须的,可以不安装,不影响 然后需要安装一些必须的依赖: yum ...
- 60分钟快速掌握RabbitMQ,java面试技巧和注意事项
简介 HikariCP 是用于创建和管理连接,利用"池"的方式复用连接减少资源开销,和其他数据源一样,也具有连接数控制.连接可靠性测试.连接泄露控制.缓存语句等功能,另外,和 dr ...
- 几年前,我撸了一套RabbitMQ的客户端
不好意思,又好多天没更文章了-- 眼看着离过年越来越近了,很多工作都要在年前冲刺.收个尾.比如:工作总结.绩效考核.奖金.确定今年 KPI-- 由于我负责的部门一百多人,虽然有下面的各位 Leader ...
- 别人家的团队怎么用RabbitMQ:我总结的5点规范
大概从 2013 年开始,我就开始了自己和 RabbitMQ 的接触,到现在已经有七年多了. 在这七年中,既有一些对 RabbitMQ 的深度体验,更有无数的血泪史. 而根据我这么多年的使用经验,我将 ...
最新文章
- Python,OpenCV中的非局部均值去噪(Non-Local Means Denoising)
- 接收不到其他机器发来的报文_大厂真实案例:线上四台机器同一时间全部 OOM......
- ​SoundCloud的web播放库Maestro演进之路
- C# WPF MVVM开发框架Caliburn.Micro 名称Transformer⑩①
- mysql aes encrypt_mysql加密函数aes_encrypt()和aes_decrypt()使用教程
- JavaScript 原型总结三 函数和对象的关系
- php快速删除,学习猿地-php如何快速删除文章
- 美赛如何选题matlab,2017美赛D题—学习记录
- 运放虚短虚断的成立条件
- 从QQtabBar看css命名规范BEM
- linux命令测网速
- 测试工程师面试一般常用问题
- 打印资料可以当天发货的网上打印平台选哪家
- 左移赋值运算符 (<<=)
- mysql数据库存储生僻字_mysql 生僻字存储
- 如何实现3D网页游戏?
- 爆款小游戏用的都是什么游戏开发引擎?
- 2021-01-06 PMP 群内练习题 - 光环
- 精益思想Lean thinking
- python简笔画蚂蚁_使用python turtle绘制简笔画大白-Go语言中文社区
热门文章
- java填空题 在非静态成员方法中_Java编程基础知识点汇总习题集--答案word版本
- MacOS ClashX 配置代理端口为0问题7890 配置iterm2终端代理
- Python 3 从入门到精通 Mac OS
- JAVA rs 是否要关闭_关闭结果集rs和statement以后,是否还要关闭数据库连接呢?...
- 基于用户画像的商品推荐挑战赛
- matplotlib绘制横向柱状图
- php directory lister,PHP目录展示工具DirectoryLister
- 机器学习课程 Neural Netword for Machine Learning笔记
- 【Codeforces Round #585 (Div. 2) E】Marbles【状压DP】
- 动态DP详解(转载)