/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.console.service.client;
import com.google.common.base.Throwables;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.MQAdminImpl;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.admin.ConsumeStats;
import org.apache.rocketmq.common.admin.RollbackStats;
import org.apache.rocketmq.common.admin.TopicStatsTable;
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
import org.apache.rocketmq.common.protocol.body.ClusterInfo;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumeStatsList;
import org.apache.rocketmq.common.protocol.body.ConsumerConnection;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.body.GroupList;
import org.apache.rocketmq.common.protocol.body.KVTable;
import org.apache.rocketmq.common.protocol.body.ProducerConnection;
import org.apache.rocketmq.common.protocol.body.QueueTimeSpan;
import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.console.util.JsonUtil;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.tools.admin.MQAdminExt;
import org.apache.rocketmq.tools.admin.api.MessageTrack;
import org.joor.Reflect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import static org.apache.rocketmq.remoting.protocol.RemotingSerializable.decode;
@Service
public class MQAdminExtImpl implements MQAdminExt {
private Logger logger = LoggerFactory.getLogger(MQAdminExtImpl.class);
public MQAdminExtImpl() {
}
@Override
public void updateBrokerConfig(String brokerAddr, Properties properties)
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
UnsupportedEncodingException, InterruptedException, MQBrokerException {
MQAdminInstance.threadLocalMQAdminExt().updateBrokerConfig(brokerAddr, properties);
}
@Override
public void createAndUpdateTopicConfig(String addr, TopicConfig config)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
MQAdminInstance.threadLocalMQAdminExt().createAndUpdateTopicConfig(addr, config);
}
@Override
public void createAndUpdateSubscriptionGroupConfig(String addr, SubscriptionGroupConfig config)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
MQAdminInstance.threadLocalMQAdminExt().createAndUpdateSubscriptionGroupConfig(addr, config);
}
@Override
public SubscriptionGroupConfig examineSubscriptionGroupConfig(String addr, String group) {
RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
RemotingCommand response = null;
try {
response = remotingClient.invokeSync(addr, request, 3000);
}
catch (Exception err) {
throw Throwables.propagate(err);
}
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
SubscriptionGroupWrapper subscriptionGroupWrapper = decode(response.getBody(), SubscriptionGroupWrapper.class);
return subscriptionGroupWrapper.getSubscriptionGroupTable().get(group);
}
default:
throw Throwables.propagate(new MQBrokerException(response.getCode(), response.getRemark()));
}
}
@Override
public TopicConfig examineTopicConfig(String addr, String topic) {
RemotingClient remotingClient = MQAdminInstance.threadLocalRemotingClient();
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null);
RemotingCommand response = null;
try {
response = remotingClient.invokeSync(addr, request, 3000);
}
catch (Exception err) {
throw Throwables.propagate(err);
}
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
TopicConfigSerializeWrapper topicConfigSerializeWrapper = decode(response.getBody(), TopicConfigSerializeWrapper.class);
return topicConfigSerializeWrapper.getTopicConfigTable().get(topic);
}
default:
throw Throwables.propagate(new MQBrokerException(response.getCode(), response.getRemark()));
}
}
@Override
public TopicStatsTable examineTopicStats(String topic)
throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
return MQAdminInstance.threadLocalMQAdminExt().examineTopicStats(topic);
}
@Override
public TopicList fetchAllTopicList() throws RemotingException, MQClientException, InterruptedException {
TopicList topicList = MQAdminInstance.threadLocalMQAdminExt().fetchAllTopicList();
logger.debug("op=look={}", JsonUtil.obj2String(topicList.getTopicList()));
return topicList;
}
@Override
public KVTable fetchBrokerRuntimeStats(String brokerAddr)
throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
InterruptedException, MQBrokerException {
return MQAdminInstance.threadLocalMQAdminExt().fetchBrokerRuntimeStats(brokerAddr);
}
@Override
public ConsumeStats examineConsumeStats(String consumerGroup)
throws RemotingException, MQClientException, InterruptedException, MQBrokerException {
return MQAdminInstance.threadLocalMQAdminExt().examineConsumeStats(consumerGroup);
}
@Override
public ConsumeStats examineConsumeStats(String consum
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
收起资源包目录
rocketmq可视化界面,rocketmq-console (1052个子文件)
angular-material.min.css 402KB
bootstrap.css 144KB
bootstrap.min.css 122KB
bootstrap-material-design.css 105KB
bootstrap-material-design.css 94KB
bootstrap-material-design.min.css 92KB
btn.css 40KB
font-awesome.css 37KB
font-awesome.min.css 30KB
bootstrap-theme.css 26KB
docs.css 23KB
bootstrap-theme.min.css 23KB
chosen.css 14KB
ladda.css 12KB
main.css 10KB
ladda-themeless.css 10KB
normalize.css 9KB
ladda.min.css 9KB
bootstrap-datetimepicker.min.css 8KB
ladda-themeless.min.css 8KB
default.css 5KB
app.css 5KB
ngDialog-theme-default.css 4KB
timeline.css 3KB
jquery.dropdown.css 2KB
angular-ui-notification.css 2KB
angular-ui-notification.css 2KB
normalize.css 2KB
ngDialog.min.css 1KB
theme.css 1KB
ripples.css 1KB
ripples.min.css 1KB
ripples.css 761B
animate.css 534B
chosen-spinner.css 482B
angular-csp.css 343B
login.css 129B
Dockerfile 169B
fontawesome-webfont.eot 162KB
glyphicons-halflings-regular.eot 20KB
icomoon.eot 2KB
spinner.gif 2KB
.gitignore 23B
topic.html 22KB
consumer.html 22KB
message.html 15KB
index.html 7KB
cluster.html 6KB
producer.html 3KB
index.html 2KB
un_support_browser.html 2KB
_header.html 2KB
ops.html 2KB
50x.html 819B
404.html 819B
_footer.html 246B
MQAdminExtImpl.java 23KB
ConsumerServiceImpl.java 16KB
DashboardCollectTask.java 14KB
MessageServiceImpl.java 9KB
ConsumerServiceImplTest.java 8KB
TopicServiceImpl.java 8KB
RocketMQConsoleTestBase.java 7KB
TopicServiceImplTest.java 6KB
DashboardCollectServiceImpl.java 6KB
TestRocketMQServer.java 6KB
JsonUtil.java 6KB
MessageServiceImplTest.java 6KB
ConsumerController.java 5KB
TopicController.java 5KB
MessageView.java 5KB
TestController.java 4KB
MessageController.java 4KB
MonitorServiceImpl.java 4KB
MQAdminInstance.java 4KB
WebStaticApplicationTests.java 3KB
ClusterServiceImpl.java 3KB
RMQConfigure.java 3KB
TopicConfigInfo.java 3KB
MQAdminAspect.java 3KB
ProducerServiceImplTest.java 3KB
MonitorController.java 3KB
OpsServiceImpl.java 3KB
DashboardServiceImpl.java 3KB
ClusterServiceImplTest.java 3KB
GroupConsumeInfo.java 3KB
GlobalRestfulResponseBodyAdvice.java 2KB
QueueStatInfo.java 2KB
OpsServiceImplTest.java 2KB
ConsumerService.java 2KB
OpsController.java 2KB
ConnectionInfo.java 2KB
DashboardController.java 2KB
MonitorTask.java 2KB
ConsumerConfigInfo.java 2KB
TopicService.java 2KB
GlobalExceptionHandler.java 2KB
AbstractCommonService.java 2KB
ProducerController.java 2KB
DashboardCollectServiceImplTest.java 2KB
共 1052 条
- 1
- 2
- 3
- 4
- 5
- 6
- 11
资源评论
尘光掠影
- 粉丝: 385
- 资源: 54
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功