在项目中我们如果使用MQ,就一定问好后台给的数据是同步或者是异步的情况,之前在项目中都遇到了这样的坑,后台也没说是同步还是异步,自己用异步的方式去请求,一直收到的数据都是自己发送的情况,

在用的时候先要导入

implementation 'com.rabbitmq:amqp-client:5.7.0'  

还需要我们在类中声明

/*** 处理handler发送的消息,然后进行操作(在主线程)*/
@SuppressLint("HandlerLeak")
private Handler incomingMessageHandler = new Handler() {@Overridepublic void handleMessage(Message msg) {msg.getData().getString("msg");}
};
/*** 关联AMPQ** @param routingKey*/
private void publishToAMPQ(final String routingKey) {publishThread = new Thread(new Runnable() {@Overridepublic void run() {while (true) {try {// 创建连接Connection connection = rbmqFactory.newConnection();// 创建通道Channel ch = connection.createChannel();ch.confirmSelect();while (true) {String message = queue.takeFirst();LogUtils.e("设备类型请求发送的数据.........." + message);try {// 发布消息ch.basicPublish(AppConstant.MQ_EXCHANGE_CAR, routingKey, null, message.getBytes());ch.waitForConfirmsOrDie();} catch (Exception e) {queue.putFirst(message);throw e;}}} catch (InterruptedException e) {break;} catch (Exception e) {LogUtils.e("TAG_Publish", "Connection broken: " + e.getClass().getName());try {Thread.sleep(5000); //sleep and then try again} catch (InterruptedException e1) {break;}}}}});publishThread.start();
}
/*** 创建消费者线程* 接收消息** @param*/
private void startSubscribe(final Handler incomingHandler) {subscribeThread = new Thread(new Runnable() {@Overridepublic void run() {try {// 需要再次初始化数据的时候就关闭上一个连接if (connectionCar != null) {connectionCar.close();}// 创建新的连接connectionCar = rbmqFactory.newConnection();// 创建通道Channel channel = connectionCar.createChannel();// 处理完一个消息,再接收下一个消息channel.basicQos(1);// 随机命名一个队列名称  baio_terminal_keepaliveString queueName = "baio_terminal_keepalive";// 声明交换机类型channel.exchangeDeclare(AppConstant.MQ_EXCHANGE_CAR, "topic", true);// 声明队列(持久的、非独占的、连接断开后队列会自动删除)AMQP.Queue.DeclareOk q = channel.queueDeclare(queueName, true, false, false, null);// 声明共享队列// 根据路由键将队列绑定到交换机上(需要知道交换机名称和路由键名称)channel.queueBind(q.getQueue(), AppConstant.MQ_EXCHANGE_CAR, AppConstant.MQ_ROUTINGKEY_CAR2);// 创建消费者获取rabbitMQ上的消息。每当获取到一条消息后,就会回调handleDelivery()方法,该方法可以获取到消息数据并进行相应处理Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {super.handleDelivery(consumerTag, envelope, properties, body);// 通过getBody方法获取消息中的数据String bodyStr = new String(body);LogUtils.e("设备类型请求返回的数据.........bodyStr:" + bodyStr);}};channel.basicConsume(q.getQueue(), true, consumer);} catch (Exception e) {e.printStackTrace();try {Thread.sleep(5000);} catch (InterruptedException e1) {e1.printStackTrace();}}}});subscribeThread.start();// 开启线程获取RabbitMQ推送消息
}

这样是我们的大多数用法,但是还有一部分是用到我们MQ的另外一种模式

MQ的RPC模式

就是这样的一个消费过程,RPC的代码量要比普通的MQ代码量要少,但是这种模式不是经常用到,在之前的一个项目中用到过这种模式,就给大家看一下代码的流程。

public class RpcClient {private Connection connection;private Channel channel;private String requestQueueName = "baio_recorded_broadcast_course";private String replyQueueName;/*** 设置连接** @throws IOException* @throws TimeoutException*/public RpcClient() throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost(AppConstant.MQ_HOST);//设置服务地址factory.setPort(AppConstant.MQ_PORT);//设置端口factory.setUsername(AppConstant.MQ_USERNAME);//用户名和密码factory.setPassword(AppConstant.MQ_PASSWORD);connection = factory.newConnection();//创建一个连接channel = connection.createChannel();//创建了一个连接的通道replyQueueName = channel.queueDeclare().getQueue();//声明回调队列LogUtils.e("RpcClient:" + replyQueueName);}/*** 请求返回** @param message* @return* @throws UnsupportedEncodingException* @throws IOException* @throws InterruptedException*/public String call(String message) throws UnsupportedEncodingException, IOException, InterruptedException {String corrId = UUID.randomUUID().toString();//此处为返回的核心AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId)//队列的唯一值认证.replyTo(replyQueueName)//回调队列.build();//发送消息
//        channel.basicPublish(AppConstant.MQ_EXCHANGE_CAR, requestQueueName, props, message.getBytes("utf8"));channel.basicPublish("", requestQueueName, props, message.getBytes("utf8"));//用于存放数据的阻塞队列final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);//接收返回数据channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {if (properties.getCorrelationId().equals(corrId)) {response.offer(new String(body));}}});return response.take();}/*** 关闭连接** @throws IOException*/public void close() throws IOException {if (connection != null) {this.connection.close();}}}

先定义一个工具类出来,然后在我们需要用到的地方再写入我们要发送的消息和消费的消息逻辑,

/*** 同步请求设备名称*/
private void publishAutoAttendClassMessage() {new Thread(new Runnable() {@Overridepublic void run() {DeviceReqInfo deviceNameBean = new DeviceReqInfo();deviceNameBean.setType("baio");deviceNameBean.setSubtype("query_device_name");deviceNameBean.setSn("BAIOA202003160231");String sendMessage = JsonUtil.parseObjectToJson(deviceNameBean);RpcClient rc = null;try {rc = new RpcClient();LogUtils.e("设备名称发送消息.........." + sendMessage);String response = rc.call(sendMessage);LogUtils.e("设备名称请求返回的数据:" + response);DeviceNameBackBean tFromJson = JsonUtil.getTFromJson(response, DeviceNameBackBean.class);int code = tFromJson.getCode();DeviceNameBackBean.DataBean data = tFromJson.getData();String name = data.getName();appSp.setFacility(name);} catch (Exception e) {e.printStackTrace();} finally {try {if (rc != null) {rc.close();}} catch (IOException e) {e.printStackTrace();}}}}).start();
}

这样就可以完成我们的功能实现了。

RabbitMQ的两种不同写法相关推荐

  1. python 字符编码的两种方式写法:# coding=utf-8和# -*- coding:utf-8 -*-

    python运行文件是总会出现乱码问题,为了解决这个问题,在文件开头加上: # coding=utf-8 或者 # -*- coding:utf-8  -*- # coding=<encodin ...

  2. mysql写什么不同_mysql - 编译配置PHP时,两种配置写法有什么不同

    在编译PHP时, --with-扩展库=DIR --enable-扩展库 这两种配置有什么不同 回复内容: 在编译PHP时, --with-扩展库=DIR --enable-扩展库 这两种配置有什么不 ...

  3. 关于两种指针写法: int* ptr 与 int *ptr的区别

    两种定义方法在使用的结果上没有区别,只是所表达的含义有所不同. int*  ptr 强调 int* 是一种类型----指向int的指针.符合C++编程的习惯. int   *ptr 强调的是 *ptr ...

  4. 关于vue3的两种API写法——选项API和组合API

    理解什么是选项API写法,什么是组合API写法 Options API 什么是选项API写法:Options API 在vue2.x项目中使用的就是选项API写法 代码风格:data选项写数据,met ...

  5. spark从hbase读数据到存入hbase数据两种版本写法

    spark2版本: object SparkCoreTest {def main(args: Array[String]): Unit = {// 使用sparksession来创建对象val spa ...

  6. iOS开发笔记-两种单例模式的写法

    iOS开发笔记-两种单例模式的写法 单例模式是开发中最常用的写法之一,iOS的单例模式有两种官方写法,如下: 不使用GCD #import "ServiceManager.h"st ...

  7. python基本判断语句_python两种简洁的条件判断语句写法

    了Python返回真假值(True or False)小技巧,本文探讨的是最简洁的条件判断语句写法,本文给出了两种简洁写法,需要的朋友可以参考下 如下一段代码:def isLen(strString) ...

  8. 函数指针--Nginx和Redis中两种回调函数写法

    1.Nginx和Redis中两种回调函数写法 #include <stdio.h>//仿Nginx风格 //结构外声明函数指针类型 typedef void (*ngx_connectio ...

  9. 两种写法的效果一样,那么到底哪一种更好呢?

    点击上方蓝色"程序猿DD",选择"设为星标" 回复"资源"获取独家整理的学习资料! 有时候,我们在写一些循环逻辑的时候,并不是按执行次数等作 ...

  10. Model层的两种写法

    Model层的两种写法 第一种写法 namespace MyMVC.Models {public class Child{ //属性private int id;public int Id{get { ...

最新文章

  1. rxswift中hud_如何在RxSwift中运行测试
  2. STM32 进阶教程 8 - 位带操作
  3. CentOS下的freenx配置
  4. 关于Hyper-V备份的四大注意事项
  5. Web程序员的Mysql进阶序二之sql多条数据插入、多条数据更新、多表同时查询
  6. Angular开发准备
  7. oracle更新视图。
  8. 经验之谈:学习 Visual Studio Code 不会错!
  9. 【行为型】Strategy模式
  10. 11.2 正睿停课训练 Day15
  11. 数据挖掘应用实例分析
  12. 灵格斯与众多常用软件的冲突问题
  13. 小小知识点(十九)护眼色豆沙绿的设置
  14. IE8 SysFader:IEXPLORE.EXE应用程序错误解决办法
  15. win10设置pg/pc接口_win10安装postgresql
  16. python如何计算成绩平方根_python 使用二分法计算平方根
  17. 【解决】萤石云接入视频报错视频编码类型非H264
  18. 对文件进行筛选c语言,用c语言实现文本文件中的字符筛选分析。
  19. 108页6万字某小区施工组织设计方案
  20. java applet 在线demo_编写可在线收发E-mail的Java Applet

热门文章

  1. C++ limits头文件的用法(numeric_limits)
  2. linear-gradient实现Ps标尺
  3. python开发者是谁_Python 太蹩脚了?开发者总结了 8 大缘故
  4. 三相差分编码器转成脉冲信号或集电极开路转换模块
  5. 培训机构常见sql查询练习题目,你会做吗?
  6. 双非计算机硕士何去何从(2)
  7. Spring4 Spring MVC实战(一)——读《Spring in action》搭建最简单的MVC
  8. 单片机中的数据存储器ram
  9. Acrel-1200分布式光伏运维平台
  10. 智能运维|AIRIOT智慧光伏管理解决方案