Java 开发

异步并发利器:CompletableFuture、parallelStream

言七墨 · 12月18日 · 2019年 · · · 331次已读

问题描述

生产上 prometheus 频繁告警java.net.SocketTimeoutException: timeout(远程调用超时),首先确定了超时接口,后查看 Hystrix 的配置,发现 A 服务对 B 服务设置的超时时间是20s(一开始是没有发现未启用这个配置的),故一直在 review B 服务的这个超时的接口,一开始是在整个接口上、接口内部的 IO 操作、接口内部的MQ 操作上都做了时间差的日志打印操作,发到线上后,发现有的请求达到了10s 多,但是 Hystrix 配置的超时时间是 20s 呀,百思不得其解,眼下做的只能是赶紧优化下 B 服务的这个超时接口了(毕竟耗时太严重了)。

耗时接口描述

此接口是一个 IO 密集型的批量操作接口,入参是一个 List<..>,大部分入参的 size == 1(耗时仅几十毫秒),只有少部分会是批量操作,批量操作的 size 最大是 100(最大耗时 10s 多)。接口内部:首先通过某个字段进行数据分组,极限的时候会被分成 100 组数据,分组后再对每组数据进行 IO 操作、MQ 操作,内部不仅有查询操作,还有插入操作,稍微有点复杂(接口已经排除了事务的干扰,即不存在事务耗时问题,核心操作的是原子的)。

解决

通过 Java8 的 CompletableFuture 或 parallelStream 异步并发解决:

下方代码中的 solve(..) 方法是分组后,每组数据都要执行的方法(内部含有 IO 操作、MQ 操作..)

  • 1、使用自定义线程池的 CompletableFuture 解决
public List<MessageSendInfoBO> send(List<MessageSendDTO> messageSendDTOS) {
    // 分组伪代码
    Map<Long, List<MessageSendDTO>> sendDTOMap = messageSendDTOS.stream()
                .collect(Collectors.groupingBy(MessageSendDTO::getSessionId));
    // 声明线程池
    ExecutorService executor = Executors.newFixedThreadPool(4);
    // supplyAsync 可以支持返回值
    // 执行完毕一个 CompletableFuture,再对其进行下一个 CompletableFuture 操作
    // 异步并发处理任务
    List<CompletableFuture<List<MessageSendInfoBO>>> tasks = sendDTOMap.entrySet().stream()
            .map(e -> CompletableFuture.supplyAsync(() -> solve(e.getKey(), e.getValue()), executor))
            .collect(toList());
    try {
        // allOf(静态方法)允许等待所有的任务完成,相当于对上面方法的回调
        // thenApplyAsync:使用传递式的 Executor 任务去运行,可以进一步并行化计算并更有效地使用系统资源
        CompletableFuture.allOf(tasks.toArray(new CompletableFuture[tasks.size()])).thenApplyAsync(v -> tasks.stream().map(CompletableFuture::join).flatMap(List::stream).collect(toList()));
    } catch (Throwable e) {}
    // 通过 CompletableFuture::join 组装每组任务的结果
    return tasks.stream().map(CompletableFuture::join).flatMap(List::stream).collect(toList());
}
  • 2、使用自定义线程池的 parallelStream 解决
public List<MessageSendInfoBO> send(List<MessageSendDTO> messageSendDTOS) {
    // 分组伪代码
    Map<Long, List<MessageSendDTO>> sendDTOMap = messageSendDTOS.stream()
                .collect(Collectors.groupingBy(MessageSendDTO::getSessionId));
    // Collections.synchronizedList(..) 可保证多线程并发安全
    List<MessageSendInfoBO> infoBOs = Collections.synchronizedList(Lists.newArrayList());
    // 声明线程池
    ForkJoinPool forkJoinPool = new ForkJoinPool(4);  CPU最高2核,则线程池可以大点
    try {
        forkJoinPool.submit(() ->
            // 将每组的操作交给线程池执行
            // parallelStream 分而治之,也就是将一个大任务切分成多个小任务,然后异步并发执行
            infoBOs.addAll(sendDTOMap.entrySet().parallelStream().map(e -> solve(e.getKey(), e.getValue()))
                    .flatMap(List::stream)
                    .collect(toList()))
        ).get();
    } catch (InterruptedException e) {
        e.printStackTrace();
    } catch (ExecutionException e) {
        e.printStackTrace();
    }
    return infoBOs;
}
  • 3、使用默认线程池的 parallelStream
public List<MessageSendInfoBO> send(List<MessageSendDTO> messageSendDTOS) {
    Map<Long, List<MessageSendDTO>> sendDTOMap = messageSendDTOS.stream()
                .collect(Collectors.groupingBy(MessageSendDTO::getSessionId));
    // parallelStream 分而治之,也就是将一个大任务切分成多个小任务,然后异步并发执行
    return sendDTOMap.entrySet().parallelStream().map(e -> solve(e.getKey(), e.getValue()))
                .flatMap(List::stream)
                .collect(toList());
}

总结

以上三种方式在本地通过压力测试,发现性能都差不多,并且未发现并发问题,最终使用的是第 3 种方式(使用默认线程池的 parallelStream),发到线上后,观察发现当入参的 size 是 100 时,最大的耗时是 1.7s,这个结果暂时是可以接受的。这个接口可以算是最核心的接口了,后续有时间了,再看看有没有优化的空间,或者使用第 1 种方式,测试出合适的线程池大小,对于 IO 密集型操作而言,或许还有优化的空间。这个优化完了,才发现 Hystrix 的配置未启用,使用的是默认的配置(即超时时间是 1s),不认真哎,不过也借此机会发现了这个接口耗时严重,并加以优化。

0 条回应