package com.bigdata.mapreduce;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MapReduceModule {
// Step 1: Mapper class
public static class MapReduceMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
// Output key: the word. Both Writable objects are created once and reused for
// every record, since context.write() serializes their contents immediately.
private Text mapOutputKey = new Text();
// Output value: each occurrence of a word counts as 1
private IntWritable mapOutputValue = new IntWritable(1);
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// Each call receives one line of input; convert the Text to a String
String lineValue = value.toString();
// Split the line into words on whitespace; "\\s+" also collapses consecutive
// spaces, which a plain split(" ") would turn into empty tokens
String[] words = lineValue.split("\\s+");
// Emit a <word, 1> pair for every word on the line
for (String word : words) {
// Skip the empty leading token produced when a line starts with whitespace
if (word.isEmpty()) {
continue;
}
// Set the output key to the current word
mapOutputKey.set(word);
// Emit the pair
context.write(mapOutputKey, mapOutputValue);
}
}
}
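// Aside: the stock Hadoop WordCount example tokenizes with java.util.StringTokenizer
// instead of String.split(); an equivalent map body would look like:
//   StringTokenizer itr = new StringTokenizer(value.toString());
//   while (itr.hasMoreTokens()) {
//       mapOutputKey.set(itr.nextToken());
//       context.write(mapOutputKey, mapOutputValue);
//   }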
// Step 2: Reducer class
public static class MapReduceReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
// Reused Writable holding the final count for each word
private IntWritable outputValue = new IntWritable();
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// Accumulate: the shuffle has already grouped all values for this key
int sum = 0;
// Iterate over the values and add them up
for (IntWritable value : values) {
sum += value.get();
}
// Set the output value to the total count
outputValue.set(sum);
// Emit the final <word, count> pair
context.write(key, outputValue);
}
}
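// A minimal sketch of a custom Partitioner, not part of the original job: it
// routes words to reduce tasks by first letter instead of the default hash
// partitioning. Wire it in with job.setPartitionerClass(FirstLetterPartitioner.class)
// in the shuffle section below. (Illustrative only; the class name is ours.)
public static class FirstLetterPartitioner extends org.apache.hadoop.mapreduce.Partitioner<Text, IntWritable> {
@Override
public int getPartition(Text key, IntWritable value, int numPartitions) {
if (key.getLength() == 0) {
return 0;
}
// Text.charAt returns the Unicode code point at the given offset; mask with
// Integer.MAX_VALUE to keep the partition index non-negative
return (Character.toLowerCase(key.charAt(0)) & Integer.MAX_VALUE) % numPartitions;
}
}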
// Step 3: Driver
public int run(String[] args) throws Exception {
// Load the cluster configuration (core-site.xml etc. from the classpath)
Configuration configuration = new Configuration();
//configuration.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
// Create a job named after this class
Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
// Tell Hadoop which jar to ship to the cluster: the one containing this class
job.setJarByClass(this.getClass());
// Configure the job
// Input path
Path inpath = new Path(args[0]);
FileInputFormat.addInputPath(job, inpath);
// Output path; the directory must not exist yet, or the job fails fast
Path outpath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outpath);
// Mapper setup: class plus its intermediate output types
job.setMapperClass(MapReduceMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//================= shuffle =================
// 1. Partitioning (e.g. the FirstLetterPartitioner sketch above)
// job.setPartitionerClass(cls);
// 2. Sorting
// job.setSortComparatorClass(cls);
// 3. Combiner: pre-aggregates map output to cut shuffle traffic. The reducer
// can double as the combiner here only because summing counts is associative
// and commutative.
job.setCombinerClass(MapReduceReducer.class);
// 4. Grouping
// job.setGroupingComparatorClass(cls);
//================= shuffle =================
// Reducer setup: class plus the job's final output types
job.setReducerClass(MapReduceReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// Submit the job to YARN and block until it finishes; 'true' streams progress to the console
boolean isSuccess = job.waitForCompletion(true);
return isSuccess ? 0 : 1;
}
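// Note: this run(String[]) mirrors org.apache.hadoop.util.Tool#run. A driver that
// actually implements Tool can be launched with
//   System.exit(ToolRunner.run(new Configuration(), new MapReduceModule(), args));
// which also parses generic options such as -D key=value. The direct call in
// main() below skips that plumbing.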
public static void main(String[] args) throws Exception {
// Fall back to hard-coded HDFS paths when no arguments are given, so the job
// can be launched straight from an IDE; otherwise honor the command line
if (args.length != 2) {
args = new String[] {
"hdfs://localhost:8020/user/mapreduce/input",
"hdfs://localhost:8020/user/mapreduce/output"
};
}
// Run the job
int status = new MapReduceModule().run(args);
// Exit with the job's status code
System.exit(status);
}
}