package dfs;
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;
/**
*
* HDFS文件操作类,用于HDFS文件系统常规操作
*
* @author 马风新
* @version [版本号, 2012-08-02]
* @see [相关类/方法]
* @since version 1.0
*/
public class HDFSOperation {
private Configuration conf;
private FileSystem fs;
/*
* 初始化配置文件和得到FileSystem一个实例
*
*/
public HDFSOperation() throws IOException{
conf = new Configuration();
fs = FileSystem.get(conf);
}
/*
* 文件对象的上传操作
* @param in 上传文件流
* @param hdfsPath 指定的上传文件在Hadoop中的路径
* @return 文件是否上传成功
* @throws Exception
*/
public boolean upLoad(InputStream in, String hdfsPath){
Path p = new Path(hdfsPath);
try{
if(fs.exists(p)){
System.out.println("文件已经存在");
return false;
}
//上传打印进程情况
Progressable progress = new Progressable(){
public void progress() {
System.out.print(".");
}
};
FSDataOutputStream out = fs.create(p,progress);
IOUtils.copyBytes(in, out, conf);
byte[] buffer = new byte[400];
int length = 0;
while ((length = in.read(buffer)) > 0) {
out.write(buffer, 0, length);
}
out.flush();
out.close();
in.close();
}catch(Exception e){
e.printStackTrace();
}
return true;
}
/*
* 文件对象的下载操作
* @param localPath 本地文件保存路径
* @param hdfsPath 指定的上传文件在Hadoop中的路径
* @return 文件是否下载成功
*/
public boolean downLoad(String hdfsPath,String localPath ){
Path path = new Path(hdfsPath);
try {
if(!fs.exists(path)){
System.out.println("您所输入的文件不存在!");
return false;
}
FSDataInputStream in = fs.open(path);
OutputStream out = new FileOutputStream(localPath);
byte[] buffer = new byte[400];
int length = 0;
while((length = in.read(buffer))>0){
out.write(buffer, 0, length);
}
} catch (Exception e) {
e.printStackTrace();
return false;
}
return true;
}
/*
* 文件对象的删除操作
* @param hdfsPath 指定的上传文件在Hadoop中的路径
* @return 文件是否删除成功
* @throws IOException IO异常
*/
public boolean deletePath(String hdfsPath){
try {
fs.delete(new Path(hdfsPath), true);
} catch (IOException e) {
e.printStackTrace();
return false;
}
return true;
}
/**
* 取得指定目录下的文件和文件夹列表
* @param hdfsPath 指定的上传文件在Hadoop中的路径
* @return 文件列表
* @throws IOException IO异常
*/
public ArrayList<FileBean> getFileList(String hdfsPath){
Path path = new Path(hdfsPath);
ArrayList<FileBean> fileList = new ArrayList<FileBean>();
FileStatus[] status;
try {
status = fs.listStatus(path);
for(FileStatus fs : status){
fileList.add(new FileBean(fs));
}
} catch (Exception e) {
e.printStackTrace();
}
return fileList;
}
}
评论28
最新资源