package com.aeye.vias.mr.client;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
import java.util.Map.Entry;
public class HbaseClient {
// Class-wide SLF4J logger.
private static Logger logger=LoggerFactory.getLogger(HbaseClient.class);
// Map key under which a record carries its row key: insertAll reads the row key
// from this entry and query stores each result's row key under it.
public static final String ROWKEY="rowkey";
// Configuration handed to ConnectionFactory by getConnection(); assigned in the constructor.
public Configuration configuration;
// Shared instance created on first use by getInstance(); volatile because it is
// read outside the synchronized block there.
private volatile static HbaseClient hbaseClient;
/**
 * Returns the shared {@code HbaseClient}, creating it on the first call.
 * The field is read without locking first; only when it is still unset does
 * the method synchronize on the class and check again before constructing.
 *
 * @return the shared client instance, never {@code null}
 */
public static HbaseClient getInstance() {
    HbaseClient instance = hbaseClient;
    if (instance != null) {
        return instance;
    }
    synchronized (HbaseClient.class) {
        instance = hbaseClient;
        if (instance == null) {
            instance = new HbaseClient();
            hbaseClient = instance;
        }
        return instance;
    }
}
/**
 * Creates a client whose {@link #configuration} is obtained from
 * {@code HBaseConfiguration.create()}.
 */
public HbaseClient() {
    this.configuration = HBaseConfiguration.create();
}
/**
 * Opens a connection using this client's {@link #configuration}.
 * Every call asks {@code ConnectionFactory} for a new connection; nothing is
 * cached here, so the caller is responsible for closing what it receives.
 *
 * @return a newly created connection
 * @throws IOException if the connection cannot be created
 */
public Connection getConnection() throws IOException {
    final Connection created = ConnectionFactory.createConnection(configuration);
    return created;
}
/**
 * Creates the table if it does not exist yet.
 * <p>
 * A new table is configured with the {@code KeyPrefixRegionSplitPolicy} split
 * policy and a prefix length of 6, and gets one column family per entry of
 * {@code familys}. A dedicated connection is opened for the call and closed
 * before returning; failures while closing are logged and not propagated.
 *
 * @param tableName name of the table
 * @param familys   column family names for the new table
 * @return {@code true} if the table was created by this call, {@code false}
 *         if it already existed
 * @throws Exception any failure while checking for or creating the table
 *                   (logged, then rethrown)
 */
public boolean createTableIfNotExists(String tableName, String[] familys) throws Exception {
    boolean created = false;
    Connection connection = null;
    Admin admin = null;
    try {
        connection = getConnection();
        admin = connection.getAdmin();
        HTableDescriptor table = new HTableDescriptor(TableName.valueOf(tableName));
        if (!admin.tableExists(table.getTableName())) {
            // Table-level settings: applied once, not once per column family.
            table.setConfiguration(HTableDescriptor.SPLIT_POLICY, "org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy");
            table.setConfiguration("KeyPrefixRegionSplitPolicy.prefix_length", "6");
            for (String family : familys) {
                table.addFamily(new HColumnDescriptor(family));
            }
            admin.createTable(table);
            created = true;
            logger.info("create table({}).familys({}) success", tableName, Arrays.asList(familys));
        } else {
            logger.info("table({}) already exists,don't need to create", tableName);
        }
    } catch (Exception e) {
        logger.error("Exception:", e);
        throw e;
    } finally {
        // Best-effort cleanup: a failed close must not mask the real outcome.
        if (admin != null) {
            try {
                admin.close();
            } catch (Exception e) {
                logger.error("Exception:", e);
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (Exception e) {
                logger.error("Exception:", e);
            }
        }
    }
    return created;
}
/**
 * Removes all data from a table.
 * <p>
 * If the table exists it is disabled when currently enabled and then
 * truncated, passing {@code false} as the preserve-splits argument. If the
 * table does not exist, the call only logs that fact. A dedicated connection
 * is opened for the call and closed before returning; failures while closing
 * are logged and not propagated.
 *
 * @param tableName name of the table to truncate
 * @throws Exception any failure while truncating (logged, then rethrown)
 */
public void truncateTable(String tableName) throws Exception {
    Connection conn = null;
    Admin hbaseAdmin = null;
    try {
        conn = getConnection();
        hbaseAdmin = conn.getAdmin();
        final TableName target = TableName.valueOf(tableName);
        if (!hbaseAdmin.tableExists(target)) {
            logger.info("table("+tableName+") not exists,don't need to truncate");
            return;
        }
        if (hbaseAdmin.isTableEnabled(target)) {
            hbaseAdmin.disableTable(target);
        }
        hbaseAdmin.truncateTable(target, false);
        logger.info("truncate table("+tableName+") success");
    } catch (Exception ex) {
        logger.error("Exception:", ex);
        throw ex;
    } finally {
        // Release the admin handle first, then the connection it came from.
        if (hbaseAdmin != null) {
            try {
                hbaseAdmin.close();
            } catch (Exception ex) {
                logger.error("Exception:", ex);
            }
        }
        if (conn != null) {
            try {
                conn.close();
            } catch (Exception ex) {
                logger.error("Exception:", ex);
            }
        }
    }
}
/**
 * Inserts a single record by delegating to
 * {@link #insertAll(String, List)} with a one-element list.
 *
 * @param tableName name of the target table
 * @param record    the record to insert; it must contain a {@link #ROWKEY}
 *                  entry, and its other keys are {@code family:qualifier}
 *                  column names
 * @throws Exception any failure reported by {@code insertAll}
 */
public void insert(String tableName, Map<String,String> record) throws Exception {
    final List<Map<String,String>> single = new ArrayList<Map<String,String>>(1);
    single.add(record);
    insertAll(tableName, single);
}
/**
 * Inserts several records with one batched put.
 * <p>
 * Each record must contain a {@link #ROWKEY} entry holding the row key; every
 * other entry is a column whose key has the form {@code family:qualifier}.
 * Only the first ':' separates family from qualifier, so a qualifier may
 * itself contain ':'. The supplied maps are only read, never modified.
 * A dedicated connection is opened for the call and closed before returning;
 * failures while closing are logged and not propagated.
 *
 * @param tableName name of the target table
 * @param list      records to insert
 * @throws IllegalArgumentException if a record has no row key, a column key
 *                                  has no ':' separator, or a column value is
 *                                  {@code null}
 * @throws Exception any other failure while writing (logged, then rethrown)
 */
public void insertAll(String tableName, List<Map<String,String>> list) throws Exception {
    Connection connection = null;
    Table table = null;
    try {
        connection = getConnection();
        table = connection.getTable(TableName.valueOf(tableName));
        List<Put> puts = new ArrayList<Put>(list.size());
        for (Map<String,String> record : list) {
            // Read (do not remove) the row key so the caller's map stays intact.
            String rowkey = record.get(ROWKEY);
            if (rowkey == null) {
                throw new IllegalArgumentException("record has no \"" + ROWKEY + "\" entry: " + record);
            }
            Put put = new Put(Bytes.toBytes(rowkey));
            for (Entry<String, String> column : record.entrySet()) {
                String name = column.getKey();
                if (ROWKEY.equals(name)) {
                    continue; // the row key is not a column
                }
                int separator = name.indexOf(':');
                if (separator < 0) {
                    throw new IllegalArgumentException("column key must be \"family:qualifier\" but was \"" + name + "\"");
                }
                String value = column.getValue();
                if (value == null) {
                    throw new IllegalArgumentException("column \"" + name + "\" of row \"" + rowkey + "\" has a null value");
                }
                put.addColumn(
                        Bytes.toBytes(name.substring(0, separator)),
                        Bytes.toBytes(name.substring(separator + 1)),
                        Bytes.toBytes(value));
            }
            puts.add(put);
        }
        table.put(puts);
    } catch (Exception e) {
        logger.error("Exception:", e);
        throw e;
    } finally {
        // Best-effort cleanup: a failed close must not mask the real outcome.
        if (table != null) {
            try {
                table.close();
            } catch (Exception e) {
                logger.error("Exception:", e);
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (Exception e) {
                logger.error("Exception:", e);
            }
        }
    }
}
/**
 * Scans a row-key range and returns the matching rows.
 * <p>
 * Each returned map holds the row key under {@link #ROWKEY} followed by one
 * entry per cell, keyed as {@code family:qualifier}; keys, row keys and values
 * are decoded as strings. The row limit is enforced here by stopping the
 * iteration once {@code limit} rows have been collected (the scan's
 * max-result-size setting is a byte budget, not a row count, so it is not used
 * for this). A dedicated connection is opened for the call and closed before
 * returning; failures while closing are logged and not propagated.
 *
 * @param tableName name of the table to scan
 * @param columns   columns to fetch, each element a two-element array
 *                  {@code {family, qualifier}}; {@code null} or empty fetches
 *                  all columns
 * @param startRow  first row key of the range
 * @param stopRow   end row key of the range, exclusive
 * @param limit     maximum number of rows to return; 0 returns every row in
 *                  the range
 * @return the rows found, in scan order; empty if nothing matched
 * @throws Exception any failure while scanning (logged, then rethrown)
 */
public List<Map<String,String>> query(String tableName, List<String[]> columns, String startRow, String stopRow, int limit) throws Exception {
    List<Map<String,String>> rows = null;
    Connection connection = null;
    Table table = null;
    ResultScanner resultScanner = null;
    try {
        connection = getConnection();
        table = connection.getTable(TableName.valueOf(tableName));
        Scan scan = new Scan(Bytes.toBytes(startRow), Bytes.toBytes(stopRow));
        // Do not prefetch more rows per round trip than the caller asked for.
        scan.setCaching(limit > 0 && limit < 100 ? limit : 100);
        if (columns != null && columns.size() > 0) {
            for (String[] column : columns) {
                scan.addColumn(Bytes.toBytes(column[0]), Bytes.toBytes(column[1]));
            }
        }
        resultScanner = table.getScanner(scan);
        rows = new ArrayList<Map<String,String>>();
        for (Result result : resultScanner) {
            List<Cell> cells = result.listCells();
            if (cells == null) {
                continue; // empty result: nothing to report for this row
            }
            Map<String,String> row = new LinkedHashMap<String,String>();
            row.put(ROWKEY, Bytes.toString(result.getRow()));
            for (Cell cell : cells) {
                row.put(Bytes.toString(CellUtil.cloneFamily(cell)) + ":" + Bytes.toString(CellUtil.cloneQualifier(cell)), Bytes.toString(CellUtil.cloneValue(cell)));
            }
            rows.add(row);
            if (limit > 0 && rows.size() >= limit) {
                break; // row limit reached; the scanner is closed in finally
            }
        }
    } catch (Exception e) {
        logger.error("Exception:", e);
        throw e;
    } finally {
        // Close in reverse order of acquisition; close failures are only logged.
        if (resultScanner != null) {
            try {
                resultScanner.close();
            } catch (Exception e) {
                logger.error("Exception:", e);
            }
        }
        if (table != null) {
            try {
                table.close();
            } catch (Exception e) {
                logger.error("Exception:", e);
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (Exception e) {
                logger.error("Exception:", e);
            }
        }
    }
    return rows;
}
/**
* 查询数据
* @param tableName 表名
* @param columns 查询的字段名列表,List<String[]{family,qualifier}>,null表示查询所有字段
* @param limit 只查询前多少行,0表示查询所有数
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
收起资源包目录
MR-PersonFile.rar (31个子文件)
MR-PersonFile
.gitignore 9B
src
main
resources
core-site.xml 786B
log4j.properties 263B
application.properties 189B
hbase-site.xml 343B
java
com
aeye
vias
mr
client
HbaseClient.java 14KB
FaceCalcAPI.java 7KB
util
ConfigUtils.java 2KB
personfile
PersonFile.java 6KB
.classpath 1KB
.settings
org.eclipse.m2e.core.prefs 90B
org.eclipse.core.resources.prefs 327B
org.eclipse.jdt.core.prefs 736B
pom.xml 3KB
target
classes
META-INF
MANIFEST.MF 115B
maven
com.eye.vias
MR-PersonFile
pom.properties 228B
pom.xml 3KB
com
aeye
vias
mr
client
FaceCalcAPI.class 6KB
HbaseClient.class 15KB
util
ConfigUtils.class 4KB
personfile
PersonFile$ReducerTask.class 3KB
PersonFile.class 2KB
PersonFile$MapperTask.class 7KB
core-site.xml 786B
log4j.properties 263B
application.properties 189B
hbase-site.xml 343B
archive-tmp
maven-archiver
pom.properties 122B
generated-sources
annotations
test-classes
maven-status
maven-compiler-plugin
compile
default-compile
inputFiles.lst 329B
createdFiles.lst 0B
.project 565B
共 31 条
- 1
资源评论
没事进来看看
- 粉丝: 3
- 资源: 11
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功