前言

InfluxDB 是一个用于存储和分析时间序列数据的开源数据库,内置 HTTP API,类 SQL 语句的支持和无结构的特性对使用者而言都非常友好。它强大的数据吞吐能力以及稳定的性能表现使其非常适合 IoT 领域。

通过 EMQ X 消息引擎,我们可以自定义 Template 文件,然后将 Json 格式的 MQTT 消息转换为 Measurement 写入 InfluxDB:

场景介绍

该场景需要将 EMQ X 指定主题下且满足条件的消息存储到 InfluxDB 时序数据库。为了便于后续分析检索,消息内容需要进行拆分存储。

该场景下客户端上报数据如下:

Topic:data/sensor

Payload:

{

"location": "bedroom",

"data": {

"temperature": 25,

"humidity": 46.4,

"pm2_5": 0.5

}

}

准备工作

数据库安装及初始化

创建 db 数据库并开放 8089 UDP 端口。

$ docker pull influxdb

$ git clone -b v1.0.0 https://github.com/palkan/influx_udp.git

$ cd influx_udp

$ docker run --name=influxdb --rm -d -p 8086:8086 -p 8089:8089/udp -v ${PWD}/files/influxdb.conf:/etc/influxdb/influxdb.conf:ro -e INFLUXDB_DB=db influxdb:latest

配置说明

创建资源

打开 EMQ X Dashboard,进入左侧菜单的 资源 页面,点击 新建 按钮,选择 InfluxDB 资源类型并完成相关配置进行资源创建。

创建规则

进入左侧菜单的 规则 页面,点击 新建 按钮,进行规则创建。触发事件 选择 message.publish,即在 EMQ X 收到 PUBLISH 消息时触发该规则进行数据处理。

选定触发事件后,我们可在界面上看到可选字段及示例 SQL:

筛选所需字段

规则引擎使用 SQL 语句过滤和处理数据。例如前文提到的场景中我们需要将 payload 中的字段提取出来使用,则可以通过 payload. 实现。同时我们仅仅期望处理 data/sensor 主题,那么可以在 WHERE 子句中使用主题通配符 =~ 对 topic 进行筛选:topic =~ 'data/sensor', 最终我们得到 SQL 如下:

SELECT

payload.location as location,

payload.data.temperature as temperature,

payload.data.humidity as humidity,

payload.data.pm2_5 as pm2_5

FROM

"message.publish"

WHERE

topic =~ 'data/sensor'

SQL 测试

借助 SQL 测试功能,我们可以快速确认刚刚填写的 SQL 语句是否能达到我们的目的。首先填写用于测试的 payload 等数据如下:

然后点击 测试 按钮,得到以下输出结果,与预期相符。

{

"humidity": 46.4,

"location": "bedroom",

"pm2_5": 0.5,

"temperature": 25

}

添加响应动作,存储消息到 InfluxDB

SQL 条件输入输出无误后,我们继续添加响应动作,配置写入 SQL 语句,将筛选结果存储到 InfluxDB。

点击响应动作中的 添加 按钮,选择动作 保存数据到 InfluxDB,选取刚刚创建的 InfluxDB 资源,再按照实际需求将 ${fieldName} 填写到 Field Keys, Tag Keys 和 Timestamp Key 中,Measurement 表示将数据写入 InfluxDB 时使用的 Measurement,最后点击 新建 按钮完成规则创建。

测试

预期结果

我们成功创建了一条规则,包含一个处理动作,动作期望效果如下:

客户端向 data/sensor 主题上报消息时,该消息将命中规则,规则列表中 已命中 数字将会增加 1;

InfluxDB 的 db 数据库中将会增加一条数据,数据内容与处理后的消息内容一致。

使用 Dashboard 中的 Websocket 工具测试

切换到 工具 --> Websocket 页面,使用任意 Client ID 连接到 EMQ X,连接成功后在 消息 卡片中发送如下消息:

Topic:data/sensor

Payload:

{

"location": "bedroom",

"data": {

"temperature": 25,

"humidity": 46.4,

"pm2_5": 0.5

}

}

点击 发送 按钮,发送成功后可以看到当前规则已命中次数已经变为 1。

然后检查 InfluxDB,新的 data point 是否添加成功:

$ docker exec -it influxdb influx

> use db

Using database db

> select * from "sensor_data"

name: sensor_data

time humidity location pm2_5 temperature

---- -------- -------- ----- -----------

1561535778444457348 46.4 bedroom 0.5 25

至此,我们通过规则引擎实现了存储消息到 InfluxDB 数据库的业务开发。

在阅读该教程之前,假定你已经了解 MQTT、EMQ X 的简单知识。

emq数据储存到mysql_EMQ X 规则引擎系列(三)存储消息到 InfluxDB 时序数据库相关推荐

  1. emq数据储存到mysql,规则引擎示例 - 保存数据到 MySQL - 《EMQ X Enterprise v4.0 中文文档》 - 书栈网 · BookStack...

    保存数据到 MySQL 搭建 MySQL 数据库,并设置用户名密码为 root/public,以 MacOS X 为例:$ brew install mysql $ brew services sta ...

  2. bs4爬取的时候有两个标签相同_PYTHON爬取数据储存到excel

    PYTHON爬取数据储存到excel 大家周末好呀,我是滑稽君.前两天上网课时朋友发来消息说需要爬取一些数据,然后储存到excel里.记得我公众号里发过关于爬虫的文章,所以找我帮个忙.于是滑稽君花时间 ...

  3. beautifulsoup爬取网页中的表格_PYTHON爬取数据储存到excel

    PYTHON爬取数据储存到excel 大家周末好呀,我是滑稽君.前两天上网课时朋友发来消息说需要爬取一些数据,然后储存到excel里.记得我公众号里发过关于爬虫的文章,所以找我帮个忙.于是滑稽君花时间 ...

  4. 大数据风控项目实战 Drools规则引擎

    可以借鉴的干货 1,统一存储服务,包含:多种存储库连接封装和服务封装 在统一存储服务 2.获取配置的环境 类:EnvVariable 一.风控项目介绍 对一个复杂支付系统提供统一.全面.高效的风险控制 ...

  5. 规则引擎系列—初识规则引擎

    什么是规则引擎 规则引擎由推理引擎发展而来,是一种嵌入在应用程序中的组件,实现了将业务决策从应用程序代码中分离出来,并使用预定义的语义模块编写业务决策.接受数据输入,解释业务规则,并根据业务规则做出业 ...

  6. 时间序列数据的存储和计算 - 开源时序数据库解析

    摘要: Prometheus 开源时序数据库解析的系列文章在之前已经完成了几篇,对比分析了Hbase系的OpenTSDB.Cassandra系的KairosDB.BlueFlood及Heroic,最后 ...

  7. 时间序列数据的存储和计算 - 开源时序数据库解析(一)

    开源时序数据库   如图是17年6月在db-engines上时序数据库的排名,我会挑选开源的.分布式的时序数据库做详细的解析.前十的排名中,RRD是一个老牌的单机存储引擎,Graphite底层是Whi ...

  8. influxdb时序数据库修改数据保存时间

    docker安装的时序数据库使用方式 1.portainer的web端进入时序数据库命令行(或者docker命令进入时序数据库容器内部) influx -username admin -passwor ...

  9. 规则引擎drools系列(一)

    规则引擎 Drools 1. 问题引出 现有一个在线申请信用卡的业务场景,用户需要录入个人信息,如下图所示: //此处为伪代码 ​ //检查用户信息合法性,返回true表示检查通过,返回false表示 ...

最新文章

  1. 免费的私人代码托管(bitbucket) 和 常用git指令
  2. vim 直接跳转到标签
  3. 【 C 】对左值与右值的一些个人思考
  4. spring中Validation设计与实现
  5. Vmware安装与使用
  6. 本地共享映射文件夹进行删除操作_从集群建立到航测建模CC(Smart3D)实用操作教程...
  7. c语言学习之用筛选法求100之内的素数。
  8. localdate计算相差天数_干掉 Date,LocalDate 真香!
  9. @Repository , @Service , @Controller 和 @Component
  10. sony android电视软件,Sony TV Remote app
  11. 多投资机构点评神州信息2019业绩:看好金融信创,给出增持评级!
  12. 在计算机上配置超级终端,超级终端设置步骤【详细步骤】
  13. 【MIKE21】mesh导入文件
  14. 【jvm jdk】锁状态位之偏向锁
  15. 猎头推荐转行大数据分析师骗局
  16. diskpart命令_Windows Diskpart命令教程
  17. 从不确定性中寻找“确定性”——一位交管从业者对行业发展趋势的思考
  18. Swift - 视频录制教程3(设置拍摄窗口大小,录制正方形视频)
  19. JSD-2204-Dubbo实现微服务调用-Seata-Day04
  20. 网页使用百度地图后,只显示灰色框框(已解决)

热门文章

  1. 5月19-20日WebRTCon 2018 梳理全球WebRTC技术实践与案例
  2. 人工智能在视频应用领域的探索
  3. leetcode 236. Lowest Common Ancestor of a Binary Tree | 236. 二叉树的最近公共祖先(Java)
  4. 【EasyUI】DataGrid自定义排序
  5. 【PAT甲级 链表去重】1097 Deduplication on a Linked List (25 分) C++ 全部AC
  6. C# 实验四 获取系统时间、点击加一秒功能
  7. Reactor中的Thread和Scheduler
  8. C++ unordered_map 使用详解(含C++20新特性)
  9. nodeJS 的 path.resolve() 用法解析
  10. 案例4-1.6 树种统计 (25 分)_18行代码AC