Java 开发 / 数据库

通过 Jdbc + Job 解决跨库大表数据迁移

言七墨 · 1月27日 · 2020年 · · 504次已读

前言

将 scala 项目重构为 Java 微服务,在进行读迁移时,其中有一块业务数据量是亿级别的,需要进行跨库数据迁移(注意:表结构发生了很大变化),并且在迁移过程中尽量做到不停服迁移。

分析

针对亿级别的数据量级,肯定不能使用脚本迁移的方式迁移,更不可能使用 mysqldump 命令进行导入导出进行数据迁移。网上还有以导入导出文件的形式迁移百万级数据,但是这个操作需要修改 mysql 的配置,保险起见,直接 pass …最终,选择通过 Jdbc 配合 Job 在双写阶段进行数据迁移,待数据迁移基本完毕,并且数据一致性校验没问题后,再进行读迁移的上线(此时,只需要业务方上线,将读切换到现在的微服务即可)。

方案

  • 确定一个读迁移前的上线点:这个上线点前提是保证已有写接口不会受到任何影响(因为此次读迁移涉及到对写接口的性能优化),上线前,需要做好写接口的回归测试,上线时,也需要对写相关的的业务进行回归测试,这个上线点还是处于双写阶段
  • 确定此次上线的内容:读相关接口、通过 Jdbc + Job 进行数据迁移代码
    • 迁移数据的几个 Job 都需要设置开关及迁移的 QPS
    • 按照优先级依次打开 Job 的开关
  • 数据一致性校验:由于数据量特别大,主要验证数据量是否正确、数据内容进行抽样检查、针对近一个月的数据进行 md5 验证等等
  • 读迁移上线:待数据完全持平与老表的数据后,关闭所有迁移 Job 的开关,开始读迁移

实现

  • 建立数据库连接
@Override
public Connection getConnection(String url, String username, String password) {
    try {
        Class.forName("com.mysql.jdbc.Driver");
    } catch (ClassNotFoundException e) {
        e.printStackTrace();
    }
    Connection conn = null;
    try {
        url += "&serverTimezone=Asia/Shanghai&useServerPrepStmts=false&useSSL=true";
        conn = DriverManager.getConnection(url, username, password);
    } catch (SQLException e) {
        e.printStackTrace();
    }
    return conn;
}
  • 通过 Jdbc + Job 进行数据迁移
/**
 * 数据迁移JOB
 *
 * @author qimok
 * @since 2020-1-15
 */
@Component
@Slf4j
@SuppressWarnings({"checkstyle:linelength", "checkstyle:magicnumber", "checkstyle:methodlength", "PMD"})
public class OldTableMigrateJob extends AbstractSimpleElasticJob {

    @Value("${old.table.migration.enabled}")
    private Boolean enabled;

    @Value("${db.old.table.url}")
    private String oldTableUrl;

    @Value("${db.old.table.username}")
    private String oldTableUsername;

    @Value("${db.old.table.password}")
    private String oldTablePassword;

    @Value("${db.new.table.url}")
    private String newTableUrl;

    @Value("${db.new.table.username}")
    private String newTableUsername;

    @Value("${db.new.table.password}")
    private String newTablePassword;

    @Value("${migration.qps}")
    private Integer qps;

    @Value("${migration.every.commit.count}")
    private Integer everyCommitCount;

    private static final String OLD_TABLE_MIGRATION = "old:table:migration";

    @Autowired
    private RedisService redisService;

    @Autowired
    private JdbcService jdbcService;

    /**
     * 按照 old_table 表的 ID 正序迁移,不重复
     */
    @Override
    public void process(JobExecutionMultipleShardingContext shardingContext) {
        if (!enabled) {
            return;
        }
        long solveCount = qps * 2 * 60;
        long startTime = System.currentTimeMillis(); // 记录开始时间
        Connection connOld = null; 
        Connection connNew = null; 
        Statement stmtOld = null;
        ResultSet resFromOld = null;
        PreparedStatement pstmt = null;
        long currId = 0; // 当前ID
        long currMaxId = 0; // 当前最大的ID
        long count = 0; // 迁移数量
        try {
            String value = redisService.getFromString(OLD_TABLE_MIGRATION);
            if (StringUtils.isNotBlank(value)) {
                currId = Integer.valueOf(value);
            } else {
                redisService.setToString(OLD_TABLE_MIGRATION, "0");
            }
            connOld = jdbcService.getConnection(oldTableUrl, oldTableUsername, oldTablePassword); // 连接 old_table 数据库
            connNew = jdbcService.getConnection(newTableUrl, newTableUsername, newTablePassword); // 连接 new_table 数据库
            connNew.setAutoCommit(false); // 设置手动提交
            String oldSql = "SELECT a.id,  LOWER(CONCAT(\n"
                    + "    SUBSTR(HEX(a.guid), 1, 8), '-',\n"
                    + "    SUBSTR(HEX(a.guid), 9, 4), '-',\n"
                    + "    SUBSTR(HEX(a.guid), 13, 4), '-',\n"
                    + "    SUBSTR(HEX(a.guid), 17, 4), '-',\n"
                    + "    SUBSTR(HEX(a.guid), 21)\n"
                    + "  )) as guid\n"
                    + "..."
                    + "a.created, a.created\n"
                    + "    FROM old.old_table a left join old.extra b\n"
                    + "    on a.id = b.deliverId where a.id > " + currId + " limit " + solveCount;

            String newSql = "INSERT IGNORE INTO new.new_table (id, guid, ..., created, updated) "
                    + "VALUES (?, ?, ..., ?, ?)";
            stmtOld = connOld.createStatement();
            stmtOld.execute(oldSql); // 执行 sql
            resFromOld = stmtOld.getResultSet(); // 获取 old_table 结果集;
            pstmt = connNew.prepareStatement(newSql); // 预编译
            int num = 0;
            while (resFromOld.next()) {
                num++;
                count++;
                pstmt.setLong(1, resFromOld.getLong("id"));
                pstmt.setString(2, resFromOld.getString("guid"));
                ...
                ...
                ...
                pstmt.setTimestamp(16, resFromOld.getTimestamp("created"));
                pstmt.setTimestamp(17, resFromOld.getTimestamp("created"));
                pstmt.addBatch();
                // 每 everyCommitCount 条数据提交一次事务
                if (num > everyCommitCount) {
                    pstmt.executeBatch();
                    connNew.commit();
                    num = 0;
                }
                currMaxId = Math.max(resFromOld.getLong("id"), currMaxId);
            }
            pstmt.executeBatch();
            connNew.commit();
        } catch (Exception e) {
            try {
                assert connNew != null;
                connNew.rollback();
            } catch (SQLException e1) {
                e1.printStackTrace();
            }
            e.printStackTrace();
        } finally {
            if (currMaxId != 0) {
                redisService.setToString(OLD_TABLE_MIGRATION, String.valueOf(currMaxId)); // 当前最大ID作为下次执行的偏移位置
            }
            jdbcService.close(connOld, connNew, stmtOld, resFromOld, pstmt);
            long endTime = System.currentTimeMillis(); // 记录程序结束时间
            log.info(String.format("old_table 数据迁移--->本次总共扫描了:%s 条数据,本次迁移的最大的ID:%s,消耗时长:%s 秒",
                    count, currMaxId, (endTime - startTime) / 1000));
        }
    }
}

数据迁移有可能涉及到某些字段的批量更新操作,基本同上面代码,只是将上面的插入语句,改为更新语句。

  • 关闭数据库连接
@Override
public void close(Connection connOld, Connection connNew, Statement stmtOld, ResultSet resFromOld,
                  PreparedStatement pstmt) {
    if (connOld != null) {
        try {
            connOld.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
    if (connNew != null) {
        try {
            connNew.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
    if (stmtOld != null) {
        try {
            stmtOld.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
    if (resFromOld != null) {
        try {
            resFromOld.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
    if (pstmt != null) {
        try {
            pstmt.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

参考

  • https://www.w3cschool.cn/architectroad/architectroad-data-smooth-migration.html
  • https://blog.csdn.net/superPojo/article/details/78749113
  • https://www.cnblogs.com/tommy-huang/p/4540407.html
0 条回应