首页 > Java > java教程 > 正文

Java虚拟线程与RecursiveAction/Task:兼容性与替代方案

花韻仙語
发布: 2025-10-11 11:10:18
原创
172人浏览过

Java虚拟线程与RecursiveAction/Task:兼容性与替代方案

本文深入探讨了Java中RecursiveAction和RecursiveTask与虚拟线程的兼容性问题。由于它们与ForkJoinPool的固有绑定,无法直接使用虚拟线程。文章继而提出了基于CompletableFuture和StructuredTaskScope(孵化中)的替代方案,演示了如何利用虚拟线程的轻量级特性,高效实现递归任务的并行处理,并提供优化策略。

1. RecursiveAction/Task 与虚拟线程的兼容性分析

java中,recursiveaction和recursivetask是为forkjoinpool设计的抽象基类,用于支持分治算法的并行执行。它们是forkjointask的子类,顾名思义,其设计目标就是运行在forkjoinpool内部。

ForkJoinPool在创建工作线程时,会使用一个特殊的ForkJoinPool.ForkJoinWorkerThreadFactory来生成ForkJoinWorkerThread实例。这些工作线程是Thread类的子类,而非虚拟线程。虚拟线程(Virtual Threads)需要通过Thread.Builder.OfVirtual或Executors.newVirtualThreadPerTaskExecutor()等方式创建。由于ForkJoinWorkerThreadFactory无法创建虚拟线程,因此RecursiveAction和RecursiveTask无法直接与虚拟线程结合使用。

简而言之,RecursiveAction和RecursiveTask的设计理念和实现机制与ForkJoinPool紧密耦合,而ForkJoinPool目前并不支持将虚拟线程作为其工作线程。

2. 重新思考:虚拟线程下的递归任务设计

当考虑将任务分解为子任务并在虚拟线程上执行时,我们应该重新审视RecursiveAction/RecursiveTask所提供的价值。这些类主要为ForkJoinPool提供工作窃取(work-stealing)等机制,以在有限的平台线程上高效平衡工作负载。然而,当每个子任务都可以运行在自己的轻量级虚拟线程上时,这些复杂的负载均衡机制的需求就大大降低了。虚拟线程的优势在于其数量庞大且创建成本极低,这使得“为每个子任务分配一个虚拟线程”成为一种可行的策略。

因此,即使不能直接使用RecursiveAction/RecursiveTask,我们仍然可以轻松地使用其他并发工具在虚拟线程上实现递归任务。

立即学习Java免费学习笔记(深入)”;

3. 基于 CompletableFuture 的自定义递归任务实现

CompletableFuture是Java中处理异步操作的强大工具,它与虚拟线程结合可以非常优雅地实现递归任务。以下是一个示例,展示了如何创建一个类似RecursiveTask的结构,但在虚拟线程上执行:

百度虚拟主播
百度虚拟主播

百度智能云平台的一站式、灵活化的虚拟主播直播解决方案

百度虚拟主播 36
查看详情 百度虚拟主播
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Supplier;

public class VirtualThreadRecursiveTask {

    // 示例1:基本递归任务,每个子任务启动一个虚拟线程
    record PseudoTask(int from, int to) {
        public static CompletableFuture<Void> run(int from, int to) {
            // 使用CompletableFuture.runAsync在虚拟线程上执行任务
            return CompletableFuture.runAsync(
                new PseudoTask(from, to)::compute, Thread::startVirtualThread);
        }

        protected void compute() {
            int mid = (from + to) >>> 1; // 计算中间点
            if (mid == from) {
                // 模拟实际处理,可能包含阻塞操作
                System.out.println(Thread.currentThread().getName() + ": Processing range [" + from + ", " + to + "]");
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(50)); // 模拟耗时操作
            } else {
                // 递归地创建并运行子任务
                CompletableFuture<Void> sub1 = run(from, mid);
                CompletableFuture<Void> sub2 = run(mid, to);
                sub1.join(); // 等待子任务完成
                sub2.join(); // 等待子任务完成
            }
        }
    }

    // 示例2:优化后的递归任务,减少虚拟线程创建数量
    record OptimizedPseudoTask(int from, int to) {
        public static CompletableFuture<Void> run(int from, int to) {
            return CompletableFuture.runAsync(
                new OptimizedPseudoTask(from, to)::compute, Thread::startVirtualThread);
        }

        protected void compute() {
            CompletableFuture<Void> pendingFutures = null;
            // 循环处理一部分任务,另一部分提交给新线程
            for (int currentFrom = this.from; ; ) {
                int mid = (currentFrom + to) >>> 1;
                if (mid == currentFrom) {
                    // 模拟实际处理
                    System.out.println(Thread.currentThread().getName() + ": Processing range [" + currentFrom + ", " + to + "]");
                    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(50));
                    break;
                } else {
                    // 提交一个子任务到新的虚拟线程
                    CompletableFuture<Void> sub = run(currentFrom, mid);
                    if (pendingFutures == null) {
                        pendingFutures = sub;
                    } else {
                        pendingFutures = CompletableFuture.allOf(pendingFutures, sub);
                    }
                    currentFrom = mid; // 当前线程处理另一半
                }
            }
            if (pendingFutures != null) {
                pendingFutures.join(); // 等待所有提交的子任务完成
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("--- Running Basic PseudoTask ---");
        long startTime = System.currentTimeMillis();
        PseudoTask.run(0, 1_000).join(); // 执行任务并等待完成
        long endTime = System.currentTimeMillis();
        System.out.println("Basic PseudoTask completed in " + (endTime - startTime) + " ms");

        System.out.println("\n--- Running Optimized PseudoTask ---");
        startTime = System.currentTimeMillis();
        OptimizedPseudoTask.run(0, 1_000_000).join(); // 执行优化后的任务
        endTime = System.currentTimeMillis();
        System.out.println("Optimized PseudoTask completed in " + (endTime - startTime) + " ms");
    }
}
登录后复制

代码解析与注意事项:

  1. CompletableFuture.runAsync(task, Thread::startVirtualThread): 这是核心。runAsync方法接受一个Runnable和一个Executor。通过提供Thread::startVirtualThread作为Executor,我们指示CompletableFuture在每次创建新任务时都启动一个新的虚拟线程来执行它。
  2. join()方法的阻塞性: 在上述示例中,sub1.join()和sub2.join()是阻塞调用。在平台线程中,这种阻塞会导致线程池饥饿。但在虚拟线程中,join()的阻塞并不会阻塞底层平台线程,因为虚拟线程在等待时会被卸载,允许平台线程执行其他虚拟线程。因此,即使存在阻塞join(),性能影响也远小于传统线程。
  3. 虚拟线程数量: 第一个PseudoTask示例会为每个子任务都创建一个新的虚拟线程。对于run(0, 1_000)这样的范围,可能会创建接近2000个虚拟线程。虽然虚拟线程非常轻量,但创建过多仍然会带来一些开销。
  4. 优化策略: 第二个OptimizedPseudoTask示例展示了一种优化方法:当前线程处理一部分任务(通过循环迭代),而只将另一部分任务提交到新的虚拟线程。这种策略可以显著减少虚拟线程的创建数量。例如,对于run(0, 1_000_000),它可能只会创建大约100万个虚拟线程,而不是200万个,这在某些场景下可以带来性能提升。这种优化与ForkJoinPool中工作窃取机制的某些思想有异曲同工之处,但这里是为了管理虚拟线程的创建数量,而非平台线程的负载。

4. 未来的选择:StructuredTaskScope

Java的孵化模块中引入了StructuredTaskScope,它提供了一种更结构化的并发编程模型,尤其适用于虚拟线程。StructuredTaskScope允许在一个明确的父子关系中管理并发任务,确保所有子任务在父任务完成前结束,并能更好地处理错误和取消。

import jdk.incubator.concurrent.StructuredTaskScope;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

public class StructuredVirtualThreadTask {

    // 示例:使用StructuredTaskScope实现递归任务
    record PseudoTask(int from, int to) {
        public static void run(int from, int to) {
            // 使用ShutdownOnFailure策略,任何子任务失败都会关闭整个作用域
            try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
                new PseudoTask(from, to).compute(scope);
                scope.join(); // 等待所有子任务完成
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Task interrupted", e);
            }
        }

        protected Void compute(StructuredTaskScope<Object> scope) {
            for (int currentFrom = this.from; ; ) {
                int mid = (currentFrom + to) >>> 1;
                if (mid == currentFrom) {
                    // 模拟实际处理
                    System.out.println(Thread.currentThread().getName() + ": Processing range [" + currentFrom + ", " + to + "]");
                    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(50));
                    break;
                } else {
                    // 使用scope.fork()在虚拟线程中启动子任务
                    var sub = new PseudoTask(currentFrom, mid);
                    scope.fork(() -> sub.compute(scope));
                    currentFrom = mid; // 当前线程处理另一半
                }
            }
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println("--- Running StructuredTaskScope PseudoTask ---");
        long startTime = System.currentTimeMillis();
        PseudoTask.run(0, 1_000_000); // 执行任务
        long endTime = System.currentTimeMillis();
        System.out.println("StructuredTaskScope PseudoTask completed in " + (endTime - startTime) + " ms");
    }
}
登录后复制

代码解析与注意事项:

  1. StructuredTaskScope: 这是一个孵化中的API,使用时需要特定的JVM参数(如--enable-preview和--add-modules jdk.incubator.concurrent)。
  2. ShutdownOnFailure: 这是一个StructuredTaskScope的实现,表示如果任何一个子任务失败,整个作用域都会被关闭,所有其他正在运行的子任务都会被取消。
  3. scope.fork(() -> sub.compute(scope)): fork方法在作用域内启动一个新的虚拟线程来执行提供的任务。
  4. scope.join(): 父任务通过join()方法等待作用域内所有子任务完成。与CompletableFuture.join()不同,这里是在作用域级别进行等待,而不是单个子任务。
  5. 结构化并发: StructuredTaskScope提供了更强的结构保证,例如,子任务的生命周期被限定在父任务的生命周期内,这有助于避免资源泄露和更好地管理错误。
  6. 孵化状态: 由于StructuredTaskScope目前仍处于孵化阶段,其API和行为可能会在未来的Java版本中发生变化,不建议在生产环境中使用。然而,它代表了Java并发模型的一个重要发展方向。

总结

尽管RecursiveAction和RecursiveTask无法直接与虚拟线程配合使用,但这并不意味着我们无法在虚拟线程上实现高效的递归任务。通过利用CompletableFuture的异步特性,并结合Thread::startVirtualThread来创建虚拟线程,我们可以轻松构建自定义的递归任务处理机制。此外,StructuredTaskScope作为一项新兴的结构化并发特性,为未来在虚拟线程上管理复杂任务提供了更优雅、更健壮的解决方案。在设计虚拟线程应用程序时,理解这些替代方案及其优势,将有助于我们充分利用虚拟线程的强大功能。

以上就是Java虚拟线程与RecursiveAction/Task:兼容性与替代方案的详细内容,更多请关注php中文网其它相关文章!

最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号