消费者

import com.rabbitmq.client.*;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.shterm.test.ConnectionUtil;import java.io.IOException;
import java.util.Timer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class MyConsumer {private final static String QUEUE_NAME = "topic_queue_1";private final static String EXCHANGE_NAME = "topic_exchange";public static void main(String[] args) throws InterruptedException {System.out.println("Consumer start.......");ExecutorService exe= Executors.newCachedThreadPool();for (int i=0;i<10;i++){Thread.sleep(2);exe.execute(()->work());}}public static void work(){try(Connection connection = ConnectionUtil.getConnection("127.0.0.1",5671,"/","test","test");){Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME,false,false,false,null);channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"update.#");channel.basicQos(1);Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();String response = "";try {response += "ok";} catch (RuntimeException e) {System.out.println(" [.] " + e.toString());} finally {// 返回处理结果队列channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));//  确认消息,已经收到后面参数 multiple:是否批量.true:将一次性确认所有小于envelope.getDeliveryTag()的消息。channel.basicAck(envelope.getDeliveryTag(), false);// RabbitMq consumer worker thread notifies the RPC// server owner threadsynchronized (this) {this.notify();}}}};channel.basicConsume(QUEUE_NAME,consumer);while (true) {synchronized (consumer) {try {consumer.wait();} catch (InterruptedException e) {e.printStackTrace();}}}}catch (Exception e){e.printStackTrace();}}}

生产者

import com.rabbitmq.client.*;
import com.shterm.test.ConnectionUtil;import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class Producer {private final static String EXCHANGE_NAME = "topic_exchange";public static void main(String[] args) throws InterruptedException {ExecutorService exe = Executors.newFixedThreadPool(5);System.out.println("Producer start.......");while(true){Thread.sleep(1000);for(int i=0;i<130;i++){exe.execute(() -> produce());}}}public static void produce() {try (Connection connection = ConnectionUtil.getConnection("127.0.0.1", 5671, "/", "test", "test");) {String replyQueueName;Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "topic");String corrId = UUID.randomUUID().toString();String message = "this is topic :" + corrId;replyQueueName = channel.queueDeclare().getQueue();synchronized (replyQueueName) {//发送请求消息,消息使用了两个属性:replyto和correlationId//服务端根据replyto返回结果,客户端根据correlationId判断响应是不是给自己的AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();channel.basicPublish(EXCHANGE_NAME, "update.Name", props, message.getBytes());final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);// String basicConsume(String queue, boolean autoAck, Consumer callback)channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {//检查它的correlationId是否是我们所要找的那个if (properties.getCorrelationId().equals(corrId)) {//如果是,则响应BlockingQueueresponse.offer(new String(body, "UTF-8"));}}});channel.close();}} catch (Exception e) {e.printStackTrace();}}}

mq使用replyto队列进行消息回复相关推荐

  1. ibm mq并发访问队列_消息队列之九问九答

    问题1 为什么要用消息队列呀? 答:如下图所示,外呼系统需要将外呼结果发送给业务系统,如果采用rpc的调用方式:则带来的后果, 首先,1.外呼系统与业务系统严重耦合,多个业务系统需要外呼系统传输数据, ...

  2. ibm mq qname java_IBMMQ 从队列获取消息并将消息发送到特定主题上面

    IBMMQ 从队列获取消息并将消息发送到特定主题上面 注:IBMMQ的CCSID:1381 package com; import java.io.EOFException; import java. ...

  3. SpringBoot整合RabbitMQ 消息可靠投递、手动ack、延迟队列、死信队列、消息幂等性保障、消息积压

    1.消息可靠投递 在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景.RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式. confirm 确认模式 ...

  4. posix自定义消息队列_消息队列开源框架,基于Io,节约服务器资源

    dophon框架中的消息队列模块,包括本地消息中心,远程消息中心,以及生产消费相关装饰器以及内部操作模块等 dophon-mq 项目介绍 dophon框架中的消息队列模块,包括本地消息中心,远程消息中 ...

  5. 阿里Java面试题剖析:为什么使用消息队列?消息队列有什么优点和缺点?

    面试题 为什么使用消息队列? 消息队列有什么优点和缺点? Kafka.ActiveMQ.RabbitMQ.RocketMQ 都有什么区别,以及适合哪些场景? 面试官心理分析 其实面试官主要是想看看: ...

  6. 消息队列_消息队列:kafka

    概念 kafka是一个分布式的基于发布/订阅模式的消息队列,主要用于大数据实时处理领域. 要理解kafka首先要有分布式的概念,要有消息队列的概念.分布式系统最大的优势就是解耦和削峰,这种情况下,A系 ...

  7. php 消息队列_消息队列篇——windows本地搭建RabbitMQ Server

    前言: 最近的PHP项目中有使用AMQP,解耦一些业务性的功能模块.因为工作使用的是线上Linux搭建,为了方便测试所以我决定本地搭建一个MQ服务. RabbitMQ简介: MQ全称为Message ...

  8. rabbitmq 不同的消费者消费同一个队列_消息队列王者--rabbitMQ深入理解--工作过程、消费模式、持久化等...

    概述 之前已经对rabbitMQ的一些基本概念做了介绍和不同MQ之间的比较,今天主要对rabbitMQ的一些方面做扩展. 01 消息队列 Broker:简单来说就是消息队列服务器实体. Exchang ...

  9. 消息队列面试 - 为什么使用消息队列,消息队列有什么优点和缺点?

    消息队列面试 - 为什么使用消息队列,消息队列有什么优点和缺点? 面试题 为什么使用消息队列? 消息队列有什么优点和缺点? Kafka.ActiveMQ.RabbitMQ.RocketMQ 都有什么区 ...

  10. rabbitmq队列中消息过期配置

    最近公司某个行情推送的rabbitmq服务器由于客户端异常导致rabbitmq队列中消息快速堆积,还曾导致过内存积压导致rabbitmq客户端被block的情况.考虑到行情信息从业务上来说可以丢失部分 ...

最新文章

  1. Visual Studio 2008 断点调试直接跳出代码窗口
  2. 7-6 红豆生南国 (25 分)
  3. #if、#if defined 的使用
  4. NHibernate视频教程
  5. java服务端验证框架_SpringBoot服务端数据校验过程详解
  6. 人力资源经理的选择(转载)
  7. Julia: save 与 @save
  8. Tableau 学习 区分软件功能
  9. element拼音模糊搜索
  10. Linux 配置双网卡,同时访问内外网
  11. 图像彩色化方法(基于颜色传递、颜色扩展)
  12. c语言void delay是什么意思,delay是什么意思(单片机中delay)
  13. python并行编程 - 线程篇
  14. 启动jupyter notebook 报错:ImportError:DLL load failed,找不到指定模块的解决办法
  15. Java里的char类型能不能存储一个中文字符?
  16. python将中文数字转化成阿拉伯数字
  17. 关于720p和1080p观看距离和效果
  18. 应用在触摸电视机中的触摸芯片
  19. 【更新】【Windows Server 2019】存储服务器的配置和管理——iSCSI的安装和配置(上)
  20. PHPExcel设置列宽行高及插入URL

热门文章

  1. 动画:面试官问我 JS「变量提升」我头皮发麻,最后把这篇动画甩给了他
  2. string.h 详解
  3. LQ0195 史丰收速算【程序填空】
  4. leetcode 森林中的兔子
  5. mysql 1006_MySQL: 1006 - Can't create database '***' (errno: 13) 错误 解决方法
  6. 公司内部分享【富有成效的每日站会】总结
  7. 安全性测试(一)--网页安全检查
  8. php cpu飙高,PHP-FPM进程CPU 飙高的原因及解决方案
  9. 记:《洛克菲勒留给儿子的38封信》-- 8
  10. 冲刺大厂每日算法面试题,动态规划21天——第十二天