package com.example.threadpool.core;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
/**
 * A minimal singleton thread pool: a fixed set of worker threads pulls
 * {@code TaskRunnable} instances from a shared FIFO queue and runs them.
 *
 * <p>All queue access is done while holding the monitor of {@code taskQueue};
 * workers wait on that same monitor when the queue is empty and producers
 * notify it after adding work.
 *
 * <p>Note: {@link #destory()} does not reset the singleton, so
 * {@link #getInstance()} keeps returning the same instance afterwards; tasks
 * added after destruction are queued but no worker remains to run them.
 */
public class ThreadPoolManager {
    /** Lazily created singleton; creation is guarded by the synchronized {@link #getInstance()}. */
    private static ThreadPoolManager instance = null;

    /** Number of worker threads used by the no-arg constructor. */
    private static final int DEFAULT_WORKER_NUM = 5;

    /** Pending tasks, consumed from the head (index 0). Also serves as the lock/condition object. */
    private final List<TaskRunnable> taskQueue =
            Collections.synchronizedList(new LinkedList<TaskRunnable>());

    /** Worker threads that actually execute the tasks; slots are nulled by {@link #destory()}. */
    private final WorkThread[] workThreads;

    /** Creates a pool with {@link #DEFAULT_WORKER_NUM} workers. */
    private ThreadPoolManager() {
        this(DEFAULT_WORKER_NUM);
    }

    /**
     * Creates a pool and immediately starts its workers.
     *
     * @param num number of worker threads to create
     */
    private ThreadPoolManager(int num) {
        workThreads = new WorkThread[num];
        for (int i = 0; i < num; i++) {
            // Each WorkThread starts itself in its constructor.
            workThreads[i] = new WorkThread(i);
        }
    }

    /**
     * Returns the shared pool, creating it (and starting its workers) on first use.
     *
     * @return the singleton instance
     */
    public static synchronized ThreadPoolManager getInstance() {
        if (null == instance) {
            instance = new ThreadPoolManager();
        }
        return instance;
    }

    /**
     * Appends a single task to the queue and wakes the waiting workers.
     *
     * @param task the task to run; {@code null} is ignored
     */
    public void addTask(TaskRunnable task) {
        if (null == task) {
            return;
        }
        // Queue mutations and notifications must happen under the queue's monitor.
        synchronized (taskQueue) {
            taskQueue.add(task);
            taskQueue.notifyAll();
        }
    }

    /**
     * Appends several tasks to the queue in array order and wakes the waiting workers.
     *
     * @param tasks the tasks to run; a {@code null} array is ignored, as are
     *              {@code null} elements (consistent with {@link #addTask})
     */
    public void BatchAddTask(TaskRunnable[] tasks) {
        if (null == tasks) {
            return;
        }
        // Queue mutations and notifications must happen under the queue's monitor.
        synchronized (taskQueue) {
            boolean added = false;
            for (TaskRunnable e : tasks) {
                if (null != e) {
                    taskQueue.add(e);
                    added = true;
                }
            }
            // Workers cannot proceed until this monitor is released, so a single
            // notification after the loop is equivalent to one per element.
            if (added) {
                taskQueue.notifyAll();
            }
        }
    }

    /**
     * Asks every worker to stop, drops the references to them and discards all
     * tasks that are still queued. A task that is already executing is allowed
     * to finish; its worker exits afterwards.
     */
    public void destory() {
        System.out.println("pool begins to destory ...");
        for (int i = 0; i < workThreads.length; i++) {
            WorkThread current = workThreads[i];
            if (null != current) {
                try {
                    current.stopThread();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            workThreads[i] = null;
        }
        // Queue mutations must happen under the queue's monitor.
        synchronized (taskQueue) {
            taskQueue.clear();
            // Wake idle workers so they observe the stop request right away
            // instead of on their next wait timeout.
            taskQueue.notifyAll();
        }
        System.out.println("pool ends to destory ...");
    }

    /**
     * A worker that repeatedly takes the head of the enclosing pool's queue and
     * runs it until {@link #stopThread()} is called.
     */
    private class WorkThread extends Thread {
        // Both flags are written by one thread and read by another without a
        // common lock (stopThread() is called from outside the queue monitor),
        // so they must be volatile for the change to be visible.
        private volatile boolean isRuning = true;
        // false while a task is executing, true once a task has finished.
        // NOTE: stays false from construction until the first task completes.
        private volatile boolean isWaiting = false;

        /**
         * Creates the worker and starts it immediately.
         *
         * @param taskId index of this worker in the pool; used for the thread name
         */
        public WorkThread(int taskId) {
            super("ThreadPoolManager-worker-" + taskId);
            this.start();
        }

        /** @return the current value of the waiting flag (see field comment) */
        public boolean isWaiting() {
            return isWaiting;
        }

        /**
         * Requests termination. A task in progress is not aborted; the worker
         * leaves {@link #run()} once it next checks the flag.
         */
        public void stopThread() {
            isRuning = false;
        }

        @Override
        public void run() {
            while (isRuning) {
                TaskRunnable next = null;
                // Queue access must happen under the queue's monitor.
                synchronized (taskQueue) {
                    // Queue is empty: wait for new work, re-checking the stop
                    // flag at least every 20 ms.
                    while (isRuning && taskQueue.isEmpty()) {
                        try {
                            taskQueue.wait(20);
                        } catch (InterruptedException e) {
                            // The interrupt flag is deliberately not restored:
                            // doing so would make every following wait() throw
                            // immediately and turn this loop into a busy spin.
                            System.out.println("InterruptedException occre...");
                            e.printStackTrace();
                        }
                    }
                    // The wait loop only exits with isRuning == true when the
                    // queue is non-empty, so remove(0) is safe here.
                    if (isRuning) {
                        next = taskQueue.remove(0);
                    }
                }
                // next stays null when the worker was stopped while waiting.
                if (next != null) {
                    isWaiting = false;
                    try {
                        next.run();
                    } catch (RuntimeException e) {
                        // A failing task must not take the worker down with it.
                        e.printStackTrace();
                    } finally {
                        isWaiting = true;
                    }
                }
            }
        }
    }
}
// (Page-navigation residue from the web page this listing was copied from: pages 1 2 3 4 5, "go to page".)