/**
* Copyright : Copyright (c) 2006. Wintim Corp. All rights reserved
* File Summary : class of connector through netty
* Author : wuyadong
* Create time : 2013-2-21
* Project Name : Sailfish1.4.3
*/
package com.wintim.connect;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.serialization.ObjectEncoder;
import com.wintim.buffer.IFlushEvent;
import com.wintim.observer.IEvent;
/***
* communicate with server through netty
* */
public class NettyConnector implements IConnect {
private static NettyConnector connector = null;
private static List<Object[]> addressList = null;
private static int formerServerIndex = -1;
private ClientBootstrap clientBootstrap = null;
private Channel channel = null;
private AtomicBoolean isConnected = null;
private AtomicBoolean isConnecting = null;
private NettyConnector(String ip, int port) {
isConnected = new AtomicBoolean(false);
isConnecting = new AtomicBoolean(false);
clientBootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
clientBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
// TODO Auto-generated method stub
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("log-decoder", new ObjectEncoder());
pipeline.addLast("command-handler", new SimpleChannelUpstreamHandler(){
@Override
public void channelConnected(ChannelHandlerContext ctx,
ChannelStateEvent e) throws Exception {
// TODO Auto-generated method stub
isConnected.set(true);
channel = e.getChannel();
super.channelConnected(ctx, e);
}
@Override
public void channelClosed(ChannelHandlerContext ctx,
ChannelStateEvent e) throws Exception {
// TODO Auto-generated method stub
isConnected.set(false);
System.out.println("netty connector: channel closed!");
super.channelClosed(ctx, e);
}
@Override
public void channelDisconnected(ChannelHandlerContext ctx,
ChannelStateEvent e) throws Exception {
// TODO Auto-generated method stub
isConnected.set(false);
System.out.println("nettyConnector: channel disconnected!");
super.channelDisconnected(ctx, e);
}
@Override
public void exceptionCaught(ChannelHandlerContext arg0,
ExceptionEvent exception) throws Exception {
// TODO Auto-generated method stub
System.err.println(exception);
}
});
return pipeline;
}
});
clientBootstrap.setOption("tcpNoDelay", false);
clientBootstrap.setOption("keepAlive", false);
clientBootstrap.setOption("reuseAddress", true);
clientBootstrap.setOption("soLinger", 0);
clientBootstrap.setOption("remoteAddress", new InetSocketAddress(ip, port));
clientBootstrap.setOption("connectTimeoutMillis", 500);
System.out.println("netty connector init finished!");
}
private static NettyConnector getInstance(String ip, int port) {
if (connector == null) {
connector = new NettyConnector(ip, port);
}
return connector;
}
public static NettyConnector getInstance(List<Object[]> list) {
if (list == null) {
throw new NullPointerException("address list can't be null");
}
if (list.size() <= 0) {
throw new IllegalArgumentException("address list can't be empty!");
}
if (addressList == null) {
addressList = list;
}
Object[] object = getAddress();
return getInstance((String)object[0],(Integer)object[1]);
}
//random server, except former server
private static Object[] getAddress() {
Random random = new Random(System.currentTimeMillis());
int i = 0;
if (addressList.size() > 1) {
i = random.nextInt(addressList.size() - 1);
//if i == former server ,then add 1
if (i == formerServerIndex) {
i = (formerServerIndex + 1) % addressList.size();
}
} else if (addressList.size() == 1) {
i = 0;
}
return addressList.get(i);
}
@Override
public synchronized boolean send(Object obj) {
// TODO Auto-generated method stub
if (channel != null
&& isConnected.get()
&& channel.isWritable()) {
channel.write(obj);
return true;
} else {
return false;
}
}
@Override
public void connect() {
// TODO Auto-generated method stub
if (clientBootstrap != null ) {
//if it's locked, then do nothing, just get out!
synchronized (NettyConnector.this) {
if (isConnecting.get()
|| isConnected.get())
return ;
isConnecting.set(true);
}
//change server
Object[] object = getAddress();
System.out.println("connect...");
System.out.println("address:" + object[0] + ":" + object[1]);
clientBootstrap.setOption("remoteAddress", new InetSocketAddress((String)object[0],
(Integer)object[1]));
ChannelFuture future = clientBootstrap.connect();
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture f) throws Exception {
// TODO Auto-generated method stub
if (f.isSuccess()) {
isConnected.set(true);
System.out.println("connected!");
}
else {
isConnected.set(false);
System.err.println("unconnected!");
}
isConnecting.set(false);
}
});
}
}
@Override
public void close() {
// TODO Auto-generated method stub
if (clientBootstrap != null) {
if (channel != null) {
System.out.println("close");
channel.close().awaitUninterruptibly();
//channel.getCloseFuture().awaitUninterruptibly();
}
//clientBootstrap.releaseExternalResources();
}
}
/****
* notice ,whether it is connected, depended on
* the former result of connection.
*
* */
@Override
public boolean isConnected() {
// TODO Auto-generated method stub
if (isConnected.get()
&& channel != null) {
return true;
} else {
return false;
}
}
@Override
public void update(IEvent event) {
// TODO Auto-generated method stub
if (event.getCommand().equals(IFlushEvent.BUFFER_FLUSH_EVENT)) {
//if can connected ,send them to server
if (isConnected()) {
send(event.getContent());
} else {
//if cann't store them in sailfish.log
System.err.println("Can't Send log");
System.err.println(event.getContent().get("logs"));
}
}
}
}
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
收起资源包目录
基于Storm的日志收集系统 (203个子文件)
all-wcprops 967B
all-wcprops 967B
all-wcprops 945B
all-wcprops 629B
all-wcprops 426B
all-wcprops 426B
all-wcprops 426B
all-wcprops 125B
all-wcprops 118B
AlarmHandler.class 8KB
BaseAppender.class 7KB
BaseAppender.class 7KB
NettyConnector.class 6KB
NettyConnector.class 6KB
LogSpout.class 6KB
StoreHandler.class 5KB
NettyReceiver.class 5KB
StoreConfig.class 5KB
AlarmConfig$Alarm.class 4KB
AlarmConfig.class 4KB
LogBuffer.class 4KB
LogBuffer.class 4KB
NettyReceiver$1$1.class 3KB
LogInf.class 3KB
LogInf.class 3KB
LogInf.class 3KB
ReceiverConfig.class 3KB
Server.class 3KB
UIDUtil.class 3KB
UIDUtil.class 3KB
TopologyParser.class 3KB
SimpleBolt.class 3KB
Organization.class 3KB
Log4jFormater.class 3KB
Log4jFormater.class 2KB
TopologyParser$1.class 2KB
SLevel.class 2KB
NettyConnector$1$1.class 2KB
NettyConnector$1$1.class 2KB
SLevel.class 2KB
SLevel.class 2KB
PathUtil.class 2KB
StoreConfig$Filter.class 2KB
Configurator.class 2KB
LogSpout$1.class 2KB
MailThread.class 2KB
ReceiverConfig$Address.class 2KB
NettyReceiver$1.class 2KB
Log.class 2KB
Log.class 1KB
Log.class 1KB
NettyConnector$1.class 1KB
NettyConnector$2.class 1KB
NettyConnector$2.class 1KB
XMLParser.class 1KB
Duty.class 1KB
NettyConnector$1.class 1KB
NettyReceiver$1$1$1.class 1KB
LogBuffer$2.class 1KB
LogBuffer$2.class 1KB
SimpleAppender.class 1KB
LogCreator.class 1KB
SimpleAppender.class 1KB
LogCreator.class 1KB
LogBuffer$1.class 623B
LogBuffer$1.class 622B
HtmlUtil.class 550B
IHandler.class 412B
IFormat.class 298B
IFormat.class 298B
IEvent.class 276B
IEvent.class 276B
IEvent.class 276B
IReceiverEvent.class 263B
IConnect.class 260B
ISubject.class 260B
IConnect.class 260B
ISubject.class 260B
ISubject.class 260B
IConfig.class 234B
IFlushEvent.class 232B
IFlushEvent.class 232B
IObserver.class 170B
IObserver.class 170B
IObserver.class 170B
IReceiver.class 153B
.classpath 980B
entries 981B
entries 981B
entries 975B
entries 687B
entries 517B
entries 517B
entries 517B
entries 333B
entries 234B
sailfish-server-1.4.3-SNAPSHOT-jar-with-dependencies.jar 2.33MB
netty-3.5.9.Final.jar 1.08MB
log4j-1.2.17.jar 478KB
junit-4.8.1.jar 231KB
共 203 条
- 1
- 2
- 3
zxogj
- 粉丝: 11
- 资源: 26
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功
- 1
- 2
- 3
- 4
前往页