package com.yj;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.net.URLDecoder;
import java.sql.Connection;
import java.sql.SQLException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import com.yj.hive.JDBCUtils;
import com.yj.hive.HiveJdbcUtil;
import org.apache.commons.lang.StringUtils;
import com.yj.pc.Consumer;
import com.yj.oracle.CustomDataSource;
import com.yj.pc.Producer;
import org.apache.commons.dbutils.QueryRunner;
import org.apache.commons.dbutils.handlers.ColumnListHandler;
import org.apache.commons.dbutils.handlers.MapListHandler;
import org.apache.commons.dbutils.handlers.ScalarHandler;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.log4j.Logger;
/**
* @author lff
* @description 合并Hive小文件
*/
/**
 * Entry class for merging Hive small files: selects candidate tables from an
 * Oracle bookkeeping table and rewrites them via "insert overwrite" on Hive.
 */
public class MergeApp {
private static final Logger log = Logger.getLogger(MergeApp.class);
// NOTE(review): SimpleDateFormat is NOT thread-safe. Sharing one static instance is
// only safe if all callers format on a single thread (the producer) — confirm, or
// migrate to the thread-safe java.time.format.DateTimeFormatter.
private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
/**
 * Fetches the next batch of Hive tables that are due for a small-file merge.
 *
 * <p>Selection criteria live in the SQL: unmerged status, average file size in a
 * target window, bounded file count, at most 3 rows per call (rownum). Each
 * selected table is immediately flagged as "merging" (status 2) in Oracle so
 * concurrent producers do not pick it up again.
 *
 * <p>On a SQL failure the process exits with -1 (deliberate producer abort).
 *
 * @return fully qualified table names ("db.table"); empty list when nothing qualifies
 */
public static List<String> getHiveTableList() {
    QueryRunner runner = new QueryRunner(CustomDataSource.getInstance());
    String sql = "select db_name, table_name from hive.hive_inner_tab_dir_stat where merge_status = 0 AND rownum <= 3 AND avg_filesize > 2097152 AND avg_filesize < 15728640 AND FILE_NUM < 10000000 order by file_num desc, avg_filesize ASC";
    log.info("Producer Connect Oracle!!!");
    List<String> tables = new ArrayList<>();
    try {
        for (Map<String, Object> row : runner.query(sql, new MapListHandler())) {
            String dbName = row.get("db_name").toString();
            String tblName = row.get("table_name").toString();
            String qualified = String.format("%s.%s", dbName, tblName);
            // Claim the table (status 2 = merging) before handing it to a consumer.
            updateMergeState(dbName, tblName, sdf.format(new Date()), 2);
            tables.add(qualified);
        }
    } catch (SQLException e) {
        log.error(e, e);
        System.exit(-1);
    }
    return tables;
}
/**
 * Updates the merge status of a table in the Oracle bookkeeping table.
 * Status codes: 2 = merging, 1 = merged, 0 = not merged, -1 = merge failed.
 *
 * <p>Fix: the SQL used to be assembled with {@code String.format}, splicing raw
 * values into the statement (SQL-injection / quoting hazard, e.g. a table name
 * containing a quote). It now uses bind parameters via dbutils.
 *
 * @param db        Hive database name
 * @param tableName Hive table name
 * @param time      merge timestamp, formatted "yyyy-MM-dd HH:mm:ss"
 *                  (NOTE(review): bound as a string — confirm merge_time column
 *                  accepts implicit conversion, as the old quoted literal did)
 * @param status    new merge status code
 */
public static void updateMergeState(String db, String tableName, String time, int status) {
    QueryRunner qr = new QueryRunner(CustomDataSource.getInstance());
    String sql = "update hive.hive_inner_tab_dir_stat set merge_status=?, merge_time=? where db_name=? and table_name=?";
    try {
        log.info(String.format("%s [status=%d, time=%s, db=%s, table=%s]", sql, status, time, db, tableName));
        qr.update(sql, status, time, db, tableName);
    } catch (SQLException e) {
        log.error(e, e);
    }
}
/**
 * Looks up the owner of a Hive table from the Oracle metadata mirror.
 *
 * <p>Fix: the query used to splice {@code db}/{@code tableName} into the SQL
 * with {@code String.format} (SQL-injection / quoting hazard); it now uses
 * bind parameters via dbutils.
 *
 * @param db        Hive database name (matched against thmr.NAME)
 * @param tableName Hive table name (matched against thmr.TBL_NAME)
 * @return the owner, or null when not found or on SQL error
 */
public static String getTableOwer(String db, String tableName) {
    QueryRunner qr = new QueryRunner(CustomDataSource.getInstance());
    String sql = "SELECT OWNER FROM hive.TB_HIVE_METADATA_RESULT thmr WHERE thmr.NAME = ? AND thmr.TBL_NAME = ?";
    try {
        log.info(String.format("%s [db=%s, table=%s]", sql, db, tableName));
        return qr.query(sql, new ScalarHandler<>(), db, tableName);
    } catch (SQLException e) {
        log.error(e, e);
        return null;
    }
}
/**
 * Merges a Hive-managed table, dispatching on whether it is partitioned.
 *
 * <p>Non-Hive (e.g. external) tables are skipped; partitioned tables go through
 * {@link #mergePartitionedTable(String, String)}, everything else through
 * {@link #mergeNonPartitionedTable(String, String)}.
 *
 * @param db        Hive database name
 * @param tableName Hive table name
 */
public static void mergeTable(String db, String tableName) {
    if (!isHiveTable(db, tableName)) {
        log.info(String.format("%s.%s is not hive table, quit!", db, tableName));
    } else if (isPartitionedTable(db, tableName)) {
        log.info(String.format("%s.%s is partitioned table, merge directly!", db, tableName));
        mergePartitionedTable(db, tableName);
    } else {
        log.info(String.format("%s.%s is not partitioned table, merge directly!", db, tableName));
        mergeNonPartitionedTable(db, tableName);
    }
}
/**
 * Merges a non-partitioned Hive-managed table by rewriting it onto itself
 * ("insert overwrite table t select * from t"), which lets Hive coalesce
 * small files on write.
 *
 * <p>Fix: the old {@code finally} block called
 * {@code Objects.requireNonNull(conn).close()}, which threw (and then swallowed)
 * a NullPointerException whenever {@code getConnection()} itself had failed.
 * try-with-resources closes the connection correctly in every path.
 *
 * @param db        Hive database name
 * @param tableName Hive table name
 */
public static void mergeNonPartitionedTable(String db, String tableName) {
    String fullName = db + "." + tableName;
    String mergeSql = "insert overwrite table $full_name select * from $full_name".replace("$full_name", fullName);
    log.info(mergeSql);
    try (Connection conn = JDBCUtils.getConnection()) {
        HiveJdbcUtil.execute(conn, mergeSql);
    } catch (SQLException | RuntimeException e) {
        // Failures are logged, not rethrown: one bad table must not stop the consumer.
        log.error(e, e);
    }
}
/**
 * URL-decodes a string as UTF-8.
 *
 * <p>Fixes: the catch block logged the exception at {@code info} level — now
 * {@code error}; the charset literal "UTF-8" is replaced with
 * {@link StandardCharsets#UTF_8}, which cannot be misspelled. (UTF-8 is a
 * guaranteed charset, so the catch branch is effectively unreachable; it is
 * kept to satisfy the checked-exception signature of the String overload.)
 *
 * @param URL the percent-encoded input
 * @return the decoded string, or "" if decoding fails
 */
public static String decodeString(String URL) {
    String urlString = "";
    try {
        urlString = URLDecoder.decode(URL, StandardCharsets.UTF_8.name());
    } catch (UnsupportedEncodingException e) {
        log.error(e, e);
    }
    return urlString;
}
/**
 * Fetches the partition specs of a table from the Hive metastore tables
 * (tbls/dbs/partitions), with the '/' separators of PART_NAME replaced by ','
 * (so "dt=2020/hr=01" becomes "dt=2020,hr=01").
 *
 * <p>Fix: the query used to splice {@code db}/{@code tableName} into the SQL
 * with {@code String.format} (SQL-injection / quoting hazard); it now uses
 * bind parameters via dbutils.
 *
 * @param db        Hive database name
 * @param tableName Hive table name
 * @return one "k=v,k=v" string per partition; null on SQL error
 *         (callers guard with Objects.requireNonNull — kept for compatibility)
 */
public static List<String> getPartitionStrList(String db, String tableName) {
    QueryRunner qr = new QueryRunner(CustomDataSource.getInstance());
    String sql = "select replace(c.part_name, '/', ',') from tbls a, dbs b, partitions c where a.DB_ID=b.DB_ID and a.TBL_ID=c.TBL_ID and b.NAME=? and a.TBL_NAME=?";
    try {
        return qr.query(sql, new ColumnListHandler<>(1), db, tableName);
    } catch (SQLException e) {
        log.error(e, e);
    }
    return null;
}
/**
* 单分区-合并分区表
*
* @param db
* @param tableName
*/
public static void mergePartitionedTable(String db, String tableName) {
String fullTableName = db + "." + tableName;
List<String> partStrList = getPartitionStrList(db, tableName);
Connection conn = null;
try {
conn = JDBCUtils.getConnection();
for (String partStr : Objects.requireNonNull(partStrList)) {
log.info("分区字符串:" + partStr);
String[] whereStrArray = StringUtils.split(partStr, ",");
String selectStr = null;
String whereStr = null;
String partitionStr = null;
String[] keyAndValue = null;
log.info(String.format("%s表有%s级分区", fullTableName, whereStrArray.length));
if (whereStrArray.length == 0) {
return;
}
if (whereStrArray.length == 1) {
keyAndValue = StringUtils.split(partStr, "=");
selectStr = String.format("`(%s)?+.+`", keyAndValue[0]);
String key = keyAndValue[0];
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
收起资源包目录
merge-smallfile.zip (22个子文件)
merge-smallfile
pom.xml 6KB
src
test
java
com
yj
AppTest.java 3KB
main
resources
prepare_data.sql 2KB
druid.properties 2KB
log4j.properties 494B
app.properties 254B
hive-jdbc.properties 433B
java
com
yj
demo
ProducerDemo.java 1KB
App.java 460B
ConsumerDemo.java 1KB
ConnectOracle.java 2KB
pc
Consumer.java 3KB
Producer.java 1KB
oracle
DBUtil.java 2KB
CustomDataSource.java 8KB
MergeApp.java 12KB
hive
HiveJdbcPool.java 6KB
HiveJdbcUtil.java 3KB
JDBCUtils.java 3KB
HiveJdbcProperty.java 2KB
hdfs
HdfsChownUtil.java 1KB
CommandUtil.java 2KB
共 22 条
- 1
资源评论
谦蓦
- 粉丝: 168
- 资源: 2
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功