客快物流大数据项目(六十二):主题及指标开发
目录
主题及指标开发
一、主题开发业务流程
二、离线模块初始化
1、创建包结构
2、创建时间处理工具
3、定义主题宽表及指标结果表的表名
4、物流字典码表数据类型定义枚举类
5、封装公共接口
主题及指标开发
一、主题开发业务流程
二、离线模块初始化
1、创建包结构
本次项目采用scala编程语言,因此创建scala目录
包名 |
说明 |
cn.it.logistics.offline |
离线指标统计程序所在包 |
cn.it.logistics.offline.dwd |
离线指标dwd层程序所在包 |
cn.it.logistics.offline.dws |
离线指标dws层程序所在包 |
2、创建时间处理工具
实现步骤:
- 在公共模块的scala目录下的common程序包下创建DateHelper对象
- 实现获取当前日期
- 实现获取昨天日期
package cn.it.logistics.commonimport java.text.SimpleDateFormat
import java.util.Date/*** 时间处理工具类*/
object DateHelper {/*** 返回昨天的时间*/def getyesterday(format:String)={//当前时间减去一天(昨天时间)new SimpleDateFormat(format).format(new Date(System.currentTimeMillis() - 1000 * 60 * 60 * 24))}/*** 返回今天的时间* @param format*/def gettoday(format:String) = {//获取指定格式的当前时间new SimpleDateFormat(format).format(new Date)}
}
3、定义主题宽表及指标结果表的表名
每个主题都需要拉宽操作将拉宽后的数据存储到kudu表中,同时指标计算的数据最终也需要落地到kudu表,因此提前将各个主题相关表名定义出来
实现步骤:
- 在公共模块的scala目录下的common程序包下创建OfflineTableDefine单例对象
- 定义各个主题相关的表名
参考代码:
package cn.it.logistics.common/*** 自定义离线计算结果表*/
object OfflineTableDefine {//快递单明细表val expressBillDetail = "tbl_express_bill_detail"//快递单指标结果表val expressBillSummary = "tbl_express_bill_summary"//运单明细表val wayBillDetail = "tbl_waybill_detail"//运单指标结果表val wayBillSummary = "tbl_waybill_summary"//仓库明细表val wareHouseDetail = "tbl_warehouse_detail"//仓库指标结果表val wareHouseSummary = "tbl_warehouse_summary"//网点车辆明细表val dotTransportToolDetail = "tbl_dot_transport_tool_detail"//仓库车辆明细表val warehouseTransportToolDetail = "tbl_warehouse_transport_tool_detail"//网点车辆指标结果表val ttDotSummary = "tbl_dot_transport_tool_summary"//仓库车辆指标结果表val ttWsSummary = "tbl_warehouse_transport_tool_summary"//客户明细表数据val customerDetail = "tbl_customer_detail"//客户指标结果表数据val customerSummery = "tbl_customer_summary"
}
4、物流字典码表数据类型定义枚举类
为了后续使用方便且易于维护,根据物流字典表的数据类型定义成枚举工具类,物流字典表的数据如下:
来自:tbl_codes表
name |
type |
注册渠道 |
1 |
揽件状态 |
2 |
派件状态 |
3 |
快递员状态 |
4 |
地址类型 |
5 |
网点状态 |
6 |
员工状态 |
7 |
是否保价 |
8 |
运输工具类型 |
9 |
运输工具状态 |
10 |
仓库类型 |
11 |
是否租赁 |
12 |
货架状态 |
13 |
回执单状态 |
14 |
出入库类型 |
15 |
客户类型 |
16 |
下单终端类型 |
17 |
下单渠道类型 |
18 |
实现步骤:
- 在公共模块的scala目录下的common程序包下创建CodeTypeMapping对象
- 根据物流字典表数据类型定义属性
实现过程:
- 在公共模块的scala目录下的common程序包下创建CodeTypeMapping对象
- 根据物流字典表数据类型定义属性
package cn.it.logistics.common/*** 定义物流字典编码类型映射工具类*/
class CodeTypeMapping {//注册渠道val RegisterChannel = 1//揽件状态val CollectStatus = 2//派件状态val DispatchStatus = 3//快递员状态val CourierStatus = 4//地址类型val AddressType = 5//网点状态val DotStatus = 6//员工状态val StaffStatus = 7//是否保价val IsInsured = 8//运输工具类型val TransportType = 9//运输工具状态val TransportStatus = 10//仓库类型val WareHouseType = 11//是否租赁val IsRent = 12//货架状态val GoodsShelvesStatue = 13//回执单状态val ReceiptStatus = 14//出入库类型val WarehousingType = 15//客户类型val CustomType = 16//下单终端类型val OrderTerminalType = 17//下单渠道类型val OrderChannelType = 18
}
object CodeTypeMapping extends CodeTypeMapping{
}
5、封装公共接口
根据分析:主题开发数据的来源都是来自于kudu数据库,将数据进行拉宽或者将计算好的指标最终需要写入到kudu表中,因此根据以上流程抽象出来公共接口
实现步骤:
- 在offline目录下创建OfflineApp单例对象
- 定义数据的读取方法:getKuduSource
- 定义数据的处理方法:execute
- 定义数据的存储方法:save
参考代码:
package cn.it.logistics.offlineimport cn.it.logistics.common.{Configuration, DateHelper, Tools}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions.{col, date_format}/*** 根据不同的主题开发定义抽象方法* 1)数据读取* 2)数据处理* 3)数据保存*/
trait OfflineApp {/*** 读取kudu表的数据* @param sparkSession* @param tableName* @param isLoadFullData*/def getKuduSource(sparkSession: SparkSession, tableName:String, isLoadFullData:Boolean = false)= {if (isLoadFullData) {//加载全部的数据sparkSession.read.format(Configuration.SPARK_KUDU_FORMAT).options(Map("kudu.master" -> Configuration.kuduRpcAddress,"kudu.table" -> tableName,"kudu.socketReadTimeoutMs"-> "60000")).load().toDF()} else {//加载增量数据sparkSession.read.format(Configuration.SPARK_KUDU_FORMAT).options(Map("kudu.master" -> Configuration.kuduRpcAddress,"kudu.table" -> tableName,"kudu.socketReadTimeoutMs"-> "60000")).load().where(date_format(col("cdt"), "yyyyMMdd") === DateHelper.getyesterday("yyyyMMdd")).toDF()}}/*** 数据处理* @param sparkSession*/def execute(sparkSession: SparkSession)/*** 数据存储* dwd及dws层的数据都是需要写入到kudu数据库中,写入逻辑相同* @param dataFrame* @param isAutoCreateTable*/def save(dataFrame:DataFrame, tableName:String, isAutoCreateTable:Boolean = true): Unit = {//允许自动创建表if (isAutoCreateTable) {Tools.autoCreateKuduTable(tableName, dataFrame)}//将数据写入到kudu中dataFrame.write.format(Configuration.SPARK_KUDU_FORMAT).options(Map("kudu.master" -> Configuration.kuduRpcAddress,"kudu.table" -> tableName)).mode(SaveMode.Append).save()}
}
-
客快物流大数据项目(六十二):主题及指标开发相关推荐
- 客快物流大数据项目(六十五):仓库主题
文章目录 仓库主题 一.背景介绍 二.指标明细
- 客快物流大数据项目(六十六):车辆主题
文章目录 车辆主题 一.背景介绍 二.指标明细
- 客快物流大数据项目(五十二):根据数据库表及字段创建公共模块
根据数据库表及字段创建公共模块 根据数据库的表及表结构创建Bean对象 一.在公共模块创建包结构 在公共模块的java目录下,创建如下程序包:
- 客快物流大数据项目(九十二):ClickHouse的MergeTree系列引擎介绍和MergeTree深入了解
文章目录 ClickHouse的MergeTree系列引擎介绍和MergeTree深入了解 一.MergeTree系列引擎介绍 二.MergeTree深入了解 1.创建MergeTree ...
- 客快物流大数据项目(四十六):Spark操作Kudu dataFrame操作kudu
Spark操作Kudu dataFrame操作kudu 一.DataFrameApi读取kudu表中的数据 虽然我们可以通过上面显示的KuduContext执行大量操作,但我们还可以直接从默认数据源本 ...
- 客快物流大数据项目(五十六): 编写SparkSession对象工具类
编写SparkSession对象工具类 后续业务开发过程中,每个子业务(kudu.es.clickhouse等等)都会创建SparkSession对象,以及初始化开发环境,因此将环境初始化操作封装成工 ...
- 客快物流大数据项目(五十):项目框架初始化
目录 项目框架初始化 一.搭建工程 二.导入依赖 1.父工程依赖
- 客快物流大数据项目(三十):软件下载后存放位置
软件下载后存放位置 一.虚拟机数据路径配置 创建文件夹路径 mkdir -p /export/softwares /export/services /export/datas /export/soft ...
- 客快物流大数据项目(六十一):将消费的kafka数据同步到Kudu中
目录 将消费的kafka数据同步到Kudu中 一.导入表名映射关系类
最新文章
- 2022-2028年中国碘矿行业竞争格局分析及市场需求前景报告
- 什css3新增的属性,CSS
- Android libcutils库中整数溢出导致的堆破坏漏洞的发现与利用
- mysql 两张表合并查询_mysql中的分区表和合并表详解(一个常见知识点)
- Linux之部署虚拟环境、安装系统
- system.gc会立即执行垃圾回收吗_JVM基础到实战03-垃圾回收概念
- python剪切文件如何恢复_用python实现的可以拷贝或剪切一个文件列表中的所有文件...
- jquery动态插入行,不用拼写html,简洁版
- ASP.NET母版页和内容页之间如何互相传值?
- 【1.0】忘记mysql 密码 如何修改之后
- Oracle使用exp导出命令备份数据库
- 计算机体系结构课后答案
- 【WPF】资源--《深入浅出WPF》by刘铁锰
- 2004世界五百强企业
- phalapi做登录检测_欢迎使用PhalApi!
- 【数学建模】多元回归分析模型(评价与决策)
- 项目经理如何才能控制项目进度
- 【重载和重写的区别】
- 别混淆数据争用(data race) 和竞态条件(race condition)
- AToken每日简讯 1.17 星期四
热门文章
- 客快物流大数据项目(六十五):仓库主题