需求背景:
goalng常駐內(nèi)存任務(wù)腳本監(jiān)聽(tīng)rbmq執(zhí)行任務(wù)
任務(wù)腳本由supervisor來(lái)管理

當(dāng)rabbitmq長(zhǎng)時(shí)間斷開(kāi)連接會(huì)出現(xiàn)如下圖 進(jìn)程處于fatal狀態(tài)


假如因?yàn)椴豢煽咕芤蛩兀瑀abbitmq服務(wù)器內(nèi)存滿了或者其它原因?qū)е聄abbitmq消息隊(duì)列服務(wù)停止了
如果是短時(shí)間的停止重啟,supervisor是可以即時(shí)喚醒該程序。如果服務(wù)器長(zhǎng)時(shí)間沒(méi)有恢復(fù)正常運(yùn)行,程序就會(huì)出現(xiàn)fatal進(jìn)程啟動(dòng)失敗的狀態(tài),此時(shí)可以通過(guò)告警來(lái)提醒開(kāi)發(fā)人員

如果以上告警能時(shí)時(shí)通知運(yùn)維人員此問(wèn)題可以略過(guò)了。今天討論的是如果在長(zhǎng)時(shí)間斷開(kāi)連接還能在服務(wù)器恢復(fù)正常情況下自動(dòng)實(shí)現(xiàn)重連。

實(shí)現(xiàn)重連方式很多,下面實(shí)現(xiàn)方式比較簡(jiǎn)單
使用通道實(shí)現(xiàn)通知
通知可以被看作是特殊的請(qǐng)求/回應(yīng)用例。在一個(gè)通知用例中,我們并不關(guān)心回應(yīng)的值,我們只關(guān)心回應(yīng)是否已發(fā)生。 所以我們常常使用空結(jié)構(gòu)體類(lèi)型struct{}來(lái)做為通道的元素類(lèi)型,因?yàn)榭战Y(jié)構(gòu)體類(lèi)型的尺寸為零,能夠節(jié)省一些內(nèi)存(雖然常常很少量)。
向一個(gè)通道發(fā)送一個(gè)值來(lái)實(shí)現(xiàn)單對(duì)單通知
我們已知道,如果一個(gè)通道中無(wú)值可接收,則此通道上的下一個(gè)接收操作將阻塞到另一個(gè)協(xié)程發(fā)送一個(gè)值到此通道為止。 所以一個(gè)協(xié)程可以向此通道發(fā)送一個(gè)值來(lái)通知另一個(gè)等待著從此通道接收數(shù)據(jù)的協(xié)程。

- Recv方法創(chuàng)建ampq鏈接
- 啟動(dòng)協(xié)程開(kāi)始執(zhí)行任務(wù)
- MqOpenChannel 打開(kāi)一個(gè)channel通道處理amqp消息
- 拿到消息 處理任務(wù)
3,協(xié)程中捕獲異常發(fā)送消息到 taskQuit <- struct{}{}
4,主進(jìn)程監(jiān)聽(tīng)taskQuit管道 開(kāi)始嘗試重新鏈接amqp
4.1如果在規(guī)定時(shí)間內(nèi)鏈接成功 重新鏈接成功后啟動(dòng)新的協(xié)程處理任務(wù)
4.2如果在規(guī)定時(shí)間內(nèi)沒(méi)有鏈接成功 則退出主進(jìn)程
send.go 生產(chǎn)者
package main import ( "fmt" _ "fmt" "github.com/ichunt2019/golang-rabbitmq-2022/utils/rabbitmq" ) func main() { for i := 0;i<20;i++{ body := fmt.Sprintf("{\"order_id\":%d}",i) fmt.Println(body) /** 使用默認(rèn)的交換機(jī) 如果是默認(rèn)交換機(jī) type QueueExchange struct { QuName string // 隊(duì)列名稱(chēng) RtKey string // key值 ExName string // 交換機(jī)名稱(chēng) ExType string // 交換機(jī)類(lèi)型 Dns string //鏈接地址 } 如果你喜歡使用默認(rèn)交換機(jī) RtKey 此處建議填寫(xiě)成 RtKey 和 QuName 一樣的值 */ queueExchange := rabbitmq.QueueExchange{ "a_test_0001", "a_test_0001", "hello_go", "direct", "amqp://guest:guest@192.168.1.169:5672/", } _ = rabbitmq.Send(queueExchange,body) } }
recv.go 消費(fèi)者
package main import ( "fmt" "github.com/ichunt2019/golang-rabbitmq-2022/utils/rabbitmq" "time" ) type RecvPro struct { } //// 實(shí)現(xiàn)消費(fèi)者 消費(fèi)消息失敗 自動(dòng)進(jìn)入延時(shí)嘗試 嘗試3次之后入庫(kù)db /* 返回值 error 為nil 則表示該消息消費(fèi)成功 否則消息會(huì)進(jìn)入ttl延時(shí)隊(duì)列 重復(fù)嘗試消費(fèi)3次 3次后消息如果還是失敗 消息就執(zhí)行失敗 進(jìn)入告警 FailAction */ func (t *RecvPro) Consumer(dataByte []byte) error { time.Sleep(time.Second*1) //return errors.New("頂頂頂頂") fmt.Println(string(dataByte)) //time.Sleep(1*time.Second) //return errors.New("頂頂頂頂") return nil } //消息已經(jīng)消費(fèi)3次 失敗了 請(qǐng)進(jìn)行處理 /* 如果消息 消費(fèi)3次后 仍然失敗 此處可以根據(jù)情況 對(duì)消息進(jìn)行告警提醒 或者 補(bǔ)償 入庫(kù)db 釘釘告警等等 */ func (t *RecvPro) FailAction(err error,dataByte []byte) error { fmt.Println(string(dataByte)) fmt.Println(err) fmt.Println("任務(wù)處理失敗了,我要進(jìn)入db日志庫(kù)了") fmt.Println("任務(wù)處理失敗了,發(fā)送釘釘消息通知主人") return nil } func main() { processTask := &RecvPro{} /* runNums: 表示任務(wù)并發(fā)處理數(shù)量 一般建議 普通任務(wù)1-3 就可以了 maxTryConnTimeFromMinute:表示最大嘗試時(shí)間 分鐘 */ err := rabbitmq.Recv(rabbitmq.QueueExchange{ "a_test_0001", "a_test_0001", "hello_go", "direct", "amqp://guest:guest@192.168.1.169:5672/", }, processTask,4,2) if(err != nil){ fmt.Println(err) } }
receiver.go golang-rbmq工具包
package rabbitmq import ( "errors" "strconv" "time" //"errors" "fmt" "github.com/streadway/amqp" "log" ) // 定義全局變量,指針類(lèi)型 var mqConn *amqp.Connection var mqChan *amqp.Channel // 定義生產(chǎn)者接口 type Producer interface { MsgContent() string } // 定義生產(chǎn)者接口 type RetryProducer interface { MsgContent() string } // 定義接收者接口 type Receiver interface { Consumer([]byte) error FailAction(error , []byte) error } // 定義RabbitMQ對(duì)象 type RabbitMQ struct { connection *amqp.Connection Channel *amqp.Channel dns string QueueName string // 隊(duì)列名稱(chēng) RoutingKey string // key名稱(chēng) ExchangeName string // 交換機(jī)名稱(chēng) ExchangeType string // 交換機(jī)類(lèi)型 producerList []Producer retryProducerList []RetryProducer receiverList []Receiver } // 定義隊(duì)列交換機(jī)對(duì)象 type QueueExchange struct { QuName string // 隊(duì)列名稱(chēng) RtKey string // key值 ExName string // 交換機(jī)名稱(chēng) ExType string // 交換機(jī)類(lèi)型 Dns string //鏈接地址 } // 鏈接rabbitMQ func (r *RabbitMQ)MqConnect() (err error){ mqConn, err = amqp.Dial(r.dns) r.connection = mqConn // 賦值給RabbitMQ對(duì)象 if err != nil { fmt.Printf("rbmq鏈接失敗 :%s \n", err) } return } // 關(guān)閉mq鏈接 func (r *RabbitMQ)CloseMqConnect() (err error){ err = r.connection.Close() if err != nil{ fmt.Printf("關(guān)閉mq鏈接失敗 :%s \n", err) } return } // 鏈接rabbitMQ func (r *RabbitMQ)MqOpenChannel() (err error){ mqConn := r.connection r.Channel, err = mqConn.Channel() //defer mqChan.Close() if err != nil { fmt.Printf("MQ打開(kāi)管道失敗:%s \n", err) } return err } // 鏈接rabbitMQ func (r *RabbitMQ)CloseMqChannel() (err error){ r.Channel.Close() if err != nil { fmt.Printf("關(guān)閉mq鏈接失敗 :%s \n", err) } return err } // 創(chuàng)建一個(gè)新的操作對(duì)象 func NewMq(q QueueExchange) RabbitMQ { return RabbitMQ{ QueueName:q.QuName, RoutingKey:q.RtKey, ExchangeName: q.ExName, ExchangeType: q.ExType, dns:q.Dns, } } func (mq *RabbitMQ) sendMsg (body string) (err error) { err = mq.MqOpenChannel() ch := mq.Channel if err != nil{ log.Printf("Channel err :%s \n", err) } defer func() { _ = mq.Channel.Close() }() if mq.ExchangeName != "" { if mq.ExchangeType == ""{ mq.ExchangeType = "direct" } err = ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil) if err != nil { log.Printf("ExchangeDeclare err :%s \n", err) } } // 用于檢查隊(duì)列是否存在,已經(jīng)存在不需要重復(fù)聲明 _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, nil) if err != nil { log.Printf("QueueDeclare err :%s \n", err) } // 綁定任務(wù) if mq.RoutingKey != "" && mq.ExchangeName != "" { err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil) if err != nil { log.Printf("QueueBind err :%s \n", err) } } if mq.ExchangeName != "" && mq.RoutingKey != ""{ err = mq.Channel.Publish( mq.ExchangeName, // exchange mq.RoutingKey, // routing key false, // mandatory false, // immediate amqp.Publishing { ContentType: "text/plain", Body: []byte(body), DeliveryMode: 2, }) }else{ err = mq.Channel.Publish( "", // exchange mq.QueueName, // routing key false, // mandatory false, // immediate amqp.Publishing { ContentType: "text/plain", Body: []byte(body), DeliveryMode: 2, }) } return } /* 發(fā)送延時(shí)消息 */ func (mq *RabbitMQ)sendDelayMsg(body string,ttl int64) (err error){ err =mq.MqOpenChannel() ch := mq.Channel if err != nil{ log.Printf("Channel err :%s \n", err) } defer mq.Channel.Close() if mq.ExchangeName != "" { if mq.ExchangeType == ""{ mq.ExchangeType = "direct" } err = ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil) if err != nil { return } } if ttl <= 0{ return errors.New("發(fā)送延時(shí)消息,ttl參數(shù)是必須的") } table := make(map[string]interface{},3) table["x-dead-letter-routing-key"] = mq.RoutingKey table["x-dead-letter-exchange"] = mq.ExchangeName table["x-message-ttl"] = ttl*1000 //fmt.Printf("%+v",table) //fmt.Printf("%+v",mq) // 用于檢查隊(duì)列是否存在,已經(jīng)存在不需要重復(fù)聲明 ttlstring := strconv.FormatInt(ttl,10) queueName := fmt.Sprintf("%s_delay_%s",mq.QueueName ,ttlstring) routingKey := fmt.Sprintf("%s_delay_%s",mq.QueueName ,ttlstring) _, err = ch.QueueDeclare(queueName, true, false, false, false, table) if err != nil { return } // 綁定任務(wù) if routingKey != "" && mq.ExchangeName != "" { err = ch.QueueBind(queueName, routingKey, mq.ExchangeName, false, nil) if err != nil { return } } header := make(map[string]interface{},1) header["retry_nums"] = 0 var ttl_exchange string var ttl_routkey string if(mq.ExchangeName != "" ){ ttl_exchange = mq.ExchangeName }else{ ttl_exchange = "" } if mq.RoutingKey != "" && mq.ExchangeName != ""{ ttl_routkey = routingKey }else{ ttl_routkey = queueName } err = mq.Channel.Publish( ttl_exchange, // exchange ttl_routkey, // routing key false, // mandatory false, // immediate amqp.Publishing { ContentType: "text/plain", Body: []byte(body), Headers:header, }) if err != nil { return } return } func (mq *RabbitMQ) sendRetryMsg (body string,retry_nums int32,args ...string) { err :=mq.MqOpenChannel() ch := mq.Channel if err != nil{ log.Printf("Channel err :%s \n", err) } defer mq.Channel.Close() if mq.ExchangeName != "" { if mq.ExchangeType == ""{ mq.ExchangeType = "direct" } err = ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil) if err != nil { log.Printf("ExchangeDeclare err :%s \n", err) } } //原始路由key oldRoutingKey := args[0] //原始交換機(jī)名 oldExchangeName := args[1] table := make(map[string]interface{},3) table["x-dead-letter-routing-key"] = oldRoutingKey if oldExchangeName != "" { table["x-dead-letter-exchange"] = oldExchangeName }else{ mq.ExchangeName = "" table["x-dead-letter-exchange"] = "" } table["x-message-ttl"] = int64(20000) //fmt.Printf("%+v",table) //fmt.Printf("%+v",mq) // 用于檢查隊(duì)列是否存在,已經(jīng)存在不需要重復(fù)聲明 _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, table) if err != nil { log.Printf("QueueDeclare err :%s \n", err) } // 綁定任務(wù) if mq.RoutingKey != "" && mq.ExchangeName != "" { err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil) if err != nil { log.Printf("QueueBind err :%s \n", err) } } header := make(map[string]interface{},1) header["retry_nums"] = retry_nums + int32(1) var ttl_exchange string var ttl_routkey string if(mq.ExchangeName != "" ){ ttl_exchange = mq.ExchangeName }else{ ttl_exchange = "" } if mq.RoutingKey != "" && mq.ExchangeName != ""{ ttl_routkey = mq.RoutingKey }else{ ttl_routkey = mq.QueueName } //fmt.Printf("ttl_exchange:%s,ttl_routkey:%s \n",ttl_exchange,ttl_routkey) err = mq.Channel.Publish( ttl_exchange, // exchange ttl_routkey, // routing key false, // mandatory false, // immediate amqp.Publishing { ContentType: "text/plain", Body: []byte(body), Headers:header, }) if err != nil { fmt.Printf("MQ任務(wù)發(fā)送失敗:%s \n", err) } } // 監(jiān)聽(tīng)接收者接收任務(wù) 消費(fèi)者 func (mq *RabbitMQ) ListenReceiver(receiver Receiver) { err :=mq.MqOpenChannel() ch := mq.Channel if err != nil{ log.Printf("Channel err :%s \n", err) } defer mq.Channel.Close() if mq.ExchangeName != "" { if mq.ExchangeType == ""{ mq.ExchangeType = "direct" } err = ch.ExchangeDeclare(mq.ExchangeName, mq.ExchangeType, true, false, false, false, nil) if err != nil { log.Printf("ExchangeDeclare err :%s \n", err) } } // 用于檢查隊(duì)列是否存在,已經(jīng)存在不需要重復(fù)聲明 _, err = ch.QueueDeclare(mq.QueueName, true, false, false, false, nil) if err != nil { log.Printf("QueueDeclare err :%s \n", err) } // 綁定任務(wù) if mq.RoutingKey != "" && mq.ExchangeName != "" { err = ch.QueueBind(mq.QueueName, mq.RoutingKey, mq.ExchangeName, false, nil) if err != nil { log.Printf("QueueBind err :%s \n", err) } } // 獲取消費(fèi)通道,確保rabbitMQ一個(gè)一個(gè)發(fā)送消息 err = ch.Qos(1, 0, false) msgList, err := ch.Consume(mq.QueueName, "", false, false, false, false, nil) if err != nil { log.Printf("Consume err :%s \n", err) } for msg := range msgList { retry_nums,ok := msg.Headers["retry_nums"].(int32) if(!ok){ retry_nums = int32(0) } // 處理數(shù)據(jù) err := receiver.Consumer(msg.Body) if err!=nil { //消息處理失敗 進(jìn)入延時(shí)嘗試機(jī)制 if retry_nums < 3{ fmt.Println(string(msg.Body)) fmt.Printf("消息處理失敗 消息開(kāi)始進(jìn)入嘗試 ttl延時(shí)隊(duì)列 \n") retry_msg(msg.Body,retry_nums,QueueExchange{ mq.QueueName, mq.RoutingKey, mq.ExchangeName, mq.ExchangeType, mq.dns, }) }else{ //消息失敗 入庫(kù)db fmt.Printf("消息處理3次后還是失敗了 入庫(kù)db 釘釘告警 \n") receiver.FailAction(err,msg.Body) } err = msg.Ack(true) if err != nil { fmt.Printf("確認(rèn)消息未完成異常:%s \n", err) } }else { // 確認(rèn)消息,必須為false err = msg.Ack(true) if err != nil { fmt.Printf("消息消費(fèi)ack失敗 err :%s \n", err) } } } } //消息處理失敗之后 延時(shí)嘗試 func retry_msg(msg []byte,retry_nums int32,queueExchange QueueExchange){ //原始隊(duì)列名稱(chēng) 交換機(jī)名稱(chēng) oldQName := queueExchange.QuName oldExchangeName := queueExchange.ExName oldRoutingKey := queueExchange.RtKey if oldRoutingKey == "" || oldExchangeName == ""{ oldRoutingKey = oldQName } if queueExchange.QuName != "" { queueExchange.QuName = queueExchange.QuName + "_retry_3"; } if queueExchange.RtKey != "" { queueExchange.RtKey = queueExchange.RtKey + "_retry_3"; }else{ queueExchange.RtKey = queueExchange.QuName + "_retry_3"; } //fmt.Printf("%+v",queueExchange) mq := NewMq(queueExchange) _ = mq.MqConnect() defer func(){ _ = mq.CloseMqConnect() }() //fmt.Printf("%+v",queueExchange) mq.sendRetryMsg(string(msg),retry_nums,oldRoutingKey,oldExchangeName) } func Send(queueExchange QueueExchange,msg string) (err error){ mq := NewMq(queueExchange) err = mq.MqConnect() if err != nil{ return } defer func(){ _ = mq.CloseMqConnect() }() err = mq.sendMsg(msg) return } //發(fā)送延時(shí)消息 func SendDelay(queueExchange QueueExchange,msg string,ttl int64)(err error){ mq := NewMq(queueExchange) err = mq.MqConnect() if err != nil{ return } defer func(){ _ = mq.CloseMqConnect() }() err = mq.sendDelayMsg(msg,ttl) return } /* runNums 開(kāi)啟并發(fā)執(zhí)行任務(wù)數(shù)量 */ func Recv(queueExchange QueueExchange,receiver Receiver,otherParams ...int) (err error){ var ( exitTask bool maxTryConnNums int //rbmq鏈接失敗后多久嘗試一次 runNums int maxTryConnTimeFromMinute int ) if(len(otherParams) <= 0){ runNums = 1 maxTryConnTimeFromMinute = 0 }else if(len(otherParams) == 1){ runNums = otherParams[0] maxTryConnTimeFromMinute = 0 }else if(len(otherParams) == 2){ runNums = otherParams[0] maxTryConnTimeFromMinute = otherParams[1] } //maxTryConnNums := 360 //rbmq鏈接失敗后最大嘗試次數(shù) //maxTryConnTime := time.Duration(10) //rbmq鏈接失敗后多久嘗試一次 maxTryConnNums = maxTryConnTimeFromMinute * 10 * maxTryConnTimeFromMinute//rbmq鏈接失敗后最大嘗試次數(shù) maxTryConnTime := time.Duration(6) //rbmq鏈接失敗后多久嘗試一次 mq := NewMq(queueExchange) //鏈接rabbitMQ err = mq.MqConnect() if(err != nil){ return } defer func() { if panicErr := recover(); panicErr != nil{ fmt.Println(recover()) err = errors.New(fmt.Sprintf("%s",panicErr)) } }() //rbmq斷開(kāi)鏈接后 協(xié)程退出釋放信號(hào) taskQuit:= make(chan struct{}, 1) //嘗試鏈接rbmq tryToLinkC := make(chan struct{}, 1) //最大嘗試次數(shù) tryToLinkMaxNums := make(chan struct{}, 1) maxTryNums := 0 //嘗試重啟次數(shù) //開(kāi)始執(zhí)行任務(wù) for i:=1;i<=runNums;i++{ go Recv2(mq,receiver,taskQuit); } //如果rbmq斷開(kāi)連接后 嘗試重新建立鏈接 var tryToLink = func() { for { maxTryNums += 1 err = mq.MqConnect() if(err == nil){ tryToLinkC <- struct{}{} break } if(maxTryNums > maxTryConnNums){ tryToLinkMaxNums <- struct{}{} break } //如果鏈接斷開(kāi)了 10秒重新嘗試鏈接一次 time.Sleep(time.Second * maxTryConnTime) } return } scheduleTimer := time.NewTimer(time.Millisecond*300) exitTask = true for{ select { case <-tryToLinkC: //建立鏈接成功后 重新開(kāi)啟協(xié)程執(zhí)行任務(wù) fmt.Println("重新開(kāi)啟新的協(xié)程執(zhí)行任務(wù)") go Recv2(mq,receiver,taskQuit); case <-tryToLinkMaxNums://rbmq超出最大鏈接次數(shù) 退出任務(wù) fmt.Println("rbmq鏈接超過(guò)最大嘗試次數(shù)!") exitTask = false err = errors.New("rbmq鏈接超過(guò)最大嘗試次數(shù)!") case <- taskQuit ://rbmq斷開(kāi)連接后 開(kāi)始嘗試重新建立鏈接 fmt.Println("rbmq斷開(kāi)連接后 開(kāi)始嘗試重新建立鏈接") go tryToLink() case <- scheduleTimer.C: //fmt.Println("~~~~~~~~~~~~~~~~~~~~~~~") } // 重置調(diào)度間隔 scheduleTimer.Reset(time.Millisecond*300) if !exitTask{ break } } fmt.Println("exit") return } func Recv2(mq RabbitMQ,receiver Receiver,taskQuit chan<- struct{}){ defer func() { fmt.Println("rbmq鏈接失敗,協(xié)程任務(wù)退出~~~~~~~~~~~~~~~~~~~~") taskQuit <- struct{}{} return }() // 驗(yàn)證鏈接是否正常 err := mq.MqOpenChannel() if(err != nil){ return } mq.ListenReceiver(receiver) } type retryPro struct { msgContent string }
核心代碼分析:
/* runNums 開(kāi)啟并發(fā)執(zhí)行任務(wù)數(shù)量 */ func Recv(queueExchange QueueExchange,receiver Receiver,otherParams ...int) (err error){ var ( exitTask bool maxTryConnNums int //rbmq鏈接失敗后多久嘗試一次 runNums int maxTryConnTimeFromMinute int ) if(len(otherParams) <= 0){ runNums = 1 maxTryConnTimeFromMinute = 0 }else if(len(otherParams) == 1){ runNums = otherParams[0] maxTryConnTimeFromMinute = 0 }else if(len(otherParams) == 2){ runNums = otherParams[0] maxTryConnTimeFromMinute = otherParams[1] } //maxTryConnNums := 360 //rbmq鏈接失敗后最大嘗試次數(shù) //maxTryConnTime := time.Duration(10) //rbmq鏈接失敗后多久嘗試一次 maxTryConnNums = maxTryConnTimeFromMinute * 10 * maxTryConnTimeFromMinute//rbmq鏈接失敗后最大嘗試次數(shù) maxTryConnTime := time.Duration(6) //rbmq鏈接失敗后多久嘗試一次 mq := NewMq(queueExchange) //鏈接rabbitMQ err = mq.MqConnect() if(err != nil){ return } defer func() { if panicErr := recover(); panicErr != nil{ fmt.Println(recover()) err = errors.New(fmt.Sprintf("%s",panicErr)) } }() //rbmq斷開(kāi)鏈接后 協(xié)程退出釋放信號(hào) taskQuit:= make(chan struct{}, 1) //嘗試鏈接rbmq tryToLinkC := make(chan struct{}, 1) //最大嘗試次數(shù) tryToLinkMaxNums := make(chan struct{}, 1) maxTryNums := 0 //嘗試重啟次數(shù) //開(kāi)始執(zhí)行任務(wù) for i:=1;i<=runNums;i++{ go Recv2(mq,receiver,taskQuit); } //如果rbmq斷開(kāi)連接后 嘗試重新建立鏈接 var tryToLink = func() { for { maxTryNums += 1 err = mq.MqConnect() if(err == nil){ tryToLinkC <- struct{}{} break } if(maxTryNums > maxTryConnNums){ tryToLinkMaxNums <- struct{}{} break } //如果鏈接斷開(kāi)了 10秒重新嘗試鏈接一次 time.Sleep(time.Second * maxTryConnTime) } return } scheduleTimer := time.NewTimer(time.Millisecond*300) exitTask = true for{ select { case <-tryToLinkC: //建立鏈接成功后 重新開(kāi)啟協(xié)程執(zhí)行任務(wù) fmt.Println("重新開(kāi)啟新的協(xié)程執(zhí)行任務(wù)") go Recv2(mq,receiver,taskQuit); case <-tryToLinkMaxNums://rbmq超出最大鏈接次數(shù) 退出任務(wù) fmt.Println("rbmq鏈接超過(guò)最大嘗試次數(shù)!") exitTask = false err = errors.New("rbmq鏈接超過(guò)最大嘗試次數(shù)!") case <- taskQuit ://rbmq斷開(kāi)連接后 開(kāi)始嘗試重新建立鏈接 fmt.Println("rbmq斷開(kāi)連接后 開(kāi)始嘗試重新建立鏈接") go tryToLink() case <- scheduleTimer.C: //fmt.Println("~~~~~~~~~~~~~~~~~~~~~~~") } // 重置調(diào)度間隔 scheduleTimer.Reset(time.Millisecond*300) if !exitTask{ break } } fmt.Println("exit") return } func Recv2(mq RabbitMQ,receiver Receiver,taskQuit chan<- struct{}){ defer func() { fmt.Println("rbmq鏈接失敗,協(xié)程任務(wù)退出~~~~~~~~~~~~~~~~~~~~") taskQuit <- struct{}{} return }() // 驗(yàn)證鏈接是否正常 err := mq.MqOpenChannel() if(err != nil){ return } mq.ListenReceiver(receiver) }
場(chǎng)景一:rbmq斷開(kāi)連接 ,開(kāi)始嘗試鏈接,鏈接次數(shù)超過(guò)最大次數(shù) 放棄鏈接

場(chǎng)景二:rbmq斷開(kāi)連接 ,開(kāi)始嘗試鏈接,鏈接次數(shù)未超過(guò)最大次數(shù),重新建議鏈接 開(kāi)始消費(fèi)任務(wù)

最新源碼倉(cāng)庫(kù)地址:https://github.com/sunlongv520/golang-rabbitmq
其它:該rabbitmq包實(shí)現(xiàn)中包含了,消息處理失敗重試機(jī)制,有興趣的同學(xué)可以看看
(重試和重連接是兩個(gè)概念)
重連接 :rabbitmq鏈接失敗導(dǎo)致任務(wù)失敗,此時(shí)要等待rabbitmq服務(wù)器恢復(fù)正常后才能再次啟動(dòng)協(xié)程處理任務(wù)
重試:rabbitmq服務(wù)正常,消息消費(fèi)進(jìn)程也正常,但是消息處理失敗。嘗試多次消費(fèi)消息后還是失敗就ack消息,在整個(gè)重試過(guò)程中不會(huì)阻塞消費(fèi)
本文來(lái)自博客園,作者:孫龍-程序員,轉(zhuǎn)載請(qǐng)注明原文鏈接:http://www.rzrgm.cn/sunlong88/p/15959476.html
浙公網(wǎng)安備 33010602011771號(hào)