(翻譯 gafferongames)Reliable Ordered Messages 可靠有序消息
https://gafferongames.com/post/reliable_ordered_messages/
Many people will tell you that implementing your own reliable message system on top of UDP is foolish. After all, why reimplement TCP?
But why limit ourselves to how TCP works? But there are so many different ways to implement reliable-messages and most of them work nothing like TCP!
So let’s get creative and work out how we can implement a reliable message system that’s better and more flexible than TCP for real-time games.
很多人會(huì)告訴你,在 UDP 之上實(shí)現(xiàn)你自己的可靠消息系統(tǒng)是愚蠢的——畢竟,為什么要重新實(shí)現(xiàn) TCP 呢?
但為什么我們要局限于 TCP 的工作方式呢?其實(shí),實(shí)現(xiàn)可靠消息的方法有很多,而且大多數(shù)都跟 TCP 完全不一樣!
所以,讓我們發(fā)揮創(chuàng)造力,來設(shè)計(jì)一個(gè)比 TCP 更適合實(shí)時(shí)游戲、更靈活的可靠消息系統(tǒng)吧。
Different Approaches
A common approach to reliability in games is to have two packet types: reliable-ordered and unreliable. You’ll see this approach in many network libraries.
The basic idea is that the library resends reliable packets until they are received by the other side. This is the option that usually ends up looking a bit like TCP-lite for the reliable-packets. It’s not that bad, but you can do much better.
The way I prefer to think of it is that messages are smaller bitpacked elements that know how to serialize themselves. This makes the most sense when the overhead of length prefixing and padding bitpacked messages up to the next byte is undesirable (eg. lots of small messages included in each packet). Sent messages are placed in a queue and each time a packet is sent some of the messages in the send queue are included in the outgoing packet. This way there are no reliable packets that need to be resent. Reliable messages are simply included in outgoing packets until they are received.
The easiest way to do this is to include all unacked messages in each packet sent. It goes something like this: each message sent has an id that increments each time a message is sent. Each outgoing packet includes the start message id followed by the data for n messages. The receiver continually sends back the most recent received message id to the sender as an ack and only messages newer than the most recent acked message id are included in packets.
This is simple and easy to implement but if a large burst of packet loss occurs while you are sending messages you get a spike in packet size due to unacked messages.
You can avoid this by extending the system to have an upper bound on the number of messages included per-packet n. But now if you have a high packet send rate (like 60 packets per-second) you are sending the same message multiple times until you get an ack for that message.
If your round trip time is 100ms each message will be sent 6 times redundantly before being acked on average. Maybe you really need this amount of redundancy because your messages are extremely time critical, but in most cases, your bandwidth would be better spent on other things.
The approach I prefer combines packet level acks with a prioritization system that picks the n most important messages to include in each packet. This combines time critical delivery and the ability to send only n messages per-packet, while distributing sends across all messages in the send queue.
在游戲中實(shí)現(xiàn)可靠性的一種常見方式是使用兩種數(shù)據(jù)包類型:可靠有序和不可靠。你會(huì)在許多網(wǎng)絡(luò)庫中看到這種方法。
基本思想是,庫會(huì)不斷重發(fā)可靠數(shù)據(jù)包,直到它們被對(duì)方接收。這種方法通常會(huì)讓可靠數(shù)據(jù)包看起來有點(diǎn)像簡化版的 TCP。雖然這并不差,但其實(shí)你可以做得更好。
我更喜歡的思路是,把消息看作是較小的位打包元素,它們知道如何自我序列化。當(dāng)不希望使用長度前綴和將打包消息填充到下一個(gè)字節(jié)(例如,每個(gè)數(shù)據(jù)包包含大量小消息)時(shí),這種方式最有意義。
發(fā)送的消息被放入一個(gè)隊(duì)列中,每次發(fā)送數(shù)據(jù)包時(shí),從發(fā)送隊(duì)列中挑選一些消息加入到即將發(fā)出的數(shù)據(jù)包中。這樣就不需要重新發(fā)送整個(gè)可靠數(shù)據(jù)包,可靠消息只要在未收到確認(rèn)前不斷地包含在發(fā)送的數(shù)據(jù)包中即可。
最簡單的實(shí)現(xiàn)方式是:每個(gè)數(shù)據(jù)包都包含所有未確認(rèn)的消息。大致過程是這樣的:每條發(fā)送的消息都有一個(gè) id,每次發(fā)送都會(huì)遞增。每個(gè)發(fā)送的數(shù)據(jù)包都包含起始消息 id,隨后是 n 條消息的數(shù)據(jù)。接收端不斷將最近收到的消息 id 作為 ack 發(fā)回給發(fā)送端,而發(fā)送端只會(huì)在數(shù)據(jù)包中包含比這個(gè) ack id 更新的消息。
這種方式簡單易實(shí)現(xiàn),但如果在發(fā)送消息時(shí)出現(xiàn)大量數(shù)據(jù)包丟失,就會(huì)因?yàn)槲创_認(rèn)的消息堆積而導(dǎo)致數(shù)據(jù)包大小激增。
你可以通過為每個(gè)數(shù)據(jù)包設(shè)置最大消息數(shù) n 來避免這個(gè)問題。但如果你有很高的包發(fā)送率(比如每秒 60 個(gè)包),那你就會(huì)在每條消息被確認(rèn)前多次發(fā)送相同的消息。
如果往返時(shí)間是 100ms,那么平均每條消息會(huì)被重復(fù)發(fā)送 6 次,直到收到確認(rèn)。也許你的消息確實(shí)極度時(shí)間敏感,需要這種冗余,但在大多數(shù)情況下,你的帶寬更應(yīng)該用于其他方面。
我更喜歡的方法是將數(shù)據(jù)包級(jí)別的確認(rèn)機(jī)制與一個(gè)優(yōu)先級(jí)系統(tǒng)結(jié)合起來,每個(gè)數(shù)據(jù)包挑選 n 條最重要的消息發(fā)送。這樣既能保證關(guān)鍵消息的及時(shí)傳遞,又能限制每個(gè)數(shù)據(jù)包的消息數(shù)量,并在整個(gè)發(fā)送隊(duì)列中均衡地分發(fā)發(fā)送機(jī)會(huì)。
Packet Level Acks
To implement packet level acks, we add the following packet header:
數(shù)據(jù)包級(jí)別的確認(rèn)(Packet Level Acks)
為了實(shí)現(xiàn)數(shù)據(jù)包級(jí)別的確認(rèn)機(jī)制,我們需要在數(shù)據(jù)包頭部添加以下字段:
struct Header { uint16_t sequence; uint16_t ack; uint32_t ack_bits; };
These header elements combine to create the ack system: sequence is a number that increases with each packet sent, ack is the most recent packet sequence number received, and ack_bits is a bitfield encoding the set of acked packets.
If bit n is set in ack_bits, then ack - n is acked. Not only is ack_bits a smart encoding that saves bandwidth, it also adds redundancy to combat packet loss. Each ack is sent 32 times. If one packet is lost, there’s 31 other packets with the same ack. Statistically speaking, acks are very likely to get through.
這些包頭字段共同組成了確認(rèn)系統(tǒng):sequence 是一個(gè)隨著每個(gè)發(fā)送的數(shù)據(jù)包遞增的編號(hào),ack 是最近接收到的數(shù)據(jù)包序列號(hào),ack_bits 是一個(gè)用于編碼已確認(rèn)數(shù)據(jù)包集合的位字段。
如果 ack_bits 中的第 n 位被設(shè)置,則 ack - n 被確認(rèn)。ack_bits 不僅是一種節(jié)省帶寬的巧妙編碼方式,它還增加了冗余以應(yīng)對(duì)數(shù)據(jù)包丟失。每個(gè) ack 會(huì)被發(fā)送 32 次。如果一個(gè)數(shù)據(jù)包丟失了,還有另外 31 個(gè)數(shù)據(jù)包帶有相同的 ack。從統(tǒng)計(jì)角度來看,ack 很可能會(huì)成功傳達(dá)。
But bursts of packet loss do happen, so it’s important to note that:
-
If you receive an ack for packet n then that packet was definitely received.
-
If you don’t receive an ack, the packet was most likely not received. But, it might have been, and the ack just didn’t get through. This is extremely rare.
In my experience it’s not necessary to send perfect acks. Building a reliability system on top of a system that very rarely drops acks adds no significant problems. But it does create a challenge for testing this system works under all situations because of the edge cases when acks are dropped.
So please if you do implement this system yourself, setup a soak test with terrible network conditions to make sure your ack system is working correctly. You’ll find such a soak test in the example source code for this article, and the open source network libraries reliable.io and yojimbo which also implement this technique.
但數(shù)據(jù)包丟失的突發(fā)情況確實(shí)會(huì)發(fā)生,所以需要注意以下幾點(diǎn):
1.如果你收到了對(duì)第 n 個(gè)數(shù)據(jù)包的確認(rèn)(ack),那么這個(gè)數(shù)據(jù)包肯定已經(jīng)被接收了。
2.如果你沒有收到確認(rèn),該數(shù)據(jù)包很可能沒有被接收。但也有可能已經(jīng)接收了,只是確認(rèn)沒能傳達(dá)過來。這種情況極為罕見。
根據(jù)我的經(jīng)驗(yàn),沒必要發(fā)送“完美”的確認(rèn)。在一個(gè)極少丟失確認(rèn)的系統(tǒng)之上構(gòu)建可靠性機(jī)制并不會(huì)造成嚴(yán)重問題。但這確實(shí)為測(cè)試系統(tǒng)在所有場景下是否正常工作帶來了挑戰(zhàn),特別是在某些邊緣情況中 ack 丟失時(shí)。
所以,如果你打算自己實(shí)現(xiàn)這個(gè)系統(tǒng),請(qǐng)務(wù)必在極差的網(wǎng)絡(luò)條件下進(jìn)行 soak 測(cè)試,確保你的確認(rèn)機(jī)制能夠正常工作。你可以在本文的示例源碼中找到這樣的 soak 測(cè)試,以及在同樣實(shí)現(xiàn)了這種技術(shù)的開源網(wǎng)絡(luò)庫 reliable.io 和 yojimbo 中找到。
Sequence Buffers
To implement this ack system we need a data structure on the sender side to track whether a packet has been acked so we can ignore redundant acks (each packet is acked multiple times via ack_bits. We also need a data structure on the receiver side to keep track of which packets have been received so we can fill in the ack_bits value in the packet header.
The data structure should have the following properties:
- Constant time insertion (inserts may be random, for example out of order packets…)
- Constant time query if an entry exists given a packet sequence number
- Constant time access for the data stored for a given packet sequence number
- Constant time removal of entries
為了實(shí)現(xiàn)這個(gè) ack 系統(tǒng),我們需要在發(fā)送端使用一種數(shù)據(jù)結(jié)構(gòu)來追蹤哪些數(shù)據(jù)包已經(jīng)被確認(rèn),以便忽略多余的 ack(因?yàn)槊總€(gè)數(shù)據(jù)包會(huì)通過 ack_bits 被多次確認(rèn))。同時(shí),在接收端我們也需要一個(gè)數(shù)據(jù)結(jié)構(gòu),用來追蹤哪些數(shù)據(jù)包已經(jīng)被接收,以便填寫包頭中的 ack_bits 字段。
這個(gè)數(shù)據(jù)結(jié)構(gòu)應(yīng)當(dāng)具備以下特性:
-
常數(shù)時(shí)間的插入(插入可能是隨機(jī)的,例如亂序接收的數(shù)據(jù)包)
-
常數(shù)時(shí)間的存在性查詢(給定一個(gè)數(shù)據(jù)包序列號(hào),查詢是否存在對(duì)應(yīng)條目)
-
常數(shù)時(shí)間的訪問(根據(jù)數(shù)據(jù)包序列號(hào)訪問存儲(chǔ)的數(shù)據(jù))
-
常數(shù)時(shí)間的刪除(移除條目)
You might be thinking. Oh of course, hash table. But there’s a much simpler way:
你可能會(huì)想:“哦,當(dāng)然,用哈希表?!钡鋵?shí),有一種更簡單的方法:
const int BufferSize = 1024; uint32_t sequence_buffer[BufferSize]; struct PacketData { bool acked; }; PacketData packet_data[BufferSize]; PacketData * GetPacketData( uint16_t sequence ) { const int index = sequence % BufferSize; if ( sequence_buffer[index] == sequence ) return &packet_data[index]; else return NULL; }
As you can see the trick here is a rolling buffer indexed by sequence number: 正如你所看到的,這里的技巧是使用一個(gè)按序列號(hào)索引的滾動(dòng)緩沖區(qū):
const int index = sequence % BufferSize;
This works because we don’t care about being destructive to old entries. As the sequence number increases older entries are naturally overwritten as we insert new ones. The sequence_buffer[index] value is used to test if the entry at that index actually corresponds to the sequence number you’re looking for. A sequence buffer value of 0xFFFFFFFF indicates an empty entry and naturally returns NULL for any sequence number query without an extra branch.
When entries are added in order like a send queue, all that needs to be done on insert is to update the sequence buffer value to the new sequence number and overwrite the data at that index:
之所以這樣有效,是因?yàn)槲覀儾魂P(guān)心破壞舊的條目。隨著序列號(hào)的增加,舊的條目會(huì)隨著新條目的插入自然被覆蓋。sequence_buffer[index] 的值用于測(cè)試該索引處的條目是否確實(shí)對(duì)應(yīng)你正在查找的序列號(hào)。一個(gè)值為 0xFFFFFFFF 的序列緩沖區(qū)條目表示該位置為空,并且對(duì)于任何序列號(hào)查詢,它會(huì)自然地返回 NULL,無需額外的分支。
當(dāng)條目按順序添加(像發(fā)送隊(duì)列一樣)時(shí),插入時(shí)只需執(zhí)行以下操作:更新序列緩沖區(qū)的值為新的序列號(hào),并覆蓋該索引處的數(shù)據(jù):
PacketData & InsertPacketData( uint16_t sequence ) { const int index = sequence % BufferSize; sequence_buffer[index] = sequence; return packet_data[index]; }
Unfortunately, on the receive side packets arrive out of order and some are lost. Under ridiculously high packet loss (99%) I’ve seen old sequence buffer entries stick around from before the previous sequence number wrap at 65535 and break my ack logic (leading to false acks and broken reliability where the sender thinks the other side has received something they haven’t…).
The solution to this problem is to walk between the previous highest insert sequence and the new insert sequence (if it is more recent) and clear those entries in the sequence buffer to 0xFFFFFFFF. Now in the common case, insert is very close to constant time, but worst case is linear where n is the number of sequence entries between the previous highest insert sequence and the current insert sequence.
不幸的是,在接收端,數(shù)據(jù)包是亂序到達(dá)的,并且有些數(shù)據(jù)包會(huì)丟失。在極高的數(shù)據(jù)包丟失率(99%)下,我曾見過舊的序列緩沖區(qū)條目依然存在,這些條目來自之前序列號(hào)溢出(65535)之前的狀態(tài),導(dǎo)致我的確認(rèn)邏輯出現(xiàn)問題(導(dǎo)致錯(cuò)誤的確認(rèn)和破壞的可靠性,發(fā)送端錯(cuò)誤地認(rèn)為對(duì)方已經(jīng)接收到某些數(shù)據(jù)包,而實(shí)際上他們并沒有收到……)。
解決這個(gè)問題的方法是,在前一個(gè)最高插入序列和當(dāng)前插入序列之間(如果當(dāng)前插入序列更大)遍歷,并將這些序列緩沖區(qū)條目清除為 0xFFFFFFFF。這樣,在常見情況下,插入操作接近常數(shù)時(shí)間,但最壞情況是線性時(shí)間,其中 n 是前一個(gè)最高插入序列與當(dāng)前插入序列之間的序列條目數(shù)。
Before we move on I would like to note that you can do much more with this data structure than just acks. For example, you could extend the per-packet data to include time sent:
在繼續(xù)之前,我想指出,你可以使用這個(gè)數(shù)據(jù)結(jié)構(gòu)做更多的事情,而不僅僅是處理確認(rèn)。例如,你可以擴(kuò)展每個(gè)數(shù)據(jù)包的數(shù)據(jù),加入發(fā)送時(shí)間:
struct PacketData { bool acked; double send_time; };
With this information you can create your own estimate of round trip time by comparing send time to current time when packets are acked and taking an exponentially smoothed moving average. You can even look at packets in the sent packet sequence buffer older than your RTT estimate (you should have received an ack for them by now…) to create your own packet loss estimate.
有了這些信息,你可以通過將發(fā)送時(shí)間與數(shù)據(jù)包被確認(rèn)時(shí)的當(dāng)前時(shí)間進(jìn)行比較,并采用指數(shù)平滑的移動(dòng)平均方法,來創(chuàng)建你自己的往返時(shí)間(RTT)估算。你甚至可以查看發(fā)送數(shù)據(jù)包序列緩沖區(qū)中比你的 RTT 估算時(shí)間更早的包(你現(xiàn)在應(yīng)該已經(jīng)收到了它們的確認(rèn)……),從而創(chuàng)建你自己的數(shù)據(jù)包丟失估算。
Ack Algorithm
Now that we have the data structures and packet header, here is the algorithm for implementing packet level acks:
On packet send:
-
Insert an entry for for the current send packet sequence number in the sent packet sequence buffer with data indicating that it hasn’t been acked yet
-
Generate ack and ack_bits from the contents of the local received packet sequence buffer and the most recent received packet sequence number
-
Fill the packet header with sequence, ack and ack_bits
-
Send the packet and increment the send packet sequence number
On packet receive:
-
Read in sequence from the packet header
-
If sequence is more recent than the previous most recent received packet sequence number, update the most recent received packet sequence number
-
Insert an entry for this packet in the received packet sequence buffer
-
Decode the set of acked packet sequence numbers from ack and ack_bits in the packet header.
-
Iterate across all acked packet sequence numbers and for any packet that is not already acked call OnPacketAcked( uint16_t sequence ) and mark that packet as acked in the sent packet sequence buffer.
現(xiàn)在我們已經(jīng)有了數(shù)據(jù)結(jié)構(gòu)和數(shù)據(jù)包頭部,以下是實(shí)現(xiàn)數(shù)據(jù)包級(jí)別確認(rèn)的算法:
發(fā)送數(shù)據(jù)包時(shí):
-
在已發(fā)送數(shù)據(jù)包序列緩沖區(qū)中插入當(dāng)前發(fā)送數(shù)據(jù)包的序列號(hào)條目,數(shù)據(jù)表明該數(shù)據(jù)包尚未被確認(rèn)。
-
從本地接收到的數(shù)據(jù)包序列緩沖區(qū)以及最近接收到的數(shù)據(jù)包序列號(hào)中生成
ack和ack_bits。 -
將數(shù)據(jù)包頭部填充為
sequence、ack和ack_bits。 -
發(fā)送數(shù)據(jù)包并增加發(fā)送數(shù)據(jù)包的序列號(hào)。
接收數(shù)據(jù)包時(shí):
-
從數(shù)據(jù)包頭部讀取
sequence。 -
如果該序列號(hào)比先前接收到的最新序列號(hào)更大,則更新最新接收到的序列號(hào)。
-
在接收到的數(shù)據(jù)包序列緩沖區(qū)中插入該數(shù)據(jù)包的條目。
-
解碼數(shù)據(jù)包頭部中的
ack和ack_bits,獲取已確認(rèn)的數(shù)據(jù)包序列號(hào)集合。 -
遍歷所有已確認(rèn)的數(shù)據(jù)包序列號(hào),對(duì)于任何尚未被確認(rèn)的包,調(diào)用
OnPacketAcked(uint16_t sequence),并在已發(fā)送數(shù)據(jù)包序列緩沖區(qū)中將該數(shù)據(jù)包標(biāo)記為已確認(rèn)。
Importantly this algorithm is done on both sides so if you have a client and a server then each side of the connection runs the same logic, maintaining its own sequence number for sent packets, tracking most recent received packet sequence # from the other side and a sequence buffer of received packets from which it generates sequence, ack and ack_bits to send to the other side.
And that’s really all there is to it. Now you have a callback when a packet is received by the other side: OnPacketAcked. The main benefit of this ack system is now that you know which packets were received, you can build any reliability system you want on top. It’s not limited to just reliable-ordered messages. For example, you could use it to implement delta encoding on a per-object basis.
重要的是,這個(gè)算法是在連接的雙方都執(zhí)行的,所以如果你有一個(gè)客戶端和一個(gè)服務(wù)器,那么連接的每一側(cè)都會(huì)運(yùn)行相同的邏輯:
-
維護(hù)自己發(fā)送數(shù)據(jù)包的序列號(hào);
-
跟蹤來自對(duì)方的最近接收數(shù)據(jù)包的序列號(hào);
-
擁有一個(gè)接收數(shù)據(jù)包的序列緩沖區(qū),并根據(jù)該緩沖區(qū)生成
sequence、ack和ack_bits,然后發(fā)送給對(duì)方。
這就是整個(gè)流程的全部內(nèi)容了?,F(xiàn)在你擁有了一個(gè)回調(diào)函數(shù) OnPacketAcked,當(dāng)某個(gè)數(shù)據(jù)包被對(duì)方接收時(shí)就會(huì)觸發(fā)它。
這個(gè) ack 系統(tǒng)的主要優(yōu)勢(shì)在于:你明確知道哪些數(shù)據(jù)包被成功接收了,因此可以在此基礎(chǔ)上構(gòu)建任何你想要的可靠性系統(tǒng)。它不僅限于可靠有序消息(reliable-ordered messages),例如你可以用它來為每個(gè)對(duì)象實(shí)現(xiàn)差異編碼(delta encoding)。
Message Objects
Messages are small objects (smaller than packet size, so that many will fit in a typical packet) that know how to serialize themselves. In my system they perform serialization using a unified serialize functionunified serialize function.
The serialize function is templated so you write it once and it handles read, write and measure.
Yes. Measure. One of my favorite tricks is to have a dummy stream class called MeasureStream that doesn’t do any actual serialization but just measures the number of bits that would be written if you called the serialize function. This is particularly useful for working out which messages are going to fit into your packet, especially when messages themselves can have arbitrarily complex serialize functions.
消息對(duì)象
消息是一些小型對(duì)象(小于數(shù)據(jù)包的大小,因此在一個(gè)典型的數(shù)據(jù)包中可以容納多個(gè)),它們知道如何自行進(jìn)行序列化。在我的系統(tǒng)中,它們通過一個(gè)統(tǒng)一的 serialize 函數(shù)來執(zhí)行序列化操作。
這個(gè) serialize 函數(shù)是模板化的,因此你只需要寫一次,它就能同時(shí)處理讀?。╮ead)、寫入(write)和測(cè)量(measure)。
是的,測(cè)量。
我最喜歡的技巧之一是使用一個(gè)名為 MeasureStream 的虛擬流類,它不會(huì)執(zhí)行任何實(shí)際的序列化操作,而只是測(cè)量如果你調(diào)用了 serialize 函數(shù)將會(huì)寫入多少位(bit)。
當(dāng)消息本身的 serialize 函數(shù)可能任意復(fù)雜時(shí),這個(gè)技巧在判斷哪些消息可以裝入你的數(shù)據(jù)包中時(shí)尤其有用。
struct TestMessage : public Message { uint32_t a,b,c; TestMessage() { a = 0; b = 0; c = 0; } template <typename Stream> bool Serialize( Stream & stream ) { serialize_bits( stream, a, 32 ); serialize_bits( stream, b, 32 ); serialize_bits( stream, c, 32 ); return true; } virtual SerializeInternal( WriteStream & stream ) { return Serialize( stream ); } virtual SerializeInternal( ReadStream & stream ) { return Serialize( stream ); } virtual SerializeInternal( MeasureStream & stream ) { return Serialize( stream ); } };
The trick here is to bridge the unified templated serialize function (so you only have to write it once) to virtual serialize methods by calling into it from virtual functions per-stream type. I usually wrap this boilerplate with a macro, but it’s expanded in the code above so you can see what’s going on.
Now when you have a base message pointer you can do this and it just works:
這里的技巧在于:通過在每種流類型的虛函數(shù)中調(diào)用統(tǒng)一的模板化 serialize 函數(shù),將這個(gè)統(tǒng)一函數(shù)與虛函數(shù)機(jī)制銜接起來(這樣你只需要寫一次 serialize 函數(shù))。
我通常會(huì)用一個(gè)宏把這些樣板代碼包裝起來,但在上面的代碼中是直接展開的,這樣你可以清楚地看到它是怎么工作的。
現(xiàn)在,當(dāng)你擁有一個(gè)指向基類的消息指針時(shí),你就可以像下面這樣使用它,并且一切都能正常運(yùn)作:
Message * message = CreateSomeMessage();
message->SerializeInternal( stream );
An alternative if you know the full set of messages at compile time is to implement a big switch statement on message type casting to the correct message type before calling into the serialize function for each type. I’ve done this in the past on console platform implementations of this message system (eg. PS3 SPUs) but for applications today (2016) the overhead of virtual functions is neglible.
Messages derive from a base class that provides a common interface such as serialization, querying the type of a message and reference counting. Reference counting is necessary because messages are passed around by pointer and stored not only in the message send queue until acked, but also in outgoing packets which are themselves C++ structs.
This is a strategy to avoid copying data by passing both messages and packets around by pointer. Somewhere else (ideally on a separate thread) packets and the messages inside them are serialized to a buffer. Eventually, when no references to a message exist in the message send queue (the message is acked) and no packets including that message remain in the packet send queue, the message is destroyed.
如果你在編譯時(shí)就知道所有的消息類型,另一種做法是使用一個(gè) 大 switch 語句,根據(jù)消息類型將其轉(zhuǎn)換為正確的消息類型,然后再調(diào)用各自的 serialize 函數(shù)。我過去在一些游戲主機(jī)平臺(tái)(例如 PS3 的 SPU)上實(shí)現(xiàn)這個(gè)消息系統(tǒng)時(shí)就是這么做的,但在如今的應(yīng)用中(2016 年),虛函數(shù)的開銷可以忽略不計(jì)。
所有消息都派生自一個(gè)基類,該基類提供一個(gè)統(tǒng)一的接口,比如序列化、查詢消息類型,以及引用計(jì)數(shù)。
引用計(jì)數(shù)是必須的,因?yàn)橄⑹峭ㄟ^指針傳遞的,而且不僅在消息發(fā)送隊(duì)列中會(huì)被保存直到被確認(rèn)(acked),還會(huì)被保存在數(shù)據(jù)包中,而數(shù)據(jù)包本身是 C++ 結(jié)構(gòu)體。
這是一種避免復(fù)制數(shù)據(jù)的策略,通過指針傳遞消息和數(shù)據(jù)包。
在系統(tǒng)的其它部分(理想情況下是在另一個(gè)線程中),這些數(shù)據(jù)包和它們包含的消息會(huì)被序列化成緩沖區(qū)。
最終,當(dāng)消息在發(fā)送隊(duì)列中不再被引用(表示已經(jīng)收到 ack),并且沒有任何包含該消息的數(shù)據(jù)包仍保留在發(fā)送隊(duì)列中時(shí),該消息就會(huì)被銷毀。
We also need a way to create messages. I do this with a message factory class with a virtual function overriden to create a message by type. It’s good if the packet factory also knows the total number of message types, so we can serialize a message type over the network with tight bounds and discard malicious packets with message type values outside of the valid range:
我們還需要一種方式來創(chuàng)建消息。我是通過一個(gè)消息工廠類(message factory class)來實(shí)現(xiàn)的,其中包含一個(gè)虛函數(shù),通過覆蓋該函數(shù)可以根據(jù)類型創(chuàng)建不同的消息。
如果這個(gè)工廠類還能知道消息類型的總數(shù),那就更好了。這樣我們就可以在網(wǎng)絡(luò)上傳輸消息類型時(shí)使用緊湊的范圍進(jìn)行序列化,并且丟棄那些包含非法消息類型值的惡意數(shù)據(jù)包。
enum TestMessageTypes { TEST_MESSAGE_A, TEST_MESSAGE_B, TEST_MESSAGE_C, TEST_MESSAGE_NUM_TYPES }; // message definitions omitted class TestMessageFactory : public MessageFactory { public: Message * Create( int type ) { switch ( type ) { case TEST_MESSAGE_A: return new TestMessageA(); case TEST_MESSAGE_B: return new TestMessageB(); case TEST_MESSAGE_C: return new TestMessageC(); } } virtual int GetNumTypes() const { return TEST_MESSAGE_NUM_TYPES; } };
Again, this is boilerplate and is usually wrapped by macros, but underneath this is what’s going on.
同樣地,這些代碼本質(zhì)上是樣板代碼(boilerplate),通常會(huì)被宏(macros)包裝起來,但底層實(shí)際執(zhí)行的邏輯就是這些。
Reliable Ordered Message Algorithm
The algorithm for sending reliable-ordered messages is as follows:
On message send:
-
Measure how many bits the message serializes to using the measure stream
-
Insert the message pointer and the # of bits it serializes to into a sequence buffer indexed by message id. Set the time that message has last been sent to -1
-
Increment the send message id
On packet send:
-
Walk across the set of messages in the send message sequence buffer between the oldest unacked message id and the most recent inserted message id from left -> right (increasing message id order).
-
Never send a message id that the receiver can’t buffer or you’ll break message acks (since that message won’t be buffered, but the packet containing it will be acked, the sender thinks the message has been received, and will not resend it). This means you must never send a message id equal to or more recent than the oldest unacked message id plus the size of the message receive buffer.
-
For any message that hasn’t been sent in the last 0.1 seconds and fits in the available space we have left in the packet, add it to the list of messages to send. Messages on the left (older messages) naturally have priority due to the iteration order.
-
Include the messages in the outgoing packet and add a reference to each message. Make sure the packet destructor decrements the ref count for each message.
-
Store the number of messages in the packet n and the array of message ids included in the packet in a sequence buffer indexed by the outgoing packet sequence number so they can be used to map packet level acks to the set of messages included in that packet.
-
Add the packet to the packet send queue.
On packet receive:
-
Walk across the set of messages included in the packet and insert them in the receive message sequence buffer indexed by their message id.
-
The ack system automatically acks the packet sequence number we just received.
On packet ack:
-
Look up the set of messages ids included in the packet by sequence number.
-
Remove those messages from the message send queue if they exist and decrease their ref count.
-
Update the last unacked message id by walking forward from the previous unacked message id in the send message sequence buffer until a valid message entry is found, or you reach the current send message id. Whichever comes first.
On message receive:
-
Check the receive message sequence buffer to see if a message exists for the current receive message id.
-
If the message exists, remove it from the receive message sequence buffer, increment the receive message id and return a pointer to the message.
-
Otherwise, no message is available to receive. Return NULL.
In short, messages keep getting included in packets until a packet containing that message is acked.
We use a data structure on the sender side to map packet sequence numbers to the set of message ids to ack.
Messages are removed from the send queue when they are acked. On the receive side, messages arriving out of order are stored in a sequence buffer indexed by message id, which lets us receive them in the order they were sent.
可靠有序消息算法(Reliable Ordered Message Algorithm)
發(fā)送可靠有序消息的算法如下:
當(dāng)發(fā)送消息時(shí):
-
使用測(cè)量流(measure stream)來測(cè)量該消息序列化后占用的比特?cái)?shù)。
-
將消息指針和其序列化所占用的比特?cái)?shù)插入一個(gè)以消息 ID 為索引的序列緩沖區(qū)中,并將該消息的“上次發(fā)送時(shí)間”設(shè)為 -1。
-
遞增發(fā)送消息的 ID。
當(dāng)發(fā)送數(shù)據(jù)包時(shí):
-
遍歷發(fā)送消息序列緩沖區(qū)中,從最舊的未確認(rèn)消息 ID 到最新插入消息 ID 的消息,順序?yàn)閺淖蟮接遥聪?ID 遞增順序)。
-
切勿發(fā)送接收方無法緩沖的消息 ID,否則將破壞消息確認(rèn)機(jī)制:該消息不會(huì)被緩沖,但包含它的數(shù)據(jù)包可能會(huì)被確認(rèn),于是發(fā)送方會(huì)誤以為該消息已被接收,從而不再重發(fā)它。
-
這意味著你**絕不能發(fā)送消息 ID 大于或等于“最舊未確認(rèn)消息 ID + 消息接收緩沖區(qū)大小”**的消息。
-
-
對(duì)于任何“在過去 0.1 秒內(nèi)未發(fā)送過”且“能裝進(jìn)當(dāng)前數(shù)據(jù)包剩余空間”的消息,將其加入待發(fā)送消息列表。由于遍歷順序是從舊到新,舊消息天然擁有優(yōu)先級(jí)。
-
將這些消息包含在即將發(fā)送的數(shù)據(jù)包中,并為每條消息添加一個(gè)引用計(jì)數(shù)。確保數(shù)據(jù)包的析構(gòu)函數(shù)會(huì)減少每條消息的引用計(jì)數(shù)。
-
將數(shù)據(jù)包中消息數(shù)量
n和包含的消息 ID 數(shù)組存入一個(gè)以該數(shù)據(jù)包序列號(hào)為索引的序列緩沖區(qū)中,以便后續(xù)可以通過包級(jí)別的確認(rèn)(ack)來映射出這些消息。 -
將數(shù)據(jù)包加入發(fā)送隊(duì)列。
當(dāng)接收數(shù)據(jù)包時(shí):
-
遍歷該數(shù)據(jù)包中包含的所有消息,并根據(jù)它們的消息 ID 將其插入到接收消息序列緩沖區(qū)中。
-
Ack系統(tǒng)會(huì)自動(dòng)確認(rèn)剛剛接收到的數(shù)據(jù)包的序列號(hào)。
當(dāng)收到數(shù)據(jù)包確認(rèn)(ack)時(shí):
-
根據(jù)數(shù)據(jù)包的序列號(hào),查找該包中包含的消息 ID 集合。
-
如果這些消息仍存在于消息發(fā)送隊(duì)列中,則將它們移除,并減少它們的引用計(jì)數(shù)。
-
通過從之前的最舊未確認(rèn)消息 ID 開始向前推進(jìn),直到找到一個(gè)有效的消息條目,或達(dá)到當(dāng)前的發(fā)送消息 ID(以先到者為準(zhǔn)),來更新最新的未確認(rèn)消息 ID。
當(dāng)接收消息時(shí):
-
查看接收消息序列緩沖區(qū)中是否存在當(dāng)前的接收消息 ID 對(duì)應(yīng)的消息。
-
如果存在該消息:
-
從接收緩沖區(qū)中移除該消息;
-
增加接收消息 ID;
-
返回該消息的指針。
-
-
如果不存在該消息,則當(dāng)前沒有可接收的消息,返回 NULL。
下面我通過一個(gè)具體的例子來說明「可靠有序消息算法(Reliable Ordered Message Algorithm)」是如何工作的:
?? 假設(shè)場景:
你正在開發(fā)一個(gè)多人射擊游戲,客戶端需要將以下玩家操作消息可靠且有序地發(fā)送給服務(wù)器:
-
M0: 移動(dòng)(向前) -
M1: 開火 -
M2: 切換武器
1. 發(fā)送消息(On message send):
每條消息先通過 MeasureStream 估算序列化后占多少比特。
-
插入
M0到發(fā)送消息緩沖區(qū),消息ID為 0,序列化大小為 12 bits,標(biāo)記上次發(fā)送時(shí)間為-1。 -
插入
M1(ID=1) 和M2(ID=2) 同理。 -
發(fā)送消息ID計(jì)數(shù)器依次變?yōu)椋? → 2 → 3...
2. 發(fā)送數(shù)據(jù)包(On packet send):
假設(shè)當(dāng)前最大接收端能緩存 256 條消息,且最早未確認(rèn)消息 ID 是 0。
從發(fā)送消息緩沖區(qū)中按順序掃描:
-
如果消息還沒在 0.1 秒內(nèi)發(fā)送過,并且可以裝進(jìn)當(dāng)前剩余的包空間,則打包進(jìn)去。
-
此時(shí):
M0和M1都符合條件,被打包進(jìn)Packet #100。 -
Packet header 記錄了:消息個(gè)數(shù) = 2,消息 ID = [0, 1]。
-
把這個(gè)消息ID數(shù)組記錄在一個(gè)
packet sequence buffer中(按包序號(hào)記錄消息ID們)。
3. 接收數(shù)據(jù)包(On packet receive):
服務(wù)器收到 Packet #100:
-
解析出包含
M0,M1,將它們按 message id 插入接收消息緩沖區(qū)。 -
系統(tǒng)自動(dòng)確認(rèn)接收到
Packet #100。
4. 確認(rèn)包(On packet ack):
客戶端收到 ack(確認(rèn))表示 Packet #100 被成功接收。
-
查表得知該包包含
[0, 1]這兩個(gè)消息。 -
從
發(fā)送消息緩沖區(qū)中移除M0,M1,減少引用計(jì)數(shù),消息可被釋放。 -
更新最舊未確認(rèn)消息ID為 2(即
M2)。
5. 接收消息(On message receive):
服務(wù)器應(yīng)用層想要處理消息:
-
檢查
接收消息緩沖區(qū)是否有消息 ID=0。 -
有 → 取出
M0,遞增接收消息ID為 1。 -
再取出
M1,接收ID 變?yōu)?2。 -
如果下一個(gè)包還未到(如
M2沒收到),返回 NULL 等待。
總結(jié):
-
每個(gè)消息被持續(xù)打包發(fā)送,直到確認(rèn)(ack)它被成功收到。
-
可靠性:只會(huì)在收到確認(rèn)后才從隊(duì)列移除消息。
-
有序性:接收端按消息ID排好序緩沖,只有按順序才能取出。
你可以把它理解為一種封裝在數(shù)據(jù)包發(fā)送系統(tǒng)上的“可靠消息傳輸層”,但又比 TCP 更靈活 —— 因?yàn)槟憧梢曰谒鼘?shí)現(xiàn):按對(duì)象 delta 更新、語義自定義重傳等。
The End Result
This provides the user with an interface that looks something like this on send:
這為用戶在發(fā)送時(shí)提供了一個(gè)類似如下的接口:
TestMessage * message = (TestMessage*) factory.Create( TEST_MESSAGE ); if ( message ) { message->a = 1; message->b = 2; message->c = 3; connection.SendMessage( message ); }
And on the receive side:
while ( true ) { Message * message = connection.ReceiveMessage(); if ( !message ) break; if ( message->GetType() == TEST_MESSAGE ) { TestMessage * testMessage = (TestMessage*) message; // process test message } factory.Release( message ); }
Which is flexible enough to implement whatever you like on top of it.

浙公網(wǎng)安備 33010602011771號(hào)