1 Spark任务计算时间评估 可以用下面三个公式来近似估计spark任务的执行时间。 $$ 任务执行时间 ≈ \frac{任务计算总时间 + shuffle总时间 + GC垃圾回收总时间} {任务有效并行度}$$ $$ 任务有效并行度 ≈ \frac{min(任务并行度, partition分区数量)} {数据倾斜度\times 计算倾斜度} $$ $$ 任务并行度 ≈ executor数量 \times 每个executor的core数量 $$ 可以用下面两个公式来说明spark在executor上的内存分配。 $$ executor申请的内存 ≈ 堆内内存(堆内内存由多个core共享) + 堆外内存 $$ $$ 堆内内存 ≈ storage内存+execution内存+other内存 $$ 以下是对上述公式中涉及到的一些概念的初步解读。 任务计算总时间:假设由一台无限内存的同等CPU配置的单核机器执行该任务,所需要的运行时间。通过缓存避免重复计算,通过mapPartitions代替map以减少诸如连接数据库,预处理广播变量等重复过程,都是减少任务计算总时间的例子。 shuffle总时间:任务因为reduceByKey,join,sortBy等shuffle类算子会触发shuffle操作产生的磁盘读写和网络传输的总时间。shuffle操作的目的是将分布在集群中多个节点上的同一个key的数据,拉取到同一个节点上,以便让一个节点对同一个key的所有数据进行统一处理。 shuffle过程首先是前一个stage的一个shuffle write即写磁盘过程,中间是一个网络传输过程,然后是后一个stage的一个shuffle read即读磁盘过程。shuffle过程既包括磁盘读写,又包括网络传输,非常耗时。因此如有可能,应当避免使用shuffle类算子。例如用map+broadcast的方式代替join过程。退而求其次,也可以在shuffle之前对相同key的数据进行归并,减少shuffle读写和传输的数据量。此外,还可以应用一些较为高效的shuffle算子来代替低效的shuffle算子。例如用reduceByKey/aggregateByKey来代替groupByKey。最后,shuffle在进行网络传输的过程中会通过netty使用JVM堆外内存,spark任务中大规模数据的shuffle可能会导致堆外内存不足,导致任务挂掉,这时候需要在配置文件中调大堆外内存。 GC垃圾回收总时间:当JVM中execution内存不足时,会启动GC垃圾回收过程。执行GC过程时候,用户线程会终止等待。因此如果execution内存不够充分,会触发较多的GC过程,消耗较多的时间。在spark2.0之后excution内存和storage内存是统一分配的,不必调整excution内存占比,可以提高executor-memory来降低这种可能。或者减少executor-cores来降低这种可能(这会导致任务并行度的降低)。 任务有效并行度:任务实际上平均被多少个core执行。它首先取决于可用的core数量。当partition分区数量少于可用的core数量时,只会有partition分区数量的core执行任务,因此一般设置分区数是可用core数量的2倍以上20倍以下。此外任务有效并行度严重受到数据倾斜和计算倾斜的影响。有时候我们会看到99%的partition上的数据几分钟就执行完成了,但是有1%的partition上的数据却要执行几个小时。这时候一般是发生了数据倾斜或者计算倾斜。这个时候,我们说,任务实际上有效的并行度会很低,因为在后面的这几个小时的绝大部分时间,只有很少的几个core在执行任务。 任务并行度:任务可用core的数量。它等于申请到的executor数量和每个executor的core数量的乘积。可以在spark-submit时候用num-executor和executor-cores来控制并行度。此外,也可以开启spark.dynamicAllocation.enabled根据任务耗时动态增减executor数量。虽然提高executor-cores也能够提高并行度,但是当计算需要占用较大的存储时,不宜设置较高的executor-cores数量,否则可能会导致executor内存不足发生内存溢出OOM。 partition分区数量:分区数量越大,单个分区的数据量越小,任务在不同的core上的数量分配会越均匀,有助于提升任务有效并行度。但partition数量过大,会导致更多的数据加载时间,一般设置分区数是可用core数量的2倍以上20倍以下。可以在spark-submit中用spark.default.parallelism来控制RDD的默认分区数量,可以用spark.sql.shuffle.partitions来控制SparkSQL中给shuffle过程的分区数量。 数据倾斜度:数据倾斜指的是数据量在不同的partition上分配不均匀。一般来说,shuffle算子容易产生数据倾斜现象,某个key上聚合的数据量可能会百万千万之多,而大部分key聚合的数据量却只有几十几百个。一个partition上过大的数据量不仅需要耗费大量的计算时间,而且容易出现OOM。对于数据倾斜,一种简单的缓解方案是增大partition分区数量,但不能从根本上解决问题。一种较好的解决方案是利用随机数构造数量为原始key数量1000倍的中间key。大概步骤如下,利用1到1000的随机数和当前key组合成中间key,中间key的数据倾斜程度只有原来的1/1000, 先对中间key执行一次shuffle操作,得到一个数据量少得多的中间结果,然后再对我们关心的原始key进行shuffle,得到一个最终结果。 计算倾斜度:计算倾斜指的是不同partition上的数据量相差不大,但是计算耗时相差巨大。考虑这样一个例子,我们的RDD的每一行是一个列表,我们要计算每一行中这个列表中的数两两乘积之和,这个计算的复杂度是和列表长度的平方成正比的,因此如果有一个列表的长度是其它列表平均长度的10倍,那么计算这一行的时间将会是其它列表的100倍,从而产生计算倾斜。计算倾斜和数据倾斜的表现非常相似,我们会看到99%的partition上的数据几分钟就执行完成了,但是有1%的partition上的数据却要执行几个小时。计算倾斜和shuffle无关,在map端就可以发生。计算倾斜出现后,一般可以通过舍去极端数据或者改变计算方法优化性能。 堆内内存:on-heap memory, 即Java虚拟机直接管理的存储,由JVM负责垃圾回收GC。由多个core共享,core越多,每个core实际能使用的内存越少。core设置得过大容易导致OOM,并使得GC时间增加。 堆外内存:off-heap memory, 不受JVM管理的内存, 可以精确控制申请和释放, 没有GC问题。一般shuffle过程在进行网络传输的过程中会通过netty使用到堆外内存。
7125messi的博客
last update:[项目经验总结] 从事数据相关工作,最喜欢用的工具就是基于Pandas、Jupyter Lab等工具,拿到样本数据,单机上快速迭代试验验证想法,这确实很方便,但是等到模型部署上线的时候,数据量很大,很难单机就搞定,目前主流的做法是用Spark分布式计算解决。 但是如果利用纯 PySpark API,就需要将Pandas API重写成PySpark的API,虽然很多API很类似,但是多少有些不一样,而且有些逻辑用用Pandas生态很容易实现,而利用PySpark却很复杂,遇到PySpark没有的API,动辄就要写UDF函数了,所以实际生产部署的时候,如果采用此方式,改造成本会有点高。 有没有简单的方法? 我们知道通常Spark也是作为客户端,使用Hadoop的YARN作为集群的资源管理和调度器。Spark集群由Driver, Cluster Manager(Standalone,Yarn 或 Mesos),以及Worker Node组成。对于每个Spark应用程序,Worker Node上存在一个Executor进程,Executor进程中包括多个Task线程。对于PySpark,为了不破坏Spark已有的运行时架构,Spark在外围包装一层Python API。在Driver端,借助Py4j实现Python和Java的交互,进而实现通过Python编写Spark应用程序。在Executor端,则不需要借助Py4j,因为Executor端运行的Task逻辑是由Driver发过来的,那是序列化后的字节码。 Spark运行流程 Application首先被Driver构建DAG图并分解成Stage。 然后Driver向Cluster Manager申请资源。 Cluster Manager向某些Work Node发送征召信号。 被征召的Work Node启动Executor进程响应征召,并向Driver申请任务。 Driver分配Task给Work Node。 Executor以Stage为单位执行Task,期间Driver进行监控。 Driver收到Executor任务完成的信号后向Cluster Manager发送注销信号。 Cluster Manager向Work Node发送释放资源信号。 Work Node对应Executor停止运行。 所以简单的做法跑PySparkr任务时利用YARN的分发机制,将可以并行计算的任务同时分发到不同Work Node计算,然后每个节点则利用由原来的Pandas API计算即可。 import sys import calendar from typing import Tuple,List import pandas as pd import numpy as np from sklearn import linear_model from pyspark.sql.types import StringType, IntegerType, DoubleType, DateType from pyspark.sql import functions as F from pyspark.
[参考文献] * 《Effective Python:编写高质量Python代码的90个有效方法(原书第2版)》 并发(concurrency)指计算机似乎能在同一时刻做许多件不同的事情。例如,在只配有一个CPU核心的计算机上面,操作系统可以迅速切换这个处理器所运行的程序,因此尽管同一时刻最多只有一个程序在运行,但这些程序能够交替地使用这个核心,从而造成一种假象,让人觉得它们好像真的在同时运行。 并行(parallelism)与并发的区别在于,它强调计算机确实能够在同一时刻做许多件不同的事情。例如,若计算机配有多个CPU核心,那么它就真的可以同时执行多个程序。每个CPU核心执行的都是自己的那个程序之中的指令,这些程序能够同时向前推进。 在同一个程序之中,我们可以利用并发轻松地解决某些类型的问题。例如,并发可以让程序里面出现多条独立的执行路径,每条路径都可以处理它自己的I/O流,这就让我们觉得这些I/O任务好像真的是在各自的路径里面同时向前推进的。 并行与并发之间的区别,关键在于能不能提速(speedup)。如果程序把总任务量分给两条独立的执行路径去同时处理,而且这样做确实能让总时间下降到原来的一半,那么这就是并行,此时的总速度是原来的两倍。反过来说,假如无法实现加速,那即便程序里有一千条独立的执行路径,也只能叫作并发,因为这些路径虽然看起来是在同时推进,但实际上却没有产生相应的提速效果。 Python让我们很容易就能写出各种风格的并发程序。在并发量较小的场合可以使用线程(thread),如果要运行大量的并发函数,那么可以使用协程(coroutine)。 并行任务,可以通过系统调用、子进程与C语言扩展(Cextension)来实现,但要写出真正能够并行的Python代码,其实是很困难的。 ● 即便计算机具备多核的CPU,Python线程也无法真正实现并行,因为它们会受全局解释器锁(GIL)牵制。 ● 虽然Python的多线程机制受GIL影响,但还是非常有用的,因为我们很容易就能通过多线程模拟同时执行多项任务的效果。 ● 多条Python线程可以并行地执行多个系统调用,这样就能让程序在执行阻塞式的I/O任务时,继续做其他运算。 ● 虽然Python有全局解释器锁,但开发者还是得设法避免线程之间发生数据争用。 ● 把未经互斥锁保护的数据开放给多个线程去同时修改,可能导致这份数据的结构遭到破坏。 ● 可以利用threading内置模块之中的Lock类确保程序中的固定关系不会在多线程环境下受到干扰。 ● 程序范围变大、需求变复杂之后,经常要用多条路径平行地处理任务。 ● fan-out与fan-in是最常见的两种并发协调(concurrency coordination)模式,前者用来生成一批新的并发单元,后者用来等待现有的并发单元全部完工。(分派–归集) ● Python提供了很多种实现fan-out与fan-in的方案。 但是: ● 每次都手工创建一批线程,是有很多缺点的,例如:创建并运行大量线程时的开销比较大,每条线程的内存占用量比较多,而且还必须采用Lock等机制来协调这些线程。 ● 线程本身并不会把执行过程中遇到的异常抛给启动线程或者等待该线程完工的那个人,所以这种异常很难调试。 1 通过线程池 ThreadPoolExecutor 用多线程做并发(提升有限,I/O密集型) Python有个内置模块叫作concurrent.futures,它提供了ThreadPoolExecutor类。 这个类结合了线程(Thread)方案与队列(Queue)方案的优势,可以用来平行地处理 I/O密集型操作。 ThreadPoolExecutor方案仍然有个很大的缺点,就是I/O并行能力不高,即便把max_workers设成100,也无法高效地应对那种有一万多个单元格,且每个单元格都要同时做I/O的情况。如果你面对的需求,没办法用异步方案解决,而是必须执行完才能往后走(例如文件I/O),那么ThreadPoolExecutor是个不错的选择。然而在许多情况下,其实还有并行能力更强的办法可以考虑。 利用ThreadPoolExecutor,我们只需要稍微调整一下代码,就能够并行地执行简单的I/O操作,这种方案省去了每次fan-out(分派)任务时启动线程的那些开销。 虽然ThreadPoolExecutor不像直接启动线程的方案那样,需要消耗大量内存,但它的I/O并行能力也是有限的。因为它能够使用的最大线程数需要提前通过max_workers参数指定。 2 通过线程池 ProcessPoolExecutor 用多进程做并发(I/O、CPU密集型) 从开发者这边来看,这个过程似乎很简单,但实际上,multiprocessing模块与 ProcessPoolExecutor类要做大量的工作才能实现出这样的并行效果。同样的效果,假如改用其他语言来做,那基本上只需要用一把锁或一项原子操作就能很好地协调多个线程,从而实现并行。但这在Python里面不行,所以我们才考虑通过ProcessPoolExecutor来实现。然而这样做的开销很大,因为它必须在上级进程与子进程之间做全套的序列化与反序列化处理。这个方案对那种孤立的而且数据利用度较高的任务来说,比较合适。 ● 所谓孤立(isolated),这里指每一部分任务都不需要跟程序里的其他部分共用状态信息。 ● 所谓数据利用度较高(high-leverage),这里指任务所使用的原始材料以及最终所给出的结果数据量都很小,因此上级进程与子进程之间只需要互传很少的信息就行,然而在把原始材料加工成最终产品的过程中,却需要做大量运算。刚才那个求最大公约数的任务就属于这样的例子,当然还有很多涉及其他数学算法的任务,也是如此。 如果你面对的计算任务不具备刚才那两项特征,那么使用ProcessPoolExecutor所引发的开销可能就会盖过因为并行而带来的好处。在这种情况下,我们可以考虑直接使用multiprocessing所提供的一些其他高级功能,例如共享内存(shared memory)、跨进程的锁(cross-process lock)、队列(queue)以及代理(proxy)等。但是,这些功能都相当复杂,即便两个Python线程之间所要共享的进程只有一条,也是要花很大工夫才能在内存空间里面将这些工具安排到位。假如需要共享的进程有很多条,而且还涉及socket,那么这种代码理解起来会更加困难。 总之,不要刚一上来,就立刻使用跟multiprocessing这个内置模块有关的机制,而是可以先试着用ThreadPoolExecutor来运行这种孤立且数据利用度较高的任务。把这套方案实现出来之后,再考虑向ProcessPoolExecutor方案迁移。如果ProcessPoolExecutor方案也无法满足要求,而且其他办法也全都试遍了,那么最后可以考虑直接使用multiprocessing模块里的高级功能来编写代码。 ● 把需要耗费大量CPU资源的计算任务改用C扩展模块来写,或许能够有效提高程序的运行速度,同时又让程序里的其他代码依然能够利用Python语言自身的特性。但是,这样做的开销比较大,而且容易引入bug。 ● Python自带的multiprocessing模块提供了许多强大的工具,让我们只需要耗费很少的精力,就可以把某些类型的任务平行地放在多个CPU核心上面处理。要想发挥出multiprocessing模块的优势,最好是通过concurrent.futures模块及其ProcessPoolExecutor类来编写代码,因为这样做比较简单。 ● 只有在其他方案全都无效的情况下,才可以考虑直接使用multiprocessing里面的高级功能(那些功能用起来相当复杂)。 3 使用Joblib并行运行Python代码(实际工程中比较好用) 对于大多数问题,并行计算确实可以提高计算速度。 随着PC计算能力的提高,我们可以通过在PC中运行并行代码来简单地提升计算速度。Joblib就是这样一个可以简单地将Python代码转换为并行计算模式的软件包,它可非常简单并行我们的程序,从而提高计算速度。 Joblib是一组用于在Python中提供轻量级流水线的工具。 它具有以下功能:
本文是前文 Spark预测算法端到端案例的姊妹篇,那一篇用的是普通的lightgbm做的订单量预测,后面随着门店的增加,运行效率不高。 本文主要采用了微软开源的 Microsoft Machine Learning for Apache Spark https://github.com/Azure/mmlspark 具体用法在上文 Lightgbm在spark实战介绍过,可参考学习。。。 1 门店分类 由于门店数据量巨大,所有门店数据在一起做模型训练,即使在Spark环境下也非常吃性能,所以这里先将门店进行聚类分类,然后在针对同一类型的门店一起训练。 根据 wfm_dw1.wfm_order_channel_half_hour 渠道半小时订单量数据 生成 7天每半小时特征数据 24*2*7 训练 数据:20201130--20210131 预测未来28天数据:20210201--20210228 tstart=“20201130” tend=“20210131” 原数据pdf +-------------------+--------+---------+---------+ |global_store_number|sale_day|half_hour|order_qty| +-------------------+--------+---------+---------+ | 28710|20201210| 20| 17.0| | 28710|20201230| 19| 15.0| | 28710|20201211| 13| 19.0| | 28710|20201203| 31| 1.0| | 28710|20201205| 11| 3.0| | 28710|20210111| 19| 14.0| | 28710|20210119| 16| 22.0| | 28710|20210130| 23| 5.0| | 28710|20210107| 16| 30.0| | 28710|20210117| 23| 8.
通常业务中对计算性能有要求时,通常不使用GPU跑tf,会使用xgboost/lightgbm on Spark来解决,既保证速度,准确率也能接受。 LightGBM是使用基于树的学习算法的梯度增强框架。它被设计为分布式且高效的,具有以下优点: 根据官网的介绍 * LigthGBM训练速度更快,效率更高。LightGBM比XGBoost快将近10倍。 * 降低内存使用率。内存占用率大约为XGBoost的1/6。 * 准确性有相应提升。 * 支持并行和GPU学习。 * 能够处理大规模数据。 大部分使用和分析LigthGBM的都是在python单机版本上。要在spark上使用LigthGBM,需要安装微软的MMLSpark包。 MMLSpark可以通过–packages安装。 spark * –packages参数 根据jar包的maven地址,使用该包,该参数不常用,因为公司内部的数据平台的集群不一定能联网。 如下示例: $ bin/spark-shell --packages com.microsoft.ml.spark:mmlspark_2.11:1.0.0-rc1 http://maven.aliyun.com/nexus/content/groups/public/ –repositories 为该包的maven地址,建议给定,不写则使用默认源。 若依赖多个包,则中间以逗号分隔,类似–jars 默认下载的包位于当前用户根目录下的.ivy/jars文件夹中 应用场景:本地没有编译好的jar包,集群中服务需要该包的的时候,都是从给定的maven地址,直接下载 MMLSpark用法 1 .MMLSpark可以通–packages选项方便地安装在现有的Spark集群上,例如: spark-shell --packages com.microsoft.ml.spark:mmlspark_2.11:1.0.0-rc1 pyspark --packages com.microsoft.ml.spark:mmlspark_2.11:1.0.0-rc1 spark-submit --packages com.microsoft.ml.spark:mmlspark_2.11:1.0.0-rc1 这也可以在其他Spark contexts中使用,例如,可以通过将MMLSpark添加到.aztk/spark-default.conf文件中来在AZTK中使用MMLSpark。 2 .要在Python(或Conda)安装上尝试MMLSpark,首先通过pip安装PySpark, pip安装PySpark。接下来,使用–package或在运行时添加包来获取scala源代码 import pyspark spark = pyspark.sql.SparkSession.builder.appName("MyApp").\ config("spark.jars.packages","com.microsoft.ml.spark:mmlspark_2.11:1.0.0-rc1").\ getOrCreate() import mmlspark 3.python建模使用 # 分类 from mmlspark.lightgbm import LightGBMClassifier model = LightGBMClassifier(learningRate=0.
最近做完了一个预测算法,数据量巨大,需要分布式环境跑数据,算法本身用的是LightGBM,没什么好说的,主要是怎么用Spark每个driver端跑模型。 1.基础数据 订单数据:主数据,包括渠道订单量和品类销量(day和hour) wfm_order_quantity_day wfm_order_quantity_half_hour wfm_sale_quantity_day wfm_sale_quantity_half_hour 渠道:instore、mop(手机下单,店里取)、mod(外卖,打包) (店内、啡快、专星送) 销量预测:品类+销量 单量预测:渠道+订单量 天气数据 促销数据 门店数据:商圈类型 城市级别 商品数据:商品品类 预测目标 算法中除了考虑内部因子(如历史销售、市场促销、周中周末等)以外,还需纳入外部因子(如天气、节假日、季节、偶发事件等) 每家店的每日平均半小时预测准确率应超过75%(1-MAPE) 每家店的每月平均每日预测准确率应超过92%(1-MAPE) 每家店的每日预测准确率应超过80%(1-WMAPE) 2.特征处理 时间类:年月日,星期,小时等 历史统计类:最大,最小,均值,方差,中位数等 促销类:促销类型,粒度等 节假日:工作日,节假日【传统节日,法定节日等】 3.模型加工 一般预测未来N天有三种方式,本项目预测28天: 循环发 每次预测一天 带入历史特征 滚动预测下一天 性能低下 gap 1-28 28个模型 效率低 分段 直接预测 比较合理 分7 14 21 28四个shift, 组成28天预测模型 预测1-7 采用 shift 7 预测8-14 采用 shift 14 预测15-21 采用 shift 21
本文主要是提高xgboost/lightgbm/catboost等模型在参数调优方面的工程化实现以及在stacking模型融合使用 * xgboost调优 * lightgbm调优 * catboost调优 * stacking融合 * 网格搜索、随机搜索和贝叶斯优化 * LightGBM各种操作 0 工具类函数 from __future__ import print_function from __future__ import division import numpy as np import pandas as pd import matplotlib matplotlib.use("Agg") import matplotlib.pyplot as plt from sklearn.metrics import roc_curve, auc from sklearn.metrics import confusion_matrix from sklearn.metrics import f1_score from itertools import chain import time import os def timer(func): """计时器""" def wrapper(*args, **kwargs): t1 = time.time() func(*args, **kwargs) t2 = time.
[项目总结提炼] 前面我们在做数据工程化过程中会大量用到数据的读写操作,现总结如下!!! 主要有以下几个文件: config.ini 配置文件 func_forecast.sh 工程执行文件 mysql-connector-java-8.0.11.jar MySQL连接jdbc驱动 predict_pred.py 执行主代码 utils.py 工具项代码 1 config.ini 主要定义参数值 [spark] executor_memory = 2g driver_memory = 2g sql_execution_arrow_enabled = true executor_cores = 2 executor_instances = 2 jar = /root/spark-rdbms/mysql-connector-java-8.0.11.jar [mysql] host = 11.23.32.16 port = 3306 db = test user = test password = 123456 [postgresql] host = 11.23.32.16 port = 3306 db = test user = test password = 123456 [dbms_parameters] mysql_conn_info_passwd @@@ =4bI=prodha mysql_conn_info_database = test_db 2 utils.
[原创] 数据工程化案例介绍 好久没写博客了😃😃😃😃😃😃,最近做完了一个偏数据工程的项目,系统的使用了大数据相关组件,学习了Hadoop生态圈技术以及数据仓库相关知识。下面将一些体会写下。。。 1 项目背景和业务特点 XXX医药业务场景:以终端消费者为中心的服务,以门店、连锁加盟、批发模式触达,当前核心竞争力是品牌加盟和供应链采购能力。随着加盟业务快速成长,致力于成为中国最大的零售药房加盟商,需要配套成长的供应链物流能力和信息化建设。“高库存、高退货、高效期” 等环节精细化运营的薄弱是主要原因,具体表现在以下几点: (1) 门店补货靠经验,造成了“高退货”; (2) 加盟店和批发商等物流能力尚未触达、物流信息尚未线上化; (3) 与供应商信息沟通均为线下,补货频次较为传统; (4) 采购计划依赖采购员个人经验采用公式计算,未考虑复杂因素; 项目目标是构建以智能补货为智慧大脑的需求驱动的全局、动态、平衡的数字化供应链运营体系,提供安全、高效、精准的供应链能力。主要包括以下部分: (1) 数据清洗 (2) 特种工程 (3) 模型训练 (4) 模型融合 (5) 数据工程化 其中前4个部分是机器学习的常见方法和步骤,考虑到线上生产环境要能正常执行,第5部分数据工程化部分启动非常重要的地位,下面对这个部分进行详细的叙述。 2 数据工程化流程架构 这里我们的数据源主要Oracle业务数据以及一些客户提供的人工数据,利用sqoop每日凌晨00:40定时同步数据至Hadoop集群src层。按照经典的Kappa数据仓库分层架构分为:src->ods->dw1->dw2->app. 与传统的数仓建模不同的是我们主要的目的是利用机器学习方法进行预测补货,数据仓库ods/dw1都是数据清洗和复杂业务逻辑处理,dw2是特征工程处理生成宽表用于训练模型。在数据清洗的过程中会有一些指标用于KPI展示以及app模型预测补货结果我们会同步至MySQL,这些都是作为数据应用服务。 整个数据工程基于Hadoop生态圈技术为载体,数据存储主要是HDFS,数据计算主要是Hive/Spark,元数据管理是Apache Atlas,数据质量分析用的是Apache Griffin,数据任务流调度系统用的是Azkaban,数据OLAP数据库是Presto,数据分析可视化Dashboard用的是Superset。这些大数据组件采用Cloudera Manager(CM)进行统一安装配置,为了保证服务的高可用(HA)采用Zookeeper进行资源调度和管理。 3 数据工程生产环境部署 3.1 可配置项 配置项对于数据工程的重要性不言而喻,可灵活调整部署代码,方便控制管理 ├─conf │ │ config.ini 所有可配置参数 │ │ env_name.conf 生产环境和测试环境的标识符 │ │ ini.sh 读写配置文件的函数 例如: [replenish_parameters] start_date=2020-07-18 end_date=2020-08-31 rolling_day=7 rolling_num=50 [env_parameters] data_base_dir=/data base_dir_jar=/root 这样我们对于每张表的计算添加统一的配置项
[原创] 在支持向量机(以下简称SVM)的核函数中,高斯核(以下简称RBF)是最常用的,从理论上讲,RBF一定不比线性核函数差,但是在实际应用中,却面临着几个重要的超参数的调优问题。如果调的不好,可能比线性核函数还要差。所以我们实际应用中,能用线性核函数得到较好效果的都会选择线性核函数。如果线性核不好,我们就需要使用RBF,在享受RBF对非线性数据的良好分类效果前,我们需要对主要的超参数进行选取。本文我们就对scikit-learn中 SVM RBF的调参做一个小结。 1 SVM RBF 主要超参数概述 如果是SVM分类模型,这两个超参数分别是惩罚系数和RBF核函数的系数。当然如果是nu-SVC的话,惩罚系数C代替为分类错误率上限nu, 由于惩罚系数C和分类错误率上限nu起的作用等价,因此本文只讨论带惩罚系数C的分类SVM** 1.1 SVM分类模型 ###(1) 惩罚系数 惩罚系数C即上一篇里讲到的松弛变量ξ的系数。它在优化函数里主要是平衡支持向量的复杂度和误分类率这两者之间的关系,可以理解为正则化系数。 当惩罚系数C比较大时,我们的损失函数也会越大,这意味着我们不愿意放弃比较远的离群点。这样我们会有更加多的支持向量,也就是说支持向量和超平面的模型也会变得越复杂,也容易过拟合。 当惩罚系数C比较小时,意味我们不想理那些离群点,会选择较少的样本来做支持向量,最终的支持向量和超平面的模型也会简单。scikit-learn中默认值是1。 (2)RBF核函数的系数 另一个超参数是RBF核函数的参数。回忆下RBF 核函数 γ主要定义了单个样本对整个分类超平面的影响。 当γ比较小时,单个样本对整个分类超平面的影响比较小,不容易被选择为支持向量 当γ比较大时,单个样本对整个分类超平面的影响比较大,更容易被选择为支持向量**,或者说整个模型的支持向量也会多。scikit-learn中默认值是1/n_features** (3)惩罚系数和RBF核函数的系数 如果把惩罚系数和RBF核函数的系数一起看: 当C比较大、 γ比较大时,会有更多的支持向量,模型会比较复杂,较容易过拟合 当C比较小、γ比较小时,模型会变得简单,支持向量的个数会少 1.2 SVM回归模型 SVM回归模型的RBF核比分类模型要复杂一点,因为此时除了惩罚系数C和RBF核函数的系数γ之外,还多了一个损失距离度量ϵ。如果是nu-SVR的话,损失距离度量ϵ代替为分类错误率上限nu,由于损失距离度量ϵ和分类错误率上限nu起的作用等价,因此本文只讨论带距离度量ϵ的回归SVM。 对于惩罚系数C和RBF核函数的系数γ,回归模型和分类模型的作用基本相同。 对于损失距离度量ϵ,它决定了样本点到超平面的距离损失.当ϵ比较大时,损失较小,更多的点在损失距离范围之内,模型较简单;当ϵ比较小时,损失函数会较大,模型也会变得复杂;scikit-learn中默认值是0.1 惩罚系数C、RBF核函数的系数γ和损失距离度量ϵ一起看 当C比较大、 γ比较大、ϵ比较小时,会有更多的支持向量,模型会比较复杂,容易过拟合一些; 当C比较小、γ比较小、ϵ比较大时**,模型会变得简单,支持向量的个数会少 2 SVM RBF 主要调参方法 对于SVM的RBF核,主要的调参方法都是交叉验证。具体在scikit-learn中,主要是使用网格搜索,即GridSearchCV类。 当然也可以使用cross_val_score类来调参,但是个人觉得没有GridSearchCV方便。本文只讨论用GridSearchCV**来进行SVM的RBF核的调参。 将GridSearchCV类用于SVM RBF调参时要注意的参数有: