Preface
While refactoring a Scala project into Java microservices, the read migration included one piece of business data at the hundred-million-row scale that had to be migrated across databases (note: the table structure changed substantially), and the migration needed to happen with as little downtime as possible.
Analysis
At the hundred-million-row scale, a script-based migration is out of the question, and import/export with the mysqldump command is even less viable. There are also write-ups online that migrate millions of rows by exporting to and importing from files, but that approach requires changing the MySQL configuration, so to be safe it was ruled out as well. In the end, the choice was to migrate with Jdbc combined with scheduled Jobs.
Plan
- Pick a release point before the read migration: the precondition is that the existing write endpoints are not affected in any way (this read migration also includes performance optimization of the write endpoints). Regression-test the write endpoints before going live, and regression-test the write-related business flows during the release. At this release point the system is still in the double-write phase
- Define the scope of this release: the read-related endpoints, plus the data-migration code driven by Jdbc + Job
- Every migration Job needs an on/off switch and a configurable migration QPS
- Turn the Job switches on one by one, in priority order
- Data consistency verification: since the data volume is huge, mainly verify that row counts match, spot-check the content of sampled rows, and run an md5 check over the last month of data (see the sketch after this list)
- Read-migration release: once the new table's data has fully caught up with the old table's, turn off all migration Job switches and begin the read migration
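As a concrete illustration of the consistency check, here is a minimal sketch that compares the row count and a rolling checksum over the last month of data on both sides. All connection details, table names, and checked columns are illustrative assumptions, and the md5 check from the plan is approximated with MySQL's CRC32 over concatenated columns; columns whose representation changed during migration (such as guid) would need the same transformation applied on both sides before comparing.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class MigrationConsistencyCheck {
    // Row count plus a CRC32-based checksum over the last month of data.
    // Table, column, and connection names below are placeholders.
    private static final String CHECK_SQL =
            "SELECT COUNT(*) AS cnt,"
          + " SUM(CRC32(CONCAT_WS('#', id, created))) AS checksum"
          + " FROM %s WHERE created >= NOW() - INTERVAL 1 MONTH";

    public static void main(String[] args) throws SQLException {
        long[] oldSide = stats("jdbc:mysql://old-host:3306/old?useSSL=true", "user", "pass", "old_table");
        long[] newSide = stats("jdbc:mysql://new-host:3306/new?useSSL=true", "user", "pass", "new_table");
        System.out.printf("count match: %b, checksum match: %b%n",
                oldSide[0] == newSide[0], oldSide[1] == newSide[1]);
    }

    private static long[] stats(String url, String user, String pass, String table) throws SQLException {
        try (Connection conn = DriverManager.getConnection(url, user, pass);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(String.format(CHECK_SQL, table))) {
            rs.next();
            return new long[]{rs.getLong("cnt"), rs.getLong("checksum")};
        }
    }
}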
Implementation
- Establishing the database connection
@Override
public Connection getConnection(String url, String username, String password) {
    try {
        // Load the Connector/J 5.x driver (optional on JDBC 4.0+, but harmless)
        Class.forName("com.mysql.jdbc.Driver");
    } catch (ClassNotFoundException e) {
        e.printStackTrace();
    }
    Connection conn = null;
    try {
        // Appending with '&' assumes the base URL already carries a query string
        url += "&serverTimezone=Asia/Shanghai&useServerPrepStmts=false&useSSL=true";
        conn = DriverManager.getConnection(url, username, password);
    } catch (SQLException e) {
        e.printStackTrace();
    }
    return conn;
}
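A note on the URL parameters: serverTimezone pins the session time zone so TIMESTAMP values are not shifted between the two databases, and useServerPrepStmts=false keeps Connector/J on client-side prepared statements. If batch insert throughput becomes the bottleneck, adding rewriteBatchedStatements=true (not set here, so only a suggestion) is worth evaluating: it lets Connector/J rewrite a batch of single-row INSERTs into multi-row INSERTs.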
- Migrating the data with Jdbc + Job
/**
 * Data migration Job.
 *
 * @author qimok
 * @since 2020-1-15
 */
@Component
@Slf4j
@SuppressWarnings({"checkstyle:linelength", "checkstyle:magicnumber", "checkstyle:methodlength", "PMD"})
public class OldTableMigrateJob extends AbstractSimpleElasticJob {
    @Value("${old.table.migration.enabled}")
    private Boolean enabled;
    @Value("${db.old.table.url}")
    private String oldTableUrl;
    @Value("${db.old.table.username}")
    private String oldTableUsername;
    @Value("${db.old.table.password}")
    private String oldTablePassword;
    @Value("${db.new.table.url}")
    private String newTableUrl;
    @Value("${db.new.table.username}")
    private String newTableUsername;
    @Value("${db.new.table.password}")
    private String newTablePassword;
    @Value("${migration.qps}")
    private Integer qps;
    @Value("${migration.every.commit.count}")
    private Integer everyCommitCount;

    private static final String OLD_TABLE_MIGRATION = "old:table:migration";

    @Autowired
    private RedisService redisService;
    @Autowired
    private JdbcService jdbcService;

    /**
     * Migrates old_table rows in ascending ID order, without duplicates.
     */
    @Override
    public void process(JobExecutionMultipleShardingContext shardingContext) {
        if (!enabled) {
            return;
        }
        // Rows to handle per run; averages out to `qps` rows/s assuming the Job fires every 2 minutes
        long solveCount = qps * 2 * 60;
        long startTime = System.currentTimeMillis(); // start time of this run
        Connection connOld = null;
        Connection connNew = null;
        Statement stmtOld = null;
        ResultSet resFromOld = null;
        PreparedStatement pstmt = null;
        long currId = 0; // ID offset to resume from
        long currMaxId = 0; // largest ID seen in this run
        long count = 0; // number of rows scanned in this run
        try {
            // The checkpoint (largest ID migrated so far) lives in Redis
            String value = redisService.getFromString(OLD_TABLE_MIGRATION);
            if (StringUtils.isNotBlank(value)) {
                currId = Long.parseLong(value);
            } else {
                redisService.setToString(OLD_TABLE_MIGRATION, "0");
            }
            connOld = jdbcService.getConnection(oldTableUrl, oldTableUsername, oldTablePassword); // old_table database
            connNew = jdbcService.getConnection(newTableUrl, newTableUsername, newTablePassword); // new_table database
            connNew.setAutoCommit(false); // commit manually, in batches
            // The binary guid is reformatted into canonical lowercase UUID form;
            // created is selected twice because it also initializes `updated`
            String oldSql = "SELECT a.id, LOWER(CONCAT(\n"
                    + "    SUBSTR(HEX(a.guid), 1, 8), '-',\n"
                    + "    SUBSTR(HEX(a.guid), 9, 4), '-',\n"
                    + "    SUBSTR(HEX(a.guid), 13, 4), '-',\n"
                    + "    SUBSTR(HEX(a.guid), 17, 4), '-',\n"
                    + "    SUBSTR(HEX(a.guid), 21)\n"
                    + "  )) as guid\n"
                    + "..."
                    + "a.created, a.created\n"
                    + " FROM old.old_table a left join old.extra b\n"
                    + " on a.id = b.deliverId where a.id > " + currId + " limit " + solveCount;
            // INSERT IGNORE deduplicates on the primary key, so re-runs are idempotent
            String newSql = "INSERT IGNORE INTO new.new_table (id, guid, ..., created, updated) "
                    + "VALUES (?, ?, ..., ?, ?)";
            stmtOld = connOld.createStatement();
            stmtOld.execute(oldSql); // run the SELECT against the old database
            resFromOld = stmtOld.getResultSet(); // old_table result set
            pstmt = connNew.prepareStatement(newSql); // precompiled INSERT
            int num = 0;
            while (resFromOld.next()) {
                num++;
                count++;
                pstmt.setLong(1, resFromOld.getLong("id"));
                pstmt.setString(2, resFromOld.getString("guid"));
                ...
                ...
                ...
                pstmt.setTimestamp(16, resFromOld.getTimestamp("created"));
                pstmt.setTimestamp(17, resFromOld.getTimestamp("created")); // updated starts out equal to created
                pstmt.addBatch();
                // Commit a transaction every everyCommitCount rows
                if (num >= everyCommitCount) {
                    pstmt.executeBatch();
                    connNew.commit();
                    num = 0;
                }
                currMaxId = Math.max(resFromOld.getLong("id"), currMaxId);
            }
            // Flush and commit the final partial batch
            pstmt.executeBatch();
            connNew.commit();
            // Advance the checkpoint only after every batch has committed, so a
            // failed run is re-scanned on the next run instead of silently skipped
            if (currMaxId != 0) {
                redisService.setToString(OLD_TABLE_MIGRATION, String.valueOf(currMaxId));
            }
        } catch (Exception e) {
            try {
                assert connNew != null;
                connNew.rollback(); // discard the uncommitted batch
            } catch (SQLException e1) {
                e1.printStackTrace();
            }
            e.printStackTrace();
        } finally {
            jdbcService.close(connOld, connNew, stmtOld, resFromOld, pstmt);
            long endTime = System.currentTimeMillis(); // end time of this run
            log.info(String.format("old_table migration ---> scanned %s rows this run, largest migrated ID: %s, elapsed: %s seconds",
                    count, currMaxId, (endTime - startTime) / 1000));
        }
    }
}
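Two details in this Job are easy to miss. First, the throttle: solveCount = qps * 2 * 60 averages out to the configured QPS only under the apparent assumption that the Job is scheduled every 2 minutes; with migration.qps=500, for example, each run pulls 500 * 2 * 60 = 60,000 rows. If the cron interval changes, the formula must change with it. Second, fault tolerance: because the Redis checkpoint is advanced only after a fully committed run and the INSERT uses INSERT IGNORE, a run that fails halfway can simply be retried, and any rows committed before the failure are deduplicated by the primary key.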
The data migration may also involve the following:
- Closing the database connections
@Override
public void close(Connection connOld, Connection connNew, Statement stmtOld, ResultSet resFromOld,
                  PreparedStatement pstmt) {
    // Close in reverse order of creation: result set and statements first, then connections
    for (AutoCloseable resource : new AutoCloseable[]{resFromOld, stmtOld, pstmt, connOld, connNew}) {
        if (resource != null) {
            try {
                resource.close();
            } catch (Exception e) {
                e.printStackTrace(); // keep closing the rest even if one close fails
            }
        }
    }
}
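For what it's worth, on Java 7+ the same cleanup can also be written with try-with-resources, which closes resources in reverse declaration order even when an exception escapes. A minimal standalone sketch, with placeholder URL, credentials, and SQL:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class TryWithResourcesExample {
    public static void main(String[] args) throws SQLException {
        String url = "jdbc:mysql://localhost:3306/old?useSSL=true";
        try (Connection conn = DriverManager.getConnection(url, "user", "pass");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT id FROM old_table LIMIT 10")) {
            while (rs.next()) {
                System.out.println(rs.getLong("id"));
            }
        } // rs, stmt, and conn are closed here automatically, in reverse order
    }
}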
References
- https://www.w3cschool.cn/architectroad/architectroad-data-smooth-migration.html
- https://blog.csdn.net/superPojo/article/details/78749113
- https://www.cnblogs.com/tommy-huang/p/4540407.html