
本教程深入探讨如何使用 java stream api 处理复杂的数据聚合需求,特别是针对多条件分组、求和以及去重计数。通过构建自定义统计模型和巧妙运用 `collectors.groupingby` 结合 `collectors.reducing`,文章展示了如何高效且准确地从嵌套集合中提取所需数据,解决按月份统计总值和独立人数的常见挑战。
在进行数据处理之前,首先需要明确数据结构。本教程将围绕 Person 对象进行聚合操作,并期望将结果封装到 DTO 对象中。为了代码的简洁性,我们使用 Java 14 引入的 record 类型定义数据模型。
import java.time.LocalDate;
import java.math.BigDecimal; // DTO中使用BigDecimal
// 定义事件状态枚举
enum Statement {
STATUS1, STATUS2, STATUS3, STATUS4 // 示例中可能包含更多状态
}
// Person 记录,代表一个事件或一个人的某次记录
record Person(String id,
Statement event,
LocalDate eventDate,
int value) {} // 简化 value 类型为 int
// 最终结果的 DTO
record DTO(int month,
BigDecimal totalSum,
int totalPersons) {}初始数据可能以 Map<String, List<Person>> 的形式存在,其中键是 pId(Person ID),值是该 pId 对应的 Person 对象列表。
我们的目标是:
原始尝试中,常见的错误是直接对每个 Person 记录进行计数,导致 totalPersons 统计的是事件数量而非独立个体数量。例如,如果 per1 在同一个月内有两次记录,我们希望它只被计数一次。
立即学习“Java免费学习笔记(深入)”;
为了同时处理求和与去重计数,并确保聚合逻辑的清晰与可复用性,我们引入一个自定义的聚合器 PersonGroupMetric。这个 record 将存储每个分组的中间统计结果。
record PersonGroupMetric(int count, int sum) {
// 定义一个空的度量值,作为 reducing 操作的初始值
public static final PersonGroupMetric EMPTY = new PersonGroupMetric(0, 0);
// 构造函数:将一个 Person 对象映射为初始的 PersonGroupMetric
// 注意:此处的 count 统计的是事件数量,后续会处理去重
public PersonGroupMetric(Person p) {
this(1, p.value());
}
// 合并方法:定义如何将两个 PersonGroupMetric 实例合并
public PersonGroupMetric add(PersonGroupMetric other) {
return new PersonGroupMetric(
this.count + other.count, // 累加事件计数
this.sum + other.sum // 累加值总和
);
}
}关键点: PersonGroupMetric 的 count 字段在 add 方法中累加的是事件数量,而不是独立人数。这是 Collectors.reducing 的一个特性,它会根据 mapper 将每个元素转换为 PersonGroupMetric,然后通过 combiner 累加。要实现独立人数计数,我们需要在 PersonGroupMetric 中额外存储 id,或者在聚合的后期进行处理。考虑到问题描述中 Person Count 的期望结果(月1为3人,月3为2人),这暗示了我们需要对 id 进行去重。
为了实现按月份分组,并计算总和与去重人数,我们将结合使用 Collectors.groupingBy 和 Collectors.reducing。然而,reducing 本身难以直接处理去重计数。因此,我们将采取两步走的策略:
第一步:按月份分组,计算总和,并收集所有 Person 的 id。
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static java.util.stream.Collectors.*;
// 假设 employees 是 Map<String, List<Person>>
// 为了简化示例,我们直接从 List<Person> src 开始
var src = List.of(
new Person("per1", Statement.STATUS1, LocalDate.of(2022, 1, 10), 1),
new Person("per2", Statement.STATUS2, LocalDate.of(2022, 1, 10), 2),
new Person("per3", Statement.STATUS3, LocalDate.of(2022, 1, 10), 3),
new Person("per1", Statement.STATUS4, LocalDate.of(2022, 1, 10), 1), // per1 在月1有重复记录
new Person("per1", Statement.STATUS1, LocalDate.of(2022, 2, 10), 1),
new Person("per2", Statement.STATUS1, LocalDate.of(2022, 3, 10), 1),
new Person("per3", Statement.STATUS2, LocalDate.of(2022, 3, 10), 2)
);
// 定义一个临时的聚合结果类,用于在中间阶段存储总和和所有ID
record TempMonthMetric(int sum, Set<String> uniqueIds) {
public static final TempMonthMetric EMPTY = new TempMonthMetric(0, ConcurrentHashMap.newKeySet()); // 使用并发Set以防并行流
public TempMonthMetric(Person p) {
this(p.value(), Set.of(p.id()));
}
public TempMonthMetric add(TempMonthMetric other) {
Set<String> combinedIds = ConcurrentHashMap.newKeySet();
combinedIds.addAll(this.uniqueIds);
combinedIds.addAll(other.uniqueIds);
return new TempMonthMetric(this.sum + other.sum, combinedIds);
}
}
Map<Integer, TempMonthMetric> monthlyAggregations = src.stream()
.collect(groupingBy(
p -> p.eventDate().getMonthValue(),
reducing(
TempMonthMetric.EMPTY,
TempMonthMetric::new,
TempMonthMetric::add
)
));代码解析:
第二步:将中间结果映射到最终的 DTO。
import java.util.Comparator;
import java.math.BigDecimal;
List<DTO> fin = monthlyAggregations.entrySet().stream()
.map(entry -> new DTO(
entry.getKey(), // 月份
new BigDecimal(entry.getValue().sum()), // 总和
entry.getValue().uniqueIds().size() // 独立人数 = 集合大小
))
.sorted(Comparator.comparing(DTO::month)) // 按月份排序
.collect(toList());
// 打印结果
fin.forEach(System.out::println);预期输出:
DTO[month=1, totalSum=7, totalPersons=3] DTO[month=2, totalSum=1, totalPersons=1] DTO[month=3, totalSum=3, totalPersons=2]
这与问题中期望的 Person Count 结果一致。
为了方便读者理解和运行,以下是包含所有必要类和逻辑的完整代码示例:
import java.time.LocalDate;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.math.BigDecimal;
import java.util.Comparator;
import static java.util.stream.Collectors.*;
public class StreamAggregationTutorial {
// 定义事件状态枚举
enum Statement {
STATUS1, STATUS2, STATUS3, STATUS4
}
// Person 记录,代表一个事件或一个人的某次记录
record Person(String id,
Statement event,
LocalDate eventDate,
int value) {}
// 最终结果的 DTO
record DTO(int month,
BigDecimal totalSum,
int totalPersons) {}
// 临时的聚合结果类,用于在中间阶段存储总和和所有ID
record TempMonthMetric(int sum, Set<String> uniqueIds) {
// 定义一个空的度量值,作为 reducing 操作的初始值
// 使用 ConcurrentHashMap.newKeySet() 以支持并行流的安全集合操作
public static final TempMonthMetric EMPTY = new TempMonthMetric(0, ConcurrentHashMap.newKeySet());
// 构造函数:将一个 Person 对象映射为初始的 TempMonthMetric
public TempMonthMetric(Person p) {
this(p.value(), Set.of(p.id()));
}
// 合并方法:定义如何将两个 TempMonthMetric 实例合并
public TempMonthMetric add(TempMonthMetric other) {
Set<String> combinedIds = ConcurrentHashMap.newKeySet();
combinedIds.addAll(this.uniqueIds);
combinedIds.addAll(other.uniqueIds);
return new TempMonthMetric(this.sum + other.sum, combinedIds);
}
}
public static void main(String[] args) {
// 示例数据
var src = List.of(
new Person("per1", Statement.STATUS1, LocalDate.of(2022, 1, 10), 1),
new Person("per2", Statement.STATUS2, LocalDate.of(2022, 1, 10), 2),
new Person("per3", Statement.STATUS3, LocalDate.of(2022, 1, 10), 3),
new Person("per1", Statement.STATUS4, LocalDate.of(2022, 1, 10), 1), // per1 在月1有重复记录
new Person("per1", Statement.STATUS1, LocalDate.of(2022, 2, 10), 1),
new Person("per2", Statement.STATUS1, LocalDate.of(2022, 3, 10), 1),
new Person("per3", Statement.STATUS2, LocalDate.of(2022, 3, 10), 2)
);
// 第一步:按月份分组,计算总和,并收集所有 Person 的 id
Map<Integer, TempMonthMetric> monthlyAggregations = src.stream()
.collect(groupingBy(
p -> p.eventDate().getMonthValue(),
reducing(
TempMonthMetric.EMPTY,
TempMonthMetric::new,
TempMonthMetric::add
)
));
// 第二步:将中间结果映射到最终的 DTO
List<DTO> result = monthlyAggregations.entrySet().stream()
.map(entry -> new DTO(
entry.getKey(), // 月份
new BigDecimal(entry.getValue().sum()), // 总和
entry.getValue().uniqueIds().size() // 独立人数 = 集合大小
))
.sorted(Comparator.comparing(DTO::month)) // 按月份排序
.collect(toList());
// 打印最终结果
System.out.println("--- 最终聚合结果 ---");
result.forEach(System.out::println);
}
}本教程详细展示了如何利用 Java Stream API 的 Collectors.groupingBy 和 Collectors.reducing 来解决复杂的数据聚合问题,特别是涉及多条件分组、求和以及去重计数的需求。通过创建自定义的中间聚合器 (TempMonthMetric),我们能够灵活地在流处理过程中捕获和合并所需的所有统计信息,最终精确地计算出按月份分组的总和与独立人数。这种模式对于处理类似的数据分析和报表生成任务具有很高的参考价值。
以上就是Java Stream API 高级聚合:按多条件分组计算总和与去重计数的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号