Background
I previously wrote a post on migrating large tables across databases with Jdbc + Job. That was only an initial version; I have since optimized it.
Optimizations
- Single-instance Job upgraded to multi-instance Jobs
- Single-threaded processing upgraded to multi-threaded processing
- Plain offsets upgraded to segmented offsets (sketched below)
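The core of the segmented-offset idea: each worker (a thread, possibly in another Job instance) atomically increments a shared Redis counter, and the value it draws maps to a disjoint ID range that only that worker migrates. Here is a minimal sketch of that mapping, assuming Spring Data Redis's RedisAtomicLong as used later in this post; the SegmentClaimer class and segmentSize parameter are illustrative, not from the original code:

import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.support.atomic.RedisAtomicLong;

public class SegmentClaimer {
    private final RedisAtomicLong counter;
    private final long segmentSize; // rows per segment, e.g. qps / threadNum * 60

    public SegmentClaimer(RedisTemplate<String, String> template, String key, long segmentSize) {
        this.counter = new RedisAtomicLong(key, template.getConnectionFactory());
        this.segmentSize = segmentSize;
    }

    /**
     * Atomically claims the next ID range (lo, hi]. Because Redis INCR is atomic,
     * no two callers, even in different JVMs, ever receive the same range.
     */
    public long[] claimSegment() {
        long offset = counter.incrementAndGet(); // first caller gets 1
        long lo = (offset - 1) * segmentSize;    // exclusive lower bound
        long hi = offset * segmentSize;          // inclusive upper bound
        return new long[] {lo, hi};
    }
}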
Code
Below is the main code; the rest can be found in the earlier post on migrating large tables across databases with Jdbc + Job:
/**
 * Table data migration job
*
* @author qimok
* @since 2020-1-17
*/
@Component
@Slf4j(system = SERVICE_NAME, module = DATA_MIGRATE_JOB)
@SuppressWarnings({"checkstyle:linelength", "checkstyle:magicnumber", "checkstyle:methodlength", "PMD"})
public class OldTableMigrateJob {
@Value("${old.table.migration.enabled}")
private Boolean enabled;
@Value("${migration.qps}")
private Integer qps;
@Value("${migration.every.commit.count}")
private Integer everyCommitCount;
@Value("${migration.thread.num}")
private Integer threadNum;
@Autowired
private RedisService redisService;
@Autowired
private JdbcConnection conn;
    /**
     * Migrates the privdoc tables in ascending ID order, without duplicates.
     * <p>
     * Runs every 10 seconds.
     */
    @Scheduled(fixedRate = 1000 * 10)
    public void process() {
        // Run only when the config switch is on, or the Redis switch is set to "1"
        if (enabled == null || (!enabled && !"1".equals(redisService.getFromString(MIGRATION_SWITCH)))) {
            return;
        }
        long beginTime = System.currentTimeMillis();
        String commLog = "Data migration>>start time (" + OffsetDateTime.now().toLocalDateTime() + ")";
        ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
        CountDownLatch latch = new CountDownLatch(threadNum);
        try {
            for (int i = 0; i < threadNum; i++) {
                executorService.submit(() -> {
                    try {
                        Long solveCount = qps / threadNum * 60L; // segment size: rows per worker
                        // incr returns 1 on the first call, so the first segment is (0, solveCount]
                        Long offset = redisService.incr(MIGRATION_INCR);
                        Long currId = (offset - 1) * solveCount; // exclusive lower bound
                        Long idLe = offset * solveCount;         // inclusive upper bound
                        executeMigrate(commLog, offset, currId, idLe);
                    } finally {
                        latch.countDown(); // always count down, or latch.await() would hang on failure
                    }
                });
            }
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            log.error(commLog + "migration interrupted", e);
        } finally {
            long endTime = System.currentTimeMillis();
            log.info(String.format(commLog + "threads: %s, total elapsed: %s seconds",
                    threadNum, String.format("%.4f", (endTime - beginTime) / 1000d)));
            executorService.shutdown();
        }
    }
    private void executeMigrate(String commLog, Long offset, Long currId, Long idLe) {
        commLog = String.format("%s[offset: %s, ID range: (%s, %s]]->", commLog, offset, currId, idLe);
        log.info(commLog + "starting...");
        long startTime = System.currentTimeMillis();
        Connection connOld = null;
        Connection connNew = null;
        Statement stmt = null;
        ResultSet resFromOld = null;
        PreparedStatement pstmt = null;
        Long currMaxId = 0L; // largest ID seen in this segment
        Long count = 0L;     // number of rows migrated
        try {
            connOld = conn.getOldConn();
            connNew = conn.getNewConn();
            connNew.setAutoCommit(false); // commit manually, one transaction per batch
            // Reassembles the binary guid column into canonical lowercase UUID form
            String oldSql = "SELECT a.id, LOWER(CONCAT(\n"
                    + " SUBSTR(HEX(a.guid), 1, 8), '-',\n"
                    + " SUBSTR(HEX(a.guid), 9, 4), '-',\n"
                    + " SUBSTR(HEX(a.guid), 13, 4), '-',\n"
                    + " SUBSTR(HEX(a.guid), 17, 4), '-',\n"
                    + " SUBSTR(HEX(a.guid), 21)\n"
                    + " )) as guid\n"
                    + "..."
                    + "a.created, a.created\n"
                    + " FROM old.old_table a left join old.extra b\n"
                    + " on a.id = b.deliverId " + "where a.id > " + currId + " and a.id <= " + idLe;
            // INSERT IGNORE keeps re-runs idempotent: rows whose IDs already exist are skipped
            String newSql = "INSERT IGNORE INTO new.new_table (id, guid, ..., created, updated) "
                    + "VALUES (?, ?, ..., ?, ?)";
            stmt = connOld.createStatement();
            long readStartTime = System.currentTimeMillis();
            stmt.execute(oldSql); // run the read query
            long readEndTime = System.currentTimeMillis();
            log.info(String.format(commLog + "read took: %s ms", readEndTime - readStartTime));
            resFromOld = stmt.getResultSet(); // fetch the result set
            pstmt = connNew.prepareStatement(newSql); // precompile the insert
            int num = 0;
            long writeStartTime = System.currentTimeMillis();
            while (resFromOld.next()) {
                num++;
                count++;
                pstmt.setLong(1, resFromOld.getLong("id"));
                pstmt.setString(2, resFromOld.getString("guid"));
                // ... setters for the remaining columns elided ...
                pstmt.setTimestamp(16, resFromOld.getTimestamp("created"));
                pstmt.setTimestamp(17, resFromOld.getTimestamp("created"));
                pstmt.addBatch();
                // Commit one transaction per everyCommitCount rows
                if (num >= everyCommitCount) {
                    long executeStartTime = System.currentTimeMillis();
                    pstmt.executeBatch();
                    connNew.commit();
                    pstmt.clearBatch();
                    long executeEndTime = System.currentTimeMillis();
                    log.info(String.format(commLog + "batch took (%s rows per batch): %s ms", everyCommitCount,
                            executeEndTime - executeStartTime));
                    num = 0;
                }
                currMaxId = Math.max(resFromOld.getLong("id"), currMaxId);
            }
            pstmt.executeBatch(); // flush the final partial batch
            connNew.commit();
            long writeEndTime = System.currentTimeMillis();
            log.info(String.format(commLog + "write took: %s seconds",
                    String.format("%.4f", (writeEndTime - writeStartTime) / 1000d)));
        } catch (Exception e) {
            log.error(commLog + "failed", e);
        } finally {
            conn.closeConn(connOld, null, connNew, stmt, resFromOld, pstmt);
            long endTime = System.currentTimeMillis(); // record finish time
            log.info(String.format(commLog + "scanned %s rows this pass, Redis migration offset ID this pass: %s, "
                            + "largest ID migrated this pass: %s, elapsed: %s seconds",
                    count, currId, currMaxId, String.format("%.4f", (endTime - startTime) / 1000d)));
        }
    }
}
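executeMigrate manages six closeable resources by hand. For reference, here is a condensed sketch of the same read/batch-write skeleton using try-with-resources and a parameterized read query; migrateSegment and the two-column SQL are simplified placeholders, not the original code:

private void migrateSegment(long currId, long idLe) throws SQLException {
    String readSql = "SELECT id, guid FROM old.old_table WHERE id > ? AND id <= ?";
    String writeSql = "INSERT IGNORE INTO new.new_table (id, guid) VALUES (?, ?)";
    // try-with-resources closes everything in reverse order, even when an exception is thrown
    try (Connection connOld = conn.getOldConn();
         Connection connNew = conn.getNewConn();
         PreparedStatement read = connOld.prepareStatement(readSql);
         PreparedStatement write = connNew.prepareStatement(writeSql)) {
        connNew.setAutoCommit(false);
        read.setLong(1, currId); // bounds bound as parameters instead of string concatenation
        read.setLong(2, idLe);
        try (ResultSet rs = read.executeQuery()) {
            int num = 0;
            while (rs.next()) {
                write.setLong(1, rs.getLong("id"));
                write.setString(2, rs.getString("guid"));
                write.addBatch();
                if (++num >= everyCommitCount) { // one transaction per everyCommitCount rows
                    write.executeBatch();
                    connNew.commit();
                    num = 0;
                }
            }
            write.executeBatch(); // flush the final partial batch
            connNew.commit();
        }
    }
}

Binding currId and idLe as parameters also avoids assembling the WHERE clause by string concatenation, which the original read query does.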
The incr method referenced above lives in the RedisService implementation:
@Override
public Long incr(String key) {
    RedisAtomicLong entityIdCounter = new RedisAtomicLong(key, redisTemplate.getConnectionFactory());
    // incrementAndGet so the first caller receives 1 and claims (0, solveCount];
    // getAndIncrement would return 0 first, yielding an empty (0, 0] segment
    return entityIdCounter.incrementAndGet();
}
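Because the counter lives in Redis rather than in any single JVM, the same segment math holds across processes: every thread of every Job instance draws a distinct offset from the shared counter, so the ID ranges never overlap no matter how many instances run concurrently. That, combined with INSERT IGNORE on the write side, is what makes the multi-instance optimization safe.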