package test;
import java.io.IOException;
import java.io.InputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;
public class IsearchRecordReader extends RecordReader<LongWritable, Text> {
// Class-wide logger.
private static final Log LOG = LogFactory.getLog(IsearchRecordReader.class);
// Codec lookup factory; created from the job configuration in initialize().
private CompressionCodecFactory compressionCodecs = null;
// Byte offset at which this reader starts; taken from the split and adjusted in initialize().
private long start;
// Current byte offset; set to start in initialize() and advanced by the byte count each read returns.
private long pos;
// Byte offset at which reading stops: split start + split length, or Long.MAX_VALUE for compressed input.
private long end;
// Reader the records are pulled from.
private LineReader in;
// Maximum record length, read from "mapred.linerecordreader.maxlength" (default Integer.MAX_VALUE).
private int maxLineLength;
// Key of the current record; reset to null by nextKeyValue() once no record is left.
private LongWritable key = null;
// Value of the current record; reset to null by nextKeyValue() once no record is left.
private Text value = null;
// Line separator, i.e. the delimiter that ends one record. Defaults to a single backspace byte.
private byte[] separator = { '\b' };
// Length of the separator in bytes; initialize() backs up by this amount when a split does not start at offset 0.
private int sepLength = 1;
/**
 * Creates a reader that keeps the default record separator, a single
 * backspace byte ({@code '\b'}).
 */
public IsearchRecordReader() {
}
/**
 * Creates a reader whose record separator is the given text.
 *
 * <p>The separator is encoded as UTF-8 instead of the JVM's platform default
 * charset, so the delimiter bytes are identical on every machine regardless
 * of its locale settings. For pure-ASCII separators the bytes are unchanged.
 *
 * @param seps the record separator text; must not be {@code null}
 * @throws NullPointerException if {@code seps} is {@code null}
 */
public IsearchRecordReader(String seps) {
    this.separator = seps.getBytes(java.nio.charset.Charset.forName("UTF-8"));
    this.sepLength = this.separator.length;
}
/**
 * Opens the file behind the given split and positions the reader on the first
 * record that belongs to this split.
 *
 * <p>The configured {@code separator} is handed to the {@link LineReader} as its
 * record delimiter, so records are split on it rather than on CR/LF.
 *
 * <p>When a compression codec is found for the path, the file is read through the
 * codec's stream and {@code end} becomes {@code Long.MAX_VALUE}. Otherwise, if the
 * split does not start at offset 0, the reader seeks back by {@code sepLength}
 * bytes (never before offset 0) and discards everything up to and including the
 * next separator, then re-establishes {@code start} after it.
 *
 * <p>If anything fails after the file has been opened, the underlying stream is
 * closed before the exception propagates.
 *
 * @param genericSplit the split to read; it is cast to {@link FileSplit}
 * @param context      task context supplying the job configuration
 * @throws IOException if the file cannot be opened, positioned or read
 */
public void initialize(InputSplit genericSplit, TaskAttemptContext context)
        throws IOException {
    FileSplit split = (FileSplit) genericSplit;
    Configuration job = context.getConfiguration();
    this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
            Integer.MAX_VALUE);
    this.start = split.getStart();
    this.end = this.start + split.getLength();
    Path file = split.getPath();
    this.compressionCodecs = new CompressionCodecFactory(job);
    CompressionCodec codec = this.compressionCodecs.getCodec(file);

    // Open the file and seek to the start of the split.
    FileSystem fs = file.getFileSystem(job);
    FSDataInputStream fileIn = fs.open(file);
    boolean ready = false;
    try {
        boolean skipFirstRecord = false;
        if (codec != null) {
            this.in = new LineReader(codec.createInputStream(fileIn), job, this.separator);
            this.end = Long.MAX_VALUE;
        } else {
            if (this.start != 0L) {
                skipFirstRecord = true;
                // Back up by one separator so a separator ending exactly at the split
                // boundary is seen; clamp so the seek target is never negative.
                this.start = Math.max(0L, this.start - sepLength);
                fileIn.seek(this.start);
            }
            this.in = new LineReader(fileIn, job, this.separator);
        }
        if (skipFirstRecord) {
            // Discard the partial first record and re-establish "start" after it.
            int skipped = in.readLine(new Text(), 0,
                    (int) Math.min((long) Integer.MAX_VALUE, end - start));
            if (skipped > 0) {
                start += skipped;
            }
        }
        ready = true;
    } finally {
        if (!ready) {
            // Do not leak the open stream when initialization fails.
            this.in = null;
            try {
                fileIn.close();
            } catch (IOException ignored) {
                // Keep the original failure; a close error here adds nothing useful.
            }
        }
    }
    this.pos = this.start;
}
/**
 * Advances to the next record of the split.
 *
 * <p>The key is set to the byte offset held in {@code pos} when the call starts,
 * and the value receives the record text. A read that returns at least
 * {@code maxLineLength} bytes is logged as skipped and the loop moves on to the
 * next one.
 *
 * @return {@code true} if a record was read; {@code false} when nothing is left,
 *         in which case the current key and value are reset to {@code null}
 * @throws IOException if the underlying reader fails
 */
public boolean nextKeyValue() throws IOException {
    if (key == null) {
        key = new LongWritable();
    }
    key.set(pos);
    if (value == null) {
        value = new Text();
    }

    int bytesRead = 0;
    while (pos < end) {
        int remaining = (int) Math.min(Integer.MAX_VALUE, end - pos);
        int maxBytesToConsume = Math.max(remaining, maxLineLength);
        bytesRead = in.readLine(value, maxLineLength, maxBytesToConsume);
        if (bytesRead == 0) {
            break;
        }
        pos += bytesRead;
        if (bytesRead < maxLineLength) {
            break;
        }
        LOG.info("Skipped line of size " + bytesRead + " at pos "
                + (pos - bytesRead));
    }

    if (bytesRead != 0) {
        // A record was read; more may follow.
        return true;
    }
    // Nothing was read: no more records for this reader.
    key = null;
    value = null;
    return false;
}
/**
 * Returns the key of the current record.
 *
 * @return the current key, or {@code null} if none is set
 */
public LongWritable getCurrentKey() {
    return key;
}
/**
 * Returns the value of the current record.
 *
 * @return the current value, or {@code null} if none is set
 */
public Text getCurrentValue() {
    return value;
}
/**
 * Reports how far the reader has advanced between {@code start} and {@code end}.
 *
 * @return {@code 0} when {@code start} equals {@code end}; otherwise the consumed
 *         fraction of the byte range, capped at {@code 1}
 */
public float getProgress() {
    if (start == end) {
        return 0.0F;
    }
    float consumed = (float) (pos - start);
    float total = (float) (end - start);
    return Math.min(1.0F, consumed / total);
}
/**
 * Closes the underlying reader, if one has been opened.
 *
 * @throws IOException if closing the reader fails
 */
public synchronized void close() throws IOException {
    if (in == null) {
        return;
    }
    in.close();
}
/*class LineReader {
//Carriage return (Hadoop default)
//private static final byte CR = 13;
//Line feed (Hadoop default)
//private static final byte LF = 10;
//Read the file one buffer at a time
private static final int DEFAULT_BUFFER_SIZE = 32 * 1024 * 1024;
private int bufferSize = DEFAULT_BUFFER_SIZE;
private byte[] buffer;
private int bufferLength = 0;
private int bufferPosn = 0;
LineReader(InputStream in, int bufferSize) {
this.bufferLength = 0;
this.bufferPosn = 0;
// this.in = in;
this.bufferSize = bufferSize;
this.buffer = new byte[this.bufferSize];
}
public LineReader(InputStream in, Configuration conf) throws IOException {
this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
}
public void close() throws IOException {
in.close();
}
public int readLine(Text str, int maxLineLength) throws IOException {
return readLine(str, maxLineLength, Integer.MAX_VALUE);
}
public int readLine(Text str) throws IOException {
return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
}
//Start of the part that needs to be rewritten (core code)
public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException{
str.clear();
Text record = new Text();
int txtLength = 0;
long bytesConsumed = 0L;
boolean newline = false;
int sepPosn = 0;
do {
//Reached the end of the buffer; read the next buffer
if (this.bufferPosn >= this.bufferLength) {
bufferPosn = 0;
bufferLength = in.read(buffer);
//Reached the end of the file; break out so the next file can be read
if (bufferLength <= 0) {
break;
}
}
int startPosn = this.bufferPosn;
for (; bufferPosn < bufferLength; bufferPosn ++) {
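//Handle a separator that was cut in two at the tail of the previous buffer (this can misbehave if the separator contains many repeated characters)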
if(sepPosn > 0 && buffer[bufferPosn] != separator[sepPosn]){
sepPosn = 0;
}
//Found the first character of the line separator
if (buffer[bufferPosn] == separator[sepPosn]) {
bufferPosn ++;
int i = 0;
//Check whether the following characters also match the line separator
for(++ sepPosn; sepPosn < sepLength; i ++, sepPosn ++){
//The buffer ends partway through a separator, so the separator is split across two buffers
if(bufferPosn + i >= bufferLength){
bufferPosn += i - 1;
break;
}
//As soon as one character differs, this is not a separator
if(this.buffer[this.bufferPosn + i] != separator[sepPosn]){
sepPosn = 0;
break;
}
}
//A complete line separator was found
if(sepPosn == sepLength){
bufferPosn += i;
newline = true;
sepPosn = 0;
break;
}
}
}
int readLength = this.bufferPosn - startPosn;
bytesConsumed += readLength;
//The line separator is not placed in the record
//int appendLength = readLength - newlineLength;
if (readLength > maxLineLength - txtLength) {
readLength = maxLineLength - txtLength;
}
if (readLength > 0) {
record.append(this.buffer, startPosn, readLength);
txtLength += readLength;
//Strip the separator from the record
if(newline){
str.set(record.getBytes(), 0, record.getLength() - sepLength);
}
}
} while (!newline && (bytesConsumed < maxBytesToConsume));
if (bytesConsumed > (long)Integer.MAX_VALUE) {
throw new IOException("Too many bytes before newline: " + bytesConsumed);
}
return (int) bytesConsumed;
}
//End of the part that needs to be rewritten
//Start of the LineReader source from hadoop-core
public int readLine(Text str, int maxLineLength, int maxBytesToConsume) throws IOException{
str.clear();
int txtLength = 0;
int newlineLength = 0;
评论1
最新资源