所谓的分布式锁重入是指同一个线程或者是同一个客户端可以多次获取同一个分布式锁而不会导致锁竞争或者死锁问题出现的一种机制,简单的来讲,锁重入就是要让同一个线程在已经持有了锁的情况下还能够再次获取并且能够正确的释放这个锁而不会导致死锁问题的出现,这种实现机制是对于普通的分布式锁的一种扩展,主要就是为了应对在业务操作中同一个线程多次请求同一个锁的情况。
??在Redis实现的分布式锁中,想要实现锁重入机制需要对锁操作进行额外的处理,因为Redis本身是没有对相关的机制进行支持,所以需要通过各种的扩展手段来实现这个功能,一般比较常见的方式就是通过判断锁的唯一标识以及锁计数器的方式来实现锁重入操作。
锁重入的原理
??锁的持有者标识
??在之前的分享中,我们介绍过了如何通过Redis来实现分布式锁的基本原理,一般情况下我们会通过一个唯一标识来标记锁的持有者是谁,例如比较常用的就是通过线程id、通过客户端id等方式,当一个线程获取到锁的时候,在Redis中我们通过记录线程的唯一标识来保证锁的唯一性。
??锁计数器
??在单线程实现锁重入机制的时候,其底层实现就是通过维护了一个锁计数器来记录进入锁的线程的次数,这里也是同样的道理,如果同一个线程再次请求了同样的锁,那么我们还是可以通过同样的机制来记录锁的锁定次数。当每次线程请求同一个锁的时候,Redis可以通过检查锁是否已经被当前线程所持有的方式来判断是否对计数器进行加一操作,如果持有那么计数器加一,如果没有那么就会判断锁是否被其他线程占用,是否可以获取等操作,并且获取成功之后,计数器就会初始化为1。
??锁释放
??既然提到了锁的重入、计数器等操作,那么锁能否被释放的条件就会变成计数器是否为1,如果计数器为零那么表示没有现成使用该锁了,那么该锁就可以被释放,否则就是进行计数器减一操作,表示持有锁的一个线程释放了该锁,其他线程还在锁定中。
??这里博主提出一个问题,分布式锁的释放是否需要遵守加锁顺序?有兴趣的读者可以在评论区讨论一下。
Redis实现分布式重入锁的步骤
??假设,我们通过SET命令的方式来实现分布式锁的创建操作,并且通过线程ID或者是UUID来指定分布式锁的持有者的唯一标识。如下所示。
加锁操作
??通过SET key value NX PX time命令来实现加锁操作,当进行加锁操作的时候,如果判断到锁已经存在了,那么就需要检查当前锁是否被当前线程持有,如果是那么就对计数器加一,如果不是那么就是等待或者是拒绝的操作。
计数器管理
??当通过上面的命令进行加锁的时候,锁的value中存储的就不只是一个锁的唯一标识了,还会存储一个计数器标识,当进行锁重入处理的时候,这个计数器也会随之加一减一。最为简单的存储结构就是通过Redis中Hash来存储,这样方式更加简单高效。
锁释放
??当需要进行锁释放的时候,需要检查当前线程持有的锁的计数器,如果计数器大于1,那么就表示该线程还有其他的层次的调用来持有该锁,这个时候表示持有该锁的一层线程执行逻辑处理完成,如果计数器为1,那么就表示就只有当前这个线程持有该锁了,用玩之后就可以直接对其进行释放了。
具体代码实现
??这里我们通过一个Python代码来简单演示一下Redis实现分布式重入锁的机制,这里我们用到的两个命令分别是SET和HINCRBY。其中HINCRBY这个命令是Redis提供的一个用来在Hash结构上执行加法操作的命令,也就是说它可以对Hash结构中的指定的整数进行加一操作,初始化为0,基础语法如下所示。
HINCRBY key field increment
- key:哈希表的名称(键)。
- field:哈希表中要更新的字段名。
- increment:要增加的整数值。这个值可以是正数或负数。
锁机制实现
??如下所示。
import redis
import uuid
import time
class RedisReentrantLock:
def __init__(self, redis_client, lock_key, expire_time=3000):
self.redis = redis_client
self.lock_key = lock_key
self.expire_time = expire_time
self.lock_owner = str(uuid.uuid4()) # 用于标识锁的持有者
def acquire(self):
while True:
# 使用 Redis 的 SETNX 原子操作来设置锁
# 如果锁不存在,则设置成功并返回 True
if self.redis.setnx(self.lock_key, self.lock_owner):
# 如果锁不存在,设置成功,设置过期时间
self.redis.pexpire(self.lock_key, self.expire_time)
self.redis.hset(self.lock_key, 'counter', 1) # 初始计数器为 1
return True
else:
# 如果锁已经存在,检查是否是当前持有者
current_owner = self.redis.get(self.lock_key)
if current_owner == self.lock_owner.encode('utf-8'):
# 如果是当前持有者,增加计数器
self.redis.hincrby(self.lock_key, 'counter', 1)
return True
else:
# 否则等待一段时间后再试
time.sleep(0.1)
def release(self):
# 获取锁的计数器
counter = int(self.redis.hget(self.lock_key, 'counter') or 0)
if counter > 1:
# 如果计数器大于 1,减少计数器
self.redis.hincrby(self.lock_key, 'counter', -1)
else:
# 如果计数器为 1,完全释放锁
self.redis.delete(self.lock_key)
使用该机制
??实现完该分布式锁之后,接下来我们就来看看如何使用这个操作。
# 创建 Redis 客户端
r = redis.StrictRedis(host='localhost', port=6379, db=0)
# 创建一个 RedisReentrantLock 实例
lock = RedisReentrantLock(r, 'my_lock')
# 获取锁
if lock.acquire():
try:
# 执行临界区代码
print("Lock acquired, performing task...")
time.sleep(2) # 模拟执行任务
finally:
# 释放锁
lock.release()
else:
print("Failed to acquire lock")
总结??
通过上面的实现,我们简单总结一下,想要实现锁的重入,首先需要知道锁重入的条件,也就是通过判断获取锁的客户端线程唯一标识与当前持有锁的线程的唯一标识是否一致,其次就是需要再记录锁绑定的同时还要记录一个锁的计数器,然后通过各种的原子性的操作来保证这些操作不会因为并发而导致其他问题出现,解决了这些问题之后,就是需要考虑更加深层次的问题。也就是锁的超时机制、锁的并发、锁的释放等等。这样我们就可以在Redis中实现锁重入机制来保证同一线程多次获取和释放同一把锁而不会造成阻塞的问题。