CompletableFuture 异常处理实践:避免线程池耗尽
想象一个场景。假设我们有一个电商平台,需要异步处理大量的订单数据。我们可以使用 CompletableFuture 来实现异步处理,并提高并发效率。
CompletableFuture 简介
CompletableFuture 是 Java 8 引入的异步编程工具,它可以简化异步任务的处理。CompletableFuture 提供了多种方法来处理任务的完成和异常情况。
思考:以下代码会正常执行吗?
main 函数
Java
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
// 创建线程池
ThreadPoolExecutor executorService = new ThreadPoolExecutor(4, 4,
20, TimeUnit.SECONDS,
new LinkedBlockingQueue<>());
// 循环执行 task 函数
for (int i = 0; i < 10; i++) {
task(executorService);
System.out.println("------------------------------");
}
}
- 创建一个固定大小的线程池,用于执行异步任务。
- 使用 for 循环10次执行 task 函数,模拟异步处理订单数据。
task 函数
Java
public static void task(ThreadPoolExecutor executorService) {
// 打印线程池中正在执行任务的线程数量
System.out.println("Active Threads: " + executorService.getActiveCount());
List<CompletableFuture<Integer>> futures = new ArrayList<>();
// 模拟4个异步任务
futures.add(CompletableFuture.supplyAsync(() -> calculateResult(1), executorService));
futures.add(CompletableFuture.supplyAsync(() -> calculateResult(5), executorService));
futures.add(CompletableFuture.supplyAsync(() -> calculateResult(4), executorService));
futures.add(CompletableFuture.supplyAsync(() -> calculateResult(3), executorService));
// 等待所有任务完成
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
voidCompletableFuture.handle((v, ex) -> {
Optional.ofNullable(ex).ifPresent(e -> task(executorService));
return null;
}).join();
// 处理结果
System.out.println("Total sum: " + futures.stream()
.map(CompletableFuture::join)
.reduce(0, Integer::sum)
);
}
- 打印当前线程池中正在执行任务的线程数量。
- 创建 4 个异步任务,并使用
CompletableFuture.supplyAsync
方法将它们提交到线程池。 - 使用
CompletableFuture.allOf
方法等待所有任务完成。 - 使用
handle
方法处理任务完成后的结果,异常后让task函数递归执行。 - 计算并输出所有任务的最终结果。
calculateResult 函数
Java
private static int calculateResult(int number) {
if (number == 3) {
throw new RuntimeException("Error occurred for number: " + number);
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return number * 2;
}
- 如果入参为 3,则抛出异常。
- 模拟计算结果的过程,并休眠 1 秒。
- 返回计算结果。
执行结果:
Active Threads: 0
Active Threads: 1
Active Threads: 2
Active Threads: 3
Active Threads: 4
问题分析:
for循环中的任务仅仅执行了4次,是否和想象的输出一样?实际上 calculateResult(3)
执行会抛出异常,导致任务无法正常完成。但是,CompletableFuture
并没有终止任务,而是继续运行,占用当前线程资源。最终,导致线程池中的所有线程都被耗尽。
解决方案:
使用 exceptionally
方法捕获异常,终止任务,可以避免线程池耗尽问题。
修复代码:
Java
public static void task(ThreadPoolExecutor executorService) {
List<CompletableFuture<Integer>> futures = new ArrayList<>();
// 模拟4个异步任务
futures.add(CompletableFuture.supplyAsync(() -> calculateResult(1), executorService));
futures.add(CompletableFuture.supplyAsync(() -> calculateResult(5), executorService));
futures.add(CompletableFuture.supplyAsync(() -> calculateResult(4), executorService));
futures.add(CompletableFuture.supplyAsync(() -> calculateResult(3), executorService)
.exceptionally(t -> {
System.out.println("任务异常:" + t.getMessage());
return 0;
}));
// 等待所有任务完成
CompletableFuture<Void> voidCompletableFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
voidCompletableFuture.handle((v, ex) -> {
Optional.ofNullable(ex).ifPresent(e -> task(executorService));
return null;
}).join();
// 处理结果
System.out.println("Total sum: " + futures.stream()
.map(CompletableFuture::join)
.reduce(0, Integer::sum)
);
}
修复解释:
exceptionally
方法可以捕获任务执行过程中发生的异常,并进行自定义处理。- 通过
exceptionally
方法,我们可以避免异常被 CompletableFuture 吞掉,最重要的是保证该任务可以异常完成,从而避免该线程永远处于运行状态。
总结
- CompletableFuture 的异常处理需要格外谨慎,否则可能导致线程池耗尽。
- 使用
exceptionally
方法捕获异常,显式终止任务,可以有效避免线程池耗尽问题。
Comments
Post a Comment