
本教程详细讲解如何利用java stream api,结合`collectors.groupingby`与`collectors.reducing`进行复杂数据聚合。我们将通过自定义度量模型,实现按月分组统计特定数值的总和,并计算每个月内满足条件的记录数量,同时探讨如何扩展以实现独立实体的去重计数。
在现代Java应用中,数据处理和聚合是常见的任务。Java Stream API提供了强大而灵活的工具来处理集合数据,但当需要同时进行多条件分组、求和、计数等复杂聚合时,简单的counting()或summingInt()可能无法满足需求。本文将深入探讨如何利用Collectors.groupingBy结合Collectors.reducing,通过自定义聚合逻辑来解决这类问题。
1. 核心数据模型定义
首先,我们定义处理数据所需的基本模型类。为了简洁,这里使用Java 14+ 引入的 record 类型。
import java.time.LocalDate;
import java.math.BigDecimal; // 考虑到数值精度,使用BigDecimal
// 事件状态枚举
enum Statement {
STATUS1, STATUS2, STATUS3, STATUS4 // 示例中包含STATUS4
}
// 原始Person数据模型
record Person(String id,
Statement event,
LocalDate eventDate,
int value) {} // 简化value为int类型
// 最终聚合结果的数据传输对象
record DTO(int month,
BigDecimal totalSum,
int totalPersons) {}2. 自定义聚合度量模型 (PersonGroupMetric)
为了在一次Stream操作中同时累积多个聚合值(例如总和与计数),我们需要一个自定义的度量模型。这个模型将封装我们感兴趣的聚合状态,并提供合并这些状态的方法。
record PersonGroupMetric(int count, int sum) {
// 定义一个空度量,作为reducing操作的初始值
public static final PersonGroupMetric EMPTY = new PersonGroupMetric(0, 0);
// 构造函数:将单个Person对象转换为初始度量
public PersonGroupMetric(Person p) {
// 每遇到一个Person对象,计数加1,总和累加其value
this(1, p.value());
}
// 合并方法:将两个PersonGroupMetric实例的计数和总和相加
public PersonGroupMetric add(PersonGroupMetric other) {
return new PersonGroupMetric(
this.count + other.count,
this.sum + other.sum
);
}
}PersonGroupMetric记录了每个组(这里是每个月)的记录数量和值总和。
立即学习“Java免费学习笔记(深入)”;
- EMPTY常量作为reducing操作的“身份元素”,表示一个初始的、不影响聚合结果的度量。
- PersonGroupMetric(Person p)构造函数负责将Stream中的每个Person对象映射成一个初始的PersonGroupMetric实例。
- add方法则定义了如何将两个PersonGroupMetric实例合并成一个新的实例,这是reducing操作的核心逻辑。
3. Stream聚合逻辑
现在,我们利用Collectors.groupingBy和Collectors.reducing来实现复杂的聚合。
假设我们有以下示例数据:
import java.util.List;
import java.util.Map;
import static java.util.stream.Collectors.*; // 导入所有Collectors静态方法
public class StreamAggregationExample {
public static void main(String[] args) {
var src = List.of(
new Person("per1", Statement.STATUS1, LocalDate.of(2022, 01, 10), 1),
new Person("per2", Statement.STATUS2, LocalDate.of(2022, 01, 10), 2),
new Person("per3", Statement.STATUS3, LocalDate.of(2022, 01, 10), 3),
new Person("per1", Statement.STATUS4, LocalDate.of(2022, 01, 10), 1), // per1在1月份出现两次
new Person("per1", Statement.STATUS1, LocalDate.of(2022, 02, 10), 1),
new Person("per2", Statement.STATUS1, LocalDate.of(2022, 03, 10), 1),
new Person("per3", Statement.STATUS2, LocalDate.of(2022, 03, 10), 2)
);
// 使用groupingBy和reducing进行聚合
Map aggregatedMetrics = src.stream()
// 可以选择性地添加过滤条件,例如只处理特定STATUS
// .filter(p -> p.event() == Statement.STATUS1 || p.event() == Statement.STATUS2)
.collect(groupingBy(
p -> p.eventDate().getMonthValue(), // 按月份分组
reducing(
PersonGroupMetric.EMPTY, // 1. 初始值 (identity)
PersonGroupMetric::new, // 2. 映射器 (mapper): 将Person转换为PersonGroupMetric
PersonGroupMetric::add // 3. 累加器 (accumulator): 合并PersonGroupMetric
)
));
System.out.println("聚合结果 (Map):");
aggregatedMetrics.forEach((month, metric) ->
System.out.println("Month: " + month + ", Count: " + metric.count() + ", Sum: " + metric.sum()));
}
} 在这个Stream操作中:
- groupingBy(p -> p.eventDate().getMonthValue(), ...):首先根据Person对象的eventDate字段提取月份值,作为分组的键。
- reducing(...):这是实现自定义聚合的关键。
- PersonGroupMetric.EMPTY:reducing操作的起始值,当一个组第一次被处理时,或者当没有元素时,它将作为基础。
- PersonGroupMetric::new:这是一个映射函数,它将Stream中的每个Person对象转换为一个PersonGroupMetric实例。
- PersonGroupMetric::add:这是一个累加器(或组合器)函数,它定义了如何将两个PersonGroupMetric实例合并成一个。groupingBy在处理同一个组内的元素时,会反复调用这个方法来累积结果。
4. 结果转换与展示
聚合完成后,我们得到一个Map
// ... (接上文的main方法)
// 将聚合结果映射到最终的DTO列表
List finalResult = aggregatedMetrics.entrySet().stream()
.map(entry -> new DTO(
entry.getKey(), // 月份
new BigDecimal(entry.getValue().sum()), // 总和 (转换为BigDecimal)
entry.getValue().count() // 记录数量
))
.sorted((dto1, dto2) -> Integer.compare(dto1.month(), dto2.month())) // 按月份排序
.collect(toList());
System.out.println("\n最终结果 (List):");
finalResult.forEach(System.out::println);
}
} 输出结果:
聚合结果 (Map): Month: 1, Count: 4, Sum: 7 Month: 2, Count: 1, Sum: 1 Month: 3, Count: 2, Sum: 3 最终结果 (List ): DTO[month=1, totalSum=7, totalPersons=4] DTO[month=2, totalSum=1, totalPersons=1] DTO[month=3, totalSum=3, totalPersons=2]
5. 注意事项与扩展:实现独立人数统计
当前PersonGroupMetric中的count字段统计的是Person记录的出现次数。例如,在1月份,per1出现了两次,因此count为4(per1(STATUS1), per2(STATUS2), per3(STATUS3), per1(STATUS4))。然而,原始需求中“Person Count”可能指的是独立(去重)的人数(即1月份是3人:per1, per2, per3)。
要实现独立人数统计,我们需要修改PersonGroupMetric来存储独立的人员ID。
import java.util.HashSet; import java.util.Set; // 修改后的聚合度量模型,包含独立ID集合 record PersonGroupMetricWithDistinct(SetdistinctIds, int sum) { public static final PersonGroupMetricWithDistinct EMPTY = new PersonGroupMetricWithDistinct(new HashSet<>(), 0); public PersonGroupMetricWithDistinct(Person p) { this(new HashSet<>(Set.of(p.id())), p.value()); // 初始化时添加当前Person的ID } public PersonGroupMetricWithDistinct add(PersonGroupMetricWithDistinct other) { Set mergedIds = new HashSet<>(this.distinctIds); mergedIds.addAll(other.distinctIds); // 合并独立ID集合 return new PersonGroupMetricWithDistinct( mergedIds, this.sum + other.sum ); } }
然后,在Stream聚合逻辑中,使用新的PersonGroupMetricWithDistinct:
// ... (接上文的main方法,替换PersonGroupMetric为PersonGroupMetricWithDistinct)
Map aggregatedMetricsDistinct = src.stream()
.collect(groupingBy(
p -> p.eventDate().getMonthValue(),
reducing(
PersonGroupMetricWithDistinct.EMPTY,
PersonGroupMetricWithDistinct::new,
PersonGroupMetricWithDistinct::add
)
));
System.out.println("\n聚合结果 (Map):");
aggregatedMetricsDistinct.forEach((month, metric) ->
System.out.println("Month: " + month + ", Distinct Person Count: " + metric.distinctIds().size() + ", Sum: " + metric.sum()));
// 将聚合结果映射到最终的DTO列表 (使用独立人数)
List finalResultDistinct = aggregatedMetricsDistinct.entrySet().stream()
.map(entry -> new DTO(
entry.getKey(),
new BigDecimal(entry.getValue().sum()),
entry.getValue().distinctIds().size() // 使用独立ID的数量
))
.sorted((dto1, dto2) -> Integer.compare(dto1.month(), dto2.month()))
.collect(toList());
System.out.println("\n最终结果 (List - 独立人数):");
finalResultDistinct.forEach(System.out::println); 此时的输出将更符合独立人数的统计:
聚合结果 (Map): Month: 1, Distinct Person Count: 3, Sum: 7 Month: 2, Distinct Person Count: 1, Sum: 1 Month: 3, Distinct Person Count: 2, Sum: 3 最终结果 (List - 独立人数): DTO[month=1, totalSum=7, totalPersons=3] DTO[month=2, totalSum=1, totalPersons=1] DTO[month=3, totalSum=3, totalPersons=2]
总结
通过本教程,我们学习了如何利用Java Stream API中的Collectors.groupingBy与Collectors.reducing,结合自定义的度量模型,实现复杂的数据聚合任务。这种方法不仅能够灵活地进行多条件分组,还能在一次Stream操作中同时计算多个聚合指标,如总和与计数。特别是当需要进行去重计数时,通过在自定义度量模型中引入Set来跟踪独立实体,可以优雅地解决这一挑战。掌握这种高级聚合技巧,将显著提升您在Java数据处理方面的能力。










