没有合适的资源?快使用搜索试试~ 我知道了~
Hadoop源码解析---MapReduce之InputFormat
需积分: 14 8 下载量 56 浏览量
2016-04-28
09:03:05
上传
评论
收藏 601KB PDF 举报
温馨提示
试读
58页
结合Hadoop源码,详细讲解了MapReduce开发中的InputFormat,很详细。
资源推荐
资源详情
资源评论
[Hadoop
源码解读
]
(一)
MapReduce
篇之
InputFormat
平时我们写 MapReduce 程序的时候,在设置输入格式的时候,总会调用形如
job.setInputFormatClass(KeyValueTextInputFormat.class);来保证输入文件按照我们想要的格式被读取。所
有的输入格式都继承于 InputFormat,这是一个抽象类,其子类有专门用于读取普通文件的
FileInputFormat,用来读取数据库的 DBInputFormat 等等。
其实,一个输入格式 InputFormat,主要无非就是要解决如何将数据分割成分片[比如多少行为一个分片],
以及如何读取分片中的数据[比如按行读取]。前者由 getSplits()完成,后者由 RecordReader 完成。
不同的 InputFormat 都会按自己的实现来读取输入数据并产生输入分片,一个输入分片会被单独的 map
task 作为数据源。下面我们先看看这些输入分片(inputSplit)是什么样的。
InputSplit:
我们知道 Mappers 的输入是一个一个的输入分片,称 InputSplit。InputSplit 是一个抽象类,它在逻辑上
包含了提供给处理这个 InputSplit 的 Mapper 的所有 K-V 对。
public abstract class InputSplit {
public abstract long getLength() throws IOException, InterruptedException;
public abstract
String[] getLocations() throws IOException, InterruptedException;
}
getLength()用来获取 InputSplit 的大小,以支持对 InputSplits 进行排序,而 getLocations()则用来获取存
储分片的位置列表。
我们来看一个简单 InputSplit 子类:FileSplit。
public class FileSplit extends InputSplit implements Writable {
private Path file;
private long start;
private long length;
private String[] hosts;
FileSplit() {}
public FileSplit(Path file, long start, long length, String[] hosts) {
this.file = file;
this.start = start;
this.length = length;
this.hosts = hosts;
}
//序列化、反序列化方法,获得 hosts 等等……
}
从上面的源码我们可以看到,一个 FileSplit 是由文件路径,分片开始位置,分片大小和存储分片数据
的 hosts 列表组成,由这些信息我们就可以从输入文件中切分出提供给单个 Mapper 的输入数据。这些属
性会在 Constructor 设置,我们在后面会看到这会在 InputFormat 的 getSplits()中构造这些分片。
我们再看 CombineFileSplit:
public class CombineFileSplit extends InputSplit implements Writable {
private Path[] paths;
private long[] startoffset;
private long[] lengths;
private String[] locations;
private long totLength;
public CombineFileSplit() {}
public CombineFileSplit(Path[] files, long[] start,
long[] lengths, String[] locations) {
initSplit(files, start, lengths, locations);
}
public CombineFileSplit(Path[] files, long[] lengths) {
long[] startoffset = new long[files.length];
for (int i = 0; i < startoffset.length; i++) {
startoffset[i] = 0;
}
String[] locations = new String[files.length];
for (int i = 0; i < locations.length; i++) {
locations[i] = "";
}
initSplit(files, startoffset, lengths, locations);
}
private void initSplit(Path[] files, long[] start,
long[] lengths, String[] locations) {
this.startoffset = start;
this.lengths = lengths;
this.paths = files;
this.totLength = 0;
this.locations = locations;
for(long length : lengths) {
totLength += length;
}
}
//一些 getter 和 setter 方法,和序列化方法
}
与 FileSplit 类似,CombineFileSplit 同样包含文件路径,分片起始位置,分片大小和存储分片数据的 host
列表,由于 CombineFileSplit 是针对小文件的,它把很多小文件包在一个 InputSplit 内,这样一个 Mapper
就可以处理很多小文件。要知道我们上面的 FileSplit 是对应一个输入文件的,也就是说如果用 FileSplit
对应的 FileInputFormat 来作为输入格式,那么即使文件特别小,也是单独计算成一个输入分片来处理的。
当我们的输入是由大量小文件组成的,就会导致有同样大量的 InputSplit,从而需要同样大量的 Mapper
来处理,这将很慢,想想有一堆 map task 要运行!!这是不符合 Hadoop 的设计理念的,Hadoop 是为处
理大文件优化的。
最后介绍 TagInputSplit,这个类就是封装了一个 InputSplit,然后加了一些 tags 在里面满足我们需要这
些 tags 数据的情况,我们从下面就可以一目了然。
class TaggedInputSplit extends InputSplit implements Configurable, Writable {
private Class<? extends InputSplit> inputSplitClass;
private InputSplit inputSplit;
@SuppressWarnings("unchecked")
private Class<? extends InputFormat> inputFormatClass;
@SuppressWarnings("unchecked")
private Class<? extends Mapper> mapperClass;
private Configuration conf;
//getters and setters,序列化方法,getLocations()、getLength()等
}
现在我们对 InputSplit 的概念有了一些了解,我们继续看它是怎么被使用和计算出来的。
InputFormat:
通过使用 InputFormat,MapReduce 框架可以做到:
1、验证作业的输入的正确性
2、将输入文件切分成逻辑的 InputSplits,一个 InputSplit 将被分配给一个单独的 Mapper task
3、提供 RecordReader 的实现,这个 RecordReader 会从 InputSplit 中正确读出一条一条的K-V对供
Mapper 使用。
public abstract class InputFormat<K, V> {
public abstract
List<InputSplit> getSplits(JobContext context
) throws IOException, InterruptedException;
public abstract
RecordReader<K,V> createRecordReader(InputSplit split,
TaskAttemptContext context
) throws IOException,
InterruptedException;
}
上面是 InputFormat 的源码,getSplits 用来获取由输入文件计算出来的 InputSplits,我们在后面会看到
计算 InputSplits 的时候会考虑到输入文件是否可分割、文件存储时分块的大小和文件大小等因素;而
createRecordReader()提供了前面第三点所说的 RecordReader 的实现,以将 K-V 对从 InputSplit 中正确读
出来,比如 LineRecordReader就以偏移值为 key,一行的数据为 value,这就使得所有其 createRecordReader()
返回了 LineRecordReader 的 InputFormat 都是以偏移值为 key,一行数据为 value 的形式读取输入分片的。
FileInputFormat:
PathFilter 被用来进行文件筛选,这样我们就可以控制哪些文件要作为输入,哪些不作为输入。PathFilter
有一个 accept(Path)方法,当接收的 Path 要被包含进来,就返回 true,否则返回 false。可以通过设置
mapred.input.pathFilter.class 来设置用户自定义的 PathFilter。
public interface PathFilter {
boolean accept(Path path);
}
FileInputFormat 是 InputFormat 的子类,它包含了一个 MultiPathFilter,这 个 MultiPathFilter 由一个过滤
隐藏文件(名字前缀为'-'或'.')的 PathFilter 和一些可能存在的用户自定义的 PathFilters 组成,MultiPathFilter
会在 listStatus()方法中使用,而 listStatus()方法又被 getSplits()方法用来获取输入文件,也就是说实现了
在获取输入分片前先进行文件过滤。
private static class MultiPathFilter implements PathFilter {
private List<PathFilter> filters;
public MultiPathFilter(List<PathFilter> filters) {
this.filters = filters;
}
public boolean accept(Path path) {
for (PathFilter filter : filters) {
if (!filter.accept(path)) {
return false;
}
}
return true;
}
}
这些 PathFilter 会在 listStatus()方法中用到,listStatus()是用来获取输入数据列表的。
下面是 FileInputFormat 的 getSplits()方法,它首先得到分片的最小值 minSize 和最大值 maxSize,它们
会被用来计算分片大小。可以通过设置 mapred.min.split.size 和 mapred.max.split.size 来设置。splits 链表
用来存储计算得到的输入分片,files 则存储作为由 listStatus()获取的输入文件列表。然后对于每个输入
文件,判断是否可以分割,通过 computeSplitSize 计算出分片大小 splitSize,计算方法是:Math.max(minSize,
Math.min(maxSize, blockSize));也就是保证在 minSize 和 maxSize 之间,且如果
剩余57页未读,继续阅读
资源评论
tianwanch
- 粉丝: 0
- 资源: 4
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功