Java 小记 — RabbitMQ 的实践与思考

前言

本篇随笔将汇总一些我对消息队列 RabbitMQ 的认识,顺便谈谈其在高并发和秒杀系统中的具体应用。

1. 预备示例

想了下,还是先抛出一个简单示例,随后再根据其具体应用场景进行扩展,我觉得这样表述条理更清晰些。

RabbitConfig:

@Configuration
public class RabbitConfig { @Bean public Queue callQueue() { return new Queue(MQConstant.CALL); } }

Client:

@Component
public class Client { @Autowired private RabbitTemplate rabbitTemplate; public void sendCall(String content) { for (int i = 0; i < 10000; i++) { String message = i + "-" + content; System.out.println(String.format("Sender: %s", message)); rabbitTemplate.convertAndSend(MQConstant.CALL, message); } } }

Server:

@Component
public class Server { @RabbitHandler @RabbitListener(queues = MQConstant.CALL) public void callProcess(String message) throws InterruptedException { Thread.sleep(1000); System.out.println(String.format("Receiver: reply(\"%s\") Yes, I just saw your message!", message)); } }

Result:

Sender: Hello, are you there!
Receiver: reply("Hello, are you there!") Yes, I just saw your message!

以上示例会在 rabbitmq 中创建一条队列 CALL, 消息在其中等待消费:

在此基础上的简单扩展我就不再写案例了,比如领域模块完成了其核心业务规则之后可能需要更新缓存、写个邮件、记个复杂日志、做个统计报表等等,这些不需要及时反馈或者耗时的附属业务都可以通过异步队列分发,以此来提升核心业务的响应速度,同时如此处理能让领域边界更加清晰,代码的可维护性和持续拓展的能力也会有所提升。

2. 削峰

上个示例中我提到的应用场景是解耦和通知,再接着扩展,因其具备良好的缓冲性质,所以还有一个非常适合的应用场景那就是削峰。对于突如其来的极高并发请求,我们可以先瞬速地将其加入队列并回复用户一个友好提示,然后服务器可在其能承受的范围内慢慢处理,以此来防止突发的 CPU 和内存 “爆表”。

改造之后对于发送方来说当然是比较爽的,他只是将请求加入消息队列而已,处理压力都归到了消费端。接着思考,这样处理有没有副作用?如果这个请求刚好是线程阻塞的,那还要加入队列慢慢排队处理,那不是完蛋了,用户要猴年马月才能得到反馈?所以针对此,我觉得应该将消费端的方法改为异步调用(即多线程)以提升吞吐量,在 Spring Boot 中的写法也非常简单:

@Component
public class Server { @Async @RabbitHandler @RabbitListener(queues = MQConstant.CALL) public void callProcess(String message) throws InterruptedException { Thread.sleep(100); System.out.println(String.format("Receiver: reply(\"%s\") Yes, I just saw your message!", message)); } }

参照示例一的方法,我发布了 10000 条消息加入队列,且消费端的调用每次阻塞一秒,那可有意思了,什么时候能处理完?但如果开几百个线程同时处理的话,那几十秒就够了,当然具体多少合适还应根据具体的业务场景和服务器配置酌情考虑。另外,别忘了配线程池:

@Configuration
public class AsyncConfig { @Bean public Executor asyncExecutor(){ ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(500); executor.setQueueCapacity(10); executor.setThreadNamePrefix("MyExecutor-"); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } }

3. Exchange

RabbitMQ 可能为 N 个应用同时提供服务,要是你和你的蓝颜知己突然心有灵犀,在不同的业务上使用了同一个 routingKey,想想就刺激。因此,队列多了自然要进行分组管理,限定好 Exchange 的规则,接下来就可以独自玩耍了。

MQConstant:

public class MQConstant { public static final String EXCHANGE = "YOUCLK-MESSAGE-EXCHANGE"; public static final String CALL = MQConstant.EXCHANGE + ".CALL"; public static final String ALL = MQConstant.EXCHANGE + ".#"; }

RabbitConfig:

@Configuration
public class RabbitConfig { @Bean public Queue callQueue() { return new Queue(MQConstant.CALL); } @Bean TopicExchange exchange() { return new TopicExchange(MQConstant.EXCHANGE); } @Bean Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with(MQConstant.ALL); } }

此时我们再去查队列 CALL,可以看到已经绑定了Exchange:

当然 Exchange 的作用远不止如此,以上示例为 Topic 模式,除此之外还有 Direct、Headers 和 Fanout 模式,写法都差不多,感兴趣的童鞋可以去查看 “官方文档” 进行更深入了解。

4. 延时队列

延时任务的场景相信小伙伴们都接触过,特别是抢购的时候,在规定时间内未付款订单就被回收了。微信支付的 API 里面也有一个支付完成后的延时再确认消息推送,实现原理应该都差不多。

利用 RabbitMQ 实现该功能首先要了解他的两个特性,分别是 Time-To-Live Extensions 和 Dead Letter Exchanges,字面意思上就能理解个大概,一个是生存时间,一个是死信。整个过程也很容易理解,TTL 相当于一个缓冲队列,等待其过期之后消息会由 DLX 转发到实际消费队列,如此便实现了他的延时过程。

MQConstant:

public class MQConstant { public static final String PER_DELAY_EXCHANGE = "PER_DELAY_EXCHANGE"; public static final String DELAY_EXCHANGE = "DELAY_EXCHANGE"; public static final String DELAY_CALL_TTL = "DELAY_CALL_TTL"; public static final String CALL = "CALL"; }

ExpirationMessagePostProcessor:

public class ExpirationMessagePostProcessor implements MessagePostProcessor { private final Long ttl; public ExpirationMessagePostProcessor(Long ttl) { this.ttl = ttl; } @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties() .setExpiration(ttl.toString()); return message; } }

Client:

@Component
public class Client { @Autowired private RabbitTemplate rabbitTemplate; public void sendCall(String content) { for (int i = 1; i <= 3; i++) { long expiration = i * 5000; String message = i + "-" + content; System.out.println(String.format("Sender: %s", message)); rabbitTemplate.convertAndSend(MQConstant.DELAY_CALL_TTL, (Object) message, new ExpirationMessagePostProcessor(expiration)); } } }

Server:

@Component
public class Server { @Async @RabbitHandler @RabbitListener(queues = MQConstant.CALL) public void callProcess(String message) throws InterruptedException { String date = (new SimpleDateFormat("HH:mm:ss")).format(new Date()); System.out.println(String.format("Receiver: reply(\"%s\") Yes, I just saw your message!- %s", message, date)); } }

Result:

Sender: 1-Hello, are you there!
Sender: 2-Hello, are you there!
Sender: 3-Hello, are you there! Receiver: reply("1-Hello, are you there!") Yes, I just saw your message!- 23:04:12 Receiver: reply("2-Hello, are you there!") Yes, I just saw your message!- 23:04:17 Receiver: reply("3-Hello, are you there!") Yes, I just saw your message!- 23:04:22

结果一目了然,分别在队列中延迟了 5秒,10秒,15秒,当然,以上只是我的简单示例,童鞋们可翻阅官方文档(“ ttl ” && “ dlx ”)进一步深入学习。

结语

本篇随笔不该就这么结束,但晚上心情不好,百感交集,无法继续写作,无奈至此。近期正在寻觅新的工作机会,我的微信:youclk,无论有没有推荐的,给我点鼓励,谢谢!

作者:捷义 
出处:http://www.cnblogs.com/youclk/ 
说明:转载请标明来源和作者

Java 小记 — RabbitMQ 的实践与思考相关推荐

  1. Dubbo Cloud Native 实践与思考

    Dubbo Cloud Native 实践与思考 分享简介 Cloud Native 应用架构随着云技术的发展受到业界特别重视和关注,尤其是 CNCF(Cloud Native Computing F ...

  2. Dubbo Cloud Native 之路的实践与思考

    Dubbo Cloud Native 实践与思考 Dubbo Cloud Native 实践与思考 分享简介 自我介绍 主要议程 Cloud Native 基础设施 服务发现(Service Disc ...

  3. Dubbo Cloud Native 实践与思考 1

    分享简介 Cloud Native 应用架构随着云技术的发展受到业界特别重视和关注,尤其是 CNCF(Cloud Native Computing Foundation)项目蓬勃发展之际.Dubbo ...

  4. 编写高性能Java代码的最佳实践

    编写高性能Java代码的最佳实践 摘要:本文首先介绍了负载测试.基于APM工具的应用程序和服务器监控,随后介绍了编写高性能Java代码的一些最佳实践.最后研究了JVM特定的调优技巧.数据库端的优化和架 ...

  5. 高性能Java代码的最佳实践

    高性能Java代码的最佳实践 前言 在这篇文章中,我们将讨论几个有助于提升Java应用程序性能的方法.我们首先将介绍如何定义可度量的性能指标,然后看看有哪些工具可以用来度量和监控应用程序性能,以及确定 ...

  6. 《Java程序设计》课堂实践内容总结

    <Java程序设计>课堂实践内容总结 实践一 要求 修改教材P98 Score2.java, 让执行结果数组填充是自己的学号: 提交在IDEA或命令行中运行结查截图,加上学号水印,没学号的 ...

  7. 分享狼叔关于《大前端工程化的实践与思考》

    前言 本文来自极客前端训练营的主题公开课,非原创. 作者简介 桑世龙(狼叔),阿里巴巴前端技术专家,nodejs<狼书>作者. 快速发展的大背景 前端发展太快了,在2004年之前,大概只要 ...

  8. 阿里总监谢纯良,讲透《阿里中台架构实践与思考》,PPT 音频!

    欢迎关注"技术领导力",每天早上8:30推送 来源| AS大会 本文整理了,阿里技术方案总监--谢纯良,在AS大会上的题为<阿里巴巴中台技术架构--实践与思考>的分享. ...

  9. 新工科教育的实践与思考——曾勇校长在工程教育高峰论坛上的报告

    2020年10月20日,中国工程教育认证协会在北京举办工程教育高峰论坛,电子科技大学校长曾勇以<新工科教育的实践与思考>为题,分享工程教育和工程人才培养的精彩观点,100多位工程教育和行业 ...

  10. 阿里云马劲:保证云产品持续拥有稳定性的实践和思考\n

    对所有的技术人员来说,业务可靠性提升是一个系统工程,涉及网络管理.IDC管理.服务器管理.交付管理.变更管理.故障管理.监控管理.预案管理.根因分析.容量规划.容灾演练.标准化建设.集成测试.泛操作管 ...

最新文章

  1. 【Thread】简单说说java.lang.Thread.State
  2. 尚硅谷Docker---1、docker杂记
  3. 用计算机写作文的好处,《用计算机写作文》教案
  4. VMware 安装kali——linux
  5. 南通大学python期末考试试卷答案_南通大学2015-2016年1学期《软工》作业点评总结...
  6. redlock java_Redlock分布式锁
  7. 定期定量采购_企业常见的六种采购策略
  8. Java 四种线程池
  9. 如何对西数硬盘固件进行逆向分析(下)
  10. 《深入浅出WPF》——命令学习
  11. 使用JS获取当前地理位置方法汇总
  12. C++ Const 初步总结(《C++程序设计语言》读后感)
  13. TypeScript介绍
  14. Yar服务端与客户端交互,请求原理
  15. html5弹页面腮红,腮红可以用手涂吗?腮红用刷子还是粉扑?
  16. jav中spark迁移hive到mongo(更新数据)
  17. exlc表格怎么换行_Excel表格如何自动换行
  18. 2022年最新android studio连接雷电模拟器 真机调试教程
  19. 自动抓取QQ好友列表?Windows UIA教你轻松实现
  20. WPF基础之XAML----(XAML 根元素和 xmlns,事件和 XAML 代码隐藏)

热门文章

  1. 关于linux操作系统的特点,LINUX操作系统有哪些概念和特点?
  2. java8中class怎么用_如何在Java中使用Class T?
  3. Decode Ways
  4. 用Tensorflow基于Deep Q Learning DQN 玩Flappy Bird
  5. 每日一题/015/tr(AB)=tr(BA)/反对称矩阵的充要条件/如果 AA‘=-A^2,那么是反对称矩阵
  6. 炼丹中遇到的一些BUG
  7. 93.复原IP地址(力扣leetcode) 博主可答疑该问题
  8. html遍历1到100,bat for循环100次:循环100求和
  9. SpringMVC前后台数据传递中Json格式的相互转换(前台显示格式、Json-lib日期处理)及Spring中的WebDataBinder浅析...
  10. 又延伸到socket去了。