<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      ClickHouse UPDATE 機(jī)制詳解

      ClickHouse UPDATE 機(jī)制詳解

      問題現(xiàn)象

      在使用ClickHouse進(jìn)行UPDATE操作時(shí),經(jīng)常會(huì)遇到這樣的現(xiàn)象:

      UPDATE ethereum.block_tasks SET
          status = 'pending', 
          owner = 'consumer-1_1758676754070328000', 
          assigned_at = '2025-09-24 09:19:14.07', 
          updated_at = '2025-09-24 09:19:14.07'
      WHERE start_block = 12345;
      

      執(zhí)行結(jié)果:

      • RowsAffected = 0 ?
      • 但通過SELECT查詢卻能查到更新后的數(shù)據(jù) ?

      這種看似矛盾的現(xiàn)象讓很多開發(fā)者困惑,實(shí)際上是ClickHouse UPDATE機(jī)制的正常行為。

      ClickHouse UPDATE機(jī)制原理

      1. 異步Mutations機(jī)制

      ClickHouse的UPDATE操作不是傳統(tǒng)的就地更新,而是通過mutations機(jī)制異步處理:

      傳統(tǒng)數(shù)據(jù)庫(kù)UPDATE:
      [數(shù)據(jù)] → [直接修改] → [立即生效]
      
      ClickHouse UPDATE:
      [數(shù)據(jù)] → [創(chuàng)建mutation] → [后臺(tái)異步處理] → [最終生效]
      

      2. 執(zhí)行流程

      UPDATE table SET column = 'value' WHERE condition;
      

      執(zhí)行步驟:

      1. 提交mutation:ClickHouse立即返回,但實(shí)際更新在后臺(tái)進(jìn)行
      2. 異步處理:后臺(tái)進(jìn)程處理mutation,重寫相關(guān)數(shù)據(jù)塊
      3. 最終一致性:查詢時(shí)總是返回最新數(shù)據(jù)

      3. 為什么RowsAffected = 0

      • RowsAffected = 0 表示mutation已成功提交到隊(duì)列
      • 不表示實(shí)際影響的行數(shù)
      • 實(shí)際更新在后臺(tái)異步進(jìn)行
      • 這是ClickHouse的設(shè)計(jì)特性,不是錯(cuò)誤

      監(jiān)控Mutation狀態(tài)

      1. 查看Mutation隊(duì)列

      -- 查看所有mutations
      SELECT 
          mutation_id,
          table,
          command,
          create_time,
          is_done,
          latest_failed_part,
          latest_fail_reason
      FROM system.mutations 
      WHERE table = 'block_tasks' 
      ORDER BY create_time DESC 
      LIMIT 10;
      

      2. 監(jiān)控執(zhí)行進(jìn)度

      -- 查看未完成的mutations
      SELECT count() as pending_mutations
      FROM system.mutations 
      WHERE table = 'block_tasks' 
      AND is_done = 0;
      
      -- 查看最近的mutation詳情
      SELECT 
          mutation_id,
          create_time,
          is_done,
          elapsed_time
      FROM system.mutations 
      WHERE table = 'block_tasks' 
      ORDER BY create_time DESC 
      LIMIT 1;
      

      3. 檢查Mutation性能

      -- 查看mutation性能統(tǒng)計(jì)
      SELECT 
          table,
          count() as total_mutations,
          sum(is_done) as completed_mutations,
          avg(elapsed_time) as avg_elapsed_time
      FROM system.mutations 
      WHERE table = 'block_tasks'
      GROUP BY table;
      

      等待時(shí)間估算

      1. 影響因子

      因素 影響程度 說明
      數(shù)據(jù)量 數(shù)據(jù)越多,處理時(shí)間越長(zhǎng)
      數(shù)據(jù)塊數(shù)量 每個(gè)塊需要單獨(dú)處理
      系統(tǒng)負(fù)載 CPU、內(nèi)存、磁盤I/O
      UPDATE復(fù)雜度 子查詢、批量更新
      并發(fā)度 其他mutation的競(jìng)爭(zhēng)

      2. 典型等待時(shí)間

      小表(<10萬行):     5-30秒
      中等表(10萬-1000萬行):  1-10分鐘
      大表(>1000萬行):   10-60分鐘
      

      3. 實(shí)際測(cè)試數(shù)據(jù)

      -- 測(cè)試不同規(guī)模的UPDATE時(shí)間
      -- 100萬行數(shù)據(jù),簡(jiǎn)單UPDATE:約2-5分鐘
      -- 1000萬行數(shù)據(jù),批量UPDATE:約10-30分鐘
      -- 復(fù)雜子查詢UPDATE:時(shí)間增加2-5倍
      

      代碼實(shí)現(xiàn)方案

      1. Go語言等待實(shí)現(xiàn)

      package main
      
      import (
          "fmt"
          "time"
          "gorm.io/gorm"
      )
      
      // 等待mutation完成的通用函數(shù)
      func waitForMutation(db *gorm.DB, tableName string, timeout time.Duration) error {
          start := time.Now()
          ticker := time.NewTicker(1 * time.Second)
          defer ticker.Stop()
          
          for {
              select {
              case <-ticker.C:
                  var count int64
                  err := db.Raw("SELECT count() FROM system.mutations WHERE table = ? AND is_done = 0", tableName).Scan(&count).Error
                  if err != nil {
                      return err
                  }
                  
                  if count == 0 {
                      fmt.Printf("Mutation completed in %v\n", time.Since(start))
                      return nil
                  }
                  
                  // 檢查超時(shí)
                  if time.Since(start) > timeout {
                      return fmt.Errorf("mutation timeout after %v", timeout)
                  }
                  
              case <-time.After(timeout):
                  return fmt.Errorf("mutation timeout after %v", timeout)
              }
          }
      }
      
      // 使用示例
      func updateBlockTask(db *gorm.DB, taskID int64) error {
          // 執(zhí)行UPDATE
          err := db.Exec("UPDATE block_tasks SET status = 'finished' WHERE id = ?", taskID).Error
          if err != nil {
              return err
          }
          
          // 等待mutation完成
          return waitForMutation(db, "block_tasks", 5*time.Minute)
      }
      

      2. 帶進(jìn)度顯示的等待

      // 顯示mutation進(jìn)度的等待函數(shù)
      func waitForMutationWithProgress(db *gorm.DB, tableName string, timeout time.Duration) error {
          start := time.Now()
          ticker := time.NewTicker(2 * time.Second)
          defer ticker.Stop()
          
          for {
              select {
              case <-ticker.C:
                  var mutations []struct {
                      MutationID string    `gorm:"column:mutation_id"`
                      CreateTime time.Time `gorm:"column:create_time"`
                      IsDone     bool      `gorm:"column:is_done"`
                  }
                  
                  err := db.Raw(`
                      SELECT mutation_id, create_time, is_done 
                      FROM system.mutations 
                      WHERE table = ? AND is_done = 0 
                      ORDER BY create_time DESC 
                      LIMIT 5
                  `, tableName).Scan(&mutations).Error
                  
                  if err != nil {
                      return err
                  }
                  
                  if len(mutations) == 0 {
                      fmt.Printf("All mutations completed in %v\n", time.Since(start))
                      return nil
                  }
                  
                  // 顯示進(jìn)度
                  elapsed := time.Since(start)
                  fmt.Printf("Mutation in progress for %v, %d pending...\n", elapsed, len(mutations))
                  
                  // 檢查超時(shí)
                  if elapsed > timeout {
                      return fmt.Errorf("mutation timeout after %v", timeout)
                  }
                  
              case <-time.After(timeout):
                  return fmt.Errorf("mutation timeout after %v", timeout)
              }
          }
      }
      

      3. 批量更新優(yōu)化

      // 批量更新,減少mutation數(shù)量
      func batchUpdateTasks(db *gorm.DB, tasks []*model.BlockTask) error {
          return db.Transaction(func(tx *gorm.DB) error {
              for _, task := range tasks {
                  err := tx.Table(model.BlockTaskTableName).
                      Where("start_block = ? AND end_block = ?", task.StartBlock, task.EndBlock).
                      Updates(map[string]interface{}{
                          "status":     task.Status,
                          "updated_at": time.Now(),
                      }).Error
                  if err != nil {
                      return err
                  }
              }
              return nil
          })
      }
      

      最佳實(shí)踐

      1. 設(shè)置合理的超時(shí)時(shí)間

      // 根據(jù)表大小動(dòng)態(tài)設(shè)置超時(shí)時(shí)間
      func getMutationTimeout(tableSize int64) time.Duration {
          switch {
          case tableSize < 100000:
              return 1 * time.Minute
          case tableSize < 1000000:
              return 5 * time.Minute
          case tableSize < 10000000:
              return 15 * time.Minute
          default:
              return 30 * time.Minute
          }
      }
      

      2. 異步處理策略

      // 對(duì)于非關(guān)鍵更新,使用異步處理
      func asyncUpdate(db *gorm.DB, taskID int64) {
          go func() {
              err := db.Exec("UPDATE block_tasks SET status = 'finished' WHERE id = ?", taskID).Error
              if err != nil {
                  log.Printf("Async update failed: %v", err)
              }
          }()
      }
      

      3. 錯(cuò)誤處理和重試

      // 帶重試的更新函數(shù)
      func updateWithRetry(db *gorm.DB, taskID int64, maxRetries int) error {
          for i := 0; i < maxRetries; i++ {
              err := db.Exec("UPDATE block_tasks SET status = 'finished' WHERE id = ?", taskID).Error
              if err == nil {
                  // 等待mutation完成
                  err = waitForMutation(db, "block_tasks", 5*time.Minute)
                  if err == nil {
                      return nil
                  }
              }
              
              if i < maxRetries-1 {
                  time.Sleep(time.Duration(i+1) * time.Second) // 指數(shù)退避
              }
          }
          return fmt.Errorf("update failed after %d retries", maxRetries)
      }
      

      常見問題解決

      1. Mutation卡住不動(dòng)

      -- 檢查是否有失敗的mutations
      SELECT 
          mutation_id,
          latest_failed_part,
          latest_fail_reason
      FROM system.mutations 
      WHERE table = 'block_tasks' 
      AND is_done = 0 
      AND latest_failed_part != '';
      

      2. 性能優(yōu)化

      -- 優(yōu)化mutation性能的設(shè)置
      ALTER TABLE block_tasks MODIFY SETTING 
          number_of_mutations_to_throw = 100,
          number_of_mutations_to_delay = 50;
      

      3. 監(jiān)控和告警

      // 監(jiān)控mutation積壓
      func monitorMutationBacklog(db *gorm.DB) {
          ticker := time.NewTicker(30 * time.Second)
          go func() {
              for range ticker.C {
                  var count int64
                  db.Raw("SELECT count() FROM system.mutations WHERE is_done = 0").Scan(&count)
                  
                  if count > 10 {
                      log.Printf("Warning: %d mutations pending", count)
                  }
              }
          }()
      }
      

      總結(jié)

      ClickHouse的UPDATE機(jī)制具有以下特點(diǎn):

      1. 異步處理:UPDATE立即返回,實(shí)際更新在后臺(tái)進(jìn)行
      2. 最終一致性:查詢時(shí)總是返回最新數(shù)據(jù)
      3. RowsAffected不可靠:不能依賴此值判斷更新是否成功
      4. 需要等待機(jī)制:通過監(jiān)控system.mutations表等待完成
      5. 性能考慮:大表更新可能需要較長(zhǎng)時(shí)間

      關(guān)鍵要點(diǎn):

      • 理解異步機(jī)制,不要被RowsAffected = 0誤導(dǎo)
      • 實(shí)現(xiàn)等待機(jī)制,確保更新完成
      • 設(shè)置合理的超時(shí)時(shí)間
      • 監(jiān)控mutation狀態(tài),及時(shí)發(fā)現(xiàn)問題
      • 考慮異步處理,提高系統(tǒng)響應(yīng)性

      這種機(jī)制雖然增加了復(fù)雜性,但提供了更好的并發(fā)性能和最終一致性保證。

      posted @ 2025-09-24 11:50  若-飛  閱讀(59)  評(píng)論(0)    收藏  舉報(bào)
      主站蜘蛛池模板: 临高县| 99久久免费只有精品国产| 国产精品爽爽久久久久久| 国产一区二三区日韩精品| 国产午夜91福利一区二区| 久久天天躁综合夜夜黑人鲁色| 天堂亚洲免费视频| 亚洲成人四虎在线播放| 久久99热只有频精品8| 国产精品天天看天天狠| 亚洲 欧美 中文 日韩aⅴ| 日韩亚洲精品中文字幕| 疯狂三人交性欧美| 亚洲一区二区三区| 新久久国产色av免费看| 一区二区三区成人| 国产乱久久亚洲国产精品| 国产精品中文字幕日韩| 亚洲精品无码久久久影院相关影片 | 恩施市| 国产亚洲综合区成人国产| 国产资源精品中文字幕| 名山县| 人妻va精品va欧美va| 99久热在线精品视频| 久久精品免视看国产成人| 四虎永久播放地址免费| 久久精品久久电影免费理论片| 成人aⅴ综合视频国产| 99人体免费视频| 一本大道久久香蕉成人网| 亚洲精品中文字幕第一页| 日本一区二区三区在线 |观看| 日本熟妇XXXX潮喷视频| 国产精品高清视亚洲乱码| 国产偷窥厕所一区二区| 国产成人人综合亚洲欧美丁香花| 亚洲av无码一区二区三区网站| 国产av第一次处破| 无码人妻斩一区二区三区| 国产一区精品在线免费看|