code-learning/redis/05-Redisson 源码分析-可靠分布式锁 RedLock.md

906 lines
38 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 精尽 Redisson 源码分析 —— 可靠分布式锁 RedLock
# 1. 概述
在 [《精尽 Redisson 源码分析 —— 可重入分布式锁 ReentrantLock》](http://svip.iocoder.cn/Redisson/ReentrantLock/?self) 中,艿艿臭长臭长的分享了 Redisson 是如何实现可重入的 ReentrantLock 锁,一切看起来很完美,我们能够正确的加锁,也能正确的释放锁。但是,我们来看一个 Redis 主从结构下的示例Redis 分布式锁是如何失效的:
- 1、客户端 A 从 Redis Master 获得到锁 `anylock`
- 2、在 Redis Master 同步 `anylock` 到 Redis Slave 之前Master 挂了。
- 3、Redis Slave 晋升为**新的** Redis Master 。
- 4、客户端 B 从**新的** Redis Master 获得到锁 `anylock`
此时,客户端 A 和 B 同时持有 `anylock` 锁,已经失效。当然,这个情况是极小概率事件。主要看胖友业务对分布式锁可靠性的诉求。
在 Redis 分布式锁存在失效的问题Redis 的作者 Antirez 大神提出了红锁 RedLock 的想法。我们来看看它的描述。
> FROM [《Redis 文档 —— Redis 分布式锁》](http://redis.cn/topics/distlock.html)
>
> 在 Redis 的分布式环境中,我们假设有 N 个 Redis master 。这些节点完全互相独立,不存在主从复制或者其他集群协调机制。之前我们已经描述了在 Redis 单实例下怎么安全地获取和释放锁。我们确保将在每 N 个实例上使用此方法获取和释放锁。在这个样例中,我们假设有 5 个Redis master 节点,这是一个比较合理的设置,所以我们需要在 5 台机器上面或者 5 台虚拟机上面运行这些实例,这样保证他们不会同时都宕掉。
>
> 为了取到锁,客户端应该执行以下操作:
>
> - 1、获取当前 Unix 时间,以毫秒为单位。
> - 2、依次尝试从 N 个实例,使用相同的 key 和随机值获取锁。在步骤 2 ,当向 Redis 设置锁时,客户端应该设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为 10 秒,则超时时间应该在 5-50 毫秒之间。这样可以避免服务器端 Redis 已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试另外一个 Redis 实例。
> - 3、客户端使用当前时间减去开始获取锁时间步骤 1 记录的时间)就得到获取锁使用的时间。当且仅当从大多数(这里是 3 个节点)的 Redis 节点都取到锁,并且使用的时间小于锁失效时间时,锁才算获取成功。
> - 4、如果取到了锁key 的真正有效时间等于有效时间减去获取锁所使用的时间(步骤 3 计算的结果)。
> - 5、如果因为某些原因获取锁失败没有在至少 `N/2 + 1` 个 Redis 实例取到锁或者取锁时间已经超过了有效时间),客户端应该在所有的 Redis 实例上进行解锁即便某些Redis实例根本就没有加锁成功
>
> 释放锁:
>
> - 1、释放锁比较简单向所有的 Redis 实例发送释放锁命令即可不用关心之前有没有从Redis实例成功获取到锁.
可能一看内容这么长,略微有点懵逼。**重点理解,需要至少在 `N/2 + 1` Redis 节点获得锁成功。**这样,即使出现某个 Redis Master 未同步锁信息到 Redis Slave 节点之前,突然挂了,也不**容易**出现多个客户端获得相同锁,因为**需要至少在 `N/2 + 1` Redis 节点获得锁成功。**。
当然,极端情况下也有。我们以 3 个 Redis Master 节点举例子:
- 1、客户端 A 从 3 个 Redis Master 获得到锁 `anylock`
- 2、**2 个** Redis Master 同步 anylock 到 Redis Slave 之前Master 都挂了。
- 3、**2 个** Redis Slave 晋升为**新的** Redis Master 。
- 4、客户端 B 从**新的** Redis Master 获得到锁 `anylock`
理论来说,出现 **2 个** Redis Master 都挂了,并且数据都未同步到 Redis Slave 的情况,已经是小概率的事件。当然,哈哈哈哈,我们就是可爱的“杠精”,就是要扣一扣这个边界情况。
同时,我们也可以发现,在时候用 RedLock 的时候Redis Master 越多,集群的可靠性就越高,性能也会越低。😈 架构设计中,从来没有银弹。我们想要得到更高的可靠性,往往需要失去一定的性能。
对了有一点要注意N 个 Redis Master 要**毫无关联**的。例如说,任一一个 Redis Master 都不能在同一个 Redis Cluster 中。再如下图,就是一个符合条件的:
> FROM [《慢谈 Redis 实现分布式锁 以及 Redisson 源码解析》](https://crazyfzw.github.io/2019/04/15/distributed-locks-with-redis/)
>
> [![Redis Master 示例](05-Redisson 源码分析-可靠分布式锁 RedLock.assets/834b021723e3d41e09763ce83f6e8cf4.png)](http://static.iocoder.cn/834b021723e3d41e09763ce83f6e8cf4)Redis Master 示例
>
> 当然,推荐 N 是奇数个,因为 `N / 2 + 1` 嘛,哈哈。
推荐胖友阅读如下三篇文章,更进一步了解 RedLock
- [《Redis 文档 —— Redis 分布式锁》](http://redis.cn/topics/distlock.html)
- [《How to do distributed locking》](http://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html)
- [《Is Redlock safe?》](http://antirez.com/news/101) 简直神仙打架,强烈推荐。
# 2. RedissonRedLock
[`org.redisson.RedissonRedLock`](https://github.com/YunaiV/redisson/blob/master/redisson/src/main/java/org/redisson/RedissonRedLock.java) ,继承自联锁 RedissonMultiLock Redisson 对 RedLock 的实现类。代码如下:
```
// RedissonRedLock.java
public class RedissonRedLock extends RedissonMultiLock {
/**
* Creates instance with multiple {@link RLock} objects.
* Each RLock object could be created by own Redisson instance.
*
* @param locks - array of locks
*/
public RedissonRedLock(RLock... locks) {
super(locks);
}
@Override
protected int failedLocksLimit() {
return locks.size() - minLocksAmount(locks);
}
protected int minLocksAmount(final List<RLock> locks) {
return locks.size()/2 + 1;
}
@Override
protected long calcLockWaitTime(long remainTime) {
return Math.max(remainTime / locks.size(), 1);
}
@Override
public void unlock() {
unlockInner(locks);
}
}
```
- RedissonMultiLock ,联锁,正如其名字“**联**”,可以将多个 RLock 锁关联成一个联锁。使用示例如下:
```
RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 给lock1lock2lock3加锁如果没有手动解开的话10秒钟后将会自动解开
lock.lock(10, TimeUnit.SECONDS);
// 为加锁等待100秒时间并在加锁成功10秒钟后自动解开
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();
```
- RedissonRedLock ,是一个**特殊**的联锁,加锁时无需所有的 RLock 都成功,只需要满足 `N / 2 + 1` 个 RLock 即可。使用示例如下:
```
RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");
RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 同时加锁lock1 lock2 lock3
// 红锁在大部分节点上加锁成功就算成功。
lock.lock();
...
lock.unlock();
```
- RedissonRedLock 构造方法,创建时,传入多个 RLock 对象。一般来说,每个 RLock 对应到一个 Redis Master 节点上。
- `#failedLocksLimit()` 方法,允许失败加锁的数量。从 `#minLocksAmount(final List<RLock> locks)` 方法上,我们已经看到 `N / 2 + 1` 的要求。
- `#calcLockWaitTime(long remainTime)` 方法,计算每次获得 RLock 的锁的等待时长。目前的计算规则是,总的等待时间 `remainTime` 平均分配到每个 RLock 上。
- `#unlock()` 方法,解锁时,需要所有 RLock 都进行解锁。
# 3. RedissonMultiLock
[`org.redisson.RedissonMultiLock`](https://github.com/YunaiV/redisson/blob/master/redisson/src/main/java/org/redisson/RedissonMultiLock.java) ,实现 RLock 接口Redisson 对联锁 MultiLock 的实现类。
## 3.1 构造方法
```
// RedissonMultiLock.java
/**
* RLock 数组
*/
final List<RLock> locks = new ArrayList<>();
/**
* Creates instance with multiple {@link RLock} objects.
* Each RLock object could be created by own Redisson instance.
*
* @param locks - array of locks
*/
public RedissonMultiLock(RLock... locks) {
if (locks.length == 0) {
throw new IllegalArgumentException("Lock objects are not defined");
}
this.locks.addAll(Arrays.asList(locks));
}
```
## 3.2 failedLocksLimit
`#failedLocksLimit()` 方法,允许失败加锁的数量。代码如下:
```
// RedissonMultiLock.java
protected int failedLocksLimit() {
return 0;
}
```
默认返回值是 0 ,也就是必须所有 RLock 都加锁成功。
在 RedissonRedLock 中,会重写该方法。
```
// RedissonRedLock.java
@Override
protected int failedLocksLimit() {
return locks.size() - minLocksAmount(locks);
}
protected int minLocksAmount(final List<RLock> locks) {
return locks.size() / 2 + 1;
}
```
## 3.3 calcLockWaitTime
`#failedLocksLimit(long remainTime)` 方法,计算每次获得 RLock 的锁的等待时长。代码如下:
```
// RedissonMultiLock.java
protected long calcLockWaitTime(long remainTime) {
return remainTime;
}
```
默认直接返回 `remainTime` ,也就是说,每次获得 RLock 的锁的等待时长都是 `remainTime` 。
在 RedissonRedLock 中,会重写该方法。
## 3.4 tryLock
`#tryLock(long waitTime, long leaseTime, TimeUnit unit)` 方法,同步**加锁**,并返回加锁是否成功。代码如下:
```
// RedissonMultiLock.java
@Override
public boolean tryLock() {
try {
// 同步获得锁
return tryLock(-1, -1, null);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
@Override
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
return tryLock(waitTime, -1, unit);
}
1: @Override
2: public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
3: // 计算新的锁的时长 newLeaseTime
4: long newLeaseTime = -1;
5: if (leaseTime != -1) {
6: if (waitTime == -1) { // 如果无限等待,则直接使用 leaseTime 即可。
7: newLeaseTime = unit.toMillis(leaseTime);
8: } else { // 如果设置了等待时长,则为等待时间 waitTime * 2 。不知道为什么要 * 2 ?例如说,先获得到了第一个锁,然后在获得第二个锁的时候,阻塞等待了 waitTime ,那么可能第一个锁就已经自动过期,所以 * 2 避免这个情况。
9: newLeaseTime = unit.toMillis(waitTime) * 2;
10: }
11: }
12:
13: long time = System.currentTimeMillis();
14: // 计算剩余等待锁的时间 remainTime
15: long remainTime = -1;
16: if (waitTime != -1) {
17: remainTime = unit.toMillis(waitTime);
18: }
19: // 计算每个锁的等待时间
20: long lockWaitTime = calcLockWaitTime(remainTime);
21:
22: // 允许获得锁失败的次数
23: int failedLocksLimit = failedLocksLimit();
24: // 已经获得到锁的数组
25: List<RLock> acquiredLocks = new ArrayList<>(locks.size());
26: // 遍历 RLock 数组,逐个获得锁
27: for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
28: // 当前 RLock
29: RLock lock = iterator.next();
30: boolean lockAcquired; // 标记是否获得到锁
31: try {
32: // 如果等待时间 waitTime 为 -1不限制并且锁时长为 -1不限制则使用 #tryLock() 方法。
33: if (waitTime == -1 && leaseTime == -1) {
34: lockAcquired = lock.tryLock();
35: // 如果任一不为 -1 时,则计算新的等待时间 awaitTime ,然后调用 #tryLock(long waitTime, long leaseTime, TimeUnit unit) 方法。
36: } else {
37: long awaitTime = Math.min(lockWaitTime, remainTime);
38: lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
39: }
40: } catch (RedisResponseTimeoutException e) {
41: // 发生响应超时。因为无法确定实际是否获得到锁,所以直接释放当前 RLock
42: unlockInner(Collections.singletonList(lock));
43: // 标记未获得锁
44: lockAcquired = false;
45: } catch (Exception e) {
46: // 标记未获得锁
47: lockAcquired = false;
48: }
49:
50: // 如果获得成功,则添加到 acquiredLocks 数组中
51: if (lockAcquired) {
52: acquiredLocks.add(lock);
53: } else {
54: // 如果已经到达最少需要获得锁的数量,则直接 break 。例如说RedLock 只需要获得 N / 2 + 1 把。
55: if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
56: break;
57: }
58:
59: // 当已经没有允许失败的数量,则进行相应的处理
60: if (failedLocksLimit == 0) {
61: // 释放所有的锁
62: unlockInner(acquiredLocks);
63: // 如果未设置阻塞时间,直接返回 false ,表示失败。因为是 tryLock ,只是尝试加锁一次,不会无限重试。
64: if (waitTime == -1) {
65: return false;
66: }
67: // 重置整个获得锁的过程,在剩余的时间里,重新来一遍
68: // 重置 failedLocksLimit 变量
69: failedLocksLimit = failedLocksLimit();
70: // 重置 acquiredLocks 为空
71: acquiredLocks.clear();
72: // reset iterator
73: // 重置 iterator 设置回迭代器的头
74: while (iterator.hasPrevious()) {
75: iterator.previous();
76: }
77: // failedLocksLimit 减一
78: } else {
79: failedLocksLimit--;
80: }
81: }
82:
83: // 计算剩余时间 remainTime
84: if (remainTime != -1) {
85: remainTime -= System.currentTimeMillis() - time;
86: // 记录新的当前时间
87: time = System.currentTimeMillis();
88: // 如果没有剩余时间,意味着已经超时,释放所有加载成功的锁,并返回 false
89: if (remainTime <= 0) {
90: unlockInner(acquiredLocks);
91: return false;
92: }
93: }
94: }
95:
96: // 如果设置了锁的过期时间 leaseTime ,则重新设置每个锁的过期时间
97: if (leaseTime != -1) {
98: // 遍历 acquiredLocks 数组,创建异步设置过期时间的 Future
99: List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
100: for (RLock rLock : acquiredLocks) {
101: RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
102: futures.add(future);
103: }
104:
105: // 阻塞等待所有 futures 完成
106: for (RFuture<Boolean> rFuture : futures) {
107: rFuture.syncUninterruptibly();
108: }
109: }
110:
111: // 返回 true ,表示加锁成功
112: return true;
113: }
```
- 超 100 行代码,是不是有点慌?!核心逻辑是,首先从 `locks` 数组中逐个获得锁(第 27 至 94 行),然后统一设置每个锁的过期时间(第 96 至 10 9 行)。当然,还是有很多细节,我们一点点来看。
- 第 3 至 11 行:计算新的锁的时长 `newLeaseTime` 。注意,`newLeaseTime` 是用于遍历 `locks` 数组来获得锁设置的锁时长,最终在第 96 至 109 行的代码中,会设置真正的 `leaseTime` 锁的时长。整个的计算规则,看下艿艿添加的注释。
- 第 14 至 18 行:计算剩余等待锁的时间 `remainTime` 。
- 第 20 行:调用 `#calcLockWaitTime(long remainTime)` 方法,计算获得每个锁的等待时间。在 [「2. RedissonRedLock」](https://svip.iocoder.cn/Redisson/RedLock/#) 中,我们已经看到,是 `remainTime` 进行平均分配。
- 第 25 行:已经获得到锁的数组 `acquiredLocks` 。
- 下面,我们分成阶段一(加锁)和阶段二(设置锁过期时间)来抽丝剥茧。
- **【阶段一】**第 26 至 94 行:遍历 RLock 数组,逐个获得锁。
- 第 32 至 39 行:根据条件,调用对应的 `RLock#tryLock(...)` 方法获得锁。一般情况下RedissonRedLock 搭配 RedissonLock 使用,这块我们已经在 [《精尽 Redisson 源码分析 —— 可重入分布式锁 ReentrantLock》](http://svip.iocoder.cn/Redisson/ReentrantLock/?self) 有个详细的解析了。
- 第 40 至 44 行:如果发生响应 RedisResponseTimeoutException 超时异常时,因为无法确定实际是否获得到锁,所以直接调用 `#unlockInner(Collection<RLock> locks)` 方法,释放当前 RLock 。
- 第 50 至 52 行:如果获得成功,则添加到 `acquiredLocks` 数组中。
- 【重要】第 54 至 57 行:如果已经到达最少需要获得锁的数量,则直接 break 。例如说RedLock 只需要获得 `N / 2 + 1` 把。
- 第 77 至 80 行:`failedLocksLimit` 减一。
- 第 59 至 76 行:当已经没有允许失败的数量,则进行相应的处理。
- 第 62 行:因为已经失败了,所以调用 `#unlockInner(Collection<RLock> locks)` 方法,释放所有已经获得到的锁们。
- 第 63 至 66 行:如果*未设置*阻塞时间,直接返回 `false` 加锁失败。因为是 tryLock 方法,只是尝试加锁一次,不会无限重试。
- 第 67 至 76 行:重置整个获得锁的过程,在剩余的时间里,重新来一遍。因为*已设置*了阻塞时间,必须得用完!
- 第 84 至 93 行:计算剩余时间 `remainTime` 。如果没有剩余的时间,意味着已经超时,则 调用 `#unlockInner(Collection<RLock> locks)` 方法,释放所有加载成功的锁,并返回 `false` 加锁失败。
- **【阶段二】**第 96 至 109 行:如果设置了锁的过期时间 `leaseTime` ,则重新设置每个锁的过期时间。
- 第 98 至 103 行:遍历 `acquiredLocks` 数组,创建异步设置过期时间的 Future 。
- 第 105 至 108 行:阻塞等待所有 `futures` 完成。如果任一一个 Future 执行失败,则会抛出异常。
## 3.5 tryLockAsync
`#tryLockAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId)` 方法,异步**加锁**,并返回加锁是否成功。代码如下:
```
// RedissonMultiLock.java
@Override
public RFuture<Boolean> tryLockAsync(long waitTime, long leaseTime, TimeUnit unit) {
return tryLockAsync(waitTime, leaseTime, unit, Thread.currentThread().getId());
}
@Override
public RFuture<Boolean> tryLockAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
// 创建 RPromise 对象,用于通知结果
RPromise<Boolean> result = new RedissonPromise<Boolean>();
// 创建 LockState 对象
LockState state = new LockState(waitTime, leaseTime, unit, threadId);
// <X> 发起异步加锁
state.tryAcquireLockAsync(locks.listIterator(), result);
// 返回结果
return result;
}
```
- 这个的逻辑在 `<X>` 处,调用 `LockState#tryAcquireLockAsync(ListIterator<RLock> iterator, RPromise<Boolean> result)` 方法,发起异步加锁。
🔥 LockState 是 RedissonMultiLock 的内部类,实现异步加锁的逻辑。构造方法如下:
```
// RedissonMultiLock.LockState.java
class LockState {
private final long newLeaseTime;
private final long lockWaitTime;
private final List<RLock> acquiredLocks;
private final long waitTime;
private final long threadId;
private final long leaseTime;
private final TimeUnit unit;
private long remainTime;
private long time = System.currentTimeMillis();
private int failedLocksLimit;
LockState(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
this.waitTime = waitTime;
this.leaseTime = leaseTime;
this.unit = unit;
this.threadId = threadId;
// 计算新的锁的时长 newLeaseTime
if (leaseTime != -1) {
if (waitTime == -1) {
newLeaseTime = unit.toMillis(leaseTime);
} else {
newLeaseTime = unit.toMillis(waitTime) * 2;
}
} else {
newLeaseTime = -1;
}
// 计算剩余等待锁的时间 remainTime
remainTime = -1;
if (waitTime != -1) {
remainTime = unit.toMillis(waitTime);
}
// 计算每个锁的等待时间
lockWaitTime = calcLockWaitTime(remainTime);
// 允许获得锁失败的次数
failedLocksLimit = failedLocksLimit();
// 已经获得到锁的数组
acquiredLocks = new ArrayList<>(locks.size());
}
// ... 省略其它方法
}
```
- 构造方法的逻辑,和 [「3.4 tryLock(long waitTime, long leaseTime, TimeUnit unit)」](https://svip.iocoder.cn/Redisson/RedLock/#) 的第 3 至 25 行的代码是一致的。所以,我们可以理解成构造方法,相当于做了一遍变量的初始化。
🔥 `LockState#tryAcquireLockAsync(ListIterator<RLock> iterator, RPromise<Boolean> result)` 方法,发起异步加锁。代码如下:
> 整体逻辑,和 [「3.4 tryLock(long waitTime, long leaseTime, TimeUnit unit)」](https://svip.iocoder.cn/Redisson/RedLock/#) 方法的逻辑基本一致,所以艿艿就不啰嗦详细说,而是告诉它们的对等关系。
```
// RedissonMultiLock.LockState.java
1: void tryAcquireLockAsync(ListIterator<RLock> iterator, RPromise<Boolean> result) {
2: // 如果迭代 iterator 的尾部,则重新设置每个锁的过期时间
3: if (!iterator.hasNext()) {
4: checkLeaseTimeAsync(result);
5: return;
6: }
7:
8: // 获得下一个 RLock 对象
9: RLock lock = iterator.next();
10: // 创建 RPromise 对象
11: RPromise<Boolean> lockAcquiredFuture = new RedissonPromise<Boolean>();
12: // 如果等待时间 waitTime 为 -1不限制并且锁时长为 -1不限制则使用 #tryLock() 方法。
13: if (waitTime == -1 && leaseTime == -1) {
14: lock.tryLockAsync(threadId)
15: .onComplete(new TransferListener<Boolean>(lockAcquiredFuture));
16: // 如果任一不为 -1 时,则计算新的等待时间 awaitTime ,然后调用 #tryLock(long waitTime, long leaseTime, TimeUnit unit) 方法。
17: } else {
18: long awaitTime = Math.min(lockWaitTime, remainTime);
19: lock.tryLockAsync(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS, threadId)
20: .onComplete(new TransferListener<Boolean>(lockAcquiredFuture));
21: }
22:
23: lockAcquiredFuture.onComplete((res, e) -> {
24: // 如果 res 非空,设置 lockAcquired 是否获得到锁
25: boolean lockAcquired = false;
26: if (res != null) {
27: lockAcquired = res;
28: }
29:
30: // 发生响应超时。因为无法确定实际是否获得到锁,所以直接释放当前 RLock
31: if (e instanceof RedisResponseTimeoutException) {
32: unlockInnerAsync(Collections.singletonList(lock), threadId);
33: }
34:
35: // 如果获得成功,则添加到 acquiredLocks 数组中
36: if (lockAcquired) {
37: acquiredLocks.add(lock);
38: } else {
39: // 如果已经到达最少需要获得锁的数量,则直接 break 。例如说RedLock 只需要获得 N / 2 + 1 把。
40: if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
41: checkLeaseTimeAsync(result);
42: return;
43: }
44:
45: // 当已经没有允许失败的数量,则进行相应的处理
46: if (failedLocksLimit == 0) {
47: // 创建释放所有的锁的 Future
48: unlockInnerAsync(acquiredLocks, threadId).onComplete((r, ex) -> {
49: // 如果发生异常,则通过 result 回调异常
50: if (ex != null) {
51: result.tryFailure(ex);
52: return;
53: }
54:
55: // 如果未设置阻塞时间,直接通知 result 失败。因为是 tryLock ,只是尝试加锁一次,不会无限重试。
56: if (waitTime == -1) {
57: result.trySuccess(false);
58: return;
59: }
60:
61: // 重置整个获得锁的过程,在剩余的时间里,重新来一遍
62: // 重置 failedLocksLimit 变量
63: failedLocksLimit = failedLocksLimit();
64: // 重置 acquiredLocks 为空
65: acquiredLocks.clear();
66: // reset iterator
67: // 重置 iterator 设置回迭代器的头
68: while (iterator.hasPrevious()) {
69: iterator.previous();
70: }
71:
72: // 校验剩余时间是否足够
73: checkRemainTimeAsync(iterator, result);
74: });
75: return;
76: } else {
77: failedLocksLimit--;
78: }
79: }
80:
81: // 校验剩余时间是否足够
82: checkRemainTimeAsync(iterator, result);
83: });
84: }
```
- 第 2 至 6 行:如果迭代 `iterator` 的尾部,则调用 `#checkLeaseTimeAsync(RPromise<Boolean> result)` 方法,重新设置每个锁的过期时间。代码如下:
```
// RedissonMultiLock.LockState.java
private void checkLeaseTimeAsync(RPromise<Boolean> result) {
// 如果设置了锁的过期时间 leaseTime ,则重新设置每个锁的过期时间
if (leaseTime != -1) {
// 创建 AtomicInteger 计数器,用于回调逻辑的计数,从而判断是不是所有回调都执行完了
AtomicInteger counter = new AtomicInteger(acquiredLocks.size());
// 遍历 acquiredLocks 数组,逐个设置过期时间
for (RLock rLock : acquiredLocks) {
// 创建异步设置过期时间的 RFuture
RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
future.onComplete((res, e) -> {
// 如果发生异常,则通过 result 回调异常
if (e != null) {
result.tryFailure(e);
return;
}
// 如果全部成功,则通过 result 回调加锁成功
if (counter.decrementAndGet() == 0) {
result.trySuccess(true);
}
});
}
return;
}
// 如果未设置了锁的过期时间 leaseTime ,则通过 result 回调加锁成功
result.trySuccess(true);
}
```
- 对标到 [「3.4 tryLock(long waitTime, long leaseTime, TimeUnit unit)」](https://svip.iocoder.cn/Redisson/RedLock/#) 的第 96 至 109 行。
- 第 8 至 79 行:对标到 [「3.4 tryLock(long waitTime, long leaseTime, TimeUnit unit)」](https://svip.iocoder.cn/Redisson/RedLock/#) 的第 28 至 81 行。
- 第 73 和 82 行:调用 `#checkRemainTimeAsync(ListIterator<RLock> iterator, RPromise<Boolean> result)` 方法,校验剩余时间是否足够。代码如下:
```
// RedissonMultiLock.LockState.java
private void checkRemainTimeAsync(ListIterator<RLock> iterator, RPromise<Boolean> result) {
// 如果设置了等待超时时间,计算剩余时间 remainTime
if (remainTime != -1) {
remainTime += -(System.currentTimeMillis() - time);
// 记录新的当前时间
time = System.currentTimeMillis();
// 如果没有剩余时间,意味着已经超时,释放所有加载成功的锁
if (remainTime <= 0) {
// 创建释放所有已获得到锁们的 Future
unlockInnerAsync(acquiredLocks, threadId).onComplete((res, e) -> {
// 如果发生异常,则通过 result 回调异常
if (e != null) {
result.tryFailure(e);
return;
}
// 如果全部成功,则通过 result 回调加锁成功
result.trySuccess(false);
});
// return 返回结束
return;
}
}
// <Y> 如果未设置等待超时时间,则继续加锁下一个 RLock
tryAcquireLockAsync(iterator, result);
}
```
- 对标到 [「3.4 tryLock(long waitTime, long leaseTime, TimeUnit unit)」](https://svip.iocoder.cn/Redisson/RedLock/#) 的第 83 至 93 行。
- 注意,`<Y>` 处,会递归调用 `#tryAcquireLockAsync(ListIterator<RLock> iterator, RPromise<Boolean> result)` 方法,继续加锁下一个 RLock 。
😈 总的来说,还是蛮简单的不是,哈哈哈哈。
## 3.6 lock
`#lockInterruptibly(long leaseTime, TimeUnit unit)` 方法,同步**加锁**,不返回加锁是否成功。代码如下:
```
// RedissonMultiLock.java
@Override
public void lock() {
try {
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public void lock(long leaseTime, TimeUnit unit) {
try {
lockInterruptibly(leaseTime, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
lockInterruptibly(-1, null);
}
1: @Override
2: public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
3: // 计算 waitTime 时间
4: long baseWaitTime = locks.size() * 1500;
5: long waitTime = -1;
6: if (leaseTime == -1) { // 如果未设置超时时间,则直接使用 baseWaitTime
7: waitTime = baseWaitTime;
8: } else {
9: leaseTime = unit.toMillis(leaseTime);
10: waitTime = leaseTime;
11: if (waitTime <= 2000) { // 保证最小 waitTime 时间是 2000
12: waitTime = 2000;
13: } else if (waitTime <= baseWaitTime) { // 在 [waitTime / 2, waitTime) 之间随机
14: waitTime = ThreadLocalRandom.current().nextLong(waitTime / 2, waitTime);
15: } else { // 在 [baseWaitTime, waitTime) 之间随机
16: waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);
17: }
18: }
19:
20: // 死循环,直到加锁成功
21: while (true) {
22: if (tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)) {
23: return;
24: }
25: }
26: }
```
- 第 2 至 18 行:计算 `waitTime` 时间。😈 我也没弄懂,为啥是这么设计,难道是因为经验值?后面去细细的翻查下原因。
- 第 20 至 25 行:死循环,调用 [3.4 `#tryLock(long waitTime, long leaseTime, TimeUnit unit)`](https://svip.iocoder.cn/Redisson/RedLock/#) 方法,直到加锁成功。
## 3.7 lockAsync
`#lockAsync(long leaseTime, TimeUnit unit, long threadId)` 方法,异步**加锁**,不返回加锁是否成功。代码如下:
```
// RedissonMultiLock.java
@Override
public RFuture<Void> lockAsync(long leaseTime, TimeUnit unit) {
return lockAsync(leaseTime, unit, Thread.currentThread().getId());
}
1: @Override
2: public RFuture<Void> lockAsync(long leaseTime, TimeUnit unit, long threadId) {
3: // 计算 waitTime 时间
4: long baseWaitTime = locks.size() * 1500;
5: long waitTime;
6: if (leaseTime == -1) { // 如果未设置超时时间,则直接使用 baseWaitTime
7: waitTime = baseWaitTime;
8: } else {
9: leaseTime = unit.toMillis(leaseTime);
10: waitTime = leaseTime;
11: if (waitTime <= 2000) { // 保证最小 waitTime 时间是 2000
12: waitTime = 2000;
13: } else if (waitTime <= baseWaitTime) { // 在 [waitTime / 2, waitTime) 之间随机
14: waitTime = ThreadLocalRandom.current().nextLong(waitTime / 2, waitTime);
15: } else { // 在 [baseWaitTime, waitTime) 之间随机
16: waitTime = ThreadLocalRandom.current().nextLong(baseWaitTime, waitTime);
17: }
18: }
19:
20: // 创建 RPromise 对象
21: RPromise<Void> result = new RedissonPromise<Void>();
22: // 执行异步加锁
23: tryLockAsync(threadId, leaseTime, TimeUnit.MILLISECONDS, waitTime, result);
24: return result;
25: }
```
- 第 3 至 18 行:计算 `waitTime` 时间。和 `#lockInterruptibly(long leaseTime, TimeUnit unit)` 方法,看到的逻辑是一致的。
- 第 23 行:调用 `#tryLockAsync(long threadId, long leaseTime, TimeUnit unit, long waitTime, RPromise<Void> result)` 方法,执行异步加锁。代码如下:
```
// RedissonMultiLock.java
protected void tryLockAsync(long threadId, long leaseTime, TimeUnit unit, long waitTime, RPromise<Void> result) {
// <X1> 执行异步加锁锁
tryLockAsync(waitTime, leaseTime, unit, threadId).onComplete((res, e) -> {
// 如果发生异常,则通过 result 回调异常
if (e != null) {
result.tryFailure(e);
return;
}
// <X2> 如果加锁成功,则通知 result 成功
if (res) {
result.trySuccess(null);
// <X3> 如果加锁失败,则递归调用 tryLockAsync 方法
} else {
tryLockAsync(threadId, leaseTime, unit, waitTime, result);
}
});
}
```
- `<X1>` 处,调用 [3.5 `#tryLockAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId)`](https://svip.iocoder.cn/Redisson/RedLock/#) 方法,执行异步加锁锁。
- `<X2>` 处,如果加锁成功,则通知 `result` 成功。
- `<X3>` 处,如果加锁失败,则递归调用 `#lockAsync(long leaseTime, TimeUnit unit, long threadId)`(自己)方法,继续执行异步加锁。
## 3.8 unlock
`#unlock()` 方法,同步**解锁**。代码如下:
```
// RedissonMultiLock.java
@Override
public void unlock() {
// 创建 RFuture 数组
List<RFuture<Void>> futures = new ArrayList<>(locks.size());
// 逐个创建异步解锁 Future并添加到 futures 数组中
for (RLock lock : locks) {
futures.add(lock.unlockAsync());
}
// 同步阻塞 futures ,全部释放完成
for (RFuture<Void> future : futures) {
future.syncUninterruptibly();
}
}
```
在 RedissonMultiLock 类中,存在一个 `#unlockInner(Collection<RLock> locks)` 方法,同步**解锁**指定 RLock 数组。代码如下:
```
// RedissonMultiLock.java
protected void unlockInner(Collection<RLock> locks) {
// 创建 RFuture 数组
List<RFuture<Void>> futures = new ArrayList<>(locks.size());
// 逐个创建异步解锁 Future并添加到 futures 数组中
for (RLock lock : locks) {
futures.add(lock.unlockAsync());
}
// 同步阻塞 futures ,全部释放完成
for (RFuture<Void> unlockFuture : futures) {
unlockFuture.awaitUninterruptibly();
}
}
```
在 RedissonMultiLock 类中,存在一个 `#unlockInner(Collection<RLock> locks)` 方法,异步**解锁**指定 RLock 数组。代码如下:
```
// RedissonMultiLock.java
protected RFuture<Void> unlockInnerAsync(Collection<RLock> locks, long threadId) {
// 如果 locks 为空,直接返回成功的 RFuture
if (locks.isEmpty()) {
return RedissonPromise.newSucceededFuture(null);
}
// 创建 RPromise 对象
RPromise<Void> result = new RedissonPromise<Void>();
// 创建 AtomicInteger 计数器,用于回调逻辑的计数,从而判断是不是所有回调都执行完了
AtomicInteger counter = new AtomicInteger(locks.size());
// 遍历 acquiredLocks 数组,逐个解锁
for (RLock lock : locks) {
lock.unlockAsync(threadId).onComplete((res, e) -> {
// 如果发生异常,则通过 result 回调异常
if (e != null) {
result.tryFailure(e);
return;
}
// 如果全部成功,则通过 result 回调解锁成功
if (counter.decrementAndGet() == 0) {
result.trySuccess(null);
}
});
}
return result;
}
```
## 3.9 未实现的方法
在 RedissonMultiLock 中,因为一些方法暂时没必要实现,所以就都未提供。如下:
```
// RedissonMultiLock.java
@Override
public Condition newCondition() {
throw new UnsupportedOperationException();
}
@Override
public RFuture<Boolean> forceUnlockAsync() {
throw new UnsupportedOperationException();
}
@Override
public RFuture<Integer> getHoldCountAsync() {
throw new UnsupportedOperationException();
}
@Override
public String getName() {
throw new UnsupportedOperationException();
}
@Override
public boolean forceUnlock() {
throw new UnsupportedOperationException();
}
@Override
public boolean isLocked() {
throw new UnsupportedOperationException();
}
@Override
public RFuture<Boolean> isLockedAsync() {
throw new UnsupportedOperationException();
}
@Override
public boolean isHeldByThread(long threadId) {
throw new UnsupportedOperationException();
}
@Override
public boolean isHeldByCurrentThread() {
throw new UnsupportedOperationException();
}
@Override
public int getHoldCount() {
throw new UnsupportedOperationException();
}
@Override
public RFuture<Long> remainTimeToLiveAsync() {
throw new UnsupportedOperationException();
}
@Override
public long remainTimeToLive() {
throw new UnsupportedOperationException();
}
```
# 666. 彩蛋
一开始,以为 RedLock 红锁的代码会比较复杂,所以在撸这块的源码时,有点懵逼。一度计划,准备花小 1 天的时间来研究和输出这篇博客。结果发现,竟然是个纸老虎,哈哈哈。
😈 所以把,碰到任何源码,都不要怂。该肝就肝!
爽,在 2019-10-05 的 01:30 写完了这篇博客,美滋滋。