阿里面试:如何用Redis实现分布式锁?
wptr33 2024-12-25 16:02 17 浏览
前言
上一章节我提到了基于zk分布式锁的实现,这章节就来说一下基于Redis的分布式锁实现吧。
- zk实现分布式锁的传送门:zk分布式锁
在开始提到Redis分布式锁之前,我想跟大家聊点Redis的基础知识。
说一下Redis的两个命令:
SETNX key value
setnx 是SET if Not eXists(如果不存在,则 SET)的简写。
用法如图,如果不存在set成功返回int的1,这个key存在了返回0。
SETEX key seconds value
将值 value 关联到 key ,并将 key 的生存时间设为 seconds (以秒为单位)。
如果 key 已经存在,setex命令将覆写旧值。
有小伙伴肯定会疑惑万一set value 成功 set time失败,那不就傻了么,这啊Redis官网想到了。
setex是一个原子性(atomic)操作,关联值和设置生存时间两个动作会在同一时间内完成。
我设置了10秒的失效时间,ttl命令可以查看倒计时,负的说明已经到期了。
跟大家讲这两个命名也是有原因的,因为他们是Redis实现分布式锁的关键。
正文
开始前还是看看场景:
我依然是创建了很多个线程去扣减库存inventory,不出意外的库存扣减顺序变了,最终的结果也是不对的。
单机加synchronized或者Lock这些常规操作我就不说了好吧,结果肯定是对的。
我先实现一个简单的Redis锁,然后我们再实现分布式锁,可能更方便大家的理解。
还记得上面我说过的命令么,实现一个单机的其实比较简单,你们先思考一下,别往下看。
setnx
可以看到,第一个成功了,没释放锁,后面的都失败了,至少顺序问题问题是解决了,只要加锁,缩放后面的拿到,释放如此循环,就能保证按照顺序执行。
但是你们也发现问题了,还是一样的,第一个仔set成功了,但是突然挂了,那锁就一直在那无法得到释放,后面的线程也永远得不到锁,又死锁了。
所以....
setex
知道我之前说这个命令的原因了吧,设置一个过期时间,就算线程1挂了,也会在失效时间到了,自动释放。
我这里就用到了nx和px的结合参数,就是set值并且加了过期时间,这里我还设置了一个过期时间,就是这时间内如果第二个没拿到第一个的锁,就退出阻塞了,因为可能是客户端断连了。
加锁
整体加锁的逻辑比较简单,大家基本上都能看懂,不过我拿到当前时间去减开始时间的操作感觉有点笨, System.currentTimeMillis()消耗很大的。
/**
* 加锁
*
* @param id
* @return
*/
public boolean lock(String id) {
Long start = System.currentTimeMillis();
try {
for (; ; ) {
//SET命令返回OK ,则证明获取锁成功
String lock = jedis.set(LOCK_KEY, id, params);
if ("OK".equals(lock)) {
return true;
}
//否则循环等待,在timeout时间内仍未获取到锁,则获取失败
long l = System.currentTimeMillis() - start;
if (l >= timeout) {
return false;
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} finally {
jedis.close();
}
}
System.currentTimeMillis消耗大,每个线程进来都这样,我之前写代码,就会在服务器启动的时候,开一个线程不断去拿,调用方直接获取值就好了,不过也不是最优解,日期类还是有很多好方法的。
@Service
public class TimeServcie {
private static long time;
static {
new Thread(new Runnable(){
@Override
public void run() {
while (true){
try {
Thread.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
long cur = System.currentTimeMillis();
setTime(cur);
}
}
}).start();
}
public static long getTime() {
return time;
}
public static void setTime(long time) {
TimeServcie.time = time;
}
}
解锁
解锁的逻辑更加简单,就是一段Lua的拼装,把Key做了删除。
你们发现没,我上面加锁解锁都用了UUID,这就是为了保证,谁加锁了谁解锁,要是你删掉了我的锁,那不乱套了嘛。
LUA是原子性的,也比较简单,就是判断一下Key和我们参数是否相等,是的话就删除,返回成功1,0就是失败。
/**
* 解锁
*
* @param id
* @return
*/
public boolean unlock(String id) {
String script =
"if redis.call('get',KEYS[1]) == ARGV[1] then" +
" return redis.call('del',KEYS[1]) " +
"else" +
" return 0 " +
"end";
try {
String result = jedis.eval(script, Collections.singletonList(LOCK_KEY), Collections.singletonList(id)).toString();
return "1".equals(result) ? true : false;
} finally {
jedis.close();
}
}
验证
我们可以用我们写的Redis锁试试效果,可以看到都按照顺序去执行了
思考
大家是不是觉得完美了,但是上面的锁,有不少瑕疵的,我没思考很多点,你或许可以思考一下,源码我都开源到我的GItHub了。
而且,锁一般都是需要可重入行的,上面的线程都是执行完了就释放了,无法再次进入了,进去也是重新加锁了,对于一个锁的设计来说肯定不是很合理的。
我不打算手写,因为都有现成的,别人帮我们写好了。
redisson
redisson的锁,就实现了可重入了,但是他的源码比较晦涩难懂。
使用起来很简单,因为他们底层都封装好了,你连接上你的Redis客户端,他帮你做了我上面写的一切,然后更完美。
简单看看他的使用吧,跟正常使用Lock没啥区别。
ThreadPoolExecutor threadPoolExecutor =
new ThreadPoolExecutor(inventory, inventory, 10L, SECONDS, linkedBlockingQueue);
long start = System.currentTimeMillis();
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
final RedissonClient client = Redisson.create(config);
final RLock lock = client.getLock("lock1");
for (int i = 0; i <= NUM; i++) {
threadPoolExecutor.execute(new Runnable() {
public void run() {
lock.lock();
inventory--;
System.out.println(inventory);
lock.unlock();
}
});
}
long end = System.currentTimeMillis();
System.out.println("执行线程数:" + NUM + " 总耗时:" + (end - start) + " 库存数为:" + inventory);
上面可以看到我用到了getLock,其实就是获取一个锁的实例。
RedissionLock也没做啥,就是熟悉的初始化。
public RLock getLock(String name) {
return new RedissonLock(connectionManager.getCommandExecutor(), name);
}
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
super(commandExecutor, name);
//命令执行器
this.commandExecutor = commandExecutor;
//UUID字符串
this.id = commandExecutor.getConnectionManager().getId();
//内部锁过期时间
this.internalLockLeaseTime = commandExecutor.
getConnectionManager().getCfg().getLockWatchdogTimeout();
this.entryName = id + ":" + name;
}
加锁
有没有发现很多跟Lock很多相似的地方呢?
尝试加锁,拿到当前线程,然后我开头说的ttl也看到了,是不是一切都是那么熟悉?
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
//当前线程ID
long threadId = Thread.currentThread().getId();
//尝试获取锁
Long ttl = tryAcquire(leaseTime, unit, threadId);
// 如果ttl为空,则证明获取锁成功
if (ttl == null) {
return;
}
//如果获取锁失败,则订阅到对应这个锁的channel
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
while (true) {
//再次尝试获取锁
ttl = tryAcquire(leaseTime, unit, threadId);
//ttl为空,说明成功获取锁,返回
if (ttl == null) {
break;
}
//ttl大于0 则等待ttl时间后继续尝试获取
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
//取消对channel的订阅
unsubscribe(future, threadId);
}
//get(lockAsync(leaseTime, unit));
}
获取锁
获取锁的时候,也比较简单,你可以看到,他也是不断刷新过期时间,跟我上面不断去拿当前时间,校验过期是一个道理,只是我比较粗糙。
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
//如果带有过期时间,则按照普通方式获取锁
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
//先按照30秒的过期时间来执行获取锁的方法
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
//如果还持有这个锁,则开启定时任务不断刷新该锁的过期时间
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
底层加锁逻辑
你可能会想这么多操作,在一起不是原子性不还是有问题么?
大佬们肯定想得到呀,所以还是LUA,他使用了Hash的数据结构。
主要是判断锁是否存在,存在就设置过期时间,如果锁已经存在了,那对比一下线程,线程是一个那就证明可以重入,锁在了,但是不是当前线程,证明别人还没释放,那就把剩余时间返回,加锁失败。
是不是有点绕,多理解一遍。
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit,
long threadId, RedisStrictCommand<T> command) {
//过期时间
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
//如果锁不存在,则通过hset设置它的值,并设置过期时间
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//如果锁已存在,并且锁的是当前线程,则通过hincrby给数值递增1
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//如果锁已存在,但并非本线程,则返回过期时间ttl
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
解锁
锁的释放主要是publish释放锁的信息,然后做校验,一样会判断是否当前线程,成功就释放锁,还有个hincrby递减的操作,锁的值大于0说明是可重入锁,那就刷新过期时间。
如果值小于0了,那删掉Key释放锁。
是不是又和AQS很像了?
AQS就是通过一个volatile修饰status去看锁的状态,也会看数值判断是否是可重入的。
所以我说代码的设计,最后就万剑归一,都是一样的。
public RFuture<Void> unlockAsync(final long threadId) {
final RPromise<Void> result = new RedissonPromise<Void>();
//解锁方法
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.addListener(new FutureListener<Boolean>() {
@Override
public void operationComplete(Future<Boolean> future) throws Exception {
if (!future.isSuccess()) {
cancelExpirationRenewal(threadId);
result.tryFailure(future.cause());
return;
}
//获取返回值
Boolean opStatus = future.getNow();
//如果返回空,则证明解锁的线程和当前锁不是同一个线程,抛出异常
if (opStatus == null) {
IllegalMonitorStateException cause =
new IllegalMonitorStateException("
attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
//解锁成功,取消刷新过期时间的那个定时任务
if (opStatus) {
cancelExpirationRenewal(null);
}
result.trySuccess(null);
}
});
return result;
}
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, EVAL,
//如果锁已经不存在, 发布锁释放的消息
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
//如果释放锁的线程和已存在锁的线程不是同一个线程,返回null
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
//通过hincrby递减1的方式,释放一次锁
//若剩余次数大于0 ,则刷新过期时间
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
//否则证明锁已经释放,删除key并发布锁释放的消息
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()),
LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}
总结
这个写了比较久,但是不是因为复杂什么的,是因为个人工作的原因,最近事情很多嘛,还是那句话,程序员才是我的本职写文章只是个爱好,不能本末倒置了。
大家会发现,你学懂一个技术栈之后,学新的会很快,而且也能发现他们的设计思想和技巧真的很巧妙,也总能找到相似点,和让你惊叹的点。
就拿Doug Lea写的AbstractQueuedSynchronizer(AQS)来说,他写了一行代码,你可能看几天才能看懂,大佬们的思想是真的牛。
我看源码有时候也头疼,但是去谷歌一下,自己理解一下,突然恍然大悟的时候觉得一切又很值。
学习就是一条时而郁郁寡欢,时而开环大笑的路,大家加油,我们成长路上一起共勉。
我是敖丙,一个在互联网苟且偷生的工具人。
最好的关系是互相成就,大家的**「三连」**就是丙丙创作的最大动力,我们下期见!
注:如果本篇博客有任何错误和建议,欢迎人才们留言,你快说句话啊!
你知道的越多,你不知道的越多
相关推荐
- Python自动化脚本应用与示例(python办公自动化脚本)
-
Python是编写自动化脚本的绝佳选择,因其语法简洁、库丰富且跨平台兼容性强。以下是Python自动化脚本的常见应用场景及示例,帮助你快速上手:一、常见自动化场景文件与目录操作...
- Python文件操作常用库高级应用教程
-
本文是在前面《Python文件操作常用库使用教程》的基础上,进一步学习Python文件操作库的高级应用。一、高级文件系统监控1.1watchdog库-实时文件系统监控安装与基本使用:...
- Python办公自动化系列篇之六:文件系统与操作系统任务
-
作为高效办公自动化领域的主流编程语言,Python凭借其优雅的语法结构、完善的技术生态及成熟的第三方工具库集合,已成为企业数字化转型过程中提升运营效率的理想选择。该语言在结构化数据处理、自动化文档生成...
- 14《Python 办公自动化教程》os 模块操作文件与文件夹
-
在日常工作中,我们经常会和文件、文件夹打交道,比如将服务器上指定目录下文件进行归档,或将爬虫爬取的数据根据时间创建对应的文件夹/文件,如果这些还依靠手动来进行操作,无疑是费时费力的,这时候Pyt...
- python中os模块详解(python os.path模块)
-
os模块是Python标准库中的一个模块,它提供了与操作系统交互的方法。使用os模块可以方便地执行许多常见的系统任务,如文件和目录操作、进程管理、环境变量管理等。下面是os模块中一些常用的函数和方法:...
- 21-Python-文件操作(python文件的操作步骤)
-
在Python中,文件操作是非常重要的一部分,它允许我们读取、写入和修改文件。下面将详细讲解Python文件操作的各个方面,并给出相应的示例。1-打开文件...
- 轻松玩转Python文件操作:移动、删除
-
哈喽,大家好,我是木头左!Python文件操作基础在处理计算机文件时,经常需要执行如移动和删除等基本操作。Python提供了一些内置的库来帮助完成这些任务,其中最常用的就是os模块和shutil模块。...
- Python 初学者练习:删除文件和文件夹
-
在本教程中,你将学习如何在Python中删除文件和文件夹。使用os.remove()函数删除文件...
- 引人遐想,用 Python 获取你想要的“某个人”摄像头照片
-
仅用来学习,希望给你们有提供到学习上的作用。1.安装库需要安装python3.5以上版本,在官网下载即可。然后安装库opencv-python,安装方式为打开终端输入命令行。...
- Python如何使用临时文件和目录(python目录下文件)
-
在某些项目中,有时候会有大量的临时数据,比如各种日志,这时候我们要做数据分析,并把最后的结果储存起来,这些大量的临时数据如果常驻内存,将消耗大量内存资源,我们可以使用临时文件,存储这些临时数据。使用标...
- Linux 下海量文件删除方法效率对比,最慢的竟然是 rm
-
Linux下海量文件删除方法效率对比,本次参赛选手一共6位,分别是:rm、find、findwithdelete、rsync、Python、Perl.首先建立50万个文件$testfor...
- Python 开发工程师必会的 5 个系统命令操作库
-
当我们需要编写自动化脚本、部署工具、监控程序时,熟练操作系统命令几乎是必备技能。今天就来聊聊我在实际项目中高频使用的5个系统命令操作库,这些可都是能让你效率翻倍的"瑞士军刀"。一...
- Python常用文件操作库使用详解(python文件操作选项)
-
Python生态系统提供了丰富的文件操作库,可以处理各种复杂的文件操作需求。本教程将介绍Python中最常用的文件操作库及其实际应用。一、标准库核心模块1.1os模块-操作系统接口主要功能...
- 11. 文件与IO操作(文件io和网络io)
-
本章深入探讨Go语言文件处理与IO操作的核心技术,结合高性能实践与安全规范,提供企业级解决方案。11.1文件读写11.1.1基础操作...
- Python os模块的20个应用实例(python中 import os模块用法)
-
在Python中,...
- 一周热门
-
-
C# 13 和 .NET 9 全知道 :13 使用 ASP.NET Core 构建网站 (1)
-
因果推断Matching方式实现代码 因果推断模型
-
git pull命令使用实例 git pull--rebase
-
面试官:git pull是哪两个指令的组合?
-
git 执行pull错误如何撤销 git pull fail
-
git pull 和git fetch 命令分别有什么作用?二者有什么区别?
-
git fetch 和git pull 的异同 git中fetch和pull的区别
-
git pull 之后本地代码被覆盖 解决方案
-
还可以这样玩?Git基本原理及各种骚操作,涨知识了
-
git命令之pull git.pull
-
- 最近发表
- 标签列表
-
- git pull (33)
- git fetch (35)
- mysql insert (35)
- mysql distinct (37)
- concat_ws (36)
- java continue (36)
- jenkins官网 (37)
- mysql 子查询 (37)
- python元组 (33)
- mybatis 分页 (35)
- vba split (37)
- redis watch (34)
- python list sort (37)
- nvarchar2 (34)
- mysql not null (36)
- hmset (35)
- python telnet (35)
- python readlines() 方法 (36)
- munmap (35)
- docker network create (35)
- redis 集合 (37)
- python sftp (37)
- setpriority (34)
- c语言 switch (34)
- git commit (34)