
Spark Distributed Computing

· Read in about 5 min · (934 Words)

[Project Experience Notes]

Working with data, my favorite tools are Pandas, Jupyter Lab, and the surrounding ecosystem: take a sample of the data and iterate quickly on a single machine to validate ideas. That is certainly convenient, but once the model is deployed to production the data volume is large and a single machine can rarely handle it. The mainstream solution today is Spark distributed computing.

However, if you use the pure PySpark API, the Pandas code has to be rewritten against PySpark's API. Many of the APIs look similar but differ in the details, and some logic that is trivial in the Pandas ecosystem is awkward in PySpark; whenever an API is missing you end up writing a UDF. So if you take this route for production deployment, the migration cost is fairly high.

Is there a simpler way?

Recall that Spark usually runs as a client of Hadoop YARN, which acts as the cluster's resource manager and scheduler. A Spark cluster consists of a Driver, a Cluster Manager (Standalone, YARN, or Mesos), and Worker Nodes. For each Spark application, an Executor process runs on each Worker Node, and each Executor runs multiple Task threads. To avoid disturbing Spark's existing runtime architecture, PySpark wraps a Python API around it. On the Driver side, Py4j bridges Python and Java so that Spark applications can be written in Python. On the Executor side, Py4j is not needed, because the Task logic an Executor runs is the serialized code shipped over from the Driver.
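
A minimal sketch of this split (the names and toy data here are illustrative, not taken from the project code below): the function handed to the RDD API is pickled on the Driver, shipped to the Executors, and executed inside the Python worker process that sits next to each Executor, so plain Pandas code runs there unchanged.

from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder.master("local[2]").appName("py4j-demo").getOrCreate()

sdf = spark.createDataFrame([(1, 10.0), (1, 12.0), (2, 7.0)], ["key", "value"])

def partition_mean(rows):
    # This function is serialized on the Driver and runs inside the Executor-side
    # Python worker, so ordinary Pandas code can be used here.
    pdf = pd.DataFrame([r.asDict() for r in rows])
    if pdf.empty:
        return []
    # per-key mean of "value" within this partition
    return [(int(k), float(v)) for k, v in pdf.groupby("key")["value"].mean().items()]

print(sdf.rdd.mapPartitions(partition_mean).collect())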

Spark execution flow

  • The Driver first builds the DAG for the Application and splits it into Stages.
  • The Driver then requests resources from the Cluster Manager.
  • The Cluster Manager sends a recruitment signal to some Worker Nodes.
  • The recruited Worker Nodes start Executor processes in response and request tasks from the Driver.
  • The Driver assigns Tasks to the Worker Nodes.
  • The Executors run the Tasks Stage by Stage, with the Driver monitoring progress.
  • Once the Driver receives the Executors' completion signal, it sends a deregistration signal to the Cluster Manager.
  • The Cluster Manager tells the Worker Nodes to release their resources.
  • The corresponding Executors on the Worker Nodes stop running.
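
The project code further below hides the session setup behind a SparkInit helper; purely for reference, a minimal sketch of requesting such resources from YARN might look like this (the executor counts and memory sizes are illustrative, not the project's actual settings):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("feature-engineering")
    .master("yarn")                              # YARN acts as the Cluster Manager
    .config("spark.executor.instances", "8")     # how many Executor processes to request
    .config("spark.executor.cores", "4")         # Task threads per Executor
    .config("spark.executor.memory", "8g")
    .getOrCreate()
)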

So a simple approach when running a PySpark job is to rely on YARN's distribution mechanism: dispatch the tasks that can be computed in parallel to different Worker Nodes at the same time, and let each node do its computation with the original Pandas API, as the code below shows.

import sys
import calendar
from typing import Tuple, List
import pandas as pd
import numpy as np
from sklearn import linear_model

from pyspark.sql.types import StringType, IntegerType, DoubleType, DateType
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType, udf, broadcast
from pyspark.sql import Row
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.window import Window

from foo.utils.conn_utils import SparkInit, spark_write_to_hive, get_calendar_data  # get_calendar_data (used in __main__) is assumed to live in the same utils module


def sales_diff_mom_feature(df, unit):
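    # NOTE: `target` (and `division` in some helpers below) are not parameters here;
    # they are read as module-level globals, presumably set in the Configuring section.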
    diff_list = [(0, 1), (0, 2), (1, 2), (1, 3), (2, 3), (2, 4), (3, 4), (3, 5), (0, 12), (8, 12), (11, 12)]
    for (i, j) in diff_list:
        df[f'{target}_{unit}_diff_{i}_{j}'] = df[f'{target}_{unit}_lag_{i}'] - df[f'{target}_{unit}_lag_{j}']
        df[f'{target}_{unit}_mom_{i}_{j}'] = (df[f'{target}_{unit}_lag_{i}'] + 1) / (df[f'{target}_{unit}_lag_{j}'] + 1)

    return df


def sales_rolling_feature(df, window, unit):
    columns = [f'{target}_{unit}_lag_{i}' for i in range(window)]
    df[f'{target}_{unit}_lag_rolling_{window}_mean'] = df[columns].mean(axis=1).astype(np.float32)
    df[f'{target}_{unit}_lag_rolling_{window}_std'] = df[columns].std(axis=1).astype(np.float32)
    df[f'{target}_{unit}_lag_rolling_{window}_max'] = df[columns].max(axis=1).astype(np.float32)
    df[f'{target}_{unit}_lag_rolling_{window}_min'] = df[columns].min(axis=1).astype(np.float32)
    df[f'{target}_{unit}_lag_rolling_{window}_median'] = df[columns].median(axis=1).astype(np.float32)

    return df


def value_sku_agg_rate_lag_feature(df, aggregation, lags):
    df[f'value_sku-{"_".join(aggregation)}_rate'] = df['value'] / df.groupby(aggregation)['value'].transform('sum')
    for lag in lags:
        df[f'value_sku-{"_".join(aggregation)}_rate_lag_{lag}'] = df.groupby([division, 'material_code'])[f'value_sku-{"_".join(aggregation)}_rate'].shift(lag).astype(np.float16)
    df = df.drop([f'value_sku-{"_".join(aggregation)}_rate'], axis=1)

    return df


def get_days_of_month(days_dict, year_month):
    if year_month not in days_dict:
        year = int(str(year_month)[:4])
        month = int(str(year_month)[4:6])
        days = calendar.monthrange(year, month)[1]
        days_dict[year_month] = days

    return days_dict[year_month]


def time_features(df, division):
    df['year'] = df['year_month'].apply(lambda x: int(str(x)[:4])).astype('category')
    df['month'] = df['year_month'].apply(lambda x: int(str(x)[4:6])).astype('category')
    df['on_market_month'] = (df['month_idx'] - df['min_month_idx'] + 1).astype(np.int8)
    df['on_market_day_first_month'] = df.apply(lambda x: get_days_of_month({}, x['year_month']) - int(str(x['mindate'])[6:8]) + 1, axis=1)
    df[f'on_market_month_{division}'] = (df['month_idx'] - df[f'min_month_idx_{division}_sku'] + 1).astype(np.int8)

    return df


def sales_rolling_mean_feature_level(df, window, target, level):
    columns = [f'{target}_material_code_lag_{i}' for i in range(window)]
    df2 = pd.DataFrame(df.groupby([level, 'year_month'])[columns].sum()).reset_index()
    df2[f'{target}_lag_rolling_{window}_mean_{level}'] = df2[columns].mean(axis=1).astype(np.float32)
    df2 = df2.rename(columns={f'{target}_material_code_lag_{i}': f'{target}_{level}_lag_{i}' for i in range(window)})

    return df2[[level, 'year_month', f'{target}_lag_rolling_{window}_mean_{level}', f'{target}_{level}_lag_0']]


def latest_sale_proportion_feature(df, window, target, level):
    df2 = sales_rolling_mean_feature_level(df, window, target, level)
    df2[f'latest_{window}_{target}_proportion_{level}'] = df2[f'{target}_{level}_lag_0'] / df2[f'{target}_lag_rolling_{window}_mean_{level}']
    df = pd.merge(df, df2[[level, 'year_month', f'latest_{window}_{target}_proportion_{level}']], on=[level, 'year_month'], how='left')
    return df


def get_trend_by_lr(x, df_all, unit, window, target):
    current_month = x['month_idx']
    sku = x[unit]
    start_month = current_month - window
    condition = (df_all['month_idx'] >= start_month) & (df_all['month_idx'] <= current_month) & (df_all[unit] == sku)
    train_data = df_all.loc[condition, [target, 'month_idx']].drop_duplicates()

    coef = np.nan
    if len(train_data):
        model = linear_model.LinearRegression()
        model.fit(train_data['month_idx'].to_numpy().reshape(-1, 1), train_data[target])
        coef = model.coef_[0]
    return coef


def add_trend_feature(df, unit, window, target):
    df_unit = df.loc[df['on_market_month'] > 3].groupby([unit, 'month_idx'])[target].sum().reset_index()
    df_unit = df_unit.dropna(axis=0, subset=[target])
    if df_unit.empty:
        return df
    try:
        df_unit[f'trend_slope_{unit}_{window}'] = df_unit.apply(lambda x: get_trend_by_lr(x, df_unit, unit, window, target), axis=1)
        df = pd.merge(df, df_unit[[unit, 'month_idx', f'trend_slope_{unit}_{window}']], on=[unit, 'month_idx'], how='left')
    except Exception as e:
        print(e)

    return df


def feature_engineer(df, calendar, division, target):
    df = df.sort_values(['material_code', division, 'year_month'])
    unit_list = ['material_code']

    for unit in unit_list:
        for lag in range(13):
            df[f'{target}_{unit}_lag_{lag}'] = df.groupby([division, 'material_code'])[target].shift(lag)
        df = sales_diff_mom_feature(df, unit)
    print('sales_lag_feature finished!')
    print('sales_lag_feature diff & mon finished!')

    window_list = [2, 3, 6, 9, 12]
    for unit in unit_list:
        for window in window_list:
            df = sales_rolling_feature(df, window, unit)
    print('sales_rolling_feature finished!')

    agg_list = [['year_month', 'material_code']]
    for agg in agg_list:
        df = value_sku_agg_rate_lag_feature(df, agg, range(6))
    print('value_sku-aggregation_rate_feature finished!')

    df = time_features(df, division)
    print('time_features finished!')

    for level in ['material_code', 'sub_brand', 'category']:
        df = latest_sale_proportion_feature(df, 12, target, level)
    print("latest_sale_proportion_feature finished!")

    for unit in ['material_code', 'store']:
        for window in [3, 4, 5, 6, 9]:
            df = add_trend_feature(df, unit, window, target)
    print("trend feature finished!")

    cat_columns = ['month', 'material_code', 'brand', 'sub_brand', 'franchise', 'category', 'series', 'signature',
                   'area', 'axe', 'sub_axe', 'class', 'function_id', 'mstatus', division, 'sales_level',
                   'level3', 'level5', 'level6', 'level5_6', 'level3_5_6', 'oj1_brand', 'l2_label',
                   f'{division}_values_level', f'{division}_level', 'seasonality_flag', 'sku_type', 'Franchise', 'citycode', 'line_city', 'prvnname_ch', 'regionname_ch', 'area', 'nation']
    for column in cat_columns:
        if column in df.columns:
            df[column] = df[column].astype('category')
    print('category_feature finished!')

    for lag in range(1, 16):
        df[f'target_m_{lag}'] = df.groupby([division, 'material_code'])[target].shift(-lag).astype(np.float32)
    print('define Y finished!')

    for lag in range(1, 16):
        df[f'active_sku_filter_m_{lag}'] = df.groupby([division, 'material_code'])['filter'].shift(-lag).astype(np.float32)
        df[f'active_sku_filter_m_{lag}'] = df[f'active_sku_filter_m_{lag}'].fillna(0)
    df = df.drop('filter', axis=1)
    print('active_sku_filter finished!')

    column_list = df.columns.to_list()
    column_list.remove('brand')
    column_list.remove('category')
    df = df[column_list + ['brand', 'category']]
    
    return df


def make_feature_engineer(rows, calendar_b, division, target):
    """
        Run the Pandas-based feature engineering on one groupByKey group (one category).
    :param rows: iterable of pyspark.sql.Row objects belonging to one group
    :param calendar_b: broadcast variable holding the calendar data
    :param division: name of the division column
    :param target: name of the target column
    :return: list of pyspark.sql.Row objects carrying the engineered features
    """
    row_list = list()
    for row in rows:
        row_list.append(row.asDict())
    df = pd.DataFrame(row_list)
    
    # value of the broadcast variable
    calendar = calendar_b.value

    df = feature_engineer(df, calendar, division, target)

    dfRow = Row(*df.columns)
    row_list = []
    for r in df.values:
        row_list.append(dfRow(*r))

    return row_list


def spark_dis_com(spark, processor_data, calendar_b, division, target, repartition, parallel_column):
    #### distributed compute
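    # Key each Row by the chosen column, group all Rows with the same key so they
    # are processed together on one Executor, run the Pandas-based feature
    # engineering on that group, and flatten the returned Rows into a single RDD.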
    feature_data_rdd = processor_data.rdd. \
        map(lambda x: (x[parallel_column], x)). \
        groupByKey(). \
        flatMap(lambda x: make_feature_engineer(x[1], calendar_b, division, target))

    #### write table
    spark_write_to_hive(
        spark.createDataFrame(feature_data_rdd.repartition(repartition)),
        'ldlgtm_dpt.ld_feature_store_bh'
    )



if __name__ == '__main__':
    ############################# offline test #############################
    ###################### Configuring
    # ...
    # processor_data = pd.read_csv('...')
    # calendar = pd.read_csv('...')
    # feature_data = feature_engineer(processor_data, calendar, division, target)
    # feature_data.to_pickle('...')
    
    ############################# online pre/prd #############################
    ###################### Init Spark
    spark = SparkInit('sf-app-gtm-art-fcsting-POS-LD')

    ###################### Configuring
    ...

    # load the data
    processor_data = spark.sql(f""" ... """)
    calendar = get_calendar_data(spark)
    
    # broadcast the variable
    calendar_b = spark.sparkContext.broadcast(calendar.toPandas())
    
    spark_dis_com(spark, processor_data, calendar_b, division, target, 24, 'category')
    print("feature_data is successful")
    print("feature_data write table successful")
    
    spark.stop()

Taking the code above as an example:

  • offline test is the code for local testing; a function such as feature_engineer is ordinary pure-Python code built on the Pandas API;
  • online pre/prd is the code for the online pre-release and production environments. As you can see, with the functions make_feature_engineer and spark_dis_com, only small changes to feature_engineer turn it into distributed code. The main points are:
    • spark.sql reads the preprocessed data processor_data (a pyspark.sql.dataframe.DataFrame) stored in a Hive table, and the feature engineering is computed on top of processor_data;
    • processor_data can be grouped with map and dispatched by one field or by several fields;
    • the flatMap API performs the distributed computation with the make_feature_engineer function and merges the final results;
    • make_feature_engineer first turns each pyspark.sql.Row in a group into a Python dict, collects them into a list, builds a Pandas DataFrame from that list, then runs the feature_engineer function as before, and finally converts the result back into a list of pyspark.sql.Row objects;
    • a Spark DataFrame is created from the result with the spark.createDataFrame API and written to a table;
    • other auxiliary variables, such as calendar_b, have to be broadcast to all the nodes.

As you can see, distributed computation done this way is quite flexible: you can group and dispatch with map by whichever field or fields your data requires. Here I group and dispatch by category, because in my case a model has to be trained per category, and it works very well in practice.
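
Stripped of the feature-engineering details, the grouped-dispatch pattern boils down to the sketch below (a toy DataFrame with hypothetical category/value columns and a local session; in the real job the SparkSession comes from the SparkInit helper):

from pyspark.sql import SparkSession, Row
import pandas as pd

spark = SparkSession.builder.master("local[2]").appName("grouped-dispatch-demo").getOrCreate()

def per_group_pandas(rows):
    # rows: all pyspark.sql.Row objects belonging to one group (one category)
    pdf = pd.DataFrame([r.asDict() for r in rows])
    pdf["value_mean"] = float(pdf["value"].mean())   # any Pandas logic goes here
    out_row = Row(*pdf.columns)
    # convert NumPy scalars back to plain Python types before rebuilding Rows
    return [out_row(*(v.item() if hasattr(v, "item") else v for v in rec))
            for rec in pdf.itertuples(index=False, name=None)]

sdf = spark.createDataFrame([("A", 1.0), ("A", 2.0), ("B", 3.0)], ["category", "value"])

result_rdd = (
    sdf.rdd
    .map(lambda r: (r["category"], r))    # key each Row by the grouping column
    .groupByKey()                         # one iterable of Rows per category
    .flatMap(lambda kv: per_group_pandas(kv[1]))
)

spark.createDataFrame(result_rdd).show()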

Of course, sometimes we can also group and dispatch in the following way; here I dispatch by store:

  • Use spark.sparkContext.parallelize(store_list, 24) to create an RDD (parallelize, provided by the Spark driver program, turns an existing collection or iterator into a parallelized collection that can be computed on across the distributed cluster; the 24 is the number of partitions the RDD is split into);
  • Broadcast the other variables; the store data in each RDD partition is then computed directly with the flatMap API:
def map_make_post_process(model_output_b, calendar_b, odp_df_b, event_df_b, sales_df_pro_b, sku_delisting_df_b, forecast_list, end_date, M0, M1_5_list, store):
    ######## calendar & odp data & event data & ld_month_actual_event_normal_ratio data broadcast
    model_output = model_output_b.value
    calendar = calendar_b.value
    odp_df = odp_df_b.value
    event_df = event_df_b.value
    sales_df_pro = sales_df_pro_b.value
    sku_delisting_df = sku_delisting_df_b.value

    output_formated = model_output.loc[model_output['store'] == store]
    
    ...
    
    output_formatedRow = Row(*output_formated.columns)
    row_list = []
    for r in output_formated.values:
        row_list.append(output_formatedRow(*r))

    return row_list
    
if __name__ == '__main__':
    # ......
    store_list = model_output['store'].unique()
    store_rdd = spark.sparkContext.parallelize(store_list, 24)
    output_formated_data_row_list = store_rdd.flatMap(
        lambda x: map_make_post_process(model_output_b, calendar_b, odp_df_b, event_df_b, sales_df_pro_b, sku_delisting_df_b, forecast_list, end_date, M0, M1_5_list, x)
    )
    output_formated_data_sdf = output_formated_data_row_list.toDF().repartition(24).persist()
    ......