数据库

谈谈分库分表的几个核心流程

言七墨 · 9月29日 · 2020年 · 842次已读

一、背景介绍

移动互联网时代,随着软件用户量的不断增长,由此产生的数据量也在飞速增长,比如,用户表、订单表、聊天消息表等。据统计,MySQL单表可以存储10亿级数据,只是这时性能比较差,业界公认MySQL单表容量在1KW量级是最佳状态,因为这时它的BTREE索引树高在3~5之间

既然一张表无法搞定,那么就想办法将数据放到多个地方,目前比较普遍的方案有3个:

  1. 分区
    • 要求数据不是海量(分区数有限,存储能力就有限)
    • 业务并发能力要求不高
  2. 分库分表
    • 互联网行业处理海量数据的通用方法
    • 发展几十年的RDBMS(关系型数据库)具有生态完善、绝对稳定、事务特性的优点,只要有软件的地方,它都是核心存储的首选
  3. NoSQL/NewSQL
    • NoSQL/NewSQL宣传的无论多厉害,就现在各大公司对它的定位,都是RDBMS的补充,而不是取而代之

本文就分库分表的一些核心流程展开介绍:

二、是单库分表,还是分库分表

  • 单库分表适用场景:
    1. 单表数据量太大,查询时需要扫描的行数太多,SQL执行效率低下
    2. CPU出现瓶颈
  • 分库分表适用场景:
    1. 磁盘读IO到了瓶颈:热点数据太多,数据库缓存放不下,每次查询时会产生大量的IO,导致查询速度低下
    2. 请求的数据太多、网络带宽不够
    3. 数据库连接数超过了最大限定值

分库分表的复杂度要高于单库分表,如果数据量不是特别大,且QPS也不是特别高,首选单库分表,待某些指标有接近阈值的迹象时,再考虑分库分表。

分库分表相对单库分表来说,复杂的地方有:

  • 需要约定多个分片数据源
  • 需要定义多个分片数据源的事务管理器
  • 数据一致性的处理方案(强一致 or 最终一致)
  • 数据迁移、后续扩容

三、分布式数据库中间件选型

本文主要针对时下比较流行的两款数据库中间件产品做下介绍:

主要指标Sharding-JDBCMyCat
所属Apache基于阿里 Cobar 二次开发,社区维护
活跃度
ORM支持任意任意
基于客户端还是服务端客户端服务端
分库支持支持
分表支持不支持单库分表
事务自带弱XA、最大努力送达型柔性事务自带弱XA
监控无,可通过其它方式支持自带
读写分离支持支持
限制部分 JDBC 方法不支持、SQL语句限制部分 JDBC 方法不支持、SQL语句限制
数据库连接池任意任意
MySQL交互协议JDBC Driver前后端均用 NIO
开发开发成本高,代码入侵大开发成本小,代码入侵小
运维维护成本低维护成本高
配置难度一般复杂

Sharding-JDBC

架构图:

简单介绍:

  • Sharding-JDBC是一款轻量级的框架,以工程依赖JAR的形式提供功能,无需额外部署和依赖,可以理解为增强版的JDBC驱动
  • 对于运维同事来说,只需要协助一些简单的配置及后续的扩容工作,无需关注底层代码与分片策略规则,相对MyCat,这是 Sharding-JDBC的优势,减少了部署成本以及运维同事的学习成本

MyCat

架构图

简单介绍:

  • MyCat并不是业务系统代码里面的配置,而是独立运行的中间件,所有配置都会交给运维同事执行
  • 对于运维同事来说,它是在数据库Server前增加的一层代理,MyCat本身不存数据,数据是在后端数据库上存储的,因此,数据可靠性以及事务等都是通过数据库保证的
  • MyCat down 掉的时候,系统不能对数据库进行操作,会对所有用户产生影响
  • MyCat比较适合大数据工作

通过以上分析,可见Sharding-JDBC相对MyCat来说,更轻量,首选肯定是Sharding-JDBC,只要代码层面做好防腐层(依赖倒置)的设计,就算以后数据量级达到了百亿、千亿,也可以更加灵活方便的替换其它中间件产品,甚至 NewSQL

四、分布式ID的生成方式

实现方式

  • 完全依赖数据源的方式:ID的生成规则,读取控制完全由数据源控制,常见的如数据库自增长ID、序列号、优雅的Flickr方案、基于Redis的原子操作incr/incrBy产生顺序号、MongodbObjectId、美团(Leaf)的号段模式…
  • 半依赖数据源的方式:ID的生成规则,有部分生成因子需要由数据源(或配置信息)控制,如百度的uid-generator、美团(Leahttps://qimok.cnf)的Snowflake模式…
  • 不依赖数据源的方式:ID的生成规则完全由机器信息独立计算,不依赖任何配https://qimok.cn置信息和数据记录,如常见的UUID及变种、GUID

实践方案

实践方案适用于以上提及的三种实现方式,可作为这三种实现方式的一种补充,旨在提升系统吞吐量,但原有实现方式的局限性依然存在。

  • 实时获取方案:顾名思义,每次要获取ID时,实时生成,简单快捷,ID都是连续不间断的,但吞吐量可能不是最高的。
  • 预生成方案:预先生成一批ID放在数据池里,可简单自增长生成,也可以设置步长,分批生成,需要将这些预先生成的数据,放在存储容器里(JVM内存,Redis,数据库表均可以),可以较大幅度地提升吞吐量,但需要开辟临时存储空间,断电宕机后可能会丢失已有IDID也可能有间断。

选择分布式ID的生成方式时,需要特别注意以下几个地方:

  • 全局唯一:必须保证ID是全局唯一的
  • 高可用:无限接近于100%的可用性
  • 高性能:低延时,ID生成响应要快
  • 接入方便:遵循拿来主义原则,在系统设计和实现上要尽可能的简单
  • 长度适中:不要太长,最好64bit,使用long比较好操作。如果是96bit,需要各种移位,相当的不方便,还有可能有些组件不能支持这么大的ID
  • 分片支持:可以控制ShardingId,比如某一个用户的文章要放在同一个分片内,这样查询效率高,修改也容易,实现稍复杂
  • 信息安全:如果ID是连续的,恶意用户的扒取工作就非常容易做了,直接按照顺序下载指定URL即可;如果是订单号就更危险了,竞争对手可以直接知道我们一天的订单量,所以要结合自己的业务场景来考虑

如果系统要求的吞吐量不是极高,个人推荐了解下优雅的Flickr方案,如果系统要求的吞吐量极高,个人推荐了解下美团的(Leaf)项目,由于篇幅有限,这里不做过多展开。

五、分片/表键选择

分片键的定义

分片键即分库分表的拆分字段,是在水平拆分过程中用于生成拆分规则的数据表字段,根据分片键的值将数据表水平拆分到每个分库/分表中。

数据表拆分的首要原则

要尽可能的找到数据表中的数据在业务逻辑上的主体,并确定大部分(或核心的)数据库操作(查、删、改)都是围绕这个主体进行的,那么就可以使用该主体对应的字段作为分片键。

  • 业务逻辑上的主体,通常与业务的应用场景相关,下面的一些典型应用场景都有明确的业务逻辑主体:
    • 面向用户的互联网应用,都是围绕用户维度来做各种操作,那么业务逻辑主体就是用户,可使用用户对应的字段作为分片键
    • 侧重于卖家的电商应用,都是围绕卖家维度来进行各种操作,那么业务逻辑主体就是卖家,可使用卖家对应的字段作为分片键
  • 如果确实找不到合适的业务逻辑主体作为分片键,可以考虑下面的方式来选定分片键:
    • 根据数据分布和访问的均衡度来考虑分片键,尽量将拆分后的每个分表中的数据相对均匀地分布在不同的物理分表中,这比较适用于大量分析型查询的应用场景
    • 按照数字(字符串)类型与时间类型字段相结合作为分片键,这比较适用于日志检索类的应用场景

确定了业务逻辑的主体,但是还有其它的附属主体该怎么办?

比如消息表(message)有以下几个核心字段:

消息表中,两个聊天对象可以唯一的确定一个会话,即一个会话下的消息是一个独立的集合,大部分操作(查、删、改)都是围绕着会话展开的,故可以拿session_id当作业务逻辑的主体,但是,通过message_idguid操作的业务场景也比较多,此时可以考虑通过冗余映射表来辅助操作:

message_message_id_mapping_*message_guid_mapping_*分别是message_*的冗余映射表(ps,以_*结尾的表,表示分表,如message_*表示消息分表;橙色标记的字段代表表的分片键):

  • 当根据session_id查询时,根据指定的分片规则(下文会介绍),可以直接查询message_*
  • 当根据message_id查询时,根据指定的分片规则,可以直接查询message_message_id_mapping_*,拿到session_id后,再根据session_id(根据指定的分片规则)和message_id查询message_*
  • 当根据guid查询时,根据指定的分片规则,可以直接查询message_guid_mapping_*,拿到session_id后,再根据session_id(根据指定的分片规则)和guid查询message_*

如果对查询的实时性要求很高,可以冗余全量数据,即

冗余全量表 VS 冗余映射表

  1. 速度对比:冗余全量表速度更快,冗余映射表需要二次查询,即使有引入缓存,还是多一次网络开销
  2. 存储成本:冗余全量表需要几倍于冗余映射表的存储成本
  3. 维护代价:冗余全量表维护代价更大,涉及到数据变更时,多张表都要进行修改

或许有人注意到,冗余映射表的后缀也加了_*,没错,冗余映射表也要进行分库分表。

或许有人又说冗余映射表也要分库分表,复杂度比较高,可以将冗余映射表的数据全部存到ES,这也是可以的,网上确实有这样的案例,不过在开始之前,要结合系统的业务定位及使用额外组件的复杂度做一份调研,最终选定最合适的方案。

注意: 如果决定将冗余映射分表存储到数据库中,要注意给冗余映射分表的分片键和映射字段(如本例中session_id)建立组合索引,保证查询时,可以用到覆盖索引,加快查询性能。

六、分片算法/规则

常用的分片算法有取模分片、范围分片、hash分片、复合分片,这里不做过多展开,只介绍最常用的两种:取模分片、范围分片。还是拿五、分片/表键选择中的消息表(采用冗余映射表的方式)为例,为了方便介绍,只介绍单库分表的分片算法,分库分表无非多了个分库规则,基本同分表规则。其中session_idmessage_id都是分布式的自增IDguid是随机生成的GUID

故可以进行如下的分片规则:

  • session_id采用取模分片:两个聊天对象可以唯一确定一个session_id,为了保证每个分表数据的相对均匀,message_*采用取模分片算法
  • message_id采用范围分片:message_id是自增的分布式ID,大部分根据message_id的操作(查、删、改)集中于最新的一部分数据,为了兼容操作最近消息的场景及方便扩容,message_言七墨message_id_mapping_*采用范围分片(采用范围分片,大部分操作只会落在最新的几张分表中,操作的分表越少,性能越高)
  • guid采用取模分片:guid没有规律而言,为了保证分表数据的相对均匀,message_guid_mapping_*采用取模分片算法

通过上面的举例,可以看出,选择分片算法要综合考虑分片键的业务场景、字段值的规律、分片后分表的数据分布、后续的扩容…
注意:对于取模分片的分表数量,建议是 2 的 n 次幂,好处是方便动态缩容…对于程序员来说,2 的 n 次幂,总是有那么一些微妙的地方

七、数据迁移方案

由于本人前面经历过两次大的项目迁移工作,均涉及到比较复杂且迁移量比较大的数据迁移,一开始时,就采用Jdbc + 多实例Job + Redis分段 + 多线程的方式去做数据迁移,故分库分表的数据迁移工作也是通过此种方式实现的,屡试不爽。

由于篇幅有限,这里只介绍下大概的思路及核心伪代码:

  1. 通过JDBC分别与七墨博客数据源表和目标表建立数据库连接
  2. 定义方法内线程池,好处是可以动态的控制Job每次执行时的线程数量,防止迁移速度太快把数据库CPU打满(Job执行结束时,记得销毁线程池)
  3. 多实例Job为了防止重复迁移同一批数据,采用的是Redisincr将数据分段(执行时,每个实例的每个线程会在Redis中取一个偏移值,然后根据偏移值和事先定义好的的每次的迁移数据量计算出要迁移的数据段)
  4. 定义迁移Task接口,方便后续的扩展,每次有迁移或修复任务时,只需要重点关注querySqlexecSql
  5. 定义接口,职能是控制Job的开关和迁移的偏移量,这个要与Redis结合
  6. 适用场景:线上不停机进行数据迁移、脏数据清理、更换数据库(eg,Qracle >> MySQL)、数据修复等操作,且支持跨数据库实例操作
  7. 性能:视具体数据库性能而定(只要数据库性能足够好,迁移性能就可以足够高)
  8. 核心伪代码如下:
/**
 * Job 处理器
 */
@Scheduled(fixedRate = 1000 * 10)
public void process() {
    // 省略 Job 执行的开关
    // 下面没有出现声明的变量取的都是 Apollo 的配置
    ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
    CountDownLatch latch = new CountDownLatch(threadNum);
    try {
        for (int i = 0; i < threadNum; i++) {
            executorService.submit(() -> {
                Long solveCount = qps / threadNum * 60L;
                Long offset = redisService.incr(MIGRATION_INCR);
                // offset == 1L 表示刚开始迁移,当前ID需要设置为0
                Long currId = offset == 0L ? 0L : (offset - 1) * solveCount; // 当前ID
                Long idLe = offset * solveCount;
                // 执行数据迁移
                executeMigrate(commLog, offset, currId, idLe);
                latch.countDown();
            });
        }
        latch.await();
    } catch (InterruptedException e) {
        log.error(commLog + "整体异常【offset: %s】", e);
    } finally {
        executorService.shutdown();
        // 省略一些统计执行消耗时间的日志打印
    }
}

/**
 * 数据迁移执行器
 */
public void executeMigrate(String target, Long offset, Long currId, Long idLe, Integer everyCommitCount) {
    Connection sourceConn = null;
    Connection messageConn = null;
    Statement stmt = null;
    ResultSet res = null;
    PreparedStatement pstmt = null;
    Long count = 0L; // 迁移数量
    try {
        IDataMigrateReady ready = transferringService.toReady(target);
        Integer sourceFlag = ready.getSourceFlag(target);
        sourceConn = sourceFlag == 0 ? conn.getMessageConn() : conn.getXiaoxingConn();
        messageConn = conn.getMessageConn();
        messageConn.setAutoCommit(false);
        String querySql = task.getQuerySql(target, currId, idLe);
        stmt = sourceConn.createStatement();
        long readStartTime = System.currentTimeMillis();
        stmt.execute(querySql); // 执行sql
        res = stmt.getResultSet();
        String execSql = task.getExecSql();
        pstmt = messageConn.prepareStatement(execSql); // 预编译
        int num = 0;
        while (res.next()) {
            num++;
            count++;
            for (int i = 1; i <= ready.getFieldNum(); i++) {
                pstmt.setObject(i, res.getObject("t" + i));
            }
            pstmt.addBatch();
            // 每everyCommitCount条数据提交一次事务
            if (num > everyCommitCount) {
                long executeStartTime = System.currentTimeMillis();
                pstmt.executeBatch();
                messageConn.commit();
                pstmt.clearBatch();
                num = 0;
            }
        }
        pstmt.executeBatch();
        messageConn.commit();
    } catch (Exception e) {
        log.error(String.format("%s ==> 错误,【offset: %s】", target, offset), e);
    } finally {
        conn.closeConn(sourceConn, null, messageConn, stmt, res, pstmt);
        // 省略一些统计执行消耗时间的日志打印
    }
}

/**
 * 数据迁移 Task 的接口定义
 */
public interface IDataMigrateTask {

    /**
     * 获取 查询SQL
     */
    String getQuerySql(String target, Long currId, Long idLe);

    /**
     * 获取 插入SQL
     */
    String getExecSql();

    /**
     * 获取涉及到的字段的数目
     */
    default Integer getFieldNum() {
        return 0;
    }

    /**
     * 获取源标识
     * <p>
     *     0:sourceConn
     *     1:targetConn
     */
    default Integer getSourceFlag(String target) {
        return 0;
    }

}

/**
 * 分布式 Long 值自增
 */
@Override
public Long incr(String key) {
    RedisAtomicLong entityIdCounter = new RedisAtomicLong(key, redisTemplate.getConnectionFactory());
    return entityIdCounter.getAndIncrement();
}

上面只列出了迁移的核心代码,如果将一张大表的数据同时迁移到各个拆分好的分表及冗余映射表中,还要做一些封装~

执行效率案例:

  • 环境:4核16GMySQL5.7实例
  • 操作:业务高峰期,4个服务实例分别开辟一个含有4个线程的线程池,同时往76张分表中一共迁移2.4亿数据,大约花费5个小时
  • 说明:源表总数据量是2.4亿七墨博客由于目标表存在映射分表,故迁移的总数据量远超2.4亿

缺点: 需要自己实现迁移的代码(核心迁移的执行器代码只需要实现一次,在每次数据迁移时,迁移的Task需要自己实现,总之,只要封装的足够好,后续Task的开发工作就足够简单。重复造轮子,还说出了优越感…)

如果您有更好的迁移方案或者市面上有更好的迁移方案,也请分享一下~

八、无限扩容方案

继续拿五、分片/表键选择中的消息表(采用冗余映射表的方式)为例介绍下最常规的扩容方案:Double 扩容方案

具体操作如下(假设已有2张分表message_0、message_1,要double扩容为message_0、message_1、message_2、message_3):

  1. 运维层面:新增两个分表message_2、message_3,通过MySQLgh-ost工具同步数据:
    • message_0复制到message_2
    • message_1复制到message_3
  2. 待 步骤1 中的数据完全追平后,开始调整分片规则并使之生效:
    • 原 session_id % 2 = 0 => message_0 改为 session_id % 4 = 0 => message_0、session_id % 4 = 2 => message_2
    • 原 session_id % 2 = 1 => message_1 改为 session_id % 4 = 1 => message_1、session_id % 4 = 3 => message_3
  3. 新的分片规则生效后,关闭gh-ost工具同步工作;
  4. 此时,四个分表的数据都已完整,只是有冗余数据(多存了和自己配对的节点的那部分数据),择机清除即可
  • 优点: 无需停机
  • 缺点: 多个分表之间数据有冗余的情况,需要找机会删除冗余的数据(可以通过上面提到的Jdbc + 多实例Job + Redis分段 + 多线程清理)
  • message_*message_guid_mapping_*都是采用的取模分片算法,都可以采用上述的Double扩容方案。
  • message_message_id_mapping_*采用的是范围分片算法,可以找运维同事写个脚本在最后一张分表的容量快用光时,提前创建分表,也可以通过代码去实现,需要注意的是分片规则也要支持自动感知及修改
  • 为了保证业务SQL的执行效率,每个分表的数据容量最好限定在2000万以内,如果超过了2000万,就要开始进行扩容操作。
  • 首次分表时,首先要对线上的业务数据做下统计分析,根据事先指定好的分片规则,计算出每张分表的容量,并根据每天、每月的数据增长量估算出下次扩容的时间。首次分表,推荐每张分表的数据量在1000万以内。

九、总结

本文主要介绍了下分库分表的几个核心流程,分库分表设计时,只要将这几个核心流程设计好,基本就已经完成了一半的工作,另一半的工作就是方案的落地,可参考:

限于篇幅,一些流程只介绍了下常规操作,大家也可以根据本文的流程标题自行搜索,这样可以更加全面的认识并掌握分库分表。如果有的地方介绍有误或者您有更好的解决办法,也请留言,大家一起交流,谢谢~

0 条回应