/*
* To change this template, choose Tools | Templates
* and open the template in the editor.
*/
/**
*
* @author sefr
*/
import java.util.*;
import java.io.*;
import java.net.*;
public class DME_RA {
// Path of the configuration file read by main(). Every non-blank line is
// parsed there as "<process name>:<IP address>:<port>".
private static final String CFName = "src/Configuration/config.cfg";
// NOTE(review): Queue is instantiated directly below, so it is presumably a
// project class and not the java.util.Queue interface — confirm.
// Incoming messages: handed to IncMsgHandler and dequeued by the dispatcher
// thread started in main().
private static Queue inQ = new Queue();
// RequestMessages that the dispatcher thread chose not to answer right away.
private static Queue reqQ = new Queue();
// Every message dequeued from inQ that is not a RequestMessage.
private static Queue repQ = new Queue();
// Outgoing messages: handed to OutMsgHandler; the dispatcher thread enqueues
// ReplyMessages here.
private static Queue outQ = new Queue();
// Critical-section state of this process. The state field starts as RELEASED,
// main() sets it to WANTED when it requests entry, and the dispatcher thread
// queues every incoming request while it is HELD.
// NOTE(review): presumably the three states of the Ricart-Agrawala algorithm,
// going by the class name — confirm.
private enum State {
RELEASED, WANTED, HELD
};
// Current critical-section state; written by main() and read by the
// dispatcher thread.
private static volatile State state = State.RELEASED;
// Name of this process, taken from the single command-line argument.
private static String pName;
// LClockVal: logical clock value, advanced by main()'s work loop and stamped
// into outgoing ReplyMessages.
// LClockTS: copy of LClockVal taken by main() when it requests the critical
// section; the dispatcher thread compares it with the clock value carried by
// incoming requests.
private static volatile int LClockVal = 0, LClockTS = 0;
// Peer directory: process name -> SPair(IP address, port) for every
// configuration entry other than this process itself.
private static HashMap<String, SPair> g = new HashMap<String, SPair>();
/**
 * Reports a fatal error on standard error and terminates the JVM with exit
 * status -1. This method never returns to its caller.
 *
 * @param s message printed first
 * @param e exception whose stack trace is printed after the message, or
 *          {@code null} when there is no underlying exception to show
 */
public static void abort(String s, Exception e) {
    final boolean hasCause = (null != e);
    System.err.println(s);
    if (hasCause) {
        e.printStackTrace();
    }
    System.exit(-1);
}
public static void main(String[] args) {
HashMap<String, SPair> h = new HashMap<String, SPair>();
String pIPAddr = null;
int pPortNo = -1;
int pTot = -1;
if (args.length != 1) {
abort("Usage: java DME_RA <pName>", null);
}
pName = args[0];
try {
BufferedReader br = new BufferedReader(new FileReader(CFName));
for (String s = br.readLine(); s != null; s = br.readLine(), pTot++) {
s = s.trim();
if (s.equals("")) {
continue;
}
Scanner scn = new Scanner(s).useDelimiter(":");
String procName = scn.next();
String procIPAddr = scn.next();
int procPortNo = scn.nextInt();
if (pName.equals(procName)) {
if (pIPAddr != null || pPortNo != -1) {
abort("Duplicate data for " + pName + " in " + CFName + ".", null);
}
pIPAddr = procIPAddr;
pPortNo = procPortNo;
} else {
SPair sp = new SPair(procIPAddr, procPortNo);
if (h.containsValue(sp)) {
abort("Multiple processes with IP= " + procIPAddr + ", port= " + procPortNo + ".", null);
}
if (h.put(procName, sp) != null) {
abort("Multiple definitions for " + procName + " in " + CFName + ".", null);
}
g.put(procName, sp);
}
}
br.close();
} catch (IOException ex) {
abort("Error while reading " + CFName + ".", ex);
}
if (pIPAddr == null || pPortNo == -1) {
abort("No data found for " + pName + " in " + CFName + ".", null);
}
ServerSocket sskt = null;
try {
sskt = new ServerSocket(pPortNo);
sskt.setSoTimeout(300000);
} catch (IOException ex) {
abort("Can not open ServerSocket.", ex);
}
System.out.println(pName + " starting the initiation phase.");
new Thread(new Runnable() {
private static final int iterMax = 10;
/**
 * Announces this process to every peer listed in {@code g}.
 *
 * For each peer the method tries up to {@code iterMax} times to open a
 * socket, pausing two seconds after each refused attempt, and then sends a
 * single {@code InitMessage} before closing the connection. Any unrecoverable
 * problem (unresolvable host, retries exhausted, write failure) ends the
 * whole process through {@code abort}, which does not return.
 */
public void run() {
    // Typed entry iteration: no raw Iterator, no casts, no second map lookup.
    for (Map.Entry<String, SPair> peer : g.entrySet()) {
        String procName = peer.getKey();
        SPair sp = peer.getValue();
        Socket initskt = null;
        int iter = iterMax;
        while (--iter >= 0) {
            try {
                initskt = new Socket(sp.getIPAddr(), sp.getPortNo());
                break;
            } catch (UnknownHostException ex) {
                abort("Unknown host " + sp.getIPAddr() + ".", ex);
            } catch (IOException ex) {
                // The connection attempt failed; wait before the next one.
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException ignored) {
                    // Deliberately ignored: an interrupt only cuts this pause
                    // short, and the retry loop simply carries on.
                }
            }
        }
        // iter only drops below zero when every attempt failed; a successful
        // connect leaves the loop through break with iter >= 0.
        if (iter < 0) {
            abort("Could not establish connection with (" + sp.getIPAddr() + ", " + sp.getPortNo() + ").", null);
        }
        try {
            ObjectOutputStream oos = new ObjectOutputStream(initskt.getOutputStream());
            oos.writeObject(new InitMessage(pName, -1, procName, sp.getIPAddr(), sp.getPortNo(), null));
            oos.close();
            initskt.close();
        } catch (IOException ex) {
            abort("Could not open or write to socket (" + sp.getIPAddr() + ", " + sp.getPortNo() + ").", ex);
        }
    }
}
}).start();
for (int i = 0; i < pTot; i++) {
try {
Socket reqskt = sskt.accept();
ObjectInputStream ois = new ObjectInputStream(reqskt.getInputStream());
Message msg = (Message) ois.readObject();
if (!(msg instanceof InitMessage)) {
abort("No-init message received from " + msg.getsrcPName() + ".", null);
}
if (!h.containsKey(msg.getsrcPName())) {
abort("INIT message received from " + msg.getsrcPName() + " or duplicate message.", null);
}
h.remove(msg.getsrcPName());
reqskt.close();
} catch (SocketTimeoutException ex) {
abort("ServerSocket request timeout.", ex);
} catch (IOException ex) {
abort("IO error while receiving initial requests.", ex);
} catch (Exception ex) {
}
}
System.out.println(pName + " completed the initiation phase.");
PrintWriter pw = null;
try {
pw = new PrintWriter(new FileWriter(pName + ".log"));
} catch (IOException ex) {
abort("Can not create log file.", ex);
}
new IncMsgHandler(pName, sskt, inQ).start();
new OutMsgHandler(pName, outQ).start();
String[] procNames = g.keySet().toArray(new String[]{});
new Thread(new Runnable() {
/**
 * Dispatch loop for messages taken from {@code inQ}; it never terminates on
 * its own.
 *
 * Anything that is not a {@code RequestMessage} is moved to {@code repQ}.
 * A {@code RequestMessage} is parked in {@code reqQ} when this process is
 * HELD, or is WANTED and its own request is ordered first (smaller
 * timestamp, ties broken by the smaller process name). Otherwise a
 * {@code ReplyMessage} addressed to the requester is placed on {@code outQ}.
 *
 * NOTE(review): main() assigns state = WANTED before it refreshes LClockTS,
 * so this thread could pair WANTED with the previous request's timestamp —
 * confirm whether that window matters.
 */
public void run() {
    for (;;) {
        Message incoming = inQ.dequeue();
        if (!(incoming instanceof RequestMessage)) {
            repQ.enqueue(incoming);
            continue;
        }
        // Read the volatile fields once each, state first, and decide on
        // those local copies only.
        State current = state;
        int ownTS = LClockTS;
        boolean defer;
        if (current == State.HELD) {
            defer = true;
        } else if (current != State.WANTED) {
            defer = false;
        } else if (ownTS < incoming.getlclk()) {
            defer = true;
        } else {
            defer = ownTS == incoming.getlclk()
                    && pName.compareTo(incoming.getsrcPName()) < 0;
        }
        if (defer) {
            reqQ.enqueue(incoming);
        } else {
            SPair target = g.get(incoming.getsrcPName());
            outQ.enqueue(new ReplyMessage(pName, LClockVal, incoming.getsrcPName(), target.getIPAddr(), target.getPortNo(), null));
        }
    }
}
}).start();
for (int iter = 0; true; iter++) {
try {
int workTime = (int) (1000 + 1000 * Math.random());
int lWorkTime = (int) (1 + 4 * Math.random());
LClockVal += lWorkTime;
pw.println("Working for " + workTime + "ms at iteration " + iter + " (advancing logical clock by " + lWorkTime + ").");
Thread.sleep(workTime);
} catch (Exception ex) {
ex.printStackTrace();
}
pw.println("Requesting entry to critical section at iteration " + iter + " (request timestamp is " + LClockTS + ").");
state = State.WANTED;
LClockTS = LClockVal;
评论0