package com.moon.storagering.service.impl;
import com.google.common.base.Strings;
import com.moon.storagering.common.util.JsonUtil;
import com.moon.storagering.common.util.StorageRingUtil;
import com.moon.storagering.entity.ObjectListResult;
import com.moon.storagering.entity.ObjectMetaData;
import com.moon.storagering.entity.StorageRingObject;
import com.moon.storagering.entity.StorageRingObjectSummary;
import com.moon.storagering.exception.bussness.FileSystemException;
import com.moon.storagering.service.IHdfsService;
import com.moon.storagering.service.IStorageRingStore;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.io.ByteBufferInputStream;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.*;
/**
* @author Chanmoey
* @date 2023年01月05日
*/
public class StorageRingStoreImpl implements IStorageRingStore {
private static final Logger LOGGER = Logger.getLogger(StorageRingStoreImpl.class);
private Connection connection = null;
private IHdfsService hdfsService;
private String zkUrls;
private CuratorFramework zkClient;
public StorageRingStoreImpl(Connection connection, IHdfsService hdfsService, String zkUrls) {
this.connection = connection;
this.hdfsService = hdfsService;
this.zkUrls = zkUrls;
zkClient = CuratorFrameworkFactory.newClient(zkUrls, new ExponentialBackoffRetry(20, 5));
this.zkClient.start();
}
@Override
public void createBucketStore(String bucketName) throws IOException {
// 创建目录表
HBaseService.createTable(connection, StorageRingUtil.getDirTableName(bucketName), StorageRingUtil.getDirColumnFamily(), null);
// 创建文件表
HBaseService.createTable(connection, StorageRingUtil.getObjTableName(bucketName), StorageRingUtil.getObjColumnFamily(), StorageRingUtil.OBJ_REGIONS);
// 将其添加到seq表
Put put = new Put(bucketName.getBytes());
put.addColumn(StorageRingUtil.BUCKET_DIR_SEQ_CF_BYTES, StorageRingUtil.BUCKET_DIR_SEQ_QUALIFIER, Bytes.toBytes(0L));
HBaseService.putRow(connection, StorageRingUtil.BUCKET_DIR_SEQ_TABLE, put);
// 创建hdfs目录
hdfsService.mkDir(StorageRingUtil.FILE_STORE_ROOT + "/" + bucketName);
}
@Override
public void deleteBucketStore(String bucketName) throws IOException {
// 删除目录表和文件表
HBaseService.deleteTable(connection, StorageRingUtil.getDirTableName(bucketName));
HBaseService.deleteTable(connection, StorageRingUtil.getObjTableName(bucketName));
// 删除seq表中的记录
HBaseService.deleteRow(connection, StorageRingUtil.BUCKET_DIR_SEQ_TABLE, bucketName);
// 删除hdfs上的目录
hdfsService.deleteDir(StorageRingUtil.FILE_STORE_ROOT + "/" + bucketName);
}
@Override
public void createSeqTable() throws IOException {
HBaseService.createTable(connection, StorageRingUtil.BUCKET_DIR_SEQ_TABLE, new String[]{StorageRingUtil.BUCKET_DIR_SEQ_CF}, null);
}
@Override
public void put(String bucketName, String key, ByteBuffer content, long length, String mediaType, Map<String, String> properties) throws Exception {
InterProcessMutex lock = null;
// 判断是否创建目录
if (key.endsWith("/")) {
putDir(bucketName, key);
return;
}
// 获取seqid
String dir = key.substring(0, key.lastIndexOf("/") + 1);
String hash = null;
while (hash == null) {
if (!dirExist(bucketName, dir)) {
hash = putDir(bucketName, dir);
} else {
hash = getDirSeqId(bucketName, dir);
}
}
// 上传文件到文件表
// 获取锁
String lockKey = key.replace("/", "_");
lock = new InterProcessMutex(zkClient, "/stroagering/" + bucketName + "/" + lockKey);
lock.acquire();
// 上传文件
String fileKey = hash + "_" + key.substring(key.lastIndexOf("/") + 1);
Put contentPut = new Put(fileKey.getBytes());
// 记录文件属性
if (!Strings.isNullOrEmpty(mediaType)) {
contentPut.addColumn(StorageRingUtil.OBJ_META_CF_BYTES, StorageRingUtil.OBJ_CONT_QUALIFIER, mediaType.getBytes());
}
if (properties != null) {
String prop = JsonUtil.toJson(properties);
contentPut.addColumn(StorageRingUtil.OBJ_META_CF_BYTES, StorageRingUtil.OBJ_PROPS_QUALIFIER, prop.getBytes());
}
contentPut.addColumn(StorageRingUtil.OBJ_META_CF_BYTES, StorageRingUtil.OBJ_LEN_QUALIFIER, Bytes.toBytes(length));
// 判断文件大小,小于20M,存储到hbase,否则存储到hdfs
if (length <= StorageRingUtil.FILE_STORE_THRESHOLD) {
ByteBuffer buffer = ByteBuffer.wrap(StorageRingUtil.OBJ_CONT_QUALIFIER);
contentPut.addColumn(StorageRingUtil.OBJ_CONT_CF_BYTES, buffer, System.currentTimeMillis(), content);
buffer.clear();
} else {
String fileDir = StorageRingUtil.FILE_STORE_ROOT + "/" + bucketName + "/" + hash;
String name = key.substring(key.lastIndexOf("/") + 1);
InputStream inputStream = new ByteBufferInputStream(content);
hdfsService.saveFile(fileDir, name, inputStream, length, (short) 1);
}
HBaseService.putRow(connection, StorageRingUtil.getObjTableName(bucketName), contentPut);
// 释放锁
if (lock != null) {
lock.release();
}
}
@Override
public StorageRingObjectSummary getSummary(String bucketName, String key) throws IOException {
// 判断是否时文件夹
if (key.endsWith("/")) {
Result result = HBaseService.getRow(connection, StorageRingUtil.getDirTableName(bucketName), key);
if (!result.isEmpty()) {
// 读取文件夹的基础属性
return this.dirObject2Summary(result, bucketName, key);
}
return null;
}
// 获取文件
// 先获取文件的父目录
String dir = key.substring(0, key.lastIndexOf("/") + 1);
String seq = getDirSeqId(bucketName, dir);
// 父目录不存在,则说明文件也不存在
if (seq == null) {
return null;
}
// 获取文件的rootKey
String objectKey = seq + "_" + key.substring(key.lastIndexOf("/") + 1);
Result result = HBaseService.getRow(connection, StorageRingUtil.getObjTableName(bucketName), objectKey);
if (result.isEmpty()) {
return null;
}
return this.result2ObjectSummary(result, bucketName, dir);
}
@Override
public List<StorageRingObjectSummary> list(String bucketName, String startKey, String endKey) throws IOException {
return null;
}
@Override
public ObjectListResult listDir(String bucketName, String dir, String start, int maxCount) throws IOException {
// 查询目录表
start = Strings.nullToEmpty(start);
Get get = new Get(Bytes.toBytes(dir));
get.addFamily(StorageRingUtil.DIR_SUBDIR_CF_BYTES);
if (!Strings.isNullOrEmpty(start)) {
get.setFilter(new QualifierFilter(CompareFilter.Com
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
收起资源包目录
基于HDFS与HBase的对象存储服务.zip (39个子文件)
storage-ring-main
pom.xml 4KB
src
test
java
com
moon
storagering
service
BucketServiceTest.java 853B
StorageRingApplicationTests.java 226B
main
resources
script
storagering.sql 2KB
application.yml 275B
config
exception-code.properties 537B
java
com
moon
storagering
repository
AuthRepository.java 513B
UserInfoRepository.java 432B
BucketRepository.java 631B
StorageRingApplication.java 434B
controller
UserInfoController.java 213B
BucketController.java 301B
service
IBucketService.java 603B
IAuthService.java 326B
IHdfsService.java 560B
IUserInfoService.java 228B
IStorageRingStore.java 1KB
impl
HBaseService.java 6KB
AuthServiceImpl.java 937B
StorageRingStoreImpl.java 21KB
BucketServiceImpl.java 2KB
HdfsServiceImpl.java 3KB
UserInfoServiceImpl.java 657B
common
UnifyResponse.java 2KB
util
StorageRingUtil.java 3KB
CoreUtil.java 1KB
JsonUtil.java 557B
entity
UserInfo.java 483B
ObjectListResult.java 479B
Bucket.java 782B
StorageRingObject.java 1KB
ObjectMetaData.java 543B
Auth.java 519B
StorageRingObjectSummary.java 789B
exception
ExceptionCodeConfiguration.java 709B
bussness
BusinessException.java 260B
FileSystemException.java 243B
GlobalExceptionAdvice.java 4KB
config
StorageRingConfiguration.java 2KB
共 39 条
- 1
资源评论
博士僧小星
- 粉丝: 1912
- 资源: 5876
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
- IMG_0796.PNG
- Hyper+Lightning5+SDXL大模型比较推荐
- 基于matlab 小波神经网络的时间序列预测-短时交通流量预测源代码+详细教程
- 基于matlab 思维进化算法优化BP神经网络-非线性函数拟合源代码+详细教程
- macOS植物大战僵尸3D 苹果电脑版.7z
- 基于matlab 极限学习机在回归拟合及分类问题中的应用研究-对比实验源代码+详细教程
- ItemApplicationTest.java
- (PC+WAP)公司注册财务会计类网站pbootcms模板 蓝色律师公证网站源码下载.zip
- 高校校园招聘系统 JAVA+Spring+SpringMVC+MyBatis
- 个人发卡源码,发卡系统,二次元发卡系统,二次元发卡源码,发卡程序,动漫发卡,PHP发卡源码,异次元发卡
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功