SparkML机器学习之特征工程(二)特征转化(Binarizer、StandardScaler、MaxAbsScaler、Normalizer、N-gram、Tokenizer等)

  1. 云栖社区>
  2. 博客>
  3. 正文

SparkML机器学习之特征工程(二)特征转化(Binarizer、StandardScaler、MaxAbsScaler、Normalizer、N-gram、Tokenizer等)

公众号胖滚猪学编程 2018-04-08 12:17:59 浏览7755
展开阅读全文

特征转化

为什么要转化数据呢,就是要让它成为有效的特征,因为原始数据是很多脏数据无用数据的。常用的方法是标准化,归一化,特征的离散化等等。比如我输入的数据是句子,我得把它切分为一个个单词进行分析,这就是一种转化。

连续型数据处理之二值化:Binarizer

假设淘宝现在有个需求,我得根据年龄来进行物品推荐,把50以上的人分为老年,50以下分为非老年人,那么我们根据二值化可以很简单的把50以上的定为1,50以下的定为0。这样就方便我们后续的推荐了。Binarizer就是根据阈值进行二值化,大于阈值的为1.0,小于等于阈值的为0.0

package ml.test
import org.apache.spark.ml.feature.Binarizer
import org.apache.spark.sql.SparkSession
/**
  * Created by liuyanling on 2018/3/19   
  */
object BinarizerDemo {
  def main(args: Array[String]): Unit = {
    var spark = SparkSession.builder().appName("BinarizerDemo").master("local[2]").getOrCreate();
    val array = Array((1,34.0),(2,56.0),(3,58.0),(4,23.0))
    //将数组转为DataFrame
    val df = spark.createDataFrame(array).toDF("id","age")
    //初始化Binarizer对象并进行设定:setThreshold是设置我们的阈值,InputCol是设置需要进行二值化的输入列,setOutputCol设置输出列
    val binarizer = new Binarizer().setThreshold(50.0).setInputCol("age").setOutputCol("binarized_feature")
    //transform方法将DataFrame二值化。
    val binarizerdf = binarizer.transform(df)
    //show是用于展示结果
    binarizerdf.show
  }
}

输出结果:

+---+----+-----------------+
| id| age|binarized_feature|
+---+----+-----------------+
|  1|34.0|              0.0|
|  2|56.0|              1.0|
|  3|58.0|              1.0|
|  4|23.0|              0.0|
+---+----+-----------------+

连续型数据处理之给定边界离散化:Bucketizer

现在淘宝的需求变了,他们觉得把人分为50以上和50以下太不精准了,应该分为20岁以下,20-30岁,30-40岁,36-50岁,50以上,那么就得用到数值离散化的处理方法了。离散化就是把特征进行适当的离散处理,比如上面所说的年龄是个连续的特征,但是我把它分为不同的年龄阶段就是把它离散化了,这样更利于我们分析用户行为进行精准推荐。Bucketizer能方便的将一堆数据分成不同的区间。

object BucketizerDemo {
  def main(args: Array[String]): Unit = {
    var spark = SparkSession.builder().appName("BucketizerDemo").master("local[2]").getOrCreate();
    val array = Array((1,13.0),(2,16.0),(3,23.0),(4,35.0),(5,56.0),(6,44.0))
    //将数组转为DataFrame
    val df = spark.createDataFrame(array).toDF("id","age")
    // 设定边界,分为5个年龄组:[0,20),[20,30),[30,40),[40,50),[50,正无穷)
    // 注:人的年龄当然不可能正无穷,我只是为了给大家演示正无穷PositiveInfinity的用法,负无穷是NegativeInfinity。
    val splits = Array(0, 20, 30, 40, 50, Double.PositiveInfinity)
    //初始化Bucketizer对象并进行设定:setSplits是设置我们的划分依据
    val bucketizer = new Bucketizer().setSplits(splits).setInputCol("age").setOutputCol("bucketizer_feature")
    //transform方法将DataFrame二值化。
    val bucketizerdf = bucketizer.transform(df)
    //show是用于展示结果
    bucketizerdf.show
  }

}

输出结果:

+---+----+------------------+
| id| age|bucketizer_feature|
+---+----+------------------+
|  1|13.0|               0.0|
|  2|16.0|               0.0|
|  3|23.0|               1.0|
|  4|35.0|               2.0|
|  5|56.0|               4.0|
|  6|44.0|               3.0|
+---+----+------------------+

连续型数据处理之给定分位数离散化:QuantileDiscretizer

有时候我们不想给定分类标准,可以让spark自动给我们分箱。

object QuantileDiscretizerDemo {
  def main(args: Array[String]): Unit = {
    var spark = SparkSession.builder().appName("QuantileDiscretizerDemo").master("local[2]").getOrCreate();
    val array = Array((1,13.0),(2,14.0),(3,22.0),(4,35.0),(5,44.0),(6,56.0),(7,21.0))
    val df = spark.createDataFrame(array).toDF("id","age")
    //和Bucketizer类似:将连续数值特征转换离散化。但这里不再自己定义splits(分类标准),而是定义分几箱就可以了。
    val quantile = new QuantileDiscretizer().setNumBuckets(5).setInputCol("age").setOutputCol("quantile_feature")
    //因为事先不知道分桶依据,所以要先fit,相当于对数据进行扫描一遍,取出分位数来,再transform进行转化。
    val quantiledf = quantile.fit(df).transform(df)
    //show是用于展示结果
    quantiledf.show
  }
}

标准化

对于同一个特征,不同的样本中的取值可能会相差非常大,一些异常小或异常大的数据会误导模型的正确训练;另外,如果数据的分布很分散也会影响训练结果。以上两种方式都体现在方差会非常大。此时,我们可以将特征中的值进行标准差标准化,即转换为均值为0,方差为1的正态分布。如果特征非常稀疏,并且有大量的0(现实应用中很多特征都具有这个特点),Z-score 标准化的过程几乎就是一个除0的过程,结果不可预料。所以在训练模型之前,一定要对特征的数据分布进行探索,并考虑是否有必要将数据进行标准化。基于特征值的均值(mean)和标准差(standard deviation)进行数据的标准化。它的计算公式为:标准化数据=(原数据-均值)/标准差。标准化后的变量值围绕0上下波动,大于0说明高于平均水平,小于0说明低于平均水平。

StandardScaler

object StandardScalerDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").getOrCreate()
    val df = spark.read.format("libsvm").load("libsvm_data.txt")
    //setWithMean是否减均值。setWithStd是否将数据除以标准差。这里就是没有减均值但有除以标准差
    df.printSchema()
    val scaler =new StandardScaler().setInputCol("features").setOutputCol("scaledFeatures").setWithMean(false).setWithStd(true)
    // 计算均值方差等参数
    val scalermodel = scaler.fit(df)
    // 标准化
    scalermodel.transform(df).show()
  }
}
libsvm_data.txt
label 标签,就是属于哪一类,即你要分类的种类,通常是一些整数。
index 是有顺序的索引,通常是连续的整数。就是指特征编号,必须按照升序排列
value 就是特征值,训练,通常是一堆实数组成。
格式:标签 第一维特征编号:第一维特征值 第二维特征编号:第二维特征值 …例:
0 1:1 2:2
1 1:2 2:4

归一化

为什么数据需要归一化?以房价预测为案例,房价(y)通常与离市中心距离(x1)、面积(x2)、楼层(x3)有关,设y=ax1+bx2+cx3,那么abc就是我们需要重点解决的参数。但是有个问题,面积一般数值是比较大的,100平甚至更多,而距离一般都是几公里而已,b参数只要一点变化都能对房价产生巨大影响,而a的变化对房价的影响相对就小很多了。显然这会影响最终的准确性,毕竟距离可是个非常大的影响因素啊。 所以, 需要使用特征的归一化, 取值跨度大的特征数据, 我们浓缩一下, 跨度小的括展一下, 使得他们的跨度尽量统一
归一化就是将所有特征值都等比地缩小到0-1或者-1到1之间的区间内。其目的是为了使特征都在相同的规模中。

绝对值最大标准化:MaxAbsScaler

import org.apache.spark.ml.feature.MaxAbsScaler
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
/**
  * Created by LYL on 2018/3/20.
  */
object MaxAbsScalerDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("MaxAbsScalerDemo").master("local[2]").getOrCreate()
    val dataFrame = spark.createDataFrame(Seq(
      (0, Vectors.dense(10.0, 0.1, -8.0)),
      (1, Vectors.dense(100.0, 1.0, -4.0)),
      (2, Vectors.dense(1000.0, 10.0, 8.0))
    )).toDF("id", "features")
    val maxabs = new MaxAbsScaler().setInputCol("features").setOutputCol("maxabs_features")
    // fit作用是把所有值都扫描一遍,计算出最大最小值,比如1000的话那么absMax=1000。最后返回MaxAbsScalerModel
    val scalerModel = maxabs.fit(dataFrame)
    // 使用每个特征的最大值的绝对值将输入向量的特征值都缩放到[-1,1]范围内
    val scalerdf = scalerModel.transform(dataFrame)
    scalerdf.show
  }
}

运行结果:

+---+-----------------+----------------+
| id|         features| maxabs_features|
+---+-----------------+----------------+
|  0|  [10.0,0.1,-8.0]|[0.01,0.01,-1.0]|
|  1| [100.0,1.0,-4.0]|  [0.1,0.1,-0.5]|
|  2|[1000.0,10.0,8.0]|   [1.0,1.0,1.0]|
+---+-----------------+----------------+

归一化之最小最大值标准化MinMaxScaler

import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession
/**
  * Created by LYL on 2018/3/20.
  */
object MinMaxScalerDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("MinMaxScalerDemo").master("local[2]").getOrCreate()
    val dataFrame = spark.createDataFrame(Seq(
      (0, Vectors.dense(2.0, 0.9, -2.0)),
      (1, Vectors.dense(3.0, -2.0, -3.0)),
      (2, Vectors.dense(4.0, -2.0, 2.0))
    )).toDF("id", "features")
    val maxabs = new MinMaxScaler().setInputCol("features").setOutputCol("minmax_features")
    val scalerModel = maxabs.fit(dataFrame)
    // 将所有值都缩放到[0,1]范围内
    val scalerdf = scalerModel.transform(dataFrame)
    scalerdf.show
  }
}

运行结果为:

+---+---------------+---------------+
| id|       features|minmax_features|
+---+---------------+---------------+
|  0| [2.0,0.9,-2.0]|  [0.0,1.0,0.2]|
|  1|[3.0,-2.0,-3.0]|  [0.5,0.0,0.0]|
|  2| [4.0,-2.0,2.0]|  [1.0,0.0,1.0]|
+---+---------------+---------------+

正则化Normalizer

为什么要有正则化?就是为了防止过拟合。来看一下正则化是怎么计算的:
未标题-2.jpg

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").getOrCreate()
    val dataFrame = spark.createDataFrame(Seq(
      (0, Vectors.dense(1.0, 0.5, -1.0)),
      (1, Vectors.dense(2.0, 1.0, 1.0)),
      (2, Vectors.dense(4.0, 10.0, 2.0))
    )).toDF("id", "features")
    //setP是指L1正则化还是L2正则化,比如1.0就是上面说到的L1正则化,计算如下:1/(1+0.5+1)
    val normalizer = new Normalizer().setInputCol("features").setOutputCol("normalizer_features").setP(1.0)
    normalizer.transform(dataFrame).show(truncate = false)
  }

输出结果为:

+---+--------------+-------------------+
|id |features      |normalizer_features|
+---+--------------+-------------------+
|0  |[1.0,0.5,-1.0]|[0.4,0.2,-0.4]     |
|1  |[2.0,1.0,1.0] |[0.5,0.25,0.25]    |
|2  |[4.0,10.0,2.0]|[0.25,0.625,0.125] |
+---+--------------+-------------------+

N-gram

N-Gram认为语言中每个单词只与其前面长度 N-1 的上下文有关。主要分为bigram和trigram,bigram假设下一个词的出现依赖它前面的一个词,trigram假设下一个词的出现依赖它前面的两个词。在SparkML中用NGram类实现,setN(2)为bigram,setN(3)为trigram。

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").getOrCreate()
    val wordDataFrame = spark.createDataFrame(Seq(
      (0, Array("Hi", "I", "heard", "about", "Spark")),
      (1, Array("I", "wish", "Java", "could", "use", "case", "classes")),
      (2, Array("Logistic", "regression", "models", "are", "neat"))
    )).toDF("id", "words")
    val ngram2 = new NGram().setN(2).setInputCol("words").setOutputCol("ngrams")
    val ngram3 = new NGram().setN(3).setInputCol("words").setOutputCol("ngrams")
    ngram2.transform(wordDataFrame).show(false)
    ngram3.transform(wordDataFrame).show(false)
  }

输出结果为:

+---+------------------------------------------+------------------------------------------------------------------+
|id |words                                     |ngrams                                                            |
+---+------------------------------------------+------------------------------------------------------------------+
|0  |[Hi, I, heard, about, Spark]              |[Hi I, I heard, heard about, about Spark]                         |
|1  |[I, wish, Java, could, use, case, classes]|[I wish, wish Java, Java could, could use, use case, case classes]|
|2  |[Logistic, regression, models, are, neat] |[Logistic regression, regression models, models are, are neat]    |
+---+------------------------------------------+------------------------------------------------------------------+

18/03/24 11:23:32 INFO CodeGenerator: Code generated in 14.198881 ms
+---+------------------------------------------+--------------------------------------------------------------------------------+
|id |words                                     |ngrams                                                                          |
+---+------------------------------------------+--------------------------------------------------------------------------------+
|0  |[Hi, I, heard, about, Spark]              |[Hi I heard, I heard about, heard about Spark]                                  |
|1  |[I, wish, Java, could, use, case, classes]|[I wish Java, wish Java could, Java could use, could use case, use case classes]|
|2  |[Logistic, regression, models, are, neat] |[Logistic regression models, regression models are, models are neat]            |
+---+------------------------------------------+--------------------------------------------------------------------------------+

多项式转化PolynomialExpansion

有时候要对特征值进行一些多项式的转化,比如平方啊,三次方啊等等,那就用到了PolynomialExpansion。

object PolynomialExpansionDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").getOrCreate()
    val dataFrame = spark.createDataFrame(Seq(
      (0, Vectors.dense(1.0, 5.0)),
      (1, Vectors.dense(2.0, 1.0)),
      (2, Vectors.dense(4.0, 8.0))
    )).toDF("id", "features")
    //setDegree表示多项式最高次幂 比如1.0,5.0可以是 三次:1.0^3 5.0^3 1.0+5.0^2 二次:1.0^2+5.0 1.0^2 5.0^2 1.0+5.0 一次:1.0 5.0
    val po = new PolynomialExpansion().setDegree(3).setInputCol("features").setOutputCol("Polynomial_features")
    po.transform(dataFrame).show(truncate = false)
  }
}

输出结果为:

+---+---------+-----------------------------------------------+
|id |features |Polynomial_features                            |
+---+---------+-----------------------------------------------+
|0  |[1.0,5.0]|[1.0,1.0,1.0,5.0,5.0,5.0,25.0,25.0,125.0]      |
|1  |[2.0,1.0]|[2.0,4.0,8.0,1.0,2.0,4.0,1.0,2.0,1.0]          |
|2  |[4.0,8.0]|[4.0,16.0,64.0,8.0,32.0,128.0,64.0,256.0,512.0]|
+---+---------+-----------------------------------------------+

Tokenizer分词器

当我们的输入数据为文本(句子)的时候,我们会想把他们切分为单词再进行数据处理,这时候就要用到Tokenizer类了。

object TokenizerDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("TokenizerDemo").getOrCreate()
    val dataFrame = spark.createDataFrame(Seq((0, "Hello I am LYL"), (1, "I Love Bigdata"), (2, "1 2 3 4"))).toDF("id", "sentence")
    // Tokenization(分词器)是一个接受文本(通常是句子)输入,然后切分成单词。
    val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
    tokenizer.transform(dataFrame).show()
    // RegexTokenizer(正则化分词器)基于正则表达式匹配提供了更多高级的分词功能。将gaps参数设置为false,表明使用正则表达式匹配标记,而不是使用分隔符。
    // 此处\\d表示匹配数字。
    val regextokenizer = new RegexTokenizer().setInputCol("sentence").setOutputCol("words").setGaps(false).setPattern("\\d")
    regextokenizer.transform(dataFrame).show()
    //使用udf函数 统计单词个数
    val wordcount = udf { (word: Seq[String]) => word.length }
    tokenizer.transform(dataFrame).select("words").withColumn("wordcount", wordcount(col("words"))).show()

  }

输出结果为:

+---+--------------+-------------------+
| id|      sentence|              words|
+---+--------------+-------------------+
|  0|Hello I am LYL|[hello, i, am, lyl]|
|  1|I Love Bigdata| [i, love, bigdata]|
|  2|       1 2 3 4|       [1, 2, 3, 4]|
+---+--------------+-------------------+

+---+--------------+------------+
| id|      sentence|       words|
+---+--------------+------------+
|  0|Hello I am LYL|          []|
|  1|I Love Bigdata|          []|
|  2|       1 2 3 4|[1, 2, 3, 4]|
+---+--------------+------------+

+-------------------+---------+
|              words|wordcount|
+-------------------+---------+
|[hello, i, am, lyl]|        4|
| [i, love, bigdata]|        3|
|       [1, 2, 3, 4]|        4|
+-------------------+---------+

SQLTransformer

我们都很喜欢sql语句,简单好用又熟悉,那么Spark ML很人性化的为我们提供了SQLTransformer类,使得我们能用我们熟悉的SQL来做特征转化。它支持SparkSql中的所有select选择语句,sum(),count(),group by,order by等等都可以用!形如"SELECT ...FROM __THIS__"。'__THIS__'代表输入数据的基础表。

def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").getOrCreate()
    val df = spark.createDataFrame(Seq((1, "lyl", 90), (2, "ddd", 88), (3, "ccc", 95)))
      .toDF("id", "name", "score")
    //__THIS__代表输入数据的基础表 在这里就是指df
    val sqltransformer1 = new SQLTransformer().setStatement("select id from __THIS__")
    val sqltransformer2 = new SQLTransformer().setStatement("select sum(score)/3 as average from __THIS__")
    sqltransformer1.transform(df).show()
    sqltransformer2.transform(df).show()
  }

输出结果为:

+---+
| id|
+---+
|  1|
|  2|
|  3|
+---+

|average|
+-------+
|   91.0|
+-------+

网友评论

登录后评论
0/500
评论
公众号胖滚猪学编程
+ 关注