
本教程详细介绍了如何在java中利用`executors`框架和`executorservice`来限制并发执行的线程数量。通过将任务封装为`runnable`,并使用`executors.newfixedthreadpool()`创建固定大小的线程池,可以有效地管理资源并控制并发级别。文章还涵盖了任务提交、线程池的优雅关闭机制以及相关的最佳实践,旨在提供一个清晰、专业的并发编程指南。
在Java应用程序开发中,面对需要并行处理大量任务的场景时,合理地管理并发线程至关重要。直接创建无限数量的线程可能导致系统资源耗尽、性能下降甚至程序崩溃。为了解决这一问题,Java 5引入了java.util.concurrent包,其中的Executors框架为我们提供了一套强大的工具来管理线程池,从而有效地限制和控制并发线程的数量。本教程将指导您如何使用ExecutorService来创建一个固定大小的线程池,以处理并发任务,并确保资源的有效利用。
Executors框架是Java并发编程的核心组件之一,它提供了一系列工厂方法来创建不同类型的ExecutorService实例。ExecutorService是一个高级接口,用于管理线程的生命周期和任务的提交。通过使用线程池,我们可以重用线程,而不是为每个任务都创建新线程,这大大降低了线程创建和销毁的开销。
为了限制并发线程的数量,最常用的方法是使用Executors.newFixedThreadPool(int nThreads)方法。这个方法会创建一个固定大小的线程池,该线程池中的线程数量始终保持不变。当有新任务提交时,如果池中所有线程都在忙碌,那么新任务将被放入一个等待队列中,直到有空闲线程可用。
在使用ExecutorService之前,我们需要将要并行执行的逻辑封装成一个任务。Java提供了两个核心接口来定义任务:Runnable和Callable。
立即学习“Java免费学习笔记(深入)”;
在本教程中,我们将以一个文件序列化任务为例,使用Runnable接口来定义任务。假设我们有一个EventuelleDestination对象列表,需要为每个对象执行序列化操作,并将结果写入文件。
首先,定义一个EventuelleDestination及其相关依赖的模拟类,以便构建完整的示例:
易优微信教育培训小程序模板是基于前端开源小程序+后端易优cms+标签化API接口,是一套开源、快速搭建个性化需求的小程序CMS。轻量级TP底层框架,前后端分离,标签化API接口可对接所有小程序,支持二次开发。即使小白用户也能轻松搭建制作一套完整的线上版小程序。 微信教育培训小程序模板主要特点:1、代码开源,支持二次修改2、微信原生写法,兼容性更好,代码可读性更强3、功能接口完整,支持eyoucms
0
// EventuelleDestination.java - 模拟业务对象
package com.example.concurrency;
import java.util.Objects;
public class EventuelleDestination {
private int id;
private Acceuillant eventuelAcceuillant;
public EventuelleDestination(int id, Acceuillant acceuillant) {
this.id = id;
this.eventuelAcceuillant = acceuillant;
}
public int getId() { return id; }
public Acceuillant getEventuelAcceuillant() { return eventuelAcceuillant; }
@Override
public String toString() {
return "EventuelleDestination{" + "id=" + id + ", acceuillantId=" + eventuelAcceuillant.getId() + '}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
EventuelleDestination that = (EventuelleDestination) o;
return id == that.id && Objects.equals(eventuelAcceuillant, that.eventuelAcceuillant);
}
@Override
public int hashCode() {
return Objects.hash(id, eventuelAcceuillant);
}
}
// Acceuillant.java - 模拟嵌套对象
package com.example.concurrency;
import java.util.Objects;
public class Acceuillant {
private int id;
public Acceuillant(int id) { this.id = id; }
public int getId() { return id; }
@Override
public String toString() {
return "Acceuillant{" + "id=" + id + '}';
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Acceuillant that = (Acceuillant) o;
return id == that.id;
}
@Override
public int hashCode() {
return Objects.hash(id);
}
}
// EmployeDao.java - 模拟数据访问层
package com.example.concurrency;
public class EmployeDao {
public Employe getEmploye() { return new Employe(100); }
}
// Employe.java - 模拟员工对象
package com.example.concurrency;
public class Employe {
private int id;
public Employe(int id) { this.id = id; }
public int getId() { return id; }
}
// EntrepriseDao.java - 模拟数据访问层
package com.example.concurrency;
public class EntrepriseDao {
public int retrouveEmplacementIdParDepartementId(int departmentId) {
// 模拟耗时操作或业务逻辑
try {
Thread.sleep(5 + (int)(Math.random() * 95)); // 模拟随机耗时 5-100ms
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Thread interrupted during mock DAO call", e);
}
return departmentId + 500;
}
}接下来,我们将序列化逻辑封装到SerializationTask类中,它实现了Runnable接口:
// SerializationTask.java - 封装序列化任务
package com.example.concurrency;
import com.google.gson.Gson;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.nio.file.Path;
import java.nio.file.Files;
public class SerializationTask implements Runnable {
private final EventuelleDestination destination;
private final Path outputDirectory;
private final EmployeDao employeDao;
private final EntrepriseDao entrepriseDao;
public SerializationTask(EventuelleDestination destination, Path outputDirectory, EmployeDao employeDao, EntrepriseDao entrepriseDao) {
this.destination = destination;
this.outputDirectory = outputDirectory;
this.employeDao = employeDao;
this.entrepriseDao = entrepriseDao;
}
@Override
public void run() {
Gson gson = new Gson();
try {
// 确保输出目录存在
Files.createDirectories(outputDirectory);
String filename = employeDao.getEmploye().getId() + "_" +
entrepriseDao.retrouveEmplacementIdParDepartementId(destination.getEventuelAcceuillant().getId()) + "_" +
destination.getEventuelAcceuillant().getId() + ".json";
Path filePath = outputDirectory.resolve(filename);
try (Writer writer = new FileWriter(filePath.toFile())) {
gson.toJson(destination, writer);
System.out.println(Thread.currentThread().getName() + ": " + destination + " has been serialized to " + filePath);
}
} catch (IOException e) {
System.err.println(Thread.currentThread().getName() + ": Error serializing " + destination + ": " + e.getMessage());
e.printStackTrace();
} catch (RuntimeException e) { // 捕获模拟DAO中可能抛出的RuntimeException
System.err.println(Thread.currentThread().getName() + ": Runtime error during serialization of " + destination + ": " + e.getMessage());
e.printStackTrace();
}
}
}现在我们有了定义好的任务,接下来将使用ExecutorService来创建固定大小的线程池,并提交这些任务。
// AppExecutorDemo.java - 主应用程序
package com.example.concurrency;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
public class AppExecutorDemo {
// 定义输出目录
private final Path outputDir = Paths.get("serialized_data");
public static void main(String[] args) {
AppExecutorDemo app = new AppExecutorDemo();
app.runDemo();
}
private void runDemo() {
// 准备模拟数据和依赖
EmployeDao employeDao = new EmployeDao();
EntrepriseDao entrepriseDao = new EntrepriseDao();
// 创建20个 EventuelleDestination 对象作为任务数据
List<EventuelleDestination> destinations = IntStream.rangeClosed(1, 20)
.mapToObj(i -> new EventuelleDestination(i, new Acceuillant(i * 10)))
.toList();
// 创建一个固定大小为3的线程池
ExecutorService executorService = Executors.newFixedThreadPool(3);
System.out.println("ExecutorService created with 3 threads. Submitting tasks...");
// 提交每个序列化任务到线程池
for (EventuelleDestination dest : destinations) {
executorService.submit(new SerializationTask(dest, outputDir, employeDao, entrepriseDao));
}
System.out.println("All tasks submitted. Awaiting termination...");
// 优雅地关闭线程池
shutdownAndAwaitTermination(executorService);
System.out.println("ExecutorService terminated. All tasks completed or cancelled.");
}
/**
* 优雅地关闭ExecutorService,等待已提交任务完成。
* 此方法基于JavaDoc中ExecutorService的推荐关闭模式。
*
* @param executorService 要关闭的ExecutorService实例
*/
private void shutdownAndAwaitTermination(ExecutorService executorService) {
executorService.shutdown(); // 禁用新任务提交
try {
// 等待已提交任务在指定时间内完成
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
executorService.shutdownNow(); // 如果超时,则取消当前正在执行的任务
// 再次等待,确保任务响应中断
if (!executorService.awaitTermination(60, TimeUnit.SECONDS)) {
System.err.println("Executor service did not terminate completely after forced shutdown. " + Instant.now());
}
}
} catch (InterruptedException ex) {
// 如果当前线程在等待期间被中断,则重新取消任务
executorService.shutdownNow();
// 重新设置中断状态
Thread.currentThread().interrupt();
}
}
}运行上述AppExecutorDemo类,您将看到类似以下的输出(具体的线程ID和时间戳会有所不同,但关键是pool-1-thread-X的数量不会超过3):
ExecutorService created with 3 threads. Submitting tasks...
All tasks submitted. Awaiting termination...
pool-1-thread-1: EventuelleDestination{id=1, acceuillantId=10} has been serialized to serialized_data/100_510_10.json
pool-1-thread-2: EventuelleDestination{id=2, acceuillantId=20} has been serialized to serialized_data/100_520_20.json
pool-1-thread-3: EventuelleDestination{id=3, acceuillantId=30} has been serialized to serialized_data/100_530_30.json
pool-1-thread-1: EventuelleDestination{id=4, acceuillantId=40} has been serialized to serialized_data/100_540_40.json
pool-1-thread-2: EventuelleDestination{id=5, acceuillantId=50} has been serialized to serialized_data/100_550_50.json
pool-1-thread-3: EventuelleDestination{id=6, acceuillantId=60} has been serialized to serialized_data/100_560_60.json
... (输出将继续,但始终只有3个线程在活跃地执行任务)
ExecutorService terminated. All tasks completed or cancelled.从输出中可以看到,尽管我们提交了20个任务,但实际执行任务的线程(例如pool-1-thread-1、pool-1-thread-2、pool-1-thread-3)只有3个,这正是newFixedThreadPool(3)所实现的效果。
正确关闭ExecutorService是并发编程中的一个重要环节。如果不在应用程序退出前关闭线程池,可能会导致程序无法正常终止,或者资源泄漏。shutdownAndAwaitTermination方法提供了一种优雅的关闭机制:
以上就是Java并发编程:使用ExecutorService限制线程数量的教程的详细内容,更多请关注php中文网其它相关文章!
编程怎么学习?编程怎么入门?编程在哪学?编程怎么学才快?不用担心,这里为大家提供了编程速学教程(入门课程),有需要的小伙伴保存下载就能学习啦!
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号