package com.alibaba.datax.plugin.writer.hbase11xwriter;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.Validate;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Map;
public class Hbase11xHelper {
private static final Logger LOG = LoggerFactory.getLogger(Hbase11xHelper.class);
/**
 * Builds the Hadoop configuration used to talk to HBase.
 *
 * <p>The configuration starts from {@link HBaseConfiguration#create()}. When {@code Key.KERBEROS}
 * is enabled, every entry of the {@code Key.SYSTEM} JSON map is exported as a JVM system property,
 * the file named by {@code Key.HBASE_SITE_XML} is added as a configuration resource, and the user
 * is logged in from a keytab ({@code Key.KEY_TAB_KEY} is passed as the user/principal,
 * {@code Key.KEY_TAB_VALUE} as the keytab path). Finally every entry of the
 * {@code Key.HBASE_CONFIG} JSON map is copied into the configuration.
 *
 * @param config DataX job configuration to read the settings from
 * @return the populated Hadoop configuration
 * @throws DataXException with {@code REQUIRED_VALUE} when a mandatory setting is blank, or with
 *         {@code GET_HBASE_CONNECTION_ERROR} when parsing, resource loading or login fails
 */
public static org.apache.hadoop.conf.Configuration getHbaseConfiguration(com.alibaba.datax.common.util.Configuration config) {
    org.apache.hadoop.conf.Configuration hConfiguration = HBaseConfiguration.create();
    if (config.getBool(Key.KERBEROS, false)) {
        LOG.info("run with KERBEROS {}", config.getBool(Key.KERBEROS));
        String hbaseSiteXml = config.getString(Key.HBASE_SITE_XML);
        String principal = config.getString(Key.KEY_TAB_KEY);
        String keytabPath = config.getString(Key.KEY_TAB_VALUE);
        String system = config.getString(Key.SYSTEM);
        // The keytab path is as mandatory as the principal: without it the login below cannot succeed.
        if (StringUtils.isBlank(hbaseSiteXml) || StringUtils.isBlank(principal)
                || StringUtils.isBlank(keytabPath) || StringUtils.isBlank(system)) {
            throw DataXException.asDataXException(Hbase11xWriterErrorCode.REQUIRED_VALUE, "HBASE kerberos 信息不可为空 ");
        }
        try {
            Map<String, String> systemMap = JSON.parseObject(system, new TypeReference<Map<String, String>>() {});
            // User-supplied key/value pairs that are exported as JVM system properties.
            Validate.isTrue(systemMap != null && systemMap.size() != 0, "kerberos system 配置不能为空Map结构!");
            for (Map.Entry<String, String> entry : systemMap.entrySet()) {
                System.setProperty(entry.getKey(), entry.getValue());
            }
            // NOTE(review): the stream is deliberately not closed here. Hadoop's Configuration
            // appears to parse added streams lazily, so closing it early could drop the resource.
            hConfiguration.addResource(new FileInputStream(new File(hbaseSiteXml)));
            // UGI must know the security settings before the login is attempted; otherwise the
            // login is evaluated against UGI's default configuration.
            UserGroupInformation.setConfiguration(hConfiguration);
            UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
        } catch (Exception e) {
            throw DataXException.asDataXException(Hbase11xWriterErrorCode.GET_HBASE_CONNECTION_ERROR, e);
        }
    }
    String hbaseConfig = config.getString(Key.HBASE_CONFIG);
    if (StringUtils.isBlank(hbaseConfig)) {
        throw DataXException.asDataXException(Hbase11xWriterErrorCode.REQUIRED_VALUE, "写 Hbase 时需要配置hbaseConfig,其内容为 Hbase 连接信息,请联系 Hbase PE 获取该信息.");
    }
    try {
        Map<String, String> hbaseConfigMap = JSON.parseObject(hbaseConfig, new TypeReference<Map<String, String>>() {});
        // User-supplied key/value pairs that make up hbaseConfig.
        Validate.isTrue(hbaseConfigMap != null && hbaseConfigMap.size() != 0, "hbaseConfig不能为空Map结构!");
        for (Map.Entry<String, String> entry : hbaseConfigMap.entrySet()) {
            hConfiguration.set(entry.getKey(), entry.getValue());
        }
    } catch (Exception e) {
        throw DataXException.asDataXException(Hbase11xWriterErrorCode.GET_HBASE_CONNECTION_ERROR, e);
    }
    return hConfiguration;
}
/**
 * Opens an HBase connection using the configuration derived from the given job settings.
 *
 * @param configuration DataX job configuration handed to {@code getHbaseConfiguration}
 * @return the connection created by {@link ConnectionFactory#createConnection}
 * @throws DataXException with {@code GET_HBASE_CONNECTION_ERROR} when the connection cannot be created
 */
public static org.apache.hadoop.hbase.client.Connection getHbaseConnection(com.alibaba.datax.common.util.Configuration configuration) {
    final org.apache.hadoop.conf.Configuration hbaseConf = Hbase11xHelper.getHbaseConfiguration(configuration);
    org.apache.hadoop.hbase.client.Connection connection = null;
    try {
        connection = ConnectionFactory.createConnection(hbaseConf);
        return connection;
    } catch (Exception cause) {
        // Hand whatever was obtained so far to the shared cleanup helper before reporting the failure.
        closeConnection(connection);
        throw DataXException.asDataXException(Hbase11xWriterErrorCode.GET_HBASE_CONNECTION_ERROR, cause);
    }
}
/**
 * Opens the table named by {@code Key.TABLE} after running {@code checkHbaseTable} on it.
 *
 * <p>The table name is parsed before the connection is opened, so an invalid name does not leave
 * an open connection behind. The {@code Admin} is only needed for the check and is closed before
 * returning. The connection itself stays open because the returned table was obtained from it.
 * This method does not apply {@code Key.WRITE_BUFFER_SIZE}; {@code getBufferedMutator} does.
 *
 * @param configuration DataX job configuration
 * @return the opened table
 * @throws DataXException with {@code GET_HBASE_TABLE_ERROR} when the check or the table lookup fails
 */
public static Table getTable(com.alibaba.datax.common.util.Configuration configuration){
    String userTable = configuration.getString(Key.TABLE);
    TableName hTableName = TableName.valueOf(userTable);
    org.apache.hadoop.hbase.client.Connection hConnection = Hbase11xHelper.getHbaseConnection(configuration);
    org.apache.hadoop.hbase.client.Admin admin = null;
    org.apache.hadoop.hbase.client.Table hTable = null;
    try {
        admin = hConnection.getAdmin();
        Hbase11xHelper.checkHbaseTable(admin, hTableName);
        hTable = hConnection.getTable(hTableName);
    } catch (Exception e) {
        Hbase11xHelper.closeTable(hTable);
        Hbase11xHelper.closeAdmin(admin);
        Hbase11xHelper.closeConnection(hConnection);
        throw DataXException.asDataXException(Hbase11xWriterErrorCode.GET_HBASE_TABLE_ERROR, e);
    }
    // Success path: the Admin has served its purpose, release it instead of leaking it.
    Hbase11xHelper.closeAdmin(admin);
    return hTable;
}
/**
 * Creates a {@link BufferedMutator} for the table named by {@code Key.TABLE}.
 *
 * <p>The mutator uses the write buffer size from {@code Key.WRITE_BUFFER_SIZE} (falling back to
 * {@code Constant.DEFAULT_WRITE_BUFFER_SIZE}) and an executor from
 * {@code HTable.getDefaultExecutor}. The executor is built from the connection's own
 * configuration, so the job configuration is turned into a Hadoop configuration only once per
 * call. The table name is parsed before the connection is opened, and the {@code Admin} used for
 * {@code checkHbaseTable} is closed before returning.
 *
 * @param configuration DataX job configuration
 * @return the buffered mutator; the underlying connection stays open
 * @throws DataXException with {@code GET_HBASE_BUFFEREDMUTATOR_ERROR} when the check or the
 *         mutator creation fails
 */
public static BufferedMutator getBufferedMutator(com.alibaba.datax.common.util.Configuration configuration){
    String userTable = configuration.getString(Key.TABLE);
    long writeBufferSize = configuration.getLong(Key.WRITE_BUFFER_SIZE, Constant.DEFAULT_WRITE_BUFFER_SIZE);
    TableName hTableName = TableName.valueOf(userTable);
    org.apache.hadoop.hbase.client.Connection hConnection = Hbase11xHelper.getHbaseConnection(configuration);
    org.apache.hadoop.hbase.client.Admin admin = null;
    BufferedMutator bufferedMutator = null;
    try {
        admin = hConnection.getAdmin();
        Hbase11xHelper.checkHbaseTable(admin, hTableName);
        // Reuse the configuration the connection was created with rather than rebuilding it.
        org.apache.hadoop.conf.Configuration hConfiguration = hConnection.getConfiguration();
        // Modelled on HTable's getBufferedMutator().
        bufferedMutator = hConnection.getBufferedMutator(
                new BufferedMutatorParams(hTableName)
                        .pool(HTable.getDefaultExecutor(hConfiguration))
                        .writeBufferSize(writeBufferSize));
    } catch (Exception e) {
        Hbase11xHelper.closeBufferedMutator(bufferedMutator);
        Hbase11xHelper.closeAdmin(admin);
        Hbase11xHelper.closeConnection(hConnection);
        throw DataXException.asDataXException(Hbase11xWriterErrorCode.GET_HBASE_BUFFEREDMUTATOR_ERROR, e);
    }
    // Success path: the Admin was only needed for the table check, release it.
    Hbase11xHelper.closeAdmin(admin);
    return bufferedMutator;
}
public static void deleteTable(com.alibaba.datax.common.util.Configuration configuration) {
String userTable = configuration.getString(Key.TABLE);
LOG.info(String.format("由于您配置了deleteType delete,HBasWriter begins to delete table %s .", userTable));
Scan scan = new Scan();
org.apache.hadoop.hbase.client.Table hTable =Hbase11xHelper.getTable(configuration);
ResultScanner scanner = null;
try {
scanner = hTable.getScanner(scan);
for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
hTable.delete(new Delete(rr.getRow()));
}
} catch (Exception e) {
throw DataXException.asDataXException(Hbase11xWriterErrorCode.DELETE_HBASE_ERROR, e);
}finally {
if(scanner != null){
scanner.close();
}
Hbase11xHelper.closeTable(hTable);
hbase11xwriter.zip
需积分: 9 85 浏览量
2021-07-23
18:14:59
上传
评论
收藏 14KB ZIP 举报
gelonSun
- 粉丝: 0
- 资源: 4
最新资源
- kernel-ml-6.8.8-1.el7.elrepo.x86-64.rpm
- Labview基本框架之状态机
- HM2309B-VB一款P-Channel沟道SOT23的MOSFET晶体管参数介绍与应用说明
- Git安全实践:保护你的代码仓库个人学习笔记.md
- 自动驾驶定位系列教程九:后端优化.pdf
- 三国志5威力加强版-windows
- HM2309A-VB一款P-Channel沟道SOT23的MOSFET晶体管参数介绍与应用说明
- HM2306-VB一款N-Channel沟道SOT23的MOSFET晶体管参数介绍与应用说明
- Git进阶技巧:提升团队协作效率个人学习笔记.md
- 自动驾驶定位系列教程八:建图系统结构优化.pdf
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
评论0