redis 分布式锁的 5个坑,真是又大又深
ztj100 2024-11-04 15:16 12 浏览 0 评论
引言
最近项目上线的频率颇高,连着几天加班熬夜,身体有点吃不消精神也有些萎靡,无奈业务方催的紧,工期就在眼前只能硬着头皮上了。脑子浑浑噩噩的时候,写的就不能叫代码,可以直接叫做Bug。我就熬夜写了一个bug被骂惨了。
由于是做商城业务,要频繁的对商品库存进行扣减,应用是集群部署,为避免并发造成库存超买超卖等问题,采用 redis 分布式锁加以控制。本以为给扣库存的代码加上锁lock.tryLock就万事大吉了
1 /**
2 * @author xiaofu
3 * @description 扣减库存
4 * @date 2020/4/21 12:10
5 */
6 public String stockLock() {
7 RLock lock = redissonClient.getLock("stockLock");
8 try {
9 /**
10 * 获取锁
11 */
12 if (lock.tryLock(10, TimeUnit.SECONDS)) {
13 /**
14 * 查询库存数
15 */
16 Integer stock = Integer.valueOf(stringRedisTemplate.opsForValue().get("stockCount"));
17 /**
18 * 扣减库存
19 */
20 if (stock > 0) {
21 stock = stock - 1;
22 stringRedisTemplate.opsForValue().set("stockCount", stock.toString());
23 LOGGER.info("库存扣减成功,剩余库存数量:{}", stock);
24 } else {
25 LOGGER.info("库存不足~");
26 }
27 } else {
28 LOGGER.info("未获取到锁业务结束..");
29 }
30 } catch (Exception e) {
31 LOGGER.info("处理异常", e);
32 } finally {
33 lock.unlock();
34 }
35 return "ok";
36 }
结果业务代码执行完以后我忘了释放锁lock.unlock(),导致redis线程池被打满,redis服务大面积故障,造成库存数据扣减混乱,被领导一顿臭骂,这个月绩效~ 哎·~。
随着 使用redis 锁的时间越长,我发现 redis 锁的坑远比想象中要多。就算在面试题当中redis分布式锁的出镜率也比较高,比如:“用锁遇到过哪些问题?” ,“又是如何解决的?” 基本都是一套连招问出来的。
今天就分享一下我用redis 分布式锁的踩坑日记,以及一些解决方案,和大家一起共勉。
一、锁未被释放
这种情况是一种低级错误,就是我上边犯的错,由于当前线程 获取到redis 锁,处理完业务后未及时释放锁,导致其它线程会一直尝试获取锁阻塞,例如:用Jedis客户端会报如下的错误信息
1redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool
redis线程池已经没有空闲线程来处理客户端命令。
解决的方法也很简单,只要我们细心一点,拿到锁的线程处理完业务及时释放锁,如果是重入锁未拿到锁后,线程可以释放当前连接并且sleep一段时间。
1 public void lock() {
2 while (true) {
3 boolean flag = this.getLock(key);
4 if (flag) {
5 TODO .........
6 } else {
7 // 释放当前redis连接
8 redis.close();
9 // 休眠1000毫秒
10 sleep(1000);
11 }
12 }
13 }
二、B的锁被A给释放了
我们知道Redis实现锁的原理在于 SETNX命令。当 key不存在时将 key的值设为 value ,返回值为 1;若给定的 key已经存在,则 SETNX不做任何动作,返回值为 0 。
1SETNX key value
我们来设想一下这个场景:A、B两个线程来尝试给key myLock加锁,A线程先拿到锁(假如锁3秒后过期),B线程就在等待尝试获取锁,到这一点毛病没有。
那如果此时业务逻辑比较耗时,执行时间已经超过redis锁过期时间,这时A线程的锁自动释放(删除key),B线程检测到myLock这个key不存在,执行 SETNX命令也拿到了锁。
但是,此时A线程执行完业务逻辑之后,还是会去释放锁(删除key),这就导致B线程的锁被A线程给释放了。
为避免上边的情况,一般我们在每个线程加锁时要带上自己独有的value值来标识,只释放指定value的key,否则就会出现释放锁混乱的场景。
三、数据库事务超时
emm~ 聊redis锁咋还扯到数据库事务上来了?别着急往下看,看下边这段代码:
1 @Transaction
2 public void lock() {
3
4 while (true) {
5 boolean flag = this.getLock(key);
6 if (flag) {
7 insert();
8 }
9 }
10 }
给这个方法添加一个@Transaction注解开启事务,如代码中抛出异常进行回滚,要知道数据库事务可是有超时时间限制的,并不会无条件的一直等一个耗时的数据库操作。
比如:我们解析一个大文件,再将数据存入到数据库,如果执行时间太长,就会导致事务超时自动回滚。
一旦你的key长时间获取不到锁,获取锁等待的时间远超过数据库事务超时时间,程序就会报异常。
一般为解决这种问题,我们就需要将数据库事务改为手动提交、回滚事务。
1 @Autowired
2 DataSourceTransactionManager dataSourceTransactionManager;
3
4 @Transaction
5 public void lock() {
6 //手动开启事务
7 TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
8 try {
9 while (true) {
10 boolean flag = this.getLock(key);
11 if (flag) {
12 insert();
13 //手动提交事务
14 dataSourceTransactionManager.commit(transactionStatus);
15 }
16 }
17 } catch (Exception e) {
18 //手动回滚事务
19 dataSourceTransactionManager.rollback(transactionStatus);
20 }
21 }
四、锁过期了,业务还没执行完
这种情况和我们上边提到的第二种比较类似,但解决思路上略有不同。
同样是redis分布式锁过期,而业务逻辑没执行完的场景,不过,这里换一种思路想问题,把redis锁的过期时间再弄长点不就解决了吗?
那还是有问题,我们可以在加锁的时候,手动调长redis锁的过期时间,可这个时间多长合适?业务逻辑的执行时间是不可控的,调的过长又会影响操作性能。
要是redis锁的过期时间能够自动续期就好了。
为了解决这个问题我们使用redis客户端redisson,redisson很好的解决了redis在分布式环境下的一些棘手问题,它的宗旨就是让使用者减少对Redis的关注,将更多精力用在处理业务逻辑上。
redisson对分布式锁做了很好封装,只需调用API即可。
1 RLock lock = redissonClient.getLock("stockLock");
redisson在加锁成功后,会注册一个定时任务监听这个锁,每隔10秒就去查看这个锁,如果还持有锁,就对过期时间进行续期。默认过期时间30秒。这个机制也被叫做:“看门狗”,这名字。。。
举例子:假如加锁的时间是30秒,过10秒检查一次,一旦加锁的业务没有执行完,就会进行一次续期,把锁的过期时间再次重置成30秒。
通过分析下边redisson的源码实现可以发现,不管是加锁、解锁、续约都是客户端把一些复杂的业务逻辑,通过封装在Lua脚本中发送给redis,保证这段复杂业务逻辑执行的原子性。
1@Slf4j
2@Service
3public class RedisDistributionLockPlus {
4
5 /**
6 * 加锁超时时间,单位毫秒, 即:加锁时间内执行完操作,如果未完成会有并发现象
7 */
8 private static final long DEFAULT_LOCK_TIMEOUT = 30;
9
10 private static final long TIME_SECONDS_FIVE = 5 ;
11
12 /**
13 * 每个key的过期时间 {@link LockContent}
14 */
15 private Map<String, LockContent> lockContentMap = new ConcurrentHashMap<>(512);
16
17 /**
18 * redis执行成功的返回
19 */
20 private static final Long EXEC_SUCCESS = 1L;
21
22 /**
23 * 获取锁lua脚本, k1:获锁key, k2:续约耗时key, arg1:requestId,arg2:超时时间
24 */
25 private static final String LOCK_SCRIPT = "if redis.call('exists', KEYS[2]) == 1 then ARGV[2] = math.floor(redis.call('get', KEYS[2]) + 10) end " +
26 "if redis.call('exists', KEYS[1]) == 0 then " +
27 "local t = redis.call('set', KEYS[1], ARGV[1], 'EX', ARGV[2]) " +
28 "for k, v in pairs(t) do " +
29 "if v == 'OK' then return tonumber(ARGV[2]) end " +
30 "end " +
31 "return 0 end";
32
33 /**
34 * 释放锁lua脚本, k1:获锁key, k2:续约耗时key, arg1:requestId,arg2:业务耗时 arg3: 业务开始设置的timeout
35 */
36 private static final String UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then " +
37 "local ctime = tonumber(ARGV[2]) " +
38 "local biz_timeout = tonumber(ARGV[3]) " +
39 "if ctime > 0 then " +
40 "if redis.call('exists', KEYS[2]) == 1 then " +
41 "local avg_time = redis.call('get', KEYS[2]) " +
42 "avg_time = (tonumber(avg_time) * 8 + ctime * 2)/10 " +
43 "if avg_time >= biz_timeout - 5 then redis.call('set', KEYS[2], avg_time, 'EX', 24*60*60) " +
44 "else redis.call('del', KEYS[2]) end " +
45 "elseif ctime > biz_timeout -5 then redis.call('set', KEYS[2], ARGV[2], 'EX', 24*60*60) end " +
46 "end " +
47 "return redis.call('del', KEYS[1]) " +
48 "else return 0 end";
49 /**
50 * 续约lua脚本
51 */
52 private static final String RENEW_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('expire', KEYS[1], ARGV[2]) else return 0 end";
53
54
55 private final StringRedisTemplate redisTemplate;
56
57 public RedisDistributionLockPlus(StringRedisTemplate redisTemplate) {
58 this.redisTemplate = redisTemplate;
59 ScheduleTask task = new ScheduleTask(this, lockContentMap);
60 // 启动定时任务
61 ScheduleExecutor.schedule(task, 1, 1, TimeUnit.SECONDS);
62 }
63
64 /**
65 * 加锁
66 * 取到锁加锁,取不到锁一直等待知道获得锁
67 *
68 * @param lockKey
69 * @param requestId 全局唯一
70 * @param expire 锁过期时间, 单位秒
71 * @return
72 */
73 public boolean lock(String lockKey, String requestId, long expire) {
74 log.info("开始执行加锁, lockKey ={}, requestId={}", lockKey, requestId);
75 for (; ; ) {
76 // 判断是否已经有线程持有锁,减少redis的压力
77 LockContent lockContentOld = lockContentMap.get(lockKey);
78 boolean unLocked = null == lockContentOld;
79 // 如果没有被锁,就获取锁
80 if (unLocked) {
81 long startTime = System.currentTimeMillis();
82 // 计算超时时间
83 long bizExpire = expire == 0L ? DEFAULT_LOCK_TIMEOUT : expire;
84 String lockKeyRenew = lockKey + "_renew";
85
86 RedisScript<Long> script = RedisScript.of(LOCK_SCRIPT, Long.class);
87 List<String> keys = new ArrayList<>();
88 keys.add(lockKey);
89 keys.add(lockKeyRenew);
90 Long lockExpire = redisTemplate.execute(script, keys, requestId, Long.toString(bizExpire));
91 if (null != lockExpire && lockExpire > 0) {
92 // 将锁放入map
93 LockContent lockContent = new LockContent();
94 lockContent.setStartTime(startTime);
95 lockContent.setLockExpire(lockExpire);
96 lockContent.setExpireTime(startTime + lockExpire * 1000);
97 lockContent.setRequestId(requestId);
98 lockContent.setThread(Thread.currentThread());
99 lockContent.setBizExpire(bizExpire);
100 lockContent.setLockCount(1);
101 lockContentMap.put(lockKey, lockContent);
102 log.info("加锁成功, lockKey ={}, requestId={}", lockKey, requestId);
103 return true;
104 }
105 }
106 // 重复获取锁,在线程池中由于线程复用,线程相等并不能确定是该线程的锁
107 if (Thread.currentThread() == lockContentOld.getThread()
108 && requestId.equals(lockContentOld.getRequestId())){
109 // 计数 +1
110 lockContentOld.setLockCount(lockContentOld.getLockCount()+1);
111 return true;
112 }
113
114 // 如果被锁或获取锁失败,则等待100毫秒
115 try {
116 TimeUnit.MILLISECONDS.sleep(100);
117 } catch (InterruptedException e) {
118 // 这里用lombok 有问题
119 log.error("获取redis 锁失败, lockKey ={}, requestId={}", lockKey, requestId, e);
120 return false;
121 }
122 }
123 }
124
125
126 /**
127 * 解锁
128 *
129 * @param lockKey
130 * @param lockValue
131 */
132 public boolean unlock(String lockKey, String lockValue) {
133 String lockKeyRenew = lockKey + "_renew";
134 LockContent lockContent = lockContentMap.get(lockKey);
135
136 long consumeTime;
137 if (null == lockContent) {
138 consumeTime = 0L;
139 } else if (lockValue.equals(lockContent.getRequestId())) {
140 int lockCount = lockContent.getLockCount();
141 // 每次释放锁, 计数 -1,减到0时删除redis上的key
142 if (--lockCount > 0) {
143 lockContent.setLockCount(lockCount);
144 return false;
145 }
146 consumeTime = (System.currentTimeMillis() - lockContent.getStartTime()) / 1000;
147 } else {
148 log.info("释放锁失败,不是自己的锁。");
149 return false;
150 }
151
152 // 删除已完成key,先删除本地缓存,减少redis压力, 分布式锁,只有一个,所以这里不加锁
153 lockContentMap.remove(lockKey);
154
155 RedisScript<Long> script = RedisScript.of(UNLOCK_SCRIPT, Long.class);
156 List<String> keys = new ArrayList<>();
157 keys.add(lockKey);
158 keys.add(lockKeyRenew);
159
160 Long result = redisTemplate.execute(script, keys, lockValue, Long.toString(consumeTime),
161 Long.toString(lockContent.getBizExpire()));
162 return EXEC_SUCCESS.equals(result);
163
164 }
165
166 /**
167 * 续约
168 *
169 * @param lockKey
170 * @param lockContent
171 * @return true:续约成功,false:续约失败(1、续约期间执行完成,锁被释放 2、不是自己的锁,3、续约期间锁过期了(未解决))
172 */
173 public boolean renew(String lockKey, LockContent lockContent) {
174
175 // 检测执行业务线程的状态
176 Thread.State state = lockContent.getThread().getState();
177 if (Thread.State.TERMINATED == state) {
178 log.info("执行业务的线程已终止,不再续约 lockKey ={}, lockContent={}", lockKey, lockContent);
179 return false;
180 }
181
182 String requestId = lockContent.getRequestId();
183 long timeOut = (lockContent.getExpireTime() - lockContent.getStartTime()) / 1000;
184
185 RedisScript<Long> script = RedisScript.of(RENEW_SCRIPT, Long.class);
186 List<String> keys = new ArrayList<>();
187 keys.add(lockKey);
188
189 Long result = redisTemplate.execute(script, keys, requestId, Long.toString(timeOut));
190 log.info("续约结果,True成功,False失败 lockKey ={}, result={}", lockKey, EXEC_SUCCESS.equals(result));
191 return EXEC_SUCCESS.equals(result);
192 }
193
194
195 static class ScheduleExecutor {
196
197 public static void schedule(ScheduleTask task, long initialDelay, long period, TimeUnit unit) {
198 long delay = unit.toMillis(initialDelay);
199 long period_ = unit.toMillis(period);
200 // 定时执行
201 new Timer("Lock-Renew-Task").schedule(task, delay, period_);
202 }
203 }
204
205 static class ScheduleTask extends TimerTask {
206
207 private final RedisDistributionLockPlus redisDistributionLock;
208 private final Map<String, LockContent> lockContentMap;
209
210 public ScheduleTask(RedisDistributionLockPlus redisDistributionLock, Map<String, LockContent> lockContentMap) {
211 this.redisDistributionLock = redisDistributionLock;
212 this.lockContentMap = lockContentMap;
213 }
214
215 @Override
216 public void run() {
217 if (lockContentMap.isEmpty()) {
218 return;
219 }
220 Set<Map.Entry<String, LockContent>> entries = lockContentMap.entrySet();
221 for (Map.Entry<String, LockContent> entry : entries) {
222 String lockKey = entry.getKey();
223 LockContent lockContent = entry.getValue();
224 long expireTime = lockContent.getExpireTime();
225 // 减少线程池中任务数量
226 if ((expireTime - System.currentTimeMillis())/ 1000 < TIME_SECONDS_FIVE) {
227 //线程池异步续约
228 ThreadPool.submit(() -> {
229 boolean renew = redisDistributionLock.renew(lockKey, lockContent);
230 if (renew) {
231 long expireTimeNew = lockContent.getStartTime() + (expireTime - lockContent.getStartTime()) * 2 - TIME_SECONDS_FIVE * 1000;
232 lockContent.setExpireTime(expireTimeNew);
233 } else {
234 // 续约失败,说明已经执行完 OR redis 出现问题
235 lockContentMap.remove(lockKey);
236 }
237 });
238 }
239 }
240 }
241 }
242}
五、redis主从复制的坑
redis高可用最常见的方案就是主从复制(master-slave),这种模式也给redis分布式锁挖了一坑。
redis cluster集群环境下,假如现在A客户端想要加锁,它会根据路由规则选择一台master节点写入key mylock,在加锁成功后,master节点会把key异步复制给对应的slave节点。
如果此时redis master节点宕机,为保证集群可用性,会进行主备切换,slave变为了redis master。B客户端在新的master节点上加锁成功,而A客户端也以为自己还是成功加了锁的。
此时就会导致同一时间内多个客户端对一个分布式锁完成了加锁,导致各种脏数据的产生。
至于解决办法嘛,目前看还没有什么根治的方法,只能尽量保证机器的稳定性,减少发生此事件的概率。
总结
上面就是我在使用Redis 分布式锁时遇到的一些坑,有点小感慨,经常用一个方法填上这个坑,没多久就发现另一个坑又出来了,其实根本没有什么十全十美的解决方案,哪有什么银弹,只不过是在权衡利弊后,选一个在接受范围内的折中方案而已。
感悟
从正式成为一名程序员的那天起,注定要进行没有止境的学习,想要进阶高级或者专家,就要坚持每天都高效的学习,不要给自己的懒惰找借,这次我给你整理好了,我看你还有啥理由!私信回复【666】送你
相关推荐
- 从IDEA开始,迈进GO语言之门(idea got)
-
前言笔者在学习GO语言编程的时候,GO语言在国内还没有像JAVA/Php/Python那样普及,绕了不少的弯路,要开始入门学习一门编程语言,最好就先从选择一个好的编程语言的开发环境开始,有了这个开发环...
- 基于SpringBoot+MyBatis的私人影院java网上购票jsp源代码Mysql
-
本项目为前几天收费帮学妹做的一个项目,JavaEEJSP项目,在工作环境中基本使用不到,但是很多学校把这个当作编程入门的项目来做,故分享出本项目供初学者参考。一、项目介绍基于SpringBoot...
- 基于springboot的个人服装管理系统java网上商城jsp源代码mysql
-
本项目为前几天收费帮学妹做的一个项目,JavaEEJSP项目,在工作环境中基本使用不到,但是很多学校把这个当作编程入门的项目来做,故分享出本项目供初学者参考。一、项目介绍基于springboot...
- 基于springboot的美食网站Java食品销售jsp源代码Mysql
-
本项目为前几天收费帮学妹做的一个项目,JavaEEJSP项目,在工作环境中基本使用不到,但是很多学校把这个当作编程入门的项目来做,故分享出本项目供初学者参考。一、项目介绍基于springboot...
- 贸易管理进销存springboot云管货管账分析java jsp源代码mysql
-
本项目为前几天收费帮学妹做的一个项目,JavaEEJSP项目,在工作环境中基本使用不到,但是很多学校把这个当作编程入门的项目来做,故分享出本项目供初学者参考。一、项目描述贸易管理进销存spring...
- SpringBoot+VUE员工信息管理系统Java人员管理jsp源代码Mysql
-
本项目为前几天收费帮学妹做的一个项目,JavaEEJSP项目,在工作环境中基本使用不到,但是很多学校把这个当作编程入门的项目来做,故分享出本项目供初学者参考。一、项目介绍SpringBoot+V...
- 目前见过最牛的一个SpringBoot商城项目(附源码)还有人没用过吗
-
帮粉丝找了一个基于SpringBoot的天猫商城项目,快速部署运行,所用技术:MySQL,Druid,Log4j2,Maven,Echarts,Bootstrap...免费给大家分享出来前台演示...
- SpringBoot+Mysql实现的手机商城附带源码演示导入视频
-
今天为大家带来的是基于SpringBoot+JPA+Thymeleaf框架的手机商城管理系统,商城系统分为前台和后台、前台用的是Bootstrap框架后台用的是SpringBoot+JPA都是现在主...
- 全网首发!马士兵内部共享—1658页《Java面试突击核心讲》
-
又是一年一度的“金九银十”秋招大热门,为助力广大程序员朋友“面试造火箭”,小编今天给大家分享的便是这份马士兵内部的面试神技——1658页《Java面试突击核心讲》!...
- SpringBoot数据库操作的应用(springboot与数据库交互)
-
1.JDBC+HikariDataSource...
- SpringBoot 整合 Flink 实时同步 MySQL
-
1、需求在Flink发布SpringBoot打包的jar包能够实时同步MySQL表,做到原表进行新增、修改、删除的时候目标表都能对应同步。...
- SpringBoot + Mybatis + Shiro + mysql + redis智能平台源码分享
-
后端技术栈基于SpringBoot+Mybatis+Shiro+mysql+redis构建的智慧云智能教育平台基于数据驱动视图的理念封装element-ui,即使没有vue的使...
- Springboot+Mysql舞蹈课程在线预约系统源码附带视频运行教程
-
今天发布的是由【猿来入此】的优秀学员独立做的一个基于springboot脚手架的Springboot+Mysql舞蹈课程在线预约系统,系统项目源代码在【猿来入此】获取!https://www.yuan...
- SpringBoot+Mysql在线众筹系统源码+讲解视频+开发文档(参考论文
-
今天发布的是由【猿来入此】的优秀学员独立做的一个基于springboot脚手架的在线众筹管理系统,主要实现了普通用户在线参与众筹基本操作流程的全部功能,系统分普通用户、超级管理员等角色,除基础脚手架外...
- Docker一键部署 SpringBoot 应用的方法,贼快贼好用
-
这两天发现个Gradle插件,支持一键打包、推送Docker镜像。今天我们来讲讲这个插件,希望对大家有所帮助!GradleDockerPlugin简介...
你 发表评论:
欢迎- 一周热门
- 最近发表
-
- 从IDEA开始,迈进GO语言之门(idea got)
- 基于SpringBoot+MyBatis的私人影院java网上购票jsp源代码Mysql
- 基于springboot的个人服装管理系统java网上商城jsp源代码mysql
- 基于springboot的美食网站Java食品销售jsp源代码Mysql
- 贸易管理进销存springboot云管货管账分析java jsp源代码mysql
- SpringBoot+VUE员工信息管理系统Java人员管理jsp源代码Mysql
- 目前见过最牛的一个SpringBoot商城项目(附源码)还有人没用过吗
- SpringBoot+Mysql实现的手机商城附带源码演示导入视频
- 全网首发!马士兵内部共享—1658页《Java面试突击核心讲》
- SpringBoot数据库操作的应用(springboot与数据库交互)
- 标签列表
-
- idea eval reset (50)
- vue dispatch (70)
- update canceled (42)
- order by asc (53)
- spring gateway (67)
- 简单代码编程 贪吃蛇 (40)
- transforms.resize (33)
- redisson trylock (35)
- 卸载node (35)
- np.reshape (33)
- torch.arange (34)
- node卸载 (33)
- npm 源 (35)
- vue3 deep (35)
- win10 ssh (35)
- exceptionininitializererror (33)
- vue foreach (34)
- idea设置编码为utf8 (35)
- vue 数组添加元素 (34)
- std find (34)
- tablefield注解用途 (35)
- python str转json (34)
- java websocket客户端 (34)
- tensor.view (34)
- java jackson (34)