MongoDB 消息存储

配置 MongoDB 消息存储

配置文件: emqx_backend_mongo.conf

配置 MongoDB 服务器

支持配置多台 MongoDB 服务器连接池:## MongoDB 集群类型: single | unknown | sharded | rs

backend.mongo.pool1.type=single

## 如果 type = rs; 需要配置 setname

## backend.mongo.pool1.rs_set_name = testrs

## MongoDB 服务器地址列表

backend.mongo.pool1.server=127.0.0.1:27017

## MongoDB 连接池大小

backend.mongo.pool1.c_pool_size=8

## 连接的数据库名称

backend.mongo.pool1.database=mqtt

## MongoDB 认证用户名密码

## backend.mongo.pool1.login = emqtt

## backend.mongo.pool1.password = emqtt

## MongoDB 认证源

## backend.mongo.pool1.auth_source = admin

## 是否开启 SSL

## backend.mongo.pool1.ssl = false

## SSL 密钥文件路径

## backend.mongo.pool1.keyfile =

## SSL 证书文件路径

## backend.mongo.pool1.certfile =

## SSL CA 证书文件路径

## backend.mongo.pool1.cacertfile =

## MongoDB 数据写入模式: unsafe | safe

## backend.mongo.pool1.w_mode = safe

## MongoDB 数据读取模式: master | slaver_ok

## backend.mongo.pool1.r_mode = slave_ok

## MongoDB 底层 driver 配置, 保持默认即可

## backend.mongo.topology.pool_size = 1

## backend.mongo.topology.max_overflow = 0

## backend.mongo.topology.overflow_ttl = 1000

## backend.mongo.topology.overflow_check_period = 1000

## backend.mongo.topology.local_threshold_ms = 1000

## backend.mongo.topology.connect_timeout_ms = 20000

## backend.mongo.topology.socket_timeout_ms = 100

## backend.mongo.topology.server_selection_timeout_ms = 30000

## backend.mongo.topology.wait_queue_timeout_ms = 1000

## backend.mongo.topology.heartbeat_frequency_ms = 10000

## backend.mongo.topology.min_heartbeat_frequency_ms = 1000

## MongoDB Backend Hooks

backend.mongo.hook.client.connected.1={"action":{"function":"on_client_connected"},"pool":"pool1"}

backend.mongo.hook.session.created.1={"action":{"function":"on_subscribe_lookup"},"pool":"pool1"}

backend.mongo.hook.client.disconnected.1={"action":{"function":"on_client_disconnected"},"pool":"pool1"}

backend.mongo.hook.session.subscribed.1={"topic":"#","action":{"function":"on_message_fetch"},"pool":"pool1","offline_opts":{"time_range":"2h","max_returned_count":500}}

backend.mongo.hook.session.subscribed.2={"topic":"#","action":{"function":"on_retain_lookup"},"pool":"pool1"}

backend.mongo.hook.session.unsubscribed.1={"topic":"#","action":{"function":"on_acked_delete"},"pool":"pool1"}

backend.mongo.hook.message.publish.1={"topic":"#","action":{"function":"on_message_publish"},"pool":"pool1"}

backend.mongo.hook.message.publish.2={"topic":"#","action":{"function":"on_message_retain"},"pool":"pool1"}

backend.mongo.hook.message.publish.3={"topic":"#","action":{"function":"on_retain_delete"},"pool":"pool1"}

backend.mongo.hook.message.acked.1={"topic":"#","action":{"function":"on_message_acked"},"pool":"pool1"}

## 获取离线消息

## "offline_opts": 获取离线消息的配置

## - max_returned_count: 单次拉去的最大离线消息数目

## - time_range: 仅拉去在当前时间范围的消息

## backend.mongo.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1", "offline_opts": {"time_range": "2h", "max_returned_count": 500}}

## 如果需要存储 Qos0 消息, 可开启以下配置

## 警告: 当开启以下配置时, 需关闭 'on_message_fetch', 否则 qos1, qos2 消息会被存储俩次

## backend.mongo.hook.message.publish.4 = {"topic": "#", "action": {"function": "on_message_store"}, "pool": "pool1", "payload_format": "mongo_json"}

backend 消息存储规则包括:hooktopicaction说明client.connectedon_client_connected存储客户端在线状态

session.createdon_subscribe_lookup订阅主题

client.disconnectedon_client_disconnected存储客户端离线状态

session.subscribed#on_message_fetch获取离线消息

session.subscribed#on_retain_lookup获取retain消息

session.unsubscribed#on_acked_delete删除 acked 消息

message.publish#on_message_publish存储发布消息

message.publish#on_message_retain存储retain消息

message.publish#on_retain_delete删除retain消息

message.acked#on_message_acked消息ACK处理

MongoDB 数据库初始化usemqtt

db.createCollection("mqtt_client")

db.createCollection("mqtt_sub")

db.createCollection("mqtt_msg")

db.createCollection("mqtt_retain")

db.createCollection("mqtt_acked")

db.mqtt_client.ensureIndex({clientid:1,node:2})

db.mqtt_sub.ensureIndex({clientid:1})

db.mqtt_msg.ensureIndex({sender:1,topic:2})

db.mqtt_retain.ensureIndex({topic:1})

Tip

数据库名称可自定义

MongoDB 用户状态集合(Client Collection)

mqtt_client 存储设备在线状态:{

clientid:string,

state:0,1,//0离线 1在线

node:string,

online_at:timestamp,

offline_at:timestamp

}

查询设备在线状态:db.mqtt_client.findOne({clientid:${clientid}})

例如 ClientId 为 test 客户端上线:db.mqtt_client.findOne({clientid:"test"})

{

"_id":ObjectId("58646c9bdde89a9fb9f7fb73"),

"clientid":"test",

"state":1,

"node":"emqx@127.0.0.1",

"online_at":1482976411,

"offline_at":null

}

例如 ClientId 为 test 客户端下线:db.mqtt_client.findOne({clientid:"test"})

{

"_id":ObjectId("58646c9bdde89a9fb9f7fb73"),

"clientid":"test",

"state":0,

"node":"emqx@127.0.0.1",

"online_at":1482976411,

"offline_at":1482976501

}

MongoDB 用户订阅主题集合(Subscription Collection)

mqtt_sub 存储订阅关系:{

clientid:string,

topic:string,

qos:0,1,2

}

用户 test 分别订阅主题 test_topic0 test_topic1 test_topic2:db.mqtt_sub.insert({clientid:"test",topic:"test_topic1",qos:1})

db.mqtt_sub.insert({clientid:"test",topic:"test_topic2",qos:2})

某个客户端订阅主题:db.mqtt_sub.find({clientid:${clientid}})

查询 ClientId 为 “test” 的客户端已订阅主题:db.mqtt_sub.find({clientid:"test"})

{"_id":ObjectId("58646d90c65dff6ac9668ca1"),"clientid":"test","topic":"test_topic1","qos":1}

{"_id":ObjectId("58646d96c65dff6ac9668ca2"),"clientid":"test","topic":"test_topic2","qos":2}

MongoDB 发布消息集合(Message Collection)

mqtt_msg 存储 MQTT 消息:{

_id:int,

topic:string,

msgid:string,

sender:string,

qos:0,1,2,

retain:boolean(true,false),

payload:string,

arrived:timestamp

}

查询某个客户端发布的消息:db.mqtt_msg.find({sender:${clientid}})

查询 ClientId 为 “test” 的客户端发布的消息:db.mqtt_msg.find({sender:"test"})

{

"_id":1,

"topic":"/World",

"msgid":"AAVEwm0la4RufgAABeIAAQ==",

"sender":"test",

"qos":1,

"retain":1,

"payload":"Hello world!",

"arrived":1482976729

}

MongoDB 保留消息集合(Retain Message Collection)

mqtt_retain 存储 Retain 消息:{

topic:string,

msgid:string,

sender:string,

qos:0,1,2,

payload:string,

arrived:timestamp

}

查询 retain 消息:db.mqtt_retain.findOne({topic:${topic}})

查询topic为 “t/retain” 的 retain 消息:db.mqtt_retain.findOne({topic:"t/retain"})

{

"_id":ObjectId("58646dd9dde89a9fb9f7fb75"),

"topic":"t/retain",

"msgid":"AAVEwm0la4RufgAABeIAAQ==",

"sender":"c1",

"qos":1,

"payload":"Hello world!",

"arrived":1482976729

}

MongoDB 接收消息 ack 集合(Message Acked Collection)

mqtt_acked 存储客户端消息确认:{

clientid:string,

topic:string,

mongo_id:int

}

启用 MongoDB 数据存储插件./bin/emqx_ctl plugins load emqx_backend_mongo

emqtt数据存储mysql,数据存储 - MongoDB 消息存储 - 《EMQ X Enterprise v3.0 中文文档》 - 书栈网 · BookStack...相关推荐

  1. emq数据储存到mysql,规则引擎示例 - 保存数据到 MySQL - 《EMQ X Enterprise v4.0 中文文档》 - 书栈网 · BookStack...

    保存数据到 MySQL 搭建 MySQL 数据库,并设置用户名密码为 root/public,以 MacOS X 为例:$ brew install mysql $ brew services sta ...

  2. mysql t 保存_检查 (调试) - 离线消息保存到 MySQL - 《EMQ X Enterprise v4.1 中文文档》 - 书栈网 · BookStack...

    离线消息保存到 MySQL 搭建 MySQL 数据库,并设置用户名密码为 root/public,以 MacOS X 为例: $ brew install mysql $ brew services ...

  3. mysql集群跨地域同步部署_跨地域冗余 - 跨数据中心部署方案 - 《TiDB v2.1 用户文档》 - 书栈网 · BookStack...

    跨数据中心部署方案 作为 NewSQL 数据库,TiDB 兼顾了传统关系型数据库的优秀特性以及 NoSQL 数据库可扩展性,以及跨数据中心(下文简称"中心")场景下的高可用.本文档 ...

  4. mysql pmod项目_内置函数 - 数学函数 - 《Apache Doris 文档(201812)》 - 书栈网 · BookStack...

    数学函数 abs(double a) 功能: 返回参数的绝对值 返回类型:double类型 使用说明:使用该函数需要确保函数的返回值是整数. acos(double a) 功能: 返回参数的反余弦值 ...

  5. mysql 中间件 租户管理_多租户的设计与实现 - 《Gaea - MySQL 中间件》 - 书栈网 · BookStack...

    多租户的设计与实现 背景 gaea多租户是为了实现一套gaea集群,可以接入多个业务系统的不同数据库,方便部署.运维.gaea多租户为软多租户,一个租户称为一个namespace,多个namespac ...

  6. 【Scikit-Learn 中文文档】34 预处理数据 - 数据集转换 - 用户指南 | ApacheCN

    中文文档: http://sklearn.apachecn.org/cn/stable/modules/preprocessing.html 英文文档: http://sklearn.apachecn ...

  7. 【更新】全新的数据填报,更强的BI分析,Wyn Enterprise V3.0 Update 2 发布

    Wyn Enterprise 专注于商业智能和数据分析的需要,一个产品同时提供多源数据整合.自助式 BI 分析.数据可视化.在线报表设计,以及数据填报等多项功能,企业用户可独立部署使用,软件公司也可进 ...

  8. 存储mysql数据存在特殊字符时处理_转义 存储数据时特殊符号的处理

    function url_base64_encode($str){ //将这个方法处理后的数据可以存储,不会有特殊符号 if($str=="") return "&quo ...

  9. 存储mysql数据存在特殊字符时处理_SQL数据库对于保存特殊字符的解决办法

    数据库的Char.Vachar类型可以兼容汉字,但特殊字符不行,在保存包含有特殊字符的字符串.正文时,会将特殊符号替换成一个"?"号. 例如: "基础教育课程手机报•特刊 ...

最新文章

  1. Maven报错解决:Element 'dependency' cannot have character [children], because the type's content type is
  2. hibernate 时间 默认值 -(mysql) 注解_注解@ -shihchang
  3. 机器学习:SVM多分类,SVM回归(SVR)
  4. 【es】 check-rollover-ready read index [alinas-lcc] is not the wtiter index for alians [index-xx]
  5. java 盘符 系统_JAVA小白预备内容
  6. div contenteditable 换行_contenteditable跟style标签可真是天生一对
  7. UEFI+GPT安装Win10和RHEL6.5双系统
  8. VS2012中WebAPI项目中的IHttpActionResult不识别的问题----解决办法
  9. add file in debian/source/include-binaries if you want to store the modified binary in the debian
  10. 页面里引入电子表字体
  11. 《信号与系统》4.11.2系统函数的幅频特性和相频特性分析 MATLAB实现
  12. 资源变现微信小程序安装教程
  13. XP下微软雅黑粗体不起作用(不能显示)的解决方法
  14. 工控系统的全球安全现状:全球漏洞实例分析
  15. 改造Android手机为,便携式linux服务器,跑tomcat
  16. 情人节送男友什么有新意、2022送礼指南
  17. 寻址方法有哪些-七种数据寻址-三种内存寻址
  18. Typora配置图床
  19. 树莓派4B安装系统,配置远程连接和WiFi,更新源,更新中文支持,基本Linux命令,用Python输出hello和“你好,世界”
  20. 最新苹果开发者账号申请流程

热门文章

  1. lr增强细节_LR和PS如何进行风光后期细节增强
  2. pixel(css pixel device pixel)resolutioncss pixel(像素和分辨率)
  3. 网站建设常用的cms建站系统推荐
  4. 如何撰写好技术方案设计-真实案例分享
  5. FIRST集和FOLLOW集,FIRSTVT集和LASTVT集总结
  6. 小学计算机教案 认识鼠标,认识鼠标》
  7. 2020届顺丰科技视觉算法工程师提前批面经
  8. .DS_Store 文件是什么? / .DS_Store 文件是什么macOS
  9. zz .Net 实现游戏修改器
  10. 全网最全Android开发工具,Android开发框架大全