鏈上充值監(jiān)聽與自動(dòng)劃轉(zhuǎn)資金流程實(shí)現(xiàn)
鏈上充值監(jiān)聽與自動(dòng)劃轉(zhuǎn)資金流程實(shí)現(xiàn)
鏈上充值監(jiān)聽與自動(dòng)劃轉(zhuǎn)是數(shù)字貨幣信用卡系統(tǒng)的核心環(huán)節(jié),需要實(shí)時(shí)、準(zhǔn)確地捕捉用戶充值行為并自動(dòng)完成資金歸集。以下是完整實(shí)現(xiàn)方案,包括架構(gòu)設(shè)計(jì)、核心流程和代碼實(shí)現(xiàn)。
一、整體架構(gòu)設(shè)計(jì)
核心組件
- 充值地址生成服務(wù):為每個(gè)用戶生成唯一的充值地址(基于HD錢包派生)
- 區(qū)塊監(jiān)聽服務(wù):實(shí)時(shí)同步區(qū)塊鏈數(shù)據(jù),監(jiān)控指定地址的交易
- 交易解析服務(wù):驗(yàn)證交易合法性,提取關(guān)鍵信息(金額、發(fā)送方等)
- 充值確認(rèn)服務(wù):根據(jù)區(qū)塊鏈確認(rèn)數(shù)判斷交易是否有效
- 資金劃轉(zhuǎn)服務(wù):將到賬資金劃轉(zhuǎn)到平臺(tái)歸集錢包或用戶可用余額
- 通知服務(wù):向用戶推送充值到賬信息
二、詳細(xì)流程設(shè)計(jì)
1. 充值地址生成與關(guān)聯(lián)
- 基于用戶ID和HD錢包路徑,為每個(gè)用戶生成唯一充值地址
- 地址格式:支持多種區(qū)塊鏈(BTC使用bech32,ETH使用十六進(jìn)制等)
- 地址與用戶賬戶一對(duì)一綁定,存入數(shù)據(jù)庫并關(guān)聯(lián)用戶ID
2. 區(qū)塊監(jiān)聽流程
- 連接區(qū)塊鏈全節(jié)點(diǎn)或第三方API(如Infura、Alchemy)
- 從最新區(qū)塊開始監(jiān)聽,同時(shí)回溯檢查歷史區(qū)塊(防止遺漏)
- 對(duì)每個(gè)區(qū)塊,解析所有交易,篩選涉及平臺(tái)充值地址的交易
- 采用增量同步策略,記錄已處理的區(qū)塊高度,避免重復(fù)處理
3. 交易驗(yàn)證與確認(rèn)
- 驗(yàn)證交易是否成功(區(qū)塊確認(rèn)數(shù) >= 系統(tǒng)設(shè)定閾值,如ETH需要12個(gè)確認(rèn))
- 驗(yàn)證交易金額是否滿足最小充值限額
- 檢查交易是否已被處理(防止重復(fù)入賬)
- 計(jì)算實(shí)際到賬金額(扣除區(qū)塊鏈?zhǔn)掷m(xù)費(fèi)后)
4. 資金劃轉(zhuǎn)與記賬
- 定時(shí)/自動(dòng)將到賬資金從用戶充值地址劃轉(zhuǎn)到平臺(tái)歸集錢包(冷錢包)
- 或直接增加用戶賬戶可用余額(信用額度)
- 生成充值記錄和系統(tǒng)內(nèi)轉(zhuǎn)賬記錄,確保賬目清晰
- 觸發(fā)后續(xù)業(yè)務(wù)流程(如自動(dòng)激活卡片、提升額度等)
充值確認(rèn) → 本地事務(wù)(創(chuàng)建充值記錄+寫入消息表) → 定時(shí)任務(wù)發(fā)送消息 → MQ隊(duì)列 → 消費(fèi)消息(更新余額)
↓ ↓ ↓
狀態(tài)跟蹤 失敗重試機(jī)制 消費(fèi)重試+死信
三、核心代碼實(shí)現(xiàn)
1. 充值地址生成服務(wù)
package com.digitalcredit.deposit.service;
import com.digitalcredit.user.entity.User;
import com.digitalcredit.user.service.UserService;
import com.digitalcredit.wallet.entity.DepositAddress;
import com.digitalcredit.wallet.entity.enums.BlockchainType;
import com.digitalcredit.wallet.repository.DepositAddressRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.web3j.crypto.*;
import org.web3j.utils.Numeric;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@Service
public class DepositAddressService {
@Autowired
private DepositAddressRepository depositAddressRepository;
@Autowired
private UserService userService;
// 平臺(tái)根錢包(用于派生用戶充值地址)
private final Credentials platformRootCredentials;
// 不同區(qū)塊鏈的HD路徑前綴
private static final Map<BlockchainType, String> BIP44_PATH_PREFIXES = new HashMap<>();
static {
BIP44_PATH_PREFIXES.put(BlockchainType.BITCOIN, "m/44'/0'/0'/0/");
BIP44_PATH_PREFIXES.put(BlockchainType.ETHEREUM, "m/44'/60'/0'/0/");
BIP44_PATH_PREFIXES.put(BlockchainType.BSC, "m/44'/56'/0'/0/");
}
public DepositAddressService(String platformRootPrivateKey) {
// 初始化平臺(tái)根錢包
this.platformRootCredentials = Credentials.create(platformRootPrivateKey);
}
/**
* 為用戶生成指定區(qū)塊鏈的充值地址
*/
@Transactional
public DepositAddress generateDepositAddress(Long userId, BlockchainType blockchainType) {
// 1. 驗(yàn)證用戶
User user = userService.getUserById(userId);
if (user == null) {
throw new IllegalArgumentException("User not found");
}
// 2. 檢查用戶是否已有該鏈的充值地址
Optional<DepositAddress> existingAddress = depositAddressRepository
.findByUserIdAndBlockchainType(userId, blockchainType);
if (existingAddress.isPresent()) {
return existingAddress.get();
}
// 3. 生成用戶唯一索引(可基于用戶ID或自增序列)
long userIndex = generateUserIndex(userId);
// 4. 構(gòu)建BIP44路徑
String path = BIP44_PATH_PREFIXES.get(blockchainType) + userIndex;
// 5. 從根錢包派生出用戶充值地址的私鑰
ECKeyPair userKeyPair = deriveKeyPairFromPath(platformRootCredentials.getEcKeyPair(), path);
String address;
// 6. 根據(jù)不同區(qū)塊鏈生成地址
switch (blockchainType) {
case ETHEREUM:
case BSC:
address = "0x" + Keys.getAddress(userKeyPair);
break;
case BITCOIN:
// 比特幣地址生成邏輯(使用bitcoinj等庫)
address = generateBitcoinAddress(userKeyPair);
break;
default:
throw new UnsupportedOperationException("Unsupported blockchain type");
}
// 7. 保存充值地址(私鑰加密存儲(chǔ))
DepositAddress depositAddress = new DepositAddress();
depositAddress.setUserId(userId);
depositAddress.setBlockchainType(blockchainType);
depositAddress.setAddress(address);
depositAddress.setDerivationPath(path);
// 加密存儲(chǔ)私鑰
depositAddress.setEncryptedPrivateKey(encryptPrivateKey(
Numeric.toHexStringWithPrefix(userKeyPair.getPrivateKey())));
depositAddress.setCreatedAt(System.currentTimeMillis());
return depositAddressRepository.save(depositAddress);
}
/**
* 根據(jù)BIP44路徑從父密鑰派生子密鑰
*/
private ECKeyPair deriveKeyPairFromPath(ECKeyPair parent, String path) {
String[] pathElements = path.split("/");
ECKeyPair currentKeyPair = parent;
for (String element : pathElements) {
if (element.equals("m")) continue;
boolean hardened = element.endsWith("'");
int index = Integer.parseInt(hardened ? element.substring(0, element.length() - 1) : element);
if (hardened) {
index += 0x80000000; // 強(qiáng)化派生索引偏移
}
// 使用BIP32派生算法
currentKeyPair = HDKeyGenerator.deriveChildKey(currentKeyPair, index);
}
return currentKeyPair;
}
/**
* 生成用戶唯一索引
*/
private long generateUserIndex(Long userId) {
// 可以使用userId的哈希或數(shù)據(jù)庫自增序列
return Math.abs(userId.hashCode() % 1000000);
}
/**
* 加密私鑰(生產(chǎn)環(huán)境使用AES-256加密)
*/
private String encryptPrivateKey(String privateKey) {
// 實(shí)際實(shí)現(xiàn)應(yīng)使用安全的加密算法,密鑰存儲(chǔ)在安全的密鑰管理服務(wù)中
return EncryptionUtil.encrypt(privateKey, System.getenv("PRIVATE_KEY_ENCRYPTION_KEY"));
}
/**
* 獲取用戶的充值地址
*/
public String getUserDepositAddress(Long userId, BlockchainType blockchainType) {
DepositAddress address = depositAddressRepository
.findByUserIdAndBlockchainType(userId, blockchainType)
.orElseThrow(() -> new IllegalArgumentException("No deposit address found for user and blockchain type"));
return address.getAddress();
}
/**
* 生成比特幣地址(簡化實(shí)現(xiàn))
*/
private String generateBitcoinAddress(ECKeyPair keyPair) {
// 實(shí)際項(xiàng)目中使用bitcoinj等專業(yè)庫實(shí)現(xiàn)
// 這里僅作示例
return BitcoinAddressGenerator.generateFromKeyPair(keyPair, false);
}
}
2. 區(qū)塊監(jiān)聽與交易處理服務(wù)
package com.digitalcredit.blockchain.service;
import com.digitalcredit.blockchain.config.BlockchainNodeConfig;
import com.digitalcredit.blockchain.entity.ChainTransaction;
import com.digitalcredit.blockchain.entity.enums.TransactionStatus;
import com.digitalcredit.blockchain.repository.ChainTransactionRepository;
import com.digitalcredit.deposit.service.DepositProcessingService;
import com.digitalcredit.wallet.entity.DepositAddress;
import com.digitalcredit.wallet.repository.DepositAddressRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.retry.annotation.Backoff;
import org.springframework.retry.annotation.Retryable;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.core.DefaultBlockParameter;
import org.web3j.protocol.core.methods.response.EthBlock;
import org.web3j.protocol.core.methods.response.EthBlockNumber;
import org.web3j.protocol.http.HttpService;
import javax.annotation.PostConstruct;
import java.math.BigInteger;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
@Service
public class FaultTolerantBlockchainListener {
private static final Logger logger = LoggerFactory.getLogger(FaultTolerantBlockchainListener.class);
// 每個(gè)鏈的節(jié)點(diǎn)配置
private final Map<String, List<BlockchainNodeConfig>> nodeConfigs = new ConcurrentHashMap<>();
// 當(dāng)前活躍的Web3j客戶端
private final Map<String, Web3j> activeWeb3jClients = new ConcurrentHashMap<>();
// 節(jié)點(diǎn)健康狀態(tài)
private final Map<String, Map<String, Boolean>> nodeHealthStatus = new ConcurrentHashMap<>();
// 平臺(tái)充值地址緩存(小寫)
private final Set<String> platformAddresses = ConcurrentHashMap.newKeySet();
// 區(qū)塊處理線程池
private final ExecutorService blockProcessingExecutor = new ThreadPoolExecutor(
4, 16, 5, TimeUnit.MINUTES,
new LinkedBlockingQueue<>(1000),
new ThreadFactory() {
private int counter = 0;
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "block-processor-" + counter++);
thread.setDaemon(true);
return thread;
}
},
new ThreadPoolExecutor.CallerRunsPolicy() // 隊(duì)列滿時(shí)讓提交者執(zhí)行,防止任務(wù)丟失
);
@Autowired
private List<BlockchainNodeConfig> allNodeConfigs;
@Autowired
private DepositAddressRepository depositAddressRepository;
@Autowired
private ChainTransactionRepository transactionRepository;
@Autowired
private DepositProcessingService depositProcessingService;
@Autowired
private BlockchainNodeHealthChecker nodeHealthChecker;
// 上次處理的區(qū)塊高度
private final Map<String, BigInteger> lastProcessedBlock = new ConcurrentHashMap<>();
// 確認(rèn)數(shù)閾值配置
private final Map<String, Integer> confirmationThresholds = new HashMap<>();
{
confirmationThresholds.put("ETH", 12);
confirmationThresholds.put("BSC", 12);
confirmationThresholds.put("BTC", 6);
confirmationThresholds.put("LTC", 6);
}
@PostConstruct
public void initialize() {
// 1. 按鏈類型分組節(jié)點(diǎn)配置
for (BlockchainNodeConfig config : allNodeConfigs) {
nodeConfigs.computeIfAbsent(config.getChainType(), k -> new ArrayList<>())
.add(config);
nodeHealthStatus.computeIfAbsent(config.getChainType(), k -> new ConcurrentHashMap<>())
.put(config.getNodeUrl(), true);
}
// 2. 初始化活躍客戶端
initializeActiveClients();
// 3. 加載充值地址緩存
refreshAddressCache();
// 4. 加載上次處理的區(qū)塊高度
loadLastProcessedBlocks();
// 5. 啟動(dòng)節(jié)點(diǎn)健康檢查
nodeHealthChecker.start();
logger.info("Blockchain listener initialized with {} chains", nodeConfigs.size());
}
/**
* 初始化活躍客戶端(選擇健康的節(jié)點(diǎn))
*/
private void initializeActiveClients() {
for (String chainType : nodeConfigs.keySet()) {
try {
Web3j client = getHealthyWeb3jClient(chainType);
if (client != null) {
activeWeb3jClients.put(chainType, client);
logger.info("Initialized active client for chain: {}", chainType);
}
} catch (Exception e) {
logger.error("Failed to initialize client for chain: {}", chainType, e);
}
}
}
/**
* 獲取健康的Web3j客戶端(帶故障轉(zhuǎn)移)
*/
private Web3j getHealthyWeb3jClient(String chainType) {
List<BlockchainNodeConfig> configs = nodeConfigs.getOrDefault(chainType, Collections.emptyList());
if (configs.isEmpty()) {
logger.warn("No node configs for chain: {}", chainType);
return null;
}
// 按優(yōu)先級(jí)排序,優(yōu)先選擇主節(jié)點(diǎn)
configs.sort(Comparator.comparingInt(BlockchainNodeConfig::getPriority));
// 嘗試連接健康節(jié)點(diǎn)
for (BlockchainNodeConfig config : configs) {
if (nodeHealthStatus.get(chainType).getOrDefault(config.getNodeUrl(), false)) {
try {
Web3j client = Web3j.build(new HttpService(config.getNodeUrl()));
// 測試連接
client.ethBlockNumber().sendAsync().get(5, TimeUnit.SECONDS);
return client;
} catch (Exception e) {
logger.warn("Node {} for chain {} is unhealthy: {}",
config.getNodeUrl(), chainType, e.getMessage());
nodeHealthStatus.get(chainType).put(config.getNodeUrl(), false);
}
}
}
logger.error("No healthy nodes available for chain: {}", chainType);
return null;
}
/**
* 定時(shí)刷新地址緩存(每30分鐘)
*/
@Scheduled(fixedRate = 30 * 60 * 1000)
public void refreshAddressCache() {
try {
long start = System.currentTimeMillis();
List<String> addresses = depositAddressRepository.findAllActive()
.stream()
.map(DepositAddress::getAddress)
.map(String::toLowerCase)
.collect(Collectors.toList());
platformAddresses.clear();
platformAddresses.addAll(addresses);
logger.info("Refreshed deposit addresses cache, count: {}, time: {}ms",
addresses.size(), System.currentTimeMillis() - start);
} catch (Exception e) {
logger.error("Failed to refresh address cache", e);
}
}
/**
* 加載上次處理的區(qū)塊高度
*/
private void loadLastProcessedBlocks() {
for (String chainType : nodeConfigs.keySet()) {
BigInteger lastBlock = transactionRepository.findMaxBlockNumberByChain(chainType)
.orElse(BigInteger.ZERO);
lastProcessedBlock.put(chainType, lastBlock);
logger.info("Loaded last processed block for {}: {}", chainType, lastBlock);
}
}
/**
* 定時(shí)監(jiān)聽新區(qū)塊(每5秒)
*/
@Scheduled(fixedRate = 5000)
public void listenForNewBlocks() {
for (String chainType : new ArrayList<>(nodeConfigs.keySet())) {
try {
processChain(chainType);
} catch (Exception e) {
logger.error("Error processing chain: {}", chainType, e);
// 嘗試切換客戶端
Web3j newClient = getHealthyWeb3jClient(chainType);
if (newClient != null) {
activeWeb3jClients.put(chainType, newClient);
logger.info("Switched to new client for chain: {}", chainType);
}
}
}
}
/**
* 處理單個(gè)鏈的區(qū)塊
*/
private void processChain(String chainType) throws Exception {
Web3j web3j = activeWeb3jClients.get(chainType);
if (web3j == null) {
logger.warn("No active client for chain: {}", chainType);
return;
}
// 獲取最新區(qū)塊高度
EthBlockNumber blockNumberResp = web3j.ethBlockNumber().send();
BigInteger latestBlock = blockNumberResp.getBlockNumber();
// 獲取上次處理的區(qū)塊高度
BigInteger startBlock = lastProcessedBlock.getOrDefault(chainType, BigInteger.ZERO);
// 計(jì)算需要處理的區(qū)塊范圍
if (latestBlock.compareTo(startBlock) <= 0) {
// 沒有新區(qū)塊,檢查確認(rèn)中的交易
checkPendingTransactions(chainType, latestBlock);
return;
}
// 限制每次處理的區(qū)塊數(shù)量,防止過載
BigInteger endBlock = startBlock.add(BigInteger.valueOf(10));
if (endBlock.compareTo(latestBlock) > 0) {
endBlock = latestBlock;
}
logger.info("Processing chain {}: blocks {} to {}", chainType, startBlock, endBlock);
// 并行處理區(qū)塊
for (BigInteger blockNum = startBlock.add(BigInteger.ONE);
blockNum.compareTo(endBlock) <= 0;
blockNum = blockNum.add(BigInteger.ONE)) {
processSingleBlockAsync(chainType, web3j, blockNum);
}
// 更新最后處理的區(qū)塊高度
lastProcessedBlock.put(chainType, endBlock);
}
/**
* 異步處理單個(gè)區(qū)塊
*/
@Async("blockProcessingExecutor")
@Retryable(
value = {Exception.class},
maxAttempts = 3,
backoff = @Backoff(delay = 1000, multiplier = 2)
)
public void processSingleBlockAsync(String chainType, Web3j web3j, BigInteger blockNumber) {
try {
processSingleBlock(chainType, web3j, blockNumber);
} catch (Exception e) {
logger.error("Failed to process block {} on chain {} after retries",
blockNumber, chainType, e);
// 記錄失敗的區(qū)塊,供后續(xù)手動(dòng)處理
transactionRepository.recordFailedBlock(chainType, blockNumber, e.getMessage());
}
}
/**
* 處理單個(gè)區(qū)塊
*/
private void processSingleBlock(String chainType, Web3j web3j, BigInteger blockNumber) throws Exception {
EthBlock block = web3j.ethGetBlockByNumber(
DefaultBlockParameter.valueOf(blockNumber),
true // 包含交易詳情
).send();
if (block.getBlock() == null) {
logger.warn("Block {} not found on chain {}", blockNumber, chainType);
return;
}
// 處理區(qū)塊中的交易
for (EthBlock.TransactionResult<?> txResult : block.getBlock().getTransactions()) {
EthBlock.TransactionObject tx = (EthBlock.TransactionObject) txResult.get();
// 檢查是否是平臺(tái)地址的入金交易
if (tx.getTo() != null && platformAddresses.contains(tx.getTo().toLowerCase())) {
processDepositTransaction(chainType, tx, blockNumber);
}
}
logger.debug("Processed block {} on chain {}, transactions: {}",
blockNumber, chainType, block.getBlock().getTransactions().size());
}
/**
* 處理充值交易
*/
private void processDepositTransaction(String chainType,
EthBlock.TransactionObject tx,
BigInteger blockNumber) {
// 檢查交易是否已處理
if (transactionRepository.existsByTxHashAndChainType(tx.getHash(), chainType)) {
return;
}
// 創(chuàng)建交易記錄
ChainTransaction transaction = new ChainTransaction();
transaction.setTxHash(tx.getHash());
transaction.setChainType(chainType);
transaction.setFromAddress(tx.getFrom());
transaction.setToAddress(tx.getTo());
transaction.setAmount(tx.getValue());
transaction.setBlockNumber(blockNumber);
transaction.setGasPrice(tx.getGasPrice());
transaction.setGasUsed(tx.getGas());
transaction.setInputData(tx.getInput());
transaction.setTimestamp(System.currentTimeMillis());
// 初始狀態(tài):待確認(rèn)
int requiredConfirmations = confirmationThresholds.getOrDefault(chainType, 12);
transaction.setStatus(TransactionStatus.PENDING_CONFIRMATION);
transaction.setRequiredConfirmations(requiredConfirmations);
transaction.setCurrentConfirmations(BigInteger.ZERO);
transactionRepository.save(transaction);
logger.info("Found new deposit transaction {} on chain {}, amount: {}",
tx.getHash(), chainType, tx.getValue());
}
/**
* 檢查待確認(rèn)的交易
*/
private void checkPendingTransactions(String chainType, BigInteger latestBlock) {
try {
List<ChainTransaction> pendingTxs = transactionRepository
.findByChainTypeAndStatus(chainType, TransactionStatus.PENDING_CONFIRMATION);
for (ChainTransaction tx : pendingTxs) {
// 計(jì)算當(dāng)前確認(rèn)數(shù)
BigInteger confirmations = latestBlock.subtract(tx.getBlockNumber());
tx.setCurrentConfirmations(confirmations);
// 檢查是否達(dá)到確認(rèn)閾值
if (confirmations.compareTo(BigInteger.valueOf(tx.getRequiredConfirmations())) >= 0) {
tx.setStatus(TransactionStatus.CONFIRMED);
transactionRepository.save(tx);
// 提交給充值處理服務(wù)
depositProcessingService.processConfirmedDeposit(tx);
} else if (System.currentTimeMillis() - tx.getTimestamp() > 24 * 60 * 60 * 1000) {
// 超過24小時(shí)未確認(rèn),標(biāo)記為失敗
tx.setStatus(TransactionStatus.CONFIRMATION_FAILED);
transactionRepository.save(tx);
logger.warn("Transaction {} on chain {} failed to confirm within 24h",
tx.getTxHash(), chainType);
} else {
transactionRepository.save(tx);
}
}
} catch (Exception e) {
logger.error("Error checking pending transactions for chain {}", chainType, e);
}
}
}
3. 充值處理與資金劃轉(zhuǎn)服務(wù)(RocketMQ)
0). 數(shù)據(jù)庫設(shè)計(jì)
-- 充值記錄表
CREATE TABLE deposit_record (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
tx_hash VARCHAR(66) NOT NULL COMMENT '區(qū)塊鏈交易哈希',
user_id BIGINT NOT NULL COMMENT '用戶ID',
chain_type VARCHAR(20) NOT NULL COMMENT '區(qū)塊鏈類型(ETH/BSC等)',
deposit_address VARCHAR(66) NOT NULL COMMENT '充值地址',
amount DECIMAL(30,18) NOT NULL COMMENT '充值金額',
block_number BIGINT NOT NULL COMMENT '區(qū)塊高度',
status VARCHAR(20) NOT NULL COMMENT '狀態(tài)(PENDING/SUCCESS/FAILED)',
failure_reason TEXT COMMENT '失敗原因',
created_at BIGINT NOT NULL COMMENT '創(chuàng)建時(shí)間戳',
completed_at BIGINT COMMENT '完成時(shí)間戳',
UNIQUE KEY uk_tx_hash (tx_hash),
KEY idx_user_id (user_id),
KEY idx_status_create_time (status, created_at)
) ENGINE=InnoDB COMMENT='充值記錄表';
-- 本地消息表
CREATE TABLE local_message (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
business_key VARCHAR(66) NOT NULL COMMENT '業(yè)務(wù)唯一標(biāo)識(shí)(如交易哈希)',
message_type VARCHAR(20) NOT NULL COMMENT '消息類型(DEPOSIT_ARRIVAL/FUND_COLLECTION)',
topic VARCHAR(50) NOT NULL COMMENT 'MQ主題',
content TEXT NOT NULL COMMENT '消息內(nèi)容(JSON)',
status VARCHAR(20) NOT NULL COMMENT '狀態(tài)(PENDING/SENDING/SENT/FAILED)',
send_count INT NOT NULL DEFAULT 0 COMMENT '發(fā)送次數(shù)',
max_retry_count INT NOT NULL DEFAULT 3 COMMENT '最大重試次數(shù)',
next_send_time BIGINT NOT NULL COMMENT '下次發(fā)送時(shí)間戳',
last_error TEXT COMMENT '最后錯(cuò)誤信息',
created_at BIGINT NOT NULL COMMENT '創(chuàng)建時(shí)間戳',
updated_at BIGINT COMMENT '更新時(shí)間戳',
UNIQUE KEY uk_business_key_type (business_key, message_type),
KEY idx_status_next_send (status, next_send_time)
) ENGINE=InnoDB COMMENT='本地消息表';
-- 用戶賬戶表(簡化)
CREATE TABLE user_account (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
user_id BIGINT NOT NULL UNIQUE COMMENT '用戶ID',
available_balance DECIMAL(30,18) NOT NULL DEFAULT 0 COMMENT '可用余額',
total_balance DECIMAL(30,18) NOT NULL DEFAULT 0 COMMENT '總余額',
updated_at BIGINT NOT NULL COMMENT '更新時(shí)間戳'
) ENGINE=InnoDB COMMENT='用戶賬戶表';
1). 消息與業(yè)務(wù)實(shí)體類
package com.digitalcredit.message.entity;
import lombok.Data;
import javax.persistence.*;
import java.math.BigDecimal;
@Data
@Entity
@Table(name = "local_message")
public class LocalMessage {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(name = "business_key", nullable = false, length = 66)
private String businessKey; // 交易哈希
@Column(name = "message_type", nullable = false, length = 20)
@Enumerated(EnumType.STRING)
private MessageType messageType; // 消息類型
@Column(name = "topic", nullable = false, length = 50)
private String topic; // MQ主題
@Column(name = "content", nullable = false, columnDefinition = "TEXT")
private String content; // 消息內(nèi)容(JSON)
@Column(name = "status", nullable = false, length = 20)
@Enumerated(EnumType.STRING)
private MessageStatus status; // 消息狀態(tài)
@Column(name = "send_count", nullable = false)
private Integer sendCount = 0; // 發(fā)送次數(shù)
@Column(name = "max_retry_count", nullable = false)
private Integer maxRetryCount = 3; // 最大重試次數(shù)
@Column(name = "next_send_time", nullable = false)
private Long nextSendTime; // 下次發(fā)送時(shí)間戳
@Column(name = "last_error", columnDefinition = "TEXT")
private String lastError; // 最后錯(cuò)誤信息
@Column(name = "created_at", nullable = false)
private Long createdAt; // 創(chuàng)建時(shí)間戳
@Column(name = "updated_at")
private Long updatedAt; // 更新時(shí)間戳
// 消息類型枚舉
public enum MessageType {
DEPOSIT_ARRIVAL, // 資金到賬
FUND_COLLECTION // 資金歸集
}
// 消息狀態(tài)枚舉
public enum MessageStatus {
PENDING, // 待發(fā)送
SENDING, // 發(fā)送中
SENT, // 已發(fā)送
FAILED // 發(fā)送失敗
}
}
package com.digitalcredit.deposit.entity;
import lombok.Data;
import javax.persistence.*;
import java.math.BigDecimal;
@Data
@Entity
@Table(name = "deposit_record")
public class DepositRecord {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(name = "tx_hash", nullable = false, length = 66, unique = true)
private String txHash; // 區(qū)塊鏈交易哈希
@Column(name = "user_id", nullable = false)
private Long userId; // 用戶ID
@Column(name = "chain_type", nullable = false, length = 20)
private String chainType; // 區(qū)塊鏈類型
@Column(name = "deposit_address", nullable = false, length = 66)
private String depositAddress; // 充值地址
@Column(name = "amount", nullable = false, precision = 30, scale = 18)
private BigDecimal amount; // 充值金額
@Column(name = "block_number", nullable = false)
private Long blockNumber; // 區(qū)塊高度
@Column(name = "status", nullable = false, length = 20)
@Enumerated(EnumType.STRING)
private DepositStatus status; // 狀態(tài)
@Column(name = "failure_reason", columnDefinition = "TEXT")
private String failureReason; // 失敗原因
@Column(name = "created_at", nullable = false)
private Long createdAt; // 創(chuàng)建時(shí)間戳
@Column(name = "completed_at")
private Long completedAt; // 完成時(shí)間戳
// 充值狀態(tài)枚舉
public enum DepositStatus {
PENDING, // 待處理
SUCCESS, // 成功
FAILED // 失敗
}
}
2). 資金到賬消息消費(fèi)者
package com.digitalcredit.mq.consumer;
import com.alibaba.fastjson.JSON;
import com.digitalcredit.account.service.AccountService;
import com.digitalcredit.deposit.entity.DepositRecord;
import com.digitalcredit.deposit.entity.enums.DepositStatus;
import com.digitalcredit.deposit.repository.DepositRecordRepository;
import com.digitalcredit.mq.message.DepositMessage;
import org.apache.rocketmq.client.consumer.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
import java.util.Optional;
@Component
public class DepositMessageListener implements MessageListenerConcurrently {
private static final Logger logger = LoggerFactory.getLogger(DepositMessageListener.class);
@Autowired
private AccountService accountService;
@Autowired
private DepositRecordRepository depositRecordRepository;
/**
* 處理資金劃轉(zhuǎn)消息
*/
@Override
@Transactional
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
try {
String txHash = msg.getKeys();
logger.info("開始處理資金劃轉(zhuǎn)消息,txHash: {}, 重試次數(shù): {}", txHash, msg.getReconsumeTimes());
// 1. 解析消息
DepositMessage message = JSON.parseObject(
new String(msg.getBody(), "UTF-8"),
DepositMessage.class
);
// 2. 防重復(fù)處理(檢查是否已處理)
Optional<DepositRecord> recordOpt = depositRecordRepository.findByTxHash(txHash);
if (recordOpt.isEmpty()) {
logger.error("未找到對(duì)應(yīng)的充值記錄,txHash: {}", txHash);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
DepositRecord record = recordOpt.get();
if (record.getStatus() == DepositStatus.SUCCESS) {
logger.info("消息已處理,無需重復(fù)處理,txHash: {}", txHash);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
// 3. 執(zhí)行資金劃轉(zhuǎn)(增加用戶余額)
accountService.increaseBalance(
message.getUserId(),
message.getAmount(),
"DEPOSIT",
txHash
);
// 4. 更新充值記錄狀態(tài)
record.setStatus(DepositStatus.SUCCESS);
record.setCompletedAt(System.currentTimeMillis());
depositRecordRepository.save(record);
logger.info("資金劃轉(zhuǎn)處理成功,txHash: {}", txHash);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
logger.error("資金劃轉(zhuǎn)處理失敗,msgId: {}", msg.getMsgId(), e);
// 判斷是否達(dá)到最大重試次數(shù)
if (msg.getReconsumeTimes() >= 5) {
// 記錄失敗原因,后續(xù)人工處理
String txHash = msg.getKeys();
updateDepositRecordToFailed(txHash, e.getMessage());
logger.warn("消息達(dá)到最大重試次數(shù),將進(jìn)入死信隊(duì)列,txHash: {}", txHash);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 不再重試,進(jìn)入死信隊(duì)列
}
// 未達(dá)最大重試次數(shù),返回重試
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
/**
* 更新充值記錄為失敗狀態(tài)
*/
private void updateDepositRecordToFailed(String txHash, String reason) {
try {
Optional<DepositRecord> recordOpt = depositRecordRepository.findByTxHash(txHash);
if (recordOpt.isPresent()) {
DepositRecord record = recordOpt.get();
record.setStatus(DepositStatus.FAILED);
record.setFailureReason(reason);
record.setCompletedAt(System.currentTimeMillis());
depositRecordRepository.save(record);
}
} catch (Exception e) {
logger.error("更新充值記錄為失敗狀態(tài)失敗,txHash: {}", txHash, e);
}
}
}
四、關(guān)鍵技術(shù)點(diǎn)說明
1. 區(qū)塊鏈節(jié)點(diǎn)連接策略
- 主備節(jié)點(diǎn)機(jī)制:同時(shí)連接多個(gè)節(jié)點(diǎn),當(dāng)主節(jié)點(diǎn)故障時(shí)自動(dòng)切換到備用節(jié)點(diǎn)
- 連接池管理:維護(hù)長期連接,減少握手開銷
- 超時(shí)重試:設(shè)置合理的超時(shí)時(shí)間和重試策略,確保網(wǎng)絡(luò)波動(dòng)時(shí)的穩(wěn)定性
2. 交易去重與冪等性保障
- 使用交易哈希作為唯一標(biāo)識(shí),確保每筆交易只處理一次
- 數(shù)據(jù)庫唯一索引:在充值記錄表中對(duì)交易哈希和鏈類型創(chuàng)建唯一索引
- 狀態(tài)機(jī)設(shè)計(jì):明確的狀態(tài)流轉(zhuǎn)(PENDING → CONFIRMED/INVALID/ERROR),避免重復(fù)處理
3. 性能優(yōu)化措施
- 充值地址緩存:將平臺(tái)所有充值地址加載到內(nèi)存,減少數(shù)據(jù)庫查詢
- 批量處理:區(qū)塊處理采用批量方式,提高效率
- 異步處理:交易解析和資金劃轉(zhuǎn)采用異步方式,避免阻塞監(jiān)聽線程
- 分區(qū)表:充值記錄表按時(shí)間分區(qū),提高查詢效率
4. 容錯(cuò)與恢復(fù)機(jī)制
- 斷點(diǎn)續(xù)傳:記錄已處理的區(qū)塊高度,服務(wù)重啟后從斷點(diǎn)繼續(xù)處理
- 重試隊(duì)列:處理失敗的交易加入重試隊(duì)列,定時(shí)重試
- 監(jiān)控告警:關(guān)鍵指標(biāo)(區(qū)塊同步延遲、未確認(rèn)充值數(shù)量)監(jiān)控和告警
- 手動(dòng)干預(yù)接口:提供手動(dòng)處理異常充值的接口
五、安全措施
-
私鑰安全
- 充值地址私鑰加密存儲(chǔ)(AES-256)
- 加密密鑰通過KMS(密鑰管理服務(wù))管理
- 敏感操作(如資金劃轉(zhuǎn))需要多重簽名
-
交易驗(yàn)證
- 多重驗(yàn)證:不僅驗(yàn)證地址,還驗(yàn)證金額和交易狀態(tài)
- 防重放攻擊:檢查交易是否屬于當(dāng)前鏈
-
異常監(jiān)控
- 大額充值預(yù)警:超過閾值的充值觸發(fā)人工審核
- 異常地址監(jiān)控:對(duì)黑名單地址的轉(zhuǎn)賬進(jìn)行攔截
- 頻率限制:監(jiān)控同一地址的頻繁轉(zhuǎn)賬行為
六、總結(jié)
鏈上充值監(jiān)聽與自動(dòng)劃轉(zhuǎn)系統(tǒng)需要兼顧實(shí)時(shí)性、準(zhǔn)確性和安全性。通過合理的架構(gòu)設(shè)計(jì)和技術(shù)選型,可以實(shí)現(xiàn)高效、可靠的充值處理流程。
核心要點(diǎn):
- 采用HD錢包技術(shù)為每個(gè)用戶生成唯一充值地址
- 實(shí)時(shí)區(qū)塊監(jiān)聽與交易解析,確保不遺漏任何充值
- 基于區(qū)塊確認(rèn)數(shù)的交易有效性驗(yàn)證機(jī)制
- 完善的異常處理和容錯(cuò)恢復(fù)機(jī)制
- 嚴(yán)格的安全措施保護(hù)用戶資金安全
實(shí)際部署時(shí),應(yīng)根據(jù)支持的區(qū)塊鏈類型和用戶規(guī)模進(jìn)行水平擴(kuò)展,確保系統(tǒng)在高并發(fā)場景下的穩(wěn)定性和處理能力。
本文來自博客園,作者:ffffox,轉(zhuǎn)載請(qǐng)注明原文鏈接:http://www.rzrgm.cn/ffffox/p/19009394

浙公網(wǎng)安備 33010602011771號(hào)