华章计算机 + 关注
手机版

《Hadoop与大数据挖掘》一2.5.4 Hadoop K-Means编程实现

  1. 云栖社区>
  2. 华章计算机>
  3. 博客>
  4. 正文

《Hadoop与大数据挖掘》一2.5.4 Hadoop K-Means编程实现

华章计算机 2017-06-26 10:31:00 浏览1014 评论0

摘要: 本节书摘来华章计算机《Hadoop与大数据挖掘》一书中的第2章 ,第2.5.4节,张良均 樊 哲 位文超 刘名军 许国杰 周 龙 焦正升 著 更多章节内容可以访问云栖社区“华章计算机”公众号查看。 2.5.4 Hadoop K-Means编程实现 在下面的实现过程中,会进行简单实现思路介绍,针对一些实现会有动手实践给读者练习。

本节书摘来华章计算机《Hadoop与大数据挖掘》一书中的第2章 ,第2.5.4节,张良均 樊 哲 位文超 刘名军 许国杰 周 龙 焦正升 著 更多章节内容可以访问云栖社区“华章计算机”公众号查看。

2.5.4 Hadoop K-Means编程实现

在下面的实现过程中,会进行简单实现思路介绍,针对一些实现会有动手实践给读者练习。一般情况下我们建议读者自己全部实现,对于实现起来有难度的读者,我们提供了参考程序,但是需要注意,参考程序不是完整的,里面设置了TODO提示,这些地方是需要读者去完善的。

image


思路1
不管是思路1还是思路2,Hadoop实现K-Means算法都包含4个步骤:①初始化聚类中心向量;②进行聚类并更新聚类中心向量;③判断是否达到循环条件,如果是则循环;④判断是否需要对原始数据进行分类,如果是则进行分类操作。下面就针对这4个步骤分别进行分析。
(1)初始化聚类中心向量:蓄水池抽样
初始化聚类中心其实和单机算法类似,可以有多种方法,比如随机取出k个聚类中心向量、直接取出前k个聚类中心向量等。在Hadoop的编程框架MapReduce限制下,如果是随机取k个聚类中心向量,那么实现起来就是这样的:遍历一次所有数据,统计数据个数n,再次遍历,按照k/n概率抽取k个数据。这样不是不可以,但是效率太低,并且如果真要实现起来,还是要考虑多个问题的,比如如果有多个Mapper怎么处理?
这里提出一种效率高,并且还能达到随机取数的算法—蓄水池抽样。
什么是蓄水池抽样呢?简单描述:先选中第1~k个元素,作为被选中的元素。然后依次对第k+1至第n个元素做如下操作:每个元素都有k/x的概率被选中,然后等概率地(1/k)替换掉被选中的元素(其中x是元素的序号)。其算法伪代码描述如代码清单2-33所示。

代码清单2-33 蓄水池抽样伪代码
Init : a reservoir with the size: k
                     for i= k+1 to N
                         M=random(1, i);
                         if( M < k)
                           SWAP the Mth value and ith value
    end for

蓄水池抽样同样可以使用Driver、Mapper、Reducer来进行分析。Driver部分可以参考MapReduce程序的固定模式,但是需要注意,需要传入聚类中心向量的个数,即k值。其代码参考代码清单2-34。

代码清单2-34 蓄水池抽样Driver示例代码
public int run(String[] args) throws Exception {
Configuration conf = getConf();
if (args.length != 3){
    System.err.println("Usage: dome.job.SampleJob <in> <out> <selectRecords>");
    System.exit(2);
}
//设置传入Mapper以及Reducer的参数
conf.setInt(SELECTRECORDS, integer.parseInt(args[2]));
Job job = Job.getInstance(conf, "sample job " + args[0]);
job.setJarByclass(SampleJob.class);
job.setMapperClass(SampleMapper.class);
job.setReducerClass(SampleReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
return job.waitForCompletion(true) ? 0 : 1;
}

Mapper就是蓄水池抽样算法的具体实现了,这里需要注意,map函数针对每条记录进行筛选,并不输出,所以这里在cleanup进行输出。这样就需要在setup里面初始化一个变量来存储当前已经被选为聚类中心向量的值。其各个函数描述如下。

  • setup():读取传入的参数值selectedRecordsNum,初始化当前处理的行数遍历row、存储已经选择的selectedRecordsNum个数据变量selectedRecords。
  • map():每次map函数读取一行记录,判断当前行数row是否小于selectedRe-cordsNum,如果小于则直接把当前记录加入selectedRecords;否则,以概率selectedRecordsNum/row使用当前记录来对selectedRecords中的任一记录进行替换。其部分代码如代码清单2-35所示。
  • cleanup():直接输出selectedRecords的内容即可。
代码清单2-35 蓄水池抽样Mapper map函数示例的代码
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, NullWritable, Text>.Context context)
        throws IOException, InterruptedException {
    row++; // 行数加1;
        if(row<=selectRecordsNum){
            selectRecords[(int) (row-1)]= new Text(value.toString());
            // 前面k条记录直接插入
        }else{// 以概率 k/i 决定是否用第i条记录替换前面的任意一条记录
            int p = SampleJob.getRandom((int)row);
            if(p<selectRecordsNum){// 替换
                selectRecords[p]=new Text(value.toString());
            }
        }
    }

在设计Reducer的时候需要考虑的一个问题是,如果有多个Mapper怎么办?多个Mapper就会发送k×N个聚类中心向量到Reducer中(其中N为Mapper的个数),所以在Reducer端需要对k×N个记录再次筛选,选出其中的k个聚类中心向量。这里当然也有多种方法,其实这里的选择和最开始我们在Mapper中针对所有数据随机选取k条记录的选择一样,这里所有数据只是“变”小了而已。因为是在Reducer中处理(一个Reducer可以理解为单机),所以其实也可以理解为单机的随机选择k条记录的算法。这里随机选择k条记录的算法也可以,不过我们这里还是选择使用蓄水池抽样。
这里只能使用一个Reducer,为什么?请读者思考。
动手实践:蓄水池抽样Hadoop实现
首先理解上面蓄水池抽样算法的Hadoop实现的描述及分析,接着新建工程,并参考上节完善工程代码功能。
实验步骤:
1)打开Eclipse,新建工程2.5_002_sample;
2)添加相关环境(如JDK路径、Hadoop路径等);
3)参考上节蓄水池抽样Hadoop实现原理实现编写源代码;
4)把工程编译,并导出jar包,然后上传jar包到master节点上,使用yarn jar的方式运行,查看输出及相关日志。
思考:
1)还有其他方式实现蓄水池抽样吗?
2)如何查看蓄水池抽样抽取出来的结果?
(2)更新聚类中心向量
更新聚类中心向量其实就是整个K-Means算法的核心所在,K-Means算法的每次循环其实就是一个不断更新聚类中心向量的过程。那么具体怎么更新呢?我们在单机算法中已经知道怎么更新了,怎么把其转换为Hadoop的MapReduce代码呢?其实,可以把每个Mapper理解为一个单机算法,因为其处理的数据其实是所有数据的一部分(一个文件块)。下面来看具体涉及的Driver、Mapper和Reducer。
针对Driver类,除了一些固定写法外,还需传入聚类初始中心向量路径、聚类中心个数、列分隔符(考虑是否需要?),其示例代码如代码清单2-36所示。

代码清单2-36 更新聚类中心向量Driver示例代码
    conf.set(SPLITTER, splitter );
    conf.set(CENTERPATH, args[4]);
    conf.setInt(K, k);
    Job job =Job.getInstance(conf,"kmeans center path:"+args[4]+",output"+output);
    job.setJarByClass(KMeansDriver.class);
    job.setMapperClass(KMeansMapper.class);
    job.setReducerClass(KMeansReducer.class);
    job.setMapOutputKeyClass(IntWritable.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);
    
    job.setNumReduceTasks(1);// 如果有多个会有什么问题?

Reducer设置多个会有什么问题?可以设置多个吗?设置多个有什么好处?

Mapper的工作主要包括两个:其一,读取首次HDFS上的聚类中心;其二,根据聚类中心对每个键值对记录进行距离计算,输出距离最小的聚类中心ID以及该条键值对记录。下面针对具体实现做分析。
1)setup():读取传入的初始聚类中心向量路径,根据路径读取对应的数据,利用分隔符来对初始聚类中心向量进行初始化(初始化为数组和列表)。
2)map():在map阶段根据初始化的聚类中心向量对当前记录进行分类,输出其对应的聚类中心id、当前记录,如代码清单2-37所示。

代码清单2-37 更新聚类中心向量Mapper map函数示例代码
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, IntWritable, Text>.Context context)
        throws IOException, InterruptedException {
    int vecId = getCenterId(value.toString());
    if(!validate(vecId)){
        logger.info("数据异常:{}",value.toString());
        return ;
    }
    ID.set(vecId);
    context.write(ID, value);
    logger.info("ID:{},value:{}",new Object[]{vecId,value});
}

Reducer要做的工作就是针对每个组的所有数据计算其平均值(该平均值就是新的聚类中心向量)。其函数描述如下。
1)reduce():每个reduce函数针对同一个聚类中心id的数据进行处理;具体处理过程为,把每条记录对应列的值加起来,同时记录当前的记录数;接着,使用每列和除以记录数,即可得到每列平均值,也就是当前聚类中心id新的聚类中心,如代码清单2-38所示。

代码清单2-38 更新聚类中心向量Reducer reduce函数示例代码
@Override
protected void reduce(IntWritable key, Iterable<Text> values,
Reducer<IntWritable, Text, Text, NullWritable>.Context arg2)throws IOException, InterruptedException {
    double[] sum=null;
    long  num =0;
    for(Text value:values){
        String[] valStr = pattern.split(value.toString(), -1);
        if(sum==null){// 初始化
           sum=new double[valStr.length];
           addToSum(sum,valStr);// 第一次需要加上
        }else{
        // 对应字段相加
           addToSum(sum,valStr);
        }
        num++;
        
    }
    if(num==0){
        centerVec[key.get()]="";
    }
    averageSum(sum,num);
    centerVec[key.get()]= format(sum);
}

3)cleanup():输出每个类别新的聚类中心。
动手实践:Hadoop实现更新聚类中心向量
实验步骤如下:
1)打开Eclipse,打开上一小节完成的工程;
2)根据上节Hadoop实现更新聚类中心实现思路,编写对应源代码;
3)把工程编译并导出Jar包,然后上传Jar包到master节点上,使用yarn jar的方式运行,查看输出及相关日志。
思考:如何测试代码?
(3)是否循环
是否循环其实就是检查前后两次聚类中心向量是否满足给定阈值。这里使用的是方差,其描述如图2-51所示。


image


还需要注意的问题是,如果不满足delta阈值,那么再次循环需初始化对应参数,主要包括下一个MapReduce程序的输入聚类中心向量及输出路径等。
动手实践:Hadoop实现更新聚类中心向量循环
实验步骤如下:
1)打开Eclipse,打开上一小节完成的工程;
2)参考上述描述完成对应的代码;
3)编译工程并导出jar包,然后上传jar包到master节点上,使用yarn jar的方式运行,查看输出及相关日志。
(4)是否分类
分类是针对原始数据进行的,这个工作其实在更新聚类中心向量的Mapper已经做了这个工作,所以分类可以参考前面的Mapper。这里不给出其具体代码,读者只需要完成动手实践即可(分类动手实践)。
动手实践:Hadoop实现最终分类
实验步骤如下:
1)打开Eclipse,并打开已经完成的工程;
2)使用KMeansMapper的实现,编辑Driver主类,分类原始数据;
3)编译工程,并导出jar包,然后上传jar包到master节点上,使用hadoop jar的方式运行,查看输出及相关日志。
思路2
思路2其实和思路1里面的大部分步骤都是一样的逻辑流程,只是在更新聚类中心向量环节做了优化。下面只针对优化的环节做分析,其他部分请读者参考思路1。
(1)更新聚类中心向量
更新聚类中心向量的Driver部分直接参考思路1对应内容即可,这里直接分析其Mapper实现。结合前面内容,我们知道这里需要实现自定义值类型。
由于Mapper输出的类型包含列和、个数,所以这里可以自定义一个值类型,该值类型需包含一个double的数组,用于存储某个类别的所有列和;一个long变量,用于存储当前类别的数据个数,如代码清单2-39所示。

代码清单2-39 更新聚类中心向量Mapper输出值自定义类型示例代码1
public class SumNumWritable implements Writable {

private long num;
private double[] sum;
…
}

同时,需要覆写readFields、write函数,在这里针对数组类型还需要做些额外的处理。其处理过程为存储数组的长度,在实例化类的时候传入数组的长度,否则会报NullPointer的异常,如代码清单2-40所示。

代码清单2-40 更新聚类中心向量Mapper输出值自定义类型示例代码2
@Override
public void readFields(DataInput in) throws IOException {
    this.num = in.readLong();    // 先读个数
    int size = in.readInt();    // 再读sum数组长度
    sum = new double[size];
    for (int i = 0; i < size; i++) {
         sum[i] = in.readDouble();
    }
}
@Override
public void write(DataOutput out) throws IOException {
    out.writeLong(this.num);    // 先写入个数
    out.writeInt(sum.length);    // 接着写入sum数组的长度;
    for (double d : sum) {
         out.writeDouble(d);    // 依次写入数组的值
    }
}

写入或者读取时,注意顺序,顺序重要吗?如果乱序会有什么影响?请读者思考。
下面针对Mapper进行分析。
setup():在setup函数中,除了需要参考思路1把初始聚类中心读取出来外,还需要初始化“列和”;由于每个类别都有一个“列和”,所以可以定义一个“列和”数组;然后根据聚类中心数来初始化该“列和”数组;同时,根据初始聚类中心的列个数类初始化每个类别的“列和”的double数组,如代码清单`javascript
2-41所示。
代码清单2-41 更新聚类中心向量Mapper的setup函数示例
private SumNumWritable[] sumNums = null;
@Override
protected void setup(Mapper.Context context)

    throws IOException, InterruptedException {
centerPathStr = context.getConfiguration().get(MainDriver.CENTERPATH);
splitter = context.getConfiguration().get(MainDriver.SPLITTER);
pattern = Pattern.compile(splitter);
k = context.getConfiguration().getInt(MainDriver.K, 0);
centerVec  = new String[k];
sumNums = new SumNumWritable[k];
// 读取数据
Path path = new Path(centerPathStr);
FileSystem fs = FileSystem.get(context.getConfiguration());
BufferedReader br=new BufferedReader(new InputStreamReader(fs.open(path)));
try {
    String line;
    int index =0;
    while ((line =br.readLine())!= null){
    logger.info("center "+index+" vector:{}",line);
       centerVec[index++]=line;
    }
} finally {
    br.close();
}
// 初始化 sumNums
colSize = pattern.split(centerVec[0]).length;
for(int i=0;i<k;i++){
    sumNums[i] = new SumNumWritable(colSize);
}
logger.info("colSize:{}",colSize);

}

SumNumWritable构造函数如代码清单2-42所示。

代码清单2-42 更新聚类中心向量Mapper输出自定义值类型构造函数
public SumNumWritable(int size) {
this.sum = new double [size];
this.num =0;
}


- map():在map函数中在得到当前记录的类别后(可以参考思路1的做法),需要根据此类别去更新该类别的“列和”以及个数,如代码清单2-43所示。

**代码清单2-43 更新聚类中心向量Mapper的map函数示例
/**
 * 更新列和以及个数
 * @param sumNumWritable 某个类别的“列和”
 * @param valArr 当前记录
 */
private void updateSumNum(SumNumWritable sumNumWritable, double[] valArr) {
    if(sumNumWritable==null) return ;
    sumNumWritable.setNum(sumNumWritable.getNum()+1);
    addSum(sumNumWritable.getSum(),valArr);    // 这里不用setSum()
}**
- cleanup():在cleanup中只需要输出“列和”数组即可,如代码清单2-44所示。

代码清单2-44 更新聚类中心向量Mapper的cleanup函数示例
/**

  • 输出
    */

@Override
protected void cleanup(Context context)

      throws IOException, InterruptedException {
int index =0;
for(SumNumWritable sn:sumNums){
    ID.set(index++);
    context.write(ID, sn);
}

}

Reducer只需要整合各个Mapper的输出记录,针对每个记录分别求“列和”、个数和,然后再求平均即可得到新的聚类中心向量和。各个函数描述如下。
setup():只需读取分隔符参数,并进行初始化即可(在reduce函数中需要使用此参数)。
reduce():在reduce中直接使用for循环读取每个类别的“列和”以及个数,分别相加即可得到每个类别的最终“列和”以及个数,然后求平均即可得到更新后的聚类中心向量,如代码清单2-45所示。

代码清单2-45 更新聚类中心向量Reducer reduce示例代码
@Override
protected void reduce(IntWritable key, Iterable values,

    Context context) throws IOException, InterruptedException {
double[] sum=null;
long  num =0;
for(SumNumWritable value:values){
    if(sum==null){  // 第一次需要初始化
        sum = new double[value.getSum().length];
    }
    addToSum(sum,value.getSum());
    num+=value.getNum();
}
if(num==0){
    vec.set("");
    log.info("id:{}类别没有数据!",key.get());
}else{
    averageSum(sum,num);
    vec.set(format(sum));

log.info("id:{},聚类中心是:[{}]",new Object[]{key.get(),vec.toString ()});

}
context.write(vec, NullWritable.get());    //写入的顺序有影响吗?如果顺序写入呢?

}

(2)动手实践:Hadoop实现K-Means算法思路2
【云栖快讯】阿里巴巴小程序繁星计划,20亿补贴第一弹云应用免费申请,限量从速!  详情请点击

网友评论