CompletableFuture 异常处理实践:避免线程池耗尽


想象一个场景。假设我们有一个电商平台,需要异步处理大量的订单数据。我们可以使用 CompletableFuture 来实现异步处理,并提高并发效率。

CompletableFuture 简介

CompletableFuture 是 Java 8 引入的异步编程工具,它可以简化异步任务的处理。CompletableFuture 提供了多种方法来处理任务的完成和异常情况。

思考:以下代码会正常执行吗?

main 函数

Java
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
    // 创建线程池
    ThreadPoolExecutor executorService = new ThreadPoolExecutor(4, 4,
            20, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>());

    // 循环执行 task 函数
    for (int i = 0; i < 10; i++) {
        task(executorService);
        System.out.println("------------------------------");
    }

}
  • 创建一个固定大小的线程池,用于执行异步任务。
  • 使用 for 循环10次执行 task 函数,模拟异步处理订单数据。

task 函数

Java
public static void task(ThreadPoolExecutor executorService) {
    // 打印线程池中正在执行任务的线程数量
    System.out.println("Active Threads: " + executorService.getActiveCount());

    List<CompletableFuture<Integer>> futures = new ArrayList<>();

    // 模拟4个异步任务
    futures.add(CompletableFuture.supplyAsync(() -> calculateResult(1), executorService));
    futures.add(CompletableFuture.supplyAsync(() -> calculateResult(5), executorService));
    futures.add(CompletableFuture.supplyAsync(() -> calculateResult(4), executorService));
    futures.add(CompletableFuture.supplyAsync(() -> calculateResult(3), executorService));

    // 等待所有任务完成
    CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    voidCompletableFuture.handle((v, ex) -> {
	Optional.ofNullable(ex).ifPresent(e -> task(executorService));
        return null;
    }).join();    // 处理结果
    System.out.println("Total sum: " + futures.stream()
            .map(CompletableFuture::join)
            .reduce(0, Integer::sum)
    );
}
  1. 打印当前线程池中正在执行任务的线程数量。
  2. 创建 4 个异步任务,并使用 CompletableFuture.supplyAsync 方法将它们提交到线程池。
  3. 使用 CompletableFuture.allOf 方法等待所有任务完成。
  4. 使用 handle 方法处理任务完成后的结果,异常后让task函数递归执行。
  5. 计算并输出所有任务的最终结果。

calculateResult 函数

Java
private static int calculateResult(int number) {
    if (number == 3) {
        throw new RuntimeException("Error occurred for number: " + number);
    }
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    return number * 2;
}
  1. 如果入参为 3,则抛出异常。
  2. 模拟计算结果的过程,并休眠 1 秒。
  3. 返回计算结果。

执行结果:

Active Threads: 0
Active Threads: 1
Active Threads: 2
Active Threads: 3
Active Threads: 4

问题分析:

for循环中的任务仅仅执行了4次,是否和想象的输出一样?实际上 calculateResult(3) 执行会抛出异常,导致任务无法正常完成。但是,CompletableFuture 并没有终止任务,而是继续运行,占用当前线程资源。最终,导致线程池中的所有线程都被耗尽。

解决方案:

使用 exceptionally 方法捕获异常,终止任务,可以避免线程池耗尽问题。

修复代码:

Java
public static void task(ThreadPoolExecutor executorService) {

    List<CompletableFuture<Integer>> futures = new ArrayList<>();

    // 模拟4个异步任务
    futures.add(CompletableFuture.supplyAsync(() -> calculateResult(1), executorService));
    futures.add(CompletableFuture.supplyAsync(() -> calculateResult(5), executorService));
    futures.add(CompletableFuture.supplyAsync(() -> calculateResult(4), executorService));
    futures.add(CompletableFuture.supplyAsync(() -> calculateResult(3), executorService)
        .exceptionally(t -> {
            System.out.println("任务异常:" + t.getMessage());
            return 0;
        }));

    // 等待所有任务完成
    CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));

    voidCompletableFuture.handle((v, ex) -> {
	Optional.ofNullable(ex).ifPresent(e -> task(executorService));
        return null;
    }).join();

    // 处理结果
    System.out.println("Total sum: " + futures.stream()
            .map(CompletableFuture::join)
            .reduce(0, Integer::sum)
    );
}

修复解释:

  • exceptionally 方法可以捕获任务执行过程中发生的异常,并进行自定义处理。
  • 通过 exceptionally 方法,我们可以避免异常被 CompletableFuture 吞掉,最重要的是保证该任务可以异常完成,从而避免该线程永远处于运行状态。

总结

  • CompletableFuture 的异常处理需要格外谨慎,否则可能导致线程池耗尽。
  • 使用 exceptionally方法捕获异常,显式终止任务,可以有效避免线程池耗尽问题。

Comments

Popular posts from this blog

Spring 内部调用,事务真会失效吗?

优雅摆脱冗长if-else:轻量级策略模式在Java中的简明实践