package GetFlow;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import GetFlow.flowprovider;
import GetFlow.FlowRecord;
public class flowreceive extends flowprovider{
private DatagramSocket client = null; //监听NetFlow数据报(UDP)的socket
private DatagramPacket packet = null; //UDP数据包
private ByteBuffer flowBuffer; //NetFlow数据包头部缓冲区
private BufferedWriter writer; //输出文件
private static final short HEADERLEN = 24; //NetFlow数据包头部长度
private ByteBuffer headerBuffer; //NetFlow数据包头部缓冲区
private long packetSeq = 0; //当前收到数据包的序列号
private long firstFlowSeq = 0; //开始收集时,所收到的第一条NetFlow记录的序列号
private long currentFlowSeq = 0; //当前NetFlow记录的序列号
private long receivedFlowCount = 0; //收到的NetFlow记录数
private float receivedRate = 1; //接收的成功率
private byte[] buffer = new byte[8192];// UDP的头部8字节,IP数据报的总长度小于65535字节
// private byte[] data=new byte[65507]; //因此UDP的数据部分长度最大为65507
private int port = 2000; //监听端口号,NetFlow数据从该端口流入
private int packetsBufferSize = 3; //收集PBSIZE个数据包后,将这些数据包以数组
//方式插入到缓冲队列中。相当于一级缓冲区大小
private int minBufferTimeInSecs=4;
private int maxBufferTimeInSecs=10;
private int avgBufferTimeInSecs=7;
private int maxPacketsBufferSize=4000;
public flowreceive(int port){
this.port=port;
headerBuffer=ByteBuffer.allocate(HEADERLEN);
flowBuffer=ByteBuffer.allocate(1600);
}
public void provide(){
stop=false;//命令FlowReceiver停止
end=false; //标志FlowReceiver是否已经停止
/**
* 对于第一个收到的包要特殊处理, 要从其中获得本收集器收到的第一条流记录的序列号
*/
try {
writer = new BufferedWriter(new FileWriter("D:\\test.txt"));
System.out.println("step2");
} catch (IOException ioe) {
ioe.printStackTrace();
}//建立文件夹用来存储NetFlow信息
try {
client = new DatagramSocket(port);// 监听端口port,netflow数据将从这里流入
packet = new DatagramPacket(buffer, buffer.length);
client.receive(packet);// 如果没有收到数据包,该方法将阻塞
DatagramPacket[] packets = new DatagramPacket[1];
packets[0] = packet;
headerBuffer.clear();
//headerBuffer.put(packet.getData(), 0, HEADERLEN);
headerBuffer.put(packets[0].getData(), 0, HEADERLEN);
headerBuffer.flip();
firstFlowSeq = headerBuffer.getInt(16);
currentFlowSeq = firstFlowSeq;
System.out.println("当前NetFlow记录的序列号"+currentFlowSeq);
System.out.println("过去的NetFlow记录数"+receivedFlowCount);
receivedFlowCount += (packet.getLength() - HEADERLEN) / 48;
System.out.println("现在的NetFlow记录数"+receivedFlowCount);
packetSeq++;
System.out.println("当前收到数据包的序列号"+packetSeq);
//targetPool.add(packets);//存数据库使用
} catch (Exception e) {
e.printStackTrace();
}
long bufferStartTimeInMills=0;
long bufferEndTimeInMills=0;
int bufferDurationSecs=avgBufferTimeInSecs;
while (!stop) {
try {
bufferStartTimeInMills=System.currentTimeMillis();
if(bufferDurationSecs<minBufferTimeInSecs||bufferDurationSecs>maxBufferTimeInSecs){
packetsBufferSize=regulateBufferSize(packetsBufferSize,bufferDurationSecs);
//System.out.println("after regulate packetsBufferSize is "+packetsBufferSize);
}
DatagramPacket[] packets = new DatagramPacket[packetsBufferSize];
for (int i = 0; i < packetsBufferSize; i++) { // 先将PBSIZE个数据包放入数组,然后将该数组放入队列
if (stop)
break;
// buffer = new byte[8192];// 为每个数据包开辟新的缓冲区,
// 否则后面的数据包会覆盖前面的数据包
buffer = new byte[1620];
packet = new DatagramPacket(buffer, buffer.length);
client.receive(packet);// 如果没有收到数据包,该方法将阻塞
packets[i] = packet;
headerBuffer.clear();
headerBuffer.put(packet.getData(), 0, HEADERLEN);
headerBuffer.flip();
currentFlowSeq = headerBuffer.getInt(16);
System.out.println("当前NetFlow记录的序列号"+currentFlowSeq);
receivedRate = ((float) receivedFlowCount)
/ (currentFlowSeq - firstFlowSeq);
receivedFlowCount += (packet.getLength() - HEADERLEN) / 48;
System.out.println("现在的NetFlow记录数"+receivedFlowCount);
packetSeq++;
System.out.println("当前收到数据包的序列号"+packetSeq);
int flowLength = packet.getLength() - HEADERLEN;
long sysUpMillSeconds = (headerBuffer.getInt() + 4294967296L) % 4294967296L;// NetFlow设备持续运行时间
long currentSeconds = (headerBuffer.getInt() + 4294967296L) % 4294967296L; // 当前时间,自1970年零时以来逝去的时间,以秒计
long currentMillSeconds = currentSeconds * 1000;// 当前时间,自1970年零时以来逝去的时间,以毫秒计
flowBuffer.clear();
flowBuffer.put(packet.getData(), HEADERLEN, flowLength); // 将数据包里的NetFlow记录放到flowBuffer中
flowBuffer.flip();
while (flowBuffer.hasRemaining()) {
FlowRecord flow = new FlowRecord();
String format;
// src ip
flow.nums[FlowRecord.SRCIP] = (flowBuffer.getInt() + 4294967296L) % 4294967296L;
// dst ip
flow.nums[FlowRecord.DSTIP] = (flowBuffer.getInt() + 4294967296L) % 4294967296L;
// next hop
flow.nums[FlowRecord.NEXTHOP] = (flowBuffer.getInt() + 4294967296L) % 4294967296L;
// input interface
flow.nums[FlowRecord.INF] = (flowBuffer.getShort() + 65536) % 65536;
// output interface
flow.nums[FlowRecord.OUTF] = (flowBuffer.getShort() + 65536) % 65536;
// packets
flow.nums[FlowRecord.PKTS] = (flowBuffer.getInt() + 4294967296L) % 4294967296L;
// bytes
flow.nums[FlowRecord.BYTES] = (flowBuffer.getInt() + 4294967296L) % 4294967296L;
long startMillSeconds = (flowBuffer.getInt() + 4294967296L) % 4294967296L;
long endMillSeconds = (flowBuffer.getInt() + 4294967296L) % 4294967296L;
// start time
flow.nums[FlowRecord.START] = (currentMillSeconds + startMillSeconds - sysUpMillSeconds);
// end time
flow.nums[FlowRecord.ENDT] = (currentMillSeconds + endMillSeconds - sysUpMillSeconds);
// src port
flow.nums[FlowRecord.SRCPORT] = (flowBuffer.getShort() + 65536) % 65536;
// dst port
flow.nums[FlowRecord.DSTPORT] = (flowBuffer.getShort() + 65536) % 65536;
// pad1
flow.nums[FlowRecord.PAD1] = (short) ((flowBuffer.get() + 256) % 256);
// tcp flag
flow.nums[FlowRecord.TCPF] = (short) ((flowBuffer.get() + 256) % 256);
// ip protocol
flow.nums[FlowRecord.PROT] = (short) ((flowBuffer.get() + 256) % 256);
// tos
flow.nums[FlowRecord.TOS] = (short) ((flowBuffer.get() + 256) % 256);
// src as
flow.nums[FlowRecord.SRCAS] = (flowBuffer.getShort() + 65536) % 65536;
// dst as
flow.nums[FlowRecord.DSTAS] = (flowBuffer.getShort() + 65536) % 65536;
// src mask
flow.nums[FlowRecord.SRCMASK] = (short) ((flowBuffer.get() + 256) % 256);
// dst mask
flow.nums[FlowRecord.DSTMASK] = (short) ((flowBuffer.get() + 256) % 256);
// pad2
flow.nums[FlowRecord.PAD2] = (flowBuffer.getShort() + 65536) % 65536; // pad2
// flowList.add(flow);
format = " ";// 17
writer.write(rightA
没有合适的资源?快使用搜索试试~ 我知道了~
netflow数据采集
共9个文件
java:3个
class:3个
project:1个
4星 · 超过85%的资源 需积分: 50 77 下载量 171 浏览量
2011-05-27
10:23:23
上传
评论
收藏 11KB RAR 举报
温馨提示
用于采集路由器netflow数据,用以监控网络情况。
资源推荐
资源详情
资源评论
收起资源包目录
GetFlow.rar (9个子文件)
GetFlow
.project 383B
bin
GetFlow
FlowRecord.class 2KB
flowreceive.class 7KB
flowprovider.class 1KB
.settings
org.eclipse.jdt.core.prefs 629B
src
GetFlow
flowreceive.java 12KB
FlowRecord.java 2KB
flowprovider.java 943B
.classpath 301B
共 9 条
- 1
S309060156
- 粉丝: 0
- 资源: 1
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功
- 1
- 2
前往页