中间件

Sharding-JDBC 支持动态扩容及刷新 ActualDataNodes

言七墨 · 9月7日 · 2020年 · 2561次已读

需求

目前,有一个基于自增message_id的范围分片表(message_id_mapping),如何动态地扩展分表并刷新分表的配置(ActualDataNodes)?

开发

  • 范围分片配置
# Routing strategy for the message_id_mapping logic table.
# Initial actual data node: data source "shardingds", table "message_id_mapping".
spring.shardingsphere.sharding.tables.message_id_mapping.actual-data-nodes = shardingds.message_id_mapping
# Column whose value is handed to the sharding algorithms below.
spring.shardingsphere.sharding.tables.message_id_mapping.table-strategy.standard.sharding-column = message_id
# The same class is registered as both the precise and the range algorithm.
spring.shardingsphere.sharding.tables.message_id_mapping.table-strategy.standard.precise-algorithm-class-name = com.xxx.yyy.persistence.config.MessageIdShardingAlgorithm
spring.shardingsphere.sharding.tables.message_id_mapping.table-strategy.standard.range-algorithm-class-name = com.xxx.yyy.persistence.config.MessageIdShardingAlgorithm
  • model
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import java.util.Optional;

/**
 * Constants describing the {@code message_id_mapping} table and its shard tables.
 * The only constructor is generated with private access, so the class is not
 * meant to be instantiated.
 */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ModelConstants {

    /**
     * Logic table name of message_id_mapping.
     */
    public static final String MESSAGE_ID_MAPPING = "message_id_mapping";

    /**
     * Name prefix of the message_id_mapping shard tables; the shard suffix is appended to it.
     */
    public static final String MESSAGE_ID_MAPPING_PREFIX = "message_id_mapping_";

    /**
     * Capacity of a single message_id_mapping shard table (20,000,000).
     */
    public static final Long MESSAGE_ID_MAPPING_SINGLE_TABLE_CAPACITY = 20000000L;

}
  • 范围分片算法类
import org.apache.shardingsphere.api.sharding.standard.PreciseShardingAlgorithm;
import org.apache.shardingsphere.api.sharding.standard.PreciseShardingValue;
import org.apache.shardingsphere.api.sharding.standard.RangeShardingAlgorithm;
import org.apache.shardingsphere.api.sharding.standard.RangeShardingValue;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;
import static com.xxx.yyy.model.ModelConstants.MESSAGE_ID_MAPPING_PREFIX;
import static com.xxx.yyy.model.ModelConstants.MESSAGE_ID_MAPPING_SINGLE_TABLE_CAPACITY;

/**
 * Range sharding algorithm for the {@code message_id_mapping} logic table.
 *
 * <p>A message id is routed to the actual table named
 * {@code MESSAGE_ID_MAPPING_PREFIX + (messageId / MESSAGE_ID_MAPPING_SINGLE_TABLE_CAPACITY)}.
 * Range routing treats the available tables as the contiguous suffixes
 * {@code 0 .. availableTargetNames.size() - 1}.
 *
 * @author qimok
 * @since 2020-09-07
 */
public class MessageIdShardingAlgorithm implements PreciseShardingAlgorithm<Long>, RangeShardingAlgorithm<Long> {

    /**
     * Routes a single message id to its actual table.
     *
     * @param availableTargetNames names of the actual tables currently known to the table rule
     * @param shardingValue        the exact message id being routed
     * @return the actual table name for the message id
     * @throws UnsupportedOperationException if the computed table is not among the available tables
     */
    @Override
    public String doSharding(Collection<String> availableTargetNames, PreciseShardingValue<Long> shardingValue) {
        long suffix = shardingValue.getValue() / MESSAGE_ID_MAPPING_SINGLE_TABLE_CAPACITY;
        String targetName = MESSAGE_ID_MAPPING_PREFIX + suffix;
        if (!availableTargetNames.contains(targetName)) {
            throw new UnsupportedOperationException(
                    "No actual table " + targetName + " for message_id " + shardingValue.getValue());
        }
        return targetName;
    }

    /**
     * Routes a message id range to every actual table it overlaps.
     *
     * <p>The computed suffix range is clamped to {@code 0 .. size - 1}, so a range that
     * extends past the last existing table (or below zero) is routed to the existing
     * tables only instead of failing.
     *
     * @param availableTargetNames names of the actual tables currently known to the table rule
     * @param shardingValue        the message id range being routed
     * @return the actual table names, in ascending suffix order
     * @throws UnsupportedOperationException if the range overlaps no existing table, or a table
     *                                       inside the clamped range is missing
     */
    @Override
    public Collection<String> doSharding(Collection<String> availableTargetNames, RangeShardingValue<Long> shardingValue) {
        long lastSuffix = availableTargetNames.size() - 1L;
        long lowerSuffix = shardingValue.getValueRange().hasLowerBound()
                ? shardingValue.getValueRange().lowerEndpoint() / MESSAGE_ID_MAPPING_SINGLE_TABLE_CAPACITY
                : 0L;
        long upperSuffix = shardingValue.getValueRange().hasUpperBound()
                ? shardingValue.getValueRange().upperEndpoint() / MESSAGE_ID_MAPPING_SINGLE_TABLE_CAPACITY
                : lastSuffix;
        // Clamp to the tables that actually exist; an open-ended or oversized bound must not fail routing.
        long firstSuffix = Math.max(lowerSuffix, 0L);
        long finalSuffix = Math.min(upperSuffix, lastSuffix);
        if (firstSuffix > finalSuffix) {
            throw new UnsupportedOperationException(
                    "No actual table overlaps message_id range " + shardingValue.getValueRange());
        }
        Set<String> result = new LinkedHashSet<>();
        for (long suffix = firstSuffix; suffix <= finalSuffix; suffix++) {
            String targetName = MESSAGE_ID_MAPPING_PREFIX + suffix;
            if (!availableTargetNames.contains(targetName)) {
                // A hole inside the expected contiguous range is a configuration problem; fail loudly.
                throw new UnsupportedOperationException(
                        "No actual table " + targetName + " for message_id range " + shardingValue.getValueRange());
            }
            result.add(targetName);
        }
        return result;
    }

}
  • 动态分表配置类
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * Configuration properties for dynamically sharded tables, bound from the
 * {@code dynamic.table} prefix.
 *
 * @author qimok
 * @since 2020-09-07
 */
@ConfigurationProperties(prefix = "dynamic.table")
@Data
public class DynamicTablesProperties {

    // Table names configured under dynamic.table.names.
    String[] names;

}
  • 动态刷新actualDataNodes
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.xxx.zzz.core.biz.XxxService;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.core.config.ShardingConfigurationException;
import org.apache.shardingsphere.core.rule.DataNode;
import org.apache.shardingsphere.core.rule.ShardingRule;
import org.apache.shardingsphere.core.rule.TableRule;
import org.apache.shardingsphere.shardingjdbc.jdbc.core.datasource.ShardingDataSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.sql.DataSource;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.apache.commons.collections.CollectionUtils.isNotEmpty;
import static com.xxx.yyy.model.ModelConstants.MESSAGE_ID_MAPPING;
import static com.xxx.yyy.model.ModelConstants.MESSAGE_ID_MAPPING_SINGLE_TABLE_CAPACITY;

/**
 * 基于范围分表的 ActualDataNodes 动态刷新JOB
 *
 * @author qimok
 * @since 2020-09-07
 */
@Slf4j
@Component
@EnableConfigurationProperties(DynamicTablesProperties.class)
public class ShardingTableRuleActualDataNodesRefreshJob {

    @Autowired
    private DynamicTablesProperties dynamicTables;

    @Resource(name = "shardingDataSource")
    private DataSource dataSource;

    @Autowired
    private XxxService xxxService;

    /**
     * 6 个小时执行一次
     */
    @PostConstruct
    @Scheduled(fixedRate = 1000 * 60 * 60 * 12)
    public void refreshActualDataNodes() throws NoSuchFieldException, IllegalAccessException {
        log.info("Job 动态刷新 actualDataNodes START");
        if (dynamicTables.getNames() == null || dynamicTables.getNames().length == 0) {
            log.error("【dynamic.table.names】配置为空!");
            return;
        }
        for (int i = 0; i < dynamicTables.getNames().length; i++) {
            String dynamicTableName = dynamicTables.getNames()[i];
            TableRule tableRule = null;
            try {
                ShardingDataSource shardingDataSource = (ShardingDataSource) dataSource;
                ShardingRule shardingRule = shardingDataSource.getRuntimeContext().getRule();
                tableRule = shardingRule.getTableRule(dynamicTableName);
            } catch (ShardingConfigurationException e) {
                log.error(String.format("逻辑表:%s 动态分表配置错误!", dynamicTableName));
            }
            String dataSourceName = tableRule.getActualDataNodes().get(0).getDataSourceName();
            String logicTableName = tableRule.getLogicTable();
            assert tableRule != null;
            List<DataNode> newDataNodes = getDataNodes(dynamicTableName, dataSourceName, logicTableName);
            if (newDataNodes.isEmpty()) {
                throw new UnsupportedOperationException();
            }
            createSubTableIfAbsent(logicTableName, newDataNodes);
            dynamicRefreshDatasource(dataSourceName, tableRule, newDataNodes);
        }
        log.info("Job 动态刷新 actualDataNodes END");
    }

    /**
     * 补偿创建分表
     */
    private void createSubTableIfAbsent(String logicTableName, List<DataNode> newDataNodes) {
        if (isNotEmpty(newDataNodes)) {
            List<String> subTableNames = newDataNodes.stream().map(DataNode::getTableName).collect(Collectors.toList());
            xxxService.createSubTableIfAbsent(logicTableName, subTableNames);
        }
    }

    /**
     * 获取数据节点
     */
    private List<DataNode> getDataNodes(String tableName, String dataSourceName, String logicTableName) {
        Set<DataNode> newDataNodes = Sets.newHashSet();
        StringBuilder stringBuilder = new StringBuilder().append(dataSourceName).append(".").append(logicTableName);
        final int length = stringBuilder.length();
        // 根据自增id范围分表的场景
        if (tableName.equals(MESSAGE_ID_MAPPING)) {
            Long maxMessageId = xxxService.getMaxMessageId();
            Long notFullSubTableSuffix = maxMessageId / MESSAGE_ID_MAPPING_SINGLE_TABLE_CAPACITY;
            Long lastSubTableSize = maxMessageId % MESSAGE_ID_MAPPING_SINGLE_TABLE_CAPACITY;
            if (lastSubTableSize > MESSAGE_ID_MAPPING_SINGLE_TABLE_CAPACITY >> 1) {
                // 当最后一个分表的容量达到一半时,就扩展出一个分表
                notFullSubTableSuffix++;
            }
            while (notFullSubTableSuffix >= 0L) {
                stringBuilder.setLength(length);
                stringBuilder.append("_").append(notFullSubTableSuffix);
                DataNode dataNode = new DataNode(stringBuilder.toString());
                newDataNodes.add(dataNode);
                notFullSubTableSuffix--;
            }
        }
        // 扩展点
        return Lists.newArrayList(newDataNodes);
    }

    /**
     * 动态刷新数据源
     */
    private void dynamicRefreshDatasource(String dataSourceName, TableRule tableRule, List<DataNode> newDataNodes)
            throws NoSuchFieldException, IllegalAccessException {
        Set<String> actualTables = Sets.newHashSet();
        Map<DataNode, Integer> dataNodeIndexMap = Maps.newHashMap();
        AtomicInteger index = new AtomicInteger(0);
        newDataNodes.forEach(dataNode -> {
            actualTables.add(dataNode.getTableName());
            if (index.intValue() == 0) {
                dataNodeIndexMap.put(dataNode, 0);
            } else {
                dataNodeIndexMap.put(dataNode, index.intValue());
            }
            index.incrementAndGet();
        });
        // 动态刷新:actualDataNodesField
        Field actualDataNodesField = TableRule.class.getDeclaredField("actualDataNodes");
        Field modifiersField = Field.class.getDeclaredField("modifiers");
        modifiersField.setAccessible(true);
        modifiersField.setInt(actualDataNodesField, actualDataNodesField.getModifiers() & ~Modifier.FINAL);
        actualDataNodesField.setAccessible(true);
        actualDataNodesField.set(tableRule, newDataNodes);
        // 动态刷新:actualTablesField
        Field actualTablesField = TableRule.class.getDeclaredField("actualTables");
        actualTablesField.setAccessible(true);
        actualTablesField.set(tableRule, actualTables);
        // 动态刷新:dataNodeIndexMapField
        Field dataNodeIndexMapField = TableRule.class.getDeclaredField("dataNodeIndexMap");
        dataNodeIndexMapField.setAccessible(true);
        dataNodeIndexMapField.set(tableRule, dataNodeIndexMap);
        // 动态刷新:datasourceToTablesMapField
        Map<String, Collection<String>> datasourceToTablesMap = Maps.newHashMap();
        datasourceToTablesMap.put(dataSourceName, actualTables);
        Field datasourceToTablesMapField = TableRule.class.getDeclaredField("datasourceToTablesMap");
        datasourceToTablesMapField.setAccessible(true);
        datasourceToTablesMapField.set(tableRule, datasourceToTablesMap);
    }

}

说明:如果是分库分表,以下操作需要做好封装:

xxxService.createSubTableIfAbsent(logicTableName, subTableNames);

xxxService.getMaxMessageId();

扩展

  • 基于范围分片时,如果需要通过代码动态创建新的分表,当基于分片的数据源创建分表时,创建SQL会被Sharding-JDBC自动路由,从而导致创建语句报错:
// 举例
create table if not exists message_id_mapping_20 like message_id_mapping;
// Sharding-JDBC 给路由成 ==>
create table if not exists message_id_mapping_20 like message_id_mapping_21;
// 说明
由于当前没有 message_id_mapping_21 表,导致创建报错,说白了,就是不想让它给我自动路由,该怎么办?
  • 当前的解决办法:

根据分表的数据源配置创建普通JDBC数据源,基于普通JDBC数据源去创建新的分表。如果读者朋友有更好的解决办法,也请评论在底下,谢谢~

2 条回应
  1. 匿名2021-3-17 · 20:40

    说明里面是什么