
本文旨在解决在使用 Java 客户端从 Google Pub/Sub 拉取消息时遇到的高延迟问题。通过分析同步拉取模式的局限性,并提供异步流式拉取的替代方案,帮助开发者优化消息拉取效率,降低延迟,从而更有效地处理大量消息。
在使用 Google Pub/Sub 时,开发者可能会遇到从主题中拉取消息时延迟较高的问题,尤其是在处理大量消息时。本文将探讨如何优化 Java 客户端的配置,以减少消息拉取延迟,提高吞吐量。
同步拉取的局限性
通常情况下,开发者会采用同步拉取模式,设置 maxMessages 参数来控制每次请求拉取的消息数量。然而,即使设置了较大的 maxMessages 值,实际拉取到的消息数量可能远小于预期。这主要是因为 Pub/Sub 服务在吞吐量和延迟之间进行权衡,倾向于快速返回部分消息,而不是等待收集到最大数量的消息。
问题示例代码:
立即学习“Java免费学习笔记(深入)”;
public ListgetMessagesFromSubscription(String projectId, String subscriptionId, int numOfMessages, CredentialsProvider credentialsProvider) { List receivedMessages = new ArrayList<>(); try { SubscriberStubSettings subscriberStubSettings = getSubscriberStubSettings(credentialsProvider); try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) { String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId); PullRequest pullRequest = PullRequest.newBuilder() .setMaxMessages(100) .setSubscription(subscriptionName) .build(); PullResponse pullResponse = subscriber.pullCallable().call(pullRequest); List ackIds = new ArrayList<>(); for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) { ackIds.add(message.getAckId()); ModifyAckDeadlineRequest modifyAckDeadlineRequest = ModifyAckDeadlineRequest.newBuilder() .setSubscription(subscriptionName) .addAckIds(message.getAckId()) .setAckDeadlineSeconds(30) .build(); subscriber.modifyAckDeadlineCallable().call(modifyAckDeadlineRequest); } if (ackIds.isEmpty()) { // my logic } else { AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder() .setSubscription(subscriptionName) .addAllAckIds(ackIds) .build(); subscriber.acknowledgeCallable().call(acknowledgeRequest); receivedMessages = new ArrayList<>(pullResponse.getReceivedMessagesList()); } } LOG.info("getMessagesFromSubscription: Received {} Messages for Project Id: {} and" + " Subscription Id: {}.", receivedMessages.size(), projectId, subscriptionId); } catch (Exception e) { LOG.error("getMessagesFromSubscription: Error while pulling message from Pub/Sub " + "from Project ID: {} and Subscription ID: {}", projectId, subscriptionId, e); } return receivedMessages; } private SubscriberStubSettings getSubscriberStubSettings(CredentialsProvider credentialsProvider) throws IOException { SubscriberStubSettings.Builder subscriberStubSettingsBuilder = SubscriberStubSettings .newBuilder() .setTransportChannelProvider(SubscriberStubSettings .defaultGrpcTransportProviderBuilder() .setMaxInboundMessageSize(20 << 20) .build()); if (credentialsProvider != null) { subscriberStubSettingsBuilder = subscriberStubSettingsBuilder.setCredentialsProvider(credentialsProvider); } return subscriberStubSettingsBuilder.build(); }
上述代码展示了同步拉取消息的典型实现。虽然设置了 maxMessages 为 100,但实际每次拉取的消息数量可能远小于这个值,导致整体延迟增加。
优化策略:异步流式拉取
为了最大限度地提高吞吐量并降低延迟,建议使用异步流式拉取模式。异步拉取允许同时发出多个拉取请求,从而使 Pub/Sub 服务能够更好地利用资源,并根据订阅者的处理能力调整消息发送速率。
异步流式拉取的优势:
- 高吞吐量: 通过并发处理多个拉取请求,显著提高消息处理速度。
- 低延迟: Pub/Sub 服务可以更快地将消息推送到订阅者,减少等待时间。
- 资源利用率高: 充分利用网络和计算资源,提高整体效率。
实现异步流式拉取的步骤:
- 创建流式订阅者: 使用 Subscriber 类创建一个异步流式订阅者。
- 实现回调函数: 定义一个回调函数,用于处理接收到的消息。
- 处理消息: 在回调函数中,对接收到的消息进行处理,并确认消息。
- 启动订阅者: 启动订阅者,开始接收消息。
示例代码 (简要说明,完整代码请参考官方文档):
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.SubscriptionName;
// ...
String projectId = "your-project-id";
String subscriptionId = "your-subscription-id";
SubscriptionName subscriptionName = SubscriptionName.of(projectId, subscriptionId);
MessageReceiver receiver = (PubsubMessage message, AckReplyConsumer consumer) -> {
System.out.println("Received message: " + message.getData().toStringUtf8());
consumer.ack();
};
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
try {
subscriber.startAsync().awaitRunning();
System.out.println("Listening for messages on " + subscriptionName.toString());
// Keep the main thread alive to allow message processing.
Thread.sleep(Long.MAX_VALUE);
} catch (Exception e) {
// Handle error
subscriber.stopAsync();
}注意事项:
- 确保你的应用程序能够处理并发消息。
- 合理设置确认截止时间,避免消息重复传递。
- 监控应用程序的性能,根据实际情况调整配置。
总结
通过采用异步流式拉取模式,可以显著降低 Google Pub/Sub 消息拉取的延迟,提高吞吐量,从而更有效地处理大量消息。在实际应用中,需要根据具体的业务场景和性能需求,选择合适的拉取模式和配置参数,以达到最佳的性能表现。同时,监控和调优是持续的过程,需要不断地根据实际情况进行调整。











