MapReduce多个job同时使用的方式(从网上找到的案例，原始博文：http://www.cnblogs.com/yjmyzz/p/4540469.html)

1. 云栖社区>
2. 博客>
3. 正文

## MapReduce多个job同时使用的方式(从网上找到的案例，原始博文：http://www.cnblogs.com/yjmyzz/p/4540469.html)

1. 求Sum

2. 求Count

3. 计算平均数

```package cn.toto.bigdata.mr.wc;

import java.io.IOException;

public class Avg2 {

private static final Text TEXT_SUM = new Text("SUM");
private static final Text TEXT_COUNT = new Text("COUNT");
private static final Text TEXT_AVG = new Text("AVG");

public static class SumMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
public long sum = 0;

@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
sum += value.toString().length();
}

@Override
protected void cleanup(Mapper<LongWritable, Text, Text, LongWritable>.Context context)
throws IOException, InterruptedException {
context.write(TEXT_SUM, new LongWritable(sum));
}
}

public static class SumReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
public long sum = 0;

@Override
protected void reduce(Text key, Iterable<LongWritable> values,Context context)
throws IOException, InterruptedException {
for (LongWritable v : values) {
sum += v.get();
}
context.write(TEXT_SUM, new LongWritable(sum));
}
}

//计算Count
public static class CountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
public long count = 0;

@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
throws IOException, InterruptedException {
count += 1;
}

@Override
protected void cleanup(Context context)
throws IOException, InterruptedException {
context.write(TEXT_COUNT, new LongWritable(count));
}
}

public static class CountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
public long count = 0;

@Override
public void reduce(Text key, Iterable<LongWritable> values, Context context)
throws IOException, InterruptedException {
for (LongWritable v : values) {
count += v.get();
}
context.write(TEXT_COUNT, new LongWritable(count));
}
}

//计算Avg
public static class AvgMapper extends Mapper<LongWritable, Text, LongWritable, LongWritable> {
public long count = 0;
public long sum = 0;

@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
String[] v = value.toString().split("\t");
if (v[0].equals("COUNT")) {
count = Long.parseLong(v[1]);
} else if (v[0].equals("SUM")) {
sum = Long.parseLong(v[1]);
}
}

@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
context.write(new LongWritable(sum), new LongWritable(count));
}
}

public static class AvgReducer extends Reducer<LongWritable, LongWritable, Text, DoubleWritable> {
public long sum = 0;
public long count = 0;

@Override
protected void reduce(LongWritable key, Iterable<LongWritable> values,Context context)
throws IOException, InterruptedException {
sum += key.get();
for(LongWritable v : values) {
count += v.get();
}
}

@Override
protected void cleanup(Reducer<LongWritable, LongWritable, Text, DoubleWritable>.Context context)
throws IOException, InterruptedException {
context.write(TEXT_AVG, new DoubleWritable(new Double(sum) / count));
}
}

public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();

String inputPath = "E:/wordcount/input/a.txt";
String maxOutputPath = "E:/wordcount/output/max/";
String countOutputPath = "E:/wordcount/output/count/";
String avgOutputPath = "E:/wordcount/output/avg/";

Job job1 = Job.getInstance(conf, "Sum");
job1.setJarByClass(Avg2.class);
job1.setMapperClass(SumMapper.class);
job1.setCombinerClass(SumReducer.class);
job1.setReducerClass(SumReducer.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(LongWritable.class);
FileOutputFormat.setOutputPath(job1, new Path(maxOutputPath));

Job job2 = Job.getInstance(conf, "Count");
job2.setJarByClass(Avg2.class);
job2.setMapperClass(CountMapper.class);
job2.setCombinerClass(CountReducer.class);
job2.setReducerClass(CountReducer.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(LongWritable.class);
FileOutputFormat.setOutputPath(job2, new Path(countOutputPath));

Job job3 = Job.getInstance(conf, "Average");
job3.setJarByClass(Avg2.class);
job3.setMapperClass(AvgMapper.class);
job3.setReducerClass(AvgReducer.class);
job3.setMapOutputKeyClass(LongWritable.class);
job3.setMapOutputValueClass(LongWritable.class);
job3.setOutputKeyClass(Text.class);
job3.setOutputValueClass(DoubleWritable.class);

//将job1及job2的输出为做job3的输入
FileOutputFormat.setOutputPath(job3, new Path(avgOutputPath));

//提交job1及job2,并等待完成
if (job1.waitForCompletion(true) && job2.waitForCompletion(true)) {
System.exit(job3.waitForCompletion(true) ? 0 : 1);
}
}

}
```

E:/wordcount/input/a.txt

E:\wordcount\output\max\part-r-00000的内容如下：