package org.lgn.labe.mr;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.ArrayList;
import java.util.Map.Entry;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Reducer;
import org.lgn.labe.util.HdfsLabelsWriter;
import org.lgn.labe.util.IntArrayWritable;
import org.lgn.labe.util.LabelsWriter;
/**
 * Reducer that, for every key, sums the weights of the label ids it receives,
 * keeps the heaviest ones (at most {@link #LabelSIZE}) and hands them, packed
 * into a fixed-size binary record, to a {@link LabelsWriter}.
 *
 * <p>Nothing is emitted through the reducer {@code Context}; all output goes
 * through {@link #lw}.
 */
public class LPReducer extends Reducer<IntWritable,IntArrayWritable,Text,Text>{
    /** Maximum number of labels kept per key. */
    static final int LabelSIZE = 3;

    /** Sink for the packed label records; created in {@link #setup} and closed in {@link #cleanup}. */
    LabelsWriter lw ;

    /** Largest key passed to {@link #reduce} so far; handed to {@code writeSize} in {@link #cleanup}. */
    long max = 0;

    /**
     * A (label id, accumulated weight) pair whose natural order is
     * weight descending, then id ascending.
     */
    static class SortType implements Comparable<SortType>{
        public int id;
        public int weight;

        public SortType(int id, int weight){
            this.id = id;
            this.weight = weight;
        }

        /** Builds a pair from a map entry: key is the label id, value is the weight. */
        public SortType(Map.Entry<Integer, Integer> e){
            this.id = e.getKey();
            this.weight = e.getValue();
        }

        /**
         * Orders heavier labels first; ties are broken by the smaller id.
         * Explicit comparisons are used instead of subtraction so that
         * extreme values cannot overflow.
         */
        @Override
        public int compareTo(SortType o1) {
            if (this.weight != o1.weight) {
                return this.weight > o1.weight ? -1 : 1;
            }
            if (this.id != o1.id) {
                return this.id < o1.id ? -1 : 1;
            }
            return 0;
        }

        @Override
        public String toString(){
            return id+":"+weight;
        }
    }

    /** Opens the labels writer using the job configuration. */
    public void setup(Context context) throws IOException{
        lw = new HdfsLabelsWriter(context.getConfiguration());
    }

    /**
     * Writes the largest key seen and closes the writer. The writer is closed
     * even when {@code writeSize} throws, so it is not leaked on failure.
     */
    public void cleanup(Context context) throws IOException{
        try {
            lw.writeSize(max);
        } finally {
            lw.close();
        }
    }

    /**
     * Aggregates the incoming (id, weight) pairs for {@code key}, selects the
     * top labels and writes one packed record for the key.
     *
     * <p>Each incoming array is read as consecutive (id, weight) pairs.
     * Pairs whose id is 0 are skipped (presumably padding - confirm against
     * the mapper). If no pair remains, a marker line is printed and no record
     * is written.
     *
     * @param key     the id the labels belong to
     * @param values  arrays of alternating label id / weight entries
     * @param context reducer context (not written to)
     */
    public void reduce(IntWritable key, Iterable<IntArrayWritable> values, Context context)
            throws IOException, InterruptedException {
        max = Math.max(max, key.get());

        // Sum the weight contributed to each label id across all incoming arrays.
        Map<Integer, Integer> map = new HashMap<Integer, Integer>();
        for (IntArrayWritable iaw : values) {
            Writable[] labelsIn = iaw.get();
            for (int i = 0; i < labelsIn.length; i += 2) {
                int id = ((IntWritable) labelsIn[i]).get();
                int weight = ((IntWritable) labelsIn[i + 1]).get();
                if (id == 0) {
                    continue;
                }
                Integer w = map.get(id);
                map.put(id, w == null ? weight : w + weight);
            }
        }

        if (map.isEmpty()) {
            System.out.println("--------");
            return;
        }

        List<SortType> list = new ArrayList<SortType>(map.size());
        for (Entry<Integer, Integer> entry : map.entrySet()) {
            list.add(new SortType(entry));
        }
        Collections.sort(list);

        byte[] out = yieldLabels(list.subList(0, chooseBound(list)), key.get());
        if (key.get() % 10000 == 0) {
            // Periodic progress/diagnostic output: number of distinct labels for this key.
            System.out.println(list.size());
        }
        lw.writeOut(out);
    }

    /**
     * Decides how many of the sorted labels to keep: normally
     * {@code min(size, LabelSIZE)}, but only the first two when the gap
     * between the 1st and 2nd weights is larger than the gap between the
     * 2nd and 3rd. Gaps are computed in {@code long} to avoid int overflow.
     */
    private static int chooseBound(List<SortType> sorted) {
        if (sorted.size() >= LabelSIZE && sorted.size() >= 3) {
            long first = sorted.get(0).weight;
            long second = sorted.get(1).weight;
            long third = sorted.get(2).weight;
            if (first - second > second - third) {
                return 2;
            }
        }
        return Math.min(sorted.size(), LabelSIZE);
    }

    /**
     * Packs {@code id} and up to {@link #LabelSIZE} labels into one record of
     * {@code 4 * (LabelSIZE + 1)} bytes (16 with the current constant).
     *
     * <p>Layout (ByteBuffer default byte order, i.e. big-endian): bytes 0-3
     * hold {@code id}; each following 4-byte slot holds one label, where the
     * first byte is the label's share of the total weight scaled to 0..255
     * and the remaining three bytes are the low 24 bits of the label id.
     * Unused slots stay zero.
     *
     * @param list the labels to encode, at most {@link #LabelSIZE} entries
     * @param id   the key the labels belong to
     * @return the packed record
     */
    private byte[] yieldLabels(List<SortType> list, int id){
        // long accumulator: the sum of several int weights may exceed int range.
        long sum = 0;
        for (SortType label : list) {
            sum += label.weight;
        }

        byte[] out = new byte[4 * (LabelSIZE + 1)];
        ByteBuffer bb = ByteBuffer.wrap(out);
        bb.putInt(0, id);
        for (int i = 0; i < list.size(); i++) {
            SortType label = list.get(i);
            int offset = 4 * (i + 1);
            bb.putInt(offset, label.id);
            // The weight byte must be written AFTER the int: it deliberately
            // overwrites the most significant byte of the id just stored.
            bb.put(offset, quantizeWeight(label.weight, sum));
        }
        return bb.array();
    }

    /**
     * Scales {@code weight / sum} to a single unsigned byte (0..255).
     *
     * <p>A label that owns the whole weight would scale to 256, which wraps
     * to 0 when narrowed to a byte; the value is therefore clamped to 255.
     * A zero total yields 0 instead of a NaN/Infinity-derived value.
     */
    private static byte quantizeWeight(int weight, long sum) {
        if (sum == 0) {
            return 0;
        }
        int scaled = (int) ((weight / (double) sum) * 256);
        if (scaled > 255) {
            scaled = 255;
        } else if (scaled < 0) {
            scaled = 0;
        }
        return (byte) scaled;
    }
}
// Web-page pagination residue from extraction ("- 1", "- 2", "- 3", "go to page"); not part of the Java source.