上一篇博文中简单介绍了一下RabbitMQ的基础知识,并写了一个经典语言入门程序——HelloWorld。本篇博文中我们将会创建一个工作队列用来在工作者(consumer)间分发耗时任务。同样是翻译的官网实例。

工作队列

在前一篇博文中,我们完成了一个简单的对声明的队列进行发送和接受消息程序。下面我们将创建一个工作队列,来向多个工作者(consumer)分发耗时任务。

工作队列(又名:任务队列)的主要任务是为了避免立即做一个资源密集型的却又必须等待完成的任务。相反的,我们进行任务调度:将任务封装为消息并发给队列。在后台运行的工作者(consumer)将其取出,然后最终执行。当你运行多个工作者(consumer),队列中的任务被工作进行共享执行。

这样的概念对于在一个HTTP短链接的请求窗口中处理复杂任务的web应用程序,是非常有用的。

准备

使用Thread.Sleep()方法来模拟耗时。采用小数点的数量来表示任务的复杂性。每一个点将住哪用1s的“工作”。例如,Hello... 处理完需要3s的时间。

发送端(生产者):NewTask.java

public class NewTask {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws IOException {/*** 创建连接连接到MabbitMQ*/ConnectionFactory factory = new ConnectionFactory();// 设置MabbitMQ所在主机ip或者主机名factory.setHost("127.0.0.1");// 创建一个连接Connection connection = factory.newConnection();// 创建一个频道Channel channel = connection.createChannel();// 指定一个队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 发送的消息String message = "Hello World...";// 往队列中发出一条消息channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");// 关闭频道和连接channel.close();connection.close();}
}

工作者(消费者)Worker.java

public class Worker {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws IOException, InterruptedException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");// 打开连接和创建频道,与发送端一样Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");// 创建队列消费者final Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");System.out.println(" [x] Proccessing... at " +new Date().toLocaleString());try {for (char ch: message.toCharArray()) {if (ch == '.') {Thread.sleep(1000);}}} catch (InterruptedException e) {} finally {System.out.println(" [x] Done! at " +new Date().toLocaleString());}}};channel.basicConsume(QUEUE_NAME, true, consumer);}
}

运行结果如下:


任务分发机制

正主来了。。。下面开始介绍各种任务分发机制。

Round-robin(轮询分发)

使用任务队列的优点之一就是可以轻易的并行工作。如果我们积压了好多工作,我们可以通过增加工作者(消费者)来解决这一问题,使得系统的伸缩性更加容易。

修改一下NewTask,使用for循环模拟多次发送消息的过程:

        for (int i = 0; i < 5; i++) {// 发送的消息String message = "Hello World"+Strings.repeat(".", i);// 往队列中发出一条消息channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}

我们先启动1个生产者实例,2个工作者实例,看一下如何执行:

从上述的结果中,我们可以得知,在默认情况下,RabbitMQ将逐个发送消息到在序列中的下一个消费者(而不考虑每个任务的时长等等,且是提前一次性分配,并非一个一个分配)。平均每个消费者获得相同数量的消息。这种方式分发消息机制称为Round-Robin(轮询)。

Fair dispatch(公平分发)

您可能已经注意到,任务分发仍然没有完全按照我们想要的那样。比如:现在有2个消费者,所有的奇数的消息都是繁忙的,而偶数则是轻松的。按照轮询的方式,奇数的任务交给了第一个消费者,所以一直在忙个不停。偶数的任务交给另一个消费者,则立即完成任务,然后闲得不行。而RabbitMQ则是不了解这些的。

这是因为当消息进入队列,RabbitMQ就会分派消息。它不看消费者为应答的数目,只是盲目的将第n条消息发给第n个消费者。

为了解决这个问题,我们使用basicQos( prefetchCount = 1)方法,来限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

注:如果所有的工作者都处于繁忙状态,你的队列有可能被填充满。你可能会观察队列的使用情况,然后增加工作者,或者使用别的什么策略。
       还有一点需要注意,使用公平分发,必须关闭自动应答,改为手动应答。这些内容会在下篇博文中讲述。

整体代码如下:生产者NewTask.java

public class NewTask {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws IOException {/*** 创建连接连接到MabbitMQ*/ConnectionFactory factory = new ConnectionFactory();// 设置MabbitMQ所在主机ip或者主机名factory.setHost("127.0.0.1");// 创建一个连接Connection connection = factory.newConnection();// 创建一个频道Channel channel = connection.createChannel();// 指定一个队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);int prefetchCount = 1;//限制发给同一个消费者不得超过1条消息channel.basicQos(prefetchCount);for (int i = 0; i < 5; i++) {// 发送的消息String message = "Hello World"+Strings.repeat(".",5-i)+(5-i);// 往队列中发出一条消息channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}// 关闭频道和连接channel.close();connection.close();}
}

消费者Worker.java

public class Worker {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws IOException, InterruptedException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");// 打开连接和创建频道,与发送端一样Connection connection = factory.newConnection();final Channel channel = connection.createChannel();// 声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");channel.basicQos(1);//保证一次只分发一个// 创建队列消费者final Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println(" [x] Received '" + message + "'");try {for (char ch: message.toCharArray()) {if (ch == '.') {Thread.sleep(1000);}}} catch (InterruptedException e) {} finally {System.out.println(" [x] Done! at " +new Date().toLocaleString());channel.basicAck(envelope.getDeliveryTag(), false);  }}};channel.basicConsume(QUEUE_NAME, false, consumer);}
}

运行结果如下:

轻松搞定RabbitMQ(二)——工作队列之消息分发机制相关推荐

  1. 轻松搞定RabbitMQ开篇:Java消息队列与JMS的诞生

    Java 帝国之消息队列 原创: 刘欣 码农翻身 2017-02-06 张家村的历史 Java 帝国的张家村正在迎来一次重大的变革. 5年前网上购物兴起的时候, 帝国非常看好, 决定向这个领域进军, ...

  2. 一键换机显示二维码错误_【丽迪资讯】装路由器,忘记上网账号跟密码?别担心360amp;磊科智能路由器一键换机轻松搞定!...

    问:刚买了路由器宽带帐号密码设置了好多次都显示帐号或密码有错误(常见的错误代码是拨号678),宽带帐号密码反复设置了好多次一直不成功,因为宽带帐号密码这个事情给运营商客服也打过好多次电话,但就是显示帐 ...

  3. 计算机无法上网的软件故障,解决你99%无法联网问题,高手教你只用1招轻松搞定...

    原标题:解决你99%无法联网问题,高手教你只用1招轻松搞定 无限君:现在生活中不管是电脑还是手机,没有了网络绝大多数功能会无法使用,在日常生活中手机连接无线网的故障非常罕见,基本上通过手机重启就可以轻 ...

  4. 52讲轻松搞定网络爬虫(笔记)

    52讲轻松搞定网络爬虫 模块一:爬虫基础原理 1.HTTP基本原理 请求 响应 2.Web网页基础 网页的组成--HTML.CSS.JS 节点树 CSS选择器 3. 了解爬⾍的基本原理 获取⽹⻚: 提 ...

  5. 如何阻止华为杀应用_华为手机“杀”后台严重受不了?别慌,这些小技巧就能轻松搞定...

    原标题:华为手机"杀"后台严重受不了?别慌,这些小技巧就能轻松搞定 现在有越来越多的小伙伴喜欢使用华为手机,觉得其性价比非常高.但是不少使用华为手机的朋友会发现,华为手机" ...

  6. 函数字节不对齐函数崩溃_Excel中统计字符数,不需要一个一个的数,len函数能轻松搞定...

    简介:要统计Excel单元格中的字符数,不需要一个一个的数,利用len函数就能轻松搞定. 问:什么是len函数? 答:自动统计字符数的函数 问:怎样记住len函数 答:len是length(长度)的简 ...

  7. 十招轻松搞定社会媒体

    十招轻松搞定社会媒体 十招轻松搞定社会媒体   社会媒体化成为在线营销的重要手段不再是什么秘密了,写一些好的内容并且进行推广只是这个过程的一小部分,你还必须加强在网络社会的曝光机会,有些人不知道如何开 ...

  8. captura录屏发生了一个错误_录屏教程的方法有哪些?学会这两种轻松搞定

    录屏教程的方法有哪些?在日常的工作以及生活当作,录屏教程的需求是经常需要做的.比方说:工作的时候,需要录屏新产品操作教程.生活中需要录屏学习基础教程等等.所以,对于录屏教程能够找到一个好用的方法是非常 ...

  9. U盘启动盘制作方法 2种绝招轻松搞定

    U盘装系统目前是非常流行的,有这么一句话说的好"U盘在手,系统无忧".就一个小小的U盘便于随身携带,就一个小小的U盘就能解决我们Windows系统崩溃的烦恼,那么好的事不用白不用. ...

最新文章

  1. 网易云信国际短信上线啦!
  2. 成本预算的四个步骤_全网推广步骤有哪些?
  3. 电脑开机启动修复无法自动修复此计算机,Win7系统开机无法自动修复此计算机如何解决...
  4. Python自定义分页组件
  5. Docker学习总结(24)——在Docker中监视Java应用程序的5种方法
  6. 植物大战僵尸全明星服务器维修多长时间,植物大战僵尸全明星常见问题FAQ详解...
  7. 小米路由器mini 刷潘多拉固件教程
  8. AltiumDesigner 的 PcbDoc文件转 pads 的 PCB文件
  9. PCIe扫盲——基于WinDriver快速开发PCIe驱动简明教程
  10. 如何删除360浏览器的桔梗导航
  11. GeoServer style(sld)中文乱码解决方法
  12. uniapp中简单方法之上传图片到腾讯云
  13. 图像压缩之DCT变换
  14. win10 子系统之 Ubuntu,解放你的生产力
  15. Win10中如何把语言栏缩到系统托盘
  16. CTF-PWN学习-为缺少指导的同学而生
  17. 浏览器兼容性总结: IE 火狐 谷歌 360 搜狗
  18. SPARK-SQL - group分组聚合api,agg()
  19. JS window.open()打开新窗口、监听页面打开关闭状态(详细)
  20. ubuntu 查看mysql数据库_ubuntu mysql查看数据库

热门文章

  1. Vue入门 ---- 简易留言板
  2. kylin与superset集成实现数据可视化
  3. html5 制作会转的风扇,HTML5学习第5天[乱撞的球]可以听到风扇声的哟
  4. webform中提交按钮同时执行更新和插入操作_软件测试中的功能测试点(三)
  5. matlab检测串口数据帧头,MATLAB 串口读取姿态数据及GUI实时动态显示设计
  6. 在电脑上显示未知发布者怎么办_电脑开机后显示器黑屏只有鼠标能动,怎么办呢?...
  7. 计算机主机外部的连接端口有何作用,微机原理 课后题 标准答案
  8. 一个server搭建多个tomcat的时候session混乱情况及解决
  9. 如何用原型体现你的专业度?
  10. java 对象的交互_Java中什么是对象的交互?解释一下交互怎么操作?