package wang.oo0oo.wordcount.service;
import wang.oo0oo.wordcount.GlobalConfig;
import wang.oo0oo.wordcount.dao.TaskDAO;
import wang.oo0oo.wordcount.hadoop.HdfsUtil;
import wang.oo0oo.wordcount.hadoop.WordCount;
import wang.oo0oo.wordcount.pojo.Task;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.*;
public class TaskService {
public static final String HADOOP_IP = "bigdata";
public static final String HDFS_PORT = "9000";
public static final String HDFS_PROJECT_DIRECTORY_PATH = "/user/root/word_count_files";
private HdfsUtil hdfsUtil = new HdfsUtil();
private TaskDAO taskDAO = new TaskDAO();
private Map<String, Integer> resultMap = new LinkedHashMap<>();
/**
* 【核心】
*/
public Map<String, Integer> createWordCountJob(File folder) {
if (folder == null || GlobalConfig.loginUser == null || !folder.exists() || !folder.isDirectory()) {
resultMap.put("系统或参数出错", 0);
return resultMap;
}
Integer taskId = GlobalConfig.numberOfSubmittedTasksList.get(GlobalConfig.loginUser.getUserName()) + 1;
//这个是任务对象,等会儿写数据库用的
Task task = new Task();
// SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
task.setCommitUserName(GlobalConfig.loginUser.getUserName());
task.setCommitTaskId(taskId);
task.setCommitTime(new Timestamp(System.currentTimeMillis()));
//创建hdfs文件夹
String hdfsUserHomePath = String.format("hdfs://%s:%s/%s/%s",
HADOOP_IP, HDFS_PORT, HDFS_PROJECT_DIRECTORY_PATH, GlobalConfig.loginUser.getUserName());
String hdfsUserTaskPath = String.format("%s/%s", hdfsUserHomePath, taskId);
try {
if (!hdfsUtil.fileExists(hdfsUserTaskPath)) {
if (!hdfsUtil.mkdir(hdfsUserTaskPath)) {
System.out.println("[HDFS]警告:创建文件夹 " + hdfsUserTaskPath + " 失败");
task.setTaskStatus("创建HDFS任务文件夹失败");
taskDAO.add(task);
resultMap.put("创建HDFS任务文件夹失败", 0);
return resultMap;
}
}
} catch (IOException e) {
System.out.println("[HDFS]警告:HDFS出错!");
task.setTaskStatus("HDFS出错");
taskDAO.add(task);
resultMap.put("HDFS出错", 0);
return resultMap;
}
//遍历目录中的文件
String[] names = folder.list();
String folderPath = folder.getAbsolutePath();
for (String name : Objects.requireNonNull(names)) {
String hdfsFilePath = String.format("%s/%s", hdfsUserTaskPath, name);
//传文件上去
boolean uploadStatus = hdfsUtil.upload(String.format("%s/%s", folder.getAbsolutePath(), name), hdfsFilePath);
if (!uploadStatus) {
resultMap.put("HDFS文件上传失败", 0);
return resultMap;
}
}
//创建wordcount任务
String[] args = new String[2];
args[0] = hdfsUserTaskPath;
args[1] = hdfsUserTaskPath + "/output";
//等待任务结束
boolean taskStatus = false;
try {
taskStatus = WordCount.doWordCountJob(args);
} catch (Exception e) {
System.out.println("[MapReduce]任务执行异常!");
task.setTaskStatus("MapReduce任务执行异常");
taskDAO.add(task);
}
//获取任务结果
if (taskStatus) {
System.out.println("[Service]任务执行成功");
} else {
System.out.println("[MapReduce]任务执行失败!");
task.setTaskStatus("MapReduce任务执行失败");
taskDAO.add(task);
resultMap.put("MapReduce任务执行失败", 0);
return resultMap;
}
//TODO:输出结果(现在只能输出part-r-0000)
String localOutputDirectoryPath = String.format("%s/output", folder.getAbsolutePath());
File localOutputDirectory = new File(localOutputDirectoryPath);
localOutputDirectory.mkdir();
String localOutputFilePath = String.format("%s/part-r-00000.txt", localOutputDirectoryPath);
String hdfsOutputFilePath = String.format("%s/output/part-r-00000", hdfsUserTaskPath);
if (hdfsUtil.download(localOutputFilePath, hdfsOutputFilePath)) {
File localOutputFile = new File(localOutputFilePath);
try (FileReader reader = new FileReader(localOutputFile);
BufferedReader br = new BufferedReader(reader)
) {
String line;
while ((line = br.readLine()) != null) {
if (!"".equals(line)) {
String[] splitResultLine = line.split("\t");
resultMap.put(splitResultLine[0], Integer.parseInt(splitResultLine[1]));
}
}
} catch (IOException e) {
e.printStackTrace();
}
task.setFinishTime(new Timestamp(System.currentTimeMillis()));
task.setTaskStatus("FINISHED");
taskDAO.add(task);
hdfsUtil.closeClient();
GlobalConfig.numberOfSubmittedTasksList.put(GlobalConfig.loginUser.getUserName(), taskId + 1);
return resultMap;
}
resultMap.put("未获取到输出信息", 0);
return resultMap;
}
}
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
收起资源包目录
基于Hadoop 2.7.7 的中文词频统计工具(JavaWeb).zip (37个子文件)
word-count-master
mvnw.cmd 6KB
pom.xml 4KB
src
test
java
wang
oo0oo
wordcount
WordCountApplicationTests.java 340B
main
resources
stopword.dic 129B
core-site.xml 1KB
IKAnalyzer.cfg.xml 404B
application.properties 204B
MyDic.dic 19B
templates
main.html 3KB
common
header.html 291B
index.html 2KB
log4j.properties 12KB
static
js
bootstrap.js 158KB
popper.min.js 21KB
jquery-3.4.1.js 274KB
css
bootstrap.css 188KB
java
wang
oo0oo
wordcount
WordCountApplication.java 610B
GlobalConfig.java 636B
controller
BookController.java 3KB
UserController.java 2KB
dao
TaskDAO.java 5KB
UserDAO.java 5KB
hadoop
WordCount.java 4KB
HdfsUtil.java 5KB
IKAnalyzerTest.java 944B
IKAnalyzerDemo.java 4KB
pojo
User.java 1KB
Task.java 2KB
service
TaskService.java 6KB
util
DBUtil.java 993B
FileUtil.java 1023B
JsonUtil.java 2KB
WebMvcConfig.java 638B
.mvn
wrapper
maven-wrapper.properties 116B
maven-wrapper.jar 47KB
MavenWrapperDownloader.java 5KB
mvnw 9KB
共 37 条
- 1
资源评论
博士僧小星
- 粉丝: 1922
- 资源: 5884
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
- 基于matlab实现电磁优化计算功能,进行线型规划优化电磁设计.rar
- 基于matlab实现带精英策略的非支配排序遗传算法matlab 源码.rar
- 基于matlab实现差分进化算法,最新的用于替代遗传算法,是以后的主要发展方法.rar
- VSCode配置c/c++环境教程.md
- 基于matlab实现标准合作型协同进化遗传算法matlab源程序
- 七下人教.zip
- 基于matlab实现本份代码能对图像进行gabor滤波处理,结合指纹方向图以及指纹沟壑频率特性,对指纹图像进行增强.rar
- 基于matlab实现RBM神经网络实现了手写数字体识别的GUI程序.rar
- 基于matlab实现蝙蝠算法优化相关向量机建模对数据进行建模和预测.rar
- 基于matlab实现编写的禁忌搜索算法,解决了TSP问题,对初学者有重要的参考价值.rar
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功