ActiveMQ的消息的(含附件)发送和接收使用
首先介绍一下ActiveMQ的版本:apache-activemq-5.10.2
启动MQ:activemq.bat
下面来编写MQ的发送类:
里面的发送ip和模式名称可以根据具体的实际情况填写。
SendMessageByMq.java
1 public class SendMessageByMq { 2 3 public static void main(String[] args) { 4 String url = ""; 5 // String url = "D:/mqfile/84.zip"; 6 File file = new File(url);// 发送的文件 7 System.out.println("file=======" + file); 8 String sendType = "2";// 发送的类型 1发布一对一 ;2订阅一对多 9 String isNotFile = "false";// 是否有附件true有 false没有 10 String ip = ContentUtils.MQ_SEND_IP;// 发送ip 11 String modeName = ContentUtils.MQ_POINT_QUEUENAME;// 模式名称 12 String json = "[{\"name\":\"40013386.jpg\",\"url\":\"http://h.hiphotos.baidu.com/baik23e5.jpg\"}]";// 要发送的json数据 13 // 发送方法 14 String result = send(sendType, ip, modeName, json, file); 15 16 if (result.equals("success")) { 17 try { 18 System.out.println("开始接收1"); 19 // 接收方法 20 ReceiveMessageByMq.receive(sendType, ip, isNotFile, modeName); 21 } catch (JMSException e) { 22 e.printStackTrace(); 23 } 24 } 25 } 26 27 /** 28 * 29 * 30 * Title String Description 31 * 32 * @author jacun 33 * @date 2017-4-11上午11:44:17 34 * @param sendType 35 * 发送类型 1发布一对一 ;2订阅一对多 36 * @param ipport 37 * 发送ip和端口 38 * @param modeName 39 * 模式名称 40 * @param jsonData 41 * 要发送的json数据 42 * @param file 43 * 发送的文件 44 * @return 45 */ 46 public static String send(String sendType, String ip, String modeName, 47 String jsonData, File file) { 48 String str = null; 49 System.out.println("开始发送1"); 50 try { 51 // 获取 ConnectionFactory,ConnectionFactory:连接工厂,JMS用它创建连接 52 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( 53 "tcp://" 54 + ip 55 + ":61616?jms.blobTransferPolicy.defaultUploadUrl=http://" 56 + ip + ":8161/fileserver/"); 57 // 创建 Connection,Connection:JMS客户端到JMS Provider的连接 58 Connection connection = connectionFactory.createConnection(); 59 connection.start(); 60 // 创建 Session,Session:一个发送或接收消息的线程 61 ActiveMQSession session = (ActiveMQSession) connection 62 .createSession(false, Session.AUTO_ACKNOWLEDGE); 63 // 创建 Destination,Destination:消息的目的地;消息发送给谁. 64 // Destination destination = null; 65 // 判断是点对点1还是发布订阅2 66 if ("2".equals(sendType)) { 67 System.out.println("一对多发布2"); 68 createTopic(session, modeName, jsonData, file); 69 } else { 70 System.out.println("一对一发布2"); 71 // 点对点发布 72 createQueue(session, modeName, jsonData, file); 73 } 74 75 session.close(); 76 // 不关闭 Connection, 程序则不退出 77 connection.close(); 78 // 发送完成删除文件 79 // if (file != null) { 80 // if (file.exists()) { 81 // file.delete(); 82 // } 83 // } 84 str = "success"; 85 return str; 86 } catch (JMSException e) { 87 e.printStackTrace(); 88 str = "fail"; 89 return str; 90 } 91 } 92 93 private static void createQueue(ActiveMQSession session, String modeName, 94 String jsonData, File file) { 95 try { 96 Destination destination = session.createQueue(modeName); 97 // 创建 Producer,MessageProducer:消息发送者 98 MessageProducer producer = session.createProducer(destination); 99 // 设置持久性的话,文件也可以先缓存下来,接收端离线再连接也可以收到文件 100 producer.setDeliveryMode(DeliveryMode.PERSISTENT);// 设置为持久性 101 if (file.length() > 0) { 102 System.out.println("一对一上传文件3"); 103 // 构造 blobMessage,用来传输文件 104 isFileTransfer(producer, session, file, jsonData); 105 } else { 106 System.out.println("一对一无文件3"); 107 notFileTransfer(producer, session, jsonData); 108 } 109 110 } catch (JMSException e) { 111 e.printStackTrace(); 112 } 113 114 } 115 116 // 点对点无文件发送 117 private static void notFileTransfer(MessageProducer producer, 118 ActiveMQSession session, String jsonData) { 119 try { 120 TextMessage message = session.createTextMessage(); 121 message.setStringProperty("sendType", "1"); 122 message.setStringProperty("jsonData", jsonData); 123 message.setStringProperty("isNotFile", "false"); 124 // 设置该消息的超时时间(有效期) 125 producer.setTimeToLive(60000); 126 // 发送 127 producer.send(message); 128 producer.close(); 129 System.out.println("发送成功无文件4"); 130 } catch (JMSException e) { 131 e.printStackTrace(); 132 } 133 134 } 135 136 // 点对点有文件发送 137 private static void isFileTransfer(MessageProducer producer, 138 ActiveMQSession session, File file, String jsonData) { 139 try { 140 BlobMessage blobMessage = session.createBlobMessage(file); 141 blobMessage.setStringProperty("sendType", "1"); 142 blobMessage.setStringProperty("jsonData", jsonData); 143 blobMessage.setStringProperty("isNotFile", "true"); 144 // 设置该消息的超时时间(有效期) 145 producer.setTimeToLive(60000); 146 // 发送 147 producer.send(blobMessage); 148 producer.close(); 149 System.out.println("发送成功有文件4"); 150 } catch (JMSException e) { 151 e.printStackTrace(); 152 } 153 154 } 155 156 private static void createTopic(ActiveMQSession session, String modeName, 157 String jsonData, File file) { 158 try { 159 Topic topic = session.createTopic(modeName); 160 MessageProducer producer = session.createProducer(topic); 161 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 162 if (file.length() > 0) { 163 System.out.println("一对多上传文件3"); 164 // 构造 blobMessage,用来传输文件 165 isFileTransfer(producer, session, file, jsonData); 166 } else { 167 System.out.println("一对多无文件3"); 168 notFileTransfer(producer, session, jsonData); 169 } 170 } catch (JMSException e) { 171 e.printStackTrace(); 172 } 173 } 174 }
ActiveMQ的接收类:
里面的发送ip和模式名称可以根据具体的实际情况填写。
ReceiveMessageByMq.java
1 public class ReceiveMessageByMq { 2 3 public static void main(String[] args) { 4 7 String receiveType = "1";// 接收的类型 1发布一对一 ;2订阅一对多 8 String isNotFile = "true";// 是否有附件 9 String ip = ContentUtils.MQ_RECEIVE_IP;// 接收ip 10 String modeName = ContentUtils.MQ_POINT_QUEUENAME;// 模式名称 11 try { 12 receive(receiveType, ip, isNotFile, modeName); 13 } catch (JMSException e) { 14 e.printStackTrace(); 15 } 16 } 17 18 /** 19 * 20 * 21 * Title void Description 22 * 23 * @author jacun 24 * @param modeName 25 * @param ip 26 * @param receiveType 27 * @date 2017-4-11上午10:43:10 28 * @throws JMSException 29 */ 30 public static void receive(String receiveType, String ip, String isNotFile, 31 String modeName) throws JMSException { 32 System.out.println("开始接收2"); 33 // 获取 ConnectionFactory 34 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( 35 "tcp://" + ip + ":61616"); 36 // 创建 Connection 37 Connection connection = connectionFactory.createConnection(); 38 connection.start(); 39 // 创建 Session 40 Session session = connection.createSession(false, 41 Session.AUTO_ACKNOWLEDGE); 42 // 创建 Destinatione 43 // 判断是一对一还是一对多 44 if ("2".equals(receiveType)) { 45 // 一对多 46 System.out.println("一对多接收数据3"); 47 receiveTopic(session, isNotFile, modeName); 48 } else { 49 // 一对一 50 System.out.println("一对一接收数据3"); 51 receiveQueue(session, isNotFile, modeName); 52 } 53 54 } 55 56 private static void receiveTopic(Session session, String isNotFile, 57 String modeName) { 58 try { 59 final String isFile = isNotFile; 60 Destination destination = session.createTopic(modeName); 61 // 创建 Consumer 62 MessageConsumer consumer = session.createConsumer(destination); 63 // 注册消息监听器,当消息到达时被触发并处理消息 64 consumer.setMessageListener(new MessageListener() { 65 // 监听器中处理消息 66 public void onMessage(Message message) { 67 if ("true".equals(isFile)) { 68 System.out.println("有文件接收数据4"); 69 ReceiveMessageByMq.receiveFile(message); 70 } else { 71 System.out.println("无文件接收数据4"); 72 ReceiveMessageByMq.receiveData(message); 73 74 } 75 76 } 77 78 }); 79 } catch (JMSException e) { 80 e.printStackTrace(); 81 } 82 83 } 84 85 private static void receiveQueue(Session session, String isNotFile, 86 String modeName) { 87 try { 88 final String isFile = isNotFile; 89 Destination destination = session.createQueue(modeName); 90 // 创建 Consumer 91 MessageConsumer consumer = session.createConsumer(destination); 92 // 注册消息监听器,当消息到达时被触发并处理消息 93 consumer.setMessageListener(new MessageListener() { 94 // 监听器中处理消息 95 96 public void onMessage(Message message) { 97 if ("true".equals(isFile)) { 98 System.out.println("有文件接收数据4"); 99 ReceiveMessageByMq.receiveFile(message); 100 } else { 101 System.out.println("无文件接收数据4"); 102 ReceiveMessageByMq.receiveData(message); 103 104 } 105 106 } 107 108 }); 109 } catch (JMSException e) { 110 e.printStackTrace(); 111 } 112 113 } 114 115 protected static void receiveData(Message message) { 116 String sendType = null; 117 String jsonData = null; 118 try { 119 TextMessage msg = (TextMessage) message; 120 sendType = msg.getStringProperty("sendType"); 121 jsonData = msg.getStringProperty("jsonData"); 122 } catch (JMSException e) { 123 e.printStackTrace(); 124 } 125 System.out.println("无文件接收成功5"); 126 System.out.println(sendType); 127 System.out.println(jsonData); 128 } 129 130 private static void receiveFile(Message message) { 131 String sendType = null; 132 String jsonData = null; 133 if (message instanceof BlobMessage) { 134 BlobMessage blobMessage = (BlobMessage) message; 135 try { 136 String path = CreateZipFile.createZip("test"); 137 JFileChooser fileChooser = new JFileChooser(); 138 fileChooser.setDialogTitle("请指定文件保存位置"); 139 fileChooser.setSelectedFile(new File(path)); 140 File file = fileChooser.getSelectedFile(); 141 OutputStream os = new FileOutputStream(file); 142 InputStream inputStream = blobMessage.getInputStream(); 143 // 写文件,你也可以使用其他方式 144 byte[] buff = new byte[256]; 145 int len = 0; 146 while ((len = inputStream.read(buff)) > 0) { 147 os.write(buff, 0, len); 148 } 149 os.close(); 150 System.out.println("有文件接收成功5"); 151 sendType = blobMessage.getStringProperty("sendType"); 152 jsonData = blobMessage.getStringProperty("jsonData"); 153 System.out.println(sendType); 154 System.out.println(jsonData); 155 156 } catch (Exception e) { 157 e.printStackTrace(); 158 } 159 160 } 161 162 } 163 }
亲测好使,这两个工具类包含了发送和接收的方法,而且可以点对点或者发布订阅、有无附件均可,对了还有一点,ActiveMQ需要的jar包,网上信息很多!
补充:补充一个mq接收端自动连接到mq服务器的方法:
那就是将连接方式换一下就可以了:
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin","failover:("tcp://" + ip + ":61616")?initialReconnectDelay=1000&maxReconnectDelay=30000");
转载于:https://www.cnblogs.com/zjiacun/p/6697721.html
ActiveMQ的消息的(含附件)发送和接收使用相关推荐
- javasocket连续给服务器发送消息,Java通过Socket发送和接收多条消息
我们需要实现一个Socket客户端,它应该连接到一个接受TCP连接的服务器.如果我通过netcap与服务器进行通信,我会立即得到它的响应(通过命令行).Java通过Socket发送和接收多条消息 的工 ...
- IPC 共享内存和 消息队列(发送、接收、移除)以及键值的生成
一.消息对列 消息队列,是消息的链接表,存放在内核中.一个消息队列由一个标识符(即队列ID)来标识. 特点: 消息队列是面向记录的,其中的消息具有特定的格式以及特定的优先级.消息队列独立于发送与接收进 ...
- rocket mq 监听端口_如何使用Jmeter实现MQ数据的发送和接收?性能测试实战篇
JMeter是性能测试中被普遍使用的一种工具,常用于压力测试.该工具具有丰富的扩展插件用以满足不同情况下性能测试的需求.消息队列(Message Queue)简称为MQ,作为目前的主流中间件,在很多软 ...
- mfc 开启指定服务器,用MFC实现消息的发送和接收(含服务器)
<用MFC实现消息的发送和接收(含服务器)>由会员分享,可在线阅读,更多相关<用MFC实现消息的发送和接收(含服务器)(33页珍藏版)>请在人人文库网上搜索. 1.精品好资料学 ...
- ActiveMQ 实现消息接收发送
一.接收者 package com.demo.initservice;import javax.jms.Connection; import javax.jms.ConnectionFactory; ...
- ActiveMQ 发送和接收消息
一.添加 jar 包 <dependency><groupId>org.apache.activemq</groupId><artifactId>act ...
- JAVA ActiveMQ消息发送和接收
JMS即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信. ...
- java activeMQ消息的发送与接收
java activeMQ消息的发送与接收 activemq是我们经常用到的消息队列之一,比如说速度快,对spring的很好的支持,支持多种协议等等,今天我们就来看一下activeMQ消息的发送与接收 ...
- C# .Net 邮箱发送含附件方法
获取源码方式: 第一种:打开微信,搜一搜"别打我女儿的主意"打开微信小程序,找到菜单栏,点击源码,进去就可以获得链接 第二种:可以给本文点赞.好评,然后发邮件到792166417@ ...
最新文章
- 分布式任务调度平台 XXL-JOB
- 简述css样式的三种引入html的方式,css-1,css的三种引入方式 基本选择器
- [置顶] 状态压缩DP 简单入门题 11题
- php调用谷歌翻译,PHP调用谷歌翻译 | 学步园
- Yapi 部署及遇到的坑
- 【ZeroClipboard is not defined】的解决方法
- 半波对称振子方向图_画好服装款式图的五个要点
- [转载] numpy教程:逻辑函数Logic functions
- informix设置数据库默认插入时间_informix常用时间运算和操作方式
- 集合框架-ArrayList,Vector,Linkedlist
- 学习Dart语言,看这一篇文章就够了!(详细介绍)
- html个人所得税计算器,上海个人所得税计算器_个税计算器_税后工资计算器
- Springboot Swagger2 Unable to infer base url问题解决
- 武汉大学计算机学院樊浩南,今年高考光荣榜?谁能告之??谢谢!!
- 各大浏览器清除缓存的快捷键
- Blender 3.5 面的操作(二)
- 技术-正则表达式 处理读书器的章、节、回问题
- 虚拟现实x3d简明教程1
- 笔记本插入网线没反应,WiFi又可以使用
- JavaScript闭包函数理解
热门文章
- mariadb mysql 重建_(MariaDB/MySQL)之DML(2):数据更新、删除
- anaconda python_Anaconda下Python环境下载及安装
- 《密码爆破漏洞详解》——黑客必修的入门操作( 建议收藏 )
- 由浅入深学习Apache httpd原理与配置
- 【转】Intellij IDEA 14中使用MyBatis-generator 自动生成MyBatis代码
- 管理对象空间——管理存储参数
- 爱情六十三课,定个开放日
- PAT1018. 锤子剪刀布
- 基于实数编码的参数自适应遗传算法(matlab代码)
- vue将经纬度转换成地理名称_新武汉北,红安有了一个新的地理名称,恒大项目将对标上海迪士尼...