package com.hmdp.service.impl;
import cn.hutool.core.bean.BeanUtil;
import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.SimpleRedisLock;
import com.hmdp.utils.UserHolder;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.aop.framework.AopContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.PostConstruct;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
/**
* <p>
* 服务实现类
* </p>
*
* @author
* @since 2021-12-22
*/
@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Autowired
private ISeckillVoucherService seckillVoucherService;
@Autowired
private RedisIdWorker redisIdWorker;
@Autowired
private RedissonClient redissonClient;
// private BlockingDeque<VoucherOrder> orderTasks = (BlockingDeque<VoucherOrder>) new ArrayBlockingQueue<VoucherOrder>(1024 * 1024); //阻塞队列
private static ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
// 只有redis版本>5时才支持redis流 redis-server --version Redis server v=3.0.504(所以暂时将此方法注释掉,避免springboot启动报错)
/*
@PostConstruct
private void init(){
SECKILL_ORDER_EXECUTOR.submit(()->{
while(true){
try{
// 1.从消息队列中拿消息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS streams.order > //g1组的c1消费者
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
StreamOffset.create("stream.orders", ReadOffset.lastConsumed())
);
if(list == null || list.isEmpty()){
// 没有消息,继续下一次循环
continue;
}
// 解析消息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue(); // 键值对的值
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 获取消息成功,下单
handleVouncherOrder(voucherOrder);
//ack确认
stringRedisTemplate.opsForStream().acknowledge("streams.order","g1",record.getId());
}catch (Exception e){
log.error("处理订单异常",e);
// 从pendinglist 里取消息,重新执行
handlePendingList();
}
}
});
}
*/
// 阻塞队列方案的线程执行方法
// private class VoucherOrderHandle implements Runnable{
// @Override
// public void run() {
// while(true){
// try {
// // 1. 获取队列中的订单信息
// VoucherOrder voucherOrder = orderTasks.take();
// // 2. 创建订单
// handleVouncherOrder(voucherOrder);
// } catch (Exception e) {
// e.printStackTrace();
// log.error("处理订单异常",e);
// }
//
// }
// }
// }
private void handlePendingList(){
while(true){
try{
// 1.从pending-list中拿消息 XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS streams.order 0 //g1组的c1消费者
List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
Consumer.from("g1", "c1"),
StreamReadOptions.empty().count(1),
StreamOffset.create("stream.orders", ReadOffset.from("0"))
);
if(list == null || list.isEmpty()){
// 没有读到,说明pending-list里没有异常消息 结束循环
break;
}
// 解析消息
MapRecord<String, Object, Object> record = list.get(0);
Map<Object, Object> value = record.getValue(); // 键值对的值
VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), true);
// 获取消息成功,下单
handleVouncherOrder(voucherOrder);
//ack确认
stringRedisTemplate.opsForStream().acknowledge("streams.order","g1",record.getId());
}catch (Exception e){
log.error("处理pending-list订单异常",e);
}
}
}
private void handleVouncherOrder(VoucherOrder voucherOrder) throws InterruptedException {
// 1. 获取用户(因为是从线程池中新开的线程,不是主线程,从UserHolder取不到用户信息,所以只能从voucherOrder中取)
Long userId = voucherOrder.getUserId();
// 2. 创建锁对象 (其实不加锁也可以,因为redis做了并发的避免,但此处加只是做个兜底(虽然这种可能性几乎没有))
RLock lock = redissonClient.getLock("lock:order:" + userId);
boolean isLock = lock.tryLock();
if(!isLock){
log.error("不允许重复下单");
return;
}
//TODO 写入到数据库的一些操作(即 createVoucherOrder 但要注意的是如果在此处获取代理对象会失效,因为获取代理对象的底层用的ThreadLocal,而我们此时是在线程池里的新线程执行的:解决方法:1.将代理对象传参进来 2.将代理对象变成类的成员变量)
}
//秒杀lua脚本
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("./lua/seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}
//基于redis完成秒杀资格判断(秒杀优化)+Stream消息队列
@Override
public Result seckKillVoucher(Long voucherId) {
// 1. 查寻优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
if(voucher == null){
return Result.fail("优惠券信息错误!");
}
// 2. 判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
return Result.fail("秒杀尚未开始!");
}
// 3. 判断秒杀是否结束
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
return Result.fail("秒杀已经结束!");
}
Long userId = UserHolder.getUser().getId();
long orderId = redisIdWorker.nexId("order");
// 1.执行lua脚本
Long result = strin
baidu_16992441
- 粉丝: 312
- 资源: 1041
最新资源
- chromedriver-linux64_117.0.5902.2.zip
- chromedriver-linux64_117.0.5902.0.zip
- chromedriver-linux64_117.0.5900.2.zip
- 某赛闭环步进驱动方案42.57.60.86闭环两相电机驱动器 混合伺服驱动器 原理图+PCB+代码 包括完整位置环,速度环,电流环,位置前馈等核心算法 代码无错误无警告
- chromedriver-linux64_117.0.5905.0.zip
- chromedriver-linux64_117.0.5903.2.zip
- chromedriver-linux64_117.0.5904.0.zip
- chromedriver-linux64_117.0.5910.0.zip
- chromedriver-linux64_117.0.5907.0.zip
- chromedriver-linux64_117.0.5906.0.zip
- chromedriver-linux64_117.0.5911.0.zip
- chromedriver-linux64_117.0.5915.0.zip
- chromedriver-linux64_117.0.5912.0.zip
- chromedriver-linux64_117.0.5916.0.zip
- chromedriver-linux64_117.0.5917.0.zip
- chromedriver-linux64_117.0.5918.0.zip
资源上传下载、课程学习等过程中有任何疑问或建议,欢迎提出宝贵意见哦~我们会及时处理!
点击此处反馈