Aliyun IOT 使用

服务端订阅

官网:https://help.aliyun.com/document_detail/142376.html?spm=a2c4g.11186623.6.622.46b92cf0vmZSwV

准备工作

一、首先开通并进入阿里云物联网平台,创建一个公共实例/企业版实例【公共实例可用于测试,生产最好用企业版实例】

二、点击进入实例,创建产品,参数根据实际情况输入。

三、产品创建完成后,添加设备

四、若想测试发布订阅消息,还需要添加自定义的 topic 主题【当然也可以用阿里官方提供端】以及 创建服务端订阅

服务端订阅编码实现【Java】

工程下导入 pom 依赖

  <!--aliyun core-->
<dependency><groupId>com.aliyun</groupId><artifactId>aliyun-java-sdk-core</artifactId><version>4.5.6</version>
</dependency>
<!--aliyun Iot-->
<!-- https://mvnrepository.com/artifact/com.aliyun/aliyun-java-sdk-iot -->
<dependency><groupId>com.aliyun</groupId><artifactId>aliyun-java-sdk-iot</artifactId><version>7.16.0</version>
</dependency>
<!-- IOT用于监听阿里平台消息 -->
<dependency><groupId>com.aliyun.openservices</groupId><artifactId>iot-client-message</artifactId><version>1.1.5</version>
</dependency>
<!-- amqp 1.0 qpid client -->
<dependency><groupId>org.apache.qpid</groupId><artifactId>qpid-jms-client</artifactId><version>0.47.0</version>
</dependency>
<!-- util for base64-->
<dependency><groupId>commons-codec</groupId><artifactId>commons-codec</artifactId><version>1.10</version>
</dependency>

在项目目录下创建一个 config.properties 配置文件,具体参数如下:

## 阿里云密钥ID
iot.accessKeyID=*************
# 阿里云密钥
iot.accessKeySecret=*************
iot.uid=**********
# 要访问的iot的regionId 目前支持的 cn-shanghai(华东2)、ap-southeast-1(新加坡) 、us-west-1(美西)
iot.regionId=cn-shenzhen
#iot套件对应的产品code 保持不变即可
iot.productCode=Iot
#Iot api的服务地址 跟regionId对应 这是华东2的
iot.domain=iot.${iot.regionId}.aliyuncs.com
#Iot api 的版本
iot.version=2020-01-20
# 消费组ID
iot.consumerGroupId=DEFAULT_GROUP
# 企业版iot实例ID,公共版没有
iot.iotInstanceId=*************
# 签名方法:支持hmacmd5、hmacsha1和hmacsha256。
iot.signMethod=hmacsha1
# iot公网终端节点url,企业版
iot.amqp.connectionUrl=*************

参数介绍:

【accessKeyID 、accessKeySecret 、uid】 为阿里云用户信息;建议自己创建一个只具有 IOT 权限的用户步骤:创建用户,设置权限,创建 accessKeySecret

【regionId、productCode、domain、version】regionId 为区域ID,productCode 固定为 iot,domain 为区域对应的访问地址,version 为版本号;这些参数都很好获取就不做介绍了。

【consumerGroupId、iotInstanceId、signMethod,connectionUrl】consumerGroupId 为分组id,在上面创建订阅时右边的就是;iotInstanceId:为企业版实例的ID,公共实例不需要;signMethod 为加密方式,固定为上面给出的即可;connectionUrl:为企业版实例 AMQP 接入方式的 url

创建 IOT 连接工具类

import com.akieay.cloudprint.common.util.aliyun.iot.util.LogUtil;
import com.aliyun.openservices.iot.api.Profile;
import com.aliyun.openservices.iot.api.message.MessageClientFactory;
import com.aliyun.openservices.iot.api.message.api.MessageClient;
import com.aliyuncs.DefaultAcsClient;
import com.aliyuncs.profile.DefaultProfile;
import com.aliyuncs.profile.IClientProfile;
import org.apache.commons.codec.binary.Base64;import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.io.IOException;
import java.util.Hashtable;
import java.util.Properties;/*** iot相关配置信息,以及client的生产* @author akieay*/
public class IotConnectionUtil {/*** 阿里云*/private static String accessKeyID;private static String accessKeySecret;private static String uid;private static String regionId;private static String product;private static String domain;private static String version;private static String consumerGroupId;private static String iotInstanceId;private static String signMethod;private static String amqpConnectionUrl;private static String CONNECTION_NAME= "SBCF";static {Properties prop = new Properties();try {prop.load(Object.class.getResourceAsStream("/config.properties"));accessKeyID = prop.getProperty("iot.accessKeyID");accessKeySecret = prop.getProperty("iot.accessKeySecret");uid = prop.getProperty("iot.uid");regionId = prop.getProperty("iot.regionId");product = prop.getProperty("iot.productCode");domain = prop.getProperty("iot.domain");version = prop.getProperty("iot.version");consumerGroupId = prop.getProperty("iot.consumerGroupId");iotInstanceId = prop.getProperty("iot.iotInstanceId");signMethod = prop.getProperty("iot.signMethod");amqpConnectionUrl = prop.getProperty("iot.amqp.connectionUrl");} catch (IOException e) {e.printStackTrace();}}public static String getAmqpConnectionUrl() {//企业版实例
//       return "failover:(amqps://"+amqpConnectionUrl+":5671?amqp.idleTimeout=80000)?failover.reconnectDelay=30";//公共实例return "failover:(amqps://"+uid+".iot-amqp."+regionId+".aliyuncs.com:5671?amqp.idleTimeout=80000)?failover.reconnectDelay=30";}public static Connection getConnectionByEnterpriseInstance( Hashtable<String, String> hashtable ) throws Exception {Connection connection = null;try {long timeStamp = System.currentTimeMillis();String userName = uid + "|authMode=aksign"+ ",signMethod=" + signMethod+ ",timestamp=" + timeStamp+ ",authId=" + accessKeyID
//                   //公共版不需要该参数,企业版必须填写该参数
//                    + ",iotInstanceId=" + iotInstanceId+ ",consumerGroupId=" + consumerGroupId+ "|";String signContent = "authId=" + accessKeyID + "&timestamp=" + timeStamp;String password = doSign(signContent,accessKeySecret, signMethod);Context context = new InitialContext(hashtable);ConnectionFactory cf = (ConnectionFactory)context.lookup(CONNECTION_NAME);connection = cf.createConnection(userName, password);} catch (Exception e) {LogUtil.print("初始化messageClient失败!exception:" + e.getMessage());}return connection;}public static MessageClient getMessageClientByPublicInstance() {MessageClient messageClient = null;try {String endPoint = "https://" + uid + ".iot-as-http2." + regionId + ".aliyuncs.com";// 连接配置Profile profile = Profile.getAccessKeyProfile(endPoint, regionId, accessKeyID, accessKeySecret);// 构造客户端messageClient = MessageClientFactory.messageClient(profile);} catch (Exception e) {LogUtil.print("初始化messageClient失败!exception:" + e.getMessage());}return messageClient;}/*** 计算签名,password组装方法,请参见AMQP客户端接入说明文档。*/private static String doSign(String toSignString, String secret, String signMethod) throws Exception {SecretKeySpec signingKey = new SecretKeySpec(secret.getBytes(), signMethod);Mac mac = Mac.getInstance(signMethod);mac.init(signingKey);byte[] rawHmac = mac.doFinal(toSignString.getBytes());return Base64.encodeBase64String(rawHmac);}public static DefaultAcsClient getClient() {DefaultAcsClient client = null;try {IClientProfile profile = DefaultProfile.getProfile(regionId, accessKeyID, accessKeySecret);DefaultProfile.addEndpoint(regionId, product, domain);// 初始化clientclient = new DefaultAcsClient(profile);} catch (Exception e) {LogUtil.print("初始化client失败!exception:" + e.getMessage());}return client;}public static String getRegionId() {return regionId;}public static void setRegionId(String regionId) {IotConnectionUtil.regionId = regionId;}public static String getDomain() {return domain;}public static void setDomain(String domain) {IotConnectionUtil.domain = domain;}public static String getVersion() {return version;}public static void setVersion(String version) {IotConnectionUtil.version = version;}
}

创建服务端订阅业务类

import com.akieay.cloudprint.common.util.aliyun.iot.connection.IotConnectionUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import java.net.URI;
import java.util.Hashtable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;/*** @author akieay* @Date: 2020/11/18 11:09*/
@Slf4j
public class ServerSideSubscription {/*** 业务处理异步线程池,线程池参数可以根据您的业务特点调整,或者您也可以用其他异步方式处理接收到的消息。*/private final static ExecutorService executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),Runtime.getRuntime().availableProcessors() * 2, 60, TimeUnit.SECONDS,new LinkedBlockingQueue<>(50000));public static void main(String[] args) throws Exception {Hashtable<String, String> hashtable = new Hashtable<>();String connectionUrl = IotConnectionUtil.getAmqpConnectionUrl();hashtable.put("connectionfactory.SBCF",connectionUrl);hashtable.put("queue.QUEUE", "default");hashtable.put(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");Connection connection = IotConnectionUtil.getConnectionByEnterpriseInstance(hashtable);Context context = new InitialContext(hashtable);Destination queue = (Destination)context.lookup("QUEUE");((JmsConnection) connection).addConnectionListener(jmsConnectionListener);// 创建会话。// Session.CLIENT_ACKNOWLEDGE: 收到消息后,需要手动调用message.acknowledge()。// Session.AUTO_ACKNOWLEDGE: SDK自动ACK(推荐)。Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);connection.start();// 创建Receiver连接。MessageConsumer consumer = session.createConsumer(queue);consumer.setMessageListener(messageListener);}/*** 消息监听器*/private static MessageListener messageListener = new MessageListener() {@Overridepublic void onMessage(Message message) {try {//1.收到消息之后一定要ACK。// 推荐做法:创建Session选择Session.AUTO_ACKNOWLEDGE,这里会自动ACK。// 其他做法:创建Session选择Session.CLIENT_ACKNOWLEDGE,这里一定要调message.acknowledge()来ACK。// message.acknowledge();//2.建议异步处理收到的消息,确保onMessage函数里没有耗时逻辑。// 如果业务处理耗时过程过长阻塞住线程,可能会影响SDK收到消息后的正常回调。executorService.submit(() -> processMessage(message));} catch (Exception e) {log.error("submit task occurs exception ", e);}}};/*** 在这里处理您收到消息后的具体业务逻辑。*/private static void processMessage(Message message) {try {String topic = message.getStringProperty("topic");String messageId = message.getStringProperty("messageId");byte[] body = message.getBody(byte[].class);String content = null;if (null != body) {content = new String(body);}System.out.println("receive message"+ ", topic = " + topic+ ", messageId = " + messageId+ ", content = " + content);} catch (Exception e) {log.error("processMessage occurs error ", e);}}/*** 连接状态监听器*/private static JmsConnectionListener jmsConnectionListener = new JmsConnectionListener() {/*** 连接成功建立。*/@Overridepublic void onConnectionEstablished(URI remoteURI) {log.info("onConnectionEstablished, remoteUri:{}", remoteURI);}/*** 尝试过最大重试次数之后,最终连接失败。*/@Overridepublic void onConnectionFailure(Throwable error) {log.error("onConnectionFailure, {}", error.getMessage());}/*** 连接中断。*/@Overridepublic void onConnectionInterrupted(URI remoteURI) {log.info("onConnectionInterrupted, remoteUri:{}", remoteURI);}/*** 连接中断后又自动重连上。*/@Overridepublic void onConnectionRestored(URI remoteURI) {log.info("onConnectionRestored, remoteUri:{}", remoteURI);}@Overridepublic void onInboundMessage(JmsInboundMessageDispatch envelope) {}@Overridepublic void onSessionClosed(Session session, Throwable cause) {}@Overridepublic void onConsumerClosed(MessageConsumer consumer, Throwable cause) {}@Overridepublic void onProducerClosed(MessageProducer producer, Throwable cause) {}};}

设备端接入

以下介绍的设备端接入为 java 版,主要用于配合服务端订阅的调试
官网:https://help.aliyun.com/document_detail/97331.html?spm=a2c4g.11186623.6.675.7636277c2CMzcp

设备端接入编码实现

导入 pom 依赖

<!--aliyun client-->
<dependency><groupId>com.aliyun.alink.linksdk</groupId><artifactId>iot-linkkit-java</artifactId><version>1.2.0.1</version><scope>compile</scope>
</dependency>
<dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.8.1</version><scope>compile</scope>
</dependency>
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.25</version><scope>compile</scope>
</dependency>

客户端接入业务类

import com.akieay.cloudprint.common.util.aliyun.iot.constant.TopicConstant;
import com.aliyun.alink.dm.api.DeviceInfo;
import com.aliyun.alink.dm.api.InitResult;
import com.aliyun.alink.linkkit.api.ILinkKitConnectListener;
import com.aliyun.alink.linkkit.api.IoTMqttClientConfig;
import com.aliyun.alink.linkkit.api.LinkKit;
import com.aliyun.alink.linkkit.api.LinkKitInitParams;
import com.aliyun.alink.linksdk.cmp.connect.channel.MqttSubscribeRequest;
import com.aliyun.alink.linksdk.cmp.core.base.AMessage;
import com.aliyun.alink.linksdk.cmp.core.base.ConnectState;
import com.aliyun.alink.linksdk.cmp.core.listener.IConnectNotifyListener;
import com.aliyun.alink.linksdk.cmp.core.listener.IConnectSubscribeListener;
import com.aliyun.alink.linksdk.tools.AError;
import lombok.extern.slf4j.Slf4j;/*** @author akieay* @Date: 2020/11/18 16:59*/
@Slf4j
public class ClientSideSubscription {public static void main(String[] args) {/*** 主要注意参数 channelHost,公共实例格式为:{productKey}+".iot-as-mqtt."+{regionId}+".aliyuncs.com:1883"* 企业版实例格式为:实现详情下的公网终端节点(Endpoint)中的 AMQP 的路径 + ":1883"*/new ClientSideSubscription().init("产品Key", "设备deviceName","设备deviceSecret", "你的channelHost");}public void init(String productKey, String deviceName, String deviceSecret, String channelHost){LinkKitInitParams params = new LinkKitInitParams();/*** 设置 Mqtt 初始化参数*/IoTMqttClientConfig config = new IoTMqttClientConfig();config.productKey = productKey;config.deviceName = deviceName;config.deviceSecret = deviceSecret;config.channelHost = channelHost;/*** 是否接受离线消息* 对应 mqtt 的 cleanSession 字段*/config.receiveOfflineMsg = false;params.mqttClientConfig = config;/*** 设置初始化三元组信息,用户传入*/DeviceInfo deviceInfo = new DeviceInfo();deviceInfo.productKey = productKey;deviceInfo.deviceName = deviceName;deviceInfo.deviceSecret = deviceSecret;params.deviceInfo = deviceInfo;//初始化连接LinkKit.getInstance().init(params, new ILinkKitConnectListener() {@Overridepublic void onError(AError aError) {log.info("Mqtt connect fail");}@Overridepublic void onInitDone(InitResult initResult) {log.info("Mqtt connect success");// 订阅MqttSubscribeRequest request = new MqttSubscribeRequest();// topic 用户根据实际场景填写request.topic = "/" + productKey + "/" + deviceName + TopicConstant.TEST_TOPIC;request.isSubscribe = true;LinkKit.getInstance().subscribe(request, new IConnectSubscribeListener() {@Overridepublic void onSuccess() {// 订阅成功log.info("subscribe topic " + request.topic + " success");// 注册下行监听,包括长连接的状态和云端下行的数据LinkKit.getInstance().registerOnNotifyListener(new IConnectNotifyListener() {@Overridepublic void onNotify(String connectId, String topic, AMessage aMessage) {// 云端下行数据回调// connectId 连接类型 topic 下行 topic; aMessage 下行数据String data = new String((byte[]) aMessage.data);log.info("topic: " + topic + " \t data: " + data);// pushData 示例  {"method":"thing.service.test_service","id":"123374967","params":{"vv":60},"version":"1.0.0"}// method 服务类型; params 下推数据内容}@Overridepublic boolean shouldHandle(String connectId, String topic) {// 选择是否不处理某个 topic 的下行数据// 如果不处理某个topic,则onNotify不会收到对应topic的下行数据return true; //TODO 根基实际情况设置}@Overridepublic void onConnectStateChange(String connectId, ConnectState connectState) {log.info(connectId, connectState);// 对应连接类型的连接状态变化回调,具体连接状态参考 SDK ConnectState}});}@Overridepublic void onFailure(AError aError) {// 订阅失败log.info("onFailure " + (aError==null?"":(aError.getCode()+aError.getMsg())));}});}});}
}

演示工程:https://download.csdn.net/download/qq_39668819/13124188

阿里云 IOT 物联网平台简单使用【随笔】相关推荐

  1. 【微信小程序控制硬件⑧ 】微信小程序以 websocket 连接阿里云IOT物联网平台mqtt服务器,封装起来使用就是这么简单!(附带Demo)

    [微信小程序控制硬件第1篇 ] 全网首发,借助 emq 消息服务器带你如何搭建微信小程序的mqtt服务器,轻松控制智能硬件! [微信小程序控制硬件第2篇 ] 开始微信小程序之旅,导入小程序Mqtt客户 ...

  2. HaaS学习笔记 | 终端设备接入和断开阿里云IoT物联网平台的明细教程

    [1]题目要求 [本教程视频]:终端设备连接阿里云物联网平台 [2]理论基础 aliyunIoT是HaaS轻应用扩展库中模块,能帮助厂商将设备安全地接入到阿里云IoT物联网平台,继而让设备可以被物联网 ...

  3. 阿里云IoT物联网平台

    IoT物联网平台 一.工作原理 1.MQTT(Message Queuing Telemetry Transport) 是一种轻量级的消息传输协议,专门设计用于物联网(IoT)应用中的通信.它是一种发 ...

  4. 微信小程序使用MQTT.js连接阿里云IoT物联网平台

    官方已经开源了一个SDK版本,也是基于mqtt.js,进行了各种封装: https://github.com/aliyun/alibabacloud-iot-device-sdk · 阅读以下内容需要 ...

  5. 【小程序案例】支付宝小程序-MQTT模器,IoT设备通过WSS接入阿里云IoT物联网平台...

    支付宝小程序-MQTT模拟器通过WSS接入阿里云IoT物联网平台 准备工作 1.1 注册阿里云账号 开通阿里云账号,并通过支付宝实名认证 https://www.aliyun.com 1.2 免费开通 ...

  6. 解密阿里云IoT物联网平台MQTT Access Server核心架构

    MQTT是基于TCP/IP协议栈构建的异步通信消息协议,是一种轻量级的发布.订阅信息传输协议.MQTT已逐渐成为IoT领域最热门的协议,也是国内外各大物联网平台最主流的传输协议,阿里云IoT物联网平台 ...

  7. 支付宝小程序使用MQTT over WebSocket连接阿里云IoT物联网平台

    前言 之前写了一篇微信小程序使用MQTT over WebSocket连接阿里云IoT物联网平台,介绍了如何使用mqtt.js在微信小程序上连接mqtt服务器,文中顺带提了mqtt.js是支持支付宝小 ...

  8. MQTT协议与阿里云IoT物联网平台

    1.MQTT协议介绍 1.1 MQTT协议 MQTT(消息队列遥测传输) 是基于 TCP/IP 协议栈而构建的支持在各方之间异步通信的消息协议.MQTT在空间和时间上将消息发送者与接收者分离,因此可以 ...

  9. 基于TCP协议的GPS定位器设备迁移到阿里云IoT物联网平台实践——实践类

    背景 GPS定位器是内置了GPS模块和移动通信模块的终端,用来将GPS模块获得的定位数据通过移动通信模块传至Internet上的一台服务器上,从而可以实现在电脑或手机上查询终端位置. GPS定位器可用 ...

最新文章

  1. Jsoncpp 使用方法解析
  2. 树莓派:人在太空,刚下火箭,诚招小于19岁的代码开发者
  3. 2021年人工神经网络第四次作业要求:第七题
  4. 问题集锦(43-45)
  5. [Spring cloud 一步步实现广告系统] 9. 主类和配置文件
  6. 10道C++输出易错笔试题收集
  7. testng的报告自定义笔记
  8. 协议簇:TCP 解析: 建立连接
  9. 【CCCC】L2-010 排座位 (25分),,并查集+二维矩阵判定关系
  10. SCP对拷如何连接指定端口远程主机
  11. 230. 二叉搜索树中第K小的元素
  12. oracle12c ora01017,ORACLE 12C 之 ORA-01017
  13. 无线通信中载波带宽是什么?
  14. php弱口令总结,web漏洞之弱口令
  15. 如何创造一种团队文化
  16. requests库安装和简单功能学习总结
  17. 网易web安全:课后问题-CSRF
  18. IE8的一个麻烦的问题
  19. Vue项目实战之电商后台管理系统(一) 用户登录模块
  20. 鸡兔同笼,已知鸡兔共有 50 只,共有 140 只脚,编程求解鸡有几只?兔子几只?

热门文章

  1. vue3+ts+vite后台管理模板
  2. 自动铅笔的简笔画怎么画,自动化简笔画图片大全
  3. 〖Python 数据库开发实战 - Python与MySQL交互篇⑯〗- 项目实战 - 实现用户管理 - 新增用户功能
  4. 西安恒智小寨java_长安反编译工具 java
  5. 【面试题】网易互娱(游戏)2021校园招聘在线笔试 - 服务端开发工程师[螺旋矩阵]
  6. css 细线表格,如何在Dreamweaver中制作细线表格?
  7. bilibili 哔哩哔哩 2018秋招试题
  8. RFID资产管理|超高频RFID技术在医院资产管理项目中的应用-铨顺宏
  9. LabVIEW编程开发Agilent 34401A(Keysight 34401A)例程与相关资料
  10. JAV----------数组操作