/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.client.impl;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.consumer.PullCallback;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.impl.consumer.PullResultExt;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.*;
import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.*;
import org.apache.rocketmq.common.protocol.header.*;
import org.apache.rocketmq.common.protocol.header.filtersrv.RegisterMessageFilterClassRequestHeader;
import org.apache.rocketmq.common.protocol.header.namesrv.*;
import org.apache.rocketmq.common.protocol.heartbeat.HeartbeatData;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.exception.*;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
import org.apache.rocketmq.remoting.netty.NettyRemotingClient;
import org.apache.rocketmq.remoting.netty.ResponseFuture;
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.slf4j.Logger;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* MQClient API implementation.
*/
public class MQClientAPIImpl {
// Shared logger for this class, obtained from ClientLogger.
private final static Logger log = ClientLogger.getLog();
// Evaluated at class initialization from the system property
// "org.apache.rocketmq.client.sendSmartMsg"; falls back to "true" when the property is not set.
public static boolean sendSmartMsg =
Boolean.parseBoolean(System.getProperty("org.apache.rocketmq.client.sendSmartMsg", "true"));
// At class initialization, stores MQVersion.CURRENT_VERSION (as a decimal string) in the
// system property named by RemotingCommand.REMOTING_VERSION_KEY.
static {
System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
}
/**
* Client used for remote invocations.
*/
private final RemotingClient remotingClient;
// Used by fetchNameServerAddr() to look up the name server address string.
private final TopAddressing topAddressing;
// Processor registered on the remoting client for the request codes listed in the constructor.
private final ClientRemotingProcessor clientRemotingProcessor;
// Last name server address string applied by fetchNameServerAddr(); null until a fetch succeeds.
private String nameSrvAddr = null;
// Client configuration supplied to the constructor (unit name, VIP channel flag, ...).
private ClientConfig clientConfig;
public MQClientAPIImpl(final NettyClientConfig nettyClientConfig, final ClientRemotingProcessor clientRemotingProcessor,
RPCHook rpcHook, final ClientConfig clientConfig) {
this.clientConfig = clientConfig;
topAddressing = new TopAddressing(MixAll.WS_ADDR, clientConfig.getUnitName());
this.remotingClient = new NettyRemotingClient(nettyClientConfig, null);
this.clientRemotingProcessor = clientRemotingProcessor;
this.remotingClient.registerRPCHook(rpcHook);
this.remotingClient.registerProcessor(RequestCode.CHECK_TRANSACTION_STATE, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.GET_CONSUMER_RUNNING_INFO, this.clientRemotingProcessor, null);
this.remotingClient.registerProcessor(RequestCode.CONSUME_MESSAGE_DIRECTLY, this.clientRemotingProcessor, null);
}
/**
 * Returns the name server address list as reported by the underlying remoting client.
 *
 * @return whatever {@code remotingClient.getNameServerAddressList()} returns
 */
public List<String> getNameServerAddressList() {
    final List<String> nameServerAddresses = remotingClient.getNameServerAddressList();
    return nameServerAddresses;
}
/**
 * Exposes the remoting client created by the constructor.
 *
 * @return the remoting client held by this instance
 */
public RemotingClient getRemotingClient() {
    return this.remotingClient;
}
/**
 * Looks up the name server address string through {@code topAddressing} and, when the result is
 * non-null and differs from the cached value, logs the change, pushes the new addresses to the
 * remoting client and caches the new string.
 *
 * <p>Any exception raised during the lookup or update is logged and not propagated.
 *
 * @return the cached name server address string; {@code null} if none has been applied yet
 */
public String fetchNameServerAddr() {
    try {
        final String fetched = this.topAddressing.fetchNSAddr();
        final boolean changed = fetched != null && !fetched.equals(this.nameSrvAddr);
        if (changed) {
            log.info("name server address changed, old=" + this.nameSrvAddr + ", new=" + fetched);
            this.updateNameServerAddressList(fetched);
            this.nameSrvAddr = fetched;
        }
    } catch (Exception e) {
        log.error("fetchNameServerAddr Exception", e);
    }
    // Single exit: after a successful update this is the freshly cached value.
    return this.nameSrvAddr;
}
/**
 * Splits a semicolon-separated address string and hands the resulting list to the remoting
 * client.
 *
 * @param addrs name server addresses joined with {@code ";"}
 */
public void updateNameServerAddressList(final String addrs) {
    // Copy into a fresh ArrayList so the remoting client receives a regular mutable list.
    final List<String> addressList = new ArrayList<String>(Arrays.asList(addrs.split(";")));
    this.remotingClient.updateNameServerAddressList(addressList);
}
/**
 * Starts the underlying remoting client.
 */
public void start() {
    remotingClient.start();
}
/**
 * Shuts down the underlying remoting client.
 */
public void shutdown() {
    remotingClient.shutdown();
}
/**
 * Sends an {@code UPDATE_AND_CREATE_SUBSCRIPTIONGROUP} request carrying the encoded subscription
 * group configuration to the given broker address and waits for the reply.
 *
 * <p>The target address is first passed through {@code MixAll.brokerVIPChannel} together with the
 * client's VIP-channel flag.
 *
 * @param addr          broker address
 * @param config        subscription group configuration, sent as the request body
 * @param timeoutMillis timeout passed to the synchronous invocation
 * @throws MQClientException when the response code is anything other than
 *                           {@code ResponseCode.SUCCESS}; carries the response code and remark
 */
public void createSubscriptionGroup(final String addr, final SubscriptionGroupConfig config, final long timeoutMillis)
    throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    final RemotingCommand request =
        RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP, null);
    request.setBody(RemotingSerializable.encode(config));

    final String targetAddr = MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr);
    final RemotingCommand response = this.remotingClient.invokeSync(targetAddr, request, timeoutMillis);
    assert response != null;

    if (response.getCode() == ResponseCode.SUCCESS) {
        return;
    }
    throw new MQClientException(response.getCode(), response.getRemark());
}
public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
requestHeader.setTopic(topicConfig.getTopicName());
requestHeader.setDefaultTopic(defaultTopic);
没有合适的资源?快使用搜索试试~ 我知道了~
基于芋道源码的rocketmq源码版本再次对核心功能实现进行详细的注解标识,一款低延迟、可靠、可扩展、易用的面向消息的中间件
共751个文件
java:673个
xml:33个
sh:14个
1.该资源内容由用户上传,如若侵权请联系客服进行举报
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
版权申诉
0 下载量 179 浏览量
2023-11-23
16:35:26
上传
评论
收藏 1.24MB ZIP 举报
温馨提示
Apache RocketMQ 是阿里巴巴海量消息业务诞生的一款低延迟、可靠、可扩展、易用的面向消息的中间件。Pub/Sub 和 P2P 消息传递模型,在同一队列中实现可靠的 FIFO 和严格的顺序消息传递,长拉队列模型,也支持推流消费风格,单队列百万消息累积能力,通过各种消息传递协议,如JMS,MQTT等,分布式高可用部署架构,满足至少一次消息投递语义,用于隔离测试和云隔离群集的 Docker 映像,功能丰富的管理仪表板,用于配置、指标和监控
资源推荐
资源详情
资源评论
收起资源包目录
基于芋道源码的rocketmq源码版本再次对核心功能实现进行详细的注解标识,一款低延迟、可靠、可扩展、易用的面向消息的中间件 (751个子文件)
BUILDING 928B
broker.conf 949B
DISCLAIMER 524B
.gitignore 165B
MQClientAPIImpl.java 92KB
DefaultMessageStore.java 76KB
AdminBrokerProcessor.java 62KB
CommitLog.java 61KB
DefaultMQProducerImpl.java 58KB
MQClientInstance.java 53KB
DefaultMQPushConsumerImpl.java 52KB
DefaultMQAdminExtImpl.java 45KB
RouteInfoManager.java 35KB
BrokerController.java 33KB
PullMessageProcessor.java 31KB
DefaultMQPullConsumerImpl.java 29KB
DefaultMQProducer.java 28KB
ConsumeMessageOrderlyService.java 28KB
SendMessageProcessor.java 27KB
NettyRemotingClient.java 26KB
HAService.java 26KB
ConsumeMessageConcurrentlyService.java 23KB
RebalanceImpl.java 23KB
MappedFile.java 23KB
DefaultRequestProcessor.java 23KB
DefaultMQAdminExtTest.java 22KB
ConsumeQueue.java 21KB
MappedFileQueue.java 21KB
MessageStoreConfig.java 21KB
DefaultMQAdminExt.java 21KB
DefaultMQPushConsumer.java 21KB
MQAdminImpl.java 20KB
NettyRemotingAbstract.java 20KB
StoreStatsService.java 20KB
TopicConfigManager.java 19KB
RemotingCommand.java 18KB
ProcessQueue.java 17KB
ScheduleMessageService.java 17KB
MixAll.java 17KB
AbstractSendMessageProcessor.java 17KB
HAConnection.java 17KB
UtilAll.java 17KB
DynaCode.java 16KB
NettyRemotingServer.java 15KB
DefaultRequestProcessor.java 15KB
BrokerConfig.java 15KB
Broker2Client.java 15KB
IndexService.java 14KB
MQVersion.java 14KB
MessageDecoder.java 14KB
PullAPIWrapper.java 13KB
MQAdminExt.java 13KB
BrokerOuterAPI.java 13KB
MonitorService.java 13KB
AllocateMappedFileService.java 12KB
ClusterListSubCommand.java 12KB
DefaultMQPullConsumer.java 12KB
MQClientAPIImplTest.java 12KB
RebalanceLockManager.java 12KB
DefaultRequestProcessorTest.java 12KB
EndTransactionProcessor.java 11KB
ConsumerRunningInfo.java 11KB
PullMessageProcessorTest.java 11KB
DefaultMQPushConsumerTest.java 11KB
BrokerStartup.java 11KB
SendMessageProcessorTest.java 11KB
RemoteBrokerOffsetStore.java 11KB
DefaultMQProducerTest.java 11KB
BrokerStatsManager.java 11KB
MQAdminStartup.java 11KB
Store.java 10KB
IndexFile.java 10KB
Producer.java 10KB
ConsumerOffsetManager.java 10KB
ConsumerProgressSubCommand.java 10KB
QueryMsgByIdSubCommand.java 10KB
ClientRemotingProcessor.java 10KB
TransactionProducer.java 9KB
RebalancePushImpl.java 9KB
PullRequestHoldService.java 9KB
MonitorServiceTest.java 9KB
PrintMessageByQueueCommand.java 9KB
Configuration.java 9KB
TagMessageWithMulConsumerIT.java 9KB
RemotingCommandTest.java 9KB
ConsumerGroupInfo.java 9KB
LocalFileOffsetStore.java 9KB
RandomUtil.java 9KB
FilterClassManager.java 8KB
ProducerManager.java 8KB
ConsumerManager.java 8KB
LatencyFaultToleranceImpl.java 8KB
TagMessageWith1ConsumerIT.java 8KB
SubscriptionGroupManager.java 8KB
StatsItem.java 8KB
MQPullConsumerScheduleService.java 8KB
FiltersrvController.java 8KB
CLusterSendMsgRTCommand.java 8KB
Consumer.java 8KB
KVConfigManager.java 8KB
共 751 条
- 1
- 2
- 3
- 4
- 5
- 6
- 8
资源评论
Java程序员-张凯
- 粉丝: 1w+
- 资源: 6828
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功