首页 > Java > java教程 > 正文

Java并发编程:使用ExecutorService限制并发线程数量

碧海醫心
发布: 2025-11-29 19:26:22
原创
994人浏览过

java并发编程:使用executorservice限制并发线程数量

本教程详细介绍了如何在Java应用程序中利用`Executors`框架来限制并发执行的线程数量。通过创建固定大小的线程池(`FixedThreadPool`),您可以有效地管理任务的并行执行,避免资源过度消耗。文章将涵盖任务的定义(`Runnable`或`Callable`)、线程池的创建与任务提交,以及确保线程池优雅关闭的关键步骤,并提供实际代码示例。

1. 理解并发任务与线程池的必要性

在开发高性能的Java应用程序时,经常会遇到需要同时处理大量独立任务的场景,例如批量数据处理、文件I/O操作或网络请求。直接为每个任务创建一个新线程虽然简单,但会导致线程数量失控,进而引发系统资源耗尽(如内存溢出)、上下文切换开销增大,甚至程序崩溃。

Java 5引入的Executors框架提供了一种更高级、更健壮的方式来管理线程,即通过线程池(ThreadPoolExecutor)。线程池允许我们预先创建一组线程,并重复利用这些线程来执行任务,从而有效限制并发量,提高资源利用率和系统稳定性。

2. 定义并发任务:Runnable与Callable

在Java中,并发任务通常通过实现Runnable或Callable接口来定义。

立即学习Java免费学习笔记(深入)”;

  • Runnable: 适用于不需要返回结果的任务。它的run()方法不接受参数,也不返回任何值。
  • Callable: 适用于需要返回结果或可能抛出异常的任务。它的call()方法可以返回一个泛型结果,并声明抛出异常。

为了演示,我们假设有一个数据序列化任务。我们将把原始问题中的serializeDestinationEmploye方法封装到一个Runnable任务中。

import com.google.gson.Gson;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.nio.file.Path; // 假设有一个路径来存储文件

// 假设 EventuelleDestination, EmployeDao, EntrepriseDao 是已定义的类
// 这里为了简化,我们只关注序列化逻辑
class EventuelleDestination {
    private String id; // 假设有一个ID
    private String name; // 假设有一个名称
    // ... 其他属性和getter/setter
    public EventuelleDestination(String id, String name) {
        this.id = id;
        this.name = name;
    }
    public String getId() { return id; }
    public String getName() { return name; }
    @Override
    public String toString() { return "EventuelleDestination{" + "id='" + id + '\'' + ", name='" + name + '\'' + '}'; }
}

class SerializationTask implements Runnable {
    private final EventuelleDestination destination;
    private final Path outputDirectory; // 输出目录

    public SerializationTask(EventuelleDestination destination, Path outputDirectory) {
        this.destination = destination;
        this.outputDirectory = outputDirectory;
    }

    @Override
    public void run() {
        // 模拟原始的序列化逻辑
        // 实际应用中,employeDao和entrepriseDao应通过依赖注入或其他方式获取
        // 这里为了演示,我们简化文件名生成
        String filename = "/" + destination.getId() + "_" + destination.getName() + ".json";

        try (Writer writer = new FileWriter(outputDirectory.resolve(filename).toFile())) {
            new Gson().toJson(destination, writer);
            System.out.println(Thread.currentThread().getName() + " - " + destination + " has been serialized...");
        } catch (IOException e) {
            System.err.println(Thread.currentThread().getName() + " - Error serializing " + destination + ": " + e.getMessage());
            e.printStackTrace();
        }
    }
}
登录后复制

在上述代码中,SerializationTask实现了Runnable接口,其run方法包含了对单个EventuelleDestination对象进行JSON序列化的逻辑。通过在输出中包含Thread.currentThread().getName(),我们可以观察到是哪个线程执行了任务。

炫彩电子商务平台源码
炫彩电子商务平台源码

炫彩电子商务系统由郑州炫彩网络科技有限公司完全自主开发,使用世界上最流行高效的PHP程序语言,并用小巧的MySql作为数据库服务器,并且使用Smarty引擎来分离网站程序与前端设计代码,让建立的网站可以自由制作个性化的页面。

炫彩电子商务平台源码 153
查看详情 炫彩电子商务平台源码

3. 创建并管理固定大小的线程池

为了限制并发线程的数量,Java的Executors框架提供了Executors.newFixedThreadPool(int nThreads)方法。这个方法会创建一个拥有固定线程数量的ExecutorService。当提交的任务多于线程池中的线程数时,多余的任务会在队列中等待,直到有空闲线程可用。

以下是如何创建固定线程池并提交任务的示例:

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadPoolDemo {

    public static void main(String[] args) {
        ThreadPoolDemo demo = new ThreadPoolDemo();
        demo.runSerializationDemo();
    }

    private void runSerializationDemo() {
        // 1. 准备任务数据
        List<EventuelleDestination> destinations = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            destinations.add(new EventuelleDestination("ID_" + i, "Location_" + i));
        }

        // 2. 指定输出目录
        Path outputDir = Paths.get("serialized_data");
        try {
            Files.createDirectories(outputDir); // 确保目录存在
        } catch (IOException e) {
            System.err.println("Failed to create output directory: " + e.getMessage());
            return;
        }

        // 3. 创建固定大小的线程池,限制同时运行3个线程
        // 这里的3是根据原始问题中的需求设定的
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        System.out.println("ExecutorService created with 3 threads.");

        // 4. 提交所有任务到线程池
        for (EventuelleDestination dest : destinations) {
            executorService.submit(new SerializationTask(dest, outputDir));
        }

        // 5. 优雅地关闭线程池
        shutdownAndAwaitTermination(executorService);
        System.out.println("All serialization tasks completed.");
    }

    /**
     * 优雅地关闭ExecutorService,等待所有任务完成。
     * 这是来自Javadoc的推荐模式。
     * @param executorService 要关闭的ExecutorService
     */
    void shutdownAndAwaitTermination(ExecutorService executorService) {
        executorService.shutdown(); // 禁用新任务提交

        try {
            // 等待现有任务在指定时间内终止
            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                executorService.shutdownNow(); // 如果超时,取消当前正在执行的任务
                // 再次等待,确保任务对取消做出响应
                if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                    System.err.println("ExecutorService did not terminate.");
                }
            }
        } catch (InterruptedException ie) {
            // (重新)取消如果当前线程也被中断
            executorService.shutdownNow();
            // 保留中断状态
            Thread.currentThread().interrupt();
        }
    }
}
登录后复制

在runSerializationDemo方法中:

  1. 我们创建了一个EventuelleDestination对象列表,模拟待处理的数据。
  2. 指定了一个用于存储序列化文件的输出目录。
  3. 通过Executors.newFixedThreadPool(3)创建了一个只包含3个线程的线程池。这意味着无论我们提交多少任务,最多只有3个任务会同时运行。
  4. 将所有SerializationTask实例提交给executorService。submit()方法会将任务放入线程池的内部队列,并由线程池中的空闲线程按序执行。
  5. 最后,调用shutdownAndAwaitTermination方法来确保线程池被正确关闭。

4. 优雅地关闭ExecutorService

正确关闭ExecutorService至关重要,以防止资源泄露或程序无法退出。ExecutorService提供了两种关闭机制:

  • shutdown(): 启动有序关闭,不再接受新任务,但会完成所有已提交的任务(包括正在执行的和在队列中等待的)。
  • shutdownNow(): 尝试立即停止所有正在执行的任务,并清空任务队列中所有未开始的任务。它会返回一个未执行任务的列表。

通常,最佳实践是先调用shutdown(),然后使用awaitTermination()等待一段时间,让已提交的任务有机会完成。如果超时仍未完成,则可以调用shutdownNow()强制关闭。上述shutdownAndAwaitTermination方法演示了这种优雅关闭的模式。

5. 注意事项与最佳实践

  • 线程池大小的考量: 固定线程池的大小应根据应用程序的特性和系统资源来决定。
    • CPU密集型任务: 线程数通常设置为CPU核心数或核心数+1,以避免过多的上下文切换。
    • I/O密集型任务: 线程数可以适当调大,因为线程在等待I/O操作时不会占用CPU,其他线程可以利用CPU。一个常见的经验法则是 CPU核心数 * (1 + 等待时间/计算时间)。
  • 异常处理: 在Runnable或Callable的run()/call()方法中,务必包含健壮的异常处理逻辑,以防止单个任务的失败导致整个线程崩溃或影响其他任务。
  • 任务结果处理(针对Callable): 如果使用Callable,executorService.submit()会返回一个Future对象。您可以使用Future.get()方法来获取任务的执行结果,或者检查任务是否完成以及是否抛出异常。
  • 日志与监控: 在多线程环境中,日志输出的顺序可能与实际执行顺序不一致。在日志中包含线程名称或时间戳(如Instant.now())有助于调试和分析。同时,监控线程池的状态(如活动线程数、队列大小)对于诊断性能问题至关重要。
  • 避免无限期等待: 在awaitTermination中使用超时时间是防止程序无限期阻塞的关键。

总结

通过Java的Executors框架,特别是Executors.newFixedThreadPool(),我们可以轻松地实现对并发线程数量的精确控制。这种方法不仅简化了多线程编程,还提高了应用程序的健壮性、资源利用率和可维护性。理解任务的定义、线程池的生命周期管理以及优雅关闭机制,是编写高效、稳定并发程序的基石。

以上就是Java并发编程:使用ExecutorService限制并发线程数量的详细内容,更多请关注php中文网其它相关文章!

编程速学教程(入门课程)
编程速学教程(入门课程)

编程怎么学习?编程怎么入门?编程在哪学?编程怎么学才快?不用担心,这里为大家提供了编程速学教程(入门课程),有需要的小伙伴保存下载就能学习啦!

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号