
在响应式编程中,不应在 `map()` 内部直接调用 `webclient` 并手动 `subscribe()`;而应使用 `flatmap()` 等异步转换操作符来组合 `mono`/`flux`,以保持响应式链的完整性与非阻塞性。
map() 是一个同步、纯函数式操作符,其设计初衷是将一个值(如 T)转换为另一个值(如 R),不引入任何异步副作用。一旦你在 map() 中执行 WebClient.post(...).subscribe(),就破坏了响应式流的核心契约:你不仅丢弃了返回的 Mono
正确的做法是使用 flatMap():它接受一个 T → Publisher
以下是重构后的关键代码片段(已移除 subscribe(),改用 flatMap,并优化错误处理与重试逻辑):
return mailTemplateMappingRepository
.findById(request.getTemplateKey())
.switchIfEmpty(Mono.error(new MailTemplateNotSupportedException(
"The template with key " + request.getTemplateKey() + " is not supported!!!")))
.flatMap(t -> {
log.info("sendEmailWithRetry: request {}", request);
log.info("sendEmailWithRetry: templateMappings {}", t);
// 同步逻辑仍可保留在 flatMap 内(如 token 刷新判断)
if (!businessUnitAuthTokens.containsKey(t.getExactTargetBusinessUnit())) {
updateBusinessUnitToken(t);
}
String token = "Bearer " + businessUnitAuthTokens.get(t.getExactTargetBusinessUnit());
String uri = exactTargetMessageDefinitionSendsUrl.replace("{key}", t.getExactTargetKey());
Map mailTriggerPayload = generateMailTriggerPayload(request);
// 定义重试策略(注意:retryWhen 作用于下游 Publisher)
RetryBackoffSpec is401RetrySpec = Retry.backoff(1, Duration.ofSeconds(2))
.filter(throwable -> throwable instanceof Unauthorized)
.doBeforeRetry(retrySignal -> {
log.warn("UNAUTHORIZED detected; refreshing token for BU: {}", t.getExactTargetBusinessUnit());
updateBusinessUnitToken(t);
})
.onRetryExhaustedThrow((spec, signal) ->
new ExactTargetException(
HttpStatus.UNAUTHORIZED.value(),
signal.failure().getMessage(),
"Authorization failed after retries for business unit: " + t.getExactTargetBusinessUnit()
)
);
// ✅ 正确:返回 Mono(由 WebClient 返回),由 flatMap 自动订阅并融合
return restClientService.post(uri, mailTriggerPayload, token, String.class)
.onErrorResume(error -> {
if (error instanceof ExactTargetException) {
return Mono.error(new ExactTargetException(
((ExactTargetException) error).getStatus(),
((ExactTargetException) error).getBody(),
"Exact Target error: status=" + ((ExactTargetException) error).getStatus()
+ ", body=" + ((ExactTargetException) error).getBody()
));
}
return Mono.error(error);
})
.retryWhen(is401RetrySpec);
}); 关键注意事项:
- ✅ 永远不要在 map() / filter() / doOnNext() 等同步操作符中调用 subscribe() —— 这是响应式编程中最常见的反模式。
- ✅ flatMap() 是处理“一个值 → 一个异步任务”的标准方式;若需并发多个请求,可考虑 flatMapSequential() 或 concatMap() 控制顺序。
- ⚠️ updateBusinessUnitToken(t) 若本身是阻塞或非响应式操作,应封装为 Mono.fromCallable(...) 并用 flatMap 组合,避免污染响应式线程(如 block())。
- ? 错误类型需精确匹配(如 Unauthorized 是否为自定义异常?确保 filter() 中能正确识别);建议统一使用 Spring 的 WebClientResponseException 子类便于标准化处理。
- ? 日志中避免打印敏感信息(如完整 token、payload),生产环境应脱敏。
遵循此模式,你的链将真正具备响应式特性:可组合、可背压、可追踪、可监控,并与 Spring WebFlux 生态无缝集成。










