2019独角兽企业重金招聘Python工程师标准>>>

###工作队列 ###(使用Java客户端)

在这第一指南部分,我们写了通过同一命名的队列发送和接受消息。在这一部分,我们将会创建一个工作队列,在多个工作者之间使用分布式时间任务。 工作队列(亦称:任务队列)背后主要的思想是避免立即处理一个资源密集型任务并且不得不一直等待完成。相反我们可以计划着让任务后续执行。我们将任务封装成消息,发送到队列中。一个工作者进程在后台运行,获取任务并最终执行任务。当你运行多个工作者,所有的任务将会被他们所共享。

在web应用程序中,这个理念是特别有用的,你无法在一个短暂的http请求中处理一个复杂的任务。

###准备 在先前的指南中,我们发送了一个包含"Hello World!"消息。现在我们将要发送一些字符串,用来代表复杂的任务。我们没有一个真实的任务,比如图片的调整大小或者pdf文件渲染,所以我们通过Thread.sleep()函数,伪装一个我们是很忙景象。我们将会把字符串中点的数量来代表它的复杂度;每一个点将要花费一秒的工作。例如,一个使用Hello...描述的假任务会发送三秒。

我们将会轻量的修改我们以前例子中Send.java代码,使其允许任意的消息可以通过命令行发出。这个程序将要计划安排任务到我们的工作队列中,所以我们把它命名为NewTask.java:

String message = getMessage(argv);
channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

一些帮助从命令行中获取消息参数:

private static String getMessage(String[] strings){if (strings.length < 1)return "Hello World!";return joinStrings(strings, " ");
}private static String joinStrings(String[] strings, String delimiter) {int length = strings.length;if (length == 0) return "";StringBuilder words = new StringBuilder(strings[0]);for (int i = 1; i < length; i++) {words.append(delimiter).append(strings[i]);}return words.toString();
}

我们老的Recv.java程序也要求做些改变:它需要将消息体中每个点伪装成一秒。从队列中获取消息,运行任务,所以我们将它称之为Worker.java:

while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [x] Received '" + message + "'");        doWork(message);System.out.println(" [x] Done");
}

我们伪装的任务中冒充执行时间:

private static void doWork(String task) throws InterruptedException {for (char ch: task.toCharArray()) {if (ch == '.') Thread.sleep(1000);}
}

在第一部分指南中那样编译它们(jar 文件需要再工作路径上):

$ javac -cp rabbitmq-client.jar NewTask.java Worker.java

###循环分派

使用任务队列的优势之一是我们是容易并行处理。如果我们正在处理一些堆积的文件的话,我们仅仅需要增加更多的工作者,通过这种方式我们是容易扩展的。 首先,让我们试着在同一时间运行两个工作者实例。他们都会从队列中获取消息,但是具体怎样做呢?让我们一起来看一看。 你需要三个打开的控制平台,其中两个用来运行工作者程序。他们将会是我们的两个消费者-C1和C2。

shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker[*] Waiting for messages. To exit press CTRL+C
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker[*] Waiting for messages. To exit press CTRL+C

在这第三个控制平台我们用来发布新的任务。一旦你启动消费者,你就可以发布消息了:

shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask First message.
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Second message..
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Third message...
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fourth message....
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fifth message.....

让我们看看什么被投递到我们工作者那里:

shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'First message.'
[x] Received 'Third message...'
[x] Received 'Fifth message.....'
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker[*] Waiting for messages. To exit press CTRL+C[x] Received 'Second message..'[x] Received 'Fourth message....'

默认情况想,RabbitMQ将会把每一个消息发送给下一个消费者。平均下来每个消费者获取的消息数量是相同的。这种分布式消息方式被称为轮询。试试三个或更多的工作者。

###消息确认

处理一个任务可能花费数秒时间,你可能会好奇如果一个消费者开始一个长任务,并且在处理完成部分的情况下就死掉了会发生什么情况。就我们当前的代码来说,一旦RabbitMQ将消息传递给消费者,它就会立即将消息从内存中删除。在这种情况下,如果你杀掉一个正在处理的工作者你会丢失它正在处理的消息。我们也同时失去了已经分配给这个工作者并且没有开始处理的消息。 但是我们不想丢失任何任务,如果一个工作者死掉,我们期望将任务传递给另一个工作者。 为了保证每一个消息不会丢失,RabbitMQ支持消息确认机制。一个消息确认是由消费者发出,告诉RabbitMQ这个消息已经被接受,处理完成,RabbitMQ 可以删除它了。 如果一个消费者没有发送确认信号,RabbitMQ将会认定这个消息没有完全处理成功,将会把它传递给另一个消费者。通过这种方式,即使工作者有时会死掉,你依旧可以保证没有消息会被丢失。 这里不存在消息超时;RabbitMQ只会在工作者连接死掉才重新传递这个消息。即使一个消息要被处理很长很长时间,也不是问题。 消息确认机制默认情况下是开着的。在先前的例子中我们是明确的将这个功能关闭no_ack=True。是时候移除这个标识了,一旦我们完成一个任务,工作者需要发送一个确认信号。

QueueingConsumer consumer = new QueueingConsumer(channel);
boolean autoAck = false;
channel.basicConsume("hello", autoAck, consumer);while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();//...      channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}

使用这段代码,我们可以保证即使你将一个正在处理消息的工作者通过CTRL+C来终止它运行,依旧没有消息会丢失。稍后,工作者死亡后没有发送确认的消息会被重新传递。

忘掉确认

这是一个普遍的错误,就是忘记确认。这是一个很简单的错误,但是这后果是严重的。当你的客户端退出,消息会重新传递(看上去是随机传递的),RabbitMQ会越来越占用内存,因为它不会释放哪些没有发送确认的消息。

为了调试这种类型的错误,你可以使用rabbitmqctl打印出messages_unacknowledged属性:

$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello    0       0
...done.

###消息持久化 我们已经学习了如何在确定消费者是否已经死掉,并且保证任务不被丢失。但是如果RabbitMQ服务器停止,我们的任务依旧会丢失。

当RabbitMQ退出或者崩溃,它将会忘记这队列和消息,除非你告诉它不要这样做。两个事情需要做来保证消息不会丢失:我们标记队列和消息持久化。

首先,我们需要确保RabbitMQ不会丢失我们的队列,为了这样做,我们需要将它声明为持久化:

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

虽然这命令是正确的,但它不会立即在我们的程序里运行。那是因为我们已经定义了一个不持久化的hello队列。RabbitMQ不允许你使用不同的参数重新定义一个存在的队列,如果你试着那样做它会返回一个错误。有个快速的变通方案-让我们声明一个不同名字的队列,比如task_queue:

boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

这个queuqDeclare的改变需要应用在生产者和消费者的代码中。 在这点上,我们可以保证即使RabbitMQ重启,task_queue队列也不会丢失。现在我们需要标记消息持久化 - 通过设置MessageProperties(实现了BasicProperties)的值为PERSISTENT_TEXT_PLAIN

import com.rabbitmq.client.MessageProperties;channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

注意消息持久化 标记消息持久化不能完全保证消息不会被丢失,虽然这样会告诉RabbitMQ保存消息到硬盘上。但是对于RabbitMQ依旧有个短暂的时间窗口对于接收一个消息并且还没有完成保存。同样,RabbitMQ不能让每个消息同步--它可能仅仅保存在缓存中,还没有真正的写入到硬盘中。这持久化的保证不是健壮的,但是对我们的简单的任务队列来说是足够了。如果你需要更健壮的持久化保证,你可以使用出版者确认。

###公平分发 你可能注意到了,分发过程并没有如我们想的那样运作。例如,在某一种情况下有两个工作者,当所有奇数消息是很多的并且所有偶数的是少量的,一个工作者会一直忙碌下去,而另一个则会几乎不做什么事情。好吧,RabbitMQ不会在意那个事情,它会一直均匀的分发消息。 这种情况发生因为RabbitMQ仅仅分发消息到队列中。它不关心有多少消息没有由发送者发送确认信号。它仅仅盲目的将N个消息发送到N个消费者。

为了解决这个问题,我们可以使用basicQos方法,设置prefetchCount=1。这样将会告知RabbitMQ不要同时给一个工作者超过一个任务,或者换句话说在一个工作者处理完成,发送确认之前不要给它分发一个新的消息。代替,把消息分发到下一个不繁忙的工作者。

int prefetchCount = 1;
channel.basicQos(prefetchCount);

注意队列大小

如果你的所有工作者是在忙碌,你的队列就会被填满。你将会想关注这件事,可能要添加更多的工作者,或者有些其他策略。

###把它们放在一起 我们的NewTask.java最终代码:

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;public class NewTask {private static final String TASK_QUEUE_NAME = "task_queue";public static void main(String[] argv) throws java.io.IOException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);String message = getMessage(argv);channel.basicPublish( "", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());System.out.println(" [x] Sent '" + message + "'");channel.close();connection.close();}      //...
}

(NewTask.java source) 我们的Worker.java代码:

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;public class Worker {private static final String TASK_QUEUE_NAME = "task_queue";public static void main(String[] argv)throws java.io.IOException,java.lang.InterruptedException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");channel.basicQos(1);QueueingConsumer consumer = new QueueingConsumer(channel);channel.basicConsume(TASK_QUEUE_NAME, false, consumer);while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [x] Received '" + message + "'");   doWork(message); System.out.println(" [x] Done" );channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}}//...
}

(Worker.java source) 使用消息确认和预读数量你可以建立一个工作队列。持久化选项使得RabbitMQ重启之后任务依旧存在。

想要了解更多关于通道方法和消息属性,你可以浏览javadocs online

现在我们可以移到指南3了,学习怎么样将相同的消息传递给多个消费者

转载于:https://my.oschina.net/OpenSourceBO/blog/379735

RabbitMQ入门(2)--工作队列相关推荐

  1. RabbitMQ入门:发布/订阅(Publish/Subscribe)

    在前面的两篇博客中 RabbitMQ入门:Hello RabbitMQ 代码实例 RabbitMQ入门:工作队列(Work Queue) 遇到的实例都是一个消息只发送给一个消费者(工作者),他们的消息 ...

  2. RabbitMQ入门到进阶

    1.MQ简介 MQ 全称为 Message Queue,是在消息的传输过程中保存消息的容器.多用于分布式系统 之间进行通信. 2.为什么要用 MQ 1.流量消峰 没使用MQ 使用了MQ 2.应用解耦 ...

  3. RabbitMQ入门到进阶(Spring整合RabbitMQSpringBoot整合RabbitMQ)

    1.MQ简介 MQ 全称为 Message Queue,是在消息的传输过程中保存消息的容器.多用于分布式系统 之间进行通信. ​ 编辑切换为居中 添加图片注释,不超过 140 字(可选) 2.为什么要 ...

  4. RabbitMQ入门到精通

    RabbitMQ 1. 消息中间件概述 1.1. 为什么学习消息队列 电子商务应用中,经常需要对庞大的海量数据进行监控,随着网络技术和软件开发技术的不断提高,在实战开发中MQ的使用与日俱增,特别是Ra ...

  5. RabbitMQ入门到掌握

    RabbitMQ入门到掌握 一.消息队列 1.MQ 的相关概念 1.2 什么是MQ 1.2 为什么要用MQ ①流量消峰 ②应用解耦 ③异步处理 1.3 MQ 的分类 ①ActiveMQ ②Kafka ...

  6. 超详细的RabbitMQ入门

    转载:超详细的RabbitMQ入门,看这篇就够了!-阿里云开发者社区 思维导图 一.什么是消息队列 消息指的是两个应用间传递的数据.数据的类型有很多种形式,可能只包含文本字符串,也可能包含嵌入对象. ...

  7. RabbitMQ 入门系列(10)— RabbitMQ 消息持久化、不丢失消息

    消息要保持"持久化",即不丢失,必须要使得消息.交换器.队列,必须全部 "持久化". 1. 生产者怎么确认 RabbitMQ 已经收到了消息? # 打开通道的确 ...

  8. rabbitMQ入门程序

    1.生产者 /*** rabbitMQ入门程序消费者** @author xiaoss* @date 2020年10月27日 22:02*/ public class Producer01 {//队列 ...

  9. rabbitmq 入门demo

    rabbitmq 入门demo http://www.cnblogs.com/jimmy-muyuan/p/5428715.html http://www.cnblogs.com/shanyou/p/ ...

  10. RabbitMQ 入门:2. Exchange 和 Queue

    上文RabbitMQ 入门:1. Message Broker(消息代理)提到过 RabbitMQ 实现了 AMQP 这个协议(RabbitMQ 所支持的 AMQP 的版本是 0.9.1),这个协议的 ...

最新文章

  1. HTML 表格垂直对齐方式
  2. C 语言快速入门,21 个小项目足矣!「不走弯路就是捷径」
  3. java 上传文件编码_(java)有什么办法把MultipartFile上传的文件转为utf-8的编码吗
  4. 为easyui添加多条件验证
  5. php打造自己的喜马拉雅,打造自己的私人知识宝库利器——mybase 7.3.5
  6. centos下安装JDK8的方法
  7. 在线火星文转换器工具
  8. 基于Angular5和WebAPI的增删改查(一)
  9. css img 适配尺寸_一次解决你的图像尺寸和定位问题
  10. 【路径规划】基于matlab粒子群融合遗传算法栅格地图路径规划【含Matlab源码 526期】
  11. 高级软件工程2017第2次作业
  12. UniWebView3 使用中遇到的坑
  13. 天津理工大学物联网通信技术实验1:数字基带信号(NRZ、NRZ-I、AMI、HDB3信道编码)
  14. 在Delphi2007下安装ReportMachine6.5
  15. 如何选择和阅读科技论文
  16. 互联网金融指导意见落地 行业发展开始步入正轨
  17. js unshift性能分析
  18. 跑步用挂脖耳机好还是无线耳机、公认最好的跑步耳机推荐
  19. 元学习入门详解(MAML算法及Reptile算法复现)
  20. jQuery实现无刷新切换主题皮肤功能

热门文章

  1. java log4j logback jcl_进阶之路:Java 日志框架全画传(下)
  2. mysql新增字段会锁表_MySQL锁(二)表锁:为什么给小表加字段会导致整个库挂掉?...
  3. php赋值一个数组,PHP入门教程之数组的定义和赋值
  4. php中访问控制_PHP之Trait详解
  5. 数据建模_浅谈数据仓库建设中的数据建模方法
  6. 两台linux电脑怎么互推文件夹,Llinux文件目录权限及chmod命令简析
  7. git 改了一段代码不想要了_想要壁纸不收费吗?简简单单用python代码实现
  8. 广东机电职业技术学校计算机怎么样,广东机电职业技术学院宿舍怎么样 住宿条件好不好...
  9. 20200217:下一个排列(leetcode31)
  10. java终结方法_java编程思想之并发(终结任务)