Java 开发 / 数据库

[升级版]通过 Jdbc + Job 解决跨库大表数据迁移

言七墨 · 3月20日 · 2020年 · · 394次已读

背景

言七墨

前面写过一篇通过 Jdbc + Job 解决跨库大表数据迁移,那只是个初始版本,后面对其进行了优化改造,数据迁移性能大幅度提升。

优化点

  • 单实例 Job 优化为多实例 Job
  • 单线程优化为多线程
  • 普通偏移优化为分段偏移

代码

下面是主要代码,其它代码可参考通过 Jdbc + Job 解决跨库大表数据迁移

/**
 * 表数据迁移JOB
 *
 * <p>Multi-instance, multi-threaded table migration job. On every scheduler tick it
 * spawns {@code threadNum} workers; each worker claims an exclusive ID segment via a
 * shared Redis counter ({@code MIGRATION_INCR}) and copies that segment from the old
 * table into the new one with batched, manually-committed inserts.
 *
 * <p>NOTE: the {@code ...} lines below are elisions from the original article, not
 * valid Java — the column lists and some statements were omitted by the author.
 *
 * @author qimok
 * @since 2020-1-17
 */
@Component
@Slf4j(system = SERVICE_NAME, module = DATA_MIGRATE_JOB)
@SuppressWarnings({"checkstyle:linelength", "checkstyle:magicnumber", "checkstyle:methodlength", "PMD"})
public class OldTableMigrateJob {

    /** Config master switch; when false, migration can still be forced on via the Redis switch. */
    @Value("${old.table.migration.enabled}")
    private Boolean enabled;

    /** Row-throughput budget shared by all worker threads of one tick. */
    @Value("${migration.qps}")
    private Integer qps;

    /** Commit the insert transaction every this many rows. */
    @Value("${migration.every.commit.count}")
    private Integer everyCommitCount;

    /** Number of worker threads spawned per scheduler tick. */
    @Value("${migration.thread.num}")
    private Integer threadNum;

    @Autowired
    private RedisService redisService;

    @Autowired
    private JdbcConnection conn;

    /**
     * Migrates privdoc rows in ascending ID order; segments never overlap, so no row
     * is processed twice.
     * <p>
     *     Runs every 10 seconds.
     */
    @Scheduled(fixedRate = 1000 * 10)
    public void process() {
        // Run when enabled=true, or when enabled=false but the Redis switch is "1".
        // Parentheses make the intended precedence explicit (&& binds tighter than ||).
        if (enabled == null || (!enabled && !"1".equals(redisService.getFromString(MIGRATION_SWITCH)))) {
            return;
        }
        long beginTime = System.currentTimeMillis();
        String commLog = "数据迁移>>开始时间(" + OffsetDateTime.now().toLocalDateTime() + ")";
        ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
        CountDownLatch latch = new CountDownLatch(threadNum);
        try {
            for (int i = 0; i < threadNum; i++) {
                executorService.submit(() -> {
                    try {
                        // Segment size per worker. Multiply before dividing: the original
                        // qps / threadNum * 60L truncated to 0 whenever qps < threadNum,
                        // which would make every segment empty.
                        long solveCount = qps * 60L / threadNum;
                        // Shared Redis counter hands out distinct segment numbers across
                        // all instances. incr() is getAndIncrement (see impl), so the very
                        // first value is 0, which yields the empty range (0, 0]; real
                        // work starts at offset 1 — the original comment saying
                        // "offset == 1L means the migration just started" matches that.
                        long offset = redisService.incr(MIGRATION_INCR);
                        long currId = offset == 0L ? 0L : (offset - 1) * solveCount; // exclusive lower bound
                        long idLe = offset * solveCount; // inclusive upper bound
                        executeMigrate(commLog, offset, currId, idLe);
                    } finally {
                        // Always release the latch — even if incr()/executeMigrate throws —
                        // otherwise latch.await() below would hang the scheduler forever.
                        latch.countDown();
                    }
                });
            }
            latch.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the owning thread pool can observe it.
            Thread.currentThread().interrupt();
            // Original message contained an unfilled "%s" placeholder; dropped it.
            log.error(commLog + "整体异常", e);
        } finally {
            long endTime = System.currentTimeMillis();
            log.info(String.format(commLog + "线程数量:%s 个,总花费时长:%s 秒",
                    threadNum, String.format("%.4f", (endTime - beginTime) / 1000d)));
            executorService.shutdown();
        }
    }

    /**
     * Copies the ID segment (currId, idLe] from old.old_table into new.new_table.
     * Uses INSERT IGNORE so re-running a segment cannot create duplicates.
     *
     * @param commLog shared log prefix from the caller
     * @param offset  segment number claimed from Redis
     * @param currId  exclusive lower bound of the source ID range
     * @param idLe    inclusive upper bound of the source ID range
     */
    private void executeMigrate(String commLog, Long offset, Long currId, Long idLe) {
        commLog = String.format("%s【offset: %s,处理的ID的范围:(%s, %s]】->", commLog, offset, currId, idLe);
        log.info(commLog + "开始...");
        long startTime = System.currentTimeMillis();
        Connection connOld = null;
        Connection connNew = null;
        Statement stmt = null;
        ResultSet resFromOld = null;
        PreparedStatement pstmt = null;
        long currMaxId = 0L; // highest source ID seen in this segment (primitive: avoids Math.max autoboxing)
        long count = 0L; // rows scanned/migrated in this segment
        try {
            connOld = conn.getOldConn();
            connNew = conn.getNewConn();
            connNew.setAutoCommit(false); // commits are batched manually below
            // currId/idLe are internally generated longs, so this concatenation is not
            // injectable; binding them as PreparedStatement parameters would still be
            // the safer idiom.
            String oldSql = "SELECT a.id,  LOWER(CONCAT(\n"
                    + "    SUBSTR(HEX(a.guid), 1, 8), '-',\n"
                    + "    SUBSTR(HEX(a.guid), 9, 4), '-',\n"
                    + "    SUBSTR(HEX(a.guid), 13, 4), '-',\n"
                    + "    SUBSTR(HEX(a.guid), 17, 4), '-',\n"
                    + "    SUBSTR(HEX(a.guid), 21)\n"
                    + "  )) as guid\n"
                    + "..."
                    + "a.created, a.created\n"
                    + "    FROM old.old_table a left join old.extra b\n"
                    + "    on a.id = b.deliverId " + "where a.id > " + currId + " and a.id <= " + idLe;
            String newSql = "INSERT IGNORE INTO new.new_table (id, guid, ..., created, updated) "
                    + "VALUES (?, ?, ..., ?, ?)";
            stmt = connOld.createStatement();
            long readStartTime = System.currentTimeMillis();
            stmt.execute(oldSql); // run the read query
            long readEndTime = System.currentTimeMillis();
            log.info(String.format(commLog + "读花费时长:%s 毫秒", readEndTime - readStartTime));
            resFromOld = stmt.getResultSet();
            pstmt = connNew.prepareStatement(newSql);
            int num = 0; // rows accumulated in the current uncommitted batch
            long writeStartTime = System.currentTimeMillis();
            while (resFromOld.next()) {
                num++;
                count++;
                pstmt.setLong(1, resFromOld.getLong("id"));
                pstmt.setString(2, resFromOld.getString("guid"));
                ...
                ...
                ...
                pstmt.setTimestamp(16, resFromOld.getTimestamp("created"));
                pstmt.setTimestamp(17, resFromOld.getTimestamp("created"));
                pstmt.addBatch();
                // Commit every everyCommitCount rows. The original used ">", which
                // committed one row late (batches of everyCommitCount + 1).
                if (num >= everyCommitCount) {
                    long executeStartTime = System.currentTimeMillis();
                    pstmt.executeBatch();
                    connNew.commit();
                    pstmt.clearBatch();
                    long executeEndTime = System.currentTimeMillis();
                    log.info(String.format(commLog + "每组执行花费时长(每组执行 %s 条数据):%s 毫秒", everyCommitCount,
                            executeEndTime - executeStartTime));
                    num = 0;
                }
                currMaxId = Math.max(resFromOld.getLong("id"), currMaxId);
            }
            // Flush the final partial batch.
            pstmt.executeBatch();
            connNew.commit();
            long writeEndTime = System.currentTimeMillis();
            log.info(String.format(commLog + "写花费时长:%s 秒",
                    String.format("%.4f", (writeEndTime - writeStartTime) / 1000d)));
        } catch (Exception e) {
            log.error(commLog + "错误", e);
        } finally {
            conn.closeConn(connOld, null, connNew, stmt, resFromOld, pstmt);
            long endTime = System.currentTimeMillis();
            log.info(String.format(commLog + "单次总共扫描了:%s 条数据,本次redis中存的迁移偏移ID: %s, "
                            + "本次迁移的最大的ID:%s,消耗时长:%s 秒",
                    count, currId, currMaxId, String.format("%.4f", (endTime - startTime) / 1000d)));
        }
    }
}
@Override
public Long incr(String key) {
    // Fetch-and-increment the distributed counter stored under the given key;
    // per the AtomicLong naming contract this returns the value BEFORE the increment.
    final RedisAtomicLong counter =
            new RedisAtomicLong(key, redisTemplate.getConnectionFactory());
    return counter.getAndIncrement();
}
言七墨

GitHub 地址:DataMigrate

0 条回应