package com.winterchen.hadoopdemo.service.impl;
import cn.hutool.core.date.DateUtil;
import com.winterchen.hadoopdemo.constants.OozieConstants;
import com.winterchen.hadoopdemo.enums.FrequencyTypeEnum;
import com.winterchen.hadoopdemo.enums.TaskTypeEnum;
import com.winterchen.hadoopdemo.model.CoordinatorRequest;
import com.winterchen.hadoopdemo.model.OozieConfig;
import com.winterchen.hadoopdemo.model.WorkflowRequest;
import com.winterchen.hadoopdemo.service.OozieService;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.client.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.Properties;
/**
 * {@link OozieService} implementation that submits workflow and coordinator jobs to an Oozie server.
 *
 * @author winterchen
 * @version 1.0
 * @date 2020/11/23 2:06 PM
 **/
@Slf4j
@Service
public class OozieServiceImpl implements OozieService {
// Hadoop FileSystem handle, field-injected by Spring; the submit methods use its home directory as the base for job paths.
@Autowired
private FileSystem fileSystem;
// Oozie settings (server URL, lib path, name node, job tracker, JDBC URL, queue name, ...) supplied through the constructor.
private final OozieConfig oozieConfig;
/**
 * Creates the service with the Oozie configuration it reads when building job properties.
 *
 * @param oozieConfig Oozie configuration injected by Spring
 */
@Autowired
public OozieServiceImpl(OozieConfig oozieConfig) {
this.oozieConfig = oozieConfig;
}
/**
 * Uploads the artifacts of a workflow (SQL file, shell script, workflow definition),
 * then submits and starts the workflow job through a new {@link OozieClient}.
 *
 * <p>All paths are resolved relative to {@code fileSystem.getHomeDirectory()} and the
 * workflow path carried by the request.
 *
 * @param workflowRequest request providing the workflow name, workflow path, SQL text and callback id
 * @return the id of the started job, or {@code null} when an {@link OozieClientException}
 *         was caught (the failure is logged)
 */
@Override
public String submitWorkflow(WorkflowRequest workflowRequest) {
    try {
        OozieClient oozieClient = new OozieClient(oozieConfig.getUrl());
        oozieClient.setDebugMode(1);

        String wfName = workflowRequest.getWfName();
        String wfPath = workflowRequest.getWfPath();
        Path appPath = new Path(fileSystem.getHomeDirectory(), wfPath.concat(wfName).concat(".xml"));

        // Create the supporting files.
        // Create and upload the SQL file.
        String sqlPath = wfPath.concat("sql/".concat(wfName).concat("-sql.q"));
        createSqlFileAndUpload(workflowRequest.getSql(), sqlPath);

        // Create the shell script.
        String shellFileName = wfName + "-shell.sh";
        String shellFilePath = wfPath.concat(wfName).concat("/shell/");
        String shellPath = createShellFileAndUpload(shellFileName, shellFilePath);

        // Create and upload the workflow definition file.
        createWfFileAndUpload(wfName, wfPath, sqlPath, workflowRequest.getCallbackId());

        // Build the job configuration.
        Properties prop = oozieClient.createConfiguration();
        prop.setProperty(OozieClient.APP_PATH, appPath.toString());
        // LIBPATH / USE_SYSTEM_LIBPATH are static constants: reference them through the class, not the instance.
        prop.setProperty(OozieClient.LIBPATH, oozieConfig.getOozieLibPath());
        prop.setProperty(OozieClient.USE_SYSTEM_LIBPATH, String.valueOf(oozieConfig.isOozieSystemLibPath()));

        // Application-specific configuration.
        prop.setProperty(OozieConstants.NAME_NODE, oozieConfig.getNameNode());
        prop.setProperty(OozieConstants.JOB_TRACKER, oozieConfig.getJobTracker());
        Path outputPath = new Path(fileSystem.getHomeDirectory(), wfPath.concat("output/"));
        prop.setProperty(OozieConstants.JOB_OUTPUT, outputPath.toString());
        prop.setProperty(OozieConstants.JDBC_URL, oozieConfig.getJdbcUrl());
        // Properties rejects null values, so an absent password is sent as an empty string.
        prop.setProperty(OozieConstants.PASSWORD,
                StringUtils.isEmpty(oozieConfig.getPassword()) ? "" : oozieConfig.getPassword());
        prop.setProperty(OozieConstants.SQL_INPUT, wfPath.concat("sql/"));
        prop.setProperty(OozieConstants.USER_NAME, "admin");
        prop.setProperty(OozieConstants.TASK_TYPE, TaskTypeEnum.WORKFLOW.name());
        prop.setProperty(OozieConstants.SHELL_FILE_NAME, shellFileName);
        prop.setProperty(OozieConstants.SHELL_FILE_PATH, shellPath);
        prop.setProperty(OozieConstants.CALLBACK_ID, workflowRequest.getCallbackId());
        prop.setProperty(OozieConstants.QUEUE_NAME, oozieConfig.getQueueName());

        // Submit the job, then start it explicitly.
        String jobId = oozieClient.submit(prop);
        oozieClient.start(jobId);
        log.debug("workflow job submitted, jobId = {}", jobId);
        return jobId;
    } catch (OozieClientException e) {
        log.error("workflow任务提交失败" ,e);
    }
    return null;
}
/**
 * Uploads the coordinator definition and shell script, then submits a coordinator job
 * through a new {@link OozieClient}. Unlike {@code submitWorkflow}, the job is only
 * submitted here; no explicit start call is made.
 *
 * <p>The request's start and end times are parsed as {@code yyyy-MM-dd HH:mm:ss} and
 * re-formatted as {@code yyyy-MM-dd'T'HH:mm'Z'} before being placed in the job properties.
 *
 * @param coordinatorRequest request providing coordinator name/path, workflow name/path,
 *                           frequency type, start/end time and callback id
 * @return the id of the submitted job, or {@code null} when an {@link OozieClientException}
 *         was caught (the failure is logged)
 */
@Override
public String submitCoordinator(CoordinatorRequest coordinatorRequest) {
    try {
        OozieClient oozieClient = new OozieClient(oozieConfig.getUrl());
        oozieClient.setDebugMode(1);

        Path rootPath = new Path(fileSystem.getHomeDirectory(), coordinatorRequest.getCoordPath());
        Path appPath = new Path(fileSystem.getHomeDirectory(), coordinatorRequest.getCoordPath()
                .concat(coordinatorRequest.getCoordName()).concat(".xml"));
        Path wf = new Path(fileSystem.getHomeDirectory(), coordinatorRequest.getWfPath());

        // Create the supporting files.
        // Create and upload the coordinator (scheduled job) definition, pointing at the workflow XML.
        String workflowXmlPath = wf.toString().concat("/").concat(coordinatorRequest.getWfName()).concat(".xml");
        createCoordFileAndUpload(coordinatorRequest.getCoordName(), coordinatorRequest.getCoordPath(),
                workflowXmlPath, coordinatorRequest.getFrequencyType(), coordinatorRequest.getCallbackId());

        // Create the shell script.
        String shellFileName = coordinatorRequest.getWfName() + "-shell.sh";
        String shellFilePath = coordinatorRequest.getWfPath().concat(coordinatorRequest.getWfName()).concat("/shell/");
        String shellPath = createShellFileAndUpload(shellFileName, shellFilePath);

        // Build the job configuration.
        Properties prop = oozieClient.createConfiguration();
        prop.setProperty(OozieClient.COORDINATOR_APP_PATH, appPath.toString());
        // LIBPATH / USE_SYSTEM_LIBPATH are static constants: reference them through the class, not the instance.
        prop.setProperty(OozieClient.LIBPATH, oozieConfig.getOozieLibPath());
        prop.setProperty(OozieClient.USE_SYSTEM_LIBPATH, String.valueOf(oozieConfig.isOozieSystemLibPath()));
        prop.setProperty(OozieConstants.JOB_TRACKER, oozieConfig.getJobTracker());
        prop.setProperty(OozieConstants.USER_NAME, "admin");
        prop.setProperty(OozieConstants.WORKFLOW_ROOT, rootPath.toString());

        // NOTE(review): the 'Z' suffix is a literal; no time-zone conversion is performed here.
        // Confirm the request times are already expressed in the zone the Oozie server expects.
        final String requestPattern = "yyyy-MM-dd HH:mm:ss";
        final String ooziePattern = "yyyy-MM-dd'T'HH:mm'Z'";
        String start = DateUtil.format(DateUtil.parse(coordinatorRequest.getStartTime(), requestPattern), ooziePattern);
        prop.setProperty(OozieConstants.START, start);
        String end = DateUtil.format(DateUtil.parse(coordinatorRequest.getEndTime(), requestPattern), ooziePattern);
        prop.setProperty(OozieConstants.END, end);

        Path outputPath = new Path(fileSystem.getHomeDirectory(), coordinatorRequest.getWfPath().concat("output/"));
        prop.setProperty(OozieConstants.JOB_OUTPUT, outputPath.toString());
        prop.setProperty(OozieConstants.JDBC_URL, oozieConfig.getJdbcUrl());
        // Properties rejects null values, so an absent password is sent as an empty string.
        prop.setProperty(OozieConstants.PASSWORD,
                StringUtils.isEmpty(oozieConfig.getPassword()) ? "" : oozieConfig.getPassword());
        prop.setProperty(OozieConstants.SQL_INPUT, coordinatorRequest.getWfPath().concat("sql/"));
        prop.setProperty(OozieConstants.TASK_TYPE, TaskTypeEnum.COORDINATOR.name());
        prop.setProperty(OozieConstants.SHELL_FILE_NAME, shellFileName);
        prop.setProperty(OozieConstants.SHELL_FILE_PATH, shellPath);
        prop.setProperty(OozieConstants.CALLBACK_ID, coordinatorRequest.getCallbackId());
        prop.setProperty(OozieConstants.QUEUE_NAME, oozieConfig.getQueueName());

        // Application-specific configuration.
        prop.setProperty(OozieConstants.NAME_NODE, oozieConfig.getNameNode());

        String jobId = oozieClient.submit(prop);
        // Log messages name the coordinator so they are distinguishable from submitWorkflow's.
        log.debug("coordinator job submitted, jobId = {}", jobId);
        return jobId;
    } catch (OozieClientException e) {
        log.error("coordinator任务提交失败", e);
    }
    return null;
}
没有合适的资源?快使用搜索试试~ 我知道了~
温馨提示
SpringBoot配置详解 SpringBoot日志配置 SpringBoot整合Thymeleaf模板 使用JdbcTemplate访问数据库 整合SpringDataJpa 整合Mybatis 通用Mapper与分页插件的集成 整合Lettuce Redis 使用Spring Cache集成Redis 集成Swagger在线调试 初探RabbitMQ消息队列 RabbitMQ延迟队列 actuator 服务监控与管理 actuator与spring-boot-admin 定时任务详解 文件上传 重复提交(分布式锁) 重复提交(本地锁) WebSocket 安全框架(Shiro) 分布式限流 集成hadoop、hive、oozie
资源推荐
资源详情
资源评论
收起资源包目录
Java Springboot学习资料.rar (1541个子文件)
mvnw.cmd 6KB
mvnw.cmd 6KB
mvnw.cmd 6KB
mvnw.cmd 6KB
mvnw.cmd 6KB
mvnw.cmd 6KB
mvnw.cmd 6KB
mvnw.cmd 6KB
mvnw.cmd 6KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
mvnw.cmd 5KB
.DS_Store 6KB
.gitattributes 66B
.gitignore 395B
.gitignore 395B
.gitignore 395B
.gitignore 395B
.gitignore 395B
.gitignore 395B
.gitignore 395B
.gitignore 395B
.gitignore 395B
.gitignore 278B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 268B
.gitignore 39B
schema.graphqls 107B
index.html 1KB
index.html 646B
springboot-learning-experience.iml 830B
maven-wrapper.jar 50KB
maven-wrapper.jar 50KB
maven-wrapper.jar 50KB
maven-wrapper.jar 50KB
maven-wrapper.jar 50KB
maven-wrapper.jar 50KB
maven-wrapper.jar 50KB
maven-wrapper.jar 50KB
maven-wrapper.jar 50KB
maven-wrapper.jar 46KB
maven-wrapper.jar 46KB
maven-wrapper.jar 46KB
maven-wrapper.jar 46KB
maven-wrapper.jar 46KB
maven-wrapper.jar 46KB
maven-wrapper.jar 46KB
maven-wrapper.jar 46KB
maven-wrapper.jar 46KB
maven-wrapper.jar 46KB
maven-wrapper.jar 46KB
maven-wrapper.jar 46KB
maven-wrapper.jar 46KB
共 1541 条
- 1
- 2
- 3
- 4
- 5
- 6
- 16
资源评论
shangjg3
- 粉丝: 2872
- 资源: 144
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
- (源码)基于C语言的系统服务框架.zip
- (源码)基于Spring MVC和MyBatis的选课管理系统.zip
- (源码)基于ArcEngine的GIS数据处理系统.zip
- (源码)基于JavaFX和MySQL的医院挂号管理系统.zip
- (源码)基于IdentityServer4和Finbuckle.MultiTenant的多租户身份认证系统.zip
- (源码)基于Spring Boot和Vue3+ElementPlus的后台管理系统.zip
- (源码)基于C++和Qt框架的dearoot配置管理系统.zip
- (源码)基于 .NET 和 EasyHook 的虚拟文件系统.zip
- (源码)基于Python的金融文档智能分析系统.zip
- (源码)基于Java的医药管理系统.zip
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功