
MMLSpark order volume forecasting case study


This article is a companion piece to the earlier post, the end-to-end Spark forecasting case study. That one used plain LightGBM for order volume forecasting, and as the number of stores grew, its runtime performance became a problem.

This article instead uses Microsoft's open-source Microsoft Machine Learning for Apache Spark (MMLSpark):

https://github.com/Azure/mmlspark

Its basic usage was covered in the earlier post LightGBM on Spark in practice, which you can refer to for background.

1 Store clustering

Because the store data volume is huge, training one model on all stores together is very expensive even on Spark. So the stores are first clustered into groups, and a model is then trained jointly for the stores of the same type.

From the half-hourly order data by channel in wfm_dw1.wfm_order_channel_half_hour, generate per-store features covering every half hour of a 7-day week: 24*2*7 = 336 columns.

Training data: 20201130--20210131; prediction of the next 28 days: 20210201--20210228

tstart = "20201130"
tend = "20210131"

The raw data pdf (a pandas DataFrame):

+-------------------+--------+---------+---------+
|global_store_number|sale_day|half_hour|order_qty|
+-------------------+--------+---------+---------+
|              28710|20201210|       20|     17.0|
|              28710|20201230|       19|     15.0|
|              28710|20201211|       13|     19.0|
|              28710|20201203|       31|      1.0|
|              28710|20201205|       11|      3.0|
|              28710|20210111|       19|     14.0|
|              28710|20210119|       16|     22.0|
|              28710|20210130|       23|      5.0|
|              28710|20210107|       16|     30.0|
|              28710|20210117|       23|      8.0|
|              28710|20210113|       10|     22.0|
|              28710|20210113|       26|      2.0|
|              28710|20210113|       12|     12.0|
|              28710|20201211|       26|      3.0|
|              28710|20201226|       11|      9.0|
|              28710|20201215|       21|     13.0|
|              28710|20201222|       19|     17.0|
|              28710|20201201|       12|      9.0|
|              28710|20201215|       31|      2.0|
|              28710|20201215|       23|     13.0|
+-------------------+--------+---------+---------+
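The post does not show how pdf itself is pulled. A minimal sketch, assuming import pandas as pd, import numpy as np, an active spark session, and the tstart/tend set above; summing over channels to match the columns shown is my assumption:

import pandas as pd
import numpy as np

# Hypothetical pull of the raw half-hour order data for the example store into pandas.
pdf = spark.sql(f"""
    select global_store_number, sale_day, half_hour, sum(order_qty) as order_qty
    from wfm_dw1.wfm_order_channel_half_hour
    where global_store_number = '28710'
      and sale_day between '{tstart}' and '{tend}'
    group by global_store_number, sale_day, half_hour
""").toPandas()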

Filling in missing time slots

def inter(df: pd.DataFrame) -> pd.DataFrame:
    global tend
    min_sale_day = df['sale_day'].min()
    idx = pd.MultiIndex.from_product([pd.date_range(start=min_sale_day, end=tend, freq='D').strftime('%Y%m%d'), range(48)], names=('sale_day', 'half_hour'))
    return df.set_index(['sale_day', 'half_hour']).reindex(idx)[['order_qty']].fillna(0).reset_index()   # the raw data has missing slots; reindexing guarantees 48 half-hour slots per day, filled with 0

pdf_inter = inter(pdf)
pdf_inter
sale_day	half_hour	order_qty
0	20201130	0	0.0
1	20201130	1	0.0
2	20201130	2	0.0
3	20201130	3	0.0
4	20201130	4	0.0
...	...	...	...
3019	20210131	43	0.0
3020	20210131	44	0.0
3021	20210131	45	0.0
3022	20210131	46	0.0
3023	20210131	47	0.0

Generating wfm_dw1.wfm_order_clustering_feat

Store number 28710 is used as the example here.

pdf_inter.loc[:, 'day_of_week'] = pd.to_datetime(pdf_inter['sale_day']).dt.weekday
pdf_recon = pdf_inter.groupby(['day_of_week', 'half_hour']).mean()[['order_qty']].transpose()    # take the average order_qty for each (day of week, half hour) slot
day_of_week_dict = {0: 'Mon', 1: 'Tue', 2: 'Wed', 3: 'Thu', 4: 'Fri', 5: 'Sat', 6: 'Sun'}
pdf_recon.columns, pdf_recon.index = list(map(lambda x: f'{day_of_week_dict[x[0]]}_{x[1]}', pdf_recon.columns)), [0]
pdf_recon = pd.DataFrame(
    pdf_recon,
    columns=[f'{x}_{y}' for x in ['Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'] for y in range(48)]
).fillna(0)
adt = pdf_recon.sum(axis=1)
pdf_recon = pdf_recon.div(adt, axis=0)
pdf_recon.loc[:, 'adt'] = adt
pdf_recon.loc[:, 'global_store_number'] = '28710'
pdf_recon
# pdf_recon.shape[1] = 338 = 24*2*7 + 2 (adt and global_store_number)
   Mon_0	Mon_1	Mon_2	Mon_3	Mon_4	Mon_5	Mon_6	Mon_7	Mon_8	Mon_9	...	Sun_40	Sun_41	Sun_42	Sun_43	Sun_44	Sun_45	Sun_46	Sun_47	adt	global_store_number
0	0.0	0.0	0.0	0.0	0.0	0.0	0.00033	0.006065	0.011274	0.012526	...	0.0	0.0	0.0	0.0	0.0	0.0	0.0	0.0	1685.333333	28710
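Scaling this to all stores and persisting the result is not shown in the post. A rough sketch, reusing inter and day_of_week_dict from above; the loop, the pdf_all name, and the write options are assumptions:

# Hypothetical assembly of wfm_dw1.wfm_order_clustering_feat: one row per store with
# 336 normalized (day-of-week, half-hour) share columns plus adt and global_store_number.
def build_store_features(store_number: str, pdf_store: pd.DataFrame) -> pd.DataFrame:
    pdf_i = inter(pdf_store)                                             # fill missing half-hour slots
    pdf_i['day_of_week'] = pd.to_datetime(pdf_i['sale_day']).dt.weekday
    rec = pdf_i.groupby(['day_of_week', 'half_hour'])[['order_qty']].mean().transpose()
    rec.columns = [f'{day_of_week_dict[d]}_{h}' for d, h in rec.columns]
    rec.index = [0]
    rec = pd.DataFrame(rec, columns=[f'{x}_{y}' for x in day_of_week_dict.values() for y in range(48)]).fillna(0)
    adt = rec.sum(axis=1)
    rec = rec.div(adt, axis=0)                                           # stores with no sales become NaN and are dropped later
    rec['adt'] = adt
    rec['global_store_number'] = store_number
    return rec

# frames = [build_store_features(s, pdf_all[pdf_all['global_store_number'] == s])
#           for s in pdf_all['global_store_number'].unique()]
# spark.createDataFrame(pd.concat(frames, ignore_index=True)) \
#     .write.saveAsTable('wfm_dw1.wfm_order_clustering_feat', format='orc', mode='overwrite')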

Clustering the stores with KMeans

from sklearn.cluster import KMeans

def cluster_kmeans():
    wfm_order_clustering_feat = spark.sql("select * from wfm_dw1.wfm_order_clustering_feat").toPandas()
    wfm_order_clustering_feat.sort_values('adt', ascending=False, inplace=True)   # sort by adt so each 500-store block below contains stores with similar adt
    wfm_order_clustering_feat.dropna(inplace=True)                                # shape (5085, 338); drop stores without sales

    kmeans_dict = dict()
    result_dict = dict()
    
    for idx_1 in range(10):
        # wfm_order_clustering_feat.iloc[idx_1*500:(idx_1+1)*500, :-2]  take 500 stores at a time (rows 0, 500, 1000, ...), excluding the adt and global_store_number columns
        kmeans = KMeans(n_clusters=10, random_state=0).fit(wfm_order_clustering_feat.iloc[idx_1*500:(idx_1+1)*500, :-2])    # within each block of ~500 stores, split into 10 clusters (5000+ stores overall)
        kmeans_dict[idx_1] = kmeans
        for idx_2 in range(10):
            result_dict[(idx_1, idx_2)] = wfm_order_clustering_feat.loc[wfm_order_clustering_feat.index[idx_1*500:(idx_1+1)*500][kmeans.labels_ == idx_2], 'global_store_number'].values
    return result_dict

The resulting cluster assignments:

result_dict
{(0,
  0): array(['18482', '19265', '61039', '16212', '23566', '20564', '22051',
        '29198', '8013', '47230', '23034', '51774', '53654', '51775',
        '58989', '15907', '23994', '25146', '25145', '26871'], dtype=object),
 (0,
  1): array(['35813', '35008', '50162', '29262', '31904', '27148', '32135',
        '58172', '22407', '26566', '31206', '31374', '48188', '47357',
        '13179', '1692', '15312', '34004', '50243', '19567', '49632',
        '56593', '22686', '19441', '15061', '1648', '49902', '28109',
        '35806', '31185', '26727', '35005', '27226', '21577', '19370',
        '50589', '51453', '25167', '22903', '53355', '27613', '16796',
        '51642', '47397', '52559', '27473', '35808', '23971', '25216',
        '15021', '16334'], dtype=object)
        ......

# cluster sizes vary
[len(i) for i in result_dict.values()]
[20,
 51,
 80,
 68
 ...
]
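Section 3 below iterates over cluster_dict[(i, j)] for i in range(max_i), j in range(max_j). A minimal sketch of the assumed wiring from the clustering output to those names:

# Assumption: the prediction loop in section 3 uses the clustering output directly.
cluster_dict = result_dict                      # keys are (group_id, cluster_id) tuples
max_i = max(k[0] for k in cluster_dict) + 1     # number of 500-store blocks (10)
max_j = max(k[1] for k in cluster_dict) + 1     # number of KMeans clusters per block (10)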

2 Feature generation

The data again comes from wfm_dw1.wfm_order_channel_half_hour, but it is handled differently: records are grouped by 'global_store_number' and 'channel' before processing.

Filling in missing time slots

def inter(df: pd.DataFrame) -> pd.DataFrame:
    min_sale_day = df['sale_day'].min()
    max_sale_day = (pd.to_datetime(fend) + pd.DateOffset(days=28)).strftime('%Y%m%d')   # extend 28 days past fend so the prediction horizon also gets 48 slots per day
    idx = pd.MultiIndex.from_product([pd.date_range(start=min_sale_day, end=max_sale_day, freq='D').strftime('%Y%m%d'), range(48)], names=('sale_day', 'half_hour'))
    return df.set_index(['sale_day', 'half_hour']).reindex(idx)[['order_qty']].fillna(0).reset_index()

pdf_inter = pdf.groupby(['global_store_number', 'channel']).apply(inter)  # fill missing slots for every (store, channel) group
pdf_inter.index = pdf_inter.index.droplevel(-1)   # drop the innermost index level added by groupby.apply
pdf_inter.reset_index(inplace=True)
pdf_inter
  global_store_number	channel  	sale_day	half_hour	order_qty
0	28710	In-Store	20201130	0	0.0
1	28710	In-Store	20201130	1	0.0
2	28710	In-Store	20201130	2	0.0
3	28710	In-Store	20201130	3	0.0
4	28710	In-Store	20201130	4	0.0
...	...	...	...	...	...
13099	28710	MOP	20210228	43	0.0
13100	28710	MOP	20210228	44	0.0
13101	28710	MOP	20210228	45	0.0
13102	28710	MOP	20210228	46	0.0
13103	28710	MOP	20210228	47	0.0

Adding features

pdf_feat = pdf_inter.groupby(['global_store_number', 'channel']).apply(gen)  # run gen on every (store, channel) group

pdf_calendar
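The calendar table behind pdf_calendar is not shown in the post. Judging from the columns gen reads below (date_id, weekend, day_of_week, day_of_month, month, working_day, legal_holiday, related_festival), it might be loaded along these lines; the table name wfm_dw1.wfm_calendar is an assumption:

# Hypothetical load of the calendar dimension; the table name is assumed,
# the column list matches what gen() reads from pdf_calendar below.
pdf_calendar = spark.sql("""
    select date_id, weekend, day_of_week, day_of_month, month, working_day,
           legal_holiday, related_festival
    from wfm_dw1.wfm_calendar
""").toPandas()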

pdf_event: holiday and festival features

_pdf_event = execute_sql("select * from arp_dw1.mdm_event_mgt").drop_duplicates(subset=['event_id', 'effective_from', 'effective_to'])
pdf_event = pd.DataFrame(
    0,
    index=pd.Index(
        pd.date_range(
            fstart,
            (pd.to_datetime(fend) + pd.DateOffset(days=28)).strftime('%Y%m%d')
        ),
        name='date_id'
    ),
    columns=['holiday', 'festival']
)
# pd.to_datetime(fend) + pd.DateOffset(days=28)
# Timestamp('2021-02-28 00:00:00')

# training data 2020-11-30--2021-01-31 plus the 28-day prediction window 2021-02-01--2021-02-28
# pd.date_range(
#             fstart,
#             (pd.to_datetime(fend) + pd.DateOffset(days=28)).strftime('%Y%m%d')
#         )
# DatetimeIndex(['2020-11-30', '2020-12-01', '2020-12-02', '2020-12-03',
#                '2020-12-04', '2020-12-05', '2020-12-06', '2020-12-07',
#                '2020-12-08', '2020-12-09', '2020-12-10', '2020-12-11',
#                '2020-12-12', '2020-12-13', '2020-12-14', '2020-12-15',
#                '2020-12-16', '2020-12-17', '2020-12-18', '2020-12-19',
#                '2020-12-20', '2020-12-21', '2020-12-22', '2020-12-23',
#                '2020-12-24', '2020-12-25', '2020-12-26', '2020-12-27',
#                '2020-12-28', '2020-12-29', '2020-12-30', '2020-12-31',
#                '2021-01-01', '2021-01-02', '2021-01-03', '2021-01-04',
#                '2021-01-05', '2021-01-06', '2021-01-07', '2021-01-08',
#                '2021-01-09', '2021-01-10', '2021-01-11', '2021-01-12',
#                '2021-01-13', '2021-01-14', '2021-01-15', '2021-01-16',
#                '2021-01-17', '2021-01-18', '2021-01-19', '2021-01-20',
#                '2021-01-21', '2021-01-22', '2021-01-23', '2021-01-24',
#                '2021-01-25', '2021-01-26', '2021-01-27', '2021-01-28',
#                '2021-01-29', '2021-01-30', '2021-01-31', '2021-02-01',
#                '2021-02-02', '2021-02-03', '2021-02-04', '2021-02-05',
#                '2021-02-06', '2021-02-07', '2021-02-08', '2021-02-09',
#                '2021-02-10', '2021-02-11', '2021-02-12', '2021-02-13',
#                '2021-02-14', '2021-02-15', '2021-02-16', '2021-02-17',
#                '2021-02-18', '2021-02-19', '2021-02-20', '2021-02-21',
#                '2021-02-22', '2021-02-23', '2021-02-24', '2021-02-25',
#                '2021-02-26', '2021-02-27', '2021-02-28'],
#               dtype='datetime64[ns]', freq='D')

# The index passed to pd.DataFrame above is the same date range wrapped as pd.Index(..., name='date_id').

# pdf_event
#            holiday	festival
# date_id		
# 2020-11-30	     0	0
# 2020-12-01	     0	0

for i in _pdf_event.values:
    pdf_event.loc[i[5]:i[6], i[2].lower()] = np.maximum(pdf_event.loc[i[5]:i[6], i[2].lower()], int(i[3][-1]))   # for each event, mark the dates it affects in the corresponding holiday/festival column, keeping the higher level where events overlap
pdf_event.reset_index(inplace=True)
pdf_event['date_id'] = pdf_event['date_id'].dt.strftime('%Y%m%d').astype(int)
pdf_event
# date_id   holiday	festival
# 2020-12-30	0	0
# 2020-12-31	0	1
# 2021-01-01	3	0
# 2021-01-02	3	0
# 2021-01-03	3	0
# 2021-01-04	0	0
# 2021-01-05	0	0
# 2021-01-06	0	0
# 2021-01-07	0	0
# 2021-01-08	0	0

_pdf_store

_pdf_store = spark.sql("select global_store_number, trade_area_type from wfm_dw1.wfm_store2").toPandas()
_pdf_store.loc[:, '_trade_area_type'] = _pdf_store['trade_area_type'].apply(lambda x: re.sub('[^A-Z]', '', re.split('[ -]', x.replace('Other', 'OT'))[0]) if x else 'NO')
# global_store_number	trade_area_type	_trade_area_type
# 0	58088	            TS-Airport	TS
# 1	47874	            C-District	C
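To make the trade-area normalization above concrete, a quick illustration on made-up inputs (the sample values are not from the data):

import re

# Keep only the capital letters of the first token, e.g. 'TS-Airport' -> 'TS'; None -> 'NO'.
norm = lambda x: re.sub('[^A-Z]', '', re.split('[ -]', x.replace('Other', 'OT'))[0]) if x else 'NO'
print([norm(x) for x in ['TS-Airport', 'C-District', 'Other - Mall', None]])
# ['TS', 'C', 'OT', 'NO']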

store_dict: store trade-area-type feature

_trade_area_types = list(set(_pdf_store['_trade_area_type']))
# ['O', 'NO', 'R', 'OT', 'H', 'TS', 'X', 'T', 'C', 'RT', 'S']

_trade_area_type_map = {_trade_area_types[i]: i for i in range(len(_trade_area_types))}
#     {'O': 0,
#  'NO': 1,
#  'R': 2,
#  'OT': 3,
#  'H': 4,
#  'TS': 5,
#  'X': 6,
#  'T': 7,
#  'C': 8,
#  'RT': 9,
#  'S': 10}

store_dict = {str(x[0]): _trade_area_type_map[x[2]] for x in _pdf_store.values}
# {'58088': 5,
#  '47874': 8,
#  '58041': 5,
#  '23857': 7,
#  '59466': 8,
#  '24020': 8,
#  '19601': 8}

channel_dict: order channel feature

channel_dict = {'In-Store':0, 'MOD': 1, 'MOP': 2}

gen: the main feature-generation function

def gen(df: pd.DataFrame) -> pd.DataFrame:
    # df: the DataFrame of one (store, channel) group
    global pdf_calendar, pdf_event, channel_dict, store_dict    # calendar data / event data / channel dict / store-type dict

    # start from an empty DataFrame and add feature columns one by one
    feat = pd.DataFrame(index=df.index)  # keep the index of the original (pre-groupby) DataFrame
    feat.loc[:, 'sale_day'] = df['sale_day']

    # carry over the original columns under new names; from here on, categorical features are suffixed *_cat and numeric features *_real
    feat.loc[:, ['dt_half_hour_of_day_cat', 'fu_shift_0d_real']] = df[['half_hour', 'order_qty']].values

    # channel type
    feat['st_channel_cat'] = channel_dict[df['channel'].values[0]]

    # store type
    feat['st_trade_area_type_cat'] = store_dict[df['global_store_number'].values[0]]

    #################################### date-related features: typical time-series features, all categorical
    #### use feat['sale_day'] to index pdf_calendar.set_index('date_id'), covering 20201130--20210131--20210228
# pdf_calendar.set_index('date_id').loc[feat['sale_day'].astype(int), ['weekend', 'day_of_week', 'day_of_month', 'month', 'working_day']]
#        weekend	day_of_week	day_of_month	month	working_day
# date_id					
# 20201130	0	1	30	11	1
# 20201130	0	1	30	11	1
# 20201130	0	1	30	11	1
# 20201130	0	1	30	11	1
# 20201130	0	1	30	11	1
# ...	...	...	...	...	...
# 20210228	1	0	28	2	0
# 20210228	1	0	28	2	0
# 20210228	1	0	28	2	0
# 20210228	1	0	28	2	0
# 20210228	1	0	28	2	0

# pdf_calendar.set_index('date_id').loc[feat['sale_day'].astype(int), ['legal_holiday', 'related_festival']].isnull()
#        legal_holiday	related_festival
# date_id		
# 20201130	True	True
# 20201130	True	True
# 20201130	True	True
# 20201130	True	True
# 20201130	True	True
# ...	...	...
# 20210228	True	True
# 20210228	True	True
# 20210228	True	True
# 20210228	True	True
# 20210228	True	True

# pdf_event.set_index('date_id').loc[feat['sale_day'].astype(int), ['holiday', 'festival']]
#         holiday	festival
# date_id		
# 20201130	0	0
# 20201130	0	0
# 20201130	0	0
# 20201130	0	0
# 20201130	0	0
# ...	...	...
# 20210228	0	0
# 20210228	0	0
# 20210228	0	0
# 20210228	0	0
# 20210228	0	0
    
    feat.loc[:, ['dt_weekend_cat', 'dt_day_of_week_cat', 'dt_day_of_month_cat', 'dt_month_of_year_cat', 'dt_working_day_cat']] = pdf_calendar.set_index('date_id').loc[feat['sale_day'].astype(int), ['weekend', 'day_of_week', 'day_of_month', 'month', 'working_day']].values
    feat.loc[:, ['dt_legal_holiday_cat', 'dt_related_festival_cat']] = pdf_calendar.set_index('date_id').loc[feat['sale_day'].astype(int), ['legal_holiday', 'related_festival']].isnull().values
    feat.loc[:, ['dt_ordinal_holiday_cat', 'dt_ordinal_festival_cat']] = pdf_event.set_index('date_id').loc[feat['sale_day'].astype(int), ['holiday', 'festival']].values

    #################################### shifted numeric features
    #### half-hour shifts
    for i in range(1, 24):
        feat.loc[:, f'fu_shift_{i}hh_real'] = df['order_qty'].shift(i).values

    #### day shifts
    for i in (1, 2, 3, 4, 5, 6, 7, 14, 21, 28, 35):
        feat.loc[:, f'fu_shift_{i}d_real'] = df['order_qty'].shift(i * 48).values

    #################################### aggregate
    #### day-level aggregates
    for i in ('mean', 'median'):
        for j in (3, 4, 5, 6, 7):
            feat.loc[:, f'fu_{i}_{j}d_real'] = getattr(feat[list(map(lambda x: f'fu_shift_{x}d_real', range(j)))], i)(axis=1).values      # getattr picks the 'mean' or 'median' method of the sliced DataFrame by name and applies it row-wise

    #### week-level aggregates (same half hour in previous weeks)
    for i in ('mean', 'median'):
        for j in (3, 4, 5, 6):
            feat.loc[:, f'fu_{i}_{j}w_real'] = getattr(feat[list(map(lambda x: f'fu_shift_{x * 7}d_real', range(j)))], i)(axis=1).values

    #################################### wave
    #### Encoding cyclical features
    #### For cyclical variables such as month, day of month, hour, or minute, using the raw number (or a naive ordinal encoding) is problematic: 23:00 and 01:00 are only 2 hours apart, but as plain numbers 23 and 1 differ by 22, which can badly mislead the model.
    #### Such cyclical features therefore need a proper encoding. The classic approach is to expand the 1-D value into a 2-D (sine, cosine) pair.
    #### For a feature X, take its maximum value max_value; for hours, max_value = 23:
    #### data['hour_sin'] = np.sin(2 * np.pi * data['hour']/23.0)
    #### data['hour_cos'] = np.cos(2 * np.pi * data['hour']/23.0)
    #### Add the expanded X_sin, X_cos features to the feature set and drop the original X (the raw "hour" value is replaced by its sine and cosine).
    feat.loc[:, 'dt_sin_w_real'] = np.sin(    (feat['dt_day_of_week_cat'] + feat['dt_half_hour_of_day_cat'] / 48) / 7      * 2 * np.pi).values
    feat.loc[:, 'dt_cos_w_real'] = np.cos(    (feat['dt_day_of_week_cat'] + feat['dt_half_hour_of_day_cat'] / 48) / 7      * 2 * np.pi).values
    feat.loc[:, 'dt_sin_d_real'] = np.sin(     feat['dt_half_hour_of_day_cat'] / 48     * 2 * np.pi).values
    feat.loc[:, 'dt_cos_d_real'] = np.cos(     feat['dt_half_hour_of_day_cat'] / 48     * 2 * np.pi).values

    #################################### shift back (future available features): calendar features that are already known for the 1-4 weeks being predicted
    dt_cols = list(filter(lambda x: x.startswith('dt_'), feat.columns))
    for i in (1, 2, 3, 4):
        feat.loc[:, list(map(lambda x: x.replace('dt_', f'fa_{i * 7}_'), dt_cols))] = feat[dt_cols].shift(-i * 7 * 48).values

    #################################### response: fu_shift_0d_real is the actual order_qty
    #################################### the targets res_7_real / res_14_real / res_21_real / res_28_real are order_qty shifted back by 1-4 weeks
    for i in (1, 2, 3, 4):
        feat.loc[:, f'res_{i * 7}_real'] = feat['fu_shift_0d_real'].shift(-i * 7 * 48)

    # dtype
    cat_cols = list(filter(lambda x: x.endswith('_cat'), feat.columns))  # categorical feature columns (52)
    real_cols = list(filter(lambda x: x.endswith('_real'), feat.columns))# numeric feature columns (77)
    dtype_dict = {**{cat_col: int for cat_col in cat_cols}, **{real_col: float for real_col in real_cols}}
# {'dt_half_hour_of_day_cat': int,
#  'st_channel_cat': int,
#  'st_trade_area_type_cat': int,
#  'dt_weekend_cat': int,
#  'dt_day_of_week_cat': int,
#  'dt_day_of_month_cat': int,
#  'dt_month_of_year_cat': int,
#  'dt_working_day_cat': int,
#  'dt_legal_holiday_cat': int,
#  'dt_related_festival_cat': int,
#  'dt_ordinal_holiday_cat': int,
#  'dt_ordinal_festival_cat': int,
#  'fa_7_half_hour_of_day_cat': int,
#  'fa_7_weekend_cat': int,
#  'fa_7_day_of_week_cat': int,
#  'fa_7_day_of_month_cat': int,
#  'fa_7_month_of_year_cat': int,
#  'fa_7_working_day_cat': int,
#  'fa_7_legal_holiday_cat': int,
#  'fa_7_related_festival_cat': int,
#  'fa_7_ordinal_holiday_cat': int,
#  'fa_7_ordinal_festival_cat': int,
#  'fa_14_half_hour_of_day_cat': int,
#  'fa_14_weekend_cat': int,
#  'fa_14_day_of_week_cat': int,
#  'fa_14_day_of_month_cat': int,
#  'fa_14_month_of_year_cat': int,
#  'fa_14_working_day_cat': int,
#  'fa_14_legal_holiday_cat': int,
#  'fa_14_related_festival_cat': int,
#  'fa_14_ordinal_holiday_cat': int,
#  'fa_14_ordinal_festival_cat': int,
#  'fa_21_half_hour_of_day_cat': int,
#  'fa_21_weekend_cat': int,
#  'fa_21_day_of_week_cat': int,
#  'fa_21_day_of_month_cat': int,
#  'fa_21_month_of_year_cat': int,
#  'fa_21_working_day_cat': int,
#  'fa_21_legal_holiday_cat': int,
#  'fa_21_related_festival_cat': int,
#  'fa_21_ordinal_holiday_cat': int,
#  'fa_21_ordinal_festival_cat': int,
#  'fa_28_half_hour_of_day_cat': int,
#  'fa_28_weekend_cat': int,
#  'fa_28_day_of_week_cat': int,
#  'fa_28_day_of_month_cat': int,
#  'fa_28_month_of_year_cat': int,
#  'fa_28_working_day_cat': int,
#  'fa_28_legal_holiday_cat': int,
#  'fa_28_related_festival_cat': int,
#  'fa_28_ordinal_holiday_cat': int,
#  'fa_28_ordinal_festival_cat': int,
#  'fu_shift_0d_real': float,
#  'fu_shift_1hh_real': float,
#  'fu_shift_2hh_real': float,
#  'fu_shift_3hh_real': float,
#  'fu_shift_4hh_real': float,
#  'fu_shift_5hh_real': float,
#  'fu_shift_6hh_real': float,
#  'fu_shift_7hh_real': float,
#  'fu_shift_8hh_real': float,
#  'fu_shift_9hh_real': float,
#  'fu_shift_10hh_real': float,
#  'fu_shift_11hh_real': float,
#  'fu_shift_12hh_real': float,
#  'fu_shift_13hh_real': float,
#  'fu_shift_14hh_real': float,
#  'fu_shift_15hh_real': float,
#  'fu_shift_16hh_real': float,
#  'fu_shift_17hh_real': float,
#  'fu_shift_18hh_real': float,
#  'fu_shift_19hh_real': float,
#  'fu_shift_20hh_real': float,
#  'fu_shift_21hh_real': float,
#  'fu_shift_22hh_real': float,
#  'fu_shift_23hh_real': float,
#  'fu_shift_1d_real': float,
#  'fu_shift_2d_real': float,
#  'fu_shift_3d_real': float,
#  'fu_shift_4d_real': float,
#  'fu_shift_5d_real': float,
#  'fu_shift_6d_real': float,
#  'fu_shift_7d_real': float,
#  'fu_shift_14d_real': float,
#  'fu_shift_21d_real': float,
#  'fu_shift_28d_real': float,
#  'fu_shift_35d_real': float,
#  'fu_mean_3d_real': float,
#  'fu_mean_4d_real': float,
#  'fu_mean_5d_real': float,
#  'fu_mean_6d_real': float,
#  'fu_mean_7d_real': float,
#  'fu_median_3d_real': float,
#  'fu_median_4d_real': float,
#  'fu_median_5d_real': float,
#  'fu_median_6d_real': float,
#  'fu_median_7d_real': float,
#  'fu_mean_3w_real': float,
#  'fu_mean_4w_real': float,
#  'fu_mean_5w_real': float,
#  'fu_mean_6w_real': float,
#  'fu_median_3w_real': float,
#  'fu_median_4w_real': float,
#  'fu_median_5w_real': float,
#  'fu_median_6w_real': float,
#  'dt_sin_w_real': float,
#  'dt_cos_w_real': float,
#  'dt_sin_d_real': float,
#  'dt_cos_d_real': float,
#  'fa_7_sin_w_real': float,
#  'fa_7_cos_w_real': float,
#  'fa_7_sin_d_real': float,
#  'fa_7_cos_d_real': float,
#  'fa_14_sin_w_real': float,
#  'fa_14_cos_w_real': float,
#  'fa_14_sin_d_real': float,
#  'fa_14_cos_d_real': float,
#  'fa_21_sin_w_real': float,
#  'fa_21_cos_w_real': float,
#  'fa_21_sin_d_real': float,
#  'fa_21_cos_d_real': float,
#  'fa_28_sin_w_real': float,
#  'fa_28_cos_w_real': float,
#  'fa_28_sin_d_real': float,
#  'fa_28_cos_d_real': float,
#  'res_7_real': float,
#  'res_14_real': float,
#  'res_21_real': float,
#  'res_28_real': float}
    
    # drop rows with NaNs introduced by the shifts and cast columns to the dtypes above
    return feat.dropna().astype(dtype_dict)

Generating the prediction feature table wfm_dw1.wfm_order_predicting_feat
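The write step itself is not shown in the post. A minimal sketch of persisting pdf_feat (the index handling mirrors the pdf_inter steps above; the write options are assumptions):

# Hypothetical persistence of the feature table consumed by the prediction loop in section 3.
pdf_feat.index = pdf_feat.index.droplevel(-1)   # drop the per-group row index added by groupby.apply
pdf_feat = pdf_feat.reset_index()               # global_store_number / channel become columns again
spark.createDataFrame(pdf_feat) \
    .write.saveAsTable('wfm_dw1.wfm_order_predicting_feat', format='orc', mode='overwrite')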

3 Model prediction
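The imports and run parameters used by the loop below are not shown in the post. A plausible setup, with import paths as of mmlspark 1.0.0-rc1 (check your version); the parameter values are placeholders:

# Assumed imports / parameters; the names mirror how they are used in the loop below.
import os
import pyspark.ml.feature as ft
import pyspark.sql.functions as func
from pyspark.sql.utils import AnalysisException
from mmlspark.lightgbm import LightGBMRegressor, LightGBMRegressionModel

train, length, mode = 1, 7, 'half_hour'        # --train / --length flags and a run-mode tag; example values only
tstart, tend = '20201130', '20210131'          # training window
pstart, pend = '20210201', '20210228'          # prediction window
# cluster_dict, max_i, max_j come from the clustering step in section 1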

new_table = 0
for i in range(max_i):
    for j in range(max_j):
        if len(cluster_dict[(i, j)]) > 1:
            sql_query = f"select * from wfm_dw1.wfm_order_predicting_feat where global_store_number in {tuple(cluster_dict[(i, j)])}"
        else:
            sql_query = f"select * from wfm_dw1.wfm_order_predicting_feat where global_store_number == '{cluster_dict[(i, j)][0]}'"
        sdf = spark.sql(sql_query)

        #### prediction horizon
        #### pred_len = 7/14/21/28
        pred_len = length
        label = f'res_{pred_len}_real'
        prediction = f'res_{pred_len}_prediction_real'
        feature_name = list(filter(lambda x: x.startswith(f'fa_{pred_len}_') or x.startswith('fu_') or x.startswith('st_'), sdf.columns))

# pred_len = 7
#             feature_name = list(filter(lambda x: x.startswith(f'fa_{pred_len}_') or x.startswith('fu_') or x.startswith('st_'), sdf.columns))
# ['fu_shift_0d_real',
#  'st_channel_cat',
#  'st_trade_area_type_cat',
#  'fu_shift_1hh_real',
#  'fu_shift_2hh_real',
#  'fu_shift_3hh_real',
#  'fu_shift_4hh_real',
#  'fu_shift_5hh_real',
#  'fu_shift_6hh_real',
#  'fu_shift_7hh_real',
#  'fu_shift_8hh_real',
#  'fu_shift_9hh_real',
#  'fu_shift_10hh_real',
#  'fu_shift_11hh_real',
#  'fu_shift_12hh_real',
#  'fu_shift_13hh_real',
#  'fu_shift_14hh_real',
#  'fu_shift_15hh_real',
#  'fu_shift_16hh_real',
#  'fu_shift_17hh_real',
#  'fu_shift_18hh_real',
#  'fu_shift_19hh_real',
#  'fu_shift_20hh_real',
#  'fu_shift_21hh_real',
#  'fu_shift_22hh_real',
#  'fu_shift_23hh_real',
#  'fu_shift_1d_real',
#  'fu_shift_2d_real',
#  'fu_shift_3d_real',
#  'fu_shift_4d_real',
#  'fu_shift_5d_real',
#  'fu_shift_6d_real',
#  'fu_shift_7d_real',
#  'fu_shift_14d_real',
#  'fu_shift_21d_real',
#  'fu_shift_28d_real',
#  'fu_shift_35d_real',
#  'fu_mean_3d_real',
#  'fu_mean_4d_real',
#  'fu_mean_5d_real',
#  'fu_mean_6d_real',
#  'fu_mean_7d_real',
#  'fu_median_3d_real',
#  'fu_median_4d_real',
#  'fu_median_5d_real',
#  'fu_median_6d_real',
#  'fu_median_7d_real',
#  'fu_mean_3w_real',
#  'fu_mean_4w_real',
#  'fu_mean_5w_real',
#  'fu_mean_6w_real',
#  'fu_median_3w_real',
#  'fu_median_4w_real',
#  'fu_median_5w_real',
#  'fu_median_6w_real',
#  'fa_7_half_hour_of_day_cat',
#  'fa_7_weekend_cat',
#  'fa_7_day_of_week_cat',
#  'fa_7_day_of_month_cat',
#  'fa_7_month_of_year_cat',
#  'fa_7_working_day_cat',
#  'fa_7_legal_holiday_cat',
#  'fa_7_related_festival_cat',
#  'fa_7_ordinal_holiday_cat',
#  'fa_7_ordinal_festival_cat',
#  'fa_7_sin_w_real',
#  'fa_7_cos_w_real',
#  'fa_7_sin_d_real',
#  'fa_7_cos_d_real']

        ## drop a few features
        feature_name.remove(f'fa_{pred_len}_day_of_month_cat')
        feature_name.remove(f'fa_{pred_len}_month_of_year_cat')
        feature_name.remove(f'fa_{pred_len}_half_hour_of_day_cat')
        
        ## pick out the categorical features
        categorical_feature = list(filter(lambda x: x.endswith('_cat'), feature_name))

# ['st_channel_cat',
#  'st_trade_area_type_cat',
#  'fa_7_weekend_cat',
#  'fa_7_day_of_week_cat',
#  'fa_7_working_day_cat',
#  'fa_7_legal_holiday_cat',
#  'fa_7_related_festival_cat',
#  'fa_7_ordinal_holiday_cat',
#  'fa_7_ordinal_festival_cat']

        va = ft.VectorAssembler(inputCols = feature_name, outputCol = 'features', handleInvalid = "keep")
        sdf_va = va.transform(sdf)
        sdf_va_train = sdf_va.filter(sdf_va['sale_day'] >= tstart).filter(sdf_va['sale_day'] <= tend)


        if train:
            os.system(f"hadoop fs -rm -r hdfs://ns1/user/rainbow_admin/wfm_order_predicting_model_{i}_{j}_{pred_len}_{mode}")
            if sdf_va_train.count():
                print(f'Group {i}, cluster {j}, TRAINING')
                lgb = LightGBMRegressor(
                    baggingFraction=0.8,
                    baggingFreq=10,
                    baggingSeed=3,
                    boostingType='gbdt',
                    categoricalSlotNames=categorical_feature,
                    # earlyStoppingRound=100,
                    featureFraction=0.8,
                    labelCol=label,
                    learningRate=0.1,
                    maxBin=63,
                    maxDepth=-1,
                    metric='mape',
                    minSumHessianInLeaf=0.001,
                    numIterations=100,
                    numLeaves=31,
                    objective='regression',
                    predictionCol=prediction,
                    verbosity=1
                )
                model = lgb.fit(sdf_va_train)
                model.saveNativeModel(f"wfm_order_predicting_model_{i}_{j}_{pred_len}_{mode}")
        try:
            model = LightGBMRegressionModel.loadNativeModelFromFile(f"wfm_order_predicting_model_{i}_{j}_{pred_len}_{mode}", labelColName = label, predictionColName = prediction)
        except AnalysisException:
            print(f'Group {i}, cluster {j}, PASS')
            continue

        sdf_va_test = sdf_va.filter(sdf_va['sale_day'] >= pstart).filter(sdf_va['sale_day'] <= pend)        
        sdf_result = model.transform(sdf_va_test).select(['global_store_number', 'channel', 'sale_day', 'dt_half_hour_of_day_cat', label, prediction])

        table = f'wfm_order_predicting_result_{pred_len}_{mode}'
        if new_table:
            sdf_result.withColumn('group_id', func.lit(i)) \
                .withColumn('cluster_id', func.lit(j)) \
                .write.saveAsTable(
                f'wfm_dw1.{table}', format='orc',
                mode='overwrite', partitionBy=['group_id', 'cluster_id']
            )
            new_table = 0
        else:
            sdf_result.withColumn('group_id', func.lit(i)) \
                .withColumn('cluster_id', func.lit(j)) \
                .write.insertInto(f'wfm_dw1.{table}', overwrite=False)
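Not part of the original post: a quick way to sanity-check the written predictions, computing per-store MAPE with PySpark. Zero-demand rows are filtered out to avoid division by zero; the table suffix follows the naming used in the loop:

# Hypothetical evaluation of the 7-day-ahead predictions.
sdf_eval = spark.sql(f"select * from wfm_dw1.wfm_order_predicting_result_7_{mode}")
mape_by_store = (
    sdf_eval
    .filter(func.col('res_7_real') > 0)
    .withColumn('ape', func.abs(func.col('res_7_prediction_real') - func.col('res_7_real')) / func.col('res_7_real'))
    .groupBy('global_store_number')
    .agg(func.avg('ape').alias('mape'))
)
mape_by_store.show(10)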

4 Production scheduling

The job uses mmlspark LightGBM, which requires the jar packages; they can be downloaded from the official site.

When submitting the job, add: --conf spark.jars.packages=com.microsoft.ml.spark:mmlspark_2.11:1.0.0-rc1,com.microsoft.ml.lightgbm:lightgbmlib:2.3.100

#!/bin/bash

current_dir=$(cd $(dirname $0); pwd)
cd ${current_dir}

source /etc/profile
source ../global_config.sh

bash /root/update_kerberos.sh

function algorithm_python_features()
{
#         tstart=$(date -d"24 month ago ${pred_time}" +%Y%m%d)
        pstart=$(date -d"6 day ago ${pred_time}" +%Y%m%d)
        
        tend=$(date -d"1 month ago ${pred_time}" +%Y%m%d)
        tstart=$(date -d"6 month ago ${tend}" +%Y%m%d)
        
        /opt/spark-2.4.4/bin/spark-submit  \
        --master  yarn-client  \
        --executor-memory  8g  \
        --executor-cores  4  \
        --queue  root.rainbow_wfm  \
        --num-executors  4  \
        --conf  spark.memory.fraction=0.8  \
        --conf  spark.driver.maxResultSize=4g  \
        --conf  spark.sql.sources.partitionOverwriteMode=dynamic  \
        --conf  spark.jars.packages=com.microsoft.ml.spark:mmlspark_2.11:1.0.0-rc1,com.microsoft.ml.lightgbm:lightgbmlib:2.3.100  \
        ../dataProcess/python/jobs/driver_wfm_order_predicting_result.py --train ${1} --length ${2} --tstart ${tstart} --tend ${tend} --pstart ${pstart} --pend ${pred_time}

        echo "/opt/spark-2.4.4/bin/spark-submit  \
        --master  yarn-client  \
        --executor-memory  8g  \
        --executor-cores  4  \
        --queue  root.rainbow_wfm  \
        --num-executors  4  \
        --conf  spark.memory.fraction=0.8  \
        --conf  spark.driver.maxResultSize=4g  \
        --conf  spark.sql.sources.partitionOverwriteMode=dynamic  \
        --conf  spark.jars.packages=com.microsoft.ml.spark:mmlspark_2.11:1.0.0-rc1,com.microsoft.ml.lightgbm:lightgbmlib:2.3.100  \
        /data/rainbow_wfm/sqw/wfmforecast-master/wfmforecast/drivers/driver_wfm_order_predicting_result.py --train ${1} --length ${2} --tstart ${tstart} --tend ${tend} --pstart ${pstart} --pend ${pred_time}"
}

startDate=$(date -d "${pred_start_time}" +%s)
endDate=$(date -d "${pred_time}" +%s)
## the difference between the two timestamps, divided by 86400 s per day, gives the number of days between them
stampDiff=`expr $endDate - $startDate`
dayDiff=`expr $stampDiff / 86400`
echo $dayDiff
flag28=$(( $dayDiff % 28 ))
flag7=$(( $dayDiff % 7 ))
if [ ${flag28} = 0 ]; then
    echo "train predicting ${1} run"
    algorithm_python_features 1 ${1}
elif [ ${flag7} = 0 ]; then
    echo "predicting 0 ${1} run"
    algorithm_python_features 0 ${1}
else
    echo "${pred_time} predicting no run"
fi


# pred_start_time="20210103"
# pred_time="20210131"
# start_time="20210103"
# end_time="20210131"
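The driver script invoked above parses these flags; a plausible argparse sketch whose flag names match the spark-submit call in the scheduling script (everything else is an assumption):

# Hypothetical argument parsing inside driver_wfm_order_predicting_result.py.
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--train', type=int, required=True)                              # 1 = retrain the models, 0 = only load saved native models
parser.add_argument('--length', type=int, choices=(7, 14, 21, 28), required=True)    # prediction horizon in days
parser.add_argument('--tstart', required=True)   # training window start, yyyymmdd
parser.add_argument('--tend', required=True)     # training window end
parser.add_argument('--pstart', required=True)   # prediction window start
parser.add_argument('--pend', required=True)     # prediction window end
args = parser.parse_args()
train, length = args.train, args.length
tstart, tend, pstart, pend = args.tstart, args.tend, args.pstart, args.pend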