0

0

Java并发编程:使用ExecutorService限制线程数量的教程

花韻仙語

花韻仙語

发布时间:2025-11-29 18:59:00

|

716人浏览过

|

来源于php中文网

原创

Java并发编程:使用ExecutorService限制线程数量的教程

本教程详细介绍了如何在java中利用`executors`框架和`executorservice`来限制并发执行的线程数量。通过将任务封装为`runnable`,并使用`executors.newfixedthreadpool()`创建固定大小的线程池,可以有效地管理资源并控制并发级别。文章还涵盖了任务提交、线程池的优雅关闭机制以及相关的最佳实践,旨在提供一个清晰、专业的并发编程指南。

引言

在Java应用程序开发中,面对需要并行处理大量任务的场景时,合理地管理并发线程至关重要。直接创建无限数量的线程可能导致系统资源耗尽、性能下降甚至程序崩溃。为了解决这一问题,Java 5引入了java.util.concurrent包,其中的Executors框架为我们提供了一套强大的工具来管理线程池,从而有效地限制和控制并发线程的数量。本教程将指导您如何使用ExecutorService来创建一个固定大小的线程池,以处理并发任务,并确保资源的有效利用。

核心概念:Executors框架与ExecutorService

Executors框架是Java并发编程的核心组件之一,它提供了一系列工厂方法来创建不同类型的ExecutorService实例。ExecutorService是一个高级接口,用于管理线程的生命周期和任务的提交。通过使用线程池,我们可以重用线程,而不是为每个任务都创建新线程,这大大降低了线程创建和销毁的开销。

为了限制并发线程的数量,最常用的方法是使用Executors.newFixedThreadPool(int nThreads)方法。这个方法会创建一个固定大小的线程池,该线程池中的线程数量始终保持不变。当有新任务提交时,如果池中所有线程都在忙碌,那么新任务将被放入一个等待队列中,直到有空闲线程可用。

定义并发任务:Runnable接口

在使用ExecutorService之前,我们需要将要并行执行的逻辑封装成一个任务。Java提供了两个核心接口来定义任务:Runnable和Callable。

立即学习Java免费学习笔记(深入)”;

  • Runnable: 适用于不需要返回结果且不抛出受检异常的任务。它只包含一个run()方法。
  • Callable: 适用于需要返回结果且可能抛出受检异常的任务。它包含一个call()方法,并返回一个Future对象。

在本教程中,我们将以一个文件序列化任务为例,使用Runnable接口来定义任务。假设我们有一个EventuelleDestination对象列表,需要为每个对象执行序列化操作,并将结果写入文件。

首先,定义一个EventuelleDestination及其相关依赖的模拟类,以便构建完整的示例:

方科网络ERP图文店
方科网络ERP图文店

方科网络ERP图文店II版为仿代码站独立研发的网络版ERP销售程序。本本版本为方科网络ERP图文店版的简化版,去除了部分不同用的功能,使得系统更加精炼实用。考虑到图文店的特殊情况,本系统并未制作出入库功能,而是将销售作为重头,使用本系统,可以有效解决大型图文店员工多,换班数量多,订单混杂不清的情况。下单、取件、结算分别记录操作人员,真正做到订单全程跟踪!无限用户级别,不同的用户级别可以设置不同的价

下载
// EventuelleDestination.java - 模拟业务对象
package com.example.concurrency;

import java.util.Objects;

public class EventuelleDestination {
    private int id;
    private Acceuillant eventuelAcceuillant;

    public EventuelleDestination(int id, Acceuillant acceuillant) {
        this.id = id;
        this.eventuelAcceuillant = acceuillant;
    }

    public int getId() { return id; }
    public Acceuillant getEventuelAcceuillant() { return eventuelAcceuillant; }

    @Override
    public String toString() {
        return "EventuelleDestination{" + "id=" + id + ", acceuillantId=" + eventuelAcceuillant.getId() + '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        EventuelleDestination that = (EventuelleDestination) o;
        return id == that.id && Objects.equals(eventuelAcceuillant, that.eventuelAcceuillant);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, eventuelAcceuillant);
    }
}

// Acceuillant.java - 模拟嵌套对象
package com.example.concurrency;

import java.util.Objects;

public class Acceuillant {
    private int id;

    public Acceuillant(int id) { this.id = id; }
    public int getId() { return id; }

    @Override
    public String toString() {
        return "Acceuillant{" + "id=" + id + '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Acceuillant that = (Acceuillant) o;
        return id == that.id;
    }

    @Override
    public int hashCode() {
        return Objects.hash(id);
    }
}

// EmployeDao.java - 模拟数据访问层
package com.example.concurrency;

public class EmployeDao {
    public Employe getEmploye() { return new Employe(100); }
}

// Employe.java - 模拟员工对象
package com.example.concurrency;

public class Employe {
    private int id;
    public Employe(int id) { this.id = id; }
    public int getId() { return id; }
}

// EntrepriseDao.java - 模拟数据访问层
package com.example.concurrency;

public class EntrepriseDao {
    public int retrouveEmplacementIdParDepartementId(int departmentId) {
        // 模拟耗时操作或业务逻辑
        try {
            Thread.sleep(5 + (int)(Math.random() * 95)); // 模拟随机耗时 5-100ms
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Thread interrupted during mock DAO call", e);
        }
        return departmentId + 500;
    }
}

接下来,我们将序列化逻辑封装到SerializationTask类中,它实现了Runnable接口:

// SerializationTask.java - 封装序列化任务
package com.example.concurrency;

import com.google.gson.Gson;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.nio.file.Path;
import java.nio.file.Files;

public class SerializationTask implements Runnable {
    private final EventuelleDestination destination;
    private final Path outputDirectory;
    private final EmployeDao employeDao;
    private final EntrepriseDao entrepriseDao;

    public SerializationTask(EventuelleDestination destination, Path outputDirectory, EmployeDao employeDao, EntrepriseDao entrepriseDao) {
        this.destination = destination;
        this.outputDirectory = outputDirectory;
        this.employeDao = employeDao;
        this.entrepriseDao = entrepriseDao;
    }

    @Override
    public void run() {
        Gson gson = new Gson();
        try {
            // 确保输出目录存在
            Files.createDirectories(outputDirectory);

            String filename = employeDao.getEmploye().getId() + "_" +
                              entrepriseDao.retrouveEmplacementIdParDepartementId(destination.getEventuelAcceuillant().getId()) + "_" +
                              destination.getEventuelAcceuillant().getId() + ".json";
            Path filePath = outputDirectory.resolve(filename);

            try (Writer writer = new FileWriter(filePath.toFile())) {
                gson.toJson(destination, writer);
                System.out.println(Thread.currentThread().getName() + ": " + destination + " has been serialized to " + filePath);
            }
        } catch (IOException e) {
            System.err.println(Thread.currentThread().getName() + ": Error serializing " + destination + ": " + e.getMessage());
            e.printStackTrace();
        } catch (RuntimeException e) { // 捕获模拟DAO中可能抛出的RuntimeException
            System.err.println(Thread.currentThread().getName() + ": Runtime error during serialization of " + destination + ": " + e.getMessage());
            e.printStackTrace();
        }
    }
}

使用ExecutorService管理线程池

现在我们有了定义好的任务,接下来将使用ExecutorService来创建固定大小的线程池,并提交这些任务。

// AppExecutorDemo.java - 主应用程序
package com.example.concurrency;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class AppExecutorDemo {

    // 定义输出目录
    private final Path outputDir = Paths.get("serialized_data");

    public static void main(String[] args) {
        AppExecutorDemo app = new AppExecutorDemo();
        app.runDemo();
    }

    private void runDemo() {
        // 准备模拟数据和依赖
        EmployeDao employeDao = new EmployeDao();
        EntrepriseDao entrepriseDao = new EntrepriseDao();

        // 创建20个 EventuelleDestination 对象作为任务数据
        List destinations = IntStream.rangeClosed(1, 20)
                                                        .mapToObj(i -> new EventuelleDestination(i, new Acceuillant(i * 10)))
                                                        .toList();

        // 创建一个固定大小为3的线程池
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        System.out.println("ExecutorService created with 3 threads. Submitting tasks...");

        // 提交每个序列化任务到线程池
        for (EventuelleDestination dest : destinations) {
            executorService.submit(new SerializationTask(dest, outputDir, employeDao, entrepriseDao));
        }

        System.out.println("All tasks submitted. Awaiting termination...");
        // 优雅地关闭线程池
        shutdownAndAwaitTermination(executorService);
        System.out.println("ExecutorService terminated. All tasks completed or cancelled.");
    }

    /**
     * 优雅地关闭ExecutorService,等待已提交任务完成。
     * 此方法基于JavaDoc中ExecutorService的推荐关闭模式。
     *
     * @param executorService 要关闭的ExecutorService实例
     */
    private void shutdownAndAwaitTermination(ExecutorService executorService) {
        executorService.shutdown(); // 禁用新任务提交
        try {
            // 等待已提交任务在指定时间内完成
            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                executorService.shutdownNow(); // 如果超时,则取消当前正在执行的任务
                // 再次等待,确保任务响应中断
                if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                    System.err.println("Executor service did not terminate completely after forced shutdown. " + Instant.now());
                }
            }
        } catch (InterruptedException ex) {
            // 如果当前线程在等待期间被中断,则重新取消任务
            executorService.shutdownNow();
            // 重新设置中断状态
            Thread.currentThread().interrupt();
        }
    }
}

运行上述AppExecutorDemo类,您将看到类似以下的输出(具体的线程ID和时间戳会有所不同,但关键是pool-1-thread-X的数量不会超过3):

ExecutorService created with 3 threads. Submitting tasks...
All tasks submitted. Awaiting termination...
pool-1-thread-1: EventuelleDestination{id=1, acceuillantId=10} has been serialized to serialized_data/100_510_10.json
pool-1-thread-2: EventuelleDestination{id=2, acceuillantId=20} has been serialized to serialized_data/100_520_20.json
pool-1-thread-3: EventuelleDestination{id=3, acceuillantId=30} has been serialized to serialized_data/100_530_30.json
pool-1-thread-1: EventuelleDestination{id=4, acceuillantId=40} has been serialized to serialized_data/100_540_40.json
pool-1-thread-2: EventuelleDestination{id=5, acceuillantId=50} has been serialized to serialized_data/100_550_50.json
pool-1-thread-3: EventuelleDestination{id=6, acceuillantId=60} has been serialized to serialized_data/100_560_60.json
... (输出将继续,但始终只有3个线程在活跃地执行任务)
ExecutorService terminated. All tasks completed or cancelled.

从输出中可以看到,尽管我们提交了20个任务,但实际执行任务的线程(例如pool-1-thread-1、pool-1-thread-2、pool-1-thread-3)只有3个,这正是newFixedThreadPool(3)所实现的效果。

优雅关闭ExecutorService

正确关闭ExecutorService是并发编程中的一个重要环节。如果不在应用程序退出前关闭线程池,可能会导致程序无法正常终止,或者资源泄漏。shutdownAndAwaitTermination方法提供了一种优雅的关闭机制:

  1. executorService.shutdown(): 启动有序关闭,不再接受新任务,但会允许已提交的任务(包括等待队列中的任务)完成执行。
  2. executorService.awaitTermination(timeout, unit): 阻塞当前线程,直到所有任务完成执行,或者超时发生,或者当前线程被中断。
  3. executorService.shutdownNow(): 如果awaitTermination超时,表示任务未能及时完成,此时可以调用shutdownNow()尝试立即停止所有正在执行的任务,并清空等待队列。此方法会向所有正在执行的线程发送中断信号。

注意事项与最佳实践

  1. 选择合适的线程池大小: newFixedThreadPool()的线程数量应根据CPU核心数、任务类型(CPU密集型或I/O密集型)和系统资源进行权衡。对于CPU密集型任务,通常建议线程数接近CPU核心数;对于I/O密集型

相关专题

更多
java
java

Java是一个通用术语,用于表示Java软件及其组件,包括“Java运行时环境 (JRE)”、“Java虚拟机 (JVM)”以及“插件”。php中文网还为大家带了Java相关下载资源、相关课程以及相关文章等内容,供大家免费下载使用。

832

2023.06.15

java正则表达式语法
java正则表达式语法

java正则表达式语法是一种模式匹配工具,它非常有用,可以在处理文本和字符串时快速地查找、替换、验证和提取特定的模式和数据。本专题提供java正则表达式语法的相关文章、下载和专题,供大家免费下载体验。

738

2023.07.05

java自学难吗
java自学难吗

Java自学并不难。Java语言相对于其他一些编程语言而言,有着较为简洁和易读的语法,本专题为大家提供java自学难吗相关的文章,大家可以免费体验。

734

2023.07.31

java配置jdk环境变量
java配置jdk环境变量

Java是一种广泛使用的高级编程语言,用于开发各种类型的应用程序。为了能够在计算机上正确运行和编译Java代码,需要正确配置Java Development Kit(JDK)环境变量。php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

397

2023.08.01

java保留两位小数
java保留两位小数

Java是一种广泛应用于编程领域的高级编程语言。在Java中,保留两位小数是指在进行数值计算或输出时,限制小数部分只有两位有效数字,并将多余的位数进行四舍五入或截取。php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

398

2023.08.02

java基本数据类型
java基本数据类型

java基本数据类型有:1、byte;2、short;3、int;4、long;5、float;6、double;7、char;8、boolean。本专题为大家提供java基本数据类型的相关的文章、下载、课程内容,供大家免费下载体验。

446

2023.08.02

java有什么用
java有什么用

java可以开发应用程序、移动应用、Web应用、企业级应用、嵌入式系统等方面。本专题为大家提供java有什么用的相关的文章、下载、课程内容,供大家免费下载体验。

430

2023.08.02

java在线网站
java在线网站

Java在线网站是指提供Java编程学习、实践和交流平台的网络服务。近年来,随着Java语言在软件开发领域的广泛应用,越来越多的人对Java编程感兴趣,并希望能够通过在线网站来学习和提高自己的Java编程技能。php中文网给大家带来了相关的视频、教程以及文章,欢迎大家前来学习阅读和下载。

16925

2023.08.03

Golang gRPC 服务开发与Protobuf实战
Golang gRPC 服务开发与Protobuf实战

本专题系统讲解 Golang 在 gRPC 服务开发中的完整实践,涵盖 Protobuf 定义与代码生成、gRPC 服务端与客户端实现、流式 RPC(Unary/Server/Client/Bidirectional)、错误处理、拦截器、中间件以及与 HTTP/REST 的对接方案。通过实际案例,帮助学习者掌握 使用 Go 构建高性能、强类型、可扩展的 RPC 服务体系,适用于微服务与内部系统通信场景。

8

2026.01.15

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Kotlin 教程
Kotlin 教程

共23课时 | 2.5万人学习

C# 教程
C# 教程

共94课时 | 6.8万人学习

Java 教程
Java 教程

共578课时 | 46.3万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号