需求
目前,有一个基于自增 message_id 的范围分片表(message_id_mapping),如何动态地扩展分表并刷新分表的配置(ActualDataNodes)?
开发
- 范围分片配置
# Routing strategy for the message_id_mapping logic table.
# Statically configured data node for the logic table.
# NOTE(review): presumably only a bootstrap value that is replaced at runtime by the
# ActualDataNodes refresh job shown later in this article - confirm.
spring.shardingsphere.sharding.tables.message_id_mapping.actual-data-nodes = shardingds.message_id_mapping
# Column whose value decides which physical table a row is routed to.
spring.shardingsphere.sharding.tables.message_id_mapping.table-strategy.standard.sharding-column = message_id
# The same class is registered as both the precise and the range sharding algorithm.
spring.shardingsphere.sharding.tables.message_id_mapping.table-strategy.standard.precise-algorithm-class-name = com.xxx.yyy.persistence.config.MessageIdShardingAlgorithm
spring.shardingsphere.sharding.tables.message_id_mapping.table-strategy.standard.range-algorithm-class-name = com.xxx.yyy.persistence.config.MessageIdShardingAlgorithm
- model 类
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import java.util.Optional;
/**
 * Constants shared by the sharding configuration of the {@code message_id_mapping} table.
 *
 * <p>Not instantiable: the only constructor is private (generated by Lombok).
 */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ModelConstants {

    /** Logic table name of {@code message_id_mapping}. */
    public static final String MESSAGE_ID_MAPPING = "message_id_mapping";

    /** Name prefix of the physical shard tables; the numeric shard suffix is appended to it. */
    public static final String MESSAGE_ID_MAPPING_PREFIX = MESSAGE_ID_MAPPING + "_";

    /** Size of the message id range covered by one physical shard table. */
    public static final Long MESSAGE_ID_MAPPING_SINGLE_TABLE_CAPACITY = 20_000_000L;
}
- 范围分片算法类
import org.apache.shardingsphere.api.sharding.standard.PreciseShardingAlgorithm;
import org.apache.shardingsphere.api.sharding.standard.PreciseShardingValue;
import org.apache.shardingsphere.api.sharding.standard.RangeShardingAlgorithm;
import org.apache.shardingsphere.api.sharding.standard.RangeShardingValue;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;
import static com.xxx.yyy.model.ModelConstants.MESSAGE_ID_MAPPING_PREFIX;
import static com.xxx.yyy.model.ModelConstants.MESSAGE_ID_MAPPING_SINGLE_TABLE_CAPACITY;
/**
 * Range-based sharding algorithm for the {@code message_id_mapping} table.
 *
 * <p>A message id is routed to the physical table
 * {@code MESSAGE_ID_MAPPING_PREFIX + (messageId / MESSAGE_ID_MAPPING_SINGLE_TABLE_CAPACITY)}.
 *
 * @author qimok
 * @since 2020-09-07
 */
public class MessageIdShardingAlgorithm implements PreciseShardingAlgorithm<Long>, RangeShardingAlgorithm<Long> {

    /** Marker returned by {@link #parseSuffix(String)} for names that are not shard tables. */
    private static final long NOT_A_SHARD = -1L;

    /**
     * Routes a single message id to its physical table.
     *
     * @param availableTargetNames physical table names currently known to the table rule
     * @param shardingValue        the message id to route
     * @return the name of the physical table holding the id
     * @throws UnsupportedOperationException if the computed table is not among {@code availableTargetNames}
     */
    @Override
    public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Long> shardingValue) {
        long messageId = shardingValue.getValue();
        String target = MESSAGE_ID_MAPPING_PREFIX + (messageId / MESSAGE_ID_MAPPING_SINGLE_TABLE_CAPACITY);
        if (!availableTargetNames.contains(target)) {
            throw new UnsupportedOperationException(
                    "No physical table " + target + " available for message_id " + messageId);
        }
        return target;
    }

    /**
     * Routes a range of message ids to every existing physical table the range touches.
     *
     * <p>The requested suffix range is clamped to the suffixes actually present in
     * {@code availableTargetNames}, so a bound that lies before the first or after the last
     * existing shard (for example {@code message_id < <very large value>}) is routed to the
     * existing shards instead of failing. Open/closed bound types are not distinguished, so a
     * range ending exactly on a shard boundary may include one extra table; this only widens
     * the route and never drops a table.
     *
     * @param availableTargetNames physical table names currently known to the table rule
     * @param shardingValue        the message id range to route
     * @return matching table names in ascending suffix order; empty when the range does not
     *         intersect any existing shard
     * @throws UnsupportedOperationException if no shard table exists at all, or if a table inside
     *                                       the clamped range is missing
     */
    @Override
    public Collection<String> doSharding(Collection<String> availableTargetNames, RangeShardingValue<Long> shardingValue) {
        // Determine the smallest and largest shard suffix that really exists.
        long minAvailable = Long.MAX_VALUE;
        long maxAvailable = NOT_A_SHARD;
        for (String name : availableTargetNames) {
            long suffix = parseSuffix(name);
            if (suffix == NOT_A_SHARD) {
                continue;
            }
            minAvailable = Math.min(minAvailable, suffix);
            maxAvailable = Math.max(maxAvailable, suffix);
        }
        if (maxAvailable == NOT_A_SHARD) {
            throw new UnsupportedOperationException(
                    "No physical table with prefix " + MESSAGE_ID_MAPPING_PREFIX + " is available");
        }

        long lowerSuffix = shardingValue.getValueRange().hasLowerBound()
                ? shardingValue.getValueRange().lowerEndpoint() / MESSAGE_ID_MAPPING_SINGLE_TABLE_CAPACITY
                : minAvailable;
        long upperSuffix = shardingValue.getValueRange().hasUpperBound()
                ? shardingValue.getValueRange().upperEndpoint() / MESSAGE_ID_MAPPING_SINGLE_TABLE_CAPACITY
                : maxAvailable;
        // Clamp to existing shards: ids outside them cannot have rows anywhere.
        lowerSuffix = Math.max(lowerSuffix, minAvailable);
        upperSuffix = Math.min(upperSuffix, maxAvailable);

        Set<String> result = new LinkedHashSet<>();
        for (long suffix = lowerSuffix; suffix <= upperSuffix; suffix++) {
            String target = MESSAGE_ID_MAPPING_PREFIX + suffix;
            if (!availableTargetNames.contains(target)) {
                // A hole between existing shards indicates a broken table rule.
                throw new UnsupportedOperationException("Physical table " + target + " is missing");
            }
            result.add(target);
        }
        return result;
    }

    /**
     * Extracts the numeric shard suffix from a physical table name.
     *
     * @return the suffix, or {@link #NOT_A_SHARD} if the name is not exactly prefix + non-negative number
     */
    private static long parseSuffix(String tableName) {
        if (tableName == null || !tableName.startsWith(MESSAGE_ID_MAPPING_PREFIX)) {
            return NOT_A_SHARD;
        }
        try {
            long suffix = Long.parseLong(tableName.substring(MESSAGE_ID_MAPPING_PREFIX.length()));
            // Round-trip check rejects forms such as "+5" or "007" that parse but are not canonical names.
            boolean canonical = suffix >= 0 && (MESSAGE_ID_MAPPING_PREFIX + suffix).equals(tableName);
            return canonical ? suffix : NOT_A_SHARD;
        } catch (NumberFormatException ignored) {
            // Not a numeric suffix, so this name is not one of our shard tables.
            return NOT_A_SHARD;
        }
    }
}
- 动态分表配置类
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
 * Configuration properties for dynamically extended shard tables, bound from the
 * {@code dynamic.table} prefix.
 *
 * @author qimok
 * @since 2020-09-07
 */
@ConfigurationProperties(prefix = "dynamic.table")
@Data
public class DynamicTablesProperties {

    /**
     * Logic table names bound from {@code dynamic.table.names}.
     * Kept private; access goes through the Lombok-generated getter and setter.
     */
    private String[] names;
}
- 动态刷新 actualDataNodes 类
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.xxx.zzz.core.biz.XxxService;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.core.config.ShardingConfigurationException;
import org.apache.shardingsphere.core.rule.DataNode;
import org.apache.shardingsphere.core.rule.ShardingRule;
import org.apache.shardingsphere.core.rule.TableRule;
import org.apache.shardingsphere.shardingjdbc.jdbc.core.datasource.ShardingDataSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.sql.DataSource;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.apache.commons.collections.CollectionUtils.isNotEmpty;
import static com.xxx.yyy.model.ModelConstants.MESSAGE_ID_MAPPING;
import static com.xxx.yyy.model.ModelConstants.MESSAGE_ID_MAPPING_SINGLE_TABLE_CAPACITY;
/**
* 基于范围分表的 ActualDataNodes 动态刷新JOB
*
* @author qimok
* @since 2020-09-07
*/
@Slf4j
@Component
@EnableConfigurationProperties(DynamicTablesProperties.class)
public class ShardingTableRuleActualDataNodesRefreshJob {
@Autowired
private DynamicTablesProperties dynamicTables;
@Resource(name = "shardingDataSource")
private DataSource dataSource;
@Autowired
private XxxService xxxService;
/**
* 6 个小时执行一次
*/
@PostConstruct
@Scheduled(fixedRate = 1000 * 60 * 60 * 12)
public void refreshActualDataNodes() throws NoSuchFieldException, IllegalAccessException {
log.info("Job 动态刷新 actualDataNodes START");
if (dynamicTables.getNames() == null || dynamicTables.getNames().length == 0) {
log.error("【dynamic.table.names】配置为空!");
return;
}
for (int i = 0; i < dynamicTables.getNames().length; i++) {
String dynamicTableName = dynamicTables.getNames()[i];
TableRule tableRule = null;
try {
ShardingDataSource shardingDataSource = (ShardingDataSource) dataSource;
ShardingRule shardingRule = shardingDataSource.getRuntimeContext().getRule();
tableRule = shardingRule.getTableRule(dynamicTableName);
} catch (ShardingConfigurationException e) {
log.error(String.format("逻辑表:%s 动态分表配置错误!", dynamicTableName));
}
String dataSourceName = tableRule.getActualDataNodes().get(0).getDataSourceName();
String logicTableName = tableRule.getLogicTable();
assert tableRule != null;
List<DataNode> newDataNodes = getDataNodes(dynamicTableName, dataSourceName, logicTableName);
if (newDataNodes.isEmpty()) {
throw new UnsupportedOperationException();
}
createSubTableIfAbsent(logicTableName, newDataNodes);
dynamicRefreshDatasource(dataSourceName, tableRule, newDataNodes);
}
log.info("Job 动态刷新 actualDataNodes END");
}
/**
* 补偿创建分表
*/
private void createSubTableIfAbsent(String logicTableName, List<DataNode> newDataNodes) {
if (isNotEmpty(newDataNodes)) {
List<String> subTableNames = newDataNodes.stream().map(DataNode::getTableName).collect(Collectors.toList());
xxxService.createSubTableIfAbsent(logicTableName, subTableNames);
}
}
/**
* 获取数据节点
*/
private List<DataNode> getDataNodes(String tableName, String dataSourceName, String logicTableName) {
Set<DataNode> newDataNodes = Sets.newHashSet();
StringBuilder stringBuilder = new StringBuilder().append(dataSourceName).append(".").append(logicTableName);
final int length = stringBuilder.length();
// 根据自增id范围分表的场景
if (tableName.equals(MESSAGE_ID_MAPPING)) {
Long maxMessageId = xxxService.getMaxMessageId();
Long notFullSubTableSuffix = maxMessageId / MESSAGE_ID_MAPPING_SINGLE_TABLE_CAPACITY;
Long lastSubTableSize = maxMessageId % MESSAGE_ID_MAPPING_SINGLE_TABLE_CAPACITY;
if (lastSubTableSize > MESSAGE_ID_MAPPING_SINGLE_TABLE_CAPACITY >> 1) {
// 当最后一个分表的容量达到一半时,就扩展出一个分表
notFullSubTableSuffix++;
}
while (notFullSubTableSuffix >= 0L) {
stringBuilder.setLength(length);
stringBuilder.append("_").append(notFullSubTableSuffix);
DataNode dataNode = new DataNode(stringBuilder.toString());
newDataNodes.add(dataNode);
notFullSubTableSuffix--;
}
}
// 扩展点
return Lists.newArrayList(newDataNodes);
}
/**
* 动态刷新数据源
*/
private void dynamicRefreshDatasource(String dataSourceName, TableRule tableRule, List<DataNode> newDataNodes)
throws NoSuchFieldException, IllegalAccessException {
Set<String> actualTables = Sets.newHashSet();
Map<DataNode, Integer> dataNodeIndexMap = Maps.newHashMap();
AtomicInteger index = new AtomicInteger(0);
newDataNodes.forEach(dataNode -> {
actualTables.add(dataNode.getTableName());
if (index.intValue() == 0) {
dataNodeIndexMap.put(dataNode, 0);
} else {
dataNodeIndexMap.put(dataNode, index.intValue());
}
index.incrementAndGet();
});
// 动态刷新:actualDataNodesField
Field actualDataNodesField = TableRule.class.getDeclaredField("actualDataNodes");
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(actualDataNodesField, actualDataNodesField.getModifiers() & ~Modifier.FINAL);
actualDataNodesField.setAccessible(true);
actualDataNodesField.set(tableRule, newDataNodes);
// 动态刷新:actualTablesField
Field actualTablesField = TableRule.class.getDeclaredField("actualTables");
actualTablesField.setAccessible(true);
actualTablesField.set(tableRule, actualTables);
// 动态刷新:dataNodeIndexMapField
Field dataNodeIndexMapField = TableRule.class.getDeclaredField("dataNodeIndexMap");
dataNodeIndexMapField.setAccessible(true);
dataNodeIndexMapField.set(tableRule, dataNodeIndexMap);
// 动态刷新:datasourceToTablesMapField
Map<String, Collection<String>> datasourceToTablesMap = Maps.newHashMap();
datasourceToTablesMap.put(dataSourceName, actualTables);
Field datasourceToTablesMapField = TableRule.class.getDeclaredField("datasourceToTablesMap");
datasourceToTablesMapField.setAccessible(true);
datasourceToTablesMapField.set(tableRule, datasourceToTablesMap);
}
}
说明:如果是分库分表,以下操作需要做好封装:
xxxService.createSubTableIfAbsent(logicTableName, subTableNames);
xxxService.getMaxMessageId();
扩展
- 基于范围分片时,如果需要通过代码动态创建新的分表,当基于分片的数据源创建分表时,创建 SQL 会被 Sharding-JDBC 自动路由,从而导致创建语句报错:
// 举例
create table if not exists message_id_mapping_20 like message_id_mapping;
// Sharding-JDBC 给路由成 ==>
create table if not exists message_id_mapping_20 like message_id_mapping_21;
// 说明
由于当前没有 message_id_mapping_21 表,导致创建报错,说白了,就是不想让它给我自动路由,该怎么办?
- 当前的解决办法:
根据分表的数据源配置创建普通 JDBC 数据源,基于普通 JDBC 数据源去创建新的分表。如果读者朋友有更好的解决办法,也请评论在底下,谢谢~
配置文件 能贴出来吗
大哥,问个问题,shardingjdbc从3.x升级到4.x启动时出现了表或视图不存在,数据库里表是存在的,相对3版本启动时正常,这个有什么解决思路吗
5.0.0-beta 也好使
system.out.print(“我提交了一个评论”)
说明里面是什么
?