ZooKeeper类初探

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介:     ZooKeeper源码分析的版本:3.4.10。 一.创建ZooKeeper对象     ZooKeeper类是ZooKeeper客户端的实现,用来发送命令给ZooKeeper服务器。 ZooKeeper中可以设置Watcher,每个Watcher在节点状态发生变化的时候被通知,执行预先注册的Watcher动作。

    ZooKeeper源码分析的版本:3.4.10。

一.创建ZooKeeper对象

    ZooKeeper类是ZooKeeper客户端的实现,用来发送命令给ZooKeeper服务器。 ZooKeeper中可以设置Watcher,每个Watcher在节点状态发生变化的时候被通知,执行预先注册的Watcher动作。 

    ZooKeeper有三种Watcher列表: (1)DataWatcher (2)ExistWatcher (3)ChildWatcher.

    ClientCnxn是客户端和服务端通信的底层接口,和ClientCnxnSocket一起工作提供网络通信服务。

protected final ClientCnxn cnxn;// 成员变量cnxn,连接服务器,通过cnxn发送命令给服务端
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
        boolean canBeReadOnly)
    throws IOException
{
    ......// 打印log
    watchManager.defaultWatcher = watcher;
    ConnectStringParser connectStringParser = new ConnectStringParser(
            connectString);// 从传入的服务器地址字符串中解析出服务器地址
    HostProvider hostProvider = new StaticHostProvider(
            connectStringParser.getServerAddresses());// 提供服务器地址,当服务器发生故障无法连接时,会自动连接其它的服务器
    cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
            hostProvider, sessionTimeout, this, watchManager,
            getClientCnxnSocket(), canBeReadOnly);// 构建和服务器通信的对象cnxn
    cnxn.start();
}


二.create操作

    调用create在ZooKeeper中创建一个Node,返回值是成功创建的路径名称: 首先看看 create 方法:


public String create(final String path, byte data[], List<ACL> acl,
        CreateMode createMode)
    throws KeeperException, InterruptedException
{
    final String clientPath = path;
    PathUtils.validatePath(clientPath, createMode.isSequential());
    final String serverPath = prependChroot(clientPath);
    RequestHeader h = new RequestHeader();
    h.setType(ZooDefs.OpCode.create);// 设置操作代码为create
    CreateRequest request = new CreateRequest();
    CreateResponse response = new CreateResponse();
    request.setData(data);// 使用输入参数构造CreateRequest请求
    request.setFlags(createMode.toFlag());
    request.setPath(serverPath);
    if (acl != null && acl.size() == 0) {
        throw new KeeperException.InvalidACLException();
    }
    request.setAcl(acl);
    ReplyHeader r = cnxn.submitRequest(h, request, response, null);// 将请求提交发送给服务器
    if (r.getErr() != 0) {
        throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                clientPath);
    }
    if (cnxn.chrootPath == null) {
        return response.getPath();// 从返回的CreateResponse中获取创建成功后的路径
    } else {
        return response.getPath().substring(cnxn.chrootPath.length());
    }
}
    

    在 create 中通过 submitRequest 来提交请求:


public ReplyHeader submitRequest(RequestHeader h, Record request,
        Record response, WatchRegistration watchRegistration)
        throws InterruptedException {
    ReplyHeader r = new ReplyHeader();
    Packet packet = queuePacket(h, r, request, response, null, null, null,
                null, watchRegistration);// 将CreateRequest转换成Packet包
    synchronized (packet) {
        while (!packet.finished) {
            packet.wait();
        }
    }
    return r;
}



    queuePacket 将CreateRequest转换成Packet包:


Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
        Record response, AsyncCallback cb, String clientPath,
        String serverPath, Object ctx, WatchRegistration watchRegistration)
{
    Packet packet = null;
    // Note that we do not generate the Xid for the packet yet. It is
    // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
    // where the packet is actually sent.
    synchronized (outgoingQueue) {
        packet = new Packet(h, r, request, response, watchRegistration);// 将CreateRequest转换成Packet包
        packet.cb = cb;
        packet.ctx = ctx;
        packet.clientPath = clientPath;
        packet.serverPath = serverPath;
        if (!state.isAlive() || closing) {
            conLossPacket(packet);
        } else {
            // If the client is asking to close the session then
            // mark as closing
            if (h.getType() == OpCode.closeSession) {
                closing = true;
            }
            outgoingQueue.add(packet);// 将发送包放入队列,等待发送线程发送给服务器
        }
    }
    sendThread.getClientCnxnSocket().wakeupCnxn();
    return packet;
}


三.delete操作

    删除节点操作,提供同步和异步两种接口方式:


public void delete(final String path, int version, VoidCallback cb,
        Object ctx)
{
    final String clientPath = path;
    PathUtils.validatePath(clientPath);// 校验传入的路径是否合法
    final String serverPath;
    // maintain semantics even in chroot case
    // specifically - root cannot be deleted
    // I think this makes sense even in chroot case.
    if (clientPath.equals("/")) {
        // a bit of a hack, but delete(/) will never succeed and ensures
        // that the same semantics are maintained
        serverPath = clientPath;
    } else {
        serverPath = prependChroot(clientPath);
    }
    RequestHeader h = new RequestHeader();
    h.setType(ZooDefs.OpCode.delete);// 设置操作代码为delete
    DeleteRequest request = new DeleteRequest();
    request.setPath(serverPath);// 使用输入参数构造DeleteRequest请求
    request.setVersion(version);
    cnxn.queuePacket(h, new ReplyHeader(), request, null, cb, clientPath,
            serverPath, ctx, null);// 和create操作一样,调用queuePacket方法,将DeleteRequest转换成Packet包
}

四.其他类似操作

     exists:判断节点是否存在,异步方式。构造ExistsRequest请求对象,设置操作码ZooDefs.OpCode.exists;

     getData:获取节点关联数据。构造GetDataRequest请求对象,设置操作码ZooDefs.OpCode.getData;

     setData:设置节点关联数据。构造SetDataRequest请求对象,设置操作码ZooDefs.OpCode.setData;

     getChildren:获取子节点路径列表。构造GetChildrenRequest请求对象,设置操作码ZooDefs.OpCode.getChildren


    看完ZooKeeper类分析,是不是觉得很简单,都是差不多的套路:构建对应服务器操作的请求对象,打包成Packet,然后等待发送线程把这些发送包发送给服务器。

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
11天前
|
监控 负载均衡 Cloud Native
ZooKeeper分布式协调服务详解:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入剖析ZooKeeper分布式协调服务原理,涵盖核心概念如Server、Client、ZNode、ACL、Watcher,以及ZAB协议在一致性、会话管理、Leader选举中的作用。讨论ZooKeeper数据模型、操作、会话管理、集群部署与管理、性能调优和监控。同时,文章探讨了ZooKeeper在分布式锁、队列、服务注册与发现等场景的应用,并在面试方面分析了与其它服务的区别、实战挑战及解决方案。附带Java客户端实现分布式锁的代码示例,助力提升面试表现。
30 2
|
3月前
|
消息中间件 Java 网络安全
JAVAEE分布式技术之Zookeeper的第一次课
JAVAEE分布式技术之Zookeeper的第一次课
70 0
|
1月前
|
监控 NoSQL Java
Zookeeper分布式锁
Zookeeper分布式锁
90 1
|
3月前
|
监控 Dubbo Java
深入理解Zookeeper系列-2.Zookeeper基本使用和分布式锁原理
深入理解Zookeeper系列-2.Zookeeper基本使用和分布式锁原理
58 0
|
3月前
|
NoSQL 中间件 API
分布式锁【数据库乐观锁实现的分布式锁、Zookeeper分布式锁原理、Redis实现的分布式锁】(三)-全面详解(学习总结---从入门到深化)(下)
分布式锁【数据库乐观锁实现的分布式锁、Zookeeper分布式锁原理、Redis实现的分布式锁】(三)-全面详解(学习总结---从入门到深化)
81 2
|
3月前
|
NoSQL Java API
分布式锁【数据库乐观锁实现的分布式锁、Zookeeper分布式锁原理、Redis实现的分布式锁】(三)-全面详解(学习总结---从入门到深化)(上)
分布式锁【数据库乐观锁实现的分布式锁、Zookeeper分布式锁原理、Redis实现的分布式锁】(三)-全面详解(学习总结---从入门到深化)
73 0
|
5月前
分布式系列教程(25) -解决Zookeeper启动失败的问题
分布式系列教程(25) -解决Zookeeper启动失败的问题
134 0
|
2月前
|
Java Linux Spring
Zookeeper实现分布式服务配置中心
Zookeeper实现分布式服务配置中心
48 0
|
2月前
|
存储 分布式计算 Hadoop
ZooKeeper初探:分布式世界的守护者
ZooKeeper初探:分布式世界的守护者
64 0
|
2月前
|
NoSQL Java API
分布式锁【数据库乐观锁实现的分布式锁、Zookeeper分布式锁原理、Redis实现的分布式锁】(三)-全面详解(学习总结---从入门到深化)
分布式锁【数据库乐观锁实现的分布式锁、Zookeeper分布式锁原理、Redis实现的分布式锁】(三)-全面详解(学习总结---从入门到深化)
298 0