【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式队列

本文涉及的产品
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 前言上文【从入门到放弃-ZooKeeper】ZooKeeper入门中,我们学习了ZooKeeper的简单安装和cli使用。接下来我们开始基于java API的实战编程。本文先来写一个分布式队列的代码实现。

前言

上文【从入门到放弃-ZooKeeper】ZooKeeper入门中,我们学习了ZooKeeper的简单安装和cli使用。
接下来我们开始基于java API的实战编程。本文先来写一个分布式队列的代码实现。

设计

我们来写一个先进先出的分布式无界公平队列。参考我们之前介绍的【从入门到放弃-Java】并发编程-JUC-ConcurrentLinkedQueue【从入门到放弃-Java】并发编程-JUC-LinkedBlockingQueue。我们直接继承AbstractQueue类,并实现Queue接口。
主要重写offer、poll、peek、size方法。
我们使用ZooKeeper的持久化顺序节点来实现分布式队列。
offer是入队,入队时新创建一个持久化顺序节点,节点后缀会根据ZooKeeper的特性自动累加。
poll的出队,获取根节点下的所有节点,根据后缀数字排序,数组最小的是最先入队的,因此要最先出队。
peek,获取到最下入队的数据,和poll的区别是,peek只获取数据,不出队,不删除已经消费的节点。
size获取队列长度,实现方式是,获取根节点下的节点数量即可。这个方法在并发时可能会有问题。慎用。

DistributedQueue

//继承AbstractQueue类并实现Queue接口
public class DistributedQueue<E> extends AbstractQueue<E> implements Queue<E> {
    private static Logger logger = LoggerFactory.getLogger(DistributedQueue.class);
    
    //ZooKeeper客户端,进行ZooKeeper操作
    private ZooKeeper zooKeeper;

    //根节点名称
    private String dir;

    //数据节点名称,顺序节点在插入口会变为 node{00000000xx} 格式
    private String node;

    //ZooKeeper鉴权信息
    private List<ACL> acls;

    /**
     * Constructor.
     *
     * @param zooKeeper the zoo keeper
     * @param dir       the dir
     * @param node      the node
     * @param acls      the acls
     */
   public DistributedQueue (ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) {
        this.zooKeeper = zooKeeper;
        this.dir = dir;
        this.node = node;
        this.acls = acls;
        init();
    }


    private void init() {
        //需要先判断根节点是否存在,不存在的话,创建子节点时会出错。
        try {
            Stat stat = zooKeeper.exists(dir, false);
            if (stat == null) {
                zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT);
            }
        } catch (Exception e) {
            logger.error("[DistributedQueue#init] error : " + e.toString(), e);
        }
    }
}

offer

/**
 * Offer boolean.
 *
 * @param o the o
 * @return the boolean
 */
@Override
public boolean offer(E o) {
    //构建要插入的节点名称
    String fullPath = dir.concat("/").concat(node);
    try {
        //创建子节点成功则返回入队成功
      zooKeeper.create(fullPath, objectToBytes(o), acls, CreateMode.PERSISTENT_SEQUENTIAL);
        return true;
    } catch (Exception e) {
        logger.error("[DistributedQueue#offer] error : " + e.toString(), e);
    }
    return false;
}

poll

/**
 * Poll e.
 *
 * @return the e
 */
@Override
public E poll() {
    try {
        //获取根节点所有子节点信息。
        List<String> children = zooKeeper.getChildren(dir, null);
        //如果队列是空的则返回null
        if (children == null || children.isEmpty()) {
            return null;
        }

        //将子节点名称排序
        Collections.sort(children);
        for (String child : children) {
            //拼接子节点的具体名称
            String fullPath = dir.concat("/").concat(child);
            try {
                //如果获取数据成功,则类型转换后,返回,并删除改队列中该节点
                byte[] bytes = zooKeeper.getData(fullPath, false, null);
                E data = (E) bytesToObject(bytes);
                zooKeeper.delete(fullPath, -1);
                return data;
            } catch (Exception e) {
                logger.warn("[DistributedQueue#poll] warn : " + e.toString(), e);
            }
        }

    } catch (Exception e) {
        logger.error("[DistributedQueue#peek] poll : " + e.toString(), e);
    }

    return null;
}

peek

/**
 * Peek e.
 *
 * @return the e
 */
@Override
public E peek() {
   
    try {
        //获取根节点所有子节点信息。
        List<String> children = zooKeeper.getChildren(dir, null);
        //如果队列是空的则返回null
        if (children == null || children.isEmpty()) {
            return null;
        }

        //将子节点名称排序
        Collections.sort(children);
        
        for (String child : children) {
            //拼接子节点的具体名称
            String fullPath = dir.concat("/").concat(child);
            try {
                //如果获取数据成功,则类型转换后,返回,不会删除改队列中该节点
                byte[] bytes = zooKeeper.getData(fullPath, false, null);
                E data = (E) bytesToObject(bytes);
                return data;
            } catch (Exception e) {
                logger.warn("[DistributedQueue#peek] warn : " + e.toString(), e);
            }
        }

    } catch (Exception e) {
        logger.error("[DistributedQueue#peek] warn : " + e.toString(), e);
    }

    return null;
}

size

/**
 * Size int.
 *
 * @return the int
 */
@Override
public int size() {
    try {
        //获取根节点的子节点名称
        List<String> children = zooKeeper.getChildren(dir, null);
        //返回子结点信息数量
        return children.size();
    } catch (Exception e) {
        logger.error("[DistributedQueue#offer] size : " + e.toString(), e);
    }

    return 0;
}

总结

上面我们一起学习了如何利用持久性顺序节点,创建一个分布式先进先出队列。源代码可见:aloofJr
如果有好的优化建议,欢迎一起讨论。

更多文章

见我的博客:https://nc2era.com

written by AloofJr,转载请注明出处

相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
目录
相关文章
|
8天前
|
监控 负载均衡 Cloud Native
ZooKeeper分布式协调服务详解:面试经验与必备知识点解析
【4月更文挑战第9天】本文深入剖析ZooKeeper分布式协调服务原理,涵盖核心概念如Server、Client、ZNode、ACL、Watcher,以及ZAB协议在一致性、会话管理、Leader选举中的作用。讨论ZooKeeper数据模型、操作、会话管理、集群部署与管理、性能调优和监控。同时,文章探讨了ZooKeeper在分布式锁、队列、服务注册与发现等场景的应用,并在面试方面分析了与其它服务的区别、实战挑战及解决方案。附带Java客户端实现分布式锁的代码示例,助力提升面试表现。
29 2
|
1月前
|
监控 NoSQL Java
Zookeeper分布式锁
Zookeeper分布式锁
90 1
|
2月前
|
消息中间件 RocketMQ 微服务
RocketMQ 分布式事务消息实战指南
RocketMQ 分布式事务消息实战指南
242 1
|
21天前
|
缓存 应用服务中间件 数据库
【分布式技术专题】「缓存解决方案」一文带领你好好认识一下企业级别的缓存技术解决方案的运作原理和开发实战(多级缓存设计分析)
【分布式技术专题】「缓存解决方案」一文带领你好好认识一下企业级别的缓存技术解决方案的运作原理和开发实战(多级缓存设计分析)
26 1
|
1月前
|
消息中间件 监控 NoSQL
在Windows下设置分布式队列Celery的心跳轮询
在Windows下设置分布式队列Celery的心跳轮询
28 0
|
1月前
|
消息中间件 监控 NoSQL
一文读懂python分布式任务队列-celery
celery是一个简单,灵活、可靠的分布式任务执行框架,可以支持大量任务的并发执行。celery采用典型生产者和消费者模型。生产者提交任务到任务队列,众多消费者从任务队列中取任务执行【2月更文挑战第11天】
81 5
|
2月前
|
Java Linux Spring
Zookeeper实现分布式服务配置中心
Zookeeper实现分布式服务配置中心
48 0
|
3月前
|
消息中间件 Java 网络安全
JAVAEE分布式技术之Zookeeper的第一次课
JAVAEE分布式技术之Zookeeper的第一次课
70 0
|
3月前
|
监控 Dubbo Java
深入理解Zookeeper系列-2.Zookeeper基本使用和分布式锁原理
深入理解Zookeeper系列-2.Zookeeper基本使用和分布式锁原理
58 0
|
3月前
|
NoSQL 中间件 API
分布式锁【数据库乐观锁实现的分布式锁、Zookeeper分布式锁原理、Redis实现的分布式锁】(三)-全面详解(学习总结---从入门到深化)(下)
分布式锁【数据库乐观锁实现的分布式锁、Zookeeper分布式锁原理、Redis实现的分布式锁】(三)-全面详解(学习总结---从入门到深化)
80 2