0

0

Java ExecutorService:限制并发线程数量的实践指南

心靈之曲

心靈之曲

发布时间:2025-11-29 18:39:01

|

408人浏览过

|

来源于php中文网

原创

Java ExecutorService:限制并发线程数量的实践指南

本教程旨在详细阐述如何在java中利用`executorservice`框架,特别是`executors.newfixedthreadpool`方法,来精确控制并发执行的线程数量。文章将通过一个实际的文件序列化任务示例,指导读者如何定义可执行任务(`runnable`),配置固定大小的线程池,并实现任务的提交与服务的优雅关闭,确保多线程操作在预设的并发上限内高效、稳定地运行。

在现代Java应用开发中,多线程编程是提升程序性能和响应能力的关键技术。然而,不受控制的线程创建和执行可能导致系统资源耗尽、性能下降甚至程序崩溃。特别是在处理I/O密集型任务(如文件读写、网络请求)时,限制并发线程的数量至关重要。Java 5引入的java.util.concurrent包,尤其是Executors框架,为我们提供了强大而灵活的工具来管理线程池,从而实现对并发度的精确控制。

1. 理解并发控制的需求

假设我们有一个List对象列表,需要对列表中的每个元素执行一个序列化操作,将其写入到文件中。如果列表非常大,为每个元素都创建一个新线程会带来巨大的开销。更重要的是,过多的并发文件写入操作可能会导致磁盘I/O瓶颈或操作系统资源限制。因此,我们需要一种机制来限制同时运行的序列化线程数量,例如,只允许最多3个线程同时执行。

原始的序列化任务方法如下:

public void serializeDestinationEmploye(EventuelleDestination e) {
    Gson gson = new Gson();
    String filename = "/" + employeDao.getEmploye().getId() + "_" + entrepriseDao.retrouveEmplacementIdParDepartementId(e.getEventuelAcceuillant().getId()) + "_" + e.getEventuelAcceuillant().getId() + ".json";

    try (Writer writer = new FileWriter(dossierSoumissions.toString() + filename)) {
        gson.toJson(e, writer);
        System.out.println(e + " has been serialized...");
    } catch (IOException fileNotFoundException) {
        fileNotFoundException.printStackTrace();
    }
}

2. Java Executors 框架概览

Executors框架是Java中用于管理线程的强大工具集,它提供了一系列工厂方法来创建不同类型的ExecutorService。ExecutorService是管理和执行Runnable或Callable任务的核心接口。

立即学习Java免费学习笔记(深入)”;

要限制并发线程数量,最常用的方法是使用固定大小的线程池,这可以通过Executors.newFixedThreadPool(int nThreads)方法实现。该方法会创建一个线程池,其中包含固定数量的线程。当有新任务提交时,如果线程池中的所有线程都在忙碌,任务将被放入一个等待队列,直到有线程空闲。

3. 定义可执行任务:Runnable

在使用ExecutorService之前,我们需要将要执行的并发逻辑封装成一个任务。在Java中,任务通常通过实现Runnable接口或Callable接口来定义。对于不返回结果的简单任务,Runnable是更合适的选择。

Napkin AI
Napkin AI

Napkin AI 可以将您的文本转换为图表、流程图、信息图、思维导图视觉效果,以便快速有效地分享您的想法。

下载

我们将上述的serializeDestinationEmploye方法封装到一个实现Runnable接口的类中。为了演示,我们创建一个SerializationTask类:

package com.example.serialization;

import com.google.gson.Gson; // 假设您使用了Gson库
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.nio.file.Path;
import java.time.Instant;

public class SerializationTask implements Runnable {
    private EventuelleDestination destination;
    private Path outputDirectory;
    // 假设 employeDao 和 entrepriseDao 是可用的,或者通过构造函数注入
    // 简化处理,这里直接使用模拟数据或假设它们已初始化
    private String employeId = "emp123"; 
    private String emplacementId = "loc456";

    public SerializationTask(EventuelleDestination destination, Path outputDirectory) {
        this.destination = destination;
        this.outputDirectory = outputDirectory;
    }

    @Override
    public void run() {
        // 模拟原始的序列化逻辑
        Gson gson = new Gson();
        // 实际应用中,employeDao和entrepriseDao应通过依赖注入或传递获取
        String filename = "/" + employeId + "_" + emplacementId + "_" + destination.getId() + ".json";

        try (Writer writer = new FileWriter(outputDirectory.resolve(filename).toString())) {
            gson.toJson(destination, writer);
            System.out.println(Thread.currentThread().getName() + " reporting: " + destination + " has been serialized at " + Instant.now());
        } catch (IOException e) {
            System.err.println("Error serializing " + destination + ": " + e.getMessage());
            e.printStackTrace();
        }
    }

    // 假设 EventuelleDestination 是一个简单的POJO,这里仅为示例提供一个骨架
    public static class EventuelleDestination {
        private String id;
        private String name;

        public EventuelleDestination(String id, String name) {
            this.id = id;
            this.name = name;
        }

        public String getId() { return id; }
        public String getName() { return name; }

        @Override
        public String toString() {
            return "EventuelleDestination{" + "id='" + id + '\'' + ", name='" + name + '\'' + '}';
        }
    }
}

注意:

  • EventuelleDestination、employeDao和entrepriseDao在实际项目中需要有具体的实现。这里为了示例简化了部分依赖。
  • 在run()方法中,我们添加了Thread.currentThread().getName()和Instant.now(),这有助于在日志中追踪哪个线程在何时执行了哪个任务,尤其是在调试并发问题时非常有用。

4. 使用 ExecutorService 实现并发限制

现在,我们来创建并管理ExecutorService,以限制并发线程数量为3。

package com.example.serialization;

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class SerializationApp {

    public static void main(String[] args) {
        SerializationApp app = new SerializationApp();
        app.demoSerialization();
    }

    private void demoSerialization() {
        // 1. 准备数据和输出目录
        Path outputDir = Paths.get("serialized_data");
        try {
            Files.createDirectories(outputDir); // 确保输出目录存在
        } catch (IOException e) {
            System.err.println("Failed to create output directory: " + e.getMessage());
            return;
        }

        List destinations = 
                IntStream.range(1, 20) // 假设有20个待序列化的目的地
                        .mapToObj(i -> new SerializationTask.EventuelleDestination("dest" + i, "Location " + i))
                        .toList();

        // 2. 创建任务列表
        List tasks = new ArrayList<>();
        for (SerializationTask.EventuelleDestination dest : destinations) {
            tasks.add(new SerializationTask(dest, outputDir));
        }

        // 3. 创建固定大小的线程池,限制并发数为3
        // Executors.newFixedThreadPool(3) 将创建一个维护3个线程的线程池
        ExecutorService executorService = Executors.newFixedThreadPool(3); 
        System.out.println("ExecutorService created with 3 fixed threads.");

        // 4. 提交所有任务到线程池
        tasks.forEach(executorService::submit);
        System.out.println("All tasks submitted. Waiting for termination...");

        // 5. 优雅地关闭 ExecutorService
        shutdownAndAwaitTermination(executorService);
        System.out.println("All tasks completed and ExecutorService shut down.");
    }

    /**
     * 优雅地关闭 ExecutorService,等待所有任务完成。
     * 这是一个来自 Javadoc 的标准模板,略有修改。
     *
     * @param executorService 要关闭的 ExecutorService
     */
    void shutdownAndAwaitTermination(ExecutorService executorService) {
        executorService.shutdown(); // 禁用新任务的提交
        try {
            // 等待现有任务在指定时间内终止
            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                executorService.shutdownNow(); // 取消当前正在执行的任务
                // 再次等待任务响应取消
                if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                    System.err.println("Executor service did not terminate. " + Instant.now());
                }
            }
        } catch (InterruptedException ex) {
            // (重新)取消,如果当前线程也被中断
            executorService.shutdownNow();
            // 保留中断状态
            Thread.currentThread().interrupt();
        }
    }
}

5. 运行结果示例

当运行上述SerializationApp时,你会看到类似以下的输出(具体顺序和时间戳会因系统而异):

ExecutorService created with 3 fixed threads.
All tasks submitted. Waiting for termination...
pool-1-thread-1 reporting: EventuelleDestination{id='dest1', name='Location 1'} has been serialized at 2023-10-27T08:00:01.123Z
pool-1-thread-2 reporting: EventuelleDestination{id='dest2', name='Location 2'} has been serialized at 2023-10-27T08:00:01.125Z
pool-1-thread-3 reporting: EventuelleDestination{id='dest3', name='Location 3'} has been serialized at 2023-10-27T08:00:01.128Z
pool-1-thread-1 reporting: EventuelleDestination{id='dest4', name='Location 4'} has been serialized at 2023-10-27T08:00:01.150Z
pool-1-thread-2 reporting: EventuelleDestination{id='dest5', name='Location 5'} has been serialized at 2023-10-27T08:00:01.152Z
pool-1-thread-3 reporting: EventuelleDestination{id='dest6', name='Location 6'} has been serialized at 2023-10-27T08:00:01.155Z
... (输出会继续,但你会发现同时执行的线程名只有 pool-1-thread-1, -2, -3)
All tasks completed and ExecutorService shut down.

从输出中可以清楚地看到,尽管我们提交了20个任务,但实际执行任务的线程始终是pool-1-thread-1、pool-1-thread-2和pool-1-thread-3这三个线程,这正是Executors.newFixedThreadPool(3)所实现的效果。

6. 注意事项

  • 输出顺序与执行顺序:System.out.println的输出并不总是按照任务完成的严格时间顺序出现。这是因为不同的线程可能会在不同的时间点将内容写入标准输出流,并且操作系统的调度和缓冲区机制会影响最终在控制台上的显示顺序。如果需要精确的时间戳,务必在日志中包含Instant.now()或其他时间信息。
  • 优雅关闭:shutdownAndAwaitTermination方法是关闭ExecutorService的最佳实践。shutdown()会阻止新任务的提交,但允许已提交的任务继续执行。awaitTermination()则会阻塞当前线程,直到所有任务完成或超时。如果超时,shutdownNow()会尝试中断所有正在执行的任务。正确地关闭线程池可以防止资源泄露和程序挂起。
  • Runnable vs. Callable:如果你的任务需要返回一个结果或者抛出检查异常,那么应该使用Callable接口而不是Runnable。Callable配合Future可以获取任务的执行结果或捕获异常。
  • 异常处理:在Runnable的run()方法中,任何未捕获的运行时异常都将导致执行该任务的线程终止。为了确保程序的健壮性,务必在任务内部捕获并处理可能发生的异常。

总结

通过Java Executors框架,特别是Executors.newFixedThreadPool()方法,我们可以轻松地实现对并发线程数量的精确控制。这不仅简化了多线程编程的复杂性,还有助于优化资源利用,提高应用程序的稳定性和性能。理解如何定义任务、创建线程池以及优雅地关闭服务,是每个Java开发者掌握并发编程的关键技能。

相关专题

更多
java
java

Java是一个通用术语,用于表示Java软件及其组件,包括“Java运行时环境 (JRE)”、“Java虚拟机 (JVM)”以及“插件”。php中文网还为大家带了Java相关下载资源、相关课程以及相关文章等内容,供大家免费下载使用。

832

2023.06.15

java正则表达式语法
java正则表达式语法

java正则表达式语法是一种模式匹配工具,它非常有用,可以在处理文本和字符串时快速地查找、替换、验证和提取特定的模式和数据。本专题提供java正则表达式语法的相关文章、下载和专题,供大家免费下载体验。

738

2023.07.05

java自学难吗
java自学难吗

Java自学并不难。Java语言相对于其他一些编程语言而言,有着较为简洁和易读的语法,本专题为大家提供java自学难吗相关的文章,大家可以免费体验。

734

2023.07.31

java配置jdk环境变量
java配置jdk环境变量

Java是一种广泛使用的高级编程语言,用于开发各种类型的应用程序。为了能够在计算机上正确运行和编译Java代码,需要正确配置Java Development Kit(JDK)环境变量。php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

397

2023.08.01

java保留两位小数
java保留两位小数

Java是一种广泛应用于编程领域的高级编程语言。在Java中,保留两位小数是指在进行数值计算或输出时,限制小数部分只有两位有效数字,并将多余的位数进行四舍五入或截取。php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

398

2023.08.02

java基本数据类型
java基本数据类型

java基本数据类型有:1、byte;2、short;3、int;4、long;5、float;6、double;7、char;8、boolean。本专题为大家提供java基本数据类型的相关的文章、下载、课程内容,供大家免费下载体验。

446

2023.08.02

java有什么用
java有什么用

java可以开发应用程序、移动应用、Web应用、企业级应用、嵌入式系统等方面。本专题为大家提供java有什么用的相关的文章、下载、课程内容,供大家免费下载体验。

430

2023.08.02

java在线网站
java在线网站

Java在线网站是指提供Java编程学习、实践和交流平台的网络服务。近年来,随着Java语言在软件开发领域的广泛应用,越来越多的人对Java编程感兴趣,并希望能够通过在线网站来学习和提高自己的Java编程技能。php中文网给大家带来了相关的视频、教程以及文章,欢迎大家前来学习阅读和下载。

16925

2023.08.03

Golang gRPC 服务开发与Protobuf实战
Golang gRPC 服务开发与Protobuf实战

本专题系统讲解 Golang 在 gRPC 服务开发中的完整实践,涵盖 Protobuf 定义与代码生成、gRPC 服务端与客户端实现、流式 RPC(Unary/Server/Client/Bidirectional)、错误处理、拦截器、中间件以及与 HTTP/REST 的对接方案。通过实际案例,帮助学习者掌握 使用 Go 构建高性能、强类型、可扩展的 RPC 服务体系,适用于微服务与内部系统通信场景。

8

2026.01.15

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Kotlin 教程
Kotlin 教程

共23课时 | 2.5万人学习

C# 教程
C# 教程

共94课时 | 6.8万人学习

Java 教程
Java 教程

共578课时 | 46.3万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号