【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式竞选

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介:

前言

上文【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式锁-升级版中,我们通过利用ZooKeeper的临时节点和Watcher特性,实现了一个分布式锁。
本文我们结合实际场景,完成一个分布式竞争选举。

设计

这里我们实现一个公平的选举方式,即先参加选举的优先被选为leader。
具体的实现思路 参考了ZooKeeper提供的官方示例:zookeeper-recipes-election

  • START:服务器开始竞选
  • OFFER:创建临时顺序结点
  • DETERMINE:开始决策,将临时节点按末尾序号从小到大排序,如果当前节点的序号最小,则竞选成功,否则,则Watch前一个节点,当前一个节点被删除时,再次进行决策
  • ELECTED:当前节点是序号最小的节点,竞选成功
  • READY:当前节点不是序号最小的节点,竞选不成功,Watch前一个节点,进入READY态
  • FAILED:当出现异常情况时,为失败状态
  • STOP:结束竞选

LeaderElectionSupport

public class LeaderElectionSupport implements LeaderElection{
    private static Logger logger = LoggerFactory.getLogger(LeaderElectionSupport.class);

    //ZooKeeper客户端,进行ZooKeeper操作
    private ZooKeeper zooKeeper;

    //根节点名称
    private String dir;

    //节点前缀
    private String node;

    //ZooKeeper鉴权信息
    private List<ACL> acls;

    //要加锁节点
    private String fullPath;

    //选举状态
    private State state;

    //监听器
    private Set<LeaderElectionListener> listeners;

    //存当前节点的信息
    private volatile LeaderNode leaderNode;

    //监察器
    private Watcher watcher;


    /**
     * Constructor.
     *
     * @param zooKeeper the zoo keeper
     * @param dir       the dir
     * @param node      the node
     * @param acls      the acls
     */
    public LeaderElectionSupport(ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) {
        this.zooKeeper = zooKeeper;
        this.dir = dir;
        this.node = node;
        this.acls = acls;
        this.fullPath = dir.concat("/").concat(this.node);
        init();

        state = State.STOP;
        listeners = Collections.synchronizedSet(new HashSet<>());
    }

    /**
     * 初始化根节点、检查器等
     * */
    private void init() {
        try {
            watcher = new LeaderWatcher();
            Stat stat = zooKeeper.exists(dir, false);
            if (stat == null) {
                zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            logger.error("[LeaderElectionSupport#init] error : " + e.toString(), e);
        }
    }
}

start

/**
 * Start.
 * 开始竞选
 */
@Override
public void start() {
    synchronized (this) {
        state = State.START;
        dispatchEvent(EventType.START);
        offerElection();
        determineElection();
    }

}

offerElection

/**
 * 创建临时节点,参加竞选,并将主机信息保存在node中
 * */
private void offerElection() {
    dispatchEvent(EventType.OFFER_START);
    state = State.OFFER;
    if (leaderNode == null) {
        synchronized (this) {
            try {
                if (leaderNode == null) {
                    InetAddress ia = InetAddress.getLocalHost();
                    LeaderNode tmpNode = new LeaderNode();
                    tmpNode.setHostName(ia.getHostName());
                    String path = zooKeeper.create(fullPath, ConversionUtil.objectToBytes(ia.getHostName()), acls, CreateMode.EPHEMERAL_SEQUENTIAL);

                    tmpNode.setNodePath(path);
                    tmpNode.setId(NodeUtil.getNodeId(path));

                    leaderNode = tmpNode;
                }
            } catch (Exception e) {
                becomeFailed(e);
            }
        }
    }
    dispatchEvent(EventType.OFFER_COMPLETE);
}

determineElection

/**
 * 决定竞选结果
 * 1、竞选节点序号最低的赢取选举
 * 2、未赢得选举的节点,监听上一个节点,直到上一个节点被删除,则尝试重新竞选
 * */
private void determineElection() {
    dispatchEvent(EventType.DETERMINE_START);
    state = State.DETERMINE;
    synchronized (this) {
        TreeSet<String> nodePathSet = getNodePathSet();
        if (nodePathSet.isEmpty()) {
            becomeFailed(new Exception("no node"));
            return;
        }
        String leaderPath = nodePathSet.first();
        if (leaderNode.getNodePath().equalsIgnoreCase(leaderPath)) {
            becomeLeader();
        } else {
            becomeReady(nodePathSet.headSet(leaderNode.getNodePath()).last());
        }
    }
    dispatchEvent(EventType.DETERMINE_COMPLETE);
}

becomeLeader

/**
 * 竞选成功
 * */
private void becomeLeader() {
    dispatchEvent(EventType.ELECTED_START);
    state = State.ELECTED;
    dispatchEvent(EventType.ELECTED_COMPLETE);
}

becomeReady

/**
 * 竞选失败进入就绪态
 * */
private void becomeReady(String path) {
    try {
        Stat stat = zooKeeper.exists(path, watcher);

        if (stat == null) {
            determineElection();
        } else {
            dispatchEvent(EventType.READY_START);
            state = State.READY;
            dispatchEvent(EventType.READY_COMPLETE);
        }
    } catch (KeeperException e) {
        becomeFailed(e);
    } catch (InterruptedException e) {
        becomeFailed(e);
    }
}

becomeFailed

/**
 * 当发生异常时,更新为FAILED状态
 * */
private void becomeFailed(Exception e) {
    state = State.FAILED;
    dispatchEvent(EventType.FAILED);
    logger.error("[LeaderElectionSupport#becomeFailed] error : " + e.toString(), e);
}

getNodePathSet

/**
 * 获取参加竞选的节点信息
 * */
private TreeSet<String> getNodePathSet() {
    TreeSet<String> nodeSet = new TreeSet<>();
    try {
        List<String> nodes = zooKeeper.getChildren(dir, false);

        for (String node : nodes) {
            nodeSet.add(dir.concat("/").concat(node));
        }

    } catch (KeeperException e) {
        becomeFailed(e);
    } catch (InterruptedException e) {
        becomeFailed(e);
    }

    return nodeSet;
}

stop

/**
 * Stop.
 * 停止竞选
 */
@Override
public void stop() {
    synchronized (this) {
        dispatchEvent(EventType.STOP_START);
        deleteNode();
        state = State.STOP;
        dispatchEvent(EventType.STOP_COMPLETE);
    }
}

deleteNode

/**
 * 停止时,删除节点,退出竞选
 * */
private void deleteNode() {
    try {
        if (leaderNode != null) {
            synchronized (this) {
                zooKeeper.delete(leaderNode.getNodePath(), -1);
                leaderNode = null;
            }
        }
    } catch (InterruptedException e) {
        becomeFailed(e);
    } catch (KeeperException e) {
        becomeFailed(e);
    }
}

getLeaderHostName

/**
 * Gets get leader host name.
 *
 * @return the get leader host name
 */
@Override
public String getLeaderHostName() {
    synchronized (this) {
        TreeSet<String> nodePathSet = getNodePathSet();

        if (!nodePathSet.isEmpty()) {
            try {
                String leaderPath = nodePathSet.first();
                return (String) ConversionUtil.bytesToObject(zooKeeper.getData(leaderPath, false, null));
            } catch (KeeperException e) {
                logger.error("[LeaderWatcher#getLeaderHostName] error : " + e.toString(), e);
            } catch (InterruptedException e) {
                logger.error("[LeaderWatcher#getLeaderHostName] error : " + e.toString(), e);
            } catch (IOException e) {
                logger.error("[LeaderWatcher#getLeaderHostName] error : " + e.toString(), e);
            } catch (ClassNotFoundException e) {
                logger.error("[LeaderWatcher#getLeaderHostName] error : " + e.toString(), e);
            }
        }

        return null;
    }
}

getLeaderNodePath

/**
 * Gets get leader node path.
 *
 * @return the get leader node path
 */
@Override
public String getLeaderNodePath() {
    synchronized (this) {
        TreeSet<String> nodePathSet = getNodePathSet();

        return nodePathSet.isEmpty() ? null : nodePathSet.first();
    }

}

LeaderWatcher

/**
 * 内部watcher类,当竞选失败时,watch前一个节点,当前一个节点别移除时,再次发起决策
 * */
private class LeaderWatcher implements Watcher {
    /**
     * Process.
     *
     * @param watchedEvent the watched event
     */
    @Override
    public void process(WatchedEvent watchedEvent) {
        try {
            if (Event.EventType.NodeDeleted.equals(watchedEvent.getType()) && !State.STOP.equals(state)) {
                determineElection();
            }
        } catch (Exception e) {
            logger.error("[LeaderWatcher#process] error : " + e.toString(), e);
        }

    }
}

总结

以上就是我们利用ZooKeeper的临时节点和Watcher特性实现的公平模式分布式竞选。

可以进行简单的选主操作,适用于如执行单机定时任务、心跳检测等场景。实际上是实现的Master-Slave模型。

源代码可见:aloofJr

而对高可用要求较多的复杂选举场景,如分布式存储、同步等,则需要考虑集群一致性、脑裂等各种情况,则需要实现如Paxos、raft、Zab等一致性算法协议。如ZooKeeper集群的选举模式就是使用的Zab算法。
我们后续会进行深入的探讨。

更多文章

见我的博客:https://nc2era.com

written by AloofJr,转载请注明出处

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
7天前
|
监控 负载均衡 Cloud Native
ZooKeeper分布式协调服务详解:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入剖析ZooKeeper分布式协调服务原理,涵盖核心概念如Server、Client、ZNode、ACL、Watcher,以及ZAB协议在一致性、会话管理、Leader选举中的作用。讨论ZooKeeper数据模型、操作、会话管理、集群部署与管理、性能调优和监控。同时,文章探讨了ZooKeeper在分布式锁、队列、服务注册与发现等场景的应用,并在面试方面分析了与其它服务的区别、实战挑战及解决方案。附带Java客户端实现分布式锁的代码示例,助力提升面试表现。
28 2
|
1月前
|
监控 NoSQL Java
Zookeeper分布式锁
Zookeeper分布式锁
90 1
|
2月前
|
消息中间件 RocketMQ 微服务
RocketMQ 分布式事务消息实战指南
RocketMQ 分布式事务消息实战指南
242 1
|
10天前
|
Docker 容器 关系型数据库
【PolarDB-X从入门到精通】 第四讲:PolarDB分布式版安装部署(源码编译部署)
本期课程将于4月11日19:00开始直播,内容包括源码编译基础知识和实践操作,课程目标是使学员掌握源码编译部署技能,为未来发展奠定基础,期待大家在课程中取得丰富的学习成果!
【PolarDB-X从入门到精通】 第四讲:PolarDB分布式版安装部署(源码编译部署)
|
21天前
|
缓存 应用服务中间件 数据库
【分布式技术专题】「缓存解决方案」一文带领你好好认识一下企业级别的缓存技术解决方案的运作原理和开发实战(多级缓存设计分析)
【分布式技术专题】「缓存解决方案」一文带领你好好认识一下企业级别的缓存技术解决方案的运作原理和开发实战(多级缓存设计分析)
26 1
|
2月前
|
Java Linux Spring
Zookeeper实现分布式服务配置中心
Zookeeper实现分布式服务配置中心
48 0
|
2月前
|
存储 分布式计算 Hadoop
ZooKeeper初探:分布式世界的守护者
ZooKeeper初探:分布式世界的守护者
64 0
|
2月前
|
NoSQL Java API
分布式锁【数据库乐观锁实现的分布式锁、Zookeeper分布式锁原理、Redis实现的分布式锁】(三)-全面详解(学习总结---从入门到深化)
分布式锁【数据库乐观锁实现的分布式锁、Zookeeper分布式锁原理、Redis实现的分布式锁】(三)-全面详解(学习总结---从入门到深化)
298 0
|
1月前
|
NoSQL 算法 安全
Redlock 算法-主从redis分布式锁主节点宕机锁丢失的问题
Redlock 算法-主从redis分布式锁主节点宕机锁丢失的问题
152 0
|
1月前
|
NoSQL 关系型数据库 MySQL
分布式锁(redis/mysql)
分布式锁(redis/mysql)
55 1

热门文章

最新文章