首先介绍一下ActiveMQ的版本:apache-activemq-5.10.2

启动MQ:activemq.bat

下面来编写MQ的发送类:

里面的发送ip和模式名称可以根据具体的实际情况填写。

SendMessageByMq.java

  1 public class SendMessageByMq {
  2
  3     public static void main(String[] args) {
  4         String url = "";
  5         // String url = "D:/mqfile/84.zip";
  6         File file = new File(url);// 发送的文件
  7         System.out.println("file=======" + file);
  8         String sendType = "2";// 发送的类型 1发布一对一 ;2订阅一对多
  9         String isNotFile = "false";// 是否有附件true有 false没有
 10         String ip = ContentUtils.MQ_SEND_IP;// 发送ip
 11         String modeName = ContentUtils.MQ_POINT_QUEUENAME;// 模式名称
 12         String json = "[{\"name\":\"40013386.jpg\",\"url\":\"http://h.hiphotos.baidu.com/baik23e5.jpg\"}]";// 要发送的json数据
 13         // 发送方法
 14         String result = send(sendType, ip, modeName, json, file);
 15
 16         if (result.equals("success")) {
 17             try {
 18                 System.out.println("开始接收1");
 19                 // 接收方法
 20                 ReceiveMessageByMq.receive(sendType, ip, isNotFile, modeName);
 21             } catch (JMSException e) {
 22                 e.printStackTrace();
 23             }
 24         }
 25     }
 26
 27     /**
 28      *
 29      *
 30      * Title String Description
 31      *
 32      * @author jacun
 33      * @date 2017-4-11上午11:44:17
 34      * @param sendType
 35      *            发送类型 1发布一对一 ;2订阅一对多
 36      * @param ipport
 37      *            发送ip和端口
 38      * @param modeName
 39      *            模式名称
 40      * @param jsonData
 41      *            要发送的json数据
 42      * @param file
 43      *            发送的文件
 44      * @return
 45      */
 46     public static String send(String sendType, String ip, String modeName,
 47             String jsonData, File file) {
 48         String str = null;
 49         System.out.println("开始发送1");
 50         try {
 51             // 获取 ConnectionFactory,ConnectionFactory:连接工厂,JMS用它创建连接
 52             ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
 53                     "tcp://"
 54                             + ip
 55                             + ":61616?jms.blobTransferPolicy.defaultUploadUrl=http://"
 56                             + ip + ":8161/fileserver/");
 57             // 创建 Connection,Connection:JMS客户端到JMS Provider的连接
 58             Connection connection = connectionFactory.createConnection();
 59             connection.start();
 60             // 创建 Session,Session:一个发送或接收消息的线程
 61             ActiveMQSession session = (ActiveMQSession) connection
 62                     .createSession(false, Session.AUTO_ACKNOWLEDGE);
 63             // 创建 Destination,Destination:消息的目的地;消息发送给谁.
 64             // Destination destination = null;
 65             // 判断是点对点1还是发布订阅2
 66             if ("2".equals(sendType)) {
 67                 System.out.println("一对多发布2");
 68                 createTopic(session, modeName, jsonData, file);
 69             } else {
 70                 System.out.println("一对一发布2");
 71                 // 点对点发布
 72                 createQueue(session, modeName, jsonData, file);
 73             }
 74
 75             session.close();
 76             // 不关闭 Connection, 程序则不退出
 77             connection.close();
 78             // 发送完成删除文件
 79             // if (file != null) {
 80             // if (file.exists()) {
 81             // file.delete();
 82             // }
 83             // }
 84             str = "success";
 85             return str;
 86         } catch (JMSException e) {
 87             e.printStackTrace();
 88             str = "fail";
 89             return str;
 90         }
 91     }
 92
 93     private static void createQueue(ActiveMQSession session, String modeName,
 94             String jsonData, File file) {
 95         try {
 96             Destination destination = session.createQueue(modeName);
 97             // 创建 Producer,MessageProducer:消息发送者
 98             MessageProducer producer = session.createProducer(destination);
 99             // 设置持久性的话,文件也可以先缓存下来,接收端离线再连接也可以收到文件
100             producer.setDeliveryMode(DeliveryMode.PERSISTENT);// 设置为持久性
101             if (file.length() > 0) {
102                 System.out.println("一对一上传文件3");
103                 // 构造 blobMessage,用来传输文件
104                 isFileTransfer(producer, session, file, jsonData);
105             } else {
106                 System.out.println("一对一无文件3");
107                 notFileTransfer(producer, session, jsonData);
108             }
109
110         } catch (JMSException e) {
111             e.printStackTrace();
112         }
113
114     }
115
116     // 点对点无文件发送
117     private static void notFileTransfer(MessageProducer producer,
118             ActiveMQSession session, String jsonData) {
119         try {
120             TextMessage message = session.createTextMessage();
121             message.setStringProperty("sendType", "1");
122             message.setStringProperty("jsonData", jsonData);
123             message.setStringProperty("isNotFile", "false");
124             // 设置该消息的超时时间(有效期)
125             producer.setTimeToLive(60000);
126             // 发送
127             producer.send(message);
128             producer.close();
129             System.out.println("发送成功无文件4");
130         } catch (JMSException e) {
131             e.printStackTrace();
132         }
133
134     }
135
136     // 点对点有文件发送
137     private static void isFileTransfer(MessageProducer producer,
138             ActiveMQSession session, File file, String jsonData) {
139         try {
140             BlobMessage blobMessage = session.createBlobMessage(file);
141             blobMessage.setStringProperty("sendType", "1");
142             blobMessage.setStringProperty("jsonData", jsonData);
143             blobMessage.setStringProperty("isNotFile", "true");
144             // 设置该消息的超时时间(有效期)
145             producer.setTimeToLive(60000);
146             // 发送
147             producer.send(blobMessage);
148             producer.close();
149             System.out.println("发送成功有文件4");
150         } catch (JMSException e) {
151             e.printStackTrace();
152         }
153
154     }
155
156     private static void createTopic(ActiveMQSession session, String modeName,
157             String jsonData, File file) {
158         try {
159             Topic topic = session.createTopic(modeName);
160             MessageProducer producer = session.createProducer(topic);
161             producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
162             if (file.length() > 0) {
163                 System.out.println("一对多上传文件3");
164                 // 构造 blobMessage,用来传输文件
165                 isFileTransfer(producer, session, file, jsonData);
166             } else {
167                 System.out.println("一对多无文件3");
168                 notFileTransfer(producer, session, jsonData);
169             }
170         } catch (JMSException e) {
171             e.printStackTrace();
172         }
173     }
174 }

ActiveMQ的接收类:

里面的发送ip和模式名称可以根据具体的实际情况填写。

ReceiveMessageByMq.java

  1 public class ReceiveMessageByMq {
  2
  3     public static void main(String[] args) {
  4
  7         String receiveType = "1";// 接收的类型 1发布一对一 ;2订阅一对多
  8         String isNotFile = "true";// 是否有附件
  9         String ip = ContentUtils.MQ_RECEIVE_IP;// 接收ip
 10         String modeName = ContentUtils.MQ_POINT_QUEUENAME;// 模式名称
 11         try {
 12             receive(receiveType, ip, isNotFile, modeName);
 13         } catch (JMSException e) {
 14             e.printStackTrace();
 15         }
 16     }
 17
 18     /**
 19      *
 20      *
 21      * Title void Description
 22      *
 23      * @author jacun
 24      * @param modeName
 25      * @param ip
 26      * @param receiveType
 27      * @date 2017-4-11上午10:43:10
 28      * @throws JMSException
 29      */
 30     public static void receive(String receiveType, String ip, String isNotFile,
 31             String modeName) throws JMSException {
 32         System.out.println("开始接收2");
 33         // 获取 ConnectionFactory
 34         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
 35                 "tcp://" + ip + ":61616");
 36         // 创建 Connection
 37         Connection connection = connectionFactory.createConnection();
 38         connection.start();
 39         // 创建 Session
 40         Session session = connection.createSession(false,
 41                 Session.AUTO_ACKNOWLEDGE);
 42         // 创建 Destinatione
 43         // 判断是一对一还是一对多
 44         if ("2".equals(receiveType)) {
 45             // 一对多
 46             System.out.println("一对多接收数据3");
 47             receiveTopic(session, isNotFile, modeName);
 48         } else {
 49             // 一对一
 50             System.out.println("一对一接收数据3");
 51             receiveQueue(session, isNotFile, modeName);
 52         }
 53
 54     }
 55
 56     private static void receiveTopic(Session session, String isNotFile,
 57             String modeName) {
 58         try {
 59             final String isFile = isNotFile;
 60             Destination destination = session.createTopic(modeName);
 61             // 创建 Consumer
 62             MessageConsumer consumer = session.createConsumer(destination);
 63             // 注册消息监听器,当消息到达时被触发并处理消息
 64             consumer.setMessageListener(new MessageListener() {
 65                 // 监听器中处理消息
 66                 public void onMessage(Message message) {
 67                     if ("true".equals(isFile)) {
 68                         System.out.println("有文件接收数据4");
 69                         ReceiveMessageByMq.receiveFile(message);
 70                     } else {
 71                         System.out.println("无文件接收数据4");
 72                         ReceiveMessageByMq.receiveData(message);
 73
 74                     }
 75
 76                 }
 77
 78             });
 79         } catch (JMSException e) {
 80             e.printStackTrace();
 81         }
 82
 83     }
 84
 85     private static void receiveQueue(Session session, String isNotFile,
 86             String modeName) {
 87         try {
 88             final String isFile = isNotFile;
 89             Destination destination = session.createQueue(modeName);
 90             // 创建 Consumer
 91             MessageConsumer consumer = session.createConsumer(destination);
 92             // 注册消息监听器,当消息到达时被触发并处理消息
 93             consumer.setMessageListener(new MessageListener() {
 94                 // 监听器中处理消息
 95
 96                 public void onMessage(Message message) {
 97                     if ("true".equals(isFile)) {
 98                         System.out.println("有文件接收数据4");
 99                         ReceiveMessageByMq.receiveFile(message);
100                     } else {
101                         System.out.println("无文件接收数据4");
102                         ReceiveMessageByMq.receiveData(message);
103
104                     }
105
106                 }
107
108             });
109         } catch (JMSException e) {
110             e.printStackTrace();
111         }
112
113     }
114
115     protected static void receiveData(Message message) {
116         String sendType = null;
117         String jsonData = null;
118         try {
119             TextMessage msg = (TextMessage) message;
120             sendType = msg.getStringProperty("sendType");
121             jsonData = msg.getStringProperty("jsonData");
122         } catch (JMSException e) {
123             e.printStackTrace();
124         }
125         System.out.println("无文件接收成功5");
126         System.out.println(sendType);
127         System.out.println(jsonData);
128     }
129
130     private static void receiveFile(Message message) {
131         String sendType = null;
132         String jsonData = null;
133         if (message instanceof BlobMessage) {
134             BlobMessage blobMessage = (BlobMessage) message;
135             try {
136                 String path = CreateZipFile.createZip("test");
137                 JFileChooser fileChooser = new JFileChooser();
138                 fileChooser.setDialogTitle("请指定文件保存位置");
139                 fileChooser.setSelectedFile(new File(path));
140                 File file = fileChooser.getSelectedFile();
141                 OutputStream os = new FileOutputStream(file);
142                 InputStream inputStream = blobMessage.getInputStream();
143                 // 写文件,你也可以使用其他方式
144                 byte[] buff = new byte[256];
145                 int len = 0;
146                 while ((len = inputStream.read(buff)) > 0) {
147                     os.write(buff, 0, len);
148                 }
149                 os.close();
150                 System.out.println("有文件接收成功5");
151                 sendType = blobMessage.getStringProperty("sendType");
152                 jsonData = blobMessage.getStringProperty("jsonData");
153                 System.out.println(sendType);
154                 System.out.println(jsonData);
155
156             } catch (Exception e) {
157                 e.printStackTrace();
158             }
159
160         }
161
162     }
163 }

亲测好使,这两个工具类包含了发送和接收的方法,而且可以点对点或者发布订阅、有无附件均可,对了还有一点,ActiveMQ需要的jar包,网上信息很多!

补充:补充一个mq接收端自动连接到mq服务器的方法:

那就是将连接方式换一下就可以了:

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin","failover:("tcp://" + ip + ":61616")?initialReconnectDelay=1000&maxReconnectDelay=30000");

转载于:https://www.cnblogs.com/zjiacun/p/6697721.html

ActiveMQ的消息的(含附件)发送和接收使用相关推荐

  1. javasocket连续给服务器发送消息,Java通过Socket发送和接收多条消息

    我们需要实现一个Socket客户端,它应该连接到一个接受TCP连接的服务器.如果我通过netcap与服务器进行通信,我会立即得到它的响应(通过命令行).Java通过Socket发送和接收多条消息 的工 ...

  2. IPC 共享内存和 消息队列(发送、接收、移除)以及键值的生成

    一.消息对列 消息队列,是消息的链接表,存放在内核中.一个消息队列由一个标识符(即队列ID)来标识. 特点: 消息队列是面向记录的,其中的消息具有特定的格式以及特定的优先级.消息队列独立于发送与接收进 ...

  3. rocket mq 监听端口_如何使用Jmeter实现MQ数据的发送和接收?性能测试实战篇

    JMeter是性能测试中被普遍使用的一种工具,常用于压力测试.该工具具有丰富的扩展插件用以满足不同情况下性能测试的需求.消息队列(Message Queue)简称为MQ,作为目前的主流中间件,在很多软 ...

  4. mfc 开启指定服务器,用MFC实现消息的发送和接收(含服务器)

    <用MFC实现消息的发送和接收(含服务器)>由会员分享,可在线阅读,更多相关<用MFC实现消息的发送和接收(含服务器)(33页珍藏版)>请在人人文库网上搜索. 1.精品好资料学 ...

  5. ActiveMQ 实现消息接收发送

    一.接收者 package com.demo.initservice;import javax.jms.Connection; import javax.jms.ConnectionFactory; ...

  6. ActiveMQ 发送和接收消息

    一.添加 jar 包 <dependency><groupId>org.apache.activemq</groupId><artifactId>act ...

  7. JAVA ActiveMQ消息发送和接收

    JMS即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信. ...

  8. java activeMQ消息的发送与接收

    java activeMQ消息的发送与接收 activemq是我们经常用到的消息队列之一,比如说速度快,对spring的很好的支持,支持多种协议等等,今天我们就来看一下activeMQ消息的发送与接收 ...

  9. C# .Net 邮箱发送含附件方法

    获取源码方式: 第一种:打开微信,搜一搜"别打我女儿的主意"打开微信小程序,找到菜单栏,点击源码,进去就可以获得链接 第二种:可以给本文点赞.好评,然后发邮件到792166417@ ...

最新文章

  1. 分布式任务调度平台 XXL-JOB
  2. 简述css样式的三种引入html的方式,css-1,css的三种引入方式 基本选择器
  3. [置顶] 状态压缩DP 简单入门题 11题
  4. php调用谷歌翻译,PHP调用谷歌翻译 | 学步园
  5. Yapi 部署及遇到的坑
  6. 【ZeroClipboard is not defined】的解决方法
  7. 半波对称振子方向图_画好服装款式图的五个要点
  8. [转载] numpy教程:逻辑函数Logic functions
  9. informix设置数据库默认插入时间_informix常用时间运算和操作方式
  10. 集合框架-ArrayList,Vector,Linkedlist
  11. 学习Dart语言,看这一篇文章就够了!(详细介绍)
  12. html个人所得税计算器,上海个人所得税计算器_个税计算器_税后工资计算器
  13. Springboot Swagger2 Unable to infer base url问题解决
  14. 武汉大学计算机学院樊浩南,今年高考光荣榜?谁能告之??谢谢!!
  15. 各大浏览器清除缓存的快捷键
  16. Blender 3.5 面的操作(二)
  17. 技术-正则表达式 处理读书器的章、节、回问题
  18. 虚拟现实x3d简明教程1
  19. 笔记本插入网线没反应,WiFi又可以使用
  20. JavaScript闭包函数理解

热门文章

  1. mariadb mysql 重建_(MariaDB/MySQL)之DML(2):数据更新、删除
  2. anaconda python_Anaconda下Python环境下载及安装
  3. 《密码爆破漏洞详解》——黑客必修的入门操作( 建议收藏 )
  4. 由浅入深学习Apache httpd原理与配置
  5. 【转】Intellij IDEA 14中使用MyBatis-generator 自动生成MyBatis代码
  6. 管理对象空间——管理存储参数
  7. 爱情六十三课,定个开放日
  8. PAT1018. 锤子剪刀布
  9. 基于实数编码的参数自适应遗传算法(matlab代码)
  10. vue将经纬度转换成地理名称_新武汉北,红安有了一个新的地理名称,恒大项目将对标上海迪士尼...