一、思考问题
之前项目中使用的是`基于 Redis 实现分布式锁`中的基于 redis 1.5.* 版本使用 setIfAbsent
实现的分布式锁,为了使用简单,也是通过AOP切面
的方式实现的,但是随着并发量的增长,难免会产生一些担心:
- 假如设定了不合适的锁的最大超时时间,有可能任务还没有执行完,锁就过期了,同样的请求过来,获取锁成功,如果没有做好幂等,有可能造成数据不一致的问题,或者浪费
IO
资源。【Redisson
的锁续租
特性可以解决此问题】 - 接
第1项
,当第一个获取锁的线程还没有执行完,但是锁已经过期了,第二个线程又加锁成功,此时,第一个线程执行完业务操作后,释放锁的时候是没有做检查的,此时,会导致释放了第二个线程的锁,如此循环,有可能会造成严重的问题。(如,秒杀中的超卖就有可能是此原因造成的)【Redisson
自带检查当前线程是否获得此锁的方法-isHeldByCurrentThread()
】 - 对于开发人员来说,使用分布式锁,并不应该关心锁的
value
值。【当使用Redisson
作为分布式锁的时候,开发人员无需显式设定value
值Redisson
框架会帮我们生成一个由UUID
和加锁操作的线程的threadId
用冒号拼接起来的字符串当作value
值】
上面的中括号中顺便提了一下Rediss
可以解决相应的问题,但是Redisson
没有缺点了吗?
- 当使用
Redisson锁续租
特性时,假如遇到微服务重启,是否会造成死锁?- 答案:不会。因为
锁续租
是通过N
的时间轮(七墨博客 ettyHashedWheelTimer
)来实现的,服务启动时,假如用到锁续租
的特性,会生成一个单例的时间轮,服务销毁时,此时间轮也会被destory
掉,即锁不会再被续租,当达到指定的超时时间或者默认的超时时间30s
的时候,锁会自动释放掉。
- 答案:不会。因为
- 当负责存储分布式锁的
Redisson
的言七墨 Redis
客户端宕机,是否会造成死锁?答案:分情况猜测、讨论 。- 业务假如正常处理完成,此时由于
Redis
节点宕机,解锁操作会失败,此时当前锁有可能会被无限hang
住,假如当前锁没有被无限hang
住,而是被失效掉了,但业务还在处理,那么订阅该锁的其它线程有可能会尝试获取锁,由于Redis
节点宕机,理论上来说,是会加锁失败的;如果真的可以加锁成功,也会造成线程安全问题
- 虽然使用
Redisson
作为分布式锁在言七墨 Redis
客户端宕机时,有可能会出现问题,但是使用Redis
作为分布式言七墨 锁,恐怕问题会更多,这里不做过多分析
- 业务假如正常处理完成,此时由于
二、Talk is cheap. Show me the code
1、引入 Redisson 的 jar
compile("org.redisson:redisson:3.5.7")
2、Redisson 客户端配置
/**
* @author: qimok
* @since: 2020-03-05
*/
@Configuration
public class RedissonClientConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private String port;
@Value("${spring.redis.password}")
private String password;
@Bean
public RedissonClient getRedisson() {
Config config = new Config();
config.useSingleServer().setAddress("redis://" + host + ":" + port).setPassword(password);
return Redisson.create(config);
}
}
3、声明 RedisLock 注解
/**
* @author qimok
* @since 2020-03-05
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@SuppressWarnings("checkstyle:magicnumber")
public @interface RedisLock {
/**
* 业务键
*/
String key();
/**
* 分布式锁的最大过期时间(单位:毫秒),默认不设置锁的最大超时时间
* <p>
* 如果不想使用分布式锁的无限续租,则"指定"锁的最大过期时间(并发要求比较高的场景)
* 如果想要使用分布式锁的无限续租,则"无需指定"过期时间(耗时比较久的单例Job、业务请求)
*/
String expire() default "-1";
}
4、切面配置
/**
* @author qimok
* @since 2020-03-05
*/
@Aspect
@Component
@Slf4j(module = MODULE_LOCK)
@SuppressWarnings({"checkstyle:magicnumber"})
public class RedisLockAspect {
@Autowired
private RedissonClient redissonClient;
/**
* 分布式锁
* <p>
* 锁住的键(key):prefix + value,对应的值:value
*/
@Around("@annotation(com.xingren.message.infrastructure.aop.annotation.RedisLock) && @annotation(redisLock)")
public Object aroundLockMethod(ProceedingJoinPoint joinPoint, RedisLock redisLock) {
ExpressionParser parser = new SpelExpressionParser();
EvaluationContext ctx = new StandardEvaluationContext();
ctx.setVariable("arguments", joinPoint.getArgs());
String key = redisLock.key().indexOf("arguments") == -1
? redisLock.key() : parser.parseExpression(redisLock.key()).getValue(ctx, String.class);
Long leaseTime = redisLock.expire().indexOf("arguments") == -1
? Long.valueOf(redisLock.expire())
: parser.parseExpression(redisLock.expire()).getValue(ctx, Long.class);
RLock lock = redissonClient.getLock(key);
boolean isLock = Boolean.FALSE;
if (leaseTime != -1) {
// 指定锁的租约时间
try {
/**
* 方法签名:tryLock(long waitTime, long leaseTime, TimeUnit unit)
* <p>
* waitTime:拿不到锁的等待时间
* leaseTime:租约时间(只要 leaseTime 的值不是 -1,就不会开启看门狗的定时任务,可防止死锁)
* <p>
* 方法签名:tryLock(long waitTime, TimeUnit unit),此处的 leaseTime 默认为 -1,则会开启看门狗的定时任务
* unit:时间单位
*/
isLock = lock.tryLock(0L, leaseTime, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error(String.format("加分布式锁异常, key:%s, e:%s", key, e));
}
} else {
// 使用分布式锁的无限续租特性(请注意,控制不好,容易产生死锁)
isLock = lock.tryLock();
}
if (!isLock) {
throw new LockFailedException(String.format("分布式锁获取失败, key: %s", key));
}
try {
log.debug(String.format("成功获取分布式锁, key: %s ", key));
return joinPoint.proceed();
} catch (Throwable throwable) {
log.error(String.format("获取锁后业务逻辑执行失败, key: %s, error: %s", key, getRootCauseMessage(throwable)));
throw new RuntimeException(throwable);
} finally {
// 检验该锁是否被线程占有(强调的是当前锁是否被占有,占有该锁的线程不一定是当前线程)& 检查当前线程是否获得此锁
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
lock.unlock();
log.debug(String.format("已经释放分布式锁, key: %s", key));
}
}
}
public class LockFailedException extends RuntimeException {
LockFailedException(String message) {
super(message);
}
}
}
5、使用方法
/**
* 如果没有 xxx,就创建
*/
public Long createXxxIfAbsent(Xxx xxx) {
// 其它的业务操作
do {
try {
String key = xxx.getId();
// 加锁创建 xxx
Long xxxId = creatingWithLockService.createXxxWithLock(key, sleepTime, xxx, COMM_LOG);
if (xxxId != 0L) {
// 其它的业务操作
return xxxId;
}
} catch (RuntimeException e) {
handleRuntimeException(COMM_LOG, xxx.getXxxNo(), retries);
}
} while (retries <= retryTimes);
throw new DomainException(XXX_CREATE_REACH_DEMAND_TIMES);
}
/**
* 加锁失败时的重试
*/
public void handleRuntimeException(String commLog, String xxxNo, Integer retries) {
// 最后一次尝试加锁失败,无需等待
if (retries <= retryTimes) {
// 加锁失败,休眠指定时间
try {
log.info(commLog + String.format("Xxx号:%s, 锁等待第 %s 次,等待时长 %s 毫秒", xxxNo, retries, sleepTime));
Thread.sleep(sleepTime);
} catch (InterruptedException ie) {
log.error(commLog + String.format("Xxx号:%s, 锁等待异常!", xxxNo), ie);
}
}
}
// #arguments[0] 取 key 的值,#arguments[1] 取 sleepTime 的值
@RedisLock(key = "#arguments[0]", expire = "#arguments[1]")
public Long createXxxWithLock(String key, Long sleepTime, Xxx xxx, String commLog) {
// 加锁成功后的业务逻辑处理
}
- 如果无需重试,则调用加锁代码的时候
catche
一下异常(加锁失败会抛出运行时异常,即上面代码中的 LockFailedException),如下:
// ...
try {
Long xxxId = creatingWithLockService.createXxxWithLock(key, sleepTime, xxx, COMM_LOG);
} catch (RuntimeException e) {
log.debug("加锁失败...");
}
// ...
三、结语
已做过压力测试,且续租(debug
过源码)和释放锁都是正常进行,上线许久,未发现过任何问题。