背景
需求:通过 Job 每秒去 Redis 中获取 Key 前缀为:“message:xxx:yyy:id: ”的所有 Value,Value以字符串类型存储,键值对存储预估峰值:几十万。
Redis中有一个经典的问题,在巨大的数据量的情况下,做类似于查找符
言七墨 合某种规则的 Key 的信息,有两种方式:1、通过 keys 命令,简单粗暴,由于Redis单线程这一特性,keys 命令以阻塞的方式执
言七墨 行。keys 命令通过遍历查找,时间复杂度是 O(n)。Redis 库中 key 的数量越多,查找实现的代价越大,产生阻塞的时间越长。2、通过 scan 命令,以非阻塞的方式实现 key 值的查找,绝大多数情况下是可以替代 keys 命令的,可选性更强。
代码实现
- 方法一:通过 scan 先获取以“message:xxx:yyy:id: ”为 Key
七墨博客 前缀的所有完整的 Key七墨博客 ,再通过获取到的 Key 拿所有的 Value
/**
* 通过 key 获取 value
* <p>
* pattern:message:xxx:yyy:id:
* limit:每次限制筛选的数量,不建议 Integer.MAX_VALUE
*/
public List<String> assembleScanValues(String pattern, Long limit) {
List<String> values = assembleScanKeys(pattern, limit);
return redisTemplate.opsForValue().multiGet(values).stream().filter(StringUtils::isNotBlank).collect(toList());
}
/**
* 组装 scan 的结果集
*/
public List<String> assembleScanKeys(String pattern, Long limit) {
HashSet<String> set = new HashSet<>();
Cursor<String> cursor = scan(redisTemplate, pattern, limit);
while (cursor.hasNext()) {
set.add(cursor.next());
}
try {
cursor.close();
} catch (Exception e) {
log.error("关闭 redis connection 失败");
}
return set.stream().map(String::valueOf).collect(toList());
}
/**
* 自定义 redis scan 操作
*/
private Cursor<String> scan(RedisTemplate redisTemplate, String pattern, Long limit) {
ScanOptions options = ScanOptions.scanOptions().match(pattern).count(limit).build();
RedisSerializer<String> redisSerializer = (RedisSerializer<String>) redisTemplate.getKeySerializer();
return (Cursor) redisTemplate.executeWithStickyConnection(new RedisCallback() {
@Override
public Object doInRedis(RedisConnection redisConnection)
throws org.springframework.dao.DataAccessException {
return new ConvertingCursor<>(redisConnection.scan(options), redisSerializer::deserialize);
}
});
}
- 方法二:通过 scan
七墨博客 获取到 Key 的同时,去获取对应的 Value
/**
* 组装分布式缓存中的 value 值
* <p>
* pattern:message:xxx:yyy:id:
* limit:每次限制筛选的数量,不建议 Integer.MAX_VALUE
*/
public List<String> assembleScanValues(String pattern, Long limit) {
Set<String> valueSet = scan(redisTemplate, pattern, limit);
return valueSet.stream().map(String::valueOf).collect(toList());
}
/**
* 组装 scan 的结果集
*/
private Set<String> scan(RedisTemplate redisTemplate, String pattern, Long limit) {
return (Set<String>) redisTemplate.execute(new RedisCallback<Set<String>>() {
@Override
public Set<String> doInRedis(RedisConnection connection) throws DataAccessException {
Set<String> valueSet = new HashSet<>();
try (Cursor<byte[]> cursor = connection.scan(new ScanOptions.ScanOptionsBuilder()
.match(pattern).count(limit).build())) {
while (cursor.hasNext()) {
byte[] bytes = connection.get(cursor.next());
String value = String.valueOf(redisTemplate.getValueSerializer().deserialize(bytes));
valueSet.add(value);
}
} catch (IOException e) {
log.error(String.format("get cursor close {%s}", e));
}
return valueSet;
}
});
}
我现在需要一个能够遍历redis中的key的方法,你这个方法 我需要怎么引用呢,麻烦大神解答一下
你这边需要给你存储的 key 定义一个公共前缀,然后基于公共前缀进行遍历,即上面代码中的 pattern 字段
Set valueSet = scan(redisTemplate, pattern, limit);
这行代码中的 redisTemplate 和pattern是需要传什么内容的参数呢
1、redisTemplate 指的是 StringRedisTemplate 对象
2、pattern 是存储在 redis 中的 key 前缀
Scan is not supported accros multiple nodes within a cluster
Scan is a command for single redis node. If you do want to use it in cluster, first get nodes list in the cluster, and run scan for each node.