
1. 问题背景与挑战
在实际开发中,从数据库批量获取数据是一个常见需求。然而,数据库通常对sql查询中的参数数量有限制(例如,oracle的in子句限制为1000个,mysql默认也有限制)。这意味着我们不能一次性将所有查询键(如id列表)传递给数据库,而需要将它们分成多个小批次进行查询。
原始的代码示例展示了一种处理方式:
AtomicInteger counter = new AtomicInteger(); ListcatList = new ArrayList<>(); // 共享可变状态 List dogList = new ArrayList<>(); // 共享可变状态 List numbers = Stream.iterate(1, e -> e + 1) .limit(5000) .collect(Collectors.toList()); // 将列表分成大小为500的子列表 Collection > partitionedListOfNumbers = numbers.stream() .collect(Collectors.groupingBy(num -> counter.getAndIncrement() / 500)) .values(); // 遍历分批后的列表,并累加结果到共享的可变列表中 partitionedListOfNumbers.stream() .forEach(list -> { List
interimCatList = catRepo.fetchCats(list); // 从数据库获取Cat catList.addAll(interimCatList); // 副作用:修改外部的catList List interimDogList = dogRepo.fetchDogs(list); // 从数据库获取Dog dogList.addAll(interimDogList); // 副作用:修改外部的dogList });
上述代码存在一个核心问题:它引入了共享可变状态。catList和dogList在forEach循环内部被反复修改(通过addAll方法)。这种编程模式被称为“副作用”,它使得代码难以理解、测试和并行化。在多线程环境中,如果不进行额外的同步处理,这种共享可变状态会导致数据不一致。即使在单线程环境中,它也违背了函数式编程的纯粹性原则。
2. 采用Java Stream API实现无副作用的数据获取
为了解决共享可变状态的问题,我们可以充分利用Java Stream API的函数式特性,通过声明式编程风格来转换和聚合数据,而不是通过命令式地修改外部状态。
核心思想是:将每个批次查询的结果视为一个独立的流,然后将所有批次的结果流扁平化(flatten)并收集到一个新的、不可变的列表中。
立即学习“Java免费学习笔记(深入)”;
2.1 优化输入列表的生成与分批
首先,我们可以更简洁地生成并分批处理输入数字列表。IntStream.rangeClosed是一个很好的替代Stream.iterate来生成连续整数序列的方法。分批逻辑保持不变,依然利用AtomicInteger作为groupingBy的键生成器,将大列表分割成指定大小的子列表集合。
// 使用AtomicInteger为分组提供递增的键 AtomicInteger counter = new AtomicInteger(); // 生成1到5000的整数,并按每500个一组进行分批 Collection> partitionedListOfNumbers = IntStream.rangeClosed(1, 5000) .boxed() // 将IntStream转换为Stream
以便进行后续操作 .collect(Collectors.groupingBy(num -> counter.getAndIncrement() / 500)) .values();
这里的AtomicInteger虽然是可变的,但它的作用仅限于为groupingBy操作提供一个递增的、线程安全的键,以实现列表的分组。它不直接参与最终数据列表的累积,因此其使用并不会引入我们试图避免的“共享可变结果列表”问题。
2.2 使用map、flatMap和collect处理数据
现在,我们可以利用Stream的链式操作来获取并聚合数据,而无需任何外部的addAll操作。
-
map操作:对于partitionedListOfNumbers中的每个子列表(即一个批次),我们调用相应的数据库查询方法(catRepo.fetchCats或dogRepo.fetchDogs),这将返回一个包含该批次结果的List
或List 。此时,我们得到的是一个Stream - >或Stream
- >。
-
flatMap操作:由于我们最终需要一个扁平化的List
或List ,而不是一个列表的列表,flatMap操作就派上用场了。它会将Stream - >中的每个内部列表展开,并将所有元素合并到一个新的Stream
中。List::stream是一个方法引用,用于将每个内部列表转换为一个流。 - collect操作:最后,使用Collectors.toList()将扁平化后的流中的所有元素收集到一个新的List中。这个新的List是不可变的,因为它是在流管道的末端一次性构建的,没有在处理过程中被外部修改。
// 获取Cat列表 ListcatList = partitionedListOfNumbers.stream() .map(list -> catRepo.fetchCats(list)) // 将每个批次的List 映射为List .flatMap(List::stream) // 将Stream >扁平化为Stream
.collect(Collectors.toList()); // 收集所有Cat到一个新的List中 // 获取Dog列表 List dogList = partitionedListOfNumbers.stream() .map(list -> dogRepo.fetchDogs(list)) // 将每个批次的List 映射为List .flatMap(List::stream) // 将Stream >扁平化为Stream
.collect(Collectors.toList()); // 收集所有Dog到一个新的List中
通过这种方式,catList和dogList不再是预先声明并被修改的共享可变状态,而是通过流管道的最终collect操作一次性生成的新列表。这完全避免了共享可变性问题。
3. 完整优化代码示例
结合上述优化,完整的代码如下所示:
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
// 假设Cat和Dog是您的实体类
class Cat {
private int id;
private String name;
// 构造函数、getter/setter等
public Cat(int id, String name) { this.id = id; this.name = name; }
@Override public String toString() { return "Cat{id=" + id + ", name='" + name + "'}"; }
}
class Dog {
private int id;
private String name;
// 构造函数、getter/setter等
public Dog(int id, String name) { this.id = id; this.name = name; }
@Override public String toString() { return "Dog{id=" + id + ", name='" + name + "'}"; }
}
// 假设这是您的数据库仓库接口
interface CatRepository {
List fetchCats(List ids);
}
interface DogRepository {
List fetchDogs(List ids);
}
public class BatchDataFetcher {
// 模拟数据库仓库
static class MockCatRepository implements CatRepository {
@Override
public List fetchCats(List ids) {
System.out.println("Fetching Cats for IDs: " + ids.size() + " - " + ids.get(0) + "..." + ids.get(ids.size()-1));
return ids.stream()
.map(id -> new Cat(id, "Cat-" + id))
.collect(Collectors.toList());
}
}
static class MockDogRepository implements DogRepository {
@Override
public List fetchDogs(List ids) {
System.out.println("Fetching Dogs for IDs: " + ids.size() + " - " + ids.get(0) + "..." + ids.get(ids.size()-1));
return ids.stream()
.map(id -> new Dog(id, "Dog-" + id))
.collect(Collectors.toList());
}
}
public static void main(String[] args) {
CatRepository catRepo = new MockCatRepository();
DogRepository dogRepo = new MockDogRepository();
// 使用AtomicInteger为分组提供递增的键
AtomicInteger counter = new AtomicInteger();
// 生成1到5000的整数,并按每500个一组进行分批
Collection> partitionedListOfNumbers = IntStream.rangeClosed(1, 5000)
.boxed() // 将IntStream转换为Stream以便进行后续操作
.collect(Collectors.groupingBy(num -> counter.getAndIncrement() / 500))
.values();
// 获取Cat列表,使用Stream API避免共享可变状态
List catList = partitionedListOfNumbers.stream()
.map(catRepo::fetchCats) // 使用方法引用,将每个批次的List映射为List
.flatMap(List::stream) // 将Stream>扁平化为Stream
.collect(Collectors.toList()); // 收集所有Cat到一个新的List中
// 获取Dog列表,使用Stream API避免共享可变状态
List dogList = partitionedListOfNumbers.stream()
.map(dogRepo::fetchDogs) // 使用方法引用,将每个批次的List映射为List
.flatMap(List::stream) // 将Stream>扁平化为Stream
.collect(Collectors.toList()); // 收集所有Dog到一个新的List中
System.out.println("\nTotal Cats fetched: " + catList.size());
System.out.println("Total Dogs fetched: " + dogList.size());
// 打印部分结果以验证
// catList.stream().limit(5).forEach(System.out::println);
// dogList.stream().limit(5).forEach(System.out::println);
}
}
4. 进一步重构与注意事项
4.1 提取通用逻辑
如果需要从多个不同的仓库获取数据,并且它们的批处理逻辑相同,可以考虑提取一个通用的方法来减少代码重复。这个方法可以接受一个函数式接口(例如Function, List
import java.util.function.Function;
public class GenericBatchDataFetcher {
// ... (Cat, Dog, MockCatRepository, MockDogRepository 保持不变) ...
public static List fetchBatchedData(
Collection> partitionedKeys,
Function, List> dataFetcher) {
return partitionedKeys.stream()
.map(dataFetcher) // 应用传入的数据获取函数
.flatMap(List::stream)
.collect(Collectors.toList());
}
public static void main(String[] args) {
CatRepository catRepo = new MockCatRepository();
DogRepository dogRepo = new MockDogRepository();
AtomicInteger counter = new AtomicInteger();
Collection> partitionedListOfNumbers = IntStream.rangeClosed(1, 5000)
.boxed()
.collect(Collectors.groupingBy(num -> counter.getAndIncrement() / 500))
.values();
// 使用通用方法获取Cat列表
List catList = fetchBatchedData(partitionedListOfNumbers, catRepo::fetchCats);
// 使用通用方法获取Dog列表
List dogList = fetchBatchedData(partitionedListOfNumbers, dogRepo::fetchDogs);
System.out.println("\nTotal Cats fetched: " + catList.size());
System.out.println("Total Dogs fetched: " + dogList.size());
}
}
通过这种方式,我们不仅避免了共享可变状态,还提高了代码的模块化和复用性。
4.2 注意事项
- 性能考量:虽然Stream API通常效率很高,但在处理海量数据时,需要注意Stream操作可能带来的额外开销(如装箱/拆箱、中间集合的创建)。对于数据库操作,网络延迟和数据库自身的性能通常是主要瓶颈。
- 异常处理:在实际应用中,数据库查询可能会抛出异常。在Stream管道中处理异常需要额外的考虑,例如使用try-catch块包装map中的方法调用,或者使用Either、Optional等函数式错误处理模式。
- AtomicInteger的使用:AtomicInteger在这里用于为groupingBy操作生成批次键,它本身是可变的,但这种可变性是局部且受控的,不会影响最终结果列表的纯粹性。它是在Stream的中间操作中使用的,而不是在终端操作中用于累积最终结果。
- forEach的谨慎使用:forEach是一个终端操作,它允许对流中的每个元素执行一个动作。然而,如果这个动作涉及到修改外部状态,那么就可能引入副作用,从而破坏Stream API的函数式编程优势。在大多数情况下,当需要聚合或转换数据时,应优先考虑collect、reduce、map、filter等操作,而不是forEach。
5. 总结
通过将原始代码重构为使用Java Stream API,我们成功地消除了共享可变状态的问题。新的实现方式利用map、flatMap和collect等操作,以声明式、函数式编程风格处理数据,带来了以下好处:
- 避免副作用:结果列表在流管道的末端一次性构建,而不是在循环中逐步修改,使得代码更纯粹、更易于推理。
- 提高可读性:流管道清晰地表达了数据转换的意图,从分批到获取再到聚合,逻辑一目了然。
- 易于并行化:由于没有共享可变状态,这种代码结构更容易安全地转换为并行流(通过parallelStream()),从而利用多核处理器提高性能。
- 代码复用性:通过提取通用方法,可以进一步提高代码的复用性。
这种模式是Java中处理批量数据获取和转换的推荐方式,尤其是在需要遵循函数式编程原则和利用现代Java特性时。










