<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Mongodb源碼分析--Mongos之balancer(均衡)

          在之前的一篇文章中,介紹了mongos的啟動流程,在那篇文章的結(jié)尾,介紹了mongos使用balancer來進(jìn)行均衡,今天就繼續(xù)講其實現(xiàn)方式。

          首先我們看一下Balancer及相關(guān)實現(xiàn)策略的類圖:
          
          
          可以看到Balancer類里包含一個BalancerPolicy,其指向一個均衡策略,該策略會實現(xiàn)查找并收集要遷移的chunk。
          
          這里先看一下Balancer的類定義,如下:

      //balace.h
      class Balancer : public BackgroundJob {
          
      public:
              Balancer();
              
      virtual ~Balancer();
              
      // BackgroundJob methods
              virtual void run();
              
      virtual string name() const { return "Balancer"; }

          
      private:
              typedef BalancerPolicy::ChunkInfo CandidateChunk;
              typedef shared_ptr
      <CandidateChunk> CandidateChunkPtr;

              
      //mongos名稱(hostname:port)
              string _myid;

              
      // Balancer 啟動時間
              time_t _started;

              
      // 前移的chunks數(shù)量
              int _balancedLastTime;

              
      // 均衡策略(確定要遷移的chunks)
              BalancerPolicy* _policy;

              
      //初始化,檢查balancer 能否鏈接到servers.該方法可能拋出網(wǎng)絡(luò)異常        
              bool _init();

              
      /**
               * 收集關(guān)于shards及chunks的信息,以及可能需要遷移的chunks
               * @param conn: 指向config server(s)連接
               * @param candidateChunks (IN/OUT): 可能需要遷移的chunks
               
      */
              
      void _doBalanceRound( DBClientBase& conn, vector<CandidateChunkPtr>* candidateChunks );

              
      /**
               * 逐個遷移chunk.并返回最終遷移的chunk數(shù)量
               * @param candidateChunks 可能需要遷移的chunks
               * @return number of chunks effectively moved
               
      */
              
      int _moveChunks( const vector<CandidateChunkPtr>* candidateChunks );

              
      /*在config server(s)中標(biāo)記并前balancer為活動狀態(tài).*/
              
      void _ping( DBClientBase& conn );

              
      //當(dāng)configdb中的所有服務(wù)均可用時,返回true
              bool _checkOIDs();
          };

          
          可以看出balancer繼承自BackgroundJob,所以它是以后臺方式運(yùn)行的。了解了該類的方法和屬性之后,下面我們著手看一下mongos主函數(shù)中啟動balancer.go()的調(diào)用流程。因為balancer繼承自BackgroundJob,所以還要看一下BackgroundJob里go()方法的執(zhí)行代碼, 如下:
         
          //background.cpp 線程方式運(yùn)行下面的jobBody方法
          BackgroundJob& BackgroundJob::go() {
              boost::thread t( boost::bind( 
      &BackgroundJob::jobBody , this, _status ) );
              return *this;
          }

          
      ////background.cpp. Background object can be only be destroyed after jobBody() ran
          void BackgroundJob::jobBody( boost::shared_ptr<JobStatus> status ) {
              ....
              
      const string threadName = name();
              
      if! threadName.empty() )
                  setThreadName( threadName.c_str() );

              
      try {
                  run();
      //到這里,mongos開始執(zhí)行子類balancer中的run方法
              }
              ....

              
      if( status->deleteSelf )
                  delete 
      this;
          }
          
          上面代碼最終會將執(zhí)行流程轉(zhuǎn)到balancer類的run()方法,如下
         
      void Balancer::run() {

            
      /* this is the body of a BackgroundJob so if we throw 
              here we're basically ending the balancer thread prematurely */
              while ( ! inShutdown() ) {

                  
      if ( ! _init() ) {//檢查balancer是否鏈到config server和其它shard上
                      log() << "will retry to initialize balancer in one minute" << endl;
                      sleepsecs( 
      60 );
                      
      continue;
                  }

                  
      break;
              }
             //構(gòu)造鏈接串信息
              ConnectionString config = configServer.getConnectionString();
              
      //聲明分布式鎖
              DistributedLock balanceLock( config , "balancer" );

              
      while ( ! inShutdown() ) {//一直循環(huán)直到程序中斷或關(guān)閉

                  
      try {
                      
                      
      // 判斷chunk均衡功能是否有效
                      if ( ! grid.shouldBalance() ) {
                          log(
      1<< "skipping balancing round because balancing is disabled" << endl;
                          sleepsecs( 
      30 );
                          
      continue;
                      }
                      
                      
      //從鏈接池中獲取一個鏈接對象,如無鏈接則直接創(chuàng)建。更多內(nèi)容詳見connpool.cpp文件的
                      
      //DBClientBase* DBConnectionPool::get(const string& host) 方法.
                      ScopedDbConnection conn( config );

                      _ping( conn.conn() );
      //標(biāo)識鏈到config server的balancer為活動(live)狀態(tài)
                      if ( ! _checkOIDs() ) {
                          uassert( 
      13258 , "oids broken after resetting!" , _checkOIDs() );
                      }

                      
      //重載Shard集合信息(shard 狀態(tài))
                      Shard::reloadShardInfo();
                      //聲明balance鎖對象balanceLock
                      dist_lock_try lk( 
      &balanceLock , "doing balance round" );
                      if ( ! lk.got() ) {
                          log(
      1<< "skipping balancing round because another balancer is active" << endl;
                          conn.done();

                          sleepsecs( 
      30 ); // no need to wake up soon
                          continue;
                      }

                      log(
      1<< "*** start balancing round" << endl;

                      vector
      <CandidateChunkPtr> candidateChunks;
                      
      //獲取在shard集合中建議遷移的chunk信息(包含要遷移到的目標(biāo)shard信息)
                      _doBalanceRound( conn.conn() , &candidateChunks );
                      
      if ( candidateChunks.size() == 0 ) {//是否有要移動的chunk
                          log(1<< "no need to move any chunk" << endl;
                      }
                      
      else//開始遷移并返回最終遷移數(shù)量 {
                          _balancedLastTime 
      = _moveChunks( &candidateChunks );
                      }

                      log(
      1<< "*** end of balancing round" << endl;
                      conn.done();
      //將conn放到鏈接池中(為其它后續(xù)操作使用)

                      sleepsecs( _balancedLastTime 
      ? 5 : 10 );
                  }
                  
      catch ( std::exception& e ) {
                      log() 
      << "caught exception while doing balance: " << e.what() << endl;

                      
      // Just to match the opening statement if in log level 1
                      log(1<< "*** End of balancing round" << endl;

                      sleepsecs( 
      30 ); // sleep a fair amount b/c of error
                      continue;
                  }
              }
          }

          上面方法中主要是先構(gòu)造鏈接串,進(jìn)而構(gòu)造連接實例(注:這里使用了鏈接池的概念,我會在后續(xù)章節(jié)中專門介紹其實現(xiàn)機(jī)制)。之后刷新sharding中的相關(guān)信息(確保其有效性),之后調(diào)用_doBalanceRound()方法來收集可能要遷移的chunk(s)信息并最終完成遷移(使用_moveChunks方法)。

          下面我們就著重看一下這兩個方法的具體實現(xiàn).

          首先是_doBalanceRound方法:
         //balance.cpp
          void Balancer::_doBalanceRound( DBClientBase& conn, vector<CandidateChunkPtr>* candidateChunks ) {
              assert( candidateChunks );

              
      // 1. 通過查詢ShardsNS::collections來檢查是否有可用sharded集合來均衡chunk
              auto_ptr<DBClientCursor> cursor = conn.query( ShardNS::collection , BSONObj() );
              vector
      < string > collections;
              
      while ( cursor->more() ) {
                  BSONObj col 
      = cursor->next();

                  
      // sharded collections will have a shard "key".
                  if ( ! col["key"].eoo() )
                      collections.push_back( col[
      "_id"].String() );
              }
              cursor.reset();

              
      if ( collections.empty() ) {
                  log(
      1<< "no collections to balance" << endl;
                  
      return;
              }

              
      //獲取一個需要均衡的shard信息列表,表中shard信息包括maxsize, currsiez, drain, hsopsqueued
              vector<Shard> allShards;
              Shard::getAllShards( allShards );
              
      if ( allShards.size() < 2) {
                  log(
      1<< "can't balance without more active shards" << endl;
                  
      return;
              }
              
      //獲取allShards的相應(yīng)狀態(tài)信息交綁定到shardLimitMap相應(yīng)元素中,該shardLimitMap是一個從shardId到對象(BSONObj)的映射
              map< string, BSONObj > shardLimitsMap;
              
      for ( vector<Shard>::const_iterator it = allShards.begin(); it != allShards.end(); ++it ) {
                  
      const Shard& s = *it;
                  ShardStatus status 
      = s.getStatus();
                                          
                                                      //最大值 (單位:兆字節(jié), 0為不限制)       
                  BSONObj limitsObj 
      = BSON( ShardFields::maxSize( s.getMaxSize() ) <<  
                               LimitsFields::currSize( status.mapped() ) << //當(dāng)前時間狀態(tài)的信息
                               hardFields::draining( s.isDraining() )  << //當(dāng)前的shard是否正在被移除
                               LimitsFields::hasOpsQueued( status.hasOpsQueued() )//是否有回寫的隊列信息
                                          );

                  shardLimitsMap[ s.getName() ] 
      = limitsObj;
              }

              
      //遍歷collections集合,根據(jù)均衡策略(balancing policy) ,檢查是否有要遷移的chunk信息
              for (vector<string>::const_iterator it = collections.begin(); it != collections.end(); ++it ) {
                  
      const string& ns = *it;//集合的名空間            
                  map< string,vector<BSONObj> > shardToChunksMap;//從shardId 到chunks 的映射
                  cursor = conn.query( ShardNS::chunk , QUERY( "ns" << ns ).sort( "min" ) );
                  
      while ( cursor->more() ) {
                      BSONObj chunk 
      = cursor->next();
                      
      //以chunk所屬的shard為標(biāo)識,獲取一個chunks的集合來收集位于同一shard的chunk
                      vector<BSONObj>& chunks = shardToChunksMap[chunk["shard"].String()];
                      chunks.push_back( chunk.getOwned() );
                  }
                  cursor.reset();

                  
      if (shardToChunksMap.empty()) {
                      log(
      1<< "skipping empty collection (" << ns << ")";
                      
      continue;
                  }

                  
      for ( vector<Shard>::iterator i=allShards.begin(); i!=allShards.end(); ++i ) {
                      
      // this just makes sure there is an entry in shardToChunksMap for every shard
                      Shard s = *i;
                      shardToChunksMap[s.getName()].size();
                  }
                  
      //找出要遷移的chunk,包括源及目標(biāo)(要遷移到的)chunk的起始地址
                  CandidateChunk* p = _policy->balance( ns , shardLimitsMap , shardToChunksMap , _balancedLastTime /*number of moved chunks in last round*/);
                
      if ( p ) candidateChunks->push_back( CandidateChunkPtr( p ) );//存到要均衡的chunk集合中
              }
          }
         
          上面的_doBalanceRound方法主要構(gòu)造shardLimitsMap,shardToChunksMap這兩個實例對象集合(map<>類型),其中:
          
          shardLimitsMap:用于收集shard集合中一些“起數(shù)量限制”作用的參數(shù),如maxsize,draining,hasOpsQueued等,因為這幾個參數(shù)如果超出范圍或為true時,相應(yīng)shard 是不可以提供遷移服務(wù)的。
          shardToChunksMap:用于收集當(dāng)前shard中的chunk信息,以便后面的遍歷操作。

          收集了這些信息之后,通過調(diào)用 _policy->balance()方法來找出可能需要遷移的chunk().

          下面就看一下該均衡策略的具體實現(xiàn)(具體內(nèi)容參見注釋):
        
       //balacer_policy.cpp
           BalancerPolicy::ChunkInfo* BalancerPolicy::balance( const string& ns,
                  
      const ShardToLimitsMap& shardToLimitsMap,
                  
      const ShardToChunksMap& shardToChunksMap,
                  
      int balancedLastTime ) {
              pair
      <string,unsigned> min("",numeric_limits<unsigned>::max());
              pair
      <string,unsigned> max("",0);
              vector
      <string> drainingShards;
              
      //遍歷shard集合,找到min,max的匹配對象,以及draining的Shard信息
              for (ShardToChunksIter i = shardToChunksMap.begin(); i!=shardToChunksMap.end(); ++i ) {

                  
      // 遍歷shard,并查看其容量或可用空間是否被耗盡
                  const string& shard = i->first;
                  BSONObj shardLimits;
                  ShardToLimitsIter it 
      = shardToLimitsMap.find( shard );
                  
      if ( it != shardToLimitsMap.end() ) shardLimits = it->second;//獲取shard的信息,包括maxsize, currsiez, drain, hsopsqueued
                  const bool maxedOut = isSizeMaxed( shardLimits );//shard是否已滿
                  const bool draining = isDraining( shardLimits );//shard是否移除
                  const bool opsQueued = hasOpsQueued( shardLimits );//shard是否有寫回隊列

                  
      //是否合適接收chunk,滿足下面三個條件之一,則視為不合適
                  
      // + maxed out shards
                  
      // + draining shards
                  
      // + shards with operations queued for writeback
                  const unsigned size = i->second.size();//獲取當(dāng)前shard里的chunk數(shù)
                  if ( ! maxedOut && ! draining && ! opsQueued ) {
                      
      if ( size < min.second ) {//如果當(dāng)前shard中chunk數(shù)與min比較,找出最小size的shard
                          min = make_pair( shard , size );
                      }
                  }

                  
      // 檢查shard 是否應(yīng)該遷移(chunk donor)
                  
      // Draining shards 比 overloaded shards優(yōu)先級低
                  if ( size > max.second ) {
                      max 
      = make_pair( shard , size );//找出最大size的shard
                  }
                  
      if ( draining && (size > 0)) {
                      drainingShards.push_back( shard );
                  }
              }

              
      // 如果chunk沒有合適的shard接收, 意味著上面循環(huán)中都是類以draining等情況
              if ( min.second == numeric_limits<unsigned>::max() ) {
                  log() 
      << "no availalable shards to take chunks" << endl;
                  
      return NULL;
              }

              log(
      1<< "collection : " << ns << endl;
              log(
      1<< "donor      : " << max.second << " chunks on " << max.first << endl;
              log(
      1<< "receiver   : " << min.second << " chunks on " << min.first << endl;
              
      if ( ! drainingShards.empty() ) {
                  
      string drainingStr;
                  joinStringDelim( drainingShards, 
      &drainingStr, ',' );//用逗號將drainingShards連接起來
                  log(1<< "draining           : " << ! drainingShards.empty() << "(" << drainingShards.size() << ")" << endl;
              }

              
      // 通過優(yōu)先級解決不均衡問題.
              const int imbalance = max.second - min.second;//找出shard中最不均衡的size的差距
              const int threshold = balancedLastTime ? 2 : 8;
              
      string from, to;
              
      if ( imbalance >= threshold /*臨界點*/) {
                  from 
      = max.first;//將shard中chunk最多的作為源
                  to = min.first;//將shard中chunk最小的作為要遷移的目的地
              }
              
      else if ( ! drainingShards.empty() ) {
                  
      //對于那些draining的shard,隨機(jī)取出其中一個
                  from = drainingShards[ rand() % drainingShards.size() ];
                  to 
      = min.first;
              }
              
      else {
                  
      // 如已均衡,則返回
                  return NULL;
              }
              //找出要遷移的chunk集合的起始位置
              const vector<BSONObj>& chunksFrom = shardToChunksMap.find( from )->second;
              const vector<BSONObj>& chunksTo = shardToChunksMap.find( to )->second;//找出要遷移到的chunk集合目標(biāo)位置        
              BSONObj chunkToMove = pickChunk( chunksFrom , chunksTo );//最終選出(校正)要遷移的chunk的起始位置
              log() << "chose [" << from << "] to [" << to << "" << chunkToMove << endl;
              
      //返回上面balaner的操作結(jié)果來執(zhí)行后續(xù)的移動chunk操作
              return new ChunkInfo( ns, to, from, chunkToMove );
          }

          上面方法通過計算各個shard中的當(dāng)前chunk數(shù)量來推算出那個shard相對較空,并將其放到to(目標(biāo)shard),之后對可能要遷移的chunk進(jìn)行校驗,這里使用了pickChunk()方法,該方法具體實現(xiàn)如下:
         
      //balancer_policy.cpp
          
      //找出需要被遷移的chunk, 這里要考慮to端可能比from端chunks更多的情況
          BSONObj BalancerPolicy::pickChunk( const vector<BSONObj>& from, const vector<BSONObj>& to ) {
              
      // It is possible for a donor ('from') shard to have less chunks than a recevier one ('to')
              
      // if the donor is in draining mode.
              if ( to.size() == 0 )//如果目標(biāo)位置為空,表示可以將from中數(shù)據(jù)全部遷移過去
                  return from[0];
              
      /**wo='well ordered'.  fields must be in same order in each object.
                 Ordering is with respect to the signs of the elements
                 and allows ascending / descending key mixing.
                 @return  <0 if l<r. 0 if l==r. >0 if l>r
              
      */
              
      //如果要遷移的chunk中最小值與目標(biāo)位置的最大值相同,表示可以將from中數(shù)據(jù)全部遷移過去
              if ( from[0]["min"].Obj().woCompare( to[to.size()-1]["max"].Obj() , BSONObj() , false ) == 0 )
                  
      return from[0];
              
      //如果要遷移的chunk中最大值與目標(biāo)位置的最小值相同,表示可以將from中最后一個chunk遷移過去
              if ( from[from.size()-1]["max"].Obj().woCompare( to[0]["min"].Obj() , BSONObj() , false ) == 0 )
                  
      return from[from.size()-1];

              
      return from[0];
          }

          完成了校驗之后,得到的就是真正要遷移的chunk的啟始地址,之后就可以進(jìn)行遷移了。到這里,我們還要將執(zhí)行流程跳回到Balancer::run()方法里,看一下最終完成遷移工作的方法movechunk()的實現(xiàn)流程:
          
       
         //balance.cpp文件
          int Balancer::_moveChunks( const vector<CandidateChunkPtr>* candidateChunks ) {
              
      //最終遷移的chunk數(shù)
              int movedCount = 0;
              
      //遍歷要遷移chunks并逐一開始遷移
              for ( vector<CandidateChunkPtr>::const_iterator it = candidateChunks->begin(); it != candidateChunks->end(); ++it ) {
                  
      const CandidateChunk& chunkInfo = *it->get();
                  
      //獲取當(dāng)前chunk要使用的db配置信息
                  DBConfigPtr cfg = grid.getDBConfig( chunkInfo.ns );
                  assert( cfg );
                  
      //聲明ChunkManager使用它來
                  ChunkManagerPtr cm = cfg->getChunkManager( chunkInfo.ns );
                  assert( cm );
                  
      //獲取要遷移的chunk起始地址
                  const BSONObj& chunkToMove = chunkInfo.chunk;
                  ChunkPtr c 
      = cm->findChunk( chunkToMove["min"].Obj() );
                  
      //下面判斷執(zhí)行兩次,防止執(zhí)行split之后,系統(tǒng)在reload 情況下chunk可能出現(xiàn)min,max不一致情況
                  if ( c->getMin().woCompare( chunkToMove["min"].Obj() ) || c->getMax().woCompare( chunkToMove["max"].Obj() ) ) {
                      
      // 這里主要防止別處執(zhí)行 split 操作造成負(fù)作用
                      cm = cfg->getChunkManager( chunkInfo.ns , true /* reload */);
                      assert( cm );

                      c 
      = cm->findChunk( chunkToMove["min"].Obj() );
                      
      if ( c->getMin().woCompare( chunkToMove["min"].Obj() ) || c->getMax().woCompare( chunkToMove["max"].Obj() ) ) {
                          log() 
      << "chunk mismatch after reload, ignoring will retry issue cm: "
                                
      << c->getMin() << " min: " << chunkToMove["min"].Obj() << endl;
                          
      continue;
                      }
                  }

                  BSONObj res;
                  
      //將chunk, 從當(dāng)前的shard ,移動到指定的shard,并累加遷移數(shù)量
                  if ( c->moveAndCommit( Shard::make( chunkInfo.to ) , Chunk::MaxChunkSize , res ) ) {
                      movedCount
      ++;
                      
      continue;
                  }
                  
      //如遷移不成功,記入日志
                  
      // the move requires acquiring the collection metadata's lock, which can fail
                  log() << "balacer move failed: " << res << " from: " << chunkInfo.from << " to: " << chunkInfo.to
                        
      << " chunk: " << chunkToMove << endl;
                  
      //chunk是否達(dá)到允許移動的最大尺寸,如果是,則對當(dāng)前shard執(zhí)行split操作
                  if ( res["chunkTooBig"].trueValue() ) {
                      
      // reload just to be safe
                      cm = cfg->getChunkManager( chunkInfo.ns );
                      assert( cm );
                      c 
      = cm->findChunk( chunkToMove["min"].Obj() );
                      
                      log() 
      << "forcing a split because migrate failed for size reasons" << endl;
                      
                      res 
      = BSONObj();
                      
      //對當(dāng)前的shards進(jìn)行分割(獲取適合的分割點),該方法有些復(fù)雜,我會抽時間寫文章介紹
                      c->singleSplit( true , res );
                      log() 
      << "forced split results: " << res << endl;

                      
      // TODO: if the split fails, mark as jumbo SERVER-2571
                  }
              }

              
      return movedCount;
          }

          上面代碼就是依次遍歷要遷移的chunk,分別根據(jù)其ns信息獲取相應(yīng)的ChunkManager(該類主要執(zhí)行chunk的管理,比如CRUD等),之后就通過該ChunkManager找出當(dāng)前chunk中最小的值(min:參見chunk.h文件,我這里把min,max理解為當(dāng)前chunk中最小和最大記錄對象信息)chunk信息,并開始遷移。
              
          按照慣例,這里還是用一個時序列來大體回顧一下balancer的執(zhí)行流程,如下:
       
          

          好了,今天的內(nèi)容就先到這里了。


          原文鏈接:http://www.rzrgm.cn/daizhj/archive/2011/05/23/mongos_balancer_source_code.html
          作者: daizhj, 代震軍   
          微博: http://t.sina.com.cn/daizhj
          Tags: mongodb,c++,balance,chunk,shard,source code
      posted @ 2011-05-23 10:53  代震軍  閱讀(11654)  評論(10)    收藏  舉報
      主站蜘蛛池模板: 伊人久久精品无码麻豆一区| 视频一区二区不中文字幕| 四虎成人在线观看免费| 色欧美片视频在线观看| 亚洲一区无码精品色| 国产乱久久亚洲国产精品| 久久精品国产99久久久古代| 中西区| 国产免费网站看v片元遮挡| 亚洲AV片一区二区三区| 中文字幕人妻在线精品| 激情综合色区网激情五月| 熟妇的奶头又大又长奶水视频 | 亚洲精品男男一区二区| 婷婷综合缴情亚洲| 麻豆果冻国产剧情av在线播放| 亚洲欧美成人综合久久久| 久久人体视频| 天天噜噜日日久久综合网| 国产尤物精品自在拍视频首页| 精品无码国产日韩制服丝袜| 中文字幕在线日韩| 天天综合色一区二区三区| 97精品亚成在人线免视频| 午夜欧美精品久久久久久久| 四虎成人高清永久免费看| 欧美日韩精品一区二区三区高清视频| 亚洲国产中文字幕精品| 国产午夜视频在线观看| 亚洲国产日韩在线视频| 国产成人精品永久免费视频| 麻豆aⅴ精品无码一区二区| 亚洲高清 一区二区三区| 中文字幕有码日韩精品| 婷婷色香五月综合缴缴情香蕉| 国产免费午夜福利在线观看| 中国女人内谢69xxxx| 亚洲欧美色综合影院| 熟女精品视频一区二区三区| 国产午夜亚洲精品福利| julia无码中文字幕一区|