package com.xdc.hadoop;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
/**
 * {@link FileInputFormat} that hands every input split to a {@link TileRecordReader}, producing
 * {@link LongWritable} keys and {@link BytesWritable} values.
 *
 * <p>Split computation is inherited unchanged from {@link FileInputFormat#getSplits(JobContext)}.
 * The MapReduce client calls that method to obtain all splits and ships them to the cluster. A
 * split carries no data of its own; it only describes a slice of the input: the file path, the
 * start offset within that file, the slice length, and the hosts holding the underlying data.
 *
 * <p>The record reader created here turns one split into the key/value sequence fed to the map
 * function. {@code Mapper.run} drives it roughly as follows:
 *
 * <pre>{@code
 * setup(context);
 * // nextKeyValue() returns false once the current split is exhausted
 * while (context.nextKeyValue()) {
 *   map(context.getCurrentKey(), context.getCurrentValue(), context);
 * }
 * cleanup(context);
 * }</pre>
 */
public class TileInputFormat extends FileInputFormat<LongWritable, BytesWritable> {

  /**
   * Decides whether {@code file} may be divided into more than one split.
   *
   * @param context job context whose configuration is used to look up compression codecs
   * @param file path of the input file being examined
   * @return {@code true} if no compression codec is associated with the path, or if the
   *     associated codec is a {@link SplittableCompressionCodec}; {@code false} otherwise
   */
  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    final CompressionCodec codec =
        new CompressionCodecFactory(context.getConfiguration()).getCodec(file);
    // A null codec means the factory found no codec for this path, so the file is split freely.
    return codec == null || codec instanceof SplittableCompressionCodec;
  }

  /**
   * Creates the reader that will iterate over the records of {@code split}.
   *
   * <p>The returned reader is intentionally <em>not</em> initialized. Per the
   * {@code InputFormat#createRecordReader} contract, the framework calls
   * {@link RecordReader#initialize(InputSplit, TaskAttemptContext)} itself before the split is
   * used; initializing here as well would run the reader's setup twice and orphan whatever the
   * first call opened.
   *
   * @param split the split the reader will later be initialized with (unused here)
   * @param context the task attempt context (unused here)
   * @return a new, uninitialized {@link TileRecordReader}
   * @throws IOException declared by the overridden method; not thrown by this implementation
   * @throws InterruptedException declared by the overridden method; not thrown by this
   *     implementation
   */
  @Override
  public RecordReader<LongWritable, BytesWritable> createRecordReader(
      InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
    return new TileRecordReader();
  }
}
没有合适的资源?快使用搜索试试~ 我知道了~
基于Spark实现的可视域分析算法+源代码+文档说明
![preview](https://csdnimg.cn/release/downloadcmsfe/public/img/white-bg.ca8570fa.png)
共8个文件
java:4个
scala:2个
xml:1个
![preview-icon](https://csdnimg.cn/release/downloadcmsfe/public/img/scale.ab9e0183.png)
1.该资源内容由用户上传,如若侵权请联系客服进行举报
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
版权申诉
0 下载量 159 浏览量
2024-03-20
11:10:59
上传
评论
收藏 27KB ZIP 举报
温馨提示
- 不懂运行,下载完可以私聊问,可远程教学 该资源内项目源码是个人的毕设,代码都测试ok,都是运行成功后才上传资源,答辩评审平均分达到96分,放心下载使用! <项目介绍> 1、该资源内项目代码都经过测试运行成功,功能ok的情况下才上传的,请放心下载使用! 2、本项目适合计算机相关专业(如计科、人工智能、通信工程、自动化、电子信息等)的在校学生、老师或者企业员工下载学习,也适合小白学习进阶,当然也可作为毕设项目、课程设计、作业、项目初期立项演示等。 3、如果基础还行,也可在此代码基础上进行修改,以实现其他功能,也可用于毕设、课设、作业等。 下载后请首先打开README.md文件(如有),仅供学习参考, 切勿用于商业用途。 --------
资源推荐
资源详情
资源评论
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
![7z](https://img-home.csdnimg.cn/images/20210720083312.png)
![txt](https://img-home.csdnimg.cn/images/20210720083642.png)
![zip](https://img-home.csdnimg.cn/images/20210720083736.png)
收起资源包目录
![package](https://csdnimg.cn/release/downloadcmsfe/public/img/package.f3fc750b.png)
![folder](https://csdnimg.cn/release/downloadcmsfe/public/img/folder.005fa2e5.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![folder](https://csdnimg.cn/release/downloadcmsfe/public/img/folder.005fa2e5.png)
![folder](https://csdnimg.cn/release/downloadcmsfe/public/img/folder.005fa2e5.png)
![folder](https://csdnimg.cn/release/downloadcmsfe/public/img/folder.005fa2e5.png)
![folder](https://csdnimg.cn/release/downloadcmsfe/public/img/folder.005fa2e5.png)
![folder](https://csdnimg.cn/release/downloadcmsfe/public/img/folder.005fa2e5.png)
![folder](https://csdnimg.cn/release/downloadcmsfe/public/img/folder.005fa2e5.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![folder](https://csdnimg.cn/release/downloadcmsfe/public/img/folder.005fa2e5.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
![file-type](https://csdnimg.cn/release/download/static_files/pc/images/minetype/UNKNOWN.png)
共 8 条
- 1
资源评论
![avatar-default](https://csdnimg.cn/release/downloadcmsfe/public/img/lazyLogo2.1882d7f4.png)
![avatar](https://profile-avatar.csdnimg.cn/a3ff7c83b4464a7a89efb22831515060_abc6838.jpg!1)
机器学习的喵
- 粉丝: 1174
- 资源: 1468
上传资源 快速赚钱
我的内容管理 展开
我的资源 快来上传第一个资源
我的收益
登录查看自己的收益我的积分 登录查看自己的积分
我的C币 登录后查看C币余额
我的收藏
我的下载
下载帮助
![voice](https://csdnimg.cn/release/downloadcmsfe/public/img/voice.245cc511.png)
![center-task](https://csdnimg.cn/release/downloadcmsfe/public/img/center-task.c2eda91a.png)
安全验证
文档复制为VIP权益,开通VIP直接复制
![dialog-icon](https://csdnimg.cn/release/downloadcmsfe/public/img/green-success.6a4acb44.png)