Hadoop MapReduce编程 API入门系列之网页流量版本1(二十一)

简介:

  对流量原始日志进行流量统计,将不同省份的用户统计结果输出到不同文件。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

代码

package zhouls.bigdata.myMapReduce.areapartition;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

public class FlowBean implements WritableComparable<FlowBean>{


private String phoneNB;
private long up_flow;
private long d_flow;
private long s_flow;

//在反序列化时,反射机制需要调用空参构造函数,所以显示定义了一个空参构造函数
public FlowBean(){}

//为了对象数据的初始化方便,加入一个带参的构造函数
public FlowBean(String phoneNB, long up_flow, long d_flow) {
this.phoneNB = phoneNB;
this.up_flow = up_flow;
this.d_flow = d_flow;
this.s_flow = up_flow + d_flow;
}

public String getPhoneNB() {
return phoneNB;
}

public void setPhoneNB(String phoneNB) {
this.phoneNB = phoneNB;
}

public long getUp_flow() {
return up_flow;
}

public void setUp_flow(long up_flow) {
this.up_flow = up_flow;
}

public long getD_flow() {
return d_flow;
}

public void setD_flow(long d_flow) {
this.d_flow = d_flow;
}

public long getS_flow() {
return s_flow;
}

public void setS_flow(long s_flow) {
this.s_flow = s_flow;
}



//将对象数据序列化到流中
public void write(DataOutput out) throws IOException {

out.writeUTF(phoneNB);
out.writeLong(up_flow);
out.writeLong(d_flow);
out.writeLong(s_flow);

}


//从数据流中反序列出对象的数据
//从数据流中读出对象字段时,必须跟序列化时的顺序保持一致
public void readFields(DataInput in) throws IOException {

phoneNB = in.readUTF();
up_flow = in.readLong();
d_flow = in.readLong();
s_flow = in.readLong();

}


@Override
public String toString() {

return "" + up_flow + "\t" +d_flow + "\t" + s_flow;
}

public int compareTo(FlowBean o) {
return s_flow>o.getS_flow()?-1:1;
}

}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

package zhouls.bigdata.myMapReduce.areapartition;

import java.util.HashMap;

import org.apache.hadoop.mapreduce.Partitioner;

public class AreaPartitioner<KEY, VALUE> extends Partitioner<KEY, VALUE>{

private static HashMap<String,Integer> areaMap = new HashMap<>();

static{
areaMap.put("135", 0);
areaMap.put("136", 1);
areaMap.put("137", 2);
areaMap.put("138", 3);
areaMap.put("139", 4);
}





@Override
public int getPartition(KEY key, VALUE value, int numPartitions) {
//从key中拿到手机号,查询手机归属地字典,不同的省份返回不同的组号

int areaCoder = areaMap.get(key.toString().substring(0, 3))==null?5:areaMap.get(key.toString().substring(0, 3));

return areaCoder;
}

}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

package zhouls.bigdata.myMapReduce.areapartition;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


import zhouls.bigdata.myMapReduce.areapartition.FlowBean;


/**
* 对流量原始日志进行流量统计,将不同省份的用户统计结果输出到不同文件
* 需要自定义改造两个机制:
* 1、改造分区的逻辑,自定义一个partitioner
* 2、自定义reduer task的并发任务数


*
*/
public class FlowSumArea implements Tool {

public static class FlowSumAreaMapper extends Mapper<LongWritable, Text, Text, FlowBean>{

@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {

//拿一行数据
String line = value.toString();
//切分成各个字段
String[] fields = StringUtils.split(line, "\t");

//拿到我们需要的字段
String phoneNB = fields[1];
long u_flow = Long.parseLong(fields[7]);
long d_flow = Long.parseLong(fields[8]);

//封装数据为kv并输出
context.write(new Text(phoneNB), new FlowBean(phoneNB,u_flow,d_flow));

}


}


public static class FlowSumAreaReducer extends Reducer<Text, FlowBean, Text, FlowBean>{

@Override
protected void reduce(Text key, Iterable<FlowBean> values,Context context)
throws IOException, InterruptedException {

long up_flow_counter = 0;
long d_flow_counter = 0;

for(FlowBean bean: values){

up_flow_counter += bean.getUp_flow();
d_flow_counter += bean.getD_flow();


}

context.write(key, new FlowBean(key.toString(), up_flow_counter, d_flow_counter));



}

}


public int run(String[] arg0) throws Exception {


Configuration conf = new Configuration();
Job job = Job.getInstance(conf);

job.setJarByClass(FlowSumArea.class);

job.setMapperClass(FlowSumAreaMapper.class);
job.setReducerClass(FlowSumAreaReducer.class);

//设置我们自定义的分组逻辑定义
job.setPartitionerClass(AreaPartitioner.class);


job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);

//设置reduce的任务并发数,应该跟分组的数量保持一致
job.setNumReduceTasks(1);


FileInputFormat.addInputPath(job, new Path(arg0[0]));// 文件输入路径
FileOutputFormat.setOutputPath(job, new Path(arg0[1]));// 文件输出路径
job.waitForCompletion(true);

return 0;

}

public static void main(String[] args) throws Exception {

//集群路径
// String[] args0 = { "hdfs://HadoopMaster:9000/flowSumArea/HTTP_20130313143750.dat",
// "hdfs://HadoopMaster:9000/out/flowSumArea"};

//集群路径
String[] args0 = { "./data/flowSumArea/HTTP_20130313143750.dat",
"./out/flowSumArea/"};

int ec = ToolRunner.run( new Configuration(), new FlowSumArea(), args0);
System. exit(ec);

}

@Override
public Configuration getConf() {
// TODO Auto-generated method stub
return null;
}

@Override
public void setConf(Configuration arg0) {
// TODO Auto-generated method stub

}


}

 


本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/6165864.html,如需转载请自行联系原作者

相关文章
|
4月前
|
分布式计算 Hadoop Java
MapReduce编程:自定义分区和自定义计数器
MapReduce编程:自定义分区和自定义计数器
31 0
|
3月前
|
分布式计算 Hadoop Java
Hadoop快速入门——第一章、认识Hadoop与创建伪分布式模式(Hadoop3.1.3版本配置)
Hadoop快速入门——第一章、认识Hadoop与创建伪分布式模式(Hadoop3.1.3版本配置)
77 0
|
5月前
|
Java Shell 分布式数据库
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
【大数据技术Hadoop+Spark】HBase数据模型、Shell操作、Java API示例程序讲解(附源码 超详细)
87 0
|
5月前
|
分布式计算 Java 大数据
【大数据技术Hadoop+Spark】HDFS Shell常用命令及HDFS Java API详解及实战(超详细 附源码)
【大数据技术Hadoop+Spark】HDFS Shell常用命令及HDFS Java API详解及实战(超详细 附源码)
279 0
|
5月前
|
存储 分布式计算 负载均衡
【大数据技术Hadoop+Spark】MapReduce概要、思想、编程模型组件、工作原理详解(超详细)
【大数据技术Hadoop+Spark】MapReduce概要、思想、编程模型组件、工作原理详解(超详细)
68 0
|
2月前
|
存储 分布式计算 算法
【底层服务/编程功底系列】「大数据算法体系」带你深入分析MapReduce算法 — Shuffle的执行过程
【底层服务/编程功底系列】「大数据算法体系」带你深入分析MapReduce算法 — Shuffle的执行过程
29 0
|
1月前
|
机器学习/深度学习 分布式计算 监控
面经:MapReduce编程模型与优化策略详解
【4月更文挑战第10天】本文是关于MapReduce在大数据处理中的关键作用的博客摘要。作者分享了面试经验,强调了MapReduce的基本原理、Hadoop API、优化策略和应用场景。MapReduce包含Map和Reduce两个主要阶段,Map阶段处理输入数据生成中间键值对,Reduce阶段进行聚合计算。面试重点包括理解MapReduce工作流程、使用Hadoop API编写Map/Reduce函数、选择优化策略(如分区、Combiner和序列化)以及应用场景,如日志分析和机器学习。
21 2
|
1月前
|
分布式计算 监控 Hadoop
Hadoop【基础知识 02】【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
【4月更文挑战第3天】Hadoop【基础知识 02】【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
58 0
|
1月前
|
分布式计算 资源调度 Hadoop
Apache Hadoop入门指南:搭建分布式大数据处理平台
【4月更文挑战第6天】本文介绍了Apache Hadoop在大数据处理中的关键作用,并引导初学者了解Hadoop的基本概念、核心组件(HDFS、YARN、MapReduce)及如何搭建分布式环境。通过配置Hadoop、格式化HDFS、启动服务和验证环境,学习者可掌握基本操作。此外,文章还提及了开发MapReduce程序、学习Hadoop生态系统和性能调优的重要性,旨在为读者提供Hadoop入门指导,助其踏入大数据处理的旅程。
197 0
|
4月前
|
分布式计算 Java Hadoop
MapReduce编程:检索特定群体搜索记录和定义分片操作
MapReduce编程:检索特定群体搜索记录和定义分片操作
31 0