nsq源碼分析
nsq的源碼比較簡(jiǎn)單,值得一讀,特別是golang開發(fā)人員,下面重點(diǎn)介紹nsqd,nsqd是nsq的核心,其他的都是輔助工具,看完這篇文章希望你能對(duì)消息隊(duì)列的原理和實(shí)現(xiàn)有一定的了解。
nsqd是一個(gè)守護(hù)進(jìn)程,負(fù)責(zé)接收,排隊(duì),投遞消息給客戶端,并不保證消息的嚴(yán)格順序,nsqd默認(rèn)監(jiān)聽一個(gè)tcp端口 (4150) 和一個(gè)http端口 (4151) 以及一個(gè)可選的https端口
對(duì)訂閱了同一個(gè)topic的同一個(gè)channel的消費(fèi)者使用負(fù)載均衡策略,其實(shí)就是多個(gè)協(xié)程消費(fèi)同一個(gè)channel
只要channel存在,即使沒有該channel的消費(fèi)者,也會(huì)將生產(chǎn)者的message緩存到隊(duì)列(內(nèi)存隊(duì)列和磁盤隊(duì)列)中,當(dāng)有新的消費(fèi)者產(chǎn)生后,就開始消費(fèi)隊(duì)列中的所有消息
保證隊(duì)列中的 message 至少會(huì)被消費(fèi)一次(在進(jìn)程意外退出的時(shí)候這點(diǎn)都保證不了),并不能保證成功消費(fèi)一次,即使 nsqd退出,也會(huì)將隊(duì)列中的消息暫存磁盤上(進(jìn)程退出的時(shí)候會(huì)將緩存中的消息存到磁盤上,意外情況如掉電就不行了,緩存中的消息就沒有機(jī)會(huì)存盤而丟失,在實(shí)戰(zhàn)中一般不會(huì)使用緩存隊(duì)列即內(nèi)存buffer為0,全部使用磁盤隊(duì)列)
限定內(nèi)存占用,能夠配置nsqd中每個(gè)channel隊(duì)列在內(nèi)存中緩存的message數(shù)量,一旦channel的buffer寫滿,就將message寫到磁盤中,這點(diǎn)使用golang select的優(yōu)先級(jí)功能,default優(yōu)先級(jí)最低
topic,channel 一旦建立,將會(huì)一直存在,要及時(shí)在管理臺(tái)或者用代碼清除無效的 topic 和 channel,避免資源的浪費(fèi),每個(gè)topic和channel都有獨(dú)立的協(xié)程處理自身的消息,默認(rèn)的buffer和其他的一些信息
nsq消息沒有備份,一旦出現(xiàn)進(jìn)程意外情況退出,可能會(huì)出現(xiàn)消息丟失,如沒有消費(fèi)成功的消息,寫入文件但沒有真正落盤的消息,這種意外情況很難杜絕,像意外退出這種情況kafka,redis等都會(huì)遇到這樣的問題,最后都會(huì)采用一個(gè)折中的策略,定時(shí)將數(shù)據(jù)落盤
//原文:http://www.rzrgm.cn/hlxs/p/11445103.html 作者:啊漢
type Topic struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms messageCount uint64 //消息總數(shù)量 messageBytes uint64 //消息總長(zhǎng)度 sync.RWMutex name string //topic name channelMap map[string]*Channel //保存topic下面的所有channel backend BackendQueue //磁盤隊(duì)列 memoryMsgChan chan *Message //內(nèi)存隊(duì)列 startChan chan int exitChan chan int channelUpdateChan chan int waitGroup util.WaitGroupWrapper exitFlag int32 //退出標(biāo)記 idFactory *guidFactory //生成msg id的工廠 ephemeral bool //是否臨時(shí)topic deleteCallback func(*Topic) //刪除topic方法指針 deleter sync.Once paused int32 //暫停標(biāo)記,1暫停, 0正常 pauseChan chan int ctx *context }
Topic創(chuàng)建
nsqd用map[string]*Topic來保存所有topic,producter在發(fā)消息的時(shí)候回指定topic,nsqd在收到消息后會(huì)判斷topic是否存在,不存在就會(huì)自動(dòng)創(chuàng)建,每創(chuàng)建一個(gè)新的topic就會(huì)啟動(dòng)一個(gè)協(xié)程,用于處理topic相關(guān)的消息,如將內(nèi)存/磁盤中的消息復(fù)制給topic中的每個(gè)channel、channel數(shù)量變化、channel暫停、topic退出
消息結(jié)構(gòu)
// Command represents a command from a client to an NSQ daemon
//原文:http://www.rzrgm.cn/hlxs/p/11445103.html 作者:啊漢
type Command struct { Name []byte //命令名稱,可選:IDENTIFY、FIN、RDY、REQ、PUB、MPUB、DPUB、NOP、TOUCH、SUB、CLS、AUTH Params [][]byte //不同的命令做不同解析,涉及到topic的,Params[0]為topic name Body []byte //消息內(nèi)容 } // WriteTo implements the WriterTo interface and // serializes the Command to the supplied Writer. // // It is suggested that the target Writer is buffered // to avoid performing many system calls. func (c *Command) WriteTo(w io.Writer) (int64, error) { var total int64 var buf [4]byte n, err := w.Write(c.Name) //命名名稱,nsqd根據(jù)這個(gè)名稱執(zhí)行相關(guān)功能 total += int64(n) if err != nil { return total, err } for _, param := range c.Params { n, err := w.Write(byteSpace) //空格 total += int64(n) if err != nil { return total, err } n, err = w.Write(param) //參數(shù) total += int64(n) if err != nil { return total, err } } n, err = w.Write(byteNewLine) //空行\(zhòng)n total += int64(n) if err != nil { return total, err } //消息內(nèi)容 if c.Body != nil { bufs := buf[:] binary.BigEndian.PutUint32(bufs, uint32(len(c.Body))) n, err := w.Write(bufs) //消息長(zhǎng)度4字節(jié) total += int64(n) if err != nil { return total, err } n, err = w.Write(c.Body) //消息內(nèi)容 total += int64(n) if err != nil { return total, err } } return total, nil }
nsqd收到這個(gè)結(jié)構(gòu)做解析,就能知道命令名稱(干什么),topic name,消息內(nèi)容等,不同的命令,命令參數(shù)不一樣
func (p *protocolV2) Exec(client *clientV2, params [][]byte) ([]byte, error) { if bytes.Equal(params[0], []byte("IDENTIFY")) { return p.IDENTIFY(client, params) } err := enforceTLSPolicy(client, p, params[0]) if err != nil { return nil, err } switch { case bytes.Equal(params[0], []byte("FIN")): return p.FIN(client, params) case bytes.Equal(params[0], []byte("RDY")): return p.RDY(client, params) case bytes.Equal(params[0], []byte("REQ")): return p.REQ(client, params) case bytes.Equal(params[0], []byte("PUB")): return p.PUB(client, params) case bytes.Equal(params[0], []byte("MPUB")): return p.MPUB(client, params) case bytes.Equal(params[0], []byte("DPUB")): return p.DPUB(client, params) case bytes.Equal(params[0], []byte("NOP")): return p.NOP(client, params) case bytes.Equal(params[0], []byte("TOUCH")): return p.TOUCH(client, params) case bytes.Equal(params[0], []byte("SUB")): return p.SUB(client, params) case bytes.Equal(params[0], []byte("CLS")): return p.CLS(client, params) case bytes.Equal(params[0], []byte("AUTH")): return p.AUTH(client, params) } return nil, protocol.NewFatalClientErr(nil, "E_INVALID", fmt.Sprintf("invalid command %s", params[0])) }
Topic收到消息
nsqd收到上面這個(gè)結(jié)構(gòu),解析之后,就會(huì)執(zhí)行相關(guān)功能,我們以PUB命令為例:
1:讀到空行處,能拿到命令名稱和參數(shù),命令名稱=PUB,命令參數(shù)為topicName
2:檢查topicName是否有效
3:獲取消息內(nèi)容長(zhǎng)度,讀取4個(gè)字節(jié)
4:分配對(duì)應(yīng)內(nèi)容長(zhǎng)度空間,讀取對(duì)應(yīng)長(zhǎng)度字節(jié)存入
5:獲取topicName信息,沒有就創(chuàng)建
6:構(gòu)造消息結(jié)構(gòu)體nsqd.Message,自動(dòng)生成消息id
7:將消息提交給對(duì)應(yīng)的topic,Topic.PutMessage
8:將消息寫入topic對(duì)應(yīng)的內(nèi)存消息通道,內(nèi)存消息通道默認(rèn)大小為10000,如通道滿了則寫入磁盤
Topic中的消息分發(fā)給channel
在創(chuàng)建topic的時(shí)候回啟動(dòng)一個(gè)協(xié)程處理各種消息,其中就包括消費(fèi)topic中的消息,topic只是將消息投遞到其中的每個(gè)channel中,如topic下面有10個(gè)channel,則要復(fù)制9個(gè)nsqd.Message,每個(gè)channel一個(gè)nsqd.Message,但是消息id和消息內(nèi)容是一樣的,消息內(nèi)容并不會(huì)被復(fù)制,topic收到消息將消息分發(fā)給channel就完事了,消息怎么發(fā)給消費(fèi)者,由channel負(fù)責(zé)
type Channel struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms requeueCount uint64 //重新入隊(duì)數(shù)量 messageCount uint64 //消息數(shù)量 timeoutCount uint64 //超時(shí)數(shù)量,已經(jīng)消費(fèi),但沒有反饋結(jié)果,會(huì)重新加入隊(duì)列,messageCount不會(huì)自增 sync.RWMutex topicName string //topic name name string //channel name ctx *context backend BackendQueue //將消息寫入磁盤的隊(duì)列,維護(hù)磁盤消息的讀寫 memoryMsgChan chan *Message //內(nèi)存消息隊(duì)列,通道buffer默認(rèn)10000 exitFlag int32 //退出標(biāo)記,1表示退出,0沒有退出 exitMutex sync.RWMutex // state tracking clients map[int64]Consumer //連接到這個(gè)topic-channel的所有client paused int32 //暫停標(biāo)記,0不暫停,1暫停,暫停就不會(huì)往這個(gè)channel中copy消息 ephemeral bool //臨時(shí)channel標(biāo)記,臨時(shí)channel不會(huì)存到文件中 deleteCallback func(*Channel) //用于從topic中刪除channel deleter sync.Once // Stats tracking e2eProcessingLatencyStream *quantile.Quantile // TODO: these can be DRYd up deferredMessages map[MessageID]*pqueue.Item //延遲消息map,方便查找 deferredPQ pqueue.PriorityQueue //延遲消息隊(duì)列 deferredMutex sync.Mutex inFlightMessages map[MessageID]*Message //消費(fèi)中的消息map,方便查找 inFlightPQ inFlightPqueue //消費(fèi)中的消息隊(duì)列 inFlightMutex sync.Mutex }
client訂閱topic消息
訂閱發(fā)送的還是Command這個(gè)結(jié)構(gòu),只不過訂閱沒有消息內(nèi)容而已,指定topic和channel就行,如果topic和channel不存在都會(huì)自動(dòng)創(chuàng)建,client和server建立的是tcp長(zhǎng)連接,server會(huì)啟動(dòng)兩個(gè)協(xié)程,一個(gè)用于發(fā)消息,一個(gè)用于接收消息,建立連接后,channel會(huì)把client加入它的map[int64]Consumer中,key為clientId,當(dāng)topic收到消息后,會(huì)分發(fā)給channel,channel通過發(fā)消息的協(xié)程發(fā)給client
channel將消息推給消費(fèi)者
channel中的消息存在兩個(gè)地方:內(nèi)存通道和磁盤隊(duì)列,topic將消息分發(fā)給channel時(shí),通過go的select將消息分發(fā)給內(nèi)存通道或是磁盤隊(duì)列,由于select的default分支優(yōu)先級(jí)比case低,所以只要內(nèi)存通道沒滿,就會(huì)往內(nèi)存通道中寫,否則就寫入磁盤,
diskqueue.diskQueue維護(hù)著磁盤數(shù)據(jù)的讀寫,每個(gè)非臨時(shí)的topic和channel都有這樣一個(gè)字段。
發(fā)消息的協(xié)程就會(huì)一直讀內(nèi)存通道和磁盤隊(duì)列中的數(shù)據(jù),將消息發(fā)給client
nsq消息類型有三種如下:
// frame types const ( FrameTypeResponse int32 = 0 //響應(yīng) FrameTypeError int32 = 1 //錯(cuò)誤 FrameTypeMessage int32 = 2 //消息 )
消息發(fā)送給client之后,也不知道消息到底有沒有消費(fèi)成功,有可能client收到消息之后就崩潰了,所以消息發(fā)給client之后,需要client給server發(fā)一個(gè)FIN消息告訴server,這個(gè)消息我消費(fèi)成功,所以在將消息發(fā)送給client之后,消息出了內(nèi)存隊(duì)列/磁盤隊(duì)列,進(jìn)入了一個(gè)新的隊(duì)列,叫飛行隊(duì)列,表示這個(gè)消息正在運(yùn)輸消費(fèi)中,為了維護(hù)在消費(fèi)中的消息,nsq使用了兩個(gè)數(shù)據(jù)結(jié)構(gòu):
type inFlightPqueue []*Message inFlightPQ inFlightPqueue //按照超時(shí)時(shí)間排序的最小堆 inFlightMessages map[MessageID]*Message //保存消息
消息發(fā)送給client之后,同時(shí)會(huì)將消息存入inFlightPQ和inFlightMessages中,inFlightPQ中的消息都設(shè)置了超時(shí)時(shí)間默認(rèn)是1分鐘,如果1分鐘后還沒有收到client發(fā)過來的FIN消息,會(huì)將消息重新加入待消費(fèi)隊(duì)列,讓client重新消費(fèi),目的是想保證每個(gè)消息至少被消費(fèi)一次,由于消息可保存在內(nèi)存中,進(jìn)程可能隨時(shí)掛掉并不能保證每個(gè)消息都至少被消費(fèi)一次,如果不用內(nèi)存隊(duì)列,完全使用磁盤隊(duì)列,當(dāng)進(jìn)程意外崩掉的時(shí)候,消息是否丟失要看磁盤隊(duì)列的具體實(shí)現(xiàn),完全使用磁盤隊(duì)列性能差點(diǎn),安全性更高
inFlightMessages就是為了方便通過消息id查找消息,收到client發(fā)送過來的FIN消息時(shí)就會(huì)將消息從inFlightPQ和inFlightMessages中刪除,表示這個(gè)消息已經(jīng)消費(fèi)成功,數(shù)據(jù)也就被扔掉了
延遲消息
發(fā)延遲消息和發(fā)普通消息的區(qū)別是producter在生成延遲消息的時(shí)候指定了延遲時(shí)間,單位毫秒,命令:DPUB
延遲消息存在內(nèi)存中,并沒有存到磁盤中,延遲消息要是存在磁盤中,實(shí)現(xiàn)起來還是比較復(fù)雜
延遲消息同樣使用了一個(gè)隊(duì)列和一個(gè)map,結(jié)構(gòu)如下:
type Item struct { Value interface{} //*Message Priority int64 //執(zhí)行的時(shí)間戳,單位毫秒 Index int //隊(duì)列索引 } type PriorityQueue []*Item deferredPQ pqueue.PriorityQueue deferredMessages map[MessageID]*pqueue.Item deferredPQ和inFlightPQ一樣,是按照時(shí)間排序的最小堆
那么nsq是怎么判斷消息超時(shí),延遲消息的執(zhí)行時(shí)間到了呢?
nsq有一個(gè)專門的協(xié)程來處理這兩種情況,實(shí)現(xiàn)也很簡(jiǎn)單,就是每100毫秒檢查一次,看是否有超時(shí)的消息,延遲消息是否執(zhí)行時(shí)間是否到了,如果消息超時(shí),則重新將消息加入待消費(fèi)隊(duì)列,每次將消息發(fā)送給client的時(shí)候,重試次數(shù)都會(huì)加一,即Message.Attempts++
延遲消息執(zhí)行時(shí)間要是到了,就會(huì)當(dāng)做一個(gè)普通的消息加入待消費(fèi)隊(duì)列,后面的流程都是一樣的,默認(rèn)最大延遲時(shí)間為1小時(shí),所有的默認(rèn)值在進(jìn)程啟動(dòng)時(shí)都是可重新指定的
nsqd啟動(dòng)過程
1:加載啟動(dòng)參數(shù)
啟動(dòng)參數(shù)定義了結(jié)構(gòu)nsqd.Options,并初始化好了默認(rèn)值,在進(jìn)程啟動(dòng)的時(shí)候可以指定對(duì)應(yīng)的值,通過反射將這些參數(shù)賦給nsqd.Options,通過nsqd.Options就能方便的使用各個(gè)參數(shù)
2:加載topic和channel并啟動(dòng)
在nsqd啟動(dòng)的時(shí)候會(huì)加載配置文件nsqd.dat,驗(yàn)證topic和channel名稱格式是否有效,然后啟動(dòng)所有topic,該暫停的就暫停,當(dāng)topic和channel發(fā)生變更的時(shí)候回將所有信息重新保存到nsqd.dat中,如新增/刪除/暫停/啟動(dòng)topic和channel會(huì)保存文件
topic和channel保存到文件中的結(jié)構(gòu)
type meta struct { Topics []struct { Name string `json:"name"` Paused bool `json:"paused"` Channels []struct { Name string `json:"name"` Paused bool `json:"paused"` } `json:"channels"` } `json:"topics"` }
3:?jiǎn)?dòng)tcp/http/https服務(wù)
nsq可以通過tcp和http通過服務(wù),http和https提供的服務(wù)是一樣,區(qū)別在于協(xié)議本身,當(dāng)client通過tcp和server建立連接后,server會(huì)啟動(dòng)兩個(gè)協(xié)程,一個(gè)用于發(fā)消息,一個(gè)用于收消息
tcp提供的服務(wù)如下:
| 服務(wù)命令 | 服務(wù)描述 |
| INENTIFY | 認(rèn)證 |
| FIN | 消費(fèi)完成 |
| RDY | 指定可同時(shí)處理的消息數(shù)量 |
| REQ | 消息重新加入隊(duì)列 |
| PUB | 發(fā)布單條消息 |
| MPUB | 發(fā)布多條消息 |
| DPUB | 發(fā)布單條延遲消息 |
| NOP | 不做任何處理 |
| TOUCH | 重新設(shè)置消息處理超時(shí)時(shí)間 |
| SUB | 訂閱,訂閱后才能消費(fèi)消息 |
| CLS | 關(guān)閉停止消費(fèi) |
| AUTH | 授權(quán) |
client和server建立連接后,client通過命令I(lǐng)NENTIFY將認(rèn)證信息發(fā)給服務(wù)端,如果server在啟動(dòng)的時(shí)候指定了授權(quán)地址,server就會(huì)告訴client你需要認(rèn)證,client就會(huì)通過命令A(yù)UTH將秘鑰發(fā)給server,server去授權(quán)地址進(jìn)行驗(yàn)證,驗(yàn)證通過后,就可以進(jìn)行正常的消息發(fā)布和訂閱了
http和https提供服務(wù)如下:
| 服務(wù)名稱 |
| 發(fā)布單條/多條消息 |
| topic新增/刪除/情況topic中消息/暫停/啟動(dòng) |
| channel新增/刪除/情況topic中消息/暫停/啟動(dòng) |
| nsq狀態(tài)信息 |
| ping |
| 啟動(dòng)參數(shù)查詢和修改 |
tcp服務(wù)能發(fā)布和消費(fèi)消息,http/https則只能發(fā)布消息,發(fā)布消息最后調(diào)的是同一個(gè)接口
端口信息
| 協(xié)議名稱 | 默認(rèn)端口 |
| tcp | 4150 |
| http | 4151 |
| https | 4152 |
心跳
心跳默認(rèn)30秒,在認(rèn)證(INENTIFY)的時(shí)候client可以指定心跳時(shí)間間隔,server按照心跳給client發(fā)消息,消息內(nèi)容:_heartbeat_,如果發(fā)送失敗,發(fā)送消息的協(xié)程就會(huì)退出,這樣server就不在給client發(fā)消息了,server如果從client讀消息失敗,接收消息的協(xié)程就會(huì)退出,關(guān)閉和client的連接,從channel中將client移除,這樣就不在收client發(fā)來的消息,server中也就沒有client的任何信息了
consumer和producter連著nsqd的同一個(gè)端口,為什么consumer能消費(fèi)消息,而producter卻不會(huì)呢?
nsq是個(gè)基于發(fā)布和訂閱的消息隊(duì)列,只有訂閱了才能消費(fèi)消息,consumer和producter雖然連著同一個(gè)端口,consumer在建立連接后,會(huì)發(fā)送SUB命令,告訴server我要訂閱,而producter并沒有,consumer在發(fā)送SUB命令后還會(huì)發(fā)送RDY命令告訴server能同時(shí)處理消息的個(gè)數(shù),當(dāng)rdyCount=0時(shí),server也不會(huì)給consumer推消息,所以SUB和RDY這兩個(gè)命令缺一不可
nsq消息文件的存取
nsq可以將消息存在內(nèi)存中或是文件中,存在內(nèi)存的好處就是速度快,確定就是一旦進(jìn)程退出消息就丟失了,所以在實(shí)戰(zhàn)中消息都會(huì)寫到磁盤文件,雖然慢點(diǎn)但不容易丟消息
封裝消息存取文件的實(shí)現(xiàn)在github.com/nsqio/go-diskqueue/diskqueue.go中
topic收到消息后,可以將消息存在內(nèi)存中或是文件中,當(dāng)內(nèi)存channel寫滿之后就會(huì)寫入文件,當(dāng)我們把channel的buffer設(shè)置成0后,所有的消息就會(huì)寫文件
每個(gè)topic都會(huì)啟動(dòng)一個(gè)協(xié)程將其收到的消息復(fù)制給其下面的每個(gè)channel,channel在將消息推送給consumer,channel收到topic發(fā)過來(函數(shù)調(diào)用)的消息,可將消息存入內(nèi)存或是文件
消息寫入內(nèi)存,topic下面的channel其實(shí)是共享一份數(shù)據(jù),因?yàn)閿?shù)據(jù)都是自讀的,而寫入文件卻是每個(gè)channel都有一組文件并將消息吸入,真正做到了讀時(shí)復(fù)制,每個(gè)topic和channel都會(huì)實(shí)例化一個(gè)diskQueue,其結(jié)構(gòu)如下
// diskQueue implements a filesystem backed FIFO queue
//原文:http://www.rzrgm.cn/hlxs/p/11445103.html 作者:啊漢
type diskQueue struct { // 64bit atomic vars need to be first for proper alignment on 32bit platforms // run-time state (also persisted to disk) readPos int64 //已經(jīng)讀的位置 writePos int64 //已經(jīng)寫的位置 readFileNum int64 //正在讀的文件編號(hào) writeFileNum int64 //正在寫的文件編號(hào) depth int64 //沒有消費(fèi)的消息數(shù)量 sync.RWMutex // instantiation time metadata name string // topicName 或者 topicName + ":" + channelName dataPath string //存消息文件的目錄 maxBytesPerFile int64 // currently this cannot change once created minMsgSize int32 //消息最小值 maxMsgSize int32 //消息最大值 syncEvery int64 // number of writes per fsync syncTimeout time.Duration // duration of time per fsync exitFlag int32 //退出標(biāo)記 needSync bool //強(qiáng)制將文件緩沖區(qū)的數(shù)據(jù)寫入磁盤 // keeps track of the position where we have read // (but not yet sent over readChan) nextReadPos int64 //下次讀的位置 nextReadFileNum int64 //下次讀的文件編號(hào) readFile *os.File //正在讀的文件 writeFile *os.File //正在寫的文件 reader *bufio.Reader //讀緩沖區(qū),默認(rèn)4K writeBuf bytes.Buffer //寫緩沖區(qū) // exposed via ReadChan() readChan chan []byte //讀channel // internal channels writeChan chan []byte //寫channel writeResponseChan chan error //寫結(jié)果通知 emptyChan chan int //刪除所有文件channel emptyResponseChan chan error //刪除通知channel exitChan chan int //退出channel exitSyncChan chan int //退出命令同步等待channel logf AppLogFunc //寫日志 }
文件名命名:目錄 + topicName:channelName + .diskqueue.000001.dat
func (d *diskQueue) fileName(fileNum int64) string { return fmt.Sprintf(path.Join(d.dataPath, "%s.diskqueue.%06d.dat"), d.name, fileNum) }
diskQueue在實(shí)例化的時(shí)候回初始化相關(guān)的屬性,當(dāng)文件大小大于指定文件的最大值時(shí),文件編號(hào)writeFileNum就會(huì)自增1,新來的消息就會(huì)寫入新的文件
按順序讀寫文件,每個(gè)消息寫文件的格式是:消息長(zhǎng)度(4字節(jié)) + 消息內(nèi)容,這樣讀消息也就很容易了,先讀4字節(jié),知道消息的長(zhǎng)度,接著讀消息內(nèi)容,下一個(gè)消息也是這樣讀,當(dāng)下一個(gè)消息讀的位置大于文件的最大值時(shí)說明這個(gè)文件讀完了,可以從下一個(gè)文件開始寫了,
寫文件是同步的,寫完之后直接反饋消息是否寫入成功,由于文件系統(tǒng)的緩存原因,系統(tǒng)并不是把消息馬上寫入磁盤,而是寫入了文件的緩沖區(qū),所以需要定時(shí)的將文件緩沖區(qū)的內(nèi)容寫入磁盤,nsq使用了兩個(gè)策略將文件緩沖區(qū)的內(nèi)容寫入磁盤。兩個(gè)策略同時(shí)進(jìn)行
1:默認(rèn)每2500條消息強(qiáng)制將文件緩存內(nèi)容寫入磁盤
2:默認(rèn)每?jī)擅霃?qiáng)制將文件緩存內(nèi)容寫入磁盤
在將消息強(qiáng)制寫入磁盤的同時(shí),也會(huì)將隊(duì)列當(dāng)前狀態(tài)寫入另一個(gè)文件,若程序退出,下次啟動(dòng)后就能正常進(jìn)行文件的讀寫,寫入內(nèi)容包括:
1:剩余消息數(shù)量
2:正在讀的文件編號(hào)
3:讀文件偏移量
4:正在寫的文件編號(hào)
5:寫文件偏移量
磁盤文件的刪除,如果一個(gè)文件中的消息全部被消費(fèi)了,那這個(gè)文件將被刪除
斷開重連
斷開后如果不能自動(dòng)重連,那就是死都不知道怎么死的,所以nsq是有斷開重連功能的
server短發(fā)現(xiàn)斷開后,不會(huì)自動(dòng)重連,鬼知道你是不是主動(dòng)斷開,所以server發(fā)現(xiàn)斷開了,就將client的相關(guān)信息完全刪除,就像client從沒有出現(xiàn)過
client斷開后會(huì)自動(dòng)重連,client分consumer和producer
consumer自動(dòng)重連:consumer作為消費(fèi)者就是讀,所以當(dāng)讀失敗的時(shí)候,consumer會(huì)關(guān)閉讀寫功能,就斷開連接,當(dāng)consumer收到的所有消息處理完成后,就會(huì)自動(dòng)重連,注意寫失敗并不會(huì)自動(dòng)重連
producer自動(dòng)重連:producer作為生產(chǎn)者就是寫,所以當(dāng)寫失敗的時(shí)候,producer按照狀態(tài)來決定是否重連,如果發(fā)現(xiàn)狀態(tài)為非連接狀態(tài)就連接,收到斷開是不會(huì)重連的,在寫失敗的時(shí)候才會(huì)重連
參考資料
未完。。。
浙公網(wǎng)安備 33010602011771號(hào)