中间件

基于 Redisson 实现切面分布式锁

言七墨 · 8月23日 · 2020年 · 576次已读

一、思考问题

之前项目中使用的是`基于 Redis七墨博客 实现分布式锁`中的基于 redis 1.5.* 版本使用 setIfAbsent实现言七墨的分布式锁,为了使用简单,也是通过AOP切面的方式实现的,但是随着并发量的增长,难免会产生一些担心:

  1. 假如设定了不合适的锁的最大超时时间,有可能任务还没有执行完,锁就过期了,同样的请求过来,获取锁成功,如果没有做好幂等,有可能造成数据不一致的问题,或者浪费IO资源。【Redisson锁续租特性可以解决此问题】
  2. 第1项,当第一个获取锁的线程还没有执行完,但是锁已经过期了,第二个线程又加锁成功,此时,第一个线程执行完业务操作后,释放锁的时候是没有做检查的,此时,会导致释放了第二个线程的锁,如此循环,有可能会造成严重的问题七墨博客。(如,秒杀中的超卖就有可能是此原因造成的)【Redisson自带检查当前线程是否获得此锁的方法-isHeldByCurrentThread()
  3. 对于开发人员来说,使用分布式锁,并不应该关心锁的value值。言七墨【当使用Redisson作为分布式锁的时候,开发人员无需显式设定valueRedisson框架会帮我们生成一个由UUID和加锁操作的线程的threadId用冒号拼接起来的字符串当作value值】

上面的中括号中顺便提了一下Redisson可以解决相应的问题,但是Redisson没有缺点了吗?

  • 当使用Redisson锁续租特性时,假如遇到微服务重启,是否会造成死锁?
    • 答案:不会。因为锁续租是通过Netty的时间轮(HashedWheelTimer)来实现的,服务启动时,假如用到锁续租的特性,会生成一个单例的时间轮,服务销毁时,此时间轮也会被destory掉,即锁不会再被续租,当达到指定的超时时间或者默认的超时时间30s的时候,锁会自动释放掉。
  • 当负责存储分布式锁的RedissonRedis客户端宕机,是否会造成死锁?答案:分情况猜测、讨论 。
    • 业务假如正常处理完成,此时由于Redis节点宕机,解锁操作会失败,此时当前锁有可能会被无限hang住,假如当前锁没有被无限 hang住,而是被失效掉了,但业务还在处理,那么订阅该锁的其它线程有可能会尝试获取锁,由于Redis节点宕机,理论上来说,是会加锁失败的;如果真的可以加锁成功,也会造成线程安全问题
    • 虽然使用Redisson作为分布式锁在Redis客户端宕机时,有可能会出现问题,但是使用Redis作为分布式锁,恐怕问题会更多,这里不做过多分析

二、Talk is cheap. Show me the code

1、引入 Redisson 的 jar

compile("org.redisson:redisson:3.5.7")

2、Redisson 客户端配置

/**
 * @author: qimok
 * @since: 2020-03-05
 */
@Configuration
public class RedissonClientConfig {

    @Value("${spring.redis.host}")
    private String host;

    @Value("${spring.redis.port}")
    private String port;

    @Value("${spring.redis.password}")
    private String password;

    @Bean
    public RedissonClient getRedisson() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://" + host + ":" + port).setPassword(password);
        return Redisson.create(config);
    }

}

3、声明 RedisLock 注解

https://qimok.cn
/**
 * @author qimok
 * @since 2020-03-05
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@SuppressWarnings("checkstyle:magicnumber")
public @interface RedisLock {

    /**
     * 业务键
     */
    String key();

    /**
     * 分布式锁的最大过期时间(单位:毫秒),默认不设置锁的最大超时时间
     * <p>
     *     如果不想使用分布式锁的无限续租,则"指定"锁的最大过期时间(并发要求比较高的场景)
     *     如果想要使用分布式锁的无限续租,则"无需指定"过期时间(耗时比较久的单例Job、业务请求)
     */
    String expire() default "-1";

}

4、切面配置

/**
 * @author qimok
 * @since 2020-03-05
 */
@Aspect
@Component
@Slf4j(module = MODULE_LOCK)
@SuppressWarnings({"checkstyle:magicnumber"})
public class RedisLockAspect {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * 分布式锁
     * <p>
     *     锁住的键(key):prefix + value,对应的值:value
     */
    @Around("@annotation(com.xingren.message.infrastructure.aop.annotation.RedisLock) && @annotation(redisLock)")
    public Object aroundLockMethod(ProceedingJoinPoint joinPoint, RedisLock redisLock) {
        ExpressionParser parser = new SpelExpressionParser();
        EvaluationContext ctx = new StandardEvaluationContext();
        ctx.setVariable("arguments", joinPoint.getArgs());
        String key = redisLock.key().indexOf("arguments") == -1
                ? redisLock.key() : parser.parseExpression(redisLock.key()).getValue(ctx, String.class);
        Long leaseTime = redisLock.expire().indexOf("arguments") == -1
                ? Long.valueOf(redisLock.expire())
                : parser.parseExpression(redisLock.expire()).getValue(ctx, Long.class);
        RLock lock = redissonClient.getLock(key);
        boolean isLock = Boolean.FALSE;
        if (leaseTime != -1) {
            // 指定锁的租约时间
            try {
                /**
                 * 方法签名:tryLock(long waitTime, long leaseTime, TimeUnit unit)
                 * <p>
                 *     waitTime:拿不到锁的等待时间
                 *     leaseTime:租约时间(只要 leaseTime 的值不是 -1,就不会开启看门狗的定时任务,可防止死锁)
                 *     <p>
                 *         方法签名:tryLock(long waitTime, TimeUnit unit),此处的 leaseTime 默认为 -1,则会开启看门狗的定时任务
                 *     unit:时间单位
                 */
                isLock = lock.tryLock(0L, leaseTime, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                log.error(String.format("加分布式锁异常, key:%s, e:%s", key, e));
            }
        } else {
            // 使用分布式锁的无限续租特性(请注意,控制不好,容易产生死锁)
            isLock = lock.tryLock();
        }
        if (!isLock) {
            throw new LockFailedException(String.format("分布式锁获取失败, key: %s", key));
        }
        try {
            log.debug(String.format("成功获取分布式锁, key: %s ", key));
            return joinPoint.proceed();
        } catch (Throwable throwable) {
            log.error(String.format("获取锁后业务逻辑执行失败, key: %s, error: %s", key, getRootCauseMessage(throwable)));
            throw new RuntimeException(throwable);
        } finally {
            // 检验该锁是否被线程占有(强调的是当前锁是否被占有,占有该锁的线程不一定是当前线程)& 检查当前线程是否获得此锁
            if (lock.isLocked() && lock.isHeldByCurrentThread()) {
                lock.unlock();
                log.debug(String.format("已经释放分布式锁, key: %s", key));
            }
        }
    }

    public class LockFailedException extends RuntimeException {
        LockFailedException(String message) {
            super(message);
        }
    }

}

5、使用方法

/**
 * 如果没有 xxx,就创建
 */
public Long createXxxIfAbsent(Xxx xxx) {
    // 其它的业务操作
    do {
        try {
            String key = xxx.getId();
            // 加锁创建 xxx
            Long xxxId = creatingWithLockService.createXxxWithLock(key, sleepTime, xxx, COMM_LOG);
            if (xxxId != 0L) {
                // 其它的业务操作
                return xxxId;
            }
        } catch (RuntimeException e) {
            handleRuntimeException(COMM_LOG, xxx.getXxxNo(), retries);
        }
    } while (retries <= retryTimes);
    throw new DomainException(XXX_CREATE_REACH_DEMAND_TIMES);
}

/**
 * 加锁失败时的重试
 */
public void handleRuntimeException(String commLog, String xxxNo, Integer retries) {
    // 最后一次尝试加锁失败,无需等待
    if (retries <= retryTimes) {
        // 加锁失败,休眠指定时间
        try {
            log.info(commLog + String.format("Xxx号:%s, 锁等待第 %s 次,等待时长 %s 毫秒", xxxNo, retries, sleepTime));
            Thread.sleep(sleepTime);
        } catch (InterruptedException ie) {
            log.error(commLog + String.format("Xxx号:%s, 锁等待异常!", xxxNo), ie);
        }
    }
}
// #arguments[0] 取 key 的值,#arguments[1] 取 sleepTime 的值
@RedisLock(key = "#arguments[0]", expire = "#arguments[1]")
public Long createXxxWithLock(String key, Long sleepTime, Xxx xxx, String commLog) {
    // 加锁成功后的业务逻辑处理
}
  • 如果无需重试,则调用加锁代码的时候catche一下异常(加锁失败会抛出运行时异常,即上面代码中的 LockFailedException),如下:
// ...
try {
    Long xxxId = creatingWithLockService.createXxxWithLock(key, sleepTime, xxx, COMM_LOG);
} catch (RuntimeException e) {
    log.debug("加锁失败...");
}
// ...

三、结语

已做过压力测试,且续租(debug过源码)和释放锁都是正常进行,上线许久,未发现过任何问题。

四、扩展

0 条回应