使用Java操作Zookeeper
目錄
一、介紹
2.1、導入依賴
2.2、連接zk集群
2.3、操作數據操作
3.1、導入依賴
3.2、使用示例
一、介紹
這里主要記錄通過Java調用API來操作Zookeeper集群的數據,對于zookeeper集群的搭建或者命令,可以參考:
目前接觸到的Java操作Zookeeper,有兩套API,一套是zookeeper官方提供的(zookeeper),另外一套是封裝了官方API的API(zkClient),從描述上來看,就知道,官方的API可能不是那么好用,不然也不會在封裝。
二、zookeeper API
2.1、導入依賴
使用zookeeper官方api的時候,請保證jar包的版本,和zk集群中zk的版本相同
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.7</version>
</dependency>
2.2、連接zk集群
下面是代碼示例,兩種形式(分別使用匿名類和Lambda表達式):
package cn.ganlixin.zk;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.Test;
import java.io.IOException;
public class ZookeeperDemo {
@Test
public void connectZkCluster() throws IOException, KeeperException, InterruptedException {
// 構造方法
// ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
// 匿名對象形式
ZooKeeper zooKeeper = new ZooKeeper(
"192.168.1.3:2181,192.168.1.4:2181,192.168.1.5:2181",
20000,
new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
// 發生變更的節點路徑
String path = watchedEvent.getPath();
System.out.println("path:" + path);
// 通知狀態
Watcher.Event.KeeperState state = watchedEvent.getState();
System.out.println("KeeperState:" + state);
// 事件類型
Watcher.Event.EventType type = watchedEvent.getType();
System.out.println("EventType:" + type);
}
}
);
// 關閉連接
zooKeeper.close();
// Lamdba形式
ZooKeeper zk = new ZooKeeper(
"192.168.1.3:2181,192.168.1.4:2181,192.168.1.5:2181",
20000,
watchedEvent -> {
// 發生變更的節點路徑
String path = watchedEvent.getPath();
System.out.println("path:" + path);
// 通知狀態
Watcher.Event.KeeperState state = watchedEvent.getState();
System.out.println("KeeperState:" + state);
// 事件類型
Watcher.Event.EventType type = watchedEvent.getType();
System.out.println("EventType:" + type);
}
);
zk.close();
}
}
運行上面的代碼,控制臺輸出如下(輸出了兩遍,是因為創建了兩次連接)
path:null KeeperState:SyncConnected EventType:None path:null KeeperState:SyncConnected EventType:None
2.3、操作數據操作
操作Zk中的數據,方式也很簡單,只需要使用創建的zk連接,調用對應的方法即可(方法名與zk命令行中命令相同)
package cn.ganlixin.zk;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;
import java.io.IOException;
public class ZookeeperDemo {
@Test
public void manageData() throws KeeperException, InterruptedException, IOException {
// 創建zk連接
ZooKeeper zk = new ZooKeeper(
"192.168.1.3:2181,192.168.1.4:2181,192.168.1.5:2181",
20000,
null
);
// 創建節點
zk.create("/abc", "123".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// 獲取節點數據
// getData(String path, boolean watch, Stat stat);
Stat stat = new Stat();
byte[] data = zk.getData("/abc", false, stat);
System.out.println(new String(data)); // 數據內容 123
System.out.println(stat.getDataLength()); // 節點狀態(數據長度) 3
// 對/abc進行watch
zk.getData("/abc",
watchedEvent -> {
System.out.println("path:" + watchedEvent.getPath());
System.out.println("KeeperState:" + watchedEvent.getState());
System.out.println("EventType:" + watchedEvent.getType());
},
null);
// 設置節點數據
// setData(String path, byte[] data, int version)
// 指定version為-1,表示不關心版本
zk.setData("/abc", "456".getBytes(), -1);
// 設置兩次,第二次不會觸發通知
zk.setData("/abc", "789".getBytes(), -1);
// 阻塞,以等待通知
Thread.sleep(1000);
zk.close();
}
}
上面的程序,運行輸出結果如下:
123 3 path:/abc KeeperState:SyncConnected EventType:NodeDataChanged
可以看到,只顯示了一次通知,和與預期相符。
三、zkClient API
因為Zookeeper API比較復雜,使用并不方便,所以出現了ZkClient,ZkClient對Zookeeper API進行了封裝,利用ZkClient可以更加方便地對Zookeeper進行操作。
3.1、導入依賴
因為zkClient是對zookeeper的再封裝,所以需要注意zkClient中zookeeper的版本與zk集群的版本相同,可以在maven倉庫中查看對應關系
<!-- https://mvnrepository.com/artifact/com.101tec/zkclient -->
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
3.2、使用示例
下面是個簡單的示例:
package cn.ganlixin.zk;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkStateListener;
import org.I0Itec.zkclient.ZkClient;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.junit.Test;
import java.util.List;
public class ZkClientDemo {
@Test
public void testConn() throws InterruptedException {
ZkClient zkClient = new ZkClient(
"192.168.1.3:2181,192.168.1.4:2181,192.168.1.5:2181",
20000
);
// 創建節點
zkClient.createPersistent("/abc", "hello");
zkClient.createEphemeral("/xyz", "world");
zkClient.create("/opq", "world", CreateMode.EPHEMERAL_SEQUENTIAL);
String data = zkClient.readData("/abc");
System.out.println(data);
// 監聽狀態變化
zkClient.subscribeStateChanges(new IZkStateListener() {
@Override
public void handleStateChanged(Watcher.Event.KeeperState keeperState) throws Exception {
System.out.println("state:" + keeperState);
}
@Override
public void handleNewSession() throws Exception {
System.out.println("new session");
}
@Override
public void handleSessionEstablishmentError(Throwable throwable) throws Exception {
throwable.printStackTrace();
}
});
// 監聽子節點發生變化
zkClient.subscribeChildChanges("/", new IZkChildListener() {
@Override
public void handleChildChange(String path, List<String> list) throws Exception {
System.out.println("watch path:" + path);
// 輸出所有子節點
list.forEach(str -> {
System.out.println(str);
});
}
});
Thread.sleep(100000);
}
}
如需轉載,請注明文章出處,謝謝!!!
浙公網安備 33010602011771號