# yl-netty-rpc
基于 Spark Netty Rpc 框架,重新实现的一个 Netty Rpc 框架 ( scala + java )
## 说明
Spark Netty RPC 是一个高效的 RPC 框架,即可用于非 Spark 项目,用来学习研究 Netty & RPC 设计思想也是很好的素材。
已有不少 Spark RPC 的 git 项目,把 Spark RPC 功能从 Spark 中分离出来。若出于使用,已能基本满足需要。
不过,如果要学习 Spark Netty RPC,还需要更详细的拆解,以及适当做一些简化以易于理解。
yl-netty-rpc 参考 Spark Netty RPC ,适当做了简化,80%代码进行重写。可用于RPC,也易于学习和理解。
yl-netty-rpc 基于 Spark 2.1 ( 使用 netty 代替了 akka ) 开发。
- [1. 协议栈](#1-协议栈)
- [2. 依赖](#2-依赖)
- [3. 实例](#3-如何使用)
- [3.1 自定义 RPC 服务](#31-)
- [3.2 运行 server](#32-运行服务)
- [3.3 使用 Client ](#33-客户端调用)
- [4. 关于 RpcConfig](#4-rpcConfig)
## 1. 协议栈
![protocal](https://github.com/yilong2001/yl-netty-rpc/blob/master/img/SparkNettyRpcProtocalStack.jpg)
### 客户端
- 用户定义的消息对象
- 增加目标 Endpoint 地址
- 使用 [length]-[message type]-[message content] 方式对消息进行编码
- 增加 RequestId, 在 Client 对不同的 Request 以及 Request Callback Function 进行匹配
- 使用 Frame 进行传输 (每个 Frame 固定字节长度)
- 底层使用 TCP/IP 协议,使用 NIO channel
### 服务端
- NIO 接收数据
- 使用 FrameDecode 解码
- 从 Request 提取 RequestId 和 Content 通过 ChannelRequestHandler 处理
- 对 Content 解码,提取 Endpoint 和 消息 Object
- 在 Server 把 Request 消息分发到指定的 Endpoint
- Endpoint 完成 Receive and Reply
## 2. 依赖
maven 和 SBT 的依赖方式, 基于 **scala 2.11**.
Maven:
```
<dependency>
<groupId>com.example.rpc</groupId>
<artifactId>yl-netty-rpc_2.11</artifactId>
<version>1.0.0</version>
</dependency>
```
SBT:
```
"com.example.rpc" % "yl-netty-rpc_2.11" % "1.0.0"
```
## 3. 示例
下面给出了一个简单的使用示例
### 3.1 自定义RPC service (endpoint)
一个 endpoint 即为一个 RPC 服务,可定义任意类型的消息对象。不像 thrift,消息对象不需单独编译。
```
class HelloEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
override def onStart(): Unit = {
println("start hello endpoint")
}
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case SayHi(msg) => {
//println(s"receive $msg")
context.reply(s"$msg")
}
}
override def onStop(): Unit = {
println("stop hello endpoint")
}
override val epName: String = "HelloServer"
}
```
`Endpoint` 的核心是实现如下两个函数:
```
/**
* 只接收,不需要响应
*/
def receive: PartialFunction[Any, Unit] = {
case _ => throw new RpcException(self + " does not implement 'receive'")
}
/**
* 接收并响应
*/
def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case _ => context.sendFailure(new RpcException(self + " won't reply anything"))
}
```
### 3.2 启动服务
要开始RPC服务,实现 HelloWorld 功能,需要如下步骤:
1. create `RpcConfig`, 定义netty 和 服务端运行的必要参数
2. create `RpcEnv` (RpcConfig, host, port, clientMode), 服务启动、及服务所需环境信息都依赖 RpcEnv
3. create `HelloEndpoint` , 并调用 setupEndpoint 注册此 endpoint 服务
4. `awaitTermination` 使得 server 驻留在 jvm 中运行.
```
import com.example.jrpc.nettyrpc.rpc.{HostPort, RpcConfig}
import com.example.srpc.nettyrpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}
object HelloServer {
val serverName = "helloServer";
def main(args: Array[String]): Unit = {
val host = args(0)
val port = args(1).asInstance[Int]
val rpcEnv: RpcEnv = RpcEnv.create(new RpcConfig(), host, port, false)
val helloEndpoint: RpcEndpoint = new HelloEndpoint(rpcEnv)
rpcEnv.setupEndpoint(serverName, helloEndpoint)
rpcEnv.awaitTermination()
}
}
```
### 3.3 Client call
#### 3.3.1 异步调用
同样的,首先创建 `RpcEnv`;接着,使用 `setupEndpointRef` 获取服务端 `Endpoint` 的本地代理,使用 `EndpointRef` 与 `Endpoint` 通信
```
import com.example.jrpc.nettyrpc.rpc.{HostPort, RpcConfig}
import com.example.srpc.nettyrpc.{RpcEndpointRef, RpcEnv}
object HelloClient {
case class SayHi(msg: String)
def main(args: Array[String]): Unit = {
asyncCall(args(0), args(1).asInstance[Int])
}
def asyncCall(host:String, port:Int) = {
val rpcEnv: RpcEnv = RpcEnv.create(new RpcConfig(), host, port, true)
val epRef: RpcEndpointRef = rpcEnv.setupEndpointRef(new HostPort(host, port),
HelloServer.serverName)
val result: Future[String] = epRef.ask[String](SayHi("world"))
result.onComplete{ ... }
Await.result(result, Duration.apply("10s"))
}
}
```
#### 3.3.2 同步调用
使用 askSync 代替 ask 即可实现同步调用
```
object HelloClient {
def main(args: Array[String]): Unit = {
syncCall(args(0), args(1).asInstance[Int])
}
def syncCall(host:String, port:Int) = {
val rpcEnv: RpcEnv = RpcEnv.create(new RpcConfig(), host, port, true)
val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(new HostPort(host, port),
HelloServer.serverName)
val result = endPointRef.askSync[String](SayHi("world"), 1000*100)
println(result)
}
}
```
## 4. RpcConfig
`RpcConfig` 是简化版的 `SparkConf` , 定义 `server`、`netty`、`RPC call`所必须的参数
```
val rpcConf = new RpcConfig()
rpcConf.set("spark.rpc.lookupTimeout", "2s")
//or, change params with set function
rpcConf.getRpcConfig.setClientThreads(10)
```
## 致谢
yl-netty-rpc 来源于 Spark. yl-netty-rpc 使用 Apache2.0 Open Source License, 保留`Spark`版权信息.
没有合适的资源?快使用搜索试试~ 我知道了~
基于 Spark Netty Rpc 框架,重新实现的一个 Netty Rpc 框架 ( scala + java ).zip
共65个文件
java:37个
scala:23个
md:3个
1.该资源内容由用户上传,如若侵权请联系客服进行举报
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
2.虚拟产品一经售出概不退款(资源遇到问题,请及时私信上传者)
版权申诉
0 下载量 144 浏览量
2024-03-23
23:39:46
上传
评论
收藏 123KB ZIP 举报
温馨提示
基于 Spark Netty Rpc 框架,重新实现的一个 Netty Rpc 框架 ( scala + java )
资源推荐
资源详情
资源评论
收起资源包目录
基于 Spark Netty Rpc 框架,重新实现的一个 Netty Rpc 框架 ( scala + java ).zip (65个子文件)
java0323
pom.xml 9KB
src
main
java
com
example
jrpc
nettyrpc
netty
message
MessageEncoder.java 4KB
TransportFrameDecoder.java 8KB
Encoders.java 2KB
rsp
ResponseMessage.java 392B
RpcResponse.java 3KB
RpcFailure.java 2KB
MessageDecoder.java 2KB
req
OneWayMessage.java 2KB
RequestMessage.java 566B
RpcRequest.java 3KB
IMessage.java 3KB
NettyClientFactory.java 9KB
handler
INettyChannelHandler.java 702B
client
NettyChannelResponseHandler.java 6KB
INettyClientBootstrapWrapper.java 260B
ConcurrentNettyChannelClient.java 8KB
NettyChannelCommonHandler.java 7KB
server
NettyChannelRequestHandler.java 5KB
INettyServerBootstrapWrapper.java 300B
NettyUtils.java 2KB
NettyServer.java 5KB
NettyContext.java 4KB
buffer
NioManagedBuffer.java 2KB
ManagedBuffer.java 3KB
NettyManagedBuffer.java 2KB
rpc
HostPort.java 509B
RpcEnvConfig.java 680B
RpcHandler.java 3KB
RpcConfig.java 3KB
RpcResponseCallback.java 1KB
EndpointAddress.java 1KB
exception
RpcEndpointNotFoundException.java 232B
RpcException.java 282B
RpcEnvStoppedException.java 244B
RpcTimeoutException.java 303B
util
TestSerObject.java 305B
Constant.java 738B
scala
com
demo
rpc
HelloClient.scala 1KB
HelloServer.scala 1KB
example
srpc
nettyrpc
RpcEndpoint.scala 3KB
message
Inbox.scala 6KB
Outbox.scala 4KB
RpcResponseMessage.scala 164B
RpcOutboxMessage.scala 1KB
RpcRequestMessage.scala 2KB
IRpcServer.scala 129B
RpcCallContext.scala 487B
RpcDispatcher.scala 7KB
netty
NettyRpcHandler.scala 2KB
NettyRpcEndpointRef.scala 2KB
NettyRpcEnv.scala 11KB
RpcEndpointVerifier.scala 763B
NettyRpcEnvFactory.scala 726B
NettyRpcRemoteCallContext.scala 927B
serde
ByteBufferInputStream.scala 1KB
ByteBufferOutputStream.scala 1KB
RpcSerializer.scala 895B
RpcEnv.scala 3KB
RpcEndpointRef.scala 3KB
util
ThreadUtils.scala 3KB
spark-dag-schedule源码分析.md 21KB
img
SparkNettyRpcProtocalStack.jpg 55KB
netty-eventloop.md 5KB
README.md 6KB
共 65 条
- 1
资源评论
Kwan的解忧杂货铺
- 粉丝: 1w+
- 资源: 3640
下载权益
C知道特权
VIP文章
课程特权
开通VIP
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功