首页 > Java > java教程 > 正文

使用并行流并发处理共享列表并收集结果

DDD
发布: 2025-09-09 22:39:01
原创
381人浏览过

使用并行流并发处理共享列表并收集结果

本文将探讨如何高效地并发处理共享列表,并收集处理结果。在处理大量数据时,将任务分解为多个子任务并行执行可以显著提高效率。Java 8引入的并行流(Parallel Streams)为我们提供了一种简洁而强大的方式来实现这一目标。

并行流简介

并行流是Java 8 Stream API的一个特性,它允许你以声明式的方式并行处理集合数据。与传统的顺序流不同,并行流会将数据分割成多个块,并在多个线程上同时处理这些块。这使得我们可以充分利用多核处理器的优势,从而加速数据处理过程。

使用并行流处理子列表

假设我们有一个大型列表,需要将其分割成多个子列表,并对每个子列表执行耗时的handle()操作。以下代码展示了如何使用并行流来实现这一目标:

import java.util.List;
import java.util.stream.Collectors;

class Foo {
    private int len;

    public Foo(int len) {
        this.len = len;
    }

    public void process(List<Bar> list) {
        int start = 0;
        while (start < list.size()) {
            int end = Math.min(start + len, list.size());
            List<Bar> sublist = list.subList(start, end);
            processSublist(sublist);
            start = end;
        }
    }

    private void processSublist(List<Bar> sublist) {
        // 使用并行流处理子列表
        sublist.parallelStream()
               .forEach(this::handle);
    }

    private void handle(Bar bar) {
        // 耗时的处理逻辑
        // 例如:bar.doSomething();
        try {
            Thread.sleep(10); // 模拟耗时操作
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

class Bar {
    // Bar 类的定义
}
登录后复制

在这个例子中,processSublist()方法接收一个子列表,并使用parallelStream()方法将其转换为并行流。然后,forEach()方法将对流中的每个元素(Bar对象)调用handle()方法。由于使用了并行流,handle()方法将会在多个线程上同时执行,从而加速整个处理过程。

收集处理结果

如果handle()方法返回一个结果,并且我们需要将所有结果收集到一个列表中,可以使用map()和collect()方法:

import java.util.List;
import java.util.stream.Collectors;

class Foo {
    private int len;

    public Foo(int len) {
        this.len = len;
    }

    public void process(List<Bar> list) {
        int start = 0;
        while (start < list.size()) {
            int end = Math.min(start + len, list.size());
            List<Bar> sublist = list.subList(start, end);
            processSublist(sublist);
            start = end;
        }
    }

    private void processSublist(List<Bar> sublist) {
        // 使用并行流处理子列表并收集结果
        List<Result> results = sublist.parallelStream()
                .map(this::handle)
                .collect(Collectors.toList());

        // 处理结果列表
        // 例如:results.forEach(result -> System.out.println(result.getValue()));
    }

    private Result handle(Bar bar) {
        // 耗时的处理逻辑,返回一个结果
        // 例如:return new Result(bar.getValue() * 2);
        try {
            Thread.sleep(10); // 模拟耗时操作
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return new Result(1); // 示例返回值
    }
}

class Bar {
    // Bar 类的定义
}

class Result {
    private int value;

    public Result(int value) {
        this.value = value;
    }

    public int getValue() {
        return value;
    }
}
登录后复制

在这个例子中,map()方法将对流中的每个Bar对象调用handle()方法,并将返回的结果(Result对象)转换为一个新的流。然后,collect(Collectors.toList())方法将收集这个流中的所有结果,并将它们存储到一个新的List<Result>中。

表单大师AI
表单大师AI

一款基于自然语言处理技术的智能在线表单创建工具,可以帮助用户快速、高效地生成各类专业表单。

表单大师AI 74
查看详情 表单大师AI

同步共享资源

在使用并行流时,需要特别注意同步共享资源。如果handle()方法访问或修改了共享变量,必须使用适当的同步机制(例如,synchronized关键字或java.util.concurrent包中的类)来确保线程安全。否则,可能会导致数据竞争、死锁或其他并发问题。

例如,如果handle()方法需要更新一个共享计数器,可以使用AtomicInteger类来实现线程安全的计数:

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

class Foo {
    private int len;
    private AtomicInteger counter = new AtomicInteger(0);

    public Foo(int len) {
        this.len = len;
    }

    public void process(List<Bar> list) {
        int start = 0;
        while (start < list.size()) {
            int end = Math.min(start + len, list.size());
            List<Bar> sublist = list.subList(start, end);
            processSublist(sublist);
            start = end;
        }
    }

    private void processSublist(List<Bar> sublist) {
        // 使用并行流处理子列表
        sublist.parallelStream()
                .forEach(this::handle);
    }

    private void handle(Bar bar) {
        // 耗时的处理逻辑,更新共享计数器
        counter.incrementAndGet();
        try {
            Thread.sleep(10); // 模拟耗时操作
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public int getCounter() {
        return counter.get();
    }
}

class Bar {
    // Bar 类的定义
}
登录后复制

在这个例子中,AtomicInteger counter是一个线程安全的计数器。handle()方法使用counter.incrementAndGet()方法来原子地增加计数器的值。这确保了即使在多个线程同时执行handle()方法时,计数器的值也能正确更新。

注意事项

  • 并行流的性能优势只有在处理大量数据且handle()方法耗时较长时才能体现出来。对于小数据集或简单的handle()方法,使用顺序流可能更有效率。
  • 过度使用并行流可能会导致线程上下文切换的开销增加,从而降低性能。
  • 在使用并行流时,应该仔细考虑线程安全问题,并使用适当的同步机制来保护共享资源。

总结

Java的并行流为我们提供了一种方便而强大的方式来并发处理集合数据。通过将列表分割成多个子列表,并使用parallelStream()方法,可以充分利用多核处理器的优势,显著提升处理效率。然而,在使用并行流时,需要特别注意同步共享资源,并仔细评估其性能影响。在合适的场景下,并行流可以极大地提高数据处理的速度和效率。

以上就是使用并行流并发处理共享列表并收集结果的详细内容,更多请关注php中文网其它相关文章!

最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号