单个接口承接id+状态变化的一种思路记录
曾几何时,为了完成一个单实例单库的高并发接口开发绞尽脑汁,现在整理一下,仅供参考哈。
1. 核心组件架构
2. 关键实现代码
2.1 状态枚举与版本控制
public class Transaction {private String transactionId;private TransactionStatus status;private long version; // 乐观锁版本号// 状态顺序定义public enum TransactionStatus {STATUS1(1), STATUS2(2), STATUS3(3);private final int order;TransactionStatus(int order) { this.order = order; }public boolean isAfter(TransactionStatus other) {return this.order > other.order;}}
}
2.2 Redis分布式锁 + 本地缓存
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;public class TransactionLockService {private final RedissonClient redisson;private final Cache<String, TransactionStatus> localCache = Caffeine.newBuilder().expireAfterWrite(3, TimeUnit.MINUTES).build();// 跟踪当前线程持有的锁(用于防止误释放其他线程的锁)private final ThreadLocal<ConcurrentHashMap<String, RLock>> heldLocks = ThreadLocal.withInitial(ConcurrentHashMap::new);public TransactionLockService(RedissonClient redisson) {this.redisson = redisson;}/*** 尝试获取锁* @return true-获取成功且状态需要处理, false-无需处理或获取失败*/public boolean tryLock(String transactionId, TransactionStatus status) {// 1. 本地缓存快速检查TransactionStatus cached = localCache.getIfPresent(transactionId);if (cached != null && !status.isAfter(cached)) {return false;}// 2. 获取分布式锁RLock lock = redisson.getLock("txn:" + transactionId);try {if (lock.tryLock(100, 3000, TimeUnit.MILLISECONDS)) {heldLocks.get().put(transactionId, lock);return true;}} catch (InterruptedException e) {Thread.currentThread().interrupt();log.warn("锁获取被中断", e);}return false;}/*** 释放指定事务ID的锁*/public void unlock(String transactionId) {ConcurrentHashMap<String, RLock> locks = heldLocks.get();RLock lock = locks.remove(transactionId);if (lock != null && lock.isHeldByCurrentThread()) {try {lock.unlock();} catch (IllegalMonitorStateException e) {// 锁可能已自动过期log.debug("锁释放异常(可能已自动过期)", e);}}}/*** 释放当前线程持有的所有锁*/public void unlockAll() {ConcurrentHashMap<String, RLock> locks = heldLocks.get();locks.forEach((txnId, lock) -> {if (lock.isHeldByCurrentThread()) {try {lock.unlock();} catch (IllegalMonitorStateException e) {log.debug("锁释放异常: {}", txnId, e);}}});locks.clear();}/*** 更新本地缓存状态*/public void updateLocalCache(String transactionId, TransactionStatus status) {localCache.put(transactionId, status);}
}
2.3 数据库操作(乐观锁版)
@Repository
public class TransactionRepository {@Transactionalpublic boolean updateWithOptimisticLock(String transactionId, TransactionStatus newStatus,long expectedVersion) {// 使用CAS操作String sql = """UPDATE transactions SET status = :status, version = version + 1 WHERE transaction_id = :id AND version = :version""";int updated = jdbcTemplate.update(sql, Map.of("status", newStatus.name(), "id", transactionId,"version", expectedVersion));return updated > 0;}public Optional<Transaction> findById(String transactionId) {// 查询时携带版本号String sql = "SELECT *, version FROM transactions WHERE transaction_id = ?";return jdbcTemplate.query(sql, this::mapper, transactionId).stream().findFirst();}
}
2.4 核心处理逻辑(带重试机制)
public class TransactionProcessor {private final TransactionLockService lockService;private final TransactionRepository repository;private final RedisTemplate<String, Object> redisTemplate;@Retryable(maxAttempts = 3, backoff = @Backoff(delay = 100))public void process(String transactionId, TransactionStatus status) {try {// 1. 获取分布式锁if (!lockService.tryLock(transactionId, status)) {log.warn("操作被跳过:状态已更新或锁获取失败");return;}// 2. 查询当前状态(带版本号)Transaction current = repository.findById(transactionId).orElseThrow(() -> new NotFoundException("交易不存在"));// 3. 状态顺序验证if (!status.isAfter(current.getStatus())) {return;}// 4. 乐观锁更新if (!repository.updateWithOptimisticLock(transactionId, status, current.getVersion())) {throw new OptimisticLockingFailureException("版本冲突");}// 5. 更新缓存redisTemplate.opsForValue().set("txn:status:" + transactionId, status, 1, TimeUnit.HOURS);} catch (Exception e) {log.error("处理失败", e);// 6. 记录失败操作auditService.logFailure(transactionId, status, e);throw e; // 触发重试} finally {// 7. 释放锁(通过锁的自动过期保证最终释放)lockService.unlock(transactionId);}}
}
3. 异常处理增强设计
3.1 失败操作审计表
CREATE TABLE transaction_audit (id BIGINT AUTO_INCREMENT,transaction_id VARCHAR(64) NOT NULL,attempted_status VARCHAR(32) NOT NULL,error_message TEXT,created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,PRIMARY KEY (id)
);
3.2 自动重试策略
# Spring配置示例
spring:retry:max-attempts: 3backoff:initial-delay: 100msmultiplier: 2
3.3 监控指标(Micrometer示例)
@Bean
public MeterBinder transactionMetrics(TransactionProcessor processor) {return registry -> {Counter.builder("transaction.processing").tag("result", "success").register(registry);Counter.builder("transaction.conflicts").description("乐观锁冲突次数").register(registry);};
}
4. 性能优化要点
-
多级缓存策略:
- 本地缓存(Caffeine):应对高频重复请求
- Redis缓存:存储最新状态,TTL 1小时
- 数据库:最终一致性保证
-
锁优化:
- 本地快速失败检查 → Redis分布式锁 → 数据库行锁(FOR UPDATE)
- 锁自动过期机制防止死锁
-
批量处理:
@Scheduled(fixedDelay = 5000) public void batchRetryFailedOperations() {List<FailedOperation> failures = auditService.getRecentFailures();failures.forEach(f -> processor.process(f.getTransactionId(), f.getStatus())); }
5. 部署架构建议
QPS2000左右的,足够用了。