Zookeeper使用案例

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 本文主要讲述通过zookeeper实现分布式锁、分布式队列、leader选举的实现,以及详细代码


本文中的示例都是参考zookeeper-3.4.10/recipes中的示例代码,但因为这里的示例代码有bug,所以才做了一些改动和封装。

案例代码:https://gitee.com/wuzhengfei/great-truth

参考com.wzf.greattruth.zookeeper包中的代码。

 

一、   Lock

分布式锁要求全局只有一个线程可以获取到锁。使用中锁分两种,一种是获取不到锁直接放弃;一种是获取锁以后等待锁释放,释放后唤醒此次线程。

1.  获取锁,获取不到时等待锁资源释放

1)   原理

所有client都尝试在zookeeper指定目录下创建有序的临时节点;接着查询此目录下的所有children节点,接着对所有的节点进行排序,检查当前client创建的节点的序号是否是最小的那个,如果是那么认为此client取得锁;如果不是则监听上一个节点的删除事件;当取得锁的client执行完任务并删除临时节点后,就会触发之前的监听事件,从而触发下一个client的任务。

 

2)   步骤

以下是zookeeper recipes.pdf中关于获取锁顺序的描述。

²   Call create( ) with a pathname of "_locknode_/lock-" and the sequence and ephemeral flags set.

²   Call getChildren( ) on the lock node without setting the watch flag (this is important to avoid the herd effect).

²   If the pathname created in step 1 has the lowest sequence number suffix, the client has the lock and the client exits the protocol.

²   The client calls exists( ) with the watch flag set on the path in the lock directory with the next lowest sequence number.

²   if exists( ) returns false, go to step 2. Otherwise, wait for a notification for the pathname from the previous step before going to step 2.

 

a)   确保父节点存在

保证锁节点(lock root node)的父根节点存在,如果不存在就需要创建一个,此父节点是PERSISTENT的。注意如果父节点是多层级并且这些层级都不存在时,需要分别创建各个层级的节点。


Stat stat = zookeeper.exists(parentDir, false);
if (stat == null) {
   zookeeper.create(zkDir, datas, acl, mode);
}

b)   创建临时有序节点

客户端调用create()方法创建EPHEMERAL_SEQUENTIAL类型的节点。EPHEMERAL_SEQUENTIAL类型的节点是临时且顺序的,如果获取锁的client挂掉,则该节点自动失效。


List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE
byte[] datas = null
CreateMode mode = CreateMode.EPHEMERAL_SEQUENTIAL
String nodePath = zookeeper.create(parentDir, datas, acl, mode);
// 截取路径,获取nodeId
currentNodeId = ZookeeperUtil.getNodeId(nodePath, ephemeralDir);

 

c)   查询所有children

在父锁节点(lock root node)上调用getChildren()。

List<String> children = zookeeper.getChildren(parentDir, false);

 

d)   计算获取锁的client

按照公平竞争的原则,将上一步骤中获取的子节点按照节点编号排序,取出编号最小的一个节点做为取到锁的节点,然后判断自己是否就是创建此节点的client,如果是则返回lock成功,如果不是则lock失败,继续下一步操作。

SortedSet<Node> nodes = new TreeSet<Node>();
for (String child : children) {
    nodes.add(new Node(dir, child));
}
 
Node node = nodes.first();
if (currentNodeId.equals(node.getId())) {
    // 当前node是最小的那个node,那么当前线程上锁成功
    return true ;
}

 

e)   监听上一个节点

如果未获得锁,那么调用exists()监听上一个节点被删除的事件。(在获取并发锁的过程中,会将所有节点排序,以便判断当前节点是否可以获得锁,按照这个顺序取当前节点的上一个节点)


// 如果watcher不为空,那么监控前一个node的create/delete事件
if (serialLockWatcher != null) {
    // 如果不需要设置watcher,那么直接返回true
    Node currentNode = new Node(dir, currentNodeId);
    SortedSet<Node> preNodes = nodes.headSet(currentNode);
    if (preNodes.isEmpty()) {
        //这种情况应该不会发生,因为除非当前node id是最小的,才会进入这里,此种情况下载之前的判断是就会返回true
        return true;
    }
    
    Node preNode = preNodes.last();
    String preNodeId = preNode.getId();
    String preNodePath = ZookeeperUtil.getNodePath(dir, preNodeId);
    Stat stat = zookeeper.exists(preNodePath, serialLockWatcher);
    if (stat == null) {
        return false;
    }
}

 

f)   获取锁的client执行任务

如果获取到锁,那么直接执行任务;执行完毕以后删除当前client创建的临时节点。


boolean lock = distributeLock.lock();
if (!lock) {
    return null;
}
try {
   //执行任务
} finally {
    distributeLock.unlock();
}
 

 

g)   任务执行完毕后删除节点

临时节点删除时,watcher被触发,此时可以判断当前是否能够获取锁。

执行unlock时会删除之前创建的临时节点,从而触发Watcher,执行以下逻辑。


/**
* 获取锁以后的回调函数
* 
* @param obj
*/
public abstract void callbackWhenGainLock(WatchedEvent event, T obj);
 
@Override
public void process(WatchedEvent event) {
    // 收到监控事件以后,重新尝试上锁
    if (distributeLock != null) {
        boolean lock = distributeLock.lock();
        // 上锁成功以后执行回调函数
        if (lock) {
            try {
                callbackWhenGainLock(event, t);
            } finally {
                distributeLock.unlock();
            }
        }
    }
}

 

到目前为止获取并发锁的逻辑已经全部完成。记得使用完锁以后,最好将创建的node删除,以免时间过程后导致创建的临时节点膨胀。

 

 

2.  获取锁,获取不到时直接放弃。

1)   原理

所有client都尝试在zookeeper指定目录下创建一个同名节点,因为节点名称相同只会有一个节点创建成功,创建成功的client获取锁,其他client获取锁失败。

2)   步骤

a)   创建父节点

通过exist方法检查父级目录是否存在,不存在则通过create创建一个PERSISTENT的父节点。


Stat stat = zookeeper.exists(parentDir, false);
if (stat == null) {
   zookeeper.create(zkDir, datas, acl, mode);
}

 

b)   获取锁

尝试通过create在父目录下创建一个EPHEMERAL节点,创建成功则认为取到锁。

// 在parentDir目录下创建临时节点,创建成功则认为上锁成功
String path = ZookeeperUtil.getNodePath(parentDir, nodeKey);
String nodePath = createNode(path, datas, CreateMode.EPHEMERAL);
boolean locked = StringUtils.isNoneBlank(nodePath);


 

c)   获取锁的client执行业务逻辑,执行完毕后删除锁

boolean lock = distributeLock.lock();
if (!lock) {
    return null;
}
try {
   //执行任务
} finally {
    distributeLock.unlock();
}

二、   Queue

1.  分布式队列

1)   原理

入队列时,client在zookeeper指定目录下的创建PERSISTENT_SEQUENTIAL类型的节点,并将数据一起存在此节点中。

出队列时,client从zookeeper指定目录下获取序号最小的client的数据,然后删除此节点。

按照这种方式实现的队列其实是分布式先进先出队列,因为入队列时新创建节点序号逐次增大,而取数据的时候是从序号小的开始取,所以新进入队列的总是会被先取到。

 

2)   步骤

a)   入队前,确保父节点存在

保证父根节点存在,如果不存在就需要创建一个,此父节点是PERSISTENT的。

Stat stat = zookeeper.exists(parentDir, false);
if (stat == null) {
   zookeeper.create(zkDir, datas, acl, mode);
}


b)   入队列时,创建持久化的有序节点

客户端调用create()方法创建PERSISTENT_SEQUENTIAL类型的节点。

List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE
//datas为需要入队的数据
byte[] datas = null
CreateMode mode = CreateMode.PERSISTENT_SEQUENTIAL
String nodePath = zookeeper.create(parentDir, datas, acl, mode);

 

c)   出队列前,获取最小的节点

// 查询父目录下的所有children,排序后找到上一个node,监听此node的create/delete事件
List<String> children = null;
try {
    children = zookeeper.getChildren(queueDir, false);
} catch (KeeperException | InterruptedException e) {
    LOGGER.error("get children failed!", e);
}
if (CollectionUtils.isEmpty(children)) {
    return null;
}
SortedSet<Node> nodes = new TreeSet<Node>();
for (String child : children) {
    nodes.add(new Node(queueDir, child));
}
nodes.first();
   

d)   出队列时,获取数据然后删除节点  

//获取数据
String nodePath = ZookeeperUtil.getNodePath(node.getDir(),node.getId());
byte[] datas = zookeeper.getData(nodePath, false, null);
//删除节点
zookeeper.delete(path, -1);


三、   Election

1.  选举Leader方案一

1)   原理

线程启动后,去zookeeper指定目录下创建一个EPHEMERAL_SEQUENTIAL类型的节点;然后获取此目录下的children,然后将节点从小到大排序,接着判断当前线程创建的节点是否是所有children中最小的那个节点。如果是最小的那个节点则认为当前线程称为Leader;如果不是最小的那个节点,则通过exist方法监控上一个节点被操作的事件(其实主要是删除事件),当收到监听事件以后检查自己是否是最小的节点,是则成为leader。

 

2)   流程图

ddf01ba1bd2c7d1b4b16a382bccb1f67f3722374

 

3)   分析

虽然从流程上看,按照这种方式进行Leader选举并没有什么问题,但是如果应用于实践,却并不可靠,有分布式开发经验的小伙伴应该都清楚,我们应该假设所有环节都可能出问题,按照这个思路,流程图存在如下问题:

流程图中标序号的几个地方都存在网络传输,如果其中任何一个地方出现异常,都可能让我们的leader选举不能按照预期执行。例如设置监听器失败时,可能导致leader失效以后,其他服务器不能替代leader位置;监听被触发后,如果执行查询节点时除了问题将导致无法选举出leader。

接下来的一个方案是专门解决这个方案问题的。

 

2.  选举Leader方案二

1)   原理

和上面方案相比,增加了定时发送心跳,删除失效节点这两步。上面方案问题的根源在于:因为分布式系统之间通信存在各种不确定性,所以任何一个地方如果失败了都可能导致程序无法继续运行。稍有经验的小伙伴应该就知道,只要加一个异常处理的补偿流程,剔除主流程中的不确定因素,那么程序就能很好的按照我们预期执行。

 

2)   流程图


e38466e9f3773c405130a3ecb2ab8de335bcaa11

 

3)   步骤

a)   确保父节点存在

和之前Lock、Queue一样,首先保证父节点存在,这样才能创建子节点。


Stat stat = zookeeper.exists(parentDir, false);
if (stat == null) {
   zookeeper.create(zkDir, datas, acl, mode);
}


b)   创建临时有序节点

客户端调用create()方法创建EPHEMERAL_SEQUENTIAL类型的节点。EPHEMERAL_SEQUENTIAL类型的节点是临时且顺序的,如果获取锁的client挂掉,则该节点自动失效。


String path = ZookeeperHelper.getNodePath(dir, “ele-”);
List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE
byte[] datas = null
String nodePath = zookeeper.create(path, datas, acl, mode);
// 截取路径,获取nodeId
currentNodeId = ZookeeperUtil.getNodeId(nodePath, ephemeralDir);

 

c)   查询所有children

在父锁节点(lock root node)上调用getChildren()。

List<String> children = zookeeper.getChildren(parentDir, false);



d)   是leader还是fllower

将上一步骤中获取的子节点按照节点编号排序,取出编号最小的一个节点,然后判断自己是否就是创建此节点的client,如果是则将自己设置为leader,如果不是则表示自己为fllower,继续下一步操作。

SortedSet<Node> nodes = new TreeSet<Node>();
for (String child : children) {
    nodes.add(new Node(dir, child));
}
 
Node node = nodes.first();
if (currentNodeId.equals(node.getId())) {
    // 当前node是最小的那个node,那么当前线程上锁成功
    return true ;
}


 

e)   监听上一个节点

如果client是fllower,那么调用exists()监听上一个节点被修改的事件。


SortedSet<Node> preNodes = nodes.headSet(leaderOffer);
Node preNode = preNodes.last();
String preNodeId = preNode.getId();
String preNodePath = ZookeeperHelper.getNodePath(dir, preNodeId);
boolean exist = zookeeperManager.exist(zooKeeper, preNodePath, this);
 
public void process(WatchedEvent event) {
    if (event.getType() == EventType.NodeDeleted) {
        if (state != ElectionState.STOP) {
            determineState();
        }
    }
}

 

f)   异常补偿流程

// 启动定时线程,定时发送心跳,删除已经死亡的节点
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            // 发送心跳信息
            sendHeartbeat();
 
            // 主动清理失效节点
            removeDeadNodes();
 
            determineState();
 
        } catch (Exception e) {
            LOGGER.error("heartbeat or remove dead node failed!", e);
        }
    }
}, checkInterval, checkInterval, TimeUnit.MILLISECONDS); 

 

h)   停止服务时,删除节点、停止定时程序

停止选举服务时,将当前的节点删除,状态改为STOP,定时程序停止掉。

public boolean stop() {
    if (leaderOffer == null) {
        changeState(ElectionState.STOP);
        return true;
    }
    if (ElectionState.STOP == state) {
        return true;
    }
    String nodePath = ZookeeperHelper.getNodePath(leaderOffer.getDir(), leaderOffer.getId());
    // 节点不存在,则直接认为已停止
    boolean exist = zookeeperManager.exist(zooKeeper, nodePath);
    if (!exist) {
        changeState(ElectionState.STOP);
        return true;
    }
    boolean deleted = zookeeperManager.deleteNode(zooKeeper, nodePath);
    if (!deleted) {
        changeState(ElectionState.ERROR);
        return false;
    } else {
        leaderOffer = null;
        changeState(ElectionState.STOP);
        return true;
    }
}

 

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
1月前
|
消息中间件 分布式计算 负载均衡
ZooKeeper应用案例
【2月更文挑战第24天】
|
8月前
|
Java Linux API
Zookeeper学习---3、服务器动态上下线监听案例、ZooKeeper 分布式锁案例、企业面试真题
Zookeeper学习---3、服务器动态上下线监听案例、ZooKeeper 分布式锁案例、企业面试真题
|
11月前
zookeeper入门到精通08——服务器节点动态上下线案例实战
zookeeper入门到精通08——服务器节点动态上下线案例实战
|
11月前
|
运维 Kubernetes Cloud Native
《云原生架构容器&微服务优秀案例集》——02 汽车/制造——致景科技 基于 MSE 一站式实现服务治理
《云原生架构容器&微服务优秀案例集》——02 汽车/制造——致景科技 基于 MSE 一站式实现服务治理
167 0
|
11月前
|
Cloud Native 安全 Java
《云原生架构容器&微服务优秀案例集》——02 汽车/制造——来电科技 基于 MSE 无侵入式实现微服务治理
《云原生架构容器&微服务优秀案例集》——02 汽车/制造——来电科技 基于 MSE 无侵入式实现微服务治理
200 0
|
11月前
|
运维 Cloud Native 安全
《云原生架构容器&微服务优秀案例集》——03 零售/电商——斯凯奇 通过 MSE 提升业务迭代效率,轻松应对线上大促
《云原生架构容器&微服务优秀案例集》——03 零售/电商——斯凯奇 通过 MSE 提升业务迭代效率,轻松应对线上大促
130 0
|
11月前
|
运维 Kubernetes Cloud Native
《云原生架构容器&微服务优秀案例集》——05 金融—— 费芮互动 通过 MSE 完成移动支付应用稳定性和安全性双提升
《云原生架构容器&微服务优秀案例集》——05 金融—— 费芮互动 通过 MSE 完成移动支付应用稳定性和安全性双提升
246 0
《云原生架构容器&微服务优秀案例集》——05 金融—— 费芮互动 通过 MSE 完成移动支付应用稳定性和安全性双提升
|
11月前
|
Cloud Native 安全 Java
《2023云原生实战案例集》——01 汽车/制造——来电科技 基于MSE无侵入式实现微服务治理
《2023云原生实战案例集》——01 汽车/制造——来电科技 基于MSE无侵入式实现微服务治理
|
11月前
|
运维 Cloud Native 安全
《2023云原生实战案例集》——01 汽车/制造——致景科技 基于MSE 一站式实现服务治理
《2023云原生实战案例集》——01 汽车/制造——致景科技 基于MSE 一站式实现服务治理
|
11月前
|
运维 供应链 Cloud Native
《2023云原生实战案例集》——02 零售/电商/本地生活——斯凯奇 通过MSE提升业务迭代效率,轻松应对线上大促
《2023云原生实战案例集》——02 零售/电商/本地生活——斯凯奇 通过MSE提升业务迭代效率,轻松应对线上大促

热门文章

最新文章