没有合适的资源?快使用搜索试试~ 我知道了~
Spark读取HbaseRDD的过程及源码解析
需积分: 1 0 下载量 170 浏览量
2023-04-07
17:16:33
上传
评论
收藏 136KB PDF 举报
温馨提示
试读
6页
Spark读取HbaseRDD的过程及源码解析
资源推荐
资源详情
资源评论
浪尖 qq 技术交流群 459898801 224209501
浪尖 qq 技术交流群 459898801 224209501
Spark 读取 HbaseRDD 的过程
1,构建 hbaseRDD
val hbaseRDD = sc.newAPIHadoopRDD(confHbase, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
2,构建 NewHadoopRDD
new NewHadoopRDD(this, fClass, kClass, vClass, jconf)
3,getPartitions 方法
Table 的初始化
val inputFormat = inputFormatClass.newInstance
inputFormat match {
case configurable: Configurable =>
configurable.setConf(_conf)
case _ =>
}
TableInputFormat 的 setConf 方法,给 InputFormat 设置表
this.conf = configuration;
String tableName = conf.get(
INPUT_TABLE
);
try {
setHTable(new HTable(new Configuration(conf), tableName));
} catch (Exception e) {
LOG.error(StringUtils.
stringifyException
(e));
}
初始化 scan,会先会获取用户设置的 scan 方法,然后再跟进多种情况进行构建。
Scan scan = null;
if (conf.get(
SCAN
) != null) {
try {
scan = TableMapReduceUtil.
convertStringToScan
(conf.get(
SCAN
));
} catch (IOException e) {
LOG.error("An error occurred.", e);
}
} else {
try {
scan = new Scan();
if (conf.get(
SCAN_ROW_START
) != null) {
scan.setStartRow(Bytes.
toBytes
(conf.get(
SCAN_ROW_START
)));
}
if (conf.get(
SCAN_ROW_STOP
) != null) {
scan.setStopRow(Bytes.
toBytes
(conf.get(
SCAN_ROW_STOP
)));
}
资源评论
小萝卜算子
- 粉丝: 68
- 资源: 20
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功