package com.roy.springboot.producer;
import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import com.roy.springboot.util.MyConstants;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
@RestController
public class ProducerController {
@Autowired
private RabbitTemplate rabbitTemplate;
// @Autowired
// private AmqpTemplate amqpTemplate;
@ApiOperation(value="direct发送接口",notes="直接发送到队列。task模式")
@GetMapping(value="/directSend")
public Object directSend(String message) throws AmqpException, UnsupportedEncodingException {
//设置部分请求参数
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
messageProperties.setPriority(2);
//设置消息转换器,如json
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
//将对象转换成json再发送。
// rabbitTemplate.convertandsend("",Object);
//发消息
rabbitTemplate.send("directqueue",new Message(message.getBytes("UTF-8"),messageProperties));
return "message sended : "+message;
}
@ApiOperation(value="fanout发送接口",notes="发送到fanoutExchange。消息将往该exchange下的所有queue转发")
@GetMapping(value="/fanoutSend")
public Object fanoutSend(String message) throws AmqpException, UnsupportedEncodingException {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//fanout模式只往exchange里发送消息。分发到exchange下的所有queue
rabbitTemplate.send(MyConstants.EXCHANGE_FANOUT, "", new Message(message.getBytes("UTF-8"),messageProperties));
Message message2 = MessageBuilder.withBody(message.getBytes()).setMessageId(UUID.randomUUID().toString()).build();
rabbitTemplate.send(message2);
return "message sended : "+message;
}
@ApiOperation(value="topic发送接口",notes="发送到topicExchange。exchange转发消息时,会往routingKey匹配的queue发送,*代表一个单词,#代表0个或多个单词。")
@ApiImplicitParam(name="routingKey",value="路由关键字")
@GetMapping(value="/topicSendHunanIT")
public Object topicSend(String routingKey,String message) throws AmqpException, UnsupportedEncodingException {
if(null == routingKey) {
routingKey="hebei.IT";
}
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
//fanout模式只往exchange里发送消息。分发到exchange下的所有queue
rabbitTemplate.send("topicExchange", routingKey, new Message(message.getBytes("UTF-8"),messageProperties));
return "message sended : routingKey >"+routingKey+";message > "+message;
}
@ApiOperation(value="header发送接口",notes="发送到headerExchange。exchange转发消息时,不再管routingKey,而是根据header条件进行转发。")
@GetMapping(value="/headerSend")
public Object headerSend(String txTyp,String busTyp,String message) throws AmqpException, UnsupportedEncodingException {
if(null == txTyp) {
txTyp="0";
}
if(null == busTyp) {
busTyp="0";
}
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
messageProperties.setHeader("txTyp", txTyp);
messageProperties.setHeader("busTyp", busTyp);
//fanout模式只往exchange里发送消息。分发到exchange下的所有queue
rabbitTemplate.send("headerExchange", "uselessRoutingKey", new Message(message.getBytes("UTF-8"),messageProperties));
return "message sended : txTyp >"+txTyp+";busTyp > "+busTyp;
}
@ApiOperation(value="quorum队列发送接口",notes="直接发送到队列。Quorum队列")
@GetMapping(value="/directQuorum")
public Object directQuorum(String message) throws AmqpException, UnsupportedEncodingException {
//设置部分请求参数
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
messageProperties.setPriority(2);
//设置消息转换器,如json
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
//将对象转换成json再发送。
// rabbitTemplate.convertandsend("",Object);
//发消息
rabbitTemplate.send(MyConstants.QUEUE_QUORUM,new Message(message.getBytes("UTF-8"),messageProperties));
return "message sended : "+message;
}
// @ApiOperation(value="stream队列发送接口",notes="直接发送到队列。stream队列")
// @GetMapping(value="/directStream")
public Object directStream(String message) throws AmqpException, UnsupportedEncodingException {
//设置部分请求参数
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
messageProperties.setPriority(2);
//设置消息转换器,如json
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
//将对象转换成json再发送。
// rabbitTemplate.convertandsend("",Object);
//发消息
rabbitTemplate.send(MyConstants.QUEUE_STREAM,new Message(message.getBytes("UTF-8"),messageProperties));
return "message sended : "+message;
}
}
没有合适的资源?快使用搜索试试~ 我知道了~
rabbitMQ
共129个文件
class:55个
java:42个
xml:15个
0 下载量 160 浏览量
2024-08-01
20:17:21
上传
评论
收藏 150KB ZIP 举报
温馨提示
rabbitMQrabbitMQ
资源推荐
资源详情
资源评论
收起资源包目录
rabbitMQrabbitMQ (129个子文件)
application.properties.bak 5KB
application.properties.bak 5KB
PublishConfirm.class 8KB
ProducerController.class 5KB
DirectReceiver.class 5KB
ReceiveLogsHeader$1.class 3KB
HeaderConfig.class 3KB
Sixinxiaofeizhe$1.class 3KB
PushReceiver$1.class 3KB
Sender.class 3KB
ReceiveLogsHeader.class 3KB
EmitLogHeader.class 3KB
Shengchanzhe.class 3KB
StreamSender.class 2KB
TopicConfig.class 2KB
ShardingConsumer.class 2KB
DownStreamConsumer.class 2KB
DirectReceiver$1.class 2KB
xiaofeizhe.class 2KB
StreamPushConsumer.class 2KB
SendMessageController.class 2KB
Swagger2.class 2KB
PullReceiver.class 2KB
ReceiveLogsTopic.class 2KB
ReceiveLogsFanout.class 2KB
FanoutConfig.class 2KB
StreamPushConsumer$1.class 2KB
ShardingProducer.class 2KB
Worker1$1.class 2KB
Worker$1.class 2KB
ReceiveLogsDirect.class 2KB
DownStreamConsumer$1.class 2KB
ShardingConsumer$1.class 2KB
ReceiveLogsDirect$1.class 2KB
ReceiveLogsFanout$1.class 2KB
UpstreamProducer.class 2KB
ReceiveLogsTopic$1.class 2KB
Sixinxiaofeizhe.class 2KB
EmitLogTopic.class 2KB
NewTask.class 2KB
EmitLogDirect.class 2KB
EmitLogFanout.class 2KB
xiaofeizhe$1.class 2KB
PushReceiver.class 1KB
RabbitmqConfig.class 1KB
Worker1.class 1KB
Worker.class 1KB
MessageReceiver.class 1KB
MessageSender.class 1KB
StreamConfig.class 1KB
RabbitMQUtil.class 1KB
MyConstants.class 1KB
QuorumConfig.class 1KB
SCApplication.class 870B
DirectConfig.class 725B
RabbitMQApplication.class 719B
WorkConfig.class 307B
.gitignore 48B
.gitignore 0B
RabbitMQDemo.iml 81B
SpringCloudStreamDemo-1.0.jar 7KB
ProducerController.java 6KB
PublishConfirm.java 5KB
DirectReceiver.java 5KB
StreamPushConsumer.java 3KB
ShardingConsumer.java 3KB
ReceiveLogsHeader.java 2KB
DownStreamConsumer.java 2KB
EmitLogHeader.java 2KB
ReceiveLogsTopic.java 2KB
ReceiveLogsDirect.java 2KB
PushReceiver.java 2KB
Sixinxiaofeizhe.java 2KB
PullReceiver.java 2KB
ReceiveLogsFanout.java 2KB
Worker.java 2KB
HeaderConfig.java 2KB
Sender.java 2KB
Worker1.java 2KB
FanoutConfig.java 2KB
TopicConfig.java 1KB
xiaofeizhe.java 1KB
Swagger2.java 1KB
StreamSender.java 1KB
Shengchanzhe.java 1KB
MyConstants.java 1KB
SendMessageController.java 1KB
ShardingProducer.java 1KB
UpstreamProducer.java 1KB
EmitLogDirect.java 1KB
EmitLogTopic.java 1KB
RabbitMQUtil.java 925B
RabbitmqConfig.java 916B
EmitLogFanout.java 915B
StreamConfig.java 825B
NewTask.java 823B
MessageReceiver.java 754B
QuorumConfig.java 637B
MessageSender.java 614B
SCApplication.java 530B
共 129 条
- 1
- 2
资源评论
Promise_J_Z
- 粉丝: 82
- 资源: 3
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功