package multiThreadDownload;
import java.io.*;
import java.net.*;
import org.apache.commons.httpclient.*;
import org.apache.commons.httpclient.methods.*;
import org.apache.commons.logging.*;
public class HttpTask extends Task {
/** Commons-logging logger for this class. */
private static final Log logger = LogFactory.getLog(HttpTask.class);
/** HTTP client used for every request issued by this class; assigned once in the static initializer. */
private static final HttpClient httpClient;
static {
    // Back the client with a multi-threaded connection manager and set its
    // default per-host connection limit to 20.
    MultiThreadedHttpConnectionManager manager = new MultiThreadedHttpConnectionManager();
    manager.getParams().setDefaultMaxConnectionsPerHost(20);
    httpClient = new HttpClient(manager);
}
// Resource URL; parsed in the constructor and replaced by getResourceContentLen() when a redirect is followed.
private URL url;
// Validated as an existing, writable directory in the constructor; getResourceContentLen()
// later replaces it with the target file inside that directory.
private File destination;
// Number of worker threads the download is split across (always > 0, enforced by the constructor).
private int threadCounts;
// Value returned by getResourceContentLen(); assigned in start().
private long contentLength;
// Pause flag: set by stop(), cleared by resume(), polled by the worker loop.
private volatile boolean stop = false;
// Count of onWorkerComplete() calls; only touched inside that synchronized method.
private int workerCompleted = 0;
// One slot per worker; allocated in the constructor, filled in start(), replaced in resume().
private HttpTaskWorker[] workers;
// Target file opened "rw" by getResourceContentLen(); written by saveChunk(), closed by onWorkerComplete().
private RandomAccessFile store;
// Progress callback; null until addDownloadListener() is called.
private DownloadListener listener;
/**
 * Creates a download task for an HTTP resource.
 *
 * @param url          resource URL; must use the {@code http} scheme
 * @param dest         path of an existing, writable directory to download into
 * @param threadCounts number of worker threads to split the download across; must be positive
 * @throws NullPointerException     if {@code url} or {@code dest} is null
 * @throws IllegalArgumentException if {@code threadCounts} is not positive, the URL is malformed
 *                                  or not {@code http}, or {@code dest} is not a writable directory
 */
public HttpTask(String url, String dest, int threadCounts)
{
    if (url == null || dest == null) {
        throw new NullPointerException("url and dest must not be null");
    }
    if (threadCounts <= 0) {
        throw new IllegalArgumentException("threadCounts must be positive: " + threadCounts);
    }
    try {
        this.url = new URL(url);
    }
    catch (MalformedURLException ex) {
        // Keep the parser's exception as the cause so the caller can see what was wrong with the URL.
        throw new IllegalArgumentException("Illegal Resource URL", ex);
    }
    if (!"http".equals(this.url.getProtocol())) {
        throw new IllegalArgumentException("Unsupported URL Scheme");
    }
    this.destination = new File(dest);
    if (!(destination.exists() && destination.isDirectory()
            && destination.canWrite())) {
        throw new IllegalArgumentException("Illegal destination folder");
    }
    this.threadCounts = threadCounts;
    workers = new HttpTaskWorker[threadCounts];
}
/**
 * Issues a HEAD request for the resource, following at most one 302 redirect, and on a
 * 200 response opens the target file inside the destination directory.
 *
 * <p>Side effects: {@code url} is replaced by the redirect target when a redirect is
 * followed; {@code destination} is replaced by the target file (named after the last path
 * segment of the URL); {@code store} is opened on that file in "rw" mode.
 *
 * @return the response content length reported by the HEAD response
 * @throws HttpException                 if a redirect response carries no Location header,
 *                                       or on an HTTP protocol error
 * @throws IOException                   on transport failure or if the target file cannot be opened
 * @throws UnsupportedOperationException if the final status code is not 200
 */
protected long getResourceContentLen() throws HttpException, IOException
{
    HeadMethod head = new HeadMethod(url.toString());
    head.setFollowRedirects(false);
    try {
        int statusCode = httpClient.executeMethod(head);
        if (statusCode == HttpStatus.SC_MOVED_TEMPORARILY) {
            Header location = head.getResponseHeader("Location");
            if (location == null) {
                throw new HttpException("Redirect response for " + url + " carries no Location header");
            }
            String recentUrl = location.getValue();
            logger.info("URL redirection: " + recentUrl);
            // Resolve against the current URL so both absolute and relative Location values work.
            this.url = new URL(url, recentUrl);
            // Hand the first connection back to the pool before issuing the follow-up request.
            head.releaseConnection();
            head = new HeadMethod(url.toString());
            statusCode = httpClient.executeMethod(head);
        }
        if (statusCode == HttpStatus.SC_OK) {
            destination = new File(destination,
                    new File(url.getPath()).getName());
            store = new RandomAccessFile(destination, "rw");
            logger.info("File " + destination.getName() + " Created.");
            listener.onProgress("File " + destination.getName() + " Created.");
            return head.getResponseContentLength();
        }
        throw new UnsupportedOperationException(
                "Unexpected HTTP status " + statusCode + " for " + url);
    }
    finally {
        // Always return the connection to the shared connection manager.
        head.releaseConnection();
    }
}
/**
 * Starts the download: queries the content length, splits it into {@code threadCounts}
 * contiguous byte ranges and starts one worker thread per range. The last worker also
 * takes the remainder left over by the integer division.
 *
 * <p>Failures are logged rather than propagated. When the content length is unknown
 * (negative) no workers are started, because the resource cannot be split into ranges.
 */
public void start() {
    String msg = "Starting to download URL: " + url.toString();
    logger.info(msg);
    listener.onProgress(msg);
    try {
        contentLength = getResourceContentLen();
        msg = "Resource Content Length is " + contentLength;
        logger.info(msg);
        listener.onProgress(msg);
        listener.onGetContentLength(destination.getName(), contentLength);
        if (contentLength < 0) {
            msg = "Resource Content Length is unknown; cannot split the download into byte ranges.";
            logger.error(msg);
            listener.onProgress(msg);
            // getResourceContentLen() has already opened the target file; close it again.
            store.close();
            return;
        }
        long chunk = contentLength / threadCounts;
        long startPos = 0;
        for (int i = 0; i < threadCounts; i++)
        {
            // Exclusive end of this worker's range; the last worker runs to the end of the resource.
            long endPos = (i == threadCounts - 1) ? contentLength : startPos + chunk;
            // Workers take an inclusive end position, hence the -1.
            workers[i] = new HttpTaskWorker(
                    "Worker Thread #" + (i + 1),
                    startPos, endPos - 1);
            workers[i].start();
            startPos = endPos;
        }
    }
    catch (Exception ex) {
        // Pass the throwable itself so the logger records the stack trace.
        logger.fatal("Failed to start download of " + url, ex);
    }
}
/**
 * Requests a pause by raising the {@code stop} flag that the worker loop polls.
 */
public void stop() {
    this.stop = true;
}
/**
 * Clears the {@code stop} flag and starts a replacement worker for every range that is
 * not finished yet, continuing from the number of bytes the previous worker completed.
 * Worker slots that were never filled are skipped.
 */
public void resume() {
    stop = false;
    for (int i = 0; i < workers.length; i++) {
        HttpTaskWorker previous = workers[i];
        // A slot is still null when start() was never called or failed before creating this worker.
        if (previous == null || previous.isDone()) {
            continue;
        }
        workers[i] = new HttpTaskWorker(
                "Worker Thread #" + (i + 1),
                previous.startPos,
                previous.endPos, previous.complete);
        workers[i].start();
    }
}
/**
 * Records that one worker has finished. When the count reaches {@code threadCounts},
 * announces completion through the logger and the listener and closes the target file.
 */
public synchronized void onWorkerComplete() {
    workerCompleted += 1;
    if (workerCompleted != threadCounts) {
        return;
    }
    final String doneMessage = "!!!!!!Download Completed.!!!!!!";
    logger.info(doneMessage);
    listener.onProgress(doneMessage);
    try {
        store.close();
    }
    catch (IOException closeFailure) {
        logger.error(closeFailure);
    }
}
/**
 * Writes a slice of {@code buf} into the target file at an absolute position.
 * Synchronized so the seek and the write of one call are not interleaved with another call's.
 *
 * @param buf    buffer holding the data
 * @param offset index of the first byte in {@code buf} to write
 * @param len    number of bytes to write
 * @param absPos absolute file position at which writing starts
 * @throws IOException if seeking or writing fails
 */
protected synchronized void saveChunk(byte[] buf, int offset, int len, long absPos)
        throws IOException {
    final RandomAccessFile target = this.store;
    target.seek(absPos);
    target.write(buf, offset, len);
}
/**
 * Installs the progress listener. Only one listener is kept: each call replaces the
 * previously installed one.
 *
 * @param listener listener to notify from now on
 */
public void addDownloadListener(DownloadListener listener)
{
    this.listener = listener;
}
/**
* Http Resource download worker thread.
*/
protected class HttpTaskWorker extends Thread {
/**
 * Absolute position in the resource of the first byte of this worker's range.
 */
private long startPos;
/**
 * Absolute position of the last byte of this worker's range (inclusive:
 * run() computes the range size as endPos - startPos + 1).
 */
private long endPos;
/**
 * Number of bytes of the range saved so far; run() increases it after each
 * saved chunk, and resume() hands it to the replacement worker.
 */
private long complete;
/**
 * Creates a worker thread for one byte range of which some bytes may already be
 * downloaded, and reports the creation through the logger and the listener.
 *
 * @param name     worker thread name
 * @param startPos position of the first byte of the range
 * @param endPos   position of the last byte of the range
 * @param complete number of bytes of the range already completed
 */
public HttpTaskWorker(String name, long startPos, long endPos, long complete) {
    super(name);
    this.complete = complete;
    this.endPos = endPos;
    this.startPos = startPos;
    StringBuilder created = new StringBuilder(getName());
    created.append(" Worker Created: startPos = ").append(startPos);
    created.append(" endPos = ").append(endPos);
    created.append(" Complete = ").append(complete);
    String text = created.toString();
    logger.info(text);
    listener.onProgress(text);
}
/**
 * Creates a worker thread for a byte range of which nothing has been downloaded yet;
 * delegates to the four-argument constructor with zero completed bytes.
 *
 * @param name     worker thread name
 * @param startPos position of the first byte of the range
 * @param endPos   position of the last byte of the range
 */
public HttpTaskWorker(String name, long startPos, long endPos)
{
    this(name, startPos, endPos, 0L);
}
/**
 * Tells whether this worker has saved every byte of its range.
 *
 * @return {@code true} once {@code complete} has reached the size of the range
 */
private boolean isDone() {
    // endPos is inclusive, so the range holds endPos - startPos + 1 bytes.
    return complete >= endPos - startPos + 1;
}
/**
 * Downloads this worker's byte range and saves it into the shared target file.
 *
 * <p>Each attempt builds a fresh GET request (so it is created for the current
 * offset {@code startPos + complete}), streams the body in 4 KB reads and stops as soon as
 * the range is complete or the {@code stop} flag is raised. The connection is released
 * after every attempt. Completion is only reported, through
 * {@code onWorkerComplete()}, when the whole range has actually been saved; a stopped or
 * failed worker reports how far it got instead.
 */
public void run() {
    byte[] buf = new byte[4096];
    // endPos is inclusive; keep the size as a long so ranges above 2 GB do not overflow.
    long chunkSize = endPos - startPos + 1;
    try {
        while (!stop && complete < chunkSize) {
            long completeBefore = complete;
            // A new request per attempt: a retry must not reuse a request built for an older offset.
            GetMethod get = createGetMethod();
            try {
                int statusCode = httpClient.executeMethod(get);
                if (statusCode != HttpStatus.SC_OK
                        && statusCode != HttpStatus.SC_PARTIAL_CONTENT) {
                    throw new IOException("Unexpected HTTP status " + statusCode);
                }
                if (statusCode == HttpStatus.SC_OK && startPos + complete > 0) {
                    // A 200 body starts at byte 0 of the resource; writing it at a
                    // non-zero offset would corrupt the file.
                    throw new IOException("Server returned the whole resource instead of the range starting at "
                            + (startPos + complete));
                }
                InputStream in = get.getResponseBodyAsStream();
                if (in == null) {
                    throw new IOException("Response has no body");
                }
                while (!stop && complete < chunkSize) {
                    // Never read past the end of this worker's range.
                    int wanted = (int) Math.min((long) buf.length, chunkSize - complete);
                    int bytesRead = in.read(buf, 0, wanted);
                    if (bytesRead == -1) {
                        break;
                    }
                    logger.debug(getName() + "Bytes read " + bytesRead + " bytes");
                    if (bytesRead > 0) {
                        listener.onReadChunk(this.getName(), bytesRead);
                        saveChunk(buf, 0, bytesRead, startPos + complete);
                        complete += bytesRead;
                    }
                }
            }
            finally {
                get.releaseConnection();
                String released = getName() + " " + "Http Connection released.";
                logger.info(released);
                listener.onProgress(released);
            }
            String progress = getName() + " Bytes Completed: " + complete + " bytes";
            logger.info(progress);
            listener.onProgress(progress);
            if (!stop && complete == completeBefore && complete < chunkSize) {
                // The body ended without delivering anything; retrying forever would spin.
                throw new IOException("No data received for range starting at " + (startPos + complete));
            }
        }
    }
    catch (HttpException he) {
        logger.error(getName() + " HTTP protocol error", he);
    }
    catch (IOException ioe) {
        logger.error(getName() + " I/O error", ioe);
        listener.onProgress(getName() + " " + ioe.getMessage());
    }
    if (complete >= chunkSize) {
        String done = getName() + " Download complete. :)";
        logger.info(done);
        listener.onProgress(done);
        onWorkerComplete();
    }
    else {
        // Stopped or failed: do not count this worker as complete so the file stays open for resume().
        String partial = getName() + " stopped at " + complete + " of " + chunkSize + " bytes";
        logger.info(partial);
        listener.onProgress(partial);
    }
}
protected GetMethod createGetMethod() {
GetMethod get = new GetMethod(url.toString());
get.addRequestHeader("Accept", "*/*");
get.addRequestHeader("User-Agent",
"Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.0; .NET CLR 1.1.4322)");
get.addRequestHeader("Pragma", "no-cache");
get.addRequestHeader("Cache-Control","no-cache");
get.addRequestHeader("Connection", "close");
if (startPos + complete > 0) {
String bytesRange = String.valueOf(startPos + comple
Java多线程与线程安全实践-基于Http协议的断点续传.rar
需积分: 0 151 浏览量
更新于2023-10-15
收藏 521KB RAR 举报
Java多线程与线程安全实践-基于Http协议的断点续传.rarJava多线程与线程安全实践-基于Http协议的断点续传.rarJava多线程与线程安全实践-基于Http协议的断点续传.rarJava多线程与线程安全实践-基于Http协议的断点续传.rarJava多线程与线程安全实践-基于Http协议的断点续传.rarJava多线程与线程安全实践-基于Http协议的断点续传.rarJava多线程与线程安全实践-基于Http协议的断点续传.rarJava多线程与线程安全实践-基于Http协议的断点续传.rarJava多线程与线程安全实践-基于Http协议的断点续传.rarJava多线程与线程安全实践-基于Http协议的断点续传.rarJava多线程与线程安全实践-基于Http协议的断点续传.rarJava多线程与线程安全实践-基于Http协议的断点续传.rarJava多线程与线程安全实践-基于Http协议的断点续传.rarJava多线程与线程安全实践-基于Http协议的断点续传.rarJava多线程与线程安全实践-基于Http协议的断点续传.rarJava多线程与线程安全实践-基于Htt