
在java中,recursiveaction和recursivetask是为forkjoinpool设计的抽象基类,用于支持分治算法的并行执行。它们是forkjointask的子类,顾名思义,其设计目标就是运行在forkjoinpool内部。
ForkJoinPool在创建工作线程时,会使用一个特殊的ForkJoinPool.ForkJoinWorkerThreadFactory来生成ForkJoinWorkerThread实例。这些工作线程是Thread类的子类,而非虚拟线程。虚拟线程(Virtual Threads)需要通过Thread.Builder.OfVirtual或Executors.newVirtualThreadPerTaskExecutor()等方式创建。由于ForkJoinWorkerThreadFactory无法创建虚拟线程,因此RecursiveAction和RecursiveTask无法直接与虚拟线程结合使用。
简而言之,RecursiveAction和RecursiveTask的设计理念和实现机制与ForkJoinPool紧密耦合,而ForkJoinPool目前并不支持将虚拟线程作为其工作线程。
当考虑将任务分解为子任务并在虚拟线程上执行时,我们应该重新审视RecursiveAction/RecursiveTask所提供的价值。这些类主要为ForkJoinPool提供工作窃取(work-stealing)等机制,以在有限的平台线程上高效平衡工作负载。然而,当每个子任务都可以运行在自己的轻量级虚拟线程上时,这些复杂的负载均衡机制的需求就大大降低了。虚拟线程的优势在于其数量庞大且创建成本极低,这使得“为每个子任务分配一个虚拟线程”成为一种可行的策略。
因此,即使不能直接使用RecursiveAction/RecursiveTask,我们仍然可以轻松地使用其他并发工具在虚拟线程上实现递归任务。
立即学习“Java免费学习笔记(深入)”;
CompletableFuture是Java中处理异步操作的强大工具,它与虚拟线程结合可以非常优雅地实现递归任务。以下是一个示例,展示了如何创建一个类似RecursiveTask的结构,但在虚拟线程上执行:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Supplier;
public class VirtualThreadRecursiveTask {
// 示例1:基本递归任务,每个子任务启动一个虚拟线程
record PseudoTask(int from, int to) {
public static CompletableFuture<Void> run(int from, int to) {
// 使用CompletableFuture.runAsync在虚拟线程上执行任务
return CompletableFuture.runAsync(
new PseudoTask(from, to)::compute, Thread::startVirtualThread);
}
protected void compute() {
int mid = (from + to) >>> 1; // 计算中间点
if (mid == from) {
// 模拟实际处理,可能包含阻塞操作
System.out.println(Thread.currentThread().getName() + ": Processing range [" + from + ", " + to + "]");
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(50)); // 模拟耗时操作
} else {
// 递归地创建并运行子任务
CompletableFuture<Void> sub1 = run(from, mid);
CompletableFuture<Void> sub2 = run(mid, to);
sub1.join(); // 等待子任务完成
sub2.join(); // 等待子任务完成
}
}
}
// 示例2:优化后的递归任务,减少虚拟线程创建数量
record OptimizedPseudoTask(int from, int to) {
public static CompletableFuture<Void> run(int from, int to) {
return CompletableFuture.runAsync(
new OptimizedPseudoTask(from, to)::compute, Thread::startVirtualThread);
}
protected void compute() {
CompletableFuture<Void> pendingFutures = null;
// 循环处理一部分任务,另一部分提交给新线程
for (int currentFrom = this.from; ; ) {
int mid = (currentFrom + to) >>> 1;
if (mid == currentFrom) {
// 模拟实际处理
System.out.println(Thread.currentThread().getName() + ": Processing range [" + currentFrom + ", " + to + "]");
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(50));
break;
} else {
// 提交一个子任务到新的虚拟线程
CompletableFuture<Void> sub = run(currentFrom, mid);
if (pendingFutures == null) {
pendingFutures = sub;
} else {
pendingFutures = CompletableFuture.allOf(pendingFutures, sub);
}
currentFrom = mid; // 当前线程处理另一半
}
}
if (pendingFutures != null) {
pendingFutures.join(); // 等待所有提交的子任务完成
}
}
}
public static void main(String[] args) throws InterruptedException {
System.out.println("--- Running Basic PseudoTask ---");
long startTime = System.currentTimeMillis();
PseudoTask.run(0, 1_000).join(); // 执行任务并等待完成
long endTime = System.currentTimeMillis();
System.out.println("Basic PseudoTask completed in " + (endTime - startTime) + " ms");
System.out.println("\n--- Running Optimized PseudoTask ---");
startTime = System.currentTimeMillis();
OptimizedPseudoTask.run(0, 1_000_000).join(); // 执行优化后的任务
endTime = System.currentTimeMillis();
System.out.println("Optimized PseudoTask completed in " + (endTime - startTime) + " ms");
}
}代码解析与注意事项:
Java的孵化模块中引入了StructuredTaskScope,它提供了一种更结构化的并发编程模型,尤其适用于虚拟线程。StructuredTaskScope允许在一个明确的父子关系中管理并发任务,确保所有子任务在父任务完成前结束,并能更好地处理错误和取消。
import jdk.incubator.concurrent.StructuredTaskScope;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public class StructuredVirtualThreadTask {
// 示例:使用StructuredTaskScope实现递归任务
record PseudoTask(int from, int to) {
public static void run(int from, int to) {
// 使用ShutdownOnFailure策略,任何子任务失败都会关闭整个作用域
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
new PseudoTask(from, to).compute(scope);
scope.join(); // 等待所有子任务完成
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException("Task interrupted", e);
}
}
protected Void compute(StructuredTaskScope<Object> scope) {
for (int currentFrom = this.from; ; ) {
int mid = (currentFrom + to) >>> 1;
if (mid == currentFrom) {
// 模拟实际处理
System.out.println(Thread.currentThread().getName() + ": Processing range [" + currentFrom + ", " + to + "]");
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(50));
break;
} else {
// 使用scope.fork()在虚拟线程中启动子任务
var sub = new PseudoTask(currentFrom, mid);
scope.fork(() -> sub.compute(scope));
currentFrom = mid; // 当前线程处理另一半
}
}
return null;
}
}
public static void main(String[] args) {
System.out.println("--- Running StructuredTaskScope PseudoTask ---");
long startTime = System.currentTimeMillis();
PseudoTask.run(0, 1_000_000); // 执行任务
long endTime = System.currentTimeMillis();
System.out.println("StructuredTaskScope PseudoTask completed in " + (endTime - startTime) + " ms");
}
}代码解析与注意事项:
尽管RecursiveAction和RecursiveTask无法直接与虚拟线程配合使用,但这并不意味着我们无法在虚拟线程上实现高效的递归任务。通过利用CompletableFuture的异步特性,并结合Thread::startVirtualThread来创建虚拟线程,我们可以轻松构建自定义的递归任务处理机制。此外,StructuredTaskScope作为一项新兴的结构化并发特性,为未来在虚拟线程上管理复杂任务提供了更优雅、更健壮的解决方案。在设计虚拟线程应用程序时,理解这些替代方案及其优势,将有助于我们充分利用虚拟线程的强大功能。
以上就是Java虚拟线程与RecursiveAction/Task:兼容性与替代方案的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号