
1. 问题描述与数据模型
在实际应用中,我们经常需要从一系列时间序列数据中,根据某些条件(如货币类型、日期)筛选出具有特定时间戳(最新或最早)的记录。例如,给定一个包含货币交易信息的列表,我们希望获取每种货币在每天的最新交易记录。
假设我们有如下的Currency数据模型:
import java.time.LocalDateTime;
import java.time.LocalDate; // 需要导入LocalDate
class Currency {
private Integer id;
private String name; // 货币名称,如 "USD", "BUSD"
private LocalDateTime lastReceived; // 最后接收时间戳
// 构造函数
public Currency(Integer id, String name, LocalDateTime lastReceived) {
this.id = id;
this.name = name;
this.lastReceived = lastReceived;
}
// Getter方法
public Integer getId() { return id; }
public String getName() { return name; }
public LocalDateTime getLastReceived() { return lastReceived; }
// 为了方便打印,重写toString方法
@Override
public String toString() {
return "Currency{" +
"id=" + id +
", name='" + name + '\'' +
", lastReceived=" + lastReceived +
'}';
}
}我们的目标是,对于给定的Currency列表,例如:
| ID | NAME | LAST_RECEIVED |
|---|---|---|
| 1 | USD | 2022-05-18 09:04:01.545 |
| 2 | USD | 2022-05-18 08:04:01.545 |
| 3 | USD | 2022-05-19 08:04:01.545 |
| 4 | USD | 2022-05-20 08:04:01.545 |
| 5 | USD | 2022-05-20 11:04:01.545 |
| 6 | BUSD | 2022-05-18 08:04:01.545 |
我们期望得到的结果是每种货币在每天的最新(最大)时间戳记录:
| ID | NAME | LAST_RECEIVED |
|---|---|---|
| 1 | USD | 2022-05-18 09:04:01.545 |
| 3 | USD | 2022-05-19 08:04:01.545 |
| 5 | USD | 2022-05-20 11:04:01.545 |
| 6 | BUSD | 2022-05-18 08:04:01.545 |
请注意,问题描述中提到了“least timestamp”,但根据提供的预期输出,实际需求是获取“latest timestamp”(即最大时间戳)。本教程将以获取“最新时间戳”为目标进行讲解。如果实际需求是“最早时间戳”,只需将maxBy替换为minBy。
立即学习“Java免费学习笔记(深入)”;
2. 使用Java 8 Stream API解决
Java 8的Stream API提供了强大的数据处理能力,结合Collectors.groupingBy和Collectors.collectingAndThen,我们可以优雅地解决这个问题。
2.1 获取所有货币在每天的最新记录
为了实现“按货币名称和日期分组,并获取最新时间戳”的需求,我们需要一个复合键来分组。这个复合键应该包含货币名称和lastReceived字段的日期部分。
import java.time.LocalDateTime;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
public class CurrencyProcessor {
public static void main(String[] args) {
List data = Arrays.asList(
new Currency(1, "USD", LocalDateTime.parse("2022-05-18T09:04:01.545")),
new Currency(2, "USD", LocalDateTime.parse("2022-05-18T08:04:01.545")),
new Currency(3, "USD", LocalDateTime.parse("2022-05-19T08:04:01.545")),
new Currency(4, "USD", LocalDateTime.parse("2022-05-20T08:04:01.545")),
new Currency(5, "USD", LocalDateTime.parse("2022-05-20T11:04:01.545")),
new Currency(6, "BUSD", LocalDateTime.parse("2022-05-18T08:04:01.545"))
);
// 1. 分组并获取每日期货的最新记录
List latestByDateAndCurrency = new ArrayList<>(data
.stream()
.collect(Collectors.groupingBy(
// 复合键:货币名称 + 日期
curr -> Arrays.asList(curr.getName(), curr.getLastReceived().toLocalDate()),
// 收集器:先找到最大值,然后从Optional中取出
Collectors.collectingAndThen(
Collectors.maxBy(Comparator.comparing(Currency::getLastReceived)),
Optional::get // 确保Optional有值,否则抛出NoSuchElementException
)
))
.values()); // 获取Map中的所有值,即筛选出的Currency对象
System.out.println("所有货币在每天的最新记录 (未排序):");
latestByDateAndCurrency.forEach(System.out::println);
// 2. 对结果进行排序(如果需要)
// 可以直接对上述列表进行排序
latestByDateAndCurrency.sort(
Comparator.comparing(Currency::getName) // 先按货币名称排序
.thenComparing(Currency::getLastReceived) // 再按时间戳排序
);
System.out.println("\n所有货币在每天的最新记录 (排序后):");
latestByDateAndCurrency.forEach(System.out::println);
}
} 代码解析:
- data.stream(): 创建一个Currency对象的Stream。
-
Collectors.groupingBy(...): 这是核心操作。它根据提供的分类函数将Stream中的元素分组到一个Map中。
- 分类函数 curr -> Arrays.asList(curr.getName(), curr.getLastReceived().toLocalDate()): 这里我们创建了一个List作为复合键。它包含了货币名称(curr.getName())和lastReceived字段的日期部分(curr.getLastReceived().toLocalDate())。这样可以确保同一货币在同一天的记录被分到同一个组。
-
下游收集器 Collectors.collectingAndThen(Collectors.maxBy(Comparator.comparing(Currency::getLastReceived)), Optional::get):
- Collectors.maxBy(Comparator.comparing(Currency::getLastReceived)): 这是groupingBy的二级收集器,它会在每个分组内部找到lastReceived时间戳最大的Currency对象。maxBy返回一个Optional
。 - Optional::get: collectingAndThen的作用是在前一个收集器(maxBy)的结果上执行一个转换函数。在这里,我们使用Optional::get从Optional
中取出实际的Currency对象。注意:使用Optional::get时需谨慎,如果分组为空(通常不会发生在这种场景下),它会抛出NoSuchElementException。在生产代码中,更安全的做法是使用orElse或orElseThrow。
- Collectors.maxBy(Comparator.comparing(Currency::getLastReceived)): 这是groupingBy的二级收集器,它会在每个分组内部找到lastReceived时间戳最大的Currency对象。maxBy返回一个Optional
-
.values(): groupingBy操作返回一个Map
- , Currency>。我们通过.values()方法获取Map中所有的值(即我们筛选出的Currency对象),这些值就是我们需要的最新记录。
- new ArrayList(...): 将Map的values()集合转换为ArrayList。
- 排序: 如果需要对最终结果进行排序,可以直接对latestByDateAndCurrency列表使用sort方法,结合Comparator.comparing和thenComparing进行多字段排序。
2.2 获取特定货币在每天的最新记录
如果只需要获取特定货币(例如"USD")的最新记录,可以在Stream操作开始时添加一个filter步骤。
import java.util.stream.Collectors;
import java.util.Comparator;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.time.LocalDateTime;
public class SpecificCurrencyProcessor {
public static void main(String[] args) {
List data = Arrays.asList(
new Currency(1, "USD", LocalDateTime.parse("2022-05-18T09:04:01.545")),
new Currency(2, "USD", LocalDateTime.parse("2022-05-18T08:04:01.545")),
new Currency(3, "USD", LocalDateTime.parse("2022-05-19T08:04:01.545")),
new Currency(4, "USD", LocalDateTime.parse("2022-05-20T08:04:01.545")),
new Currency(5, "USD", LocalDateTime.parse("2022-05-20T11:04:01.545")),
new Currency(6, "BUSD", LocalDateTime.parse("2022-05-18T08:04:01.545"))
);
String targetCurrency = "USD";
List lastUSDByDate = new ArrayList<>(data
.stream()
.filter(curr -> targetCurrency.equalsIgnoreCase(curr.getName())) // 筛选特定货币
.collect(Collectors.groupingBy(
curr -> curr.getLastReceived().toLocalDate(), // 此时分组键只需日期
Collectors.collectingAndThen(
Collectors.maxBy(Comparator.comparing(Currency::getLastReceived)),
Optional::get
)
))
.values()
);
// 对结果进行排序
lastUSDByDate.sort(Comparator.comparing(Currency::getLastReceived));
System.out.println(targetCurrency + " 在每天的最新记录:");
lastUSDByDate.forEach(System.out::println);
}
} 代码解析:
- .filter(curr -> targetCurrency.equalsIgnoreCase(curr.getName())): 在分组之前,先过滤出我们感兴趣的特定货币(例如"USD")的记录。
- groupingBy(curr -> curr.getLastReceived().toLocalDate(), ...): 由于已经筛选了特定货币,分组键就不需要再包含货币名称,只需按日期分组即可。
3. 原生SQL查询替代方案(适用于数据库层面)
如果数据量庞大,并且数据存储在关系型数据库中,使用Java Stream API在内存中处理可能效率不高。此时,利用数据库的强大查询能力,特别是窗口函数,是更优的选择。
以下是一个使用PostgreSQL的SQL查询示例,通过窗口函数实现相同的功能:
SELECT id, name, last_received
FROM (
SELECT c.*,
ROW_NUMBER() OVER (
PARTITION BY name, to_char(last_received, 'yyyy-MM-dd')
ORDER BY last_received DESC
) AS rn -- 或者使用ROW_NUMBER(), RANK(), DENSE_RANK()
FROM Currency c
WHERE c.name = :currName -- 如果需要筛选特定货币
) tbl
WHERE rn = 1
ORDER BY last_received;SQL查询解析:
- *内层查询 `SELECT c., ROW_NUMBER() OVER (...) AS rn FROM Currency c WHERE c.name = :currName`**:
- ROW_NUMBER() OVER (PARTITION BY name, to_char(last_received, 'yyyy-MM-dd') ORDER BY last_received DESC): 这是核心的窗口函数。
- PARTITION BY name, to_char(last_received, 'yyyy-MM-dd'): 这指定了分组的依据。它会根据货币名称和last_received字段的日期部分进行分区。在每个分区内部,行号会独立计算。
- ORDER BY last_received DESC: 在每个分区内部,行会根据last_received时间戳降序排列(最新时间戳排在最前面)。
- AS rn: 为计算出的行号指定别名rn。
- ROW_NUMBER() OVER (PARTITION BY name, to_char(last_received, 'yyyy-MM-dd') ORDER BY last_received DESC): 这是核心的窗口函数。
-
外层查询 SELECT id, name, last_received FROM (...) tbl WHERE rn = 1 ORDER BY last_received:
- WHERE rn = 1: 从内层查询的结果中,我们只选择每个分区中rn为1的记录,这正是每个分组中last_received最新的那条记录。
- ORDER BY last_received: 对最终结果按时间戳进行排序。
JPA集成:
如果使用JPA,可以将上述原生SQL查询集成到你的Repository接口中,例如:
import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.Query; import org.springframework.data.repository.query.Param; import java.util.List; public interface CurrencyRepository extends JpaRepository{ @Query(nativeQuery = true, value = """ SELECT id, name, last_received FROM ( SELECT c.*, ROW_NUMBER() OVER ( PARTITION BY name, to_char(last_received, 'yyyy-MM-dd') ORDER BY last_received DESC ) AS rn FROM Currency c WHERE c.name = :currName ) tbl WHERE rn = 1 ORDER BY last_received """) List findLastByDateByCurrencyName(@Param("currName") String currName); }
4. 注意事项与总结
- “最新”与“最早”: 本文示例是获取“最新”时间戳(maxBy)。如果需要获取“最早”时间戳,只需将Collectors.maxBy替换为Collectors.minBy。
- Optional::get的风险: 在Collectors.collectingAndThen中使用Optional::get时,如果分组为空,会抛出NoSuchElementException。在大多数实际场景中,groupingBy通常不会产生空分组,但为了健壮性,可以考虑使用Optional.orElse(null)或Optional.orElseThrow(() -> new MyCustomException("No element found in group"))。
-
性能考量:
- 对于小到中等规模的数据集,Java 8 Stream API在内存中处理是高效且简洁的。
- 对于大规模数据集,尤其是在数据库中,原生SQL查询(特别是利用窗口函数)通常能提供更好的性能,因为它利用了数据库的索引和优化能力,避免了将大量数据加载到应用内存中进行处理。
- 日期处理: LocalDateTime.toLocalDate()方法是关键,它确保我们只比较日期部分,忽略时间,从而实现“每天”的聚合。
- 可读性: Java 8 Stream API提供了声明式编程风格,使得代码意图清晰,易于理解和维护。
通过本文的讲解,你应该能够熟练运用Java 8 Stream API解决按日期和特定字段分组并筛选最新/最早记录的问题,并了解在数据库层面如何使用原生SQL实现相同功能。根据你的数据量和系统架构,选择最适合的解决方案。










