第一部分 Spark介绍
第二部分 Spark的使用基础
第三部分 Spark工具箱
第四部分 使用不同的数据类型
第五部分 高级分析和机器学习
第六部分 MLlib应用
第七部分 图分析
第八部分 深度学习
我们已经描述了一些我们将要遇到的核心部分,现在让我们创建一个简单的pipeline 来演示每个部分。我们使用一个小的合成数据集来帮助阐明我们的观点。 首先来读入数据,并查看一个样例。
%scala
var df = spark.read.json("/mnt/defg/simple-ml")
df.orderBy("value2").show()
%python
df = spark.read.json("/mnt/defg/simple-ml")
df.orderBy("value2").show()
>>>
+-----+----+------+------------------+
|color| lab|value1| value2|
+-----+----+------+------------------+
|green|good| 1|14.386294994851129|
|green| bad| 16|14.386294994851129|
| blue| bad| 8|14.386294994851129|
...
| red| bad| 16|14.386294994851129|
|green|good| 12|14.386294994851129|
+-----+----+------+------------------+
数据集包含 一个有两个可选值的分类表现,一个分类值(color),和两个数值。这些数据是合成的,何时会使用这样的数据的一个例子是预测一家公司的顾客健康。lab代表他们目前的健康状态,color代表 电话确认他们真实健康前的 等级,另外两个数表示某些使用度量。你应该立即认识到这是一个分类任务,我们希望根据输入来预测二分的输出变量。
包含LIBSVM在内的监督学习 有一些特定的数据格式。这些格式 有真实值标签 和稀疏的输入数据。Spark 可以非常容易地读写这些格式的数据。
%scala
val libsvmData = spark.read.format("libsvm")
.load("/mnt/defg/sample_libsvm_data.txt")
%python
libsvmData = spark.read.format("libsvm")\
.load("/mnt/defg/sample_libsvm_data.txt")
下面开始介绍流程中的各环节的工具组件。
Transformers
正如我们所提及的,transformers以这样或那样的方式帮助我们操作当前列。这些列,在机器学习术语中,代表特性(用来输入模型),在我们的具体案例中,一个标签代表正确的额输出。Tranformers可以减少特性、增加特性、操纵当前的特性 或 只是单纯正确地格式化数据。通常,transformers向DataFrame上增加新的列。
一个要求是 当使用MLlib时,所有的机器学习算法输入必须由Double类型和Vector[Double]类型的特性组成。我们当前的数据不符合这个要求,因此我们需要转换为正确的格式。
为了实现这点,我们使用RFormula函数。这是一个针对 特性机器学习模型的 说明性语言,一旦你理解了它的语法,使用起来非常简单。目前RFormule支持R操作符的一个有限的子集,这一子集在实践中对于简单模型非常好用。基本的操作符为:
~ 分离目标 和 条款
+ 合并条款(列),+0 代表移除截距(这意味着直线的y截距是0)
- 移除条款(列),-1 和 +0 含义相同
: interaction(数值或二分类值的乘法)
. 除了目标/从属变量 以外 的所有列
为了用这些语法 指定我们的transformations,需要引入相关类。
%scala
import org.apache.spark.ml.feature.RFormula
%python
from pyspark.ml.feature import RFormula
接下来 我们定义这个公式。在本例中,我们会使用所有可用的变量(用 .),并在value1 和color之间,value2和color之间指定一个interaction。
%scala
val supervised = new RFormula()
.setFormula("lab ~ . + color:value1 + color:value2")
%python
supervised = RFormula(formula="lab ~ . + color:value1 + color:value2")
此时,我们已经声明性地指定了 我们想如何将原数据 转换为 希望用来训练模型的数据。上面的transformations是一个特殊类型的transformer(称为estimator),其必须fit在输入数据上。不是所有的transformer都要求fit在输入数据上,但RFormula需要这么做是因为它会自动为我们操作分类变量,它需要弄清楚哪些列是分类数据,哪些不是。因此,我们必须调用 fit 方法。一旦使用fit,它会返回一个 transformer的“trained”版本,我们可以用其来转换数据。
现在我们介绍这些细节,继续准备我们的DataFrame。
%scala
val fittedRF = supervised.fit(df)
val preparedDF = fittedRF.transform(df)
%python
fittedRF = supervised.fit(df)
preparedDF = fittedRF.transform(df)
preparedDF.show()
>>>
+-----+----+------+------------------+--------------------+-----+
|color| lab|value1| value2| features|label|
+-----+----+------+------------------+--------------------+-----+
|green|good| 1|14.386294994851129|(10,[1,2,3,5,8],[...| 1.0|
| blue| bad| 8|14.386294994851129|(10,[2,3,6,9],[8....| 0.0|
...
| red| bad| 1| 38.97187133755819|(10,[0,2,3,4,7],[...| 0.0|
| red| bad| 2|14.386294994851129|(10,[0,2,3,4,7],[...| 0.0|
+-----+----+------+------------------+--------------------+-----+
在输出中,我们可以看到 转换后的结构,features列 包含我们之前的原始数据。这后面的转换其实非常简单。RFormula在调用fit的时候检查我们的数据,并根据指定列转换数据 输出一个对象。“trained”的transformer 类型中 总是包含 单词 Model。当我们使用这个transformer时,你会注意到Spark自动地将类别变量转换为Doubles,所以我们可以将其输入给机器学习模型。它通过多次调用 StringIndexer, Interaction, 和 VectorAssembler transformers来做到这一点,这些transformers 会在下一节进行介绍。
接下来我们对对象 调用tranform,为了将我我们的输入数据 转换为想要的输出数据。
在为模型准备数据之后,就接近 之前所描述的高级分析工作流的 最后阶段了。我们(预)处理了我们的数据,所以数据是干净的,并在这个过程中添加了一些特征。现在是时候训练我们的模型(或一组模型)了。为此,我们需要准备用于评估的测试数据集。
拥有一个测试集可能是你能 为保证所训练的模型可以在现实环境中(可靠地)使用的 而做的 最重要的事情。不要创建一个 典型的测试集 或是使用你的测试集 来进行超参数调优。不要跳过创建一个测试集这一步,这是知道你的模型是否好用的 必要条件。
%scala
val Array(train, test) = preparedDF.randomSplit(Array(0.7, 0.3))
%python
train, test = preparedDF.randomSplit([0.7, 0.3])
Estimators
现在我们已经将数据转换为 正确的格式,并创建了一些有价值的特征。是时候来拟合模型了。本例中我们使用逻辑回归。为创建分类器,我们实例化一个logsticregression 实例,一种分类方法,使用默认的配置或超参数(对更熟悉机器学习的人来说)。接着 设置标签列和特征列。我们设置的值 实际上是Spark MLlib中DataFrame API 的所有 Estimators 默认的标签,在后面的章节我们会忽略它们。
%scala
import org.apache.spark.ml.classification.LogisticRegression
val lr = new LogisticRegression()
.setLabelCol("label")
.setFeaturesCol("features")
%python
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(
labelCol="label",
featuresCol="features")
在训练模型之前,最好进行参数检查,这也是为每个特定模型更新可用选项的好办法。
%scala
println(lr.explainParams())
%python
print lr.explainParams()
一旦我们实例化了模型,就可以训练它了。者通过返回一个LogisticRegressionModel
的 fit
方法来完成。
%scala
val fittedLR = lr.fit(train)
%python
fittedLR = lr.fit(train)
这会启动一个Spark job,执行ML模型的拟合。
现在我们已经训练了模型,我么你可以用它来进行预测。逻辑上,这代表一个 从特征到标签的转换。我们用transform方法来做预测。例如,可以 transform 训练数据集来查看 模型分配给训练数据的标签是什么,以及它们与真是的输出相比如何。这只是另外一个可以操作的DataFrame。
fittedLR.transform(train).select("label", "prediction").show()
>>>
+-----+----------+
|label|prediction|
+-----+----------+
| 0.0| 0.0|
| 0.0| 0.0|
...
| 0.0| 0.0|
| 0.0| 0.0|
+-----+----------+
下一步是手动地评价该模型,并计算 如正确率和失败率这样的性能标准。然后,我们可能会回过头来,尝试一组不同的参数,看看它们的性能是否更好。这个过程,虽然有用,但实际上非常单调乏味。Spark可以避免这些,通过 指定工作负载为 包含所有转换 和调优超参数的工作pipeling。
什么是超参数?
超参数是模型的初始化配置。他们是影响其他参数的参数。逻辑回归是一个简单的模型,不含有任何超参数。然而,我们可以选择在 RFormula中 考虑不同的交互变量。这样做,我们可以有效的调整 影响数据预处理的超参数。Pipelines,如你马上会看到的,允许我们以不同方式来配置 从原始数据 到模型的全部数据操作流程,允许我们将超参数 调优为 我们所尝试的最佳参数。
Pipeline
如你在前面可能已经注意到的,如果你执行很多的 transformers,编写所有步骤 并跟踪DataFrame 是非常乏味的。 这就是为什么Spark包含Pipeline的概念。一个pipeline允许你设置一个关于相关transformations的数据流,以一个estimator结束,其会自动地根据你的规定说明来调优,并为生产用例 准备好一个调优的模型。下面的图表阐述了这个过程。

一个重要的细节是 transformers 和模型的 实例 不能跨 pipelines 或不同的模型 重复使用。 在创建另一个pipeline之前总是创建一个新的模型实例。
为了确保 没有过拟合,我们会创建一个 holdout测试集 并 基于一个验证集来调优超参数。注意,这都是我们的原始数据集。(Transformers一节中 是从 整理过的数据 中分割训
练测试集)
%scala
val Array(train, test) = df.randomSplit(Array(0.7, 0.3))
%python
train, test = df.randomSplit([0.7, 0.3])
在这个例子中,我们只选用RFormula,一个常见的模式是将许多不同的transformers与RFormula一起 建立一个pipeline(用于简单的特性)。我们在接下来的章节中介绍这些与处理技术,现在只需要记住远不止两个阶段。本例中我们不会指定一个formula。
%scala
val rForm = new RFormula()
val lr = new LogisticRegression()
.setLabelCol("label")
.setFeaturesCol("features")
%python
rForm = RFormula()
lr = LogisticRegression()\
.setLabelCol("label")\
.setFeaturesCol("features")
现在不需要手动使用转换和调整模型了。我们只需要将他们在整个pipeline中分阶段进行。这使他们 成为 逻辑transformations,或一个指定给Spark在pipeline中运行的一连串命令。
import org.apache.spark.ml.Pipeline
val stages = Array(rForm, lr)
val pipeline = new Pipeline().setStages(stages)
%python
from pyspark.ml import Pipeline
stages = [rForm, lr]
pipeline = Pipeline().setStages(stages)
Evaluators
我们建立了一套pipeline。下一步就是评估pipeline的性能。Spark通过设置 由你指定的所有参数组合 构成的参数网络来做到这一点。 你应该马上注意到 在下面的代码中,甚至Rformula 也在调优特定的参数。 在一个pipeline中,我们不止可以调整模型的超参数,我们甚至可以调整 transformer的特性(回归、分类?)。
%scala
import org.apache.spark.ml.tuning.ParamGridBuilder
val params = new ParamGridBuilder()
.addGrid(rForm.formula, Array(
"lab ~ . + color:value1",
"lab ~ . + color:value1 + color:value2"))
.addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
.addGrid(lr.regParam, Array(0.1, 2.0))
.build()
%python
from pyspark.ml.tuning import ParamGridBuilder
params = ParamGridBuilder()\
.addGrid(rForm.formula, [
"lab ~ . + color:value1",
"lab ~ . + color:value1 + color:value2"])\
.addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0])\
.addGrid(lr.regParam, [0.1, 2.0])\
.build()
在当前网络中,有三个超参数 会偏离默认值。
两个供 Rformula 选择。
三个供 弹性网络参数 选择。
两个供 正则化参数 选择。
这给我们一个共12个不用的参数组合,这意味着我们会训练12个不同版本的逻辑回归。但我们不想在本章节介绍太多细节, 弹性网络参数 和 正则化参数 在 分类章节介绍。
随着网格的建立,现在可以指定我们的evaluation。有一些分类和回归的evalutors,我们在后续章节中进行介绍。
在本例中,我们会使用BinaryClassificationEvaluator。这个evaluator允许我们自动地 根据我们制定的标准优化模型训练。 这里我们会指定 areaUnderROC,其是受试者工作特性(ROC)下的总面积,是我们在分类章节中介绍过得一种很常见的分类性能度量。
现在我们有了一个pipeline,其制订了我们的数据应该如何转换。让我们进入下一个层级,通过在我们的逻辑回归模型中尝试不同的超参数,自动地 去 执行模型选型。我们通过指定一个参数网格,一个分类度量,及一个Evaluator。一个evaluator让我们可以根据一些标准(在evaluator中指定)自动地优化模型训练,然而为了利用这一点,我们需要一种简单的方法来尝试不同的模型参数,以查看哪些参数执行得最好。我们在每个任务的章节中涵盖了所有不同的评估指标。
%scala
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
val evaluator = new BinaryClassificationEvaluator()
.setMetricName("areaUnderROC")
.setRawPredictionCol("prediction")
.setLabelCol("label")
%python
from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator = BinaryClassificationEvaluator()\
.setMetricName("areaUnderROC")\
.setRawPredictionCol("prediction")\
.setLabelCol("label")
在验证集上匹配超参数,而不是在测试集上,是机器学习中最好的实践。原因是这能避免过拟合。因此我们不能使用我们的holdout 测试集(我们之前创建的)来调优这些参数。幸运地是Spark提供了两个选项 来自动地执行超参数调优。我们可以使用TrainValidationSplit——简单地将我们的数据分割为两部分,或CrossValidator——通过将数据集 随机地分割成k个不重叠的分区 来执行K-fold交叉验证。
%scala
import org.apache.spark.ml.tuning.TrainValidationSplit
val tvs = new TrainValidationSplit()
.setTrainRatio(0.75) // also the default.
.setEstimatorParamMaps(params)
.setEstimator(pipeline)
.setEvaluator(evaluator)
%python
from pyspark.ml.tuning import TrainValidationSplit
tvs = TrainValidationSplit()\
.setTrainRatio(0.75)\
.setEstimatorParamMaps(params)\
.setEstimator(pipeline)\
.setEvaluator(evaluator)
现在我们可以使用整个pipeline。我们会在验证集上测试每个版本的模型。你会注意到tvsFitted的类型是TrainValidationSplitModel。每当我们拟合了一个给定模型,其输出一个“model”类型。
%scala
val tvsFitted = tvs.fit(train)
%python
tvsFitted = tvs.fit(train)
接下自然而然的要评估其在测试集上表现的如何。
evaluator.evaluate(tvsFitted.transform(test)) // 0.9166666666666667
我们还可以看到一个特定模型的训练总结。为此,我们从pipeline中将其提取出来,将其转化为正确的类型并打印出来。可用的度量标准 依赖于下面章节中介绍的模型。唯一需要 理解的关键事情是 非拟合estimator 和estimator有相同的名字,如LogisticRegression。
import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.classification.LogisticRegressionModel
val trainedPipeline = tvsFitted.bestModel.asInstanceOf[PipelineModel]
val TrainedLR = trainedPipeline.stages(1)
.asInstanceOf[LogisticRegressionModel]
val summaryLR = TrainedLR.summary
SummaryLR.objectiveHistory
objective history 展示我们的算法 在每个训练迭代中是如何执行。这是很有帮助的,因为我们可以看到我们的算法正在向最佳模型迈进。 通常在开始时会有较大的跳跃,但后面会越来越小,值之间只有很小的变化。
Model的持久化和应用
现在我们已经训练了模型,我们可以将其保存到磁盘,以便稍后以在线预测的方式使用它。
tvsFitted.write.overwrite().save("/tmp/modelLocation")
现在我们写出了模型,为了进行预测,我们可以再将其家载入一个程序(可能在不同的存储位置)中。为了做到这一点,我们需要使用模型的 伴生对象,调优类或最初使用的transformer。本例中,我们使用输出一个TrainValidationSplitModel的TrainValidationSplit
。我们将使用“model”版本来加载持久化的模型。如果我们之前使用的是CrossValidator
,我们就必须将持久化模型读入为CrossValidatorModel
,如果我们想要手动使用LogisticRegression
,我们则需要使用LogisticRegressionModel
。
%scala
import org.apache.spark.ml.tuning.TrainValidationSplitModel
val model = TrainValidationSplitModel.load("/tmp/modelLocation")
model.transform(test)
部署模式
当谈到Spark时,有几种不同的部署模式可以将机器学习模型放到Spark的生产环境中。下面的图解 说明了一般的工作流程。

1、离线训练你的机器学习模型,然后将结果存入一个数据库(通常是 键值对 存储)。这对 如推荐一类的事务 运行很好,但对 分类或回归一类的事务和表现较差,这一类事务中你不能直接通过查询给定用户的一个值,而是需要计算得到结果。
2、离线训练你的机器学习模型,将模型持久化到磁盘中,工其他服务使用。这不是一个低延时的解决方案,启动一个spark工作的总开销会很高,及时你没有在集群上运行。此外,这不能很好地并行化,所以你可能必须在多个模型副本前 放置一个负载均衡器,并自己构建一些REST API。对这个问题有一些有趣的潜在解决方案,但并没有准备好产品化。
3、手动地(或通过一些其他软件)将你的分布式模型装换为一个可以在单机上运行的更快的模型。这在没有很多原始数据要在Spark上进行操作,并不需要长时间维持运行时表现很好。同样也有一些针对这种操作的解决方案,特同样没有产品化。 在上面的插图中没有介绍这些,因为其中需要手动操作。
4、在线地训练你的机器学习模型,并在线地使用它,当结合如流时 是可能的,但这非常复杂。随着Strctured Streaming开发的继续,这一领域会继续成熟。
这些都特定于Spark而不是用于很多单机框架,单机框架不能在很大的数据规模上进行训练,但可以更快地做出响应。
这只是一些选项,还有很多执行模型部署和管理的潜在方法。这是一个很可能迅速改变并取得进展的重要领域。
网友评论