import java.io.IOException;
import java.util.Locale;
import org.apache.commons.io.output.NullWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class PageRankViewer {
//<url,<pr值,链出信息>>
public static class PageRankViewerMapper extends
Mapper<LongWritable, Text, DoubleWritable, Text> {
private Text outPage = new Text();
private DoubleWritable outPr = new DoubleWritable();
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] line = value.toString().split("\t");
String page = line[0];
double pr = Double.parseDouble(line[1]);
outPage.set(page);
outPr.set(pr);
context.write(outPr,outPage);
}
}
public static class ViewReducer extends Reducer<DoubleWritable, Text, Text, NullWritable> {
public void reduce(DoubleWritable key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
for(Text text:values){
context.write(new Text("("+text+","+String.format("%.10f", key.get())+")"),NullWritable.get());
}
}
}
public static class DescDoubleComparator extends DoubleWritable.Comparator {
// @Override
public float compare(WritableComparator a,
WritableComparable<DoubleWritable> b) {
return -super.compare(a, b);
}
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
return -super.compare(b1, s1, l1, b2, s2, l2);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job3 =Job.getInstance(conf, "PageRankViewer");
job3.setJarByClass(PageRankViewer.class);
job3.setOutputKeyClass(DoubleWritable.class);
job3.setSortComparatorClass(DescDoubleComparator.class);
job3.setOutputValueClass(Text.class);
job3.setMapperClass(PageRankViewerMapper.class);
job3.setReducerClass(ViewReducer.class);
FileInputFormat.addInputPath(job3, new Path("hdfs://localhost:9000/user/hadoop/Data10"));
FileOutputFormat.setOutputPath(job3, new Path("hdfs://localhost:9000/user/hadoop/output"));
job3.waitForCompletion(true);
}
}