package cn.youth.datacollection.core.repository;
import cn.youth.datacollection.core.pool.RepositoryObject;
import cn.youth.datacollection.core.pool.RepositoryObjectPool;
import cn.youth.datacollection.service.impl.KRepositoryServiceImpl;
import cn.youth.datacollection.util.*;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import cn.youth.datacollection.core.database.DatabaseCodec;
import cn.youth.datacollection.entity.KRepository;
import lombok.extern.slf4j.Slf4j;
import cn.youth.datacollection.core.enums.GlobalStatusEnum;
import cn.youth.datacollection.core.exceptions.MyMessageException;
import cn.youth.datacollection.core.povo.TreeDTO;
import cn.youth.datacollection.core.enums.RepTypeEnum;
import org.pentaho.di.core.Const;
import org.pentaho.di.core.DBCache;
import org.pentaho.di.core.ProgressNullMonitorListener;
import org.pentaho.di.core.database.Database;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.database.PartitionDatabaseMeta;
import org.pentaho.di.core.database.SqlScriptStatement;
import org.pentaho.di.core.exception.KettleDatabaseException;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.logging.KettleLogStore;
import org.pentaho.di.core.logging.LoggingObjectInterface;
import org.pentaho.di.core.logging.LoggingObjectType;
import org.pentaho.di.core.logging.SimpleLoggingObject;
import org.pentaho.di.core.plugins.PluginRegistry;
import org.pentaho.di.core.plugins.RepositoryPluginType;
import org.pentaho.di.core.row.RowMetaInterface;
import org.pentaho.di.i18n.BaseMessages;
import org.pentaho.di.repository.*;
import org.pentaho.di.repository.filerep.KettleFileRepository;
import org.pentaho.di.repository.filerep.KettleFileRepositoryMeta;
import org.pentaho.di.repository.kdr.KettleDatabaseRepository;
import org.pentaho.di.repository.kdr.KettleDatabaseRepositoryMeta;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.ui.core.database.dialog.SQLEditor;
import org.pentaho.ui.database.Messages;
import org.springframework.util.StringUtils;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
* Utility class for working with Kettle repositories.
*
* @author lyf
*/
@Slf4j
public class RepositoryUtil {
/**
* Cache of repositories that have been connected, keyed by the repository ID
* ({@code KRepository.getId()}). Despite the name, both the database-repository
* and the file-repository connection helpers in this class store their
* instances here.
*/
private static final Map<String, AbstractRepository> DATABASE_REP = new ConcurrentHashMap<>();
/**
* Shared Kettle logging object, created with the name "DatabaseController",
* the type {@code LoggingObjectType.DATABASE} and a null third constructor
* argument.
*/
public static final LoggingObjectInterface loggingObject = new SimpleLoggingObject("DatabaseController", LoggingObjectType.DATABASE, null);
/**
 * Connects to a database-backed Kettle repository.
 *
 * <p>If a repository with the same ID is cached and its {@code test()} call
 * succeeds, the cached instance is returned. A cached instance that fails the
 * test is evicted and disconnected (best effort) before a fresh connection is
 * opened, so its underlying connection is not left open.
 *
 * @param dbRep connection settings; the repository is cached only when
 *              {@code dbRep.getId()} is non-null
 * @return the cached or newly connected repository
 * @throws MyMessageException with {@code GlobalStatusEnum.KETTLE_ERROR} when
 *                            the repository connection cannot be established
 */
private static AbstractRepository databaseRepository(KRepository dbRep) {
    String repId = dbRep.getId();
    if (repId != null) {
        // Single lookup: a containsKey()/get() pair can observe a concurrent
        // removal in between and dereference null.
        AbstractRepository cached = DATABASE_REP.get(repId);
        if (cached != null) {
            if (cached.test()) {
                return cached;
            }
            // The cached connection is unusable: evict it (only if it is still
            // the mapped value) and release it before it gets replaced.
            DATABASE_REP.remove(repId, cached);
            try {
                cached.disconnect();
            } catch (RuntimeException e) {
                // Best effort only; a failure here must not block the reconnect.
                log.warn("断开失效的数据库资源库连接失败", e);
            }
        }
    }
    // Database connection metadata; the database name doubles as the connection name.
    DatabaseMeta dataMeta = new DatabaseMeta(dbRep.getDbName(), dbRep.getDbType(), dbRep.getDbAccess(), dbRep.getDbHost(), dbRep.getDbName(), dbRep.getDbPort(), dbRep.getDbUsername(), dbRep.getDbPassword());
    // Use a connection pool for this connection.
    dataMeta.setUsingConnectionPool(true);
    // Repository metadata describing the database repository.
    KettleDatabaseRepositoryMeta drm = new KettleDatabaseRepositoryMeta();
    drm.setConnection(dataMeta);
    drm.setName(dbRep.getRepName());
    // Initialise the repository, then log in with the repository credentials.
    KettleDatabaseRepository rep = new KettleDatabaseRepository();
    rep.init(drm);
    try {
        rep.connect(dbRep.getRepUsername(), dbRep.getRepPassword());
    } catch (KettleException e) {
        String msg = "连接数据库资源库失败";
        log.error(msg, e);
        throw new MyMessageException(GlobalStatusEnum.KETTLE_ERROR, msg);
    }
    // Cache the connected repository for later reuse.
    if (repId != null) {
        DATABASE_REP.put(repId, rep);
    }
    return rep;
}
/**
 * Connects to a file-based Kettle repository.
 *
 * <p>A cached repository with the same ID is reused only while it reports
 * {@code isConnected()}. Returning a disconnected cached instance would make
 * the reconnect path in {@code connection(KRepository)} hand back the very
 * object it just found to be disconnected.
 *
 * @param fileRep connection settings; the repository is cached only when
 *                {@code fileRep.getId()} is non-null
 * @return the cached or newly connected repository
 * @throws MyMessageException with {@code GlobalStatusEnum.KETTLE_ERROR} when
 *                            the base directory is empty or missing, or when
 *                            connecting fails
 */
private static AbstractRepository fileRepository(KRepository fileRep) {
    String repId = fileRep.getId();
    if (repId != null) {
        // Single lookup avoids returning null if the entry is removed between
        // a containsKey() and a get().
        AbstractRepository cached = DATABASE_REP.get(repId);
        if (cached != null && cached.isConnected()) {
            return cached;
        }
        // Missing or disconnected: fall through and build a fresh connection,
        // which overwrites the cache entry below.
    }
    // The base directory must be configured and exist on disk.
    String baseDir = FileUtil.replaceSeparator(fileRep.getRepBasePath());
    if (StringUtil.isEmpty(baseDir) || !new File(baseDir).exists()) {
        throw new MyMessageException(GlobalStatusEnum.KETTLE_ERROR, "文件资源库不存在");
    }
    // Repository metadata describing the file repository.
    KettleFileRepositoryMeta frm = new KettleFileRepositoryMeta();
    frm.setBaseDirectory(baseDir);
    frm.setName(fileRep.getRepName());
    // Initialise the repository, then connect with the repository credentials.
    KettleFileRepository rep = new KettleFileRepository();
    rep.init(frm);
    try {
        rep.connect(fileRep.getRepUsername(), fileRep.getRepPassword());
    } catch (KettleException e) {
        String msg = "连接文件资源库失败";
        log.error(msg, e);
        throw new MyMessageException(GlobalStatusEnum.KETTLE_ERROR, msg);
    }
    // Cache the connected repository for later reuse.
    if (repId != null) {
        DATABASE_REP.put(repId, rep);
    }
    return rep;
}
/**
 * Returns a repository for the given settings, reusing the cached instance
 * when one exists and is still connected; otherwise a connection is created
 * via {@code createConnection(KRepository)}.
 *
 * @param kRepository connection settings
 * @return the cached repository if it is connected, otherwise the result of
 *         {@code createConnection(kRepository)}
 */
public static AbstractRepository connection(KRepository kRepository) {
    if (isExist(kRepository.getId())) {
        AbstractRepository cached = getRepository(kRepository.getId());
        if (cached.isConnected()) {
            // Cached and still connected: reuse it.
            return cached;
        }
        log.warn("资源库链接已断开,即将进行重连!!!");
    }
    // Not cached, or cached but disconnected: build a connection.
    return createConnection(kRepository);
}
/**
 * Creates a repository connection according to the configured repository type.
 *
 * @param kRepository connection settings; {@code getRepType()} selects the
 *                    file or database implementation
 * @return the repository produced by the matching helper, or {@code null}
 *         when {@code RepTypeEnum.getEnum} does not recognise the type
 * @throws IllegalStateException if the resolved enum constant is neither
 *                               {@code FILE} nor {@code DB}
 */
public static AbstractRepository createConnection(KRepository kRepository) {
    RepTypeEnum type = RepTypeEnum.getEnum(kRepository.getRepType());
    if (type == null) {
        // Unrecognised type: nothing to connect to.
        return null;
    }
    switch (type) {
        case FILE:
            return fileRepository(kRepository);
        case DB:
            return databaseRepository(kRepository);
        default:
            throw new IllegalStateException("Unexpected value: " + kRepository.getRepType());
    }
}
/**
 * Connects every repository in the list by calling
 * {@code connection(KRepository)} on each element in iteration order.
 *
 * @param list connection settings of the repositories to connect
 */
public static void connectionBatch(List<KRepository> list) {
    for (KRepository settings : list) {
        connection(settings);
    }
}
/**
 * Disconnects the cached repository with the given ID and removes it from the
 * cache. Does nothing when the ID is {@code null} or no repository is cached
 * under it.
 *
 * <p>The entry is removed from the cache atomically before the repository is
 * disconnected, so concurrent callers cannot both disconnect the same
 * instance and the cache never keeps an entry whose disconnect failed.
 *
 * @param repId repository ID; may be {@code null}
 */
public static void disconnection(String repId) {
    // ConcurrentHashMap rejects null keys with a NullPointerException.
    if (repId == null) {
        return;
    }
    // remove() both checks for presence and claims the instance in one step.
    AbstractRepository repository = DATABASE_REP.remove(repId);
    if (repository == null) {
        return;
    }
    try {
        repository.disconnect();
    } finally {
        // Clear the shared-object cache even if disconnecting threw.
        repository.clearSharedObjectCache();
    }
}
/**
* 断开所有资源库
*/
public static void disconnectionAll() {
if (