Zookeeper 客户端API调用示例(基本使用,增删改查znode数据,监听znode,其它案例,其它网络参考资料)

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 9.1 基本使用org.apache.zookeeper.Zookeeper是客户端入口主类,负责建立与server的会话它提供以下几类主要方法  :功能 描述 create 在本地目录树中创建一个节点 delete 删除一个节点 exists 测试本地是否存在目标节点 get/set data 从目标节点上读取 / 写数据 get/set ACL 获

9.1 基本使用

org.apache.zookeeper.Zookeeper是客户端入口主类,负责建立与server的会话

它提供以下几类主要方法  :

功能

描述

create

在本地目录树中创建一个节点

delete

删除一个节点

exists

测试本地是否存在目标节点

get/set data

从目标节点上读取 / 写数据

get/set ACL

获取 / 设置目标节点访问控制列表信息

get children

检索一个子节点上的列表

sync

等待要被传送的数据

 

 

 

 

 

 

 

 

 

表 1 : ZooKeeper API 描述

9.2 增删改查znode数据

package cn.com.toto.zk;

 

import java.io.IOException;

 

import org.apache.zookeeper.CreateMode;

import org.apache.zookeeper.KeeperException;

import org.apache.zookeeper.WatchedEvent;

import org.apache.zookeeper.Watcher;

import org.apache.zookeeper.ZooDefs.Ids;

import org.apache.zookeeper.ZooKeeper;

 

public class SimpleDemo {

    //回话超时时间,设置为与系统默认时间一致

    private static final int SESSION_TIMEOUT = 30000;

    //创建ZooKeeper实例

    ZooKeeper zk;

    //创建Watcher实例

    Watcher wh = new Watcher() {

   

       @Override

       public void process(WatchedEvent event) {

           System.out.println(event.toString());

       }

    };

   

    //初始化ZooKeeper实例

    private void createZKInstance() throws IOException {

       zk = new ZooKeeper("hadoop:2181,hadoop2:2181,hadoop3:2181",SimpleDemo.SESSION_TIMEOUT,this.wh);

    }

   

    private void ZKOperations() throws KeeperException, InterruptedException {

       System.out.println("/n1. 创建 ZooKeeper 节点 (znode zoo2, 数据: myData2 ,权限: OPEN_ACL_UNSAFE ,节点类型: Persistent");

       zk.create("/zoo2", "myData2".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

       System.out.println("/n2.查看是否创建成功:");

       System.out.println(new String(zk.getData("/zoo2", false, null)));

       System.out.println("/n3.修改节点数据");

       zk.setData("/zoo2", "toto".getBytes(), -1);

       System.out.println("/n4.查看是否修改成功:");

       System.out.println(new String(zk.getData("/zoo2", false, null)));

       System.out.println("/n5.删除节点");

       zk.delete("/zoo2", -1);

       System.out.println("/n6.查看节点是否被删除:");

       System.out.println("节点状态:[" + zk.exists("/zoo2", false) + "]");

    }

   

    private void ZKClose() throws InterruptedException {

       zk.close();

    }

   

    public static void main(String[] args) throws KeeperException, InterruptedException, IOException {

       SimpleDemo dm = new SimpleDemo();

       dm.createZKInstance();

       dm.ZKOperations();

       dm.ZKClose();

    }

}

运行结果:

/n1. 创建 ZooKeeper 节点 (znode : zoo2, 数据: myData2 ,权限: OPEN_ACL_UNSAFE ,节点类型: Persistent

一月 10, 2017 12:48:26 上午 org.apache.zookeeper.ClientCnxn$SendThread primeConnection

信息: Socket connection established to hadoop3/192.168.106.82:2181, initiating session

一月 10, 2017 12:48:26 上午 org.apache.zookeeper.ClientCnxn$SendThread onConnected

信息: Session establishment complete on server hadoop3/192.168.106.82:2181, sessionid = 0x35983c177d00007, negotiated timeout = 30000

WatchedEvent state:SyncConnected type:None path:null

/n2.查看是否创建成功:

myData2

/n3.修改节点数据

/n4.查看是否修改成功:

toto

/n5.删除节点

/n6.查看节点是否被删除:

节点状态:[null]

 

9.3 监听znode

Zookeeper的监听器工作机制

监听器是一个接口,我们的代码中可以实现Wather这个接口,实现其中的process方法,方法中即我们自己的业务逻辑

 

 

 

监听器的注册是在获取数据的操作中实现:

getData(path,watch?)监听的事件是:节点数据变化事件

getChildren(path,watch?)监听的事件是:节点下的子节点增减变化事件

 

9.4其它案例

所需jar包:

图1 项目包结构

package cn.com.toto.zk;

 

import java.util.List;

import java.util.concurrent.CountDownLatch;

 

import org.apache.zookeeper.CreateMode;

import org.apache.zookeeper.KeeperException;

import org.apache.zookeeper.WatchedEvent;

import org.apache.zookeeper.Watcher;

import org.apache.zookeeper.Watcher.Event.KeeperState;

import org.apache.zookeeper.ZooDefs.Ids;

import org.apache.zookeeper.ZooKeeper;

import org.apache.zookeeper.data.Stat;

import org.junit.Before;

import org.junit.Test;

 

public class SimpleZkClient {

   

    private static final String connectString = "192.168.106.80:2181,192.168.106.81:2181,192.168.106.82:2181";

    private static final int sessionTimeout = 2000;

   

    // latch就相当于一个对象锁,当latch.await()方法执行时,方法所在的线程会等待

    //latchcount减为0时,将会唤醒等待的线程

    CountDownLatch latch = new CountDownLatch(1);

    ZooKeeper zkClient = null;

   

    @Before

    public void init() throws Exception {

       zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {

          

           //事件监听回调方法

           @Override

           public void process(WatchedEvent event) {

              if (latch.getCount() > 0 && event.getState() == KeeperState.SyncConnected) {

                  System.out.println("countdown");

                  latch.countDown();

              }

             

              //收到事件通知后的回调函数(应该是我们自己的事件处理逻辑)

              System.out.println(event.getType() + "---" + event.getPath());

              System.out.println(event.getState());

           }

       });

       latch.await();

    }

   

    //创建数据节点到zk

    @Test

    public void testCreate() throws KeeperException, InterruptedException {

       //参数1:要创建的节点的路径  参数2:节点大数据参数3:节点的权限  参数4:节点的类型

       String nodeCreated = zkClient.create("/eclipse", "hellozk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        //上传的数据可以是任何类型,但都要转成byte

       zkClient.close();

    }

   

    //判断znode是否存在

    @Test

    public void testExist() throws KeeperException, InterruptedException {

       Stat stat = zkClient.exists("/eclipse", false);

       System.out.println(stat == null ? "not exist" : "exist");

    }

   

    //获取znode下的孩子节点

    @Test

    public void getChildren() throws KeeperException, InterruptedException {

        List<String> children = zkClient.getChildren("/", true);

        for(String child : children) {

            System.out.println(child);

        }

        Thread.sleep(Long.MAX_VALUE);

    }

 

    //获取参数

    @Test

    public void getData() throws KeeperException, InterruptedException {

       byte[] data = zkClient.getData("/eclipse", true, null);

       System.out.println(new String(data));

       Thread.sleep(Long.MAX_VALUE);

    }

   

    //删除znode

    @Test

    public void deleteZnode() throws InterruptedException, KeeperException {

        //参数2:指定要删除的版本,-1表示删除所有版本

       zkClient.delete("/eclipse", -1);

    }

   

    //设置参数

    @Test

    public void setData() throws Exception {

       //要注意,这里的/zookeeper 要在zookeeper中的节点中有

       zkClient.setData("/zookeeper", "imissyou angelababy".getBytes(), -1);

 

       byte[] data = zkClient.getData("/zookeeper", false, null);

       System.out.println(new String(data));

    }

}

案例二

package cn.com.toto.zk;

 

import java.util.List;

import java.util.concurrent.CountDownLatch;

 

import org.apache.zookeeper.CreateMode;

import org.apache.zookeeper.WatchedEvent;

import org.apache.zookeeper.Watcher;

import org.apache.zookeeper.Watcher.Event.KeeperState;

import org.apache.zookeeper.ZooDefs.Ids;

import org.apache.zookeeper.ZooKeeper;

import org.apache.zookeeper.data.Stat;

 

import com.sun.org.apache.bcel.internal.generic.NEW;

 

public class TestZKclient {

    static ZooKeeper zk = null;

   

    public static void main(String[] args) throws Exception {

        final CountDownLatch countDownLatch = new CountDownLatch(1);

        zk = new ZooKeeper("hadoop:2181",2000,new Watcher() {

          

           @Override

           public void process(WatchedEvent event) {

              if (event.getState() == KeeperState.SyncConnected) {

                  countDownLatch.countDown();

              }

              System.out.println(event.getPath());

              System.out.println(event.getType());

              try {

                  zk.getChildren("/zookeeper", true);

              } catch (Exception e) {

                  e.printStackTrace();

              }

           }

       });

       

        countDownLatch.await();

       

        /**

        zk.create("/myboys", "丑陋型".getBytes("UTF-8"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        zk.close();

        **/

       

        /**

        byte[] data = zk.getData("/myboys", true, null);

        System.out.println(new String(data,"UTF-8"));

        Thread.sleep(Long.MAX_VALUE);

        **/

       

        /**

        zk.create("/myboys/wangkai", "测试型".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        zk.close();

        **/

       

        /**

        List<String> children = zk.getChildren("/myboys", true);

        for(String child : children) {

           System.out.println(child);

        }

        **/

       

        /**zk.delete("/myboys/wangkai", -1);**/

       

        /**zk.setData("/myboys", "fasdfasdf".getBytes(), -1);**/

        /**

        byte[] data = zk.getData("/myboys", true, null);

        System.out.println(new String(data,"UTF-8"));

        **/

       

        Stat stat = zk.exists("/mywives", true);

        System.out.println(stat == null ? "确实不存在" : "存在");

        zk.close();

    }

}

 

9.5 其它网络参考资料

准备工作

拷贝ZooKeeper安装目录下的zookeeper.x.x.x.jar文件到项目的classpath路径下.

创建连接和回调接口

首先需要创建ZooKeeper对象, 后续的一切操作都是基于该对象进行的.

1.  ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException  

以下为各个参数的详细说明:

·         connectString. zookeeper server列表, 以逗号隔开. ZooKeeper对象初始化后, 将从server列表中选择一个server, 并尝试与其建立连接. 如果连接建立失败, 则会从列表的剩余项中选择一个server, 并再次尝试建立连接.

·         sessionTimeout. 指定连接的超时时间.

·         watcher. 事件回调接口.

注意, 创建ZooKeeper对象时, 只要对象完成初始化便立刻返回. 建立连接是以异步的形式进行的, 当连接成功建立后, 会回调watcherprocess方法. 如果想要同步建立与server的连接, 需要自己进一步封装.

1.  public class ZKConnection {  

2.      /** 

3.       * server列表以逗号分割 

4.       */  

5.      protected String hosts = "localhost:4180,localhost:4181,localhost:4182";  

6.      /** 

7.       * 连接的超时时间毫秒 

8.       */  

9.      private static final int SESSION_TIMEOUT = 5000;  

10.     private CountDownLatch connectedSignal = new CountDownLatch(1);  

11.     protected ZooKeeper zk;  

12.   

13.     /** 

14.      * 连接zookeeper server 

15.      */  

16.     public void connect() throws Exception {  

17.         zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new ConnWatcher());  

18.         // 等待连接完成  

19.         connectedSignal.await();  

20.     }  

21.   

22.     public class ConnWatcher implements Watcher {  

23.         public void process(WatchedEvent event) {  

24.             // 连接建立回调process接口时event.getState()KeeperState.SyncConnected  

25.             if (event.getState() == KeeperState.SyncConnected) {  

26.                 // 放开闸门, waitconnect方法上的线程将被唤醒  

27.                 connectedSignal.countDown();  

28.             }  

29.         }  

30.     }  

31. }  

 

创建znode

ZooKeeper对象的create方法用于创建znode.

1.  String create(String path, byte[] data, List acl, CreateMode createMode);  

以下为各个参数的详细说明:

·         path. znode的路径.

·         data. znode关联的数据.

·         acl. 指定权限信息, 如果不想指定权限, 可以传入Ids.OPEN_ACL_UNSAFE.

·         指定znode类型. CreateMode是一个枚举类, 从中选择一个成员传入即可. 关于znode类型的详细说明, 可参考本人的上一篇博文.

1.  /** 

2.   * 创建临时节点 

3.   */  

4.  public void create(String nodePath, byte[] data) throws Exception {  

5.      zk.create(nodePath, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);  

6.  }  

 

获取子node列表

ZooKeeper对象的getChildren方法用于获取子node列表.

1.  List getChildren(String path, boolean watch);  

watch参数用于指定是否监听path node的子node的增加和删除事件, 以及path node本身的删除事件.

判断znode是否存在

ZooKeeper对象的exists方法用于判断指定znode是否存在.

1.  Stat exists(String path, boolean watch);  

watch参数用于指定是否监听path node的创建, 删除事件, 以及数据更新事件. 如果该node存在, 则返回该node的状态信息, 否则返回null.

获取node中关联的数据

ZooKeeper对象的getData方法用于获取node关联的数据.

1.  byte[] getData(String path, boolean watch, Stat stat);  

watch参数用于指定是否监听path node的删除事件, 以及数据更新事件, 注意, 不监听path node的创建事件, 因为如果path node不存在, 该方法将抛出KeeperException.NoNodeException异常.
stat
参数是个传出参数, getData方法会将path node的状态信息设置到该参数中.

更新node中关联的数据

ZooKeeper对象的setData方法用于更新node关联的数据.

1.  Stat setData(final String path, byte data[], int version);  

data为待更新的数据.
version
参数指定要更新的数据的版本, 如果version和真实的版本不同, 更新操作将失败. 指定version-1则忽略版本检查.
返回path node的状态信息.

删除znode

ZooKeeper对象的delete方法用于删除znode.

1.  void delete(final String path, int version);  

version参数的作用同setData方法.

其余接口

请查看ZooKeeper对象的API文档.

需要注意的几个地方

·         znode中关联的数据不能超过1M. zookeeper的使命是分布式协作, 而不是数据存储.

·         getChildren, getData, exists方法可指定是否监听相应的事件. create, delete, setData方法则会触发相应的事件的发生.

·         以上介绍的几个方法大多存在其异步的重载方法, 具体请查看API说明.

 

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
4天前
|
数据采集 存储 API
网络爬虫与数据采集:使用Python自动化获取网页数据
【4月更文挑战第12天】本文介绍了Python网络爬虫的基础知识,包括网络爬虫概念(请求网页、解析、存储数据和处理异常)和Python常用的爬虫库requests(发送HTTP请求)与BeautifulSoup(解析HTML)。通过基本流程示例展示了如何导入库、发送请求、解析网页、提取数据、存储数据及处理异常。还提到了Python爬虫的实际应用,如获取新闻数据和商品信息。
|
14天前
|
监控 前端开发 JavaScript
实战篇:商品API接口在跨平台销售中的有效运用与案例解析
随着电子商务的蓬勃发展,企业为了扩大市场覆盖面,经常需要在多个在线平台上展示和销售产品。然而,手工管理多个平台的库存、价格、商品描述等信息既耗时又容易出错。商品API接口在这一背景下显得尤为重要,它能够帮助企业在不同的销售平台之间实现商品信息的高效同步和管理。本文将通过具体的淘宝API接口使用案例,展示如何在跨平台销售中有效利用商品API接口,以及如何通过代码实现数据的统一管理。
|
22天前
|
机器学习/深度学习 自然语言处理 数据处理
大模型开发:描述长短期记忆网络(LSTM)和它们在序列数据上的应用。
LSTM,一种RNN变体,设计用于解决RNN处理长期依赖的难题。其核心在于门控机制(输入、遗忘、输出门)和长期记忆单元(细胞状态),能有效捕捉序列数据的长期依赖,广泛应用于语言模型、机器翻译等领域。然而,LSTM也存在计算复杂度高、解释性差和数据依赖性强等问题,需要通过优化和增强策略来改进。
|
15天前
|
安全 算法 网络安全
网络安全与信息安全:保护你的数据,保护你的未来
在数字化的世界中,网络安全和信息安全已经成为我们生活的重要组成部分。本文将深入探讨网络安全漏洞、加密技术以及安全意识等方面的问题,以期帮助读者更好地理解和应对网络安全威胁。
16 4
|
22天前
|
供应链 搜索推荐 BI
深入了解淘宝原数据:获取API接口及其使用场景
在当今数字化的时代,对于电商行业来说,数据具有极大的价值。淘宝作为中国最大的综合电商平台,拥有庞大的商品信息和用户数据。对于开发者和企业来说,淘宝原数据的获取和分析是实现个性化服务和精准营销的基础。本文将介绍如何通过API接口获取淘宝原数据,以及数据的使用场景。
|
22天前
|
安全 网络安全 数据安全/隐私保护
网络安全与信息安全:保护你的数据,保护你的未来
在数字化的世界中,网络安全和信息安全是每个人都需要关注的问题。本文将深入探讨网络安全漏洞、加密技术以及安全意识等方面的问题,帮助读者更好地理解和保护自己的数据。我们将讨论如何识别和防范网络安全威胁,如何使用加密技术来保护信息,以及如何提高自己的安全意识,从而在网络世界中更安全地生活和工作。
16 5
|
22天前
|
安全 网络安全 数据安全/隐私保护
网络安全与信息安全:保护您的数据和隐私
随着互联网的普及和技术的快速发展,网络安全和信息安全已经成为我们日常生活中不可或缺的一部分。本文将介绍网络安全漏洞、加密技术以及安全意识等方面的知识,帮助读者更好地保护自己的数据和隐私。
21 4
|
24天前
|
存储 SQL 安全
网络安全与信息安全:保护数据的关键策略
在数字化时代,网络安全和信息安全已成为维护数据完整性、保障用户隐私和企业竞争力的核心。本文章深入探讨了网络安全漏洞的概念、加密技术的重要性以及提升安全意识的必要性。通过对当前网络威胁的分析,本文提出了一系列针对性的防护措施,旨在帮助读者构建更为坚固的信息安全防线。
15 4
|
25天前
|
存储 安全 网络安全
网络安全与信息安全:保护数据,守护未来
在当今数字化时代,网络安全和信息安全变得尤为重要。本文将探讨网络安全漏洞、加密技术以及提升安全意识等方面的知识,帮助读者更好地了解并保护自己在网络空间中的信息安全。
|
25天前
|
安全 网络安全 数据安全/隐私保护
网络安全与信息安全:保护数据的关键策略
【2月更文挑战第30天】在数字化时代,网络安全和信息安全已成为个人和企业不可忽视的重要组成部分。本文将探讨网络安全漏洞的概念、加密技术的应用以及提升安全意识的重要性,旨在为读者提供一系列保护数据和信息的策略和建议。