技术标签: redisson的锁的类型
分布式锁
关于什么是分布式锁,举一个很简单的例子:现在大多数人都会使用信用卡,信用卡有账单日和还款日,账单日就是统计你需要在还款日截止之前需要还多少钱。
上述场景意味着银行的服务器需要在账单日有一个定时任务去统计每个人的账单,如果银行的服务器是一台机器的话,按照最简单的计算方式顺序遍历计算就可以了,但是面对庞大的用户群体,一台服务器是远远不够的,除了需要解决计算的问题,也需要考虑单点故障的问题,所以部署多台服务器是显然的。但是部署多台服务器需要面临的就是共享资源的抢占问题,如果有一台服务器正在计算你的账单,另一台服务器正好也在计算你的账单,那是不是意味着你一个月需要还多份账单给银行呢,这样显然是不可能的。这个时候分布式锁就能解决上述统计多份账单的问题,分布式锁会在第一台服务器开始计算之前先锁定资源,其他的服务器就会发现你的账单资源已经被锁定,也就不会重复计算。而是把自己的计算能力放在其他还未被锁定的资源上面。
如何实现分布式锁?
实现分布式锁有很多方式,比如数据库实现,zookeeper实现,还有我们现在要介绍的redis实现等等。其中的优缺点我们就不细说,直接来看redisson实现分布式锁。
maven依赖:
org.redisson
redisson
3.10.6
如果需要将redisson集成到spring里面去:
/**
* 创建RedissonClient(这里以单机redis为例子)
*
* @return
*/
@Bean(destroyMethod = "shutdown")
public static RedissonClient singletonRedisson() {
Config config = new Config();
// 可以将配置参数放到配置文件中
config.useSingleServer().setAddress("redis://127.0.0.1:6379");
return Redisson.create(config);
}
使用方式:
@Service
public class XxxService {
/**
* 从Spring中引用
*/
@Autowired
private RedissonClient redisson;
/**
* 需要获取分布式锁的业务方法
*/
public void redisson() {
RLock rLock = redisson.getLock("lockName");
// 如果获取锁成功
if (rLock.tryLock()) {
try {
System.out.println("获取锁成功,执行业务逻辑");
} finally {
// 解锁
rLock.unlock();
System.out.println("解锁成功");
}
}
}
}
下面开始分析redisson的实现方式:
tryLock()
/**
* 尝试获取锁
*
* @return
*/
public boolean tryLock() {
return (Boolean)this.get(this.tryLockAsync());
}
/**
* 一看就是为了获取异步执行的结果,所以重点应该看tryLockAsync()
*
* @return
*/
protected final V get(RFuture future) {
return this.commandExecutor.get(future);
}
/**
* 异步获取锁
*
* @return
*/
public RFuture tryLockAsync() {
return this.tryLockAsync(Thread.currentThread().getId());
}
/**
* 尝试获取锁
*
* @param threadId
* @return
*/
public RFuture tryLockAsync(long threadId) {
return this.tryAcquireOnceAsync(-1L, (TimeUnit) null, threadId);
}
/**
* 尝试获取锁
*
* @param leaseTime
* @param unit
* @param threadId
* @return
*/
private RFuture tryAcquireOnceAsync(long leaseTime, TimeUnit unit, long threadId) {
/**
* 如果自定义过期时间
*/
if (leaseTime != -1L) {
return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
}
/**
* 如果是默认的过期时间
*/
else {
RFuture ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
//没有异常
if (e == null) {
//成功获取锁
if (ttlRemaining) {
// 更新过期时间
this.scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}
}
/**
* 真正的获取锁的代码
* @param leaseTime
* @param unit
* @param threadId
* @param command
* @param
* @return
*/
RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {
//这个字段后面用作续过期时间
this.internalLockLeaseTime = unit.toMillis(leaseTime);
/**
* 利用lua脚本执行相关逻辑
*/
return this.commandExecutor.evalWriteAsync(this.getName(),
LongCodec.INSTANCE,
command,
"if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);",
Collections.singletonList(this.getName()),
new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}
//如果key不存在
if (redis.call('exists', KEYS[1]) == 0)
//设置这个key,并且设置超时时间,获取锁成功
then redis.call('hset', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil; end;
//如果key存在
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)
//就对key自增,并且重置过期时间(重入锁)
then redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil; end;
//获取key剩下的时间
return redis.call('pttl', KEYS[1]);
/**
* 重置过期时间
* @param threadId
*/
private void scheduleExpirationRenewal(long threadId) {
RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();
//检查是否存在指定的定时任务
RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
//如果已经存在指定的定时任务
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
}
//如果是第一次创建这个定时任务
else {
entry.addThreadId(threadId);
this.renewExpiration();
}
}
/**
* 定时任务重置过期时间
*/
private void renewExpiration() {
RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
if (ee != null) {
//每三分之一的过期时间续一次,直至解锁
Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
public void run(Timeout timeout) throws Exception {
RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
if (ent != null) {
//拿到第一个线程id
Long threadId = ent.getFirstThreadId();
if (threadId != null) {
//续时间
RFuture future = RedissonLock.this.renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
// 如果有异常,打印日志
if (e != null) {
RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
}
// 没有异常就继续续时间
else {
RedissonLock.this.renewExpiration();
}
});
}
}
}
}, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
}
/**
* 重置指定线程id的过期时间
* @param threadId
* @return
*/
protected RFuture renewExpirationAsync(long threadId) {
return this.commandExecutor.evalWriteAsync(this.getName(),
LongCodec.INSTANCE,
RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;",
Collections.singletonList(this.getName()),
new Object[]{this.internalLockLeaseTime, this.getLockName(threadId)});
}
//如果已经存在这个key,那就给它续时间
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1)
then redis.call('pexpire', KEYS[1], ARGV[1]);
return 1; end;
return 0;
unlock()
/**
* 解锁
*/
public void unlock() {
try {
this.get(this.unlockAsync(Thread.currentThread().getId()));
} catch (RedisException var2) {
if (var2.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException)var2.getCause();
} else {
throw var2;
}
}
}
/**
* 解锁(异步)
* @param threadId
* @return
*/
public RFuture unlockAsync(long threadId) {
RPromise result = new RedissonPromise();
RFuture future = this.unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
if (e != null) {
//取消续时间的任务
this.cancelExpirationRenewal(threadId);
result.tryFailure(e);
} else if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
result.tryFailure(cause);
} else {
//取消续时间的任务
this.cancelExpirationRenewal(threadId);
result.trySuccess((Object)null);
}
});
return result;
}
/**
* 真正执行解锁的代码
* @param threadId
* @return
*/
protected RFuture unlockInnerAsync(long threadId) {
return this.commandExecutor.evalWriteAsync(this.getName(),
LongCodec.INSTANCE,
RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;",
Arrays.asList(this.getName(), this.getChannelName()),
new Object[]{LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId)});
}
//如果不存在key,直接返回
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0)
then return nil;end;
//对key减一后再统计key的重入次数(考虑重入锁)
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
//如果次数大于0,就重置过期时间
if (counter > 0)
then redis.call('pexpire', KEYS[1], ARGV[2]); return 0;
//如果次数小于等于0,就删除key,并且发布消息(考虑到订阅了消息的线程正在等待)
else redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1; end;
return nil;
/**
* 取消续时间的任务
* @param threadId
*/
void cancelExpirationRenewal(Long threadId) {
//从Map中获取任务
RedissonLock.ExpirationEntry task = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
if (task != null) {
if (threadId != null) {
//移除线程id
task.removeThreadId(threadId);
}
if (threadId == null || task.hasNoThreads()) {
//取消续时间的任务
task.getTimeout().cancel();
//移除任务
EXPIRATION_RENEWAL_MAP.remove(this.getEntryName());
}
}
}
从上述源码分析上看,
1.tryLock()的过期时间默认是30秒,并且会存在一个定时续过期时间的任务,保证业务执行时间不会超过过期时间,抢占失败即返回false
2.tryLock(long waitTime, TimeUnit unit)这个方法我们没有分析,从源码中看,为了实现waitTime,使用了redis的订阅发布功能。也就是没有抢到锁的线程订阅消息,直至waitTime过期返回false或者被通知新一轮的开始抢占锁。当然,它如果抢占到锁,锁的过期时间也是30秒,同样也会存在一个定时任务续过期时间
3.tryLock(long waitTime, long leaseTime, TimeUnit unit)这个方法的参数leaseTime如果不是-1的话,是不会有定时任务续过期时间的,也就存在业务处理时间可能超过过期时间的风险。其他的和tryLock(long waitTime, TimeUnit unit)一致
4.lock()这个方法是要保证一定要抢到锁的,它的默认过期时间也是30秒,和tryLock()不同的是,它如果没抢占到锁,会一直自旋
以上是我对于redisson分布式锁的简单分析,感兴趣的小伙伴也可以自己对源码做一个简单了解。
文章浏览阅读275次。临近国庆,又回过头来鼓捣docker,因为从事php开发,所以还是先从环境入手。本来考虑搭建php+mysql+nginx+redis全部,但是由于使用的都是公司的mysql和redis,故只搭建php+nginx,因为我的操作系统是win10,一下操作都是在win下完成的。首先先拉取镜像,当然你也可以自己编写dockerfile去构建自己的镜像。这里先拉取nginx镜像:docker pu..._dockerfile 安装php+nginx
文章浏览阅读1.2k次,点赞41次,收藏16次。计算机毕业设计吊打导师Hadoop+Hive+PySpark旅游景点推荐 旅游推荐系统 景区游客满意度预测与优化 Apriori算法 景区客流量预测 旅游大数据 景点规划 知识图谱 机器学习 深度学习 人工智能
文章浏览阅读1.2k次。阅读五分钟,每日十点,和您一起终身学习,这里是程序员Android本篇文章主要介绍开发中我们没有源码的GMS Crash崩溃后的解决方案,通过阅读本篇文章,您将收获以下内容:一、gms.ui Service not registered CrashGMS(GoogleMobile Service)包是出口国外手机中Google限制必须要预制的,如果不预置无法过Google CTS认证,会导致手...
文章浏览阅读10w+次,点赞880次,收藏3.9k次。1、什么是进程?什么是线程?进程是:一个应用程序(1个进程是一个软件)。线程是:一个进程中的执行场景/执行单元。注意:一个进程可以启动多个线程。eg.对于java程序来说,当在DOS命令窗口中输入:java HelloWorld 回车之后。会先启动JVM,而JVM就是一个进程。JVM再启动一个主线程调用main方法。同时再启动一个垃圾回收线程负责看护,回收垃圾。最起码,现在的java程序中至少有两个线程并发,一个是垃圾回收线程,一个是执行main方法的主线程。3、进程和线程是什么关系?_java多线程
文章浏览阅读2.2k次。<template> <div class="video-player"> <div id="divPlugin" class="divPlugin" ref="divPlugin" v-if="plugin"> </div> <!-- <div class="down" v-else> <a href="http://jbfsys.oss-cn-bei.._海康威视divplugin 浮层问题
文章浏览阅读3.6k次,点赞7次,收藏10次。 Android P之init进程启动源码分析指南之三前言 在前面的篇章Android P之init进程启动源码分析指南之一和Android P之init进程启动源码分析指南之二讲解了init进程经过前面两个阶段以后,已经建立了相关的文件系统,属性系统,SELinux安全策略系统。但是我们知道init进程做的远远不止这些,还要启动一些Android的native service系统服务及其其他相关的操作,但是如果都是像属性系统和SELinux系统那样一行行代码去做,显得有点杂乱繁琐,而且_exec_start update_verifier_nonencrypted
文章浏览阅读4.8k次,点赞3次,收藏44次。静态手势识别总体方案0.说明1.实现目标2.实现步骤1)总体思路2)每部分效果基于高斯肤色模型和动态阈值的手势分割基于Canny算法的轮廓提取基于Hu矩的量化基于傅里叶描述子的量化分类融合特征分类其他尝试0.说明静态手势识别是2019年四五月份做的一次设计,实验平台是Matlab。主要针对静态手势,采用肤色模型分离手部区域,提取手势的轮廓信息,采用不同的描述方式进行量化,最后采用BP神经网络和..._手势识别有哪些方案
文章浏览阅读1.4w次。##机器学习(Machine Learning)&深度学习(Deep Learning)资料(Chapter 1)---#####注:机器学习资料[篇目一](https://github.com/ty4z2008/Qix/blob/master/dl.md)共500条,[篇目二](https://github.com/ty4z2008/Qix/blob/master/dl2.md)开始更新...
文章浏览阅读9.7k次。GTX650刷入UEFI模块gtx650黑苹果需要关闭csm模式,但是微软规定csm模式关闭必须得所有pcie设备开启uefi模式,但是老的gtx650中vbios没有uefi模块,所以需要先刷vbios,刷入方法在我的资源解压后的Inno3D UEFI Updater中的readme中,前提需要开启主板uefi模式,最好用gpuz备份好原来的bios。下面是简单的步骤,仅供参考,目前是实现了华硕GTX650-FMLII-1GD5的UEFI刷入,其他的显卡并没有实物卡测试。打开主板UEFI模式:._gtx650 uefi
文章浏览阅读2.6w次,点赞26次,收藏269次。拆分输出字符串求n阶方阵里所有数的和合法的三角形个数整型数组求整数对最小和机器人走迷宫【2022 Q1 Q2 |200分】数格子两个超大整型数相加字符串格式化输出【2022 Q1 Q2 |100分】树形目录操作【2022 Q1 Q2 |200分】整型数组按个位值排序奥运会奖牌榜的排名【2022 Q1 | 100分】无重复字符的最长子串最长回文子串两个字符串的最长公共子串括号生成字符串处理一个正整数到 Excel 编号之间的转换字符串压缩搜索矩阵免单统计数组的转换藏宝_华为机试垂直矩阵
文章浏览阅读853次。HZNUOJ1527(巨水题题解)Description输入两个正整数X,Y,求出[X,Y]内被除3余1并且被除5余3的整数的和Input输入两个正整数X,YOutput求所有满足条件的数的和对 x到 y进行遍历 并判断同时累计符合要求的个数。#include<stdio.h>int main(){ int x, y,sum=0; scanf("%d %d", &x, &y); for (int i = x; i <= y; i++) { i_输入两个正整数x,y(其中x
文章浏览阅读1.8k次。分享一篇讲得特别详细的blog前后端分离架构:Web实现前后端分离,前后端解耦