package org.frameworkset.tran.config;
/**
* Copyright 2008 biaoping.yin
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.fasterxml.jackson.annotation.JsonIgnore;
import org.frameworkset.spi.DefaultApplicationContext;
import org.frameworkset.spi.assemble.GetProperties;
import org.frameworkset.tran.*;
import org.frameworkset.tran.context.BaseImportContext;
import org.frameworkset.tran.context.ImportContext;
import org.frameworkset.tran.context.InitJobContextCall;
import org.frameworkset.tran.context.JobContext;
import org.frameworkset.tran.metrics.job.Metrics;
import org.frameworkset.tran.plugin.metrics.output.ETLMetrics;
import org.frameworkset.tran.plugin.metrics.output.MetricsOutputConfig;
import org.frameworkset.tran.record.SplitHandler;
import org.frameworkset.tran.schedule.*;
import org.frameworkset.tran.schedule.timer.TimerScheduleConfig;
import org.frameworkset.util.annotations.DateFormateMeta;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import static org.frameworkset.tran.DBConfig.*;
/**
* <p>Description: </p>
* <p></p>
* <p>Copyright (c) 2018</p>
* @Date 2019/1/11 21:31
* @author biaoping.yin
* @version 1.0
*/
public class ImportBuilder {
/** Input-side configuration; assigned through setInputConfig. */
protected InputConfig inputConfig;
/** Output-side configuration; assigned through setOutputConfig. */
protected OutputConfig outputConfig;
/** Start action assigned through setImportStartAction (presumably run when the import starts; confirm against usage). */
protected ImportStartAction importStartAction;
/** End action assigned through setImportEndAction (presumably run when the import ends; confirm against usage). */
protected ImportEndAction importEndAction;
/** Job input parameters; created lazily as a LinkedHashMap by addJobInputParam. */
private Map jobInputParams;
/** Job output parameters; created lazily as a LinkedHashMap by addJobOutputParam. */
private Map jobOutputParams;
/**
 * Input parameter groups. The input parameters and the dynamic input parameters are assembled
 * into one group and added to this collection; after a group is added, the input parameters and
 * dynamic input parameters are reset so that a new group can be built.
 * By adding several groups, specific input plugins (for example the httpinput plugin) can use
 * each group to issue concurrent data requests when the job is scheduled.
 */
private List<JobInputParamGroup> jobInputParamGroups;
/** Dynamic input parameters keyed by name; created lazily by addJobDynamicInputParam. */
private Map<String,DynamicParam> jobDynamicInputParams;
/** Dynamic output parameters keyed by name; created lazily by addJobDynamicOutputParam. */
private Map<String,DynamicParam> jobDynamicOutputParams;
/** Logger shared by this class; protected and non-final, so subclasses can reach it. */
protected static Logger logger = LoggerFactory.getLogger(ImportBuilder.class);
// NOTE(review): the status* fields below presumably describe the store used for job status
// tracking; nothing in the visible code reads or writes them, so confirm against usage.
private DBConfig statusDbConfig ;
private String statusDbname;
private String statusTableDML;
private String statusHistoryTableDML;
/** Fetch size; defaults to 5000. */
private Integer fetchSize = 5000;
private String jobName;
private String jobId;
/** Metrics definitions returned by getMetrics. */
private List<ETLMetrics> metrics;
/**
 * Field used as the time dimension for metrics. When it is not set the current time is used
 * by default; otherwise the time value of this field is used.
 */
private String dataTimeField;
/** Defaults to true. */
private boolean useDefaultMapData = true;
// NOTE(review): the three flags below presumably control metric flushing when a scheduled task
// completes; their consumers are not visible here, so confirm against usage.
private boolean flushMetricsOnScheduleTaskCompleted;
private boolean cleanKeysWhenflushMetricsOnScheduleTaskCompleted;
/** Defaults to true. */
private boolean waitCompleteWhenflushMetricsOnScheduleTaskCompleted = true;
/**
 * Returns the name of the field used as the time dimension for metrics.
 *
 * @return the configured field name, or {@code null} when none has been set
 */
public String getDataTimeField() {
    return this.dataTimeField;
}
/**
 * Sets the field used as the time dimension for metrics. When no field is set the current
 * time is used by default; otherwise the time value of the named field is used.
 *
 * @param dataTimeField name of the field that carries the metric time
 * @return this builder, to allow call chaining
 */
public ImportBuilder setDataTimeField(String dataTimeField) {
    this.dataTimeField = dataTimeField;
    return this;
}
/**
 * Stores the end action on this builder.
 *
 * @param importEndAction the action to keep; stored as given, without validation
 * @return this builder, to allow call chaining
 */
public ImportBuilder setImportEndAction(ImportEndAction importEndAction) {
    this.importEndAction = importEndAction;
    return this;
}
/**
 * Returns the end action stored on this builder.
 *
 * @return the stored action, or {@code null} when none has been set
 */
public ImportEndAction getImportEndAction() {
    return this.importEndAction;
}
/**
 * Returns the metrics list held by this builder.
 *
 * @return the internal list itself (not a copy); may be {@code null}
 */
public List<ETLMetrics> getMetrics() {
    return this.metrics;
}
/**
 * Stores the input configuration on this builder.
 *
 * @param inputConfig the input configuration; stored as given, without validation
 * @return this builder, to allow call chaining
 */
public ImportBuilder setInputConfig(InputConfig inputConfig) {
    this.inputConfig = inputConfig;
    return this;
}
/**
 * Stores the start action on this builder.
 *
 * @param importStartAction the action to keep; stored as given, without validation
 * @return this builder, to allow call chaining
 */
public ImportBuilder setImportStartAction(ImportStartAction importStartAction) {
    this.importStartAction = importStartAction;
    return this;
}
/**
 * Returns the start action stored on this builder.
 *
 * @return the stored action, or {@code null} when none has been set
 */
public ImportStartAction getImportStartAction() {
    return this.importStartAction;
}
/**
 * Stores the output configuration on this builder.
 *
 * @param outputConfig the output configuration; stored as given, without validation
 * @return this builder, to allow call chaining
 */
public ImportBuilder setOutputConfig(OutputConfig outputConfig) {
    this.outputConfig = outputConfig;
    return this;
}
/**
 * Returns the split field name held by this builder.
 *
 * @return the split field name, or {@code null} when none has been set
 */
public String getSplitFieldName() {
    return this.splitFieldName;
}
/**
 * Adds a condition parameter for the job's data-extraction input plugin.
 * Pure delegate to {@link #addJobInputParam(String, Object)}.
 *
 * @param key parameter name
 * @param value parameter value
 * @return this builder, to allow call chaining
 * @deprecated use {@link #addJobInputParam(String, Object)} instead
 */
@Deprecated
public ImportBuilder addParam(String key, Object value){
    return this.addJobInputParam(key, value);
}
/**
 * Adds a condition parameter for the job's data-extraction input plugin.
 * The backing map is created on first use and keeps insertion order.
 *
 * @param key parameter name
 * @param value parameter value
 * @return this builder, to allow call chaining
 */
public ImportBuilder addJobInputParam(String key, Object value){
    Map params = this.jobInputParams;
    if (params == null) {
        // First parameter: allocate the map and publish it on the builder.
        params = new LinkedHashMap();
        this.jobInputParams = params;
    }
    params.put(key, value);
    return this;
}
/**
 * Adds a dynamic condition parameter for the job's data-extraction input plugin.
 * The backing map is created on first use and keeps insertion order.
 *
 * @param key parameter name
 * @param dynamicParam the dynamic parameter to register under {@code key}
 * @return this builder, to allow call chaining
 */
public ImportBuilder addJobDynamicInputParam(String key, DynamicParam dynamicParam){
    if (jobDynamicInputParams == null) {
        // Typed constructor rather than a raw LinkedHashMap: no unchecked conversion
        // when assigning to the Map<String, DynamicParam> field.
        jobDynamicInputParams = new LinkedHashMap<String, DynamicParam>();
    }
    jobDynamicInputParams.put(key, dynamicParam);
    return this;
}
/**
 * Adds a variable parameter for the job's output plugin.
 * The backing map is created on first use and keeps insertion order.
 *
 * @param key parameter name
 * @param value parameter value
 * @return this builder, to allow call chaining
 */
public ImportBuilder addJobOutputParam(String key, Object value){
    Map params = this.jobOutputParams;
    if (params == null) {
        // First parameter: allocate the map and publish it on the builder.
        params = new LinkedHashMap();
        this.jobOutputParams = params;
    }
    params.put(key, value);
    return this;
}
/**
 * Adds a dynamic variable parameter for the job's output plugin.
 * The backing map is created on first use and keeps insertion order.
 *
 * @param key parameter name
 * @param dynamicParam the dynamic parameter to register under {@code key}
 * @return this builder, to allow call chaining
 */
public ImportBuilder addJobDynamicOutputParam(String key, DynamicParam dynamicParam){
    if (jobDynamicOutputParams == null) {
        // Typed constructor rather than a raw LinkedHashMap: no unchecked conversion
        // when assigning to the Map<String, DynamicParam> field.
        jobDynamicOutputParams = new LinkedHashMap<String, DynamicParam>();
    }
    jobDynamicOutputParams.put(key, dynamicParam);
    return this;
}
/**
 * Stores the split field name on this builder.
 *
 * @param splitFieldName the split field name; stored as given, without validation
 * @return this builder, to allow call chaining
 */
public ImportBuilder setSplitFieldName(String splitFieldName) {
    this.splitFieldName = splitFieldName;
    return this;
}
/** Split field name; read and written through getSplitFieldName / setSplitFieldName. */
private String splitFieldName;
/** Split handler; declared transient. */
private transient SplitHandler splitHandler;
/**
 * Idle interval, in milliseconds, for the forced-flush check: when no data has arrived after
 * being idle for flushInterval, the data already queued is forcibly stored. Defaults to
 * 8 seconds; a value of 0 turns the mechanism off.
 */
private long flushInterval = 8000L; // upper-case suffix: a lower-case 'l' reads like the digit 1
private boolean ignoreNullValueField;
/** GeoIP configuration map returned by getGeoipConfig. */
private Map<String, Object> geoipConfig;
/** Defaults to true. */
private boolean sortLastValue = true;
/** Defaults to false. */
private boolean useBatchContextIndexName = false;
// public abstract InputPlugin buildInputDataTranPlugin(ImportContext importContext);
// public abstract OutputPlugin buildOutputDataTranPlugin(ImportContext importContext);
/**
 * Builds the data-transfer plugin by delegating to the given import context.
 *
 * @param importContext the context asked to build the plugin; must not be {@code null}
 * @return whatever {@code importContext.buildDataTranPlugin()} returns
 */
public DataTranPlugin buildDataTranPlugin(ImportContext importContext){
    return importContext.buildDataTranPlugin();
}
/**
 * Task start time.
 */
private Date scheduleDate;
/**
 * Task end time.
 */
private Date scheduleEndDate;
// NOTE(review): presumably a schedule delay ("deyLay" looks like a misspelling of "delay");
// its setter is commented out in this file, so confirm how it is populated.
private Long deyLay;
/**
 * Returns the task start time.
 *
 * @return the stored start time, or {@code null} when none has been set
 */
public Date getScheduleDate() {
    return this.scheduleDate;
}
/**
 * Sets the task start time.
 *
 * @param scheduleDate the start time; the reference is stored as given, not copied
 * @return this builder, to allow call chaining
 */
public ImportBuilder setScheduleDate(Date scheduleDate) {
    this.scheduleDate = scheduleDate;
    return this;
}
/**
 * Returns the value of the {@code deyLay} field.
 *
 * @return the stored value as a boxed {@code Long}; may be {@code null}
 */
public Long getDeyLay() {
    return this.deyLay;
}
// public void setDeyLay(Long deyLay) {
// this.deyLay = deyLay;
// }
// NOTE(review): presumably the job's scheduling settings; nothing in the visible code assigns it.
private ScheduleConfig scheduleConfig;
// Created eagerly here; setStatusTableId re-creates it if it is ever null.
// NOTE(review): the second ';' after the initializer is a redundant empty declaration.
protected ImportIncreamentConfig importIncreamentConfig = new ImportIncreamentConfig();;
// NOTE(review): presumably a password for the last-value store; confirm against usage.
private String lastValueStorePassword;
/**
 * Returns the current value of the {@code externalTimer} flag.
 *
 * @return the value of the {@code externalTimer} field
 */
public boolean isExternalTimer() {
    return this.externalTimer;
}
/**
 * Creates a new {@link DataStream} using its no-argument constructor.
 * Protected, so a subclass can override it to supply a different instance.
 *
 * @return a newly constructed {@code DataStream}
 */
protected DataStream createDataStream(){
    DataStream dataStream = new DataStream();
    return dataStream;
}
/**
 * Returns the GeoIP configuration map held by this builder.
 *
 * @return the internal map itself (not a copy); may be {@code null}
 */
public Map<String, Object> getGeoipConfig() {
    return this.geoipConfig;
}
public void setStatusTableId(String statusTableId) {
if(importIncreamentConfig == null){
importIncreamentConfig = new ImportIncreamentConfig();
}
importIncreamentConfig.setStatusTableId(statusTable
没有合适的资源?快使用搜索试试~ 我知道了~
由 bboss 开源的数据采集&流批一体化工具,提供数据采集、数据清洗转换处理和数据入库以及数据指标统计计算流批一体化处理功能
共614个文件
java:453个
md:48个
gitignore:29个
1.该资源内容由用户上传,如若侵权请联系客服进行举报
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
版权申诉
0 下载量 6 浏览量
2023-06-08
08:57:47
上传
评论
收藏 915KB ZIP 举报
温馨提示
bboss-datatran 由 bboss 开源的数据采集&流批一体化工具,提供数据采集、数据清洗转换处理和数据入库以及数据指标统计计算流批一体化处理功能。
资源推荐
资源详情
资源评论
收起资源包目录
由 bboss 开源的数据采集&流批一体化工具,提供数据采集、数据清洗转换处理和数据入库以及数据指标统计计算流批一体化处理功能 (614个子文件)
gradlew.bat 3KB
.classpath 1KB
.classpath 1KB
.classpath 1KB
.classpath 1KB
.classpath 1KB
.classpath 617B
.classpath 352B
.gitattributes 65B
.gitignore 586B
.gitignore 74B
.gitignore 74B
.gitignore 74B
.gitignore 74B
.gitignore 74B
.gitignore 74B
.gitignore 74B
.gitignore 74B
.gitignore 26B
.gitignore 14B
.gitignore 14B
.gitignore 14B
.gitignore 14B
.gitignore 14B
.gitignore 14B
.gitignore 14B
.gitignore 14B
.gitignore 14B
.gitignore 14B
.gitignore 8B
.gitignore 8B
.gitignore 8B
.gitignore 8B
.gitignore 8B
.gitignore 8B
.gitignore 8B
.gitignore 8B
.gitignore 7B
bboss.gpg 3KB
build.gradle 6KB
build.gradle 2KB
build.gradle 967B
build.gradle 816B
build.gradle 753B
settings.gradle 726B
build.gradle 715B
build.gradle 711B
build.gradle 692B
build.gradle 653B
build.gradle 651B
build.gradle 625B
build.gradle 618B
build.gradle 617B
build.gradle 611B
build.gradle 548B
build.gradle 537B
build.gradle 495B
build.gradle 494B
build.gradle 493B
build.gradle 493B
build.gradle 493B
gradlew 8KB
ImportBuilder.java 55KB
BaseStatusManager.java 52KB
FileReaderTask.java 51KB
DataTranPluginImpl.java 39KB
FileListenerService.java 30KB
MySQLBinlogListener.java 27KB
CommonRecordTranJob.java 26KB
StringTranJob.java 24KB
DBConfig.java 24KB
BaseImportConfig.java 24KB
GeoIPFilter.java 23KB
FileInputDataTranPlugin.java 21KB
FileDataTranPluginImpl.java 20KB
FileConfig.java 19KB
BaseImportContext.java 19KB
ContextImpl.java 18KB
KeyTimeMetrics.java 17KB
BaseElasticsearchDataTran.java 17KB
SFTPTransfer.java 16KB
HttpConfigClientProxy.java 15KB
FtpTransfer.java 15KB
BaseDataTran.java 15KB
FileInputConfig.java 15KB
DBOutputConfig.java 14KB
ExportExcel.java 14KB
DbSearcher.java 14KB
ClientOptions.java 13KB
TranUtil.java 13KB
ElasticsearchInputDataTranPlugin.java 13KB
ExcelFileReaderTask.java 12KB
TimeUtil.java 12KB
TimeMetrics.java 11KB
HttpInputDataTranPlugin.java 11KB
HbaseTemplate2.java 11KB
GeoIPUtil.java 11KB
MySQLBinlogListenerTest.java 11KB
MysqlBinlogDataTranPluginImpl.java 11KB
FileTransfer.java 11KB
共 614 条
- 1
- 2
- 3
- 4
- 5
- 6
- 7
资源评论
Java程序员-张凯
- 粉丝: 1w+
- 资源: 6651
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功