package com.ruyuan.dfs.namenode.server;
import com.google.protobuf.InvalidProtocolBufferException;
import com.ruyuan.dfs.common.Constants;
import com.ruyuan.dfs.common.FileInfo;
import com.ruyuan.dfs.common.NettyPacket;
import com.ruyuan.dfs.common.enums.CommandType;
import com.ruyuan.dfs.common.enums.NameNodeLaunchMode;
import com.ruyuan.dfs.common.enums.NodeType;
import com.ruyuan.dfs.common.enums.PacketType;
import com.ruyuan.dfs.common.exception.NameNodeException;
import com.ruyuan.dfs.common.exception.RequestTimeoutException;
import com.ruyuan.dfs.common.metrics.Prometheus;
import com.ruyuan.dfs.common.network.AbstractChannelHandler;
import com.ruyuan.dfs.common.network.RequestWrapper;
import com.ruyuan.dfs.common.network.file.FilePacket;
import com.ruyuan.dfs.common.network.file.FileReceiveHandler;
import com.ruyuan.dfs.common.utils.DefaultScheduler;
import com.ruyuan.dfs.common.utils.NetUtils;
import com.ruyuan.dfs.common.utils.PrettyCodes;
import com.ruyuan.dfs.model.backup.*;
import com.ruyuan.dfs.model.client.*;
import com.ruyuan.dfs.model.common.DataNode;
import com.ruyuan.dfs.model.datanode.*;
import com.ruyuan.dfs.model.namenode.*;
import com.ruyuan.dfs.namenode.config.NameNodeConfig;
import com.ruyuan.dfs.namenode.datanode.DataNodeInfo;
import com.ruyuan.dfs.namenode.datanode.DataNodeManager;
import com.ruyuan.dfs.namenode.editslog.EditLogWrapper;
import com.ruyuan.dfs.namenode.fs.CalculateResult;
import com.ruyuan.dfs.namenode.fs.DiskNameSystem;
import com.ruyuan.dfs.namenode.fs.Node;
import com.ruyuan.dfs.namenode.rebalance.RemoveReplicaTask;
import com.ruyuan.dfs.namenode.rebalance.ReplicaTask;
import com.ruyuan.dfs.namenode.server.tomcat.domain.User;
import com.ruyuan.dfs.namenode.shard.ShardingManager;
import com.ruyuan.dfs.namenode.shard.controller.ControllerManager;
import com.ruyuan.dfs.namenode.shard.peer.PeerNameNode;
import com.ruyuan.dfs.namenode.shard.peer.PeerNameNodes;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.socket.SocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.StopWatch;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
/**
* NameNode网络请求接口处理器
*
* @author Sun Dasheng
*/
@Slf4j
public class NameNodeApis extends AbstractChannelHandler {
private final DefaultScheduler defaultScheduler;
private final ControllerManager controllerManager;
private final UserManager userManager;
private final DiskNameSystem diskNameSystem;
private final ShardingManager shardingManager;
private final NameNodeConfig nameNodeConfig;
private final PeerNameNodes peerNameNodes;
private final DataNodeManager dataNodeManager;
private final ThreadPoolExecutor executor;
protected int nodeId;
private final NameNodeLaunchMode mode;
private final FetchEditLogBuffer fetchEditLogBuffer;
private final AtomicBoolean slotsChanged = new AtomicBoolean(false);
private Map<Integer, Integer> slots;
private final FileReceiveHandler fileReceiveHandler;
private BackupNodeInfoHolder backupNodeInfoHolder;
public NameNodeApis(NameNodeConfig nameNodeConfig, DataNodeManager dataNodeManager, PeerNameNodes peerNameNodes,
ShardingManager shardingManager, DiskNameSystem diskNameSystem, DefaultScheduler defaultScheduler,
UserManager userManager, ControllerManager controllerManager) {
this.dataNodeManager = dataNodeManager;
this.peerNameNodes = peerNameNodes;
this.nameNodeConfig = nameNodeConfig;
this.shardingManager = shardingManager;
this.diskNameSystem = diskNameSystem;
this.nodeId = nameNodeConfig.getNameNodeId();
this.mode = nameNodeConfig.getMode();
this.userManager = userManager;
this.peerNameNodes.setNameNodeApis(this);
this.controllerManager = controllerManager;
this.defaultScheduler = defaultScheduler;
this.fetchEditLogBuffer = new FetchEditLogBuffer(diskNameSystem);
this.shardingManager.addOnSlotAllocateCompletedListener(slots -> {
if (slotsChanged.compareAndSet(false, true)) {
this.slots = slots;
}
});
FsImageFileTransportCallback fsImageFileTransportCallback = new FsImageFileTransportCallback(nameNodeConfig,
defaultScheduler, diskNameSystem);
this.fileReceiveHandler = new FileReceiveHandler(fsImageFileTransportCallback);
this.executor = new ThreadPoolExecutor(nameNodeConfig.getNameNodeApiCoreSize(), nameNodeConfig.getNameNodeApiMaximumPoolSize(),
60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(nameNodeConfig.getNameNodeApiQueueSize()));
}
public BackupNodeInfoHolder getBackupNodeInfoHolder() {
return backupNodeInfoHolder;
}
@Override
protected Set<Integer> interestPackageTypes() {
return new HashSet<>();
}
@Override
protected Executor getExecutor() {
return executor;
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
String token = userManager.getTokenByChannel(ctx.channel());
String username = userManager.logout(ctx.channel());
if (username != null && token != null) {
NettyPacket nettyPacket = NettyPacket.buildPacket(new byte[0], PacketType.CLIENT_LOGOUT);
nettyPacket.setUsername(username);
nettyPacket.setUserToken(token);
broadcast(nettyPacket);
}
if (backupNodeInfoHolder != null && backupNodeInfoHolder.match(ctx.channel())) {
backupNodeInfoHolder = null;
}
ctx.fireChannelInactive();
}
@Override
protected boolean handlePackage(ChannelHandlerContext ctx, NettyPacket request) {
boolean consumedMsg = peerNameNodes.onMessage(request);
if (consumedMsg) {
return true;
}
if (request.isError()) {
// 在请求转发的情况下,如果目标NameNode节点发生未知异常,然后返回的结果是异常的,
// 而源NameNode正常处理流程会出现空指针异常,从而出现死循环。
log.warn("收到一个异常请求或响应, 丢弃不进行处理:[request={}]", request.getHeader());
return true;
}
PacketType packetType = PacketType.getEnum(request.getPacketType());
Prometheus.incCounter("namenode_net_package_inbound_count", "NameNode收到的请求数量");
Prometheus.incCounter("namenode_net_package_inbound_bytes", "NameNode收到的请求大小", request.getBody().length);
StopWatch stopWatch = new StopWatch();
stopWatch.start();
RequestWrapper requestWrapper = new RequestWrapper(ctx, request, nodeId, bodyLength -> {
Prometheus.incCounter("namenode_net_package_outbound_count", "NameNode返回的响应数量");
Prometheus.incCounter("namenode_net_package_outbound_bytes", "NameNode返回的响应大小", bodyLength);
stopWatch.stop();
long time = stopWatch.getTime();
Prometheus.gauge("namenode_request_times", "NameNode处理请求耗时", "requestType", packetType.getDescription(), time);
});
try {
switch (packetType) {
case DATA_NODE_REGISTER:
handleDataNodeRegisterRequest(requestWrapper);
break;
case HEART_BRET:
handleDataNodeHeartbeatRequest(requestWrapper);
break;
case MKDIR:
handleMkdirRequest(requestWrapper);
break;
case FETCH_EDIT_LOG:
没有合适的资源?快使用搜索试试~ 我知道了~
温馨提示
本资源提供了一套基于Java的自研分布式小文件存储系统的设计源码,包含272个文件,其中包括183个Java源代码文件,26个PNG图片文件,以及15个JavaScript脚本文件。此外,还包括12个XML配置文件,6个Git忽略文件,5个Shell脚本文件,以及5个Protocol Buffers配置文件。这些文件详细展示了如何使用Java、JavaScript、Shell、CSS和HTML等技术构建一个分布式小文件存储系统,该系统支持存储几KB到几百MB之间的文件,非常适合用于学习和参考Java项目的开发。
资源推荐
资源详情
资源评论
收起资源包目录
基于Java的自研分布式小文件存储系统设计源码 (271个子文件)
index.css 366B
Index.css 335B
login.css 129B
App.css 65B
.gitignore 363B
.gitignore 341B
.gitignore 308B
.gitignore 308B
.gitignore 308B
.gitignore 308B
index.html 2KB
favicon.ico 1KB
NameNodeApis.java 59KB
DataNodeManager.java 22KB
FileSystemImpl.java 19KB
DispatcherServlet.java 18KB
ControllerManager.java 17KB
PeerNameNodes.java 13KB
ConsoleTable.java 12KB
PeerDataNodes.java 12KB
TextFormat.java 11KB
NameNodeClient.java 11KB
FsEditLog.java 10KB
NetClient.java 9KB
UserManager.java 9KB
FileUtil.java 9KB
RemoteController.java 9KB
UserController.java 9KB
StorageManager.java 9KB
RebalanceManager.java 9KB
LocalController.java 8KB
NameNode.java 8KB
NodeRoleSwitcher.java 8KB
NettyPacket.java 8KB
FsDirectory.java 8KB
NameNodeConfig.java 8KB
NodesController.java 7KB
AbstractController.java 6KB
DataNode.java 6KB
NameNodeClient.java 6KB
RollingWindow.java 6KB
AbstractFsNameSystem.java 6KB
StringUtils.java 6KB
AbstractCommand.java 6KB
HttpFileServerHandler.java 5KB
DefaultChannelHandler.java 5KB
DataNodeApis.java 5KB
DiskNameSystem.java 5KB
RebalanceSlotInfo.java 5KB
BackupNode.java 5KB
NetServer.java 5KB
VariablePathParser.java 5KB
DefaultFileSendTask.java 5KB
FileDownloadServlet.java 4KB
Node.java 4KB
FileSystem.java 4KB
NodeCommand.java 4KB
FsImage.java 4KB
PacketType.java 4KB
FsDirectoryTest.java 4KB
Prometheus.java 4KB
FileTransportClient.java 4KB
FsImageCheckPointer.java 4KB
FetchEditLogBuffer.java 4KB
RequestPromise.java 4KB
TomcatServer.java 4KB
User.java 4KB
FileAppender.java 4KB
DataNodeInfo.java 4KB
DefaultScheduler.java 3KB
ExportCommand.java 3KB
TrashPolicyDefault.java 3KB
SyncRequestSupport.java 3KB
FileSystemTest.java 3KB
MetricsHandler.java 3KB
FilePacket.java 3KB
ReplicateManager.java 3KB
ImportCommand.java 3KB
Controller.java 3KB
AbstractChannelHandler.java 3KB
FileReceiveHandler.java 3KB
ShardingManager.java 3KB
BackupNodeConfig.java 3KB
PeerNameNodeServer.java 2KB
EditsLogFetcher.java 2KB
CommandReader.java 2KB
DfsCommand.java 2KB
FsImageClearTask.java 2KB
RequestWrapper.java 2KB
BackupNodeManager.java 2KB
DataNodeConfig.java 2KB
EditLogWrapper.java 2KB
MetricsServlet.java 2KB
EditLogBuffer.java 2KB
DataNodeVO.java 2KB
DataNodeServer.java 2KB
InMemoryNameSystem.java 2KB
ProgressBar.java 2KB
UserFilesNode.java 2KB
DateUtils.java 2KB
共 271 条
- 1
- 2
- 3
资源评论
沐知全栈开发
- 粉丝: 4745
- 资源: 3374
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功