
本文旨在帮助开发者优化 Google Cloud Pub/Sub 中使用 Java 客户端从拉取订阅模型中获取消息时遇到的高延迟问题。我们将分析同步拉取模式的局限性,并推荐使用异步流式拉取模式以实现更低的延迟和更高的吞吐量。
在使用 Google Cloud Pub/Sub 时,开发者可能会遇到从拉取订阅中获取消息时的高延迟问题,尤其是在处理大量消息时。本文将探讨如何通过调整拉取策略来优化消息传递,从而降低延迟并提高吞吐量。
在同步拉取模式下,客户端发送一个拉取请求,并等待服务器响应。虽然这种模式易于实现,但在高负载情况下可能会遇到性能瓶颈。问题在于,每次只发送一个拉取请求,这限制了客户端可以接收消息的速率。Pub/Sub 服务为了平衡延迟和完整性,倾向于快速返回部分消息,而不是等待收集到最大数量的消息。
以下是一个同步拉取的示例代码:
public List<ReceivedMessage> getMessagesFromSubscription(String projectId, String subscriptionId, int numOfMessages,
CredentialsProvider credentialsProvider) {
List<ReceivedMessage> receivedMessages = new ArrayList<>();
try {
SubscriberStubSettings subscriberStubSettings = getSubscriberStubSettings(credentialsProvider);
try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
PullRequest pullRequest = PullRequest.newBuilder()
.setMaxMessages(100)
.setSubscription(subscriptionName)
.build();
PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
List<String> ackIds = new ArrayList<>();
for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
ackIds.add(message.getAckId());
ModifyAckDeadlineRequest modifyAckDeadlineRequest = ModifyAckDeadlineRequest.newBuilder()
.setSubscription(subscriptionName)
.addAckIds(message.getAckId())
.setAckDeadlineSeconds(30)
.build();
subscriber.modifyAckDeadlineCallable().call(modifyAckDeadlineRequest);
}
if (ackIds.isEmpty()) {
// my logic
} else {
AcknowledgeRequest acknowledgeRequest = AcknowledgeRequest.newBuilder()
.setSubscription(subscriptionName)
.addAllAckIds(ackIds)
.build();
subscriber.acknowledgeCallable().call(acknowledgeRequest);
receivedMessages = new ArrayList<>(pullResponse.getReceivedMessagesList());
}
}
LOG.info("getMessagesFromSubscription: Received {} Messages for Project Id: {} and" +
" Subscription Id: {}.", receivedMessages.size(), projectId, subscriptionId);
} catch (Exception e) {
LOG.error("getMessagesFromSubscription: Error while pulling message from Pub/Sub " +
"from Project ID: {} and Subscription ID: {}", projectId, subscriptionId, e);
}
return receivedMessages;
}
private SubscriberStubSettings getSubscriberStubSettings(CredentialsProvider credentialsProvider) throws IOException {
SubscriberStubSettings.Builder subscriberStubSettingsBuilder = SubscriberStubSettings
.newBuilder()
.setTransportChannelProvider(SubscriberStubSettings
.defaultGrpcTransportProviderBuilder()
.setMaxInboundMessageSize(20 << 20)
.build());
if (credentialsProvider != null) {
subscriberStubSettingsBuilder = subscriberStubSettingsBuilder.setCredentialsProvider(credentialsProvider);
}
return subscriberStubSettingsBuilder.build();
}这段代码展示了如何使用同步拉取来从 Pub/Sub 订阅中获取消息。它创建了一个 PullRequest,设置了最大消息数量,并调用 subscriber.pullCallable().call(pullRequest) 来同步地获取消息。
为了克服同步拉取的局限性,建议使用异步流式拉取模式。这种模式允许客户端同时发出多个拉取请求,从而显著提高吞吐量并降低延迟。
异步流式拉取的优势:
如何实现异步流式拉取 (示例):
Google Cloud Pub/Sub 提供了异步 API,允许开发者实现流式拉取。以下是一个简化的示例,展示了如何使用 Subscriber 类进行异步拉取:
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
public class AsynchronousPullExample {
public static void main(String[] args) throws Exception {
String projectId = "your-project-id";
String subscriptionId = "your-subscription-id";
ProjectSubscriptionName subscriptionName =
ProjectSubscriptionName.of(projectId, subscriptionId);
MessageReceiver receiver =
(PubsubMessage message, AckReplyConsumer consumer) -> {
System.out.println("Received message: " + message.getData().toStringUtf8());
consumer.ack(); // Acknowledge the message
};
Subscriber subscriber = null;
try {
subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
subscriber.startAsync().awaitRunning();
System.out.printf("Listening for messages on %s:%n", subscriptionName.toString());
subscriber.awaitTerminated(30, TimeUnit.SECONDS);
} finally {
if (subscriber != null) {
subscriber.stopAsync();
}
}
}
}代码解释:
注意事项:
通过从同步拉取转向异步流式拉取,可以显著降低 Google Cloud Pub/Sub 的消息传递延迟并提高吞吐量。异步拉取允许客户端并发处理多个请求,从而更有效地利用资源并更快地接收消息。在实施异步拉取时,请务必考虑错误处理、流控和确认机制,以确保应用程序的稳定性和可靠性。
以上就是优化 Google Cloud Pub/Sub 拉取消息延迟:从同步到异步的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号