import java.io.*;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
public class getMerge {
/**
* @param args
* @throws IOException
*/
public static void main(String[] args) throws IOException {
// TODO Auto-generated method stub
writeInDadoop();
//readFromDadoop();
}
public static void writeInDadoop() throws IOException
{
Configuration conf = new Configuration();
FileSystem hdfs = FileSystem.get(conf);
FileSystem local = FileSystem.getLocal(conf);
Path inputDir = new Path("H:\\KuGou\\");
String hdfsStr = "hdfs://202.121.180.230:9100/user/";
try{
FileStatus[] inputFiles = local.listStatus(inputDir);
for(int i=0;i<inputFiles.length;i++)
{
if(inputFiles[i].isFile())
{
String fileName = inputFiles[i].getPath().getName();
System.out.println(fileName);
Path hdfsFile = new Path(hdfsStr+fileName);
FSDataOutputStream out = hdfs.create(hdfsFile);
FSDataInputStream in = local.open(inputFiles[i].getPath());
byte buffer[] = new byte[256];
int bytesRead = 0;
while((bytesRead = in.read(buffer))>0)
{
out.write(buffer, 0, bytesRead);
}
in.close();
out.close();
}
}
}catch(IOException e){
e.printStackTrace();
}
}
public static void readFromDadoop() throws IOException
{
// 加载配置文件
Configuration conf = new Configuration();
conf.addResource("core-site.xml");
HdfsCommon common = new HdfsCommon();
String fileName = "hdfs://202.121.180.230:9100/user/";
String dest = "H:\\kugoutemp\\";
// 获取HDFS文件系统
FileSystem fs = FileSystem.get(URI.create(fileName), conf);
Path fsPath = new Path(fileName);
FileStatus[] inputFiles = fs.listStatus(fsPath);
for(int i=0;i<inputFiles.length;i++)
{
String file = inputFiles[i].getPath().getName();
System.out.println(file);
common.downFile(fileName+file, dest+file);
}
}
// /**
// * 删除HDFS文件
// *
// * @param path
// * 删除文件路径
// * @throws IOException
// * IO异常
// */
// public static void deleteHdfsFile(String path) throws IOException {
//
// // 加载配置文件
// Configuration conf = new Configuration();
// conf.addResource("conf/core-site.xml");
//
// // 获取要删除的文件
// Path delefPath = new Path(path);
// FileSystem hdfs = delefPath.getFileSystem(conf);
// boolean isDeleted = false;
//
// // 检查文件是否存在,若存在,递归删除
// if (hdfs.exists(delefPath)) {
// isDeleted = hdfs.delete(delefPath, true);
// // 递归删除
// } else {
// isDeleted = false;
// System.out.println("文件不存在:删除失败");
// }
// System.out.println("Delete?" + isDeleted);
// }
//
// /**
// * 将本地文件上传至HDFS
// *
// * @param local
// * 本地路径
// * @param hdfs
// * hdfs路径
// * @throws IOException
// * IO异常
// */
// public static void uploadToHdfs(String local, String hdfs)
// throws IOException {
//
// // 加载配置文件
// Configuration config = new Configuration();
// config.addResource("conf/core-site.xml");
//
// // 获取HDFS文件系统
// FileSystem fs = FileSystem.get(URI.create(hdfs), config);
//
// // 读取本地文件
// FileInputStream fis = new FileInputStream(new File(local));
// OutputStream os = fs.create(new Path(hdfs));
// // 复制
// IOUtils.copyBytes(fis, os, 4096, true);
//
// os.close();
// fis.close();
//
// System.out.println("拷贝完成...");
// }
//
// /**
// * 读取HDFS文件
// *
// * @param fileName
// * 源文件路径
// * @param dest
// * 写入文件路径
// * @throws IOException
// */
// public static void readFromHdfs(String fileName, String dest)
// throws IOException {
// // 加载配置文件
// Configuration conf = new Configuration();
// conf.addResource("conf/core-site.xml");
//
// // 获取HDFS文件系统
// FileSystem fs = FileSystem.get(URI.create(fileName), conf);
//
// // 打开文件流
// FSDataInputStream hdfsInStream = fs.open(new Path(fileName));
//
// // 写入本地文件系统
// OutputStream out = new FileOutputStream(dest);
//
// byte[] ioBuffer = new byte[1024];
//
// // 按行读取
// int readLen = hdfsInStream.read(ioBuffer);
//
// while (-1 != readLen) {
//
// out.write(ioBuffer, 0, readLen);
// System.out.println(new String(ioBuffer));
// readLen = hdfsInStream.read(ioBuffer);
//
// }
//
// out.close();
//
// hdfsInStream.close();
//
// fs.close();
// }
//
// /**
// * 列出HDFS目录
// *
// * @param path
// * 路径
// * @throws IOException
// */
// public static void getDirectoryFromHdfs(String path) throws IOException {
//
// // 加载配置文件
// Configuration conf = new Configuration();
// conf.addResource("conf/core-site.xml");
//
// // 获取HDFS文件系统
// FileSystem fs = FileSystem.get(URI.create(path), conf);
//
// // 获取指定路径下的文件
// FileStatus fileList[] = fs.listStatus(new Path(path));
//
// int size = fileList.length;
//
// // 循环输出文件
// for (int i = 0; i < size; i++) {
//
// System.out.println("name:" + fileList.getPath().getName()
// + "\t\tsize:" + fileList.getLen());
//
// }
//
// fs.close();
//
// }
}