<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      鏈上充值監(jiān)聽與自動(dòng)劃轉(zhuǎn)資金流程實(shí)現(xiàn)

      鏈上充值監(jiān)聽與自動(dòng)劃轉(zhuǎn)資金流程實(shí)現(xiàn)

      鏈上充值監(jiān)聽與自動(dòng)劃轉(zhuǎn)是數(shù)字貨幣信用卡系統(tǒng)的核心環(huán)節(jié),需要實(shí)時(shí)、準(zhǔn)確地捕捉用戶充值行為并自動(dòng)完成資金歸集。以下是完整實(shí)現(xiàn)方案,包括架構(gòu)設(shè)計(jì)、核心流程和代碼實(shí)現(xiàn)。

      一、整體架構(gòu)設(shè)計(jì)

      鏈上充值流程

      核心組件

      1. 充值地址生成服務(wù):為每個(gè)用戶生成唯一的充值地址(基于HD錢包派生)
      2. 區(qū)塊監(jiān)聽服務(wù):實(shí)時(shí)同步區(qū)塊鏈數(shù)據(jù),監(jiān)控指定地址的交易
      3. 交易解析服務(wù):驗(yàn)證交易合法性,提取關(guān)鍵信息(金額、發(fā)送方等)
      4. 充值確認(rèn)服務(wù):根據(jù)區(qū)塊鏈確認(rèn)數(shù)判斷交易是否有效
      5. 資金劃轉(zhuǎn)服務(wù):將到賬資金劃轉(zhuǎn)到平臺(tái)歸集錢包或用戶可用余額
      6. 通知服務(wù):向用戶推送充值到賬信息

      二、詳細(xì)流程設(shè)計(jì)

      1. 充值地址生成與關(guān)聯(lián)

      • 基于用戶ID和HD錢包路徑,為每個(gè)用戶生成唯一充值地址
      • 地址格式:支持多種區(qū)塊鏈(BTC使用bech32,ETH使用十六進(jìn)制等)
      • 地址與用戶賬戶一對(duì)一綁定,存入數(shù)據(jù)庫并關(guān)聯(lián)用戶ID

      2. 區(qū)塊監(jiān)聽流程

      • 連接區(qū)塊鏈全節(jié)點(diǎn)或第三方API(如Infura、Alchemy)
      • 從最新區(qū)塊開始監(jiān)聽,同時(shí)回溯檢查歷史區(qū)塊(防止遺漏)
      • 對(duì)每個(gè)區(qū)塊,解析所有交易,篩選涉及平臺(tái)充值地址的交易
      • 采用增量同步策略,記錄已處理的區(qū)塊高度,避免重復(fù)處理

      3. 交易驗(yàn)證與確認(rèn)

      • 驗(yàn)證交易是否成功(區(qū)塊確認(rèn)數(shù) >= 系統(tǒng)設(shè)定閾值,如ETH需要12個(gè)確認(rèn))
      • 驗(yàn)證交易金額是否滿足最小充值限額
      • 檢查交易是否已被處理(防止重復(fù)入賬)
      • 計(jì)算實(shí)際到賬金額(扣除區(qū)塊鏈?zhǔn)掷m(xù)費(fèi)后)

      4. 資金劃轉(zhuǎn)與記賬

      • 定時(shí)/自動(dòng)將到賬資金從用戶充值地址劃轉(zhuǎn)到平臺(tái)歸集錢包(冷錢包)
      • 或直接增加用戶賬戶可用余額(信用額度)
      • 生成充值記錄和系統(tǒng)內(nèi)轉(zhuǎn)賬記錄,確保賬目清晰
      • 觸發(fā)后續(xù)業(yè)務(wù)流程(如自動(dòng)激活卡片、提升額度等)
      充值確認(rèn) → 本地事務(wù)(創(chuàng)建充值記錄+寫入消息表) → 定時(shí)任務(wù)發(fā)送消息 → MQ隊(duì)列 → 消費(fèi)消息(更新余額)
                                        ↓                ↓                ↓
                                     狀態(tài)跟蹤        失敗重試機(jī)制        消費(fèi)重試+死信
      

      三、核心代碼實(shí)現(xiàn)

      1. 充值地址生成服務(wù)

      package com.digitalcredit.deposit.service;
      
      import com.digitalcredit.user.entity.User;
      import com.digitalcredit.user.service.UserService;
      import com.digitalcredit.wallet.entity.DepositAddress;
      import com.digitalcredit.wallet.entity.enums.BlockchainType;
      import com.digitalcredit.wallet.repository.DepositAddressRepository;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.stereotype.Service;
      import org.springframework.transaction.annotation.Transactional;
      import org.web3j.crypto.*;
      import org.web3j.utils.Numeric;
      
      import java.util.HashMap;
      import java.util.Map;
      import java.util.Optional;
      
      @Service
      public class DepositAddressService {
      
          @Autowired
          private DepositAddressRepository depositAddressRepository;
          
          @Autowired
          private UserService userService;
          
          // 平臺(tái)根錢包(用于派生用戶充值地址)
          private final Credentials platformRootCredentials;
          
          // 不同區(qū)塊鏈的HD路徑前綴
          private static final Map<BlockchainType, String> BIP44_PATH_PREFIXES = new HashMap<>();
          static {
              BIP44_PATH_PREFIXES.put(BlockchainType.BITCOIN, "m/44'/0'/0'/0/");
              BIP44_PATH_PREFIXES.put(BlockchainType.ETHEREUM, "m/44'/60'/0'/0/");
              BIP44_PATH_PREFIXES.put(BlockchainType.BSC, "m/44'/56'/0'/0/");
          }
          
          public DepositAddressService(String platformRootPrivateKey) {
              // 初始化平臺(tái)根錢包
              this.platformRootCredentials = Credentials.create(platformRootPrivateKey);
          }
          
          /**
           * 為用戶生成指定區(qū)塊鏈的充值地址
           */
          @Transactional
          public DepositAddress generateDepositAddress(Long userId, BlockchainType blockchainType) {
              // 1. 驗(yàn)證用戶
              User user = userService.getUserById(userId);
              if (user == null) {
                  throw new IllegalArgumentException("User not found");
              }
              
              // 2. 檢查用戶是否已有該鏈的充值地址
              Optional<DepositAddress> existingAddress = depositAddressRepository
                  .findByUserIdAndBlockchainType(userId, blockchainType);
              
              if (existingAddress.isPresent()) {
                  return existingAddress.get();
              }
              
              // 3. 生成用戶唯一索引(可基于用戶ID或自增序列)
              long userIndex = generateUserIndex(userId);
              
              // 4. 構(gòu)建BIP44路徑
              String path = BIP44_PATH_PREFIXES.get(blockchainType) + userIndex;
              
              // 5. 從根錢包派生出用戶充值地址的私鑰
              ECKeyPair userKeyPair = deriveKeyPairFromPath(platformRootCredentials.getEcKeyPair(), path);
              String address;
              
              // 6. 根據(jù)不同區(qū)塊鏈生成地址
              switch (blockchainType) {
                  case ETHEREUM:
                  case BSC:
                      address = "0x" + Keys.getAddress(userKeyPair);
                      break;
                  case BITCOIN:
                      // 比特幣地址生成邏輯(使用bitcoinj等庫)
                      address = generateBitcoinAddress(userKeyPair);
                      break;
                  default:
                      throw new UnsupportedOperationException("Unsupported blockchain type");
              }
              
              // 7. 保存充值地址(私鑰加密存儲(chǔ))
              DepositAddress depositAddress = new DepositAddress();
              depositAddress.setUserId(userId);
              depositAddress.setBlockchainType(blockchainType);
              depositAddress.setAddress(address);
              depositAddress.setDerivationPath(path);
              // 加密存儲(chǔ)私鑰
              depositAddress.setEncryptedPrivateKey(encryptPrivateKey(
                  Numeric.toHexStringWithPrefix(userKeyPair.getPrivateKey())));
              depositAddress.setCreatedAt(System.currentTimeMillis());
              
              return depositAddressRepository.save(depositAddress);
          }
          
          /**
           * 根據(jù)BIP44路徑從父密鑰派生子密鑰
           */
          private ECKeyPair deriveKeyPairFromPath(ECKeyPair parent, String path) {
              String[] pathElements = path.split("/");
              ECKeyPair currentKeyPair = parent;
              
              for (String element : pathElements) {
                  if (element.equals("m")) continue;
                  
                  boolean hardened = element.endsWith("'");
                  int index = Integer.parseInt(hardened ? element.substring(0, element.length() - 1) : element);
                  
                  if (hardened) {
                      index += 0x80000000; // 強(qiáng)化派生索引偏移
                  }
                  
                  // 使用BIP32派生算法
                  currentKeyPair = HDKeyGenerator.deriveChildKey(currentKeyPair, index);
              }
              
              return currentKeyPair;
          }
          
          /**
           * 生成用戶唯一索引
           */
          private long generateUserIndex(Long userId) {
              // 可以使用userId的哈希或數(shù)據(jù)庫自增序列
              return Math.abs(userId.hashCode() % 1000000);
          }
          
          /**
           * 加密私鑰(生產(chǎn)環(huán)境使用AES-256加密)
           */
          private String encryptPrivateKey(String privateKey) {
              // 實(shí)際實(shí)現(xiàn)應(yīng)使用安全的加密算法,密鑰存儲(chǔ)在安全的密鑰管理服務(wù)中
              return EncryptionUtil.encrypt(privateKey, System.getenv("PRIVATE_KEY_ENCRYPTION_KEY"));
          }
          
          /**
           * 獲取用戶的充值地址
           */
          public String getUserDepositAddress(Long userId, BlockchainType blockchainType) {
              DepositAddress address = depositAddressRepository
                  .findByUserIdAndBlockchainType(userId, blockchainType)
                  .orElseThrow(() -> new IllegalArgumentException("No deposit address found for user and blockchain type"));
              
              return address.getAddress();
          }
          
          /**
           * 生成比特幣地址(簡化實(shí)現(xiàn))
           */
          private String generateBitcoinAddress(ECKeyPair keyPair) {
              // 實(shí)際項(xiàng)目中使用bitcoinj等專業(yè)庫實(shí)現(xiàn)
              // 這里僅作示例
              return BitcoinAddressGenerator.generateFromKeyPair(keyPair, false);
          }
      }
      

      2. 區(qū)塊監(jiān)聽與交易處理服務(wù)

      package com.digitalcredit.blockchain.service;
      
      import com.digitalcredit.blockchain.config.BlockchainNodeConfig;
      import com.digitalcredit.blockchain.entity.ChainTransaction;
      import com.digitalcredit.blockchain.entity.enums.TransactionStatus;
      import com.digitalcredit.blockchain.repository.ChainTransactionRepository;
      import com.digitalcredit.deposit.service.DepositProcessingService;
      import com.digitalcredit.wallet.entity.DepositAddress;
      import com.digitalcredit.wallet.repository.DepositAddressRepository;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.retry.annotation.Backoff;
      import org.springframework.retry.annotation.Retryable;
      import org.springframework.scheduling.annotation.Async;
      import org.springframework.scheduling.annotation.Scheduled;
      import org.springframework.stereotype.Service;
      import org.web3j.protocol.Web3j;
      import org.web3j.protocol.core.DefaultBlockParameter;
      import org.web3j.protocol.core.methods.response.EthBlock;
      import org.web3j.protocol.core.methods.response.EthBlockNumber;
      import org.web3j.protocol.http.HttpService;
      
      import javax.annotation.PostConstruct;
      import java.math.BigInteger;
      import java.util.*;
      import java.util.concurrent.*;
      import java.util.stream.Collectors;
      
      @Service
      public class FaultTolerantBlockchainListener {
          private static final Logger logger = LoggerFactory.getLogger(FaultTolerantBlockchainListener.class);
          
          // 每個(gè)鏈的節(jié)點(diǎn)配置
          private final Map<String, List<BlockchainNodeConfig>> nodeConfigs = new ConcurrentHashMap<>();
          
          // 當(dāng)前活躍的Web3j客戶端
          private final Map<String, Web3j> activeWeb3jClients = new ConcurrentHashMap<>();
          
          // 節(jié)點(diǎn)健康狀態(tài)
          private final Map<String, Map<String, Boolean>> nodeHealthStatus = new ConcurrentHashMap<>();
          
          // 平臺(tái)充值地址緩存(小寫)
          private final Set<String> platformAddresses = ConcurrentHashMap.newKeySet();
          
          // 區(qū)塊處理線程池
          private final ExecutorService blockProcessingExecutor = new ThreadPoolExecutor(
              4, 16, 5, TimeUnit.MINUTES,
              new LinkedBlockingQueue<>(1000),
              new ThreadFactory() {
                  private int counter = 0;
                  @Override
                  public Thread newThread(Runnable r) {
                      Thread thread = new Thread(r, "block-processor-" + counter++);
                      thread.setDaemon(true);
                      return thread;
                  }
              },
              new ThreadPoolExecutor.CallerRunsPolicy() // 隊(duì)列滿時(shí)讓提交者執(zhí)行,防止任務(wù)丟失
          );
          
          @Autowired
          private List<BlockchainNodeConfig> allNodeConfigs;
          
          @Autowired
          private DepositAddressRepository depositAddressRepository;
          
          @Autowired
          private ChainTransactionRepository transactionRepository;
          
          @Autowired
          private DepositProcessingService depositProcessingService;
          
          @Autowired
          private BlockchainNodeHealthChecker nodeHealthChecker;
          
          // 上次處理的區(qū)塊高度
          private final Map<String, BigInteger> lastProcessedBlock = new ConcurrentHashMap<>();
          
          // 確認(rèn)數(shù)閾值配置
          private final Map<String, Integer> confirmationThresholds = new HashMap<>();
          {
              confirmationThresholds.put("ETH", 12);
              confirmationThresholds.put("BSC", 12);
              confirmationThresholds.put("BTC", 6);
              confirmationThresholds.put("LTC", 6);
          }
          
          @PostConstruct
          public void initialize() {
              // 1. 按鏈類型分組節(jié)點(diǎn)配置
              for (BlockchainNodeConfig config : allNodeConfigs) {
                  nodeConfigs.computeIfAbsent(config.getChainType(), k -> new ArrayList<>())
                            .add(config);
                  nodeHealthStatus.computeIfAbsent(config.getChainType(), k -> new ConcurrentHashMap<>())
                                 .put(config.getNodeUrl(), true);
              }
              
              // 2. 初始化活躍客戶端
              initializeActiveClients();
              
              // 3. 加載充值地址緩存
              refreshAddressCache();
              
              // 4. 加載上次處理的區(qū)塊高度
              loadLastProcessedBlocks();
              
              // 5. 啟動(dòng)節(jié)點(diǎn)健康檢查
              nodeHealthChecker.start();
              
              logger.info("Blockchain listener initialized with {} chains", nodeConfigs.size());
          }
          
          /**
           * 初始化活躍客戶端(選擇健康的節(jié)點(diǎn))
           */
          private void initializeActiveClients() {
              for (String chainType : nodeConfigs.keySet()) {
                  try {
                      Web3j client = getHealthyWeb3jClient(chainType);
                      if (client != null) {
                          activeWeb3jClients.put(chainType, client);
                          logger.info("Initialized active client for chain: {}", chainType);
                      }
                  } catch (Exception e) {
                      logger.error("Failed to initialize client for chain: {}", chainType, e);
                  }
              }
          }
          
          /**
           * 獲取健康的Web3j客戶端(帶故障轉(zhuǎn)移)
           */
          private Web3j getHealthyWeb3jClient(String chainType) {
              List<BlockchainNodeConfig> configs = nodeConfigs.getOrDefault(chainType, Collections.emptyList());
              if (configs.isEmpty()) {
                  logger.warn("No node configs for chain: {}", chainType);
                  return null;
              }
              
              // 按優(yōu)先級(jí)排序,優(yōu)先選擇主節(jié)點(diǎn)
              configs.sort(Comparator.comparingInt(BlockchainNodeConfig::getPriority));
              
              // 嘗試連接健康節(jié)點(diǎn)
              for (BlockchainNodeConfig config : configs) {
                  if (nodeHealthStatus.get(chainType).getOrDefault(config.getNodeUrl(), false)) {
                      try {
                          Web3j client = Web3j.build(new HttpService(config.getNodeUrl()));
                          // 測試連接
                          client.ethBlockNumber().sendAsync().get(5, TimeUnit.SECONDS);
                          return client;
                      } catch (Exception e) {
                          logger.warn("Node {} for chain {} is unhealthy: {}", 
                                    config.getNodeUrl(), chainType, e.getMessage());
                          nodeHealthStatus.get(chainType).put(config.getNodeUrl(), false);
                      }
                  }
              }
              
              logger.error("No healthy nodes available for chain: {}", chainType);
              return null;
          }
          
          /**
           * 定時(shí)刷新地址緩存(每30分鐘)
           */
          @Scheduled(fixedRate = 30 * 60 * 1000)
          public void refreshAddressCache() {
              try {
                  long start = System.currentTimeMillis();
                  List<String> addresses = depositAddressRepository.findAllActive()
                      .stream()
                      .map(DepositAddress::getAddress)
                      .map(String::toLowerCase)
                      .collect(Collectors.toList());
                  
                  platformAddresses.clear();
                  platformAddresses.addAll(addresses);
                  logger.info("Refreshed deposit addresses cache, count: {}, time: {}ms",
                            addresses.size(), System.currentTimeMillis() - start);
              } catch (Exception e) {
                  logger.error("Failed to refresh address cache", e);
              }
          }
          
          /**
           * 加載上次處理的區(qū)塊高度
           */
          private void loadLastProcessedBlocks() {
              for (String chainType : nodeConfigs.keySet()) {
                  BigInteger lastBlock = transactionRepository.findMaxBlockNumberByChain(chainType)
                      .orElse(BigInteger.ZERO);
                  lastProcessedBlock.put(chainType, lastBlock);
                  logger.info("Loaded last processed block for {}: {}", chainType, lastBlock);
              }
          }
          
          /**
           * 定時(shí)監(jiān)聽新區(qū)塊(每5秒)
           */
          @Scheduled(fixedRate = 5000)
          public void listenForNewBlocks() {
              for (String chainType : new ArrayList<>(nodeConfigs.keySet())) {
                  try {
                      processChain(chainType);
                  } catch (Exception e) {
                      logger.error("Error processing chain: {}", chainType, e);
                      // 嘗試切換客戶端
                      Web3j newClient = getHealthyWeb3jClient(chainType);
                      if (newClient != null) {
                          activeWeb3jClients.put(chainType, newClient);
                          logger.info("Switched to new client for chain: {}", chainType);
                      }
                  }
              }
          }
          
          /**
           * 處理單個(gè)鏈的區(qū)塊
           */
          private void processChain(String chainType) throws Exception {
              Web3j web3j = activeWeb3jClients.get(chainType);
              if (web3j == null) {
                  logger.warn("No active client for chain: {}", chainType);
                  return;
              }
              
              // 獲取最新區(qū)塊高度
              EthBlockNumber blockNumberResp = web3j.ethBlockNumber().send();
              BigInteger latestBlock = blockNumberResp.getBlockNumber();
              
              // 獲取上次處理的區(qū)塊高度
              BigInteger startBlock = lastProcessedBlock.getOrDefault(chainType, BigInteger.ZERO);
              
              // 計(jì)算需要處理的區(qū)塊范圍
              if (latestBlock.compareTo(startBlock) <= 0) {
                  // 沒有新區(qū)塊,檢查確認(rèn)中的交易
                  checkPendingTransactions(chainType, latestBlock);
                  return;
              }
              
              // 限制每次處理的區(qū)塊數(shù)量,防止過載
              BigInteger endBlock = startBlock.add(BigInteger.valueOf(10));
              if (endBlock.compareTo(latestBlock) > 0) {
                  endBlock = latestBlock;
              }
              
              logger.info("Processing chain {}: blocks {} to {}", chainType, startBlock, endBlock);
              
              // 并行處理區(qū)塊
              for (BigInteger blockNum = startBlock.add(BigInteger.ONE); 
                   blockNum.compareTo(endBlock) <= 0; 
                   blockNum = blockNum.add(BigInteger.ONE)) {
                  processSingleBlockAsync(chainType, web3j, blockNum);
              }
              
              // 更新最后處理的區(qū)塊高度
              lastProcessedBlock.put(chainType, endBlock);
          }
          
          /**
           * 異步處理單個(gè)區(qū)塊
           */
          @Async("blockProcessingExecutor")
          @Retryable(
              value = {Exception.class},
              maxAttempts = 3,
              backoff = @Backoff(delay = 1000, multiplier = 2)
          )
          public void processSingleBlockAsync(String chainType, Web3j web3j, BigInteger blockNumber) {
              try {
                  processSingleBlock(chainType, web3j, blockNumber);
              } catch (Exception e) {
                  logger.error("Failed to process block {} on chain {} after retries", 
                            blockNumber, chainType, e);
                  // 記錄失敗的區(qū)塊,供后續(xù)手動(dòng)處理
                  transactionRepository.recordFailedBlock(chainType, blockNumber, e.getMessage());
              }
          }
          
          /**
           * 處理單個(gè)區(qū)塊
           */
          private void processSingleBlock(String chainType, Web3j web3j, BigInteger blockNumber) throws Exception {
              EthBlock block = web3j.ethGetBlockByNumber(
                  DefaultBlockParameter.valueOf(blockNumber), 
                  true // 包含交易詳情
              ).send();
              
              if (block.getBlock() == null) {
                  logger.warn("Block {} not found on chain {}", blockNumber, chainType);
                  return;
              }
              
              // 處理區(qū)塊中的交易
              for (EthBlock.TransactionResult<?> txResult : block.getBlock().getTransactions()) {
                  EthBlock.TransactionObject tx = (EthBlock.TransactionObject) txResult.get();
                  
                  // 檢查是否是平臺(tái)地址的入金交易
                  if (tx.getTo() != null && platformAddresses.contains(tx.getTo().toLowerCase())) {
                      processDepositTransaction(chainType, tx, blockNumber);
                  }
              }
              
              logger.debug("Processed block {} on chain {}, transactions: {}",
                        blockNumber, chainType, block.getBlock().getTransactions().size());
          }
          
          /**
           * 處理充值交易
           */
          private void processDepositTransaction(String chainType, 
                                                EthBlock.TransactionObject tx, 
                                                BigInteger blockNumber) {
              // 檢查交易是否已處理
              if (transactionRepository.existsByTxHashAndChainType(tx.getHash(), chainType)) {
                  return;
              }
              
              // 創(chuàng)建交易記錄
              ChainTransaction transaction = new ChainTransaction();
              transaction.setTxHash(tx.getHash());
              transaction.setChainType(chainType);
              transaction.setFromAddress(tx.getFrom());
              transaction.setToAddress(tx.getTo());
              transaction.setAmount(tx.getValue());
              transaction.setBlockNumber(blockNumber);
              transaction.setGasPrice(tx.getGasPrice());
              transaction.setGasUsed(tx.getGas());
              transaction.setInputData(tx.getInput());
              transaction.setTimestamp(System.currentTimeMillis());
              
              // 初始狀態(tài):待確認(rèn)
              int requiredConfirmations = confirmationThresholds.getOrDefault(chainType, 12);
              transaction.setStatus(TransactionStatus.PENDING_CONFIRMATION);
              transaction.setRequiredConfirmations(requiredConfirmations);
              transaction.setCurrentConfirmations(BigInteger.ZERO);
              
              transactionRepository.save(transaction);
              logger.info("Found new deposit transaction {} on chain {}, amount: {}",
                        tx.getHash(), chainType, tx.getValue());
          }
          
          /**
           * 檢查待確認(rèn)的交易
           */
          private void checkPendingTransactions(String chainType, BigInteger latestBlock) {
              try {
                  List<ChainTransaction> pendingTxs = transactionRepository
                      .findByChainTypeAndStatus(chainType, TransactionStatus.PENDING_CONFIRMATION);
                  
                  for (ChainTransaction tx : pendingTxs) {
                      // 計(jì)算當(dāng)前確認(rèn)數(shù)
                      BigInteger confirmations = latestBlock.subtract(tx.getBlockNumber());
                      tx.setCurrentConfirmations(confirmations);
                      
                      // 檢查是否達(dá)到確認(rèn)閾值
                      if (confirmations.compareTo(BigInteger.valueOf(tx.getRequiredConfirmations())) >= 0) {
                          tx.setStatus(TransactionStatus.CONFIRMED);
                          transactionRepository.save(tx);
                          
                          // 提交給充值處理服務(wù)
                          depositProcessingService.processConfirmedDeposit(tx);
                      } else if (System.currentTimeMillis() - tx.getTimestamp() > 24 * 60 * 60 * 1000) {
                          // 超過24小時(shí)未確認(rèn),標(biāo)記為失敗
                          tx.setStatus(TransactionStatus.CONFIRMATION_FAILED);
                          transactionRepository.save(tx);
                          logger.warn("Transaction {} on chain {} failed to confirm within 24h",
                                    tx.getTxHash(), chainType);
                      } else {
                          transactionRepository.save(tx);
                      }
                  }
              } catch (Exception e) {
                  logger.error("Error checking pending transactions for chain {}", chainType, e);
              }
          }
      }
      
      

      3. 充值處理與資金劃轉(zhuǎn)服務(wù)(RocketMQ)

      0). 數(shù)據(jù)庫設(shè)計(jì)

      -- 充值記錄表
      CREATE TABLE deposit_record (
          id BIGINT AUTO_INCREMENT PRIMARY KEY,
          tx_hash VARCHAR(66) NOT NULL COMMENT '區(qū)塊鏈交易哈希',
          user_id BIGINT NOT NULL COMMENT '用戶ID',
          chain_type VARCHAR(20) NOT NULL COMMENT '區(qū)塊鏈類型(ETH/BSC等)',
          deposit_address VARCHAR(66) NOT NULL COMMENT '充值地址',
          amount DECIMAL(30,18) NOT NULL COMMENT '充值金額',
          block_number BIGINT NOT NULL COMMENT '區(qū)塊高度',
          status VARCHAR(20) NOT NULL COMMENT '狀態(tài)(PENDING/SUCCESS/FAILED)',
          failure_reason TEXT COMMENT '失敗原因',
          created_at BIGINT NOT NULL COMMENT '創(chuàng)建時(shí)間戳',
          completed_at BIGINT COMMENT '完成時(shí)間戳',
          UNIQUE KEY uk_tx_hash (tx_hash),
          KEY idx_user_id (user_id),
          KEY idx_status_create_time (status, created_at)
      ) ENGINE=InnoDB COMMENT='充值記錄表';
      
      -- 本地消息表
      CREATE TABLE local_message (
          id BIGINT AUTO_INCREMENT PRIMARY KEY,
          business_key VARCHAR(66) NOT NULL COMMENT '業(yè)務(wù)唯一標(biāo)識(shí)(如交易哈希)',
          message_type VARCHAR(20) NOT NULL COMMENT '消息類型(DEPOSIT_ARRIVAL/FUND_COLLECTION)',
          topic VARCHAR(50) NOT NULL COMMENT 'MQ主題',
          content TEXT NOT NULL COMMENT '消息內(nèi)容(JSON)',
          status VARCHAR(20) NOT NULL COMMENT '狀態(tài)(PENDING/SENDING/SENT/FAILED)',
          send_count INT NOT NULL DEFAULT 0 COMMENT '發(fā)送次數(shù)',
          max_retry_count INT NOT NULL DEFAULT 3 COMMENT '最大重試次數(shù)',
          next_send_time BIGINT NOT NULL COMMENT '下次發(fā)送時(shí)間戳',
          last_error TEXT COMMENT '最后錯(cuò)誤信息',
          created_at BIGINT NOT NULL COMMENT '創(chuàng)建時(shí)間戳',
          updated_at BIGINT COMMENT '更新時(shí)間戳',
          UNIQUE KEY uk_business_key_type (business_key, message_type),
          KEY idx_status_next_send (status, next_send_time)
      ) ENGINE=InnoDB COMMENT='本地消息表';
      
      -- 用戶賬戶表(簡化)
      CREATE TABLE user_account (
          id BIGINT AUTO_INCREMENT PRIMARY KEY,
          user_id BIGINT NOT NULL UNIQUE COMMENT '用戶ID',
          available_balance DECIMAL(30,18) NOT NULL DEFAULT 0 COMMENT '可用余額',
          total_balance DECIMAL(30,18) NOT NULL DEFAULT 0 COMMENT '總余額',
          updated_at BIGINT NOT NULL COMMENT '更新時(shí)間戳'
      ) ENGINE=InnoDB COMMENT='用戶賬戶表';
      
      

      1). 消息與業(yè)務(wù)實(shí)體類

      package com.digitalcredit.message.entity;
      
      import lombok.Data;
      
      import javax.persistence.*;
      import java.math.BigDecimal;
      
      @Data
      @Entity
      @Table(name = "local_message")
      public class LocalMessage {
          @Id
          @GeneratedValue(strategy = GenerationType.IDENTITY)
          private Long id;
      
          @Column(name = "business_key", nullable = false, length = 66)
          private String businessKey; // 交易哈希
      
          @Column(name = "message_type", nullable = false, length = 20)
          @Enumerated(EnumType.STRING)
          private MessageType messageType; // 消息類型
      
          @Column(name = "topic", nullable = false, length = 50)
          private String topic; // MQ主題
      
          @Column(name = "content", nullable = false, columnDefinition = "TEXT")
          private String content; // 消息內(nèi)容(JSON)
      
          @Column(name = "status", nullable = false, length = 20)
          @Enumerated(EnumType.STRING)
          private MessageStatus status; // 消息狀態(tài)
      
          @Column(name = "send_count", nullable = false)
          private Integer sendCount = 0; // 發(fā)送次數(shù)
      
          @Column(name = "max_retry_count", nullable = false)
          private Integer maxRetryCount = 3; // 最大重試次數(shù)
      
          @Column(name = "next_send_time", nullable = false)
          private Long nextSendTime; // 下次發(fā)送時(shí)間戳
      
          @Column(name = "last_error", columnDefinition = "TEXT")
          private String lastError; // 最后錯(cuò)誤信息
      
          @Column(name = "created_at", nullable = false)
          private Long createdAt; // 創(chuàng)建時(shí)間戳
      
          @Column(name = "updated_at")
          private Long updatedAt; // 更新時(shí)間戳
      
          // 消息類型枚舉
          public enum MessageType {
              DEPOSIT_ARRIVAL, // 資金到賬
              FUND_COLLECTION  // 資金歸集
          }
      
          // 消息狀態(tài)枚舉
          public enum MessageStatus {
              PENDING,   // 待發(fā)送
              SENDING,   // 發(fā)送中
              SENT,      // 已發(fā)送
              FAILED     // 發(fā)送失敗
          }
      }
      
      
      package com.digitalcredit.deposit.entity;
      
      import lombok.Data;
      
      import javax.persistence.*;
      import java.math.BigDecimal;
      
      @Data
      @Entity
      @Table(name = "deposit_record")
      public class DepositRecord {
          @Id
          @GeneratedValue(strategy = GenerationType.IDENTITY)
          private Long id;
      
          @Column(name = "tx_hash", nullable = false, length = 66, unique = true)
          private String txHash; // 區(qū)塊鏈交易哈希
      
          @Column(name = "user_id", nullable = false)
          private Long userId; // 用戶ID
      
          @Column(name = "chain_type", nullable = false, length = 20)
          private String chainType; // 區(qū)塊鏈類型
      
          @Column(name = "deposit_address", nullable = false, length = 66)
          private String depositAddress; // 充值地址
      
          @Column(name = "amount", nullable = false, precision = 30, scale = 18)
          private BigDecimal amount; // 充值金額
      
          @Column(name = "block_number", nullable = false)
          private Long blockNumber; // 區(qū)塊高度
      
          @Column(name = "status", nullable = false, length = 20)
          @Enumerated(EnumType.STRING)
          private DepositStatus status; // 狀態(tài)
      
          @Column(name = "failure_reason", columnDefinition = "TEXT")
          private String failureReason; // 失敗原因
      
          @Column(name = "created_at", nullable = false)
          private Long createdAt; // 創(chuàng)建時(shí)間戳
      
          @Column(name = "completed_at")
          private Long completedAt; // 完成時(shí)間戳
      
          // 充值狀態(tài)枚舉
          public enum DepositStatus {
              PENDING,   // 待處理
              SUCCESS,   // 成功
              FAILED     // 失敗
          }
      }
      
      
          
      

      2). 資金到賬消息消費(fèi)者

      package com.digitalcredit.mq.consumer;
      
      import com.alibaba.fastjson.JSON;
      import com.digitalcredit.account.service.AccountService;
      import com.digitalcredit.deposit.entity.DepositRecord;
      import com.digitalcredit.deposit.entity.enums.DepositStatus;
      import com.digitalcredit.deposit.repository.DepositRecordRepository;
      import com.digitalcredit.mq.message.DepositMessage;
      import org.apache.rocketmq.client.consumer.ConsumeConcurrentlyContext;
      import org.apache.rocketmq.client.consumer.ConsumeConcurrentlyStatus;
      import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
      import org.apache.rocketmq.common.message.MessageExt;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.stereotype.Component;
      import org.springframework.transaction.annotation.Transactional;
      
      import java.util.List;
      import java.util.Optional;
      
      @Component
      public class DepositMessageListener implements MessageListenerConcurrently {
          private static final Logger logger = LoggerFactory.getLogger(DepositMessageListener.class);
      
          @Autowired
          private AccountService accountService;
      
          @Autowired
          private DepositRecordRepository depositRecordRepository;
      
          /**
           * 處理資金劃轉(zhuǎn)消息
           */
          @Override
          @Transactional
          public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
              for (MessageExt msg : msgs) {
                  try {
                      String txHash = msg.getKeys();
                      logger.info("開始處理資金劃轉(zhuǎn)消息,txHash: {}, 重試次數(shù): {}", txHash, msg.getReconsumeTimes());
      
                      // 1. 解析消息
                      DepositMessage message = JSON.parseObject(
                          new String(msg.getBody(), "UTF-8"), 
                          DepositMessage.class
                      );
      
                      // 2. 防重復(fù)處理(檢查是否已處理)
                      Optional<DepositRecord> recordOpt = depositRecordRepository.findByTxHash(txHash);
                      if (recordOpt.isEmpty()) {
                          logger.error("未找到對(duì)應(yīng)的充值記錄,txHash: {}", txHash);
                          return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                      }
      
                      DepositRecord record = recordOpt.get();
                      if (record.getStatus() == DepositStatus.SUCCESS) {
                          logger.info("消息已處理,無需重復(fù)處理,txHash: {}", txHash);
                          return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                      }
      
                      // 3. 執(zhí)行資金劃轉(zhuǎn)(增加用戶余額)
                      accountService.increaseBalance(
                          message.getUserId(),
                          message.getAmount(),
                          "DEPOSIT",
                          txHash
                      );
      
                      // 4. 更新充值記錄狀態(tài)
                      record.setStatus(DepositStatus.SUCCESS);
                      record.setCompletedAt(System.currentTimeMillis());
                      depositRecordRepository.save(record);
      
                      logger.info("資金劃轉(zhuǎn)處理成功,txHash: {}", txHash);
                      return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
      
                  } catch (Exception e) {
                      logger.error("資金劃轉(zhuǎn)處理失敗,msgId: {}", msg.getMsgId(), e);
                      
                      // 判斷是否達(dá)到最大重試次數(shù)
                      if (msg.getReconsumeTimes() >= 5) {
                          // 記錄失敗原因,后續(xù)人工處理
                          String txHash = msg.getKeys();
                          updateDepositRecordToFailed(txHash, e.getMessage());
                          
                          logger.warn("消息達(dá)到最大重試次數(shù),將進(jìn)入死信隊(duì)列,txHash: {}", txHash);
                          return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 不再重試,進(jìn)入死信隊(duì)列
                      }
                      
                      // 未達(dá)最大重試次數(shù),返回重試
                      return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                  }
              }
              
              return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
          }
      
          /**
           * 更新充值記錄為失敗狀態(tài)
           */
          private void updateDepositRecordToFailed(String txHash, String reason) {
              try {
                  Optional<DepositRecord> recordOpt = depositRecordRepository.findByTxHash(txHash);
                  if (recordOpt.isPresent()) {
                      DepositRecord record = recordOpt.get();
                      record.setStatus(DepositStatus.FAILED);
                      record.setFailureReason(reason);
                      record.setCompletedAt(System.currentTimeMillis());
                      depositRecordRepository.save(record);
                  }
              } catch (Exception e) {
                  logger.error("更新充值記錄為失敗狀態(tài)失敗,txHash: {}", txHash, e);
              }
          }
      }
          
      

      四、關(guān)鍵技術(shù)點(diǎn)說明

      1. 區(qū)塊鏈節(jié)點(diǎn)連接策略

      • 主備節(jié)點(diǎn)機(jī)制:同時(shí)連接多個(gè)節(jié)點(diǎn),當(dāng)主節(jié)點(diǎn)故障時(shí)自動(dòng)切換到備用節(jié)點(diǎn)
      • 連接池管理:維護(hù)長期連接,減少握手開銷
      • 超時(shí)重試:設(shè)置合理的超時(shí)時(shí)間和重試策略,確保網(wǎng)絡(luò)波動(dòng)時(shí)的穩(wěn)定性

      2. 交易去重與冪等性保障

      • 使用交易哈希作為唯一標(biāo)識(shí),確保每筆交易只處理一次
      • 數(shù)據(jù)庫唯一索引:在充值記錄表中對(duì)交易哈希和鏈類型創(chuàng)建唯一索引
      • 狀態(tài)機(jī)設(shè)計(jì):明確的狀態(tài)流轉(zhuǎn)(PENDING → CONFIRMED/INVALID/ERROR),避免重復(fù)處理

      3. 性能優(yōu)化措施

      • 充值地址緩存:將平臺(tái)所有充值地址加載到內(nèi)存,減少數(shù)據(jù)庫查詢
      • 批量處理:區(qū)塊處理采用批量方式,提高效率
      • 異步處理:交易解析和資金劃轉(zhuǎn)采用異步方式,避免阻塞監(jiān)聽線程
      • 分區(qū)表:充值記錄表按時(shí)間分區(qū),提高查詢效率

      4. 容錯(cuò)與恢復(fù)機(jī)制

      • 斷點(diǎn)續(xù)傳:記錄已處理的區(qū)塊高度,服務(wù)重啟后從斷點(diǎn)繼續(xù)處理
      • 重試隊(duì)列:處理失敗的交易加入重試隊(duì)列,定時(shí)重試
      • 監(jiān)控告警:關(guān)鍵指標(biāo)(區(qū)塊同步延遲、未確認(rèn)充值數(shù)量)監(jiān)控和告警
      • 手動(dòng)干預(yù)接口:提供手動(dòng)處理異常充值的接口

      五、安全措施

      1. 私鑰安全

        • 充值地址私鑰加密存儲(chǔ)(AES-256)
        • 加密密鑰通過KMS(密鑰管理服務(wù))管理
        • 敏感操作(如資金劃轉(zhuǎn))需要多重簽名
      2. 交易驗(yàn)證

        • 多重驗(yàn)證:不僅驗(yàn)證地址,還驗(yàn)證金額和交易狀態(tài)
        • 防重放攻擊:檢查交易是否屬于當(dāng)前鏈
      3. 異常監(jiān)控

        • 大額充值預(yù)警:超過閾值的充值觸發(fā)人工審核
        • 異常地址監(jiān)控:對(duì)黑名單地址的轉(zhuǎn)賬進(jìn)行攔截
        • 頻率限制:監(jiān)控同一地址的頻繁轉(zhuǎn)賬行為

      六、總結(jié)

      鏈上充值監(jiān)聽與自動(dòng)劃轉(zhuǎn)系統(tǒng)需要兼顧實(shí)時(shí)性、準(zhǔn)確性和安全性。通過合理的架構(gòu)設(shè)計(jì)和技術(shù)選型,可以實(shí)現(xiàn)高效、可靠的充值處理流程。

      核心要點(diǎn):

      • 采用HD錢包技術(shù)為每個(gè)用戶生成唯一充值地址
      • 實(shí)時(shí)區(qū)塊監(jiān)聽與交易解析,確保不遺漏任何充值
      • 基于區(qū)塊確認(rèn)數(shù)的交易有效性驗(yàn)證機(jī)制
      • 完善的異常處理和容錯(cuò)恢復(fù)機(jī)制
      • 嚴(yán)格的安全措施保護(hù)用戶資金安全

      實(shí)際部署時(shí),應(yīng)根據(jù)支持的區(qū)塊鏈類型和用戶規(guī)模進(jìn)行水平擴(kuò)展,確保系統(tǒng)在高并發(fā)場景下的穩(wěn)定性和處理能力。

      posted @ 2025-07-28 18:01  ffffox  閱讀(35)  評(píng)論(0)    收藏  舉報(bào)
      主站蜘蛛池模板: 国产极品美女网站在线观看| 欧美色丁香| 久久国产免费直播| 狠狠综合久久av一区二| 国产精品线在线精品| 精品国产av一区二区三区| 亚洲国产精品国自拍av| 91福利视频一区二区| 国产首页一区二区不卡| 亚洲精品国产综合久久一线| 韩国av无码| 日韩深夜福利视频在线观看| 色琪琪丁香婷婷综合久久| 视频一区二区三区四区久久| 亚洲日本欧美日韩中文字幕| 欧美一本大道香蕉综合视频| 久久中精品中文字幕入口| 国产一区二区三区黄网| 熟女在线视频一区二区三区 | 欧美日本在线| 五月婷婷中文字幕| 亚洲午夜精品久久久久久浪潮| 午夜福利片一区二区三区| 久久精品第九区免费观看| 乱人伦中文字幕成人网站在线| 亚洲av第一区二区三区| 国产精品香港三级国产av| 免费无码又爽又刺激成人| 千阳县| 国产成人综合久久亚洲精品| 深夜福利资源在线观看| 菠萝菠萝蜜午夜视频在线播放观看| 亚洲国产精品第一区二区| 国产三级a三级三级| 99久久亚洲综合网精品| 蜜芽久久人人超碰爱香蕉| 精品中文人妻中文字幕| 色偷一区国产精品| 亚洲无码精品视频| 久久亚洲女同第一区综合| 亚洲蜜臀av乱码久久|