package com.sf.sgs.data.integration.core.adapter.hbase.tool;
import com.kenai.constantine.Constant;
import com.sf.sgs.data.integration.core.adapter.persistence.repository.report.ExpReportRepository;
import com.sf.sgs.data.integration.core.application.report.job.ReportETLJob;
import com.sf.sgs.data.integration.core.common.constant.CompareOperator;
import com.sf.sgs.data.integration.core.common.constant.ExportConstant;
import com.sf.sgs.data.integration.core.common.disconf.SystemConfig;
import com.sf.sgs.data.integration.core.common.util.DateUtil;
import com.sf.sgs.data.integration.core.common.util.FileUtil;
import com.sf.sgs.data.integration.core.common.util.StringUtil;
import com.sf.sgs.data.integration.core.domain.model.report.FilterCellEntity;
import com.sf.sgs.data.integration.core.domain.model.report.HbaseExportEntity;
import com.sf.sgs.data.integration.core.domain.model.report.TableCellEntity;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
@Component
public class HBaseExport {
/**
 * SLF4J logger for this component.
 * NOTE(review): declared as a public, non-static instance field; kept as-is because
 * code outside this file may reference it.
 */
public Logger logger = LoggerFactory.getLogger(HBaseExport.class);
/** Spring-injected repository used to load export definitions, filter cells and table cells. */
@Autowired
private ExpReportRepository repository;
/** Spring-injected configuration holder; read in this class for the report export day offset. */
@Autowired
private SystemConfig systemConfig;
/**
 * Builds the date string used when generating export directory and file names.
 *
 * <p>The base day offset is read from {@code systemConfig.getReportExportTime()} (the number of
 * natural days configured in disconf). When that setting is empty the offset defaults to
 * {@code -1}. The caller-supplied {@code i} is added to the base offset before delegating to
 * {@code DateUtil.getPreviousDateStr}.
 *
 * @param format date pattern passed through to {@code DateUtil.getPreviousDateStr}
 * @param i additional day offset added to the configured offset
 * @return the date string returned by {@code DateUtil.getPreviousDateStr}
 * @throws NumberFormatException if the configured offset is non-empty but not a valid integer
 */
private String getDateStr(String format, int i) {
    final String configuredOffset = systemConfig.getReportExportTime();
    final int baseOffset = StringUtils.isEmpty(configuredOffset)
            ? -1
            : Integer.parseInt(configuredOffset);
    return DateUtil.getPreviousDateStr(format, baseOffset + i);
}
/**
 * Returns the start date used by the export filter.
 *
 * <p>This is the configured export date ({@code getDateStr(format, 0)}); no time-of-day
 * suffix is appended.
 *
 * @param format date pattern for the returned string
 * @return the start date string
 */
private String getDateStrStart(String format) {
    return getDateStr(format, 0);
}
/**
 * Returns the end date used by the export filter.
 *
 * <p>This is the configured export date ({@code getDateStr(format, 0)}), i.e. the same value
 * as the start date; no time-of-day suffix is appended.
 *
 * @param format date pattern for the returned string
 * @return the end date string
 */
private String getDateStrEnd(String format) {
    return getDateStr(format, 0);
}
/**
 * Loads every export definition from MySQL and runs the corresponding HBase export.
 *
 * <p>For each definition returned by {@code repository.getExpReportList()} this method fills in
 * the Chinese and English column-name lists, attaches the filter conditions, and then calls
 * {@code getContentFromHbase}. Nothing is done when the repository returns {@code null} or an
 * empty list.
 *
 * @throws Exception if the export definitions cannot be loaded (the original failure is
 *         attached as the cause), or if preparing any single export fails
 */
public void ExportConfigByReadMysqlData() throws Exception {
    List<HbaseExportEntity> exports;
    try {
        exports = repository.getExpReportList();
    } catch (Exception e) {
        // Hand the throwable to the logger and chain it as the cause so the stack trace
        // of the underlying failure is not lost.
        logger.error("调用 getExpReportList 方法,返回值:" + e.getMessage(), e);
        throw new Exception("调用 getExpReportList 方法出错!" + e.getMessage(), e);
    }
    if (exports == null || exports.isEmpty()) {
        return;
    }
    for (HbaseExportEntity entity : exports) {
        entity.setCellNameCnList(getTableCellNameCnList(entity));
        entity.setCellNameEnList(getTableCellNameEnList(entity));
        getContentFromHbase(getFilterCondition(entity));
    }
}
/**
 * Loads the filter conditions configured for an export and stores them on the entity.
 *
 * <p>The filter rows come from {@code repository.getFilterCellList(exportCode)}. Four parallel
 * lists (column name, compare operator, value, filter type) are built in row order and set on
 * the entity; the lists are empty when the repository returns {@code null} or no rows. Each
 * value is passed through {@code getFilterCellValue} so date placeholders are expanded.
 *
 * @param entity export definition to enrich; it is modified in place
 * @return the same {@code entity} instance, with its filter lists populated
 * @throws Exception if the filter rows cannot be loaded (the original failure is attached as
 *         the cause)
 */
private HbaseExportEntity getFilterCondition(HbaseExportEntity entity) throws Exception {
    List<FilterCellEntity> filters;
    try {
        filters = repository.getFilterCellList(entity.getExportCode());
    } catch (Exception e) {
        // Keep the underlying failure: log it with its stack trace and chain it as the cause.
        logger.error("调用 getFilterCellList 方法,返回值:" + e.getMessage(), e);
        throw new Exception("调用 getFilterCellList 方法出错!" + e.getMessage(), e);
    }

    List<String> filterCellNameList = new ArrayList<String>();
    List<String> filterOpList = new ArrayList<String>();
    List<String> filterCellValueList = new ArrayList<String>();
    List<String> filterTypeList = new ArrayList<String>();

    if (filters != null) {
        for (FilterCellEntity filter : filters) {
            filterCellNameList.add(filter.getCellName());
            filterOpList.add(filter.getCompareOp());
            // NOTE(review): this used to branch on ExportConstant.OPS_FILTER_TYPE, but both
            // branches performed exactly this call, so the branch was collapsed. If OPS-type
            // filters are meant to be resolved differently, reintroduce the distinction here.
            filterCellValueList.add(getFilterCellValue(filter.getCellValue()));
            filterTypeList.add(filter.getFilterType() + "");
        }
    }

    entity.setFilterCellList(filterCellNameList);
    entity.setFilterCompareOpList(filterOpList);
    entity.setFilterValueList(filterCellValueList);
    entity.setFilterTypeList(filterTypeList);
    return entity;
}
/**
 * Expands the date placeholder inside a filter value loaded from MySQL.
 *
 * <p>If the value contains {@code ExportConstant.YESTERDAY}, every occurrence is replaced with
 * {@code getDateStr(DateUtil.DATE_FORMAT_DAY_10, 0)}. Otherwise, if it contains
 * {@code ExportConstant.TODAY}, every occurrence is replaced with
 * {@code getDateStr(DateUtil.DATE_FORMAT_DAY_10, 1)}. Only one of the two placeholders is
 * expanded per call; the yesterday placeholder takes precedence.
 *
 * @param value raw filter value; may be {@code null} or empty
 * @return the value with the placeholder expanded, or {@code value} unchanged when it is
 *         {@code null}, empty, or contains neither placeholder
 */
private String getFilterCellValue(String value) {
    if (StringUtils.isEmpty(value)) {
        return value;
    }
    if (value.contains(ExportConstant.YESTERDAY)) {
        return value.replace(ExportConstant.YESTERDAY, getDateStr(DateUtil.DATE_FORMAT_DAY_10, 0));
    }
    if (value.contains(ExportConstant.TODAY)) {
        return value.replace(ExportConstant.TODAY, getDateStr(DateUtil.DATE_FORMAT_DAY_10, 1));
    }
    return value;
}
/**
 * Collects the English column names to export for the given export definition.
 *
 * <p>The columns come from {@code repository.getTableCellList(exportCode)} and are returned in
 * the order the repository supplies them.
 *
 * @param entity export definition whose export code selects the columns
 * @return the English column names; an empty list when the repository returns {@code null} or
 *         no rows
 * @throws Exception if the columns cannot be loaded (the original failure is attached as the
 *         cause)
 */
private List<String> getTableCellNameEnList(HbaseExportEntity entity) throws Exception {
    List<TableCellEntity> cells;
    try {
        cells = repository.getTableCellList(entity.getExportCode());
    } catch (Exception e) {
        // Keep the underlying failure: log it with its stack trace and chain it as the cause.
        logger.error("调用 getTableCellList 方法,返回值:" + e.getMessage(), e);
        throw new Exception("调用 getTableCellList 方法出错!" + e.getMessage(), e);
    }
    List<String> cellList = new ArrayList<String>();
    // Null result is treated as "no columns", matching the null checks used by the other
    // repository reads in this class, instead of failing with a NullPointerException.
    if (cells != null) {
        for (TableCellEntity cell : cells) {
            cellList.add(cell.getCellNameEn());
        }
    }
    return cellList;
}
/**
 * Collects the Chinese column names to export for the given export definition.
 *
 * <p>The columns come from {@code repository.getTableCellList(exportCode)} and are returned in
 * the order the repository supplies them.
 *
 * @param entity export definition whose export code selects the columns
 * @return the Chinese column names; an empty list when the repository returns {@code null} or
 *         no rows
 * @throws Exception if the columns cannot be loaded (the original failure is attached as the
 *         cause)
 */
private List<String> getTableCellNameCnList(HbaseExportEntity entity) throws Exception {
    List<TableCellEntity> cells;
    try {
        cells = repository.getTableCellList(entity.getExportCode());
    } catch (Exception e) {
        // Keep the underlying failure: log it with its stack trace and chain it as the cause.
        logger.error("调用 getTableCellList 方法,返回值:" + e.getMessage(), e);
        throw new Exception("调用 getTableCellList 方法出错!" + e.getMessage(), e);
    }
    List<String> cellList = new ArrayList<String>();
    // Null result is treated as "no columns", matching the null checks used by the other
    // repository reads in this class, instead of failing with a NullPointerException.
    if (cells != null) {
        for (TableCellEntity cell : cells) {
            cellList.add(cell.getCellNameCn());
        }
    }
    return cellList;
}
/**
*每次读取Hbase数据库Page_Size条
* @param entity
*/
public void getContentFromHbase(HbaseExportEntity entity) {
List<String> list = new ArrayList<String>();
ResultScanner scanner = null;
String startRow = entity.getStartRow();
String endRow = entity.getStopRow();
String tableName = entity.getTableName();
String cellNameStr = getCellNameStr(entity.getCellNameCnList(),entity.getFieldSplit()); //列名
int fileCount = 1;
String fileName = FileUtil.getFileName(entity,fileCount,getDateStr(DateUtil.DATE_FORMAT_DAY_8,0)); //获取文件名
String baseDir = entity.getSavePath();
//文件存储路径(配置)+ tableName + 时间戳 + 文件名(程序自动生成)+文件扩展名(配置)
String fileDir = entity.getExportCode()
+ File.separator + getDateStr(DateUtil.DATE_FORMAT_DAY_8,0); //保存目录
String fileUrl = baseDir+ File.separator + fileDir + File.separator + fileName; //保存绝对文件路径
cellNameStr = showFirstLineContent(entity,cellNameStr);
Filter filter = getHbaseExportFilter(entity);
logger.info("读hbase