首页 > Java > java教程 > 正文

Java并发编程:使用ExecutorService限制线程数量的教程

花韻仙語
发布: 2025-11-29 18:59:00
原创
690人浏览过

Java并发编程:使用ExecutorService限制线程数量的教程

本教程详细介绍了如何在java中利用`executors`框架和`executorservice`来限制并发执行的线程数量。通过将任务封装为`runnable`,并使用`executors.newfixedthreadpool()`创建固定大小的线程池,可以有效地管理资源并控制并发级别。文章还涵盖了任务提交、线程池的优雅关闭机制以及相关的最佳实践,旨在提供一个清晰、专业的并发编程指南。

引言

在Java应用程序开发中,面对需要并行处理大量任务的场景时,合理地管理并发线程至关重要。直接创建无限数量的线程可能导致系统资源耗尽、性能下降甚至程序崩溃。为了解决这一问题,Java 5引入了java.util.concurrent包,其中的Executors框架为我们提供了一套强大的工具来管理线程池,从而有效地限制和控制并发线程的数量。本教程将指导您如何使用ExecutorService来创建一个固定大小的线程池,以处理并发任务,并确保资源的有效利用。

核心概念:Executors框架与ExecutorService

Executors框架是Java并发编程的核心组件之一,它提供了一系列工厂方法来创建不同类型的ExecutorService实例。ExecutorService是一个高级接口,用于管理线程的生命周期和任务的提交。通过使用线程池,我们可以重用线程,而不是为每个任务都创建新线程,这大大降低了线程创建和销毁的开销。

为了限制并发线程的数量,最常用的方法是使用Executors.newFixedThreadPool(int nThreads)方法。这个方法会创建一个固定大小的线程池,该线程池中的线程数量始终保持不变。当有新任务提交时,如果池中所有线程都在忙碌,那么新任务将被放入一个等待队列中,直到有空闲线程可用。

定义并发任务:Runnable接口

在使用ExecutorService之前,我们需要将要并行执行的逻辑封装成一个任务。Java提供了两个核心接口来定义任务:Runnable和Callable。

立即学习Java免费学习笔记(深入)”;

  • Runnable: 适用于不需要返回结果且不抛出受检异常的任务。它只包含一个run()方法。
  • Callable: 适用于需要返回结果且可能抛出受检异常的任务。它包含一个call()方法,并返回一个Future对象。

在本教程中,我们将以一个文件序列化任务为例,使用Runnable接口来定义任务。假设我们有一个EventuelleDestination对象列表,需要为每个对象执行序列化操作,并将结果写入文件。

首先,定义一个EventuelleDestination及其相关依赖的模拟类,以便构建完整的示例:

易优微信教育培训小程序模板
易优微信教育培训小程序模板

易优微信教育培训小程序模板是基于前端开源小程序+后端易优cms+标签化API接口,是一套开源、快速搭建个性化需求的小程序CMS。轻量级TP底层框架,前后端分离,标签化API接口可对接所有小程序,支持二次开发。即使小白用户也能轻松搭建制作一套完整的线上版小程序。 微信教育培训小程序模板主要特点:1、代码开源,支持二次修改2、微信原生写法,兼容性更好,代码可读性更强3、功能接口完整,支持eyoucms

易优微信教育培训小程序模板 0
查看详情 易优微信教育培训小程序模板
// EventuelleDestination.java - 模拟业务对象
package com.example.concurrency;

import java.util.Objects;

public class EventuelleDestination {
    private int id;
    private Acceuillant eventuelAcceuillant;

    public EventuelleDestination(int id, Acceuillant acceuillant) {
        this.id = id;
        this.eventuelAcceuillant = acceuillant;
    }

    public int getId() { return id; }
    public Acceuillant getEventuelAcceuillant() { return eventuelAcceuillant; }

    @Override
    public String toString() {
        return "EventuelleDestination{" + "id=" + id + ", acceuillantId=" + eventuelAcceuillant.getId() + '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        EventuelleDestination that = (EventuelleDestination) o;
        return id == that.id && Objects.equals(eventuelAcceuillant, that.eventuelAcceuillant);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, eventuelAcceuillant);
    }
}

// Acceuillant.java - 模拟嵌套对象
package com.example.concurrency;

import java.util.Objects;

public class Acceuillant {
    private int id;

    public Acceuillant(int id) { this.id = id; }
    public int getId() { return id; }

    @Override
    public String toString() {
        return "Acceuillant{" + "id=" + id + '}';
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        Acceuillant that = (Acceuillant) o;
        return id == that.id;
    }

    @Override
    public int hashCode() {
        return Objects.hash(id);
    }
}

// EmployeDao.java - 模拟数据访问层
package com.example.concurrency;

public class EmployeDao {
    public Employe getEmploye() { return new Employe(100); }
}

// Employe.java - 模拟员工对象
package com.example.concurrency;

public class Employe {
    private int id;
    public Employe(int id) { this.id = id; }
    public int getId() { return id; }
}

// EntrepriseDao.java - 模拟数据访问层
package com.example.concurrency;

public class EntrepriseDao {
    public int retrouveEmplacementIdParDepartementId(int departmentId) {
        // 模拟耗时操作或业务逻辑
        try {
            Thread.sleep(5 + (int)(Math.random() * 95)); // 模拟随机耗时 5-100ms
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Thread interrupted during mock DAO call", e);
        }
        return departmentId + 500;
    }
}
登录后复制

接下来,我们将序列化逻辑封装到SerializationTask类中,它实现了Runnable接口:

// SerializationTask.java - 封装序列化任务
package com.example.concurrency;

import com.google.gson.Gson;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.nio.file.Path;
import java.nio.file.Files;

public class SerializationTask implements Runnable {
    private final EventuelleDestination destination;
    private final Path outputDirectory;
    private final EmployeDao employeDao;
    private final EntrepriseDao entrepriseDao;

    public SerializationTask(EventuelleDestination destination, Path outputDirectory, EmployeDao employeDao, EntrepriseDao entrepriseDao) {
        this.destination = destination;
        this.outputDirectory = outputDirectory;
        this.employeDao = employeDao;
        this.entrepriseDao = entrepriseDao;
    }

    @Override
    public void run() {
        Gson gson = new Gson();
        try {
            // 确保输出目录存在
            Files.createDirectories(outputDirectory);

            String filename = employeDao.getEmploye().getId() + "_" +
                              entrepriseDao.retrouveEmplacementIdParDepartementId(destination.getEventuelAcceuillant().getId()) + "_" +
                              destination.getEventuelAcceuillant().getId() + ".json";
            Path filePath = outputDirectory.resolve(filename);

            try (Writer writer = new FileWriter(filePath.toFile())) {
                gson.toJson(destination, writer);
                System.out.println(Thread.currentThread().getName() + ": " + destination + " has been serialized to " + filePath);
            }
        } catch (IOException e) {
            System.err.println(Thread.currentThread().getName() + ": Error serializing " + destination + ": " + e.getMessage());
            e.printStackTrace();
        } catch (RuntimeException e) { // 捕获模拟DAO中可能抛出的RuntimeException
            System.err.println(Thread.currentThread().getName() + ": Runtime error during serialization of " + destination + ": " + e.getMessage());
            e.printStackTrace();
        }
    }
}
登录后复制

使用ExecutorService管理线程池

现在我们有了定义好的任务,接下来将使用ExecutorService来创建固定大小的线程池,并提交这些任务。

// AppExecutorDemo.java - 主应用程序
package com.example.concurrency;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

public class AppExecutorDemo {

    // 定义输出目录
    private final Path outputDir = Paths.get("serialized_data");

    public static void main(String[] args) {
        AppExecutorDemo app = new AppExecutorDemo();
        app.runDemo();
    }

    private void runDemo() {
        // 准备模拟数据和依赖
        EmployeDao employeDao = new EmployeDao();
        EntrepriseDao entrepriseDao = new EntrepriseDao();

        // 创建20个 EventuelleDestination 对象作为任务数据
        List<EventuelleDestination> destinations = IntStream.rangeClosed(1, 20)
                                                        .mapToObj(i -> new EventuelleDestination(i, new Acceuillant(i * 10)))
                                                        .toList();

        // 创建一个固定大小为3的线程池
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        System.out.println("ExecutorService created with 3 threads. Submitting tasks...");

        // 提交每个序列化任务到线程池
        for (EventuelleDestination dest : destinations) {
            executorService.submit(new SerializationTask(dest, outputDir, employeDao, entrepriseDao));
        }

        System.out.println("All tasks submitted. Awaiting termination...");
        // 优雅地关闭线程池
        shutdownAndAwaitTermination(executorService);
        System.out.println("ExecutorService terminated. All tasks completed or cancelled.");
    }

    /**
     * 优雅地关闭ExecutorService,等待已提交任务完成。
     * 此方法基于JavaDoc中ExecutorService的推荐关闭模式。
     *
     * @param executorService 要关闭的ExecutorService实例
     */
    private void shutdownAndAwaitTermination(ExecutorService executorService) {
        executorService.shutdown(); // 禁用新任务提交
        try {
            // 等待已提交任务在指定时间内完成
            if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                executorService.shutdownNow(); // 如果超时,则取消当前正在执行的任务
                // 再次等待,确保任务响应中断
                if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
                    System.err.println("Executor service did not terminate completely after forced shutdown. " + Instant.now());
                }
            }
        } catch (InterruptedException ex) {
            // 如果当前线程在等待期间被中断,则重新取消任务
            executorService.shutdownNow();
            // 重新设置中断状态
            Thread.currentThread().interrupt();
        }
    }
}
登录后复制

运行上述AppExecutorDemo类,您将看到类似以下的输出(具体的线程ID和时间戳会有所不同,但关键是pool-1-thread-X的数量不会超过3):

ExecutorService created with 3 threads. Submitting tasks...
All tasks submitted. Awaiting termination...
pool-1-thread-1: EventuelleDestination{id=1, acceuillantId=10} has been serialized to serialized_data/100_510_10.json
pool-1-thread-2: EventuelleDestination{id=2, acceuillantId=20} has been serialized to serialized_data/100_520_20.json
pool-1-thread-3: EventuelleDestination{id=3, acceuillantId=30} has been serialized to serialized_data/100_530_30.json
pool-1-thread-1: EventuelleDestination{id=4, acceuillantId=40} has been serialized to serialized_data/100_540_40.json
pool-1-thread-2: EventuelleDestination{id=5, acceuillantId=50} has been serialized to serialized_data/100_550_50.json
pool-1-thread-3: EventuelleDestination{id=6, acceuillantId=60} has been serialized to serialized_data/100_560_60.json
... (输出将继续,但始终只有3个线程在活跃地执行任务)
ExecutorService terminated. All tasks completed or cancelled.
登录后复制

从输出中可以看到,尽管我们提交了20个任务,但实际执行任务的线程(例如pool-1-thread-1、pool-1-thread-2、pool-1-thread-3)只有3个,这正是newFixedThreadPool(3)所实现的效果。

优雅关闭ExecutorService

正确关闭ExecutorService是并发编程中的一个重要环节。如果不在应用程序退出前关闭线程池,可能会导致程序无法正常终止,或者资源泄漏。shutdownAndAwaitTermination方法提供了一种优雅的关闭机制:

  1. executorService.shutdown(): 启动有序关闭,不再接受新任务,但会允许已提交的任务(包括等待队列中的任务)完成执行。
  2. executorService.awaitTermination(timeout, unit): 阻塞当前线程,直到所有任务完成执行,或者超时发生,或者当前线程被中断。
  3. executorService.shutdownNow(): 如果awaitTermination超时,表示任务未能及时完成,此时可以调用shutdownNow()尝试立即停止所有正在执行的任务,并清空等待队列。此方法会向所有正在执行的线程发送中断信号。

注意事项与最佳实践

  1. 选择合适的线程池大小: newFixedThreadPool()的线程数量应根据CPU核心数、任务类型(CPU密集型或I/O密集型)和系统资源进行权衡。对于CPU密集型任务,通常建议线程数接近CPU核心数;对于I/O密集型

以上就是Java并发编程:使用ExecutorService限制线程数量的教程的详细内容,更多请关注php中文网其它相关文章!

编程速学教程(入门课程)
编程速学教程(入门课程)

编程怎么学习?编程怎么入门?编程在哪学?编程怎么学才快?不用担心,这里为大家提供了编程速学教程(入门课程),有需要的小伙伴保存下载就能学习啦!

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号