/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.store;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.*;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.ha.HAService;
import org.apache.rocketmq.store.schedule.ScheduleMessageService;
import java.net.Inet6Address;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Store all metadata downtime for recovery, data protection reliability
*/
public class CommitLog {
// Message's MAGIC CODE daa320a7
public final static int MESSAGE_MAGIC_CODE = -626843481;
protected static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
// End of file empty MAGIC CODE cbd43194
protected final static int BLANK_MAGIC_CODE = -875286124;
//文件队列,通过它来获取MappedFile(代表commitLog文件夹)
protected final MappedFileQueue mappedFileQueue;
protected final DefaultMessageStore defaultMessageStore;
//刷盘线程:同步刷盘和异步刷盘 GroupCommitService同步刷盘线程,FlushRealTimeService异步刷盘线程
private final FlushCommitLogService flushCommitLogService;
//If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
//暂存池专用缓存刷盘线程:CommitRealTimeService专用写入缓存writeBuffer提交线程(需要开启writeBuffer才起作用)
private final FlushCommitLogService commitLogService;
private final AppendMessageCallback appendMessageCallback;
private final ThreadLocal<MessageExtBatchEncoder> batchEncoderThreadLocal;
//topic每个分区的消费偏移量(一个map结构)
protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
protected volatile long confirmOffset = -1L;
private volatile long beginTimeInLock = 0;
//锁:可重入锁 和 自旋锁
protected final PutMessageLock putMessageLock;
//CommitLog初始化,开启后台刷盘线程
public CommitLog(final DefaultMessageStore defaultMessageStore) {
this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
this.defaultMessageStore = defaultMessageStore;
//刷盘类型:同步刷盘、异步刷盘
if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
//开启同步刷盘线程
this.flushCommitLogService = new GroupCommitService();
} else {
//开启异步刷盘线程
this.flushCommitLogService = new FlushRealTimeService();
}
//暂存池异步刷盘:专用缓存writeBuffer刷盘提交线程(将写入暂存池中的数据,进行异步刷盘操作)
this.commitLogService = new CommitRealTimeService();
this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
batchEncoderThreadLocal = new ThreadLocal<MessageExtBatchEncoder>() {
@Override
protected MessageExtBatchEncoder initialValue() {
return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
}
};
//put数据写入锁:重入锁 或 自旋锁,默认是自旋锁
this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
}
public boolean load() {
boolean result = this.mappedFileQueue.load();
log.info("load commit log " + (result ? "OK" : "Failed"));
return result;
}
//启动刷盘线程(GroupCommitService同步刷盘线程,FlushRealTimeService异步刷盘线程)
public void start() {
//启动刷盘线程
this.flushCommitLogService.start();
//专用缓存提交:根据暂存池开关判断是否启动 writeBuffer的提交线程(专用缓存打开相当于加了一个二级缓存writeBuffer,没打开,只有一个一级缓存pageCache)
if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
this.commitLogService.start();
}
}
public void shutdown() {
if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
this.commitLogService.shutdown();
}
this.flushCommitLogService.shutdown();
}
public long flush() {
//提交专用缓存
this.mappedFileQueue.commit(0);
//刷盘
this.mappedFileQueue.flush(0);
return this.mappedFileQueue.getFlushedWhere();
}
public long getMaxOffset() {
return this.mappedFileQueue.getMaxOffset();
}
public long remainHowManyDataToCommit() {
return this.mappedFileQueue.remainHowManyDataToCommit();
}
public long remainHowManyDataToFlush() {
return this.mappedFileQueue.remainHowManyDataToFlush();
}
//删除过期的commitLog文件
public int deleteExpiredFile(
final long expiredTime,
final int deleteFilesInterval,
final long intervalForcibly,
final boolean cleanImmediately
) {
return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);
}
/**
* Read CommitLog data, use data replication
*/
public SelectMappedBufferResult getData(final long offset) {
return this.getData(offset, offset == 0);
}
public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {
int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);
if (mappedFile != null) {
int pos = (int) (offset % mappedFileSize);
SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);
return result;
}
return null;
}
/**
* When the normal exit, data recovery, all memory data have been flush
*/
public void recoverNormally(long maxPhyOffsetOfConsumeQueue) {
boolean checkCRCOnRecover = this.defau
没有合适的资源?快使用搜索试试~ 我知道了~
温馨提示
该项目为RocketMQ-4.7.1版本的Java语言实现,总计包含1176个文件,涵盖979个Java源文件、54个Markdown文档、35个XML配置文件、25个PNG图片文件、15个Shell脚本、13个YAML文件、12个属性文件、8个批处理文件、6个PEM文件和4个配置文件。源码阅读笔记详实记录了其设计细节,涉及Java、Shell和Python等多种编程语言。
资源推荐
资源详情
资源评论
收起资源包目录
基于Java语言的RocketMQ-4.7.1版本设计源码阅读心得分享 (1175个子文件)
org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener 68B
org.apache.rocketmq.acl.AccessValidator 50B
org.apache.rocketmq.acl.AccessValidator 50B
BUILDING 997B
runbroker.cmd 2KB
runserver.cmd 2KB
tools.cmd 2KB
mqshutdown.cmd 1KB
play.cmd 1KB
mqnamesrv.cmd 1KB
mqbroker.cmd 1KB
mqadmin.cmd 1KB
broker-n0.conf 1KB
broker-n1.conf 1KB
broker-n2.conf 1KB
broker.conf 949B
.gitignore 165B
CommitLog.java 100KB
MQClientAPIImpl.java 100KB
DefaultMessageStore.java 92KB
AdminBrokerProcessor.java 81KB
DefaultMQProducerImpl.java 73KB
DefaultMQPushConsumerImpl.java 59KB
BrokerController.java 55KB
MQClientInstance.java 55KB
DefaultMQAdminExtImpl.java 48KB
DefaultMQProducer.java 46KB
DefaultLitePullConsumerImpl.java 45KB
SelectorParser.java 39KB
LoggingBuilder.java 39KB
SendMessageProcessor.java 36KB
RouteInfoManager.java 36KB
DefaultMQPushConsumer.java 34KB
PullMessageProcessor.java 34KB
DefaultMQPullConsumerImpl.java 34KB
SelectorParserTokenManager.java 32KB
NettyRemotingAbstract.java 32KB
DLedgerCommitLog.java 31KB
NettyRemotingClient.java 31KB
PlainAccessValidatorTest.java 30KB
DefaultLitePullConsumerTest.java 29KB
ConsumeMessageOrderlyService.java 27KB
DefaultMessageStoreTest.java 26KB
MappedFile.java 26KB
BrokerConfig.java 25KB
DefaultRequestProcessor.java 25KB
ConsumeMessageConcurrentlyService.java 25KB
RebalanceImpl.java 25KB
ConsumeQueue.java 24KB
DefaultMQAdminExt.java 24KB
HAService.java 24KB
MappedFileQueue.java 23KB
DefaultMessageStoreCleanFilesTest.java 23KB
MessageStoreConfig.java 23KB
DefaultMQAdminExtTest.java 22KB
NettyRemotingServer.java 22KB
TransactionalMessageServiceImpl.java 22KB
MQAdminImpl.java 21KB
MQClientAPIImplTest.java 21KB
DefaultMQProducerTest.java 21KB
ScheduleMessageService.java 20KB
StoreStatsService.java 20KB
UtilAll.java 20KB
TopicConfigManager.java 20KB
MessageDecoder.java 20KB
PlainPermissionManager.java 19KB
ConsumeQueueExt.java 19KB
RemoteAddressStrategyTest.java 19KB
RemotingCommand.java 18KB
ConsumerFilterManager.java 18KB
BrokerOuterAPI.java 18KB
TransactionProducer.java 18KB
MixAll.java 17KB
ProcessQueue.java 17KB
DefaultMQPullConsumer.java 17KB
AsyncTraceDispatcher.java 17KB
ComparisonExpression.java 16KB
ReplyMessageProcessor.java 16KB
ExpressionTest.java 16KB
TransactionalMessageBridge.java 16KB
HAConnection.java 16KB
AbstractSendMessageProcessor.java 16KB
InnerLoggerFactory.java 16KB
DefaultLitePullConsumer.java 15KB
DefaultMQPushConsumerTest.java 15KB
DefaultMQConsumerWithTraceTest.java 15KB
MessageStoreWithFilterTest.java 15KB
AllocateMappedFileService.java 15KB
MQAdminExt.java 15KB
ClientRemotingProcessor.java 15KB
IndexService.java 14KB
Broker2Client.java 14KB
SendMessageProcessorTest.java 14KB
MQVersion.java 14KB
TlsTest.java 14KB
SimpleCharStream.java 14KB
EndTransactionProcessor.java 13KB
IndexFile.java 13KB
Logger.java 13KB
MonitorService.java 13KB
共 1175 条
- 1
- 2
- 3
- 4
- 5
- 6
- 12
资源评论
wjs2024
- 粉丝: 2152
- 资源: 5437
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
- 【java毕业设计】停车场管理系统源码(springboot+vue+mysql+说明文档).zip
- TIdHTTP控件 Indy SSL Lib 库
- 【java毕业设计】高仿小米电子商城项目模板源码(springboot+vue+mysql+说明文档).zip
- 5G建设和AI技术推动下,中证5G通信ETF的投资价值探讨
- Python项目之淘宝模拟登录.zip
- 中国地区数据要素化水平(2006-2022年).xlsx
- 全国数据要素化水平(2010-2023年).xlsx
- 课程设计项目:python+QT实现的小型编译器.zip
- (源码)基于AVR ATmega644的智能卡AES解密系统.zip
- (源码)基于C++插件框架的计算与打印系统.zip
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功