文章目录
- 前言
- 一、分布式锁演进
- 1.1 分布式锁特点
- 1.2 阶段一
- 1.3 阶段二
- 1.4 阶段三
- 1.5 阶段四
前言
在单体应用下当多线程去竞争某一共享资源时,我们通常会用一把锁来保证只有一个线程获取到资源。如加上 synchronize 关键字或 ReentrantLock 锁等操作。
在分布式应用中,多个应用服务要同时对同一条数据做修改,单靠本地锁并不能解决分布式情况下多进程竞争共享资源带来的数据安全性问题。我们可以使用 redis 来实现分布式锁,来确保数据的正确性。
一、分布式锁演进
1.1 分布式锁特点
-
互斥性:在任意时刻,对于同一个锁,只有一个客户端能持有,从而保证一个共享资源同一时间只能被一个客户端操作;
-
安全性:即不会形成死锁,当一个客户端在持有锁的期间崩溃而没有主动解锁的情况下,其持有的锁也能够被正确释放,并保证后续其它客户端能加锁;
-
可用性:当提供锁服务的节点发生宕机等不可恢复性故障时,“热备” 节点能够接替故障的节点继续提供服务,并保证自身持有的数据与故障节点一致。
-
对称性:对于任意一个锁,其加锁和解锁必须是同一个客户端,即客户端 A 不能把客户端 B 加的锁给解了。
如上众多的商品服务分别部署在不同机器上同时去一个地方占坑,如果占到,就执行逻辑。否则就必须等待,直到释放锁。占坑可以去redis,可以去数据库,可以去任何大家都能访问的地方。等待可以自旋的方式。
1.2 阶段一
阶段一所对应的代码
/*** 使用redis实现分布式锁* @return*/
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDBWithRedisLock() {//1.占分布式锁:去redis占坑Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111");if (lock) {//加锁成功,执行业务Map<String, List<Catelog2Vo>> dataFromDB = getDataFromDB();//执行业务后删除锁redisTemplate.delete("lock");return dataFromDB;}else {//加锁失败,重试自旋,休眠100msreturn getCatalogJsonFromDBWithRedisLock();}
}
1.1.1 出现问题
setnx占好了位,业务代码异常或者程序在页面过程中宕机。没有执行删除锁逻辑,这就造成了死锁。
1.1.2 改进方案
设置锁的自动过期,即使没有删除,会自动删除,而且得使用set key value EX 300 NX 确保设置 key 的同时设置 key 的过期时间是一个原子性操作。改进后的代码如下。
/*** 使用redis实现分布式锁* @return*/
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDBWithRedisLock() {//1.占分布式锁:去redis占坑Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111",300,TimeUnit.SECONDS);if (lock) {//2.确保加锁和设置key的过期时间是个原子性操作//加锁成功,执行业务Map<String, List<Catelog2Vo>> dataFromDB = getDataFromDB();//执行业务后删除锁redisTemplate.delete("lock");return dataFromDB;}else {//加锁失败,重试自旋,休眠100msreturn getCatalogJsonFromDBWithRedisLock();}
}
1.3 阶段二
1.2.1 出现问题
假设锁的过期时间是10s业务代码执行12s线程A执行完了业务代码之后,此时锁已经过期,而且被其他线程所抢占,线程A正准备执行删锁操作,此时删除的时别的线程的锁。
1.2.2 改进方案
删除锁的时候得需要判断是否是自己的锁。
/**
* 使用redis实现分布式锁
* @return
*/
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDBWithRedisLock() {String uuid = UUID.randomUUID().toString().toLowerCase();//1.占分布式锁:去redis占坑Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid,300,TimeUnit.SECONDS);if (lock) {// 确保加锁和设置key的过期时间是个原子性操作//加锁成功,执行业务Map<String, List<Catelog2Vo>> dataFromDB = getDataFromDB();//确保是自己的锁才能删除String value = redisTemplate.opsForValue().get("lock");if (value.equals(uuid)) {//执行业务后删除锁redisTemplate.delete("lock");}return dataFromDB;}else {//加锁失败,重试自旋,休眠100msreturn getCatalogJsonFromDBWithRedisLock();}
}
1.4 阶段三
1.3.1 出现问题
1.如果正好判断是当前值,正要删除锁的时候,锁已经过期,别人已经设置到了新的值。那么我们删除的是别人的锁。
2.锁的过期时间小于业务执行时间,导致提前删锁
1.3.2 改进方案
1.删除锁必须保证原子性,使用redis+Lua脚本完成。
2.锁时间自动续期
1.5 阶段四
1.4.1 Lua 脚本
if redis.call("get",KEYS[1]) == ARGV[1]
thenreturn redis.call("del",KEYS[1])
elsereturn 0
end
1.4.2 最终方案
/*** 使用redis实现分布式锁* @return*/
public Map<String, List<Catelog2Vo>> getCatalogJsonFromDBWithRedisLock() {String uuid = UUID.randomUUID().toString().toLowerCase();//1.占分布式锁:去redis占坑Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid,300,TimeUnit.SECONDS);Map<String, List<Catelog2Vo>> dataFromDB;if (lock) {log.info("获取分布式锁成功!");try{// 确保加锁和设置key的过期时间是个原子性操作//加锁成功,执行业务dataFromDB = getDataFromDB();}finally {String script = "lua 脚本";Long execute = redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock", uuid));}}else {//加锁失败,重试自旋,休眠100mslog.info("获取分布式锁失败!");try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}return getCatalogJsonFromDBWithRedisLock();}return dataFromDB;
}