之前一直按官方文档来的,但是topic消息一直积压,服务端老是订阅不到。

坑: 

String iotInstanceId = "${iotInstanceId}";

这个参数是要填实例ID,文档说:“没有购买实例可忽略”。然后我就保持没动。结果订阅不到。

改:

String iotInstanceId = "";

附代码:

package amqp;import java.net.URI;
import java.util.Hashtable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import org.apache.commons.codec.binary.Base64;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;class AmqpJavaClient {private final static Logger logger = LoggerFactory.getLogger(AmqpJavaClient.class);//业务处理异步线程池,线程池参数可以根据您的业务特点调整;或者您也可以用其他异步方式处理接收到的消息private final static ExecutorService executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,new LinkedBlockingQueue<>(50000));public static void main(String[] args) throws Exception {System.out.println("测试0");//参数说明,请参见上一篇文档:AMQP客户端接入说明。String accessKey = "LTAI4GCvKHPiaJUX7wmXPZYz";String accessSecret = "H5u7cJbFPsvioZhWe1fLc6R6BrYZ8U";String consumerGroupId = "DEFAULT_GROUP";String iotInstanceId = "";  //这里如果没有购买实例就别填,不然接收不到消息long timeStamp = System.currentTimeMillis();//签名方法:支持hmacmd5,hmacsha1和hmacsha256String signMethod = "hmacsha1";//控制台服务端订阅中消费组状态页客户端ID一栏将显示clientId参数。//建议使用机器UUID、MAC地址、IP等唯一标识等作为clientId。便于您区分识别不同的客户端。String clientId = "XB001";//UserName组装方法,请参见上一篇文档:AMQP客户端接入说明。String userName = clientId + "|authMode=aksign"+ ",signMethod=" + signMethod+ ",timestamp=" + timeStamp+ ",authId=" + accessKey+ ",iotInstanceId=" + iotInstanceId //如果是购买的实例,需要传实例ID+ ",consumerGroupId=" + consumerGroupId+ "|";//password组装方法,请参见上一篇文档:AMQP客户端接入说明。String signContent = "authId=" + accessKey + "&timestamp=" + timeStamp;String password = doSign(signContent,accessSecret, signMethod);//按照qpid-jms的规范,组装连接URL。String connectionUrl = "failover:(amqps://1773647983940296.iot-amqp.cn-shanghai.aliyuncs.com:5671?amqp.idleTimeout=80000)"+ "?failover.reconnectDelay=30";Hashtable<String, String> hashtable = new Hashtable<>();hashtable.put("connectionfactory.SBCF",connectionUrl);hashtable.put("queue.QUEUE", "default");hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");Context context = new InitialContext(hashtable);ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");Destination queue = (Destination)context.lookup("QUEUE");// Create ConnectionConnection connection = cf.createConnection(userName, password);((JmsConnection) connection).addConnectionListener(myJmsConnectionListener);// Create Session// Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()// Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);connection.start();// Create Receiver LinkMessageConsumer consumer = session.createConsumer(queue);consumer.setMessageListener(messageListener);System.out.println("测试0.1");}private static MessageListener messageListener = new MessageListener() {@Overridepublic void onMessage(Message message) {try {System.out.println("测试1");//1.收到消息之后一定要ACK// 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。// 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。//message.acknowledge();//2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。// 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。executorService.submit(() -> processMessage(message));} catch (Exception e) {System.out.println("测试2");logger.error("submit task occurs exception ", e);}}};/*** 在这里处理您收到消息后的具体业务逻辑。*/private static void processMessage(Message message) {try {System.out.println("测试3");byte[] body = message.getBody(byte[].class);String content = new String(body);String topic = message.getStringProperty("topic");String messageId = message.getStringProperty("messageId");logger.info("receive message"+ ", topic = " + topic+ ", messageId = " + messageId+ ", content = " + content);} catch (Exception e) {logger.error("processMessage occurs error ", e);}}private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {/*** 连接成功建立*/@Overridepublic void onConnectionEstablished(URI remoteURI) {System.out.println("连接成功");logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);}/*** 尝试过最大重试次数之后,最终连接失败。*/@Overridepublic void onConnectionFailure(Throwable error) {System.out.println("试过最大重试次数之后,最终连接失败。");logger.error("onConnectionFailure, {}", error.getMessage());}/*** 连接中断。*/@Overridepublic void onConnectionInterrupted(URI remoteURI) {System.out.println("连接中断。");logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);}/*** 连接中断后又自动重连上。*/@Overridepublic void onConnectionRestored(URI remoteURI) {System.out.println("连接中断后又自动重连上。");logger.info("onConnectionRestored, remoteUri:{}", remoteURI);}@Overridepublic void onInboundMessage(JmsInboundMessageDispatch envelope) {}@Overridepublic void onSessionClosed(Session session, Throwable cause) {}@Overridepublic void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}@Overridepublic void onProducerClosed(MessageProducer producer, Throwable cause) {}};/*** password签名计算方法,请参见上一篇文档:AMQP客户端接入说明。*/private static String doSign(String toSignString, String secret, String signMethod) throws Exception {SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);Mac mac = Mac.getInstance(signMethod);mac.init(signingKey);byte[] rawHmac = mac.doFinal(toSignString.getBytes());return Base64.encodeBase64String(rawHmac);}
}

阿里IOT用AMQP在服务端订阅消息,踩坑相关推荐

  1. 物联网学习篇:Python SDK接入阿里云物联网平台,接收服务端订阅消息

    1. 下载SDK SDK下载链接 下载之后,可见得到了一个 qpid-proton-0.29.0.tar.gz 的压缩包. 有两个方法: 1. 直接运用SCP软件拖进服务器中: 2. 直接用wget下 ...

  2. 小程序统一服务消息_[miniblog]小程序订阅消息踩坑记

    有阵子没有更新我的mini-blog了,这次把推送消息那块做了些改动,小程序的模板消息即将废弃,订阅消息终于来了. 关于订阅消息 订阅消息分为一次性订阅和长期订阅,长期订阅就不说啦,不是个人号可以染指 ...

  3. vue开发微信公众号订阅消息踩坑记录

    今天做了一个微信公众号的网页开发, 使用的是vue做的开发,没错,就是微信公众号的开发,这都2021年了, 还有人有开发微信公众号的需求,我也是晕了, 微信公众号是我开发中感觉,最难调试的开发工作,没 ...

  4. 腾讯云IM服务端API集成踩坑记录(一)账号管理调试

    1.新增用户要注意大小写: 2.除体验版外,其他版本都无法删除账号: 3.连接失败问题报错,需要增加pom依赖 问题截取:Caused by: java.lang.NoSuchMethodError: ...

  5. [笔记]阿里云物联网之业务服务端(java、php)接入阿里云平台

    文章目录 前言 准备 相关资料 相关介绍 消息通信 云产品流转 RocketMQ 服务订阅 MNS AMQP Topic通信 创建设备 Mqttfx设备接入(模拟设备) 安装mqtt.fx 1.71版 ...

  6. 抖音、腾讯、阿里、美团春招服务端开发岗位硬核面试(二)

    在上一篇 文章中,我们分享了几大互联网公司面试的题目,本文就来详细分析面试题答案以及复习参考和整理的面试资料,小民同学的私藏珍品????. 首先是面试题答案公布,在讲解时我们主要分成如下几块:语言的基 ...

  7. Netty实战 IM即时通讯系统(十)实现客户端和服务端收发消息

    Netty实战 IM即时通讯系统(十)实现客户端和服务端收发消息 零. 目录 IM系统简介 Netty 简介 Netty 环境配置 服务端启动流程 客户端启动流程 实战: 客户端和服务端双向通信 数据 ...

  8. java服务器向客户端发消息_java一个简单的客户端向服务端发送消息

    java一个简单的客户端向服务端发送消息 客户端代码: package com.chenghu.tcpip; import java.io.IOException; import java.io.Ou ...

  9. Netty简单实现客户端与服务端收发消息

    Netty简单实现客户端与服务端收发消息 这个小案例主要是实现netty收发消息,分为客户端,及服务端,以及包含了相关状态处理,主要的代码会放在最后 gitHub 地址上,有需要可以看一下 首先来简单 ...

最新文章

  1. suse11 oracle11g 安装
  2. R3抹掉加载的DLL
  3. nginx 学习笔记(5) nginx调试日志
  4. 漫画:什么是插入排序?
  5. python时间库_Python处理日期时间的标准库:time和datetime
  6. Semaphore 里面居然有这么一个大坑!
  7. USB(UVC协议)摄像头
  8. 计算机考研复试面试常问问题 数据结构篇(上)
  9. 1102: 【入门】字符图形1-星号矩形
  10. stony大学计算机科学找工作,美国STEM专业毕业生薪资最高的院校有哪些?
  11. windows10电脑连接小爱音箱(完美解决连接上无声音)
  12. [VBS]_[活动分组程序]
  13. ORA-12514: TNS:listener does not currently know of service requested in connect descript
  14. python list diff_PythonList交集,并集,差集的应用
  15. 被社会毒打的20年毕业的后端
  16. python 移动平均函数_「EMA系列之I」如何理解EMA指数移动平均值以及Python实现
  17. ghost系统无法启动
  18. linux忘记管理员密码解决方法
  19. 淘宝 item_recommend - 获取推荐商品列表
  20. B2C电商运营模式的短板

热门文章

  1. PAT乙级 1003 我要通过! (20分)
  2. Quoted-printable 编码认识、介绍、编码解码转换
  3. 《微观经济学》第八章 博弈论与寡头市场初步笔记
  4. XtraReport绑定数据源的三种方式
  5. ARL资产灯塔收集系统
  6. dedecms中[field:imglink/]图片大小问题解决办法
  7. B. Shifting Sort- Codeforces Round #744 (Div. 3)
  8. LeetCode 37. 解数独 Sudoku Solver
  9. Spine动画 导入COCOS和U3D
  10. 2021 上海科技大学信息学院SIST夏令营经验+记录贴