/**
*
*/
package com.fz.util;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.SequenceFile.Writer.Option;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.util.ReflectionUtils;
import com.fz.filter.keytype.DoubleArrIntWritable;
import com.fz.model.CurrentJobInfo;
import com.fz.model.RecommendData;
import com.fz.util.JarUtil;
/**
* Hadoop 工具类
*
* @author fansy
* @date 2015-5-28
*/
public class HUtils {
private static boolean RUNNINGJOBERROR = false;// whether an error occurred while running the job chain
private static boolean ALLJOBSFINISHED=false; // flag: all jobs of the looped MR chain finished successfully
// extension appended to downloaded files -- NOTE(review): download code not visible here, confirm usage
static final String DOWNLOAD_EXTENSION = ".dat";
// lazily-initialized shared Hadoop client state (see getConf()/getFs()/getJobClient())
private static Configuration conf = null;
private static FileSystem fs = null;
public static boolean flag = true; // get configuration from db or file
// ,true : db,false:file
public static int JOBNUM = 1; // number of jobs in one MR job group
// start-time threshold of the first job; only jobs started after this time are the ones really being monitored
private static long jobStartTime = 0L;// obtained via System.currentTimeMillis()
private static JobClient jobClient = null;
// well-known HDFS / local path constants used across the web app
public static final String HDFSPRE= "/user/root";
public static final String LOCALPRE= "WEB-INF/classes/data";
public static final String FILTER = "/user/root/_filter";
public static final String FILTER_PREPAREVECTORS = FILTER + "/";
public static final int FILTER_PREPAREVECTORS_FILES = 4;
/**
 * Lazily builds the shared Hadoop {@link Configuration}.
 * Every property value is looked up through {@code Utils.getKey}
 * (backed by the DB or a file depending on {@code flag}).
 *
 * @return the singleton configuration instance
 */
public static Configuration getConf() {
    if (conf != null) {
        return conf;
    }
    Configuration c = new Configuration();
    // cross-platform submission so jobs can be submitted from e.g. Windows clients
    boolean crossPlatform = "true".equals(
            Utils.getKey("mapreduce.app-submission.cross-platform", flag));
    c.setBoolean("mapreduce.app-submission.cross-platform", crossPlatform);
    c.set("fs.defaultFS", Utils.getKey("fs.defaultFS", flag)); // namenode address
    c.set("mapreduce.framework.name",
            Utils.getKey("mapreduce.framework.name", flag)); // use the yarn framework
    c.set("yarn.resourcemanager.address",
            Utils.getKey("yarn.resourcemanager.address", flag)); // resourcemanager
    c.set("yarn.resourcemanager.scheduler.address",
            Utils.getKey("yarn.resourcemanager.scheduler.address", flag)); // scheduler
    c.set("mapreduce.jobhistory.address",
            Utils.getKey("mapreduce.jobhistory.address", flag));
    // point MR at this webapp's jar so task JVMs can resolve its classes
    c.set("mapreduce.job.jar", JarUtil.jar(JarUtil.class));
    conf = c;
    return conf;
}
/**
 * Lazily creates the shared {@link FileSystem} from {@link #getConf()}.
 *
 * @return the cached filesystem, or null if creation failed
 */
public static FileSystem getFs() {
    if (fs != null) {
        return fs;
    }
    try {
        fs = FileSystem.get(getConf());
    } catch (IOException e) {
        e.printStackTrace();
    }
    return fs;
}
/**
 * Builds the fully-qualified HDFS URI for a path.
 *
 * @param url path relative to the filesystem root
 * @return fs.defaultFS + url
 */
public static String getHDFSPath(String url) {
    String prefix = Utils.getKey("fs.defaultFS", flag);
    return prefix + url;
}
/**
 * Lists an HDFS directory (optionally descending into sub-directories)
 * and returns the file paths joined with HTML line breaks for display.
 *
 * @param input     HDFS directory to list
 * @param recursive whether to recurse into sub-directories
 * @return the located file paths, each followed by "&lt;br&gt;"
 * @throws IOException if the listing fails
 */
public static String getHdfsFiles(String input, boolean recursive)
        throws IOException {
    RemoteIterator<LocatedFileStatus> files = getFs().listFiles(
            new Path(input), recursive);
    // StringBuilder: the buffer is method-local, so StringBuffer's
    // synchronization was pure overhead
    StringBuilder buff = new StringBuilder();
    while (files.hasNext()) {
        buff.append(files.next().getPath().toString()).append("<br>");
    }
    return buff.toString();
}
/**
 * Fetches all jobs from the cluster and keeps those started after
 * {@code jobStartTime}, so only the job group currently being monitored
 * is returned. Job start times and System.currentTimeMillis() are in the
 * same clock domain, so no timezone adjustment is needed.
 *
 * @return monitored jobs, sorted (natural order of CurrentJobInfo)
 * @throws IOException if the cluster cannot be queried
 */
public static List<CurrentJobInfo> getJobs() throws IOException {
    // hoisted: one client serves both the status scan and the per-job lookup
    JobClient client = getJobClient();
    JobStatus[] jss = client.getAllJobs();
    // previously the fresh list was also clear()ed — a no-op, removed
    List<CurrentJobInfo> jsList = new ArrayList<CurrentJobInfo>();
    for (JobStatus js : jss) {
        if (js.getStartTime() > jobStartTime) {
            jsList.add(new CurrentJobInfo(client.getJob(js.getJobID()),
                    js.getStartTime(), js.getRunState()));
        }
    }
    Collections.sort(jsList);
    return jsList;
}
/**
 * Logs one job-status line (timestamp, job id, map/reduce progress and
 * run state) to stdout.
 *
 * @param js the job status snapshot to print
 */
public static void printJobStatus(JobStatus js) {
    StringBuilder line = new StringBuilder();
    line.append(new java.util.Date())
            .append(":jobId:").append(js.getJobID().toString())
            .append(",map:").append(js.getMapProgress())
            .append(",reduce:").append(js.getReduceProgress())
            .append(",finish:").append(js.getRunState());
    System.out.println(line.toString());
}
/**
 * Lazily creates the shared {@link JobClient} from {@link #getConf()}.
 *
 * @return the cached jobClient, or null if creation failed
 */
public static JobClient getJobClient() {
    if (jobClient != null) {
        return jobClient;
    }
    try {
        jobClient = new JobClient(getConf());
    } catch (IOException e) {
        e.printStackTrace();
    }
    return jobClient;
}
/**
 * Replaces the cached {@link JobClient} instance.
 *
 * @param jobClient
 *            the jobClient to set
 */
public static void setJobClient(JobClient jobClient) {
HUtils.jobClient = jobClient;
}
/**
 * @return the monitoring threshold: only jobs started after this
 *         timestamp (System.currentTimeMillis() domain) are returned by getJobs()
 */
public static long getJobStartTime() {
return jobStartTime;
}
/**
 * Sets the monitoring threshold used by getJobs().
 *
 * @param jobStartTime start-time cutoff, from System.currentTimeMillis()
 */
public static void setJobStartTime(long jobStartTime) {
HUtils.jobStartTime = jobStartTime;
}
/**
 * Maps a monitored job's run state to a short status keyword for the UI.
 *
 * @param currentJobInfo job snapshot; may be null (treated as still running)
 * @return "success", "fail", "kill", or "running" for any other/unknown state
 */
public static String hasFinished(CurrentJobInfo currentJobInfo) {
    if (currentJobInfo == null) {
        return "running";
    }
    Object state = currentJobInfo.getRunState();
    if ("SUCCEEDED".equals(state)) {
        return "success";
    } else if ("FAILED".equals(state)) {
        return "fail";
    } else if ("KILLED".equals(state)) {
        return "kill";
    }
    return "running";
}
/**
 * Wraps a path string in a Hadoop {@link Path}.
 * (Note: unlike getHDFSPath, this does NOT prepend fs.defaultFS.)
 *
 * @param url path string, relative or fully qualified
 * @return the corresponding Path object
 */
public static Path getPath(String url) {
    Path hdfsPath = new Path(url);
    return hdfsPath;
}
/**
 * Builds the fully-qualified HDFS URI for a path.
 * (Same behavior as getHDFSPath; kept for existing callers.)
 *
 * @param url path relative to the filesystem root
 * @return fs.defaultFS + url
 */
public static String getFullHDFS(String url) {
    String defaultFs = Utils.getKey("fs.defaultFS", flag);
    return defaultFs + url;
}
/**
 * Copies a local file into HDFS, overwriting the destination if it
 * already exists.
 *
 * @param localPath source path on the local filesystem
 * @param hdfsPath  destination path in HDFS
 * @return result map with keys "return_show", "flag" ("true"/"false"),
 *         "msg", and on success also "return_txt"
 */
public static Map<String, Object> upload(String localPath, String hdfsPath) {
    Map<String, Object> result = new HashMap<String, Object>();
    result.put("return_show", "upload_return");
    try {
        getFs().copyFromLocalFile(new Path(localPath), new Path(hdfsPath));
        result.put("return_txt", localPath + "上传至" + hdfsPath + "成功");
        Utils.simpleLog(localPath + "上传至" + hdfsPath + "成功");
    } catch (Exception e) {
        result.put("flag", "false");
        result.put("msg", e.getMessage());
        e.printStackTrace();
        return result;
    }
    result.put("flag", "true");
    result.put("msg", "HFDS:'" + hdfsPath + "'");
    return result;
}
/**
 * Recursively deletes an HDFS file or directory.
 *
 * @param hdfsFolder HDFS path to remove
 * @return true if the path was actually deleted; false if the delete
 *         reported failure (e.g. path absent) or threw an IOException
 */
public static boolean delete(String hdfsFolder) {
    try {
        // propagate FileSystem.delete's result instead of discarding it:
        // previously this method returned true even when nothing was deleted
        return getFs().delete(getPath(hdfsFolder), true);
    } catch (IOException e) {
        e.printStackTrace();
        return false;
    }
}
/**
* 下载文件
* @param hdfsPath
* @param localPath
* ,本地
没有合适的资源?快使用搜索试试~ 我知道了~
HadoopWeb项目--网上商城推荐系统
共2281个文件
png:1320个
css:421个
jpg:107个
5星 · 超过95%的资源 需积分: 49 237 下载量 114 浏览量
2017-09-08
00:46:41
上传
评论 23
收藏 24.69MB RAR 举报
温馨提示
我的目的是有个网上商城,里面有很多商品,某个用户在浏览一些商品后系统根据用户的浏览记录再结合其它广大用户的偏好记录,应用基于物品的协同过滤算法,为这个用户推荐合乎情理的其它商品,目的在于加深对算法的理解及更直观的展示算法的成果
资源推荐
资源详情
资源评论
收起资源包目录
HadoopWeb项目--网上商城推荐系统 (2281个子文件)
zx.bmp 6KB
pingan.bmp 6KB
nanjing.bmp 6KB
cmbc.bmp 6KB
icbc.bmp 6KB
bj.bmp 6KB
beijingnongshang.bmp 6KB
post.bmp 6KB
cib.bmp 6KB
gf.bmp 6KB
bh.bmp 6KB
zheshang.bmp 6KB
dy.bmp 6KB
guangda.bmp 6KB
abc.bmp 6KB
shpd.bmp 6KB
sh.bmp 6KB
bc.bmp 6KB
bcc.bmp 6KB
sfz.bmp 6KB
ningbo.bmp 6KB
hx.bmp 6KB
cmb.bmp 6KB
ccb.bmp 6KB
HUtils.class 20KB
HBaseCommandService.class 19KB
HUtils.class 16KB
HadoopUtils.class 15KB
Utils.class 13KB
HBaseCommandAction.class 11KB
HdfsManagerAction.class 9KB
BaseDaoImpl.class 9KB
OrderAction.class 8KB
CloudAction.class 8KB
DBService.class 8KB
BaseDaoImpl.class 8KB
CloudAction.class 7KB
DBService.class 7KB
HdfsService.class 6KB
Utils.class 6KB
AdminProductAction.class 5KB
ProductDao.class 5KB
DBAction.class 5KB
AdminOrderAction.class 5KB
PaymentUtil.class 4KB
ProductService.class 4KB
UserAction.class 4KB
UserDao.class 3KB
JarUtil.class 3KB
JarUtil.class 3KB
CheckImgAction.class 3KB
OrderDao.class 3KB
DoubleArrIntWritable.class 3KB
CurrentJobInfo.class 3KB
CurrentJobInfo.class 3KB
AdminCategorySecondAction.class 3KB
UserService.class 3KB
hiveCommandAction.class 3KB
StringWritable.class 3KB
OrderService.class 3KB
ProductAction.class 3KB
CategorySecondDao.class 3KB
IntDoublePairWritable.class 3KB
ExchangeJob.class 3KB
CategorySecondService.class 3KB
Order.class 3KB
MailUitls.class 3KB
User.class 3KB
IndexAction.class 3KB
RecommendItembasedRunnable.class 2KB
RecommendItembasedRunnable.class 2KB
CartAction.class 2KB
HdfsRequestProperties.class 2KB
Product.class 2KB
UserAdminAction.class 2KB
AdminCategoryAction.class 2KB
BaseDAO.class 2KB
HConstants.class 2KB
HConstants.class 2KB
BaseDAO.class 2KB
HdfsResponseProperties.class 2KB
ExchangeMapper3.class 2KB
AdminUserAction.class 2KB
HBaseTable.class 2KB
CategoryDao.class 2KB
ExchangeMapper.class 2KB
HBaseTableData.class 2KB
Cart.class 2KB
PageHibernateCallback.class 2KB
PageBean.class 2KB
RecommendResult.class 2KB
CategorySecond.class 2KB
TestService.class 2KB
TestService.class 2KB
ExchangeMapper2.class 2KB
RecommendData.class 2KB
OrderItem.class 2KB
PrivilegeInterceptor.class 1KB
AdminUserDao.class 1KB
CategoryService.class 1KB
共 2281 条
- 1
- 2
- 3
- 4
- 5
- 6
- 23
Primal
- 粉丝: 75
- 资源: 10
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功
- 1
- 2
前往页