package ge.dl;
import ge.dl.api.RegisterInfo;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharsetEncoder;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
/**
 * Single-threaded NIO heartbeat server.
 *
 * <p>Clients connect over TCP and send pipe-separated text messages. The second
 * field of a client's first message is stored as its backup client id, and a
 * message with exactly three fields marks failover as started on every record
 * whose backup is the sender. Every message is answered with
 * {@code "HeartBearAck"}, suffixed with {@code "|Failover"} when the sender is
 * the backup of a record that has timed out.
 *
 * <p>NOTE(review): the meaning of the first and third message fields is not
 * visible in this file; they are only counted, never interpreted. Messages are
 * assumed to arrive whole in a single read (no framing is implemented) —
 * confirm against the client implementation.
 *
 * <p>All state is static and touched only from the selector loop in
 * {@link #main(String[])}.
 */
public class LocatorService {
    /** TCP port the service listens on. */
    private static final int PORT = 12345;
    /** Size of the per-connection read buffer attached to each client key. */
    private static final int READ_BUFFER_SIZE = 1024;
    /** Charset used to decode incoming messages. */
    private static final Charset UTF8 = Charset.forName("UTF-8");

    /** Registered clients, keyed by the host name of the connecting socket. */
    private static Map<String, RegisterInfo> infos = new HashMap<String, RegisterInfo>();
    static CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();
    /** Backup client ids whose primary has been detected as timed out. */
    private static Set<String> exceptionClientIdSet = new HashSet<String>();

    /**
     * Opens the listening socket and runs the selector loop until a fatal
     * {@link IOException} occurs on the selector or the listening channel.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        Selector selector = null;
        ServerSocketChannel server = null;
        try {
            // Create the selector.
            selector = Selector.open();
            // Create the listening channel and register it for accept events.
            server = ServerSocketChannel.open();
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);
            // Start listening on the port.
            server.socket().bind(new InetSocketAddress(PORT));
            // Event loop.
            while (true) {
                selector.select();
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    // Remove the key so it is not processed again on the next select().
                    it.remove();
                    // A key may have been cancelled (channel closed) while handling an
                    // earlier key in this pass; readiness checks would then throw.
                    if (!key.isValid()) {
                        continue;
                    }
                    if (key.isAcceptable()) {
                        acceptClient(selector, key);
                    } else if (key.isReadable()) {
                        readClient(key);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Close each resource independently: either may be null if opening
            // failed, and a failure closing one must not skip the other.
            if (selector != null) {
                try {
                    selector.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (server != null) {
                try {
                    server.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Accepts a pending connection and registers it for read events with a
     * dedicated read buffer attached to its key.
     *
     * @param selector selector the new client channel is registered with
     * @param key      the acceptable key of the listening channel
     * @throws IOException if accepting on the listening channel fails
     */
    private static void acceptClient(Selector selector, SelectionKey key) throws IOException {
        ServerSocketChannel listening = (ServerSocketChannel) key.channel();
        SocketChannel channel = listening.accept();
        // In non-blocking mode accept() returns null when no connection is pending.
        if (channel == null) {
            return;
        }
        try {
            channel.configureBlocking(false);
            SelectionKey clientKey = channel.register(selector, SelectionKey.OP_READ);
            clientKey.attach(ByteBuffer.allocate(READ_BUFFER_SIZE));
        } catch (IOException e) {
            // A failure setting up one client must not take the whole server down.
            e.printStackTrace();
            closeChannel(channel);
            return;
        }
        System.out.println("Client Connected from: :" + channel.socket().getInetAddress().getHostName()
                + ":" + channel.socket().getPort());
    }

    /**
     * Reads one message from a client, records it and sends the reply. Closes
     * the connection on end-of-stream or on a read error.
     *
     * @param key the readable key of a client channel
     */
    private static void readClient(SelectionKey key) {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        if (buffer == null) {
            buffer = ByteBuffer.allocate(READ_BUFFER_SIZE);
            key.attach(buffer);
        }
        buffer.clear();

        int bytesRead;
        try {
            bytesRead = channel.read(buffer);
        } catch (IOException e) {
            e.printStackTrace();
            closeChannel(channel);
            return;
        }
        if (bytesRead < 0) {
            // End of stream: the peer closed the connection. Without closing here the
            // key would stay readable forever and the loop would spin.
            closeChannel(channel);
            return;
        }
        if (bytesRead == 0) {
            return;
        }

        buffer.flip();
        // Charset.decode substitutes malformed input instead of throwing, so a bad
        // byte sequence from one client cannot abort the event loop.
        String msg = UTF8.decode(buffer).toString();
        System.out.println("Recevied Message: :" + msg);
        postInfo(channel, msg);
        sendback(channel, msg);
    }

    /**
     * Closes a client channel, reporting but not propagating any failure.
     * Closing the channel also cancels its selection keys.
     *
     * @param channel channel to close
     */
    private static void closeChannel(SocketChannel channel) {
        try {
            channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Marks timed-out records as not alive, remembers their backup ids, and
     * replies to the sender. The reply carries {@code "|Failover"} when the
     * sender's host name is one of the remembered backup ids.
     *
     * @param channel client channel to reply on
     * @param msg     the received message (not used for the reply)
     */
    private static void sendback(SocketChannel channel, String msg) {
        String clientIP = channel.socket().getInetAddress().getHostName();
        String retString = "HeartBearAck";
        for (RegisterInfo info : infos.values()) {
            if (info.isAlive() && info.timeout()) {
                info.setAlive(false);
                exceptionClientIdSet.add(info.getBackupClientId());
            }
        }
        if (exceptionClientIdSet.contains(clientIP)) {
            retString += "|Failover";
        }
        try {
            // The reply is a few bytes; a single non-blocking write is expected to
            // take it whole. NOTE(review): partial writes are not retried.
            channel.write(encoder.encode(CharBuffer.wrap(retString)));
        } catch (CharacterCodingException e) {
            e.printStackTrace();
        } catch (IOException e) {
            // The connection is unusable once a write fails; drop this client only.
            e.printStackTrace();
            closeChannel(channel);
        }
    }

    /**
     * Records a heartbeat from the sender. The first message from a host
     * creates its record (using the second field as the backup client id);
     * later messages refresh the record's activity timestamp. A message with
     * exactly three fields marks failover as started on every record whose
     * backup client id equals the sender's host name.
     *
     * @param channel client channel the message arrived on
     * @param msg     pipe-separated message text
     */
    private static void postInfo(SocketChannel channel, String msg) {
        // String.split takes a regular expression; an unescaped "|" is an empty
        // alternation that splits between every character, so it must be escaped.
        String[] msgArray = msg.split("\\|");
        String clientIP = channel.socket().getInetAddress().getHostName();
        long now = System.currentTimeMillis();

        RegisterInfo existing = infos.get(clientIP);
        if (existing == null) {
            if (msgArray.length < 2) {
                // No backup id field: cannot register, and must not crash the server.
                System.err.println("Ignoring registration without backup id from " + clientIP + ": " + msg);
                return;
            }
            RegisterInfo info = new RegisterInfo();
            info.setClientId(clientIP);
            info.setActiveTS(now);
            info.setAlive(true);
            info.setBackupClientId(msgArray[1]);
            info.setFailoverStarted(false);
            infos.put(clientIP, info);
        } else {
            existing.setActiveTS(now);
        }

        if (msgArray.length == 3) {
            for (RegisterInfo info : infos.values()) {
                if (clientIP.equals(info.getBackupClientId())) {
                    info.setFailoverStarted(true);
                }
            }
        }
    }
}