简介: 获取更详细的 Databricks 数据洞察相关信息,可至产品详情页查看:https://www.aliyun.com/product/bigdata/spark

作者

美的暖通与楼宇事业部 先行研究中心智能技术部

美的暖通 IoT 数据平台建设背景

美的暖通与楼宇事业部(以下简称美的暖通)是美的集团旗下五大板块之一,产品覆盖多联机组、大型冷水机组、单元机、机房空调、扶梯、直梯、货梯以及楼宇自控软件和建筑弱电集成解决方案,远销海内外200多个国家。当前事业部设备数据上云仅停留在数据存储层面,缺乏挖掘数据价值的平台,造成大量数据荒废,并且不断消耗存储资源,增加存储费用和维护成本。另一方面,现有数据驱动应用缺乏部署平台,难以产生实际价值。因此,急需统一通用的 IoT 数据平台,以支持设备运行数据的快速分析和建模。

我们的 IoT 数据平台建设基于阿里云 Databricks 数据洞察全托管 Spark 产品,以下是整体业务架构图。在本文后面的章节,我们将就IoT数据平台建设技术选型上的一些思考,以及 Spark 技术栈尤其是 Delta Lake 场景的应用实践做一下分享。

选择 Spark & Delta Lake

在数据平台计算引擎层技术选型上,由于我们数据团队刚刚成立,前期的架构选型我们做了很多的调研,综合各个方面考虑,希望选择一个成熟且统一的平台:既能够支持数据处理、数据分析场景,也能够很好地支撑数据科学场景。加上团队成员对 Python 及 Spark 的经验丰富,所以,从一开始就将目标锁定到了 Spark 技术栈。

选择 Databricks 数据洞察 Delta Lake

通过与阿里云计算平台团队进行多方面的技术交流以及实际的概念验证,我们最终选择了阿里云 Databricks 数据洞察产品。作为 Spark 引擎的母公司,其商业版 Spark 引擎,全托管 Spark 技术栈,统一的数据工程和数据科学等,都是我们决定选择 Databricks 数据洞察的重要原因。

具体来看,Databricks 数据洞察提供的核心优势如下:

  • Saas 全托管 Spark:免运维,无需关注底层资源情况,降低运维成本,聚焦分析业务
  • 完整 Spark 技术栈集成:一站式集成 Spark 引擎和 Delta Lake 数据湖,100%兼容开源 Spark 社区版;Databricks 做商业支持,最快体验 Spark 最新版本特性
  • 总成本降低:商业版本 Spark 及 Delta Lake 性能优势显著;同时基于计算存储分离架构,存储依托阿里云 OSS 对象存储,借助阿里云 JindoFS 缓存层加速;能够有效降低集群总体使用成本
  • 高品质支持以及SLA保障:阿里云和 Databricks 提供覆盖 Spark 全栈的技术支持;提供商业化 SLA 保障与7*24小时 Databricks 专家支持服务

IoT 数据平台整体架构

整体的架构如上图所示。

我们接入的 IoT 数据分为两部分,历史存量数据和实时数据。目前,历史存量数据是通过 Spark SQL 以天为单位从不同客户关系数据库批量导入 Delta Lake 表中;实时数据通过 IoT 平台采集到云 Kafka ,经由 Spark Structured Streaming 消费后实时写入到 Delta Lake 表中。在这个过程中,我们将实时数据和历史数据都 sink 到同一张 Delta 表里,这种批流一体操作可大大简化我们的 ETL 流程(参考后面的案例部分)。数据管道下游,我们对接数据分析及数据科学工作流。

IoT 数据采集:从 Little Data 到 Big Data

作为 IoT 场景的典型应用,美的暖通最核心的数据均来自 IoT 终端设备。在整个 IoT 环境下,分布着无数个终端传感器。从小的维度看,传感器产生的数据本身属于 Small Data(或者称为 Little Data)。当把所有传感器连接成一个大的 IoT 网络,产生自不同传感器的数据经由 Gateway 与云端相连接,并最终在云端形成 Big Data 。

在我们的场景下,IoT 平台本身会对不同协议的数据进行初步解析,通过定制的硬件网络设备将解析后的半结构化 JSON 数据经由网络发送到云 Kafka。云 Kafka 扮演了整个数据管道的入口。

数据入湖:Delta Lake

IoT 场景下的数据有如下几个特点:

  • 时序数据:传感器产生的数据记录中包含时间相关的信息,数据本身具有时间属性,因此不同的数据之间可能存在一定的相关性。利用 as-of-join 将不同时间序列数据 join 到一起是下游数据预测分析的基础
  • 数据的实时性:传感器实时生成数据并以最低延迟的方式传输到数据管道,触发规则引擎,生成告警和事件,通知相关工作人员。
  • 数据体量巨大:IoT 网络环境下遍布各地的成千上万台设备及其传感器再通过接入服务将海量的数据归集到平台
  • 数据协议多样:通常在 IoT 平台接入的不同种类设备中,上传数据协议种类多样,数据编码格式不统一

IoT 数据上述特点给数据处理、数据分析及数据科学等带来了诸多挑战,庆幸的是,这些挑战借助 Spark 和 Delta Lake 都可以很好地应对。Delta Lake 提供了 ACID 事务保证,支持增量更新数据表以及流批同时写数据。借助 Spark Structed Streaming 可以实现 IoT 时序数据实时入湖。

以下是 Delta Lake 经典的三级数据表架构。具体到美的暖通 IoT 数据场景,我们针对每一层级的数据表分别做了如下定义:

  • Bronze 表:存储原生数据(Raw Data),数据经由 Spark Structed Streaming 从 Kafka 消费下来后 upsert 进 Delta Lake 表,该表作为唯一的真实数据表  (Single Source of Truth)
  • Silver表:该表是在对 Bronze 表的数据进行加工处理的基础上生成的中间表,在美的暖通的场景下,数据加工处理的步骤涉及到一些复杂的时序数据计算逻辑,这些逻辑都包装在了 Pandas UDF 里提供给 Spark 计算使用
  • Gold 表:Silver 表的数据施加 Schema 约束并做进一步清洗后的数据汇入 Gold 表,该表提供给下游的 Ad Hoc 查询分析及数据科学使用

数据分析:Ad-Hoc 查询

我们内部在开源 Superset 基础上定制了内部版本的 SQL 查询与数据可视化平台,通过 PyHive 连接到 Databricks 数据洞察 Spark Thrift Server 服务,可以将 SQL 提交到集群上。商业版本的 thrift server 在可用性及性能方面都做了增强,Databricks 数据洞察针对 JDBC 连接安全认证提供了基于 LDAP 的用户认证实现。借助 Superset ,数据分析师及数据科学家可以快速高效的对 Delta Lake 表进行数据探索。

数据科学:Workspace

楼宇能耗预测与设备故障诊断预测是美的暖通 IoT 大数据平台建设的两个主要业务目标。在 IoT 数据管道下游,需要对接机器学习平台。现阶段为了更快速方便地支撑起数据科学场景,我们将 Databricks 数据洞察集群与阿里云数据开发平台 DDC 打通。DDC 集成了在数据科学场景下更友好的 Jupyter Notebook ,通过在 Jupyter 上使用 PySpark ,可以将作业跑到 Databricks 数据洞察集群上;同时,也可以借助 Apache Airflow 对作业进行调度。同时,考虑到机器学习模型构建、迭代训练、指标检测、部署等基本环节,我们也在探索 MLOps ,目前这部分工作还在筹备中。

典型应用场景介绍

Delta Lake 数据入湖(批流一体)

使用 UDF 函数定义流数据写入 Delta Lake 的 Merge 规则

%spark
import org.apache.spark.sql._
import io.delta.tables._// Function to upsert `microBatchOutputDF` into Delta table using MERGE
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {// Set the dataframe to view namemicroBatchOutputDF.createOrReplaceTempView("updates")// Use the view name to apply MERGE// NOTE: You have to use the SparkSession that has been used to define the `updates` dataframemicroBatchOutputDF.sparkSession.sql(s"""MERGE INTO delta_{table_name} tUSING updates sON s.uuid = t.uuidWHEN MATCHED THEN UPDATE SET t.device_id = s.device_id,t.indoor_temperature =
s.indoor_temperature,t.ouoor_temperature = s.ouoor_temperature,t.chiller_temperature =
s.chiller_temperature,t.electricity = s.electricity,t.protocal_version = s.protocal_version,t.dt=s.dt,t.update_time=current_timestamp()WHEN NOT MATCHED THEN INSERT (t.uuid,t.device_id,t.indoor_temperature,t.ouoor_temperature ,t.chiller_temperature
,t.electricity,t.protocal_version,t.dt,t.create_time,t.update_time)values
(s.uuid,s.device_id,s.indoor_temperature,s.ouoor_temperature,s.chiller_temperature,s.electricity,s.protocal_version
,s.dt,current_timestamp(),current_timestamp())""")
}

使用 Spark Structured Streaming 实时流写入 Delta Lake

%sparkimport org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Triggerdef getquery(checkpoint_dir:String,tableName:String,servers:String,topic:String ) {var streamingInputDF =  spark.readStream.format("kafka").option("kafka.bootstrap.servers", servers).option("subscribe", topic)     .option("startingOffsets", "latest")  .option("minPartitions", "10")  .option("failOnDataLoss", "true").load()
val resDF=streamingInputDF.select(col("value").cast("string")).withColumn("newMessage",split(col("value"), " ")).filter(col("newMessage").getItem(7).isNotNull).select(col("newMessage").getItem(0).as("uuid"),col("newMessage").getItem(1).as("device_id"),col("newMessage").getItem(2).as("indoor_temperature"),col("newMessage").getItem(3).as("ouoor_temperature"),col("newMessage").getItem(4).as("chiller_temperature"),col("newMessage").getItem(5).as("electricity"),col("newMessage").getItem(6).as("protocal_version")).withColumn("dt",date_format(current_date(),"yyyyMMdd"))
val query = resDF.writeStream.format("delta").option("checkpointLocation", checkpoint_dir).trigger(Trigger.ProcessingTime("60 seconds")) // 执行流处理时间间隔.foreachBatch(upsertToDelta _) //引用upsertToDelta函数.outputMode("update")query.start()
}

数据灾备:Deep Clone

由于 Delta Lake 的数据仅接入实时数据,对于存量历史数据我们是通过 SparkSQL 一次性 Sink Delta Lake 的表中,这样我们流和批处理时只维护一张 Delta 表,所以我们只在最初对这两部分数据做一次 Merge 操作。同时为了保证数据的高安全,我们使用 Databricks Deep Clone 来做数据灾备,每天会定时更新来维护一张从表以备用。对于每日新增的数据,使用 Deep Clone 同样只会对新数据 Insert 对需要更新的数据 Update 操作,这样可以大大提高执行效率。

CREATE OR REPLACE TABLE delta.delta_{table_name}_cloneDEEP CLONE delta.delta_{table_name};

性能优化:OPTIMIZE & Z-Ordering

在流处理场景下会产生大量的小文件,大量小文件的存在会严重影响数据系统的读性能。Delta Lake 提供了 OPTIMIZE 命令,可以将小文件进行合并压缩,另外,针对 Ad-Hoc 查询场景,由于涉及对单表多个维度数据的查询,我们借助 Delta Lake 提供的 Z-Ordering 机制,可以有效提升查询的性能。从而极大提升读取表的性能。DeltaLake 本身提供了 Auto Optimize 选项,但是会牺牲少量写性能,增加数据写入 delta 表的延迟。相反,执行 OPTIMIZE 命令并不会影响写的性能,因为 Delta Lake 本身支持 MVCC,支持 OPTIMIZE 的同时并发执行写操作。因此,我们采用定期触发执行 OPTIMIZE 的方案,每小时通过 OPTIMIZE 做一次合并小文件操作,同时执行 VACCUM 来清理过期数据文件:

OPTIMIZE delta.delta_{table_name} ZORDER by device_id, indoor_temperature;
set spark.databricks.delta.retentionDurationCheck.enabled = false;
VACUUM delta.delta_{table_name} RETAIN 1 HOURS;

另外,针对 Ad-Hoc 查询场景,由于涉及对单表多个维度数据的查询,我们借助 Delta Lake 提供的 Z-Ordering 机制,可以有效提升查询的性能。

总结与展望

我们基于阿里云 Databricks 数据洞察产品提供的商业版 Spark 及 Delta Lake 技术栈快速构建了 IoT 数据处理平台,Databricks 数据洞察全托管免运维、商业版本引擎性能上的优势以及计算/存储分离的架构,为我们节省了总体成本。同时,Databricks 数据洞察产品自身提供的丰富特性,也极大提升了我们数据团队的生产力,为数据分析业务的快速开展交付奠定了基础。未来,美的暖通希望与阿里云 Databricks 数据洞察团队针对 IoT 场景输出更多行业先进解决方案。

原文链接
本文为阿里云原创内容,未经允许不得转载。

【实践案例】Databricks 数据洞察在美的暖通与楼宇的应用实践相关推荐

  1. 【实践案例】Databricks 数据洞察 Delta Lake 在基智科技(STEPONE)的应用实践

    简介: 获取更详细的 Databricks 数据洞察相关信息,可至产品详情页查看:https://www.aliyun.com/product/bigdata/spark 作者 高爽,基智科技数据中心 ...

  2. 超详攻略!Databricks 数据洞察 - 企业级全托管 Spark 大数据分析平台及案例分析

    简介: 5分钟读懂 Databricks 数据洞察 ~ 更多详细信息可登录 Databricks 数据洞察 产品链接:https://www.aliyun.com/product/bigdata/sp ...

  3. 自建Hive数据仓库跨版本迁移到阿里云Databricks数据洞察

    简介:客户在IDC或者公有云环境自建Hadoop集群构建数据仓库和分析系统,购买阿里云Databricks数据洞察集群之后,涉及到数仓数据和元数据的迁移以及Hive版本的订正更新. 直达最佳实践:[自 ...

  4. 案例▍大数据可视化 “灵鲲”态势感知系统中的理论与实践

    36大数据 -数据可视化专栏 作者| 潘洛斯团队  |腾讯大数据可视化项目组 在大数据时代,数字化转型已经成为行业迫切的需求.2016-2018年金融.医疗.政府.安全等行业在大数据方向上的投入持续增 ...

  5. 《SaltStack技术入门与实践》—— 实践案例 中小型Web架构2 Keepalived

    实践案例 <中小型Web架构>2 本章节参考<SaltStack技术入门与实践>,感谢该书作者: 刘继伟.沈灿.赵舜东(本章节中有好几处错误) Keepalived配置管理 首 ...

  6. 浅议数据中心规划设计阶段暖通系统节能措施

    最近几年以来,国内的数据中心工程数量变得越来越多,有些数据中心其实际建筑面积甚至可以达到上万平方米.这样的变化使得传统的机房精密空调冷却模式不再适合我国当前发展的实际需求.PUE 在节能效果评估工作正 ...

  7. 视频解读:案例透视消费者洞察实践与收益

     点击上方蓝字关注我们  本文根据Stratifyd资深解决方案经理段鑫龙(Bruce Duan)在"如何洞察您的消费者"直播课主题演讲整理.点击文末"阅读原文" ...

  8. 【TOP100】100个中国大数据应用最佳实践案例—为您打开万亿元大数据产业的财富之门

    热门下载(点击标题即可阅读) ☞[下载]2015中国数据分析师行业峰会精彩PPT下载(共计21个文件) 2017年3月28日至29日,由工业和信息化部指导.中国信息通信研究院和数据中心联盟主办的&qu ...

  9. 【视频课】言有三每天答疑,38课深度学习+超60小时分类检测分割数据算法+超15个Pytorch框架使用与实践案例助你攻略CV...

    计算机视觉中大大小小可以包括至少30个以上的方向,在基于深度学习的计算机视觉研究方向中,图像分类,图像分割,目标检测无疑是最基础最底层的任务,掌握好之后可以很快的迁移到其他方向,比如目标识别,目标跟踪 ...

最新文章

  1. 几种软负载均衡策略分析
  2. 金融领域下的数据挖掘算法应用:XGboost模型
  3. 经常吹空调皮肤是不是会变黑
  4. Vuex——使用namespace的store使用mapState获取state为undefined
  5. (转)基于MVC4+EasyUI的Web开发框架形成之旅--基类控制器CRUD的操作
  6. cmd xcopy 拷贝文件夹_u盘文件夹被病毒隐藏怎么解决 u盘文件夹被病毒隐藏解决方法【详细步骤】...
  7. Java10的新特性
  8. 今天将IE升级到了7.0版
  9. c++ primer plus(第6版)中文版 第十二章编程练习答案
  10. 车辆路径跟踪算法及数学模型
  11. JAVA复习总结 一( 详细,干货!)
  12. 百度搜索下拉框,下拉菜单怎么做?如何刷?
  13. UVALive7461 - Separating Pebbles 判断两个凸包相交
  14. C++ 完全平方数
  15. jq.ajax+php+mysql实现瀑布流缓冲加载数据
  16. html5 mp3播放器源码,HTML5自定义mp3播放器源码
  17. change在c语言中的用法,change的过去式和用法例句意思及阅读
  18. 从循环神经网络到卷积神经网络
  19. 解决外贸电商难题,PayPal中国外贸电商大会圆满礼成
  20. (转)Moodle平台简介

热门文章

  1. ieee39节点系统介绍_Java秒杀系统实战系列-基于ZooKeeper的分布式锁优化秒杀逻辑...
  2. 全国战争linux添加eth0,linux服务器双线路接入配置
  3. Java程序员遇到瓶颈后我们可以试着朝四个方向拓展?你们觉得呢?
  4. java数组 arraylist_JAVA 用数组实现 ArrayList
  5. 【LeetCode笔记】56. 合并区间(Java、排序)
  6. mybatis依赖_Spring Boot2 系列教程(二十一)整合 MyBatis
  7. linux mysql 实战_linux实用实战
  8. 华硕2020年显卡_TrendForce集邦咨询:2020年液晶显示器年出货成长率达5.4%,华硕成长率居冠、三星排名上升...
  9. 安卓第一行代码第3版pdf_SPECFEM2D用户手册——第3章 网格生成——3.1 如何使用SPECFEM2D...
  10. python以垂直方式输出hello world_python3提问:垂直输出Hello World,全部代码不超过2行....