12、Redis项目实战:秒杀优化

**作者简介:一位大四、研0学生,正在努力准备大四暑假的实习
*上期文章:Redis:原理速成+项目实战——Redis实战8(基于Redis的分布式锁及优化)
*订阅专栏:Redis:原理速成+项目实战
希望文章对你们有所帮助

简单回顾一下之前实现秒杀的思路,其实无非就是2点:
1、库存够不够,该用户有没有买过
2、操作数据库(扣库存、创建订单)

之前的业务由于涉及了大量数据库的操作,所以性能并不是太好。

秒杀优化

  • 异步秒杀思路
  • Redis实现秒杀资格的判断
    • 分析
  • 实现
  • 基于阻塞队列实现秒杀异步下单
  • 总结及发现问题

异步秒杀思路

之前的业务,可以用下图来表示:
*
可以发现,Tomcat中顺序执行的操作里面有4个需要对数据库进行查询或修改操作,而MySQL本身的并发能力就是很差的,时间是所有时间之和,这种解决方式并不好。
因此,我们需要将操作数据库的步骤分给多个线程来做,而库存量判断、是否具有购买资格的判断,我们也可以将其交给Redis,从而提高效率。
既然是将步骤分给多个线程来做,我们要开辟线程,并且要使得开辟的线程能够正确执行业务:

*
整个业务的流程被分开了,所需要的时间不再是所有的过程时间之和(完成下单的操作是异步执行的),而且整个业务基于Redis,将会大大提升业务的性能。
但这需要考虑2个难点问题:如何在Redis里面完成判断库存以及一人一单的校验?如何基于阻塞队列实现秒杀异步下单?

Redis实现秒杀资格的判断

分析

要在Redis里面判断库存量以及一人一单,我们肯定需要将优惠券的库存信息以及相关订单信息缓存到Redis中去,我们需要选取合适的数据结构来保存这两个信息。
库存很容易,因为只包含了库存量这个信息,直接使用String类型进行存储即可。
但要实现一人一单,我们要判断这个优惠券被哪些用户购买过。所以这个数据结构需要能够保存多个值,而又因为一人一单,所以我们要保存的用户的id显然是不能重复的。所以,set是很适合的。
因此业务流程可以很容易知道:
*
可以发现业务的步骤还是有很多步的,因此我们要保证步骤的原子性,因此上述的内容应当用Lua脚本来写。

实现

需求:

1、新增秒杀优惠券的同时,将优惠券的信息保存到Redis中
2、基于Lua脚本,判断秒杀库存、一人一单、决定用户是否抢购成功
3、若成功,将优惠券id与用户id进行封装,再存入阻塞队列
4、开启线程任务,不断从阻塞队列中获取信息,实现异步下单

1、 在VoucherServiceImpl类中新增优惠券,同时保存到Redis中:;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Override
    @Transactional
    public void addSeckillVoucher(Voucher voucher) {
   
     
        // 保存优惠券
        save(voucher);
        // 保存秒杀信息
        SeckillVoucher seckillVoucher = new SeckillVoucher();
        seckillVoucher.setVoucherId(voucher.getId());
        seckillVoucher.setStock(voucher.getStock());
        seckillVoucher.setBeginTime(voucher.getBeginTime());
        seckillVoucher.setEndTime(voucher.getEndTime());
        seckillVoucherService.save(seckillVoucher);
        //保存秒杀库存到Redis中, SECKILL_STOCK_KEY = "seckill:stock:"
        stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());
    }

打开postman,进行测试:
*
添加成功:
*
2、 在resources下创建seckill.lua,决定用户能否抢券成功:;

-- 1 参数列表
-- 1.1 优惠券id
local voucherId = ARGV[1]
-- 1.2 用户id
local userId = ARGV[2]

-- 2 数据key
-- 2.1 库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2 订单key
local orderKey = 'seckill:order:' .. voucherId

-- 3 脚本业务
-- 3.1 判断库存是够充足
if(tonumber(redis.call('get', stockKey)) <= 0) then
    -- 3.2 库存不足,返回1
    return 1
end
--3.2 判断用户是否下单,即判断用户id是不是这个set集合的成员
if(redis.call('sismember', orderKey, userId) == 1) then
    -- 3.2 存在,说明重复下单
    return 2
end
-- 3.4 扣库存
redis.call('incrby', stockKey, -1)
-- 3.5 下单(保存用户)
redis.call('sadd', orderKey, userId)
return 0

3、 在VoucherOrderServiceImpl中修改seckillVoucher,修改业务,先调用Lua脚本执行,返回0即可将下单信息存入阻塞队列(存入阻塞队列的代码编写较为麻烦,暂时放着):;

    //秒杀优化,调用Lua的代码
    @Override
    public Result seckillVoucher(Long voucherId) {
   
     
        //获取用户
        Long userId = UserHolder.getUser().getId();
        //执行Lua脚本,这里使用了静态代码块
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(), //这里我们没有key传,只需要传送一个空集合即可
                voucherId.toString(), userId.toString()//传其他类型的参数
        );
        //判断结果是否为0
        int r = result.intValue();
        if(r != 0){
   
     
            //不为0,没有购买资格
            return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
        }
        //为0,有购买资格,将下单信息保存到阻塞队列
        long orderId = redisIdWorker.nextId("order");
        //TODO 保存阻塞队列,这边先空着,下一部分单独编写

        //返回订单id
        return Result.ok(orderId);
    }

打开postman进行测试,连续下单两次,证明下单资格判断的可行性:
*
*
*
当然数据库中的数据还是没有改变的, 异步下单还没做。

如果要进行压力测试的话,大家要自己构建出几百个用户,然后这些用户分别占一个线程进行下单,用jmeter进行测试,完成这些测试还是很繁琐的,但是因为这些操作都是基于Redis的,容易知道吞吐率肯定是变大了不少的。

基于阻塞队列实现秒杀异步下单

对于用户资格的判断已经完成了,假设用户具有秒杀的资格,这时候我们只需要独立开辟一个线程,去异步实现下单。因为用户只要具有这个资格,直接返回订单的id从而让用户去执行付款即可,而将订单的信息存入数据库并不需要严格的时效性,因此业务可行:
*
接下来,开启线程任务,不断从阻塞队列中获取信息,实现下单,这是整体性能变高的关键。
代码的实现,我们之所以选择阻塞队列,是因为阻塞队列具有一个重要的性质:当线程尝试获取阻塞队列的元素时,若队列中没有元素,该线程就会被阻塞,直到队列中获取到这个元素了。
另外代码中因为涉及了开辟独立线程去实现异步下单,因此我们需要准备好线程池与线程任务。
最终业务的全部代码实现如下:

@Slf4j
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {
   
     

    //注入秒杀优惠券的service
    @Resource
    private ISeckillVoucherService seckillVoucherService;

    @Resource
    private RedisIdWorker redisIdWorker;
    
    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Resource
    private RedissonClient redissonClient;

    public static final DefaultRedisScript<Long> SECKILL_SCRIPT;
    static {
   
     
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));//设置脚本位置
        SECKILL_SCRIPT.setResultType(Long.class);//配置返回值
    }

    //阻塞队列:当线程尝试从这个队列中获取元素,如果没有元素,那么该线程就会被阻塞,直到队列中获取到元素
    private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
    //线程池
    private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();

    //线程任务,用户随时都能抢单,所以应该要在这个类被初始化的时候马上开始执行
    @PostConstruct  //该注解表示在当前类初始化完毕以后立即执行
    private void init(){
   
     
        SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
    }

    private class VoucherOrderHandler implements Runnable{
   
     

        @Override
        public void run() {
   
     
            //不断从队列中取订单信息
            while (true){
   
     
                try {
   
     
                    VoucherOrder voucherOrder = orderTasks.take();
                    //创建订单,流程里面无须再加锁,加个锁就是做个兜底
                    handleVoucherOrder(voucherOrder);
                } catch (Exception e) {
   
     
                    //e.printStackTrace();
                    log.error("创建订单异常", e);
                }
            }
        }
    }

    IVoucherOrderService proxy;

    //秒杀优化,调用Lua的代码
    @Override
    public Result seckillVoucher(Long voucherId) {
   
     
        //获取用户
        Long userId = UserHolder.getUser().getId();
        //执行Lua脚本,这里使用了静态代码块
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(), //这里我们没有key传,只需要传送一个空集合即可
                voucherId.toString(), userId.toString()//传其他类型的参数
        );
        //判断结果是否为0
        int r = result.intValue();
        if(r != 0){
   
     
            //不为0,没有购买资格
            return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
        }
        //为0,有购买资格,将下单信息保存到阻塞队列
        long orderId = redisIdWorker.nextId("order");
        //TODO 保存阻塞队列
        //先将用户id与订单id封装起来
        VoucherOrder voucherOrder = new VoucherOrder();
        voucherOrder.setId(orderId);
        voucherOrder.setUserId(userId);
        voucherOrder.setVoucherId(voucherId);
        //放入阻塞队列
        orderTasks.add(voucherOrder);
        //获取代理对象
        proxy = (IVoucherOrderService) AopContext.currentProxy();

        //返回订单id
        return Result.ok(orderId);
    }

    private void handleVoucherOrder(VoucherOrder voucherOrder) {
   
     
        //获取用户,用户id不能再从UserHolder中取了,因为现在是从线程池获取的全新线程,不是主线程
        Long userId = voucherOrder.getUserId();
        //创建锁对象
        RLock lock = redissonClient.getLock("lock:order:" + userId);
        //获取锁
        boolean isLock = lock.tryLock();
        //判断是否获取锁成功
        if(!isLock){
   
     
            log.error("不允许重复下单");//理论上不会发生
            return;
        }
        try {
   
     
            //这是主线程的一个子线程,无法直接获得代理对象,代理对象需要在主线程中获取,并设置成成员变量使得该子线程能够获取
            //createVoucherOrder的方法体直接修改,不再需要重新根据id创建订单,而是直接将订单传进去
            proxy.createVoucherOrder(voucherOrder);
        } finally {
   
     
            lock.unlock();
        }
    }

    @Transactional
    public void createVoucherOrder(VoucherOrder voucherOrder) {
   
     
        Long userId = voucherOrder.getUserId();
        //查询订单
        int count = query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count();
        //判断是否存在
        if (count > 0) {
   
     
            log.error("不可重复购买");
            return;
        }
        //扣减库存
        boolean success = seckillVoucherService.update().
                setSql("stock = stock - 1").
                eq("voucher_id", voucherOrder.getVoucherId()).
                gt("stock", 0).
                update();
        if (!success) {
   
     
            log.error("库存不足");
            return;
        }
        //不需要再创建订单,直接save
        save(voucherOrder);
    }
}

总结及发现问题

1、 秒杀业务的优化思路;

(1)先用Redis完成库存量以及一人一单判断,完成抢单
(2)下单的业务(操作数据库的业务)放入阻塞队列,并用独立线程完成异步下单

2、 基于阻塞队列的异步秒杀存在的问题;

(1)内存限制问题:我们使用了JDK的阻塞队列,占用的是JVM内存,若不加以限制,会有很多的订单对象创建线程并且将大量信息放入阻塞队列,可能会内存溢出
(2)数据安全问题:要用于下单的业务信息都存到了内存中,万一服务宕机,那么用户完成了抢单,但是数据库却没有做出相应的修改,将会导致数据不一致

解决的方法将在下一节进行分析

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: