package com.meiyou.bigwhale.service;
import com.meiyou.bigwhale.common.Constant;
import com.meiyou.bigwhale.config.SshConfig;
import com.meiyou.bigwhale.config.YarnConfig;
import com.meiyou.bigwhale.data.service.AbstractMysqlPagingAndSortingQueryService;
import com.meiyou.bigwhale.dto.DtoScript;
import com.meiyou.bigwhale.entity.*;
import com.meiyou.bigwhale.entity.auth.User;
import com.meiyou.bigwhale.service.auth.UserService;
import com.meiyou.bigwhale.util.WebHdfsUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.persistence.TypedQuery;
import javax.persistence.criteria.CriteriaBuilder;
import javax.persistence.criteria.CriteriaQuery;
import javax.persistence.criteria.Predicate;
import javax.persistence.criteria.Root;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@Service
public class ScriptServiceImpl extends AbstractMysqlPagingAndSortingQueryService<Script, Integer> implements ScriptService {
private static final Pattern DATE_PATTERN = Pattern.compile("(\\$\\{now(\\s*([-+])\\s*(\\d+)([dhms]))*(@([A-Za-z0-9\\s-':+.]+))*})+");
@Autowired
private MonitorService monitorService;
@Autowired
private ScriptHistoryService scriptHistoryService;
@Autowired
private ClusterService clusterService;
@Autowired
private UserService userService;
@Autowired
private AgentService agentService;
@Autowired
private ClusterUserService clusterUserService;
@Autowired
private SshConfig sshConfig;
@Autowired
private YarnConfig yarnConfig;
@Transactional(rollbackFor = Exception.class)
@Override
public void delete(Script entity) {
//删除jar包
if (entity.isYarn()) {
deleteJar(entity);
}
if (entity.getMonitorId() != null) {
monitorService.deleteById(entity.getMonitorId());
}
super.delete(entity);
}
@Override
public Page<Script> fuzzyPage(DtoScript req) {
CriteriaBuilder builder = entityManager.getCriteriaBuilder();
CriteriaQuery<Script> criteriaQuery = builder.createQuery(clazz);
Root<Script> root = criteriaQuery.from(clazz);
CriteriaQuery<Long> countCriteriaQuery = builder.createQuery(Long.class);
Root<Script> countRoot = countCriteriaQuery.from(clazz);
countCriteriaQuery.select(builder.count(countRoot));
this.predicate(builder, criteriaQuery, root, req);
this.predicate(builder, countCriteriaQuery, countRoot, req);
long totalCount = entityManager.createQuery(countCriteriaQuery).getSingleResult();
int pageNo = req.pageNo - 1;
int pageSize = req.pageSize;
TypedQuery<Script> typedQuery = entityManager.createQuery(criteriaQuery);
typedQuery.setFirstResult(pageNo * pageSize);
typedQuery.setMaxResults(pageSize);
List<Script> entities = typedQuery.getResultList();
return new PageImpl<>(entities, new PageRequest(pageNo, pageSize), totalCount);
}
@Override
public String validate(DtoScript req) {
String msg = checkLegal(req);
if (msg != null) {
return msg;
}
//应用内存资源参数检查和补充必要参数
if (req.isYarn()) {
if (yarnConfig.getAppMemoryThreshold() > 0 && !yarnConfig.getAppWhiteList().contains(req.getApp())) {
try {
int totalMemory = calResource(req.getType(), req.getContent()).get("totalMemory");
if (totalMemory > yarnConfig.getAppMemoryThreshold()) {
//超过阀值
return "内存参数配置达到大内存应用标准【" + yarnConfig.getAppMemoryThreshold() + "MB】 !请调小内存参数或联系管理员升级为大内存应用。";
}
} catch (NumberFormatException e) {
return "内存参数请向上取整";
}
}
appendNecessaryArgs(req);
}
if (req.getId() != null) {
Script dbScript = findById(req.getId());
if (Constant.ScriptType.SPARK_STREAM.equals(dbScript.getType()) || Constant.ScriptType.FLINK_STREAM.equals(dbScript.getType())) {
//更换集群或队列时检查应用是否正在运行
if (checkNeedKillYarnAppIfChangeClusterOrQueue(dbScript, req)) {
return "更换集群或队列前请先关闭正在运行的应用";
}
}
if (dbScript.isYarn()) {
//检查程序包是否变更
String dbJarPath = extractJarPath(dbScript.getContent());
String reqJarPath = extractJarPath(req.getContent());
if (!dbJarPath.equals(reqJarPath)) {
deleteJar(dbScript);
}
}
}
if (StringUtils.isBlank(req.getUser())) {
req.setUser(sshConfig.getUser());
}
return null;
}
@Transactional(rollbackFor = Exception.class)
@Override
public Script update(Script entity, Monitor monitorEntity) {
Monitor monitor = monitorService.save(monitorEntity);
if (entity.getMonitorId() == null) {
entity.setMonitorId(monitor.getId());
}
return super.save(entity);
}
@Override
public ScriptHistory generateHistory(Script script) {
return generateHistory(script, null, null, null, null);
}
@Override
public ScriptHistory generateHistory(Script script, Monitor monitor) {
return generateHistory(script, monitor, null, null, null);
}
@Transactional(rollbackFor = Exception.class)
@Override
public void generateHistory(Schedule schedule, String scheduleInstanceId, String previousScheduleTopNodeId) {
recursiveGenerateHistory(schedule, scheduleInstanceId, previousScheduleTopNodeId);
}
@Transactional(rollbackFor = Exception.class)
@Override
public void reGenerateHistory(Schedule schedule, String scheduleInstanceId, String previousScheduleTopNodeId) {
generateHistory(schedule, scheduleInstanceId, previousScheduleTopNodeId);
}
@Override
public String extractJarPath(String content) {
String [] tokens = content.split(" ");
int jarIndex = -1;
for (int i = 0; i < tokens.length; i ++) {
String token = tokens[i];
if (token.contains(".jar") || token.contains(".py")) {
if (!"--jars".equals(tokens[i - 1]) && !"-j".equals(tokens[i - 1]) && !"--jar".equals(tokens[i - 1])) {
jarIndex = i;
break;
}
}
}
if (jarIndex != -1) {
return tokens[jarIndex];
}
return null;
}
@Override
public void deleteJar(Script entity) {
String jarPath = extractJarPath(entity.getContent());
if (jarPath != null) {
//检查是否还被引用
boolean used = false;
for (Script item : findByQuery("createBy=" + entity.getCreateBy() + ";id!=" + entity.getId())) {
if (jarPath.equals(extractJarPath(item.getContent()))) {
used = true;
break;
}
}
if (!used) {
for (Cluster cluster : clusterService.findAll()) {
if (jarPath.startsWith(cluster.getFsDefaultFs())) {
String fs
没有合适的资源?快使用搜索试试~ 我知道了~
分布式计算任务调度系统,提供Spark、Flink等批处理任务的DAG调度和流处理任务的运行管理和状态监控
共1191个文件
js:868个
java:126个
gif:76个
1.该资源内容由用户上传,如若侵权请联系客服进行举报
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
版权申诉
0 下载量 3 浏览量
2023-04-14
15:32:34
上传
评论
收藏 3.83MB ZIP 举报
温馨提示
巨鲸任务调度平台为美柚大数据研发的分布式计算任务调度系统,提供Spark、Flink等批处理任务的DAG调度和流处理任务的运行管理和状态监控,并具有Yarn应用管理、重复应用检测、大内存应用检测等功能。
资源推荐
资源详情
资源评论
收起资源包目录
分布式计算任务调度系统,提供Spark、Flink等批处理任务的DAG调度和流处理任务的运行管理和状态监控 (1191个子文件)
bootstrap.min.css 118KB
layui.css 68KB
animate.min.css 46KB
sweetalert2.min.css 26KB
font-awesome.min.css 26KB
layer.css 14KB
dropzone.min.css 13KB
layui.mobile.css 10KB
bootstrap-select.min.css 10KB
laydate.css 7KB
bootstrap-switch.min.css 6KB
script-type.css 6KB
xadmin.css 6KB
laydate.css 6KB
schedule_edit.css 5KB
laydate.css 3KB
laydate.css 3KB
laydate.css 3KB
common.css 2KB
schedule_instance.css 1KB
code.css 1KB
big-whale.css 818B
stream_edit.css 336B
mypage.css 333B
angular-csp.css 263B
mydaterange.css 207B
bootstrap-treeview.min.css 204B
hdfs.css 109B
rotate.cur 2KB
iconfont.eot 40KB
iconfont.eot 7KB
icomoon.eot 1KB
flink 3KB
59.gif 10KB
22.gif 10KB
24.gif 8KB
13.gif 7KB
16.gif 7KB
39.gif 6KB
64.gif 6KB
63.gif 6KB
50.gif 6KB
loading-0.gif 6KB
4.gif 6KB
1.gif 5KB
42.gif 5KB
71.gif 5KB
21.gif 5KB
20.gif 5KB
29.gif 5KB
70.gif 4KB
5.gif 4KB
17.gif 4KB
27.gif 4KB
9.gif 4KB
44.gif 4KB
11.gif 4KB
8.gif 4KB
3.gif 4KB
23.gif 4KB
34.gif 4KB
41.gif 4KB
38.gif 4KB
65.gif 3KB
32.gif 3KB
45.gif 3KB
7.gif 3KB
12.gif 3KB
26.gif 3KB
60.gif 3KB
2.gif 3KB
40.gif 3KB
25.gif 3KB
19.gif 3KB
66.gif 3KB
18.gif 3KB
46.gif 3KB
10.gif 3KB
28.gif 3KB
51.gif 3KB
57.gif 3KB
67.gif 3KB
0.gif 3KB
48.gif 3KB
43.gif 3KB
30.gif 2KB
61.gif 2KB
33.gif 2KB
69.gif 2KB
14.gif 2KB
47.gif 2KB
36.gif 2KB
49.gif 2KB
58.gif 2KB
6.gif 2KB
54.gif 2KB
53.gif 2KB
56.gif 2KB
62.gif 2KB
31.gif 2KB
共 1191 条
- 1
- 2
- 3
- 4
- 5
- 6
- 12
资源评论
Java程序员-张凯
- 粉丝: 1w+
- 资源: 6651
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功