一种基于kafka实现物联设备数据精细化存取的方法

摘要:

基于kafka实现物联设备数据精细化存取的方法是指能够缓存物联网平台连接的所有设备的所有数据并且能高效精确地读取指定设备的指定类型数据的一种方法。属于物联网领域。本发明分为设备数据分类存储和精确获取目标数据两部分。其中设备数据分类存储部分包括设备数据的具体分类方法以进行缓存数据,精确获取目标数据部分包括通过动态订阅和动态拦截器实现精确筛选目标数据。

技术领域:

基于kafka生产者的通过设备分类和数据分类实现粗粒度设备数据存储和基于kafka消费者的动态订阅、动态拦截器实现精细化获取指定设备的指定类型数据。

背景技术:

所谓物联网技术,是把电子、通信、计算机三大领域的技术融合起来,在互联网的基础上实现物物相连。物联网是万物互联的基础,也是未来智慧工厂、智慧城市、智慧社区、智慧家庭等应用场景实现的基础。目前各国和各行业对物联网的应用力度都很大,物联设备数量急剧增加。
这些设备会产生大量的数据,这些数据并不能仅仅一次使用后就丢弃,因为这些数据有可能需要进行持久化存储、数据分析或支持物联网上层应用;当然也并不能将所有的数据都持久化存储,这会造成大量无意义的数据的存储带来的性能问题。因此,需要对这些设备的数据进行一定时间的缓存,并能够精确获取指定数据,以合理地解决数据的流转和处理问题。

常规的基于kafka存取的方案是在存储时将数据详细区分并创建对应的主题,再通过kafka消费者订阅所有主题,从而消费目标数据。或者创建少量kafka主题进行数据存储,在订阅所有主题并消费到所有数据后再对数据通过逻辑处理进行筛选。会存在以下几种问题:

  1. 设备数量庞大,所以创建的kafka主题数目会十分庞大,生成巨量文件,导致占用资源过多,并造成topic读写方式改变为随机读写,使得读写性能下降
  2. 应用启动时订阅所有topic以保证能消费到目标topic。因为如果要更新订阅需要重新启动应用
  3. 消费topic数据时,会消费所有数据,造成不必要的性能下降。

技术方案:

设备数据分类存储

kafka中间件具有高通量、低延迟、容错性、消息持久性的特点,还具有可扩展性、高并发性、实时处理等优势。最终能够达到实时处理和缓存大量设备数据的目的。
设备数据存储包含以下步骤:

(一) 按项目进行设备分类

将每台设备划分到产品,再将产品划分到项目。一个项目下连接大量的设备。

(二)按数据意义进行数据分类

每台设备的数据按照设备上下线状态、设备采集信息、设备拓扑关系等维度进行不同意义的分类

(三)创建kafka主题集合

按照项目数量和数据意义数量的笛卡尔积来创建kafka主题集合。kafka主题创建示例:项目集合为A,数据意义集合为B,则
kafka topic =A×B={(x,y)|x∈A∧y∈B}

(四)将属于相同项目且具有相同意义的数据通过kafka生产者存储到同一个对应主题。

精确获取目标数据:

在经过步骤1将设备数据存储到kafka后。在获取指定设备的指定类型数据时便通过对kafka存储的数据进行消费,这个过程既可以是异步读取历史的数据,也可以同步进行数据的流转。
精确获取目标数据包含以下步骤:

(一) 确定数据所属kafka主题。

获取设备数据的第一步是确定数据来源,即kafka主题。通过确定项目编码和数据意义来确定kafka的topic。例如设备A属于项目B,同时要获取设备状态status,就可以确定kafka主题为BmqttStatus。

(二) 动态订阅kafka主题。

kafka采用的是发布-订阅式的模式,所以获取kafka中的数据,首先需要对上一步确定的主题进行订阅。本发明支持动态订阅,以满足使用者实时变更订阅主题和减少不必要的订阅。具体实现方式为:借助Java提供的ConcurrentLinkedQueue来实现。
① 构建ConcurrentLinkedQueue对象分别给两个线程使用(这里并不限定于两个线程,但这个需求最可能的实际场景是consumer主线程和一个后台管理类的用户线程,而后者负责触发“动态修改订阅”逻辑)
② 调用KafkaConsumer.poll(timeout)来不断消费消息。
③ 每次poll之后尝试去探查ConcurrentLinkedQueue是否有新内容(如果有说明订阅topic列表发生变化),有则进行响应。
④ 使用另一个线程向ConcurrentLinkedQueue中插入新的订阅信息

(三)使用动态消费者拦截器筛选至特定设备

订阅到kafka主题后就可以从中消费到存储的数据,但是数据中包含了大量不属于目标设备的数据。通过动态自定义消费者拦截器实现对目标设备数据进行筛选拦截。在消费时,每改变一次目标设备,生成新的拦截器对象,并通过更新拦截器中产品和设备编码的值,来实现目标数据筛选。

有益效果:

本方法的有益效果在于,与现有技术相比,本方法中一种基于kafka实现物联设备数据精细化存取的方法能够在物联管理平台中配置设备和设备数据分类规则以进行缓存数据,并根据现场环境对获取数据时的订阅主题进行灵活修改,同时形成了主题内容的自动拦截,从而提高了数据流转和数据处理的灵活程度,精确了获取数据的粒度,提高了从缓存中获取目标数据的速度和准确度。同时,本方法减轻了对物联管理平台的网络流量和数据流转等性能的冲击,提高了平台运行的稳定性。

附图说明

1、 流程图

整体分析图

一种基于kafka实现物联网设备数据精细化存取的方法相关推荐

  1. 物联网设备数据是如何流转的:基于EMQX与TDengine的前后端分离项目实践

    背景 在我写了TDengine极简实战:从采集到入库,从前端到后端,体验物联网设备数据流转这篇文章后,不少读者朋友评论.私信说可不可以提供代码参考学习下,那必须是可以的.那篇文章主要介绍了数据采集.数 ...

  2. TDengine极简实战:从采集到入库,从前端到后端,体验物联网设备数据流转

    作者:牛晓青 背景 我们的项目涉及物联网相关业务,由于一开始的年少无知,传感器数据采用了 MySQL 进行存储,经过近两年的数据累积,目前几个核心表单表数据已过亿,虽然通过索引优化. SQL 优化以及 ...

  3. 物联网设备数据流转之数据如何实时推送至前端:WebSocket前端接收

    背景 在实现 WebSocket 前端接收前,我们先说明白一件事,为什么要使用WebSocket? 这要从 HTTP 协议说起,我们知道 HTTP 协议只能由客户端发起,而且是短链接,这就会导致我们在 ...

  4. 物联网设备数据流转之搭建前端服务框架:Vue3.0, ElementPlus, Axios, Echarts

    背景 有了后端服务接口,我们就要开始前端项目搭建啦,终于可以看到展示物联网设备数据的页面了.这篇文章搭建基于最新版 Vue 3.2.13 . ElementPlus 2.1.9 的极简前端脚手架,方便 ...

  5. kafka mysql 迁移_一种Kafka与Elasticsearch数据库数据的互相迁移方法与流程

    本发明属于数据库迁移领域,具体地讲涉及一种kafka与elasticsearch数据库数据的互相迁移方法. 背景技术: 实现数据共享,可以使更多的人更充分地使用已有数据资源,减少资料收集.数据采集等重 ...

  6. 物联网设备数据流转之数据如何导出:Excel文件

    背景 其实,到现在为止,前面的页面已经实现了对设备数据的展现.这一篇文章中对当前的项目做一个增强: 绘制Echarts 图表展示数据变化趋势: 导出设备数据到Excel,体验TDengine的查询效率 ...

  7. 物联网设备数据流转之数据如何实时推送至前端:WebSocket服务端推送

    背景 还记得,我们在物联网设备数据流转之实时数据从哪里来.如何转发:Node.js, MQTT, EMQX的WebHook这篇文章中,当 EMQX 的 WebHook 收到来自设备的消息时,我们当时只 ...

  8. 物联网设备数据流转之数据何时存储:Spring事件及监听机制, 数据入库

    背景 还记得,我们在物联网设备数据流转之实时数据从哪里来.如何转发:Node.js, MQTT, EMQX的WebHook这篇文章中,当 EMQX 的 WebHook 收到来自设备的消息时,我们当时只 ...

  9. 云计算机是一种基于资源,一种基于云平台和云计算的资源管理系统和方法

    一种基于云平台和云计算的资源管理系统和方法 [技术领域] [0001]本发明涉及云计算系统,尤其涉及一种基于云平台和云计算的资源管理系统和方法. [背景技术] [0002]云计算(Cloud Comp ...

最新文章

  1. ThreadPoolExecutor 的八种拒绝策略 | 含番外!
  2. 为全局变量赋值_Postman全局变量设置和运用
  3. 亚洲杯:打平韩国即可小组第一 国足会继续带来惊喜吗?
  4. Python学习资源 | 3个高性能开源计算机视觉库
  5. (chap8 确认访问用户身份的认证) BASIC认证(基本认证)
  6. 10.16 ln软硬链接的创建等
  7. mysql逻辑备份之mysqldump
  8. Redis常用数据类型的数据结构
  9. uniapp页面传参使用encodeURIComponent转义特殊符号
  10. stella forum v1.2 的初始设计模型
  11. WSL2 即将普遍可用,Linux 内核提供方式改变
  12. 【异常】No suitable driver
  13. 矩阵分解程序及报告:LU分解、QR分解、Householder变换、Givens变换、URV分解
  14. 基于51单片机的红外遥控器设计
  15. 华为最强科普:什么是DSP?
  16. EasySwoole3 Crontab的使用
  17. distill_bert和tiny_bert
  18. 心知天气API如何调用与json数据如何显示
  19. 争议不断的AI绘画,靠什么成为了顶流?
  20. flux 中的 buffer 的原理

热门文章

  1. 基于 VANET 的车辆网络通信系统的 Matlab 仿真实现
  2. 未来网页设计的流行趋势是什么
  3. 【机器学习】线性回归【下】正则化最小二乘估计
  4. FRP服务 内网穿透
  5. Gartner CRM魔力象限:不同企业需要什么样的CRM?
  6. 好不容易约来了一名程序员来面试,结果又被否定了,心累
  7. Etnetera Brevity Challenge CTUOpenContest2016/Gym101505K
  8. Informer--用于长序列时序预测【2021AAAI Best Paper】
  9. 在OpenCV里实现负片函数imcomplement
  10. 用python画雪人-python中的多态和继承