带着爱和梦想去生活

数据工程化案例介绍

· Read in about 4 min · (715 Words)

[原创]

数据工程化案例介绍

好久没写博客了😃😃😃😃😃😃,最近做完了一个偏数据工程的项目,系统的使用了大数据相关组件,学习了Hadoop生态圈技术以及数据仓库相关知识。下面将一些体会写下。。。

1 项目背景和业务特点

XXX医药业务场景:以终端消费者为中心的服务,以门店、连锁加盟、批发模式触达,当前核心竞争力是品牌加盟和供应链采购能力。随着加盟业务快速成长,致力于成为中国最大的零售药房加盟商,需要配套成长的供应链物流能力和信息化建设。“高库存、高退货、高效期” 等环节精细化运营的薄弱是主要原因,具体表现在以下几点:

  • (1) 门店补货靠经验,造成了“高退货”;
  • (2) 加盟店和批发商等物流能力尚未触达、物流信息尚未线上化;
  • (3) 与供应商信息沟通均为线下,补货频次较为传统;
  • (4) 采购计划依赖采购员个人经验采用公式计算,未考虑复杂因素;

项目目标是构建以智能补货为智慧大脑的需求驱动的全局、动态、平衡的数字化供应链运营体系,提供安全、高效、精准的供应链能力。主要包括以下部分:

  • (1) 数据清洗
  • (2) 特种工程
  • (3) 模型训练
  • (4) 模型融合
  • (5) 数据工程化

其中前4个部分是机器学习的常见方法和步骤,考虑到线上生产环境要能正常执行,第5部分数据工程化部分启动非常重要的地位,下面对这个部分进行详细的叙述。

2 数据工程化流程架构

项目架构设计.png

这里我们的数据源主要Oracle业务数据以及一些客户提供的人工数据,利用sqoop每日凌晨00:40定时同步数据至Hadoop集群src层。按照经典的Kappa数据仓库分层架构分为:src->ods->dw1->dw2->app.

与传统的数仓建模不同的是我们主要的目的是利用机器学习方法进行预测补货,数据仓库ods/dw1都是数据清洗和复杂业务逻辑处理,dw2是特征工程处理生成宽表用于训练模型。在数据清洗的过程中会有一些指标用于KPI展示以及app模型预测补货结果我们会同步至MySQL,这些都是作为数据应用服务。

整个数据工程基于Hadoop生态圈技术为载体,数据存储主要是HDFS,数据计算主要是Hive/Spark,元数据管理是Apache Atlas,数据质量分析用的是Apache Griffin,数据任务流调度系统用的是Azkaban,数据OLAP数据库是Presto,数据分析可视化Dashboard用的是Superset。这些大数据组件采用Cloudera Manager(CM)进行统一安装配置,为了保证服务的高可用(HA)采用Zookeeper进行资源调度和管理。

3 数据工程生产环境部署

生产环境全流程部署.png

3.1 可配置项

配置项对于数据工程的重要性不言而喻,可灵活调整部署代码,方便控制管理

├─conf
│  │  config.ini         所有可配置参数
│  │  env_name.conf      生产环境和测试环境的标识符
│  │  ini.sh             读写配置文件的函数

例如:

[replenish_parameters]
start_date=2020-07-18
end_date=2020-08-31 
rolling_day=7
rolling_num=50

[env_parameters]
data_base_dir=/data
base_dir_jar=/root

这样我们对于每张表的计算添加统一的配置项

exe_hive=`bash /root/ini.sh /root/config.ini env_parameters exe_hive`
exe_spark_sql=`bash /root/ini.sh /root/config.ini env_parameters exe_spark_sql`
database_name=`bash /root/ini.sh /root/config.ini env_parameters database_name`
master_org_id=`bash /root/ini.sh /root/config.ini env_parameters master_org_id`

3.2 任务流调度

编写全局任务的配置文件

# default project work directory
JOB_BASE_DIR=/root/code

success.emails=user1@163.com,user2@163.com
failure.emails=user1@163.com,user2@163.com

编写job任务

type=command
dependencies=ods_6_kpi_wh_supplyer_bol_retailer_bol
retries=3
retry.backoff=10000

#$(date +%F)
#job_param_compute_date=${ui_param_compute_date}

command=bash         ${JOB_BASE_DIR}/df_model_output/01_shop_op_bol.sh

任务流程图image.png

3.3 元数据管理

元数据管理和治理功能,用以构建其数据资产目录,对这些资产进行分类和管理,并为数据分析师和数据治理团队,提供围绕这些数据资产的协作功能。 * 表与表之间的血缘依赖 * 字段与字段之间的血缘依赖 image.png

3.4 数据质量监控

检查关键表的生成记录数和字段阈值

function check_wcl(){
        echo "开始检查$1"
        answer=`cat $1|wc -l`
        if [ $answer != $2 ]; then
                echo "【异常】ERROR_LINE_COUNT: $1 should be $2 lines, now $answer lines"
        fi
        echo "结束检查$1"
        echo
}

function check_range(){
        echo "开始检查$1"
        cat $1|awk -F' ' -v col=$2 -v lower=$3 -v upper=$4 '{ if (NR>1 && $(col)>upper) print "【异常】ERROR_TOO_HIGH: " $0 }'
        cat $1|awk -F' ' -v col=$2 -v lower=$3 -v upper=$4 '{ if (NR>1 && $(col)<lower) print "【异常】ERROR_TOO_LOW: " $0 }'
        echo "结束检查$1"
        echo
}

邮件通知

image.png

image.png

4 项目技术点总结

Sqoop

(1) Sqoop导入导出Null存储一致性问题

Hive中的Null在底层是以“\N”来存储,而MySQL中的Null在底层就是Null,为了保证数据两端的一致性。在导出数据时采用–input-null-string和–input-null-non-string两个参数。导入数据时采用–null-string和–null-non-string。

(2) Sqoop数据导出一致性问题

如Sqoop在导出到Mysql时,使用4个Map任务,过程中有2个任务失败,那此时MySQL中存储了另外两个Map任务导入的数据。

Sqoop本身的容错依赖于Hadoop,在Sqoop如何解决传输任务失败引发的数据一致性问题。Sqoop将一个传输作业生成一个mapreduce job,一个job有多个并行执行传输作业的mapreduce task在和外部数据库做数据传输,一些原因会导致数据一致性问题:

  1. 违反数据库约束(主键唯一性)、字段类型不一致、时间分区不一致
  2. 数据库连接丢失
  3. 由于分隔符等原因,传输的列数和表的列数不一致
  4. Hadoop机器硬件问题

一个传输任务,由多个task并行执行,每个task本身是一个transaction,当这个task fail,这个transaction会roll back,但其他的transaction不会roll back,这就会导致非常严重的脏数据问题,数据部分导入,部分缺失,怎么办???

对于Sqoop Import任务,由于Hadoop CleanUp Task的存在,这个问题不存在;Sqoop Export任务则提供了一个“中间表”的解决办法

先将数据写入到中间表,写入中间表成功,在一个transaction中将中间表的数据写入目标表–staging-table 中间表–clear-staging-table 任务开始前,清空中间表

sqoop export --connect jdbc:mysql://192.168.137.10:3306/user_behavior \
--username root \
--password 123456 \
--table app_cource_study_report \
--columns watch_video_cnt,complete_video_cnt,dt \
--fields-terminated-by "\t" \
--export-dir "/user/hive/warehouse/tmp.db/app_cource_study_analysis_${day}" \
--staging-table app_cource_study_report_tmp \
--clear-staging-table \
--input-null-string '\N'

(3) Sqoop在导入数据的时候数据倾斜

Sqoop 抽数的并行化主要涉及到两个参数:

  • num-mappers:启动N个map来并行导入数据,默认4个;
  • split-by:按照某一列来切分表的工作单元,通过ROWNUM() 生成一个严格均匀分布的字段,然后指定为分割字段。
    • split-by 根据不同的参数类型有不同的切分方法,如int型,Sqoop会取最大和最小split-by字段值,然后根据传入的num-mappers来 确定划分几个区域。比如select max(split_by),min(split-by) from得到的max(split-by)和min(split-by)分别为1000和1,而num-mappers(-m)为2的话,则会分成两个区域 (1,500)和(501-1000),同时也会分成2个sql给2个map去进行导入操作,分别为select XXX from table where split-by>=1 and split-by<500和select XXX from table where split-by>=501 and split-by<=1000.最后每个map各自获取各自SQL中的数据进行导入工作。
    • 当split-by不是int型时出现如上场景中的问题。目前想到的解决办法是:将-m 设置称1,split-by不设置,即只有一个map运行,缺点是不能并行map录入数据。(注意,当-m 设置的值大于1时,split-by必须设置字段) 。
    • split-by即便是int型,若不是连续有规律递增的话,各个map分配的数据是不均衡的,可能会有些map很忙,有些map几乎没有数据处理的情况。
#!/usr/bin/bash

#参数配置
exe_hive="/usr/bin/hive"

#抽取表名
table_name='xxx'

#分区字段信息
master_org_id='xxx'

if [[ $# -eq 1 ]]; then
    update_day=$1
else
    update_day=$(date -d "1 day ago" +"%Y-%m-%d")
fi
echo "default_date:${update_day}"

sqoop import -D org.apache.sqoop.splitter.allow_text_splitter=true \
-m 4 \
--hive-drop-import-delims \
--fields-terminated-by '\001' \
--connect "jdbc:oracle:thin:@xxx.xx.xx.x:1521:xxxx" \
--username "xx" \
--password "xx" \
--mapreduce-job-name sqoop_${table_name} \
--delete-target-dir \
--split-by 'etl_id' \
--query "
select 
    etl_id                 ,
    DJLX                   ,
    to_char(sysdate,'yyyy-mm-dd hh24:mm:ss') as update_date,
    to_char(sysdate,'yyyy-mm-dd') as dt
from (select 
				row_number() OVER(order by T.SL) AS etl_id,
        T.* 
      from dbo.${table_name} T 
      where T.receivestatus = 0
) a  where \$CONDITIONS" \
--target-dir /user/hive/warehouse/src.db/${table_name}/master_org_id=${master_org_id}/dt=${update_day} \
--null-string '\\N' \
--null-non-string '\\N' \
-z


HQL="
alter table src.${table_name} add partition(master_org_id='${master_org_id}',dt='${update_day}') 
location '/user/hive/warehouse/src.db/${table_name}/master_org_id=${master_org_id}/dt=${update_day}';
"

#执行HQL
bash ${exe_hive} -e "${HQL}"

#判断数据是否成功导入
hdfs dfs -ls /user/hive/warehouse/src.db/${table_name}/master_org_id=${master_org_id}/dt=${update_day}
if [ $? -eq 0 ] ;then 
    echo '导入数据成功'
    #删除Java文件
    rm -f *.java 
else 
    echo '导入数据失败,请检查相关服务'
    exit 1
fi

(4) Sqoop底层运行的任务是什么

只有Map阶段,没有Reduce阶段的任务。

(5)Sqoop数据导出Parquet

app层数据用Sqoop往MySql中导入数据的时候,如果用了orc(Parquet)不能导入,需转化成text格式。

Hive

(1) 大小表Join——MapJoin

如果不指定MapJoin或者不符合MapJoin的条件,那么Hive解析器会将Join操作转换成Common Join,即:在Reduce阶段完成join。容易发生数据倾斜。可以用MapJoin把小表全部加载到内存在map端进行join,避免reducer处理。

小表关联一个超大表时,容易发生数据倾斜,使用 MapJoin把小表全部加载到内存在map端进行join。如果需要的数据在 Map 的过程中可以访问到则不再需要Reduce。

原始sql:

select c.channel_name,count(t.requesturl) PV
from ods.cms_channel c
join (
  select host,requesturl 
  from  dms.tracklog_5min 
  where day='20151111'
) t
on c.channel_name=t.host
group by c.channel_name
order by c.channel_name;

以上为小表join大表的操作,可以使用mapjoin把小表c放到内存中处理,语法很简单只需要增加 /*+ MAPJOIN(小表) */,把需要分发的表放入到内存中。

select /*+ MAPJOIN(c) */
c.channel_name,count(t.requesturl) PV
from ods.cms_channel c
join (
  select host,requesturl 
  from  dms.tracklog_5min 
  where day='20151111'
) t
on c.channel_name=t.host
group by c.channel_name
order by c.channel_name;

(2) 行列过滤

  • 列处理:在SELECT中,只拿需要的列,如果有,尽量使用分区过滤,少用SELECT *。
  • 行处理:在分区剪裁中,当使用外关联时,如果将副表的过滤条件写在Where后面,那么就会先全表关联,之后再过滤。

(3) 合理设置Map数

  • (1)通常情况下,作业会通过input的目录产生一个或者多个map任务。

主要的决定因素有:input的文件总个数,input的文件大小,集群设置的文件块大小。

  • (2)是不是map数越多越好?

答案是否定的。如果一个任务有很多小文件(远远小于块大小128m),则每个小文件也会被当做一个块,用一个map任务来完成,而一个map任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费。而且,同时可执行的map数是受限的。

  • (3)是不是保证每个map处理接近128m的文件块,就高枕无忧了?

答案也是不一定。比如有一个127m的文件,正常会用一个map去完成,但这个文件只有一个或者两个小字段,却有几千万的记录,如果map处理的逻辑比较复杂,用一个map任务去做,肯定也比较耗时。

针对上面的问题2和3,我们需要采取两种方式来解决:即减少map数和增加map数

(4) Hive小文件合并

Hive的后端存储是HDFS,它对大文件的处理是非常高效的,如果合理配置文件系统的块大小,NameNode可以支持很大的数据量。但是在数据仓库中,越是上层的表其汇总程度就越高,数据量也就越小。而且这些表通常会按日期进行分区,随着时间的推移,HDFS的文件数目就会逐渐增加。

  • 小文件带来的问题

简单来说,HDFS的文件元信息,包括位置、大小、分块信息等,都是保存在NameNode的内存中的。每个对象大约占用150个字节,因此1000万个文件及分块就会占用约3G的内存空间,一旦接近这个量级,NameNode的性能就会开始下降了。

此外,HDFS读写小文件时也会更加耗时,因为每次都需要从NameNode获取元信息,并与对应的DataNode建立连接。对于MapReduce程序来说,小文件还会增加Mapper的个数,每个脚本只处理很少的数据,浪费了大量的调度时间。当然,这个问题可以通过使用CombinedInputFile和JVM重用来解决。

  • Hive小文件产生的原因

前面已经提到,汇总后的数据量通常比源数据要少得多。而为了提升运算速度,我们会增加Reducer的数量,Hive本身也会做类似优化——Reducer数量等于源数据的量除以hive.exec.reducers.bytes.per.reducer所配置的量(默认1G)。Reducer数量的增加也即意味着结果文件的增加,从而产生小文件的问题。

解决小文件的问题可以从两个方向入手:1. 输入合并。即在Map前合并小文件 2. 输出合并。即在输出结果的时候合并小文件

  • 配置Map输入合并

    set mapred.max.split.size=256000000; -- 每个Map最大输入大小,决定合并后的文件数
    set mapred.min.split.size.per.node=100000000; -- 一个节点上split的至少的大小 ,决定了多个data node上的文件是否需要合并
    set mapred.min.split.size.per.rack=100000000; -- 一个交换机下split的至少的大小,决定了多个交换机上的文件是否需要合并
    set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; -- 执行Map前进行小文件合并
    
    • 配置Map输出合并

    我们可以通过一些配置项来使Hive在执行结束后对结果文件进行合并:

    # 输出合并小文件
    set hive.merge.mapfiles = true; -- 默认true,在map-only任务结束时合并小文件
    set hive.merge.mapredfiles = true; -- 默认false,在map-reduce任务结束时合并小文件
    set hive.merge.size.per.task = 268435456; -- 默认256M
    set hive.merge.smallfiles.avgsize = 16777216; -- 当输出文件的平均大小小于该值时,启动一个独立的map-reduce任务进行文件merge
    

(5) 合理设置Reduce数

Reduce个数并不是越多越好 - 过多的启动和初始化Reduce也会消耗时间和资源; - 另外,有多少个Reduce,就会有多少个输出文件,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题;

在设置Reduce个数的时候也需要考虑这两个原则:处理大数据量利用合适的Reduce数;使单个Reduce任务处理数据量大小要合适;

(6) Hive元数据的安全性

Hive的metadata存储在MySQL中,需要配置MySQL的高可用(主从复制和读写分离和故障转移)。

Spark

(1) Spark的架构与作业提交流程

  • Yarn模式: image.png

  • RDD: RDD在Lineage依赖方面分为两种Narrow Dependencies与Wide Dependencies用来解决数据容错时的高效性以及划分任务时候起到重要作用。

  • Spark的宽窄依赖,以及Spark如何划分stage,每个stage又根据什么决定task个数: Stage:根据RDD之间的依赖关系的不同将Job划分成不同的Stage,遇到一个宽依赖则划分一个Stage。 Task:Stage是一个TaskSet,将Stage根据分区数划分成一个个的Task。

  • Spark中的缓存机制(cache和persist)与checkpoint机制: 这两个都是做RDD持久化的, cache:内存,不会截断血缘关系,使用计算过程中的数据缓存。 checkpoint:磁盘,截断血缘关系,在ck之前必须没有任何任务提交才会生效,ck过程会额外提交一次任务。

(2) Repartition和Coalesce关系与区别

  • 1)关系:

两者都是用来改变RDD的partition数量的,repartition底层调用的就是coalesce方法:coalesce(numPartitions, shuffle = true)。

  • 2)区别:

repartition一定会发生shuffle,coalesce根据传入的参数来判断是否发生shuffle。一般情况下增大rdd的partition数量使用repartition,减少partition数量时使用coalesce。

(3) 当Spark涉及到数据库的操作时,如何减少Spark运行中的数据库连接数?

使用foreachPartition代替foreach,在foreachPartition内获取数据库的连接。

(4) Spark Shuffle

我们知道在进行shuffle的时候会将各个节点上key相同的数据传输到同一结点进行下一步的操作。如果某个key或某几个key下的数据的数据量特别大,远远大于其他key的数据,这时就会出现一个现象,大部分task很快就完成结束,剩下几个task运行特别缓慢。甚至有时候还会因为某个task下相同key的数据量过大而造成内存溢出。这就是发生了数据倾斜。

  • 调整分区数目
  • 去除空值多余数据
  • 使用广播将reduce join 转化为map join
  • 将key进行拆分,大数据化小数据
  • 资源参数调优

(5) Spark资源参数

对Spark运行过程中各个使用资源的地方,通过调节各种参数,来优化资源使用的效率,从而提升Spark作业的执行性能。以下参数就是Spark中主要的资源参数。

(1)num-executors

  • 参数说明:该参数用于设置Spark作业总共要用多少个Executor进程来执行。Driver在向YARN集群管理器申请资源时,YARN集群管理器会尽可能按照你的设置来在集群的各个工作节点上,启动相应数量的Executor进程。这个参数非常之重要,如果不设置的话,默认只会给你启动少量的Executor进程,此时你的Spark作业的运行速度是非常慢的。
  • 参数调优建议:每个Spark作业的运行一般设置50~100个左右的Executor进程比较合适,设置太少或太多的Executor进程都不好。设置的太少,无法充分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源。

(2)executor-memory

  • 参数说明:该参数用于设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常,也有直接的关联。
  • 参数调优建议:每个Executor进程的内存设置4G~8G较为合适。但是这只是一个参考值,具体的设置还是得根据不同部门的资源队列来定。可以看看自己团队的资源队列的最大内存限制是多少,num-executors乘以executor-memory,是不能超过队列的最大内存量的。此外,如果你是跟团队里其他人共享这个资源队列,那么申请的内存量最好不要超过资源队列最大总内存的1/3~1/2,避免你自己的Spark作业占用了队列所有的资源,导致别的同学的作业无法运行。

(3)executor-cores

  • 参数说明:该参数用于设置每个Executor进程的CPU core数量。这个参数决定了每个Executor进程并行执行task线程的能力。因为每个CPU core同一时间只能执行一个task线程,因此每个Executor进程的CPU core数量越多,越能够快速地执行完分配给自己的所有task线程。
  • 参数调优建议:Executor的CPU core数量设置为2~4个较为合适。同样得根据不同部门的资源队列来定,可以看看自己的资源队列的最大CPU core限制是多少,再依据设置的Executor数量,来决定每个Executor进程可以分配到几个CPU core。同样建议,如果是跟他人共享这个队列,那么num-executors * executor-cores不要超过队列总CPU core的1/3~1/2左右比较合适,也是避免影响其他同学的作业运行。

(4)driver-memory

  • 参数说明:该参数用于设置Driver进程的内存。
  • 参数调优建议:Driver的内存通常来说不设置,或者设置1G左右应该就够了。唯一需要注意的一点是,如果需要使用collect算子将RDD的数据全部拉取到Driver上进行处理,那么必须确保Driver的内存足够大,否则会出现OOM内存溢出的问题。

(5)spark.default.parallelism

  • 参数说明:该参数用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能。
  • 参数调优建议:Spark作业的默认task数量为500~1000个较为合适。很多同学常犯的一个错误就是不去设置这个参数,那么此时就会导致Spark自己根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。通常来说,Spark默认设置的数量是偏少的(比如就几十个task),如果task数量偏少的话,就会导致你前面设置好的Executor的参数都前功尽弃。试想一下,无论你的Executor进程有多少个,内存和CPU有多大,但是task只有1个或者10个,那么90%的Executor进程可能根本就没有task执行,也就是白白浪费了资源!因此Spark官网建议的设置原则是,设置该参数为num-executors * executor-cores的2~3倍较为合适,比如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源。

(6)spark.storage.memoryFraction

  • 参数说明:该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。也就是说,默认Executor 60%的内存,可以用来保存持久化的RDD数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘。
  • 参数调优建议:如果Spark作业中,有较多的RDD持久化操作,该参数的值可以适当提高一些,保证持久化的数据能够容纳在内存中。避免内存不够缓存所有的数据,导致数据只能写入磁盘中,降低了性能。但是如果Spark作业中的shuffle类操作比较多,而持久化操作比较少,那么这个参数的值适当降低一些比较合适。此外,如果发现作业由于频繁的gc导致运行缓慢(通过spark web ui可以观察到作业的gc耗时),意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

(7)spark.shuffle.memoryFraction

  • 参数说明:该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。
  • 参数调优建议:如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

资源参数的调优,没有一个固定的值,需要同学们根据自己的实际情况(包括Spark作业中的shuffle操作数量、RDD持久化操作数量以及spark web ui中显示的作业gc情况),同时参考本篇文章中给出的原理以及调优建议,合理地设置上述参数。

在提交任务时的几个重要参数

executor-cores —— 每个executor使用的内核数,默认为1,官方建议2-5个,我们企业是4个
num-executors —— 启动executors的数量,默认为2
executor-memory —— executor内存大小,默认1G
driver-cores —— driver使用内核数,默认为1
driver-memory —— driver内存大小,默认512M

资源参数参考示例以下是一份spark-submit命令的示例,大家可以参考一下,并根据自己的实际情况进行调节:

./bin/spark-submit \
  --master yarn-cluster \
  --num-executors 100 \
  --executor-memory 6G \
  --executor-cores 4 \
  --driver-memory 1G \
  --conf spark.default.parallelism=1000 \
  --conf spark.storage.memoryFraction=0.5 \
  --conf spark.shuffle.memoryFraction=0.3 \
  --driver-class-path ./conf/postgresql-9.4-1202.jdbc42.jar \
  --jars ./conf/postgresql-9.4-1202.jdbc42.jar bom_variance_inc.py 

5 项目复盘

  • 开发、测试、生产代码用同一套代码
    • 问题:客户只提供了一套硬件环境,同一套代码无法区分测试数据库和生产数据库,有冲突;
    • 解决:增设配置文件,代码中对数据库名增加前缀;
  • 大量复用POC阶段数据分析的数据处理代码和逻辑
    • 问题:造成数据工程中有大量无效计算,浪费计算资源,生成不少无效库表,且不便于梳理数据流程,计算过程复杂;
    • 解决:梳理数据流程,根据算法和产品需要优化计算逻辑和表设计;
  • 被动接受产品需求推动
    • 问题:早期设计主要考虑算法需要,项目中期产品不断提出新需求,出现大量临时计算逻辑调整和字段增加;
    • 解决:集中梳理产品设计数据需求,全面梳理计算逻辑后统一设计表和ETL逻辑;
  • 使用问题追溯的方式处理数据异常
    • 问题:早期都是从ARP中的KPI、数据展示发现问题,再向上游追溯数据问题,分析非常耗时;
    • 解决:开发数据监控机制,对数据源同步,数据重复,计算结果的条数和阈值异常等增加监控,并增加邮件报警;