Hadoop MapReduce编程 API入门系列之MapReduce多种输入格式(十七)

简介:

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

代码

 

复制代码
  1 package zhouls.bigdata.myMapReduce.ScoreCount;
  2 
  3 import java.io.DataInput;
  4 import java.io.DataOutput;
  5 import java.io.IOException;
  6 import org.apache.hadoop.io.WritableComparable;
  7 /**
  8 * 学习成绩读写类
  9 * 数据格式参考:19020090017 小讲 90 99 100 89 95
 10 * @author Bertron
 11 * 需要自定义一个 ScoreWritable 类实现 WritableComparable 接口,将学生各门成绩封装起来。
 12 */
 13 public class ScoreWritable implements WritableComparable< Object > {//其实这里,跟TVPlayData一样的
 14 //  注意:    Hadoop通过Writable接口实现的序列化机制,不过没有提供比较功能,所以和java中的Comparable接口合并,提供一个接口WritableComparable。(自定义比较)
 15 //         Writable接口提供两个方法(write和readFields)。
 16 
 17     
 18     private float Chinese;
 19     private float Math;
 20     private float English;
 21     private float Physics;
 22     private float Chemistry;
 23     
 24     
 25 //    问:这里我们自己编程时,是一定要创建一个带有参的构造方法,为什么还要显式的写出来一个带无参的构造方法呢?
 26 //    答:构造器其实就是构造对象实例的方法,无参数的构造方法是默认的,但是如果你创造了一个带有参数的构造方法,那么无参的构造方法必须显式的写出来,否则会编译失败。
 27     
 28     public ScoreWritable(){}//java里的无参构造函数,是用来在创建对象时初始化对象  
 29     //在hadoop的每个自定义类型代码里,好比,现在的ScoreWritable,都必须要写无参构造函数。
 30     
 31     
 32     //问:为什么我们在编程的时候,需要创建一个带有参的构造方法?
 33     //答:就是能让赋值更灵活。构造一般就是初始化数值,你不想别人用你这个类的时候每次实例化都能用另一个构造动态初始化一些信息么(当然没有需要额外赋值就用默认的)。
 34     
 35     public ScoreWritable(float Chinese,float Math,float English,float Physics,float Chemistry){//java里的有参构造函数,是用来在创建对象时初始化对象  
 36         this.Chinese = Chinese;
 37         this.Math = Math;
 38         this.English = English;
 39         this.Physics = Physics;
 40         this.Chemistry = Chemistry;
 41     }
 42     
 43     //问:其实set和get方法,这两个方法只是类中的setxxx和getxxx方法的总称,
 44     //    那么,为什么在编程时,有set和set***两个,只有get***一个呢?
 45     
 46     public void set(float Chinese,float Math,float English,float Physics,float Chemistry){
 47         this.Chinese = Chinese;//即float Chinese赋值给private float Chinese;
 48         this.Math = Math;
 49         this.English = English;
 50         this.Physics = Physics;
 51         this.Chemistry = Chemistry;
 52     }
 53 //    public float get(float Chinese,float Math,float English,float Physics,float Chemistry){因为这是错误的,所以对于set可以分开,get只能是get***
 54 //        return Chinese;
 55 //        return Math;
 56 //        return English;
 57 //        return Physics;
 58 //        return Chemistry;
 59 //    }
 60     
 61     
 62     public float getChinese() {//拿值,得返回,所以需有返回类型float
 63         return Chinese;
 64     }
 65     public void setChinese(float Chinese){//设值,不需,所以空返回类型
 66         this.Chinese = Chinese;
 67     }
 68     public float getMath() {//拿值
 69         return Math;
 70     }
 71     public void setMath(float Math){//设值
 72         this.Math = Math;
 73     }
 74     public float getEnglish() {//拿值
 75         return English;
 76     }
 77     public void setEnglish(float English){//设值
 78         this.English = English;
 79     }
 80     public float getPhysics() {//拿值
 81         return Physics;
 82     }
 83     public void setPhysics(float Physics){//设值
 84         this.Physics = Physics;
 85     }
 86     public float getChemistry() {//拿值
 87         return Chemistry;
 88     }
 89     public void setChemistry(float Chemistry) {//拿值
 90         this.Chemistry = Chemistry;
 91     }
 92     
 93     // 实现WritableComparable的readFields()方法
 94 //    对象不能传输的,需要转化成字节流!
 95 //    将对象转换为字节流并写入到输出流out中是序列化,write 的过程(最好记!!!)
 96 //    从输入流in中读取字节流反序列化为对象      是反序列化,readFields的过程(最好记!!!)
 97     public void readFields(DataInput in) throws IOException {//拿代码来说的话,对象就是比如Chinese、Math。。。。
 98         Chinese = in.readFloat();//因为,我们这里的对象是float类型,所以是readFloat()
 99         Math = in.readFloat();
100         English = in.readFloat();//注意:反序列化里,需要生成对象对吧,所以,是用到的是get那边对象
101         Physics = in.readFloat();
102         Chemistry = in.readFloat();
103 //        in.readByte()
104 //        in.readChar()
105 //        in.readDouble()
106 //        in.readLine() 
107 //        in.readFloat()
108 //        in.readLong()
109 //        in.readShort()
110     }
111     
112     // 实现WritableComparable的write()方法,以便该数据能被序列化后完成网络传输或文件输出 
113 //    将对象转换为字节流并写入到输出流out中是序列化,write 的过程(最好记!!!)
114 //    从输入流in中读取字节流反序列化为对象      是反序列化,readFields的过程(最好记!!!)
115     public void write(DataOutput out) throws IOException {//拿代码来说的话,对象就是比如Chinese、Math。。。。
116         out.writeFloat(Chinese);//因为,我们这里的对象是float类型,所以是writeFloat()
117         out.writeFloat(Math);
118         out.writeFloat(English);//注意:序列化里,需要对象对吧,所以,用到的是set那边的对象
119         out.writeFloat(Physics);
120         out.writeFloat(Chemistry);
121 //        out.writeByte()
122 //        out.writeChar()
123 //        out.writeDouble()
124 //        out.writeFloat()
125 //        out.writeLong()
126 //        out.writeShort()
127 //        out.writeUTF()
128     }
129     
130     public int compareTo(Object o) {//java里的比较,Java String.compareTo()
131         return 0;
132     }
133     
134     
135 //    Hadoop中定义了两个序列化相关的接口:Writable 接口和 Comparable 接口,这两个接口可以合并成一个接口 WritableComparable。
136 //    Writable 接口中定义了两个方法,分别为write(DataOutput out)和readFields(DataInput in)
137 //    所有实现了Comparable接口的对象都可以和自身相同类型的对象比较大小
138     
139     
140 //    Hadoop中定义了两个序列化相关的接口:Writable 接口和 Comparable 接口,这两个接口可以合并成一个接口 WritableComparable。
141 //    Writable 接口中定义了两个方法,分别为write(DataOutput out)和readFields(DataInput in)
142 //    所有实现了Comparable接口的对象都可以和自身相同类型的对象比较大小
143     
144     
145 //  源码是
146 //    package java.lang;  
147 //    import java.util.*;      
148 //    public interface Comparable {  
149 //        /** 
150 //        * 将this对象和对象o进行比较,约定:返回负数为小于,零为大于,整数为大于 
151 //        */  
152 //        public int compareTo(T o);  
153 //    }
154     
155 }
复制代码

 

 

 

 

 

 

 

 

 

复制代码
  1 package zhouls.bigdata.myMapReduce.ScoreCount;
  2 
  3 import java.io.IOException;
  4 import org.apache.hadoop.conf.Configuration;
  5 import org.apache.hadoop.fs.FSDataInputStream;
  6 import org.apache.hadoop.fs.FileSystem;
  7 import org.apache.hadoop.fs.Path;
  8 import org.apache.hadoop.io.Text;
  9 import org.apache.hadoop.mapreduce.InputSplit;
 10 import org.apache.hadoop.mapreduce.JobContext;
 11 import org.apache.hadoop.mapreduce.RecordReader;
 12 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 14 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 15 import org.apache.hadoop.util.LineReader;
 16 /**
 17 * 自定义学生成绩读写InputFormat
 18 * 数据格式参考:19020090017 小讲 90 99 100 89 95
 19 * @author Bertron
 20 */
 21 
 22             //其实这个程序,就是在实现InputFormat接口,TVPlayInputFormat是InputFormat接口的实现类
 23             //比如   ScoreInputFormat  extends FileInputFormat implements InputFormat。
 24 
 25             //问:自定义输入格式 ScoreInputFormat 类,首先继承 FileInputFormat,然后分别重写 isSplitable() 方法和 createRecordReader() 方法。
 26 
 27 public class ScoreInputFormat extends FileInputFormat<Text,ScoreWritable > {//自定义数据输入格式,其实这都是模仿源码的!可以去看
 28 
 29 //    线路是: boolean  isSplitable()   ->   RecordReader<Text,ScoreWritable> createRecordReader()   ->   ScoreRecordReader extends RecordReader<Text, ScoreWritable > 
 30     
 31     @Override
 32     protected boolean isSplitable(JobContext context, Path filename) {//这是InputFormat的isSplitable方法
 33             //isSplitable方法就是是否要切分文件,这个方法显示如果是压缩文件就不切分,非压缩文件就切分。
 34 //        如果不允许分割,则isSplitable==false,则将第一个block、文件目录、开始位置为0,长度为整个文件的长度封装到一个InputSplit,加入splits中
 35 //        如果文件长度不为0且支持分割,则isSplitable==true,获取block大小,默认是64MB
 36         return false;    //整个文件封装到一个InputSplit
 37         //要么就是return true;        //切分64MB大小的一块一块,再封装到InputSplit
 38     }
 39     
 40     @Override
 41     public RecordReader<Text,ScoreWritable> createRecordReader(InputSplit inputsplit,TaskAttemptContext context) throws IOException, InterruptedException {
 42 //        RecordReader<k1, v1>是返回类型,返回的RecordReader对象的封装
 43 //        createRecordReader是方法,在这里是,ScoreInputFormat.createRecordReader。ScoreInputFormat是InputFormat类的实例
 44 //        InputSplit input和TaskAttemptContext context是传入参数
 45         
 46 //        isSplitable(),如果是压缩文件就不切分,整个文件封装到一个InputSplit
 47 //        isSplitable(),如果是非压缩文件就切,切分64MB大小的一块一块,再封装到InputSplit
 48         
 49         //这里默认是系统实现的的RecordReader,按行读取,下面我们自定义这个类ScoreRecordReader。
 50         //类似与Excel、WeiBo、TVPlayData代码写法
 51         return new ScoreRecordReader();//新建一个ScoreRecordReader实例,所有才有了上面RecordReader<Text,ScoreWritable>,所以才如下ScoreRecordReader,写我们自己的
 52     }
 53     
 54     
 55     //RecordReader中的两个参数分别填写我们期望返回的key/value类型,我们期望key为Text类型,value为ScoreWritable类型封装学生所有成绩
 56     public static class ScoreRecordReader extends RecordReader<Text, ScoreWritable > {//RecordReader<k1, v1>是一个整体
 57         public LineReader in;//行读取器
 58         public Text line;//每行数据类型
 59         public Text lineKey;//自定义key类型,即k1
 60         public ScoreWritable lineValue;//自定义value类型,即v1
 61         
 62         @Override
 63         public void close() throws IOException {//关闭输入流
 64             if(in !=null){
 65                 in.close();
 66             }
 67         }
 68         @Override
 69         public Text getCurrentKey() throws IOException, InterruptedException {//获取当前的key,即CurrentKey
 70             return lineKey;//返回类型是Text,即Text lineKey
 71         }
 72         @Override
 73         public ScoreWritable getCurrentValue() throws IOException,InterruptedException {//获取当前的Value,即CurrentValue
 74             return lineValue;//返回类型是ScoreWritable,即ScoreWritable lineValue
 75         }
 76         @Override
 77         public float getProgress() throws IOException, InterruptedException {//获取进程,即Progress
 78             return 0;//返回类型是float,即float 0
 79         }
 80         @Override
 81         public void initialize(InputSplit input, TaskAttemptContext context) throws IOException, InterruptedException {//初始化,都是模板
 82             FileSplit split=(FileSplit)input;
 83             Configuration job=context.getConfiguration();
 84             Path file=split.getPath();
 85             FileSystem fs=file.getFileSystem(job);
 86             
 87             FSDataInputStream filein=fs.open(file);
 88             in=new LineReader(filein,job);//输入流in
 89             line=new Text();//每行数据类型
 90             lineKey=new Text();//自定义key类型,即k1。//新建一个Text实例作为自定义格式输入的key
 91             lineValue = new ScoreWritable();//自定义value类型,即v1。//新建一个TVPlayData实例作为自定义格式输入的value
 92         }
 93         
 94         //此方法读取每行数据,完成自定义的key和value
 95         @Override
 96         public boolean nextKeyValue() throws IOException, InterruptedException {//这里面,才是篡改的重点
 97             int linesize=in.readLine(line);//line是每行数据,我们这里用到的是in.readLine(str)这个构造函数,默认读完读到文件末尾。其实这里有三种。
 98             
 99 //            是SplitLineReader.readLine  ->  SplitLineReader  extends   LineReader  ->  org.apache.hadoop.util.LineReader
100             
101 //            in.readLine(str)//这个构造方法执行时,会首先将value原来的值清空。默认读完读到文件末尾
102 //            in.readLine(str, maxLineLength)//只读到maxLineLength行
103 //            in.readLine(str, maxLineLength, maxBytesToConsume)//这个构造方法来实现不清空,前面读取的行的值
104 
105             if(linesize==0) return false;
106             
107             
108             String[] pieces = line.toString().split("\\s+");//解析每行数据
109                     //因为,我们这里是。默认读完读到文件末尾。line是Text类型。pieces是String[],即String数组。
110             
111             if(pieces.length != 7){
112                 throw new IOException("Invalid record received");
113             }
114             //将学生的每门成绩转换为 float 类型
115             float a,b,c,d,e;
116             try{
117                 a = Float.parseFloat(pieces[2].trim());//将String类型,如pieces[2]转换成,float类型,给a
118                 b = Float.parseFloat(pieces[3].trim());
119                 c = Float.parseFloat(pieces[4].trim());
120                 d = Float.parseFloat(pieces[5].trim());
121                 e = Float.parseFloat(pieces[6].trim());
122             }catch(NumberFormatException nfe){
123                 throw new IOException("Error parsing floating poing value in record");
124             }
125             lineKey.set(pieces[0]+"\t"+pieces[1]);//完成自定义key数据
126             lineValue.set(a, b, c, d, e);//封装自定义value数据
127 //            或者写
128 //            lineValue.set(Float.parseFloat(pieces[2].trim()),Float.parseFloat(pieces[3].trim()),Float.parseFloat(pieces[4].trim()),
129 //                    Float.parseFloat(pieces[5].trim()),Float.parseFloat(pieces[6].trim()));
130             
131 //            pieces[0]   pieces[1] pieces[2]  ... pieces[6]
132 //            19020090040 秦心芯 123 131 100 95 100
133 //            19020090006 李磊 99 92 100 90 100
134 //            19020090017 唐一建 90 99 100 89 95
135 //            19020090031 曾丽丽 100 99 97 79 96
136 //            19020090013 罗开俊 105 115 94 45 100
137 //            19020090039 周世海 114 116 93 31 97
138 //            19020090020 王正伟 109 98 88 47 99
139 //            19020090025 谢瑞彬 94 120 100 50 73
140 //            19020090007 于微 89 78 100 66 99
141 //            19020090012 刘小利 87 82 89 71 99
142             
143             
144             
145             return true;
146         }        
147     }
148 }
复制代码

 

 

 

 

 

 

复制代码
 1 package zhouls.bigdata.myMapReduce.ScoreCount;
 2 
 3 
 4 import java.io.IOException;
 5 import org.apache.hadoop.conf.Configuration;
 6 import org.apache.hadoop.conf.Configured;
 7 import org.apache.hadoop.fs.FileSystem;
 8 import org.apache.hadoop.fs.Path;
 9 import org.apache.hadoop.io.Text;
10 import org.apache.hadoop.mapreduce.Job;
11 import org.apache.hadoop.mapreduce.Mapper;
12 import org.apache.hadoop.mapreduce.Reducer;
13 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
14 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
15 import org.apache.hadoop.util.Tool;
16 import org.apache.hadoop.util.ToolRunner;
17 /**
18 * 学生成绩统计Hadoop程序
19 * 数据格式参考:19020090017 小讲 90 99 100 89 95
20 * @author HuangBQ
21 */
22 public class ScoreCount extends Configured implements Tool{
23     public static class ScoreMapper extends Mapper<Text,ScoreWritable,Text,ScoreWritable>{
24         @Override
25         protected void map(Text key, ScoreWritable value, Context context)throws IOException, InterruptedException{
26             context.write(key, value);//写入key是k2,value是v2
27 //            context.write(new Text(key), new ScoreWritable(value));等价           
28         }
29     }
30     
31     public static class ScoreReducer extends Reducer<Text,ScoreWritable,Text,Text>{
32         private Text text = new Text();
33         protected void reduce(Text Key, Iterable< ScoreWritable > Values, Context context)throws IOException, InterruptedException{
34             float totalScore=0.0f;
35             float averageScore = 0.0f;
36             for(ScoreWritable ss:Values){
37                 totalScore +=ss.getChinese()+ss.getMath()+ss.getEnglish()+ss.getPhysics()+ss.getChemistry();
38                 averageScore +=totalScore/5;
39             }
40             text.set(totalScore+"\t"+averageScore);
41             context.write(Key, text);//写入Key是k3,text是v3
42 //            context.write(new Text(Key),new Text(text));等价            
43         }
44     }
45     
46 
47     public int run(String[] args) throws Exception{
48         Configuration conf = new Configuration();//读取配置文件
49         
50         Path mypath = new Path(args[1]);
51         FileSystem hdfs = mypath.getFileSystem(conf);//创建输出路径
52         if (hdfs.isDirectory(mypath)) 
53         {
54             hdfs.delete(mypath, true);
55         }
56         
57         Job job = new Job(conf, "ScoreCount");//新建任务
58         job.setJarByClass(ScoreCount.class);//设置主类
59         
60         FileInputFormat.addInputPath(job, new Path(args[0]));// 输入路径
61         FileOutputFormat.setOutputPath(job, new Path(args[1]));// 输出路径
62         
63         job.setMapperClass(ScoreMapper.class);// Mapper
64         job.setReducerClass(ScoreReducer.class);// Reducer
65         
66         job.setMapOutputKeyClass(Text.class);// Mapper key输出类型
67         job.setMapOutputValueClass(ScoreWritable.class);// Mapper value输出类型
68                 
69         job.setInputFormatClass(ScoreInputFormat.class);//设置自定义输入格式
70         
71         job.waitForCompletion(true);        
72         return 0;
73     }
74     
75     
76     
77     public static void main(String[] args) throws Exception{
78 //        String[] args0 = 
79 //                { 
80 //                "hdfs://HadoopMaster:9000/score/score.txt",
81 //                "hdfs://HadoopMaster:9000/out/score/" 
82 //                };
83         
84         String[] args0 = 
85             { 
86             "./data/score/score.txt",
87             "./out/score/" 
88             };
89         
90         int ec = ToolRunner.run(new Configuration(), new ScoreCount(), args0);
91         System.exit(ec);
92     }
93 }
复制代码

 


本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/6165667.html,如需转载请自行联系原作者

相关文章
|
2月前
|
JSON 缓存 前端开发
API接口,实现统一格式
API接口,实现统一格式
20 1
|
3月前
|
API
uniapp上传文件时用到的api是什么?格式是什么?
uniapp上传文件时用到的api是什么?格式是什么?
|
4月前
|
分布式计算 Hadoop Java
MapReduce编程:自定义分区和自定义计数器
MapReduce编程:自定义分区和自定义计数器
31 0
|
5月前
|
Java Shell 分布式数据库
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
87 0
|
5月前
|
分布式计算 Java 大数据
【大数据技术Hadoop+Spark】HDFS Shell常用命令及HDFS Java API详解及实战(超详细 附源码)
【大数据技术Hadoop+Spark】HDFS Shell常用命令及HDFS Java API详解及实战(超详细 附源码)
274 0
|
5月前
|
存储 分布式计算 负载均衡
【大数据技术Hadoop+Spark】MapReduce概要、思想、编程模型组件、工作原理详解(超详细)
【大数据技术Hadoop+Spark】MapReduce概要、思想、编程模型组件、工作原理详解(超详细)
65 0
|
2月前
|
存储 分布式计算 算法
【底层服务/编程功底系列】「大数据算法体系」带你深入分析MapReduce算法 — Shuffle的执行过程
【底层服务/编程功底系列】「大数据算法体系」带你深入分析MapReduce算法 — Shuffle的执行过程
29 0
|
28天前
|
机器学习/深度学习 分布式计算 监控
面经:MapReduce编程模型与优化策略详解
【4月更文挑战第10天】本文是关于MapReduce在大数据处理中的关键作用的博客摘要。作者分享了面试经验,强调了MapReduce的基本原理、Hadoop API、优化策略和应用场景。MapReduce包含Map和Reduce两个主要阶段,Map阶段处理输入数据生成中间键值对,Reduce阶段进行聚合计算。面试重点包括理解MapReduce工作流程、使用Hadoop API编写Map/Reduce函数、选择优化策略(如分区、Combiner和序列化)以及应用场景,如日志分析和机器学习。
20 2
|
29天前
|
分布式计算 监控 Hadoop
Hadoop【基础知识 02】【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
【4月更文挑战第3天】Hadoop【基础知识 02】【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
58 0
|
4月前
|
JSON 关系型数据库 MySQL
这个问题是由于Flink的Table API在处理MySQL数据时,将MULTISET类型的字段转换为了JSON格式
【1月更文挑战第17天】【1月更文挑战第84篇】这个问题是由于Flink的Table API在处理MySQL数据时,将MULTISET类型的字段转换为了JSON格式
36 1

热门文章

最新文章