U卡客戶入金地址池方案:事件驅動與補償機制的可靠性設計
U卡客戶入金地址池方案:基于鏈+幣種維度的全鏈路優化實踐(Card項目落地)
代碼部分僅為簡易實現用于提高技術實現細節的理解,具體使用場景需要額外的優化 比如: 分配記錄ID需要使用分布式ID生成、 分布式redis鎖一般都是通過切面實現、 業務邏輯抽離盡可能封裝多態、 事務與分布式鎖的配合使用避免事務失效、 超時重試機制、 極端場景的處理考慮······
針對用戶地址申請校驗、資源高效復用及動態擴容需求,設計以下方案,覆蓋地址申請、使用監控、回收全流程。
一、方案背景與核心優化目標
在Card數字貨幣借記卡系統中,客戶入金需通過臨時地址完成鏈上轉賬,初期存在地址資源濫用(用戶重復申請)、回收不及時(閑置地址占用資源)、高并發下地址枯竭等問題。基于"鏈+幣種"維度的優化目標為:
- 同鏈+幣種下,用戶僅保留1筆未完成分配記錄,杜絕重復申請;
- 地址分配后30分鐘未使用自動回收,回收率100%;
- 空閑地址低于閾值時動態擴容,支撐日活20w+用戶峰值需求。
二、整體架構設計:三維度協同機制
架構以"鏈+幣種"為核心維度,整合申請校驗、延時監控、動態擴容能力,分正向流程(地址申請-使用-入賬)、逆向流程(地址回收-狀態重置)、補償流程(異常處理-資源兜底)三部分。
2.1 正向流程:地址申請與交易處理鏈路
graph TD
subgraph 用戶層
A[用戶發起入金申請] --> B[指定鏈+幣種(如ETH-USDT)]
end
subgraph 申請校驗層
B --> C[維度校驗服務]
C --> D[查詢用戶未完成分配記錄(Redis哈希)]
D -->|同維度存在未完成記錄| E[返回已有地址]
D -->|同維度無記錄| F[回收用戶不同維度未使用地址]
F --> G[生成新分配記錄]
end
subgraph 地址池管理層
H[按鏈+幣種劃分的Redis空閑隊列] -->|如address:idle:eth:usdt| I[分配地址]
I --> J[發送30分鐘延時MQ(監控使用狀態)]
G --> I
end
subgraph 交易處理層
K[TokenView地址監控] -->|推送入賬交易| L[交易校驗(地址在池內+有效)]
L --> M[區塊事件監聽(更新確認數)]
M -->|確認數達標| N[TokenView API最終校驗]
N --> O[資金入賬+標記地址使用完成]
end
2.2 逆向流程:地址回收與狀態重置
graph TD
subgraph 延時監控觸發
A[30分鐘延時MQ到期] --> B[校驗地址使用狀態]
B -->|未產生交易| C[回收地址至空閑隊列]
B -->|已產生交易| D[標記分配記錄為'已使用']
end
subgraph 異常回收觸發
E[用戶取消申請] --> C
F[交易回滾/失敗] --> C
end
subgraph 狀態同步
C --> G[更新地址池DB狀態(空閑)]
C --> H[刪除用戶未完成分配記錄]
D --> I[更新分配記錄狀態]
end
2.3 補償流程:動態擴容與漏單處理
graph TD
subgraph 動態擴容機制
A[監控空閑隊列長度] -->|低于20%閾值| B[觸發擴容任務]
B --> C[多節點并行生成新地址(HD錢包BIP-32)]
C --> D[補充至對應鏈+幣種空閑隊列]
end
subgraph 漏單補償機制
E[定時任務(每小時)] --> F[TokenView批量查詢地址交易]
F -->|發現未處理交易| G[補入交易處理鏈路]
end
三、核心模塊實現細節
3.1 地址申請邏輯校驗(鏈+幣種維度管控)
基于Redis哈希存儲用戶未完成分配記錄,確保同維度唯一、不同維度及時釋放。
@Service
public class AddressApplyService {
// Redis鍵:用戶未完成分配記錄(hash結構:key=user:alloc:{userId},field=chain:token,value=allocationId)
private static final String USER_ALLOC_HASH_KEY = "user:alloc:%s";
// 鏈+幣種分隔符
private static final String DIMENSION_SEP = ":";
@Autowired
private RedissonClient redissonClient;
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private AddressPoolService addressPoolService;
@Autowired
private RocketMQTemplate rocketMQTemplate;
/**
* 用戶申請地址(核心校驗邏輯)
*todo 前端交互上優化:提示用戶有未使用的地址,是否取消上筆入金申請
*/
public AddressVO applyAddress(String userId, String chain, String token) {
String dimension = chain + DIMENSION_SEP + token;
String userAllocKey = String.format(USER_ALLOC_HASH_KEY, userId);
// 1. 加分布式鎖,防止并發申請導致狀態不一致
RLock lock = redissonClient.getLock("lock:apply:" + userId + ":" + dimension);
lock.lock(10, TimeUnit.SECONDS);
try {
// 2. 校驗同維度是否有未完成記錄
String existingAllocId = redisTemplate.opsForHash().get(userAllocKey, dimension).toString();
if (StringUtils.isNotBlank(existingAllocId)) {
// 同維度存在未完成記錄,返回已有地址
return addressPoolService.getAddressByAllocId(existingAllocId);
}
// 3. 回收用戶不同維度的未使用地址(釋放資源)
Object dim = redisTemplate.opsForHash().get(userAllocKey);
String otherDimension = dim.toString();
if (!otherDimension.equals(dimension)) {
String allocId = redisTemplate.opsForHash().get(userAllocKey, otherDimension).toString();
addressPoolService.recycleUnusedAddress(allocId); // 回收未使用地址
redisTemplate.opsForHash().delete(userAllocKey, otherDimension); // 刪除記錄
}
// 4. 從對應鏈+幣種的空閑隊列分配地址
String address = addressPoolService.allocateFromIdleQueue(chain, token);
// 5. 生成分配記錄
String allocId = UUID.randomUUID().toString();
addressPoolService.saveAllocationRecord(allocId, userId, address, chain, token, 30); // 30分鐘有效期
// 6. 記錄用戶-維度-分配記錄映射
redisTemplate.opsForHash().put(userAllocKey, dimension, allocId);
// 7. 發送30分鐘延時MQ,監控地址使用狀態
sendDelayMonitorMsg(allocId, 30);
return new AddressVO(address, LocalDateTime.now().plusMinutes(30), chain, token);
} finally {
lock.unlock();
}
}
/**
* 發送延時監控消息
*/
private void sendDelayMonitorMsg(String allocId, int delayMinutes) {
MonitorMsg msg = new MonitorMsg(allocId);
// RocketMQ延時級別:LEVEL_18=30分鐘(適配業務需求)
rocketMQTemplate.syncSend(
"topic:address:monitor",
MessageBuilder.withPayload(msg).build(),
3000,
18
);
}
}
3.2 地址池管理(按鏈+幣種分隊列+動態擴容)
按"鏈+幣種"維度劃分Redis空閑隊列,結合動態擴容確保地址供應。
@Service
public class AddressPoolService {
// 空閑地址隊列鍵:address:idle:{chain}:{token}
private static final String IDLE_QUEUE_KEY = "address:idle:%s:%s";
// 地址池容量閾值(低于20%觸發擴容)
private static final double EXPAND_THRESHOLD = 0.2;
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private AddressMapper addressMapper;
@Autowired
private HDWalletService hdWalletService; // 基于BIP-32的HD錢包服務
/**
* 從指定鏈+幣種的空閑隊列分配地址
*/
public String allocateFromIdleQueue(String chain, String token) {
String queueKey = String.format(IDLE_QUEUE_KEY, chain, token);
// 1. 從Redis隊列獲取地址(右彈出,FIFO機制)
String address = redisTemplate.opsForList().rightPop(queueKey, 5, TimeUnit.SECONDS);
if (address == null) {
throw new BusinessException("當前鏈+幣種地址池繁忙,請稍后再試");
}
// 2. 檢查剩余空閑地址是否低于閾值,觸發擴容
Long remaining = redisTemplate.opsForList().size(queueKey);
Long total = addressMapper.countTotalByChainAndToken(chain, token);
if (remaining * 1.0 / total < EXPAND_THRESHOLD) {
expandAddressPool(chain, token, total.intValue() / 2); // 擴容當前容量的50%
}
return address;
}
/**
* 動態擴容地址池
*/
@Async // 異步擴容,不阻塞主流程
public void expandAddressPool(String chain, String token, int count) {
log.info("開始擴容地址池:{}:{},需新增{}個地址", chain, token, count);
// 1. 生成新地址(基于HD錢包,確保私鑰安全)
List<String> newAddresses = hdWalletService.generateAddresses(chain, count);
// 2. 批量入庫(狀態:0-空閑)
addressMapper.batchInsert(newAddresses.stream().map(addr ->
AddressDO.builder()
.address(addr)
.chain(chain)
.token(token)
.status(0)
.build()
).collect(Collectors.toList()));
// 3. 補充至Redis空閑隊列
String queueKey = String.format(IDLE_QUEUE_KEY, chain, token);
redisTemplate.opsForList().leftPushAll(queueKey, newAddresses);
log.info("地址池擴容完成:{}:{},新增{}個地址", chain, token, newAddresses.size());
}
/**
* 回收未使用的地址
*/
public void recycleUnusedAddress(String allocId) {
AllocationRecordDO record = allocationMapper.selectById(allocId);
if (record == null || record.getStatus() != 1) { // 狀態1-未使用
return;
}
// 1. 更新地址狀態為空閑
addressMapper.updateStatus(record.getAddress(), 0);
// 2. 放回對應鏈+幣種的空閑隊列
String queueKey = String.format(IDLE_QUEUE_KEY, record.getChain(), record.getToken());
redisTemplate.opsForList().leftPush(queueKey, record.getAddress());
// 3. 更新分配記錄狀態為"已回收"
allocationMapper.updateStatus(allocId, 3); // 狀態3-已回收
}
}
3.3 延時監控與交易處理閉環
通過延時MQ監控地址使用狀態,結合區塊事件驅動確認數更新,確保交易有效后入賬。
// 1. 延時監控消息消費(檢查地址是否使用)
@Service
@RocketMQMessageListener(topic = "topic:address:monitor", consumerGroup = "monitor_consumer")
public class AddressMonitorConsumer implements RocketMQListener<MonitorMsg> {
@Autowired
private AllocationRecordMapper allocationMapper;
@Autowired
private AddressPoolService addressPoolService;
@Override
public void onMessage(MonitorMsg msg) {
String allocId = msg.getAllocId();
AllocationRecordDO record = allocationMapper.selectById(allocId);
if (record == null) {
return;
}
// 地址分配后30分鐘內未產生交易,回收地址
if (record.getStatus() == 1 && StringUtils.isBlank(record.getTxHash())) {
addressPoolService.recycleUnusedAddress(allocId);
log.info("地址{}未使用,已回收(分配記錄:{})", record.getAddress(), allocId);
}
}
}
// 2. 區塊事件驅動的確認數更新(確保交易不可逆)
@Service
public class BlockEventService {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private TransactionService transactionService;
// 監聽新區塊事件,更新確認數
public void handleNewBlock(BlockEvent event) {
String chain = event.getChain();
long latestHeight = event.getHeight();
// 緩存最新區塊高度(1分鐘過期)
redisTemplate.opsForValue().set("block:latest:" + chain, latestHeight, 1, TimeUnit.MINUTES);
// 觸發確認數檢查(僅處理接近閾值的交易)
transactionService.checkNearThresholdTransactions(chain, latestHeight);
}
}
四、方案成果與技術亮點
- 資源利用率提升:同維度地址申請限制使重復申請率從35%降至0,不同維度自動回收使地址復用率提升60%;
- 時效性優化:結合"延時MQ監控+區塊事件驅動",地址回收延遲從2小時縮短至30分鐘,確認數更新時效<10秒;
- 彈性支撐能力:動態擴容機制確保高并發場景下地址池無枯竭,支撐地址申請峰值;
- 金融級可靠性:通過分布式鎖、冪等設計及TokenView API二次校驗,實現資金入賬零誤差,符合Card項目"線上0資損"目標。
本文來自博客園,作者:ffffox,轉載請注明原文鏈接:http://www.rzrgm.cn/ffffox/p/19040011

浙公網安備 33010602011771號