package TFRC;
/*--------------------------------------------------------------
TFRCSocket is the application's way of connecting to a
TFRCServerSocket and have a two-way connection.
It has a datastream, controlstream, TFRCCongestion, input and
output buffers, and an ACKWindowList.
TFRCSocket have functions for the application to query about
status such as bandwidth, ports, addresses, and packet size.
There is a running thread that sends data at a sending rate
determined by calling TFRCCongestion.
Data is built by using data from the output buffer.
The input buffer has the data received by a sending TFRCSocket
through the datastream, which calls recvData.
Control stream calls recvControl with an ACKWindowList to
update TFRCCongestion.
-------------------------------------------------------------*/
import java.net.*;
import java.io.*;
import java.util.*;
public class TFRCSocket extends Thread
{
private static final short MAX_SIZE = 32000;
//variables used to implement the TFRC protocol
private ControlStream cstream = null;
private DataStream dstream = null;
private TFRCSession session = null;
private TFRCCongestion congestion = null;
private AckWindowList AckWinLis = null;
//variables used to receive and send
private byte[] inbuffer = new byte[MAX_SIZE];
private byte[] outbuffer = new byte[MAX_SIZE];
private short inlength = 0;
private short outlength = 0;
public TFRCSocket(Socket s, int dataPort, int ctrlPort) throws IOException
{
try
{
s.close();
cstream = new ControlStream( new DatagramSocket (ctrlPort),s.getInetAddress(),ctrlPort,this);
dstream = new DataStream(new DatagramSocket (dataPort),s.getInetAddress(),dataPort,this);
session = new TFRCSession();
AckWinLis = new AckWindowList(session.NextSequenceNumToSend);
congestion = new TFRCCongestion(950, session.NextSequenceNumToSend);
start();
}
catch(Exception e)
{
System.out.println("exception in TFRC SockEt Line 35 to 47 :"+e);
}
}
public TFRCSocket(String host, int port, int dataPort, int ctrlPort)
throws UnknownHostException, IOException
{
this(new Socket(host,port), dataPort, ctrlPort);
System.out.println("in tfrcsocket constructor in client side");
}
// called by ControlStream when control data arrives
public synchronized void recvControl(AckWindowList AWL)
{
LinkedList LL = AWL.LastAcks;
TFRCPacketHeader first = (TFRCPacketHeader)LL.get(0);
long rtt = (new Date()).getTime() - first.time;
// temporary printout must be erased later
int a;
// Checks if ACK is late, discard late ACKS
if ( (first.seq_no >=
session.AckSequenceNumWaitLocal)) {
/* for (int i = 0; i < 8; ++i){
a = ((TFRCPacketHeader)LL.get(i)).seq_no;
System.out.print(a + " ");
}
*/
// Update next local ACK to wa it for
session.AckSequenceNumWaitLocal = first.seq_no + 1;
//every time valid control stream receive, update congestion
congestion.update(rtt, LL);
}
//System.out.print(">> ");
//System.out.println(rtt + " " + congestion.getBandwidth());
}
// Insert TFRCPacketHeader into ACKWinList and send it
public synchronized void sendControl(TFRCPacketHeader TPH)
{
AckWinLis.insert(TPH);
cstream.send(AckWinLis);
}
// Called by DataStream when data arrives
public void recvData(TFRCPacket TP)
{
// Checks is packet is out of sequence
if (TP.header.seq_no >= session.AckSequenceNumWaitRemote) {
sendControl(TP.header);
// Update Ack from remote waiting for
session.AckSequenceNumWaitRemote = TP.header.seq_no + 1;
// Call function to write data to buffer
writeInput(TP.data, TP.data.length);
}
}
// Sends a byte array of data through a TFRCPacket by calling DataStream
// send
public void sendData(byte data[])
{
// Create TFRCPacket with data in it along with current time
TFRCPacket TP = new TFRCPacket(data.length);
TP.data = data;
TFRCPacketHeader TPH = null;
TP.header = new TFRCPacketHeader(session.NextSequenceNumToSend++,
(new Date().getTime()));
// send TFRCPacket to Datastream for sending
dstream.write(TP);
}
// Get remote address
public InetAddress getRemoteAddress()
{
return (dstream.getRemoteAddress());
}
// Get Local Address
public InetAddress getLocalAddress()
{
return (dstream.getLocalAddress());
}
// Get remote control port
public int getRemotePortControl()
{
return (cstream.getRemotePort());
}
// get local control port
public int getLocalPortControl()
{
return (cstream.getLocalPort());
}
// get remote data port
public int getRemotePortData()
{
return (dstream.getRemotePort());
}
// get local data port
public int getLocalPortData()
{
return (dstream.getLocalPort());
}
// gets sending rate Bandwidth
public int getBandwidth()
{
return (congestion.getBandwidth());
}
// gets packet size set to
public int packet_size()
{
return (congestion.getPacketSize());
}
// gets current round trip time
public float round_trip_time()
{
return(congestion.getRTT());
}
/* TFRCOutputStream calls this to write array bytes
to TFRCSocket output buffer to send */
public synchronized int write(byte[] buff) {
/* the TFRCOuputStream waits till there is room
in the output buffer before it writes */
while (buff.length > MAX_SIZE - outlength) {
try {
wait(10);
}
catch (InterruptedException e) {
System.err.println("TFRCSocket - write: InterruptedException");
}
}
// Copies bytes to output buffer
System.arraycopy(buff, 0, outbuffer, outlength, buff.length);
outlength += buff.length;
return buff.length;
}
// recvData writes the data into the input buffer
private synchronized void writeInput(byte[] buff, int len)
{
// wait till there is room in the input buffer
while (inlength + len > MAX_SIZE) {
try {
wait(10);
}
catch (InterruptedException e) {
}
}
// write data into input buffer
System.arraycopy(buff, 0, inbuffer, inlength, buff.length);
inlength += buff.length;
}
/* TFRCInputStream calls this to read data from
the TFRCSocket input buffer */
public synchronized int read(byte[] buff, int len) {
// Waits till the buffer if at least of size len
while (len >= inlength) {
try {
wait(10);
}
catch (InterruptedException e) {
System.err.println("TFRCSocket - write: InterruptedException");
}
}
// Read the data and put it into buff
System.arraycopy(inbuffer, 0, buff, 0, len);
inlength -= len;
System.arraycopy(inbuffer, len, inbuffer, 0, inlength);
return len;
}
//TFRCSocket sends the data
public void run() {
// Variables for time calculations
long last_time_sent = (new Date().getTime());
long current_time;
int rate;
long wait;
int pack_size = 950;
while (true){
// Checks for a sending rate and sends at that time
try {
rate = congestion.getRate();
current_time = new Date().getTime();
// Enforces the sending rate by waiting a calculated time
// Waiting time is equal to the rate minus time already
// spent doing other calculations
wait = rate - (current_time - last_time_sent);
// Wait for positive amount of time
// Update the last time packet was sent
if (wait > 0) {
Thread.sleep(wait);
last_time_sent = current_time + wait;
}
else last_time_sent = current_time;
}
catch (InterruptedException e) { e.printStackTrace(); }
// After time waited is spent, check if there is data to send
if (outlength >0) {
// Check if data is enough to fill pack_size
// If so send a packet of that size or else
// send what you have
if (outlength >= pack_size) {
// create send data of pack_size
byte[] senddata = new byte[pack_size];
System.arraycopy(outbuffer, 0, senddata, 0,
pack_size);
outlength -= pack_size;
System.arraycopy(outbuffer, pack_size, outbuffer, 0,
outlength);
// Send data to sendData which sends it to DataStream
sendData(senddata);
}
else {
// create send data of buffer size
byte[] senddata = new byte[outlength];
System.arraycopy(outbuffer, 0, senddata, 0,
outlength);
outlength = 0;
// Send data to sendData which sends to DataStream
sendData(senddata);
}
}
}
}
// get a TRRCInputStream
public TFRCInputStream getInputStream() {
return new TFRCInputStream(this);
}
// get a TFRCOutputStream
public TFRCOutputStream getOutputStream() {
return new TFRCOutputStream(this);
}
}
TFRC.zip_zip
版权申诉
178 浏览量
2022-09-24
06:23:43
上传
评论
收藏 47KB ZIP 举报
刘良运
- 粉丝: 70
- 资源: 1万+