
An End-to-End Spark Forecasting Case Study


I recently finished a forecasting project. The data volume is huge and has to be processed in a distributed environment. The algorithm itself is LightGBM, which needs little explanation; the interesting part is how to use Spark so that each group of data trains its own model inside the cluster.

1. Base Data

  • Order data: the primary data, covering channel order quantity and category sales quantity (daily and half-hourly)

    wfm_order_quantity_day wfm_order_quantity_half_hour

    wfm_sale_quantity_day wfm_sale_quantity_half_hour

    Channels: instore, mop (mobile order, pick up in store), mod (delivery, packed to go); in Chinese: 店内, 啡快, 专星送. Sales forecast: category + sales quantity. Order forecast: channel + order quantity.

  • Weather data

  • Promotion data

  • Store data: trade-area type, city tier

  • Product data: product category

  • Forecast targets

    • Besides internal factors (historical sales, promotions, weekday vs. weekend, etc.), the algorithm must also take external factors into account (weather, holidays, season, one-off events, etc.)
    • Each store's daily average half-hour forecast accuracy should exceed 75% (1 - MAPE)
    • Each store's monthly average daily forecast accuracy should exceed 92% (1 - MAPE)
    • Each store's daily forecast accuracy should exceed 80% (1 - WMAPE); a minimal sketch of the two accuracy metrics follows this list
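
To make these targets concrete, here is a minimal sketch of the two accuracy definitions (the function names are illustrative, not taken from the project code):

import numpy as np

def mape_accuracy(y_true, y_pred):
    # 1 - MAPE: percentage errors averaged per record; records with a zero actual are skipped here
    y_true, y_pred = np.asarray(y_true, dtype=float), np.asarray(y_pred, dtype=float)
    mask = y_true != 0
    return 1 - np.mean(np.abs(y_pred[mask] - y_true[mask]) / y_true[mask])

def wmape_accuracy(y_true, y_pred):
    # 1 - WMAPE: total absolute error divided by total actual volume
    y_true, y_pred = np.asarray(y_true, dtype=float), np.asarray(y_pred, dtype=float)
    return 1 - np.abs(y_pred - y_true).sum() / y_true.sum()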

2. Feature Engineering

  • Time features: year/month/day, day of week, hour, etc. (a small pandas sketch follows this list)
  • Historical statistics: max, min, mean, variance, median, etc.
  • Promotion features: promotion type, granularity, etc.
  • Holiday features: workday vs. holiday (traditional festivals, statutory holidays, etc.)
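
A small pandas sketch of how such time features can be derived (column names are illustrative; in the project they are produced inside gen_features):

import pandas as pd

df = pd.DataFrame({'sale_day': pd.to_datetime(['20200901', '20200905'], format='%Y%m%d')})
df['year'] = df['sale_day'].dt.year
df['month'] = df['sale_day'].dt.month
df['day'] = df['sale_day'].dt.day
df['dayofweek'] = df['sale_day'].dt.dayofweek   # 0 = Monday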

3. Modeling Approach

There are generally three ways to forecast the next N days; this project forecasts 28 days:

  • Recursive: predict one day at a time, feed the prediction back into the history features, and roll on to the next day; performance is poor
  • One model per gap (1-28): 28 separate models; inefficient
  • Segmented direct forecasting: predict each segment directly; the most reasonable option

Four shifts (7, 14, 21, 28) together make up the 28-day forecast; a minimal sketch of the mapping follows.

Days 1-7 use shift 7

Days 8-14 use shift 14

Days 15-21 use shift 21

Days 22-28 use shift 28
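
A minimal sketch of the horizon-to-shift mapping; the helper name is illustrative and not part of the project code:

def shift_for_horizon(days_ahead):
    # Map the forecast horizon (1-28 days ahead) to the shift/model that serves it.
    if not 1 <= days_ahead <= 28:
        raise ValueError('horizon must be within 1..28 days')
    return ((days_ahead - 1) // 7 + 1) * 7

assert [shift_for_horizon(d) for d in (1, 7, 8, 14, 21, 28)] == [7, 7, 14, 14, 21, 28]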

4. Model Training Process

  • Build a single feature set with gap 0

  • Features fall into two classes: fixed features and shift features

    • Fixed features, e.g. categorical features such as product category and store
    • Time features: the future date itself, describing the sample's own day; last-year same-period values, etc.
    • Shift features: statistics over the last N days, such as max, min, mean, median, mode, standard deviation, peak-to-peak range (ptp), skewness and kurtosis
    • They capture the recent statistical behavior of the series; depending on how many days ahead we predict, they must be shifted to create the sample's gap
    • For example, predicting 1 day ahead requires samples whose shift features trail the label y by a gap of 1 (shift 1); predicting 7 days ahead requires shift 7
  • Shift four times (7, 14, 21, 28) to build samples with four gaps and train four models, which together cover the next 28 days (a pandas sketch of the gap construction follows this list)
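
A minimal pandas sketch of the gap construction described above (store and column names are illustrative; the project's actual implementation is create_features_by_gap in section 7.3):

import pandas as pd

def build_gap_samples(df, gap):
    # df holds one row per (store, sale_day) with shift features computed "as of" that day
    # (e.g. hist_mean_7d) plus the label target_value for that day.
    shift_cols = ['hist_mean_7d']
    shifted = df[['store', 'sale_day'] + shift_cols].copy()
    # Move the history forward by `gap` days so it pairs with a label `gap` days later.
    shifted['sale_day'] = shifted['sale_day'] + pd.DateOffset(days=gap)
    fixed = df[['store', 'sale_day', 'dayofweek', 'target_value']]
    return fixed.merge(shifted, on=['store', 'sale_day'], how='left')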

5. Model Optimization

  • Outlier removal: remove outliers by comparing against the same historical time window

  • Drop data from the pandemic period

  • MOP: because the distribution differs across years, use only this year's data

  • Lunar/solar calendar alignment: if this year's date falls in the Spring Festival period but last year's same solar date does not (or the other way around), realign the two years by the lunar calendar

  • Dynamic learning-rate adjustment (decay); a LightGBM callback sketch follows this list
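
The learning-rate decay can be implemented with LightGBM's reset_parameter callback; the schedule below is only a sketch of the idea, not necessarily the one used in the project:

import lightgbm as lgb

# Decay the learning rate geometrically per boosting round, floored at 0.01.
lr_decay = lgb.reset_parameter(
    learning_rate=lambda round_idx: max(0.03 * (0.995 ** round_idx), 0.01)
)
# Usage: lgb.train(gbdt_params, dTrain, valid_sets=[dValid], callbacks=[lr_decay])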

6. Distributed Deployment

Producing forecasts for about 4,000 stores means well over ten million samples, so a distributed setup is required.

We use Spark: the samples are split into groups, and a model is trained within each group.

For example, group by region, by store type, by city tier, or cluster on several conditions at once, so that the number of groups matches the available compute resources.

This both improves performance and raises forecast accuracy.

The channel-order and category-sales prediction result tables share the following columns:

'global_store_number', 'channel',  'pred_date', 'half_hour','true_value', 'pred_value','w6_mean'
'global_store_number', 'category', 'pred_date', 'half_hour','true_value', 'pred_value','w6_mean'

7. Hands-On Code

The implementation is illustrated below using the channel order-quantity forecast as an example:

7.1 Data Preprocessing

# get_order_qty_hour     fetch order quantity by half hour
# get_calendar           fetch calendar info (holidays)
# get_store              fetch store data

# get_order_feat_hour    fetch order features by hour
# store list used for testing
valid_store_list = tuple([22529, 22530, 22531, 56836, 61967, 21526, 21527, 48168, 47659, 47660, 61483, 58434, 26186, 19536,
                     18522, 53341, 53342, 19040, 25185, 25186, 19553, 19041, 19042, 19044, 19043, 19045, 25188, 57448,
                     18038, 18039, 60536, 55930, 60546, 27780, 16517, 19592, 51854, 57486, 50318, 48274, 29843, 29844,
                     29846, 64662, 64663, 48795, 48798, 48799, 48800, 56481, 52900, 27303, 18090, 18606, 60084, 17082,
                     52936, 64714, 55507, 58067, 52957, 62176, 61153, 23783, 23784, 17129, 20202, 50923, 22252, 61675,
                     51949, 23788, 24816, 20209, 28909, 20211, 28916, 24819, 23795, 18685, 24829, 17151, 63746, 54022,
                     17680, 62742, 17177, 24346, 64796, 60190, 59679, 51488, 51487, 47395, 63787, 48428, 60205, 60207,
                     18228, 48438, 63287, 63289, 20287, 20303, 27472, 25939, 51027, 62294, 17245, 59743, 48998, 49515,
                     56685, 18806, 53624, 56700, 52103, 56712, 55691, 60300, 63897, 58268, 52642, 61861, 16293, 51637,
                     51638, 52668, 64958, 47553, 60866, 19403, 25037, 56782, 17870, 27092, 51668, 63959, 27614, 24544,
                     57316, 56805, 48102, 24552, 49132, 20976, 58867])
is_test = True

def get_store(spark):
    """
    Fetch store data
    Parameters
    ----------
    spark
    Returns
    -------

    """
    sql = """
        select * from database.wfm_store  
    """
    # print(sql)
    sparkdf = spark.sql(sql)
    return sparkdf

def get_calendar(spark):
    """
    Fetch calendar info, including holidays
    Parameters
    ----------
    spark

    Returns
    -------

    """
    sql = """
        select * from database.calendar
    """
    # print(sql)
    sparkdf = spark.sql(sql)
    return sparkdf


def get_order_qty_hour(spark, start_date, end_date):
    """
    Fetch order quantity by half hour
    Parameters
    ----------
    spark
    start_date
    end_date

    Returns
    -------

    """
    filter_cond = ''
    sql_tpl = """
        select * from database.wfm_order_channel_half_hour where sale_day>='{0}' and sale_day<='{1}' and channel <> 'scope out'  {2}
    """
    if is_test:
        filter_cond = 'and global_store_number in {}'.format(valid_store_list)
    sql = sql_tpl.format(start_date, end_date, filter_cond)
    # print(sql)
    sparkdf = spark.sql(sql)
    return sparkdf

################################################### fetch the feature table
def get_order_feat_hour(spark, date):
    """
    Fetch order features by hour
    Parameters
    ----------
    spark
    date

    Returns
    -------

    """
    sql_tpl = """
        select * from database.wfm_order_feat_h where statis_date='{}' 
    """
    sql = sql_tpl.format(date)
    # print(sql)
    sparkdf = spark.sql(sql)
    return sparkdf

7.2 Driver-Side Code

Writing results to a Hive table

def insert_table_by_sparkdf(sparkdf, table_name, statis_date_str, partion_num):
    """
    Write the prediction results into the target Hive table.
    :param sparkdf: Spark DataFrame to persist
    :param table_name: target table name in the `database` schema
    :param statis_date_str: partition value for the statis_date column
    :param partion_num: number of partitions to repartition to before writing
    :return:
    """
    sparkdf.withColumn('statis_date', functions.lit(statis_date_str)).repartition(partion_num).write.mode("overwrite").format("orc").partitionBy('statis_date').saveAsTable("database.{}".format(table_name))

Driver-side feature generation for channel order quantity

def get_order_qty_hour_data(spark, start_date_str, end_date_str):
    """
    Fetch hourly order data
    1. filter out the 'scope out' channel
    2. aggregate over store floors
    Parameters
    ----------
    spark
    start_date_str
    end_date_str

    Returns
    -------

    """
    order_qty_hour_sdf = get_order_qty_hour(spark, start_date_str, end_date_str)
    order_qty_hour_sdf_cut = order_qty_hour_sdf.select(['global_store_number', 'channel', 'store_floor', 'half_hour', 'order_qty', 'sale_day'])
    order_qty_hour_sdf_cut_f = order_qty_hour_sdf_cut.filter(order_qty_hour_sdf_cut['channel'] != 'scope out')
    order_qty_hour_sdf_cut_f = order_qty_hour_sdf_cut_f.withColumn("order_qty",order_qty_hour_sdf_cut_f["order_qty"].cast(IntegerType()))
    order_qty_day_sdf_acc = order_qty_hour_sdf_cut_f.groupby(['global_store_number', 'channel', 'sale_day', 'half_hour']).agg({'order_qty': 'sum'}).withColumnRenamed("sum(order_qty)", "order_qty")
    order_qty_day_sdf_acc = order_qty_day_sdf_acc.withColumn("half_hour",order_qty_day_sdf_acc["half_hour"].cast(IntegerType()))
    return order_qty_day_sdf_acc
def create_features_groupbyKey(rows, calendar_b, keys, pred_type):
    """
    Pure pandas work for one group; runs inside a Spark task and relies on module-level
    globals (predict_begin_date, end_date, predict_date_list, spring_day_list, day_windows, week_windows)
    """
    row_list = list()
    for row in rows:
        row_list.append(row.asDict())
    print(row_list)
    
    ts_df = pd.DataFrame(row_list)
    raw_df = ts_df
    
    raw_df['sale_day'] = pd.to_datetime(raw_df['sale_day'], format='%Y%m%d')
    calendar_df = calendar_b.value    # broadcast variable
    calendar_df['sale_day'] = pd.to_datetime(calendar_df['sale_day'], format='%Y%m%d')
    
    # generate the features
    features_df = gen_features(raw_df, calendar_df, keys, predict_begin_date, end_date, predict_date_list,pred_type, spring_day_list,day_windows, week_windows)
    
    # convert datetime64 columns to 'YYYYMMDD' strings
    features_df.loc[:, features_df.select_dtypes(include=['datetime64']).columns] = features_df.loc[:,features_df.select_dtypes(include=['datetime64']).columns].apply(lambda x: x.dt.strftime('%Y%m%d'))
    
    FeatureRow = Row(*features_df.columns)
    row_list = []
    for r in features_df.values:
        row_list.append(FeatureRow(*r))
    
    return row_list
import random
import re
import sys
import os
import numpy as np
import pandas as pd
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import Row
from pyspark.sql.types import IntegerType
from pyspark.sql.types import DoubleType
from pyspark.sql import functions

"""
Feature construction for the order forecast
"""
spark_conf = SparkConf().set("spark.yarn.queue", "root.project_name")
# build the SparkSession
spark = SparkSession.builder.appName("wfm lgb order make feature").config(conf=spark_conf).enableHiveSupport().getOrCreate()

random.seed(100)
np.random.seed(100)

predict_begin_date = pd.to_datetime('20200901', format='%Y%m%d')
print(predict_begin_date)

history_months = 36
start_date = predict_begin_date - pd.DateOffset(months=history_months)
print(start_date)

periods = 28

# last day of the data, used when back-testing on history
end_date = predict_begin_date + pd.to_timedelta(periods - 1, 'D')
print(end_date)

predict_date_list = pd.date_range(predict_begin_date, periods=periods)
print(predict_date_list)

# feature parameters
keys = ["global_store_number", "channel"]
spring_day_list = ['20160208', '20170128', '20180216', '20190205', '20200125']
week_windows = [1, 2, 3, 4, 5, 6]
day_windows = [2, 3, 5, 7, 14, 21, 28, 30, 60, 90]

# read the lookup data and broadcast it
calendar_df = get_calendar(spark).toPandas()
calendar_b = spark.sparkContext.broadcast(calendar_df)
store_df = get_store(spark).toPandas()
store_b = spark.sparkContext.broadcast(store_df)

# fetch the historical data
start_date_str, end_date_str = start_date.strftime('%Y%m%d'), end_date.strftime('%Y%m%d')
predict_begin_date_str = predict_begin_date.strftime('%Y%m%d')
print(start_date_str,end_date_str,predict_begin_date_str)

order_qty_hour_sdf = get_order_qty_hour_data(spark, start_date_str, end_date_str)
print('order_qty_hour_sdf length is {}'.format(order_qty_hour_sdf.count()))

###################################
################################### This is the only place that uses Spark's distributed computation: rows are grouped per store and the features are built with pandas inside each task; everything else runs as plain pandas on the driver ###################################
hour_feat_rdd = order_qty_hour_sdf.rdd.map(lambda x: (x['global_store_number'], x)).groupByKey().flatMap(lambda x: create_features_groupbyKey(x[1], calendar_b, keys, pred_type='hour'))
hour_feat_sdf = spark.createDataFrame(hour_feat_rdd)

# write the results
insert_table_by_sparkdf(hour_feat_sdf, table_name='wfm_order_feat_h', statis_date_str=predict_begin_date_str,partion_num=100)
print("features_rdd done!")
spark.stop()

The gen_features feature-generation function

Because the full function is complex, here are a few representative pieces:

(1) Reducing storage and memory footprint

import itertools
from functools import partial

import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder

import datetime
import datetime as dt
import time
from dateutil.relativedelta import relativedelta

import random
from scipy.stats import hmean
# from pykalman import KalmanFilter
from tqdm import tqdm


# Reduce memory usage: class-based version
class Data_Preprocess:
    def __init__(self):
        self.int8_max = np.iinfo(np.int8).max
        self.int8_min = np.iinfo(np.int8).min

        self.int16_max = np.iinfo(np.int16).max
        self.int16_min = np.iinfo(np.int16).min

        self.int32_max = np.iinfo(np.int32).max
        self.int32_min = np.iinfo(np.int32).min

        self.int64_max = np.iinfo(np.int64).max
        self.int64_min = np.iinfo(np.int64).min

        self.float16_max = np.finfo(np.float16).max
        self.float16_min = np.finfo(np.float16).min

        self.float32_max = np.finfo(np.float32).max
        self.float32_min = np.finfo(np.float32).min

        self.float64_max = np.finfo(np.float64).max
        self.float64_min = np.finfo(np.float64).min

    '''
    function: _get_type(self,min_val, max_val, types)

       get the correct types that our columns can trans to

    '''

    def _get_type(self, min_val, max_val, types):
        if types == 'int':
            if max_val <= self.int8_max and min_val >= self.int8_min:
                return np.int8
            elif max_val <= self.int16_max and min_val >= self.int16_min:
                return np.int16
            elif max_val <= self.int32_max and min_val >= self.int32_min:
                return np.int32
            return None

        elif types == 'float':
            if max_val <= self.float16_max and min_val >= self.float16_min:
                return np.float16
            if max_val <= self.float32_max and min_val >= self.float32_min:
                return np.float32
            if max_val <= self.float64_max and min_val >= self.float64_min:
                return np.float64
            return None

    '''

    function: _memory_process(self,df) 
       column data types trans, to save more memory
    '''

    def _memory_process(self, df):
        init_memory = df.memory_usage().sum() / 1024 ** 2 / 1024
        print('Original data occupies {} GB memory.'.format(init_memory))
        df_cols = df.columns

        for col in tqdm(df_cols):
            try:
                if 'float' in str(df[col].dtypes):
                    pass
                    # max_val = df[col].max()
                    # min_val = df[col].min()
                    # trans_types = self._get_type(min_val, max_val, 'float')
                    # if trans_types is not None:
                    #     df[col] = df[col].astype(trans_types)
                elif 'int' in str(df[col].dtypes):
                    max_val = df[col].max()
                    min_val = df[col].min()
                    trans_types = self._get_type(min_val, max_val, 'int')
                    if trans_types is not None:
                        df[col] = df[col].astype(trans_types)
            except:
                print(' Can not do any process for column, {}.'.format(col))
        afterprocess_memory = df.memory_usage().sum() / 1024 ** 2 / 1024
        print('After processing, the data occupies {} GB memory.'.format(afterprocess_memory))
        return df
# Reduce memory usage: function-based version
def reduce_mem(df):
    starttime = time.time()
    numerics = ['int16', 'int32', 'int64', 'float16', 'float32', 'float64']
    start_mem = df.memory_usage().sum() / 1024**2
    for col in df.columns:
        col_type = df[col].dtypes
        if col_type in numerics:
            c_min = df[col].min()
            c_max = df[col].max()
            if pd.isnull(c_min) or pd.isnull(c_max):
                continue
            if str(col_type)[:3] == 'int':
                if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max:
                    df[col] = df[col].astype(np.int8)
                elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max:
                    df[col] = df[col].astype(np.int16)
                elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max:
                    df[col] = df[col].astype(np.int32)
                elif c_min > np.iinfo(np.int64).min and c_max < np.iinfo(np.int64).max:
                    df[col] = df[col].astype(np.int64)
            else:
                if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max:
                    df[col] = df[col].astype(np.float16)
                elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max:
                    df[col] = df[col].astype(np.float32)
                else:
                    df[col] = df[col].astype(np.float64)
    end_mem = df.memory_usage().sum() / 1024**2
    print('-- Mem. usage decreased to {:5.2f} Mb ({:.1f}% reduction),time spend:{:2.2f} min'.format(end_mem,100*(start_mem-end_mem)/start_mem,(time.time()-starttime)/60))
    return df
# Delete variables that are no longer needed to free memory
import gc
# Python manages memory with reference counting plus a cyclic garbage collector built on top of it.
# Reference cycles between objects without a __del__() method can be reclaimed by the collector,

# but cycles between objects that define __del__() can leak.
# gc.collect() returns the number of objects freed while breaking such cycles.
del train_data, test_data, user_info
gc.collect()

(2) Lunar/solar calendar conversion

We used https://github.com/lidaobing/python-lunardate.
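
For reference, a minimal usage sketch written from memory of the library's API (the helper name is illustrative):

from lunardate import LunarDate
import datetime

def lunar_same_day_last_year(solar_date):
    # Map a solar date to the solar date of the same lunar day one lunar year earlier.
    lunar = LunarDate.fromSolarDate(solar_date.year, solar_date.month, solar_date.day)
    # Naive: ignores leap months and 29/30-day month differences.
    return LunarDate(lunar.year - 1, lunar.month, lunar.day).toSolarDate()

# e.g. 2021-02-12 (lunar New Year's Day) maps back to 2020-01-25
print(lunar_same_day_last_year(datetime.date(2021, 2, 12)))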

(3) Utility functions

def add_shift_feat(feature_df, windows, grp_keys, period_type):
    """
    Add rolling (shift) features using native pandas aggregations.
    Day granularity:
        1. roll over the group keys, i.e. day by day
    Hour granularity:
        2. rolling over the group keys alone (hour by hour) is NOT used, because there is no real-time data
        3. roll over the group keys + ['half_hour'], i.e. for each half-hour slot, roll day by day
    Note: the two granularities are not mixed for now
    :param feature_df:
    :return:
    """
    min_periods = 1
    # roll over the same half-hour slot across different days (vs. plain daily rolling)
    if grp_keys[-1] != 'half_hour':
        shift_value = 'd'
    else:
        shift_value = 'h'
    if period_type == 'week':
        grp_keys = grp_keys + ['dayofweek']
    dl = list(range(len(grp_keys)))
    gs = feature_df.groupby(grp_keys)['target_value']
    for w in windows:
        print('add_shift_feat window {} by {}'.format(w, period_type))
        w_columns = ['target_value_{}_{}_{}_by{}'.format(f, w, shift_value, period_type) for f in
                     ['max', 'min', 'mean', 'median', 'std', 'ptp', 'kurt', 'skew']]
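        # note: only max/min/mean/median/std/ptp are filled below; the kurt and skew names are declared but not populated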
        f1 = gs.rolling(window=w, min_periods=min_periods).max()
        f1.index = f1.index.droplevel(dl)
        feature_df.loc[:, w_columns[0]] = f1.astype(np.float32)
        # f2
        f2 = gs.rolling(window=w, min_periods=min_periods).min()
        f2.index = f2.index.droplevel(dl)
        feature_df.loc[:, w_columns[1]] = f2.astype(np.float32)
        # f3 (an earlier bug wrote max here instead of mean)
        f3 = gs.rolling(window=w, min_periods=min_periods).mean()
        f3.index = f3.index.droplevel(dl)
        feature_df.loc[:, w_columns[2]] = f3.astype(np.float32)
        # f4
        f4 = gs.rolling(window=w, min_periods=min_periods).median()
        f4.index = f4.index.droplevel(dl)
        feature_df.loc[:, w_columns[3]] = f4.astype(np.float32)
        # f5
        f5 = gs.rolling(window=w, min_periods=min_periods).std()
        f5.index = f5.index.droplevel(dl)
        feature_df.loc[:, w_columns[4]] = f5.astype(np.float32)
        # ptp: rolling max minus rolling min
        feature_df.loc[:, w_columns[5]] = feature_df.loc[:, w_columns[0]] - feature_df.loc[:, w_columns[1]]

    return feature_df


def add_other_feat(feature_df, grp_keys, period_type):
    """
    Add a period-over-period (chain) change feature
    Parameters
    ----------
    feature_df
    windows
    grp_keys
    period_type

    Returns
    -------

    """
    if period_type == 'byweek':
        grp_keys = grp_keys + ['dayofweek']
    gs = feature_df.groupby(grp_keys)['target_value']
    # add the period-over-period change feature
    fname6 = 'target_value_pct_chg'
    f6 = gs.shift(1).pct_change()
    # treat zero changes as missing (None)
    f6.loc[f6 == 0] = None
    feature_df.loc[:, fname6] = f6.astype(np.float32)
    return feature_df


def add_holiday_feat(feature_df, calendar_df):
    """
    Add holiday features,
    including Spring Festival
    Parameters
    ----------
    feature_df
    calendar_df

    Returns
    -------

    """
    feature_df = pd.merge(feature_df, calendar_df, on=['sale_day'], how='left')
    feature_df['holiday'] = feature_df['holiday'].fillna('h0000')
    labelencoder = LabelEncoder()
    feature_df['holiday_label'] = labelencoder.fit_transform(feature_df['holiday'])
    return feature_df


def filter_outliers(feature_df, grp_keys, predict_begin_date):
    """
    Smooth (cap) outliers
    Parameters
    ----------
    feature_df
    predict_begin_date

    Returns
    -------

    """
    grp_keys = grp_keys + ['dayofweek']
    dl = list(range(len(grp_keys)))
    feature_df['md'] = feature_df.groupby(grp_keys)['target_value'].rolling(window=5, min_periods=1).median().droplevel(
        dl)
    cond = (feature_df['target_value'] > feature_df['md'] * 2.5) & (feature_df['sale_day'] < predict_begin_date)
    feature_df.loc[cond, 'target_value'] = feature_df.loc[cond, 'md'] * 2.5
    return feature_df


def gen_features(raw_df, calendar_df, keys, predict_begin_date, end_date, predict_date_list, pred_type, spring_day_list,
                 day_windows, week_windows):
    """
    Generate the full feature set for one group
    :param raw_df:
    :param predict_date_list:
    :return:
    """
    ts_diff = (end_date - predict_begin_date).days
    feat_conf = {}
    feat_conf['lastyear_feat'] = True

    if 'order_qty' in raw_df:
        raw_df.rename(columns={'order_qty': 'target_value'}, inplace=True)
    if 'sale_qty' in raw_df:
        raw_df.rename(columns={'sale_qty': 'target_value'}, inplace=True)

    # add summary statistics used to filter the data
    df, statis_columns = add_statis_columns(raw_df, keys, pred_type, predict_begin_date)
    
    # only forecast series with average sales > 10 and more than 500 days of history; series with zeros in the middle should be removed
    df_filter = filter_df(df, ts_diff)
    
    # guard against downstream errors if the filter leaves nothing
    if len(df_filter) == 0:
        return pd.DataFrame()
    if pred_type == 'day':
        # cut_columns = keys + ['sale_day', 'target_value']
        # fill missing days
        fill_df = fill_empty_df(df_filter, keys, fill_group_day, end_date)
        # build features
        # feature_df = fill_df
        # append rows for the future days to be predicted
        expanded_df = add_predict_data_day(fill_df, predict_date_list, keys)
    else:
        # cut_columns = keys + ['sale_day', 'half_hour', 'target_value']
        fill_df = fill_empty_df(df_filter, keys, fill_group_hour, end_date)
        # build features
        # feature_df = fill_df
        # append rows for the future days to be predicted
        expanded_df = add_predict_data_hour(fill_df, predict_date_list, keys)
    
    # add common date features
    feature_df = add_common_feat(expanded_df)
    
    # add Spring Festival flags for this year and for last year
    fes_df = gen_spring_day_df(spring_day_list, before_days=7, after_days=15)
    feature_df = add_spring_label(feature_df, fes_df)
    
    # add last-year same-period features
    if feat_conf['lastyear_feat']:
        # solar-calendar same period
        feature_df = add_solar_lastyear_feat(feature_df, keys, pred_type)
        # lunar-calendar same period (slow; candidate for removal)
        feature_df = add_lunar_lastyear_feat(feature_df, keys, pred_type)
        # adjust lastyear_vb_num around Spring Festival
        feature_df = adjust_lastyear_feature(feature_df)
    
    # add promotion-type flags (disabled)
    # promotion_df = gen_promotion_df()
    # feature_df = add_promotion_label(feature_df, promotion_df)
    
    # add holiday features
    feature_df = add_holiday_feat(feature_df, calendar_df)
    
    # set the label of the prediction rows to 0 (disabled)
    # feature_df['target_value'].fillna(0, inplace=True)
    # feature_df['{}_s2254'.format('target_value')] = feature_df.groupby(keys)['vb_num'].apply(smooth)
    if pred_type == 'day':
        grp_keys = keys
    else:
        grp_keys = keys + ['half_hour']
    
    # outlier smoothing (disabled)
    # feature_df = filter_outliers(feature_df, grp_keys, predict_begin_date)
    feature_df = add_shift_feat(feature_df, day_windows, grp_keys, period_type='day')
    feature_df = add_shift_feat(feature_df, week_windows, grp_keys, period_type='week')
    feature_df = add_other_feat(feature_df, grp_keys, period_type='week')
    
    # add sine/cosine of the day of week
    feature_df['ws'] = np.sin(feature_df['dayofweek'])
    feature_df['wc'] = np.cos(feature_df['dayofweek'])
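    # Note: the raw day-of-week index (0-6) is fed to sin/cos directly; scaling by 2*pi/7 first would make the encoding truly periodic.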
    
    # add day-granularity features (disabled)
    # if pred_type == 'hour' and feat_conf.add_day_feat:
    #     feature_df = add_shift_feat(feature_df, day_windows, grp_keys, period_type='week')
    return feature_df

7.3 Order-Quantity Model Training and Prediction

import lightgbm as lgb
import numpy as np
import pandas as pd
import re

gbdt_params = {'task': 'train',
               'boosting_type': 'gbdt',
               'objective': 'regression',
               'metric': {'mape'},
               'num_leaves': 64,
               'min_child_samples': 2,
               'learning_rate': 0.03,
               'feature_fraction': 1,
               'bagging_fraction': 1,
               'bagging_freq': 3,
               # 'nthread': 20,
               'num_threads': 1,
               'seed': 2018,
               'bagging_seed': 2018,
               'feature_fraction_seed': 2018,
               'verbose': 1,
               'max_bin': 128
               }


def error_weighted_mapd_lgb(preds, dtrain):
    """

    :param preds:
    :param dtrain:
    :return:
    """
    labels = dtrain.get_label()
    return 'wmapd', compute_mapd(labels, preds), False


def mapd(r, p):
    """
    If the denominator is zero, return the numerator (the absolute error)
    :param r:
    :param p:
    :return:
    """
    return r > 0 and abs(p - r) / r or abs(p - r)


def mapd_list(rl, pl):
    """
    If the denominator is zero, return the numerator (the absolute error)
    :param r:
    :param p:
    :return:
    """
    mmape = np.mean([mapd(r, p) for r, p in zip(rl, pl)])
    return mmape


def create_features_by_gap(feature_df, gap, fix_features, shift_features, keys, pred_type):
    """
    Build the gap-shifted feature samples
    Parameters
    ----------
    feature_df
    gap
    fix_features
    shift_features

    Returns
    -------

    """
    # if pred_type == 'day':
    #     grp_keys = keys
    #     # time_columns = ['sale_day']
    #     # merge_key =grp_keys+['sale_day']
    # else:
    #     grp_keys = keys + ['half_hour']
    #     # time_columns = ['sale_day', 'half_hour']

    grp_keys = keys
    if pred_type != 'day':
        grp_keys = keys + ['half_hour']
    merge_key = grp_keys + ['sale_day']
    
    fix_df = feature_df[grp_keys + ['sale_day'] + fix_features + ['target_value']]
    shift_featues_df = feature_df[grp_keys + ['sale_day'] + shift_features]

    # shifting the dates per group (the commented line below) errors out and is slow; instead add `gap` days to sale_day directly
    # shift_featues_df['sale_day'] = shift_featues_df.groupby(grp_keys)['sale_day'].apply(lambda x: x.shift(-gap))
    shift_featues_df['sale_day'] = shift_featues_df['sale_day'] + pd.DateOffset(days=gap)
    
    # i.e. the shift features are moved forward so they line up with labels `gap` days later
    feature_gap_df = pd.merge(fix_df, shift_featues_df, on=merge_key, how='left')
    
    return feature_gap_df


def train_model(train_data, valid_data, features, target, category_columns, error_func):
    """
    训练模型
    Parameters
    ----------
    train_data
    valid_data
    features
    target
    category_columns
    error_func

    Returns
    -------

    """
    # build the LightGBM training and validation datasets
    dTrain = lgb.Dataset(train_data.loc[:, features], train_data.loc[:, target], categorical_feature=category_columns)
    dValid = lgb.Dataset(valid_data.loc[:, features], label=valid_data.loc[:, target],categorical_feature=category_columns)

    # training call (modified by yxm)
    # model = lgb.train(gbdt_params, dTrain, early_stopping_rounds=300, num_boost_round=4000, verbose_eval=50, valid_sets=dValid, feval=error_func)
    try:
        model = lgb.train(gbdt_params, dTrain, early_stopping_rounds=100, num_boost_round=1000, verbose_eval=50,valid_sets=dValid, feval=error_func)
    except:
        print('*************************train model failed***')
        print('dTrain.data.head is {} '.format(dTrain.data.head()))
        print('dTrain.data.dtypes is {}'.format(dTrain.data.dtypes))
        print('dValid.data.head is {}'.format(dValid.data.head()))
        raise ValueError('input error!')
    return model


def make_train(feature_gap_df, features, category_columns, target, predict_begin_date, predict_end_date,train_end_date):
    """

    :param feature_gap_df:
    :param features:
    :param target:
    :param predict_date:
    :return:
    """
    # end_date =pd.to_datetime(predict_date)-pd.DateOffset(days=1)
    # bug fix: use notnull() instead of > 0, so zero-order days are not dropped (old version below)
    # end_date = feature_gap_df.loc[feature_gap_df[target] > 0, 'sale_day'].max()
    end_date = feature_gap_df.loc[feature_gap_df[target].notnull(), 'sale_day'].max()
    print('******************************max date{}'.format(end_date))
    
    # if end_date >= predict_begin_date:
    #     end_date = pd.to_datetime(predict_begin_date) - pd.DateOffset(days=1)
    if end_date >= train_end_date:
        end_date = pd.to_datetime(train_end_date) - pd.DateOffset(days=7)
    print('******************************valid date{}'.format(end_date))
    
    cond = (feature_gap_df[target].notnull()) & (feature_gap_df['sale_day'] <= end_date)
    # optionally drop rows where the last-year feature is null
    # lastyear_column = 'lastyear_target_value'
    # if lastyear_column in feature_gap_df:
    #     cond = cond & feature_gap_df[lastyear_column].notnull()
    train_data = feature_gap_df.loc[cond]
    
    # use the last day as the validation set; it overlaps with the training data, so it only monitors training
    valid_data = feature_gap_df.loc[(feature_gap_df['sale_day'] == end_date)]
    pred_data = feature_gap_df.loc[
        (feature_gap_df['sale_day'] >= predict_begin_date) & (feature_gap_df['sale_day'] < predict_end_date)]
    model = None
    try:
        model = train_model(train_data, valid_data, features, target, category_columns, error_func=error_weighted_mapd_lgb)
    except:
        print('train_data is {}'.format(train_data.head()))
        print('feature_gap_df {}'.format(feature_gap_df['global_store_number'].iloc[0]))
    return model


def make_predict(model, feature_gap_df, features, category_columns, target, predict_begin_date,predict_end_date,train_end_date):
    """

    :param feature_gap_df:
    :param features:
    :param target:
    :param predict_date:
    :return:
    """
    # end_date =pd.to_datetime(predict_date)-pd.DateOffset(days=1)
    # bug ?? isnull
    # end_date = feature_gap_df.loc[feature_gap_df[target] > 0, 'sale_day'].max()
    end_date = feature_gap_df.loc[feature_gap_df[target].notnull(), 'sale_day'].max()
    print('******************************max date{}'.format(end_date))
    
    # if end_date >= predict_begin_date:
    #     end_date = pd.to_datetime(predict_begin_date) - pd.DateOffset(days=1)
    if end_date >= train_end_date:
        end_date = pd.to_datetime(train_end_date) - pd.DateOffset(days=7)
    print('******************************valid date{}'.format(end_date))
    cond = (feature_gap_df[target].notnull()) & (feature_gap_df['sale_day'] <= end_date)
    
    # optionally drop rows where the last-year feature is null
    # lastyear_column = 'lastyear_target_value'
    # if lastyear_column in feature_gap_df:
    #     cond = cond & feature_gap_df[lastyear_column].notnull()
    train_data = feature_gap_df.loc[cond]
    
    # use the last day as the validation set; it overlaps with the training data, so it only monitors training
    valid_data = feature_gap_df.loc[(feature_gap_df['sale_day'] == end_date)]
    pred_data = feature_gap_df.loc[(feature_gap_df['sale_day'] >= predict_begin_date) & (feature_gap_df['sale_day'] < predict_end_date)]
    preds, clip_preds = None, None
    
    if len(pred_data.loc[:, features]) > 0:
        preds = model.predict(pred_data.loc[:, features], num_iteration=model.best_iteration)
        # clip negative predictions to zero
        clip_preds = np.array([max(0.0, p) for p in preds])
        # mapd diagnostics (disabled)
        # mapd = compute_mapd(pred_data.loc[:, target], clip_preds)
        # mmapd = mapd_list(pred_data.loc[:, target].tolist(), clip_preds.tolist())
        # print('mean wmapd:{}'.format(np.mean(mapd)))
        # print('mean mape :{}'.format(mmapd))
    return train_data, pred_data, preds, clip_preds


def cmp_acc(valid_data_pred_df, predict_date):
    """
    Compute accuracy against the targets:
        1.  each store's daily average half-hour accuracy should exceed 75% (1 - MAPE)
        2.  each store's monthly average daily accuracy should exceed 92% (1 - MAPE)
        3.  each store's daily accuracy should exceed 80% (1 - WMAPE)
    :param valid_data_pred_df:
    :return:
    """
    # 'statis_date', 's_werks', 'zsize'
    pred = valid_data_pred_df[valid_data_pred_df['pred_date'] == predict_date]
    pred['mape1'] = 1 - np.abs(pred['target_value'] - pred['pred']) / pred['target_value']

    return pred


def do_train(feature_df, predict_date_list, gap_list, features, category_columns, fix_features, shift_features, keys, pred_type):
    """
    Entry point for training across the multi-day horizon
    :param feature_df:
    :param predict_date_list:
    :return:
    """
    valid_data_list = []
    if pred_type == 'day':
        time_columns = ['sale_day']
    else:
        time_columns = ['sale_day', 'half_hour']

    # split into four gap groups
    train_end_date = predict_date_list[0]
    models = {}
    for gap, predict_date in zip(gap_list, predict_date_list):
        print('************************************predict_date:{} gap: {}'.format(predict_date, gap))

        feature_gap_df = create_features_by_gap(feature_df, gap, fix_features, shift_features, keys, pred_type)
        predict_begin_date = predict_date
        predict_end_date = predict_date + pd.DateOffset(days=7)
        # each model covers a 7-day slice
        model = make_train(feature_gap_df, features, category_columns,
                             target='target_value',
                             predict_begin_date=predict_begin_date,
                             predict_end_date=predict_end_date,
                             train_end_date=train_end_date
                            )
        models[gap] = model
    return models



def do_predict(models, feature_df, predict_date_list, gap_list, features, category_columns, fix_features, shift_features,
               keys, pred_type):
    """
    Entry point for prediction across the multi-day horizon
    :param feature_df:
    :param predict_date_list:
    :return:
    """
    valid_data_list = []
    if pred_type == 'day':
        time_columns = ['sale_day']
    else:
        time_columns = ['sale_day', 'half_hour']

    # split into four gap groups
    train_end_date = predict_date_list[0]
    for gap, predict_date in zip(gap_list, predict_date_list):
        print('************************************predict_date:{} gap: {}'.format(predict_date, gap))

        feature_gap_df = create_features_by_gap(feature_df, gap, fix_features, shift_features, keys, pred_type)
        predict_begin_date = predict_date
        predict_end_date = predict_date + pd.DateOffset(days=7)
        # each model covers a 7-day slice
        train_data, valid_data, preds, clip_preds = make_predict(models[gap], feature_gap_df, features, category_columns,
                                                                 target='target_value',
                                                                 predict_begin_date=predict_begin_date,
                                                                 predict_end_date=predict_end_date,
                                                                 train_end_date=train_end_date
                                                                )
        
        valid_data_cut = valid_data[keys + time_columns + ['target_value']]
        # train_data['pred{}'.format(gap)] = model.predict(train_data.loc[:, features],
        valid_data_cut['pred'] = clip_preds
        valid_data_list.append(valid_data_cut)
        # end of the warm-up handling
    
    valid_data_pred_df = pd.concat(valid_data_list, axis=0).reset_index(drop=True)
    valid_data_pred_df = valid_data_pred_df.sort_values(by=keys + time_columns)

    valid_data_pred_df.rename(columns={'sale_day': 'pred_date'}, inplace=True)
    valid_data_pred_df['pred_date'] = valid_data_pred_df['pred_date'].dt.strftime('%Y%m%d')

    return valid_data_pred_df

def get_hour_feat_data(spark, predict_begin_date_str):
    """
    Filter the order-hour feature data; the pandas equivalent of the filter is:
    cond1=(g['sale_day']>pd.to_datetime('2019-06-01'))&(g['sale_day']<pd.to_datetime('2019-11-01'))
    cond2=(g['sale_day']>pd.to_datetime('2020-06-01'))
    cond3=(g['channel']=='In-Store')
    cond4=(g['channel']!='In-Store')
    cond=((cond1|cond2)&cond3)|(cond2&cond4)
    :param spark:
    :param predict_begin_date_str:
    :return:
    """
    order_hour_feat_sdf = get_order_feat_hour(spark, predict_begin_date_str)
    
    # same period last year
    cond1 = " sale_day >'20190601'  and sale_day< '20191101' "
    
    # from June of this year onward
    cond2 = " sale_day >'20200601' "
    cond3 = " channel =='In-Store'"
    cond4 = " channel !='In-Store'"
    
    cond = "((({0}) or  {1}) and  {2}) or ({1} and {3})".format(cond1, cond2, cond3, cond4)
    order_hour_feat_sdf_f = order_hour_feat_sdf.filter(cond)
    return order_hour_feat_sdf_f


def make_and_train(store, rows, shift_features, category_columns, features, keys, pred_type):
    """
    Train the per-gap models for one store's feature rows
    :param feature_df:
    :return:
    """
    row_list = list()
    for row in rows:
        row_list.append(row.asDict())
    # print(row_list)
    feature_df = pd.DataFrame(row_list)
    feature_df['sale_day'] = pd.to_datetime(feature_df['sale_day'], format='%Y%m%d')
    
    models = do_train(feature_df, predict_date_list, gap_list, features, category_columns, fix_features, shift_features, keys, pred_type)
    return store, models


def make_and_predict(models, rows, shift_features, category_columns, features, keys, pred_type):
    """
    Predict for one store's feature rows with its trained per-gap models
    :param feature_df:
    :return:
    """
    row_list = list()
    for row in rows:
        row_list.append(row.asDict())
    # print(row_list)
    feature_df = pd.DataFrame(row_list)
    feature_df['sale_day'] = pd.to_datetime(feature_df['sale_day'], format='%Y%m%d')
    
    result_df = do_predict(models,feature_df, predict_date_list, gap_list, features, category_columns, fix_features, shift_features, keys, pred_type)
    
    datetime_cls = result_df.select_dtypes(include=['datetime64']).columns
    result_df.loc[:, datetime_cls] = result_df.loc[:, datetime_cls].apply(lambda x: x.dt.strftime('%Y%m%d'))
    ResultRow = Row(*result_df.columns)
    
    row_list = []
    for r in result_df.values:
        row_list.append(ResultRow(*r))
    
    return row_list
"""
Workload (order volume) forecasting with LightGBM
"""
spark_conf = SparkConf()
spark = SparkSession.builder.appName("wfm lgb order train and predict").config(conf=spark_conf).enableHiveSupport().getOrCreate()

# fix the random seeds
random.seed(100)
np.random.seed(100)

keys = ["global_store_number", "channel"]

predict_begin_date_str = '20210201'
periods = 28
predict_date_list = pd.date_range(predict_begin_date_str, periods=4, freq='7D')
gap_list = [i + 7 for i in range(0, periods, 7)]
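# gap_list works out to [7, 14, 21, 28]: one model per 7-day slice of the 28-day horizon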
print("predict_date_list:", predict_date_list)

# fetch the feature data
order_hour_feat_sdf = get_hour_feat_data(spark, predict_begin_date_str)
print('order_hour_feat_sdf length is {}'.format(order_hour_feat_sdf.count()))

# define the feature names
fix_features = [
    'month',
    'dayofweek',
    'day',
    'is_work_day',
    'holiday_label',
    # 'open',
    # 'close',
    'lastyear_target_value',
    'ws',
    'wc'
]

# hour names
shift_features_h = [c for c in order_hour_feat_sdf.columns if re.match(r'^target_value.+$', c)]
print('shift features columns hour is {}'.format(shift_features_h))

features_h = fix_features + shift_features_h + ['half_hour']
category_columns = ['month', 'day', 'is_work_day', 'holiday_label']

# train or predict, controlled by the retrain flag
retrain = 1

if retrain:
    hour_models = order_hour_feat_sdf.rdd.map(lambda x: (x['global_store_number'], x)).groupByKey().map(lambda x: make_and_train(x[0], x[1], shift_features_h, category_columns, features_h, keys, 'hour'))
    # hour_models is an RDD of (store, models) pairs; save the trained models to HDFS
    os.system('hadoop fs -rm -r hdfs://ns1/user/admin/order_hour_models.pkl')
    hour_models.saveAsPickleFile('order_hour_models.pkl')
else:
    # predict directly with the previously trained models
    hour_models = spark.sparkContext.pickleFile('order_hour_models.pkl')
    hour_result_rdd = order_hour_feat_sdf.rdd.map(lambda x: (x['global_store_number'], x)).groupByKey().join(hour_models).flatMap(lambda x: make_and_predict(x[1][1], x[1][0], shift_features_h, category_columns, features_h, keys, 'hour'))
    hour_result_sdf = spark.createDataFrame(hour_result_rdd)
    
    # write the predictions to a table
    table = 'wfm_order_predict_h'
    insert_table_by_sparkdf(hour_result_sdf, table_name=table, statis_date_str=predict_begin_date_str,partion_num=100)

print("train and predict done!")
# Trained models saved to HDFS
[root@ce60148 ~]$ hdfs dfs -ls /user/admin
drwxr-xr-x   - admin       supergroup                   0 2021-03-14 07:44 /user/admin/order_hour_models.pkl
drwxr-xr-x   - admin       supergroup                   0 2021-03-21 18:42 /user/admin/order_hour_models_0.pkl
drwxr-xr-x   - admin       supergroup                   0 2021-03-21 18:45 /user/admin/order_hour_models_1.pkl
drwxr-xr-x   - admin       supergroup                   0 2021-03-21 19:13 /user/admin/order_hour_models_10.pkl

# Prediction results stored in the Hive table
[root@ce60148 ~]$ hdfs dfs -ls /user/hive/warehouse/database.db/wfm_order_predict_h
Found 2 items
-rw-r--r--   3 admin hive          0 2021-03-22 10:31 /user/hive/warehouse/database.db/wfm_order_predict_h/_SUCCESS
drwxr-xr-x   - admin hive          0 2021-03-22 10:31 /user/hive/warehouse/database.db/wfm_order_predict_h/statis_date=20210321
[root@ce60148 ~]$ hdfs dfs -ls /user/hive/warehouse/database.db/wfm_order_predict_h/statis_date=20210321
Found 32 items
drwxr-xr-x   - admin hive          0 2021-03-22 09:53 /user/hive/warehouse/database.db/wfm_order_predict_h/statis_date=20210321/batch=0
drwxr-xr-x   - admin hive          0 2021-03-22 09:54 /user/hive/warehouse/database.db/wfm_order_predict_h/statis_date=20210321/batch=1
drwxr-xr-x   - admin hive          0 2021-03-22 10:05 

[root@ce60148 ~]$ hdfs dfs -ls /user/hive/warehouse/database.db/wfm_order_predict_h/statis_date=20210321/batch=1
Found 123 items
-rw-r--r--   3 admin hive      22271 2021-03-22 09:54 /user/hive/warehouse/database.db/wfm_order_predict_h/statis_date=20210321/batch=1/part-00000-aebab4c0-5c9c-4e66-af20-123b4fad21e2.c000.snappy.orc
-rw-r--r--   3 admin hive      26636 2021-03-22 09:54 /user/hive/warehouse/database.db/wfm_order_predict_h/statis_date=20210321/batch=1/part-00002-aebab4c0-5c9c-4e66-af20-123b4fad21e2.c000.snappy.orc

7.4 Productionizing the Deployment

Feature generation

################################################################################
# Function : algorithm_python_features
# Author : ydzhao
# Created Time : 2021-01-21
# Input table :
# Output table :
# Comment : runs once every 7 days
################################################################################
# !/bin/bash
current_dir=$(cd $(dirname $0); pwd)
cd ${current_dir}

source /etc/profile
source ../global_config.sh

bash /root/update_kerberos.sh

function algorithm_python_features()
{
    /opt/spark-2.4.4/bin/spark-submit  \
	--master  yarn-client  \
    --executor-cores  1  \
    --num-executors  8  \
    --executor-memory  10g  \
    --conf  spark.memory.fraction=0.8  \
    --conf  spark.driver.maxResultSize=4g  \
	--queue root.project_name  \
	--py-files  ../algorithm/algorithm.zip \
	../algorithm/wfmforecast/drivers/$1  $pred_time 28

		if [[ $? -ne 0 ]]; then
			echo "--> execute algorithm_python_features failed!"
		exit 1
		fi
}

startDate=$(date -d "${pred_start_time}" +%s)
endDate=$(date -d "${pred_time}" +%s)
## the difference between the two timestamps divided by 86400 seconds per day gives the number of days between them
stampDiff=`expr $endDate - $startDate`
dayDiff=`expr $stampDiff / 86400`
echo $dayDiff
flag=$(( $dayDiff % 7 ))
if [ ${flag} = 0 ]; then
    echo "algorithm_python_features run"
    algorithm_python_features $1
fi

Model training and prediction

################################################################################
# Function : algorithm_python_train
# Author : ydzhao
# Created Time : 2021-01-21
# Input table :
# Output table :
# Comment :
################################################################################
# !/bin/bash
current_dir=$(cd $(dirname $0); pwd)
cd ${current_dir}

source /etc/profile
source ../global_config.sh

bash /root/update_kerberos.sh

function algorithm_python_train()
{
    /opt/spark-2.4.4/bin/spark-submit  \
	--master  yarn-client  \
    --executor-cores  1  \
    --num-executors  8  \
    --executor-memory  10g  \
    --conf  spark.memory.fraction=0.8  \
    --conf  spark.driver.maxResultSize=4g  \
	--queue root.project_name  \
	--py-files  ../algorithm/algorithm.zip \
	../algorithm/wfmforecast/drivers/$1  $pred_time 28 1

		if [[ $? -ne 0 ]]; then
			echo "--> execute algorithm_python_train failed!"
		exit 1
		fi
}
startDate=$(date -d "${pred_start_time}" +%s)
endDate=$(date -d "${pred_time}" +%s)
## the difference between the two timestamps divided by 86400 seconds per day gives the number of days between them
stampDiff=`expr $endDate - $startDate`
dayDiff=`expr $stampDiff / 86400`
echo $dayDiff
flag=$(( $dayDiff % 28 ))
if [ ${flag} = 0 ]; then
    echo "algorithm_python_train run"
    algorithm_python_train $1
fi
################################################################################
# Function : algorithm_python_predict
# Author : ydzhao
# Created Time : 2021-01-21
# Input table :
# Output table :
# Comment :
################################################################################
# !/bin/bash
current_dir=$(cd $(dirname $0); pwd)
cd ${current_dir}

source /etc/profile
source ../global_config.sh

bash /root/update_kerberos.sh

function algorithm_python_predict()
{
    /opt/spark-2.4.4/bin/spark-submit  \
	--master  yarn-client  \
    --executor-cores  1  \
    --num-executors  8  \
    --executor-memory  10g  \
    --conf  spark.memory.fraction=0.8  \
    --conf  spark.driver.maxResultSize=4g  \
	--queue root.project_name  \
	--py-files  ../algorithm/algorithm.zip \
	../algorithm/wfmforecast/drivers/$1  $pred_time 28 0

		if [[ $? -ne 0 ]]; then
			echo "--> execute algorithm_python_predict failed!"
		exit 1
		fi
}

startDate=$(date -d "${pred_start_time}" +%s)
endDate=$(date -d "${pred_time}" +%s)
## the difference between the two timestamps divided by 86400 seconds per day gives the number of days between them
stampDiff=`expr $endDate - $startDate`
dayDiff=`expr $stampDiff / 86400`
echo $dayDiff
flag=$(( $dayDiff % 7 ))
if [ ${flag} = 0 ]; then
    echo "algorithm_python_predict run"
    algorithm_python_predict $1
fi

Azkaban job scheduling

For confidentiality reasons the scheduling setup is not shown in detail. The algorithm modules are placed downstream of the upstream data-generation jobs in the dependency graph, according to when and how often that data is produced; at the moment training and prediction run once a week.