/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.QueueACL;
import static org.apache.hadoop.mapred.QueueManager.toFullPropertyName;
import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.split.JobSplitWriter;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import com.google.common.base.Charsets;
/**
* Adds a method for obtaining the JobID.
* @author fansy
* @date 2015-2-13
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class JobSubmitter {
protected static final Log LOG = LogFactory.getLog(JobSubmitter.class);
private static final String SHUFFLE_KEYGEN_ALGORITHM = "HmacSHA1";
private static final int SHUFFLE_KEY_LENGTH = 64;
private FileSystem jtFs;
private ClientProtocol submitClient;
private String submitHostName;
private String submitHostAddress;
// 获取JobID
private static JobID jobId;
/**
* 获取JobID
* @return
*/
public static JobID getJobId(){
while(jobId==null){
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
} // 如果没有,则继续等待
}
return jobId;
}
/**
* 初始化JobID
* @return
*/
public static boolean initialJobId(){
try{
jobId=null;
}catch(Exception e){
e.printStackTrace();
return false;
}
return true;
}
JobSubmitter(FileSystem submitFs, ClientProtocol submitClient)
throws IOException {
this.submitClient = submitClient;
this.jtFs = submitFs;
}
/*
* see if two file systems are the same or not.
*/
private boolean compareFs(FileSystem srcFs, FileSystem destFs) {
URI srcUri = srcFs.getUri();
URI dstUri = destFs.getUri();
if (srcUri.getScheme() == null) {
return false;
}
if (!srcUri.getScheme().equals(dstUri.getScheme())) {
return false;
}
String srcHost = srcUri.getHost();
String dstHost = dstUri.getHost();
if ((srcHost != null) && (dstHost != null)) {
try {
srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
} catch(UnknownHostException ue) {
return false;
}
if (!srcHost.equals(dstHost)) {
return false;
}
} else if (srcHost == null && dstHost != null) {
return false;
} else if (srcHost != null && dstHost == null) {
return false;
}
//check for ports
if (srcUri.getPort() != dstUri.getPort()) {
return false;
}
return true;
}
// copies a file to the jobtracker filesystem and returns the path where it
// was copied to
private Path copyRemoteFiles(Path parentDir,
Path originalPath, Configuration conf, short replication)
throws IOException {
//check if we do not need to copy the files
// is jt using the same file system.
// just checking for uri strings... doing no dns lookups
// to see if the filesystems are the same. This is not optimal.
// but avoids name resolution.
FileSystem remoteFs = null;
remoteFs = originalPath.getFileSystem(conf);
if (compareFs(remoteFs, jtFs)) {
return originalPath;
}
// this might have name collisions. copy will throw an exception
//parse the original path to create new path
Path newPath = new Path(parentDir, originalPath.getName());
FileUtil.copy(remoteFs, originalPath, jtFs, newPath, false, conf);
jtFs.setReplication(newPath, replication);
return newPath;
}
// configures -files, -libjars and -archives.
private void copyAndConfigureFiles(Job job, Path submitJobDir,
short replication) throws IOException {
Configuration conf = job.getConfiguration();
if (!(conf.getBoolean(Job.USED_GENERIC_PARSER, false))) {
LOG.warn("Hadoop command-line option parsing not performed. " +
"Implement the Tool interface and execute your application " +
"with ToolRunner to remedy this.");
}
// get all the command line arguments passed in by the user conf
String files = conf.get("tmpfiles");
String libjars = conf.get("tmpjars");
String archives = conf.get("tmparchives");
String jobJar = job.getJar();
//
// Figure out what fs the JobTracker is using. Copy the
// job to it, under a temporary name. This allows DFS to work,
// and under the local fs also provides UNIX-like object loading
// semantics. (that is, if the job file is deleted right after
// submission, we can still run the submission to completion)
//
// Create a number of filenames in the JobTracker's fs namespace
LOG.debug("default FileSystem: " + jtFs.getUri());
if (jtFs.exists(submitJobDir)) {
throw new IOException("Not submitting job. Job directory " + submitJobDir
+" already exists!! This is unexpected.Please check what's there in" +
" that directory");
}
submitJobDir = jtFs.makeQualified(submitJobDir);
submitJobDir = new Path(submitJobDir.toUri().getPath());
FsPermission mapredSysPerms = new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
FileSystem.mkdirs(jtFs, submitJobDir, mapredSysPerms);
Path filesDir = JobSubmissionFiles.getJobDistCacheFiles(submitJobDir)
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
收起资源包目录
Mahout案例实战 约会推荐 (508个子文件)
rec.js.bak 9KB
JobSubmitter.class 27KB
Utils.class 20KB
UserService.class 12KB
CloudAction.class 12KB
BaseDAOImpl.class 9KB
RecReducer.class 6KB
TopReducer.class 6KB
TreeNodeService.class 6KB
TestService.class 5KB
RecMapper.class 4KB
CurrentJobInfo.class 4KB
CatalogAction.class 4KB
TopDriver.class 4KB
UserAction.class 4KB
CFThread.class 4KB
RecDriver.class 3KB
UserRec.class 3KB
TopCombiner.class 3KB
TestAction.class 3KB
TreeNode.class 3KB
TopMapper.class 3KB
LoginAction.class 3KB
RecommenderDriverTest.class 3KB
TransData.class 3KB
UserItemPrefVO.class 3KB
UserAndSimilarity.class 3KB
ItemPref.class 2KB
TopDriverTest.class 2KB
BaseDAOTest.class 2KB
Top10Thread.class 2KB
BaseDAO.class 2KB
UserItemPref.class 2KB
UserTopRec.class 2KB
AdminUserServiceImpl.class 2KB
User.class 2KB
NewRecThread.class 2KB
UserAndUserRec.class 2KB
RecReducer$1.class 2KB
Menu.class 2KB
TopReducer$1.class 2KB
JobSubmitter$SplitComparator.class 2KB
UserRating.class 2KB
RecommendRating.class 2KB
JobSubmitter$1.class 2KB
AdminUser.class 1KB
SpringUtil.class 937B
ImportUserThread.class 784B
AdminUserService.class 488B
.classpath 8KB
org.eclipse.wst.common.component 652B
org.eclipse.wst.jsdt.ui.superType.container 49B
easyui.css 48KB
easyui.css 48KB
easyui.css 48KB
easyui.css 47KB
easyui.css 44KB
tabs.css 8KB
tabs.css 8KB
tabs.css 8KB
tabs.css 8KB
tabs.css 6KB
datagrid.css 5KB
datagrid.css 5KB
datagrid.css 5KB
datagrid.css 5KB
datagrid.css 5KB
linkbutton.css 4KB
linkbutton.css 4KB
linkbutton.css 4KB
linkbutton.css 4KB
linkbutton.css 4KB
calendar.css 4KB
calendar.css 4KB
calendar.css 4KB
calendar.css 4KB
calendar.css 4KB
tree.css 3KB
tree.css 3KB
tree.css 3KB
tree.css 3KB
tree.css 3KB
panel.css 3KB
panel.css 3KB
panel.css 3KB
panel.css 3KB
panel.css 2KB
icon.css 2KB
menu.css 2KB
menu.css 2KB
menu.css 2KB
menu.css 2KB
menu.css 2KB
window.css 2KB
window.css 2KB
window.css 2KB
window.css 2KB
tooltip.css 2KB
tooltip.css 2KB
tooltip.css 2KB
共 508 条
- 1
- 2
- 3
- 4
- 5
- 6
fansy1990
- 粉丝: 1668
- 资源: 60
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功
- 1
- 2
- 3
- 4
前往页