# Spark MLib
在Spark下进行机器学习,必然无法离开其提供的MLlib框架,所以接下来我们将以本框架为基础进行实际的讲解。首先我们需要了解其中最基本的结构类型,即转换器、估计器、评估器和流水线。
```mermaid
graph LR
A[转换器] --> B(估计器)
B --> C(评估器)
C --> D[模型]
```
# 一、基础使用
接下来我们将以一个简单的例子为基础整体介绍在Spark下进行机器学习的使用方式,便于读者大体熟悉完整的流程节点。当然在这其中对于部分不了解的情况下可以等在后续详细学习的过程中进行补充即可。
> [点击此处查看代码示例](src/main/scala/MachineLearnWithSpark.scala)
## 1. 特征工程
这部分相关知识可以参考本人编写的[人工智能专题]()的开源教程,其中对该部分进行详细的说明,下面我们将就框架提供的`RFormula`进行具体的实战操作(这里熟悉R语言的可能对此比较熟悉,本身就是借鉴了R语言,但是仅实现了其中的一个子集),对于我们需要进行特征化的数据首先我们需要定义对应的线性模型公式,具体如下。
```java
Dataset<Row> df = session.read().json("sparkdemo/data/simple-ml");
RFormula supervised = new RFormula().setFormula("lab ~ . + color: value1 + color: value2");
```
当然仅仅通过上述的方式还不能实现对数据的特征化,我们还需要通过数据对其进行训练,从而得到我们所需的转换器,为此我们需要使用其中的`fit`方法进行转换。
```java
RFormulaModel model = supervised.fit(df);
```
完成转换器的训练后我们就可以利用其进行实际的转换操作,从而生成特征`features`与标签`label`列,当然读者也可以通过`supervised.setLabelCol`设置标签列名,`supervised.setFeaturesCol`设置特征列名。对于监督学习都需要将数据分为样本数据与测试数据,为此我们需要通过以下方式将数据拆分。
```java
Dataset<Row>[] data = preparedDF.randomSplit(new double[]{0.7, 0.3});
```
## 2. 模型训练
在Spark MLib中为估计器,这里我们将采用逻辑回归的算法做为演示,提供一个分类算法模型的训练,首先我们实例化我们需要的模型类,通过其提供的方式对将训练数据传入其中进行模型的训练。
```java
LogisticRegression lr = new LogisticRegression();
LogisticRegressionModel lrModel = lr.fit(data[0]);
lrModel.transform(data[1]).select("label1", "prediction").show();
```
如果在对数据进行特征工程的时候将标签以及特征列的名称进行了修改,那么我们也需要通过`lr.setLabelCol`以及`lr.setFeaturesCol`进行同步修改调整。同时框架也提供了`explainParams`方法打印模型中可供调整的参数。
## 3. 流水线
对于机器学习,后期工作基本就是对各种参数的调优,为此Spark提供了友好的流水线,并基于其本平台分布式计算集群的能力助力我们缩短对不同参数模型的训练与评估,从而提供最佳的参数模型供我们使用,下面我们将一步一步介绍如何使用其提供的该特性。首先我们定义工作流中涉及到的阶段步骤,具体如下所示。
```java
Dataset<Row> df = session.read().json("sparkdemo/data/simple-ml.json");
Dataset<Row>[] data = df.randomSplit(new double[] {0.7, 0.3});
RFormula rForm = new RFormula();
LogisticRegression lr = new LogisticRegression();
Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] { rForm, lr });
```
上述完成工作流水线各阶段的任务后,接下来我们就需要指定各阶段的参数列表,从而便于Spark形成不同的组合进行模型训练。
```java
Seq<String> formulaParam = JavaConverters.asScalaIteratorConverter(Arrays.asList("lab ~ . + color:value1", "lab ~ . + color:value1 + color:value2").iterator()).asScala().toSeq();
ParamMap[] params = new ParamGridBuilder()
.addGrid(rForm.formula(), formulaParam)
.addGrid(lr.elasticNetParam(), new double[]{0.0, 0.5, 1.0})
.addGrid(lr.regParam(), new double[]{0.1, 2.0})
.build();
```
有了以上其实我们就可以单纯的进行模型训练了,但是这样训练除的模型并无法评估出最好的一个模型。我们需要指定一个评估器用来评估实际效果是否符合最佳。这里我们主要采用了`BinaryClassificationEvaluator`类。
```java
BinaryClassificationEvaluator evaluator = new BinaryClassificationEvaluator()
.setMetricName("areaUnderROC")
.setRawPredictionCol("prediction")
.setLabelCol("label");
```
最后我们需要能够自动调整超参数,并自动分配数据集的方式将上述的各部分组成从而形成最终有效的模型。
```java
TrainValidationSplit tvs = new TrainValidationSplit()
.setTrainRatio(0.75)
.setEstimatorParamMaps(params)
.setEstimator(pipeline)
.setEvaluator(evaluator);
```
而具体的使用与之前逻辑回归的方式如出一辙。
```java
TrainValidationSplitModel model = tvs.fit(data[0]);
System.out.println(evaluator.evaluate(model.transform(data[1])));
```
如果读者需要将该模型进行持久化可以采用`model.write().overwrite().save("sparkdemo/data/model");`该方式进行实际的持久化,当然读取时需要采用与写入一致的类,否则将无法正确读取。
# 二、特征工程
参考[机器学习教程](../ReadMe.md)中对应章节的内容可弥补关于各类算法的基础知识,接下来我们将仅从基于Spark的实战角度出发进行列举常用的方式对特定的数据预处理。
## 1. 通用
下面我们将介绍较通用的三种的针对数据进行处理的方式,其中一种上述的教程已经使用了,这里将仅做为介绍进行概述。
> [点击此处查看代码示例](src/main/scala/FeatureEngineering.scala)
### 1) RFormula
其主要参考了基于R语言的formula设计思路,当然其中仅仅支持有限有限的操作符。并且其中对于字符串的处理是采用独热编码(One-hot)的方式,具体的使用方式如下。
```scala
val supervised = new RFormula().setFormula("lab ~ . + color:value1 + color:value2")
supervised.fit(df).transform(df).show()
```
支持的操作符如下:
> `~ 分割标签与特征`
> `+ 将两个特征相加,+ 0代表除去截距`
> `- 减去一个特征, - 1代表除去截距`
> `: 将多个特征相乘变成一个特征`
> `. 选取所有特征`
如果读者不了解其中表达式的作用,接下来我们将举一个例子,假设a,b为2个特征,y是应变量。利用上述的公式我们将可以写出如下的语句。
`y ~ a + b`: 对应到线性模型的公式为: `y = w0 + w1 * a + w2 * b`,其中w0为截距
`y ~ a + b + a:b - 1`: 对应的线性模型的公式为: `y = w1 * a + w2 * b + w3 * a * b`,由于-1的存在所以没有截距
如果读者还是不能理解我们可以通过具体的例子来进行介绍,首先我们准备以下相关数据。
| lab | value1 | value2 |
| --- | --- | --- |
| good | 13 | 2.1 |
| bad | 9 | 8.2 |
将上面的数据采用公式`lab ~ value1 + value2`进行处理后,结果数据将如下所示。
| lab | value1 | value2 | features | label |
| --- | --- | --- | --- | --- |
| good | 13 | 2.1 | [13.0, 2.1] | 1 |
| bad | 9 | 8.2 | [9.0, 8.2] | 0 |
上述我们可以看到针对字符串类型的标签采用了字符串索引的方式进行映射,至此关于`RFormula`介绍到此为止。
### 2) SQLTransformer
即利用Spark提供的众多关键字对数据通过SQL语句的方式进行处理,这里需要注意的是不要直接
使用标名,如果需要引用本表则需要通过关键字`__THIS__`来引用。
```scala
val basicTrans = new SQLTransformer()
.setStatement("""
SELECT sum(value1), count(*), color
FROM __THIS__