简介

Prophet是facebook开源的时间序列预测工具,使用时间序列分解与机器学习拟合的方法进行建模预测
,关于prophet模型优点本文不再累述,网络上的文章也比较多了,各种可视化,参数的解释与demo演示,但是真正用到工业上大规模的可供学习的中文材料并不多。

本文打算使用PySpark进行多序列预测建模,会给出一个比较详细的脚本,供交流学习,重点在于使用hive数据/分布式,数据预处理,以及pandas_udf对多条序列进行循环执行。
tips:背景说明,在十万级别的sku序列上使用prophet预测每个序列未来七天的销售。


文章目录

  • 1.导入库和初始化设置
  • 2.数据预处理
  • 3.建模
  • 4.读取hive数据,调用spark进行prophet模型预测

1.导入库和初始化设置

Pandas Udf 构建在 Apache Arrow 之上,因此具有低开销,高性能的特点,udf对每条记录都会操作一次,数据在 JVM 和 Python 中传输,pandas_udf就是使用 Java 和 Scala 中定义 UDF,然后在 python 中调用。

#导入库
import datetime
from dateutil.relativedelta import relativedelta
from fbprophet import Prophet
import pandas as pd
import numpy as np
import warnings
warnings.filterwarnings('ignore')
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *#初始化
spark = SparkSession. \Builder(). \config("spark.sql.execution.arrow.enabled", "true"). \enableHiveSupport(). \getOrCreate()

其中初始化config:开启spark df与pandas df 相互转化的性能优化配置.


2.数据预处理


def sale_ds(df):df['ds'] = pd.to_datetime(df['ds'])df = df[['store_sku', 'ds', 'y']]# 控制长度,周期不用太长,关注最近的几个完整周期即可start_day = (df['ds'].max() -relativedelta(days=63)).strftime('%Y-%m-%d')df = df[df['ds'] >= start_day][['store_sku', 'ds', 'y']]# 筛选条件:1 序列长度大于等于14,且过去最少有七天的销售记录;# 条件1,保障模型有两个完整的周期数据;# 条件2,避免出现0,0,0,0,0,0,1,0,1这样数据稀疏的数据出现sale_set = df.groupby(['store_sku']).filter(lambda x: len(x) >= 14 and np.sum(x['y']) > 7)return sale_setdef replace_fill(data):"""先尝试使用上周的数据填补,再针对极端的数据进行cap,保障序列的完整和平滑性:param data:单个序列:param name: 序列名称,store_sku:return: 修复后的一条序列"""data['ds'] = pd.to_datetime(data['ds'], format='%Y-%m-%d')data['y'] = data['y'].astype(float)data.loc[data['y'] <= 0, 'y'] = np.NaNdata.loc[data['y'].isnull(), 'y'] = data['y'].shift(7).values[0]data.loc[data['y'].isnull(), 'y'] = data['y'].shift(-7).values[0]data.loc[data['y'].isnull(), 'y'] = data['y'].shift(-14).values[0]data.loc[data['y'].isnull(), 'y'] = data['y'].shift(14).values[0]data.loc[data['y'].isnull(), 'y'] = data['y'].interpolate(methon='nearest', order=3)low = data[data['y'] > 0]['y'].quantile(0.10)high = data[data['y'] > 0]['y'].quantile(0.90)data.loc[data['y'] < low, 'y'] = np.NaNdata.loc[data['y'] > high, 'y'] = np.NaNdata['y'] = data['y'].fillna(data['y'].mean())data['y'] = np.log1p(data['y'])return data

以上为数据预处理,具体内容见注释.

放入模型中的时间和y值名称必须是ds和y,首先控制数据的周期长度,如果预测天这种粒度的任务,则使用最近的4-6周即可。

因为是放入了长度不一的多个序列,为了让预测更加可靠,对序列的长度有一定的限定,比如,序列长度至少有14天,还要一个需要注意的问题是,如果出现0,0,0,0,0,0,1,0,1这样数据稀疏的数据的时候,prophet会报错,报错内容大致为,std太低,反推回去就是放入的数据类似于常量,模型无法拟合。

至于缺失值的填充,prophet可以设置y为nan,模型在拟合过程中也会自动填充一个预测值,因为我们预测的为sku销量,是具有星期这种周期性的,所以如果出现某一天的缺失,我们倾向于使用最近几周同期数据进行填充,没有优先使用均值或众数进行填充,是因为,均值和众数会掩盖序列的周期性,破坏整个序列的规律,为了进一步对数据进行平滑,对于异常值还进行了分位数盖帽,因为时序数据往往是偏态分布,所以我们对原始值做了取对数处理。

以上的数据预处理比较简单,其中多数可以使用hive进行操作,会更加高效,这里放出来的目的是演示一种思路以及python函数和最后的pandas_udf交互。


3.建模

def prophet_train(data):model = Prophet(daily_seasonality=False,yearly_seasonality=False,holidays=holiday_df,holidays_prior_scale=10)model.add_seasonality(name='weekly',period=7,fourier_order=3,prior_scale=0.10)model.fit(data)future = model.make_future_dataframe(periods=7, freq='d')forecast = model.predict(future)forecast['pro_pred'] = np.expm1(forecast['yhat'])forecast_df=forecast[['store_sku','ds','pro_pred']]# 对预测值修正forecast_df.loc[forecast_df['pro_pred'] < 0, 'pro_pred'] = 0low = (1 + 0.1) * data['y'].min()hight = min((1 + 0.05) * data['y'].max(), 10000)forecast_df.loc[forecast_df['pro_pred'] < low, 'pro_pred'] = lowforecast_df.loc[forecast_df['pro_pred'] > hight, 'pro_pred'] = hightreturn forecast_df

以上参数设置详见https://zhuanlan.zhihu.com/p/52330017

函数内部的holiday_df是假日数据,数据格式需要按照文档要求进行定义,改函数部分也会和整个代码一起放在github,如果序列中最近呈现出较大的下滑或者增长,那么预测值很容易得到负数或者非常大,这个时候我们依然需要对预测值进行修正,而非完全交给模型,当然你也可以在放入数据中设置上下限。

data['cap'] = 1000  #上限
data['floor'] = 6  #下限

该函数把前面的数据预处理函数和模型训练函数放在一个函数中,类似于主函数,目的是使用统一的输入和输出。

def prophet_main(data):true_time = pd.datetime.now().strftime('%Y-%m-%d')data.dropna(inplace=True)data['ds'] = pd.to_datetime(data['ds'])data = data[data['ds'] < true_time]data['ds'] = data['ds'].astype(str)data['ds'] = pd.to_datetime(data['ds'])# 异常值替换data = replace_fill(data)pro_back = prophet_train(data)return pro_back

4.读取hive数据,调用spark进行prophet模型预测

schema = StructType([StructField("store_sku", StringType()),StructField("ds", StringType()),StructField("pro_pred", DoubleType())
])@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def run_model(data):data['store_sku']=data['store_sku'].astype(str)df = prophet_main(data)uuid = data['store_sku'].iloc[0]df['store_sku']=uniddf['ds']=df['ds'].astype(str)df['pro_pred']=df['pro_pred'].astype(float)cols=['store_sku','ds','pro_pred']return df[cols]

假设我们希望输出的结果为三列,分别是store_sku,ds,pro_pred,则定义它们的数据类型,定义的数据类型和顺序要和放入的数据类型一致,然后通过@pandas_udf进行装饰,PandasUDFType有两种类型一种是Scalar(标量映射),另一种是Grouped Map(分组映射).我们显然是要使用分组映射,通过store_sku作为id进行分组,从而实现split-apply-combine

以上是纯python内容,下面展示通过hive数据库读取和运行python并把结果写入hive中。

data = spark.sql("""select concat(store_code,'_',goods_code) as store_sku,qty_fix as y,dsfrom scmtemp.redsku_store_sku_sale_fix_d""")
data.createOrReplaceTempView('data')
sale_predict = data.groupby(['store_sku']).apply(run_model)
sale_predict.createOrReplaceTempView('test_read_data')
# 保存到数据库
spark.sql(f"drop table if exists scmtemp.store_sku_sale_prophet")
spark.sql(f"create table scmtemp.store_sku_sale_prophet as select * from store_sku_predict_29 ")
print('完成预测')

当然也可以不用pandas_udf的形式进行
,在旧版spark中使用sc.parallelize()实现分组并行化
如:sc.parallelize(data,800).map(run_model).reduce(merge)

上文还有一个节假日数据没有给出来,限于篇幅有限,整个代码就放在github上了,如需要请自取。

基本交代清楚了,暂更于此。

完整代码[pyspark_prophet]

PySpark-prophet预测相关推荐

  1. 用Prophet预测USDCNY走势--------仿照forecasting-stock-perfomance-with-prophet对美元人民币走势进行预测

    时间序列预测一直是一项挺复杂且结果挺玄学的领域.一次偶然的机会,我在微信上看到一篇用Prophet对股票表现进行预测的文章,于是对这款不用提取特征的简易时序预测工具产生了好奇,想着仿照该文章思路对Pr ...

  2. 采用 facebook 的prophet 预测科大讯飞的股票开盘价

    在运行代码前需要安装fbprophet和tushare,然后拷贝代码可直接运行,个人认为facebook的这个开源不怎么样,回测的效果不行.不如lstm甚至简单的多元回归. #!/usr/bin/en ...

  3. linux python prophet,在python中使用Prophet预测每个类别的值

    我对用Python和Prophet做时间序列非常陌生.我有一个带有变量的数据集文章代码,日期和销售数量.我试图用python中的Prophet来预测每个月每件商品的销售量.dataset"/ ...

  4. 独家 | 手把手教你用Python的Prophet库进行时间序列预测

    作者:Jason Brownlee 翻译:殷之涵 校对:吴振东 本文长度为4800字,建议阅读10+分钟 本文为大家介绍了如何在Python中使用由Facebook开发的Prophet库进行自动化的时 ...

  5. 手把手教你用Prophet快速进行时间序列预测(附Prophet和R代码)

    作者:ANKIT CHOUDHARY 翻译:王雨桐 校对:丁楠雅 本文约3000字,建议阅读12分钟. 本文将通过拆解Prophet的原理及代码实例来讲解如何运用Prophet进行时间序列预测. 简介 ...

  6. porphet论文_Facebook 时间序列预测算法 Prophet 的研究

    Prophet 简介 Facebook 去年开源了一个时间序列预测的算法,叫做 fbprophet,它的官方网址与基本介绍来自于以下几个网站: 从官网的介绍来看,Facebook 所提供的 proph ...

  7. 基于Facebook开发的Prophet项目预测铁路货运量(实例)

    好久没有写东西了,今天写个之前做过的预测工作练手. 前情提要:Prophet项目是Facebook开发的一个时间序列的预测算法.算法使用起来简单,且对有明显内在规律的商业行为数据很有效, 因此常常被用 ...

  8. python 时间序列prophet 模型分析_手把手教你用Prophet快速进行时间序列预测(附Prophet和R代码)...

    原标题:手把手教你用Prophet快速进行时间序列预测(附Prophet和R代码) 作者:ANKIT CHOUDHARY:翻译:王雨桐:校对:丁楠雅: 本文约3000字,建议阅读12分钟. 本文将通过 ...

  9. python 时间序列prophet 模型分析_如何评价facebook开源的prophet时间序列预测工具?...

    近期整理了一下 Facebook 的 Prophet,个人感觉这是一个非常不错的时间序列预测工具. Prophet 简介 Facebook 去年开源了一个时间序列预测的算法,叫做 fbprophet, ...

最新文章

  1. Format specifies type 'id' but the argument has type 'NSError *__autoreleasing *
  2. 输入3个数a,b,c,要求按由小到大的顺序输出
  3. python不小心用关键字做了变量名,怎么改回来
  4. 安卓手机Charles抓包显示unknown原因及解决办法
  5. 彻底掌握Linux文件与目录管理命令?进来看看硬核总结
  6. 用Location对象和history对象修改页面url
  7. java 泛型机制_java中的泛型机制
  8. php flash chart,openflashchart 2.0 简单案例php版
  9. mysql group by 别名_[转]为什么group by后面不能使用别名(除MySQL)
  10. Win8下右键“发送到”没有蓝牙选项的解决办法
  11. LeetCode 1026. 节点与其祖先之间的最大差值(二叉树DFS)
  12. ds图—最小生成树_Python实现最小生成树
  13. linux nginx支持socket,nginx配置websocket转发功能
  14. Ruby on Rails快速创新性能的终极指南
  15. QQ的clientkey与淘宝旺旺Token 不同平台环境下的登录认证
  16. 如何备考软考高级系统架构师?
  17. 500以内降噪蓝牙耳机推荐,2023年热门降噪蓝牙耳机推荐
  18. Python中利用FFT(快速傅里叶变换)进行频谱分析
  19. 常用的学生、课程、成绩、教师表的查询
  20. 计算机管理档案有什么好处,档案管理系统有什么特点?与传统手法相比,这几点真的很好用...

热门文章

  1. 缺少IT人员的服装行业该如何进行数字化转型?
  2. Win7SP1官方原版系统安装VMtools出现无法自动安装驱动程序
  3. 有线网卡与无线网卡有什么不同之处?
  4. Android:Ethernet:实现RJ45有线网和USB host无线网卡的静态IP(StaticIpConfiguration)设置上网(附源码)
  5. 树莓派官方系统连接电脑(电视)显示器无信号输出的解决方法
  6. [EDA]8位双向移位寄存器的设计
  7. 数据结构—无向图创建邻接矩阵、深度优先遍历和广度优先遍历(C语言版)
  8. 狗狗被车压了腿如何急救处理?
  9. redis客户端连接功能详解
  10. 多元宇宙算法(MVO)