package simple;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.HashSet;
import java.util.Set;
/**
* 拉模式
* Created by BaiLi
*/
public class PullConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer("SimplePullConsumer");
pullConsumer.setNamesrvAddr("192.168.43.137:9876");//执行nameserver地址
Set<String> topics = new HashSet<>();
topics.add("simple");//添加Topic
topics.add("TopicTest");
pullConsumer.setRegisterTopics(topics);
pullConsumer.start();
while (true) { //循环拉取消息
pullConsumer.getRegisterTopics().forEach(n -> {
try {
Set<MessageQueue> messageQueues = pullConsumer.fetchSubscribeMessageQueues(n);//获取主题中的Queue
messageQueues.forEach(l -> {
try {
//获取Queue中的偏移量
long offset = pullConsumer.getOffsetStore().readOffset(l, ReadOffsetType.READ_FROM_MEMORY);
if (offset < 0) {
offset = pullConsumer.getOffsetStore().readOffset(l, ReadOffsetType.READ_FROM_STORE);
}
if (offset < 0) {
offset = pullConsumer.maxOffset(l);
}
if (offset < 0) {
offset = 0;
}
//拉取Queue中的消息。每次获取32条
PullResult pullResult = pullConsumer.pull(l, "*", offset, 32);
System.out.printf("循环拉取消息ing %s%n",pullResult);
switch (pullResult.getPullStatus()) {
case FOUND:
pullResult.getMsgFoundList().forEach(p -> {
System.out.printf("拉取消息成功%s%n", p);
});
//更新偏移量
pullConsumer.updateConsumeOffset(l, pullResult.getNextBeginOffset());
}
} catch (MQClientException e) {
e.printStackTrace();
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
} catch (MQClientException e) {
e.printStackTrace();
}
});
}
}
}
没有合适的资源?快使用搜索试试~ 我知道了~
RocketMQ原生API简单应用

共61个文件
class:34个
java:22个
xml:5个

需积分: 0 22 浏览量
2023-06-07
14:52:46
上传
评论
收藏 65KB ZIP 举报
温馨提示
Rocket_API_BaiLi/图灵笔记
资源推荐
资源详情
资源评论










收起资源包目录























































































共 61 条
- 1
资源评论


Gabriel_liao
- 粉丝: 4
- 资源: 2
上传资源 快速赚钱
我的内容管理 展开
我的资源 快来上传第一个资源
我的收益
登录查看自己的收益我的积分 登录查看自己的积分
我的C币 登录后查看C币余额
我的收藏
我的下载
下载帮助


安全验证
文档复制为VIP权益,开通VIP直接复制
