目录

场景:设备监控

解决方案架构

先决条件:

设置基础设施组件

设置本地服务

MQTT代理

Grafana

Kafka 连接

创建MQTT源连接器实例

部署设备数据处理器应用程序

启动模拟设备数据生成器

享受Grafana仪表板!

那么,您想运行一些RedisTimeSeries命令吗?

删除资源

其他注意事项

优化RedisTimeSeries

长期数据保留怎么样?

可扩展性

结论


RedisTimeSeries是一个Redis模块,它为Redis带来了原生的时间序列数据结构。时间序列解决方案早先建立在排序集(或Redis流)之上,可以从RedisTimeSeries功能中受益,例如高容量插入、低延迟读取、灵活的查询语言、下采样等等!

一般来说,时间序列数据(相对)简单。话虽如此,我们还需要考虑其他特征:

  • 数据速度:例如,考虑每秒来自数千台设备的数百个指标
  • 容量(大数据):考虑数月(甚至数年)的数据积累

因此,RedisTimeSeries等数据库只是整体解决方案的一部分。您还需要考虑如何收集(摄取)、处理所有数据并将其发送到RedisTimeSeries。你真正需要的是一个可扩展的数据管道,它可以作为一个缓冲区来解耦生产者和消费者。

这就是Apache Kafka的用武之地!除了核心代理之外,它还拥有丰富的组件生态系统,包括Kafka Connect(这是本博文中介绍的解决方案架构的一部分)、多语言客户端库、Kafka Streams、Mirror Maker等。

这篇博文提供了一个实际示例,说明如何将RedisTimeSeries与Apache Kafka结合使用来分析时间序列数据。

该代码可在此GitHub存储库中找到https://github.com/abhirockzz/redis-timeseries-kafka

让我们首先探索用例。请注意,为了博客文章的目的,它保持简单,然后在后续部分中进一步解释。

场景:设备监控

假设有很多位置,每个位置都有多个设备,您的任务是监控设备指标——现在我们将考虑温度和压力。这些指标将存储在RedisTimeSeries中(当然!)并使用以下键命名约定— <metric name>:<location>:<device>。例如,位置5中设备1的温度将表示为 temp:5:1。每个时间序列数据点还将具有以下标签(键值对)— metric, location, device。这是为了允许灵活的查询,正如您将在接下来的部分中看到的那样。

以下是几个示例,可让您了解如何使用该TS.ADD命令添加数据点:

#位置3中设备2的温度以及标签:

TS.ADD temp:3:2 * 20 LABELS metric temp location 3 device 2

#位置3中设备2的压力:

TS.ADD pressure:3:2 * 60 LABELS metric pressure location 3 device 2

解决方案架构

以下是该解决方案的高层次外观:

让我们分解一下:

源(本地)组件

  • MQTT代理(mosquitto):MQTT是物联网用例的事实上的协议。我们将使用的场景是物联网和时间序列的组合——稍后会详细介绍。
  • Kafka Connect:MQTT源连接器用于将数据从MQTT代理传输到Kafka集群。

Azure服务

  • 适用于Redis企业层的Azure缓存:企业层基于Redis Enterprise,这是Redis Labs的Redis商业变体。除了RedisTimeSeries,企业层还支持RediSearch和RedisBloom。客户无需担心企业层的许可证获取。Azure Redis缓存将促进这一过程,其中,客户可以通过Azure市场优惠获得并支付此软件的许可。
  • Confluent Cloud on Azure:一个完全托管的产品,提供Apache Kafka即服务,这要归功于从Azure到Confluent Cloud的集成供应层。它减轻了跨平台管理的负担,并为在Azure基础架构上使用Confluent Cloud提供了统一的体验,从而使您可以轻松地将Confluent Cloud与Azure应用程序集成。
  • Azure Spring Cloud:借助Azure Spring Cloud,可以更轻松地将Spring Boot微服务部署到Azure。Azure Spring Cloud减轻了基础结构问题,提供了配置管理、服务发现、CI/CD集成、蓝绿部署等。该服务完成了所有繁重的工作,因此开发人员可以专注于他们的代码。

请注意,有些服务是在本地托管的,只是为了简单起见。在生产级部署中,您也希望在Azure中运行它们。例如,您可以在Azure Kubernetes服务中操作Kafka Connect集群和MQTT连接器。

总而言之,这是端到端流程:

  • 脚本生成发送到本地MQTT代理的模拟设备数据。
  • 此数据由MQTT Kafka Connect源连接器获取,并发送到Azure中运行的Confluent Cloud Kafka集群中的主题。
  • 它由托管在Azure Spring Cloud中的Spring Boot应用程序进一步处理,然后将其保存到Azure Cache for Redis实例。

是时候从实用的东西开始了!在此之前,请确保您具备以下条件。

先决条件:

  • Azure帐户——您可以在此处免费获取
  • 安装Azure CLI
  • JDK 11用于例如OpenJDK
  • Maven和Git的最新版本

设置基础设施组件

按照文档预配RedisTimeSeries模块附带的Azure Redis缓存(企业层)。

在Azure Marketplace上配置Confluent Cloud集群。还要创建一个Kafka主题(使用名称mqtt.device-stats)并创建凭证(API密钥和秘密),稍后您将使用这些凭证安全地连接到您的集群。

您可以使用Azure门户或使用Azure CLI预配Azure Spring Cloud实例:

az spring-cloud create -n <name of Azure Spring Cloud service> -g <resource group name> -l <enter location e.g southeastasia>

在继续之前,请确保克隆GitHub存储库:

git clone https://github.com/abhirockzz/redis-timeseries-kafka
cd redis-timeseries-kafka

设置本地服务

组件包括:

  • Mosquitto MQTT代理
  • 使用MQTT源连接器的Kafka Connect
  • Grafana用于跟踪仪表板中的时间序列数据

MQTT代理

我在Mac上本地安装并启动了mosquitto代理。

brew install mosquitto
brew services start mosquitto

您可以按照与您的操作系统相对应的步骤进行操作,也可以随意使用此Docker镜像。

Grafana

我在Mac上本地安装并启动了Grafana。

brew install grafana
brew services start grafana

你可以对你的操作系统做同样的事情,也可以随意使用这个Docker镜像。

docker run -d -p 3000:3000 --name=grafana -e "GF_INSTALL_PLUGINS=redis-datasource" grafana/grafana

Kafka 连接

您应该能够在刚刚克隆的存储库中找到connect-distributed.properties文件。替换bootstrap.servers、sasl.jaas.config等属性的值。

首先,在本地下载并解压Apache Kafka。

启动本地Kafka Connect集群:

export KAFKA_INSTALL_DIR=<kafka installation directory e.g. /home/foo/kafka_2.12-2.5.0>$KAFKA_INSTALL_DIR/bin/connect-distributed.sh connect-distributed.properties

要手动安装MQTT源连接器:

  • 从此链接下载连接器/插件ZIP文件,然后,
  • 将其解压缩到Connect worker的plugin.path配置属性中列出的目录之一

如果您在本地使用Confluent Platform,只需使用Confluent Hub CLI: confluent-hub install confluentinc/kafka-connect-mqtt:latest

创建MQTT源连接器实例

确保检查mqtt-source-config.json文件。确保输入正确的主题名称kafka.topic并且mqtt.topics保持不变。

curl -X POST -H 'Content-Type: application/json'
http://localhost:8083/connectors -d @mqtt-source-config.json# wait for a minute before checking the connector status
curl http://localhost:8083/connectors/mqtt-source/status

部署设备数据处理器应用程序

在您刚刚克隆的GitHub存储库中,在consumer/src/resources文件夹中查找application.yaml文件并替换以下值:

  • Azure Redis缓存主机、端口和主访问密钥
  • Confluent Cloud on Azure API密钥和秘密

构建应用程序JAR文件:

cd consumerexport JAVA_HOME=<enter absolute path e.g. /Library/Java/JavaVirtualMachines/zulu-11.jdk/Contents/Home>mvn clean package

创建一个Azure Spring Cloud应用程序并将JAR文件部署到它:

az spring-cloud app create -n device-data-processor -s <name of Azure Spring Cloud instance> -g <name of resource group> --runtime-version Java_11az spring-cloud app deploy -n device-data-processor -s <name of Azure Spring Cloud instance> -g <name of resource group> --jar-path target/device-data-processor-0.0.1-SNAPSHOT.jar

启动模拟设备数据生成器

您可以使用刚刚克隆的GitHub存储库中的脚本:

./gen-timeseries-data.sh

注意——它所做的只是使用mosquitto_pub CLI命令发送数据。

数据被发送到device-stats MQTT topic(这不是Kafka主题)。您可以使用CLI订阅者仔细检查:

mosquitto_sub -h localhost -t device-stats

检查Confluent Cloud门户中的Kafka主题。您还应该检查Azure Spring Cloud中设备数据处理器应用的日志:

az spring-cloud app logs -f -n device-data-processor -s <name of Azure Spring Cloud instance> -g <name of resource group>

享受Grafana仪表板!

浏览到Grafana UI localhost:3000。

Grafana的Redis数据源插件适用于任何Redis数据库,包括Azure Redis缓存。按照此博客文章中的说明配置数据源。

在您克隆的GitHub存储库中的grafana_dashboards文件夹中导入仪表板(如果您需要有关如何导入仪表板的帮助,请参阅Grafana文档)。

例如,这里有一个仪表板,显示了location 1的device 5 (使用TS.MRANGE)的平均压力(超过30秒)。

这是另一个仪表板,显示了多个设备的最高温度(超过15秒)location 3(再次感谢TS.MRANGE)。

那么,您想运行一些RedisTimeSeries命令吗?

启动redis-cli并连接到Azure Cache for Redis实例:

redis-cli -h <azure redis hostname e.g. myredis.southeastasia.redisenterprise.cache.azure.net> -p 10000 -a <azure redis access key> --tls

从简单的查询开始:

# pressure in device 5 for location 1
TS.GET pressure:1:5# temperature in device 5 for location 4
TS.GET temp:4:5

按位置过滤并获取所有设备的温度和压力:

TS.MGET WITHLABELS FILTER location=3

提取特定时间范围内一个或多个位置的所有设备的温度和压力:

TS.MRANGE - + WITHLABELS FILTER location=3
TS.MRANGE - + WITHLABELS FILTER location=(3,5)

– +指的是从开始到最新时间戳的所有内容,但您可以更具体。

MRANGE正是我们所需要的!我们还可以按某个位置的特定设备进行过滤,然后按温度或压力进一步深入分析:

TS.MRANGE - + WITHLABELS FILTER location=3 device=2
TS.MRANGE - + WITHLABELS FILTER location=3 metric=temp
TS.MRANGE - + WITHLABELS FILTER location=3 device=2 metric=temp

所有这些都可以与聚合相结合。

# all the temp data points are not useful. how about an average (or max) instead of every temp data points?
TS.MRANGE - + WITHLABELS AGGREGATION avg 10000 FILTER location=3 metric=temp
TS.MRANGE - + WITHLABELS AGGREGATION max 10000 FILTER location=3 metric=temp

也可以创建一个规则来进行此聚合并将其存储在不同的时间序列中。

完成后,不要忘记删除资源以避免不必要的成本。

删除资源

  • 按照文档中的步骤删除Confluent Cloud集群——您只需要删除Confluent组织即可。
  • 同样,您也应该删除Azure Cache for Redis实例。

在您的本地机器上:

  • 停止Kafka Connect集群
  • 停止蚊子经纪人(例如酿造服务停止mosquito)
  • 停止Grafana服务(例如brew services stop grafana)

我们探索了使用Redis和Kafka摄取、处理和查询时间序列数据的数据管道。当您考虑下一步并转向生产级解决方案时,您应该考虑更多的事情。

其他注意事项

优化RedisTimeSeries

  • 保留策略:考虑这一点,因为默认情况下您的时间序列数据点不会被修剪或删除。
  • 下采样和聚合规则:您不想永远存储数据,对吗?确保配置适当的规则来解决这个问题(例如TS.CREATERULE temp:1:2 temp:avg:30 AGGREGATION avg 30000 )。
  • 重复数据政策:您希望如何处理重复样本?确保默认策略(BLOCK)确实是您所需要的。如果没有,请考虑其他选项。

这不是一个详尽的清单。其他配置选项请参考RedisTimeSeries文档

长期数据保留怎么样?

数据是宝贵的,包括时间序列!您可能想要进一步处理它(例如运行机器学习以提取洞察力、预测性维护等)。为此,您需要将这些数据保留更长的时间,并且为了经济高效且经济高效,您需要使用可扩展的对象存储服务,例如Azure Data Lake Storage Gen2 (ADLS Gen2) .

有一个连接器!您可以通过使用完全托管的Azure Data Lake Storage Gen2 Sink Connector for Confluent Cloud来处理和存储ADLS中的数据,然后使用Azure Synapse Analytics或Azure Databricks运行机器学习来增强现有数据管道。

可扩展性

您的时间序列数据量只能向上移动!解决方案的可扩展性至关重要:

  • 核心基础设施:托管服务允许团队专注于解决方案,而不是设置和维护基础设施,尤其是在涉及复杂的分布式系统(如数据库和流媒体平台,如Redis和Kafka)时。
  • Kafka Connect:就数据管道而言,由于Kafka Connect平台本质上是无状态且水平可扩展的,因此您掌握得很好。在如何构建和调整Kafka Connect工作集群的大小方面,您有很多选择。
  • 自定义应用程序:与本解决方案一样,我们构建了一个自定义应用程序来处理Kafka主题中的数据。幸运的是,同样的可伸缩性特征也适用于它们。在水平扩展方面,它仅受您拥有的Kafka主题分区数量的限制。

集成:不仅仅是Grafana!RedisTimeSeries还与Prometheus和Telegraf集成。但是,在撰写本文时还没有Kafka连接器——这将是一个很棒的附加组件!

结论

当然,您可以将Redis用于(几乎)所有事情,包括时间序列工作负载!请务必考虑从时间序列数据源到Redis及其他范围的数据管道和集成的端到端架构。

https://www.codeproject.com/Articles/5309620/Processing-Time-Series-Data-with-Redis-and-Apache

使用Redis和Apache Kafka处理时间序列数据相关推荐

  1. 重磅开源 KSQL:用于 Apache Kafka 的流数据 SQL 引擎 2017.8.29

    Kafka 的作者 Neha Narkhede 在 Confluent 上发表了一篇博文,介绍了Kafka 新引入的KSQL 引擎--一个基于流的SQL.推出KSQL 是为了降低流式处理的门槛,为处理 ...

  2. Apache Kafka:大数据的实时处理时代

    在过去几年,对于 Apache Kafka 的使用范畴已经远不仅是分布式的消息系统:我们可以将每一次用户点击,每一个数据库更改,每一条日志的生成,都转化成实时的结构化数据流,更早的存储和分析它们,并从 ...

  3. Apache Kafka:下一代分布式消息系统

    简介 Apache Kafka是分布式发布-订阅消息系统.它最初由LinkedIn公司开发,之后成为Apache项目的一部分.Kafka是一种快速.可扩展的.设计内在就是分布式的,分区的和可复制的提交 ...

  4. kafka 复制因子_选择正确的分区计数复制因子apache kafka

    kafka 复制因子 A Small Introduction to Kafka! 卡夫卡小介绍! So before we learn about Kafka, let's learn how co ...

  5. 7.1.5 智慧物流【车辆监控Structured Streaming、整合kafka、Redis、Mysql、HBASE 写入数据】

    车辆监控 文章目录 车辆监控 第一节 Structured Streaming 1.1 Structured Streaming发展历史 1.1.1 Spark Streaming 1.1.2 Dat ...

  6. Flink 获取 Kafka 中的数据,分流存储到 Redis、MySQL 中

    文章目录 案例:实时处理电商订单信息 需求一:统计商城实时订单实收金额 需求二:将上面的最后计算的结果,存储到 Redis 中(Key 为:totalprice) Redis Sink 自定义 Red ...

  7. 实用 | 从Apache Kafka到Apache Spark安全读取数据

    引言 随着在CDH平台上物联网(IoT)使用案例的不断增加,针对这些工作负载的安全性显得至关重要.本篇博文对如何以安全的方式在Spark中使用来自Kafka的数据,以及针对物联网(IoT)使用案例的两 ...

  8. 大数据开发hadoop核心的分布式消息系统:Apache Kafka 你知道吗

    简介 Apache Kafka是分布式发布-订阅消息系统.它最初由LinkedIn公司开发,之后成为Apache项目的一部分.Kafka是一种快速.可扩展的.设计内在就是分布式的,分区的和可复制的提交 ...

  9. kafka处理流式数据_通过Apache Kafka集成流式传输大数据

    kafka处理流式数据 从实时过滤和处理大量数据,到将日志数据和度量数据记录到不同来源的集中处理程序中,Apache Kafka越来越多地集成到各种系统和解决方案中. 使用CData Sync ,可以 ...

最新文章

  1. proe输入数字时成双出现_Proe/Creo步进电机正反转仿真详解
  2. 接受map_[译] 图解 Map、Reduce 和 Filter 数组方法
  3. 读取剪贴板英语转换为国际莫斯码
  4. asp.net 日期转换
  5. C/C++ 中判断某一文件或目录是否存在
  6. iOS -转载-开发之个人开发者账号转公司开发者账号
  7. 顶配12599元!三星Galaxy S22国行价格来了...
  8. 周日慕田峪生鱼片之旅,失败的第一次出台
  9. HttpClient post 请求实例
  10. c++ primer 6.3.3节练习答案
  11. WinForm 去掉DataGridView最后一行的空白行,删除空白行
  12. div中border属性
  13. allgro显示网络名称_ALLEGRO如何显示网络标号?
  14. win7升级win10系统
  15. dell电脑如何安装ubuntu系统_Dell电脑 U盘启动盘 安装ubuntu
  16. python tkinter如何隐藏控件
  17. 平狄克微观经济学笔记和课后习题答案
  18. [网友问答1]STM32驱动PCF8591模块,实现AD/DA转换
  19. 光场相机重聚焦--焦点堆栈深度估计法
  20. 3dmax 视图切换

热门文章

  1. 数据结构最佳路径代码_【微服务】149:商品数据结构
  2. java lang jar_在运行时,Java jar返回java.lang.NoClassDefFoundError
  3. 在python中获取当前工作目录可以通过_python-获取当前工作路径
  4. ios 平滑移动view_iOS 关于列表上拉(平滑加载数据)自动加载数据的问题
  5. 交换机putty怎么调试_弱电工程视频监控系统设计、安装、调试、维护全过程讲解...
  6. 设计灵感|高饱和渐变创意海报设计作品,值得学习
  7. 老板催你交圣诞海报设计?PSD源文件素材,直接编辑
  8. 高品质静物空间海报模板PSD分层素材
  9. 古典人物海报设计PSD分层模板,浓郁国风,展古典韵味
  10. 计算机与应用化学ppt,应用化学专用课件.ppt