
在构建现代分布式应用时,经常需要维护共享状态,例如当前在线用户数、商品库存量等。将这些状态存储在 infinispan 等分布式缓存中是一种常见且高效的做法。然而,当多个客户端同时尝试更新同一份数据时,如果没有适当的同步机制,就可能导致数据不一致。典型的场景是“读-修改-写”操作:客户端a读取当前值,客户端b同时读取当前值,然后a修改并写回,b也修改并写回,最终b的修改可能会覆盖a的修改,导致数据丢失或不准确。
例如,在统计在线用户数的场景中,如果应用程序在用户登录时执行以下操作:
当多个用户几乎同时登录时,上述操作序列将面临严重的并发问题,导致最终的在线用户数低于实际值。为了解决这一挑战,Infinispan 提供了多种强大的同步机制。
核心问题在于“读-修改-写”这一系列操作在分布式环境中并非原子性的。在单线程或单节点应用中,我们可以使用 synchronized 关键字或 java.util.concurrent.atomic 包下的类来保证原子性。但在分布式缓存中,数据可能分布在多个节点上,并且有多个客户端并发访问,传统的本地同步机制无法生效。Infinispan 作为分布式数据网格,通过其内置的特性来解决这类分布式并发问题。
Infinispan 提供了多种机制来确保分布式环境下的数据一致性,尤其适用于并发更新场景。
对于需要简单原子性增量或减量操作的场景,Infinispan 提供了专用的分布式计数器(Counters)。这些计数器是高度优化的,能够保证在分布式环境下的原子性操作,并且具有出色的性能。
工作原理: Infinispan 计数器分为两种:
对于在线用户数统计,通常推荐使用强计数器以确保数据的准确性。
示例代码 (使用 Hot Rod 客户端):
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.counter.api.CounterManager;
import org.infinispan.counter.api.StrongCounter;
import org.infinispan.counter.api.CounterConfiguration;
import org.infinispan.counter.api.CounterType;
public class UserCounterService {
private final StrongCounter onlineUsersCounter;
public UserCounterService(RemoteCacheManager cacheManager) {
CounterManager counterManager = cacheManager.get
CounterManager();
// 尝试获取计数器,如果不存在则创建并配置
// 确保计数器只被创建一次,例如在应用启动时
if (!counterManager.is -->
Defined("onlineUsers")) {
counterManager.defineCounter("onlineUsers",
CounterConfiguration.builder(CounterType.UNBOUNDED_STRONG)
.initialValue(0)
.build());
}
this.onlineUsersCounter = counterManager.getStrongCounter("onlineUsers");
}
/**
* 用户登录时调用,原子性地增加在线用户数
*/
public long userLoggedIn() {
return onlineUsersCounter.incrementAndGet().join(); // .join() 用于等待异步操作完成
}
/**
* 用户登出时调用,原子性地减少在线用户数
*/
public long userLoggedOut() {
return onlineUsersCounter.decrementAndGet().join();
}
/**
* 获取当前在线用户数
*/
public long getCurrentOnlineUsers() {
return onlineUsersCounter.getValue().join();
}
public static void main(String[] args) {
// 假设已经配置并启动了 RemoteCacheManager
// RemoteCacheManager cacheManager = new RemoteCacheManager("127.0.0.1:11222");
// UserCounterService service = new UserCounterService(cacheManager);
// 模拟用户登录
// System.out.println("User logged in, current count: " + service.userLoggedIn());
// System.out.println("User logged in, current count: " + service.userLoggedIn());
// System.out.println("Current online users: " + service.getCurrentOnlineUsers());
// cacheManager.stop();
}
}注意事项:
Infinispan 支持 JTA (Java Transaction API) 事务,允许将一系列缓存操作封装在一个事务中,从而保证这些操作的原子性、一致性、隔离性和持久性 (ACID)。
工作原理: 在一个事务中,所有对缓存的修改只有在事务提交时才会被持久化。如果在事务执行过程中发生错误或冲突,整个事务可以回滚,所有修改都会被撤销,从而保证数据的一致性。
示例代码 (使用 Hot Rod 客户端的事务支持):
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.exceptions.HotRodClientException;
import org.infinispan.client.hotrod.transaction.manager.RemoteTransactionManager;
import jakarta.transaction.SystemException;
import jakarta.transaction.UserTransaction;
public class UserCountWithTransaction {
private final RemoteCache<String, Long> userCountCache;
private final RemoteTransactionManager transactionManager;
public UserCountWithTransaction(RemoteCacheManager cacheManager) {
this.userCountCache = cacheManager.getCache("userCountsCache");
this.transactionManager = RemoteTransactionManager.getInstance(cacheManager);
}
public long incrementOnlineUsers() throws SystemException {
UserTransaction userTransaction = transactionManager.getUserTransaction();
try {
userTransaction.begin(); // 开始事务
Long currentCount = userCountCache.get("onlineUsers");
if (currentCount == null) {
currentCount = 0L;
}
Long newCount = currentCount + 1;
userCountCache.put("onlineUsers", newCount); // 在事务内修改
userTransaction.commit(); // 提交事务
return newCount;
} catch (Exception e) {
if (userTransaction != null) {
userTransaction.rollback(); // 发生异常时回滚事务
}
throw new RuntimeException("Failed to increment online users with transaction", e);
}
}
public long getCurrentOnlineUsers() {
return userCountCache.get("onlineUsers") == null ? 0L : userCountCache.get("onlineUsers");
}
}注意事项:
版本化操作是一种乐观锁的实现方式。Infinispan 允许客户端在执行更新操作时提供一个“版本”或“旧值”,只有当缓存中的当前值与提供的旧值匹配时,更新才会成功。这避免了显式锁定,提高了并发性。
工作原理: 客户端首先读取缓存中的值。在尝试更新时,它会使用一个条件操作,例如 replace(key, oldValue, newValue)。如果缓存中 key 对应的值确实是 oldValue,则将其更新为 newValue;否则,操作失败,表示在读取和尝试更新之间,该值已被其他客户端修改。客户端需要捕获失败并重试。
示例代码 (使用 Hot Rod 客户端的 replace 方法):
import org.infinispan.client.hotrod.RemoteCache;
import org.infinispan.client.hotrod.RemoteCacheManager;
public class UserCountWithVersionedOperations {
private final RemoteCache<String, Long> userCountCache;
public UserCountWithVersionedOperations(RemoteCacheManager cacheManager) {
this.userCountCache = cacheManager.getCache("userCountsCache");
// 确保缓存中有一个初始值
userCountCache.putIfAbsent("onlineUsers", 0L);
}
public long incrementOnlineUsers() {
while (true) { // 循环直到更新成功
Long currentCount = userCountCache.get("onlineUsers");
if (currentCount == null) {
currentCount = 0L;
}
Long newCount = currentCount + 1;
// 尝试原子性地替换旧值
boolean success = userCountCache.replace("onlineUsers", currentCount, newCount);
if (success) {
return newCount;
}
// 如果替换失败,说明在读取 currentCount 后有其他线程修改了值,需要重试
// System.out.println("Concurrent update detected, retrying increment...");
}
}
public long getCurrentOnlineUsers() {
return userCountCache.get("onlineUsers") == null ? 0L : userCountCache.get("onlineUsers");
}
}注意事项:
在 Infinispan 分布式缓存中处理并发更新是确保数据一致性的关键。针对不同的场景,Infinispan 提供了多种有效的同步机制:
分布式计数器 (Infinispan Counters):
事务机制 (Infinispan Transactions):
版本化操作 (Versioned Operations / 乐观锁):
对于本文中提到的在线用户数统计问题,Infinispan 分布式计数器无疑是最佳解决方案。它提供了简单、高效且原子性的操作,完美契合了计数的需求,并且在分布式环境中能够可靠地工作,确保数据在多用户并发登录和登出时的准确性。在实际应用中,应根据具体业务需求和对性能、一致性的权衡来选择最合适的同步机制。
以上就是Infinispan 分布式缓存中的并发更新:用户计数场景的同步策略的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号