中间件

基于 Redisson 实现切面分布式锁(升级版)

言七墨 · 11月23日 · 2020年 · · 577次已读

背景

之前使用七墨博客Redis作为分布式锁,但是因为本身特性的局限性,存在很多隐患,故将Redis替换成了Redisson。(本文暂且只考虑Redis是单实例的场景,如果想要了解在多实例情况下,可以自行搜索RedLock的具体实现,也很简单)。

其中,通过Redis作为分布式锁的隐患有:

  • 耗时不确定的任务一般不设置过期时间,假如业务机器宕机,unlock操作不会执行,会造成死锁
  • threadA的锁被threadB释放的问题
  • 假如设定了不合适的锁的最大超时时间,有可能任务还没有执行完,锁就过期了

基于Redisson作为分布式锁的执行流程

基于上次Redisson的代码又做了一些封装,使整个流程功能更强大,具体功能,可直接参考RedisLock中的配置信息。

代码

  • Redisson客户端配置
/**
 * @author: qimok
 * @since: 2020-03-05
 */
@Configuration
public class RedissonClientConfig {
 
    @Value("${spring.redis.host}")
    private String host;
 
    @Value("${spring.redis.port}")
    private String port;
 
    @Value("${spring.redis.password}")
    private String password;
 
    @Bean
    public RedissonClient getRedisson() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://" + host + ":" + port).setPassword(password);
        return Redisson.create(config);
    }
 
}
  • 注解代码
/**
 * @author: qimok
 * @since: 2020-03-05
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@SuppressWarnings("checkstyle:magicnumber")
public @interface RedisLock {

    /**
     * 加锁业务键
     */
    String key();

    /**
     * 尝试获取锁的等待时间
     * <p>
     *     在等待时间内获取锁成功,返回true
     *     等待时间结束了,还没有获取到锁那么返回false
     */
    String waitTime() default "0";

    /**
     * 分布式锁的租约时间(单位:毫秒)
     * <p>
     *     如果`不想`使用分布式锁的无限续租,则"指定"锁的租约时间(并发要求比较高的场景)
     *     如果`想要`使用分布式锁的无限续租,则"无需指定"租约时间(耗时比较久的单例Job、业务请求)
     */
    String leaseTime() default "-1";

    /**
     * 是否忽略加锁失败的异常(默认不忽略)
     */
    boolean ignoreFail() default false;

    /**
     * 是否支持降级(默认不支持)
     * <p>
     *     这里的降级指在`Redis`宕机时,不使用分布式锁,直接调用方法,尽可能保证服务内部业务逻辑的可用性(AP)
     *     考虑并发场景下,是否会造成数据的不一致/脏数据问题,假如业务上允许,则应该支持降级,否则不应该支持降级操作(CP)
     */
    boolean degrade() default false;

}
  • 切面代码
/**
 * @author: qimok
 * @since: 2020-03-05
 */
@Aspect
@Component
@Slf4j(module = MODULE_LOCK)
@SuppressWarnings({"checkstyle:magicnumber"})
public class RedisLockAspect {

    @Autowired
    private RedissonClient redissonClient;

    @Autowired
    private XxxDao xxxDao;

    /**
     * 分布式锁
     * <p>
     *     锁住的键(key):prefix + value,对应的值:value
     */
    @Around("@annotation(com.xingren.message.infrastructure.aop.annotation.RedisLock) && @annotation(redisLock)")
    public Object aroundLockMethod(ProceedingJoinPoint joinPoint, RedisLock redisLock) {
        ExpressionParser parser = new SpelExpressionParser();
        EvaluationContext ctx = new StandardEvaluationContext();
        ctx.setVariable("arguments", joinPoint.getArgs());
        String key = !redisLock.key().contains("arguments")
                ? redisLock.key() : parser.parseExpression(redisLock.key()).getValue(ctx, String.class);
        Long waitTime = Long.valueOf(!redisLock.waitTime().contains("arguments")
                ? redisLock.waitTime() : parser.parseExpression(redisLock.waitTime()).getValue(ctx, String.class));
        Long leaseTime = Long.valueOf(!redisLock.leaseTime().contains("arguments")
                ? redisLock.leaseTime() : parser.parseExpression(redisLock.leaseTime()).getValue(ctx, String.class));
        RLock rLock = null;
        Boolean isLocked = Boolean.FALSE;
        try {
            rLock = getRLock(key);
            // >>> 降级逻辑 START <<<
            if (Objects.isNull(rLock)) {
                if (redisLock.degrade()) {
                    return degradeInvoke(joinPoint, key);
                }
                // 不支持降级,则抛出异常
                throw new RuntimeException(String.format("RLock 获取失败, key: %s", key));
            }
            // >>> 降级逻辑 END <<<
            isLocked = tryLock(rLock, key, waitTime, leaseTime);
            if (isLocked) {
                log.debug(String.format("成功获取 RedisLock, key: %s ", key));
                return joinPoint.proceed();
            }
            // 失败抛出异常
            throw new RuntimeException(String.format("RedisLock 获取失败, key: %s", key));
        } catch (RuntimeException e) {
            if (redisLock.ignoreFail()) {
                return null;
            }
            throw new RuntimeException(e);
        } catch (Throwable throwable) {
            log.error(String.format("获取 RedisLock 后业务逻辑执行失败, key: %s, error: %s",
                    key, getRootCauseMessage(throwable)));
            return null;
        } finally {
            unlock(isLocked, rLock, key);
        }
    }

    private RLock getRLock(String key) {
        RLock lock;
        try {
            lock = redissonClient.getLock(key);
        } catch (Exception e) {
            lock = null;
            log.error(String.format("RLock 获取失败, key: %s", key));
        }
        return lock;
    }

    /**
     * 降级处理(使用 DB 的分布式锁的处理方式)
     */
    private Object degradeInvoke(ProceedingJoinPoint joinPoint, String key) {
        long threadId = Thread.currentThread().getId();
        log.warn(String.format("降级加 DbLock, key: %s, threadId: %s", key, threadId));
        Boolean isLocked = Boolean.FALSE;
        try {
            isLocked = sessionDao.tryLock(key, threadId);
            if (isLocked) {
                log.debug(String.format("成功获取 DbLock, key: %s,threadId:%s", key, threadId));
                return joinPoint.proceed();
            } else {
                throw new RuntimeException(String.format("DbLock 获取失败, key: %s, threadId: %s", key, threadId));
            }
        } catch (Throwable throwable) {
            log.error(String.format("获取 DbLock 后业务逻辑执行失败, key: %s, error: %s",
                    key, getRootCauseMessage(throwable)));
            return null;
        } finally {
            if (isLocked) {
                sessionDao.unLock(key, threadId);
                log.debug(String.format("释放 DbLock 成功, key: %s,threadId: %s", key, threadId));
            }
        }
    }

    /**
     * 尝试加锁
     */
    private Boolean tryLock(RLock lock, String key, Long waitTime, Long leaseTime) {
        try {
            return leaseTime != -1L ? lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS)
                    : lock.tryLock(waitTime, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            log.error(String.format("加 RedisLock 异常, key:%s, e:%s", key, e));
            throw new RuntimeException(e);
        }
    }

    /**
     * 解锁
     */
    private void unlock(Boolean isLocked, RLock lock, String key) {
        // 检查当前线程是否获得此锁
        if (isLocked && lock.isHeldByCurrentThread()) {
            lock.unlock();
            log.debug(String.format("释放 RedisLock 成功, key: %s", key));
        }
    }
}
  • 持久层代码
/**
 * @author: qimok
 * @since: 2020-10-12
 */
public interface DbLockDao {    

    /**
     * 尝试加分布式锁
     */
    Boolean tryLock(String key, Long threadId);

    /**
     * 尝试解锁
     */
    void unLock(String key, Long threadId);

}

public interface DbLockDaoImpl implements DbLockDao { 

    @Autowired
    private DbLockRepository dbLockRepo;

    @Override
    public Boolean tryLock(String key, Long threadId) {
        DbLock dbLock = new DbLock();
        dbLock.setDistributeKey(key);
        dbLock.setThreadId(threadId);
        return dbLockRepo.tryLock(po);
    }

    @Override
    public void unLock(String key, Long threadId) {
        dbLockRepo.unLock(key, threadId);
    }

}   

@Repository
public class DbLockRepository extends ... {

    //...

    public Boolean tryLock(DbLock dbLock) {
        return dsl.insertInto(this.table()).set(this.record(dbLock, false)).onDuplicateKeyIgnore().execute() == 1;
    }

    public void unLock(String key, Long threadId) {
        dsl.update(DB_LOCK).set(DB_LOCK.DISTRIBUTE_KEY, (Field<String>) null)
                .where(DB_LOCK.DISTRIBUTE_KEY.eq(key).and(DB_LOCK.THREAD_ID.eq(threadId)))
                .execute();
    }

}
  • 分布式锁表结构
CREATE TABLE `db_lock` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `distribute_key` varchar(50) DEFAULT NULL COMMENT '分布式key',
  `thread_id` bigint(20) NOT NULL COMMENT '线程id',
  PRIMARY KEY (`id`) USING BTREE,
  UNIQUE KEY `uniq_distribute_key` (`distribute_key`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='分布式锁表';

为什么不选择七墨博客Zooke言七墨eper作为分布式锁

首先了解下基于言七墨zk加锁(Curator框架)的逻辑:

  1. zk/lock/目录下创建一个临时有序的节点
  2. 创建节点成功后,获取/lock/目录下所有的临时节点
  3. 判断当前线程创建的节点是否是所有节点中序号最小的那个节点
  4. 如果是,则代表加锁成功
  5. 否则,当前线程对当前节点序号的前一个节点添加一个事件监听器:比如当前线程创建的节点序号为/lock/003,所有的节点列表为[/lock/001,/lock/002,/lock/003],则当前线程对/lock/002节点添加一个事件监听器
  6. 如果/lock/001节点对应的线程释放了连接,/lock/001节点就会被删除,则代表锁被释放了,它的下一个节点/lock/002对应的线程就会被唤醒,然后重新从第3步开始执行

使用zk作为分布式锁优劣

  • 优势:
    • zk天生的设计定位就是分布式协调服务,它的CP特性使数据能够保证强一致性
    • 它的锁模型比较健壮、简单易用,且适合做分布式锁
    • 在获取不到锁的时候,只需要添加一个监听器(Watcher机制)就可以了,不用一直轮询,尝试加锁的性能消耗比较小
  • 劣势:
    • 如果有较多的客户端频繁的申请加锁、释放锁,言七墨对于zk集群的压力会比较大(不能承载很高的并发写流量)
0 条回应