官方文档: https://help.aliyun.com/document_detail/143601.html?spm=a2c4g.11174283.6.646.28ba1668pisoQW

一、物联网平台准备

1、准备消费组
打开物联网平台 -> 规则引擎 -> 服务端订阅 -> 消费组列表
点击创建消费组,输入名称。

创建完成,为创建的消费组添加订阅。点击查看,打开订阅产品一栏,创建订阅。



2、准备AccessKey


创建完成

二、IDEA端

1、添加依赖

       <!-- amqp 1.0 qpid client --><dependency><groupId>org.apache.qpid</groupId><artifactId>qpid-jms-client</artifactId><version>0.56.0</version></dependency><!-- util for base64--><dependency><groupId>commons-codec</groupId><artifactId>commons-codec</artifactId><version>1.10</version></dependency><!-- ================================================= --><!-- 日志及相关依赖(用slf4j+logback) --><!-- ================================================= --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.5</version></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.2.3</version></dependency>

2、创建AmqpClient类
贴上官方代码:

import java.net.URI;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;import org.apache.commons.codec.binary.Base64;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class AmqpClient {private final static Logger logger = LoggerFactory.getLogger(AmqpClient.class);private static String accessKey = "${YourAccessKey}";private static String accessSecret = "${YourAccessSecret}";private static String consumerGroupId = "${YourConsumerGroupId}";//iotInstanceId:企业版实例请填写实例ID,公共实例请填空字符串""。private static String iotInstanceId = "${YourIotInstanceId}";//控制台服务端订阅中消费组状态页客户端ID一栏将显示clientId参数。//建议使用机器UUID、MAC地址、IP等唯一标识等作为clientId。便于您区分识别不同的客户端。private static String clientId = "${YourClientId}";//${YourHost}为接入域名,请参见AMQP客户端接入说明文档。private static String host = "${YourHost}";// 指定单个进程启动的连接数// 单个连接消费速率有限,请参考使用限制,最大64个连接// 连接数和消费速率及rebalance相关,建议每500QPS增加一个连接private static int connectionCount = 4;//业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。private final static ExecutorService executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,new LinkedBlockingQueue(50000));public static void main(String[] args) throws Exception {List<Connection> connections = new ArrayList<>();//参数说明,请参见AMQP客户端接入说明文档。for (int i = 0; i < connectionCount; i++) {long timeStamp = System.currentTimeMillis();//签名方法:支持hmacmd5、hmacsha1和hmacsha256。String signMethod = "hmacsha1";//userName组装方法,请参见AMQP客户端接入说明文档。String userName = clientId + "-" + i + "|authMode=aksign"+ ",signMethod=" + signMethod+ ",timestamp=" + timeStamp+ ",authId=" + accessKey+ ",iotInstanceId=" + iotInstanceId+ ",consumerGroupId=" + consumerGroupId+ "|";//计算签名,password组装方法,请参见AMQP客户端接入说明文档。String signContent = "authId=" + accessKey + "&timestamp=" + timeStamp;String password = doSign(signContent, accessSecret, signMethod);String connectionUrl = "failover:(amqps://" + host + ":5671?amqp.idleTimeout=80000)"+ "?failover.reconnectDelay=30";Hashtable<String, String> hashtable = new Hashtable<>();hashtable.put("connectionfactory.SBCF", connectionUrl);hashtable.put("queue.QUEUE", "default");hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");Context context = new InitialContext(hashtable);ConnectionFactory cf = (ConnectionFactory)context.lookup("SBCF");Destination queue = (Destination)context.lookup("QUEUE");// 创建连接。Connection connection = cf.createConnection(userName, password);connections.add(connection);((JmsConnection)connection).addConnectionListener(myJmsConnectionListener);// 创建会话。// Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。// Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);connection.start();// 创建Receiver连接。MessageConsumer consumer = session.createConsumer(queue);consumer.setMessageListener(messageListener);}logger.info("amqp demo is started successfully, and will exit after 60s ");// 结束程序运行 Thread.sleep(60 * 1000);logger.info("run shutdown");connections.forEach(c-> {try {c.close();} catch (JMSException e) {logger.error("failed to close connection", e);}});executorService.shutdown();if (executorService.awaitTermination(10, TimeUnit.SECONDS)) {logger.info("shutdown success");} else {logger.info("failed to handle messages");}}private static MessageListener messageListener = new MessageListener() {@Overridepublic void onMessage(final Message message) {try {//1.收到消息之后一定要ACK。// 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。// 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。// message.acknowledge();//2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。// 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。executorService.submit(new Runnable() {@Overridepublic void run() {processMessage(message);}});} catch (Exception e) {logger.error("submit task occurs exception ", e);}}};/*** 在这里处理您收到消息后的具体业务逻辑。*/private static void processMessage(Message message) {try {byte[] body = message.getBody(byte[].class);String content = new String(body);String topic = message.getStringProperty("topic");String messageId = message.getStringProperty("messageId");logger.info("receive message"+ ",\n topic = " + topic+ ",\n messageId = " + messageId+ ",\n content = " + content);} catch (Exception e) {logger.error("processMessage occurs error ", e);}}private static JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {/*** 连接成功建立。*/@Overridepublic void onConnectionEstablished(URI remoteURI) {logger.info("onConnectionEstablished, remoteUri:{}", remoteURI);}/*** 尝试过最大重试次数之后,最终连接失败。*/@Overridepublic void onConnectionFailure(Throwable error) {logger.error("onConnectionFailure, {}", error.getMessage());}/*** 连接中断。*/@Overridepublic void onConnectionInterrupted(URI remoteURI) {logger.info("onConnectionInterrupted, remoteUri:{}", remoteURI);}/*** 连接中断后又自动重连上。*/@Overridepublic void onConnectionRestored(URI remoteURI) {logger.info("onConnectionRestored, remoteUri:{}", remoteURI);}@Overridepublic void onInboundMessage(JmsInboundMessageDispatch envelope) {}@Overridepublic void onSessionClosed(Session session, Throwable cause) {}@Overridepublic void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}@Overridepublic void onProducerClosed(MessageProducer producer, Throwable cause) {}};/*** 计算签名,password组装方法,请参见AMQP客户端接入说明文档。*/private static String doSign(String toSignString, String secret, String signMethod) throws Exception {SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);Mac mac = Mac.getInstance(signMethod);mac.init(signingKey);byte[] rawHmac = mac.doFinal(toSignString.getBytes());return Base64.encodeBase64String(rawHmac);}
}

需要我们修改的为开头一系列参数。

先放官方文档介绍。

(1)AccessKey
回到刚刚的AccessKey管理页面,点击后面的查看secret,输入验证码。

(2)consumerGroupId
回到刚刚的消费组列表,点击后面的查看,获取消费组ID。

(3)iotInstanceId
如官方所说,企业版实例请填写实例ID,公共实例请填空字符串""。我用的为公共实例,所以用空字符串。
(4)clientId
自己起一个名字,起到标识作用。
(5)host
查看官方文档。

${uid}.iot-amqp.${YourRegionId}.aliyuncs.com



最后修改后我的如下,供参考。

三、测试

修改完毕后IDEA运行该程序。注意启动后有时间限制。

打开物联网平台 -> 监控运维 -> 设备模拟器,选择调试设备,点击属性上报,点击启动设备模拟器

输入参数,点击发送指令

回到IDEA,可以看到已经可以接收到消息。

完成。

IDEA连接阿里云物联网平台获取设备上传数据相关推荐

  1. 微信小程序连接阿里云物联网平台操控设备(IOT)三

    文章导航: 微信小程序连接阿里云物联网平台操控设备(IOT)一 设备上云 微信小程序连接阿里云物联网平台操控设备(IOT)二 微信小程序开发(一) 微信小程序连接阿里云物联网平台操控设备(IOT)三 ...

  2. 微信小程序连接阿里云物联网平台操控设备(IOT)一

    文章导航: 微信小程序连接阿里云物联网平台操控设备(IOT)一 设备上云 微信小程序连接阿里云物联网平台操控设备(IOT)二 微信小程序开发(一) 微信小程序连接阿里云物联网平台操控设备(IOT)三 ...

  3. java实现mqtt服务端_基于Swoole使用MQTT协议连接阿里云物联网平台设备实现消息订阅

    阿里云物联网平台为设备提供安全可靠的连接通信能力,支撑设备数据采集上云,我们这里认为阿里云物联网平台是 MQTT 服务端,那么我们自己的设备作为客户端,应该如何实现消息订阅? 阿里云没有提供 PHP ...

  4. 树莓派mqtt协议连接阿里云物联网平台,手机端获取数据并控制

    树莓派mqtt协议连接阿里云物联网平台(三) 前面树莓派的数据已经上传到云端,可是我的android手机该如何获取树莓派上传的这些数据呢,,困惑了我好几天的疑问,解开的那一刻,真的时拨开云雾见青天啊. ...

  5. 利用PYTHON连接阿里云物联网平台

    语言:python 3.7 环境:windows 10 实例:公共实例(免费) 阿里云的官方文档只有C语言和Linux环境,因此自我探索出利用PYTHON连接阿里云物联网平台的方法和步骤. 概述 - ...

  6. 使用arduino D1 wifi模块(WeMos D1)连接阿里云物联网平台并成功实现APP点亮板载LED(九)---制作APP

    前几篇文章讲解了如何在阿里云控制台上给设备登记"身份证",如何用MQTT.fx客户端模拟一个设备连接阿里云物联网平台,分析了arduino编程环境如何配置依赖库,在arduino ...

  7. ESP-MQTT-AT指令连接阿里云物联网平台

    文章目录 文章背景 关键的关键词 本章使用 本章约定 所需资源 技术正文 一:连接方式 二:步骤(简介 3.ESP模块+stm32单片机) 1.创建设备复制三元素组 2.信息生成 3.使用串口工具和云 ...

  8. ESP8266 AT指令连接阿里云物联网平台

    模组:ESP-12F   ||   ESP-12S  ||   (保险来说,ESP模组的flash应该 大于4MB.或者说为32Mbit 才可以)待补充 ... 平台:阿里云物联网平台 固件:ESP8 ...

  9. 移远BC25/28/35GMQTT连接阿里云物联网平台并实现属性上报

    一.平台侧操作 创建一个产品,并添加一个设备,获取到设备接入所需的验证信息.本实验选择的产品品类为标准品类--智慧园区--气象站监测仪.(你也可以自定义产品类别,但是后期数据上报的关键字需要自己去添加 ...

最新文章

  1. Ubuntu操作系统安装之开发人员必备
  2. 马云携阿里17位创始人及合伙人捐赠浙大一院5.6亿,杭州渐成中国硅谷
  3. jQuery学习笔记(一):入门
  4. 最好电脑操作系统_软件开发人员该如何选择笔记本电脑?朋克老师来教你
  5. C语言1e12怎么识别,求大神帮助词法分析,当输入第一个1.2e12时可以输出,当时输入第二个1.2e12时就不能输出了,万分感谢,还有不能识别x=7*8+9中的+9,...
  6. This generally means that another instance of this process was already runni
  7. RUP大讲堂(第三讲):如何建立软件产品的愿景
  8. escape character.
  9. ASP.NET AJAX入门系列(8):自定义异常处理
  10. 'tensorflow' has no attribute 'sub'
  11. 叙述计算机网络的分类与拓扑结构,计算机的网络中有线网络和无线网络最主要的区别是()。...
  12. c# combobox 绑定枚举方式
  13. Ubuntu升级php7.0配置fpm socket
  14. 七月算法机器学习 6 特征工程 小案例
  15. ubuntu20.04安装ROS极简教程 (noetic)
  16. python显示invalid character_python提示invalid character in identifier
  17. SOFA企业应用框架
  18. python100个必备包_这套python教程超详细,包你1小时入门Python,100天摇身变大牛...
  19. 耗电优化(上):Android App 耗电分析
  20. 番茄学习--番茄工具推荐

热门文章

  1. 基于C4D的3d设计
  2. 算法刷题打卡第34天:有效的井字游戏
  3. [转]教你修复win7中复制粘贴失效的问题
  4. 【SRAM】CubeMX配置STM32H743+IS61WV204816外部扩展SRAM
  5. python牛顿迭代法求根例题_python求根算法
  6. python-计算字符个数
  7. 基于Nginx构建七牛云CDN静态资源加速
  8. Win10系统上设置Microsoft store的默认下载路径
  9. 清华计算机科学四字班,清华大学里四个特殊班
  10. 工业机械设备设计与艺术设计