package ch3;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Driver that configures and submits the MapReduce job built from
 * {@link EnglishMapper} and {@link EnglishReducer}.
 *
 * <p>Both the intermediate (map output) and final key/value types are
 * {@link LongWritable}.
 */
public class EnglishMain {

    /**
     * Configures the job, submits it, and blocks until it finishes.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the
     *             output path; any further arguments are ignored
     * @throws Exception if the job cannot be created, submitted or monitored
     */
    public static void main(String[] args) throws Exception {
        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
        if (args == null || args.length < 2) {
            System.err.println("Usage: EnglishMain <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(EnglishMain.class);

        job.setMapperClass(EnglishMapper.class);
        job.setReducerClass(EnglishReducer.class);

        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(LongWritable.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // waitForCompletion returns false when the job fails; surface that as a
        // non-zero exit status so calling scripts can detect the failure.
        boolean succeeded = job.waitForCompletion(true);
        if (!succeeded) {
            System.exit(1);
        }
    }
}
package ch3;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Mapper.Context;
/**
 * Mapper that splits each input line on commas and emits the numeric value of
 * the field at index 1 as the key and the field at index 5 as the value.
 *
 * <p>Lines that have too few fields, or whose selected fields are not valid
 * integers, are skipped and tallied in job counters rather than failing the
 * whole task.
 */
public class EnglishMapper extends Mapper<LongWritable, Text, LongWritable, LongWritable> {

    /** Zero-based index of the comma-separated field used as the output key. */
    private static final int KEY_FIELD = 1;

    /** Zero-based index of the comma-separated field used as the output value. */
    private static final int VALUE_FIELD = 5;

    /** Counter group under which skipped records are reported. */
    private static final String COUNTER_GROUP = "EnglishMapper";

    // Reused across map() calls to avoid allocating two objects per record.
    private final LongWritable outKey = new LongWritable();
    private final LongWritable outValue = new LongWritable();

    /**
     * Parses one comma-separated line and emits a (key field, value field) pair.
     *
     * @param key1    input key supplied by the framework (not used)
     * @param value1  one line of input text
     * @param context task context used to emit output and update counters
     * @throws IOException          if writing the output pair fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key1, Text value1, Context context)
            throws IOException, InterruptedException {
        String[] words = value1.toString().split(",");

        // Need at least VALUE_FIELD + 1 fields; blank or short lines are skipped.
        if (words.length <= VALUE_FIELD) {
            context.getCounter(COUNTER_GROUP, "TOO_FEW_FIELDS").increment(1);
            return;
        }

        final long parsedKey;
        final long parsedValue;
        try {
            // Long.parseLong matches the LongWritable output type; trim tolerates
            // whitespace around the separators.
            parsedKey = Long.parseLong(words[KEY_FIELD].trim());
            parsedValue = Long.parseLong(words[VALUE_FIELD].trim());
        } catch (NumberFormatException ignored) {
            // Intentionally not rethrown: e.g. a header row. The skip is recorded
            // in a counter so it remains visible in the job report.
            context.getCounter(COUNTER_GROUP, "NON_NUMERIC_FIELD").increment(1);
            return;
        }

        outKey.set(parsedKey);
        outValue.set(parsedValue);
        context.write(outKey, outValue);
    }
}
package ch3;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
/**
 * Reducer that emits, for each key, the integer average of all values
 * received for that key.
 */
public class EnglishReducer extends Reducer<LongWritable, LongWritable, LongWritable, LongWritable> {

    /**
     * Sums and counts the values for one key, then writes the key together with
     * {@code sum / count}.
     *
     * <p>The division is {@code long} integer division, so any fractional part
     * of the average is discarded.
     *
     * @param k3      the key shared by all values in this group
     * @param v3      the values grouped under {@code k3}
     * @param context task context used to emit the result
     * @throws IOException          if writing the output pair fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(LongWritable k3, Iterable<LongWritable> v3, Context context)
            throws IOException, InterruptedException {
        long total = 0L;
        long seen = 0L;

        for (LongWritable current : v3) {
            total = total + current.get();
            seen++;
        }

        // Integer average: truncates toward zero.
        LongWritable average = new LongWritable(total / seen);
        context.write(k3, average);
    }
}