
在实际开发中,从数据库批量获取数据是一个常见需求。然而,数据库通常对sql查询中的参数数量有限制(例如,oracle的in子句限制为1000个,mysql默认也有限制)。这意味着我们不能一次性将所有查询键(如id列表)传递给数据库,而需要将它们分成多个小批次进行查询。
原始的代码示例展示了一种处理方式:
AtomicInteger counter = new AtomicInteger();
List<Cat> catList = new ArrayList<>(); // 共享可变状态
List<Dog> dogList = new ArrayList<>(); // 共享可变状态
List<Integer> numbers = Stream.iterate(1, e -> e + 1)
.limit(5000)
.collect(Collectors.toList());
// 将列表分成大小为500的子列表
Collection<List<Integer>> partitionedListOfNumbers = numbers.stream()
.collect(Collectors.groupingBy(num -> counter.getAndIncrement() / 500))
.values();
// 遍历分批后的列表,并累加结果到共享的可变列表中
partitionedListOfNumbers.stream()
.forEach(list -> {
List<Cat> interimCatList = catRepo.fetchCats(list); // 从数据库获取Cat
catList.addAll(interimCatList); // 副作用:修改外部的catList
List<Dog> interimDogList = dogRepo.fetchDogs(list); // 从数据库获取Dog
dogList.addAll(interimDogList); // 副作用:修改外部的dogList
});上述代码存在一个核心问题:它引入了共享可变状态。catList和dogList在forEach循环内部被反复修改(通过addAll方法)。这种编程模式被称为“副作用”,它使得代码难以理解、测试和并行化。在多线程环境中,如果不进行额外的同步处理,这种共享可变状态会导致数据不一致。即使在单线程环境中,它也违背了函数式编程的纯粹性原则。
为了解决共享可变状态的问题,我们可以充分利用Java Stream API的函数式特性,通过声明式编程风格来转换和聚合数据,而不是通过命令式地修改外部状态。
核心思想是:将每个批次查询的结果视为一个独立的流,然后将所有批次的结果流扁平化(flatten)并收集到一个新的、不可变的列表中。
立即学习“Java免费学习笔记(深入)”;
首先,我们可以更简洁地生成并分批处理输入数字列表。IntStream.rangeClosed是一个很好的替代Stream.iterate来生成连续整数序列的方法。分批逻辑保持不变,依然利用AtomicInteger作为groupingBy的键生成器,将大列表分割成指定大小的子列表集合。
// 使用AtomicInteger为分组提供递增的键
AtomicInteger counter = new AtomicInteger();
// 生成1到5000的整数,并按每500个一组进行分批
Collection<List<Integer>> partitionedListOfNumbers = IntStream.rangeClosed(1, 5000)
.boxed() // 将IntStream转换为Stream<Integer>以便进行后续操作
.collect(Collectors.groupingBy(num -> counter.getAndIncrement() / 500))
.values();这里的AtomicInteger虽然是可变的,但它的作用仅限于为groupingBy操作提供一个递增的、线程安全的键,以实现列表的分组。它不直接参与最终数据列表的累积,因此其使用并不会引入我们试图避免的“共享可变结果列表”问题。
现在,我们可以利用Stream的链式操作来获取并聚合数据,而无需任何外部的addAll操作。
// 获取Cat列表
List<Cat> catList = partitionedListOfNumbers.stream()
.map(list -> catRepo.fetchCats(list)) // 将每个批次的List<Integer>映射为List<Cat>
.flatMap(List::stream) // 将Stream<List<Cat>>扁平化为Stream<Cat>
.collect(Collectors.toList()); // 收集所有Cat到一个新的List中
// 获取Dog列表
List<Dog> dogList = partitionedListOfNumbers.stream()
.map(list -> dogRepo.fetchDogs(list)) // 将每个批次的List<Integer>映射为List<Dog>
.flatMap(List::stream) // 将Stream<List<Dog>>扁平化为Stream<Dog>
.collect(Collectors.toList()); // 收集所有Dog到一个新的List中通过这种方式,catList和dogList不再是预先声明并被修改的共享可变状态,而是通过流管道的最终collect操作一次性生成的新列表。这完全避免了共享可变性问题。
结合上述优化,完整的代码如下所示:
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
// 假设Cat和Dog是您的实体类
class Cat {
private int id;
private String name;
// 构造函数、getter/setter等
public Cat(int id, String name) { this.id = id; this.name = name; }
@Override public String toString() { return "Cat{id=" + id + ", name='" + name + "'}"; }
}
class Dog {
private int id;
private String name;
// 构造函数、getter/setter等
public Dog(int id, String name) { this.id = id; this.name = name; }
@Override public String toString() { return "Dog{id=" + id + ", name='" + name + "'}"; }
}
// 假设这是您的数据库仓库接口
interface CatRepository {
List<Cat> fetchCats(List<Integer> ids);
}
interface DogRepository {
List<Dog> fetchDogs(List<Integer> ids);
}
public class BatchDataFetcher {
// 模拟数据库仓库
static class MockCatRepository implements CatRepository {
@Override
public List<Cat> fetchCats(List<Integer> ids) {
System.out.println("Fetching Cats for IDs: " + ids.size() + " - " + ids.get(0) + "..." + ids.get(ids.size()-1));
return ids.stream()
.map(id -> new Cat(id, "Cat-" + id))
.collect(Collectors.toList());
}
}
static class MockDogRepository implements DogRepository {
@Override
public List<Dog> fetchDogs(List<Integer> ids) {
System.out.println("Fetching Dogs for IDs: " + ids.size() + " - " + ids.get(0) + "..." + ids.get(ids.size()-1));
return ids.stream()
.map(id -> new Dog(id, "Dog-" + id))
.collect(Collectors.toList());
}
}
public static void main(String[] args) {
CatRepository catRepo = new MockCatRepository();
DogRepository dogRepo = new MockDogRepository();
// 使用AtomicInteger为分组提供递增的键
AtomicInteger counter = new AtomicInteger();
// 生成1到5000的整数,并按每500个一组进行分批
Collection<List<Integer>> partitionedListOfNumbers = IntStream.rangeClosed(1, 5000)
.boxed() // 将IntStream转换为Stream<Integer>以便进行后续操作
.collect(Collectors.groupingBy(num -> counter.getAndIncrement() / 500))
.values();
// 获取Cat列表,使用Stream API避免共享可变状态
List<Cat> catList = partitionedListOfNumbers.stream()
.map(catRepo::fetchCats) // 使用方法引用,将每个批次的List<Integer>映射为List<Cat>
.flatMap(List::stream) // 将Stream<List<Cat>>扁平化为Stream<Cat>
.collect(Collectors.toList()); // 收集所有Cat到一个新的List中
// 获取Dog列表,使用Stream API避免共享可变状态
List<Dog> dogList = partitionedListOfNumbers.stream()
.map(dogRepo::fetchDogs) // 使用方法引用,将每个批次的List<Integer>映射为List<Dog>
.flatMap(List::stream) // 将Stream<List<Dog>>扁平化为Stream<Dog>
.collect(Collectors.toList()); // 收集所有Dog到一个新的List中
System.out.println("\nTotal Cats fetched: " + catList.size());
System.out.println("Total Dogs fetched: " + dogList.size());
// 打印部分结果以验证
// catList.stream().limit(5).forEach(System.out::println);
// dogList.stream().limit(5).forEach(System.out::println);
}
}如果需要从多个不同的仓库获取数据,并且它们的批处理逻辑相同,可以考虑提取一个通用的方法来减少代码重复。这个方法可以接受一个函数式接口(例如Function<List<Integer>, List<T>>)作为参数,用于执行实际的数据获取操作。
import java.util.function.Function;
public class GenericBatchDataFetcher {
// ... (Cat, Dog, MockCatRepository, MockDogRepository 保持不变) ...
public static <T> List<T> fetchBatchedData(
Collection<List<Integer>> partitionedKeys,
Function<List<Integer>, List<T>> dataFetcher) {
return partitionedKeys.stream()
.map(dataFetcher) // 应用传入的数据获取函数
.flatMap(List::stream)
.collect(Collectors.toList());
}
public static void main(String[] args) {
CatRepository catRepo = new MockCatRepository();
DogRepository dogRepo = new MockDogRepository();
AtomicInteger counter = new AtomicInteger();
Collection<List<Integer>> partitionedListOfNumbers = IntStream.rangeClosed(1, 5000)
.boxed()
.collect(Collectors.groupingBy(num -> counter.getAndIncrement() / 500))
.values();
// 使用通用方法获取Cat列表
List<Cat> catList = fetchBatchedData(partitionedListOfNumbers, catRepo::fetchCats);
// 使用通用方法获取Dog列表
List<Dog> dogList = fetchBatchedData(partitionedListOfNumbers, dogRepo::fetchDogs);
System.out.println("\nTotal Cats fetched: " + catList.size());
System.out.println("Total Dogs fetched: " + dogList.size());
}
}通过这种方式,我们不仅避免了共享可变状态,还提高了代码的模块化和复用性。
通过将原始代码重构为使用Java Stream API,我们成功地消除了共享可变状态的问题。新的实现方式利用map、flatMap和collect等操作,以声明式、函数式编程风格处理数据,带来了以下好处:
这种模式是Java中处理批量数据获取和转换的推荐方式,尤其是在需要遵循函数式编程原则和利用现代Java特性时。
以上就是使用Java Stream实现无共享可变状态的数据批量获取的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号