package cn.cntv.transcoder;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.concurrent.Callable;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
enum TRANSCODE_ERROR_CODE {
SUCCESS("SUCCESS", 0),
OPEN_LOG_FILE_FAIL("OPEN_LOG_FILE_FAIL", 1),
CLEAR_LOCAL_TEMP_PATH_FAIL("CLEAR_LOCAL_TEMP_PATH_FAIL", 2),
CREATE_WORK_PATH_ON_HADOOP_FAIL("CREATE_WORK_PATH_ON_HADOOP_FAIL", 3),
SPLIT_VIDEO_FAIL("SPLIT_VIDEO_FAIL", 4),
COPY_FILE_TO_HADOOP_FAIL("COPY_FILE_TO_HADOOP_FAIL", 5),
TRANSCODE_ON_HADOOP_FAIL("TRANSCODE_ON_HADOOP_FAIL", 6),
COPY_FILE_TO_LOCAL_FAIL("COPY_FILE_TO_LOCAL_FAIL", 7),
ASSEMBLE_VIDEO_FAIL("ASSEMBLE_VIDEO_FAIL", 8),
RENAME_OUTPUT_FILE_FAIL("RENAME_OUTPUT_FILE_FAIL", 9),
ENABLE_DTS_FAIL("ENABLE_DTS_FAIL", 10),
MOVE_TO_OUTPUT_PATH_FAIL("MOVE_TO_OUTPUT_PATH_FAIL", 11);
// 成员变量
private String name;
private int index;
// 构造方法
private TRANSCODE_ERROR_CODE(String name, int index) {
this.name = name;
this.index = index;
}
public int getIndex() {
return index;
}
public String getName() {
return name;
}
}
public class TranscodeTask implements Callable<String> {
private String inputPath;
// original file name, may contain special characters.
private String originFileName;
// currently processing file name.
private String procesfileName;
private String outputPath;
private String indexPath;
private String splitPath;
private String transPath;
private String dtshd_path;
private String parameter;
private String output_filename;
private String outformat;
private String username;
private static int taskcount = 0;
private final int taskid = taskcount++;
private static ReentrantLock local_tx_lock = new ReentrantLock();
private static ReentrantLock local_rx_lock = new ReentrantLock();
private static ReentrantLock hadoop_lock = new ReentrantLock();
synchronized private void print(String msg) {
System.out.print(msg);
}
synchronized private void println(String msg) {
System.out.println(msg);
}
public static boolean renameFile(String path, String oldname, String newname) {
if (!oldname.equals(newname)) {
// 新的文件名和以前文件名不同时,才有必要进行重命名
File oldfile = new File(path + oldname);
File newfile = new File(path + newname);
if (!oldfile.exists()) {
return false;
// 重命名文件不存在
}
if (newfile.exists()) {
// 若在该目录下已经有一个文件和新文件名相同,则不允许重命名
return false;
} else {
oldfile.renameTo(newfile);
}
} else {
return false;
}
return true;
}
/**
* @param dir
* The directory that need to be deleted.
* @return boolean Returns "true" if all deletions were successful.
*/
private static boolean deleteDir(File dir) {
if (dir.isDirectory()) {
String[] children = dir.list();
for (int i = 0; i < children.length; i++) {
boolean success = deleteDir(new File(dir, children[i]));
if (!success) {
return false;
}
}
}
return dir.delete();
}
public static String getLocalIP() {
StringBuilder sb = new StringBuilder();
try {
Enumeration<NetworkInterface> en = NetworkInterface.getNetworkInterfaces();
while (en.hasMoreElements()) {
NetworkInterface intf = (NetworkInterface) en.nextElement();
Enumeration<InetAddress> enumIpAddr = intf.getInetAddresses();
while (enumIpAddr.hasMoreElements()) {
InetAddress inetAddress = (InetAddress) enumIpAddr.nextElement();
if (!inetAddress.isLoopbackAddress() && !inetAddress.isLinkLocalAddress()
&& inetAddress.isSiteLocalAddress()) {
sb.append(inetAddress.getHostAddress().toString() + "\n");
}
}
}
} catch (SocketException e) {
}
return sb.toString();
}
private static boolean clearDir(File dir) {
if (dir.isDirectory()) {
String[] children = dir.list();
for (int i = 0; i < children.length; i++) {
boolean success = deleteDir(new File(dir, children[i]));
if (!success) {
return false;
}
}
}
return true;
}
public static FilenameFilter filter(final String regex) {
return new FilenameFilter() {
private Pattern pattern = Pattern.compile(regex);
public boolean accept(File dir, String name) {
return pattern.matcher(name).matches();
}
};
}
public int callexec(Runtime rt, String[] command) {
return this.callexec(rt, command, null);
}
public int callexec(Runtime rt, String[] command, OutputStream outputStream) {
Process process = null;
int result = -1;
try {
process = rt.exec(command);
// 启用StreamGobbler线程清理错误流和输入流 防止IO阻塞
new StreamGobbler(process.getErrorStream(), "ERROR", outputStream).start();
new StreamGobbler(process.getInputStream(), "INPUT", outputStream).start();
result = process.waitFor();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (process != null && result != 0) {
process.destroy();
}
}
return result;
}
public int callexec(Runtime rt, String command, OutputStream outputStream) {
Process process = null;
int result = -1;
try {
process = rt.exec(command);
// 启用StreamGobbler线程清理错误流和输入流 防止IO阻塞
new StreamGobbler(process.getErrorStream(), "ERROR", outputStream).start();
new StreamGobbler(process.getInputStream(), "INPUT", outputStream).start();
result = process.waitFor();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (process != null && result != 0) {
process.destroy();
}
}
return result;
}
public int callexec(Runtime rt, String command) {
return this.callexec(rt, command, null);
}
public TranscodeTask(String inputPath, String fileName, String outputPath, String indexPath, String splitPath,
String transPath, String dtshd_path, String parameter, String output_filename, String outformat,
String username) {
this.inputPath = inputPath;
this.originFileName = fileName;
String filetype = fileName.substring(fileName.lastIndexOf("."), fileName.length());
this.procesfileName = "Transcoding_" + TranscodeTask.getLocalIP().trim() + "_" + taskid + filetype;
this.outputPath = outputPath;
this.indexPath = indexPath;
this.splitPath = splitPath;
this.transPath = transPath;
this.dtshd_path = dtshd_path;
this.parameter = parameter;
this.output_filename = output_filename;
this.outformat = outformat;
this.username = username;
}
public String call() {
int code = -1;
boolean flag = false;
try {
flag = TranscodeTask.renameFile(this.inputPath, this.originFileName, this.procesfileName);
if (flag == false) {
return this.originFileName;
// fail,can not rename the original video file.
}
code = transcode();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
flag = TranscodeTask.renameFile(this.inputPath, this.procesfileName, this.originFileName);
if (flag == false){
return this.originFileName;
// fail,can not rename back the video file in input path
}
}
return code == 0 ? "" : this.originFileName;
}
private boolean clear_local_path() {
clearDir(new File(indexPath));
clearDir(new File(splitPath));
return true;
}
private boolean clear_cluster() throws IOException, InterruptedException {
String hadoop = "/opt/hadoop/hadoop-2.7.1/bin/hadoop ";
String command = null;
Runtime rt = Runtime.getRuntime();
int exit = 0;
command = hadoop +