/*
* Copyright 2019 WeBank
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.webank.wedatasphere.dss.orchestrator.server.service.impl;
import com.google.common.collect.Lists;
import com.webank.wedatasphere.dss.appconn.core.AppConn;
import com.webank.wedatasphere.dss.common.constant.project.ProjectUserPrivEnum;
import com.webank.wedatasphere.dss.common.exception.DSSErrorException;
import com.webank.wedatasphere.dss.common.label.DSSLabel;
import com.webank.wedatasphere.dss.common.label.DSSLabelUtil;
import com.webank.wedatasphere.dss.common.label.EnvDSSLabel;
import com.webank.wedatasphere.dss.common.protocol.JobStatus;
import com.webank.wedatasphere.dss.common.protocol.project.ProjectUserAuthRequest;
import com.webank.wedatasphere.dss.common.protocol.project.ProjectUserAuthResponse;
import com.webank.wedatasphere.dss.common.utils.DSSExceptionUtils;
import com.webank.wedatasphere.dss.common.utils.MapUtils;
import com.webank.wedatasphere.dss.common.utils.RpcAskUtils;
import com.webank.wedatasphere.dss.contextservice.service.ContextService;
import com.webank.wedatasphere.dss.framework.common.exception.DSSFrameworkErrorException;
import com.webank.wedatasphere.dss.orchestrator.common.entity.DSSOrchestratorInfo;
import com.webank.wedatasphere.dss.orchestrator.common.entity.DSSOrchestratorVersion;
import com.webank.wedatasphere.dss.orchestrator.common.entity.OrchestratorVo;
import com.webank.wedatasphere.dss.orchestrator.common.protocol.RequestOrchestratorInfos;
import com.webank.wedatasphere.dss.orchestrator.common.protocol.RequestProjectUpdateOrcVersion;
import com.webank.wedatasphere.dss.orchestrator.common.protocol.ResponseOrchestratorInfos;
import com.webank.wedatasphere.dss.orchestrator.common.ref.OrchestratorRefConstant;
import com.webank.wedatasphere.dss.orchestrator.core.DSSOrchestrator;
import com.webank.wedatasphere.dss.orchestrator.core.exception.DSSOrchestratorErrorException;
import com.webank.wedatasphere.dss.orchestrator.core.utils.OrchestratorUtils;
import com.webank.wedatasphere.dss.orchestrator.db.dao.OrchestratorMapper;
import com.webank.wedatasphere.dss.orchestrator.loader.OrchestratorManager;
import com.webank.wedatasphere.dss.orchestrator.publish.utils.OrchestrationDevelopmentOperationUtils;
import com.webank.wedatasphere.dss.orchestrator.server.conf.OrchestratorConf;
import com.webank.wedatasphere.dss.orchestrator.server.constant.DSSOrchestratorConstant;
import com.webank.wedatasphere.dss.orchestrator.server.entity.request.OrchestratorModifyRequest;
import com.webank.wedatasphere.dss.orchestrator.server.entity.request.OrchestratorRequest;
import com.webank.wedatasphere.dss.orchestrator.server.entity.vo.OrchestratorBaseInfo;
import com.webank.wedatasphere.dss.orchestrator.server.entity.vo.OrchestratorUnlockVo;
import com.webank.wedatasphere.dss.orchestrator.server.service.OrchestratorService;
import com.webank.wedatasphere.dss.sender.service.DSSSenderServiceFactory;
import com.webank.wedatasphere.dss.standard.app.development.operation.*;
import com.webank.wedatasphere.dss.standard.app.development.ref.*;
import com.webank.wedatasphere.dss.standard.app.development.service.DevelopmentService;
import com.webank.wedatasphere.dss.standard.app.development.service.RefCRUDService;
import com.webank.wedatasphere.dss.standard.app.development.service.RefQueryService;
import com.webank.wedatasphere.dss.standard.app.development.standard.DevelopmentIntegrationStandard;
import com.webank.wedatasphere.dss.standard.app.sso.Workspace;
import com.webank.wedatasphere.dss.standard.common.desc.AppDesc;
import com.webank.wedatasphere.dss.standard.common.desc.AppInstance;
import com.webank.wedatasphere.dss.standard.common.entity.ref.ResponseRef;
import com.webank.wedatasphere.dss.standard.common.exception.operation.ExternalOperationWarnException;
import com.webank.wedatasphere.dss.workflow.common.protocol.*;
import org.apache.commons.collections.CollectionUtils;
import org.apache.linkis.cs.client.ContextClient;
import org.apache.linkis.cs.client.builder.ContextClientFactory;
import org.apache.linkis.rpc.Sender;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.*;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@Service
public class OrchestratorServiceImpl implements OrchestratorService {
/** SLF4J logger bound to this service class. */
private static final Logger LOGGER = LoggerFactory.getLogger(OrchestratorServiceImpl.class);
/**
 * Spring-injected orchestrator manager. {@code createOrchestrator} calls
 * {@code getOrCreateOrchestrator} on it with the user name, workspace name,
 * orchestrator type and labels.
 */
@Autowired
private OrchestratorManager orchestratorManager;
/**
 * Spring-injected orchestrator mapper.
 * NOTE(review): presumably the persistence mapper for orchestrator records (it lives in
 * the {@code db.dao} package) - its usages are outside this excerpt; confirm.
 */
@Autowired
private OrchestratorMapper orchestratorMapper;
/**
 * Spring-injected context service. {@code createOrchestrator} calls
 * {@code createContextID} on it to obtain the context ID for a new orchestrator.
 */
@Autowired
private ContextService contextService;
/**
 * NOTE(review): presumably the flag value marking a record as valid - no usage is
 * visible in this excerpt; confirm against the callers before relying on it.
 */
private static final int VALID_FLAG = 1;
@Override
@Transactional(rollbackFor = Exception.class)
public OrchestratorVo createOrchestrator(String userName,
Workspace workspace,
String projectName,
Long projectId,
String description,
DSSOrchestratorInfo dssOrchestratorInfo,
List<DSSLabel> dssLabels) throws Exception {
OrchestratorVo orchestratorVo = new OrchestratorVo();
//todo 增加校验
String uuid = UUID.randomUUID().toString();
//作为Orchestrator的唯一标识,包括跨环境导入导出也不发生变化。
dssOrchestratorInfo.setUUID(uuid);
String version = OrchestratorUtils.generateNewVersion();
String contextId = contextService.createContextID(workspace.getWorkspaceName(), projectName, dssOrchestratorInfo.getName(), version, userName);
LOGGER.info("Create a new ContextId: {} for new orchestrator {}.", contextId, dssOrchestratorInfo.getName());
//1. 访问DSS工作流微模块创建工作流
RefJobContentResponseRef appRef = tryRefOperation(dssOrchestratorInfo, userName, workspace, dssLabels, null,
developmentService -> ((RefCRUDService) developmentService).getRefCreationOperation(),
dssContextRequestRef -> dssContextRequestRef.setContextId(contextId),
projectRefRequestRef -> projectRefRequestRef.setProjectName(projectName).setRefProjectId(projectId),
(developmentOperation, developmentRequestRef) -> {
DSSOrchestrator dssOrchestrator = orchestratorManager.getOrCreateOrchestrator(userName,
workspace.getWorkspaceName(), dssOrchestratorInfo.getType(), dssLabels);
Map<String, Object> dssJobContent = MapUtils.newCommonMapBuilder()
.put(OrchestratorRefConstant.DSS_ORCHESTRATOR_INFO_KEY, dssOrchestratorInfo)
.put(OrchestratorRefConstant.ORCHESTRATOR_VERSION_KEY, version)
.put(OrchestratorRefConstant.ORCHESTRATION_SCHEDULER_APP_CONN, Optional.ofNullable(dssOrchestrator)
.map(DSSOrchestrator::getSchedulerAppConn).map(AppConn::getAppDesc).map(AppDesc::getAppName)
.