大数据时代的Serverless工作负载预测赛后总结
大数据时代的Serverless工作负载预测赛后总结
- 一、赛题介绍
- 二、思路
- 数据探索与预处理
- 特征工程
- 异常值处理(未实现)
- 异常检测(未实现)
- 模型构建
- 预测
- 模型融合
- 结果
一、赛题介绍
背景:云计算时代,Serverless软件架构可根据业务工作负载进行弹性资源调整,这种方式可以有效减少资源在空闲期的浪费以及在繁忙期的业务过载,同时给用户带来极致的性价比服务。在弹性资源调度的背后,对工作负载的预测是一个重要环节。如何快速感知业务的坡峰波谷,是一个实用的Serverless服务应该考虑的问题。
任务:传统的资源控制系统以阈值为决策依据,只关注当前监控点的取值,缺少对历史数据以及工作负载趋势的把控,不能提前做好资源的调整,具有很长的滞后性。近年来,随着企业不断上云,云环境的工作负载预测成为一个经典且极具挑战的难题。
1.1 数据说明
1. 数据背景
本次赛题数据来自华为云数据湖探索(Data Lake Insight,简称DLI),它是一个Serverless的弹性大 数据分析服务。对于用户来说,提交SQL/Spark/Flink 作业需要购买队列(Queue),并将作业指定到购买的队列中执行。队列(Queue)的概念可以认为是资源的容器,它在作业真实执行时是一个计算集群,队列存在不同的规格,单位是CU(计算单元Compute Unit),1CU等于1核4GB,即16CU的队列代表着总资源16核64GB的计算集群。数据每5分钟会进行一次采集,赛题假设集群内节点间的任务调度平均,数据中的CPU_USAGE是集群中各节点平均值。更多详情可访问
2. 训练集
选取了43个队列的性能采集数据作为训练数据,每个队列之间相互独立。
3. 测试集
对于每行测试数据,赛题会给定该队列在某时段的性能监控数据(比如9: 35– 10:00),希望参赛者可以预测该点之后的未来五个点的指标(10:00 – 10:25),详情可参看提交示例。
注意:不可使用测试数据的结果作为训练数据!
4. 字段说明:
训练集
字段 | 类型 | 说明 |
---|---|---|
QUEUE_ID | INT | 队列标识,每个ID代表一个唯一的队列 |
CU | INT | 队列规格,不同规格的资源大小不一样。1CU为1核4GB |
STATUS | STRING | 队列状态,当前队列的状态是否可用 |
QUEUE_TYPE | STRING | 队列类型,不同类型适用于不同的任务,常见的有通用队列(general)和SQL队列 |
PLATFORM | STRING | 队列平台,创建队列的机器平台 |
CPU_USAGE | INT | CPU使用率,集群中各机器节点的CPU平均使用率 |
MEM_USAGE | INT | 内存使用率,集群中各机器节点的内存平均使用率 |
LAUNCHING_JOB_NUMS | INT | 提交中的作业数,即正在等待执行的作业 |
RUNNING_JOB_NUMS | INT | 运行中的作业数 |
SUCCEED_JOB_NUMS | INT | 已完成的作业数 |
CANCELLED_JOB_NUMS | INT | 已取消的作业数 |
FAILED_JOB_NUMS | INT | 已失败的作业数 |
DOTTING_TIME | BIGINT | 采集时间,每5分钟进行一次采集 |
RESOURCE_TYPE | STRING | 资源类型,创建队列的机器类型 |
DISK_USAGE | INT | 磁盘使用 |
1.2 评测标准
本赛题在线评分采用绝对误差作为评估指标,首先单独计算每个测试用例的误差值:
上式中cpuUsage会被换算成百分比进行计算,比如预测值为89,计算时是0.89;当launching的最大值为0时,表示预测和实际提交的JOB数均为0,该项结果为0。之后会计算单个测试点的总误差:
二、思路
2.1 导入相关包
import os
import numpy as np
import pandas as pd
from tqdm import tqdm
from sklearn.model_selection import KFold
import matplotlib.pyplot as pltimport lightgbm as lgbpd.options.display.max_columns = None # 展示所有列
2.2 加载数据
# 加载原始数据
train = pd.read_csv('train.csv')
test = pd.read_csv('evaluation_public.csv')
submit = pd.read_csv('submit_example.csv')# 简单排序
train = train.sort_values(by=['QUEUE_ID', 'DOTTING_TIME']).reset_index(drop=True)
test = test.sort_values(by=['ID', 'DOTTING_TIME']).reset_index(drop=True)
数据探索与预处理
ID与QUEUE_ID
##test同个ID均为5条
# sum(test.ID.value_counts()!=5) # ##对于test中的每个qid,train中均存在
# for qid in test.QUEUE_ID.unique():
# if qid not in train.QUEUE_ID.unique():
# print(qid)##删除test中不存在的qid
df = train.copy()
train = pd.DataFrame()
for qid in test.QUEUE_ID.unique():if len(train)==0:train = df[df.QUEUE_ID==qid]else:train = pd.concat([train,df[df.QUEUE_ID==qid]],axis=0)
train.index = range(len(train))
STATUS,QUEUE_TYPE,PLATFORM,RESOURCE_TYPE
###对STATUS列进行数据对比##STATUS列在test中只有单一值.
print(train.STATUS.unique(),test.STATUS.unique())##有两个train_qid的STATUS不唯一。
for qid in test.QUEUE_ID.unique():if train[train.QUEUE_ID==qid].STATUS.unique().all()!= 'available':print(qid,train[train.QUEUE_ID==qid].STATUS.unique())print(train[train.QUEUE_ID==qid].STATUS.value_counts())look = train[train.QUEUE_ID==qid]look.index = range(len(look))print(look[look.STATUS!='available'].index,'\n')##考虑直接将81221的前23条数据删去,将83609的前158条数据删去,并将该列删去
###对QUEUE_TYPE列进行数据对比##QUEUE_TYPE列在test中只有sql和general.而train还包括spark
print(train.QUEUE_TYPE.unique(),test.QUEUE_TYPE.unique())##同个qid的QUEUE_TYPE相同。且对于同个qid,为唯一值。
for qid in test.QUEUE_ID.unique():if train[train.QUEUE_ID==qid].QUEUE_TYPE.unique()!= test[test.QUEUE_ID==qid].QUEUE_TYPE.unique():print(qid)
##直接将QUEUE_TYPE删去。
###对PLATFORM列进行数据对比##PLATFORM列在test中只有x86.而train还包括aarch64
print(train.PLATFORM.unique(),test.PLATFORM.unique())##同个qid的PLATFORM相同且为唯一值
for qid in test.QUEUE_ID.unique():if train[train.QUEUE_ID==qid].PLATFORM.unique()!= test[test.QUEUE_ID==qid].PLATFORM.unique():print(qid)
##直接将PLATFORM列删去
###对RESOURCE_TYPE列进行数据对比##RESOURCE_TYPE列在test中只有vm.而train还包括container和缺失值
print(train.RESOURCE_TYPE.unique(),test.RESOURCE_TYPE.unique(),'\n')##同个qid的RESOURCE_TYPE相同。
for qid in test.QUEUE_ID.unique():if train[train.QUEUE_ID==qid].RESOURCE_TYPE.unique().all()!= 'vm':print(qid)for qid in test.QUEUE_ID.unique():if qid in train[train.RESOURCE_TYPE.isna()].QUEUE_ID.unique():print(qid,' ',train[train.RESOURCE_TYPE.isna()].QUEUE_ID.value_counts()[qid],' ',train[train.QUEUE_ID==qid].RESOURCE_TYPE.unique(),' ',len(train[train.QUEUE_ID==qid]),' ',train[train.RESOURCE_TYPE.isna()].QUEUE_ID.value_counts()[qid]/len(train[train.QUEUE_ID==qid]))####直接将RESOURCE_TYPE列删去
###删除STATUS为其他状态的
qid_81221 = train[train.QUEUE_ID==81221]
index = qid_81221.head(23).index.tolist()
train.drop(index=index,inplace=True)
train.index = range(len(train))qid_83609 = train[train.QUEUE_ID==83609]
index = qid_83609.head(158).index.tolist()
train.drop(index=index,inplace=True)
train.index = range(len(train))
#(newcell)复制一个dotting_time入train方便后面做时间填充timestamp=pd.DataFrame()timestamp=train['DOTTING_TIME']train=pd.concat([train,timestamp],axis=1)```
```python
train.columns=['QUEUE_ID', 'CU', 'STATUS', 'QUEUE_TYPE', 'PLATFORM', 'CPU_USAGE','MEM_USAGE', 'LAUNCHING_JOB_NUMS', 'RUNNING_JOB_NUMS','SUCCEED_JOB_NUMS', 'CANCELLED_JOB_NUMS', 'FAILED_JOB_NUMS','DOTTING_TIME', 'RESOURCE_TYPE', 'DISK_USAGE', 'Add_timestamp']
特征工程
def time_process(df): #时间差处理df['DOTTING_TIME'] = pd.to_datetime(df['DOTTING_TIME'], unit='ms') #转换成时间戳#df=df.sort_values(['QUEUE_ID','DOTTING_TIME']) #按照qid与采集时间排序#df.index = range(len(df))diff_1_ = [] #与下一个时间节点的时间差diff_2_ = [] #与下一个的下一个的时间节点的时间差diff_3_ = [] #同理。。。diff_4_ = []diff_5_ = []for qid in tqdm(df.QUEUE_ID.unique()):df_ = df[df['QUEUE_ID']==qid]diff_1 = df_.DOTTING_TIME.diff(1).shift(-1).apply(lambda x: x.seconds) #转换成秒数,并除以10diff_1_.extend(diff_1/10)diff_2 = df_.DOTTING_TIME.diff(2).shift(-2).apply(lambda x: x.seconds)diff_2_.extend(diff_2/10)diff_3 = df_.DOTTING_TIME.diff(3).shift(-3).apply(lambda x: x.seconds)diff_3_.extend(diff_3/10)return diff_1_,diff_2_TIME_diff_1,TIME_diff_2= time_process(train)
train['TIME_diff_1'] = TIME_diff_1
train['TIME_diff_2'] = TIME_diff_2
test['TIME_diff_1'] = 30
test['TIME_diff_2'] = 60
##删除无用列
del train['STATUS']
del train['PLATFORM']
del train['RESOURCE_TYPE']
del train['QUEUE_TYPE']del test['STATUS']
del test['PLATFORM']
del test['RESOURCE_TYPE']
del test['QUEUE_TYPE']
DISK_USAGE
#对于disk中的缺失值,均为当前qid的前n行
for qid in list(train[train.isna().any(axis=1)].QUEUE_ID.unique()):print(qid)a = train[train.QUEUE_ID==qid]print(len(a[a.isna().any(axis=1)]),' ',len(a),' ',len(a[a.isna().any(axis=1)])/len(a),'\n')print(qid,a.head(1).index.tolist(),'\n',a[a.isna().any(axis=1)].index.tolist(),'\n')
###对于DISK为nan值的数据,其CPU_USAGE均为0,判断其为未开机或采集错误
for qid in list(train[train.isna().any(axis=1)].QUEUE_ID.unique()):a = train[train.QUEUE_ID==qid]print(qid)print(a[a.isna().any(axis=1)].CPU_USAGE.value_counts(),'\n')
###对含有DISK的nan值的数据应采用删除处理
##删除DISK中具有nan值的行
train.dropna(inplace=True)
train.index = range(len(train))
DOTTING_TIME
时间重复数据
###train中同一qid内有重复时间数据,test中没有
for qid in train.QUEUE_ID.unique():repetition = train[train.QUEUE_ID==qid].DOTTING_TIME.value_counts()print(qid)print(repetition[repetition>1].index.tolist(),'\n')
##删除同个qid下时间重复的
df = train.copy()
train = pd.DataFrame()
for qid in df.QUEUE_ID.unique():temp = df[df.QUEUE_ID==qid]temp.drop_duplicates(subset='DOTTING_TIME',keep='first',inplace=True) ##保留第一个出现的if len(train)==0:train = tempelse:train = pd.concat([train,temp])
train.index = range(len(train))
时间间隔
##时间间隔画图。
time_diff = train[['QUEUE_ID']]
time_diff['time_diff'] = pd.to_datetime(train['DOTTING_TIME'], unit='ms')
time_diff_ = pd.DataFrame()
for qid in time_diff.QUEUE_ID.unique():a = time_diff[time_diff.QUEUE_ID==qid]a.time_diff = a.time_diff.diff(1).apply(lambda x: x.seconds/60) ##一阶差分,取分钟数if len(time_diff_)==0:time_diff_=aelse:time_diff_ = pd.concat([time_diff_,a])
time_diff_ = time_diff_.fillna(5) #做差分之后的nan值采用标准的5进行填补for qid in time_diff_.QUEUE_ID.unique():plt.figure(figsize=[30,6])plt.title(qid,fontsize=24)plt.scatter(range(len(time_diff_[time_diff_.QUEUE_ID==qid])),time_diff_[time_diff_.QUEUE_ID==qid].time_diff)plt.show()##部分的时间间隔到几十甚至几百,考虑截断删除
for qid in time_diff_.QUEUE_ID.unique():a = time_diff_[time_diff_.QUEUE_ID==qid]print(qid,a.head(1).index.tolist(),a.tail(1).index.tolist(),len(a[a.time_diff>8].index.tolist()))# print(len(a[a.time_diff<3].index.tolist()))# print(a[a.time_diff<3].index.tolist())# print(a[a.time_diff>8].index.tolist(),'\n')'''
对于时间间隔<3的,数量很少,忽略不计对于时间间隔>8的,若在开始有连续数据,可直接做截断删除处理。(可细化)
部分混杂在中间的,可做填充。(未实现)如果需要用到时间差特征,可尝试做上限值截断处理,避免异常值(未实现)'''
'''
这里可以再细化一下 (done)
'''##删除qid为297的前271个数据
train.drop(index=range(271),inplace=True)train.drop(index=time_diff_[3564:3605].index.tolist(),inplace=True)
train.drop(index=time_diff_[2408:2430].index.tolist(),inplace=True)##删除qid为297的前1000个数据
index_ = train[train.QUEUE_ID==21487].head(1000).index.tolist()
train.drop(index=index,inplace=True)#qid 21671 200330:201504
train.drop(index=time_diff_[200330:201504].index.tolist(),inplace=True)#qid 21673 243633:243702
train.drop(index=time_diff_[243633:243702].index.tolist(),inplace=True)# qid 298 263354:263537
train.drop(index=time_diff_[263354:263537].index.tolist(),inplace=True)
train.index = range(len(train))train.index = range(len(train))
时间间隔填充
time_diff = train[['QUEUE_ID']]
time_diff['time_diff'] = pd.to_datetime(train['DOTTING_TIME'], unit='ms')
time_diff_ = pd.DataFrame()
for qid in time_diff.QUEUE_ID.unique():a = time_diff[time_diff.QUEUE_ID==qid]a.time_diff = a.time_diff.diff(1).apply(lambda x: x.seconds/60) ##一阶差分,取分钟数if len(time_diff_)==0:time_diff_=aelse:time_diff_ = pd.concat([time_diff_,a])
time_diff_ = time_diff_.fillna(5) #做差分之后的nan值采用标准的5进行填补
listfix=[] #要在前面填充的序号
for qid in time_diff_.QUEUE_ID.unique():a = time_diff_[time_diff_.QUEUE_ID==qid]listfix.extend(a[a.time_diff>8].index.tolist())
# print("qid=",qid,a.head(1).index.tolist(),a.tail(1).index.tolist(),">8:",len(a[a.time_diff>8].index.tolist()))
# print(a[a.time_diff>8].index.tolist())
# print("<3:",len(a[a.time_diff<3].index.tolist()))
# print(a[a.time_diff<3].index.tolist(),'\n')
#建立新的dataframe 复制时间间隔大于8的部分进入
newdata=[]
a=time_diff_.values
b=train.values
for i in listfix:j=int(a[i][-1]/5)for k in range(j):b[i][-3]=b[i][-3]-5*60000newdata.append(b[i])# b=pd.to_datetime(a[0][-1]-5*60000, unit='ms')
# newdata.append(b[1])
# newdata# for i in int(a[795][1]/5):
# b[795][1]-5
# newdata.append(b[795])# a[795][1]/5 判断制造多少行
# for i in listfix:#新的dataframe与train合并,sort qid,dotting_time(这玩意能排序)#重置index
newdata=pd.DataFrame(newdata)
newdata.columns=['QUEUE_ID', 'CU', 'CPU_USAGE', 'MEM_USAGE', 'LAUNCHING_JOB_NUMS','RUNNING_JOB_NUMS', 'SUCCEED_JOB_NUMS', 'CANCELLED_JOB_NUMS','FAILED_JOB_NUMS', 'DOTTING_TIME', 'DISK_USAGE', 'Add_timestamp','TIME_diff_1', 'TIME_diff_2']
train=pd.concat([train,newdata],axis=0)del train['DOTTING_TIME']train.columns=['QUEUE_ID', 'CU', 'CPU_USAGE', 'MEM_USAGE', 'LAUNCHING_JOB_NUMS','RUNNING_JOB_NUMS', 'SUCCEED_JOB_NUMS', 'CANCELLED_JOB_NUMS','FAILED_JOB_NUMS', 'DISK_USAGE', 'DOTTING_TIME', 'TIME_diff_1','TIME_diff_2']train = train.sort_values(by=['QUEUE_ID', 'DOTTING_TIME']).reset_index(drop=True)train['DOTTING_TIME']=pd.to_datetime(train['DOTTING_TIME'], unit='ms')
LAUNCHING_JOB_NUMS, RUNNING_JOB_NUMS,SUCCEED_JOB_NUMS,CANCELLED_JOB_NUMS,FAILED_JOB_NUMS
##重命名
for df in [train, test]:df.rename(columns={'LAUNCHING_JOB_NUMS': 'LJOB','RUNNING_JOB_NUMS': 'RJOB','SUCCEED_JOB_NUMS': 'SJOB','CANCELLED_JOB_NUMS': 'CJOB','FAILED_JOB_NUMS': 'FJOB'}, inplace=True)
train.describe()
异常值处理(未实现)
异常检测(未实现)
模型构建
这里我们使用了LigthGBM
这里我们选用LGBM而不是XGBoost有几个原因:XGBM有更快的训练效率,更低的内存使用,更高的准确率,并且支持并行化学习,支持直接使用category特征。
##test的每个ID的最后一条launching数据,作为结果。模型预测结果准确度太低
LAUNCHING_JOB_NUMS_last = []
for id in tqdm(test.ID.unique()):df_id = test[test['ID']==id].reset_index()LAUNCHING_JOB_NUMS_last.append(df_id.LJOB[4])
# 常量定义
NFOLDS = 5 # 交叉验证的折数
SEQ_LEN = 5 # 序列长度
WINDOW_SIZE = 2 * SEQ_LEN # 窗口长度
MODEL_N = 10 # 10个模型分别预测 CPU_USAGE_6...LAUNCHING_JOB_NUMS_10
##同一天内的小时数
for df in [train, test]:t = pd.to_datetime(df['DOTTING_TIME'], unit='ms')# 转成小时df['DOTTING_TIME'] = t.dt.hour + t.dt.minute / 60
#####这里的统计特征用了同一qid下的全数据,可以再挖掘一下。
used_features = ['CPU_USAGE', 'MEM_USAGE', 'DISK_USAGE', 'LJOB', 'RJOB']# 分组,只用训练集数据做统计
group_data = train.groupby(by=['QUEUE_ID'])[used_features]# 聚合函数
methods = {'AVG': 'mean','MEDIAN': 'median','MIN': 'min','MAX': 'max','STD': 'std',
}for m in methods: agg_data = group_data.agg(methods[m])###分组统计agg_data.fillna(method='ffill', inplace=True)###近邻填充agg_data.fillna(0, inplace=True)##0填充agg_data = agg_data.rename(lambda x: 'QUEUE_%s_%s' % (x, m), axis=1)agg_data = agg_data.reset_index()for df in [train, test]:merged_data = df[['QUEUE_ID']].merge(agg_data, how='left', on=['QUEUE_ID'])merged_data.drop(columns=['QUEUE_ID'], inplace=True)# 插入新的列for c in merged_data.columns:df[c] = 0# 赋值df.loc[:, list(merged_data.columns)] = merged_data.values
# 需要滑动的数值特征
num_features = ['CPU_USAGE', 'MEM_USAGE', 'DISK_USAGE','LJOB', 'RJOB', 'SJOB', 'CJOB', 'FJOB']# 需要预测的值
y_features = ['CPU_USAGE', 'LJOB']
# 生成测试集时间窗数据
for i in range(SEQ_LEN):for sf in num_features:new_f = '%s_%d' % (sf, i+1)test[new_f] = test[sf].shift(-i)# 删除原来的列
test.drop(columns=num_features, inplace=True)# 只取每个ID的第一条数据
test = test.groupby(by='ID', as_index=False).first()
# 生成训练集时间窗数据
temp = pd.DataFrame()
qids = sorted(train['QUEUE_ID'].unique())for qid in tqdm(qids): # 按QUEUE_ID进行处理queue = train[train['QUEUE_ID'] == qid].copy(deep=True)# 生成时间窗数据for i in range(SEQ_LEN):for sf in num_features:new_f = '%s_%d' % (sf, i+1)queue[new_f] = queue[sf].shift(-i)# 处理需要预测的值for i in range(SEQ_LEN):for y in y_features:new_y = '%s_%d' % (y, i+SEQ_LEN+1)queue[new_y] = queue[y].shift(-i-SEQ_LEN)# 删除原来的列queue.drop(columns=num_features, inplace=True)# 对于每个QUEUE_ID,丢弃最后10条有NAN值的数据queue = queue.head(queue.shape[0]-WINDOW_SIZE)temp = temp.append(queue)# 重设索引
train = temp.reset_index(drop=True)
cpu_usages = []
mem_usages = []
disk_usages = []
ljobs = []
rjobs = []for i in range(SEQ_LEN):postfix = '_%d' % (i + 1)cpu_usages.append('CPU_USAGE'+postfix)mem_usages.append('MEM_USAGE'+postfix)disk_usages.append('DISK_USAGE'+postfix)ljobs.append('LJOB'+postfix)rjobs.append('RJOB'+postfix)
for df in [train, test]:df['USED_CPU'] = df['CU'] * df['CPU_USAGE_5'] / 100df['USED_MEM'] = 4 * df['CU'] * df['MEM_USAGE_5'] / 100df['TO_RUN_JOBS'] = df['LJOB_5'] - df['RJOB_5']df.loc[df['TO_RUN_JOBS'] < 0, 'TO_RUN_JOBS'] = 0pairs = [('CPU', 'CPU_USAGE', cpu_usages),('MEM', 'MEM_USAGE', mem_usages),('DISK', 'DISK_USAGE', disk_usages),('LJOB', 'LJOB', ljobs),('RJOB', 'RJOB', rjobs),]for short_name, f, usages in pairs:df[short_name+'_AVG'] = df[usages].mean(axis=1)df[short_name+'_STD'] = df[usages].std(axis=1)df[short_name+'_DIFF'] = df['%s_5' % f] - df['%s_1' % f]
预测
Y_features = ['CPU_USAGE_6', 'LJOB_6','CPU_USAGE_7', 'LJOB_7','CPU_USAGE_8', 'LJOB_8','CPU_USAGE_9', 'LJOB_9','CPU_USAGE_10', 'LJOB_10'
]Y_train = train[['QUEUE_ID']+Y_features]
train.drop(columns=Y_features, inplace=True)
同模型,多种参数,得出得分相近的结果后做平均(未实现)\n不同模型,得分相近,做平均。\n同模型,不同特征组,做平均。
lgb_param = {'num_leaves': 41,'max_depth': 10,'learning_rate': 0.08,'n_estimators': 150,'subsample': 0.9,'feature_fraction': 0.8,'reg_alpha': 0.6,'reg_lambda': 1.2,'seed': 212 # 记得修改
}
for i in train.columns:print(i)
# 行内统计特征train['cpu_mean'] = train[[f'CPU_USAGE_{i}' for i in range(1,6)]].mean(axis=1)
train['cpu_std'] = train[[f'CPU_USAGE_{i}' for i in range(1,6)]].std(axis=1)
train['cpu_max'] = train[[f'CPU_USAGE_{i}' for i in range(1,6)]].max(axis=1)
train['mem_mean'] = train[[f'MEM_USAGE_{i}' for i in range(1,6)]].mean(axis=1)
train['mem_std'] = train[[f'MEM_USAGE_{i}' for i in range(1,6)]].std(axis=1)
train['mem_max'] = train[[f'MEM_USAGE_{i}' for i in range(1,6)]].max(axis=1)test['cpu_mean'] = test[[f'CPU_USAGE_{i}' for i in range(1,6)]].mean(axis=1)
test['cpu_std'] = test[[f'CPU_USAGE_{i}' for i in range(1,6)]].std(axis=1)
test['cpu_max'] = test[[f'CPU_USAGE_{i}' for i in range(1,6)]].max(axis=1)
test['mem_mean'] = test[[f'MEM_USAGE_{i}' for i in range(1,6)]].mean(axis=1)
test['mem_std'] = test[[f'MEM_USAGE_{i}' for i in range(1,6)]].std(axis=1)
test['mem_max'] = test[[f'MEM_USAGE_{i}' for i in range(1,6)]].max(axis=1)
赛题给的评估函数
def evaluate(Y_true, Y_preds):# shape: (n, 10)if not isinstance(Y_true, np.ndarray):Y_true = Y_true.to_numpy()if not isinstance(Y_preds, np.ndarray):Y_preds = Y_preds.to_numpy()dist = 0 # DIST_kfor i in range(MODEL_N//2):cpu_true, job_true = Y_true[:, i*2], Y_true[:, i*2+1] # shape: (n,)cpu_preds, job_preds = Y_preds[:, i*2], Y_preds[:, i*2+1] # shape: (n,)max_job = np.max((job_true, job_preds), axis=0)# 防止分母为0(当分母为0是,分子也为0,所以可以把分母0设为1)max_job[max_job == 0] = 1.0dist += 0.9 * np.abs((cpu_preds - cpu_true) / 100) + 0.1 * np.abs((job_true - job_true) / max_job)score = 1 - dist.mean()return score
###备用数据
train_copy = train.copy()
test_copy = test.copy()
Y_train_copy = Y_train.copy()
##训练执行有误时重新执行这一步恢复数据
# train = train_copy.copy()
# test = test_copy.copy()
# Y_train = Y_train_copy.copy()
train_ = train.copy()
test_ = test.copy()
Y_train_ = Y_train.copy()train_pre=np.zeros((train_.shape[0], MODEL_N+1)) ##与Y_train相同shape的df
train_pre = pd.DataFrame(train_pre,columns=submit.columns)
train_pre.iloc[:,0] = train_.QUEUE_ID ##第一列为qid,该df为train的交叉验证结果qid_score = [] #存放每个qid利用官方给的评价指标算出来的分数for qid in tqdm(train_.QUEUE_ID.unique()): #一个qid训练一次train = train_[train_.QUEUE_ID==qid]test = test_[test_.QUEUE_ID==qid]test_ID = test.ID ##用于将test的预测结果写回submittest_ID = (test_ID-1).tolist() #减一的原因是同一行数据中,index为i的其ID为i+1test = test.iloc[:,3:] #取有用数据部分Y_train = Y_train_[Y_train_.QUEUE_ID==qid]Y_train.drop('QUEUE_ID',axis=1,inplace=True)# 总迭代次数N = MODEL_N * NFOLDS ##model_n为10,指的是10个预测结果。cpu1-5,lau1-5; nfolds为交叉验证次数# 进度条pbar = tqdm(total=N, position=0, leave=True)oof = np.zeros((train.shape[0], MODEL_N)) #存放当前qid的train的预测结果,用于计算得分#oof[:,0] = train.index.tolist()train = train.iloc[:,2:]# 交叉验证kfold = KFold(n_splits=NFOLDS, shuffle=True, random_state=1333) ##r_s记得修改kf = kfold.split(train)for train_idx, validate_idx in kf:# 切割训练集&验证集X_train, y_train = train.iloc[train_idx, :], Y_train.iloc[train_idx, :]X_valid, y_valid = train.iloc[validate_idx, :], Y_train.iloc[validate_idx,:]train_index = X_valid.index.tolist() ##当前切分数据的index,用于将预测数据写回train_prefor i in range(MODEL_N):y = y_train.iloc[:, i] ##一列一次训练reg = lgb.LGBMRegressor(n_jobs=-1, **lgb_param)bst = reg.fit(X_train, y)# 验证集valid_pred = bst.predict(X_valid)valid_pred[valid_pred < 0] = 0 #下边界截断valid_pred[valid_pred > 100] = 100 #上边界截断valid_pred = valid_pred.astype(np.int)train_pre.iloc[train_index,i+1] = valid_pred #利用train_index将预测结果写回train_preoof[validate_idx, i] = valid_pred# 测试集test_pred = bst.predict(test)test_pred[test_pred < 0] = 0test_pred[test_pred > 100] = 100submit.iloc[test_ID, i+1] += test_pred / NFOLDS #利用test_ID将预测结果写回submit# 更新进度条pbar.update(1)oof = oof.astype(np.int)print(qid)print(evaluate(Y_train,oof)) ##计算当前qid的预测得分qid_score.append([qid,evaluate(Y_train,oof)])# 关闭进度条pbar.close()# 转为整型submit = submit.astype(np.int)train_pre = train_pre.astype(np.int)
# 计算验证集分数
oof_score = evaluate(Y_train_.iloc[:,1:], train_pre.iloc[:,1:])
print('oof score = %.6f' % oof_score) #0.895821
模型融合
同模型融合
多种参数得出相近的结果后取平均值参数
可能是测试次数较少,参数平均后效果没有提升。舍弃该方案。
另一个中同模型融合方案,使用不同特征组做平均,但未实现。
异模型融合:
原本的想法是把每次训练后效果都较差的几个ID单独拿出并训练用其他模型对这几个ID进行训练,使用了XGBoost进行训练,但分数并没有什么的提升所以舍弃方案。
结果
###launching使用每个ID最后一条
for col in [f'LAUNCHING_JOB_NUMS_{i}' for i in range(1,6)]:submit[col] = LAUNCHING_JOB_NUMS_lastsubmit.to_csv('submit.csv', index=False)
大数据时代的Serverless工作负载预测赛后总结相关推荐
- 《大数据时代》读书报告
未来已来 --<大数据时代>读书报告 课程:商务数据分析 学号: 姓名: 引言 维克托·迈尔·舍恩伯格在其<大数据时代>中的前言开宗明义:一场生活.工作与思维的大变革,大数据开 ...
- 万事皆显出自发偶然之态,但自有其规律-《爆发·大数据时代预见未来的新思维》笔记与心得
马克·吐温曾说过: 历史不会重演,却自有其韵律 虽然万事皆显出自发偶然之态,但实际上它远比你想象中容易预测. 在日常生活中,虽然我们可以针对某些事情自由做决定,但似乎人生的大部分时光还是 处于&quo ...
- 信号与噪声:大数据时代预测的科学与艺术 - 电子书下载(高清版PDF格式+EPUB格式)...
信号与噪声_大数据时代预测的科学与艺术-Nate Silver[美]纳特•西尔弗 在线阅读 百度网盘下载(mglp) 书名:信号与噪声:大数据时代预测的科学与艺术 ...
- 【2016年第4期】大数据时代的简约计算
张家琳,孙晓明 中国科学院计算技术研究所,北京 100190 摘要:大数据存储和分析的能力是未来创新型国家的核心战略能力.当前关于大数据的理论研究在共性问题提炼.方法论框架和实时数据算法理论上仍存在一 ...
- 人工智能与大数据时代-2020
20200524 2020 新基建.新动能5G车路协同白皮书 2020 能源石化交易行业区块链应用白皮书 2020中国智慧文旅5G应用白皮书 自动驾驶仿真技术研究报告 中国独角兽企业发展白皮书 &q ...
- 11.2.5 云计算、大数据时代
1.谷歌的架构变革 从2003年到2004年,谷歌(Google)陆续发表了关于GFS.MapReduce和BigTable的3篇 论文,基本上公开了谷歌内部用于处理搜索海量数据的平台架构.GFS是大 ...
- 大数据时代与多云时代:一个消亡,一个诞生
全文共3946字,预计学习时长8分钟 图片来源:unsplash.com/@ev 随着当下的重点从收集数据转向实时处理数据,大数据时代正走向消亡.如今大数据是种商业资产,为即将到来的多云支持.机器学习 ...
- 大数据时代已经到来,你了解吗?
一.大数据出现的背景 进入2012年,大数据(big data)一词越来越多地被提及,人们用它来描述和定义信息爆炸时代产生的海量数据,并命名与之相关的技术发展与创新.它已经上过<纽约时报> ...
- 数据安全 | 大数据时代,如何有效预防数据泄露?
在当今的大数据时代,数据的产生.流通和应用更加普遍和密集,数字化的发展趋势,让数据发挥出了更大的价值.与此同时,各项技术应用背后的数据安全风险也日益凸显.近年来,频发的数据泄露事件,也引发外界对数据安 ...
最新文章
- PowerDesigner15官方正式版+注册补丁
- linux网络编程(四)线程池
- c语言圆周率计算_C语言入门这一篇就够了
- 23.C++- 继承的多种方式、显示调用父类构造函数、父子之间的同名函数、virtual虚函数...
- 尴尬!微软的 PowerShell 竟是 Linux 用户最多!| 极客头条
- 前端js使用java变量值_web前端:js中的变量
- 台电固态硬盘用什么测试软件,常规测试、实际使用测试与总结_固态硬盘评测-中关村在线...
- 用java编写英寸到厘米的转换_像素、英寸、厘米的换算 - flyinglife - JavaEye技术网站...
- NXP iMX8 ARM平台Distro Boot使用
- ubuntu 安装和删除字体
- Android手机ram大小,什么是手机RAM内存?手机RAM内存越大越好吗?
- IDE——jupyter的安装与卸载
- RSF 分布式服务框架设计
- 《剑指offer》刷题笔记(发散思维能力):求1+2+3+...+n
- 阿里云导出负载均衡SLB实例数据
- 服务器w8系统如何重装系统,如何重装Windows8.1 Win8.1系统重装流程图解
- 腾讯云、东华软件,和你的私人医生
- nginx 做端口转发
- 罗斯蒙特流量计安装对管道的条件
- 2016云计算大会PPT打包下载
热门文章
- Equalize Prices
- Actor模型与Akka
- 2021高考杭二中成绩查询,杭二中公布了首张高考成绩通报 北大清华51人
- 【刷机】小米降级刷机 线刷 图解 MIUI降级刷机
- EventBus Vuex?
- 华为云桌面,数字化时代便捷、安全的办公选择
- 北京移动推低价位流量卡 10元包70M
- EasyRecovery最新版本Photo16电脑数据恢复软件下载
- Xcode8使用出现bundleid: com.jd.***, enable_level: 0, persist_level: 0, propagate_with_acti
- 李德毅:未来交通——自动驾驶与智能网联