目录

基本概念

代码与实例


基本概念

当某些客户端处理比较强的时候,就多发数据让其处理,当某些客户端处理一般的时候,就少发数据让其处理。

主要是让消费者处理完后,回信息给RabbitMQ,然后RabbitMQ才会发送下一个。

使用basicQos(perfetch = 1)

注意:使用公平分发必须关闭自动应答ack改成手动。

代码与实例

程序运行截图如下:

生产者:

两个消费者,其中一个的效率是另外一个的2倍(X2的效率)

源码如下:

Send.java

package work;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {private static final String QUEUE_NAME = "test_work_queue";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {Connection connection = ConnectionUtils.getConnect();Channel channel = connection.createChannel();//声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//每个消费者发送确认消息之前,消息队列不发送下一个消息到消费者,一次只处理一个//限制发送给一个消费者不超过一条int prefetchCount = 1;channel.basicQos(prefetchCount);for(int i = 0; i < 50; i++){String msg = "Hello World : " + i;System.out.println("send msg : " + msg);channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());Thread.sleep(i * 20);}channel.close();connection.close();}
}

Recv1.java

package work;import com.rabbitmq.client.*;
import util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv1 {private static final String QUEUE_NAME = "test_work_queue";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnect();final Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);//保证一次只发一个channel.basicQos(1);System.out.println("recv1 running");Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "utf-8");System.out.println("Recv[1] msg is : " + msg);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("Recv[1] done");/*关闭自动应答,手动处理*/channel.basicAck(envelope.getDeliveryTag(), false);}}};channel.basicConsume(QUEUE_NAME, false, consumer);}
}

Recv2.java

package work;import com.rabbitmq.client.*;
import util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv2 {private static final String QUEUE_NAME = "test_work_queue";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnect();final Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.basicQos(1);System.out.println("recv2 running");Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "utf-8");System.out.println("Recv[2] msg is : " + msg);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("Recv[1] done");channel.basicAck(envelope.getDeliveryTag(), false);}}};channel.basicConsume(QUEUE_NAME, false, consumer);}
}

源码下载地址:

https://github.com/fengfanchen/Java/tree/master/FairDispatch

Java笔记-使用RabbitMQ的Java接口实现Fair dispatch(公平分发)相关推荐

  1. Java笔记-使用RabbitMQ的Java接口实现topic(主题模式)

    目录 基本概念 代码与实例 基本概念 实现的就是官方给出的这个模型: Topic exchange:将路由和某模式匹配 其中 #:匹配一个或多个 *:匹配一个 比如下面要举得这个例子 交换机设置为to ...

  2. Java笔记-使用RabbitMQ的Java接口实现Routing(路由模式)

    目录 基本概念 代码与实例 基本概念 过程图如下: 主要是把交换机设置为直连的方式direct直连的方式然后把 数据 发送给交换机. 交换机再通过路由的Key值转发到队列上. 每一个客户端,都有一个队 ...

  3. Java笔记-使用RabbitMQ的Java接口实现Publish/Subscribe(订阅模式)

    目录 基本概念 代码与实例 基本概念 模型如上: 1. 一个生产者,多个消费者: 2. 每个消费者都有自己的队列: 3. 生产者没有直接把消息发送到队列,而是发送到交换机,通过交换机转发到队列: 4. ...

  4. Java笔记-使用RabbitMQ的Java接口实现round-robin(轮询分发)

    目录 基本概念 代码与实例 基本概念 简单队列的不足: 1. 耦合性高: 2. 如果生产者把生产队列该了,消费者也要同时改: Work Queues工作队列,模型如下: 代码与实例 程序运行截图如下: ...

  5. Java笔记-使用RabbitMQ的Java接口生产数据并消费

    目录 基本概念 代码及演示 基本概念 就是官方的这个模型: p为生产者不经过交换机,直接把数据传给消息队列,c为consumer用于消费. 这种结构在本科生的时候,经常自己写,现在用RabbitMQ来 ...

  6. RabbitMQ笔记-使用rabbitmq-c实现Fair dispatch(公平分发)

    目录 概念及注意 代码与实例 概念及注意 这里C接口中有2个函数一个是: amqp_basic_get() 另外一个是: amqp_basic_consume() 前者可以一条一条的读取,后者是一次读 ...

  7. Java笔记整理五(Iterator接口,泛型,常见数据结构(栈,队列,数组,链表,红黑树,集合),jdk新特性,异常,多线程,Lambda表达式)

    Java笔记整理五 1.1Iterator接口 Collection接口与Map接口主要用于存储元素,而Iterator主要用于迭代访问(即遍历)Collection中的元素,因此Iterator对象 ...

  8. 9.1-全栈Java笔记: 容器泛型—认识Collection接口

    开发和学习中需要时刻和数据打交道,如果组织这些数据是我们编程中重要的内容. 我们一般通过"容器"来容纳和管理数据.   事实上,数组就是一种容器,可以在其中放置对象或基本类型数据. ...

  9. C/C++笔记-使用RabbitMQ的C接口生产数据并消费

    目录 基本概念 代码与实例 基本概念 使用C语言接口完成官方的这个模型: 很有意思.感觉开源的东西真的好,不用自己去写C/C++服务端. p为生产者不经过交换机,直接把数据传给消息队列,c为consu ...

最新文章

  1. android 应用程序框架
  2. Python input()
  3. java项目中怎么查看用的序列_如何在Java应用程序中使用序列化分类器对...
  4. Java NIO浅析
  5. tomcat7.027-webSocket应用程序构建01
  6. Taro+react开发(50) 小程序触底操作
  7. 非现场执法管理计算机(工业级),浅析非现场执法中存在问题的方法及对策
  8. 支持vxlan的服务器网卡,3台服务器互通vxlan
  9. 基于逻辑回归算法模型搭建思路
  10. Java-JUC(六):创建线程的4种方式
  11. Nginx日志和http模块相关变量
  12. 今天,强行打个广告!
  13. fluent-bit 本地安装及配置
  14. 百旺如何看是否清卡_百旺开票系统每月清卡怎么操作?
  15. qt添加蒙版代码(子窗口位置)
  16. DoDataExchange(CDataExchange* pDX)没有执行到原因
  17. VS2022 支持XP
  18. 考研视频有点难,以后继续早上锻炼
  19. Edit conflicts
  20. 【地球上最欢乐跑步活动】第四届草原马拉松彩跑节▪舞动青春 跑出色彩 千人篝火狂欢大型烟花表演

热门文章

  1. socket 编程原理1
  2. 轻松搞定 Nginx 配置的好工具!
  3. C 语言程序设计基础不好,想10天考国二C语言程序设计证书,可能吗?
  4. 听到表扬的飞鸽传书2011
  5. CADFANS2012网站源码
  6. 圳不完全启示录之初来乍到----2
  7. byte[]、sbyte[]、int[]以及Array的故事
  8. 一个简单的完成端口(服务端/客户端)类
  9. WDM驱动程序入门(1)-Hello WDM
  10. 程序员年纪越大,工作被取代性越强