package com.example.delayedqueuedemo.service;
import com.example.delayedqueuedemo.task.DelayedTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import javax.annotation.PostConstruct;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Slf4j
@Service
public class DelayedService {
private ThreadPoolExecutor takeTaskPool = new ThreadPoolExecutor(1, 1, 120, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
private ThreadPoolExecutor executeTaskPool = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 120, TimeUnit.SECONDS, new ArrayBlockingQueue<>(Runtime.getRuntime().availableProcessors()), new ThreadPoolExecutor.CallerRunsPolicy());
private DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
@Autowired
private PlatformTransactionManager transactionManager;
/**
* 生命周期回调
*/
@PostConstruct
public void postConstruct() {
// 开启线程池,避免死循环阻塞主程序
takeTaskPool.execute(new Runnable() {
@Override
public void run() {
// 如果队列中没有到期的元素,take会阻塞等待
while (true) {
try {
DelayedTask task = delayQueue.take();
// 开启线程池,避免业务执行阻塞队列获取消息,核心线程数根据电脑cpu核数而定
executeTaskPool.execute(new Runnable() {
@Override
public void run() {
// 开启事务
TransactionDefinition definition = new DefaultTransactionDefinition();
TransactionStatus transaction = transactionManager.getTransaction(definition);
try {
// 业务逻辑
log.info("获取延迟任务:task:{}", task.getData());
// 提交事务
transactionManager.commit(transaction);
} catch (TransactionException e) {
// 回滚事务
transactionManager.rollback(transaction);
}
}
});
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
}
@Transactional
public void delayed(String data, Long expireTime) {
// 1.初始化延迟队列
DelayedTask task = new DelayedTask(data, expireTime);
// 2.向队列中添加延迟执行的任务
delayQueue.put(task);
// 3.尝试执行任务
log.info("消息发送完成");
}
}
没有合适的资源?快使用搜索试试~ 我知道了~
JDK自带的延迟队列-DelayQueue
共7个文件
java:5个
yml:1个
xml:1个
0 下载量 65 浏览量
2024-01-12
22:58:46
上传
评论
收藏 7KB ZIP 举报
温馨提示
具体代码参考
资源推荐
资源详情
资源评论
收起资源包目录
delayed-queue-demo.zip (7个子文件)
delayed-queue-demo
pom.xml 2KB
src
main
resources
application.yml 438B
java
com
example
delayedqueuedemo
controller
DemoController.java 581B
demo
DelayedDemo.java 1KB
task
DelayedTask.java 1KB
service
DelayedService.java 4KB
DelayedQueueDemoApplication.java 353B
共 7 条
- 1
资源评论
可儿·四系桜
- 粉丝: 118
- 资源: 1
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功