code-learning/redis/04-Redisson 源码分析-可重入分布式锁 ReentrantLock.md

1728 lines
72 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 精尽 Redisson 源码分析 —— 可重入分布式锁 ReentrantLock
# 1. 概述
在 Redisson 中,提供了 8 种分布锁的实现,具体我们可以在 [《Redisson 文档 —— 分布式锁和同步器》](https://github.com/redisson/redisson/wiki/8.-分布式锁和同步器) 中看到。绝大数情况下我们使用可重入锁Reentrant Lock就够了对应到就是 [`org.redisson.RedissonLock`](https://github.com/YunaiV/redisson/blob/master/redisson/src/main/java/org/redisson/RedissonLock.java) 类,具体的使用示例可以看看 [《芋道 Spring Boot Redis 入门》](http://www.iocoder.cn/Spring-Boot/Redis/?vip) 的「6.2 Redis 分布式锁」小节 。
在 [《精尽 Redis 面试题》](http://svip.iocoder.cn/Redis/Interview/?self) 的问题中,我们在聊到**“如何使用 Redis 实现分布式锁?”**这个题目中,提到了需要考虑的 7 个方面,这里我们再来重复看下:
- 1、正确的获得锁
> set 指令附带 nx 参数,保证有且只有一个进程获得到。
- 2、正确的释放锁
> 使用 Lua 脚本,比对锁持有的是不是自己。如果是,则进行删除来释放。
- 3、超时的自动释放锁
> set 指令附带 expire 参数,通过过期机制来实现超时释放。
- 4、未获得到锁的等待机制
> sleep 或者基于 Redis 的订阅 Pub/Sub 机制。
>
> 一些业务场景,可能需要支持获得不到锁,直接返回 false ,不等待。
- 5、【可选】锁的重入性
> 通过 ThreadLocal 记录是第几次获得相同的锁。
>
> 1有且第一次计数为 1 && 获得锁时,才向 Redis 发起获得锁的操作。
> 2有且计数为 0 && 释放锁时,才向 Redis 发起释放锁的操作。
- 6、锁超时的处理
> 一般情况下,可以考虑告警 + 后台线程自动续锁的超时时间。通过这样的机制,保证有且仅有一个线程,正在持有锁。
- 7、Redis 分布式锁丢失问题
> 具体看「方案二Redlock」。
RedissonLock 实现了前 6 点,而第 7 点需要通过 [`org.redisson.RedissonRedLock`](https://github.com/YunaiV/redisson/blob/master/redisson/src/main/java/org/redisson/RedissonRedLock.java) 来实现,这个话题,我们后面在聊。
在开始阅读源码之前,胖友可以先看看艿艿画的 Redisson 实现分布式锁的整体流程图,以便更好的阅读源码。[猛击传送门](https://www.processon.com/view/link/5f4c871d079129356ec6f4d7)
# 2. 整体一览
我们来看看 Redisson 锁相关的整体类图,如下:
[![RLock 接口](04-Redisson 源码分析-可重入分布式锁 ReentrantLock.assets/01.png)](http://www.iocoder.cn/images/Redis/2019_10_04/01.png)RLock 接口
- [`org.redisson.api.RLockAsync`](https://github.com/YunaiV/redisson/blob/master/redisson/src/main/java/org/redisson/api/RLockAsync.java) ,定义了异步操作的接口。
- [`org.redisson.api.RLock`](https://github.com/YunaiV/redisson/blob/master/redisson/src/main/java/org/redisson/api/RLock.java) ,继承 RLockAsync 的基础上定义了同步操作的接口。比较有意思的是RLock 同时实现继承 JDK 的 `java.util.concurrent.locks.Lock` 接口,从而符合 Java 的 Lock 的标准。
- 本文的主角 RedissonLock ,实现 RLock 接口,提供可重入的分布式锁实现。
- 其它的 RLock 实现的关系,胖友自己看图哈。
RLockAsync 和 RLock 定义的接口,差别就在于同步和异步,所以我们就只看看 RLock 接口。代码如下:
```
String getName();
// 锁定相关的接口方法,还有部分在 Lock 接口上
void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException;
void lock(long leaseTime, TimeUnit unit);
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
// 解锁相关的接口方法,还有部分在 Lock 接口上
boolean forceUnlock();
// 其它非关键方法
boolean isLocked();
boolean isHeldByThread(long threadId);
boolean isHeldByCurrentThread();
int getHoldCount();
long remainTimeToLive();
```
# 3. Lua 脚本
在我们看具体的代码实现,我们先来看核心的使用到的 Lua 脚本,方便我们后续更好的理解 RedissonLock 的实现。
## 3.1 tryLockInnerAsync
`#tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command)` 方法,实现**加锁**逻辑,并且支持**可重入性**。代码如下:
> FROM [《慢谈 Redis 实现分布式锁 以及 Redisson 源码解析》](https://crazyfzw.github.io/2019/04/15/distributed-locks-with-redis/)
>
> [![加锁流程图](04-Redisson 源码分析-可重入分布式锁 ReentrantLock.assets/a1ecddbd0ffc0da7e4f85ba6637c72ee.png)](http://static.iocoder.cn/a1ecddbd0ffc0da7e4f85ba6637c72ee)加锁流程图
```
// RedissonLock.java
1: <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
2: internalLockLeaseTime = unit.toMillis(leaseTime);
3:
4: return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
5: "if (redis.call('exists', KEYS[1]) == 0) then " + // 情况一,当前分布式锁被未被获得
6: "redis.call('hset', KEYS[1], ARGV[2], 1); " + // 写入分布锁被 ARGV[2] 获取到了,设置数量为 1 。
7: "redis.call('pexpire', KEYS[1], ARGV[1]); " + // 设置分布式的过期时间为 ARGV[1]
8: "return nil; " + // 返回 null ,表示成功
9: "end; " +
10: "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + // 情况二,如果当前分布锁已经被 ARGV[2] 持有
11: "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + // 写入持有计数字 + 1 。
12: "redis.call('pexpire', KEYS[1], ARGV[1]); " + // 设置分布式的过期时间为 ARGV[1]
13: "return nil; " + // 返回 null ,表示成功
14: "end; " +
15: "return redis.call('pttl', KEYS[1]);", // 情况三,获取不到分布式锁,则返回锁的过期时间。
16: Collections.<Object>singletonList(getName()), // KEYS[分布式锁名]
17: internalLockLeaseTime, getLockName(threadId)); // ARGV[锁超时时间,获得的锁名]
18: }
```
- `<2>` 处,根据传入的 `leaseTime` + `unit` 参数,设置到 `internalLockLeaseTime` 属性上,表示锁的时长。代码如下:
```
// RedissonLock.java
/**
* 锁的时长
*/
protected long internalLockLeaseTime;
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
// ... 省略其它代码
this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
}
```
- 默认情况下,`internalLockLeaseTime` 属性,使用 Lock 的 WatchDog 的超时时长 `30 * 1000` 毫秒。默认的值,当且仅当我们未**显示**传入锁的时长时,才有用。例如说,稍后我们会看到的 `#lock()` 等等方法中。
- 有一点,我们要特别注意,`internalLockLeaseTime` 是 RedissonLock 的**成员**变量,并且也未声明 `volatile` 修饰,所以跨线程使用同一个 RedissonLock 对象,可能会存在 `internalLockLeaseTime` 读取不到最新值的情况。
- 还是熟悉的配方,通过 Lua 脚本实现。具体传入的参数,朋友看下第 16 和 17 行的代码,对应的 `KEYS``ARGV` 。可能有几个值胖友会有点懵逼,我们先来看看。
- `KEYS[1]` :调用 `#getName()` 方法获得分布式锁的名字。稍后,我们会看到分布式锁在 Redis 使用是以 `KEYS[1]` 分布式锁为 KEY VALUE 为 HASH 类型。
- `ARGV[1]` :锁的时长。
- `ARGV[2]` :调用 `#getLockName(threadId)` 方法,获得的锁名。该名字,用于表示该分布式锁正在被哪个进程的线程所持有。代码如下:
```
// RedissonLock.java
/**
* ID ,就是 {@link ConnectionManager#getId()}
*/
final String id;
protected String getLockName(long threadId) {
return id + ":" + threadId;
}
```
- 可能描述看起来不是很好理解,我们来看一个获取到分布式锁的示例:[![分布式锁的示例](04-Redisson 源码分析-可重入分布式锁 ReentrantLock.assets/02.png)](http://www.iocoder.cn/images/Redis/2019_10_04/02.png)分布式锁的示例
- 第 4 至 15 行Lua 脚本,一共分成 3 种情况,胖友认真仔细看看,艿艿已经添加了完整的注释。
不同于我们在市面上看到的 Redis 通过 SET 命令带上 NX 和 EXPIRE 的方式实现获得分布式锁RedissonLock 提供重入性,所以需要 Lua 脚本来实现。当然,实际上,也可以通过 ThreadLocal 来实现重入性的技术,胖友可以思考一波,不懂的话星球来给艿艿留言。
## 3.2 unlockInnerAsync
`#unlockInnerAsync(long threadId)` 方法,实现**解锁**逻辑,并且支持**可重入性**。代码如下:
> FROM [《慢谈 Redis 实现分布式锁 以及 Redisson 源码解析》](https://crazyfzw.github.io/2019/04/15/distributed-locks-with-redis/)
>
> [![解锁流程图](04-Redisson 源码分析-可重入分布式锁 ReentrantLock.assets/bbc6ac47cd74c5cd2641486415438cf5.png)](http://static.iocoder.cn/bbc6ac47cd74c5cd2641486415438cf5)解锁流程图
```
// RedissonLock.java
1: protected RFuture<Boolean> unlockInnerAsync(long threadId) {
2: return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
3: "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + // 情况一,分布式锁未被 ARGV[3] 持有,则直接返回 null ,表示解锁失败。
4: "return nil;" +
5: "end; " +
6: "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + // 持有锁的数量减 1 。
7: "if (counter > 0) then " + // 情况二,如果后还有剩余的持有锁数量,则返回 0 ,表示解锁未完成
8: "redis.call('pexpire', KEYS[1], ARGV[2]); " + // 重新设置过期时间为 ARGV[2]
9: "return 0; " +
10: "else " + // 情况三,不存在剩余的锁数量,则返回 1 ,表示解锁成功
11: "redis.call('del', KEYS[1]); " + // 删除对应的分布式锁对应的 KEYS[1]
12: "redis.call('publish', KEYS[2], ARGV[1]); " + // 发布解锁事件到 KEYS[2] ,通知气他可能要获取锁的线程
13: "return 1; "+
14: "end; " +
15: "return nil;", // 不存在这个情况。
16: Arrays.<Object>asList(getName(), getChannelName()), // KEYS[分布式锁名, 该分布式锁对应的 Channel 名]
17: LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); // ARGV[解锁消息,锁超时时间,获得的锁名]
18: }
```
- 具体传入的参数,朋友看下第 16 和 17 行的代码,对应的 `KEYS` 和 `ARGV` 。
- `KEYS[1]` :调用 `#getName()` 方法获得分布式锁的名字。
- `KEYS[2]` :调用 `#getChannelName()` 方法,该分布式锁对应的 Channel 名。因为 RedissonLock 释放锁时,会通过该 Channel 来 Publish 一条消息,通知其它可能在阻塞等待这条消息的客户端。代码如下:
```
// RedissonLock.java
String getChannelName() {
return prefixName("redisson_lock__channel", getName());
}
```
- 通过 [Redis Pub/Sub](http://redis.cn/topics/pubsub.html) 机制,实现**未获得到锁的等待机制**。
- 每个分布式锁,对应一个其独有的 Channel 。
- `ARGV[1]` :解锁消息 [`LockPubSub.UNLOCK_MESSAGE`](https://github.com/YunaiV/redisson/blob/master/redisson/src/main/java/org/redisson/pubsub/LockPubSub.java) 。通过收到这条消息,其它等待锁的客户端,会重新发起获得锁的请求。具体的,我们在下文来一起瞅瞅。
- `ARGV[2]` :锁的时长。
- `ARGV[3]` :调用 `#getLockName(threadId)` 方法,获得的锁名。
- 第 3 至 15 行Lua 脚本,还是分成 3 种情况,胖友认真仔细看看,艿艿已经添加了完整的注释。
不同于我们在市面上看到的 Redis 通过 Lua 脚本的方式实现释放分布式锁,一共有 2 点:
- 1、要实现**可重入性**,所以只有在计数为 0 时,才会真正释放锁。
- 2、要实现客户端的**等待通知**所以在释放锁时Publish 一条释放锁的消息。
## 3.3 forceUnlockAsync
`#forceUnlockAsync()` 方法,实现**强制解锁**逻辑。代码如下:
```
// RedissonLock.java
@Override
public RFuture<Boolean> forceUnlockAsync() {
cancelExpirationRenewal(null);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('del', KEYS[1]) == 1) then " // 情况一,释放锁成功,则通过 Publish 发布释放锁的消息,并返回 1
+ "redis.call('publish', KEYS[2], ARGV[1]); "
+ "return 1 "
+ "else " // 情况二,释放锁失败,因为不存在这个 KEY ,所以返回 0
+ "return 0 "
+ "end",
Arrays.<Object>asList(getName(), getChannelName()),
LockPubSub.UNLOCK_MESSAGE);
}
```
- 比较简单,分成 2 种情况。胖友认真仔细看看,艿艿已经添加了完整的注释。
代码处理的比较细致Redis DEL 成功,才 PUBLISH 发布释放锁的消息,避免错误通知客户端。
## 3.4 renewExpirationAsync
`#renewExpirationAsync(long threadId)` 方法,实现**续锁**逻辑。可能胖友有点懵逼,至少艿艿看到这段逻辑,完全不知道为何意啊。
我们先来看下 [《Redisson 文档 —— 分布式锁和同步器》](https://github.com/redisson/redisson/wiki/8.-分布式锁和同步器) ,有一段奇怪的说明:
```
RLock lock = redisson.getLock("anyLock");
// 最常见的使用方法
lock.lock();
```
> 大家都知道如果负责储存这个分布式锁的Redisson节点宕机以后而且这个锁正好处于锁住的状态时这个锁会出现锁死的状态。为了避免这种情况的发生Redisson内部提供了一个监控锁的看门狗它的作用是在Redisson实例被关闭前不断的延长锁的有效期。默认情况下看门狗的检查锁的超时时间是30秒钟也可以通过修改 [Config.lockWatchdogTimeout](https://github.com/redisson/redisson/wiki/2.-配置方法#lockwatchdogtimeout监控锁的看门狗超时单位毫秒) 来另行指定。
在使用 `RedissonLock#lock()` 方法,我们要求**持续**持有锁,直到**手动**释放。但是实际上,我们有一个隐藏条件,如果 Java 进程挂掉时,需要**自动**释放。那么,如果实现 `RedissonLock#lock()` 时,设置过期 Redis 为无限大,或者不过期都不合适。那么 RedissonLock 是怎么实现的呢RedissonLock 先获得一个 `internalLockLeaseTime` 的分布式锁,然后每 `internalLockLeaseTime / 3` 时间,定时调用 `#renewExpirationAsync(long threadId)` 方法,进行续租。这样,在 Java 进程异常 Crash 掉后,能够保证最多 `internalLockLeaseTime` 时间后,分布式锁自动释放。
略骚略巧妙~不过为了实现这样的功能RedissonLock 的整体逻辑,又复杂了一丢丢。
下面,还是先让我们看下具体的 `#renewExpirationAsync(long threadId)` 方法的代码,如下:
```
// RedissonLock.java
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + // 情况一,如果持有锁,则重新设置过期时间为 ARGV[1] internalLockLeaseTime ,并返回 1 续租成功。
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;", // 情况二,未吃货有,返回 0 续租失败。
Collections.<Object>singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
```
- 比较简单,分成 2 种情况。胖友认真仔细看看,艿艿已经添加了完整的注释。
------
至此,我们看完了 Lua 脚本部分,其实基本也大体知道 RedissonLock 是如何实现加锁、接锁的逻辑。但是,复杂的逻辑,还在下面,胖友请保持好耐心,开启我们的高能时刻。
> 艿艿T T RedissonLock 好多重载的方法,文章小标题,都不造杂取了。最关键的是,可能胖友会被绕进去。
# 4. LockPubSub
在开始研究真正的加锁和解锁的调用之前,我们先看看和其相关的客户端订阅解锁消息,从而实现在持有锁的客户端释放锁时,等待锁的客户端能够快速的去调用加锁逻辑。
> 艿艿:😭 整个调用栈太深的,艿艿只好先写它,就当卖了一个关子,哈哈哈。
[`org.redisson.pubsub.LockPubSub`](https://github.com/YunaiV/redisson/blob/master/redisson/src/main/java/org/redisson/pubsub/LockPubSub.java) ,继承 PublishSubscribe 抽象类,实现 Lock 相关消息的订阅。代码如下:
```
// LockPubSub.java
public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {
/**
* 锁释放的消息
*/
public static final Long UNLOCK_MESSAGE = 0L;
/**
* 读锁释放的消息
*/
public static final Long READ_UNLOCK_MESSAGE = 1L;
public LockPubSub(PublishSubscribeService service) {
super(service);
}
@Override
protected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) {
return new RedissonLockEntry(newPromise);
}
@Override
protected void onMessage(RedissonLockEntry value, Long message) {
if (message.equals(UNLOCK_MESSAGE)) {
// 回调监听器
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute != null) {
runnableToExecute.run();
}
// 通过信号量,通知阻塞等待的线程
value.getLatch().release();
} else if (message.equals(READ_UNLOCK_MESSAGE)) {
while (true) {
Runnable runnableToExecute = value.getListeners().poll();
if (runnableToExecute == null) {
break;
}
runnableToExecute.run();
}
value.getLatch().release(value.getLatch().getQueueLength());
}
}
}
```
- 在 `#createEntry(RPromise<RedissonLockEntry> newPromise)` 方法,会创建 [`org.redisson.RedissonLockEntry`](https://github.com/YunaiV/redisson/blob/master/redisson/src/main/java/org/redisson/RedissonLockEntry.java) 对象。代码如下:
```
// RedissonLockEntry.java
public class RedissonLockEntry implements PubSubEntry<RedissonLockEntry> {
/**
* 计数器
*
* 每次发起订阅,则计数器 + 1
* 每次取消订阅,则计数器 - 1 。当减少到 0 时,才正常取消订阅。
*/
private int counter;
/**
* 信号量,用于实现 RedissonLock 阻塞等待的通知
*/
private final Semaphore latch;
private final RPromise<RedissonLockEntry> promise;
/**
* 监听器们
*/
private final ConcurrentLinkedQueue<Runnable> listeners = new ConcurrentLinkedQueue<Runnable>();
public RedissonLockEntry(RPromise<RedissonLockEntry> promise) {
super();
this.latch = new Semaphore(0);
this.promise = promise;
}
@Override
public void aquire() {
counter++;
}
@Override
public int release() {
return --counter;
}
@Override
public RPromise<RedissonLockEntry> getPromise() {
return promise;
}
public void addListener(Runnable listener) {
listeners.add(listener);
}
public boolean removeListener(Runnable listener) {
return listeners.remove(listener);
}
public ConcurrentLinkedQueue<Runnable> getListeners() {
return listeners;
}
public Semaphore getLatch() {
return latch;
}
}
```
- 虽然代码比较多,我们重点来看 `latch` 和 `listeners` 属性。
- `latch` 属性:信号量,用于实现 RedissonLock 阻塞等待的通知。在我们下面看到**同步**加锁的逻辑,会看到通过它来实现阻塞等待。
- `listeners` 属性:监听器,实现订阅到锁的释放消息,从而再次发起获得锁。当然,这里的 Runnable 对象肯定无法体现,具体我们后面看看 `#tryLockAsync(AtomicLong time, long leaseTime, TimeUnit unit, RFuture<RedissonLockEntry> subscribeFuture, RPromise<Boolean> result, long currentThreadId)` 或者 `#lockAsync(long leaseTime, TimeUnit unit, RFuture<RedissonLockEntry> subscribeFuture, RPromise<Void> result, long currentThreadId)` 方法,就可以看到方法内部会创建具体的 Runnable 实现类,实现再次发起获得锁的逻辑。
- 在 `#onMessage(RedissonLockEntry value, Long message)` 方法中,在接收到释放锁的消息后,会执行 `listeners` 的回调,以及 `latch` 的时候放。
当然,单单看 LockPubSub 类胖友可能会感到懵逼保持耐心继续向下。LockPubSub 更多的是实现了锁释放消息的监听,以及回调监听器,释放信号量。**真正的逻辑,还是要看监听器的逻辑,以及 RedissonLock 是怎么实现信号量的**。
另外,在 RedissonLock 中,提供如下几个方法,发起和取消订阅。代码如下:
```
// RedissonLock.java
/**
* Sub Entry 名字
*/
final String entryName;
protected final LockPubSub pubSub;
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
// ... 省略其他无关
this.entryName = id + ":" + name;
this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}
/**
* 获得线程对应的 RedissonLockEntry 对象
*
* @param threadId 线程编号
* @return RedissonLockEntry 对象
*/
protected RedissonLockEntry getEntry(long threadId) {
return pubSub.getEntry(getEntryName());
}
/**
* 异步发起订阅
*
* @param threadId 线程编号
* @return RFuture 对象
*/
protected RFuture<RedissonLockEntry> subscribe(long threadId) {
return pubSub.subscribe(getEntryName(), getChannelName());
}
/**
* 异步取消订阅
*
* @param future RFuture 对象
* @param threadId 线程编号
*/
protected void unsubscribe(RFuture<RedissonLockEntry> future, long threadId) {
pubSub.unsubscribe(future.getNow(), getEntryName(), getChannelName());
}
```
# 5. tryLockAsync
> 艿艿:重点开始了,打起精神。
`#tryLockAsync(long waitTime, TimeUnit unit)` 方法,异步加锁,并返回是否成功。代码如下:
```
// RedissonLock.java
@Override
public RFuture<Boolean> tryLockAsync(long waitTime, TimeUnit unit) {
return tryLockAsync(waitTime, -1, unit);
}
@Override
public RFuture<Boolean> tryLockAsync(long waitTime, long leaseTime, TimeUnit unit) {
// 获得线程编号
long currentThreadId = Thread.currentThread().getId();
// 执行锁
return tryLockAsync(waitTime, leaseTime, unit, currentThreadId);
}
```
- 最终都调用 `#tryLockAsync(long waitTime, long leaseTime, TimeUnit unit, long currentThreadId)` 方法,真正实现异步加锁的逻辑。
`#tryLockAsync(long waitTime, long leaseTime, TimeUnit unit, long currentThreadId)` 方法,代码如下:
```
// RedissonLock.java
1: @Override
2: public RFuture<Boolean> tryLockAsync(long waitTime, long leaseTime, TimeUnit unit, long currentThreadId) {
3: // 创建 RPromise 对象,用于通知结果
4: RPromise<Boolean> result = new RedissonPromise<Boolean>();
5:
6: // 表示剩余的等待获得锁的时间
7: AtomicLong time = new AtomicLong(unit.toMillis(waitTime));
8: // 记录当前时间
9: long currentTime = System.currentTimeMillis();
10: // 执行异步获得锁
11: RFuture<Long> ttlFuture = tryAcquireAsync(leaseTime, unit, currentThreadId);
12: ttlFuture.onComplete((ttl, e) -> {
13: // 如果发生异常,则通过 result 通知异常
14: if (e != null) {
15: result.tryFailure(e);
16: return;
17: }
18:
19: // lock acquired
20: // 如果获得到锁,则通过 result 通知获得锁成功
21: if (ttl == null) {
22: if (!result.trySuccess(true)) { // 如果处理 result 通知对结果返回 false ,意味着需要异常释放锁
23: unlockAsync(currentThreadId);
24: }
25: return;
26: }
27:
28: // 减掉已经等待的时间
29: long el = System.currentTimeMillis() - currentTime;
30: time.addAndGet(-el);
31:
32: // 如果无剩余等待的时间,则通过 result 通知获得锁失败
33: if (time.get() <= 0) {
34: trySuccessFalse(currentThreadId, result);
35: return;
36: }
37:
38: // 记录新的当前时间
39: long current = System.currentTimeMillis();
40: // 记录下面的 future 的指向
41: AtomicReference<Timeout> futureRef = new AtomicReference<Timeout>();
42:
43: // 创建 SUBSCRIBE 订阅的 Future
44: RFuture<RedissonLockEntry> subscribeFuture = subscribe(currentThreadId);
45: subscribeFuture.onComplete((r, ex) -> {
46: // 如果发生异常,则通过 result 通知异常
47: if (ex != null) {
48: result.tryFailure(ex);
49: return;
50: }
51:
52: // 如果创建定时任务 Future scheduledFuture则进行取消
53: if (futureRef.get() != null) {
54: futureRef.get().cancel();
55: }
56:
57: // 减掉已经等待的时间
58: long elapsed = System.currentTimeMillis() - current;
59: time.addAndGet(-elapsed);
60:
61: // 再次执行异步获得锁
62: tryLockAsync(time, leaseTime, unit, subscribeFuture, result, currentThreadId);
63: });
64:
65: // 如果创建 SUBSCRIBE 订阅的 Future 未完成,创建定时任务 Future scheduledFuture 。
66: if (!subscribeFuture.isDone()) {
67: Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
68: @Override
69: public void run(Timeout timeout) throws Exception {
70: // 如果创建 SUBSCRIBE 订阅的 Future 未完成
71: if (!subscribeFuture.isDone()) {
72: // 进行取消 subscribeFuture
73: subscribeFuture.cancel(false);
74: // 通过 result 通知获得锁失败
75: trySuccessFalse(currentThreadId, result);
76: }
77: }
78: }, time.get(), TimeUnit.MILLISECONDS); // 延迟 time 秒后执行
79: // 记录 futureRef 执行 scheduledFuture
80: futureRef.set(scheduledFuture);
81: }
82: });
83:
84: return result;
85: }
```
- 整体逻辑是,获得分布锁。如果获取失败,则发起 Redis Pub/Sub 订阅,等待释放锁的消息,从而再次发起获得分布式锁。
- 第 11 行:调用 `#tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId)` 方法,执行异步获得锁。详细解析,胖友先跳到 [「5.1 tryAcquireAsync」](https://svip.iocoder.cn/Redisson/ReentrantLock/#) 中。
- 继续开始我们“漫长”的回调之旅。其实也比较容易懂,走起~
- 第 13 至 17 行:如果发生异常,则通过 `result` 通知异常。
- 第 19 至 26 行:如果 `ttl` 为空,说明获得到锁了,则通过 `result` 通知获得锁成功。这里,在第 23 至 24 行有个小细节,胖友自己看下注释。
- 第 41 行:声明 `futureRef` 变量,用于设置第 65 至 81 行创建的定时任务。
- 第 65 至 82 行:如果创建 SUBSCRIBE 订阅的 Future `subscribeFuture` 未完成,创建定时任务 Future `scheduledFuture` 。因为 `subscribeFuture` 是异步的,而存在一个情况,可能 `subscribeFuture` 未完成时,等待获得锁已经超时,所以通过 `scheduledFuture` 来实现超时通知。
- 第 80 行:记录 `futureRef` 为 `scheduledFuture` 。
- 第 71 行:兜底判断 `subscribeFuture` 未完成。
- 第 73 行:进行取消 `subscribeFuture` 。
- 第 75 行:调用 `#trySuccessFalse(long currentThreadId, RPromise<Boolean> result)` 方法,通知获得锁失败。代码如下:
```
// RedissonLock.java
protected RFuture<Void> acquireFailedAsync(long threadId) {
return RedissonPromise.newSucceededFuture(null);
}
private void trySuccessFalse(long currentThreadId, RPromise<Boolean> result) {
acquireFailedAsync(currentThreadId).onComplete((res, e) -> {
if (e == null) { // 通知获得锁失败
result.trySuccess(false);
} else { // 通知异常
result.tryFailure(e);
}
});
}
```
- x
- 第 43 至 63 行:创建 SUBSCRIBE 订阅的 Future `subscribeFuture` 。通过订阅释放锁的消息,从而实现等待锁释放的客户端,快速抢占加锁。
- 第 46 至 50 行:如果发生异常,则通过 `result` 通知异常。
- 第 52 至 55 行:如果创建定时任务 Future `scheduledFuture`,则进行取消。
- 第 57 至 59 行:减掉已经等待的时间。
- 第 62 行:调用 `#tryLockAsync(AtomicLong time, long leaseTime, TimeUnit unit, RFuture<RedissonLockEntry> subscribeFuture, RPromise<Boolean> result, long currentThreadId` 方法,再次执行异步获得锁。详细解析,见 [「5.2 更强的 tryLockAsync」](https://svip.iocoder.cn/Redisson/ReentrantLock/#) 小节。
> 艿艿:特喵的,又是一个 tryLockAsync 重载的方法,我已经瞎取标题了。深呼吸,继续!
感叹想要写好全异步的代码实际是非常困难的所以艿艿的感受Spring Webflux 反应式框架想要推广在编写业务逻辑基本可能性是为零。当然Webflux 乃至反应式编程,更加适合推广在基础组件中。
## 5.1 tryAcquireAsync
> 艿艿看完这个方法就跳回去哈。MMP 整个调用链,真长,大几百行代码。
`#tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId)` 方法,执行异步获得锁。代码如下:
```
// RedissonLock.java
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
// <1> 情况一,如果锁有时长,则直接获得分布式锁
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// <2> 情况二,如果锁无时长,则先获得 Lock WatchDog 的锁超时时长
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
// 如果发生异常,则直接返回
if (e != null) {
return;
}
// lock acquired
// 如果获得到锁,则创建定时任务,定时续锁
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
```
- 一共分成两种情况,是否锁有时长。
- `<1>` 处,`leaseTime != -1` ,意味着锁设置了时长,则调用 [「3.1 `#tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command)`」](https://svip.iocoder.cn/Redisson/ReentrantLock/#) 方法,直接获得分布式锁。
- `<2>` 处,锁未设置了时长,所以先调用 [「3.1 `#tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command)`」](https://svip.iocoder.cn/Redisson/ReentrantLock/#) 方法,获得 **Lock WatchDog 的锁超时时长**的分布式锁,然后在回调中,再调用 `#scheduleExpirationRenewal(long threadId)` 方法,创建定时任务,定时调用 [「3.4 renewExpirationAsync」](https://svip.iocoder.cn/Redisson/ReentrantLock/#) **续锁**。详细解析,见 TODO 。
## 5.2 更强的 tryLockAsync
`#tryLockAsync(AtomicLong time, long leaseTime, TimeUnit unit, RFuture<RedissonLockEntry> subscribeFuture, RPromise<Boolean> result, long currentThreadId)` 方法,**更强**的异步加锁。主要强在 2 点:
- 1、增加监听锁释放的消息的监听器从而实现等待锁的客户端快速抢占锁的逻辑。
- 2、增加锁超时自动释放没有锁释放消息的处理。
整体代码如下:
```
// RedissonLock.java
1: private void tryLockAsync(AtomicLong time, long leaseTime, TimeUnit unit, RFuture<RedissonLockEntry> subscribeFuture, RPromise<Boolean> result, long currentThreadId) {
2: // 如果 result 已经完成,则直接返回,并取消订阅
3: if (result.isDone()) {
4: unsubscribe(subscribeFuture, currentThreadId);
5: return;
6: }
7:
8: // 如果剩余时间 time 小于 0 ,说明等待超时,则取消订阅,并通过 result 通知失败
9: if (time.get() <= 0) {
10: unsubscribe(subscribeFuture, currentThreadId);
11: trySuccessFalse(currentThreadId, result);
12: return;
13: }
14:
15: // 记录当前时间
16: long curr = System.currentTimeMillis();
17: // 获得分布式锁
18: RFuture<Long> ttlFuture = tryAcquireAsync(leaseTime, unit, currentThreadId);
19: ttlFuture.onComplete((ttl, e) -> {
20: // 如果发生异常,则取消订阅,并通过 result 通知异常
21: if (e != null) {
22: unsubscribe(subscribeFuture, currentThreadId);
23: result.tryFailure(e);
24: return;
25: }
26:
27: // lock acquired
28: // 如果获得到锁,则取消订阅,并通过 result 通知获得锁成功
29: if (ttl == null) {
30: unsubscribe(subscribeFuture, currentThreadId);
31: if (!result.trySuccess(true)) {
32: unlockAsync(currentThreadId);
33: }
34: return;
35: }
36:
37: // 减掉已经等待的时间
38: long el = System.currentTimeMillis() - curr;
39: time.addAndGet(-el);
40:
41: // 如果无剩余等待的时间,则取消订阅,并通过 result 通知获得锁失败
42: if (time.get() <= 0) {
43: unsubscribe(subscribeFuture, currentThreadId);
44: trySuccessFalse(currentThreadId, result);
45: return;
46: }
47:
48: // waiting for message
49: // 记录新的当前时间
50: long current = System.currentTimeMillis();
51: // 获得当前线程对应的 RedissonLockEntry 对象
52: RedissonLockEntry entry = getEntry(currentThreadId);
53: // 尝试获得 entry 中的信号量,如果获得成功,说明 SUBSCRIBE 已经收到释放锁的消息,则直接立马再次去获得锁。
54: if (entry.getLatch().tryAcquire()) {
55: tryLockAsync(time, leaseTime, unit, subscribeFuture, result, currentThreadId);
56: } else {
57: // 创建 AtomicBoolean 变量 executed ,用于标记下面创建的 listener 是否执行。
58: AtomicBoolean executed = new AtomicBoolean();
59: // 创建 AtomicReference 对象,用于指向定时任务
60: AtomicReference<Timeout> futureRef = new AtomicReference<Timeout>();
61:
62: // 创建监听器 listener ,用于在 RedissonLockEntry 的回调,就是我们看到的 PublishSubscribe 监听到释放锁的消息,进行回调。
63: Runnable listener = () -> {
64: // 标记已经执行
65: executed.set(true);
66: // 如果有定时任务的 Future ,则进行取消
67: if (futureRef.get() != null) {
68: futureRef.get().cancel();
69: }
70:
71: // 减掉已经等待的时间
72: long elapsed = System.currentTimeMillis() - current;
73: time.addAndGet(-elapsed);
74:
75: // 再次获得分布式锁
76: tryLockAsync(time, leaseTime, unit, subscribeFuture, result, currentThreadId);
77: };
78: // 添加 listener 到 RedissonLockEntry 中
79: entry.addListener(listener);
80:
81: // 下面,会创建一个定时任务。因为极端情况下,可能不存在释放锁的消息,例如说锁自动超时释放,所以需要改定时任务,在获得到锁的超时后,主动去抢下。
82: long t = time.get();
83: if (ttl >= 0 && ttl < time.get()) { // 如果剩余时间小于锁的超时时间,则使用剩余时间。
84: t = ttl;
85: }
86: // 如果 listener 未执行
87: if (!executed.get()) {
88: Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
89: @Override
90: public void run(Timeout timeout) throws Exception {
91: // 移除 listener 从 RedissonLockEntry 中
92: if (entry.removeListener(listener)) {
93: // 减掉已经等待的时间
94: long elapsed = System.currentTimeMillis() - current;
95: time.addAndGet(-elapsed);
96:
97: // 再次获得分布式锁
98: tryLockAsync(time, leaseTime, unit, subscribeFuture, result, currentThreadId);
99: }
100: }
101: }, t, TimeUnit.MILLISECONDS);
102: // 记录 futureRef 执行 scheduledFuture
103: futureRef.set(scheduledFuture);
104: }
105: }
106: });
107: }
```
- 第 2 至 46 行:和 [「5. tryLockAsync」](https://svip.iocoder.cn/Redisson/ReentrantLock/#) 基本一致,就不重复哔哔了。
- 第 52 行:调用 `#getEntry(long threadId)` 方法,获得当前线程对应的 RedissonLockEntry 对象。此处有点“失忆”的胖友,看看 [「4. LockPubSub」](https://svip.iocoder.cn/Redisson/ReentrantLock/#) 的结尾。
- 第 53 至 55 行:尝试获得 `entry` 中的信号量,如果获得成功,说明 SUBSCRIBE 已经收到释放锁的消息,则调用 [「5.2 `##tryLockAsync(AtomicLong time, long leaseTime, TimeUnit unit, RFuture subscribeFuture, RPromise result, long currentThreadId)`」](https://svip.iocoder.cn/Redisson/ReentrantLock/#) 方法,直接立马再次去获得锁。
- 第 58 行:创建 AtomicBoolean 变量 `executed` ,用于标记下面创建的 `listener` 是否执行。
- 第 60 行:声明
```
futureRef
```
变量,用于设置第 87 至 104 行创建的定时任务。
因为极端情况下,可能不存在释放锁的消息,例如说锁自动超时释放,所以需要改定时任务,在获得到锁的超时后,主动去抢下。
- 第 82 至 85 行:计算定时任务的延迟时间时间。如果剩余时间小于锁的超时时间,则使用剩余时间。
- 第 87 行:通过 `executed` 变量,判断 `listener` 未执行。
- 第 103 行: 记录 `futureRef` 为 `scheduledFuture` 。
- 第 92 行:移除 `listener` 从 RedissonLockEntry 中。避免,可能存在的并发执行。
- 第 98 行:调用 [「5.2 `##tryLockAsync(AtomicLong time, long leaseTime, TimeUnit unit, RFuture subscribeFuture, RPromise result, long currentThreadId)`」](https://svip.iocoder.cn/Redisson/ReentrantLock/#) 方法,再次去获得锁。
- 这个定时任务,真的处理的是细节中的细节。之前思考获得分布式失败客户端的等待通知,只考虑了 Redis Pub/Sub 机制来实现,没有想到如果没有 PUBLISH 消息的场景。这块的逻辑,算是看 RedissonLock 最大的收获吧。
- 第 62 至 79 行:创建监听器
```
listener
```
,用于在 RedissonLockEntry 的回调,就是我们看到的 PublishSubscribe 监听到释放锁的消息,进行回调。
- 第 79 行:添加 `listener` 到 RedissonLockEntry 中。😈 如果胖友又“失忆”了,调回到 [「4. LockPubSub」](https://svip.iocoder.cn/Redisson/ReentrantLock/#) 再瞅瞅。
- 第 65 行:通过 `executed` 标记已经执行。
- 第 66 至 69 行:如果有定时任务的 Future ,则进行取消。
- 第 71 至 74 行:减掉已经等待的时间。
- 第 76 行:调用 [「5.2 `##tryLockAsync(AtomicLong time, long leaseTime, TimeUnit unit, RFuture subscribeFuture, RPromise result, long currentThreadId)`」](https://svip.iocoder.cn/Redisson/ReentrantLock/#) 方法,再次去获得锁。
------
至此RedissonLock 加锁的逻辑我们已经全部看完。如果觉得略感迷糊的胖友,可以多多调试下。因为艿艿有点偷懒,未画一些图来辅助胖友理解,所以胖友可以自己画一画,嘿嘿。
## 5.3 遗漏的 tryLockAsync
还有两个重载的 `#tryLockAsync(...)` 方法,它们是未设置锁定时长的两个。代码如下:
```
// RedissonLock.java
@Override
public RFuture<Boolean> tryLockAsync() {
return tryLockAsync(Thread.currentThread().getId());
}
@Override
public RFuture<Boolean> tryLockAsync(long threadId) {
return tryAcquireOnceAsync(-1, null, threadId);
}
```
- 最终都调用 `#tryAcquireOnceAsync(long leaseTime, TimeUnit unit, long threadId)` 方法,真正实现异步加锁的逻辑。
`#tryAcquireOnceAsync(long leaseTime, TimeUnit unit, long threadId)` 方法,真正实现异步加锁的逻辑。代码如下:
```
// RedissonLock.java
private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, long threadId) {
// 情况一,如果锁有时长,则直接获得分布式锁
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}
// 情况二,如果锁无时长,则先获得 Lock WatchDog 的锁超时时长
RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
// 如果发生异常,则直接返回
if (e != null) {
return;
}
// lock acquired
// 如果获得到锁,则创建定时任务,定时续锁
if (ttlRemaining) {
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
```
- 看到这个方法,是不是发现很熟悉,和 [「5.1 tryAcquireAsync」](https://svip.iocoder.cn/Redisson/ReentrantLock/#) 基本一模一样。差别在于它的返回的结果是 `RFuture<Boolean>` 。
- 有一点要特别注意,因为本小节我们看到的两个 `#tryLockAsync(...)` 方法,是尝试去加锁。如果加锁失败,则返回 `false` 即可,所以不会像我们在 [「5.1 tryLockAsync」](https://svip.iocoder.cn/Redisson/ReentrantLock/#) 方法,无限重试直到等待超时(超过 `waitTime`)。
# 6. tryLock
> 艿艿:本小节和 [「5. tryLockAsync」](https://svip.iocoder.cn/Redisson/ReentrantLock/#) 相对,为**同步**加锁。扶你起来,胖友还可以继续怼源码。
`#tryLock(long waitTime, long leaseTime, TimeUnit unit)` 方法,同步加锁,并返回是否成功。。代码如下:
```
// RedissonLock.java
@Override
public boolean tryLock(long waitTime, TimeUnit unit) throws InterruptedException {
return tryLock(waitTime, -1, unit);
}
1: @Override
2: public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
3: long time = unit.toMillis(waitTime);
4: long current = System.currentTimeMillis();
5: long threadId = Thread.currentThread().getId();
6: // 同步获加锁
7: Long ttl = tryAcquire(leaseTime, unit, threadId);
8: // lock acquired
9: // 加锁成功,直接返回 true 加锁成功
10: if (ttl == null) {
11: return true;
12: }
13:
14: // 减掉已经等待的时间
15: time -= System.currentTimeMillis() - current;
16: // 如果无剩余等待的时间,则返回 false 加锁失败
17: if (time <= 0) {
18: acquireFailed(threadId);
19: return false;
20: }
21:
22: // 记录新的当前时间
23: current = System.currentTimeMillis();
24: // 创建 SUBSCRIBE 订阅的 Future
25: RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
26: // 阻塞等待订阅发起成功
27: if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
28: // 进入到此处,说明阻塞等待发起订阅超时
29: // 取消 SUBSCRIBE 订阅
30: if (!subscribeFuture.cancel(false)) {
31: // 进入到此处,说明取消发起订阅失败,则通过设置回调,在启发订阅完成后,回调取消 SUBSCRIBE 订阅
32: subscribeFuture.onComplete((res, e) -> {
33: if (e == null) {
34: unsubscribe(subscribeFuture, threadId);
35: }
36: });
37: }
38: // 等待超时,则返回 false 加锁失败
39: acquireFailed(threadId);
40: return false;
41: }
42:
43: try {
44: // 减掉已经等待的时间
45: time -= System.currentTimeMillis() - current;
46: // 如果无剩余等待的时间,则返回 false 加锁失败
47: if (time <= 0) {
48: acquireFailed(threadId);
49: return false;
50: }
51:
52: while (true) {
53: // 记录新的当前时间
54: long currentTime = System.currentTimeMillis();
55: // 同步获加锁
56: ttl = tryAcquire(leaseTime, unit, threadId);
57: // lock acquired
58: // 加锁成功,直接返回 true 加锁成功
59: if (ttl == null) {
60: return true;
61: }
62:
63: // 减掉已经等待的时间
64: time -= System.currentTimeMillis() - currentTime;
65: // 如果无剩余等待的时间,则返回 false 加锁失败
66: if (time <= 0) {
67: acquireFailed(threadId);
68: return false;
69: }
70:
71: // waiting for message
72: // 记录新的当前时间
73: currentTime = System.currentTimeMillis();
74:
75: // 通过 RedissonLockEntry 的信号量,阻塞等待锁的释放消息,或者 ttl/time 超时(例如说,锁的自动超时释放)
76: if (ttl >= 0 && ttl < time) {
77: getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
78: } else {
79: getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
80: }
81:
82: // 减掉已经等待的时间
83: time -= System.currentTimeMillis() - currentTime;
84: // 如果无剩余等待的时间,则返回 false 加锁失败
85: if (time <= 0) {
86: acquireFailed(threadId);
87: return false;
88: }
89: }
90: } finally {
91: // 小细节,需要最终取消 SUBSCRIBE 订阅
92: unsubscribe(subscribeFuture, threadId);
93: }
94: // return get(tryLockAsync(waitTime, leaseTime, unit));
95: }
```
- 第 7 行:调用 `#tryAcquire(long leaseTime, TimeUnit unit, long threadId)` 方法,同步加锁。代码如下:
```
// RedissonLock.java
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
```
- 该方法内部,调用的就是 [「5.1 `#tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId)`」](https://svip.iocoder.cn/Redisson/ReentrantLock/#) 方法。
- 第 8 至 12 行:加锁成功,直接返回 `true` 加锁成功。
- 第 15 行:减掉已经等待的时间。
- 第 17 至 20 行:如果无剩余等待的时间,则返回 `false` 加锁失败。
- 第 25 行:调用 `#subscribe(long threadId)` 方法,创建 SUBSCRIBE 订阅的 Future `subscribeFuture` 。
- 【重要差异点】第 27 至 41 行:调用 `#await(subscribeFuture, time, TimeUnit.MILLISECONDS)` 方法,阻塞等待订阅发起成功,因为 `subscribeFuture` 是异步的,需要这一步转同步。如果发生超时,则就会进入第 28 至 37 行的取消逻辑,并在第 38 至 40 行返回 `false` 加锁失败。
- 第 52 至 89 行:反复重试,直到成功加锁返回
```
true
```
,或者超时返回
```
false
```
- 第 54 至 73 行:重试一波第 6 至 20 行的逻辑。
- 【重要差异点】第 75 至 80 行:通过 RedissonLockEntry 的信号量,阻塞等待锁的释放消息,或者
```
ttl
```
/
```
time
```
超时(例如说,锁的自动超时释放)。
- 相比 [「5.2 `#tryLockAsync(AtomicLong time, long leaseTime, TimeUnit unit, RFuture subscribeFuture, RPromise result, long currentThreadId)`」](https://svip.iocoder.cn/Redisson/ReentrantLock/#) 方法,它把信号量的等待和定时任务的等待融合在一起了。
- 等待完成后,如果无剩余时间,在第 82 至 88 行的逻辑中,返回 `false` 加锁失败。
- 等待完成后,如果有剩余时间,在第 56 行:获得重新同步获得锁。
- 第 92 行:调用 `#unsubscribe(RFuture<RedissonLockEntry> future, long threadId)` 方法,小细节,需要最终取消 SUBSCRIBE 订阅。
# 7. lockAsync
> 艿艿:本小节和 [「5. tryLockAsync」](https://svip.iocoder.cn/Redisson/ReentrantLock/#) 相似,为**异步**加锁。继续扶你起来,胖友还可以继续怼源码。
`#lockAsync(long leaseTime, TimeUnit unit, long currentThreadId)` 方法,异步加锁,无需返回是否成功。代码如下:
```
// RedissonLock.java
@Override
public RFuture<Void> lockAsync() {
return lockAsync(-1, null);
}
@Override
public RFuture<Void> lockAsync(long leaseTime, TimeUnit unit) {
// 获得线程编号
long currentThreadId = Thread.currentThread().getId();
// 异步锁
return lockAsync(leaseTime, unit, currentThreadId);
}
@Override
public RFuture<Void> lockAsync(long currentThreadId) {
return lockAsync(-1, null, currentThreadId);
}
1: @Override
2: public RFuture<Void> lockAsync(long leaseTime, TimeUnit unit, long currentThreadId) {
3: // 创建 RPromise 对象,用于异步回调
4: RPromise<Void> result = new RedissonPromise<Void>();
5: // 异步加锁
6: RFuture<Long> ttlFuture = tryAcquireAsync(leaseTime, unit, currentThreadId);
7: ttlFuture.onComplete((ttl, e) -> {
8: // 如果发生异常,则通过 result 通知异常
9: if (e != null) {
10: result.tryFailure(e);
11: return;
12: }
13:
14: // lock acquired
15: // 如果获得到锁,则通过 result 通知获得锁成功
16: if (ttl == null) {
17: if (!result.trySuccess(null)) { // 如果处理 result 通知对结果返回 false ,意味着需要异常释放锁
18: unlockAsync(currentThreadId);
19: }
20: return;
21: }
22:
23: // 创建 SUBSCRIBE 订阅的 Future
24: RFuture<RedissonLockEntry> subscribeFuture = subscribe(currentThreadId);
25: subscribeFuture.onComplete((res, ex) -> {
26: // 如果发生异常,则通过 result 通知异常
27: if (ex != null) {
28: result.tryFailure(ex);
29: return;
30: }
31:
32: // 异步加锁
33: lockAsync(leaseTime, unit, subscribeFuture, result, currentThreadId);
34: });
35: });
36:
37: return result;
38: }
```
- 第 6 行:调用 [「5.1 `#tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId)`」](https://svip.iocoder.cn/Redisson/ReentrantLock/#) 方法,执行异步获得锁。
- 第 7 至 35 行:又是熟悉的配方,在回调中,处理响应的加锁结果。差异就在第 34 行,见 [「7.1 更强的 lockAsync」](https://svip.iocoder.cn/Redisson/ReentrantLock/#) 的详细解析。
## 7.1 更强的 lockAsync
实际上,`#lockAsync(long leaseTime, TimeUnit unit, RFuture<RedissonLockEntry> subscribeFuture, RPromise<Void> result, long currentThreadId)` 方法,和 [「5.2 更强的 tryLockAsync」](https://svip.iocoder.cn/Redisson/ReentrantLock/#) 是**基本一致**的。那么为什么不直接重用呢?注意,这个方法**不需要考虑等待超时**,有一种“劳资有钱,必须拿到锁”。
代码如下:
```
// RedissonLock.java
private void lockAsync(long leaseTime, TimeUnit unit, RFuture<RedissonLockEntry> subscribeFuture, RPromise<Void> result, long currentThreadId) {
// 获得分布式锁
RFuture<Long> ttlFuture = tryAcquireAsync(leaseTime, unit, currentThreadId);
ttlFuture.onComplete((ttl, e) -> {
// 如果发生异常,则取消订阅,并通过 result 通知异常
if (e != null) {
unsubscribe(subscribeFuture, currentThreadId);
result.tryFailure(e);
return;
}
// lock acquired
// 如果获得到锁,则取消订阅,并通过 result 通知获得锁成功
if (ttl == null) {
unsubscribe(subscribeFuture, currentThreadId);
if (!result.trySuccess(null)) {
unlockAsync(currentThreadId);
}
return;
}
// 获得当前线程对应的 RedissonLockEntry 对象
RedissonLockEntry entry = getEntry(currentThreadId);
// 尝试获得 entry 中的信号量,如果获得成功,说明 SUBSCRIBE 已经收到释放锁的消息,则直接立马再次去获得锁。
if (entry.getLatch().tryAcquire()) {
lockAsync(leaseTime, unit, subscribeFuture, result, currentThreadId);
} else {
// waiting for message
// 创建 AtomicReference 对象,用于指向定时任务
AtomicReference<Timeout> futureRef = new AtomicReference<Timeout>();
// 创建监听器 listener ,用于在 RedissonLockEntry 的回调,就是我们看到的 PublishSubscribe 监听到释放锁的消息,进行回调。
Runnable listener = () -> {
// 如果有定时任务的 Future ,则进行取消
if (futureRef.get() != null) {
futureRef.get().cancel();
}
// 再次获得分布式锁
lockAsync(leaseTime, unit, subscribeFuture, result, currentThreadId);
};
// 添加 listener 到 RedissonLockEntry 中
entry.addListener(listener);
// 下面,会创建一个定时任务。因为极端情况下,可能不存在释放锁的消息,例如说锁自动超时释放,所以需要改定时任务,在获得到锁的超时后,主动去抢下。
if (ttl >= 0) {
Timeout scheduledFuture = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
// 移除 listener 从 RedissonLockEntry 中
if (entry.removeListener(listener)) {
// 再次获得分布式锁
lockAsync(leaseTime, unit, subscribeFuture, result, currentThreadId);
}
}
}, ttl, TimeUnit.MILLISECONDS);
// 记录 futureRef 执行 scheduledFuture
futureRef.set(scheduledFuture);
}
}
});
}
```
- 更加熟悉的配方,全程无需处理等待锁超时的逻辑。胖友自己瞅瞅,哈哈哈。
# 8. lock
> 艿艿:本小节和 [「6. tryLoc」](https://svip.iocoder.cn/Redisson/ReentrantLock/#) 相对,为**同步**加锁。再次扶你起来,胖友还可以继续怼源码。
`#tryLock(long waitTime, long leaseTime, TimeUnit unit)` 方法,同步加锁,无需返回是否成功。代码如下:
```
// RedissonLock.java
@Override
public void lock() {
try {
lock(-1, null, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
@Override
public void lock(long leaseTime, TimeUnit unit) {
try {
lock(leaseTime, unit, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
lock(-1, null, true);
}
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
lock(leaseTime, unit, true);
}
1: private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
2: long threadId = Thread.currentThread().getId();
3: // 同步获加锁
4: Long ttl = tryAcquire(leaseTime, unit, threadId);
5: // lock acquired
6: // 加锁成功,直接返回
7: if (ttl == null) {
8: return;
9: }
10:
11: // 创建 SUBSCRIBE 订阅的 Future
12: RFuture<RedissonLockEntry> future = subscribe(threadId);
13: // 阻塞等待订阅发起成功
14: commandExecutor.syncSubscription(future);
15:
16: try {
17: while (true) {
18: // 同步获加锁
19: ttl = tryAcquire(leaseTime, unit, threadId);
20: // lock acquired
21: // 加锁成功,直接返回
22: if (ttl == null) {
23: break;
24: }
25:
26: // waiting for message
27: // 通过 RedissonLockEntry 的信号量,阻塞等待锁的释放消息,或者 ttl/time 超时(例如说,锁的自动超时释放)
28: if (ttl >= 0) {
29: try {
30: getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
31: } catch (InterruptedException e) {
32: // 如果允许打断,则抛出 e
33: if (interruptibly) {
34: throw e;
35: }
36: // 如果不允许打断,则继续
37: getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
38: }
39: } else {
40: if (interruptibly) {
41: getEntry(threadId).getLatch().acquire();
42: } else {
43: getEntry(threadId).getLatch().acquireUninterruptibly();
44: }
45: }
46: }
47: } finally {
48: // 小细节,需要最终取消 SUBSCRIBE 订阅
49: unsubscribe(future, threadId);
50: }
51: // get(lockAsync(leaseTime, unit));
52: }
53:
54: private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
55: return get(tryAcquireAsync(leaseTime, unit, threadId));
56: }
```
- 太过熟悉,就不哔哔了。
------
至此,加锁的几种组合排列,我们就已经看完了。😈 是不是有一种加锁的 Lua 脚本蛮简单的,调用 Lua 脚本实现阻塞等待的逻辑,细节还是蛮多的。如果让艿艿自己来实现这块的逻辑,估计会有一些细节处理不到位。嘿嘿。
# 9. unlockAsync
`#unlockAsync(long threadId)` 方法,异步解锁。代码如下:
```
// RedissonLock.java
@Override
public RFuture<Void> unlockAsync() {
// 获得线程编号
long threadId = Thread.currentThread().getId();
// 执行解锁
return unlockAsync(threadId);
}
1: @Override
2: public RFuture<Void> unlockAsync(long threadId) {
3: // 创建 RPromise 对象,用于异步回调
4: RPromise<Void> result = new RedissonPromise<Void>();
5:
6: // 解锁逻辑
7: RFuture<Boolean> future = unlockInnerAsync(threadId);
8:
9: future.onComplete((opStatus, e) -> {
10: // 如果发生异常,并通过 result 通知异常
11: if (e != null) {
12: cancelExpirationRenewal(threadId);
13: result.tryFailure(e);
14: return;
15: }
16:
17: // 解锁的线程不对,则创建 IllegalMonitorStateException 异常,并通过 result 通知异常
18: if (opStatus == null) {
19: IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
20: + id + " thread-id: " + threadId);
21: result.tryFailure(cause);
22: return;
23: }
24:
25: // 取消定时过期
26: cancelExpirationRenewal(threadId);
27:
28: // 通知 result 解锁成功
29: result.trySuccess(null);
30: });
31:
32: return result;
33: }
```
- 第 7 行:调用 [3.2 `#unlockAsync(long threadId)`](https://svip.iocoder.cn/Redisson/ReentrantLock/#) 方法,执行解锁逻辑。
- 第 10 至 15 行:如果发生异常,并通过 `result` 通知异常。
- 第 17 至 23 行:解锁的线程不对,则创建 IllegalMonitorStateException 异常,并通过 `result` 通知异常。😈 这里,仔细回忆下解锁 Lua 脚本的返回值。嘿嘿。
- 第 26 行:调用 `#cancelExpirationRenewal(long threadId)` 方法取消定期过期。TODO
- 第 29 行:通知 `result` 解锁成功。
# 10. unlock
`#unlock()` 方法,同步解锁。代码如下:
```
// RedissonLock.java
@Override
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
}
```
- 简单,基于 [「9. `#unlockAsync(long threadId)`」](https://svip.iocoder.cn/Redisson/ReentrantLock/#) 方法实现。
# 11. forceUnlock
`#forceUnlock()` 方法,强制解锁。代码如下:
```
// RedissonLock.java
@Override
public boolean forceUnlock() { // 同步
return get(forceUnlockAsync());
}
@Override
public RFuture<Boolean> deleteAsync() { // 异步
return forceUnlockAsync();
}
```
- 无论是同步还是异步的强制解锁,都是基于 [「3.3 `#forceUnlockAsync()`」](https://svip.iocoder.cn/Redisson/ReentrantLock/#) 方法实现。
# 12. ExpirationEntry
本小节,我们来看看在 [「3.4 renewExpirationAsync」](https://svip.iocoder.cn/Redisson/ReentrantLock/#) 中,提到的**续锁**的功能。
首先,我们来看看 ExpirationEntry 类。它是 RedissonLock 的内部类,记录**续租**任务的信息。代码如下:
```
// RedissonLock.java
public static class ExpirationEntry {
/**
* 线程与计数器的映射
*
* KEY线程编号
* VALUE计数
*/
private final Map<Long, Integer> threadIds = new LinkedHashMap<>();
/**
* 定时任务
*/
private volatile Timeout timeout;
public ExpirationEntry() {
super();
}
/**
* 增加线程的计数
*
* @param threadId 线程编号
*/
public void addThreadId(long threadId) {
Integer counter = threadIds.get(threadId);
if (counter == null) {
counter = 1;
} else {
counter++;
}
threadIds.put(threadId, counter);
}
public boolean hasNoThreads() {
return threadIds.isEmpty();
}
public Long getFirstThreadId() {
if (threadIds.isEmpty()) {
return null;
}
return threadIds.keySet().iterator().next();
}
/**
* 减少线程的技术
*
* @param threadId 线程编号
*/
public void removeThreadId(long threadId) {
Integer counter = threadIds.get(threadId);
if (counter == null) {
return;
}
counter--;
if (counter == 0) {
threadIds.remove(threadId);
} else {
threadIds.put(threadId, counter);
}
}
public void setTimeout(Timeout timeout) {
this.timeout = timeout;
}
public Timeout getTimeout() {
return timeout;
}
}
```
- 可能粗略这么一看,有种然并卵的感觉,不要着急。我们下面接着看。
- 在 RedissonLock 的类中,有个
```
EXPIRATION_RENEWAL_MAP
```
静态属性,如下:
```
// RedissonLock.java
/**
* ExpirationEntry 的映射
*
* key {@link #entryName}
*/
private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>();
```
## 12.1 scheduleExpirationRenewal
`#scheduleExpirationRenewal()` 方法,发起**续锁**的定时任务。代码如下:
```
// RedissonLock.java
1: private void scheduleExpirationRenewal(long threadId) {
2: // 创建 ExpirationEntry 对象
3: ExpirationEntry entry = new ExpirationEntry();
4: // 添加到 EXPIRATION_RENEWAL_MAP 中
5: ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
6: // 添加线程编号到 ExpirationEntry 中
7: if (oldEntry != null) {
8: oldEntry.addThreadId(threadId);
9: } else {
10: entry.addThreadId(threadId);
11: // 创建定时任务,用于续锁
12: renewExpiration();
13: }
14: }
15:
16: private void renewExpiration() {
17: // 获得 ExpirationEntry 队形,从 EXPIRATION_RENEWAL_MAP 中
18: ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
19: if (ee == null) { // 如果不存在,返回
20: return;
21: }
22:
23: // 创建 Timeout 定时任务,实现定时续锁
24: Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
25:
26: @Override
27: public void run(Timeout timeout) throws Exception {
28: // 获得 ExpirationEntry 对象
29: ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
30: if (ent == null) { // 如果不存在,返回
31: return;
32: }
33: // 获得 threadId 编号
34: Long threadId = ent.getFirstThreadId();
35: if (threadId == null) { // 如果不存在,则返回
36: return;
37: }
38:
39: // 执行续锁
40: RFuture<Boolean> future = renewExpirationAsync(threadId);
41: future.onComplete((res, e) -> {
42: // 如果发生异常,则打印异常日志,并返回。此时,就不会在定时续租了
43: if (e != null) {
44: log.error("Can't update lock " + getName() + " expiration", e);
45: return;
46: }
47:
48: // 续锁成功,则重新发起定时任务
49: if (res) {
50: // reschedule itself
51: renewExpiration();
52: }
53: });
54: }
55:
56: }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); // 定时,每 internalLockLeaseTime / 3 秒执行一次。
57:
58: // 设置定时任务到 ExpirationEntry 中
59: ee.setTimeout(task);
60: }
```
- 第 2 至 10 行:创建 ExpirationEntry 对象,并添加到 `EXPIRATION_RENEWAL_MAP` 中,之后添加线程编号到 ExpirationEntry 中。
- 第 12 行:当且仅当
```
entryName
```
对应的 ExpirationEntry 对象
首次
创建时,才会调用
```
#renewExpiration()
```
方法,创建定时任务,用于续锁。
- 【重要】第 23 至 56 行:创建 Timeout 定时任务,定时每 `internalLockLeaseTime / 3` 秒执行一次**续锁**。
- 第 40 行:会调用 [「3.4 `#renewExpirationAsync(long threadId)` 方法」](https://svip.iocoder.cn/Redisson/ReentrantLock/#) 方法,执行续锁。
- 第 42 至 46 行:如果发生异常,则打印异常日志,并返回。此时,就不会在定时续租了。
- 【重要】第 48 至 52 行:如果续锁成功,则调用 `#renewExpiration()` 方法,重新发起定时任务。
- 第 59 行:设置定时任务到 ExpirationEntry 中。
## 12.2 cancelExpirationRenewal
`#cancelExpirationRenewal(Long threadId)` 方法,取消定时任务。代码如下:
```
// RedissonLock.java
void cancelExpirationRenewal(Long threadId) {
// 获得 ExpirationEntry 对象
ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (task == null) { // 如果不存在,返回
return;
}
// 从 ExpirationEntry 中,移除线程编号
if (threadId != null) {
task.removeThreadId(threadId);
}
// 如果 ExpirationEntry 的所有线程被清空
if (threadId == null || task.hasNoThreads()) {
// 取消定时任务
task.getTimeout().cancel();
// 从 EXPIRATION_RENEWAL_MAP 中移除
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
}
}
```
- 当且仅当 `entryName` 对应的 `EXPIRATION_RENEWAL_MAP` 的 ExpirationEntry 对象,所有线程都被移除后,会取消定时任务。
- 整体逻辑比较简单,胖友自己瞅瞅。
# 13. 其它方法
其它方法,比较简单,胖友自己瞅瞅即可。代码如下:
```
// RedissonLock.java
@Override
public Condition newCondition() {
// TODO implement
throw new UnsupportedOperationException();
}
@Override
public boolean isLocked() {
return isExists();
}
@Override
public RFuture<Boolean> isLockedAsync() {
return isExistsAsync();
}
@Override
public RFuture<Boolean> isExistsAsync() {
return commandExecutor.writeAsync(getName(), codec, RedisCommands.EXISTS, getName());
}
@Override
public boolean isHeldByCurrentThread() {
return isHeldByThread(Thread.currentThread().getId());
}
@Override
public boolean isHeldByThread(long threadId) {
RFuture<Boolean> future = commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, RedisCommands.HEXISTS, getName(), getLockName(threadId));
return get(future);
}
private static final RedisCommand<Integer> HGET = new RedisCommand<Integer>("HGET", ValueType.MAP_VALUE, new IntegerReplayConvertor(0));
public RFuture<Integer> getHoldCountAsync() {
return commandExecutor.writeAsync(getName(), LongCodec.INSTANCE, HGET, getName(), getLockName(Thread.currentThread().getId()));
}
@Override
public int getHoldCount() {
return get(getHoldCountAsync());
}
```
# 666. 彩蛋
细节比想象中的多,代码也比想象中的多,整篇博客差不多写了 1.5 天左右。
胖友看完之后,如果还有一些细节不清晰,建议可以多多调试。总的来说,如果项目中,想要使用 Redis 分布式锁,可以考虑直接使用 Redisson 提供的 Redisson 可重入锁。可能有些胖友项目中,已经使用了 Jedis 作为 Redis 的客户端,那么可以单独使用 Redisson 来做分布式锁。
之前也和一些朋友聊过,他们项目也是采用 Jedis + Redisson 的组合,妥妥的,没问题。
满足,在 2019-10-04 的 18:59 写完了这篇博客,美滋滋。