package com.rpc;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
/**
* Message属性: AMQP协议一共预定义了14个属性,但是大多数属性很少使用,下面几个可能用的比较多
* deliveryMode:有2个值,一个是持久,另一个表示短暂(第二篇说过)
* contentType:内容类型:用来描述编码的MIME类型。例如,经常使用JSON编码是将此属性设置为一个很好的做法:application/json。
* replyTo:经常使用的是回调队列的名字 correlationid:RPC响应请求的相关应用
*
* 我们的rpc工作方式如下:
* 1:当客户端启动时,它创建一个匿名的独占回调队列。
* 2:对于rpc请求,客户端发送2个属性,一个是replyTo设置回调队列,另一是correlationId为每个队列设置唯一值
* 3:请求被发送到一个rpc_queue队列中
* 4:rpc服务器是等待队列的请求,当收到一个请求的时候,他就把消息返回的结果返回给客户端,使请求结束。
* 5:客户端等待回调队列上的数据,当消息出现的时候,他检查correlationId,如果它和从请求返回的值匹配,就进行响应。
*
* 服务器代码比较简单
* 1:建立连接,通道,队列
* 2:我们可能运行多个服务器进程,为了分散负载服务器压力,我们设置channel.basicQos(1);
* 3:我们用basicconsume访问队列。然后进入循环,在其中我们等待请求消息并处理消息然后发送响应。
*
*/
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
private static int fib(int n) {
if (n == 0) {
return 0;
}
if (n == 1) {
return 1;
}
return fib(n - 1) + fib(n - 1);
}
public static void main(String[] args) throws IOException,
InterruptedException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
System.out.println("RPCServer Awating RPC request");
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
BasicProperties props = delivery.getProperties();
BasicProperties replyProps = new AMQP.BasicProperties.Builder()
.correlationId(props.getCorrelationId()).build();
String message = new String(delivery.getBody(), "UTF-8");
int n = Integer.parseInt(message);
System.out.println("RPCServer fib(" + message + ")");
String response = "" + fib(n);
channel.basicPublish("", props.getReplyTo(), replyProps,
response.getBytes());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
收起资源包目录
testRabbitMQ.zip (97个子文件)
testRabbitMQ
src
main
java
com
spring
exchange
direct
DirectListener2.java 538B
ProducerMain.java 1KB
Spittle.java 675B
ConsumerMain.java 353B
DirectListener1.java 538B
topic
ProducerMain.java 1KB
ConsumerMain.java 351B
TopicListener2.java 535B
TopicListener1.java 535B
fanout
ProducerMain.java 1KB
ConsumerMain.java 353B
FanoutListener2.java 538B
FanoutListener1.java 538B
queue
Customer.java 2KB
Producer.java 2KB
onetomany
Work.java 1KB
NewTask.java 2KB
onetoone
Send.java 873B
Recv.java 997B
exchange
direct
ReceiveLogsDirect2.java 2KB
RoutingSendDirect.java 2KB
ReceiveLogsDirect1.java 2KB
Topics
TopicSend.java 2KB
ReceiveLogsTopic2.java 2KB
ReceiveLogsTopic1.java 2KB
fanout
EmitLog.java 1KB
ReceiveLogs1.java 2KB
rpc
RPCClient.java 3KB
RPCServer.java 3KB
test
java
.classpath 1KB
.settings
org.eclipse.m2e.core.prefs 90B
org.eclipse.core.resources.prefs 119B
org.eclipse.jdt.core.prefs 243B
pom.xml 3KB
target
classes
META-INF
MANIFEST.MF 107B
maven
com
testRabbitMQ
pom.properties 240B
pom.xml 3KB
com
spring
exchange
direct
ConsumerMain.class 649B
DirectListener1.class 1KB
ProducerMain.class 2KB
DirectListener2.class 1KB
Spittle.class 935B
topic
ConsumerMain.class 646B
ProducerMain.class 2KB
TopicListener2.class 1KB
TopicListener1.class 1KB
fanout
ConsumerMain.class 649B
FanoutListener2.class 1KB
ProducerMain.class 2KB
FanoutListener1.class 1KB
queue
Customer$1.class 1KB
Customer.class 2KB
Producer.class 2KB
onetomany
NewTask.class 2KB
Work.class 3KB
onetoone
Recv.class 2KB
Send.class 2KB
exchange
direct
ReceiveLogsDirect2$1.class 2KB
ReceiveLogsDirect1.class 3KB
ReceiveLogsDirect1$1.class 2KB
RoutingSendDirect.class 2KB
ReceiveLogsDirect2.class 3KB
Topics
TopicSend.class 3KB
ReceiveLogsTopic2.class 3KB
ReceiveLogsTopic2$1.class 2KB
ReceiveLogsTopic1$1.class 2KB
ReceiveLogsTopic1.class 3KB
fanout
ReceiveLogs1$1.class 1KB
ReceiveLogs1.class 2KB
EmitLog.class 2KB
rpc
RPCClient.class 4KB
RPCServer.class 4KB
direct
customer.xml 2KB
common.xml 991B
producer.xml 2KB
topic
customer.xml 2KB
common.xml 1KB
producer.xml 2KB
fanout
customer.xml 2KB
common.xml 991B
producer.xml 1KB
error
customer.xml 1KB
common.xml 1KB
producer.xml 889B
test-classes
.project 772B
config
direct
customer.xml 2KB
common.xml 991B
producer.xml 2KB
topic
customer.xml 2KB
common.xml 1KB
producer.xml 2KB
fanout
customer.xml 2KB
common.xml 991B
producer.xml 1KB
error
customer.xml 1KB
common.xml 1KB
producer.xml 889B
共 97 条
- 1
资源评论
- java_人生2018-05-30不错,值得推荐。
- 普通网友2017-11-15感谢分享,很不错
- 苦涩20142018-09-06不错 可以看。谢谢
聆烨
- 粉丝: 0
- 资源: 5
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功