全生命周期大数据处理系列

任何一件复杂的事物,简化它的方法就是分而治之,只是这个法,万变不离其宗,可能会因人因事而大同小异而已.我在车联网大数据的处理实践中不断沉淀,在公司产品的迭代升级中逐步升华,深有感触略有所得,分享出来抛砖引玉,希望能引起共鸣,共同进步.

我把数据的生命周期分为如下几个阶段,而这么划分的标准是什么呢?我认为是"价值",数据的不断处理,是价值的不断提炼,其目的是为了获取其潜在的价值,这个价值的体现可能是引导了决策走向,展现了数据全貌,给出了时间趋势,挖掘出了不易发现的规律,预测了可能发生的事件等等.任何计算框架,无论什么工具产品,抑或是数据处理的策略和方法论,都是为了这个目标而产生的.

这个系列的讲解也是为了达成这一目标,怎么从最初产生的大数据,应用一系列的方法和工具,最后萃取出有价值的高纯度的小数据,我们可以称这个过程为数据掘金,也可以说是数据挖掘.

第一阶段: 数据采集

数据采集是由"米"到"炊"的最早阶段,这个阶段一般来说技术难度不大,但是有一定的规划难度.

通常也称这个阶段为数据埋点,"埋点"是数据采集阶段常用的术语,"埋"有设置和隐藏的含义.

以新能源电动汽车为例,司机在车辆使用甚至是停车的过程当中,有大量的数据产生,行驶过程中电池在放电,就会有电压,电流,电池单体温度,电阻,故障信号,车速,加速度等等,充电过程中电池在充电,就会有充电桩位置经纬度,充电电流,时长,室外温度,是否过充信号等等,而且这些信号在以一定的频率产生,短则毫秒级,长则秒级.这些信号数量数以千级万级,很多信号还是二维数组甚至多维数组,同时采集频率也很高,因为加速度的计算故障的分析尤其是碰撞事故发生时的诊断,对数据密度有较高的要求.这样的数据事实要求我们,数据埋点要规划好:

我们要采集哪些信号? 过多会产生无效传输和存储,过少不能满足分析需要.

采集频率应该多大? 这个采集频率是指车端的数据发送到服务器端的频率,而不是车端的采集频率,在车辆端采集频率可以很高,信号基本是全部采集,但是往往只存储一段时间,过后即会被清除.把这些数据发送到服务器端的频率,除了数据分析的最低要求外,还要考虑到存储和传输成本,所以往往存在两个采集频率,比如平时以1秒的频率传输,在有信号故障发生时则以50毫秒的频率传输.

采用什么协议? 协议即约定好的数据格式和标准,以便在发送端和接收端能做出一致的解释.在新能源领域,有国家规定的标准即国标,它定义了必须采集和传输的数据信号及频率,但其定义往往比较宽泛和基础,不同的车企还有各自的数据分析需求,便产生了众多的企业标准即企标.定义这个协议关键是定义数据结构,比如从哪个字节开始是温度字段,是单值还是数组,数组的每个温度值,应该采用几个字节,值域范围是多大,采用什么字节顺序,物理单位是什么,信号字段往往只是一个逻辑值怎么才能节约存储空间等等.

第一目标: 数据采集


车辆端采集的数据经网络传输到服务器端,一般以分布式的消息队列的存储形式接收比如Kafka,用它的主要好处是可以弹性扩展,即可以随时增减存储节点资源以应对变化的采集数据体积和速度,不至于数据堆积也不至于存储浪费,但是Kafka的数据往往不能直接用于计算,虽然它本身也是支持流数据计算的,通常的做法是把它当作缓冲的存储,在我们的实际应用中,它的数据有3个应用方向,一方面用于实时的故障诊断计算,另一方面把轨迹及信号数据转储于HBase用于数据回溯,还有一方面把数据存为离线数据用于数据仓库指标体系建设.

接收到的数据是二进制非结构化的,是不能用于常规的数据分析的,这个时候就要涉及到数据解析了,底层的数据解析是字节级别的操作,面对那么多的信号,各种数据格式,让很多数据开发人员望而却步,调试难易出错,而且解析代码在不同企标之间往往不能移植,即一次性代码,这大大违背了程序的可重用的本质.所以相应的工具产品也就应运而生了,这就是Mouth.你可以通过文档来深入了解它.

第二目标: 数据解析

怎么用 Mouth 来简单地把二进制数据解析成结构化数据呢?可以说一个XML文件足矣,你从如下的文件结构中一眼即可看出端倪,它用简单的几个指令比如string_item,byte_item描述了二进制数据中的结构,有了这个描述文件,Mouth就知道该怎么来解析二进制数据了.

    <!-- file: parser_vehicle.xml -->
<parser xmlns="https://www.rocy-data.com/parser_ex"><string_item name="tsp_start" byte_count="2" fixed_value="##"/><!--first read string of length=2,and its name=tsp_start --><byte_item name="tsp_command"><!-- read a byte value to `tsp_command` --><value_map value="1" name="register"/> <!-- for value 1 of `tsp_command`,convert it to `register` --><value_map value="2" name="real info up"/></byte_item><byte_item name="tsp_ack"><value_map value="1" name="success"/><value_map value="2" name="error"/></byte_item><string_item name="vin" byte_count="17"/><dword_item name="flow_number"/><!-- read a long number and name it `flow_number` --><byte_item name="tsp_encrypt" is_temp="true"/><dword_item name="data.tsp_got_time"/><!-- read a long number and name it in dot struct, --><byte_item name="data.info_type"/><!-- make same group with above data because they have same prefix --><byte_item name="data.battery_number"/><!-- make same group with above data because they have same prefix --><switch_item switch="data.info_type"> <!-- generate different data structure based on the value of `data.info_type`,`switch` is expression returning boolean that con contains expression such as `data.info_type + 2 > data.battery_number` --><case value="2"><word_item name="dbt_probe_count"/><byte_item name="db_package_count"/><byte_item name="dbt_temperature_array_size"/><set_variable name="index" value="0"/> <!-- set variable used to control loop operation --><while_item condition="index &lt; dbt_temperature_array_size"><byte_item name="dbt_temperature_array[index].db_package_high_temp"/> <!--  the `[]` symbol used to construct array --><byte_item name="dbt_temperature_array[index].db_package_low_temp"/><byte_item name="dbt_temperature_array[index].db_package_temperature_probe_count"/><array type="byte" length="dbt_probe_count" name="dbt_temperature_array[index].db_probe_temperatures"/> <!--for basic array type,its type and length is required--><set_variable name="index" value="index + 1"/><!-- set variable used to control loop operation --></while_item></case></switch_item>
</parser>

细心的读者可能已经发现了,它还有while_item,switch_item,set_variable这些指令来帮助我们描述更为复杂的灵活的数据结构,而且不管是怎样的企标数据,咱们用的都是这些指令,这便达到了可重用可移植的目的。

只需要一行代码就可以根据上述XML解析文件将接收到的字节数组转换为JSON数据,在这个步骤把数据结构化之后,就可以执行其他业务逻辑了。

    val bytes = ...// get bytesval json = ParserTemplate.parseAsJson(bytes.map(_.toUnsigned), "parser_vehicle.xml") // above bytes and file

输出的JSON,大概是如下示例这个样子的,其实不只如此,即便更为复杂的动态结构,也是能够处理的,除了switch_item+case,if_item也可以根据其他信号的值构成的评估表达式来决定如何解析,这使得数据结构可以是动态的,即结构并不固定,这个能力在其他类似的产品中也是很难实现的:

{"flow_number": 123,"data": {"info_type": 2,"tsp_got_time": 123456789,"battery_number": 2},"tsp_start": "##","tsp_ack": "success","dbt_temperature_array": [{"db_package_high_temp": 38,"db_probe_temperatures": [ 31.0, 32.0, 33.0, 38.0, 31.0, 32.0, 33.0, 38.0 ],"db_package_low_temp": 31,"db_package_temperature_probe_count": 8},{"db_package_low_temp": 31,"db_probe_temperatures": [ 31.0, 32.0, 33.0, 38.0, 31.0, 32.0, 33.0, 37.0 ],"db_package_temperature_probe_count": 8,"db_package_high_temp": 37}],"dbt_temperature_array_size": 2,"tsp_command": "real info up","dbt_probe_count": 8,"vin": "ROCY0123456789123","db_package_count": 2
}

第三目标: 数据回溯

数据回溯的常见需求是查询某个ID(车联网领域即是VIN)在某个时间段的数据,数据查询往往要求实时响应,HBase正好能满足这个需求,其RowKey可以这样设计:

  1. VIN+数据的时间. 这样在查询时指定VIN和时间范围即可快速查询.
  2. VIN存储为倒序形式. 其目的是让各车辆数据在各存储节点上分散存储,为什么这样就能分散存储呢? 大致有这样的公式: 数据应存储的节点位置=RowKey的Hash值%节点数,而VIN值在同一车企往往有相同的前缀,致使有相同Hash值的概率加大,从而导致数据存储倾斜.把VIN存储为倒序形式即可使数据分散存储避免倾斜.

那么具体的数据怎么存储呢?技术难度不大,但是通常的做法很麻烦,麻烦在于数据信号的数量是庞大的,数以千计,而且数据类型各种各样,存储的代码如果对信号逐一处理是枯燥无味的,Pulse把这个过程简化了:

  1. 定义TSourceParse的一个实现,用于把Kafka的数据转化为JSON,如果你已经完成了第二目标: 数据解析这一步骤,那么这一步就可以省略了,这一步对JSON的要求只是:具有vintsp_got_time这两个字段.
  2. 定义Family. 通过sink_signal_name2categories指定哪些类似的信号存储到某一个Family
  3. 定义数据从哪里来存储到哪里,即Kafka和HBase的配置.

如此而已,再不需要其他了,数据已经具备,如果需要图表展示,前端页面所需要做的是:筛选车辆,指定时间范围,筛选关心的信号量,展示趋势图或是对比图即可.

第二阶段: 质量监控

前一阶段使数据结构化,可理解,可分析,可计算,而数据质量是一切计算的基础和保障,数据质量可从整体上反映了数据的质量情况,这一过程应该尽早完成,及时避免错误数据的向下传播.在这个过程中,我们希望得到的答案或者说目标是:

第一目标 缺失情况

  1. 有多少监控对象,比如有多少车辆?
  2. 每个目标有多少记录,打点频率怎样?
  3. 有多少打点字段,每个字段空值数量及比例?

第二目标 数值错误

  1. 各数值字段值域是否合理?
  2. 数值分布是否符合期望?
  3. 数值型数组的值域及分布是否在合理区间?

第三目标 频度分布

  1. 单维度频率分布,比如车型有值model1,model2,model3,那么每个车型有多少车辆?
  2. 多维度频率分布,在单维度频率分布基础之上,比如每个省份每个年份的每个车型有多少车辆?

第四目标 质量告警

  1. 质量报告. 当数据质量问题发生时,能够以质量报告的形式发送给数据相关负责人,以便采取措施;
  2. 触发事件. 比如能够触发某个脚本的执行,这个脚本可以是任何相关任务,比如字段1的统计值异常,那么就把和字段1相关的模型任务停掉.

仍然是数据字段很庞大的原因,使得我们按照常规的方法做这些质量检查根本不可行.可以把这一过程简化的根据是:数据类型是有限的,相同类型的数据其检查方法是类似的,比如数值型做常规数理统计,枚举型做频度统计等等,Health正是这样做的.

它的输出是类似这样的结果:

{"row_count":4,"null_stat":[{ "col1":1, "col2":1 }],"number_stat":[{ "field":"col1", "agg":"min", "agg_value":1.0 },{ "field":"col1", "agg":"max", "agg_value":3.0 },{ "field":"col1", "agg":"mean", "agg_value":2.0 },{ "field":"col1", "agg":"avg", "agg_value":2.0 },{ "field":"col1", "agg":"stddev", "agg_value":1.0 },{ "field":"col1", "agg":"skewness", "agg_value":0.0 },{ "field":"col1", "agg":"kurtosis", "agg_value":-1.5 },{ "field":"col1", "agg":"countDistinct", "agg_value":3.0 }],"arr_stat":[{ "arrName":"col3", "statType":"mean", "value":2.5 },{ "arrName":"col3", "statType":"avg", "value":2.5 },{ "arrName":"col3", "statType":"kurtosis", "value":-1.2000000000000006 },{ "arrName":"col3", "statType":"min", "value":1.0 },{ "arrName":"col3", "statType":"countDistinct", "value":3.0 },{ "arrName":"col3", "statType":"stddev", "value":1.2907110929399985 },{ "arrName":"col3", "statType":"max", "value":4.0 },{ "arrName":"col3", "statType":"skewness", "value":0.0 }],"enum_stat":[{ "name":"col2", "count":3, "freq":{ "b":1, "a":2, "NULL":1 } }]
}

输出这样详细的结果,只需要我们做一个简单的配置,那就是要做数据质量检查的文件路径.字段的类型是自动推断的.
当然这只是一个最基本的需求,所以它必然允许我们个性化定制,通过配置文件的详细配置,我们还可以:

  1. 指定哪些文件需要检查数据质量,而不限于1个.
  2. 每个要检查的文件都做哪些质量检查,不需要的可以不做.
  3. 可以做更丰富的交叉表检查,这样更聚焦更细致.

如果有一定的Coding基础,通过插件扩展,我们还可以:

  1. 检查其他数据源比如JDBC或者ES的数据质量;
  2. 根据质量检查结果和上次检查结果做异常发现,告警通知,执行某个脚本控制其他任务的执行等等.

第三阶段: 实时诊断

上述的质量监控,是从整体上把握数据,这一阶段的目标是要掌握数据的极端情况,极端数据并没有数据质量问题,但从业务逻辑上却属于异常情况,是需要给予特别关注的.从诊断方法方面分类,可分为如下两类目标.

第一目标 单记录诊断

以数据实例来说,在如下的记录中voltage_list是电池单体电压数据,其数据类型是数组,元素个数代表单体个数,过高的均值或者方差都是不正常的,对应的现象可能是电池充电故障或者单体故障.

{"voltage_list":[3.2,3.4,3.41],"tsp_got_time":1674736665000,"vin":"A0","other":1}
{"voltage_list":[4,3.6,3.7],"tsp_got_time":1674736675000,"vin":"A1","other":2}

Pulse 处理这类异常,无论是单记录还是多记录都能很好地处理.我们要做的就是指定异常规则,其形式如下所示:

expression=array_min(voltage_list)>3.5,eng_name=test,chinese_name=电压过高
  1. expression
    它是用来指定判定异常的逻辑表达式,在这里可以使用统计函数,也可以使用数组函数,不限于单个字段,也可以指定任意多个字段的复杂表达式
  2. rule
    上述示例指定的rule,可以指定多个,它们之间是的关系,即满足任意一条rule,即把这条数据记录视为异常.
  3. 数据读取及存储
    在 Pulse 的实现中,数据读自 Kafka,发现的异常数据存入Kafka. 以方便其他应用端能够方便的订阅这些异常数据.

第二目标 多记录诊断

上述单记录的实时监控,其能力还是有限的,车联网或者其他物联网的数据,往往都是实时数据,且数据的前后是存在关联的,比如车辆的碰撞事故是跟速度及加速度存在很大关系的,而这两个物理量的计算是要根据前后多条记录才能计算出来.更复杂一点的,车辆在某个时间点发生故障比如熄火,其实在这一时间点之前的一段时间内比如1分钟内或者1小时内已经有很多信号反映出异常,这在理论上是存在提前预测甚至避免故障的可能的.

下面以数据案例来举例说明:

{"vin":"vin1","tsp_got_time":1674736665000,"esd_volt":48}
{"vin":"vin1","tsp_got_time":1674736675000,"esd_volt":50}
{"vin":"vin1","tsp_got_time":1674736685000,"esd_volt":55}
{"vin":"vin1","tsp_got_time":1674736695000,"esd_volt":60}
{"vin":"vin1","tsp_got_time":1674736705000,"esd_volt":58}
{"vin":"vin1","tsp_got_time":1674736715000,"esd_volt":50}
{"vin":"vin1","tsp_got_time":1674736725000,"esd_volt":45}
{"vin":"vin2","tsp_got_time":1674736665000,"esd_volt":30}
{"vin":"vin2","tsp_got_time":1674736675000,"esd_volt":50}
{"vin":"vin2","tsp_got_time":1674736685000,"esd_volt":55}
{"vin":"vin2","tsp_got_time":1674736695000,"esd_volt":60}
{"vin":"vin2","tsp_got_time":1674736705000,"esd_volt":62}
{"vin":"vin2","tsp_got_time":1674736715000,"esd_volt":48}
{"vin":"vin3","tsp_got_time":1674736725000,"esd_volt":45}

数据说明:

  1. vin: 车辆的唯一标识符
  2. tsp_got_time: 数据的采集时间,示例中是以毫秒表示的,上下两条数据基本上是10秒间隔
  3. esd_volt: 电压数据

某个车辆的某个信号量或者是多个信号经过计算得到的信号量,发生并持续一段时间,是一个常见的监控需求,仅此一项就能发现很多异常,我们可以通过指定rule,来让Pulse 帮我们把相应的数据找出来,这个rule可以这样指定:

[{"partition_by": "vin","expression": "esd_volt>50","eng_name": "test","chinese_name": "test","duration_seconds": 90,"names_output": ["tsp_got_time","esd_volt"],"agg_outputs": [{"expression": "avg(esd_volt)","eng_name": "avg_esd_volt","chinese_name": "平均电压"}]
}]

数据说明:

  1. partition_by: 指定分组字段. 示例说明,异常是按照车辆分组的
  2. expression: 指定异常rule的逻辑表达式,返回逻辑布尔值,如上所说,它可以表达很丰富很复杂的业务逻辑,而不是示例中这么简单.
  3. duration_seconds: 指定满足expression这个条件的数据要持续多少秒才视为异常
  4. names_output: 指定一些输出字段,输出结果中,会包含这些字段在起止时间上的数值
  5. agg_outputs: 指定一些统计值,统计是基于duration_seconds的异常时间区间计算的.
  6. rule: 从上述rule可以看到,可以指定任意多个rule
  7. 在设计上,Pulse 数据读自 Kafka 并将异常记录存入 Kafka,以便其他客户端进行订阅.

有了这样的rule提交给Pulse,它会给我们输出什么呢?

{"vin":"vin2","start_tsp_got_time":1674736685000,"start_esd_volt":55.0,"end_tsp_got_time":1674736705000,"end_esd_volt":62.0,"avg_esd_volt":59.0,"tag":"test"}
{"vin":"vin1","start_tsp_got_time":1674736685000,"start_esd_volt":55.0,"end_tsp_got_time":1674736705000,"end_esd_volt":58.0,"avg_esd_volt":57.67,"tag":"test"}

数据说明:

  1. start_XXX. 其中的XXX是在上述 rulenames_output指定的字段,其值是异常开始时间的值
  2. end_XXX. 其中的XXX是在上述 rulenames_output指定的字段,其值是异常结束时间的值
  3. avg_esd_volt. 是由上述 ruleagg_outputs指定的统计值,名称对应为eng_name,统计表达式由expression指定,这里计算电压均值.

第三目标: 电子围栏

上述的两个实时诊断目标是从数据记录方面划分的,在实际的实时应用中,还有一类问题,对指定目标的在指定地理区域的监控,专业术语称为电子围栏.比如4S店对试驾车的监控,驾校对教练车的监控等等,都属于这一类问题,电子围栏的常见需求是:

  1. 只监控部分目标,比如有一个车辆的列表,只监控这一部分;
  2. 指定一个监控地理范围,简单的情况可以指定一个点半径,矩形区域,复杂一点的可以是任意多边形区域;
  3. 当目标跨过电子围栏时,需要记录这一时刻的状态数据,比如时间,经纬度,由内向外还是由外向内,车辆ID等等.
  4. 当目标离开电子围栏时,还应时刻监控其轨迹.

由于这类问题比较普遍,需求也大致如此,所以它完全可以模块化产品化,Pulse 已经能够满足这些需求.

第四阶段: 数据仓库

数据仓库是大数据处理过程中非常重要的步骤,它的最重要的产出是指标体系,可用于展现为可视化易理解的图表,也可用于挖掘模型产出更为复杂更为抽象的数据价值,这个阶段的显著特点是:

  1. 工作量大,指标量大.
    为满足各种各样的业务需求,往往要编写数以百计千计的SQL任务,Hive任务或者是Spark任务,每个任务会产生几个几十个指标,根据指标之间的计算依赖关系或者业务逻辑关系,会产生指标层级和分类体系.维护这些指标是个很不小的工程.
  2. 输入输出数据表众多.
    不同类型的数据存储有不同的应用场景,所以不可避免地要面对多种数据源,从关系型数据库到文件型存储HDFS再到文档型数据库MongoDB,从内存型数据库Redis到键值型数据库Hbase,从行式存储到列式存储,形式多样,数据开发人员面临的问题就是做好它们之间的连接,数据转换和关联计算.
  3. 关系复杂.
    当某个指标计算有误,或者需要弄清楚这个指标的计算逻辑时,这个难度是很大的,文档的维护不仅困难而且作用也是有限的,因为指标的计算深度可能很大,可能涉及多个数据开发人员,可能涉及多个数据表.
  4. 技术难度较大.
    数据仓库有离线数仓和实时数仓,前者一般采用Hive QLSpark计算框架,后者一般采用Flink+Kafka,同时可能结合其他工具产品Impata,Kudu等等.

Bloods 正是要解决当前数据仓库建设当中的种种难题,它很特别的亮点有两个,1是零代码,2是数据血缘. 前者带来的好处是降低了数据仓库建设难度,后者带来的好处是无论数仓多么庞大,你仍然可以容易的掌握它的全貌它的细节,它的脉络它的枝叶.

数据仓库是个庞大的系统工程,但是无论什么样的业务场景,其建设过程无非如下这么几个关键步骤.

第一目标:数据标准化

什么是数据标准化呢,简单地说就是要有一致的语义,一个字段的含义要有明确的定义,无论是名称,值域,类型,计算逻辑,即便它们在不同的存储介质中,在不同的参与人员看来,都应该是一致的,这在数据表较多,数据字段较多的数据环境中,更显得尤为重要.

这个步骤也应该尽量在数据参与计算之前完成.Bloods 提供了一些简单实用的手段.

  1. 名称标准化
    如下的配置片段中,name_standardize 告知 Bloods,应该把名为product_id的字段更名为标准化的product_number,而product_number在另外一个字典文件中有定义,字典中定义了所有标准化的名称,你在输入product_number的时候,编辑器会给你智能提示,就像输入法的智能提示一下,如果输入错误,会给出错误提示.
    如此这般把成百上千的数据字段都定义完成,即完成了名称的标准化.
    在车联网领域,车机端的信号至少以千计,Nerve 整理了上千个信号并做了标准化定义,这使得车联网领域的信号标准化有了可以参考的标准线.
 <source_csv id="id_aggregation_before_name_standardize" path="csv_path_none"><!--standardized_name values are defined in standard_field.xsd--><name_standardize_list><name_standardize name="product_id" standardized_name="product_number"/><name_standardize name="seller_id" standardized_name="seller_number"/></name_standardize_list>
</source_csv>
  1. 数值标准化
    如下的配置片段中,value_standardize告知 Bloods,应该把名为amount字段的值做数值变换,把值域为[0,100]的数值线性变换到[0,1],如果值为40则会变换为0.4,这种情况很常见,比如车速在车机端的表示可能为[0,300000]用来表示最高车速为300公里/小时,之所以用300000来表示,是为了表示精度能达到0.001公里/小时,在实际计算时就需要把它变换为易于理解的[0,300]范围.
<source_csv id="id_aggregation_before_value_standardize" path="csv_path_none"><value_standardize_list><value_standardize name="amount" expression="0-100_0-1"/><value_standardize name="speed" expression="0-300000_0-300"/></value_standardize_list>
</source_csv>
  1. 逻辑值标准化
    如下的配置片段中,boolean_convert告知 Bloods,包含on_(其为正则表达式形式)的所有字段,如果其值是yes,那么把它转换为逻辑值true
<source_csv id="id_aggregation_before_boolean_convert" path="csv_path_none"><boolean_convert_list><boolean_convert true_values="yes" names_regex="on_"/></boolean_convert_list>
</source_csv>

上述的配置描述是接近自然语言的,容易编写容易理解,Bloods会把它们自动转换成大数据的代码,而不需要我们编写一行代码.

第二目标:数据治理

上面的数据标准化过程,其实咱们也可以说它是数据治理,跟这里要说的数据治理有所区别的是,标准化强调的是统一和规范,而这里强调的是,把一些相对异常的记录甚至是数组,不是简单的丢弃,而是尽量成可用的数据.
以实际的数据案例说明,更容易理解.

  1. 去除噪点和异常数据

假设有这样的数据:

    v1,v21,12,13,14,150,16,17,18,19,010,1

然后用这样的配置

<transform_data_governance id="id_transform_drop_if_exception" transform_ref="id_data_governance_csv"><data_governance_list><drop_if_exception column="v1" k="1.5"/></data_governance_list>
</transform_data_governance>

数据说明:
column 指定要对列v1做治理
k 用来描述异常的程度,值越大异常程度越大
drop_if_exception 表示如果指定列的某些值异常程度够大,那么相对应的行记录会被去除. 在这个数据案例中,50,1将会被删除.

  1. 去除符合条件的记录
    仍然是上面的示例数据,采用这样的配置
<transform_data_governance id="id_transform_drop_if_expression" transform_ref="id_data_governance_csv"><data_governance_list><drop_if_expression expression="v1 &gt; 8 and v2=0 "/></data_governance_list>
</transform_data_governance>

数据说明:
expression 如果表达式的值评估为true,那么该记录被删除.这里的表达式可以涉及多个字段,可以包含常用的数学函数,所以可以表达很丰富的逻辑.
drop_if_expression 使更丰富的逻辑表达变得容易.

  1. 插值法替换异常数据
    假设有这样的数据要处理:
vin,time,test
vin1,0,1
vin1,10,2
vin1,20,3
vin1,30,4
vin1,40,5
vin1,50,700
vin1,60,7
vin1,70,8
vin1,80,9
vin1,90,10
vin2,0,1

数据说明:
它代表的是车辆在一段时间内的数据,test是物理信号,可能是车速,电流,电压什么的.
testtime=50的时刻出现异常

由于数据的连续性,time=50时刻的数据,其实是可以推断出来的,而不适合丢弃处理.插值法替换处理可以这样做:

<transform_data_governance id="id_transform_interpolation_governance" transform_ref="id_interpolation_governance_csv"><data_governance_list><interpolation_governance column="test" time_column="time" partition_by="vin" k="1.6"/></data_governance_list>
</transform_data_governance>

数据说明

time_column 指定时间列名, 是以毫秒为单位的记录时间.
partition_by 指定分组列.不同的分组分别处理.
k 分位数异常检测的参数,其值越大异常程度越大.

正如咱们预想的,Bloods 会把值为700的异常值替换为6,这是根据插值法得到的.

  1. 数组异常用均值替换
+-----------------------------+-----------------------------+
|v1                           |v2                           |
+-----------------------------+-----------------------------+
|[1, 2, 3, 4, 5, 600, 7, 8, 9]|[1, 2, 3, 4, 5, 600, 7, 8, 9]|
|[1, 2, 3, 4, 500, 6, 7, 8, 9]|[1, 2, 3, 4, 5, 600, 7, 8, 9]|
|[1, 2, 3, 400, 5, 6, 7, 8, 9]|[1, 2, 3, 4, 5, 600, 7, 8, 9]|
|[1, 2, 300, 4, 5, 6, 7, 8, 9]|[1, 2, 3, 4, 5, 600, 7, 8, 9]|
|[1, 200, 3, 4, 5, 6, 7, 8, 9]|[1, 2, 3, 4, 5, 600, 7, 8, 9]|
|[100, 2, 3, 4, 5, 6, 7, 8, 9]|[1, 2, 3, 4, 5, 600, 7, 8, 9]|
|[1, 2, 3, 4, 5, 6, 700, 8, 9]|[1, 2, 3, 4, 5, 600, 7, 8, 9]|
|[1, 2, 3, 4, 5, 6, 7, 800, 9]|[1, 2, 3, 4, 5, 600, 7, 8, 9]|
|[1, 2, 3, 4, 5, 6, 7, 8, 900]|[1, 2, 3, 4, 5, 600, 7, 8, 9]|
+-----------------------------+-----------------------------+

上面的数据格式是数组,数组项存在异常,可以采用这样的配置处理:

<transform_data_governance id="id_transform_set_as_avg_if_exception_array" transform_ref="id_source_json_array"><data_governance_list><set_as_avg_if_exception_array column="v1,v2"/></data_governance_list>
</transform_data_governance>

数据说明:
column 指定要治理的列,可以同时指定多个,每1列应该是数值数组.

这个配置的处理结果,会把数据的异常项用数组的均值替换.
Bloods还提供了其他多种数据治理手段,这里就不再一一列举了.

第三目标:数据血缘

按常理和时间先后看,先有指标体系后有数据血缘,这里先提出来,是为了对复杂的指标体系能有一个宏观的理解,避免一下子陷入泥潭.另外,在构建指标体系之前,也确实要提前勾画和设计一下数据血缘,但无论怎么设计,想提前勾画清楚全貌基本是不可能的,这是因为很少有业务场景允许我们这么做,业务实践往往是先完成一部分指标需求,再完成一批,逐步迭代完成的,所以指标体系也是逐渐丰富起来的.

  1. 整体血缘图

在命令行Bloods 的控制台中,在已经配置好指标体系的前提下,输入如下命令可以输出数据仓库项目的整体血缘图. 对一个比较大的数据仓库项目,这个血缘图可能会很大.

graph

输出的图表大概是长这个样子的:

从这样的血缘图表中我们得到很多信息:

  • 哪些数据源或者中间表被引用的较少.
    这可能在一定程度上说明,这些表没有充分的利用,或者实际价值不大,甚至有必须删除它或者把它整合到其他表中.
  • 哪些表被高频引用.
    和上述的情况相反,意味着它的价值很大,也同样意味着对这个表的"读"压力很大,如果是只读的文件还好,如果是数据库表呢,就要考虑一下优化策略了,比如读写分隔,大表拆成小表,宽表窄化等等.
  • 哪些表单入单出.
    这样的表基本上起着临时表的作用,如果它占用的存储空间较大,就要考虑下是否有必要合并到上游表或者下游表中了.
  • 哪些表是孤立的.
    这些表无入无出,如果是长期保持这样,这样的表基本上是没有价值的,是该考虑腾出空间和相应的计算任务了.
  • 哪些表深度较大.
    深度大意味着计算复杂度高,相应的指标计算步骤链条长,这样越难于理解和维护,除非确实有必要,否则应该尽量减小深度,比如把多个计算步骤合并为一步,避免或减少中间表的存在等等.
  • 哪些表是叶子节点.
    越是接近叶子节点,往往越接近于实际的业务需要,最能直接反映它的实际应用价值,如果对这个表的应用较少而相应的计算成本又很高,那这个节点就应该引起注意是否要优化掉了.
  1. 单表数据血缘
    上述血缘图是整体轮廓,当定位到具体的数据表,需要进一步分析时,我们就只关心某几个节点的血缘图了,在Bloods的控制台,使用up 这个命令就可以得到某个数据表的上游血缘图.
[rocy@Nervous iov_dw_samples]>up charge_statcharge_stat[run_date=20210110,input_dates=day_one,output_dates=day_one,path=/dw/trip_stat.xml]charge_trip_detail[run_date=20210110,input_dates=day_one,output_dates=day_one,path=/dw/trip_stat.xml]charge_trip[run_date=20210110,input_dates=day_one,output_dates=day_one,path=/dw/trip/charging.xml]transform_trip_split[run_date=20210110,input_dates=day_one,output_dates=day_one,path=/dw/trip/trip_split.xml]add_state[run_date=20210110,input_dates=day_one,output_dates=day_one,path=/dw/trip/trip_split.xml]tsp_normalized_data_day2[run_date=20210110,input_dates=day_two,output_dates=day_one,path=/dw/data_governance.xml]tsp_normalized_data_day[run_date=20210109-20210110,input_dates=day_one,output_dates=day_one,path=/dw/data_governance.xml[generated]]tsp_raw_data_day_command2_5[run_date=20210109-20210110,input_dates=day_one,output_dates=day_one,path=/dw/name_standardize.xml]tsp_normalized_data_day[run_date=20210110,input_dates=day_one,output_dates=day_one,path=/dw/data_governance.xml]tsp_raw_data_day_command2_5[run_date=20210110,input_dates=day_one,output_dates=day_one,path=/dw/name_standardize.xml]
[rocy@Nervous iov_dw_samples]>

在这个输出中,除了能得到整体血缘图中也有的血缘关系外,它排除了无关血缘,而且给出了数据表的详情信息,比如上下游的存储周期,根据哪天的计算得到的血缘关系,数据的存储位置是什么等等.

除了up,自然有down,相应的可得到数据表的下游血缘关系,根据这两个命令就足以在血缘的脉络中随意畅游分析了.

  1. 单表指标血缘
    整体血缘图和上述的单表数据血缘,它们针对的是数据表,而不是指标,指标通常是数据表的某个字段,这个指标从何而来怎么计算而来?这在排查指标计算错误时,是极为有用的.up_index 这个指令正是做这个的.
    假设我们有这样的计算配置:
<dw xmlns="https://www.rocy-data.com/dw/v1.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="https://www.rocy-data.com/dw/v1.0.0 ../../.dw/dw.xsd" dw_market="market1" dw_subject="subject1"owner="whq" dw_level="ods"
><sources><source_csv id="id_sell_percent" path="csv_path_none"><computed_column_group><computed_column name="amount_percent" expression="amount * 0.8"/><computed_column name="money" expression="amount_percent * price"/></computed_column_group></source_csv></sources><transforms><transform_transform id="id_sell_add" transform_ref="id_sell_percent"><computed_column_group><computed_column name="amount_add" expression="amount_percent + 10"/><computed_column name="money_add" expression="amount_add * price"/></computed_column_group></transform_transform></transforms>
</dw>

然后执行如下命令:

up_index id_sell_add

输出是这样的:

{"amount_percent": {"exp": "amount_percent","bloods": {"id_sell_percent": {"amount_percent": {"exp": "amount * 0.8","bloods": {"id_sell_percent": ["amount"]}}}}},"money_add": {"exp": "amount_add * price","bloods": {"id_sell_percent": {"amount_percent": {"exp": "amount * 0.8","bloods": {"id_sell_percent": ["amount"]}},"price": {"exp": "price","bloods": {"id_sell_percent": ["price"]}}}}},"money": {"exp": "money","bloods": {"id_sell_percent": {"money": {"exp": "amount_percent * price","bloods": {"id_sell_percent": ["amount", "price"]}}}}},"amount_add": {"exp": "amount_percent + 10","bloods": {"id_sell_percent": {"amount_percent": {"exp": "amount * 0.8","bloods": {"id_sell_percent": ["amount"]}}}}}
}

从输出中可见:

1.bloods: 表明了指标血缘,从结果中可看出,它是嵌套定义的,所以这个血缘可能是任意深度.它还输出了血缘上的字段,以及计算所用的表达式.
2.从哪里来的: 指标 index1 来自哪些表.
3.怎么来的: 指标index1 是通过哪个表达式来计算的.
4.来源路线: 如果指标 index1 是通过两个指标 index2index3得到的,那么 index2index3又来自哪里.

有了这些工具做为分析利器,想定位指标的计算错误,想理清指标的计算步骤,就会很清晰了,这比用文档描述更为直接更为清楚,也更容易产出相应的文档.

第四目标:指标体系

无论指标体系多么复杂,大致都是通过如下几个步骤完成的,而其中定义的数据源,变换和Sink的步骤基本上对应着数据处理常说的ETL,而其中的T又尤其关键,多数的工作和时间都是消耗在这,它也决定着最终成果的规模和好坏.

创建项目

Bloods Console,输入如下命令:

create -t dw <dw project path>

项目结构即创建完成,其中的-t参数指定了4种数据仓库的类型之1,dw是最常见的一种,其他几种是涉及数据智能和车联网的.在创建 之后,<dw project path>下文件的改变即会被Bloods监控,以用于生成一些元数据,而这些元数据可用于智能提示配置文件的编写.最经常使用的是各种ID和文件路径的引用的智能提示.

定义数据源

数据源是数据的开始,示例中只是给出文本格式,不过常见的数据源它都是支持的,比如JDBC,parquet,csv等等.

<dw xmlns="https://www.rocy-data.com/dw/v1.0.0"xsi:schemaLocation="https://www.rocy-data.com/dw/v1.0.0  ../.dw/dw.xsd"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"dw_market="market1" dw_subject="subject1" owner="whq" dw_level="dws"><path_list><profile name="test"><path name="name_hello_world_path" path="dw/data/hello_world" pattern="none" parent="PROJECT_PATH"/></profile></path_list><sources><source_whole_text_files id="id_whole_text_files_source" path="name_hello_world_path" sort_columns="true"/></sources>
</dw>

数据说明:
source_whole_text_files 用于读取整个文件内容,类似于 source_csv
path 建议集中定义在某个文件,以便做统一管理.
智能提示 当定义完name_hello_world_path这个路径后,在定义id_whole_text_files_source通过path来引用这个路径时,智能提示会帮助完成
pattern 可以定义成yyyyMMdd这样的格式,以应对按日期\月\季度\年的存储
parent 用于指定父级路径定义

定义变换

<transforms><transform_transform id="id_word_count_stat_transform" transform_ref="id_whole_text_files_source"><computed_column_group><computed_column name="file_content_splits" expression="split(file_content,'[,\r\n ]+')"/></computed_column_group></transform_transform>
</transforms>

数据说明:
computed_column 用于添加计算列, 其中的expression 可以是任意的 hive 表达式,name 是新列名称。
transform_transform 是基本的数据变换,类似的节点还有 transform_join,transform_union 等等。更多的transform可参考这里
transform_ref 应被设定为输入数据的id,在这里是引用1个上游,而transform_join可引用2个上游,它使得这些action之间上下关联,形成树型结构.

定义Sink

<sinks><sink_show id="id_word_count_stat_sink" transform_ref="id_word_count_stat_transform"/>
</sinks>

数据说明:
sink: 通常对应的是存储行为,也可以是建模输出,而在这个示例中,是打印输出结果,以用于调试配置的正确性
执行: 在生产环境下,只有sink可被执行,但在本地Debug模式下,不仅是sink,source 和 transform 也都是可以执行的,用于检查任意节点的输出是否正确。
transform_ref: sink 的 transform_ref 应设定为 source 或者 transform 的 id
sink_show: 它是是用于调试目的的sink,类似的节点还有 sink_parquet,sink_csv…

调试执行

项目创建好之后,就已经包含了一些示例,可以直接输入命令sinks就能列出所有的任务,其中的id_word_count_stat_sink任务读取位于目录dw/data/hello_world的数据然后做单词统计word count and word file count,执行如下命令即可获取输出,其中参数t用于指定任务ID列表:

run -t id_word_count_stat_sink

项目部署

上述步骤均是在本地执行的,在确保逻辑正确性之后,即可进行集群部署。下述命令将生成部署脚本用于在集群上执行任务。

deploy

生成的脚本中,若不指定日期,默认以T-1执行,若不指定任务列表,将会执行所有的sinks

插件扩展

如上提到的source,transform,sink,虽然Bloods已经内置了很多常用的,但是难免会存在满足不了需要的场景,这个时候就需要做一下插件扩展了.实现起来也是力求简单的,基本上就是两步,第1步编写一个实现类并继承相应的接口,第2步把这个实现类打包并放置在Bloods的指定的插件路径下.
如果有一批的source,transform,sink需要扩展,比如是一个独特的领域银行,IOT或是科研领域,有自己独到的需求时,也是比较容易扩展实现的,这个称为流扩展.上面创建项目时指定的项目类型dw,其实就是内置的1种流扩展,其他的3种是di(用于数据智能数据挖掘算法),iov_dw(用于车联网指标体系建设),iov_di(用于车联网数据智能故障诊断)

批量处理

通常情况下,我们需要加载一个日期段的数据,比如这一周,这个月,这个季度,这一年等等.Range Date 能处理这个场景. 假设我们有如下数据.

  • 周-天的情况
orders_daydate=20220101order.csvdate=20220102date=20220103date=20220104date=20220105date=20220106date=20220107

我们设置range_datethis_week,它将加载这一周的数据.

<path_list><profile name="test"><path name="csv_path_day" path="${PROJECT_PATH}/dw/data/ods/orders_day" pattern="day"/></profile>
</path_list>
<transforms><transform_transform id="id_transform_this_week" transform_ref="id_source_csv_path_day"range_date="this_week"><measures><computed_column name="days" expression="count(distinct PARTITIONS)"/><computed_column name="min_day" expression="min(PARTITIONS)"/><computed_column name="max_day" expression="max(PARTITIONS)"/></measures></transform_transform>
</transforms>
<sources><source_csv id="id_source_csv_path_day" path="csv_path_day" range_date="day_one"/>
</sources>
#运行如下命令来检查
run -t id_transform_this_week -d 20220105
输出如下,请注意 星期一(20220103)是一周的第一天.
+----+-------------+-------------+
|days|min_day      |max_day      |
+----+-------------+-------------+
|3   |date=20220103|date=20220105|
+----+-------------+-------------+

除此之外,还有月级\季度\半年度\年度方面的处理能力,而且不只在输入日期上可以批量处理,在输出方面也具有同样的能力,这里有比较详细的文档说明.

调度能力

上面的指标体系配置,如果你希望以小时为单位来调度任务,那么需要你自己做一些工作比如用crontab来调度,但是如果是以天单位的任务调度,Bloods内置了一些调度能力,使调度工作更为灵活.
在框架内部,Bloods 分析 节点之间的 DAG(有向无环图)关系,自动调度和执行任务. 值得一提的是,更多的配置是基于以为单位的.


<dw xmlns="https://www.rocy-data.com/dw/v1.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="https://www.rocy-data.com/dw/v1.0.0 ../../.dw/dw.xsd" dw_market="market1" dw_subject="subject1"owner="whq" dw_level="ods"
><sinks><!--variables:year,month,day,week,month_end.--><sink_show id="id_sink_schedule" transform_ref="id_test_constants" sink_cycle="daily"/><sink_show id="id_sink_schedule_year1" transform_ref="id_test_constants" sink_cycle="daily" sink_cycle_pattern="year==2022"/><sink_show id="id_sink_schedule_year2" transform_ref="id_test_constants" sink_cycle="daily" sink_cycle_pattern="!(year==2022)"/><sink_show id="id_sink_schedule_month_day1" transform_ref="id_test_constants" sink_cycle="daily" sink_cycle_pattern="month==10 &amp;&amp; day==1"/><sink_show id="id_sink_schedule_month_end" transform_ref="id_test_constants" sink_cycle="daily" sink_cycle_pattern="month==10 &amp;&amp; day==month_end"/><sink_show id="id_sink_schedule_weekly" transform_ref="id_test_constants" sink_cycle="weekly"/></sinks>
</dw>
  • 周期性调度

这是常见的情况,sink 周期包括:

  1. daily.
    每天执行.
  2. weekly.
    每周的 SUNDAY执行
  3. monthly.
    每月的最后一天执行.
  4. quarterly.
    每个季度的最后一天即 0331, 0630, 0930, 1231执行
  5. half_yearly.
    半年的最后一天即 0630, 1231执行
  6. yearly.
    每年的最后一天即 1231执行
  • 自定义调度

但是我们仍然可以个性化定义任务的调度,这由sink_cycle_pattern 表达式来做到. 在表达式中,我们可以应用数学及逻辑表达式来返回一个布尔值.
如果这个表达式评估为 false 那么这个任务今天(run date)就不会执行.
很多变量是内置支持的:

  1. year
    运行日期的当前年份,如果 run date 是20221001,它的值将为2022
  2. month
    运行日期的当前月份,如果 run date 是20221001,它的值将为10
  3. day
    运行日期的当天,如果 run date 是20221001,它的值将为1
  4. week
    运行日期的周几,如果 run date 是20221001,它的值将为6
  5. month_end
    运行月份的最后一天,如果 run date 是20221001,它的值将为31

第五阶段: 数据智能

若馨数据的Bloods成功之处在于,她让一众数据开发者从繁杂的数据处理中解放出来,实现了从编码到配置(零代码)的转变,开发效率极大提升,数据维护起来更显清晰,而它的技术本质是:数据开发人员用接近自然语言的XML像搭积木一样描述处理逻辑,而Bloods把这些描述自动转化成Spark代码. 这样的思路不仅限于上述的指标体系建设,同样也适用于Bloods 数据智能,数据挖掘和AI,如果你是Spark ML的爱好者,如果你想快速融入到智能的模型算法中来,这是一个很好的起点,你可以查看这里得到更为详细的内容.

第一目标:模型训练

模型训练最常见的工作是什么呢?参数调优!而参数调优就是配置,不同的配置即不同的模型不同的效果,这用Bloods DI来零代码配置是很自然的事情,多数情况我们并不需要深入到算法中深入到代码的细微控制中,如下的各类算法常见工作示例,足以让人体会到模型训练的简洁而又清晰.

聚类

如下示例中,id_clustering_lda 读取数据 source_sample_lda_libsvm_data 然后应用 LDA 算法训练它并将模型存储到 path_clustering_lda_model

<dw dw_level="tmp" dw_subject="subject1" dw_market="RX-DATA" owner="whq" xsi:schemaLocation="https://www.rocy-data.com/di/v1.0.0  ../.dw/di.xsd" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="https://www.rocy-data.com/di/v1.0.0"><sources><source_ml_model_invoke id="lda_model_invoke_describeTopics" path="path_clustering_lda_model" method_name="describeTopics" method_parameters="3"/><source_ml_model_invoke id="lda_model_invoke_logLikelihood" path="path_clustering_lda_model" method_name="logLikelihood" method_parameters="source_sample_lda_libsvm_data"/><source_ml_model_invoke id="lda_model_invoke_logPerplexity" path="path_clustering_lda_model" method_name="logPerplexity" method_parameters="source_sample_lda_libsvm_data"/></sources><transforms><transform_ml_estimator_clustering_km id="id_kmeans" transform_ref="source_sample_kmeans_data" k="2" seed="1"/><transform_ml_estimator_clustering_lda id="id_clustering_lda" transform_ref="source_sample_lda_libsvm_data" k="10" max_iter="10" model_path="path_clustering_lda_model"/></transforms><sinks><sink_ml_evaluator_clustering id="id_evaluator_clustering" transform_ref="id_kmeans"/></sinks>
</dw>

现在我们来使用source_ml_model_invoke来检查一下模型.

run -t lda_model_invoke_describeTopics
+-----+-----------+---------------------------------------------------------------+
|topic|termIndices|termWeights                                                    |
+-----+-----------+---------------------------------------------------------------+
|0    |[2, 5, 7]  |[0.10596582700827585, 0.10560579109860191, 0.10421656683012902]|
|1    |[1, 6, 2]  |[0.10177934362911985, 0.09812186737848058, 0.09628916613024666]|
|2    |[1, 9, 4]  |[0.10587329435318685, 0.09746396510036567, 0.09650800754627996]|
|3    |[5, 4, 0]  |[0.16140487918106045, 0.13157416711460962, 0.12125555977641359]|
|4    |[9, 6, 4]  |[0.10444172332018084, 0.1040635944390557, 0.10097465247362353] |
|5    |[10, 6, 3] |[0.18500622562463037, 0.16489028958649327, 0.15527004414864845]|
|6    |[3, 7, 4]  |[0.11621765255437844, 0.0989645753475578, 0.09790795515141672] |
|7    |[4, 0, 2]  |[0.10844113271172434, 0.10326267091975808, 0.10028860890038724]|
|8    |[0, 7, 8]  |[0.10995536322709686, 0.09914310583037018, 0.09806206271783646]|
|9    |[9, 6, 8]  |[0.1009940937221744, 0.10007205188894182, 0.0976478953418414]  |
+-----+-----------+---------------------------------------------------------------+
run -t lda_model_invoke_logLikelihood
+------------------+
|logLikelihood     |
+------------------+
|-788.3752801566864|
+------------------+

再评估一下模型的效果如何:

run -t id_evaluator_clustering

输出为 :

-RECORD 0------------------------------Evaluator Result | 0.9997530305375207

协同过滤

现在让我们在训练数据上使用ALS算法构建推荐模型

<dw dw_level="tmp" dw_subject="subject1" dw_market="RX-DATA" owner="whq"xsi:schemaLocation="https://www.rocy-data.com/di/v1.0.0  ../.dw/di.xsd"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns="https://www.rocy-data.com/di/v1.0.0"><sources><source_ml_model_invoke id="id_als_invoke_recommendForAllUsers" path="path_als_model" method_name="recommendForAllUsers" method_parameters="10" setColdStartStrategy="drop"/><source_ml_model_invoke id="id_als_invoke_recommendForAllItems" path="path_als_model" method_name="recommendForAllItems" method_parameters="10" setColdStartStrategy="drop"/><source_ml_model_invoke id="id_als_invoke_recommendForUserSubset" path="path_als_model" method_name="recommendForUserSubset" method_parameters="sample_movielens_ratings,10" setColdStartStrategy="drop"/><source_ml_model_invoke id="id_als_invoke_recommendForItemSubset" path="path_als_model" method_name="recommendForItemSubset" method_parameters="sample_movielens_ratings,10" setColdStartStrategy="drop"/></sources><transforms><transform_ml_utility_random_split id="id_random_split_get_training" transform_ref="sample_movielens_ratings"random_radio="7,3" pick_index="0"/><transform_ml_utility_random_split id="id_random_split_get_test" transform_ref="sample_movielens_ratings"random_radio="7,3" pick_index="1"/><transform_ml_utility_random_split id="id_random_split_random_test" transform_ref="sample_movielens_ratings"random_radio="2,13" pick_index="1"/><transform_ml_estimator_recommendation_als id="id_als_training" model_path="path_als_model"transform_ref="id_random_split_get_training" max_iter="5"reg_param="0.01" user_col="userId" item_col="movieId"rating_col="rating"/><transform_ml_model_recommendation_als id="id_als_test" transform_ref="id_random_split_get_test"cold_start_strategy="drop"estimator_id="id_als_training"/><transform_ml_model_recommendation_als id="id_als_test_path" transform_ref="id_random_split_get_test"cold_start_strategy="drop"model_path="path_als_model"/></transforms><sinks><sink_ml_evaluator_regression id="id_evaluator_als" transform_ref="id_als_test" metric_name="rmse" label_col="rating" prediction_col="prediction"/></sinks>
</dw>
  1. id_als_training 读取数据id_random_split_get_training,设置它的关键参数 user_col="userId" item_col="movieId",训练它并将模型存储到 path_als_model
run -t id_als_training

运行如上命令来训练模型.
2. id_als_test 读取数据 id_random_split_get_test 并用如上模型来做预测

+---------------------+------+-------+------+----------+----------+
|value                |userId|movieId|rating|timestamp |prediction|
+---------------------+------+-------+------+----------+----------+
|13::31::1::1424380312|13.0  |31.0   |1.0   |1424380312|0.9999178 |
|5::31::1::1424380312 |5.0   |31.0   |1.0   |1424380312|0.9934486 |
|1::85::3::1424380312 |1.0   |85.0   |3.0   |1424380312|2.9621942 |
|1::81::1::1424380312 |1.0   |81.0   |1.0   |1424380312|1.0560087 |
|6::81::1::1424380312 |6.0   |81.0   |1.0   |1424380312|1.0258846 |
+---------------------+------+-------+------+----------+----------+
only showing top 5 rows
  1. id_evaluator_als 读取数据 id_als_test 然后用rmse 方法来评估效果
run -t id_evaluator_als

输出为:

-RECORD 0------------------------------Evaluator Result | 1.9722660132618175

分类及回归

如下XML配置中, 读取数据training 然后应用 logistic_regression算法做训练,然后将模块存储到 lr_model,各个参数的配置均有智能提示,配置体验还是很不错的.

<dw dw_level="tmp" dw_subject="subject1" dw_market="RX-DATA" owner="whq"xsi:schemaLocation="https://www.rocy-data.com/di/v1.0.0  ../.dw/di.xsd"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns="https://www.rocy-data.com/di/v1.0.0"><sources><source_ml_model_file id="id_model_check" path="lr_model" is_output_metadata="false"/></sources><transforms><transform_ml_estimator_classification_logistic_regression id="lr" transform_ref="training"max_iter="10" reg_param="0.3"elastic_net_param="0.8" model_path="lr_model"/></transforms>
</dw>

执行如下的命令即可训练模型.

run -t lr

执行如下的命令即可检查一下模型的截距和系数

run -t id_model_check -v true

抽取变换特征提取

<dw dw_level="tmp" dw_subject="subject1" dw_market="RX-DATA" owner="whq"xsi:schemaLocation="https://www.rocy-data.com/di/v1.0.0  ../.dw/di.xsd"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns="https://www.rocy-data.com/di/v1.0.0"><sources><source_sequence_constants id="sentenceData" quote_char="'"><sequence_constant column_name="label" column_values="0.0,0.0,0.1"/><sequence_constant column_name="sentence" column_values="'Hi I heard about Spark','I wish Java could use case classes','Logistic regression models are neat'"/></source_sequence_constants><source_sequence_constants id="dataFrame"><sequence_constant column_name="label" column_values="0,1,2"/><sequence_constant column_name="features" column_values="1.0 0.5 -1.0,2.0 1.0 1.0,4.0 10.0 5.0"/></source_sequence_constants></sources><transforms><transform_ml_transformer_feature_tokenizer id="id_feature_tokenizer" transform_ref="sentenceData" input_col="sentence" output_col="words"/><transform_ml_transformer_feature_hashing_tf id="hashingTF" transform_ref="id_feature_tokenizer" output_col="rawFeatures" input_col="words" num_features="20"/><transform_ml_estimator_feature_idf id="idf" transform_ref="hashingTF" input_col="rawFeatures" output_col="features" model_path="path_idf_model"/><transform_ml_transformer_feature_normalizer id="normalizer" transform_ref="dataFrame" input_col="features" output_col="normFeatures" p="1.0"/><transform_ml_transformer_feature_sqlt id="sqlTrans" transform_ref="sentenceData" statement="select label,sentence,concat(label,sentence) as concat from __THIS__"/></transforms>
</dw>

上述示例是这样工作的:

  1. id_feature_tokenizer 读取数据 sentenceData,对列 sentence 进行分词得到 words
+-----+-----------------------------------+------------------------------------------+
|label|sentence                           |words                                     |
+-----+-----------------------------------+------------------------------------------+
|0.0  |Hi I heard about Spark             |[hi, i, heard, about, spark]              |
|0.0  |I wish Java could use case classes |[i, wish, java, could, use, case, classes]|
|0.1  |Logistic regression models are neat|[logistic, regression, models, are, neat] |
+-----+-----------------------------------+------------------------------------------+
  1. hashingTFwords 数组转换为 vector feature
+-----+-----------------------------------+------------------------------------------+-----------------------------------------+
|label|sentence                           |words                                     |rawFeatures                              |
+-----+-----------------------------------+------------------------------------------+-----------------------------------------+
|0.0  |Hi I heard about Spark             |[hi, i, heard, about, spark]              |(20,[0,5,9,17],[1.0,1.0,1.0,2.0])        |
|0.0  |I wish Java could use case classes |[i, wish, java, could, use, case, classes]|(20,[2,7,9,13,15],[1.0,1.0,3.0,1.0,1.0]) |
|0.1  |Logistic regression models are neat|[logistic, regression, models, are, neat] |(20,[4,6,13,15,18],[1.0,1.0,1.0,1.0,1.0])|
+-----+-----------------------------------+------------------------------------------+-----------------------------------------+
  1. idf 应用 IDF 算法生成 feature 并将模型存储到 path_idf_model
+-----+-----------------------------------+------------------------------------------+-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+
|label|sentence                           |words                                     |rawFeatures                              |features                                                                                                              |
+-----+-----------------------------------+------------------------------------------+-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+
|0.0  |Hi I heard about Spark             |[hi, i, heard, about, spark]              |(20,[0,5,9,17],[1.0,1.0,1.0,2.0])        |(20,[0,5,9,17],[0.6931471805599453,0.6931471805599453,0.28768207245178085,1.3862943611198906])                        |
|0.0  |I wish Java could use case classes |[i, wish, java, could, use, case, classes]|(20,[2,7,9,13,15],[1.0,1.0,3.0,1.0,1.0]) |(20,[2,7,9,13,15],[0.6931471805599453,0.6931471805599453,0.8630462173553426,0.28768207245178085,0.28768207245178085]) |
|0.1  |Logistic regression models are neat|[logistic, regression, models, are, neat] |(20,[4,6,13,15,18],[1.0,1.0,1.0,1.0,1.0])|(20,[4,6,13,15,18],[0.6931471805599453,0.6931471805599453,0.28768207245178085,0.28768207245178085,0.6931471805599453])|
+-----+-----------------------------------+------------------------------------------+-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+
  1. normalizer 读取数据 dataFrame 然后把 features 做标准化变成 normFeatures
+-----+--------------+-----------------------------------------------------------+
|label|features      |normFeatures                                               |
+-----+--------------+-----------------------------------------------------------+
|0.0  |[1.0,0.5,-1.0]|[0.4,0.2,-0.4]                                             |
|1.0  |[2.0,1.0,1.0] |[0.5,0.25,0.25]                                            |
|2.0  |[4.0,10.0,5.0]|[0.21052631578947367,0.5263157894736842,0.2631578947368421]|
+-----+--------------+-----------------------------------------------------------+
  1. sqlTrans 读取数据 sentenceData 然后应用 SQL变换 select label,sentence,concat(label,sentence) as concat from __THIS__ .

第二目标:异常发现

以车联网的实际案例为例,分析车辆故障其实是在分析车辆端收集的物理信号或者经过计算得到的一些指标,如下几种分析均属于异常发现领域:

  1. 电池容量 & 循环充电次数
  2. 电池容量 & 使用次数
  3. 充电温度异常
  4. 充电电压 & 电流
    这里以第1种情况说明一下:
    假如我们有这样的输入数据,输入字段分别表示循环充电次数\电池容量\模型及车辆ID
+----------------+--------------------+-----+---+
|acc_cyclic_times|capacity_attenuation|model|vin|
+----------------+--------------------+-----+---+
|20              |0.1                 |m1   |v1 |
|20              |0.1                 |m1   |v1 |
|20              |0.1                 |m1   |v1 |
|20              |0.1                 |m1   |v1 |
|20              |0.1                 |m1   |v1 |
|20              |0.1                 |m1   |v1 |
|20              |0.1                 |m1   |v1 |
|20              |0.1                 |m1   |v1 |
|60              |0.7                 |m1   |v1 |
|20              |0.1                 |m1   |v1 |
|20              |0.1                 |m1   |v1 |
|20              |0.1                 |m1   |v1 |
|20              |0.1                 |m1   |v1 |
|20              |0.1                 |m1   |v1 |
|20              |0.1                 |m1   |v1 |
|20              |0.1                 |m1   |v1 |
|20              |0.1                 |m1   |v1 |
|20              |0.1                 |m1   |v1 |
|20              |0.1                 |m1   |v1 |
|20              |0.1                 |m1   |v1 |
|20              |0.1                 |m2   |v2 |
|20              |0.1                 |m2   |v2 |
|20              |0.1                 |m2   |v2 |
|20              |0.1                 |m2   |v2 |
|20              |0.1                 |m2   |v2 |
|20              |0.1                 |m2   |v2 |
|20              |0.1                 |m2   |v2 |
|20              |0.1                 |m2   |v2 |
|20              |0.1                 |m2   |v2 |
|70              |0.6                 |m2   |v2 |
|20              |0.1                 |m2   |v2 |
|20              |0.1                 |m2   |v2 |
|20              |0.1                 |m2   |v2 |
|20              |0.1                 |m2   |v2 |
|20              |0.1                 |m2   |v2 |
|20              |0.1                 |m2   |v2 |
|20              |0.1                 |m2   |v2 |
|20              |0.1                 |m2   |v2 |
|20              |0.1                 |m2   |v2 |
|20              |0.1                 |m2   |v2 |
+----------------+--------------------+-----+---+

然后简单地做如下配置:

<transform_ml_iov_exception_capacity_usage_time id="id_transform_iov_exception_capacity_usage_time"transform_ref="id_source_iov_exception_capacity_usage_time"acc_usage_time_col="acc_usage_time"capacity_attenuation_name_col="capacity_attenuation"/>

即可得到异常结果:

[rocy@Nervous iov_di_samples]>run -t id_transform_iov_exception_capacity_usage_time
Running Data Day:20210106
NO sinks to run
running id_transform_iov_exception_capacity_usage_time/20210106
+--------------+--------------------+-----+---+------------+----------+
|acc_usage_time|capacity_attenuation|model|vin|features    |prediction|
+--------------+--------------------+-----+---+------------+----------+
|2000          |0.3                 |m1   |v8 |[2000.0,0.3]|1         |
+--------------+--------------------+-----+---+------------+----------+

第三目标:模式发现

IOV T-BOX 数据有大量的报警信号,即便是领域专家也很难分析他们.但是不容置疑的是,这些报警信号之间是存在某种关联的,比如某个报警信号A出现后大概率会出现B信号.这种模式可以通过如下办法得到:
假如有如下的输入数据:

{"vin": "v1", "time": 1, "alarm_common_brk": true, "alarm_common_dcdc_st": true, "alarm_common_dcdc_temp": false, "alarm_common_driver_motor_temp": false, "alarm_common_esd_charge_over": false}
{"vin": "v1", "time": 2, "alarm_common_brk": false, "alarm_common_dcdc_st": false, "alarm_common_dcdc_temp": true, "alarm_common_driver_motor_temp": true, "alarm_common_esd_charge_over": true}
{"vin": "v1", "time": 3, "alarm_common_brk": false, "alarm_common_dcdc_st": false, "alarm_common_dcdc_temp": true, "alarm_common_driver_motor_temp": true, "alarm_common_esd_charge_over": true}
{"vin": "v1", "time": 4, "alarm_common_brk": true, "alarm_common_dcdc_st": true, "alarm_common_dcdc_temp": false, "alarm_common_driver_motor_temp": false, "alarm_common_esd_charge_over": false}
{"vin": "v1", "time": 5, "alarm_common_brk": false, "alarm_common_dcdc_st": false, "alarm_common_dcdc_temp": true, "alarm_common_driver_motor_temp": false, "alarm_common_esd_charge_over": false}
{"vin": "v1", "time": 6, "alarm_common_brk": false, "alarm_common_dcdc_st": false, "alarm_common_dcdc_temp": false, "alarm_common_driver_motor_temp": false, "alarm_common_esd_charge_over": false}
{"vin": "v1", "time": 7, "alarm_common_brk": true, "alarm_common_dcdc_st": true, "alarm_common_dcdc_temp": false, "alarm_common_driver_motor_temp": false, "alarm_common_esd_charge_over": false}
{"vin": "v1", "time": 8, "alarm_common_brk": false, "alarm_common_dcdc_st": false, "alarm_common_dcdc_temp": true, "alarm_common_driver_motor_temp": true, "alarm_common_esd_charge_over": false}
{"vin": "v1", "time": 9, "alarm_common_brk": false, "alarm_common_dcdc_st": false, "alarm_common_dcdc_temp": false, "alarm_common_driver_motor_temp": false, "alarm_common_esd_charge_over": true}
{"vin": "v1", "time": 10, "alarm_common_brk": true, "alarm_common_dcdc_st": true, "alarm_common_dcdc_temp": false, "alarm_common_driver_motor_temp": false, "alarm_common_esd_charge_over": false}
{"vin": "v1", "time": 11, "alarm_common_brk": false, "alarm_common_dcdc_st": false, "alarm_common_dcdc_temp": true, "alarm_common_driver_motor_temp": true, "alarm_common_esd_charge_over": true}
{"vin": "v1", "time": 12, "alarm_common_brk": false, "alarm_common_dcdc_st": false, "alarm_common_dcdc_temp": true, "alarm_common_driver_motor_temp": true, "alarm_common_esd_charge_over": true}
{"vin": "v1", "time": 13, "alarm_common_brk": true, "alarm_common_dcdc_st": true, "alarm_common_dcdc_temp": false, "alarm_common_driver_motor_temp": false, "alarm_common_esd_charge_over": false}
{"vin": "v1", "time": 14, "alarm_common_brk": false, "alarm_common_dcdc_st": false, "alarm_common_dcdc_temp": true, "alarm_common_driver_motor_temp": false, "alarm_common_esd_charge_over": false}
{"vin": "v1", "time": 15, "alarm_common_brk": false, "alarm_common_dcdc_st": false, "alarm_common_dcdc_temp": false, "alarm_common_driver_motor_temp": false, "alarm_common_esd_charge_over": false}
{"vin": "v1", "time": 16, "alarm_common_brk": true, "alarm_common_dcdc_st": true, "alarm_common_dcdc_temp": false, "alarm_common_driver_motor_temp": false, "alarm_common_esd_charge_over": false}
{"vin": "v1", "time": 17, "alarm_common_brk": false, "alarm_common_dcdc_st": false, "alarm_common_dcdc_temp": true, "alarm_common_driver_motor_temp": true, "alarm_common_esd_charge_over": false}
{"vin": "v1", "time": 18, "alarm_common_brk": false, "alarm_common_dcdc_st": false, "alarm_common_dcdc_temp": false, "alarm_common_driver_motor_temp": false, "alarm_common_esd_charge_over": true}
{"vin": "v1", "time": 19, "alarm_common_brk": false, "alarm_common_dcdc_st": false, "alarm_common_dcdc_temp": false, "alarm_common_driver_motor_temp": false, "alarm_common_esd_charge_over": true}
{"vin": "v1", "time": 20, "alarm_common_brk": false, "alarm_common_dcdc_st": false, "alarm_common_dcdc_temp": false, "alarm_common_driver_motor_temp": false, "alarm_common_esd_charge_over": true}
{"vin": "v2", "time": 1, "alarm_common_brk": false, "alarm_common_dcdc_st": false, "alarm_common_dcdc_temp": false, "alarm_common_driver_motor_temp": false, "alarm_common_esd_charge_over": true}

我们做如下的配置:

<transform_iov_exception_pattern_extract id="id_transform_iov_exception_pattern_extract"transform_ref="id_source_iov_exception_pattern_extract"alarms_cols="alarm_common_brk,alarm_common_dcdc_st,alarm_common_dcdc_temp,alarm_common_driver_motor_temp,alarm_common_esd_charge_over"partition_col="vin"time_col="time"/>

数据说明:
alarms_cols: 上述由alarms_cols指定的报警信号可以是任意多个.
partition_col: 用于指定分组字段.
time_col: 用于指定数据采集的时间.
transform_ref: 用于指定上游数据.

然后在Bloods控制台输入命令即可得到模式结果:

[rocy@Nervous iov_di_samples]>run -t id_transform_iov_exception_pattern_extract
Running Data Day:20210106
NO sinks to run
running id_transform_iov_exception_pattern_extract/20210106
+--------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------+------------------+-------------------+
|from                                                                                  |to                                                                                    |confidence        |support            |
+--------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------+------------------+-------------------+
|[alarm_common_brk, alarm_common_dcdc_st]                                              |[alarm_common_dcdc_temp, alarm_common_driver_motor_temp]                              |0.3333333333333333|0.46153846153846156|
|[alarm_common_brk, alarm_common_dcdc_st]                                              |[alarm_common_dcdc_temp]                                                              |0.3333333333333333|0.46153846153846156|
|[alarm_common_brk, alarm_common_dcdc_st]                                              |[alarm_common_dcdc_temp, alarm_common_driver_motor_temp, alarm_common_esd_charge_over]|0.3333333333333333|0.46153846153846156|
|[alarm_common_dcdc_temp]                                                              |[alarm_common_brk, alarm_common_dcdc_st]                                              |1.0               |0.15384615384615385|
|[alarm_common_dcdc_temp, alarm_common_driver_motor_temp]                              |[alarm_common_esd_charge_over]                                                        |1.0               |0.15384615384615385|
|[alarm_common_dcdc_temp, alarm_common_driver_motor_temp, alarm_common_esd_charge_over]|[alarm_common_brk, alarm_common_dcdc_st]                                              |1.0               |0.15384615384615385|
|[alarm_common_esd_charge_over]                                                        |[alarm_common_brk, alarm_common_dcdc_st]                                              |1.0               |0.07692307692307693|
+--------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------+------------------+-------------------+

第四目标:事件预测

事件预测实际上是模型算法的一个实践应用,电池健康度衰减预测就是实际应用之一,在车联网领域,对电池健康的预测是比较困难的,因为它涉及的因素很多,但理论上它是可行的,只要相关的因素收集的足够精准足够多,那么预测结果也会更接近实际值.
假设我们已经具备这样一些统计指标:

+------+-------------------+------------------+--------------+--------------------+--------------------+
|health|med_charge_duration|med_charge_end_soc|med_charge_soc|med_charge_start_soc|sum_charge_soc_times|
+------+-------------------+------------------+--------------+--------------------+--------------------+
|90    |500                |78                |1000          |45                  |187                 |
|87    |530                |78                |2000          |55                  |187                 |
|84    |560                |68                |3000          |45                  |284                 |
|81    |510                |68                |2000          |55                  |381                 |
|77    |570                |68                |4000          |45                  |477                 |
|74    |640                |88                |4000          |35                  |274                 |
|71    |610                |88                |5000          |25                  |571                 |
|68    |680                |88                |6000          |25                  |668                 |
|64    |740                |98                |4000          |25                  |764                 |
|63    |730                |98                |5000          |25                  |563                 |
|62    |720                |98                |6000          |15                  |862                 |
|60    |800                |98                |7000          |15                  |860                 |
|58    |880                |98                |8000          |15                  |858                 |
|53    |830                |10                |9000          |5                   |1153                |
|51    |910                |100               |10000         |5                   |1151                |
+------+-------------------+------------------+--------------+--------------------+--------------------+

然后做如下的配置:

<transform_ml_iov_health_factor_analysistransform_ref="id_source_iov_health_factor_analysis"id="id_transform_iov_health_factor_analysis"health_col="health"med_charge_duration_col="med_charge_duration"med_charge_start_soc_col="med_charge_start_soc"med_charge_end_soc_col="med_charge_end_soc"sum_charge_soc_times_col="sum_charge_soc_times"med_charge_soc_col="med_charge_soc"/>

我们就可以得到电池健康度跟其他输入指标之间的关联关系数学表达式:

[rocy@Nervous iov_di_samples]>run -t id_transform_iov_health_factor_analysis
Running Data Day:20210106
NO sinks to run
running id_transform_iov_health_factor_analysis/20210106
+---------------------+--------------------+---------------------+---------------------+---------------------+----------------------+-----------------+
|med_charge_duration  |med_charge_start_soc|med_charge_end_soc   |sum_charge_soc_times |sum_charge_soc_times |med_charge_soc        |constant         |
+---------------------+--------------------+---------------------+---------------------+---------------------+----------------------+-----------------+
|-0.036789554905818964|0.14260919055819518 |-0.004554520647164639|-0.006634642511633963|-0.006634642511633934|-1.8557214490735148E-4|99.92485117304537|
+---------------------+--------------------+---------------------+---------------------+---------------------+----------------------+-----------------+

这样当输入新的一批指标时,我们就可以量化健康度了,如果指标是实时输入的,健康度也就可以实时计算了.
更进一步,假如有了一批历史上的健康度数值,那么通过回归计算,我们就可以预测未来某个日期的健康度了,相对地,指定一个健康值,Bloods也可以告诉我们,未来的哪个日期车辆将会衰减到这个健康度,这就达到预测的效果了.
示例如下,输入的数据很简单:

+--------+------+
|date    |health|
+--------+------+
|20210101|90    |
|20210201|87    |
|20210301|84    |
|20210401|81    |
|20210501|77    |
|20210601|74    |
|20210701|71    |
|20210801|68    |
|20210901|64    |
|20211001|63    |
|20211101|62    |
|20211201|60    |
|20220101|58    |
|20220201|53    |
|20220301|51    |
+--------+------+

配置也很简单:


<transform_ml_iov_health_down_predict id="id_transform_iov_health_down_predict"transform_ref="id_source_iov_health_down_predict" date_col="date"health_col="health" health_to="20"/>

预测结果也很清楚:

[rocy@Nervous iov_di_samples]>run -t id_transform_iov_health_down_predict
Running Data Day:20210106
NO sinks to run
running id_transform_iov_health_down_predict/20210106
+---------------------+--------+
|predict_date         |predict |
+---------------------+--------+
|1.6766845868672517E12|20230218|
+---------------------+--------+

第六阶段: 数据部署

大数据的任务,在一个公司里或是在一个项目上,都会有大量的任务,怎么让它们有序的执行,是任何数据开发人员都需要面对的问题.在资源方面的任务调度有Yarn,在一个项目一个JAR内的任务有上述的Bloods,而在一个团队的多个数据开发人员之间或者是多个团队之间的大数据任务该怎么调度呢,这些任务可能属于不同的JAR包,可能属于不同的数据开发人员,任务之间的依赖既有数据文件上的依赖,也可能是某个逻辑条件的依赖比如数据库中的某个查询是否有记录等等,这样的任务该如何编排呢?

市场上已经有很多相关的产品,各有利弊,不过市场上的多数产品都需要这样一个工作,需要人工指定任务的依赖关系,指定任务的调度周期执行队列所需要的资源等等,这些工作其实是很繁杂的,Brain任务调度,就是为了尽一步简化这一工作,它的理论基础很简单,如果您是数据开发人员,会很容易理解:任务的依赖可以通过任务ID引用来表达,也可以通过逻辑表达式来表达,甚至也可以通过指定一个JDBC的查询来表达,如果这些依赖能够通过反射的形式来得到,不就可以实现任务依赖的自动解析了嘛,Brain正是这样做的.

第一目标:任务调度

先体会[Brain任务调度]的安装示例,便能马上领略它的简单和大致原理.

安装示例中含有两个JAR包,分别含有一些可以调度的任务. 执行如下步骤:

  1. 启动Hadoop,然后把yarn的路径配置到conf/application.properties,将文件中的http://localhost:8088替换成实际的值
scheduler.yarn_rest_url=http://localhost:8088
scheduler.jars=../samples/bigdata-scheduler-test2-1.0.0.jar,../samples/bigdata-scheduler-test2-1.0.0.jar
scheduler.boostrap_jars=../lib
  1. 执行如下命令
cd ${ROCY_DATA_HOME}/bloods-1.0.0/bin
./start-brain-scheduler.sh

然后示例中的JAR即提交到Yarn执行,而JAR中的任务也按照自动分析得到的依赖关系执行起来,所以接下来的仅有的工作,就是把你的JAR包放到scheduler.jars指定的位置.

Brain架构

下图是Brain的架构,需要重点关注的一些关键点如下:

  • 提交任务的方法是提交一些jar,每个jar可以包含许多任务,它们的依赖关系是自动分析的。

  • 可以随时提交 jar。

  • 在接受任何一个任务后选择最佳队列,因为这时集群中的剩余资源可能会发生改变。

  • 开发人员可以调试任务。如果以前的jar包含一些错误,他(她)可以重新提交jar。

  • 依赖关系可以跨jar。任务可以依赖于其他jar中的任务。

调度配置

下列配置位于文件 application.properties

  • scheduler.jars
    必赞项.逗号分隔的jar文件或目录路径,Brain将监视目录中的这些jar,根据所有的jar生成DAG任务树。这些jar处于热加载模式,因此您不需要重新启动Brain.
  • scheduler.boostrap_jars
    任务中依赖的的所有jar都应该放在这里,包括Brain的jar,在运行时,这些jar将被上传到集群。
  • scheduler.priorities
    配置示例:scheduler.priorities=com.rocy.data.bigdata.scheduler.test.spark.Spark01=1,com.rocy.data.bigdata.scheduler.test.spark.Spark02=2,逗号分隔多个任务,完整类名和优先级由=分隔,更小的数字有更高的优先级.
  • scheduler.ignore_tasks
    任务有两个办法可以忽略执行.
  1. IgnoreSchedule
    实现接口 IgnoreSchedule.
object Root01 extends TSchedulable with IgnoreSchedule
  1. 通过配置项scheduler.ignore_tasks
    逗号分隔的多个完整类名,它的好处是不需要数据开发人员更改代码.
  • scheduler.state_save_seconds
    状态管理器将在这个时间间隔后将运行状态的快照保存到文件或数据库中.
  • scheduler.queue_sync_seconds
    用于yarn的队列状态同步,此信息用于为Spark任务选择最佳队列。
  • scheduler.yarn_rest_url
    用于获取这些信息:

    • 获取集群信息,例如,剩余内存和CPU Core等资源。
    • 获取用于选择最佳队列的队列信息。
    • 获取应用程序的状态。
  • scheduler.task_count_parallel
    同时运行任务的个数。
  • default.state_manager
    用于扩展状态管理器. 你的实现应该继承 com.rocy.data.schedule.run.TStateManager,内部的默认实现是 ParquetStateManager,这保存任务状态到这个目录rocy_scheduler/states.
  • default.queue_select
    用于扩展队列选择算法. 你的实现应该继承 com.rocy.data.schedule.run.TQueueSelect,内部的默认实现是选择具有最多资源的队列.

第二目标:任务血缘

Brain提供图表来显示计划任务的当前运行状态,它们可以帮助您分析错误的原因。
Brain根据依赖关系对调度的任务进行聚类,您可以在Graph表的右上角切换不同的聚类。
如果您有聚类1聚类2,那么聚类1中的任何任务都与聚类2没有关系。
在实际的大数据任务工作中,我们可以有很多聚类,有些聚类可以有依赖链路非常深的任务。

第三目标:任务监控

在状态表中,有任务运行列表的详细属性。


  • 任务甘特图
    在下面的任务甘特图中,Y轴列出任务名称,X轴是时间。
    图形中的矩形,左侧对应运行时间,右侧对应完成时间,文本显示为时间跨度,以秒为单位。
    如果您的任务聚类很庞大,或者任务的时间跨度更长,可以通过水平和垂直滚动条进行放大.

和数据开发者一起成长

如上这些就是大数据全生命周期中,我们最常见到的一些工作了,看起来内容较多,但在Bloods等一系列产品的精心组装之下,每个工作做起来都要简单很多,这让数据开发变成了让人愉悦的事情.当然,它们虽然收获了不小的成就,但还在成长中,好在它们提供了相当丰富的开发接口,可以让数据开发的实践者参与进来,让它更丰富更好用.

全生命周期大数据处理系列相关推荐

  1. 从全生命周期管理角度看大数据安全技术研究

    从全生命周期管理角度看大数据安全技术研究 李树栋1,2, 贾焰2, 吴晓波3, 李爱平2, 杨小东4, 赵大伟5 1. 广州大学网络空间先进技术研究院,广东 广州 510006 2. 国防科技大学计算 ...

  2. 大数据在锂电池产品全生命周期中的应用

    大数据在锂电池全生命周期管理中的应用 摘要:在数字经济时代,大数据已被视为一种重要的生产要素.通过数据采集.清洗.分析和建模,将隐藏在数据背后的客观规律充分挖掘利用,成为促进企业智能化管理和产业升级的 ...

  3. 大数据开源项目,一站式全自动化全生命周期运维管家ChengYing(承影)走向何方?

    原文链接:三分钟走进袋鼠云一站式全自动化全生命周期运维管家ChengYing(承影) 课件获取:关注公众号 ** "数栈研习社",后台私信 "ChengYing" ...

  4. 神策数据杨宁:券商财富管理数字化转型客户全生命周期解读

    收入下滑.客户增长红利消失,中国证券业进入转型期.历时三个月,我们对市面上 49 家大中型券商进行了深度调研,本文根据<财富管理数字化转型现状与趋势洞察报告>系列解读课 1 中神策数据业务 ...

  5. PPT 下载 | 神策数据张涛:企业服务客户全生命周期运营三步曲客情诊断 解决方案库...

    本文根据神策数据副总裁张涛关于企业服务客户全生命周期系列的直播内容整理,共 3 篇,上篇回顾<PPT 下载 | 神策数据张涛:企业服务客户全生命周期运营三步曲总览篇>,本篇主要内容如下: ...

  6. PPT 下载 | 神策数据张涛:企业服务客户全生命周期运营三步曲总览篇

    本文根据神策数据副总裁张涛关于企业服务客户全生命周期系列的直播内容整理,共 3 篇,本篇主要内容如下: 为什么要做企业服务客户全生命周期运营? 客户全生命周期运营存在哪些坑? 面对坑,企业该如何自救? ...

  7. 如何保障企业数据资产的全生命周期安全?看这篇就够了

    摘要:随着国家大数据战略的不断推动和深化,做好数据安全治理成为了极大挑战.我们很难在繁多的数据安全能力中去构建适合自己安全体系,业界也缺乏具有指导建设意义的数据安全产品. 什么是数据?可以是音乐,时间 ...

  8. 热温冷冰,数据存储需要全生命周期管理

    随着云计算.大数据等新兴应用广泛普及,业务数据呈现爆炸式增长,海量数据的高效访问.经济存储和智能管理变得越来越重要. 从数据生命周期来看,在数据刚生成并使用时,属于在线数据,在线数据访问的频率最高,数 ...

  9. 对话《旅行青蛙》制作团队:游戏就是将现实中的不可能变为可能 | 覆盖客户全生命周期管理,神州云动六朵云来袭

    每一个企业级的人  都置顶了 中国软件网 中国软件网  为你带来最新鲜的行业干货 小编点评 自从我养了青蛙以后 心里多了一份牵挂 娃儿的吃的够不够 帐篷好不好 当然啦 作为一个互联网人 我一边牵挂我得 ...

最新文章

  1. 为什么RStudio Server这么慢?
  2. 制药行业SAP项目里的那些LIMS系统
  3. AI就是“大数据+机器学习”?答案是否定的
  4. Windows7 Home高级 64 中文版 + TortoiseSVN 64 英文版 + SVN Server 32 英文版安装过程
  5. android ,动态布局 butterknife,与Butterknife绑定在android中动态添加视图
  6. 【Python作业】定义动物类Animal,...编写猫类Cat...
  7. 《高性能JavaScript》第七章 Ajax
  8. 廖雪峰说python_廖雪峰官网Python部分的疑问及解决
  9. 06链队列_LinkQueue--(栈与队列)
  10. Android  Doze and App Standby模式详解
  11. Linq to sql并发与事务
  12. 反卷积可视化工具--deconv-deep-vis-toolbox
  13. Firefox浏览Flash网页乱码的解决方案
  14. oracle服务怎么手动删除,彻底删除oracle服务
  15. 大型网站技术架构读书笔记01—大型网站架构演化史
  16. 数学符号——指示函数(样子像空心的1的一个数学符号)
  17. 微信网页版扫码登录原理
  18. Android日历控件
  19. 利用栈实现四则运算,带负数,带括号,带小数
  20. Linux/Centos安装oracle11超详细图文教程

热门文章

  1. 开源openstack
  2. php 强制用户 退出,Spring Security 强制退出指定用户的方法
  3. css 去掉i标签默认斜体样式
  4. select中文件描述符上限为什么是1024?
  5. IPC与TCP/IP应用及比较
  6. 【C语言】按位与、按位或、按位异或
  7. 豌豆荚应用市场上传时提示“抽取icon失败”解决方案
  8. #2020.02.05训练题解#最小生成树入门(F题)
  9. Zabbix监控深信服Sangfor设备
  10. 移动硬盘恢复数据多少钱?恢复几率有多大?