背景
之前使用过Redis
作为分布式锁,但是因为本身特性的局限性,存在很多隐患,故将Redis
替换成了Redisson
。(本文暂且只考虑Redis
是单实例的场景,如果想要了解在多实例情况下,可以自行搜索RedLock
的具体实现,也很简单)。
其中,通过Redis
作为分布式锁的隐患有:
- 耗时不确定的任务一般不设置过期时间,假如业务机器宕机,
unlock
操作不会执行,会造成死锁 threadA
的锁被threadB
释放的问题- 假如设定了不合适的锁的最大超时时间,有可能任务还没有执行完,锁就
https://qimok.cn 过期了 七墨博客 8230;
基于Redisson
作为分布式锁的执行流程
基于上次Redisson
的代码又做了一些封装,使整个流程功能更强大,具体功能,可直接参考Redi
中的配置信息。
代码
Redisson
客户端配置
/**
* @author: qimok
* @since: 2020-03-05
*/
@Configuration
public class RedissonClientConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private String port;
@Value("${spring.redis.password}")
private String password;
@Bean
public RedissonClient getRedisson() {
Config config = new Config();
config.useSingleServer().setAddress("redis://" + host + ":" + port).setPassword(password);
return Redisson.create(config);
}
}
- 注解代码
/**
* @author: qimok
* @since: 2020-03-05
*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@SuppressWarnings("checkstyle:magicnumber")
public @interface RedisLock {
/**
* 加锁业务键
*/
String key();
/**
* 尝试获取锁的等待时间
* <p>
* 在等待时间内获取锁成功,返回true
* 等待时间结束了,还没有获取到锁那么返回false
*/
String waitTime() default "0";
/**
* 分布式锁的租约时间(单位:毫秒)
* <p>
* 如果`不想`使用分布式锁的无限续租,则"指定"锁的租约时间(并发要求比较高的场景)
* 如果`想要`使用分布式锁的无限续租,则"无需指定"租约时间(耗时比较久的单例Job、业务请求)
*/
String leaseTime() default "-1";
/**
* 是否忽略加锁失败的异常(默认不忽略)
*/
boolean ignoreFail() default false;
/**
* 是否支持降级(默认不支持)
* <p>
* 这里的降级指在`Redis`宕机时,不使用分布式锁,直接调用方法,尽可能保证服务内部业务逻辑的可用性(AP)
* 考虑并发场景下,是否会造成数据的不一致/脏数据问题,假如业务上允许,则应该支持降级,否则不应该支持降级操作(CP)
*/
boolean degrade() default false;
}
- 切面代码
/**
* @author: qimok
* @since: 2020-03-05
*/
@Aspect
@Component
@Slf4j(module = MODULE_LOCK)
@SuppressWarnings({"checkstyle:magicnumber"})
public class RedisLockAspect {
@Autowired
private RedissonClient redissonClient;
@Autowired
private XxxDao xxxDao;
/**
* 分布式锁
* <p>
* 锁住的键(key):prefix + value,对应的值:value
*/
@Around("@annotation(com.xingren.message.infrastructure.aop.annotation.RedisLock) && @annotation(redisLock)")
public Object aroundLockMethod(ProceedingJoinPoint joinPoint, RedisLock redisLock) {
ExpressionParser parser = new SpelExpressionParser();
EvaluationContext ctx = new StandardEvaluationContext();
ctx.setVariable("arguments", joinPoint.getArgs());
String key = !redisLock.key().contains("arguments")
? redisLock.key() : parser.parseExpression(redisLock.key()).getValue(ctx, String.class);
Long waitTime = Long.valueOf(!redisLock.waitTime().contains("arguments")
? redisLock.waitTime() : parser.parseExpression(redisLock.waitTime()).getValue(ctx, String.class));
Long leaseTime = Long.valueOf(!redisLock.leaseTime().contains("arguments")
? redisLock.leaseTime() : parser.parseExpression(redisLock.leaseTime()).getValue(ctx, String.class));
RLock rLock = null;
Boolean isLocked = Boolean.FALSE;
try {
rLock = getRLock(key);
// >>> 降级逻辑 START <<<
if (Objects.isNull(rLock)) {
if (redisLock.degrade()) {
return degradeInvoke(joinPoint, key);
}
// 不支持降级,则抛出异常
throw new RuntimeException(String.format("RLock 获取失败, key: %s", key));
}
// >>> 降级逻辑 END <<<
isLocked = tryLock(rLock, key, waitTime, leaseTime);
if (isLocked) {
log.debug(String.format("成功获取 RedisLock, key: %s ", key));
return joinPoint.proceed();
}
// 失败抛出异常
throw new RuntimeException(String.format("RedisLock 获取失败, key: %s", key));
} catch (RuntimeException e) {
if (redisLock.ignoreFail()) {
return null;
}
throw new RuntimeException(e);
} catch (Throwable throwable) {
log.error(String.format("获取 RedisLock 后业务逻辑执行失败, key: %s, error: %s",
key, getRootCauseMessage(throwable)));
return null;
} finally {
unlock(isLocked, rLock, key);
}
}
private RLock getRLock(String key) {
RLock lock;
try {
lock = redissonClient.getLock(key);
} catch (Exception e) {
lock = null;
log.error(String.format("RLock 获取失败, key: %s", key));
}
return lock;
}
/**
* 降级处理(使用 DB 的分布式锁的处理方式)
*/
private Object degradeInvoke(ProceedingJoinPoint joinPoint, String key) {
long threadId = Thread.currentThread().getId();
log.warn(String.format("降级加 DbLock, key: %s, threadId: %s", key, threadId));
Boolean isLocked = Boolean.FALSE;
try {
isLocked = sessionDao.tryLock(key, threadId);
if (isLocked) {
log.debug(String.format("成功获取 DbLock, key: %s,threadId:%s", key, threadId));
return joinPoint.proceed();
} else {
throw new RuntimeException(String.format("DbLock 获取失败, key: %s, threadId: %s", key, threadId));
}
} catch (Throwable throwable) {
log.error(String.format("获取 DbLock 后业务逻辑执行失败, key: %s, error: %s",
key, getRootCauseMessage(throwable)));
return null;
} finally {
if (isLocked) {
sessionDao.unLock(key, threadId);
log.debug(String.format("释放 DbLock 成功, key: %s,threadId: %s", key, threadId));
}
}
}
/**
* 尝试加锁
*/
private Boolean tryLock(RLock lock, String key, Long waitTime, Long leaseTime) {
try {
return leaseTime != -1L ? lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)
: lock.tryLock(waitTime, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
log.error(String.format("加 RedisLock 异常, key:%s, e:%s", key, e));
throw new RuntimeException(e);
}
}
/**
* 解锁
*/
private void unlock(Boolean isLocked, RLock lock, String key) {
// 检查当前线程是否获得此锁
if (isLocked && lock.isHeldByCurrentThread()) {
lock.unlock();
log.debug(String.format("释放 RedisLock 成功, key: %s", key));
}
}
}
- 持久
言七墨 层代码
/**
* @author: qimok
* @since: 2020-10-12
*/
public interface DbLockDao {
/**
* 尝试加分布式锁
*/
Boolean tryLock(String key, Long threadId);
/**
* 尝试解锁
*/
void unLock(String key, Long threadId);
}
public interface DbLockDaoImpl implements DbLockDao {
@Autowired
private DbLockRepository dbLockRepo;
@Override
public Boolean tryLock(String key, Long threadId) {
DbLock dbLock = new DbLock();
dbLock.setDistributeKey(key);
dbLock.setThreadId(threadId);
return dbLockRepo.tryLock(po);
}
@Override
public void unLock(String key, Long threadId) {
dbLockRepo.unLock(key, threadId);
}
}
@Repository
public class DbLockRepository extends ... {
//...
public Boolean tryLock(DbLock dbLock) {
return dsl.insertInto(this.table()).set(this.record(dbLock, false)).onDuplicateKeyIgnore().execute() == 1;
}
public void unLock(String key, Long threadId) {
dsl.update(DB_LOCK).set(DB_LOCK.DISTRIBUTE_KEY, (Field<String>) null)
.where(DB_LOCK.DISTRIBUTE_KEY.eq(key).and(DB_LOCK.THREAD_ID.eq(threadId)))
.execute();
}
}
- 分布式锁表结构
CREATE TABLE `db_lock` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`distribute_key` varchar(50) DEFAULT NULL COMMENT '分布式key',
`thread_id` bigint(20) NOT NULL COMMENT '线程id',
PRIMARY KEY (`id`) USING BTREE,
UNIQUE KEY `uniq_distribute_key` (`distribute_key`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='分布式锁表';
为什么不选择Zookeeper
作为分布式锁
首先了解下基于zk
加锁(Curator
框架)的逻辑:
- 在
zk
的/lock/
目录下创建一个临时有序的节点 - 创建节点成功后,获取
/lock/
目录下所有的临时节点 - 判断当前线程创建的节点是否是所有节点中序号最小的那个节点
- 如果是,则代表加锁成功
- 否则,当前线程对当前节点序号的前一个节点添加一个事件监听器:比如当前线程创建的节点序号为
/lock/003
,所有的节点列表为[/lock/001,/lock/002,/lock/003]
,则当前线程对/lock/002
节点添加一个事件监听器 - 如果
/lock/001
节点对应的线程释放了连接,/lock/001
节点就会被删除,则代表锁被释放了,它的下一个节点/
对应的线程就会被唤醒,然后重新从第七墨博客 lock/0023
步开始执行
使用zk
作为分布式锁优劣
- 优势:
zk
天生的设计定位就是分布式协调服务,它的CP
特性使数据能够保证强一致性- 它的锁模型比较健壮、简单易用,且适合做分布式锁
- 在获取不到锁的时候,只需要添加一个监听器(
Watcher
机制)就可以了,不用一直轮询,尝试加锁的性能消耗比较小
- 劣势:
- 如果有较多的客户端频繁的申请加锁、释放锁,对于
zk
集群的压力会比较大(不能承载很高的并发写流量)
- 如果有较多的客户端频繁的申请加锁、释放锁,对于