BFT共識協議Java簡單實現
關于拜占庭容錯
拜占庭容錯指的是若干節點之間要達成信息一致,但是其中節點可能會作惡,比如發送不一致的假消息,這一點與Raft等以前熟知的非拜占庭容錯協議不一樣,非拜占庭容錯場景里的節點可能會故障,但不會故意作惡。
BFT協議里要求有3N+1個節點,那么如果誠實節點達到2N+1,則整個節點網絡可以達到共識。
分為3個階段:PRE-PREPARE, PREPARE, COMMIT
代碼
public class Message {
enum Type {PRE_PREPARE, PREPARE, COMMIT};
Type type; //消息類型
int viewNumber; //任期,主節點不換任期不變
int seqNumber; //消息在某任期下的序列號
String digest; //消息摘要
int senderId; //發送者ID,消息是誰發的
public Message(Type type, int viewNumber, int seqNumber, String digest, int senderId){
this.type = type;
this.viewNumber = viewNumber;
this.seqNumber = seqNumber;
this.digest = digest;
this.senderId = senderId;
}
import com.alibaba.fastjson2.JSON;
import java.util.HashMap;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* 模擬節點
* */
public class Node implements Runnable{
int id;
public boolean primary;
//List<Node> allNodes; // 模擬廣播
Queue<Message> preQueue = new LinkedBlockingQueue<Message>();
Queue<Message> prepareQueue = new LinkedBlockingQueue<Message>();
Queue<Message> commitQueue = new LinkedBlockingQueue<Message>();
HashMap<String, Integer> msgMap = new HashMap<>(); //key viewnumber+seqNubmer+type+digest value count
HashMap<String, Boolean> msgBroadMap = new HashMap<>(); //key viewnumber+seqNubmer+type+digest value bool
public Node (int id, boolean primary){
this.id = id;
this.primary = primary;
}
//接收消息
void receive(Message message) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
String key = getMessageKey(message);
if(null==msgMap.get(key)){
msgMap.put(key, 1);
}else {
msgMap.put(key, msgMap.get(key)+1);
}
msgBroadMap.put(key, false);
//System.out.println(msgMap);
//System.out.println(msgBroadMap);
switch (message.type){
case PRE_PREPARE: preQueue.add(message); break;
case PREPARE: prepareQueue.add(message); break;
case COMMIT: commitQueue.add(message); break;
}
}
private boolean isMessageTrue(Message msg){
String key = getMessageKey(msg);
if(null!=msgMap.get(key)){
if(msgMap.get(key) >= 3) return true;
}
return false;
}
// 節點收PRE_PREPARE消息,到網絡中廣播 prepare消息
void handlePrePrepare(Message msg) {
System.out.println(this.id + "節點收到PRE-PREPARE消息" + JSON.toJSONString(msg));
NetwokContext.broadcast(msg, Message.Type.PREPARE, this.id);
String key = getMessageKey(msg);
msgBroadMap.put(key, true);
}
// 節點收到prepare消息,判斷一致的消息數量是否達到2n+1,是則廣播commit消息
void handlePrepare(Message msg) {
String key = getMessageKey(msg);
if(isMessageTrue(msg) && !msgBroadMap.get(key)){
System.out.println(this.id + "節點收到PREPARE消息" + JSON.toJSONString(msg) + "后,已收到滿足數量的一致消息,廣播COMMIT消息");
NetwokContext.broadcast(msg, Message.Type.COMMIT, this.id);
msgBroadMap.put(key, true);
}
}
//節點收到commit消息,判斷一致的消息數量是否達到2n+1,是則最終提交
void handleCommit(Message msg){
String key = getMessageKey(msg);
if(isMessageTrue(msg) && !msgBroadMap.get(key)) {
System.out.println(this.id + "節點收到COMMIT消息" + JSON.toJSONString(msg) + "后,已收到滿足數量的一致消息,進行本地提交執行");
msgBroadMap.put(key, true);
}
}
@Override
public void run() {
while (true){
if(!preQueue.isEmpty()) handlePrePrepare(preQueue.poll());
if(!prepareQueue.isEmpty()) handlePrepare(prepareQueue.poll());
if(!commitQueue.isEmpty()) handleCommit(commitQueue.poll());
try {
Thread.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
private String getMessageKey(Message msg){
return msg.viewNumber+""+ msg.seqNumber + "" + msg.type + msg.digest;
}
}
上面是消息類和節點類,接下來是這個4節點網絡環境的模擬上下文,以及測試程序。
/**
* 模擬節點中保存的網絡上下文
* */
public class NetwokContext {
public static List<Node> allNodes = new ArrayList<>(); // 模擬廣播
public static void broadcast(Message msg, Message.Type type,int senderId){
for(Node node : allNodes){
Message amsg = new Message(type, msg.viewNumber, msg.seqNumber, msg.digest, senderId);
node.receive(amsg);
}
}
}
public class PBFTdemo {
public static void main(String[] args) throws Exception{
// 模擬 4 個節點,其中 Node0 是主節點
Node node0 = new Node(0, true);
Node node1 = new Node(1, false);
Node node2 = new Node(2, false);
Node node3 = new Node(3, false);
NetwokContext.allNodes.add(node0);
NetwokContext.allNodes.add(node1);
NetwokContext.allNodes.add(node2);
NetwokContext.allNodes.add(node3);
Thread thread0 = new Thread(node0);
Thread thread1 = new Thread(node1);
Thread thread2 = new Thread(node2);
Thread thread3 = new Thread(node3);
thread0.setDaemon(true);
thread1.setDaemon(true);
thread2.setDaemon(true);
thread3.setDaemon(true);
thread0.start();
thread1.start();
thread2.start();
thread3.start();
case1();
//case2();
TimeUnit.SECONDS.sleep(10);
}
//正常情況
private static void case1(){
String digest = "提刀上洛"; // 簡化
Message prePrepare = new Message(Message.Type.PRE_PREPARE, 0, 1, digest, 0);
for (Node node : NetwokContext.allNodes) {
node.receive(prePrepare);
}
}
//主節點作惡
private static void case2(){
NetwokContext.allNodes.get(0).receive(new Message(Message.Type.PRE_PREPARE, 0, 1, "提刀上洛", 0));
NetwokContext.allNodes.get(1).receive(new Message(Message.Type.PRE_PREPARE, 0, 1, "提刀上洛", 0));
NetwokContext.allNodes.get(2).receive(new Message(Message.Type.PRE_PREPARE, 0, 1, "跑路", 0));
NetwokContext.allNodes.get(3).receive(new Message(Message.Type.PRE_PREPARE, 0, 1, "跑路", 0));
}
}
case1運行結果:
1節點收到PRE-PREPARE消息{"digest":"提刀上洛","senderId":0,"seqNumber":1,"type":"PRE_PREPARE","viewNumber":0}
0節點收到PRE-PREPARE消息{"digest":"提刀上洛","senderId":0,"seqNumber":1,"type":"PRE_PREPARE","viewNumber":0}
2節點收到PRE-PREPARE消息{"digest":"提刀上洛","senderId":0,"seqNumber":1,"type":"PRE_PREPARE","viewNumber":0}
3節點收到PRE-PREPARE消息{"digest":"提刀上洛","senderId":0,"seqNumber":1,"type":"PRE_PREPARE","viewNumber":0}
0節點收到PREPARE消息{"digest":"提刀上洛","senderId":0,"seqNumber":1,"type":"PREPARE","viewNumber":0}后,已收到滿足數量的一致消息,廣播COMMIT消息
1節點收到PREPARE消息{"digest":"提刀上洛","senderId":1,"seqNumber":1,"type":"PREPARE","viewNumber":0}后,已收到滿足數量的一致消息,廣播COMMIT消息
2節點收到PREPARE消息{"digest":"提刀上洛","senderId":1,"seqNumber":1,"type":"PREPARE","viewNumber":0}后,已收到滿足數量的一致消息,廣播COMMIT消息
3節點收到PREPARE消息{"digest":"提刀上洛","senderId":0,"seqNumber":1,"type":"PREPARE","viewNumber":0}后,已收到滿足數量的一致消息,廣播COMMIT消息
0節點收到COMMIT消息{"digest":"提刀上洛","senderId":1,"seqNumber":1,"type":"COMMIT","viewNumber":0}后,已收到滿足數量的一致消息,進行本地提交執行
1節點收到COMMIT消息{"digest":"提刀上洛","senderId":1,"seqNumber":1,"type":"COMMIT","viewNumber":0}后,已收到滿足數量的一致消息,進行本地提交執行
2節點收到COMMIT消息{"digest":"提刀上洛","senderId":0,"seqNumber":1,"type":"COMMIT","viewNumber":0}后,已收到滿足數量的一致消息,進行本地提交執行
3節點收到COMMIT消息{"digest":"提刀上洛","senderId":1,"seqNumber":1,"type":"COMMIT","viewNumber":0}后,已收到滿足數量的一致消息,進行本地提交執行
case2運行結果:
0節點收到PRE-PREPARE消息{"digest":"提刀上洛","senderId":0,"seqNumber":1,"type":"PRE_PREPARE","viewNumber":0}
1節點收到PRE-PREPARE消息{"digest":"提刀上洛","senderId":0,"seqNumber":1,"type":"PRE_PREPARE","viewNumber":0}
2節點收到PRE-PREPARE消息{"digest":"跑路","senderId":0,"seqNumber":1,"type":"PRE_PREPARE","viewNumber":0}
3節點收到PRE-PREPARE消息{"digest":"跑路","senderId":0,"seqNumber":1,"type":"PRE_PREPARE","viewNumber":0}
浙公網安備 33010602011771號