/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package blue.org.apache.flume.sink.hbase;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.security.PrivilegedExceptionAction;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.Transaction;
import org.apache.flume.annotations.InterfaceAudience;
import org.apache.flume.auth.FlumeAuthenticationUtil;
import org.apache.flume.auth.PrivilegedExecutor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.flume.sink.hbase.BatchAware;
import org.apache.flume.sink.hbase.HBaseSinkConfigurationConstants;
import org.apache.flume.sink.hbase.HbaseEventSerializer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.util.Bytes;
import org.dom4j.Attribute;
import org.dom4j.Document;
import org.dom4j.io.SAXReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
*
* A simple sink which reads events from a channel and writes them to HBase.
* The Hbase configuration is picked up from the first <tt>hbase-site.xml</tt>
* encountered in the classpath. This sink supports batch reading of
* events from the channel, and writing them to Hbase, to minimize the number
* of flushes on the hbase tables. To use this sink, it has to be configured
* with certain mandatory parameters:<p>
* <tt>table: </tt> The name of the table in Hbase to write to. <p>
* <tt>columnFamily: </tt> The column family in Hbase to write to.<p>
* This sink will commit each transaction if the table's write buffer size is
* reached or if the number of events in the current transaction reaches the
* batch size, whichever comes first.<p>
* Other optional parameters are:<p>
* <tt>serializer:</tt> A class implementing {@link HbaseEventSerializer}.
* An instance of
* this class will be used to write out events to hbase.<p>
* <tt>serializer.*:</tt> Passed in the configure() method to serializer
* as an object of {@link org.apache.flume.Context}.<p>
* <tt>batchSize: </tt>This is the batch size used by the client. This is the
* maximum number of events the sink will commit per transaction. The default
* batch size is 100 events.
* <p>
*
* <strong>Note: </strong> While this sink flushes all events in a transaction
* to HBase in one shot, Hbase does not guarantee atomic commits on multiple
* rows. So if a subset of events in a batch are written to disk by Hbase and
* Hbase fails, the flume transaction is rolled back, causing flume to write
* all the events in the transaction all over again, which will cause
* duplicates. The serializer is expected to take care of the handling of
* duplicates etc. HBase also does not support batch increments, so if
* multiple increments are returned by the serializer, then HBase failure
* will cause them to be re-written, when HBase comes back up.
*/
public class HBaseSink extends AbstractSink implements Configurable {
// Destination HBase table name. Presumably set in configure() (not visible in
// this chunk — confirm); read by start() when opening the HTable.
private String tableName;
// Column family to write to, kept as raw bytes for HBase client calls.
private byte[] columnFamily;
// Live HTable handle; created in start() (with auto-flush disabled) and must be
// null before start() is called — see the Preconditions check there.
private HTable table;
// Max number of events committed per transaction.
private long batchSize;
// HBase client configuration; supplied via constructor or defaulted to
// HBaseConfiguration.create() (first hbase-site.xml on the classpath).
private Configuration config;
private static final Logger logger = LoggerFactory.getLogger(HBaseSink.class);
// Serializer that turns Flume events into HBase Puts/Increments.
private HbaseEventSerializer serializer;
// Fully-qualified class name of the serializer — TODO confirm it is read in
// configure(), which is not visible here.
private String eventSerializerType;
// Sub-context (serializer.*) passed to the serializer's configure().
private Context serializerContext;
// Kerberos credentials handed to FlumeAuthenticationUtil in start().
private String kerberosPrincipal;
private String kerberosKeytab;
// Whether writes go to the HBase write-ahead log (defaults to true).
private boolean enableWal = true;
// Whether increments to the same row/qualifier are coalesced into one RPC.
private boolean batchIncrements = false;
// Cached reflective handle, presumably for Increment.getFamilyMap — the code
// that populates/uses it is outside this chunk; verify before relying on it.
private Method refGetFamilyMap = null;
private SinkCounter sinkCounter;
// Executor that runs HBase calls under the authenticated (Kerberos) identity.
private PrivilegedExecutor privilegedExecutor;
// Internal hooks used for unit testing.
private DebugIncrementsCallback debugIncrCallback = null;
/**
 * Looks up the HBase table name configured for this host in
 * <tt>/home/hadoop/dao.xml</tt>: resolves the local IP address and selects
 * the first <tt>biao/@name</tt> attribute under the matching
 * <tt>ip</tt> element.
 *
 * @return the table name configured for the local host
 * @throws Exception if the local host address cannot be resolved, the XML
 *                   file cannot be read, or no table entry exists for this host
 */
public String hui() throws Exception {
  // InetAddress.getLocalHost() is static; the original invoked it through a
  // null instance reference, then swallowed UnknownHostException with
  // printStackTrace() and proceeded to dereference the still-null reference
  // (a guaranteed NPE). Let the exception propagate instead — the method
  // already declares throws Exception.
  String localip = InetAddress.getLocalHost().getHostAddress();
  SAXReader reader = new SAXReader();
  Document document = reader.read("/home/hadoop/dao.xml");
  List nodes = document.selectNodes(
      "/configuration/ip[@name='" + localip + "']/biao/@name");
  // Fail with a descriptive message rather than an opaque
  // IndexOutOfBoundsException when the host has no entry in dao.xml.
  if (nodes.isEmpty()) {
    throw new FlumeException(
        "No table configured for host " + localip + " in /home/hadoop/dao.xml");
  }
  return ((Attribute) nodes.get(0)).getText();
}
/**
 * Looks up the HBase column family configured for this host in
 * <tt>/home/hadoop/dao.xml</tt>: first resolves the table name for the local
 * IP, then selects the first <tt>liezus/zu/@name</tt> attribute under that
 * table's element.
 *
 * @return the column family name configured for the local host's table
 * @throws Exception if the local host address cannot be resolved, the XML
 *                   file cannot be read, or no matching entry exists
 */
public String qiang() throws Exception {
  // See hui(): resolve the local address directly and let failures propagate
  // instead of swallowing UnknownHostException and NPE-ing afterwards.
  String localip = InetAddress.getLocalHost().getHostAddress();
  SAXReader reader = new SAXReader();
  Document document = reader.read("/home/hadoop/dao.xml");
  List tables = document.selectNodes(
      "/configuration/ip[@name='" + localip + "']/biao/@name");
  if (tables.isEmpty()) {
    throw new FlumeException(
        "No table configured for host " + localip + " in /home/hadoop/dao.xml");
  }
  String tableForHost = ((Attribute) tables.get(0)).getText();
  // BUG FIX: the original read the table name into an unused local ("u") and
  // then built this XPath from the instance field tableName, which is not
  // necessarily set when this helper runs. Use the name just read from XML.
  List families = document.selectNodes(
      "/configuration/ip[@name='" + localip + "']/biao[@name='" + tableForHost
          + "']/liezus/zu/@name");
  if (families.isEmpty()) {
    throw new FlumeException("No column family configured for table "
        + tableForHost + " on host " + localip + " in /home/hadoop/dao.xml");
  }
  return ((Attribute) families.get(0)).getText();
}
/**
 * Creates a sink backed by a fresh default HBase configuration, i.e. the
 * first <tt>hbase-site.xml</tt> found on the classpath
 * (via {@link HBaseConfiguration#create()}).
 */
public HBaseSink(){
this(HBaseConfiguration.create());
}
/**
 * Creates a sink that uses the supplied HBase client configuration.
 *
 * @param conf the HBase configuration to use; only the reference is kept —
 *             the HTable itself is opened lazily in start()
 */
public HBaseSink(Configuration conf) {
  config = conf;
}
/**
 * Test-only constructor that additionally installs the internal debug hook
 * stored in {@code debugIncrCallback} (used by unit tests; see the field's
 * comment).
 */
@VisibleForTesting
@InterfaceAudience.Private
HBaseSink(Configuration conf, DebugIncrementsCallback cb) {
  this(conf);
  debugIncrCallback = cb;
}
@Override
public void start(){
Preconditions.checkArgument(table == null, "Please call stop " +
"before calling start on an old instance.");
try {
privilegedExecutor = FlumeAuthenticationUtil.getAuthenticator(kerberosPrincipal, kerberosKeytab);
} catch (Exception ex) {
sinkCounter.incrementConnectionFailedCount();
throw new FlumeException("Failed to login to HBase using "
+ "provided credentials.", ex);
}
try {
table = privilegedExecutor.execute(new PrivilegedExceptionAction<HTable>() {
@Override
public HTable run() throws Exception {
HTable table = new HTable(config, tableName);
table.setAutoFlush(false);
// Flush is controlled by us. This ensures that HBase changing
// their criteria for flushing does not change how we flush.
return table;
}
});
} catch (Exception e) {
sinkCounter.incrementConnectionFailedCount();
logger.error("Could not load table, " + tableNam
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
收起资源包目录
flume-ng-core5.zip (38个子文件)
flume-ng-core5
.project 390B
bin
cn
huyanping
flume
sinks
SafeRollingFileSink.class 10KB
SafeRollingFileSink$1.class 1KB
SafePathManager.class 1006B
org
apache
flume
chiwei
filemonitor
PositionLog.class 4KB
FileMonitorSource$FileMonitorThread.class 6KB
Constants.class 809B
FileMonitorSource.class 9KB
com
tcloud
flume
AsyncHbaseLogEventSerializer.class 6KB
hui
avrosource
AvroSource$AdvancedChannelPipelineFactory.class 5KB
AvroSource$1.class 1KB
AvroSource.class 19KB
AvroSource$2.class 846B
avrosink
AvroSink.class 868B
AbstractRpcSink.class 11KB
AbstractRpcSink$1.class 795B
bluedon
org
apache
flume
sink
hbase
HBaseSink$1.class 1KB
HBaseSink$3.class 2KB
HBaseSink$DebugIncrementsCallback.class 566B
HBaseSink$2.class 1KB
HBaseSink.class 21KB
HBaseSink$4.class 2KB
dao.xml 2KB
.settings
org.eclipse.core.resources.prefs 361B
org.eclipse.jdt.core.prefs 629B
src
cn
huyanping
flume
sinks
SafeRollingFileSink.java 9KB
SafePathManager.java 957B
org
apache
flume
chiwei
filemonitor
Constants.java 445B
PositionLog.java 3KB
FileMonitorSource.java 8KB
blue
org
apache
flume
sink
hbase
HBaseSink.java 23KB
com
tcloud
flume
AsyncHbaseLogEventSerializer.java 4KB
hui
avrosource
AvroSource.java 22KB
avrosink
AbstractRpcSink.java 14KB
AvroSink.java 4KB
.classpath 25KB
lib
jaxen-1.1-beta-7.jar 222KB
dom4j-1.6.1.jar 307KB
共 38 条
- 1
资源评论
小强签名设计
- 粉丝: 446
- 资源: 27
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功