博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
实例理解mapreduce任务的串行运行过程
阅读量:5024 次
发布时间:2019-06-12

本文共 5635 字,大约阅读时间需要 18 分钟。

一、准备:

eclipse,hadoop集群

注意:为了方便测试和修改,我用的是 windows 连接hadoop集群,这样在windows 下直接就能够执行 mapreduce 任务,方便程序调试。在 windows 下执行 mapreduce 任务需要安装相关插件,可以参考  

数据下载地址:

密码:idnx

二、分析

本案例的数据来自某搜索引擎开放出来的部分搜索数据,数据格式如下:

20111230104334    966a6bf4c4ec1cc693b6e40702984235    X档案研究所TXT下载    4    2    http://www.readist.cn/book/dl024708.html

分别有6个字段:时间,id,关键词,该URL在返回结果中的排名,用户点击顺序号,点击的URL。每个字段之间用 \t 分隔。

现在需要统计独立 UID 的总数,也就是说要将 UID 去重,然后累加。用 mapreduce 编程思想可以看作先执行一个mapreduce 任务将 UID 去重,再执行一个 mapreduce 任务将去重后的数据累加。所以这里要执行两个mapreduce 任务,这两个mapreduce 任务是串行运行的。

三、实现

实现分两个部分,第一部分是对UID去重,第二部分是对去重后的UID累加。第一部分的输出结果作为第二部分的输入结果。

第一部分:

1.首先执行的是去重过程,去重的过程实际上就是wordcount 的过程,Map 函数先将输入的数据标记成 key-value 键值对,其中key 是行号,value 是这一行的值,然后设置我们要统计的列类,并对这个列进行标记,Map函数的输出结果类似于如下所示:

(966a6bf4c4ec1cc693b6e40702984232,1

(966a6bf4c4ec1cc693b6e40702984235,1

(966a6bf4c4ec1cc693b6e40702984235,1...

2.Reduce 的过程是对 Map 过程的一个累加,将 value 累加在一起,得出结果。

public static class MyMapper1 extends Mapper
{ @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { IntWritable One = new IntWritable(1); Text k1 = new Text(""); String line = value.toString(); String[] data = line.split("\t"); if (data != null || data.length == 6) { String uid = data[1]; k1.set(uid); context.write(k1, One); } } } public static class MyReduce1 extends Reducer
{ @Override protected void reduce(Text key, Iterable
value, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable iw : value) { sum += iw.get(); } context.write(key, new IntWritable(sum)); } }

输出结果是对UID的一个统计,这个过程实现了UID的去重,第一列就是去重后的UID,部分结果如下:

000048ad4cb133b2bb376f07356dde9e    6

00005c113b97c0977c768c13a6ffbb95    2
000064b4c0f12cfb69cb4646835c6544    1

第二部分:

1.在第一部分的中,得到了去重后的UID和数量,我们只需要第一列数据,即去重后的UID。

这部分的重点是统计去重后的UID的数量。

2.第一部分的结果输出作为第二部分的结果输入,所以在Map

过程中我们只需要将第二个字段的数据重置为 1 即可,这里定义了新的 key , key 的值是“独立uid的数量是:”,value 是 1 ,写成键值对的形式就是:(独立uid的数量是:,1

Map 函数的输出结果是:

独立uid的数量是:    1

独立uid的数量是:    1
独立uid的数量是:    1

reduce的过程是对 Map 的输出结果的累加,即对 “1” 的累加,输出结果就是我们的最终结果:

独立uid的数量是:    13526

public static class MyMapper2 extends Mapper
{ @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { IntWritable One = new IntWritable(1); Text key1=new Text("独立uid的数量是 :"); context.write(key1, One); } } public static class MyReduce2 extends Reducer
{ @Override protected void reduce(Text key, Iterable
value, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable iw : value) { sum += iw.get(); } context.write(key, new IntWritable(sum)); } }
以上过程就是mapreduce的串行执行过程。完整程序如下:

package com.sosuo;import java.io.IOException;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class UuidNumber {	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {		Configuration conf = new Configuration();		Job job1 = new Job(conf, "dlUid");		job1.setJarByClass(UuidNumber.class);		job1.setMapperClass(MyMapper1.class);		job1.setReducerClass(MyReduce1.class);		job1.setOutputKeyClass(Text.class);		job1.setOutputValueClass(IntWritable.class);		FileInputFormat.addInputPath(job1, new Path(args[0]));		FileOutputFormat.setOutputPath(job1, new Path(args[1]));		job1.waitForCompletion(true);		Job job2 = new Job(conf, "dlUid");		job2.setJarByClass(UuidNumber.class);		job2.setMapperClass(MyMapper2.class);		job2.setReducerClass(MyReduce2.class);		job2.setOutputKeyClass(Text.class);		job2.setOutputValueClass(IntWritable.class);		FileInputFormat.addInputPath(job2, new Path(args[1]));		FileOutputFormat.setOutputPath(job2, new Path(args[2]));		job2.waitForCompletion(true);	}	public static class MyMapper1 extends Mapper
{ @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { IntWritable One = new IntWritable(1); Text k1 = new Text(""); String line = value.toString(); String[] data = line.split("\t"); if (data != null || data.length == 6) { String uid = data[1]; k1.set(uid); context.write(k1, One); } } } public static class MyReduce1 extends Reducer
{ @Override protected void reduce(Text key, Iterable
value, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable iw : value) { sum += iw.get(); } context.write(key, new IntWritable(sum)); } } public static class MyMapper2 extends Mapper
{ @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { IntWritable One = new IntWritable(1); Text key1=new Text(""); context.write(key1, One); } } public static class MyReduce2 extends Reducer
{ @Override protected void reduce(Text key, Iterable
value, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable iw : value) { sum += iw.get(); } context.write(key, new IntWritable(sum)); } }}
要注意的是输入路径和输出路径,有三个路径,其中第一个是输入路径,第二个是中间结果的保存路径,即第一个mapreduce任务的输出结果,第三个是最终结果的路径。keke~

转载于:https://www.cnblogs.com/beeman/p/7776428.html

你可能感兴趣的文章
逆时针旋转的矩阵变换
查看>>
第10周15/16/17
查看>>
【数据库】SQL两表之间:根据一个表的字段更新另一个表的字段
查看>>
四六级作文常见错误解析(转载)
查看>>
Tomcat
查看>>
./是当前目录 ../是当前的上一级目录。上上级就是../../一般绝对路径时候常用...
查看>>
linux支持FTP和SFTP服务【1】
查看>>
树的递归与非递归遍历方法
查看>>
每天一个Linux命令(6):rmdir命令
查看>>
oracle连接的三个配置文件(转)
查看>>
Vim配置文件(Vimrc)
查看>>
RecyclerView 局部刷新(获取viewHolder 去刷新)
查看>>
PHP表单(get,post)提交方式
查看>>
使用vbs或者bat脚本修改IE浏览器安全级别和选项
查看>>
Silverlight入门
查看>>
Silverlight动态调用WEBSERVICE,WCF方法
查看>>
LeetCode 895. Maximum Frequency Stack
查看>>
模仿segmentfault 评论
查看>>
一个简单的日志函数C++
查看>>
Java 8 中如何优雅的处理集合
查看>>