传感器上传数据到阿里云Iot,然后从阿里云Iot传输数据到我的服务器和数据库
整个系统架构如下:
MQTT 和RocketMQ的区别:
1、传感器采用GPRS传输数据,采用微消息队列MQTT,微消息队列 MQTT 主要承担移动端连接接入、连接管理、数据转发等工作,相当于一个无限扩展能力的连接网关。传感器传送数据到阿里云Iot。MQTT支持Python,java,有相应的sdk,地址如下:
https://help.aliyun.com/product/100973.html?spm=a2c4g.11186623.6.540.791c5695zBTImA
阿里云Iot的设备传输数据格式是json格式,采用post方式提交数据,具体格式如下:
设备属性上报
通过该Topic获取设备上报的属性信息。
Topic:/sys/{productKey}/{deviceName}/thing/event/property/post
数据格式:
{"iotId":"4z819VQHk6VSLmmBJfrf00107ee200","productKey":"1234556554","deviceName":"deviceName1234","gmtCreate":1510799670074,"deviceType":"Ammeter","items":{"Power":{"value":"on","time":1510799670074},"Position":{"time":1510292697470,"value":{"latitude":39.9,"longitude":116.38}}}
}
参数说明:
参数 | 类型 | 说明 |
---|---|---|
iotId | String | 设备在平台内的唯一标识 |
productKey | String | 设备所属产品的唯一标识 |
deviceName | String | 设备名称 |
deviceType | String | 设备类型 |
items | Object | 设备数据 |
Power | String | 属性名称,产品所具有的属性名称请参考TSL描述 |
Position | String | 属性名称,产品所具有的属性名称请参考TSL描述 |
value | 根据TSL定义 | 属性值 |
time | Long | 属性产生时间,如果设备没有上报默认采用云端生成时间 |
gmtCreate | Long | 数据流转消息产生时间 |
发送方式如下:
MQTT采用topic形式发送数据,生产者产生数据,通过topic进行发送,消费者订阅消息,接收消息。
2、MQTT将消息进行转储,使用RocketMQ进行转储,RocketMQ支持HTTP协议,有相应的sdk开发包,进行开发,如Python,java,通过ip地址直接订阅消息,地址如下:https://help.aliyun.com/product/29530.html?spm=a2c4g.11186623.6.540.71694fb3z3Z3Kg
RocketMQ是一个消息队列,吞吐性能强大。格式如下,
自定义生产者与消费者如下:
producer
发送消息
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("pay_topic_01");
producer.setNamesrvAddr("100.8.8.88:9876");
producer.start();
for (int i = 0; i < 1000; i++) {
try {
Message msg = new Message("TopicTest",// topic
"TagA",// tag
("Hello RocketMQ " + i).getBytes()// body
);
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
}
catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
}
订阅消息
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("100.8.8.88:9876");
/**
* 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
* 如果非第一次启动,那么按照上次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
3、阿里云官方Python 的sdk中的producer和consumer如下:
(1)producer
#初始化 client
mq_client = MQClient(
#设置HTTP接入域名(此处以公共云生产环境为例)
"${HTTP_ENDPOINT}",
#AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
"${ACCESS_KEY}",
#SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
"${SECRET_KEY}"
)
#所属的 Topic
topic_name = "${TOPIC}"
#Topic所属实例ID,默认实例为空None
instance_id = "${INSTANCE_ID}"
producer = mq_client.get_producer(instance_id, topic_name)
# 循环发布多条消息
msg_count = 100
print "%sPublish Message To %s\nTopicName:%s\nMessageCount:%s\n" % (10 * "=", 10 * "=", topic_name, msg_count)
try:
for i in range(msg_count):
msg_body = "I am test message %s." % i
msg = TopicMessage(
# 消息内容
"I am test message %s." % i,
# 消息标签
""
)
re_msg = producer.publish_message(msg)
print "Publish Message Succeed. MessageID:%s, BodyMD5:%s" % (re_msg.message_id, re_msg.message_body_md5)
time.sleep(1)
except MQExceptionBase, e:
if e.type == "TopicNotExist":
print "Topic not exist, please create it."
sys.exit(1)
print "Publish Message Fail. Exception:%s" % e
(2)consumer
重要的是message的body中的数据,取出数据即可实现通讯。
#初始化 client
mq_client = MQClient(
#设置HTTP接入域名(此处以公共云生产环境为例)
"${HTTP_ENDPOINT}",
#AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
"${ACCESS_KEY}",
#SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
"${SECRET_KEY}"
)
#所属的 Topic
topic_name = "${TOPIC}"
#您在控制台创建的 Consumer ID(Group ID)
group_id = "${GROUP_ID}"
#Topic所属实例ID,默认实例为空None
instance_id = "${INSTANCE_ID}"
consumer = mq_client.get_consumer(instance_id, topic_name, group_id)
#长轮询表示如果topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回
#长轮询时间3秒(最多可设置为30秒)
wait_seconds = 3
#一次最多消费3条(最多可设置为16条)
batch = 3
print "%sConsume And Ak Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n" % (10 * "=", 10 * "=", topic_name, group_id, wait_seconds)
while True:
try:
#长轮询消费消息
recv_msgs = consumer.consume_message(batch, wait_seconds)
for msg in recv_msgs:
print "Receive, MessageId: %s\nMessageBodyMD5: %s \
\nMessageTag: %s\nConsumedTimes: %s \
\nPublishTime: %s\nBody: %s \
\nNextConsumeTime: %s \
\nReceiptHandle: %s" % \
(msg.message_id, msg.message_body_md5,
msg.message_tag, msg.consumed_times,
msg.publish_time, msg.message_body,//这里封装的数据问题
msg.next_consume_time, msg.receipt_handle)
except MQExceptionBase, e:
if e.type == "MessageNotExist":
print "No new message! RequestId: %s" % e.req_id
continue
print "Consume Message Fail! Exception:%s\n" % e
time.sleep(2)
continue
#msg.next_consume_time前若不确认消息消费成功,则消息会重复消费
#消息句柄有时间戳,同一条消息每次消费拿到的都不一样
try:
receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
consumer.ack_message(receipt_handle_list)
print "Ak %s Message Succeed.\n\n" % len(receipt_handle_list)
except MQExceptionBase, e:
print "\nAk Message Fail! Exception:%s" % e
#某些消息的句柄可能超时了会导致确认不成功
if e.sub_errors:
for sub_error in e.sub_errors:
print "\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s" % (sub_error["ReceiptHandle"], sub_error["ErrorCode"], sub_error["ErrorMessage"])
具体的json数据解析请看另一篇文章:https://blog.csdn.net/u014535666/article/details/92848237
数据流转网址:https://help.aliyun.com/document_detail/73736.html?spm=a2c4g.11186623.6.599.3b4e2fcaPLzDhO
阿里物联网平台:https://iotnext.console.aliyun.com/lk/summary
消息服务控制台:https://mns.console.aliyun.com/?spm=5176.6660585.774526198.1.11946bf8o9CsYO#/logging/cn-beijing
微消息队列MQTT,消息队列RocketMQ:https://ons.console.aliyun.com/?spm=5176.11485173.0.0.23f759afSBZrGK#/?regionId=mq-internet-access
传感器上传数据到阿里云Iot,然后从阿里云Iot传输数据到我的服务器和数据库相关推荐
- 2-STM32+ESP8266连接onenet并上传数据(HTTP)
上一篇文章内容链接为下 1-ESP8266-AT指令初试化及部分基础知识 2-STM32+ESP8266连接onenet并上传数据(HTTP) 一.预备知识小插曲 ESP8266,onenet云平台, ...
- 从零开始,打造基于阿里IoT云平台的LoRa解决方案(3)_配置产品功能,将上传数据解析为阿里云平台数据格式
本篇是 <从零开始,打造基于阿里IoT云平台的LoRa解决方案>系列教程的第3 篇,将为大家讲解:1-如何配置产品功能?2-如何将产品的上传数据解析为阿里云平台数据格式? 查看阿里物联网平 ...
- RS485设备通过DTU上传数据到阿里云物联网平台
在开始之前,首先需要搞明白以下几个概念,RS485, Modbus协议和DTU. RS485,基础概念自行度娘,这里如果简单理解的话就是一种串行通信标准.非硬件工程师其实记住RS485有4条线,A,B ...
- MQTT协议-报文分析及网络客户端报文测试(MQTT报文连接阿里云上传数据+订阅数据)
文章目录 一.本文章所涉及到的内容 二.感性认识MQTT协议 三.准备信息 (一)工具获取 (二)获取信息 1.获取三元组信息 2.获取发布topic和订阅topic 3.客户端ID,用户名,哈希加密 ...
- 树莓派上传数据到onenet云平台
背景:通过树莓派上传数据到onenet云平台 操作:看代码 # -*- coding:utf-8 -*- # File: cputemp.py #向平台已经创建的数据流发送数据点 import url ...
- python分片上传_分片上传_分片上传_上传文件_Python_SDK 示例_对象存储 OSS - 阿里云...
OSS提供的分片上传(Multipart Upload)功能,将要上传的较大文件(Object)分成多个数据块(Part)来分别上传,上传完成后再调用CompleteMultipartUpload接口 ...
- 上传数据,直接分析,这才是真正的生物云
计算资源,生物软件安装,数据库配置,往往占据生物数据分析80%以上的时间,一直是阻挡生物数据分析的三座大山.为了实现我们"上传数据,直接分析"的理念.我们对计算资源的追求是无止境的 ...
- 怎么把手机文件导入华为云服务器,华为手机如何上传数据到云服务器
华为手机如何上传数据到云服务器 内容精选 换一换 对象存储服务OBS是华为云提供的稳定.安全.高效.易用的云存储服务,具备标准Restful API接口,可存储任意数量和形式的非结构化数据.弹性文件服 ...
- 使用FTP上传数据到云服务器 CuteFTP和LeapFTP软件使用教程
从本地向服务器上传大文件时通常需要采用FTP数据传输方式,本文整理了目前常用的CuteFTP和LeapFTP两款FTP第三方服务器软件,教大家如何将本地文件上传至服务器. CuteFTP软件下载 Le ...
最新文章
- Python爬虫常见面试题(二)
- 一起探讨NLP的边界和未来,学术界与工业界在“语言与智能高峰论坛”上擦出火花...
- 网络服务之DNS基本应用
- Android Activity的生命周期、意图(Intent)
- Servlet 请求处理
- QCon北京2016启动筹备 众多热点专题诚征演讲嘉宾
- css隐藏滚动条、兼容
- 批处理call和start
- UI设计灵感|仪表盘界面如何设计?优质案例给你帮助
- 几个北大和南开学霸公众号,值得学习
- paip.提升安全性--360,WI,AWVS三款WEB程序安全检测软件使用总结
- 第三方支付牌照(支付业务许可证)
- datawhale8月组队学习《pandas数据处理与分析》(下)(文本、分类、时序数据)
- 搜索引擎原理第三阶段之排名
- CANoe如何查看总线负载率?
- 这个Python自动扫雷算法写完了,估计看懂的人十不存一了吧
- 利用Bootstrap制作汉堡按钮(header部分)
- 猴子吃桃问题(记录自己的学习)
- (三十三 :2021.01.12)MICCAI 2016 追踪之论文纲要
- 考研数学模拟卷经典题总结
热门文章
- 虚拟服务器磁盘空间多大,虚拟机硬盘大小是从主机硬盘里划分的吗
- 线程间通信方式Linux,线程间的通信、同步方式与进程间通信方式
- 释义:Linear temporal logic (LTL)浅析
- matlab低通滤波器库函数代码_利用Matlab filterDesigner 工具生成FIR滤波器函数,并调用实现低通滤波...
- python word操作添加超链接_使用pythondocx在MSWord中添加超链接
- C++各大有名库的介绍——XML
- cfile read 最大读取限制_pandas数据处理:常用却不甚了解的函数,pd.read_excel()
- 几何画板绘制正方形网格的技巧
- html+css实现猫眼电影“榜单”静态页面
- 如何同步两台Linux机器的时间?