博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
手动实现一个单词统计MapReduce程序与过程原理分析
阅读量:6395 次
发布时间:2019-06-23

本文共 7119 字,大约阅读时间需要 23 分钟。

[toc]


手动实现一个单词统计MapReduce程序与过程原理分析

前言

我们知道,在搭建好hadoop环境后,可以运行wordcount程序来体验一下hadoop的功能,该程序在hadoop目录下的share/hadoop/mapreduce目录中,通过下面的命令:

yarn jar $HADOOP_HOME/share/hadoop/mapreducehadoop-mapreduce-examples-2.6.4.jar wordcount inputPath outPath

即可对输入文件执行单词统计的计算。

那么下面就通过手动写一个wordcount的例子来加深对MapReduce的基本理解。

案例场景

假如有下面一个文本文件需要进行单词统计:

$ cat hellohello youhello hehello me

Note:该hello文件为李老师的经典文本文件。

下面就来演示MapReduce程序如何来对该文本文件进行计算,最后再依据此写一个wordcount程序。

MapReduce计算分析

我们先来简单分析一下MapReduce是如何处理上面的文本文件,然后才写一个程序。

对于上面的一个文本文件,MapReduce程序分三个步骤进行处理:Map阶段、Shuffle阶段和Reduce阶段。(三个阶段的分析在代码的注释中也是非常详细的解释)

Map阶段

上面的文本文件经过Map处理后会得到类似下面的结果:

shuffle阶段

对Map阶段的结果进行处理,会得到如下的结果:

Reduce阶段

经过reducer处理之后,结果如下:

关于上面的过程分析,可以参考下面的几张图示以帮助理解:

图示1:

手动实现一个单词统计MapReduce程序与过程原理分析

图示2:

手动实现一个单词统计MapReduce程序与过程原理分析

图示3:

手动实现一个单词统计MapReduce程序与过程原理分析

程序思路分析

* 整个的解题思路,使用map函数进行单词的拆分,使用reduce函数进行汇总,中间进行shuffle * 要想让我们的map函数和reduce函数进行接替运行,需要一个驱动程序 * 代码的思路: * 1、编写一个类继承Mapper,成为自定义的Mapper,主要业务逻辑就是复写其中的map函数 *  map *  首先要确定清楚Mapper类或者map函数的数据类型/类型参数--->泛型 *  Mapper
* 2、编写一个类继承Reducer,成为自定义的Reducer,主要业务逻辑就是复写其中的reduce函数 * reduce * 首先要确定清楚Reducer类或者reduce函数它的数据类型/类型参数--->泛型 * Reducer
* * 需要我们用户自定义的类型就是K2, V2, K3, V3 * K1和V1一般情况下是固定的,只要数据格式确定,其类型就确定 * 比如我们操作的是普通的文本文件,那么K1=LongWritable,V1=Text * K1--->代表的是这一行记录在整个文本中的偏移量,V1就是这一行文本的内容 * (也就是说,K1和V1取决于我们要处理的是什么文件) * 注意:与Hadoop的程序需要使用Hadoop提供的数据类型,而不能使用java中提供的数据类型

wordcount程序

程序代码中有非常详细的注释,可以参考来进行理解。

WordCount.java

package com.uplooking.bigdata.mr.wc;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;/** * 统计hdfs://uplooking01:9000/input/mr/hello的单词出现次数 * * 整个的解题思路,使用map函数进行单词的拆分,使用reduce函数进行汇总,中间进行shuffle * 要想让我们的map函数和reduce函数进行接替运行,需要一个驱动程序 * 代码的思路: * 1、编写一个类继承Mapper,成为自定义的Mapper,主要业务逻辑就是复写其中的map函数 *  map *  首先要确定清楚Mapper类或者map函数的数据类型/类型参数--->泛型 *  Mapper
* 2、编写一个类继承Reducer,成为自定义的Reducer,主要业务逻辑就是复写其中的reduce函数 * reduce * 首先要确定清楚Reducer类或者reduce函数它的数据类型/类型参数--->泛型 * Reducer
* * 需要我们用户自定义的类型就是K2, V2, K3, V3 * K1和V1一般情况下是固定的,只要数据格式确定,其类型就确定 * 比如我们操作的是普通的文本文件,那么K1=LongWritable,V1=Text * K1--->代表的是这一行记录在整个文本中的偏移量,V1就是这一行文本的内容 * (也就是说,K1和V1取决于我们要处理的是什么文件) * 注意:与Hadoop的程序需要使用Hadoop提供的数据类型,而不能使用java中提供的数据类型 */public class WordCount { /** * 这里的main函数就是用来组织map函数和reduce函数的 * 最终mr的运行会转变成一个个的Job * * @param args */ public static void main(String[] args) throws Exception { // 构建Job所需的配置文件和jobName Configuration configuration = new Configuration(); String jobName = "wordcount"; // 1.创建一个job Job job = Job.getInstance(configuration, jobName); // 添加mr要运行的主函数所在的类,就是WordCount这个类 job.setJarByClass(WordCount.class); // 2.设置mr的输入参数 // 设置计算的文件 Path inputPath = new Path("hdfs://uplooking01:9000/input/mr/hello"); FileInputFormat.setInputPaths(job, inputPath); // 指定解析数据源的Format类,即将输入的数据解析为
的形式,然后再交由mapper函数处理 job.setInputFormatClass(TextInputFormat.class); // 指定使用哪个mapper来进行计算 job.setMapperClass(WordCountMapper.class); // 指定mapper结果的key的数据类型(即K2的数据类型),注意要与我们写的Mapper中定义的一致 job.setMapOutputKeyClass(Text.class); // 指定mapper结果的value的数据类型(即V2的数据类型),注意要与我们写的Mapper中定义的一致 job.setMapOutputValueClass(IntWritable.class); // 3.设置mr的输出参数 // 设置输出的目录 Path outputPath = new Path("hdfs://uplooking01:9000/output/mr/wc"); // 如果outputPath目录存在,会抛出目录存在异常,这里先删除,保证该目录不存在 outputPath.getFileSystem(configuration).delete(outputPath, true); FileOutputFormat.setOutputPath(job, outputPath); // 指定格式化数据结果的Format类 job.setOutputFormatClass(TextOutputFormat.class); // 指定使用哪个reducer来进行汇总 job.setReducerClass(WordCountReducer.class); // 指定reduce结果的key的数据类型(即K3的数据类型),注意要与我们写的Reducer中定义的一致 job.setOutputKeyClass(Text.class); // 指定reduce结果的value的数据类型(即V3的数据类型),注意要与我们写的Reducer中定义的一致 job.setOutputValueClass(IntWritable.class); // 设置有几个reducer来执行mr程序,默认为1个 job.setNumReduceTasks(1); // 提交mapreduce job job.waitForCompletion(true); }}

WordCountMapper.java

package com.uplooking.bigdata.mr.wc;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/** * 1、编写一个类继承Mapper,成为自定义的Mapper,主要业务逻辑就是复写其中的map函数 *  map *  首先要确定清楚Mapper类或者map函数的数据类型/类型参数--->泛型 *  Mapper
* K1:行的偏移量,如第998行 * V1:行的内容,如 hello you * K2:输出的数据的key值,如hello * V2:输出的数据的value值,如1 * 注意,为了减少在网络中传输的数据,map之后得到的结果还需要进行shuffle,将相同key的value汇总起来: * 如: * map后的结果有:
,
,
,
,
,
* shuffle后的结果为:
,
,
,
* 这样相比原来map的结果,数据的量就少了许多 */public class WordCountMapper extends Mapper
{ @Override protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { // 先将每一行转换为java的String类型 String line = v1.toString(); // 将行中的单词以空格作为分隔符分离出来得到一个字符串数组 String[] words = line.split(" "); // 定义输出数据的变量k2和v2,类型分别为Text和IntWritable Text k2 = null; IntWritable v2 = null; // 统计单词并写入到上下文变量context中 for (String word : words) { k2 = new Text(word); v2 = new IntWritable(1); context.write(k2, v2); } }}

WordCountReducer.java

package com.uplooking.bigdata.mr.wc;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/** * 2、编写一个类继承Reducer,成为自定义的Reducer,主要业务逻辑就是复写其中的reduce函数 *  reduce *  首先要确定清楚Reducer类或者reduce函数它的数据类型/类型参数--->泛型 *  Reducer
* K2:map输出中的key值 * V2s:map输出中根据本周key值shuffle后得到的可迭代列表 * 如:
,
,
,
* K3:reduce输出中的key值 * V3:reduce输出中的value值 */public class WordCountReducer extends Reducer
{ @Override protected void reduce(Text k2, Iterable
v2s, Context context) throws IOException, InterruptedException { // 定义某个key值k2出现次数的变量 int sum = 0; // 统计k2孤个数 for (IntWritable item : v2s) { sum += item.get(); } // 构建reduce输出的k3和v3,类型分别为Text和IntWritable Text k3 = k2; IntWritable v3 = new IntWritable(sum); // 结果reduce结果写入到上下文变量context中 context.write(k2, v3); }}

测试

将上面的程序打包成jar包,然后上传到我们的hadoop服务器上,执行下面的命令:

yarn jar wordcount.jar com.uplooking.bigdata.mr.wc.WordCount

这样就可以使用在hadoop中使用我们自己写的wodcount程序来进行MapReduce的计算。

任务执行结束后,通过下面的命令查看结果:

$ hdfs dfs -cat /output/mr/wc/part-r-0000018/03/03 13:59:55 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicablehe  1hello   3me  1you 1

这样就完成了从编写MR程序到测试的完整过程。

转载于:https://blog.51cto.com/xpleaf/2082642

你可能感兴趣的文章
希尔排序
查看>>
几种常见的内嵌数据库
查看>>
改变随机数中一些值的概率
查看>>
velocity在html中换行符转换
查看>>
ASP.NET程序开发范例宝典
查看>>
Spark分析之SparkContext启动过程分析
查看>>
2014电子商务安全技术峰会(含全议题下载)
查看>>
东大OJ-5到100000000之间的回文质数
查看>>
linux C 快速排序法
查看>>
模仿与创新
查看>>
Python用subprocess的Popen来调用系统命令
查看>>
Java NIO与IO的差别和比較
查看>>
.NET源代码的内部排序实现
查看>>
解决Strict Standards: Only variables should be passed by reference
查看>>
解决JBoss只能通过localhost(127.0.0.1)而不能通过IP访问
查看>>
MS SQL处理双引号(DoubleQuote)函数
查看>>
[智能架构系列]什么是Buddy智能开发框架
查看>>
三十一、关于android camera setParameters出错
查看>>
【收藏】QCIF、 CIF、2CIF、DCIF、D1(4CIF)格式介绍
查看>>
hdu 3836 Equivalent Sets (tarjan缩点)
查看>>