Redis实战项目(二)
本篇博客总结了我在学习redis中做的实战项目,实现了优惠券秒杀功能,并用分布式锁、消息队列等对其进行了优化,其中业务部分代码和注释都是我手敲的,前端代码和图片来自黑马程序员
虽然这个项目已经烂大街,但是对于学习项目开发和redis来说仍是一份不错的教材,感谢黑马程序员
课程链接:https://www.bilibili.com/video/BV1cr4y1671t?p=1&vd_source=40ac0553f204ea9791dc385431e71f1c
redis实现全局唯一id
不使用id自增的原因:
- id规律性太明显,暴露信息
- 受单表数据量的限制
- 分布式系统中不能保证唯一性
全局ID生成器:
ID组成部分:
符号位:1bit,永远为0
时间戳:31bit,可以使用69年
序列号:32bit,秒内计数器,支持每秒产生2^32个不同ID
代码实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class RedisIdWorker {
//开始时间戳
private static final long BEGIN_TIMESTAMP = 1640995200L;
//序列号的位数
private static final int COUNT_BITS = 32;
private StringRedisTemplate stringRedisTemplate;
public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
this.stringRedisTemplate = stringRedisTemplate;
}
public long nextId(String keyPrefix) {
//1.生成时间戳
LocalDateTime now = LocalDateTime.now();
long nowSecond = now.toEpochSecond(ZoneOffset.UTC);
long timestamp = nowSecond - BEGIN_TIMESTAMP;
//2.生成序列号
//2.1获取当前日期,精确到天,避免超过32位上限,且便于统计
String date = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
//2.2自增长
Long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + ":" + date);
//3.拼接并返回:时间戳的值左移序列号的位数
return timestamp << COUNT_BITS | count;
}
}redis自增策略:
每天一个key,方便统计订单量
ID构造:时间戳 + 计数器
优惠券秒杀
下单时需要判断两点:
秒杀是否开始或结束,如果尚未开始或已经结束则无法下单
库存是否充足,不足则无法下单
业务流程:
代码实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64/**
* controller包下
*/
private IVoucherOrderService voucherOrderService;
public Result seckillVoucher( { Long voucherId)
return voucherOrderService.seckillVoucher(voucherId);
}
/**
* service.impl包下
* @param voucherId 优惠券id
* @return
*/
Result seckillVoucher(Long voucherId);
private ISeckillVoucherService seckillVoucherService;
//获取全局唯一id
private RedisIdWorker redisIdWorker;
//涉及两张表操作
public Result seckillVoucher(Long voucherId) {
//1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
//2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
//尚未开始
return Result.fail("秒杀尚未开始!");
}
//3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
return Result.fail("秒杀已经结束!");
}
//4.判断库存是否充足
if (voucher.getStock() < 1) {
//库存不足
return Result.fail("库存不足!");
}
//5.扣减库存
boolean success = seckillVoucherService.update().setSql("stock = stock - 1").
eq("voucher_id", voucherId).update();
if (!success) {
//扣减失败
return Result.fail("库存不足!");
}
//6.创建订单
VoucherOrder order = new VoucherOrder();
//6.1订单id
long orderId = redisIdWorker.nextId("order");
order.setId(orderId);
//6.2用户id
Long userId = UserHolder.getUser().getId();
//6.3代金券id
order.setVoucherId(userId);
save(order);
//7.返回订单id
return Result.ok(orderId);
}
超卖问题
假设有100个订单,当前有200个用户同时发起请求,那么订单的余量就可能出现负数,为什么会出现这种问题?
假设目前只剩下一个库存:
只要在线程1扣减之前,有无数的线程进来查询,都会判断库存为充足,都会扣减库存,最终导致超卖。
解决方法
这种典型的线程安全问题的一个常见解决方案就是加锁:
悲观锁实现简单,但效率较低,乐观锁虽然效率高,但难点在于如何知道在判断之前查询的数据是否被修改过,常见的方法有两种:
- 版本号法:没当数据做一次修改,版本号加1,判断版本号有没有变化即可
- CAS法:我们每次更新库存时必须更新版本,那么就可以把库存当作版本即可
基于CAS法解决超卖问题
乐观锁的问题就是成功率太低,当有无数线程发出请求,第一个线程更新了库存后,剩下的大量线程都会拿到不匹配的数据,从而失败,我们只要将库存的判断条件改为是否大于0就可以解决这个问题:
1 | //5.扣减库存 |
总结
超卖这样的线程安全问题,解决方案有哪些?
- 悲观锁:添加同步锁,让线程串行执行
- 优点:简单粗暴
- 缺点:性能一般
- 乐观锁:不加锁,在更新时判断是否有其他线程在修改
- 优点:性能好
- 缺点:存在成功率低的问题
一人一单
同一个优惠券,一个人只能下一单。
业务流程:
代码实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public Result seckillVoucher(Long voucherId) {
//1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
//2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
//尚未开始
return Result.fail("秒杀尚未开始!");
}
//3.判断秒杀是否已经结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
return Result.fail("秒杀已经结束!");
}
//4.判断库存是否充足
if (voucher.getStock() < 1) {
//库存不足
return Result.fail("库存不足!");
}
Long userId = UserHolder.getUser().getId();
//先获取锁,提交事务,再释放锁才能才能确保线程安全
synchronized (userId.toString().intern()) {
//拿到事务代理对象
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
}
}
public Result createVoucherOrder(Long voucherId) {
//1.一人一单判断
Long userId = UserHolder.getUser().getId();
//1.1查询订单
int count = query().eq("user_id", userId)
.eq("voucher_id", voucherId)
.count();
//1.2判断是否存在
if (count > 0) {
return Result.fail("用户已经购买过一次");
}
//2.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1") //set stock = stock-1
.eq("voucher_id", voucherId) //where voucher_id==voucherId
.gt("stock", 0) //where stock >0
.update();
if (!success) {
//扣减失败
return Result.fail("库存不足!");
}
VoucherOrder voucherOrder = new VoucherOrder();
long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setVoucherId(voucherId);
save(voucherOrder);
return Result.ok(orderId);
}在单机情况下,这种做法是可行的,但在集群模式下,由于多台JVM都有自己的锁,每个JVM中都会有一个线程是成功的,还是会导致线程安全问题:
总的来说,就是在集群/分布式系统下,有多个JVM的存在,每个JVM都有自己的锁,导致每个锁都可以有一个线程获取,就出现了并行运行,可能出现线程安全问题。
解决问题的关键:让多个JVM只能使用同一把锁
分布式锁
一个JVM只有一个锁监视器,所以只会有一个线程获取锁,可以实现线程间的互斥。但是当有多个JVM的时候,就无法实现多个线程间的互斥,所以就要使用一个在JVM外部的,多JVM进程都可以看到的一个锁监视器:
通过上面的描述,分布式锁的概念就很清晰了,即:
满足分布式系统或集群模式下多进程可见并且互斥的锁。
同时它也应该满足以下几点要求:
- 多进程可见:在JVM外部的,比如redis、MySQL
- 互斥:多线程同时访问,只有一个线程能拿到锁
- 高可用:获取锁的动作不能经常出现问题,从而影响整个业务
- 高性能:加锁已经影响性能,获取锁的动作必须做到高并发
- 安全性:考虑异常情况,比如死锁、程序异常终止导致锁未释放
分布式锁的实现
分布式锁的核心是实现多进程之间的互斥,而满足这一点的方式有很多,常见的有三种:
条件 | MySQL | Redis | Zookeeper |
---|---|---|---|
互斥 | 利用mysql本身的互斥锁机制 | 利用setnx这样的互斥命令 | 利用节点的唯一性和有序性实现互斥 |
高可用 | 好 | 好 | 好 |
高性能 | 一般 | 好 | 一般 |
安全性 | 断开连接,自动释放锁 | 利用锁超时时间,到期释放 | 临时节点,断开连接自动释放 |
基于redis的分布式锁
获取锁:
互斥:确保只能有一个线程获取锁
添加锁,NX是互斥,EX是设置超时时间(把两步写成一步,保证原子性):SET lock thread1 NX EX 10
释放锁:
超时释放:获取锁的时候添加一个超时时间
释放锁,删除即可:DEL key
练习
定义一个类,实现下面接口,利用redis实现分布式锁功能。
1 | public interface ILock { |
代码实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26public class SimpleRedisLock implements ILock {
//业务名称,也是锁的名称
private String name;
private StringRedisTemplate stringRedisTemplate;
private static final String KEY_PREFIX = "lock:";
public SimpleRedisLock(String name, StringRedisTemplate stringRedisTemplate) {
this.name = name;
this.stringRedisTemplate = stringRedisTemplate;
}
public boolean tryLock(long timeoutSec) {
//获取线程标示
long threadId = Thread.currentThread().getId();
//获取锁
Boolean success = stringRedisTemplate.opsForValue()
.setIfAbsent(KEY_PREFIX + name, threadId, timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}
public void unlock() {
//释放锁
stringRedisTemplate.delete(KEY_PREFIX + name);
}测试:
注入redis,修改seckillVoucher()
第4步以后的代码:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private StringRedisTemplate stringRedisTemplate;
//创建锁对象
SimpleRedisLock lock = new SimpleRedisLock("order:" + userId, stringRedisTemplate);
//获取锁
boolean isLock = lock.tryLock(10);
//判断是否获取锁成功
if (!isLock) {
//获取锁失败,返回错误信息或重试
return Result.fail("不可重复下单!");
}
try {
//获取锁成功
IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
} catch (IllegalStateException e) {
throw new RuntimeException(e);
} finally {
//释放锁
lock.unlock();
}
误删问题
线程1拿到锁,但业务阻塞时间太长,超过了我们设置的超时时间,就会自动释放锁,此时线程2就可以拿到锁,执行自己的业务。如果这期间线程1的业务执行完毕,它会执行del lock
也就是释放锁的操作,导致把线程2的锁释放了,而这时恰好线程3来了,又能拿到一把锁,线程2和线程3都在执行自己的业务,就有可能发生线程安全问题,如图所示:
解决方案
在释放锁之前,判断锁的标识是否一致。
1 | //添加锁的前缀 |
但是,这并没有完全解决问题,判断锁标识和释放是两个动作,这两个动作之间可能产生阻塞(如GC),就会出现问题。判断锁和释放如果满足原子性,就可以解决这个问腿。
Lua脚本解决多条命令原子性问题
释放锁的业务流程:
- 获取锁中的线程提示
- 判断是否与指定的标识(当前线程标识)一致
- 如果一致则释放锁
- 如果不一致则什么都不做
在resources目录下创建unlock.lua
文件:
1 | --比较线程标识与锁中标识是否一致 |
在unlock()
方法中调用脚本:
1 | //在外部提前定义好,避免频繁的IO |
总结
基于redis的分布式锁实现思路:
- 利用
set nx ex
获取锁,并设置过期时间,保存线程提示 - 释放锁时先判断线程标识是否与自己一致,一致则删除
特性:
- 利用
set nx
满足互斥 - 利用
set ex
保证故障时锁依然能释放,避免死锁,提高安全性 - 利用redis集群保证高可用性和高并发特性
基于setnx实现分布式锁完整代码如下:
1 | public class SimpleRedisLock implements ILock { |
分布式锁优化(Redisson)
基于setnx实现的分布式锁存在以下问题:
- 不可重入:同一个线程无法多次获取同一把锁
- 不可重试:获取锁只尝试一次就返回false,没有重试机制
- 超时释放:锁超时释放虽然可以避免死锁,但如果是业务执行耗时较长,也会导致锁释放,存在安全隐患
- 主从一致性:如果redis提供了主从集群,主从同步存在延迟,当主宕机时,如果从并未同步主中的数据,则会出现锁失效
可重入
- 配置Redisson:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class RedissonConfig {
public RedissonClient redissonClient(){
//配置
Config config = new Config();
config.useSingleServer().setAddress("redis://192.168.10.104:6379").setPassword("200119");
//创建redissonClient 对象
return Redisson.create(config);
}
}
- 使用Redisson分布式锁:
```java
private RedissonClient redissonClient;
void testRedisson() throws InterrupytedException() {
//获取锁(可重入),指定锁名称
RLock lock = redissonClient.getLock("anyLock");
//尝试获取锁,参数分别是:获取锁的最大等待时间(期间会重试),锁自动释放是时间,时间单位
boolean isLock = lock.tryLock(1, 10, TimeUnit.SECONDS);
//判断释放获取成功
if(isLock) {
try {
System.out.println("执行业务");
}finally {
//释放锁
lock.unlock();
}
}
}
原理:利用Hash结构代替String结构,不仅存储线程标识,还要存储可重入的次数,每获取一次锁,可重入次数加一,反之减一,由于获取锁和释放锁常常成对出现,当次数为0时,就可以大胆删除锁。
业务逻辑:
利用lua脚本编写保证原子性:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41-- 获取锁
local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1]; -- 线程唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间
-- 判断是否存在
if(redis.call('exists', key) == 0) then
-- 不存在,获得锁
redis.call('hset', key, threadId, '1');
-- 设置有效期
redis.call('expire', key, releaseTime);
return 1; -- 返回结果
end;
-- 锁已经存在,判断threadId是否是自己
if(redis.call('hexists', key, threadId) == 1) then
-- 不存在,获取锁,重入次数加1
redis.call('hincrby', key, threadId, '1');
-- 设置有效期
redis.call('expire', key, releaseTime);
return 1; -- 返回结果
end;
return 0; -- 代码走完,说明获取锁的不是自己,获取锁失败
-- 释放锁
local key = KEYS[1]; -- 锁的key
local threadId = ARGV[1]; -- 线程唯一标识
local releaseTime = ARGV[2]; -- 锁的自动释放时间
-- 判断当前锁是否还是自己持有
if(redis.call('hexists', key, threadId) == 0) then
return null; -- 如果已经不是自己,则直接返回
end;
-- 是自己的锁,则重入次数减一
local count = redis.call('hincrby', key, threadId, -1);
-- 判断是否重入次数已经为0
if(count > 0) then
-- 大于0说明不能释放锁,重置有效期返回
redis.call('expire', key, releaseTime);
return nil;
else -- 等于0说明可以释放锁,直接删除
redis.call('del', key);
return nil;
end;
锁重试和看门狗机制
- 业务流程:
总结
Redosson分布式锁原理:
- 可重入:利用hash结构记录线程id和重入次数
- 可重试:利用信号量和PubSub功能实现等待、唤醒,获取锁失败的重试机制
- 超时释放:利用watchDog,每隔一段时间(releaseTime / 3),重置超时时间
秒杀优化
秒杀的业务基本是一条龙下来,为了保证一人一单,我们又加了分布式锁,而且存在大量对数据库的写操作,其性能可想而知,所以我们改变整体的业务逻辑如下:
其难点在于如何在redis中完成判断和校验,业务流程如下:
项目实践
需求:
- 新增秒杀优惠券的同时,将优惠券信息保存到redis中
- 基于Lua脚本,判断秒杀库存、一人一单,决定用户是否抢购成功
- 如果抢购成功,将优惠券id和用户id封装后存入阻塞队列
- 开启线程任务,不断从阻塞队列中获取信息,实现异步下单功能
需求1
在addSeckillVoucher()
中加入以下代码:
1 | //保存秒杀信息到redis中 |
需求2
在resource包下创建seckill.lua
:
1 | --1.参数列表 |
在VoucherOrderServiceImpl
类下加载脚本:
1 | private static final DefaultRedisScript<Long> SECKILL_SCRIPT; |
需求3
1 | //创建阻塞队列 |
实现handleVoucherOrder()
方法:
1 | //提前准备代理对象,确保子线程能拿到 |
修改createVoucherOrder()
方法:
1 |
|
需求4
实现秒杀业务:
1 |
|
总结
秒杀业务的优化思路是什么?
- 先利用redis完后库存余量、一人一单判断,完成抢单业务
- 再将下单业务放入阻塞队列,利用独立线程异步下单
基于阻塞队列的异步秒杀存在哪些问题?
- 内存限制问题:如果队列存满,其他订单就无法存入
- 数据安全问题:如果服务突然宕机,内存中的订单信息都会丢失
消息队列实现异步秒杀
- 消息队列:存储和管理信息,也被称为消息代理(Message Broker)
- 生产者:发送消息到消息队列
- 消费者:从消息队列中获取消息并处理消息
redis提供了三种不同的方式实现消息队列:
- list结构:基于List结构模拟消息队列
- PubSub:基于点对点消息模型
- Stream:比较完善的消息队列模型
基于List的消息队列
Redis的list数据结构是一个双向链表,很容易模拟出队列效果(尾入头出)。可以利用LPUSH
结合RPOP
,或者RPUSH
结合LPOP
来实现,当队列中没有消息时,LPOP
和RPOP
并不会阻塞,而是返回NULL
,因此这里使用BRPOP或BLPOP实现阻塞效果。
优点:
- 利用redis存储,不受限于JVM内存上限
- 基于redis的持久化机制,数据安全性有保障
- 可以满足消息有序性
缺点:
- 无法避免消息丢失
- 只支持单消费者
基于PubSub的消息队列
**PubSub(发布订阅)**是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。
- SUBSCRIBE channel[channel]:订阅一个或多个频道
- PUBLISH channel msg:向一个频道发送消息
- PSUBSCRIBE pattern[pattern]:订阅与pattern格式匹配的所有频道
优点:
- 支持发布订阅模式,支持多生产、多消费
缺点:
- 不支持数据持久化
- 无法避免消息丢失
- 消息堆积有上限,超出时数据丢失。
基于Stream的消息队列
Stream类型消息队列的XREAD命令特点:
- 消息可回溯
- 一个消息可以被多个消费者读取
- 可以阻塞读取
- 有消息漏读的风险
消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:
创建消费者组:XGROUP CREATE key groupName ID [MKSTREAM]
- key:队列名称
- groupName:消费者组名称
- ID:起始ID标识,$代表队列中最后一个消息,0代表第一个消息
- MKSTREAM:队列不存在时自动创建
从消费者组读取消息:XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
- group:消费组名称
- consumer:消费者名称,如果不存在,自动创建
- count:本次查询最大数量
- BLOCK milliseconds:当没有消息时最长等待时间
- NOACK:无须手动ACK,获取到消息后自动确认
- STREAMS key:指定队列名称
- ID:获取消息的起始ID:
- “>”:从下一个未消费的消息开始
- 其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始
基本思路:
1 | while(true){ |
STREAM类型消息队列的XREADGROUP命令特点:
- 消息可回溯
- 可以多消费者争夺消息,加快消费速度
- 可以阻塞读取
- 没有消息漏读的风险
- 有消息确认机制,保证消息至少被消费一次
总结
功能 | List | PubSub | Stream |
---|---|---|---|
消息持久化 | 支持 | 不支持 | 支持 |
阻塞读取 | 支持 | 支持 | 支持 |
消息堆积处理 | 受限于内存空间,可以利用多消费者加快处理 | 受限于消费者缓冲区 | 受限于消息队列长度,可以利用消费者组提高消费速度,减少堆积 |
消息确认机制 | 不支持 | 不支持 | 支持 |
消息回溯 | 不支持 | 不支持 | 支持 |