随着企业软件开发中微服务化不断发展,应用分布式部署如今已是家常便饭,而分布式锁更是个老生常谈的技术话题。笔者发现,目前网上关于分布式锁的文章都不够全面。真实生产环境实现分布式锁是有很多重要的细节点。笔者,参考了一系列文章以及公司本身的生产实践,阐述了分布式锁的实际应用中的要点(包含理论分析与代码实现)。

1、什么是分布式锁?为什么要用分布式锁?

  • 服务分布式部署之后,多个实例(Java进程)争抢共享资源(往往这种情况都是非幂等的操作),需要加锁控制,保证同一时间只有一个客户端对共享资源进行操作,保证数据一致。例如,商品秒杀时,没有正确对商品数量加锁进行并发控制的话,可能出现商品超卖

  • 实现分布式锁需要同时满足四个条件:

    • 互斥,同一时刻只能有一个客户端获取到锁。
    • 不会发生死锁,一个客户端异常导致没有释放锁,其他客户端也要能获取到锁。
    • 解铃还须系铃人,加锁解锁必须是同一客户端,不能误解了别的客户端加的锁。
    • 具有容错性,大多数redis节点正常运行,客户端就能获取锁和解锁。

2、如何用redis来实现分布式锁?

  • 使用redis的setnx(set if not exists)指令实现,只允许一个客户端占用(获取锁),先来先占,业务逻辑处理完毕后,再调用delete指令删除key(释放锁)。
1
127.0.0.1:6379> SET lock:uuid randomNumer EX 5 NX

3、使用redis做分布式锁要注意的问题?

从4个方面考虑,获取锁、释放锁、超时、集群环境下

case 1:

  • 获取锁后,在进行业务逻辑处理的过程中,发生了异常(包括但不限于程序异常、服务器异常、进程奔溃等),导致delete指令没有执行,那么锁就永远不会释放,死锁了。

  • 所以我们在获取锁时为key设置好过期时间,这样即使出现异常,key也会自动过期,把锁释放。(redis2.8之后setnx和expire可以在一条指令上执行)。

case 2:

  • 但是又由于不同情况下业务逻辑的处理时间可能是不定的,可能锁自动过期了,另一客户端获取到锁了,当前客户端的业务代码也可能没有执行完,而等到等到当前客户端业务代码执行完毕,来删除key,释放锁的时候,就释放了别的客户端加的锁了。

case 3:

  • 我们可以在获取锁时,set一个随机数,在释放锁的时候先匹配这个随机数,随机数能匹配上才释放锁,确保当前线程的锁不会被别的线程释放。

  • 但是匹配随机数和删除key并不是一个原子操作,redis也没有提供满足这种需求的原子命令,我们可以使用lua脚本来解决,保证释放锁命令的原子性。

    1
    2
    3
    4
    5
    if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
    else
    return 0
    end
  • 除了使用lua脚本,还可以通过什么方式来解决这个问题?

    • redis事务(生产上并不推荐,用lua即可)。

case 4:

  • 上一步解决了锁释放的问题,但是锁自动过期的问题还是没有解决(即失效时间设置太短,获取锁的客户端可能没处理完业务,锁就过期了,失效时间设置太长,一旦持有锁的客户端出现异常,释放锁失败,其他客户端就长时间无法获取到锁)。Redisson中的看门狗,解决了这一问题(异步定时任务检查锁是否过期,没有过期就自动延长锁的过期时间,只有当前线程释放锁,看门狗才会退出)。

case 5:

  • 前面所述场景在单机情况下问题不大,但是在集群环境(包括redis所有类型的集群部署架构)中,可能主节点挂了,锁在没有来得及同步到从节点,这时候另一客户端请求过来,直接获取到锁了,那么就出现多个客户端同时持有锁的情况了。
  • 对于这个问题,redis的作者antirez提出了Redlock。

Java代码实现(使用的spring-data-redis 2.3.4.RELEASE中的RedisTemplate操作redis):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Test
public void redisDistributedLockTest(){
// key要能唯一区分共享资源
String redisKey = "lock:distributed";
// case 2,随机值
String value = UUID.randomUUID() + ":" + Thread.currentThread().getName();
long expireTm = 100000;
long timeout = 5000;

try {
if(lock(redisKey, value, expireTm, timeout)){
// 加锁成功,开始执行业务逻辑
System.err.println("123456789");

}
}catch (Exception e){
e.printStackTrace();
}finally {
// case 1
// 必须在finally中释放锁
unlock(redisKey + "1", value);
}
}


private boolean lock(String key, String value, long expireTime, long timeout){
timeout *= 1000_000;
long startTime = System.nanoTime();

// 在超时时间范围内轮询获取锁
while (System.nanoTime() - startTime < timeout) {
// SET key value EX expireTime NX , 获取锁
if(redisTemplate.opsForValue().setIfAbsent(key, value, expireTime, TimeUnit.MILLISECONDS)){
return true;
}

// 没获取到锁,休眠一下再抢,降低线程竞争烈度,避免cpu空转
try {
Random random = new Random();
TimeUnit.NANOSECONDS.sleep(random.nextInt(30));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return false;
}

private void unlock(String key, String value){
// https://docs.spring.io/spring-data/redis/docs/2.3.4.RELEASE/reference/html/#scripting
ResourceScriptSource resourceScriptSource =
new ResourceScriptSource(new ClassPathResource("checkanddel.lua"));
RedisScript<Long> checkAndDel = RedisScript.of(resourceScriptSource.getResource(), Long.class);
// case 2
// check value值一直的话,删除key,释放锁。
redisTemplate.execute(checkAndDel, new ArrayList<>(Arrays.asList(key)), value);
}

4、Redlock算法的原理?

Redlock需要有多个master节点, 并且这些节点完全互相独立,不存在主从复制或者其他集群协调机制。

我们假设有5个master,那么,Redlock加锁全过程如下

  1. 获取当前Unix时间,以毫秒为单位。

  2. 依次尝试从5个实例中,使用相同key和随机值获取锁。(设置锁时,客户端需要设置一个网络连接和响应超时时间,这个超时时间应该小于锁的失效时间。例如,你的锁自动失效为10s,则超时时间应该在5-50ms之间,这样可以避免服务端redis已经挂掉,客户端还死死地等待响应结果。如果服务端没有在规定的时间内响应,客户端应该尽快尝试另外的Redis实例)。

  3. 客户端使用当前时间减去开始获取锁的时间(步骤1获取的时间),得到获取锁使用的时间。当且仅当从大多数的Redis节点(这里的话是3个)都获取到锁,并且获取锁使用的时间小于锁的失效时间,整个锁才算获取成功

  4. 如果获取到锁了,key的真正有效时间等于有效时间减去获取锁所使用的时间(即步骤3计算得出的获取锁使用的时间)。

  5. 如果因为某些原因,获取锁失败(比方,没有在至少N/2 + 1个Redis实例中获取到锁,或者取锁时间已经超过了有效时间),那么客户端应该在所有实例上进行解锁(即使有些实例上没有设置锁成功)。

  • Redlock释放锁,向所有Redis实例发送释放锁命令即可,不用关心之前有没有从Redis实例中成功获取到锁。

5、Redlock存在的问题?

节点崩溃后重启
  • 设想A,B,C,D,E五个节点,
  • 客户端1在A,B,C上设置锁成功,D,E上设置失败,满足多数节点设置成功,客户端1获取锁成功;
  • 节点C崩溃重启了,客户端1在C上的设置的锁没有持久化下来,丢失了。
  • 客户端2在C,D,E上设置锁成功,也满足多数节点设置成功,客户端2获取锁成功;
  • 这时有两个客户端同时获取到了同一资源的锁。
时钟跳跃
  • 设想A,B,C,D,E五个节点,
  • 客户端1在A,B,C上设置锁成功,D,E上设置失败,满足多数节点设置成功,客户端1获取锁成功;
  • 节点C时钟向前跳跃,导致上面设置的锁快速过期。
  • 客户端2在C,D,E上设置锁成功,也满足多数节点设置成功,客户端2获取锁成功;
  • 这时有两个客户端同时获取到了同一资源的锁。
解决办法
  • 节点崩溃后,不是立即重启,而是延迟重启,等待重启的时间应该大于锁的有效时间。
  • 时钟跳跃的问题,Redis作者提出,应该禁止人为修改系统时间,使用一个不会“跳跃”式地调整系统时钟的ntpd程序。

Redlock并没有解决业务处理时长过长,超过锁的有效时间,导致锁失效的问题。

Tips: 释放锁时要向所有节点都发起释放锁的指令,是因为对应节点设置锁失败时,可能节点执行set设置成功了,但是返回给客户端的响应包丢失了,在客户端看来只是超时失败。

6、Redisson源码解析

Redisson提供了多种分布式锁的实现,如可重入锁、公平锁、联锁、RedLock、读写锁。是redis官网推荐的Java实现的redis分布式锁类库。详情可见redisson wiki 8. 分布式锁和同步器,对redisson分布式锁使用写的十分详尽了,笔者这里不再赘述。

Redisson中的看门狗,实现了对锁续期,解决了业务处理时长过长,超过锁的有效时间,导致锁失效的问题。笔者这里对Redisson源码中可重入锁的实现中这段逻辑以及获取锁、释放锁进行分析说明。

Redisson获取锁的核心方法在RedissonLock.java:244

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
// RedissonLock.java:244 获取锁
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// 异步获取锁,得到一个future,这个类继承了java.util.concurrent包中的Future和CompletionStage,使用起来就像java中的CompletableFuture
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
// future执行完成的回调函数
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}

// lock acquired
if (ttlRemaining == null) {
// 获取到锁就另外起一个schedule线程为锁自动续期
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}

// RedissonLock.java:355 实际调用lua脚本加锁的方法。
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);

// KEYS[1]即为getName(),也就是set锁的key值
// ARGV[1]即为internalLockLeaseTime,key的过期时间
// ARGV[2]即为getLockName(threadId),客户端连接线程的id拼上当前线程id,就是设置锁时set的value值
// 这段lua脚本中就是两个if判断,最后返回key的ttl
// 第一个if,如果锁不存在就使用hash结构hincrby设置key和value,并设置过期时间
// 第二个if,如果锁存在,比较value值,看是不是当前线程的锁,是的话,自增1,并设置过期时间(同一线程多次调用lock,值自增1,可重入)
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

// RedissonLock.java:305
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
renewExpiration();
}
}

// RedissonLock.java:269
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}

Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}

RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}

if (res) {
// reschedule itself
renewExpiration();
}
});
}
// internalLockLeaseTime即为 lockWatchdogTimeout,看门狗超时时间,默认30s
// 看门狗续期。默认每10s检测一次,看锁是否需要续期,续期至30s
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

ee.setTimeout(task);
}


// RedissonLock.java:573 释放锁
protected RFuture<Boolean> unlockInnerAsync(long threadId) {

return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

释放锁这段lua脚本的解析单独拎出来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
-- 如果释放锁时,value值与加锁时的不一样,return nil,锁不释放
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end;
-- 否则就是一样,即为当前线程加的锁,key先减1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
-- 减1后的值如果大于0,说明当前锁是可重入锁,设置过期时间,不解锁
if (counter > 0) then
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
-- 减1后值不大于0,那么直接删除key解锁,并发送解锁消息,解锁成功
else
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
return nil;

7、zookeeper实现分布式锁与redis实现的区别?

zk实现分布式锁:
  • 利用zk的临时节点(ephemeral znode)。
  • 客户端尝试创建一个临时节点,比如 /lock;创建成功那么就是拿到了锁,其他客户端会创建失败(znode已存在),即获取锁失败。
  • 持有锁的客户端在处理完业务逻辑后,要释放锁住的共享资源,将znode删除即可,其他客户端就又能继续获取锁了。
  • 临时节点的特性保证了,即使创建znode的客户端崩溃了,相应的znode也会被自动删除。这保证了锁一定会被释放。
zk是如何检测到客户端崩溃的呢?
  • 每个客户端都与zk的某台服务器维护着一个Session,这个Session依赖定时心跳来维持。如果zk长时间收不到客户端的心跳(Session过期时间),那么就认为Session过期,对应客户端创建的临时节点就都会被自动删除。
  • 当客户端由于GC卡顿,导致Session超时,锁失效后,其他客户端获取到锁,还是可能出现多个客户端同时操作共享资源的问题。
zk的watch机制用在分布式锁上:
  • 客户端获取锁失败时,不一定要宣告锁失败。客户端可以进入一种等待状态,当创建锁设置的临时节点被删除后,zk通过watch机制通知客户端去设置节点获取锁。这样实现了本地锁try…lock的效果。(但是要避免羊群效应
zk实现分布式锁:
zk集群环境下分布式锁的状况:
  • 使用zk做分布式锁,要么获取不到锁,一旦获取到了,那么节点的数据是一致的,不会出现redis那种异步同步数据丢失的问题。(zk往集群中写数据的流程网上有很多图解
  • 不依赖全局事件,不会有时钟跳跃的问题。
  • 不依赖锁的有效时间,不会有超时导致锁失效的问题,但是可能有Session超时导致锁失效的问题。
zk实现分布式锁与redis实现的其他特性比较:
  • redis的读写性能比zk强多,高并发情况下,zk做分布式锁,会出现获取锁失败的情况,存在性能瓶颈。
  • zk还能实现读写锁,redis没有。
  • zk的watch机制能实现锁等待的功能,redis不行。
  • zk和redis实现的分布式锁可靠性都有点问题,但是zk的可靠性比redis强太多,但zk也存在性能瓶颈,所以还是要看具体业务场景来选用实现方案。

脚注

时钟跳跃:指时钟运行时的速度,例如比实际速度快10%或2倍。

1999年有论文指出NTP服务器经常提供不正确的时间。
虚拟化可能破坏内核计时。
服务器可能未配置NTP。
NTP返回了不正确的时间。
NTP通过不连续地将时钟跳到正确的时间来纠正偏差

操作系统时钟的一般实现:System.getCurrentTimeMillis()由gettimeofday()(C语言函数)支持的,gettimeofday()查找的是操作系统时钟,该时钟一般使用硬件计时器和NTP(一种网络时间同步服务)的组合来维护,该时钟一般返回一个整数,这个整数是通过某些协议,使用真实时间映射出来的,比如我们熟知的UTC。

羊群效应:一个特定的znode改变,zk触发了所有watch事件。客户端数量很多时,zk server 需要处理的操作会激增。所以需要避免多个客户端watch同一个node。解决方式,在znode序列上向下监视下一个znode就足够了。

参考

redis官网分布式锁文档
http://www.redis.cn/topics/distlock.html

基于Redis的分布式锁到底安全吗?
http://zhangtielei.com/posts/blog-redlock-reasoning.html
http://zhangtielei.com/posts/blog-redlock-reasoning-part2.html

zk与redis分布式锁对比
https://www.cnblogs.com/rjzheng/p/9310976.html

zk官方文档分布式锁介绍
https://zookeeper.apache.org/doc/r3.4.9/recipes.html#sc_recipes_Locks

其他博客关于zk分布式锁文章
https://ouyblog.com/2017/07/%E5%9F%BA%E4%BA%8EZooKeeper%E7%9A%84%E5%88%86%E5%B8%83%E5%BC%8F%E9%94%81(%E4%B8%80)
https://ouyblog.com/2017/07/%E5%9F%BA%E4%BA%8EZooKeeper%E7%9A%84%E5%88%86%E5%B8%83%E5%BC%8F%E9%94%81(%E4%BA%8C).html
https://www.cnblogs.com/codershuai/p/4182441.html