Hadoop MapReduce编程 API入门系列之分区和合并(十四)

简介:

 

 

 

 

 

 

 

 

 

 

 

 

代码

复制代码
  1 package zhouls.bigdata.myMapReduce.Star;
  2 
  3 
  4 import java.io.IOException;
  5 import org.apache.hadoop.conf.Configuration;
  6 import org.apache.hadoop.conf.Configured;
  7 import org.apache.hadoop.fs.FileSystem;
  8 import org.apache.hadoop.fs.Path;
  9 import org.apache.hadoop.io.Text;
 10 import org.apache.hadoop.mapreduce.Job;
 11 import org.apache.hadoop.mapreduce.Mapper;
 12 import org.apache.hadoop.mapreduce.Partitioner;
 13 import org.apache.hadoop.mapreduce.Reducer;
 14 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 15 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 16 import org.apache.hadoop.util.Tool;
 17 import org.apache.hadoop.util.ToolRunner;
 18 /**
 19  * 
 20  * @function 统计分别统计出男女明星最大搜索指数
 21  * @author 小讲
 22  */
 23  
 24  /*
 25 姓名    性别    搜索指数
 26 李易峰    male    32670
 27 朴信惠    female    13309
 28 林心如    female    5242
 29 黄海波    male    5505
 30 成龙    male    7757
 31 刘亦菲    female    14830
 32 angelababy    female    55083
 33 王宝强    male    9472
 34 郑爽    female    9279
 35 周杰伦    male    42020
 36 莫小棋    female    13978
 37 朱一龙    male    10524
 38 宋智孝    female    12494
 39 吴京    male    6684
 40 赵丽颖    female    24174
 41 尹恩惠    female    5985
 42 李金铭    female    5925
 43 关之琳    female    7668
 44 邓超    male    11532
 45 钟汉良    male    8289
 46 周润发    male    4808
 47 甄子丹    male    5479
 48 林妙可    female    5306
 49 柳岩    female    8221
 50 蔡琳    female    7320
 51 张佳宁    female    6628
 52 裴涩琪    female    5658
 53 李晨    male    9559
 54 周星驰    male    11483
 55 杨紫    female    11094
 56 全智贤    female    5336
 57 张柏芝    female    9337
 58 孙俪    female    7295
 59 鲍蕾    female    5375
 60 杨幂    female    20238
 61 刘德华    male    19786
 62 柯震东    male    6398
 63 张国荣    male    5013
 64 王阳    male    5169
 65 李小龙    male    6859
 66 林志颖    male    4512
 67 林正英    male    5832
 68 吴秀波    male    5668
 69 陈伟霆    male    12817
 70 陈奕迅    male    10472
 71 赵又廷    male    5190
 72 张馨予    female    35062
 73 陈晓    male    17901
 74 赵韩樱子    female    7077
 75 乔振宇    male    8877
 76 宋慧乔    female    5708
 77 韩艺瑟    female    5426
 78 张翰    male    7012
 79 谢霆锋    male    6654
 80 刘晓庆    female    5553
 81 陈翔    male    7999
 82 陈学冬    male    8829
 83 秋瓷炫    female    6504
 84 王祖蓝    male    6662
 85 吴亦凡    male    16472
 86 陈妍希    female    32590
 87 倪妮    female    9278
 88 高梓淇    male    7101
 89 赵奕欢    female    7197
 90 赵本山    male    12655
 91 高圆圆    female    13688
 92 陈赫    male    6820
 93 鹿晗    male    32492
 94 贾玲    female    5304
 95 宋佳    female    6202
 96 郭碧婷    female    5295
 97 唐嫣    female    12055
 98 杨蓉    female    10512
 99 李钟硕    male    26278
100 郑秀晶    female    10479
101 熊黛林    female    26732
102 金秀贤    male    11370
103 古天乐    male    4954
104 黄晓明    male    10964
105 李敏镐    male    10512
106 王丽坤    female    5501
107 谢依霖    female    7000
108 陈冠希    male    9135
109 范冰冰    female    13734
110 姚笛    female    6953
111 彭于晏    male    14136
112 张学友    male    4578
113 谢娜    female    6886
114 胡歌    male    8015
115 古力娜扎    female    8858
116 黄渤    male    7825
117 周韦彤    female    7677
118 刘诗诗    female    16548
119 郭德纲    male    10307
120 郑恺    male    21145
121 赵薇    female    5339
122 李连杰    male    4621
123 宋茜    female    11164
124 任重    male    8383
125 李若彤    female    9968
126 
127 
128 得到:
129 angelababy    female    55083
130 周杰伦    male    42020
131 */
132 public class Star extends Configured implements Tool{
133     /**
134      * @function Mapper 解析明星数据
135      * @input key=偏移量  value=明星数据
136      * @output key=gender value=name+hotIndex
137      */
138     public static class ActorMapper extends Mapper<Object,Text,Text,Text>{
139                     //在这个例子里,第一个参数Object是Hadoop根据默认值生成的,一般是文件块里的一行文字的行偏移数,这些偏移数不重要,在处理时候一般用不上
140         public void map(Object key,Text value,Context context) throws IOException,InterruptedException{
141         //拿:周杰伦    male    42020
142             //value=name+gender+hotIndex
143             String[] tokens = value.toString().split("\t");//使用分隔符\t,将数据解析为数组 tokens
144             String gender = tokens[1].trim();//性别,trim()是去除两边空格的方法
145                         //tokens[0]        tokens[1]        tokens[2]        
146                         //周杰伦        male            42020
147             String nameHotIndex = tokens[0] + "\t" + tokens[2];//名称和关注指数
148             //输出key=gender value=name+hotIndex
149             context.write(new Text(gender), new Text(nameHotIndex));//写入gender是k2,nameHotIndex是v2
150 //            context.write(gender,nameHotIndex);等价        
151             //将gender和nameHotIndex写入到context中
152         }
153     }
154 
155     
156     
157     /**
158      * @function Partitioner 根据sex选择分区
159      */
160     public static class ActorPartitioner extends Partitioner<Text, Text>{     
161         @Override
162         public int getPartition(Text key, Text value, int numReduceTasks){
163             String sex = key.toString();//按性别分区
164             
165             // 默认指定分区 0
166             if(numReduceTasks==0)
167                 return 0;
168             
169             //性别为male 选择分区0
170             if(sex.equals("male"))             
171                 return 0;
172             //性别为female 选择分区1
173             if(sex.equals("female"))
174                 return 1 % numReduceTasks;
175             //其他性别 选择分区2
176             else
177                 return 2 % numReduceTasks;
178            
179         }
180     }
181 
182     
183     
184     /**
185      * @function 定义Combiner 合并 Mapper 输出结果
186      */
187     public static class ActorCombiner extends Reducer<Text, Text, Text, Text>{
188         private Text text = new Text();
189         @Override
190         public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException{
191             int maxHotIndex = Integer.MIN_VALUE;
192             int hotIndex = 0;
193             String name="";
194             for (Text val : values){//星型for循环,即把values的值传给Text val
195                 String[] valTokens = val.toString().split("\\t");
196                 hotIndex = Integer.parseInt(valTokens[1]);
197                 if(hotIndex>maxHotIndex){
198                     name = valTokens[0];
199                     maxHotIndex = hotIndex;
200                 }
201             }
202             text.set(name+"\t"+maxHotIndex);
203             context.write(key, text);
204         }
205     }
206     
207     
208     
209     /**
210      * @function Reducer 统计男、女明星最高搜索指数
211      * @input key=gender  value=name+hotIndex
212      * @output key=name value=gender+hotIndex(max)
213      */
214     public static class ActorReducer extends Reducer<Text,Text,Text,Text>{
215         @Override
216         public void reduce(Text key, Iterable<Text> values, Context context)throws IOException, InterruptedException{
217             int maxHotIndex = Integer.MIN_VALUE;
218 
219             String name = " ";
220             int hotIndex = 0;
221             // 根据key,迭代 values 集合,求出最高搜索指数
222             for (Text val : values){//星型for循环,即把values的值传给Text val
223                 String[] valTokens = val.toString().split("\\t");
224                 hotIndex = Integer.parseInt(valTokens[1]);
225                 if (hotIndex > maxHotIndex){
226                     name = valTokens[0];
227                     maxHotIndex = hotIndex;
228                 }
229             }
230             context.write(new Text(name), new Text(key + "\t"+ maxHotIndex));//写入name是k3,key + "\t"+ maxHotIndex是v3
231 //            context.write(name,key + "\t"+ maxHotIndex);//等价            
232         }
233     }
234 
235     /**
236      * @function 任务驱动方法
237      * @param args
238      * @return
239      * @throws Exception
240      */
241 
242     public int run(String[] args) throws Exception{
243         // TODO Auto-generated method stub
244         
245         Configuration conf = new Configuration();//读取配置文件,比如core-site.xml等等
246         Path mypath = new Path(args[1]);//Path对象mypath
247         FileSystem hdfs = mypath.getFileSystem(conf);//FileSystem对象hdfs
248         if (hdfs.isDirectory(mypath)){    
249             hdfs.delete(mypath, true);
250         }
251 
252         Job job = new Job(conf, "star");//新建一个任务
253         job.setJarByClass(Star.class);//主类
254         
255         job.setNumReduceTasks(2);//reduce的个数设置为2
256         job.setPartitionerClass(ActorPartitioner.class);//设置Partitioner类
257         
258         job.setMapperClass(ActorMapper.class);//Mapper
259         job.setMapOutputKeyClass(Text.class);//map 输出key类型
260         job.setMapOutputValueClass(Text.class);//map 输出value类型
261                 
262         job.setCombinerClass(ActorCombiner.class);//设置Combiner类
263         
264         job.setReducerClass(ActorReducer.class);//Reducer
265         job.setOutputKeyClass(Text.class);//输出结果 key类型
266         job.setOutputValueClass(Text.class);//输出结果 value类型
267         
268         FileInputFormat.addInputPath(job, new Path(args[0]));// 输入路径
269         FileOutputFormat.setOutputPath(job, new Path(args[1]));// 输出路径
270         job.waitForCompletion(true);//提交任务
271         return 0;
272     }
273     
274     
275     /**
276      * @function main 方法
277      * @param args
278      * @throws Exception
279      */
280     public static void main(String[] args) throws Exception{
281 //        String[] args0 = { "hdfs://HadoopMaster:9000/star/star.txt",
282 //                            "hdfs://HadoopMaster:9000/out/star/" };
283         String[] args0 = { "./data/star/star.txt",
284                             "./out/star" };
285         
286         int ec = ToolRunner.run(new Configuration(), new Star(), args0);
287         System.exit(ec);
288     }
289 }
复制代码

 


本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/6165047.html,如需转载请自行联系原作者

相关文章
|
3月前
|
分布式计算 Hadoop Java
MapReduce编程:自定义分区和自定义计数器
MapReduce编程:自定义分区和自定义计数器
28 0
|
5月前
|
分布式计算
29 MAPREDUCE中的分区Partitioner
29 MAPREDUCE中的分区Partitioner
27 0
|
4月前
|
Java Shell 分布式数据库
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
82 0
|
4月前
|
分布式计算 Java 大数据
【大数据技术Hadoop+Spark】HDFS Shell常用命令及HDFS Java API详解及实战(超详细 附源码)
【大数据技术Hadoop+Spark】HDFS Shell常用命令及HDFS Java API详解及实战(超详细 附源码)
143 0
|
4月前
|
存储 分布式计算 负载均衡
【大数据技术Hadoop+Spark】MapReduce概要、思想、编程模型组件、工作原理详解(超详细)
【大数据技术Hadoop+Spark】MapReduce概要、思想、编程模型组件、工作原理详解(超详细)
59 0
|
1月前
|
API 开发工具 开发者
抖音商品详情API入门:为开发者和商家打造增长工具箱
抖音商品详情API入门:为开发者和商家打造增长工具箱
50 0
|
3天前
|
机器学习/深度学习 分布式计算 监控
面经:MapReduce编程模型与优化策略详解
【4月更文挑战第10天】本文是关于MapReduce在大数据处理中的关键作用的博客摘要。作者分享了面试经验,强调了MapReduce的基本原理、Hadoop API、优化策略和应用场景。MapReduce包含Map和Reduce两个主要阶段,Map阶段处理输入数据生成中间键值对,Reduce阶段进行聚合计算。面试重点包括理解MapReduce工作流程、使用Hadoop API编写Map/Reduce函数、选择优化策略(如分区、Combiner和序列化)以及应用场景,如日志分析和机器学习。
17 2
|
2月前
|
前端开发 JavaScript API
前端秘法番外篇----学完Web API,前端才能算真正的入门
前端秘法番外篇----学完Web API,前端才能算真正的入门
|
3月前
|
JSON 安全 数据挖掘
从入门到精通:淘宝API接口调用全攻略
概述: 在当今电子商务的繁荣发展下,淘宝作为中国领先的电商平台,不仅为消费者提供了便捷的购物环境,也为商家们提供了强大的数据支持和服务能力。淘宝开放平台提供的API接口使得商家能够高效地获取店铺和商品的实时数据,从而更好地分析市场趋势、优化店铺运营、提升用户体验。本文将详细介绍如何从入门到精通地调用淘宝API接口,使商家能够充分利用这一强大工具推动业务增长。
|
3月前
|
JavaScript 前端开发 IDE
Vue3【为什么选择Vue框架、Vue简介 、Vue API 风格 、Vue开发前的准备 、Vue项目目录结构 、模板语法、属性绑定 、 】(一)-全面详解(学习总结---从入门到深化)
Vue3【为什么选择Vue框架、Vue简介 、Vue API 风格 、Vue开发前的准备 、Vue项目目录结构 、模板语法、属性绑定 、 】(一)-全面详解(学习总结---从入门到深化)
48 1