CompletableFuture是Java中实现异步操作的核心工具,通过supplyAsync和runAsync创建有无返回值的异步任务,支持自定义线程池;其链式编程如thenApply、thenAccept、thenCombine等方法可构建清晰的异步流程;相比传统Future的阻塞等待,CompletableFuture提供非阻塞回调、丰富组合能力及exceptionally、handle、whenComplete等灵活异常处理机制;典型应用场景包括微服务编排聚合、并行数据处理、异步事件响应、超时回退等,显著提升系统响应性和资源利用率。

在Java中,要实现异步操作,
CompletableFuture
在我看来,理解
CompletableFuture
Future
Future
Future
CompletableFuture
最基本的异步任务创建,我们可以用
supplyAsync
runAsync
runAsync
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// 模拟一个耗时操作
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("这是一个没有返回值的异步任务,执行线程:" + Thread.currentThread().getName());
});
// 可以在这里做其他事情,不用等待
System.out.println("主线程继续执行...");
future.join(); // 阻塞等待任务完成,但不推荐在主线程频繁使用
System.out.println("异步任务完成!");而
supplyAsync
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "异步操作的结果,执行线程:" + Thread.currentThread().getName();
});
// 在这里可以继续处理其他业务逻辑
System.out.println("主线程在等待结果的同时,可以做点别的...");
future.thenAccept(result -> {
// 当异步任务完成时,处理结果
System.out.println("收到异步结果:" + result + ",处理结果的线程:" + Thread.currentThread().getName());
});
// 为了演示,这里阻塞一下,实际应用中通常不会这样
// future.join();默认情况下,
supplyAsync
runAsync
ForkJoinPool.commonPool()
Executor
立即学习“Java免费学习笔记(深入)”;
ExecutorService customExecutor = Executors.newFixedThreadPool(5);
CompletableFuture<String> customFuture = CompletableFuture.supplyAsync(() -> {
// ... 耗时操作 ...
return "使用自定义线程池的结果";
}, customExecutor);
// ...
customExecutor.shutdown();CompletableFuture
thenApply(Function)
CompletableFuture
CompletableFuture
thenAccept(Consumer)
CompletableFuture
thenRun(Runnable)
CompletableFuture
thenCompose(Function)
CompletableFuture
CompletableFuture
CompletableFuture
flatMap
thenCombine(otherFuture, BiFunction)
CompletableFuture
一个简单的链式调用例子:
CompletableFuture<String> fetchUserData = CompletableFuture.supplyAsync(() -> {
System.out.println("1. 异步获取用户数据...");
try { Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
return "用户ID: 123";
});
CompletableFuture<String> processData = fetchUserData.thenApplyAsync(userId -> {
System.out.println("2. 处理用户数据:" + userId);
try { Thread.sleep(700); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
return userId + ", 状态: 活跃";
});
CompletableFuture<Void> sendNotification = processData.thenAcceptAsync(processedInfo -> {
System.out.println("3. 发送通知:" + processedInfo);
// 模拟发送通知
});
sendNotification.join(); // 等待所有链式操作完成
System.out.println("所有异步操作链完成。");注意
thenApplyAsync
Async
Async
ForkJoinPool.commonPool()
Async
这真是一个经典的问题,也是我刚接触
CompletableFuture
Future
get()
get()
future.get()
更糟糕的是,
Future
Future
Future
CountDownLatch
ExecutorCompletionService
CompletableFuture
Future
complete()
completeExceptionally()
CompletableFuture
thenApply
thenAccept
CompletableFuture
thenCompose
Future
thenCombine
Future
allOf
Future
anyOf
Future
Future
get()
ExecutionException
CompletableFuture
exceptionally
handle
whenComplete
CompletableFuture
complete()
completeExceptionally()
在我看来,
CompletableFuture
处理异步操作中的异常,在我看来,是衡量一个并发工具是否成熟的关键标准之一。
CompletableFuture
Future
ExecutionException
主要的异常处理方法有:
exceptionally(Function<Throwable, T>)
CompletableFuture
exceptionally
CompletableFuture
thenApply
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("开始执行任务...");
if (Math.random() < 0.5) {
throw new RuntimeException("随机错误发生!");
}
return "任务成功完成";
}).exceptionally(ex -> {
System.err.println("捕获到异常:" + ex.getMessage());
return "任务失败,返回默认值"; // 返回备用值
});
System.out.println("最终结果:" + future.join());这里要注意的是,
exceptionally
Throwable
Exception
handle(BiFunction<? super T, Throwable, ? extends U>)
handle
Throwable
null
null
handle
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("执行任务...");
if (Math.random() < 0.5) {
throw new IllegalStateException("状态异常!");
}
return "成功数据";
}).handle((result, ex) -> {
if (ex != null) {
System.err.println("Handle中捕获异常:" + ex.getMessage());
return "从异常中恢复的数据"; // 恢复
} else {
System.out.println("Handle中处理正常结果:" + result);
return result + " (已处理)"; // 处理正常结果
}
});
System.out.println("最终结果:" + future.join());handle
whenComplete(BiConsumer<? super T, ? super Throwable>)
whenComplete
CompletableFuture
whenComplete
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("任务开始...");
if (Math.random() < 0.5) {
throw new ArithmeticException("数学错误!");
}
return "计算结果";
}).whenComplete((result, ex) -> {
if (ex != null) {
System.err.println("任务失败,记录日志:" + ex.getMessage());
} else {
System.out.println("任务成功,记录日志:" + result);
}
// 注意:这里不能修改结果或恢复异常
}).exceptionally(ex -> { // 异常仍然会传递到这里
System.err.println("最终异常恢复:" + ex.getMessage());
return "备用计算结果";
});
System.out.println("最终状态:" + future.join());whenComplete
最后,值得一提的是
join()
get()
get()
ExecutionException
InterruptedException
join()
CompletionException
join()
get()
在我的开发经验中,
CompletableFuture
微服务编排与聚合: 这是最常见的场景之一。假设你的前端页面需要展示一个用户的完整信息,而这些信息分散在不同的微服务中:用户基本信息服务、订单服务、评论服务、积分服务。如果同步调用这些服务,会非常慢。 使用
CompletableFuture
thenCombine
allOf
// 假设这些是调用不同微服务的异步方法
CompletableFuture<UserInfo> userInfoFuture = userService.getUserInfo(userId);
CompletableFuture<List<Order>> ordersFuture = orderService.getOrders(userId);
CompletableFuture<Integer> pointsFuture = pointService.getPoints(userId);
// 等待所有服务调用完成,然后聚合结果
CompletableFuture<FullUserProfile> fullProfileFuture = CompletableFuture.allOf(userInfoFuture, ordersFuture, pointsFuture)
.thenApply(v -> { // v是Void,因为allOf返回Void
UserInfo userInfo = userInfoFuture.join(); // join在这里是安全的,因为allOf保证它们已完成
List<Order> orders = ordersFuture.join();
Integer points = pointsFuture.join();
return new FullUserProfile(userInfo, orders, points);
})
.exceptionally(ex -> {
System.err.println("聚合用户档案失败:" + ex.getMessage());
// 返回一个部分数据或默认值
return new FullUserProfile(null, Collections.emptyList(), 0);
});
FullUserProfile profile = fullProfileFuture.join();
// ... 处理完整的用户档案这样一来,整个页面的加载时间就取决于最慢的那个服务,而不是所有服务时间之和,极大地提升了用户体验。
并行数据处理与计算: 当需要处理大量数据,并且每个数据块的处理是独立的,或者需要对数据进行多个独立的计算时,
CompletableFuture
CompletableFuture
List<String> largeDataSet = Arrays.asList("data1", "data2", "data3", "data4"); // 模拟大数据集
List<CompletableFuture<String>> futures = largeDataSet.stream()
.map(data -> CompletableFuture.supplyAsync(() -> {
// 模拟耗时的数据处理
try { Thread.sleep(new Random().nextInt(1000)); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
return data.toUpperCase(); // 转换为大写
}))
.collect(Collectors.toList());
// 等待所有处理完成
CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
// 获取所有结果
CompletableFuture<List<String>> allResultsFuture = allOf.thenApply(v ->
futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
List<String> processedData = allResultsFuture.join();
System.out.println("所有数据处理完成:" + processedData);这比传统的
ExecutorService.submit()
future.get()
异步事件处理: 在事件驱动的架构中,当一个事件发生时,可能需要触发多个不相关的异步操作,比如发送邮件、更新缓存、记录日志等。
public void onUserRegistered(User user) {
CompletableFuture.runAsync(() -> emailService.sendWelcomeEmail(user))
.exceptionally(ex -> {
System.err.println("发送欢迎邮件失败:" + ex.getMessage());
return null;
});
CompletableFuture.runAsync(() -> cacheService.updateUserCache(user))
.exceptionally(ex -> {
System.err.println("更新用户缓存失败:" + ex.getMessage());
return null;
});
CompletableFuture.runAsync(() -> auditLogService.logUserRegistration(user))
.exceptionally(ex -> {
System.err.println("记录注册日志失败:" + ex.getMessage());
return null;
});
System.out.println("用户注册事件已处理,异步任务已启动。");
}主业务流程不会因为这些副作用操作的耗时而阻塞。
超时控制与回退机制:
CompletableFuture
CompletableFuture.anyOf
CompletableFuture.runAfter
CompletableFuture<String> primaryServiceCall = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(3000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
return "主服务结果";
});
// 模拟一个超时Future
CompletableFuture<String> timeout = CompletableFuture.supplyAsync(() -> {
try { Thread.sleep(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
return "超时!";
});
// 哪个先完成就用哪个结果
CompletableFuture<Object> result = CompletableFuture.anyOf(primaryServiceCall, timeout)
.thenApply(obj -> {
if ("超时!".equals(obj)) {
System.out.println("主服务超时,使用备用逻辑或抛出异常");
// 可以在这里触发一个回退服务调用
return "备用结果";
}
return obj;
});
System.out.println("最终结果:" + result.join());这在调用外部API或不可靠服务时非常有用,可以防止单个慢服务拖垮整个系统。
这些场景仅仅是冰山一角。在我看来,一旦你掌握了
CompletableFuture
以上就是如何在Java中使用Completable Future实现异步操作的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号