二、Work Queues(using the Java Client) 走起

   在第上一个教程中我们写程序从一个命名队列发送和接收消息。在这一次我们将创建一个工作队列,将用于分发耗时的任务在多个工作者(worker)之间。
背后的主要思想工作队列(又名:任务队列)是为了避免立即做一个资源密集型任务,不得不等待它完成。相反,我们安排的任务要做。我们封装任务作为消息并将其发送到一个队列。工作进程在后台运行将流行的任务和最终执行的工作。当您运行许多worker的任务将在他们之间共享。这个概念是特别有用的web应用程序中处理复杂的任务是不可能在一个短的HTTP请求窗口。  为了避免等待一些占用大量资源、时间的操作。当我们把任务(Task)当作消息发送到队列中,一个运行在后台的工作者(worker)进程就会取出任务然后处理。当你运行多个工作者(workers),任务就会在它们之间共享。 

三、Preparation(预备)

 在RabbitMQ系列教程中,在前一章柯南君:看大数据时代下的IT架构(4)消息队列之RabbitMQ--案例(Helloword起航)
  • 我们稍微改一下send.java 代码(我们上一章的事例中有源代码),允许任意从命令文件发送消息,这个程序将我们的工作计划放在任务队列中,现在让我先做个类NewTask.java
String message = getMessage(argv); channel.basicPublish("", "hello", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");

  从命令行参数,得到一些帮助信息:
private static String getMessage(String[] strings){ if (strings.length < 1) return "Hello World!"; return joinStrings(strings, " "); } private static String joinStrings(String[] strings, String delimiter) { int length = strings.length; if (length == 0) return ""; StringBuilder words = new StringBuilder(strings[0]); for (int i = 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); }

我们的老的Recv.java程序也需要一些变化:
  • 它需要假的第二个消息体的每一个点的工作。
  • 它将从队列中发布消息并执行任务,所以我们称之为Worker.java:
while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); doWork(message); System.out.println(" [x] Done"); }

我们的假任务模拟执行时间

private static void doWork(String task) throws InterruptedException { for (char ch: task.toCharArray()) { if (ch == '.') Thread.sleep(1000); } }
编译它们 (工作目录中的jar文件):

$ javac -cp rabbitmq-client.jar NewTask.java Worker.java

四、Round-robin dispatching

 使用一个任务队列的优点之一是能够轻易平行的工作。如果我们建立一个积压的工作,我们可以添加更多的任务(worker),这样,很容易。
首先,让我们试着两个任务(worker)实例同时运行。他们都将从队列中获取消息,但究竟如何?让我们来看看。你需要三个主机开放。 两人将运行工计划。这些游戏机将我们两个消费者- C1和C2。
shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker[*] Waiting for messages. To exit press CTRL+C

shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker[*] Waiting for messages. To exit press CTRL+C

在第三个我们将发布新的任务。一旦你开始了消费者可以发布几条消息:

shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask First message.
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Second message..
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Third message...
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fourth message....
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fifth message.....

   让我们看看我们的workers都传输什么:
shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker[*] Waiting for messages. To exit press CTRL+C[x] Received 'First message.' [x] Received 'Third message...' [x] Received 'Fifth message.....'
java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker[*] Waiting for messages. To exit press CTRL+C[x] Received 'Second message..' [x] Received 'Fourth message....'

默认情况下,RabbitMQ将每个消息发送到下一个消费者,在序列。平均每个消费者将获得相同数量的信息。这种方式称为循环分配消息。试试这三个或更多的worker。

五、Message acknowledgment

 做一个任务可能需要几秒钟。如果一个消费者开始漫长的任务而死,只有部分完成,你可能想知道到底发生什么事情了?
在我们当前的代码情况下,一旦RabbitMQ向客户传递一个消息立即从内存中删除。在这种情况下,如果你kill(杀)了一个任务(worker), 我们将失去刚刚处理的消息。我们也会失去所有的消息被派往这个特殊的工人,但尚未处理。  但是我们不想失去任何任务。如果一个工作者(worker)死亡,我们想要交付的任务到另一个worker。 为了确保消息是从来都不会迷失的,RabbitMQ支持消息应答。发送ack(knowledgement)从消费者告诉RabbitMQ特定的消息已经收到, 处理和RabbitMQ是免费的,删除它。如果一个消费者停止没有发送ack,RabbitMQ会明白一个消息
QueueingConsumer consumer = new QueueingConsumer(channel); boolean autoAck = false; channel.basicConsume("hello", autoAck, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //... channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }

使用这段代码,我们可以确信,即使你杀了一个工作者(worker)使用CTRL + C处理消息时,没有将丢失。工作者(worker)死亡后不久所有未得到确认的消息将被发送。
  问题:
   消息响应
  • 我们不想丢失任何任务消息。如果一个工作者(worker)挂掉了,我们希望任务会重新发送给其他的工作者(worker)。
  • 为了防止消息丢失,RabbitMQ提供了消息[i]响应(acknowledgments)[/i]。消费者会通过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ就会释放并删除这条消息。
  • 如果消费者(consumer)挂掉了,没有发送响应,RabbitMQ就会认为消息没有被完全处理,然后重新发送给其他消费者(consumer)。这样,及时工作者(workers)偶尔的挂掉,也不会丢失消息。
  • 消息是没有超时这个概念的;当工作者与它断开连的时候,RabbitMQ会重新发送消息。这样在处理一个耗时非常长的消息任务的时候就不会出问题了。
  • 消息响应默认是开启的。之前的例子中我们可以使用no_ack=True标识把它关闭。是时候移除这个标识了,当工作者(worker)完成了任务,就发送一个响应。

六、Message durability

 我们已经学会了如何确保即使消费者死亡,任务也不会丢失。但是如果RabbitMQ服务器停止我们的任务仍将失去。
RabbitMQ退出或崩溃时它会忘记队列和消息,除非你告诉它不要。两件事必须确保消息不会丢失:我们需要两个队列和消息标记为耐用。
首先,我们需要确保RabbitMQ永远不会失去我们的队列。为了这样做,我们需要声明它经久耐用:  channel.queueDeclare("hello", durable, false, false, null); 尽管这个命令本身是正确的,它不会在我们目前的设置工作。这是因为我们已经定义了一个名为hello的队列不耐用。RabbitMQ不允许您重新定义现有队列具有不同参数并返回一个错误的任何程序,试图这样做。但有一个快速解决方案——让我们声明一个队列具有不同名称,例如task_queue:  channel.queueDeclare("task_queue", durable, false, false, null); 这queueDeclare改变需要应用于生产者和消费者的代码。在这一点上我们确信task_queue队列不会丢失,即使RabbitMQ重启。现在我们需要我们的消息标记为持久性——通过设置MessageProperties PERSISTENT_TEXT_PLAIN(实现BasicProperties)的价值。  channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());  问题: 消息标记为持久性并不能完全保证信息不会丢失。虽然它告诉RabbitMQ将消息保存到磁盘,还有一个短的时间窗口当RabbitMQ已经接受消息和尚未保存它。 RabbitMQ也不做fsync(2)为每个消息——它可能只是保存到缓存和不写入磁盘。持久性保证不强,但它是足够为我们简单的任务队列。 如果你需要一个更强的保证,那么你可以使用发布者证实。
boolean durable = true;

boolean durable = true;

import com.rabbitmq.client.MessageProperties;

七、Fair dispatch

 您可能已经注意到,调度仍然不会完全按照我们想要的工作。例如在两名工人(worker)的情况,当所有奇怪的消息是沉重的,甚至消息是光,一名工人将不停地忙,另一个几乎不做任何工作。RabbitMQ并不了解,仍将均匀调度信息。这仅仅是因为RabbitMQ分派消息进入队列的消息的时候。它不看看消费者未得到确认的消息的数量。只是盲目地分派每n个消息到n个消费者。
为了打败,我们可以使用basicQos prefetchCount = 1设置方法。这告诉RabbitMQ不给多个消息到一个工人。或者,换句话说,不要派遣工人的新消息,直到处理和承认了前一个。相反,它会分派到下一个工人,不是仍然很忙。
  channel.basicQos(prefetchCount);

int prefetchCount = 1;

问题:
如果所有的工人们正忙着,你的队列可以填满。你要留意,也许添加更多的工人,或有其他一些策略。

八、Putting it all together

 Final code of our NewTask.java class:
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.MessageProperties; public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = getMessage(argv); channel.basicPublish( "", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } //... } 

(NewTask.java source)

And our Worker.java:

import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Channel; import com.rabbitmq.client.QueueingConsumer; public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(TASK_QUEUE_NAME, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); doWork(message); System.out.println(" [x] Done" ); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } //... }

柯南君:看大数据时代下的IT架构(5)消息队列之RabbitMQ--案例(Work Queues起航)...相关推荐

  1. 柯南君:看大数据时代下的IT架构(4)消息队列之RabbitMQ--案例(Helloword起航)...

    柯南君:看大数据时代下的IT架构(4)消息队列之RabbitMQ--案例(Helloword起航) 二.起航 本章节,柯南君将从几个层面,用官网例子讲解一下RabbitMQ的实操经典程序案例,让大家重 ...

  2. 看大数据时代下的IT架构(1)图片服务器之演进史

            柯南君的公司最近产品即将上线,由于产品业务对图片的需求与日俱增,花样百出,与此同时,在大数据时代,大流量的冲击下,对图片服务器的压力可想而知,那么今天,柯南君结合互联网的相关热文,加上 ...

  3. 隐私保护与隐私计算研讨会 | 余维仁:大数据时代下需要各界更新对个人隐私保护的固有认识

    8月13日下午,由深圳市信息服务业区块链协会.陀螺研究院.矩阵元主办,中国生物识别与计算机视觉科技创新产业联盟.金砖国家未来网络研究院中国分院.中国船舶综合技术经济研究院.深圳市人工智能产业协会.深圳 ...

  4. 在目前大数据时代下,怎么能成为一名合格的数据分析师

    "21世纪什么最贵,人才",在目前大数据时代下,什么最难找,什么最贵,实现数据价值的人,数据分析师. 但是对于数据分析师的认识,比较极端,但对数据分析师价值的认识正在回归理性.很多 ...

  5. 独家 | Michael I.Jordan:大数据时代下的安全实时决策堆栈与增强学习(视频+精华笔记)

    金秋九月,2017国际大数据产业技术创新高峰论坛暨大数据系统软件国家工程实验室第一次会议盛大开幕,大数据系统软件国家工程实验室作为大数据系统软件技术研发与工程化的国家级创新平台,将通过大数据系统软件技 ...

  6. 在大数据时代下金融风控的分类

    @Date:2018-05-24 @Author:等等 依托城市数据湖海量数据资源,尤其是在信贷领域对企业或者个人的个人信贷画像描述评判准则已经是第三方房贷企业或者银行对借贷人的评分标准.风控建模以数 ...

  7. 大数据时代下对马克思主义的一些探讨

    1 引言 最近因为查阅文献的原因,让我接触到了大数据,以前也听到过"大数据"这个词语,但却从未引起我的重视.平时需要查阅大量文献,相比于数十年前,现在的搜索引擎为学习提供了莫大的方 ...

  8. Thinking in BigData(二)大数据时代下的变革

    大数据时代的思维变革 A Revolution That Will Transform How We Live, Work, and Think. 不期而遇的一本<大数据时代>将我引进大数 ...

  9. 爱肤宝医生产品负责人王照陆:大数据时代下的人工智能医疗

    嘉宾介绍 王照陆:爱肤宝医生产品负责人.前华为研发工程师,CSDN博客专家,同济大学MBA,负责过医疗智能硬件血压.血糖.体脂等产品设计与数据分析:现负责皮肤医疗大数据产品,从0-1实现皮肤轻问诊预约 ...

最新文章

  1. (转载)C语言的零长数组
  2. dlib 68个关键点 人脸姿态
  3. python读取中文文件乱码-详解Python的json文件读取及中文乱码显示问题解决方法...
  4. P2473 [SCOI2008]奖励关
  5. bzoj5368 [Pkusc2018]真实排名
  6. 图解http-ping使用
  7. Asp.net生成缩略图
  8. 机器学习、人工智能 博文链接汇总
  9. IDEA同时使用maven和gradle
  10. [css] 使用rem的优缺点是什么?和使用百分比有什么区别?
  11. Linux 下的0 1 2特殊文件描述符~
  12. 基于go的微服务搭建(七) - 服务发现和负载均衡
  13. java 数字信号_GitHub - Bazingaliu/JavaDsp: 数字信号处理(DSP)方面的Java封装,包含常用的一些处理方法,如滤波、信号变换等等。...
  14. 脚本语言和编程语言的比较
  15. java条码扫描_JAVA生成扫描条形码
  16. CVPR 9999 Best Paper:《一种加辣椒的番茄炒蛋》
  17. 龙芯电脑的详细资料,支持国货的请进来!
  18. RV-LINK:用RISC-V开发板做RISC-V仿真器
  19. 【图片新闻】美海军的下一艘战舰与“祖姆沃尔特”DDG-1000极其相似
  20. python+opencv别踩白块儿游戏辅助,一天一个opencv小项目(已开源)

热门文章

  1. 从一个需求看问题的无限复杂化和简单化
  2. 两个无序单链表,排序后合并成一个有序链表
  3. 巧用CSS的RevealTrans滤镜
  4. 一套使用注入和Hook技术托管入口函数的方案
  5. 日期与unix时间戳之间的转换C++实现
  6. linux驱动:音频驱动(二)ASoc
  7. php symfony 安装,Symfony的安装和配置方法
  8. ios超级签名_ios超级签名何以固若金汤?原因在这里
  9. map multimapc++_C++的Map和Multimap
  10. php insert failed,较大的MySQL INSERT语句导致PHP错误