package com.gerry.rpc;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 软谋教育Java VIP课程
* <pre>
* RPC的服务提供端
* </pre>
*
* @author gerry
* @date 2018-07-04
*/
public class RPCServer {
// 定义端口属性
private int serverPort;
// 创建一个控制并发数量的线程池
ThreadPoolExecutor executor =
new ThreadPoolExecutor(5, 30, 200,
TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(10));
// 定义Map集合来注册服务
Map<String, Class<?>> serverRegistry = Collections.synchronizedMap(new HashMap<>());
public RPCServer() {
}
public RPCServer(int port) {
this.serverPort = port;
}
/**
* 定义一个暴露服务接口的方法
* @param serviceInterface 暴露的服务接口名称
* @param serviceImpl 暴露的服务接口对应实现类
*/
public void registryService(Class<?> serviceInterface, Class<?> serviceImpl) {
serverRegistry.put(serviceInterface.getName(), serviceImpl);
}
/**
* 定义发布服务的方法
*/
public void start() throws IOException {
// 建立网络通信
ServerSocket serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress(serverPort));
System.out.println("rpc服务启动.......");
try {
while (true) {
executor.execute(new RpcTask(serverSocket.accept()));
}
} finally {
if (serverSocket != null) {
serverSocket.close();
}
}
}
/**
* 定义关闭服务的方法
*/
public void stop() {
executor.shutdown();
}
/**
* 处理客户端线程代理类
*/
private class RpcTask implements Runnable {
private final Socket client;
public RpcTask(Socket client) {
this.client = client;
}
@Override
public void run() {
// 定义反序列化对象句柄
ObjectInputStream deSerializer = null;
// 定义序列化对象的句柄
ObjectOutputStream serializer = null;
try {
// 创建一个反序列化句柄
deSerializer = new ObjectInputStream(client.getInputStream());
// 获取调用接口全名称
String interfaceName = deSerializer.readUTF();
// 获取调用的方法名称
String methodName = deSerializer.readUTF();
// 获取方法参数类型列表
Class<?>[] parameterTypes = (Class<?>[]) deSerializer.readObject();
// 获取方法参数列表
Object[] parameters = (Object[]) deSerializer.readObject();
// 通过暴露的服务接口获取到对应接口实现类
Class<?> serviceInstance = serverRegistry.get(interfaceName);
// 反射创建一个方法对象
Method method = serviceInstance.getDeclaredMethod(methodName, parameterTypes);
// 通过反射调用方法
Object result = method.invoke(serviceInstance.newInstance(), parameters);
// 把服务端调用处理的结果返回到客户端
serializer = new ObjectOutputStream(client.getOutputStream());
// 把结果序列化到客户端
serializer.writeObject(result);
} catch (Exception e) {
} finally {
try {
if (deSerializer != null) {
deSerializer.close();
}
} catch (Exception e) {
e.printStackTrace();
}
try {
if (serializer != null) {
serializer.flush();
serializer.close();
}
} catch (Exception e) {
e.printStackTrace();
}
try {
if (client != null) {
client.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}