MapReduce编程实践之自定义数据类型

简介: 一:任务描述 自定义数据类型完成手机流量的分析 二:example data 格式为:记录报告时间戳、手机号码、AP mac、AC mac、访问的网址、网址种类、上行数据包数、下行数据包数、上行总流量、下行总流量、HTTP Response的状态。

一:任务描述

自定义数据类型完成手机流量的分析

二:example data

格式为:记录报告时间戳、手机号码、AP mac、AC mac、访问的网址、网址种类、上行数据包数、下行数据包数、上行总流量、下行总流量、HTTP Response的状态。

136315798506613726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82i02.c.aliimg.com 2427 248124681 200
1363157995052 138265441015C-0E-8B-C7-F1-E0:CMCC 120.197.40.44 0 264 0 200
1363157991076 1392643565620-10-7A-28-CC-0A:CMCC 120.196.100.992 4 132 1512 200
1363154400022 139262511065C-0E-8B-8B-B1-50:CMCC 120.197.40.44 0 240 0 200
1363157993044 1821157596194-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99iface.qiyi.com 视频网站15 12 1527 2106 200
1363157995074 841384135C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4122.72.52.12 2016 41161432 200
1363157993055 13560439658C4-17-FE-BA-DE-D9:CMCC 120.196.100.9918 15 1116 954 200
1363157995033 159201332575C-0E-8B-C7-BA-20:CMCC 120.197.40.4sug.so.360.cn 信息安全20 20 3156 2936 200
1363157983019 1371919941968-A1-B7-03-07-B1:CMCC-EASY 120.196.100.824 0 240 0 200
1363157984041 136605779915C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4s19.cnzz.com 站点统计24 9 6960 690 200
1363157973098 150136858585C-0E-8B-C7-F7-90:CMCC 120.197.40.4rank.ie.sogou.com 搜索引擎28 27 3659 3538 200
1363157986029 15989002119E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99www.umeng.com 站点统计3 3 1938 180 200
1363157992093 13560439658C4-17-FE-BA-DE-D9:CMCC 120.196.100.9915 9 918 4938 200
1363157986041 134802531045C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.43 3 180 180 200
1363157984040 136028465655C-0E-8B-8B-B6-00:CMCC 120.197.40.42052.flash2-http.qq.com 综合门户15 12 1938 2910 200
1363157995093 1392231446600-FD-07-A2-EC-BA:CMCC 120.196.100.82img.qfc.cn 1212 30083720 200
1363157982040 135024688235C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99y0.ifengimg.com 综合门户57 102 7335 110349 200
1363157986072 1832017338284-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99input.shouji.sogou.com 搜索引擎21 18 9531 2412 200
1363157990043 1392505741300-1F-64-E1-E6-9A:CMCC 120.196.100.55t3.baidu.com 搜索引擎69 63 11058 48243 200
1363157988072 1376077871000-FD-07-A4-7B-08:CMCC 120.196.100.822 2 120 120 200
1363157985079 1382307000120-7C-8F-70-68-1F:CMCC 120.196.100.996 3 360 180 200
1363157985069 1360021750200-1F-64-E2-E8-B1:CMCC 120.196.100.5518 138 1080 186852 200

三:Code

package mrTest;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class zidingyishujuleixing {
	
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		// TODO Auto-generated method stub
		
		Job job = new Job(new Configuration(),"自定义数据类型");
		job.setJarByClass(zidingyishujuleixing.class);
		//一:文件输入路径
		FileInputFormat.addInputPath(job, new Path(args[0]));
		
		//二:指定自定义Map类
		job.setMapperClass(Map.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(WlanString.class);
		
		//三:分区,指定reduce个数
		job.setNumReduceTasks(1);
		
		//四:TODO 排序  分组
		
		//五:规约处理
		
		//六:指定自定义的reduce类
		job.setReducerClass(Reduce.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(WlanString.class);
		
		//七:指定文件输出位置
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		//八:提交运行
		System.exit(job.waitForCompletion(true)? 0 : 1);
	}
	
	public static class Map extends Mapper<Object, Text, Text, WlanString>{
		public void map(Object key, Text value, Context context) throws IOException, InterruptedException{
			String[] split = value.toString().split("\t");
			String keyNum = split[1];
			WlanString ws = new WlanString(split[6],split[7],split[8],split[9]);
			System.out.println(split[6] + "| " + split[7] + "|" + split[8] + "|" + split[9]);
			context.write(new Text(keyNum), ws);
		}
	}
	
	public static class Reduce extends Reducer<Text, WlanString, Text, WlanString>{
		public void reduce(Text key, Iterable<WlanString> values, Context context) throws IOException, InterruptedException{
			long upData = 0;
			long downData = 0;
			long upFlow = 0;
			long downFlow = 0;
			for (WlanString w : values) {
				upData += w.upData;
				downData += w.downData;
				upFlow += w.upFlow;
				downFlow += w.downFlow;
			}
			WlanString newWs = new WlanString(String.valueOf(upData),String.valueOf(downData),String.valueOf(upFlow),String.valueOf(downFlow)); 
			context.write(key, newWs);
		}
	}
	
	public static class WlanString implements Writable{

		long upData;
		long downData;
		long upFlow;
		long downFlow;
		
		public WlanString(){	}
		
		public WlanString(String  upData1, String downData1, String upFlow1, String downFlow1) {
			// TODO Auto-generated constructor stub
			this.upData =  Long.parseLong(upData1);
			this.upData = Long.parseLong(downData1);
			this.upFlow =  Long.parseLong(upFlow1);
			this.downFlow = Long.parseLong(downFlow1);
		}

		@Override
		public void readFields(DataInput in) throws IOException {
			// TODO Auto-generated method stub
			this.upData = in.readLong();
			this.downData = in.readLong();
			this.upFlow = in.readLong();
			this.downFlow = in.readLong();
			System.out.println("upData:" + upData + "downData:" + downData + "upFlow:" + upFlow + "downFlow:" + downFlow);
		}

		@Override
		public void write(DataOutput out) throws IOException {
			// TODO Auto-generated method stub
			out.writeLong(upData);
			out.writeLong(downData);
			out.writeLong(upFlow);
			out.writeLong(upFlow);
			System.out.println("upData:" + upData + "downData:" + downData + "upFlow:" + upFlow + "downFlow:" + downFlow);
		}	

		 @Override  
	     public String toString() {  
	            return upData + "\t" + downData + "\t" + upFlow +"\t" + downFlow;  
	     }  
	}
}


四:结果展示



相关文章
|
2月前
|
SQL 分布式计算 关系型数据库
阿里云E-MapReduce Trino专属集群外连引擎及权限控制踩坑实践
本文以云厂商售后技术支持的角度,从客户的需求出发,对于阿里云EMR-Trino集群的选型,外连多引擎的场景、Ldap以及Kerberos鉴权等问题进行了简要的实践和记录,模拟客户已有的业务场景,满足客户需求的同时对过程中的问题点进行解决、记录和分析,包括但不限于Mysql、ODPS、Hive connector的配置,Hive、Delta及Hudi等不同表格式读取的兼容,aws s3、阿里云 oss协议访问异常的解决等。
|
4月前
|
分布式计算 Hadoop Java
MapReduce编程:自定义分区和自定义计数器
MapReduce编程:自定义分区和自定义计数器
30 0
|
5月前
|
存储 分布式计算 负载均衡
【大数据技术Hadoop+Spark】MapReduce概要、思想、编程模型组件、工作原理详解(超详细)
【大数据技术Hadoop+Spark】MapReduce概要、思想、编程模型组件、工作原理详解(超详细)
64 0
|
2月前
|
存储 分布式计算 算法
【底层服务/编程功底系列】「大数据算法体系」带你深入分析MapReduce算法 — Shuffle的执行过程
【底层服务/编程功底系列】「大数据算法体系」带你深入分析MapReduce算法 — Shuffle的执行过程
29 0
|
25天前
|
机器学习/深度学习 分布式计算 监控
面经:MapReduce编程模型与优化策略详解
【4月更文挑战第10天】本文是关于MapReduce在大数据处理中的关键作用的博客摘要。作者分享了面试经验,强调了MapReduce的基本原理、Hadoop API、优化策略和应用场景。MapReduce包含Map和Reduce两个主要阶段,Map阶段处理输入数据生成中间键值对,Reduce阶段进行聚合计算。面试重点包括理解MapReduce工作流程、使用Hadoop API编写Map/Reduce函数、选择优化策略(如分区、Combiner和序列化)以及应用场景,如日志分析和机器学习。
19 2
|
26天前
|
分布式计算 监控 Hadoop
Hadoop【基础知识 02】【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
【4月更文挑战第3天】Hadoop【基础知识 02】【分布式计算框架MapReduce核心概念+编程模型+combiner&partitioner+词频统计案例解析与进阶+作业的生命周期】(图片来源于网络)
58 0
|
4月前
|
分布式计算 Java Hadoop
MapReduce编程:检索特定群体搜索记录和定义分片操作
MapReduce编程:检索特定群体搜索记录和定义分片操作
29 0
|
4月前
|
分布式计算 Java Hadoop
MapReduce编程:数据过滤保存、UID 去重
MapReduce编程:数据过滤保存、UID 去重
50 0
|
4月前
|
缓存 分布式计算 Java
MapReduce编程:join操作和聚合操作
MapReduce编程:join操作和聚合操作
33 0
|
5月前
|
存储 分布式计算 分布式数据库
对给定的数据利用MapReduce编程实现数据的清洗和预处理,编程实现数据存储到HBase数据库,实现数据的增删改查操作接口
对给定的数据利用MapReduce编程实现数据的清洗和预处理,编程实现数据存储到HBase数据库,实现数据的增删改查操作接口
29 0