package com.winterchen.hadoopdemo.service.impl;
import cn.hutool.core.date.DateUtil;
import com.winterchen.hadoopdemo.constants.OozieConstants;
import com.winterchen.hadoopdemo.enums.FrequencyTypeEnum;
import com.winterchen.hadoopdemo.enums.TaskTypeEnum;
import com.winterchen.hadoopdemo.model.CoordinatorRequest;
import com.winterchen.hadoopdemo.model.OozieConfig;
import com.winterchen.hadoopdemo.model.WorkflowRequest;
import com.winterchen.hadoopdemo.service.OozieService;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.client.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.Properties;
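/**
* @author winterchen
* @version 1.0
* @date 2020/11/23 2:06 PM
* @description {@link OozieService} implementation that submits workflow and coordinator jobs through {@link OozieClient}.
**/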
@Slf4j
@Service
public class OozieServiceImpl implements OozieService {
/** Hadoop file system handle, field-injected by Spring; the visible methods use it to resolve the home directory. */
@Autowired
private FileSystem fileSystem;
/** Oozie settings (server URL, lib path, name node, job tracker, JDBC URL, password, queue name) supplied via the constructor. */
private final OozieConfig oozieConfig;
/**
* Creates the service with the Oozie configuration it reads when building job properties.
*
* @param oozieConfig configuration object stored for later use by the submit methods
*/
@Autowired
public OozieServiceImpl(OozieConfig oozieConfig) {
this.oozieConfig = oozieConfig;
}
/**
 * Uploads the SQL, shell and workflow definition files for a request, then submits
 * and starts the workflow job on the Oozie server configured in {@code oozieConfig}.
 *
 * <p>All paths are built from the request's workflow path and workflow name and, where a
 * {@link Path} is needed, resolved against {@code fileSystem.getHomeDirectory()}.
 *
 * @param workflowRequest request carrying the workflow path, workflow name, SQL text and callback id
 * @return the id returned by {@link OozieClient#submit(Properties)}, or {@code null} when an
 *         {@link OozieClientException} is raised (the exception is logged, not rethrown)
 */
@Override
public String submitWorkflow(WorkflowRequest workflowRequest) {
    try {
        OozieClient oozieClient = new OozieClient(oozieConfig.getUrl());
        oozieClient.setDebugMode(1);

        // Read the request once; every path below derives from these two values.
        String wfPath = workflowRequest.getWfPath();
        String wfName = workflowRequest.getWfName();

        // Workflow definition location: <home>/<wfPath><wfName>.xml
        Path appPath = new Path(fileSystem.getHomeDirectory(), wfPath.concat(wfName).concat(".xml"));

        // Create the supporting files.
        // Create and upload the SQL file.
        String sqlPath = wfPath.concat("sql/").concat(wfName).concat("-sql.q");
        createSqlFileAndUpload(workflowRequest.getSql(), sqlPath);

        // Create the shell script.
        String shellFileName = wfName + "-shell.sh";
        String shellFilePath = wfPath.concat(wfName).concat("/shell/");
        String shellPath = createShellFileAndUpload(shellFileName, shellFilePath);

        // Create and upload the workflow definition file.
        createWfFileAndUpload(wfName, wfPath, sqlPath, workflowRequest.getCallbackId());

        // Build the job configuration.
        Properties prop = oozieClient.createConfiguration();
        prop.setProperty(OozieClient.APP_PATH, appPath.toString());
        // LIBPATH / USE_SYSTEM_LIBPATH are static constants: reference them through the class, not the instance.
        prop.setProperty(OozieClient.LIBPATH, oozieConfig.getOozieLibPath());
        prop.setProperty(OozieClient.USE_SYSTEM_LIBPATH, String.valueOf(oozieConfig.isOozieSystemLibPath()));

        /* Application-specific configuration */
        prop.setProperty(OozieConstants.NAME_NODE, oozieConfig.getNameNode());
        prop.setProperty(OozieConstants.JOB_TRACKER, oozieConfig.getJobTracker());
        Path outputPath = new Path(fileSystem.getHomeDirectory(), wfPath.concat("output/"));
        prop.setProperty(OozieConstants.JOB_OUTPUT, outputPath.toString());
        prop.setProperty(OozieConstants.JDBC_URL, oozieConfig.getJdbcUrl());
        // Properties rejects null values, so a missing password is sent as the empty string.
        prop.setProperty(OozieConstants.PASSWORD, StringUtils.isEmpty(oozieConfig.getPassword()) ? "" : oozieConfig.getPassword());
        prop.setProperty(OozieConstants.SQL_INPUT, wfPath.concat("sql/"));
        prop.setProperty(OozieConstants.USER_NAME, "admin");
        prop.setProperty(OozieConstants.TASK_TYPE, TaskTypeEnum.WORKFLOW.name());
        prop.setProperty(OozieConstants.SHELL_FILE_NAME, shellFileName);
        prop.setProperty(OozieConstants.SHELL_FILE_PATH, shellPath);
        prop.setProperty(OozieConstants.CALLBACK_ID, workflowRequest.getCallbackId());
        prop.setProperty(OozieConstants.QUEUE_NAME, oozieConfig.getQueueName());

        // Submit first, then start explicitly using the returned id.
        String jobId = oozieClient.submit(prop);
        oozieClient.start(jobId);
        log.debug("workflow job submitted, jobId = {}", jobId);
        return jobId;
    } catch (OozieClientException e) {
        log.error("workflow任务提交失败" ,e);
    }
    return null;
}
/**
 * Uploads the coordinator definition and shell script for a request, then submits the
 * coordinator job to the Oozie server configured in {@code oozieConfig}.
 *
 * <p>Unlike {@code submitWorkflow}, this method only calls {@link OozieClient#submit(Properties)};
 * it does not call {@code start}.
 *
 * @param coordinatorRequest request carrying the coordinator path and name, the workflow path and name,
 *                           the frequency type, the callback id, and start/end times formatted as
 *                           {@code yyyy-MM-dd HH:mm:ss}
 * @return the id returned by {@link OozieClient#submit(Properties)}, or {@code null} when an
 *         {@link OozieClientException} is raised (the exception is logged, not rethrown)
 */
@Override
public String submitCoordinator(CoordinatorRequest coordinatorRequest) {
    try {
        OozieClient oozieClient = new OozieClient(oozieConfig.getUrl());
        oozieClient.setDebugMode(1);

        // Read the request once; every path below derives from these values.
        String coordPath = coordinatorRequest.getCoordPath();
        String coordName = coordinatorRequest.getCoordName();
        String wfPath = coordinatorRequest.getWfPath();
        String wfName = coordinatorRequest.getWfName();

        Path rootPath = new Path(fileSystem.getHomeDirectory(), coordPath);
        // Coordinator definition location: <home>/<coordPath><coordName>.xml
        Path appPath = new Path(fileSystem.getHomeDirectory(), coordPath.concat(coordName).concat(".xml"));
        Path wf = new Path(fileSystem.getHomeDirectory(), wfPath);

        // Create the supporting files.
        // Create and upload the coordinator (scheduled job) definition, pointing at the workflow XML.
        String wfXmlPath = wf.toString().concat("/").concat(wfName).concat(".xml");
        createCoordFileAndUpload(coordName, coordPath, wfXmlPath,
                coordinatorRequest.getFrequencyType(), coordinatorRequest.getCallbackId());

        // Create the shell script.
        String shellFileName = wfName + "-shell.sh";
        String shellFilePath = wfPath.concat(wfName).concat("/shell/");
        String shellPath = createShellFileAndUpload(shellFileName, shellFilePath);

        // Build the job configuration.
        Properties prop = oozieClient.createConfiguration();
        prop.setProperty(OozieClient.COORDINATOR_APP_PATH, appPath.toString());
        // LIBPATH / USE_SYSTEM_LIBPATH are static constants: reference them through the class, not the instance.
        prop.setProperty(OozieClient.LIBPATH, oozieConfig.getOozieLibPath());
        prop.setProperty(OozieClient.USE_SYSTEM_LIBPATH, String.valueOf(oozieConfig.isOozieSystemLibPath()));
        prop.setProperty(OozieConstants.JOB_TRACKER, oozieConfig.getJobTracker());
        prop.setProperty(OozieConstants.USER_NAME, "admin");
        prop.setProperty(OozieConstants.WORKFLOW_ROOT, rootPath.toString());

        // Reformat "yyyy-MM-dd HH:mm:ss" into "yyyy-MM-dd'T'HH:mm'Z'".
        // NOTE(review): 'Z' is a quoted literal in the pattern, so no time-zone conversion happens here —
        // confirm the incoming times are already expressed in the zone the Oozie server expects.
        String start = DateUtil.format(DateUtil.parse(coordinatorRequest.getStartTime(), "yyyy-MM-dd HH:mm:ss"), "yyyy-MM-dd'T'HH:mm'Z'");
        prop.setProperty(OozieConstants.START, start);
        String end = DateUtil.format(DateUtil.parse(coordinatorRequest.getEndTime(), "yyyy-MM-dd HH:mm:ss"), "yyyy-MM-dd'T'HH:mm'Z'");
        prop.setProperty(OozieConstants.END, end);

        Path outputPath = new Path(fileSystem.getHomeDirectory(), wfPath.concat("output/"));
        prop.setProperty(OozieConstants.JOB_OUTPUT, outputPath.toString());
        prop.setProperty(OozieConstants.JDBC_URL, oozieConfig.getJdbcUrl());
        // Properties rejects null values, so a missing password is sent as the empty string.
        prop.setProperty(OozieConstants.PASSWORD, StringUtils.isEmpty(oozieConfig.getPassword()) ? "" : oozieConfig.getPassword());
        prop.setProperty(OozieConstants.SQL_INPUT, wfPath.concat("sql/"));
        prop.setProperty(OozieConstants.TASK_TYPE, TaskTypeEnum.COORDINATOR.name());
        prop.setProperty(OozieConstants.SHELL_FILE_NAME, shellFileName);
        prop.setProperty(OozieConstants.SHELL_FILE_PATH, shellPath);
        prop.setProperty(OozieConstants.CALLBACK_ID, coordinatorRequest.getCallbackId());
        prop.setProperty(OozieConstants.QUEUE_NAME, oozieConfig.getQueueName());

        /* Application-specific configuration */
        prop.setProperty(OozieConstants.NAME_NODE, oozieConfig.getNameNode());

        String jobId = oozieClient.submit(prop);
        // Log text names the coordinator so these entries are distinguishable from workflow submissions.
        log.debug("coordinator job submitted, jobId = {}", jobId);
        return jobId;
    } catch (OozieClientException e) {
        log.error("coordinator任务提交失败" ,e);
    }
    return null;
}
没有合适的资源?快使用搜索试试~ 我知道了~
SpringBoot配置详解
SpringBoot日志配置
SpringBoot整合Thymeleaf模板
使用JdbcTemplate访问数据库
整合SpringDataJpa
整合Mybatis
通用Mapper与分页插件的集成
整合Lettuce Redis
使用Spring Cache集成Redis
集成Swagger在线调试
初探RabbitMQ消息队列
RabbitMQ延迟队列
actuator 服务监控与管理
actuator与spring-boot-admin
定时任务详解
文件上传
重复提交(分布式锁)
重复提交(本地锁)
WebSocket
安全框架(Shiro)
分布式限流
集成hadoop、hive、oozie
收起资源包目录
Java Springboot学习资料.rar (1541个子文件)
mvnw.cmd 6KB
mvnw.cmd 6KB
mvnw.cmd 6KB
mvnw.cmd 6KB
mvnw.cmd 6KB
mvnw.cmd 6KB
mvnw.cmd 6KB
mvnw.cmd 6KB
mvnw.cmd 6KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
.DS_Store 6KB
.gitattributes 66B
.gitignore 395B
.gitignore 395B
.gitignore 395B
.gitignore 395B
.gitignore 395B
.gitignore 395B
.gitignore 395B
.gitignore 395B
.gitignore 395B
.gitignore 278B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 39B
schema.graphqls 107B
index.html 1KB
index.html 646B
springboot-learning-experience.iml 830B
maven-wrapper.jar 50KB
maven-wrapper.jar 50KB
maven-wrapper.jar 50KB
maven-wrapper.jar 50KB
maven-wrapper.jar 50KB
maven-wrapper.jar 50KB
maven-wrapper.jar 50KB
maven-wrapper.jar 50KB
maven-wrapper.jar 50KB
maven-wrapper.jar 46KB
maven-wrapper.jar 46KB
maven-wrapper.jar 46KB
maven-wrapper.jar 46KB
maven-wrapper.jar 46KB
maven-wrapper.jar 46KB
maven-wrapper.jar 46KB
maven-wrapper.jar 46KB
maven-wrapper.jar 46KB
maven-wrapper.jar 46KB
maven-wrapper.jar 46KB
maven-wrapper.jar 46KB
maven-wrapper.jar 46KB
共 1541 条
- 1
- 2
- 3
- 4
- 5
- 6
- 16
资源推荐
资源预览
资源评论
5星 · 资源好评率100%
134 浏览量
5星 · 资源好评率100%
125 浏览量
5星 · 资源好评率100%
121 浏览量
5星 · 资源好评率100%
112 浏览量
2018-11-30 上传
2019-01-29 上传
2018-05-14 上传
2021-01-19 上传
2020-10-15 上传
2024-01-16 上传
188 浏览量
152 浏览量
5星 · 资源好评率100%
135 浏览量
2024-01-15 上传
190 浏览量
2020-09-17 上传
2021-05-13 上传
193 浏览量
2018-08-12 上传
122 浏览量
资源评论
shangjg3
- 粉丝: 3041
- 资源: 144
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功