/*
*
 * Copyright 2020 byai.com. All rights reserved. This software is the
 * confidential and proprietary information of byai.com ("Confidential
 * Information"). You shall not disclose such Confidential Information and shall
 * use it only in accordance with the terms of the license agreement you entered
 * into with byai.com.
*/
package com.byai.bigdata.hdfs;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.SparkSession;
import java.io.IOException;
/**
 * @description: Compacts directories of small HDFS files by reading them with
 *               Spark and rewriting them as ~256 MB files. Inputs are marked
 *               with a ".small" suffix and merged output is staged under a
 *               ".combine" directory before being moved back into place.
 * @author: chongou
 * @createDate: 2020/9/24
 */
public class CombineSmallFiles {
    // size of one target block: 256 MB (268435456 bytes)
    private static final int blockSize = 268435456;
    // compact a directory only when it holds more than 10 small files
    private static final int minBlock = 10;

    public static void main(String[] args) throws IOException {
        if (args.length < 2) {
            throw new RuntimeException("Invalid arguments: expected <srcPath> <fileType> [compress]");
        }
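        // spark.sql.adaptive.enabled turns on adaptive execution, and the
        // Spark 2.x setting targetPostShuffleInputSize aims each post-shuffle
        // partition at roughly one block (256 MB), matching blockSize above.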
        SparkSession ss = SparkSession
                .builder()
                .appName("BT_OPS_CombineSmallFiles")
                .config("spark.sql.adaptive.enabled", "true")
                .config("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", blockSize)
                .enableHiveSupport()
                .getOrCreate();
        FileSystem hdfs = FileSystem.get(ss.sparkContext().hadoopConfiguration());
        String srcPath = args[0];
        String type = args[1];
        String compress = null;
        if (args.length == 3) {
            compress = args[2];
        }
        searchFiles(ss, hdfs, srcPath, type, compress);
        ss.stop();
    }
    private static void searchFiles(SparkSession ss, FileSystem hdfs, String srcStr, String fileType, String compress) throws IOException {
        Path srcPath = new Path(srcStr);
        FileStatus[] fileStatus = hdfs.listStatus(srcPath);
        int smallFileCount = 0;
        long smallFileSize = 0;
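        // Marker protocol: candidate files are renamed with a ".small" suffix
        // so that a re-run after a crash can still identify which inputs
        // belong to the pending compaction.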
        for (FileStatus file : fileStatus) {
            // mark files smaller than one block (256 MB) with a ".small" suffix
            if (!file.isDirectory() && !file.getPath().getName().startsWith(".") && !file.getPath().getName().endsWith(".combine")
                    && file.getLen() < blockSize) {
                if (!file.getPath().getName().endsWith(".small")) {
                    smallFileSize = smallFileSize + file.getLen();
                    hdfs.rename(file.getPath(), new Path(file.getPath() + ".small"));
                }
                smallFileCount += 1;
            } else if (file.isDirectory() && !file.getPath().getName().startsWith(".")) {
                // recurse into subdirectories
                searchFiles(ss, hdfs, file.getPath().toUri().getPath(), fileType, compress);
            }
        }
        if (smallFileCount > minBlock) {
            combineSmallFile(ss, hdfs, srcPath, fileType, smallFileSize, compress);
        }
    }
    private static void combineSmallFile(SparkSession ss, FileSystem hdfs, Path srcPath, String fileType, long smallFileSize, String compress) throws IOException {
        String srcStr = srcPath.toUri().getPath();
        String combineStr = srcStr + "/.combine";
        // if an interrupted run left merged files behind under .combine, move them out and clean up first
        moveCombineFileAndRemove(hdfs, srcStr, combineStr);
        if ("gzip".equals(compress)) {
            ss.conf().set("mapreduce.output.fileoutputformat.compress", "true");
            ss.conf().set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.GzipCodec");
            ss.conf().set("mapreduce.map.output.compress", "true");
            ss.conf().set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.GzipCodec");
        }
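        // Note: Spark merges SQL-session conf entries into the Hadoop
        // configuration used by file writers, so these mapreduce.* settings
        // can take effect for output formats that honor FileOutputFormat
        // compression (behavior may vary by format and Spark version).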
        // one output partition per full 256 MB block, rounding up (never zero)
        long partitions = Math.max(1L, (smallFileSize + blockSize - 1) / blockSize);
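        // e.g. eleven 30 MB files: smallFileSize = 346030080 bytes,
        // ceil(346030080 / 268435456) = 2 output partitions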
        // Step 2: read every .small file in the directory and write the merged
        // result into the temporary .combine directory
        ss
                .read()
                .format(fileType)
                .load(srcStr + "/*.small")
                .repartition((int) partitions)
                .write()
                .format(fileType)
                .save(combineStr);
        // Step 3: delete the .small inputs -> move the files under .combine
        // back into the source directory -> remove .combine
        for (FileStatus file : hdfs.listStatus(srcPath)) {
            // clean up files ending in .small
            if (!file.isDirectory() && file.getPath().getName().endsWith(".small")) {
                hdfs.delete(file.getPath(), true);
            }
        }
        moveCombineFileAndRemove(hdfs, srcStr, combineStr);
    }
    private static void moveCombineFileAndRemove(FileSystem hdfs, String srcStr, String combineStr) throws IOException {
        Path combinePath = new Path(combineStr);
        if (!hdfs.exists(combinePath)) {
            return;
        }
        // move each part file back into the source directory with a .combine suffix
        for (FileStatus combineFile : hdfs.listStatus(combinePath)) {
            if (combineFile.getPath().getName().startsWith("part-")) {
                hdfs.rename(combineFile.getPath(), new Path(srcStr + "/" + combineFile.getPath().getName() + ".combine"));
            }
        }
        hdfs.delete(new Path(combineStr), true);
    }
}
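
// Usage sketch (illustrative; the jar name and paths below are assumptions,
// not part of the original build):
//
//   spark-submit \
//     --class com.byai.bigdata.hdfs.CombineSmallFiles \
//     smallFileMerge-1.0.jar \
//     /warehouse/some_table/dt=2020-09-24 parquet gzip
//
// args[0] = source directory to scan recursively, args[1] = file format for
// Spark's reader/writer (e.g. parquet, orc, csv), args[2] = optional codec
// ("gzip" is the only value the job checks for).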
smallFileMerge.zip (10 files)
smallFileMerge/
├── src/
│   ├── test/java/com/byai/bigdata/hdfs/
│   │   └── AppTest.java (293B)
│   └── main/java/com/byai/bigdata/hdfs/
│       ├── App.java (323B)
│       └── CombineSmallFiles.java (5KB)
├── .idea/
│   ├── misc.xml (513B)
│   ├── compiler.xml (535B)
│   ├── workspace.xml (4KB)
│   ├── encodings.xml (186B)
│   ├── .gitignore (176B)
│   └── jarRepositories.xml (1KB)
└── pom.xml (3KB)