Java笔记-使用RabbitMQ的Java接口实现Fair dispatch(公平分发)
目录
基本概念
代码与实例
基本概念
当某些客户端处理比较强的时候,就多发数据让其处理,当某些客户端处理一般的时候,就少发数据让其处理。
主要是让消费者处理完后,回信息给RabbitMQ,然后RabbitMQ才会发送下一个。
使用basicQos(perfetch = 1)
注意:使用公平分发必须关闭自动应答ack改成手动。
代码与实例
程序运行截图如下:
生产者:
两个消费者,其中一个的效率是另外一个的2倍(X2的效率)
源码如下:
Send.java
package work;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Send {private static final String QUEUE_NAME = "test_work_queue";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {Connection connection = ConnectionUtils.getConnect();Channel channel = connection.createChannel();//声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);//每个消费者发送确认消息之前,消息队列不发送下一个消息到消费者,一次只处理一个//限制发送给一个消费者不超过一条int prefetchCount = 1;channel.basicQos(prefetchCount);for(int i = 0; i < 50; i++){String msg = "Hello World : " + i;System.out.println("send msg : " + msg);channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());Thread.sleep(i * 20);}channel.close();connection.close();}
}
Recv1.java
package work;import com.rabbitmq.client.*;
import util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv1 {private static final String QUEUE_NAME = "test_work_queue";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnect();final Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);//保证一次只发一个channel.basicQos(1);System.out.println("recv1 running");Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "utf-8");System.out.println("Recv[1] msg is : " + msg);try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("Recv[1] done");/*关闭自动应答,手动处理*/channel.basicAck(envelope.getDeliveryTag(), false);}}};channel.basicConsume(QUEUE_NAME, false, consumer);}
}
Recv2.java
package work;import com.rabbitmq.client.*;
import util.ConnectionUtils;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class Recv2 {private static final String QUEUE_NAME = "test_work_queue";public static void main(String[] args) throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnect();final Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.basicQos(1);System.out.println("recv2 running");Consumer consumer = new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String msg = new String(body, "utf-8");System.out.println("Recv[2] msg is : " + msg);try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("Recv[1] done");channel.basicAck(envelope.getDeliveryTag(), false);}}};channel.basicConsume(QUEUE_NAME, false, consumer);}
}
源码下载地址:
https://github.com/fengfanchen/Java/tree/master/FairDispatch
Java笔记-使用RabbitMQ的Java接口实现Fair dispatch(公平分发)相关推荐
- Java笔记-使用RabbitMQ的Java接口实现topic(主题模式)
目录 基本概念 代码与实例 基本概念 实现的就是官方给出的这个模型: Topic exchange:将路由和某模式匹配 其中 #:匹配一个或多个 *:匹配一个 比如下面要举得这个例子 交换机设置为to ...
- Java笔记-使用RabbitMQ的Java接口实现Routing(路由模式)
目录 基本概念 代码与实例 基本概念 过程图如下: 主要是把交换机设置为直连的方式direct直连的方式然后把 数据 发送给交换机. 交换机再通过路由的Key值转发到队列上. 每一个客户端,都有一个队 ...
- Java笔记-使用RabbitMQ的Java接口实现Publish/Subscribe(订阅模式)
目录 基本概念 代码与实例 基本概念 模型如上: 1. 一个生产者,多个消费者: 2. 每个消费者都有自己的队列: 3. 生产者没有直接把消息发送到队列,而是发送到交换机,通过交换机转发到队列: 4. ...
- Java笔记-使用RabbitMQ的Java接口实现round-robin(轮询分发)
目录 基本概念 代码与实例 基本概念 简单队列的不足: 1. 耦合性高: 2. 如果生产者把生产队列该了,消费者也要同时改: Work Queues工作队列,模型如下: 代码与实例 程序运行截图如下: ...
- Java笔记-使用RabbitMQ的Java接口生产数据并消费
目录 基本概念 代码及演示 基本概念 就是官方的这个模型: p为生产者不经过交换机,直接把数据传给消息队列,c为consumer用于消费. 这种结构在本科生的时候,经常自己写,现在用RabbitMQ来 ...
- RabbitMQ笔记-使用rabbitmq-c实现Fair dispatch(公平分发)
目录 概念及注意 代码与实例 概念及注意 这里C接口中有2个函数一个是: amqp_basic_get() 另外一个是: amqp_basic_consume() 前者可以一条一条的读取,后者是一次读 ...
- Java笔记整理五(Iterator接口,泛型,常见数据结构(栈,队列,数组,链表,红黑树,集合),jdk新特性,异常,多线程,Lambda表达式)
Java笔记整理五 1.1Iterator接口 Collection接口与Map接口主要用于存储元素,而Iterator主要用于迭代访问(即遍历)Collection中的元素,因此Iterator对象 ...
- 9.1-全栈Java笔记: 容器泛型—认识Collection接口
开发和学习中需要时刻和数据打交道,如果组织这些数据是我们编程中重要的内容. 我们一般通过"容器"来容纳和管理数据. 事实上,数组就是一种容器,可以在其中放置对象或基本类型数据. ...
- C/C++笔记-使用RabbitMQ的C接口生产数据并消费
目录 基本概念 代码与实例 基本概念 使用C语言接口完成官方的这个模型: 很有意思.感觉开源的东西真的好,不用自己去写C/C++服务端. p为生产者不经过交换机,直接把数据传给消息队列,c为consu ...
最新文章
- android 应用程序框架
- Python input()
- java项目中怎么查看用的序列_如何在Java应用程序中使用序列化分类器对...
- Java NIO浅析
- tomcat7.027-webSocket应用程序构建01
- Taro+react开发(50) 小程序触底操作
- 非现场执法管理计算机(工业级),浅析非现场执法中存在问题的方法及对策
- 支持vxlan的服务器网卡,3台服务器互通vxlan
- 基于逻辑回归算法模型搭建思路
- Java-JUC(六):创建线程的4种方式
- Nginx日志和http模块相关变量
- 今天,强行打个广告!
- fluent-bit 本地安装及配置
- 百旺如何看是否清卡_百旺开票系统每月清卡怎么操作?
- qt添加蒙版代码(子窗口位置)
- DoDataExchange(CDataExchange* pDX)没有执行到原因
- VS2022 支持XP
- 考研视频有点难,以后继续早上锻炼
- Edit conflicts
- 【地球上最欢乐跑步活动】第四届草原马拉松彩跑节▪舞动青春 跑出色彩 千人篝火狂欢大型烟花表演