package multithreading;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import multithreading.customized.FutureTaskWithCallableAvailed;
import multithreading.customized.TaskInfo;
import org.apache.log4j.Logger;
public class MyThreadPoolExecutor {
// Logger registered under the name "threadMonitor"; used for task timing and pool statistics output.
private static final Logger log = Logger.getLogger("threadMonitor");
// Cache of created pools, keyed by the thread name prefix passed to getInstance.
private static ConcurrentMap<String, MyThreadPoolExecutor> poolCache = new ConcurrentHashMap<String, MyThreadPoolExecutor>();
// Assigned by getInstance whenever it creates a new pool.
private static MyThreadPoolExecutor myThreadPool = null;
// The two sizes below are defaults: the constructor keeps them when its argument is not positive.
private int corePoolSize = 3; // minimum number of threads the pool maintains
private int maximumPoolSize = 5; // maximum number of threads the pool maintains
// The visible constructor always overwrites this value with its keepAliveTime argument.
private long keepAliveTime = 60; // idle time allowed for threads maintained by the pool
private final TimeUnit unit = TimeUnit.SECONDS; // unit of the allowed idle time
// Created in the constructor as a bounded ArrayBlockingQueue.
private BlockingQueue<Runnable> workQueue = null; // buffer queue used by the pool
private final RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy(); // policy the pool applies to rejected tasks
// The wrapped executor; created in the constructor and used by execute() and shutdown().
private ThreadPoolExecutor threadPool = null;
private MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, int timeoutQueueSize,
String namePrefix) {
this.corePoolSize = corePoolSize > 0 ? corePoolSize : this.corePoolSize;
this.maximumPoolSize = maximumPoolSize > 0 ? maximumPoolSize : this.maximumPoolSize;
this.keepAliveTime = keepAliveTime;
workQueue = new ArrayBlockingQueue<Runnable>(timeoutQueueSize);
threadPool = new MonitoredThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
new MoonmmThreadFactory(namePrefix), handler);
}
/**
 * Returns the pool registered under {@code namePrefix}, creating and caching it on first use.
 *
 * <p>The sizing arguments are only applied when the pool is created; later calls with the
 * same prefix return the cached pool regardless of the other arguments.
 *
 * <p>If two threads race to create the same pool, the instance that reaches the cache first
 * wins and the other, never-published instance is shut down.
 *
 * @param corePoolSize     core thread count for a newly created pool
 * @param maximumPoolSize  maximum thread count for a newly created pool
 * @param keepAliveTime    idle time (seconds) for a newly created pool
 * @param timeoutQueueSize work queue capacity for a newly created pool
 * @param namePrefix       cache key and worker thread name prefix
 * @return the pool associated with {@code namePrefix}
 */
public static MyThreadPoolExecutor getInstance(int corePoolSize, int maximumPoolSize, long keepAliveTime,
        int timeoutQueueSize, String namePrefix) {
    MyThreadPoolExecutor cached = poolCache.get(namePrefix);
    if (cached != null) {
        // Return the pool for THIS prefix, not whichever pool happened to be created last.
        return cached;
    }
    MyThreadPoolExecutor created = new MyThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,
            timeoutQueueSize, namePrefix);
    MyThreadPoolExecutor winner = poolCache.putIfAbsent(namePrefix, created);
    if (winner != null) {
        // Another thread registered a pool for this prefix first; discard ours.
        created.shutdown();
        return winner;
    }
    // Keep the static field's existing meaning: the most recently created pool.
    myThreadPool = created;
    return created;
}
/**
 * Hands the given task to the underlying thread pool for execution.
 *
 * @param task the future task to run through the pool
 */
public void execute(FutureTask<?> task) {
    final ThreadPoolExecutor pool = this.threadPool;
    pool.execute(task);
}
/**
 * Shuts the pool down by delegating to {@link ThreadPoolExecutor#shutdown()} on the
 * wrapped executor.
 */
public void shutdown() {
    final ThreadPoolExecutor pool = this.threadPool;
    pool.shutdown();
}
/**
 * Returns the core thread count stored on this wrapper at construction time.
 *
 * @return the recorded core pool size
 */
public int getCorePoolSize() {
    return this.corePoolSize;
}
/**
 * Returns the maximum thread count stored on this wrapper at construction time.
 *
 * @return the recorded maximum pool size
 */
public int getMaximumPoolSize() {
    return this.maximumPoolSize;
}
/**
 * Returns the maximum idle time for pool threads as stored on this wrapper.
 *
 * @return the recorded keep-alive time
 */
public long getKeepAliveTime() {
    return this.keepAliveTime;
}
/**
 * Thread factory that names each thread with the configured prefix followed by a running
 * counter (starting at 1) and creates it as a non-daemon, normal-priority thread.
 */
static class MoonmmThreadFactory implements ThreadFactory {
    /** Sequence number appended to the prefix of the next thread created. */
    final AtomicInteger threadNumber = new AtomicInteger(1);
    /** Prefix placed in front of the sequence number in every thread name. */
    String namePrefix = "";

    /**
     * @param namePrefix prefix used for the names of all threads created by this factory
     */
    public MoonmmThreadFactory(String namePrefix) {
        this.namePrefix = namePrefix;
    }

    /**
     * Creates a new thread for the given runnable.
     *
     * @param r the work the new thread will run
     * @return a non-daemon thread named {@code namePrefix + sequence}
     */
    @Override
    public Thread newThread(Runnable r) {
        final String threadName = namePrefix + threadNumber.getAndIncrement();
        final Thread worker = new Thread(r, threadName);
        worker.setDaemon(false);
        // A new thread inherits its creator's priority; reset only when it differs from normal.
        final boolean priorityDiffers = worker.getPriority() != Thread.NORM_PRIORITY;
        if (priorityDiffers) {
            worker.setPriority(Thread.NORM_PRIORITY);
        }
        return worker;
    }
}
/**
 * {@link ThreadPoolExecutor} that measures how long each task runs on a worker thread and
 * keeps running totals of the number of tasks and their accumulated run time in
 * milliseconds.
 *
 * <p>Timing is taken in the {@code beforeExecute}/{@code afterExecute} hooks, so only tasks
 * that pass through those hooks are counted.
 */
static class MonitoredThreadPoolExecutor extends ThreadPoolExecutor {
    /** {@link System#nanoTime()} reading taken when the current worker thread started its task. */
    private final ThreadLocal<Long> startTime = new ThreadLocal<Long>();
    /** Number of tasks that have passed through {@code afterExecute}. */
    private final AtomicLong numTasks = new AtomicLong();
    /** Sum of all measured task durations, in milliseconds. */
    private final AtomicLong totalTime = new AtomicLong();

    public MonitoredThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /** Records the start instant of the task on the executing worker thread. */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        // nanoTime is monotonic, so a wall-clock adjustment cannot produce a negative duration.
        startTime.set(System.nanoTime());
    }

    /**
     * Updates the counters with the finished task's duration and, when the task exposes a
     * {@link TaskInfo} callable, logs its description together with the elapsed time.
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        try {
            Long startedAt = startTime.get();
            // Guard against a missing start mark instead of failing on unboxing null.
            long taskTime = startedAt == null
                    ? 0L
                    : TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedAt.longValue());
            numTasks.incrementAndGet();
            totalTime.addAndGet(taskTime);
            if (r instanceof FutureTaskWithCallableAvailed) {
                Callable<?> c = ((FutureTaskWithCallableAvailed) r).getTask();
                if (c instanceof TaskInfo) {
                    String taskInfo = ((TaskInfo) c).desc();
                    log.info(String.format("Task %s: time=%dms", taskInfo, taskTime));
                }
            }
        } finally {
            // Worker threads are long-lived; drop the per-task value so it does not linger.
            startTime.remove();
            super.afterExecute(r, t);
        }
    }

    /** Writes the current task count and accumulated time to the monitor log. */
    public void logThreadPoolMonitorData() {
        log.info("total tasks completed: " + numTasks.get());
        log.info("totalTime: " + totalTime.get());
    }

    /**
     * Returns a snapshot of the counters.
     *
     * @return a new map with keys {@code "total_task"} and {@code "total_time"}
     */
    public Map<String, Object> obtainThreadPoolMonitorData() {
        Map<String, Object> monitorData = new HashMap<String, Object>();
        monitorData.put("total_task", numTasks.get());
        monitorData.put("total_time", totalTime.get());
        return monitorData;
    }
}
public static <T> List<T> getMultiTaskResult(List<FutureTask<List<T>>> futureTaskList) {
List<T> results = new ArrayList<T>();
for (FutureTask<List<T>> futureTask : futureTaskList) {
try {
// 每个线程设置固定的执行时间,过期不候
List<T> partResultList = futureTask.get(ThreadConstants.TASK_WAIT_TIME, TimeUnit.SECONDS);
if (partResultList != null && partResultList.size() > 0) {
for (T file : partResultList) {
results.add(file);
}
}
} catch (TimeoutException e) {
log.error(futureTask.getClass() + " Multi thread timeout error: " + Thread.currentThread().getName(),
e);
} catch (InterruptedException e) {
log.error(futureTask.getClass() + " Multi thread interrupted error: "
+ Thread.currentThread().getName(), e);
没有合适的资源?快使用搜索试试~ 我知道了~
资源推荐
资源详情
资源评论
收起资源包目录
javathreading.zip (38个子文件)
javathreading
thread.log 25.99MB
pom.xml 3KB
target
test-classes
classes
multithreading
MyThreadPoolExecutor$MonitoredThreadPoolExecutor.class 4KB
tasks
FileSearchTask.class 2KB
ThreadConstants.class 362B
customized
TaskInfo.class 161B
FutureTaskWithCallableAvailed.class 965B
MyThreadPoolExecutor.class 5KB
service
FileSearchService.class 6KB
MyThreadPoolExecutor$MoonmmThreadFactory.class 1KB
callable
FileExtractCallable.class 2KB
FileMatchCallable.class 2KB
log4j.properties 742B
javathreading-spring-base.xml 2KB
utils
StringUtil.class 267B
timecost
TimeMeasurerIF.class 143B
TimeMeasurer.class 650B
CommandIF.class 132B
appcontext
ApplicationContextUtil.class 1021B
.settings
org.maven.ide.eclipse.prefs 218B
org.eclipse.jdt.core.prefs 243B
src
test
resources
java
main
resources
log4j.properties 742B
javathreading-spring-base.xml 2KB
java
multithreading
tasks
FileSearchTask.java 2KB
customized
TaskInfo.java 161B
FutureTaskWithCallableAvailed.java 385B
ThreadConstants.java 116B
MyThreadPoolExecutor.java 8KB
service
FileSearchService.java 7KB
callable
FileMatchCallable.java 996B
FileExtractCallable.java 949B
utils
StringUtil.java 56B
timecost
TimeMeasurer.java 353B
TimeMeasurerIF.java 83B
CommandIF.java 84B
appcontext
ApplicationContextUtil.java 755B
.project 567B
.classpath 709B
共 38 条
- 1
资源评论
- scmhw2013-10-10感谢分享,目测可以运行
影儿
- 粉丝: 250
- 资源: 16
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
最新资源
- user+name.csv
- 安卓学习教材经验Android进阶学习资料安卓面试资料等文档资料合集(22个).zip
- 王小晨申论高分课.zip
- 基于matlab实现说话人语音识别源码+项目说明+PPT+报告(高分项目).zip
- VSc++编程助手1.0 - 1
- 基于matlab的说话人语音识别源码+PPT+报告(优质项目).zip
- 语音识别基于matlab说话人识别系统源码+报告PPT(高分优质项目).zip
- 毕业设计: 校园失物招领小程序的设计与实现论文(源码 + 数据库 + 说明文档)
- 美易ME-DUI自绘模块3.3-易语言模块
- 毕业设计:小程序消防知识每天学(源码 + 数据库 + 说明文档)
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功