package tokenize.inputformat;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;
/**
 * Reads one file of a {@link CombineFileSplit} as a single key/value record.
 *
 * <p>The key is the name of the file's parent directory (treated as the category name). The value
 * is the raw bytes of that file's chunk in the split. The {@code index} constructor argument
 * selects which file of the split this reader handles. The (split, context, Integer) constructor
 * shape is the one a {@code CombineFileRecordReader} instantiates reflectively, once per file;
 * presumably that is how this class is used — confirm against the input format.
 */
public class MyRecordReader extends RecordReader<Text, Text> {
    private final CombineFileSplit combineFileSplit; // the split being processed
    private final int currentIndex;                  // index of the file within the split
    private final Text currentKey = new Text();      // current key (category name)
    private final Text currentValue = new Text();    // current value (file bytes)
    private final Configuration conf;                // job configuration, used to resolve the FileSystem
    private boolean processed;                       // whether this file's single record was emitted

    /**
     * @param combineFileSplit the combined split containing the file to read
     * @param context          task context supplying the job configuration
     * @param index            index of the file within {@code combineFileSplit} handled by this reader
     * @throws IOException declared for compatibility with the reflective construction contract
     */
    public MyRecordReader(CombineFileSplit combineFileSplit,
            TaskAttemptContext context, Integer index) throws IOException {
        this.combineFileSplit = combineFileSplit;
        this.currentIndex = index;
        this.conf = context.getConfiguration();
        this.processed = false;
    }

    /** No-op: all required state is captured by the constructor. */
    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return currentKey;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return currentValue;
    }

    /**
     * Returns this reader's progress through its own file.
     *
     * <p>The reader emits exactly one record, so progress is 0 before that record and 1 after it.
     * The enclosing combine reader scales this fraction by the file's length, so it must describe
     * this file only, not the file's position within the split.
     */
    @Override
    public float getProgress() throws IOException {
        return processed ? 1.0f : 0.0f;
    }

    /** Nothing to release: the input stream is closed inside {@link #nextKeyValue()}. */
    @Override
    public void close() throws IOException {
    }

    /**
     * Reads the whole file chunk on the first call and exposes it as the current key/value.
     *
     * @return {@code true} on the first call (a record is available), {@code false} afterwards
     * @throws IOException if the chunk is too large to buffer, or if opening or reading it fails
     */
    @Override
    public boolean nextKeyValue() throws IOException {
        if (processed) {
            return false; // the single record for this file has already been emitted
        }

        Path file = combineFileSplit.getPath(currentIndex);
        // Key: name of the directory containing the file (the category's name).
        currentKey.set(file.getParent().getName());

        long length = combineFileSplit.getLength(currentIndex);
        if (length > Integer.MAX_VALUE) {
            // A silent (int) cast would overflow and allocate a wrong-sized (or negative) buffer.
            throw new IOException("File chunk too large to buffer in memory: " + file
                    + " (" + length + " bytes)");
        }
        byte[] contents = new byte[(int) length];

        FileSystem fs = file.getFileSystem(conf);
        // try-with-resources closes the stream even if the read fails, and never calls close()
        // on a stream that failed to open; read errors propagate instead of being swallowed.
        try (FSDataInputStream in = fs.open(file)) {
            // Positional read so a chunk that does not start at byte 0 is read from its own offset.
            in.readFully(combineFileSplit.getOffset(currentIndex), contents);
        }
        currentValue.set(contents);

        processed = true;
        return true;
    }
}
【甘道夫】通过Mahout构建贝叶斯文本分类器案例详解 -- 配套源码
4星 · 超过85%的资源 需积分: 10 88 浏览量
2015-01-07
12:02:36
上传
评论
收藏 3KB RAR 举报
Gandalf_lee
- 粉丝: 194
- 资源: 4
最新资源
- 华师调节效应和中介效应分析教学讲义 温忠麟
- 2023年更新儒家文化数据大全包含明清进士、书院数量、孔庙遗存
- 深度学习要点优缺点实际应用,代码案例和代码解析
- LBMA,SCE,CMRT冲突矿产相关规则和认证倡议机制培训+经典应用程序
- 企业寻租程度(超额管理费用测度)(2006-2020)stata原始数据+do文件+参考文献
- 【stata命令】市场分割-市场一体化-区域一体化指数stata计算教程
- 基于matlab实现移相全桥控制双向DCDC变换器matlab仿真原理图.rar
- 基于matlab实现小波分析应用(Matlab实例源程序)-.rar
- 珀顿国际公寓(武汉光谷步行街店) 2.m4a
- 基于matlab实现文件一个基于matlab的移动侦测方面的例子-move detect.rar
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈