整个系统架构如下:

MQTT 和RocketMQ的区别:


1、传感器采用GPRS传输数据,采用微消息队列MQTT,微消息队列 MQTT 主要承担移动端连接接入、连接管理、数据转发等工作,相当于一个无限扩展能力的连接网关。传感器传送数据到阿里云Iot。MQTT支持Python,java,有相应的sdk,地址如下:

https://help.aliyun.com/product/100973.html?spm=a2c4g.11186623.6.540.791c5695zBTImA

阿里云Iot的设备传输数据格式是json格式,采用post方式提交数据,具体格式如下:

设备属性上报

通过该Topic获取设备上报的属性信息。

Topic:/sys/{productKey}/{deviceName}/thing/event/property/post

数据格式:

{"iotId":"4z819VQHk6VSLmmBJfrf00107ee200","productKey":"1234556554","deviceName":"deviceName1234","gmtCreate":1510799670074,"deviceType":"Ammeter","items":{"Power":{"value":"on","time":1510799670074},"Position":{"time":1510292697470,"value":{"latitude":39.9,"longitude":116.38}}}
}

参数说明:

参数 类型 说明
iotId String 设备在平台内的唯一标识
productKey String 设备所属产品的唯一标识
deviceName String 设备名称
deviceType String 设备类型
items Object 设备数据
Power String 属性名称,产品所具有的属性名称请参考TSL描述
Position String 属性名称,产品所具有的属性名称请参考TSL描述
value 根据TSL定义 属性值
time Long 属性产生时间,如果设备没有上报默认采用云端生成时间
gmtCreate Long 数据流转消息产生时间

发送方式如下:

MQTT采用topic形式发送数据,生产者产生数据,通过topic进行发送,消费者订阅消息,接收消息。

2、MQTT将消息进行转储,使用RocketMQ进行转储,RocketMQ支持HTTP协议,有相应的sdk开发包,进行开发,如Python,java,通过ip地址直接订阅消息,地址如下:https://help.aliyun.com/product/29530.html?spm=a2c4g.11186623.6.540.71694fb3z3Z3Kg

RocketMQ是一个消息队列,吞吐性能强大。格式如下,

自定义生产者与消费者如下:

producer

发送消息
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("pay_topic_01");
        producer.setNamesrvAddr("100.8.8.88:9876");
        producer.start(); 
        for (int i = 0; i < 1000; i++) {
            try {
                Message msg = new Message("TopicTest",// topic
                    "TagA",// tag
                    ("Hello RocketMQ " + i).getBytes()// body
                        );
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            }
            catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }
}

订阅消息
public class Consumer { 
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
        consumer.setNamesrvAddr("100.8.8.88:9876");
        /**
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
         * 如果非第一次启动,那么按照上次消费的位置继续消费
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); 
        consumer.subscribe("TopicTest", "*"); 
        consumer.registerMessageListener(new MessageListenerConcurrently() { 
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
         consumer.start(); 
        System.out.println("Consumer Started.");
    }

3、阿里云官方Python 的sdk中的producer和consumer如下:

(1)producer

#初始化 client
mq_client = MQClient(
    #设置HTTP接入域名(此处以公共云生产环境为例)
    "${HTTP_ENDPOINT}",
    #AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
    "${ACCESS_KEY}",
    #SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
    "${SECRET_KEY}"
    )
#所属的 Topic
topic_name = "${TOPIC}"
#Topic所属实例ID,默认实例为空None
instance_id = "${INSTANCE_ID}"

producer = mq_client.get_producer(instance_id, topic_name)

# 循环发布多条消息
msg_count = 100
print "%sPublish Message To %s\nTopicName:%s\nMessageCount:%s\n" % (10 * "=", 10 * "=", topic_name, msg_count)

try:
    for i in range(msg_count):
        msg_body = "I am test message %s." % i
        msg = TopicMessage(
            # 消息内容
            "I am test message %s." % i, 
            # 消息标签
            ""
        )
        re_msg = producer.publish_message(msg)
        print "Publish Message Succeed. MessageID:%s, BodyMD5:%s" % (re_msg.message_id, re_msg.message_body_md5)
        time.sleep(1)
except MQExceptionBase, e:
    if e.type == "TopicNotExist":
        print "Topic not exist, please create it."
        sys.exit(1)
    print "Publish Message Fail. Exception:%s" % e

(2)consumer

重要的是message的body中的数据,取出数据即可实现通讯。

#初始化 client
mq_client = MQClient(
    #设置HTTP接入域名(此处以公共云生产环境为例)
    "${HTTP_ENDPOINT}",
    #AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
    "${ACCESS_KEY}",
    #SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
    "${SECRET_KEY}"
  )
#所属的 Topic
topic_name = "${TOPIC}"
#您在控制台创建的 Consumer ID(Group ID)
group_id = "${GROUP_ID}"
#Topic所属实例ID,默认实例为空None
instance_id = "${INSTANCE_ID}"

consumer = mq_client.get_consumer(instance_id, topic_name, group_id)

#长轮询表示如果topic没有消息则请求会在服务端挂住3s,3s内如果有消息可以消费则立即返回
#长轮询时间3秒(最多可设置为30秒)
wait_seconds = 3
#一次最多消费3条(最多可设置为16条)
batch = 3
print "%sConsume And Ak Message From Topic%s\nTopicName:%s\nMQConsumer:%s\nWaitSeconds:%s\n" % (10 * "=", 10 * "=", topic_name, group_id, wait_seconds)
while True:
    try:
        #长轮询消费消息
        recv_msgs = consumer.consume_message(batch, wait_seconds)
        for msg in recv_msgs:
            print "Receive, MessageId: %s\nMessageBodyMD5: %s \
                              \nMessageTag: %s\nConsumedTimes: %s \
                              \nPublishTime: %s\nBody: %s \
                              \nNextConsumeTime: %s \
                              \nReceiptHandle: %s" % \
                             (msg.message_id, msg.message_body_md5,
                              msg.message_tag, msg.consumed_times,
                              msg.publish_time, msg.message_body,//这里封装的数据问题
                              msg.next_consume_time, msg.receipt_handle)
    except MQExceptionBase, e:
        if e.type == "MessageNotExist":
            print "No new message! RequestId: %s" % e.req_id
            continue

print "Consume Message Fail! Exception:%s\n" % e
        time.sleep(2)
        continue

#msg.next_consume_time前若不确认消息消费成功,则消息会重复消费
    #消息句柄有时间戳,同一条消息每次消费拿到的都不一样
    try:
        receipt_handle_list = [msg.receipt_handle for msg in recv_msgs]
        consumer.ack_message(receipt_handle_list)
        print "Ak %s Message Succeed.\n\n" % len(receipt_handle_list)
    except MQExceptionBase, e:
        print "\nAk Message Fail! Exception:%s" % e
        #某些消息的句柄可能超时了会导致确认不成功
        if e.sub_errors:
          for sub_error in e.sub_errors:
            print "\tErrorHandle:%s,ErrorCode:%s,ErrorMsg:%s" % (sub_error["ReceiptHandle"], sub_error["ErrorCode"], sub_error["ErrorMessage"])

具体的json数据解析请看另一篇文章:https://blog.csdn.net/u014535666/article/details/92848237

数据流转网址:https://help.aliyun.com/document_detail/73736.html?spm=a2c4g.11186623.6.599.3b4e2fcaPLzDhO

阿里物联网平台:https://iotnext.console.aliyun.com/lk/summary

消息服务控制台:https://mns.console.aliyun.com/?spm=5176.6660585.774526198.1.11946bf8o9CsYO#/logging/cn-beijing

微消息队列MQTT,消息队列RocketMQ:https://ons.console.aliyun.com/?spm=5176.11485173.0.0.23f759afSBZrGK#/?regionId=mq-internet-access

传感器上传数据到阿里云Iot,然后从阿里云Iot传输数据到我的服务器和数据库相关推荐

  1. 2-STM32+ESP8266连接onenet并上传数据(HTTP)

    上一篇文章内容链接为下 1-ESP8266-AT指令初试化及部分基础知识 2-STM32+ESP8266连接onenet并上传数据(HTTP) 一.预备知识小插曲 ESP8266,onenet云平台, ...

  2. 从零开始,打造基于阿里IoT云平台的LoRa解决方案(3)_配置产品功能,将上传数据解析为阿里云平台数据格式

    本篇是 <从零开始,打造基于阿里IoT云平台的LoRa解决方案>系列教程的第3 篇,将为大家讲解:1-如何配置产品功能?2-如何将产品的上传数据解析为阿里云平台数据格式? 查看阿里物联网平 ...

  3. RS485设备通过DTU上传数据到阿里云物联网平台

    在开始之前,首先需要搞明白以下几个概念,RS485, Modbus协议和DTU. RS485,基础概念自行度娘,这里如果简单理解的话就是一种串行通信标准.非硬件工程师其实记住RS485有4条线,A,B ...

  4. MQTT协议-报文分析及网络客户端报文测试(MQTT报文连接阿里云上传数据+订阅数据)

    文章目录 一.本文章所涉及到的内容 二.感性认识MQTT协议 三.准备信息 (一)工具获取 (二)获取信息 1.获取三元组信息 2.获取发布topic和订阅topic 3.客户端ID,用户名,哈希加密 ...

  5. 树莓派上传数据到onenet云平台

    背景:通过树莓派上传数据到onenet云平台 操作:看代码 # -*- coding:utf-8 -*- # File: cputemp.py #向平台已经创建的数据流发送数据点 import url ...

  6. python分片上传_分片上传_分片上传_上传文件_Python_SDK 示例_对象存储 OSS - 阿里云...

    OSS提供的分片上传(Multipart Upload)功能,将要上传的较大文件(Object)分成多个数据块(Part)来分别上传,上传完成后再调用CompleteMultipartUpload接口 ...

  7. 上传数据,直接分析,这才是真正的生物云

    计算资源,生物软件安装,数据库配置,往往占据生物数据分析80%以上的时间,一直是阻挡生物数据分析的三座大山.为了实现我们"上传数据,直接分析"的理念.我们对计算资源的追求是无止境的 ...

  8. 怎么把手机文件导入华为云服务器,华为手机如何上传数据到云服务器

    华为手机如何上传数据到云服务器 内容精选 换一换 对象存储服务OBS是华为云提供的稳定.安全.高效.易用的云存储服务,具备标准Restful API接口,可存储任意数量和形式的非结构化数据.弹性文件服 ...

  9. 使用FTP上传数据到云服务器 CuteFTP和LeapFTP软件使用教程

    从本地向服务器上传大文件时通常需要采用FTP数据传输方式,本文整理了目前常用的CuteFTP和LeapFTP两款FTP第三方服务器软件,教大家如何将本地文件上传至服务器. CuteFTP软件下载 Le ...

最新文章

  1. Python爬虫常见面试题(二)
  2. 一起探讨NLP的边界和未来,学术界与工业界在“语言与智能高峰论坛”上擦出火花...
  3. 网络服务之DNS基本应用
  4. Android Activity的生命周期、意图(Intent)
  5. Servlet 请求处理
  6. QCon北京2016启动筹备 众多热点专题诚征演讲嘉宾
  7. css隐藏滚动条、兼容
  8. 批处理call和start
  9. UI设计灵感|仪表盘界面如何设计?优质案例给你帮助
  10. 几个北大和南开学霸公众号,值得学习
  11. paip.提升安全性--360,WI,AWVS三款WEB程序安全检测软件使用总结
  12. 第三方支付牌照(支付业务许可证)
  13. datawhale8月组队学习《pandas数据处理与分析》(下)(文本、分类、时序数据)
  14. 搜索引擎原理第三阶段之排名
  15. CANoe如何查看总线负载率?
  16. 这个Python自动扫雷算法写完了,估计看懂的人十不存一了吧
  17. 利用Bootstrap制作汉堡按钮(header部分)
  18. 猴子吃桃问题(记录自己的学习)
  19. (三十三 :2021.01.12)MICCAI 2016 追踪之论文纲要
  20. 考研数学模拟卷经典题总结

热门文章

  1. 虚拟服务器磁盘空间多大,虚拟机硬盘大小是从主机硬盘里划分的吗
  2. 线程间通信方式Linux,线程间的通信、同步方式与进程间通信方式
  3. 释义:Linear temporal logic (LTL)浅析
  4. matlab低通滤波器库函数代码_利用Matlab filterDesigner 工具生成FIR滤波器函数,并调用实现低通滤波...
  5. python word操作添加超链接_使用pythondocx在MSWord中添加超链接
  6. C++各大有名库的介绍——XML
  7. cfile read 最大读取限制_pandas数据处理:常用却不甚了解的函数,pd.read_excel()
  8. 几何画板绘制正方形网格的技巧
  9. html+css实现猫眼电影“榜单”静态页面
  10. 如何同步两台Linux机器的时间?