mmlspark Order Volume Forecasting Case Study
This article is a companion to the earlier post "Spark Forecasting Algorithm End-to-End Case Study",
which used plain LightGBM for order volume forecasting; as the number of stores grew, that approach no longer ran efficiently.
This article instead uses Microsoft's open-source Microsoft Machine Learning for Apache Spark (mmlspark):
https://github.com/Azure/mmlspark
Its usage was covered in the earlier post "LightGBM on Spark in Practice",
which you can refer to for details.
1 Store Clustering
The number of stores is large, and training a single model on all of them at once is very expensive even on Spark, so we first cluster the stores and then train one model per group of similar stores.
From the per-channel half-hour order volume table wfm_dw1.wfm_order_channel_half_hour
we build, for each store, a feature vector with one value per half hour of each day of the week (24*2*7 = 336 features).
Training data: 20201130--20210131
Prediction horizon (next 28 days): 20210201--20210228
tstart="20201130" tend="20210131"
Raw data pdf:
+-------------------+--------+---------+---------+
|global_store_number|sale_day|half_hour|order_qty|
+-------------------+--------+---------+---------+
| 28710|20201210| 20| 17.0|
| 28710|20201230| 19| 15.0|
| 28710|20201211| 13| 19.0|
| 28710|20201203| 31| 1.0|
| 28710|20201205| 11| 3.0|
| 28710|20210111| 19| 14.0|
| 28710|20210119| 16| 22.0|
| 28710|20210130| 23| 5.0|
| 28710|20210107| 16| 30.0|
| 28710|20210117| 23| 8.0|
| 28710|20210113| 10| 22.0|
| 28710|20210113| 26| 2.0|
| 28710|20210113| 12| 12.0|
| 28710|20201211| 26| 3.0|
| 28710|20201226| 11| 9.0|
| 28710|20201215| 21| 13.0|
| 28710|20201222| 19| 17.0|
| 28710|20201201| 12| 9.0|
| 28710|20201215| 31| 2.0|
| 28710|20201215| 23| 13.0|
+-------------------+--------+---------+---------+
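The query that produced pdf is not shown in the source; below is a plausible sketch. The table and column names come from the text and output above, while summing the per-channel volumes per store and half hour is an assumption made for the clustering step.
tstart, tend = "20201130", "20210131"
pdf = spark.sql(f"""
    select global_store_number, sale_day, half_hour, sum(order_qty) as order_qty
    from wfm_dw1.wfm_order_channel_half_hour
    where global_store_number = '28710'
      and sale_day between '{tstart}' and '{tend}'
    group by global_store_number, sale_day, half_hour
""").toPandas()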
Filling in missing time slots
def inter(df: pd.DataFrame) -> pd.DataFrame:
    global tend
    min_sale_day = df['sale_day'].min()
    idx = pd.MultiIndex.from_product([pd.date_range(start=min_sale_day, end=tend, freq='D').strftime('%Y%m%d'), range(48)], names=('sale_day', 'half_hour'))
    return df.set_index(['sale_day', 'half_hour']).reindex(idx)[['order_qty']].fillna(0).reset_index()  # the raw data has gaps in some slots; reindex fills them with 0 so every day has all 48 half-hour slots
pdf_inter = inter(pdf)
pdf_inter
sale_day half_hour order_qty
0 20201130 0 0.0
1 20201130 1 0.0
2 20201130 2 0.0
3 20201130 3 0.0
4 20201130 4 0.0
... ... ... ...
3019 20210131 43 0.0
3020 20210131 44 0.0
3021 20210131 45 0.0
3022 20210131 46 0.0
3023 20210131 47 0.0
Generating the wfm_dw1.wfm_order_clustering_feat table
Store number 28710 is used as the example below.
pdf_inter.loc[:, 'day_of_week'] = pd.to_datetime(pdf_inter['sale_day']).dt.weekday
pdf_recon = pdf_inter.groupby(['day_of_week', 'half_hour']).mean()[['order_qty']].transpose() # take the mean order_qty for each (day-of-week, half-hour) slot, then transpose to a single row
day_of_week_dict = {0: 'Mon', 1: 'Tue', 2: 'Wed', 3: 'Thu', 4: 'Fri', 5: 'Sat', 6: 'Sun'}
pdf_recon.columns, pdf_recon.index = list(map(lambda x: f'{day_of_week_dict[x[0]]}_{x[1]}', pdf_recon.columns)), [0]
pdf_recon = pd.DataFrame(
pdf_recon,
columns=[f'{x}_{y}' for x in ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'] for y in range(48)]
).fillna(0)
adt = pdf_recon.sum(axis=1)
pdf_recon = pdf_recon.div(adt, axis=0)
pdf_recon.loc[:, 'adt'] = adt
pdf_recon.loc[:, 'global_store_number'] = '28710'
pdf_recon
# pdf_recon.shape[1] = 338 = 24*2*7 + 2 (adt and global_store_number)
Mon_0 Mon_1 Mon_2 Mon_3 Mon_4 Mon_5 Mon_6 Mon_7 Mon_8 Mon_9 ... Sun_40 Sun_41 Sun_42 Sun_43 Sun_44 Sun_45 Sun_46 Sun_47 adt global_store_number
0 0.0 0.0 0.0 0.0 0.0 0.0 0.00033 0.006065 0.011274 0.012526 ... 0.0 0.0 0.0 0.0 0.0 0.0 0.0 0.0 1685.333333 28710
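The steps above produce one row for store 28710. Below is a minimal sketch of how the same steps could be repeated for every store and persisted as wfm_dw1.wfm_order_clustering_feat; the names pdf_all and build_store_feat are placeholders for illustration and are not from the source.
import pandas as pd

feat_rows = []
for store, g in pdf_all.groupby('global_store_number'):
    # build_store_feat: the inter() + weekday/half-hour pivot + adt normalisation
    # steps shown above, returning one 338-column row for a single store
    feat_rows.append(build_store_feat(g, store))

pdf_clustering_feat = pd.concat(feat_rows, ignore_index=True)

# persist through Spark so cluster_kmeans() below can read it back with spark.sql
spark.createDataFrame(pdf_clustering_feat) \
    .write.saveAsTable('wfm_dw1.wfm_order_clustering_feat', format='orc', mode='overwrite')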
Clustering the stores
def cluster_kmeans():
    wfm_order_clustering_feat = spark.sql("select * from wfm_dw1.wfm_order_clustering_feat").toPandas()
    wfm_order_clustering_feat.sort_values('adt', ascending=False, inplace=True)  # sort by adt so that each 500-store slice below holds stores with similar adt
    wfm_order_clustering_feat.dropna(inplace=True)  # drop stores with no sales; shape is now (5085, 338)
    kmeans_dict = dict()
    result_dict = dict()
    for idx_1 in range(10):
        # wfm_order_clustering_feat.iloc[idx_1*500:(idx_1+1)*500, :-2] takes rows 0-500, 500-1000, 1000-1500, ... i.e. one 500-store slice at a time, excluding the adt and global_store_number columns
        kmeans = KMeans(n_clusters=10, random_state=0).fit(wfm_order_clustering_feat.iloc[idx_1*500:(idx_1+1)*500, :-2])  # split each slice of the 5000+ stores into 10 clusters
        kmeans_dict[idx_1] = kmeans
        for idx_2 in range(10):
            result_dict[(idx_1, idx_2)] = wfm_order_clustering_feat.loc[wfm_order_clustering_feat.index[idx_1*500:(idx_1+1)*500][kmeans.labels_ == idx_2], 'global_store_number'].values
    return result_dict
The resulting cluster assignments:
result_dict
{(0,
0): array(['18482', '19265', '61039', '16212', '23566', '20564', '22051',
'29198', '8013', '47230', '23034', '51774', '53654', '51775',
'58989', '15907', '23994', '25146', '25145', '26871'], dtype=object),
(0,
1): array(['35813', '35008', '50162', '29262', '31904', '27148', '32135',
'58172', '22407', '26566', '31206', '31374', '48188', '47357',
'13179', '1692', '15312', '34004', '50243', '19567', '49632',
'56593', '22686', '19441', '15061', '1648', '49902', '28109',
'35806', '31185', '26727', '35005', '27226', '21577', '19370',
'50589', '51453', '25167', '22903', '53355', '27613', '16796',
'51642', '47397', '52559', '27473', '35808', '23971', '25216',
'15021', '16334'], dtype=object)
......
# the clusters vary in size
[len(i) for i in result_dict.values()]
[20,
51,
80,
68
...
]
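Section 3 later consumes these assignments as cluster_dict[(i, j)]. The following is a small sketch of flattening result_dict so the assignments can be inspected or persisted; the table name wfm_dw1.wfm_store_cluster is a placeholder and not from the source.
import pandas as pd

cluster_dict = cluster_kmeans()

assignments = pd.DataFrame(
    [
        {'group_id': i, 'cluster_id': j, 'global_store_number': store}
        for (i, j), stores in cluster_dict.items()
        for store in stores
    ]
)
spark.createDataFrame(assignments) \
    .write.saveAsTable('wfm_dw1.wfm_store_cluster', format='orc', mode='overwrite')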
2 Feature Generation
The data again comes from wfm_dw1.wfm_order_channel_half_hour,
but the processing differs: the data is first grouped by 'global_store_number' and 'channel'.
Filling in missing time slots
def inter(df: pd.DataFrame) -> pd.DataFrame:
    min_sale_day = df['sale_day'].min()
    max_sale_day = (pd.to_datetime(fend) + pd.DateOffset(days=28)).strftime('%Y%m%d')  # extend 28 days past fend to cover the prediction horizon
    idx = pd.MultiIndex.from_product([pd.date_range(start=min_sale_day, end=max_sale_day, freq='D').strftime('%Y%m%d'), range(48)], names=('sale_day', 'half_hour'))
    return df.set_index(['sale_day', 'half_hour']).reindex(idx)[['order_qty']].fillna(0).reset_index()
pdf_inter = pdf.groupby(['global_store_number', 'channel']).apply(inter)  # fill the gaps per store and per channel
pdf_inter.index = pdf_inter.index.droplevel(-1)  # drop the innermost index level added by apply
pdf_inter.reset_index(inplace=True)
pdf_inter
global_store_number channel sale_day half_hour order_qty
0 28710 In-Store 20201130 0 0.0
1 28710 In-Store 20201130 1 0.0
2 28710 In-Store 20201130 2 0.0
3 28710 In-Store 20201130 3 0.0
4 28710 In-Store 20201130 4 0.0
... ... ... ... ... ...
13099 28710 MOP 20210228 43 0.0
13100 28710 MOP 20210228 44 0.0
13101 28710 MOP 20210228 45 0.0
13102 28710 MOP 20210228 46 0.0
13103 28710 MOP 20210228 47 0.0
Adding features
pdf_feat = pdf_inter.groupby(['global_store_number', 'channel']).apply(gen)  # build features for each store and each channel
pdf_calendar: calendar features
pdf_event: store holiday and festival features
_pdf_event = execute_sql("select * from arp_dw1.mdm_event_mgt").drop_duplicates(subset=['event_id', 'effective_from', 'effective_to'])
pdf_event = pd.DataFrame(
0,
index=pd.Index(
pd.date_range(
fstart,
(pd.to_datetime(fend) + pd.DateOffset(days=28)).strftime('%Y%m%d')
),
name='date_id'
),
columns=['holiday', 'festival']
)
# pd.to_datetime(fend) + pd.DateOffset(days=28)
# Timestamp('2021-02-28 00:00:00')
# training window 2020-11-30--2021-01-31 plus the 28-day prediction window 2021-02-01--2021-02-28
# pd.date_range(
# fstart,
# (pd.to_datetime(fend) + pd.DateOffset(days=28)).strftime('%Y%m%d')
# )
# DatetimeIndex(['2020-11-30', '2020-12-01', '2020-12-02', '2020-12-03',
# '2020-12-04', '2020-12-05', '2020-12-06', '2020-12-07',
# '2020-12-08', '2020-12-09', '2020-12-10', '2020-12-11',
# '2020-12-12', '2020-12-13', '2020-12-14', '2020-12-15',
# '2020-12-16', '2020-12-17', '2020-12-18', '2020-12-19',
# '2020-12-20', '2020-12-21', '2020-12-22', '2020-12-23',
# '2020-12-24', '2020-12-25', '2020-12-26', '2020-12-27',
# '2020-12-28', '2020-12-29', '2020-12-30', '2020-12-31',
# '2021-01-01', '2021-01-02', '2021-01-03', '2021-01-04',
# '2021-01-05', '2021-01-06', '2021-01-07', '2021-01-08',
# '2021-01-09', '2021-01-10', '2021-01-11', '2021-01-12',
# '2021-01-13', '2021-01-14', '2021-01-15', '2021-01-16',
# '2021-01-17', '2021-01-18', '2021-01-19', '2021-01-20',
# '2021-01-21', '2021-01-22', '2021-01-23', '2021-01-24',
# '2021-01-25', '2021-01-26', '2021-01-27', '2021-01-28',
# '2021-01-29', '2021-01-30', '2021-01-31', '2021-02-01',
# '2021-02-02', '2021-02-03', '2021-02-04', '2021-02-05',
# '2021-02-06', '2021-02-07', '2021-02-08', '2021-02-09',
# '2021-02-10', '2021-02-11', '2021-02-12', '2021-02-13',
# '2021-02-14', '2021-02-15', '2021-02-16', '2021-02-17',
# '2021-02-18', '2021-02-19', '2021-02-20', '2021-02-21',
# '2021-02-22', '2021-02-23', '2021-02-24', '2021-02-25',
# '2021-02-26', '2021-02-27', '2021-02-28'],
# dtype='datetime64[ns]', freq='D')
# the index: wrapping the same range in pd.Index(..., name='date_id') yields the identical DatetimeIndex as above, just with name='date_id' attached
# pdf_event
# holiday festival
# date_id
# 2020-11-30 0 0
# 2020-12-01 0 0
for i in _pdf_event.values:
    pdf_event.loc[i[5]:i[6], i[2].lower()] = np.maximum(pdf_event.loc[i[5]:i[6], i[2].lower()], int(i[3][-1]))  # for every day an event covers, keep the highest holiday/festival level seen so far
pdf_event.reset_index(inplace=True)
pdf_event['date_id'] = pdf_event['date_id'].dt.strftime('%Y%m%d').astype(int)
pdf_event
# date_id holiday festival
# 2020-12-30 0 0
# 2020-12-31 0 1
# 2021-01-01 3 0
# 2021-01-02 3 0
# 2021-01-03 3 0
# 2021-01-04 0 0
# 2021-01-05 0 0
# 2021-01-06 0 0
# 2021-01-07 0 0
# 2021-01-08 0 0
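The positional indices i[2], i[3], i[5] and i[6] in the loop above are hard to read, so here is an equivalent sketch using named columns (it would run before the reset_index above). Only event_id, effective_from and effective_to are confirmed by the drop_duplicates call; event_type and event_level are assumed names that would need adjusting to the real schema of arp_dw1.mdm_event_mgt.
import numpy as np

for _, row in _pdf_event.iterrows():
    col = row['event_type'].lower()        # 'holiday' or 'festival' (assumed column name)
    level = int(row['event_level'][-1])    # last character encodes the event level (assumed column name)
    # for every day the event covers, keep the strongest level seen so far
    pdf_event.loc[row['effective_from']:row['effective_to'], col] = np.maximum(
        pdf_event.loc[row['effective_from']:row['effective_to'], col], level
    )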
_pdf_store
_pdf_store = spark.sql("select global_store_number, trade_area_type from wfm_dw1.wfm_store2").toPandas()
_pdf_store.loc[:, '_trade_area_type'] = _pdf_store['trade_area_type'].apply(lambda x: re.sub('[^A-Z]', '', re.split('[ -]', x.replace('Other', 'OT'))[0]) if x else 'NO')
# global_store_number trade_area_type _trade_area_type
# 0 58088 TS-Airport TS
# 1 47874 C-District C
store_dict: store trade-area-type feature
_trade_area_types = list(set(_pdf_store['_trade_area_type']))
# ['O', 'NO', 'R', 'OT', 'H', 'TS', 'X', 'T', 'C', 'RT', 'S']
_trade_area_type_map = {_trade_area_types[i]: i for i in range(len(_trade_area_types))}
# {'O': 0,
# 'NO': 1,
# 'R': 2,
# 'OT': 3,
# 'H': 4,
# 'TS': 5,
# 'X': 6,
# 'T': 7,
# 'C': 8,
# 'RT': 9,
# 'S': 10}
store_dict = {str(x[0]): _trade_area_type_map[x[2]] for x in _pdf_store.values}
# {'58088': 5,
# '47874': 8,
# '58041': 5,
# '23857': 7,
# '59466': 8,
# '24020': 8,
# '19601': 8}
channel_dict: order channel feature
channel_dict = {'In-Store':0, 'MOD': 1, 'MOP': 2}
gen: the main feature-generation function
def gen(df: pd.DataFrame) -> pd.DataFrame:
# df: the DataFrame of one store's one channel
global pdf_calendar, pdf_event, channel_dict, store_dict # calendar data / event data / channel dict / store-type dict
# start from an empty DataFrame and add feature columns one by one
feat = pd.DataFrame(index=df.index) # keep the index of the pre-groupby DataFrame, df.index
feat.loc[:, 'sale_day'] = df['sale_day']
# carry over and rename the original columns; from here on, categorical features end with *_cat and numeric features with *_real
feat.loc[:, ['dt_half_hour_of_day_cat', 'fu_shift_0d_real']] = df[['half_hour', 'order_qty']].values
# channel type
feat['st_channel_cat'] = channel_dict[df['channel'].values[0]]
# store type
feat['st_trade_area_type_cat'] = store_dict[df['global_store_number'].values[0]]
#################################### date-related features; typical time-series features, all categorical
#### use feat['sale_day'] to index into pdf_calendar.set_index('date_id'), covering 20201130--20210131--20210228
# pdf_calendar.set_index('date_id').loc[feat['sale_day'].astype(int), ['weekend', 'day_of_week', 'day_of_month', 'month', 'working_day']]
# weekend day_of_week day_of_month month working_day
# date_id
# 20201130 0 1 30 11 1
# 20201130 0 1 30 11 1
# 20201130 0 1 30 11 1
# 20201130 0 1 30 11 1
# 20201130 0 1 30 11 1
# ... ... ... ... ... ...
# 20210228 1 0 28 2 0
# 20210228 1 0 28 2 0
# 20210228 1 0 28 2 0
# 20210228 1 0 28 2 0
# 20210228 1 0 28 2 0
# pdf_calendar.set_index('date_id').loc[feat['sale_day'].astype(int), ['legal_holiday', 'related_festival']].isnull()
# legal_holiday related_festival
# date_id
# 20201130 True True
# 20201130 True True
# 20201130 True True
# 20201130 True True
# 20201130 True True
# ... ... ...
# 20210228 True True
# 20210228 True True
# 20210228 True True
# 20210228 True True
# 20210228 True True
# pdf_event.set_index('date_id').loc[feat['sale_day'].astype(int), ['holiday', 'festival']]
# holiday festival
# date_id
# 20201130 0 0
# 20201130 0 0
# 20201130 0 0
# 20201130 0 0
# 20201130 0 0
# ... ... ...
# 20210228 0 0
# 20210228 0 0
# 20210228 0 0
# 20210228 0 0
# 20210228 0 0
feat.loc[:, ['dt_weekend_cat', 'dt_day_of_week_cat', 'dt_day_of_month_cat', 'dt_month_of_year_cat', 'dt_working_day_cat']] = pdf_calendar.set_index('date_id').loc[feat['sale_day'].astype(int), ['weekend', 'day_of_week', 'day_of_month', 'month', 'working_day']].values
feat.loc[:, ['dt_legal_holiday_cat', 'dt_related_festival_cat']] = pdf_calendar.set_index('date_id').loc[feat['sale_day'].astype(int), ['legal_holiday', 'related_festival']].isnull().values
feat.loc[:, ['dt_ordinal_holiday_cat', 'dt_ordinal_festival_cat']] = pdf_event.set_index('date_id').loc[feat['sale_day'].astype(int), ['holiday', 'festival']].values
#################################### shifted (lag) numeric features
#### half-hour lags
for i in range(1, 24):
    feat.loc[:, f'fu_shift_{i}hh_real'] = df['order_qty'].shift(i).values
#### daily lags
for i in (1, 2, 3, 4, 5, 6, 7, 14, 21, 28, 35):
    feat.loc[:, f'fu_shift_{i}d_real'] = df['order_qty'].shift(i * 48).values
#################################### aggregate
#### daily aggregates over the last j days' lags
for i in ('mean', 'median'):
    for j in (3, 4, 5, 6, 7):
        feat.loc[:, f'fu_{i}_{j}d_real'] = getattr(feat[list(map(lambda x: f'fu_shift_{x}d_real', range(j)))], i)(axis=1).values  # getattr picks DataFrame.mean or DataFrame.median by name
#### weekly aggregates over the same weekday of the last j weeks
for i in ('mean', 'median'):
    for j in (3, 4, 5, 6):
        feat.loc[:, f'fu_{i}_{j}w_real'] = getattr(feat[list(map(lambda x: f'fu_shift_{x * 7}d_real', range(j)))], i)(axis=1).values
#################################### wave
#### Encoding Cyclical Features
#### For periodic variables such as month, day, hour or minute, using the raw number (or a simple ordinal encoding) is misleading: 23:00 and 01:00 are only 2 hours apart, but as plain numbers 23 and 1 differ by 22, which would badly mislead the model.
#### Such periodic features therefore need a proper encoding. The classic approach is to expand the one-dimensional value into two dimensions: its sine and cosine.
#### For a feature X, take its maximum value max_value; e.g. for the hour of day, max_value = 23:
#### data['hour_sin'] = np.sin(2 * np.pi * data['hour']/23.0)
#### data['hour_cos'] = np.cos(2 * np.pi * data['hour']/23.0)
#### Add the expanded features Xsin and Xcos to the feature set and drop the original X (the raw "hour" value is replaced by its sin/cos pair).
feat.loc[:, 'dt_sin_w_real'] = np.sin( (feat['dt_day_of_week_cat'] + feat['dt_half_hour_of_day_cat'] / 48) / 7 * 2 * np.pi).values
feat.loc[:, 'dt_cos_w_real'] = np.cos( (feat['dt_day_of_week_cat'] + feat['dt_half_hour_of_day_cat'] / 48) / 7 * 2 * np.pi).values
feat.loc[:, 'dt_sin_d_real'] = np.sin( feat['dt_half_hour_of_day_cat'] / 48 * 2 * np.pi).values
feat.loc[:, 'dt_cos_d_real'] = np.cos( feat['dt_half_hour_of_day_cat'] / 48 * 2 * np.pi).values
#################################### shift back (future-available features): calendar features known in advance for the four predicted weeks
dt_cols = list(filter(lambda x: x.startswith('dt_'), feat.columns))
for i in (1, 2, 3, 4):
feat.loc[:, list(map(lambda x: x.replace('dt_', f'fa_{i * 7}_'), dt_cols))] = feat[dt_cols].shift(-i * 7 * 48).values
#################################### response: fu_shift_0d_real = order_qty (the actual value)
#################################### res_7_real res_14_real res_21_real res_28_real
for i in (1, 2, 3, 4):
feat.loc[:, f'res_{i * 7}_real'] = feat['fu_shift_0d_real'].shift(-i * 7 * 48)
# dtype
cat_cols = list(filter(lambda x: x.endswith('_cat'), feat.columns)) # categorical feature columns, 52 in total
real_cols = list(filter(lambda x: x.endswith('_real'), feat.columns)) # numeric feature columns, 77 in total
dtype_dict = {**{cat_col: int for cat_col in cat_cols}, **{real_col: float for real_col in real_cols}}
# {'dt_half_hour_of_day_cat': int,
# 'st_channel_cat': int,
# 'st_trade_area_type_cat': int,
# 'dt_weekend_cat': int,
# 'dt_day_of_week_cat': int,
# 'dt_day_of_month_cat': int,
# 'dt_month_of_year_cat': int,
# 'dt_working_day_cat': int,
# 'dt_legal_holiday_cat': int,
# 'dt_related_festival_cat': int,
# 'dt_ordinal_holiday_cat': int,
# 'dt_ordinal_festival_cat': int,
# 'fa_7_half_hour_of_day_cat': int,
# 'fa_7_weekend_cat': int,
# 'fa_7_day_of_week_cat': int,
# 'fa_7_day_of_month_cat': int,
# 'fa_7_month_of_year_cat': int,
# 'fa_7_working_day_cat': int,
# 'fa_7_legal_holiday_cat': int,
# 'fa_7_related_festival_cat': int,
# 'fa_7_ordinal_holiday_cat': int,
# 'fa_7_ordinal_festival_cat': int,
# 'fa_14_half_hour_of_day_cat': int,
# 'fa_14_weekend_cat': int,
# 'fa_14_day_of_week_cat': int,
# 'fa_14_day_of_month_cat': int,
# 'fa_14_month_of_year_cat': int,
# 'fa_14_working_day_cat': int,
# 'fa_14_legal_holiday_cat': int,
# 'fa_14_related_festival_cat': int,
# 'fa_14_ordinal_holiday_cat': int,
# 'fa_14_ordinal_festival_cat': int,
# 'fa_21_half_hour_of_day_cat': int,
# 'fa_21_weekend_cat': int,
# 'fa_21_day_of_week_cat': int,
# 'fa_21_day_of_month_cat': int,
# 'fa_21_month_of_year_cat': int,
# 'fa_21_working_day_cat': int,
# 'fa_21_legal_holiday_cat': int,
# 'fa_21_related_festival_cat': int,
# 'fa_21_ordinal_holiday_cat': int,
# 'fa_21_ordinal_festival_cat': int,
# 'fa_28_half_hour_of_day_cat': int,
# 'fa_28_weekend_cat': int,
# 'fa_28_day_of_week_cat': int,
# 'fa_28_day_of_month_cat': int,
# 'fa_28_month_of_year_cat': int,
# 'fa_28_working_day_cat': int,
# 'fa_28_legal_holiday_cat': int,
# 'fa_28_related_festival_cat': int,
# 'fa_28_ordinal_holiday_cat': int,
# 'fa_28_ordinal_festival_cat': int,
# 'fu_shift_0d_real': float,
# 'fu_shift_1hh_real': float,
# 'fu_shift_2hh_real': float,
# 'fu_shift_3hh_real': float,
# 'fu_shift_4hh_real': float,
# 'fu_shift_5hh_real': float,
# 'fu_shift_6hh_real': float,
# 'fu_shift_7hh_real': float,
# 'fu_shift_8hh_real': float,
# 'fu_shift_9hh_real': float,
# 'fu_shift_10hh_real': float,
# 'fu_shift_11hh_real': float,
# 'fu_shift_12hh_real': float,
# 'fu_shift_13hh_real': float,
# 'fu_shift_14hh_real': float,
# 'fu_shift_15hh_real': float,
# 'fu_shift_16hh_real': float,
# 'fu_shift_17hh_real': float,
# 'fu_shift_18hh_real': float,
# 'fu_shift_19hh_real': float,
# 'fu_shift_20hh_real': float,
# 'fu_shift_21hh_real': float,
# 'fu_shift_22hh_real': float,
# 'fu_shift_23hh_real': float,
# 'fu_shift_1d_real': float,
# 'fu_shift_2d_real': float,
# 'fu_shift_3d_real': float,
# 'fu_shift_4d_real': float,
# 'fu_shift_5d_real': float,
# 'fu_shift_6d_real': float,
# 'fu_shift_7d_real': float,
# 'fu_shift_14d_real': float,
# 'fu_shift_21d_real': float,
# 'fu_shift_28d_real': float,
# 'fu_shift_35d_real': float,
# 'fu_mean_3d_real': float,
# 'fu_mean_4d_real': float,
# 'fu_mean_5d_real': float,
# 'fu_mean_6d_real': float,
# 'fu_mean_7d_real': float,
# 'fu_median_3d_real': float,
# 'fu_median_4d_real': float,
# 'fu_median_5d_real': float,
# 'fu_median_6d_real': float,
# 'fu_median_7d_real': float,
# 'fu_mean_3w_real': float,
# 'fu_mean_4w_real': float,
# 'fu_mean_5w_real': float,
# 'fu_mean_6w_real': float,
# 'fu_median_3w_real': float,
# 'fu_median_4w_real': float,
# 'fu_median_5w_real': float,
# 'fu_median_6w_real': float,
# 'dt_sin_w_real': float,
# 'dt_cos_w_real': float,
# 'dt_sin_d_real': float,
# 'dt_cos_d_real': float,
# 'fa_7_sin_w_real': float,
# 'fa_7_cos_w_real': float,
# 'fa_7_sin_d_real': float,
# 'fa_7_cos_d_real': float,
# 'fa_14_sin_w_real': float,
# 'fa_14_cos_w_real': float,
# 'fa_14_sin_d_real': float,
# 'fa_14_cos_d_real': float,
# 'fa_21_sin_w_real': float,
# 'fa_21_cos_w_real': float,
# 'fa_21_sin_d_real': float,
# 'fa_21_cos_d_real': float,
# 'fa_28_sin_w_real': float,
# 'fa_28_cos_w_real': float,
# 'fa_28_sin_d_real': float,
# 'fa_28_cos_d_real': float,
# 'res_7_real': float,
# 'res_14_real': float,
# 'res_21_real': float,
# 'res_28_real': float}
# drop rows with missing values (introduced by the shifts) and cast columns according to dtype_dict
return feat.dropna().astype(dtype_dict)
The features are then written out as the prediction feature table wfm_dw1.wfm_order_predicting_feat.
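How the table is written is not shown in the source; below is a minimal sketch that reuses the droplevel/reset_index pattern applied to pdf_inter above, with the ORC overwrite write being an assumption.
pdf_feat.index = pdf_feat.index.droplevel(-1)   # drop the per-row index level left by apply
pdf_feat.reset_index(inplace=True)              # global_store_number / channel become columns again
spark.createDataFrame(pdf_feat) \
    .write.saveAsTable('wfm_dw1.wfm_order_predicting_feat', format='orc', mode='overwrite')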
3 Model Training and Prediction
new_table = 0
for i in range(max_i):
for j in range(max_j):
if len(cluster_dict[(i, j)]) > 1:
sql_query = f"select * from wfm_dw1.wfm_order_predicting_feat where global_store_number in {tuple(cluster_dict[(i, j)])}"
else:
sql_query = f"select * from wfm_dw1.wfm_order_predicting_feat where global_store_number == '{cluster_dict[(i, j)][0]}'"
sdf = spark.sql(sql_query)
#### prediction horizon length
#### pred_len = 7/14/21/28
pred_len = length
label = f'res_{pred_len}_real'
prediction = f'res_{pred_len}_prediction_real'
feature_name = list(filter(lambda x: x.startswith(f'fa_{pred_len}_') or x.startswith('fu_') or x.startswith('st_'), sdf.columns))
# pred_len = 7
# feature_name = list(filter(lambda x: x.startswith(f'fa_{pred_len}_') or x.startswith('fu_') or x.startswith('st_'), sdf.columns))
# ['fu_shift_0d_real',
# 'st_channel_cat',
# 'st_trade_area_type_cat',
# 'fu_shift_1hh_real',
# 'fu_shift_2hh_real',
# 'fu_shift_3hh_real',
# 'fu_shift_4hh_real',
# 'fu_shift_5hh_real',
# 'fu_shift_6hh_real',
# 'fu_shift_7hh_real',
# 'fu_shift_8hh_real',
# 'fu_shift_9hh_real',
# 'fu_shift_10hh_real',
# 'fu_shift_11hh_real',
# 'fu_shift_12hh_real',
# 'fu_shift_13hh_real',
# 'fu_shift_14hh_real',
# 'fu_shift_15hh_real',
# 'fu_shift_16hh_real',
# 'fu_shift_17hh_real',
# 'fu_shift_18hh_real',
# 'fu_shift_19hh_real',
# 'fu_shift_20hh_real',
# 'fu_shift_21hh_real',
# 'fu_shift_22hh_real',
# 'fu_shift_23hh_real',
# 'fu_shift_1d_real',
# 'fu_shift_2d_real',
# 'fu_shift_3d_real',
# 'fu_shift_4d_real',
# 'fu_shift_5d_real',
# 'fu_shift_6d_real',
# 'fu_shift_7d_real',
# 'fu_shift_14d_real',
# 'fu_shift_21d_real',
# 'fu_shift_28d_real',
# 'fu_shift_35d_real',
# 'fu_mean_3d_real',
# 'fu_mean_4d_real',
# 'fu_mean_5d_real',
# 'fu_mean_6d_real',
# 'fu_mean_7d_real',
# 'fu_median_3d_real',
# 'fu_median_4d_real',
# 'fu_median_5d_real',
# 'fu_median_6d_real',
# 'fu_median_7d_real',
# 'fu_mean_3w_real',
# 'fu_mean_4w_real',
# 'fu_mean_5w_real',
# 'fu_mean_6w_real',
# 'fu_median_3w_real',
# 'fu_median_4w_real',
# 'fu_median_5w_real',
# 'fu_median_6w_real',
# 'fa_7_half_hour_of_day_cat',
# 'fa_7_weekend_cat',
# 'fa_7_day_of_week_cat',
# 'fa_7_day_of_month_cat',
# 'fa_7_month_of_year_cat',
# 'fa_7_working_day_cat',
# 'fa_7_legal_holiday_cat',
# 'fa_7_related_festival_cat',
# 'fa_7_ordinal_holiday_cat',
# 'fa_7_ordinal_festival_cat',
# 'fa_7_sin_w_real',
# 'fa_7_cos_w_real',
# 'fa_7_sin_d_real',
# 'fa_7_cos_d_real']
## drop a few calendar features
feature_name.remove(f'fa_{pred_len}_day_of_month_cat')
feature_name.remove(f'fa_{pred_len}_month_of_year_cat')
feature_name.remove(f'fa_{pred_len}_half_hour_of_day_cat')
## pick out the categorical features
categorical_feature = list(filter(lambda x: x.endswith('_cat'), feature_name))
# ['st_channel_cat',
# 'st_trade_area_type_cat',
# 'fa_7_weekend_cat',
# 'fa_7_day_of_week_cat',
# 'fa_7_working_day_cat',
# 'fa_7_legal_holiday_cat',
# 'fa_7_related_festival_cat',
# 'fa_7_ordinal_holiday_cat',
# 'fa_7_ordinal_festival_cat']
va = ft.VectorAssembler(inputCols = feature_name, outputCol = 'features', handleInvalid = "keep")
sdf_va = va.transform(sdf)
sdf_va_train = sdf_va.filter(sdf_va['sale_day'] >= tstart).filter(sdf_va['sale_day'] <= tend)
if train:
os.system(f"hadoop fs -rm -r hdfs://ns1/user/rainbow_admin/wfm_order_predicting_model_{i}_{j}_{pred_len}_{mode}")
if sdf_va_train.count():
print(f'Group {i}, cluster {j}, TRAINING')
lgb = LightGBMRegressor(
baggingFraction=0.8,
baggingFreq=10,
baggingSeed=3,
boostingType='gbdt',
categoricalSlotNames=categorical_feature,
# earlyStoppingRound=100,
featureFraction=0.8,
labelCol=label,
learningRate=0.1,
maxBin=63,
maxDepth=-1,
metric='mape',
minSumHessianInLeaf=0.001,
numIterations=100,
numLeaves=31,
objective='regression',
predictionCol=prediction,
verbosity=1
)
model = lgb.fit(sdf_va_train)
model.saveNativeModel(f"wfm_order_predicting_model_{i}_{j}_{pred_len}_{mode}")
try:
model = LightGBMRegressionModel.loadNativeModelFromFile(f"wfm_order_predicting_model_{i}_{j}_{pred_len}_{mode}", labelColName = label, predictionColName = prediction)
except AnalysisException:
print(f'Group {i}, cluster {j}, PASS')
continue  # skip this group/cluster if no model file exists
sdf_va_test = sdf_va.filter(sdf_va['sale_day'] >= pstart).filter(sdf_va['sale_day'] <= pend)
sdf_result = model.transform(sdf_va_test).select(['global_store_number', 'channel', 'sale_day', 'dt_half_hour_of_day_cat', label, prediction])
table = f'wfm_order_predicting_result_{pred_len}_{mode}'
if new_table:
sdf_result.withColumn('group_id', func.lit(i)) \
.withColumn('cluster_id', func.lit(j)) \
.write.saveAsTable(
f'wfm_dw1.{table}', format='orc',
mode='overwrite', partitionBy=['group_id', 'cluster_id']
)
new_table = 0
else:
sdf_result.withColumn('group_id', func.lit(i)) \
.withColumn('cluster_id', func.lit(j)) \
.write.insertInto(f'wfm_dw1.{table}', overwrite=False)
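The original pipeline stops at writing the result table. As a sanity check, a hedged sketch of computing MAPE per store and channel from that table is shown below; it reuses the label, prediction, pred_len and mode variables defined above, excludes zero-demand slots to keep the denominator valid, and is an addition rather than part of the source job.
from pyspark.sql import functions as func

sdf_result = spark.table(f'wfm_dw1.wfm_order_predicting_result_{pred_len}_{mode}')
sdf_mape = (
    sdf_result
    .filter(func.col(label) > 0)                      # avoid dividing by zero-demand slots
    .withColumn('ape', func.abs(func.col(prediction) - func.col(label)) / func.col(label))
    .groupBy('global_store_number', 'channel')
    .agg(func.mean('ape').alias('mape'))
)
sdf_mape.orderBy('mape', ascending=False).show(20)    # worst store/channel combinations first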
4 Production Scheduling
mmlspark LightGBM needs its jar packages, which can be downloaded from the official repository.
When submitting the job, pass: --conf spark.jars.packages=com.microsoft.ml.spark:mmlspark_2.11:1.0.0-rc1,com.microsoft.ml.lightgbm:lightgbmlib:2.3.100
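Inside the driver script, the imports would look roughly like the sketch below once those packages are on the classpath. This is an assumption for illustration; the exact mmlspark module path depends on the version (1.0.0-rc1 here), and the app name is a placeholder.
from pyspark.sql import SparkSession, functions as func
from pyspark.sql.utils import AnalysisException        # caught above when a model file is missing
from pyspark.ml import feature as ft                   # ft.VectorAssembler used in section 3
from mmlspark.lightgbm import LightGBMRegressor, LightGBMRegressionModel

spark = (
    SparkSession.builder
    .appName('wfm_order_predicting')
    .enableHiveSupport()                                # needed for the wfm_dw1.* Hive tables
    .getOrCreate()
)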
#!/bin/bash
current_dir=$(cd $(dirname $0); pwd)
cd ${current_dir}
source /etc/profile
source ../global_config.sh
bash /root/update_kerberos.sh
function algorithm_python_features()
{
# tstart=$(date -d"24 month ago ${pred_time}" +%Y%m%d)
pstart=$(date -d"6 day ago ${pred_time}" +%Y%m%d)
tend=$(date -d"1 month ago ${pred_time}" +%Y%m%d)
tstart=$(date -d"6 month ago ${tend}" +%Y%m%d)
/opt/spark-2.4.4/bin/spark-submit \
--master yarn-client \
--executor-memory 8g \
--executor-cores 4 \
--queue root.rainbow_wfm \
--num-executors 4 \
--conf spark.memory.fraction=0.8 \
--conf spark.driver.maxResultSize=4g \
--conf spark.sql.sources.partitionOverwriteMode=dynamic \
--conf spark.jars.packages=com.microsoft.ml.spark:mmlspark_2.11:1.0.0-rc1,com.microsoft.ml.lightgbm:lightgbmlib:2.3.100 \
../dataProcess/python/jobs/driver_wfm_order_predicting_result.py --train ${1} --length ${2} --tstart ${tstart} --tend ${tend} --pstart ${pstart} --pend ${pred_time}
echo "/opt/spark-2.4.4/bin/spark-submit \
--master yarn-client \
--executor-memory 8g \
--executor-cores 4 \
--queue root.rainbow_wfm \
--num-executors 4 \
--conf spark.memory.fraction=0.8 \
--conf spark.driver.maxResultSize=4g \
--conf spark.sql.sources.partitionOverwriteMode=dynamic \
--conf spark.jars.packages=com.microsoft.ml.spark:mmlspark_2.11:1.0.0-rc1,com.microsoft.ml.lightgbm:lightgbmlib:2.3.100 \
/data/rainbow_wfm/sqw/wfmforecast-master/wfmforecast/drivers/driver_wfm_order_predicting_result.py --train ${1} --length ${2} --tstart ${tstart} --tend ${tend} --pstart ${pstart} --pend ${pred_time}"
}
startDate=$(date -d "${pred_start_time}" +%s)
endDate=$(date -d "${pred_time}" +%s)
## the difference between the two timestamps, divided by 86400 seconds per day, gives the number of days between them
stampDiff=`expr $endDate - $startDate`
dayDiff=`expr $stampDiff / 86400`
echo $dayDiff
flag28=$(( $dayDiff % 28 ))
flag7=$(( $dayDiff % 7 ))
if [ ${flag28} = 0 ]; then
echo "train predicting ${1} run"
algorithm_python_features 1 ${1}
elif [ ${flag7} = 0 ]; then
echo "predicting 0 ${1} run"
algorithm_python_features 0 ${1}
else
echo "${pred_time} predicting no run"
fi
# pred_start_time="20210103"
# pred_time="20210131"
# start_time="20210103"
# end_time="20210131"