7.0 MapReduce 编程

在学习了 MapReduce 的使用之后,我们已经可以处理 Word Count 这类统计和检索任务,但是客观上 MapReduce 可以做的事情还有很多。

MapReduce 主要是依靠开发者通过编程来实现功能的,开发者可以通过实现 Map 和 Reduce 相关的方法来进行数据处理。

为了简单的展示这一过程,我们将手工编写一个 Word Count 程序。

注意:MapReduce 依赖 Hadoop 的库,但由于本教程使用的 Hadoop 运行环境是 Docker 容器,难以部署开发环境,所以真实的开发工作(包含调试)将需要一个运行 Hadoop 的计算机。在这里我们仅学习已完成程序的部署。

MyWordCount.java 文件代码

/**
 * 引用声明
 * 本程序引用自 http://hadoop.apache.org/docs/r1.0.4/cn/mapred_tutorial.html
 */



package
com.runoob.hadoop
;


import
java.io.IOException
;


import
java.util.*
;


import
org.apache.hadoop.fs.Path
;


import
org.apache.hadoop.io.*
;


import
org.apache.hadoop.mapred.*
;


/**
 * 与 `Map` 相关的方法
 */



class
Map
extends MapReduceBase
implements Mapper
<LongWritable, Text, Text, IntWritable
>
{

   
private
final
static IntWritable one
=
new IntWritable
(
1
)
;

   
private Text word
=
new Text
(
)
;

   
public
void map
(LongWritable key,

               Text value,

               OutputCollector
<Text, IntWritable
> output,

               Reporter reporter
)

         
throws
IOException
{

     
String line
= value.
toString
(
)
;

     
StringTokenizer tokenizer
=
new
StringTokenizer
(line
)
;

     
while
(tokenizer.
hasMoreTokens
(
)
)
{

         word.
set
(tokenizer.
nextToken
(
)
)
;

         output.
collect
(word, one
)
;

     
}

   
}


}


/**
 * 与 `Reduce` 相关的方法
 */



class Reduce
extends MapReduceBase
implements Reducer
<Text, IntWritable, Text, IntWritable
>
{

   
public
void reduce
(Text key,

                  Iterator
<IntWritable
> values,

                  OutputCollector
<Text, IntWritable
> output,

                  Reporter reporter
)

         
throws
IOException
{

     
int sum
=
0
;

     
while
(values.
hasNext
(
)
)
{

         sum
+= values.
next
(
).
get
(
)
;

     
}

      output.
collect
(key,
new IntWritable
(sum
)
)
;

   
}


}


public
class MyWordCount
{

   
public
static
void main
(
String
[
] args
)
throws
Exception
{

      JobConf conf
=
new JobConf
(MyWordCount.
class
)
;

      conf.
setJobName
(
“my_word_count”
)
;

      conf.
setOutputKeyClass
(Text.
class
)
;

      conf.
setOutputValueClass
(IntWritable.
class
)
;

      conf.
setMapperClass
(
Map.
class
)
;

      conf.
setCombinerClass
(Reduce.
class
)
;

      conf.
setReducerClass
(Reduce.
class
)
;

      conf.
setInputFormat
(TextInputFormat.
class
)
;

      conf.
setOutputFormat
(TextOutputFormat.
class
)
;

     
// 第一个参数表示输入

      FileInputFormat.
setInputPaths
(conf,
new Path
(args
[
0
]
)
)
;

     
// 第二个输入参数表示输出

      FileOutputFormat.
setOutputPath
(conf,
new Path
(args
[
1
]
)
)
;

      JobClient.
runJob
(conf
)
;

   
}


}

请将此 Java 文件的内容保存到 NameNode 容器中去,建议位置:

/home/hadoop/MyWordCount/com/runoob/hadoop/MyWordCount.java

注意:根据当前情况,有的 Docker 环境中安装的 JDK 不支持中文,所以保险起见,请去掉以上代码中的中文注释。

进入目录:

cd /home/hadoop/MyWordCount

编译:

javac -classpath ${HADOOP_HOME}/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.1.4.jar -classpath ${HADOOP_HOME}/share/hadoop/client/hadoop-client-api-3.1.4.jar com/runoob/hadoop/MyWordCount.java

打包:

jar -cf my-word-count.jar com

执行:

hadoop jar my-word-count.jar com.runoob.hadoop.MyWordCount /wordcount/input /wordcount/output2

查看结果:

hadoop fs -cat /wordcount/output2/part-00000

输出:

I       4
hadoop  2
like    2
love    2
runoob  2