本篇博客总结了我在学习redis中做的实战项目,实现了优惠券秒杀功能,并用分布式锁、消息队列等对其进行了优化,其中业务部分代码和注释都是我手敲的,前端代码和图片来自黑马程序员
虽然这个项目已经烂大街,但是对于学习项目开发和redis来说仍是一份不错的教材,感谢黑马程序员
课程链接:https://www.bilibili.com/video/BV1cr4y1671t?p=1&vd_source=40ac0553f204ea9791dc385431e71f1c

redis实现全局唯一id

不使用id自增的原因:

  • id规律性太明显,暴露信息
  • 受单表数据量的限制
  • 分布式系统中不能保证唯一性

全局ID生成器:
结构
ID组成部分:

  • 符号位:1bit,永远为0

  • 时间戳:31bit,可以使用69年

  • 序列号:32bit,秒内计数器,支持每秒产生2^32个不同ID

  • 代码实现:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    @Component
    public class RedisIdWorker {
    //开始时间戳
    private static final long BEGIN_TIMESTAMP = 1640995200L;

    //序列号的位数
    private static final int COUNT_BITS = 32;

    private StringRedisTemplate stringRedisTemplate;

    public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
    this.stringRedisTemplate = stringRedisTemplate;
    }

    public long nextId(String keyPrefix) {
    //1.生成时间戳
    LocalDateTime now = LocalDateTime.now();
    long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
    long timestamp = nowSecond - BEGIN_TIMESTAMP;

    //2.生成序列号
    //2.1获取当前日期,精确到天,避免超过32位上限,且便于统计
    String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
    //2.2自增长
    Long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);

    //3.拼接并返回:时间戳的值左移序列号的位数
    return timestamp << COUNT_BITS | count;
    }
    }

    redis自增策略:

  • 每天一个key,方便统计订单量

  • ID构造:时间戳 + 计数器

优惠券秒杀

下单时需要判断两点:

  • 秒杀是否开始或结束,如果尚未开始或已经结束则无法下单

  • 库存是否充足,不足则无法下单

  • 业务流程:
    业务流程

  • 代码实现:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    /**
    * controller包下
    */
    @Resource
    private IVoucherOrderService voucherOrderService;

    @PostMapping("seckill/{id}")
    public Result seckillVoucher(@PathVariable("id") Long voucherId) {
    return voucherOrderService.seckillVoucher(voucherId);
    }

    /**
    * service.impl包下
    * @param voucherId 优惠券id
    * @return
    */
    Result seckillVoucher(Long voucherId);

    @Resource
    private ISeckillVoucherService seckillVoucherService;
    //获取全局唯一id
    @Resource
    private RedisIdWorker redisIdWorker;

    @Override
    //涉及两张表操作
    @Transactional
    public Result seckillVoucher(Long voucherId) {
    //1.查询优惠券
    SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
    //2.判断秒杀是否开始
    if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
    //尚未开始
    return Result.fail("秒杀尚未开始!");
    }
    //3.判断秒杀是否已经结束
    if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
    return Result.fail("秒杀已经结束!");
    }
    //4.判断库存是否充足
    if (voucher.getStock() < 1) {
    //库存不足
    return Result.fail("库存不足!");
    }
    //5.扣减库存
    boolean success = seckillVoucherService.update().setSql("stock = stock - 1").
    eq("voucher_id", voucherId).update();
    if (!success) {
    //扣减失败
    return Result.fail("库存不足!");
    }
    //6.创建订单
    VoucherOrder order = new VoucherOrder();
    //6.1订单id
    long orderId = redisIdWorker.nextId("order");
    order.setId(orderId);
    //6.2用户id
    Long userId = UserHolder.getUser().getId();
    //6.3代金券id
    order.setVoucherId(userId);
    save(order);
    //7.返回订单id
    return Result.ok(orderId);
    }

超卖问题

假设有100个订单,当前有200个用户同时发起请求,那么订单的余量就可能出现负数,为什么会出现这种问题?
假设目前只剩下一个库存:
业务流程
只要在线程1扣减之前,有无数的线程进来查询,都会判断库存为充足,都会扣减库存,最终导致超卖。

解决方法

这种典型的线程安全问题的一个常见解决方案就是加锁:
比较
悲观锁实现简单,但效率较低,乐观锁虽然效率高,但难点在于如何知道在判断之前查询的数据是否被修改过,常见的方法有两种:

  • 版本号法:没当数据做一次修改,版本号加1,判断版本号有没有变化即可
  • CAS法:我们每次更新库存时必须更新版本,那么就可以把库存当作版本即可

基于CAS法解决超卖问题

乐观锁的问题就是成功率太低,当有无数线程发出请求,第一个线程更新了库存后,剩下的大量线程都会拿到不匹配的数据,从而失败,我们只要将库存的判断条件改为是否大于0就可以解决这个问题:

1
2
3
4
5
6
7
//5.扣减库存
boolean success = seckillVoucherService.update().
setSql("stock = stock - 1").
//判断库存在修改前后是否相等
// eq("voucher_id", voucherId).eq("stock", voucher.getStock()).
eq("voucher_id", voucherId).gt("stock", 0).
update();

总结

超卖这样的线程安全问题,解决方案有哪些?

  1. 悲观锁:添加同步锁,让线程串行执行
  • 优点:简单粗暴
  • 缺点:性能一般
  1. 乐观锁:不加锁,在更新时判断是否有其他线程在修改
  • 优点:性能好
  • 缺点:存在成功率低的问题

一人一单

同一个优惠券,一个人只能下一单。

  • 业务流程:
    业务流程

  • 代码实现:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    @Override
    public Result seckillVoucher(Long voucherId) {
    //1.查询优惠券
    SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
    //2.判断秒杀是否开始
    if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
    //尚未开始
    return Result.fail("秒杀尚未开始!");
    }
    //3.判断秒杀是否已经结束
    if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
    return Result.fail("秒杀已经结束!");
    }
    //4.判断库存是否充足
    if (voucher.getStock() < 1) {
    //库存不足
    return Result.fail("库存不足!");
    }
    Long userId = UserHolder.getUser().getId();
    //先获取锁,提交事务,再释放锁才能才能确保线程安全
    synchronized (userId.toString().intern()) {
    //拿到事务代理对象
    IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
    return proxy.createVoucherOrder(voucherId);
    }
    }

    @Transactional
    public Result createVoucherOrder(Long voucherId) {
    //1.一人一单判断
    Long userId = UserHolder.getUser().getId();
    //1.1查询订单
    int count = query().eq("user_id", userId)
    .eq("voucher_id", voucherId)
    .count();
    //1.2判断是否存在
    if (count > 0) {
    return Result.fail("用户已经购买过一次");
    }
    //2.扣减库存
    boolean success = seckillVoucherService.update()
    .setSql("stock = stock - 1") //set stock = stock-1
    .eq("voucher_id", voucherId) //where voucher_id==voucherId
    .gt("stock", 0) //where stock >0
    .update();
    if (!success) {
    //扣减失败
    return Result.fail("库存不足!");
    }
    VoucherOrder voucherOrder = new VoucherOrder();
    long orderId = redisIdWorker.nextId("order");
    voucherOrder.setId(orderId);
    voucherOrder.setVoucherId(voucherId);
    save(voucherOrder);
    return Result.ok(orderId);
    }

    在单机情况下,这种做法是可行的,但在集群模式下,由于多台JVM都有自己的锁,每个JVM中都会有一个线程是成功的,还是会导致线程安全问题:
    业务流程
    总的来说,就是在集群/分布式系统下,有多个JVM的存在,每个JVM都有自己的锁,导致每个锁都可以有一个线程获取,就出现了并行运行,可能出现线程安全问题。
    解决问题的关键:让多个JVM只能使用同一把锁

分布式锁

一个JVM只有一个锁监视器,所以只会有一个线程获取锁,可以实现线程间的互斥。但是当有多个JVM的时候,就无法实现多个线程间的互斥,所以就要使用一个在JVM外部的,多JVM进程都可以看到的一个锁监视器:
业务流程
通过上面的描述,分布式锁的概念就很清晰了,即:
满足分布式系统或集群模式下多进程可见并且互斥的锁。
同时它也应该满足以下几点要求:

  • 多进程可见:在JVM外部的,比如redis、MySQL
  • 互斥:多线程同时访问,只有一个线程能拿到锁
  • 高可用:获取锁的动作不能经常出现问题,从而影响整个业务
  • 高性能:加锁已经影响性能,获取锁的动作必须做到高并发
  • 安全性:考虑异常情况,比如死锁、程序异常终止导致锁未释放

分布式锁的实现

分布式锁的核心是实现多进程之间的互斥,而满足这一点的方式有很多,常见的有三种:

条件 MySQL Redis Zookeeper
互斥 利用mysql本身的互斥锁机制 利用setnx这样的互斥命令 利用节点的唯一性和有序性实现互斥
高可用
高性能 一般 一般
安全性 断开连接,自动释放锁 利用锁超时时间,到期释放 临时节点,断开连接自动释放

基于redis的分布式锁

  • 获取锁:
    互斥:确保只能有一个线程获取锁
    添加锁,NX是互斥,EX是设置超时时间(把两步写成一步,保证原子性):
    SET lock thread1 NX EX 10

  • 释放锁:
    超时释放:获取锁的时候添加一个超时时间
    释放锁,删除即可:DEL key

练习

定义一个类,实现下面接口,利用redis实现分布式锁功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
public interface ILock {
/**
* 尝试获取锁
* @param timeoutSec 锁持有的超时时间,过期后自动释放
* @return true 代表获取锁成功;false代表获取锁失败
* */
boolean tryLock(long timeoutSec);

/*
* 释放锁
* */
void unlock();
}
  • 代码实现:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    public class SimpleRedisLock implements ILock {
    //业务名称,也是锁的名称
    private String name;
    private StringRedisTemplate stringRedisTemplate;
    private static final String KEY_PREFIX = "lock:";

    public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
    this.name = name;
    this.stringRedisTemplate = stringRedisTemplate;
    }

    @Override
    public boolean tryLock(long timeoutSec) {
    //获取线程标示
    long threadId = Thread.currentThread().getId();
    //获取锁
    Boolean success = stringRedisTemplate.opsForValue()
    .setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
    return Boolean.TRUE.equals(success);
    }

    @Override
    public void unlock() {
    //释放锁
    stringRedisTemplate.delete(KEY_PREFIX + name);
    }
  • 测试:
    注入redis,修改seckillVoucher()第4步以后的代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    @Resource
    private StringRedisTemplate stringRedisTemplate;

    //创建锁对象
    SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
    //获取锁
    boolean isLock = lock.tryLock(10);
    //判断是否获取锁成功
    if (!isLock) {
    //获取锁失败,返回错误信息或重试
    return Result.fail("不可重复下单!");
    }
    try {
    //获取锁成功
    IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
    return proxy.createVoucherOrder(voucherId);
    } catch (IllegalStateException e) {
    throw new RuntimeException(e);
    } finally {
    //释放锁
    lock.unlock();
    }

误删问题

线程1拿到锁,但业务阻塞时间太长,超过了我们设置的超时时间,就会自动释放锁,此时线程2就可以拿到锁,执行自己的业务。如果这期间线程1的业务执行完毕,它会执行del lock也就是释放锁的操作,导致把线程2的锁释放了,而这时恰好线程3来了,又能拿到一把锁,线程2和线程3都在执行自己的业务,就有可能发生线程安全问题,如图所示:
业务流程

解决方案

在释放锁之前,判断锁的标识是否一致。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//添加锁的前缀
private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "-";

//tryLock()方法中获取线程id的代码改为
String threadId = ID_PREFIX + Thread.currentThread().getId();

//修改释放锁的业务逻辑
@Override
public void unlock() {
//获取线程标识
String threadId = ID_PREFIX + Thread.currentThread().getId();
//判断是否一致
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);
if (threadId == id) {
//释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}

但是,这并没有完全解决问题,判断锁标识和释放是两个动作,这两个动作之间可能产生阻塞(如GC),就会出现问题。判断锁和释放如果满足原子性,就可以解决这个问腿。

Lua脚本解决多条命令原子性问题

释放锁的业务流程:

  1. 获取锁中的线程提示
  2. 判断是否与指定的标识(当前线程标识)一致
  3. 如果一致则释放锁
  4. 如果不一致则什么都不做

在resources目录下创建unlock.lua文件:

1
2
3
4
5
6
--比较线程标识与锁中标识是否一致
if (redis.call('get', KEYS[1]) == ARGV[1]) then
--释放锁,del key
return redis.call('del', KEYS[1])
end
return 0

unlock()方法中调用脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//在外部提前定义好,避免频繁的IO
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}

@Override
public void unlock() {
//调用lua脚本
stringRedisTemplate.execute(UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId());
}

总结

基于redis的分布式锁实现思路:

  • 利用set nx ex获取锁,并设置过期时间,保存线程提示
  • 释放锁时先判断线程标识是否与自己一致,一致则删除

特性:

  • 利用set nx满足互斥
  • 利用set ex保证故障时锁依然能释放,避免死锁,提高安全性
  • 利用redis集群保证高可用性和高并发特性

基于setnx实现分布式锁完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class SimpleRedisLock implements ILock {
//业务名称,也是锁的名称
private String name;
private StringRedisTemplate stringRedisTemplate;

private static final String KEY_PREFIX = "lock:";
//添加锁的前缀
private static final String ID_PREFIX = UUID.randomUUID() + "-";
//在外部提前定义好,避免频繁的IO
private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;

static {
UNLOCK_SCRIPT = new DefaultRedisScript<>();
UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
UNLOCK_SCRIPT.setResultType(Long.class);
}

public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}

@Override
public boolean tryLock(long timeoutSec) {
//获取线程标示
String threadId = ID_PREFIX + Thread.currentThread().getId();
//获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}


@Override
public void unlock() {
//调用lua脚本
stringRedisTemplate.execute(UNLOCK_SCRIPT,
Collections.singletonList(KEY_PREFIX + name),
ID_PREFIX + Thread.currentThread().getId()
);
}
}

分布式锁优化(Redisson)

基于setnx实现的分布式锁存在以下问题:

  • 不可重入:同一个线程无法多次获取同一把锁
  • 不可重试:获取锁只尝试一次就返回false,没有重试机制
  • 超时释放:锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁释放,存在安全隐患
  • 主从一致性:如果redis提供了主从集群,主从同步存在延迟,当主宕机时,如果从并未同步主中的数据,则会出现锁失效

可重入

  • 配置Redisson:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    @Configuration
    public class RedissonConfig {
    @Bean
    public RedissonClient redissonClient(){
    //配置
    Config config = new Config();
    config.useSingleServer().setAddress("redis://192.168.10.104:6379").setPassword("200119");
    //创建redissonClient 对象
    return Redisson.create(config);
    }
    }
    - 使用Redisson分布式锁:
    ```java
    @Resource
    private RedissonClient redissonClient;

    @Test
    void testRedisson() throws InterrupytedException() {
    //获取锁(可重入),指定锁名称
    RLock lock = redissonClient.getLock("anyLock");
    //尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放是时间,时间单位
    boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
    //判断释放获取成功
    if(isLock) {
    try {
    System.out.println("执行业务");
    }finally {
    //释放锁
    lock.unlock();
    }
    }
    }

原理:利用Hash结构代替String结构,不仅存储线程标识,还要存储可重入的次数,每获取一次锁,可重入次数加一,反之减一,由于获取锁和释放锁常常成对出现,当次数为0时,就可以大胆删除锁。

  • 业务逻辑:
    业务流程

  • 利用lua脚本编写保证原子性:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    -- 获取锁
    local key = KEYS[1]; -- 锁的key
    local threadId = ARGV[1]; -- 线程唯一标识
    local releaseTime = ARGV[2]; -- 锁的自动释放时间
    -- 判断是否存在
    if(redis.call('exists', key) == 0) then
    -- 不存在,获得锁
    redis.call('hset', key, threadId, '1');
    -- 设置有效期
    redis.call('expire', key, releaseTime);
    return 1; -- 返回结果
    end;
    -- 锁已经存在,判断threadId是否是自己
    if(redis.call('hexists', key, threadId) == 1) then
    -- 不存在,获取锁,重入次数加1
    redis.call('hincrby', key, threadId, '1');
    -- 设置有效期
    redis.call('expire', key, releaseTime);
    return 1; -- 返回结果
    end;
    return 0; -- 代码走完,说明获取锁的不是自己,获取锁失败

    -- 释放锁
    local key = KEYS[1]; -- 锁的key
    local threadId = ARGV[1]; -- 线程唯一标识
    local releaseTime = ARGV[2]; -- 锁的自动释放时间
    -- 判断当前锁是否还是自己持有
    if(redis.call('hexists', key, threadId) == 0) then
    return null; -- 如果已经不是自己,则直接返回
    end;
    -- 是自己的锁,则重入次数减一
    local count = redis.call('hincrby', key, threadId, -1);
    -- 判断是否重入次数已经为0
    if(count > 0) then
    -- 大于0说明不能释放锁,重置有效期返回
    redis.call('expire', key, releaseTime);
    return nil;
    else -- 等于0说明可以释放锁,直接删除
    redis.call('del', key);
    return nil;
    end;

锁重试和看门狗机制

  • 业务流程:
    业务流程

总结

Redosson分布式锁原理:

  • 可重入:利用hash结构记录线程id和重入次数
  • 可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制
  • 超时释放:利用watchDog,每隔一段时间(releaseTime / 3),重置超时时间

源码讲解见:https://www.bilibili.com/video/BV1cr4y1671t/?p=67&spm_id_from=pageDriver&vd_source=40ac0553f204ea9791dc385431e71f1c

秒杀优化

秒杀的业务基本是一条龙下来,为了保证一人一单,我们又加了分布式锁,而且存在大量对数据库的写操作,其性能可想而知,所以我们改变整体的业务逻辑如下:
业务流程

其难点在于如何在redis中完成判断和校验,业务流程如下:
业务流程

项目实践

需求:

  1. 新增秒杀优惠券的同时,将优惠券信息保存到redis中
  2. 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
  3. 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
  4. 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能

需求1

addSeckillVoucher()中加入以下代码:

1
2
//保存秒杀信息到redis中
stringRedisTemplate.opsForValue().set(SECKILL_STOCK_KEY + voucher.getId(), voucher.getStock().toString());

需求2

在resource包下创建seckill.lua

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
--1.参数列表
--1.1优惠券id
local voucherId = ARGV[1]
--1.2用户id
local userId = Argv[2]
--1.3订单id
local orderId = ARGV[3]

--2.数据key
--2.1库存key
local stockKey = "seckill:stock" .. voucherId
--2.2订单key
local orderKey = "seckill:order" .. voucherId

--3.脚本业务
--3.1判断库存是否充足 get stock
if (tonumber(redis.call('get', stockKey)) <= 0) then
--库存不足,返回1
return 1
end
--3.2判断用户是否下单
if (redis.call('sismember', orderKey, userId) == 1) then
--存在,说明是重复下单,返回2
return 2
end
--3.3扣库存incrby stockKey -1
redis.call('incrby', stockKey, -1)
--3.4下单,保存用户信息 sadd orderKey userId
redis.call('sadd', orderKey, userId)
return 0;

VoucherOrderServiceImpl类下加载脚本:

1
2
3
4
5
6
private static final DefaultRedisScript<Long> SECKILL_SCRIPT;
static {
SECKILL_SCRIPT = new DefaultRedisScript<>();
SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
SECKILL_SCRIPT.setResultType(Long.class);
}

需求3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//创建阻塞队列
private BlockingQueue<VoucherOrder> orderTasks = new ArrayBlockingQueue<>(1024 * 1024);
//创建线程池
private static final ExecutorService SECKILL_ORDER_EXECUTOR = Executors.newSingleThreadExecutor();
//确保类初始化完毕就立即执行
@PostConstruct
private void init() {
SECKILL_ORDER_EXECUTOR.submit(new VoucherOrderHandler());
}

private class VoucherOrderHandler implements Runnable {
@Override
public void run() {
while (true) {
try {
//1.获取队列中的信息
VoucherOrder voucherOrder = orderTasks.take();
//2.创建订单存到数据库
handleVoucherOrder(voucherOrder);
} catch (InterruptedException e) {
log.error("处理订单异常", e);
}
}
}
}

实现handleVoucherOrder()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//提前准备代理对象,确保子线程能拿到
private IVoucherOrderService proxy;

//在seckillVoucher方法的orderTasks.add()后加
proxy = (IVoucherOrderService) AopContext.currentProxy();

//异步创建订单
private void handleVoucherOrder(VoucherOrder voucherOrder) {
//1.获取用户id
Long userId = voucherOrder.getUserId();
//2.创建锁对象
RLock lock = redissonClient.getLock("lock:order:" + userId);
//3.获取锁
boolean isLock = lock.tryLock();
//4.判断获取锁是否成功
if (!isLock) {
//获取锁失败,返回错误
log.error("不允许重复下单");
return;
}
try {
proxy.createVoucherOrder(voucherOrder);
} finally {
//3.释放锁
lock.unlock();
}
}

修改createVoucherOrder()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Transactional
public void createVoucherOrder(VoucherOrder voucherOrder) {
//1.一人一单判断
Long userId = UserHolder.getUser().getId();
//1.1查询订单
int count = query().eq("user_id", userId)
.eq("voucher_id", voucherOrder.getVoucherId())
.count();
//1.2判断是否存在
if (count > 0) {
log.error("用户已经购买过一次");
return;
}
//2.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") //set stock = stock-1
.eq("voucher_id", voucherOrder.getVoucherId()) //where voucher_id==voucherId
.gt("stock", 0) //where stock >0
.update();
if (!success) {
//扣减失败
log.error("库存不足!");
return;
}
//3.创建订单
save(voucherOrder);
}

需求4

实现秒杀业务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Override
public Result seckillVoucher(Long voucherId) {
//1.1获取用户id
Long userId = UserHolder.getUser().getId();
//1.2执行lua脚本
Long result = stringRedisTemplate.execute(
SECKILL_SCRIPT,
Collections.emptyList(),
voucherId.toString(), userId.toString()
);
//2.判断执行结果是否为0
int r = result.intValue();
if (r != 0) {
//2.1不为0,没有购买资格
return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
}
//2.2 为0,有购买资格,把下单信息保存到阻塞队列中
VoucherOrder voucherOrder = new VoucherOrder();
//2.3订单id
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
//2.4保存用户id
voucherOrder.setUserId(userId);
//2.5优惠券id
voucherOrder.setVoucherId(voucherId);
//2.6放入阻塞队列
orderTasks.add(voucherOrder);
//3.获取代理对象(确保事务Transactional生效,保证createVoucherOrder 提交完事务之后再释放锁)
proxy = (IVoucherOrderService) AopContext.currentProxy();
//4.返回订单id
return Result.ok(orderId);
}

总结

秒杀业务的优化思路是什么?

  1. 先利用redis完后库存余量、一人一单判断,完成抢单业务
  2. 再将下单业务放入阻塞队列,利用独立线程异步下单

基于阻塞队列的异步秒杀存在哪些问题?

  • 内存限制问题:如果队列存满,其他订单就无法存入
  • 数据安全问题:如果服务突然宕机,内存中的订单信息都会丢失

消息队列实现异步秒杀

  • 消息队列:存储和管理信息,也被称为消息代理(Message Broker)
  • 生产者:发送消息到消息队列
  • 消费者:从消息队列中获取消息并处理消息

redis提供了三种不同的方式实现消息队列:

  • list结构:基于List结构模拟消息队列
  • PubSub:基于点对点消息模型
  • Stream:比较完善的消息队列模型

基于List的消息队列

Redis的list数据结构是一个双向链表,很容易模拟出队列效果(尾入头出)。可以利用LPUSH结合RPOP,或者RPUSH结合LPOP来实现,当队列中没有消息时,LPOPRPOP并不会阻塞,而是返回NULL,因此这里使用BRPOPBLPOP实现阻塞效果。

优点:

  • 利用redis存储,不受限于JVM内存上限
  • 基于redis的持久化机制,数据安全性有保障
  • 可以满足消息有序性

缺点:

  • 无法避免消息丢失
  • 只支持单消费者

基于PubSub的消息队列

**PubSub(发布订阅)**是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

  • SUBSCRIBE channel[channel]:订阅一个或多个频道
  • PUBLISH channel msg:向一个频道发送消息
  • PSUBSCRIBE pattern[pattern]:订阅与pattern格式匹配的所有频道

优点:

  • 支持发布订阅模式,支持多生产、多消费

缺点:

  • 不支持数据持久化
  • 无法避免消息丢失
  • 消息堆积有上限,超出时数据丢失。

基于Stream的消息队列

Stream类型消息队列的XREAD命令特点:

  • 消息可回溯
  • 一个消息可以被多个消费者读取
  • 可以阻塞读取
  • 有消息漏读的风险

消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:
项目结构

创建消费者组:
XGROUP CREATE key groupName ID [MKSTREAM]

  • key:队列名称
  • groupName:消费者组名称
  • ID:起始ID标识,$代表队列中最后一个消息,0代表第一个消息
  • MKSTREAM:队列不存在时自动创建

从消费者组读取消息:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]

  • group:消费组名称
  • consumer:消费者名称,如果不存在,自动创建
  • count:本次查询最大数量
  • BLOCK milliseconds:当没有消息时最长等待时间
  • NOACK:无须手动ACK,获取到消息后自动确认
  • STREAMS key:指定队列名称
  • ID:获取消息的起始ID:
    • “>”:从下一个未消费的消息开始
    • 其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始

基本思路:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
while(true){
//尝试监听队列,使用阻塞模式,最长等待2000毫秒
Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 >");
if(msg == null){
continue;
}
try{
//处理消息,完成后一定要ACK
handleMessage(msg);
}catch(Exception e){
while(true){
Object msg = redis.call("XREADGROUP GROUP g1 c1 COUNT 1 BLOCK 2000 STREAMS s1 0");
if(msg == null){
break;
}
try{
//说明有异常消息,再次处理
handleMessage(msg);
}catch(Exception e){
//再次出现异常,记录日志,继续循环
continue;
}
}
}
}

STREAM类型消息队列的XREADGROUP命令特点:

  • 消息可回溯
  • 可以多消费者争夺消息,加快消费速度
  • 可以阻塞读取
  • 没有消息漏读的风险
  • 有消息确认机制,保证消息至少被消费一次

总结

功能 List PubSub Stream
消息持久化 支持 不支持 支持
阻塞读取 支持 支持 支持
消息堆积处理 受限于内存空间,可以利用多消费者加快处理 受限于消费者缓冲区 受限于消息队列长度,可以利用消费者组提高消费速度,减少堆积
消息确认机制 不支持 不支持 支持
消息回溯 不支持 不支持 支持