MongoDB Source Code Analysis: the Mongos Balancer
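In a previous article I walked through the startup sequence of mongos, and at the end of that article I mentioned that mongos uses a balancer to keep chunks evenly distributed. Today we continue with how that balancer is implemented.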
Original article: http://www.rzrgm.cn/daizhj/archive/2011/05/23/mongos_balancer_source_code.html
Author: daizhj (代震軍)
Weibo: http://t.sina.com.cn/daizhj
Tags: mongodb,c++,balance,chunk,shard,source code
First, let's look at the class diagram of Balancer and its related policy classes:
[Figure: class diagram of Balancer and BalancerPolicy]
As the diagram shows, Balancer holds a BalancerPolicy, the balancing strategy that finds and collects the chunks to be migrated.
Here is the definition of the Balancer class:
// balance.h
class Balancer : public BackgroundJob {
public:
    Balancer();
    virtual ~Balancer();
    // BackgroundJob methods
    virtual void run();
    virtual string name() const { return "Balancer"; }
private:
    typedef BalancerPolicy::ChunkInfo CandidateChunk;
    typedef shared_ptr<CandidateChunk> CandidateChunkPtr;
    // name of this mongos (hostname:port)
    string _myid;
    // time the Balancer started
    time_t _started;
    // number of chunks migrated in the last round
    int _balancedLastTime;
    // balancing policy (decides which chunks to migrate)
    BalancerPolicy* _policy;
    // initialization: checks that the balancer can connect to the servers; may throw network exceptions
    bool _init();
    /**
     * Gathers information about the shards and chunks, and collects the chunks that may need to be migrated
     * @param conn: connection to the config server(s)
     * @param candidateChunks (IN/OUT): chunks that may need to be migrated
     */
    void _doBalanceRound( DBClientBase& conn, vector<CandidateChunkPtr>* candidateChunks );
    /**
     * Migrates the candidate chunks one by one and returns how many were actually moved
     * @param candidateChunks chunks that may need to be migrated
     * @return number of chunks effectively moved
     */
    int _moveChunks( const vector<CandidateChunkPtr>* candidateChunks );
    /* marks this balancer as active (alive) on the config server(s) */
    void _ping( DBClientBase& conn );
    // returns true when all servers listed in the config db are available
    bool _checkOIDs();
};
Balancer derives from BackgroundJob, so it runs as a background job. With its methods and fields in mind, let's trace how the mongos main function starts it by calling balancer.go(). Since go() is inherited from BackgroundJob, we first need to look at BackgroundJob::go():
// background.cpp: go() runs jobBody() below on a new thread
BackgroundJob& BackgroundJob::go() {
    boost::thread t( boost::bind( &BackgroundJob::jobBody , this, _status ) );
    return *this;
}
// background.cpp. The Background object can only be destroyed after jobBody() has run
void BackgroundJob::jobBody( boost::shared_ptr<JobStatus> status ) {
    ....
    const string threadName = name();
    if( ! threadName.empty() )
        setThreadName( threadName.c_str() );
    try {
        run(); // here mongos starts executing run() of the subclass, Balancer
    }
    ....
    if( status->deleteSelf )
        delete this;
}
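The go() / jobBody() / run() hand-off above is a template method running on its own thread. Below is a minimal sketch of that pattern using std::thread instead of boost::thread; BackgroundJobSketch and CountingJob are hypothetical names. Unlike the original go(), which lets its local boost::thread object go out of scope while the thread keeps running, this version keeps the thread handle so the caller can wait() for the job to finish.

```cpp
#include <atomic>
#include <cassert>
#include <string>
#include <thread>

// Hypothetical, simplified analogue of BackgroundJob: go() starts a thread
// that runs jobBody(), and jobBody() calls the subclass's virtual run().
class BackgroundJobSketch {
public:
    virtual ~BackgroundJobSketch() {}
    virtual std::string name() const = 0;
    virtual void run() = 0;

    // Start the worker thread and return *this, like BackgroundJob::go().
    BackgroundJobSketch& go() {
        _thread = std::thread(&BackgroundJobSketch::jobBody, this);
        return *this;
    }

    // Block until jobBody() has finished (the original go() has no such call).
    void wait() {
        if (_thread.joinable()) _thread.join();
    }

    const std::string& threadName() const { return _threadName; }

private:
    void jobBody() {
        _threadName = name();  // the original passes the name to setThreadName()
        try {
            run();             // dispatches to the subclass, e.g. Balancer::run()
        } catch (...) {
            // an escaping exception simply ends the job
        }
    }

    std::thread _thread;
    std::string _threadName;
};

// Hypothetical job used to exercise the sketch.
class CountingJob : public BackgroundJobSketch {
public:
    std::string name() const override { return "CountingJob"; }
    void run() override {
        for (int i = 0; i < 3; ++i) ++rounds;
    }
    std::atomic<int> rounds{0};
};
```

For example, `CountingJob job; job.go().wait();` performs three increments on the worker thread and returns once it has finished. Because this sketch never deletes itself (there is no equivalent of status->deleteSelf), the caller must call wait() before the object is destroyed, and calling go() twice on the same object is not supported.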
The code above eventually hands control over to Balancer::run():
void Balancer::run() {
    /* this is the body of a BackgroundJob so if we throw
       here we're basically ending the balancer thread prematurely */
    while ( ! inShutdown() ) {
        if ( ! _init() ) { // check that the balancer can reach the config server and the other shards
            log() << "will retry to initialize balancer in one minute" << endl;
            sleepsecs( 60 );
            continue;
        }
        break;
    }
    // build the connection string for the config server(s)
    ConnectionString config = configServer.getConnectionString();
    // declare the distributed lock named "balancer"
    DistributedLock balanceLock( config , "balancer" );
    while ( ! inShutdown() ) { // loop until the process is interrupted or shut down
        try {
            // is chunk balancing currently enabled?
            if ( ! grid.shouldBalance() ) {
                log(1) << "skipping balancing round because balancing is disabled" << endl;
                sleepsecs( 30 );
                continue;
            }
            // take a connection from the pool, creating one if none is available; see
            // DBClientBase* DBConnectionPool::get(const string& host) in connpool.cpp
            ScopedDbConnection conn( config );
            _ping( conn.conn() ); // mark this balancer as live on the config server
            if ( ! _checkOIDs() ) {
                uassert( 13258 , "oids broken after resetting!" , _checkOIDs() );
            }
            // reload the shard list (shard state)
            Shard::reloadShardInfo();
            // try to take the balancer lock; another mongos may already hold it
            dist_lock_try lk( &balanceLock , "doing balance round" );
            if ( ! lk.got() ) {
                log(1) << "skipping balancing round because another balancer is active" << endl;
                conn.done();
                sleepsecs( 30 ); // no need to wake up soon
                continue;
            }
            log(1) << "*** start balancing round" << endl;
            vector<CandidateChunkPtr> candidateChunks;
            // collect the chunks suggested for migration (including their target shard)
            _doBalanceRound( conn.conn() , &candidateChunks );
            if ( candidateChunks.size() == 0 ) { // anything to move?
                log(1) << "no need to move any chunk" << endl;
            }
            else { // migrate, and record how many chunks were actually moved
                _balancedLastTime = _moveChunks( &candidateChunks );
            }
            log(1) << "*** end of balancing round" << endl;
            conn.done(); // return conn to the pool for later use
            sleepsecs( _balancedLastTime ? 5 : 10 );
        }
        catch ( std::exception& e ) {
            log() << "caught exception while doing balance: " << e.what() << endl;
            // Just to match the opening statement if in log level 1
            log(1) << "*** End of balancing round" << endl;
            sleepsecs( 30 ); // sleep a fair amount b/c of error
            continue;
        }
    }
}
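The method first builds the connection string and then a connection (note: this goes through a connection pool, whose implementation I'll cover in a later article). It then refreshes the sharding metadata (the shard list) so that it is up to date, and takes the distributed "balancer" lock so that only one mongos in the cluster runs a balancing round at a time. Finally it calls _doBalanceRound() to collect the chunk(s) that may need to be migrated, and _moveChunks() to carry out the migration.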
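The pacing between rounds is easy to miss in run(). The following hedged sketch reduces one pass of the main loop to the inputs that decide how long the thread sleeps afterwards; RoundInput and nextSleepSeconds are hypothetical names, and the real loop also handles the one-minute init retry and returns the pooled connection. Note that _balancedLastTime is only overwritten when there were candidate chunks, so a round that finds nothing to move keeps the previous round's value.

```cpp
#include <cassert>

// Hypothetical summary of one pass of the main loop in Balancer::run().
struct RoundInput {
    bool balancingEnabled;  // grid.shouldBalance()
    bool gotLock;           // dist_lock_try lk(...); lk.got()
    bool threw;             // a std::exception was caught
    int candidateCount;     // candidateChunks.size() after _doBalanceRound()
    int movedCount;         // what _moveChunks() would return
};

// Returns how many seconds run() sleeps after this pass, updating
// balancedLastTime the way run() updates _balancedLastTime.
int nextSleepSeconds(const RoundInput& in, int& balancedLastTime) {
    if (in.threw) return 30;              // "sleep a fair amount b/c of error"
    if (!in.balancingEnabled) return 30;  // balancing disabled: skip the round
    if (!in.gotLock) return 30;           // another balancer is active
    if (in.candidateCount > 0)            // only overwritten when there was work
        balancedLastTime = in.movedCount;
    return balancedLastTime ? 5 : 10;     // sleepsecs( _balancedLastTime ? 5 : 10 )
}
```

So a round that finds nothing to move sleeps 5 seconds if the previous round moved chunks, and 10 seconds otherwise.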
Let's now look at these two methods in detail.
First, _doBalanceRound():
// balance.cpp
void Balancer::_doBalanceRound( DBClientBase& conn, vector<CandidateChunkPtr>* candidateChunks ) {
    assert( candidateChunks );
    // 1. query ShardNS::collection to check whether there are sharded collections whose chunks can be balanced
    auto_ptr<DBClientCursor> cursor = conn.query( ShardNS::collection , BSONObj() );
    vector< string > collections;
    while ( cursor->more() ) {
        BSONObj col = cursor->next();
        // sharded collections will have a shard "key".
        if ( ! col["key"].eoo() )
            collections.push_back( col["_id"].String() );
    }
    cursor.reset();
    if ( collections.empty() ) {
        log(1) << "no collections to balance" << endl;
        return;
    }
    // get the list of shards to balance; their info includes maxSize, currSize, draining and hasOpsQueued
    vector<Shard> allShards;
    Shard::getAllShards( allShards );
    if ( allShards.size() < 2) {
        log(1) << "can't balance without more active shards" << endl;
        return;
    }
    // collect each shard's status into shardLimitsMap, a map from shard id to a BSONObj
    map< string, BSONObj > shardLimitsMap;
    for ( vector<Shard>::const_iterator it = allShards.begin(); it != allShards.end(); ++it ) {
        const Shard& s = *it;
        ShardStatus status = s.getStatus();
        BSONObj limitsObj = BSON( ShardFields::maxSize( s.getMaxSize() ) <<           // maximum size in MB (0 means unlimited)
                                  LimitsFields::currSize( status.mapped() ) <<         // current size from the shard's status
                                  ShardFields::draining( s.isDraining() ) <<           // is this shard being removed?
                                  LimitsFields::hasOpsQueued( status.hasOpsQueued() )  // does it have queued writeback operations?
                                );
        shardLimitsMap[ s.getName() ] = limitsObj;
    }
    // for each collection, ask the balancing policy whether a chunk should be migrated
    for (vector<string>::const_iterator it = collections.begin(); it != collections.end(); ++it ) {
        const string& ns = *it; // the collection's namespace
        map< string,vector<BSONObj> > shardToChunksMap; // shard id -> chunks
        cursor = conn.query( ShardNS::chunk , QUERY( "ns" << ns ).sort( "min" ) );
        while ( cursor->more() ) {
            BSONObj chunk = cursor->next();
            // group the chunks by the shard that owns them
            vector<BSONObj>& chunks = shardToChunksMap[chunk["shard"].String()];
            chunks.push_back( chunk.getOwned() );
        }
        cursor.reset();
        if (shardToChunksMap.empty()) {
            log(1) << "skipping empty collection (" << ns << ")";
            continue;
        }
        for ( vector<Shard>::iterator i=allShards.begin(); i!=allShards.end(); ++i ) {
            // this just makes sure there is an entry in shardToChunksMap for every shard
            Shard s = *i;
            shardToChunksMap[s.getName()].size();
        }
        // pick the chunk to migrate, together with its source and target shards
        CandidateChunk* p = _policy->balance( ns , shardLimitsMap , shardToChunksMap , _balancedLastTime /*number of moved chunks in last round*/);
        if ( p ) candidateChunks->push_back( CandidateChunkPtr( p ) ); // add it to the chunks to migrate
    }
}
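_doBalanceRound() mainly builds two maps (std::map instances), shardLimitsMap and shardToChunksMap:
shardLimitsMap: collects, for every shard, the parameters that act as limits, such as maxSize, draining and hasOpsQueued. A shard that has reached its maximum size, is draining, or has writeback operations queued cannot receive migrated chunks.
shardToChunksMap: collects, for the current collection, the chunks held by each shard (sorted by min), for the traversal that follows.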
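As a rough illustration of how shardToChunksMap is filled, the sketch below groups chunks by owning shard and then adds an empty entry for every remaining shard, mirroring the two loops in _doBalanceRound(). ChunkDoc, with an int shard key, is a hypothetical stand-in for the BSON chunk documents read from the config server. The empty entries matter: a shard that owns no chunk of the collection still has to appear with zero chunks, otherwise balance() could never pick it as the receiver.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for a chunk document from the config server:
// the owning shard and an int shard-key range [min, max).
struct ChunkDoc {
    std::string shard;
    int min;
    int max;
};

using ShardToChunks = std::map<std::string, std::vector<ChunkDoc>>;

// Group chunks (already sorted by min, like the .sort("min") query) by owning
// shard, then give every known shard an entry, even if it owns no chunks.
ShardToChunks groupByShard(const std::vector<ChunkDoc>& chunksSortedByMin,
                           const std::vector<std::string>& allShards) {
    ShardToChunks byShard;
    for (const ChunkDoc& c : chunksSortedByMin)
        byShard[c.shard].push_back(c);  // order within each shard stays sorted by min
    if (byShard.empty())
        return byShard;                 // "skipping empty collection"
    for (const std::string& s : allShards)
        byShard[s];                     // operator[] inserts an empty vector if missing
    return byShard;
}
```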
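With this information collected, _doBalanceRound() calls _policy->balance() once per collection to find the chunk, if any, that should be migrated.
Here is the implementation of that balancing policy (see the comments for details):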
// balancer_policy.cpp
BalancerPolicy::ChunkInfo* BalancerPolicy::balance( const string& ns,
                                                    const ShardToLimitsMap& shardToLimitsMap,
                                                    const ShardToChunksMap& shardToChunksMap,
                                                    int balancedLastTime ) {
    pair<string,unsigned> min("",numeric_limits<unsigned>::max());
    pair<string,unsigned> max("",0);
    vector<string> drainingShards;
    // walk the shards to find the min and max candidates, and the draining shards
    for (ShardToChunksIter i = shardToChunksMap.begin(); i!=shardToChunksMap.end(); ++i ) {
        // look at each shard and check whether its capacity or free space is exhausted
        const string& shard = i->first;
        BSONObj shardLimits;
        ShardToLimitsIter it = shardToLimitsMap.find( shard );
        if ( it != shardToLimitsMap.end() ) shardLimits = it->second; // maxSize, currSize, draining, hasOpsQueued
        const bool maxedOut = isSizeMaxed( shardLimits );   // is the shard full?
        const bool draining = isDraining( shardLimits );    // is the shard being removed?
        const bool opsQueued = hasOpsQueued( shardLimits ); // does it have queued writebacks?
        // a shard is not a suitable receiver if any of these holds:
        // + maxed out shards
        // + draining shards
        // + shards with operations queued for writeback
        const unsigned size = i->second.size(); // number of chunks on this shard
        if ( ! maxedOut && ! draining && ! opsQueued ) {
            if ( size < min.second ) { // keep the eligible shard with the fewest chunks
                min = make_pair( shard , size );
            }
        }
        // check whether this shard should be a chunk donor
        // draining shards take a lower priority than overloaded shards
        if ( size > max.second ) {
            max = make_pair( shard , size ); // keep the shard with the most chunks
        }
        if ( draining && (size > 0)) {
            drainingShards.push_back( shard );
        }
    }
    // no shard can take a chunk: every shard hit one of the cases above (draining, etc.)
    if ( min.second == numeric_limits<unsigned>::max() ) {
        log() << "no availalable shards to take chunks" << endl;
        return NULL;
    }
    log(1) << "collection : " << ns << endl;
    log(1) << "donor : " << max.second << " chunks on " << max.first << endl;
    log(1) << "receiver : " << min.second << " chunks on " << min.first << endl;
    if ( ! drainingShards.empty() ) {
        string drainingStr;
        joinStringDelim( drainingShards, &drainingStr, ',' ); // join the draining shard names with commas
        log(1) << "draining : " << ! drainingShards.empty() << "(" << drainingShards.size() << ")" << endl;
    }
    // resolve imbalance by priority: an imbalance at or above the threshold comes before draining
    const int imbalance = max.second - min.second; // gap between the fullest and the emptiest eligible shard
    // be more eager (threshold 2) if the last round moved chunks, otherwise wait for a gap of 8
    const int threshold = balancedLastTime ? 2 : 8;
    string from, to;
    if ( imbalance >= threshold ) {
        from = max.first; // the shard with the most chunks is the source
        to = min.first;   // the eligible shard with the fewest chunks is the target
    }
    else if ( ! drainingShards.empty() ) {
        // pick one of the draining shards at random
        from = drainingShards[ rand() % drainingShards.size() ];
        to = min.first;
    }
    else {
        // already balanced
        return NULL;
    }
    const vector<BSONObj>& chunksFrom = shardToChunksMap.find( from )->second; // chunks on the source shard
    const vector<BSONObj>& chunksTo = shardToChunksMap.find( to )->second;     // chunks on the target shard
    BSONObj chunkToMove = pickChunk( chunksFrom , chunksTo ); // choose the actual chunk to move
    log() << "chose [" << from << "] to [" << to << "] " << chunkToMove << endl;
    // hand the result back to the Balancer, which performs the move
    return new ChunkInfo( ns, to, from, chunkToMove );
}
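To make the selection rule concrete, here is a simplified sketch of the donor/receiver choice in balance(), with the BSON limits flattened into a hypothetical ShardState struct and pickChunk() left out. It keeps the rules of the code above: only shards that are not full, not draining and without queued writebacks can receive; any shard can be the donor; the threshold is 2 if the previous round moved chunks and 8 otherwise; and draining shards are only considered, through a random pick, when the imbalance is below the threshold.

```cpp
#include <cassert>
#include <cstdlib>
#include <limits>
#include <map>
#include <string>
#include <vector>

// Hypothetical flattened view of one shard: its chunk count from
// shardToChunksMap plus the flags derived from shardLimitsMap.
struct ShardState {
    unsigned chunks;
    bool maxedOut;
    bool draining;
    bool opsQueued;
};

// The chosen donor and receiver; empty strings mean "no move this round".
struct Move {
    std::string from;
    std::string to;
};

// Simplified sketch of the donor/receiver choice in BalancerPolicy::balance().
Move chooseMove(const std::map<std::string, ShardState>& shards, int balancedLastTime) {
    std::string minShard, maxShard;
    unsigned minCount = std::numeric_limits<unsigned>::max();
    unsigned maxCount = 0;
    std::vector<std::string> drainingShards;

    for (const auto& entry : shards) {
        const ShardState& s = entry.second;
        // receiver: the eligible shard with the fewest chunks
        if (!s.maxedOut && !s.draining && !s.opsQueued && s.chunks < minCount) {
            minShard = entry.first;
            minCount = s.chunks;
        }
        // donor: the shard with the most chunks, eligible or not
        if (s.chunks > maxCount) {
            maxShard = entry.first;
            maxCount = s.chunks;
        }
        if (s.draining && s.chunks > 0) drainingShards.push_back(entry.first);
    }

    if (minCount == std::numeric_limits<unsigned>::max()) return Move{};  // no receiver

    const int imbalance = static_cast<int>(maxCount) - static_cast<int>(minCount);
    const int threshold = balancedLastTime ? 2 : 8;
    if (imbalance >= threshold) return Move{maxShard, minShard};
    if (!drainingShards.empty())  // pick one draining shard at random
        return Move{drainingShards[std::rand() % drainingShards.size()], minShard};
    return Move{};  // already balanced
}
```

With shard0 holding 10 chunks and shard1 holding 1, the imbalance of 9 reaches the default threshold of 8, so the result is from = shard0, to = shard1; at 5 versus 1 there is no move unless the previous round moved something, in which case the lower threshold of 2 applies.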
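balance() compares the chunk counts of the shards to work out which eligible shard is relatively empty and makes it the target (to); the fullest shard, or a draining shard, becomes the source (from). It then decides which chunk to actually move with pickChunk(), implemented as follows: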
// balancer_policy.cpp
// Pick the chunk to migrate; note that the 'to' side may hold more chunks than the 'from' side
BSONObj BalancerPolicy::pickChunk( const vector<BSONObj>& from, const vector<BSONObj>& to ) {
    // It is possible for a donor ('from') shard to have less chunks than a receiver one ('to')
    // if the donor is in draining mode.
    if ( to.size() == 0 ) // the receiver holds no chunks, so any donor chunk will do: take the first
        return from[0];
    /** wo='well ordered'. fields must be in same order in each object.
        Ordering is with respect to the signs of the elements
        and allows ascending / descending key mixing.
        @return <0 if l<r. 0 if l==r. >0 if l>r
    */
    // if the donor's first chunk starts where the receiver's last chunk ends, move that first chunk
    if ( from[0]["min"].Obj().woCompare( to[to.size()-1]["max"].Obj() , BSONObj() , false ) == 0 )
        return from[0];
    // if the donor's last chunk ends where the receiver's first chunk starts, move that last chunk
    if ( from[from.size()-1]["max"].Obj().woCompare( to[0]["min"].Obj() , BSONObj() , false ) == 0 )
        return from[from.size()-1];
    return from[0];
}
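The adjacency test in pickChunk() may be easier to see with plain integers. The sketch below assumes a hypothetical Range type with an int shard key and replaces woCompare() with ==; both vectors are assumed to be sorted by min, as the config query in _doBalanceRound() returns them. Preferring a chunk that touches the receiver's existing ranges presumably keeps each shard's key ranges contiguous where possible.

```cpp
#include <cassert>
#include <vector>

// Hypothetical chunk with an int shard-key range [min, max); the real code
// compares BSON keys with woCompare(), here == on ints stands in for it.
struct Range {
    int min;
    int max;
};

// Sketch of pickChunk(): both vectors are sorted by min, `from` is non-empty.
Range pickChunkSketch(const std::vector<Range>& from, const std::vector<Range>& to) {
    assert(!from.empty());
    if (to.empty())                         // receiver owns nothing: take the first chunk
        return from.front();
    if (from.front().min == to.back().max)  // donor's first chunk starts where receiver's last ends
        return from.front();
    if (from.back().max == to.front().min)  // donor's last chunk ends where receiver's first starts
        return from.back();
    return from.front();                    // no adjacent chunk: fall back to the first
}
```

For a donor holding [10,20) and [20,30), a receiver holding [30,40) gets [20,30), a receiver holding [0,10) gets [10,20), and a receiver with no adjacent range falls back to [10,20).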
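After this check we have the chunk that will actually be migrated, identified by its min key, and the migration can start. For that we go back to Balancer::run() and look at _moveChunks(), the method that performs the migration: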
// balance.cpp
int Balancer::_moveChunks( const vector<CandidateChunkPtr>* candidateChunks ) {
    // number of chunks actually moved
    int movedCount = 0;
    // walk the candidate chunks and migrate them one by one
    for ( vector<CandidateChunkPtr>::const_iterator it = candidateChunks->begin(); it != candidateChunks->end(); ++it ) {
        const CandidateChunk& chunkInfo = *it->get();
        // get the database config for this chunk's namespace
        DBConfigPtr cfg = grid.getDBConfig( chunkInfo.ns );
        assert( cfg );
        // get the ChunkManager that manages this collection's chunks
        ChunkManagerPtr cm = cfg->getChunkManager( chunkInfo.ns );
        assert( cm );
        // the chunk chosen by the policy, looked up by its min key
        const BSONObj& chunkToMove = chunkInfo.chunk;
        ChunkPtr c = cm->findChunk( chunkToMove["min"].Obj() );
        // the bounds are checked twice: after a split elsewhere, the cached chunk's min/max
        // may no longer match the chosen chunk, so reload and check again
        if ( c->getMin().woCompare( chunkToMove["min"].Obj() ) || c->getMax().woCompare( chunkToMove["max"].Obj() ) ) {
            // mainly guards against side effects of a split performed elsewhere
            cm = cfg->getChunkManager( chunkInfo.ns , true /* reload */);
            assert( cm );
            c = cm->findChunk( chunkToMove["min"].Obj() );
            if ( c->getMin().woCompare( chunkToMove["min"].Obj() ) || c->getMax().woCompare( chunkToMove["max"].Obj() ) ) {
                log() << "chunk mismatch after reload, ignoring will retry issue cm: "
                      << c->getMin() << " min: " << chunkToMove["min"].Obj() << endl;
                continue;
            }
        }
        BSONObj res;
        // move the chunk from its current shard to the target shard, and count the success
        if ( c->moveAndCommit( Shard::make( chunkInfo.to ) , Chunk::MaxChunkSize , res ) ) {
            movedCount++;
            continue;
        }
        // the migration failed: log it
        // the move requires acquiring the collection metadata's lock, which can fail
        log() << "balacer move failed: " << res << " from: " << chunkInfo.from << " to: " << chunkInfo.to
              << " chunk: " << chunkToMove << endl;
        // if the chunk was too big to move, force a split of this chunk
        if ( res["chunkTooBig"].trueValue() ) {
            // reload just to be safe
            cm = cfg->getChunkManager( chunkInfo.ns );
            assert( cm );
            c = cm->findChunk( chunkToMove["min"].Obj() );
            log() << "forcing a split because migrate failed for size reasons" << endl;
            res = BSONObj();
            // split the chunk at a suitable split point; this method is fairly involved, and I'll cover it in a separate article
            c->singleSplit( true , res );
            log() << "forced split results: " << res << endl;
            // TODO: if the split fails, mark as jumbo SERVER-2571
        }
    }
    return movedCount;
}
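The code walks through the candidate chunks in turn. For each one it looks up the ChunkManager for the chunk's namespace (ns); this class manages a collection's chunks (creating, looking up, updating and removing them). Through the ChunkManager it then finds the chunk by its min key (see chunk.h: min and max are the inclusive lower and exclusive upper shard-key bounds of the chunk) and starts the migration. If the cached bounds no longer match the chosen chunk, it reloads the ChunkManager once and skips the chunk if they still differ; if the move fails because the chunk is too big, it forces a split of that chunk instead.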
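The double check before moveAndCommit() guards against a stale routing table: another process may have split the chunk after the policy chose it. Below is a hedged sketch of that check with a hypothetical RoutingTable (chunk min mapped to chunk max, int keys) standing in for ChunkManager::findChunk(). The reload is passed in as a callback so that, as in the original, the table is only reloaded when the cached bounds do not match; the chunkTooBig/split branch is left out.

```cpp
#include <cassert>
#include <functional>
#include <map>

// Hypothetical routing table for one collection: chunk min -> chunk max,
// with an int shard key; a stand-in for ChunkManager.
using RoutingTable = std::map<int, int>;

// Like findChunk(min) followed by the getMin()/getMax() comparison: find the
// last chunk starting at or before `min` and check its bounds are [min, max).
bool boundsMatch(const RoutingTable& table, int min, int max) {
    auto it = table.upper_bound(min);       // first chunk starting after `min`
    if (it == table.begin()) return false;  // no chunk starts at or before `min`
    --it;                                   // last chunk starting at or before `min`
    return it->first == min && it->second == max;
}

enum class MoveDecision { Move, SkipThisRound };

// Check the cached table first; only on a mismatch reload once and check again.
MoveDecision checkBeforeMove(const RoutingTable& cached,
                             const std::function<RoutingTable()>& reload,
                             int min, int max) {
    if (boundsMatch(cached, min, max)) return MoveDecision::Move;
    const RoutingTable fresh = reload();  // like getChunkManager(ns, true /* reload */)
    if (boundsMatch(fresh, min, max)) return MoveDecision::Move;
    return MoveDecision::SkipThisRound;   // "chunk mismatch after reload": retried next round
}
```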
As usual, here is a sequence diagram that summarizes the balancer's execution flow:
[Figure: sequence diagram of the balancer's execution flow]
That's all for today.