Copycat - CopycatServer

Overview:

A server can be started in one of two ways. In both cases it is first configured through a builder:

Address address = new Address("10.0.0.1", 5000); // this server's own address
CopycatServer.Builder builder = CopycatServer.builder(address);
builder.withStateMachine(MapStateMachine::new);
CopycatServer server = builder.build();

Bootstrap a new cluster on its own:

CompletableFuture<CopycatServer> future = server.bootstrap();
future.join();

 

Join an existing cluster:

Collection<Address> cluster = Collections.singleton(new Address("127.0.0.1", 8700));
server.join(cluster).join();

 

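CopycatServer.Builder.build

build() validates the configuration and supplies a default for every component that was not set explicitly:
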
/**
     * @throws ConfigurationException if a state machine, members or transport are not configured
     */
    @Override
    public CopycatServer build() {
      if (stateMachineFactory == null)
        throw new ConfigurationException("state machine not configured");

      // If the transport is not configured, attempt to use the default Netty transport.
      if (serverTransport == null) {
        try {
          serverTransport = (Transport) Class.forName("io.atomix.catalyst.transport.netty.NettyTransport").newInstance(); // default to the Netty transport
        } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
          throw new ConfigurationException("transport not configured");
        }
      }

      // If the client transport is not configured, default it to the server transport.
      if (clientTransport == null) {
        clientTransport = serverTransport;
      }

      // If no serializer instance was provided, create one.
      if (serializer == null) {
        serializer = new Serializer(new PooledHeapAllocator());
      }

      // Resolve serializable request/response and other types.
      serializer.resolve(new ClientRequestTypeResolver());
      serializer.resolve(new ClientResponseTypeResolver());
      serializer.resolve(new ProtocolSerialization());
      serializer.resolve(new ServerSerialization());
      serializer.resolve(new StorageSerialization());

      // If the storage is not configured, create a new Storage instance with the configured serializer.
      if (storage == null) {
        storage = new Storage(); // default Storage instance
      }

      ConnectionManager connections = new ConnectionManager(serverTransport.client());
      ThreadContext threadContext = new SingleThreadContext(String.format("copycat-server-%s-%s", serverAddress, name), serializer); // single-threaded ThreadContext: a thin wrapper around one thread, used to execute operations on the state machine

      ServerContext context = new ServerContext(name, type, serverAddress, clientAddress, storage, serializer, stateMachineFactory, connections, threadContext); // bundle everything into a ServerContext
      context.setElectionTimeout(electionTimeout)
        .setHeartbeatInterval(heartbeatInterval)
        .setSessionTimeout(sessionTimeout)
        .setGlobalSuspendTimeout(globalSuspendTimeout);

      return new CopycatServer(name, clientTransport, serverTransport, context);
    }

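The defaulting logic in build() follows a familiar pattern: fail fast on what cannot be defaulted (the state machine), load the default transport by class name so that Netty stays an optional dependency, and let the client transport fall back to the server transport. The JDK-only sketch below reproduces just that pattern. ServerConfigSketch, its Builder and withDefaultTransportClass are hypothetical names, not Copycat's API, and java.util.ArrayList merely stands in for the Netty transport class.

```java
// Hypothetical stand-in for CopycatServer.Builder: only the defaulting logic is modeled.
class ServerConfigSketch {
  final Object serverTransport;
  final Object clientTransport;

  private ServerConfigSketch(Object serverTransport, Object clientTransport) {
    this.serverTransport = serverTransport;
    this.clientTransport = clientTransport;
  }

  static class Builder {
    private Object stateMachineFactory;
    private Object serverTransport;
    private Object clientTransport;
    // Class instantiated reflectively when no transport is set
    // (Copycat uses "io.atomix.catalyst.transport.netty.NettyTransport").
    private String defaultTransportClass;

    Builder withStateMachine(Object factory) { this.stateMachineFactory = factory; return this; }
    Builder withServerTransport(Object transport) { this.serverTransport = transport; return this; }
    Builder withClientTransport(Object transport) { this.clientTransport = transport; return this; }
    Builder withDefaultTransportClass(String className) { this.defaultTransportClass = className; return this; }

    ServerConfigSketch build() {
      if (stateMachineFactory == null)
        throw new IllegalStateException("state machine not configured");
      if (serverTransport == null) {
        if (defaultTransportClass == null)
          throw new IllegalStateException("transport not configured");
        try {
          // Loading by name keeps the default transport an optional runtime dependency.
          serverTransport = Class.forName(defaultTransportClass).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
          throw new IllegalStateException("transport not configured", e);
        }
      }
      // The client transport defaults to the server transport.
      if (clientTransport == null)
        clientTransport = serverTransport;
      return new ServerConfigSketch(serverTransport, clientTransport);
    }
  }

  public static void main(String[] args) {
    ServerConfigSketch config = new Builder()
        .withStateMachine(new Object())
        .withDefaultTransportClass("java.util.ArrayList") // stand-in for the Netty transport class
        .build();
    System.out.println(config.clientTransport == config.serverTransport); // true
  }
}
```

A class name that is not on the classpath makes build() throw "transport not configured", much like the ConfigurationException in the original.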
 

CopycatServer.bootstrap

public CompletableFuture<CopycatServer> bootstrap() {
  return bootstrap(Collections.EMPTY_LIST); // only this server is started, so the member list is empty
}

 

public CompletableFuture<CopycatServer> bootstrap(Collection<Address> cluster) {
  return start(() -> cluster().bootstrap(cluster));
}
 
Both overloads end up calling start():

/**
   * Starts the server.
   */
  private CompletableFuture<CopycatServer> start(Supplier<CompletableFuture<Void>> joiner) {
    if (started)
      return CompletableFuture.completedFuture(this);

    if (openFuture == null) {
      synchronized (this) {
        if (openFuture == null) {
          Function<Void, CompletionStage<CopycatServer>> completionFunction = state -> {
            CompletableFuture<CopycatServer> future = new CompletableFuture<>();
            openFuture = null;
            joiner.get().whenComplete((result, error) -> { // run the joiner (bootstrap or join)
              if (error == null) {
                if (cluster().leader() != null) {
                  started = true;
                  future.complete(this);
                } else {
                  electionListener = cluster().onLeaderElection(leader -> {
                    if (electionListener != null) {
                      started = true;
                      future.complete(this);
                      electionListener.close();
                      electionListener = null;
                    }
                  });
                }
              } else {
                future.completeExceptionally(error);
              }
            });
            return future;
          };

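          if (closeFuture == null) {
            openFuture = listen().thenCompose(completionFunction); // listen first, then run completionFunction
          } else {
            openFuture = closeFuture.thenCompose(c -> listen().thenCompose(completionFunction));
          }
        }
      }
    }

    return openFuture; // completes once the server is listening and a leader is known
  }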

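start() does two things: it calls listen() to start accepting connections, and once that succeeds it runs the joiner. The returned future completes only when a leader is known, either immediately or after the next leader election.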
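To see why the joiner is passed as a Supplier and when the returned future completes, here is a stripped-down, JDK-only model of start(). StartSketch, electLeader() and the Consumer-based election listener are hypothetical stand-ins for CopycatServer, the cluster's leader election and cluster().onLeaderElection(); the openFuture/closeFuture bookkeeping and the double-checked locking are left out.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical model of CopycatServer.start(): listen first, run the joiner, and complete only
// once a leader is known. openFuture/closeFuture bookkeeping and locking are omitted.
class StartSketch {
  private volatile boolean started;
  private volatile String leader;              // null until a leader is known
  private Consumer<String> electionListener;   // stands in for cluster().onLeaderElection(...)

  // Stand-in for listen(): the real method binds the server's addresses on its thread context.
  CompletableFuture<Void> listen() {
    return CompletableFuture.completedFuture(null);
  }

  CompletableFuture<StartSketch> start(Supplier<CompletableFuture<Void>> joiner) {
    if (started)
      return CompletableFuture.completedFuture(this);
    return listen().thenCompose(ignored -> {
      CompletableFuture<StartSketch> future = new CompletableFuture<>();
      // The joiner is a Supplier so that bootstrap/join only begins after listen() succeeded.
      joiner.get().whenComplete((result, error) -> {
        if (error != null) {
          future.completeExceptionally(error);
        } else if (leader != null) {
          started = true;
          future.complete(this);
        } else {
          // No leader yet: wait for the first election, then drop the listener.
          electionListener = newLeader -> {
            started = true;
            future.complete(this);
            electionListener = null;
          };
        }
      });
      return future;
    });
  }

  // Simulates the cluster reporting a newly elected leader.
  void electLeader(String address) {
    leader = address;
    Consumer<String> listener = electionListener;
    if (listener != null)
      listener.accept(address);
  }

  boolean isStarted() { return started; }

  public static void main(String[] args) {
    StartSketch server = new StartSketch();
    CompletableFuture<StartSketch> startup = server.start(() -> CompletableFuture.completedFuture(null));
    System.out.println(startup.isDone()); // false: joined, but no leader yet
    server.electLeader("127.0.0.1:8700");
    System.out.println(startup.isDone()); // true
  }
}
```

A failing joiner (for example "cannot join empty cluster") simply fails the returned future; it never waits for an election.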

For bootstrap, the joiner is cluster().bootstrap(cluster):

@Override
  public CompletableFuture<Void> bootstrap(Collection<Address> cluster) {
    if (joinFuture != null)
      return joinFuture;

    if (configuration == null) {
      if (member.type() != Member.Type.ACTIVE) {
        return Futures.exceptionalFuture(new IllegalStateException("only ACTIVE members can bootstrap the cluster"));
      } else {
        // Create a set of active members.
        Set<Member> activeMembers = cluster.stream()
          .filter(m -> !m.equals(member.serverAddress()))
          .map(m -> new ServerMember(Member.Type.ACTIVE, m, null, member.updated()))
          .collect(Collectors.toSet());

        // Add the local member to the set of active members.
        activeMembers.add(member);

        // Create a new configuration and store it on disk to ensure the cluster can fall back to the configuration.
        configure(new Configuration(0, 0, member.updated().toEpochMilli(), activeMembers));
      }
    }
    return join();
  }

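The membership rule in bootstrap() can be modeled with plain strings. This is a simplified sketch, not Copycat code: BootstrapMembersSketch and its two-value Type enum are hypothetical, and a member is just a "host:port" string instead of a ServerMember.

```java
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical, simplified model of the membership ClusterState.bootstrap() creates when no
// configuration was loaded from disk. Members are "host:port" strings, not ServerMember objects.
class BootstrapMembersSketch {
  enum Type { ACTIVE, PASSIVE }   // simplified stand-in for Member.Type

  static Set<String> bootstrapMembers(String local, Type localType, Collection<String> cluster) {
    // Only ACTIVE members may form a new cluster.
    if (localType != Type.ACTIVE)
      throw new IllegalStateException("only ACTIVE members can bootstrap the cluster");
    // Every listed address except our own becomes an ACTIVE member...
    Set<String> active = cluster.stream()
        .filter(address -> !address.equals(local))
        .collect(Collectors.toCollection(LinkedHashSet::new));
    // ...and the local member is always added, so an empty list yields a single-node cluster.
    active.add(local);
    return active;
  }

  public static void main(String[] args) {
    System.out.println(bootstrapMembers("127.0.0.1:5000", Type.ACTIVE, List.of())); // [127.0.0.1:5000]
  }
}
```

This is why the no-argument bootstrap() with an empty list is enough to form a one-node cluster.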
 

listen

/**
   * Starts listening the server.
   */
  private CompletableFuture<Void> listen() {
    CompletableFuture<Void> future = new CompletableFuture<>();
    context.getThreadContext().executor().execute(() -> {
      internalServer.listen(cluster().member().serverAddress(), context::connectServer).whenComplete((internalResult, internalError) -> { // internalServer may be a local or a Netty server, depending on the transport
        if (internalError == null) {
          // If the client address is different than the server address, start a separate client server.
          if (clientServer != null) {
            clientServer.listen(cluster().member().clientAddress(), context::connectClient).whenComplete((clientResult, clientError) -> { // clients may be served on a different address
              started = true;
              future.complete(null);
            });
          } else {
            started = true;
            future.complete(null);
          }
        } else {
          future.completeExceptionally(internalError);
        }
      });
    });

    return future;
  }

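listen() chains two binds: the server address always, and a client address only when clients use a different one. A JDK-only sketch of that chaining follows; ListenSketch and bind() are hypothetical stand-ins for CopycatServer and the transport server's listen(). One deliberate difference: the excerpt above completes the future even when the client listener reports an error, whereas the sketch propagates that error.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of CopycatServer.listen(): bind the server address first and, only when
// clients use a different address, bind a second listener for them.
class ListenSketch {
  final List<String> bound = new ArrayList<>();

  // Stand-in for the transport server's listen(); the address "bad" simulates a bind failure.
  CompletableFuture<Void> bind(String address) {
    if (address.equals("bad"))
      return CompletableFuture.failedFuture(new IllegalStateException("cannot bind " + address));
    bound.add(address);
    return CompletableFuture.completedFuture(null);
  }

  CompletableFuture<Void> listen(String serverAddress, String clientAddress) {
    // A failure of the client listener is propagated here instead of being ignored.
    return bind(serverAddress).thenCompose(ignored ->
        clientAddress == null || clientAddress.equals(serverAddress)
            ? CompletableFuture.<Void>completedFuture(null)   // clients share the server listener
            : bind(clientAddress));                            // separate client listener
  }

  public static void main(String[] args) {
    ListenSketch server = new ListenSketch();
    server.listen("10.0.0.1:5000", "10.0.0.1:6000").join();
    System.out.println(server.bound); // [10.0.0.1:5000, 10.0.0.1:6000]
  }
}
```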
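ServerContext

listen() registers context::connectServer and context::connectClient as connection handlers. Both are defined in ServerContext, and each one binds its request types to the server's current state:
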
/**
   * Handles a connection from a client.
   */
  public void connectClient(Connection connection) {
    threadContext.checkThread();

    // Note we do not use method references here because the "state" variable changes over time.
    // We have to use lambdas to ensure the request handler points to the current state.
    connection.handler(RegisterRequest.class, request -> state.register(request));
    connection.handler(ConnectRequest.class, request -> state.connect(request, connection));
    connection.handler(KeepAliveRequest.class, request -> state.keepAlive(request));
    connection.handler(UnregisterRequest.class, request -> state.unregister(request));
    connection.handler(CommandRequest.class, request -> state.command(request));
    connection.handler(QueryRequest.class, request -> state.query(request));

    connection.closeListener(stateMachine.executor().context().sessions()::unregisterConnection);
  }

  /**
   * Handles a connection from another server.
   */
  public void connectServer(Connection connection) {
    threadContext.checkThread();

    // Handlers for all request types are registered since requests can be proxied between servers.
    // Note we do not use method references here because the "state" variable changes over time.
    // We have to use lambdas to ensure the request handler points to the current state.
    connection.handler(RegisterRequest.class, request -> state.register(request));
    connection.handler(ConnectRequest.class, request -> state.connect(request, connection));
    connection.handler(KeepAliveRequest.class, request -> state.keepAlive(request));
    connection.handler(UnregisterRequest.class, request -> state.unregister(request));
    connection.handler(PublishRequest.class, request -> state.publish(request));
    connection.handler(ConfigureRequest.class, request -> state.configure(request));
    connection.handler(InstallRequest.class, request -> state.install(request));
    connection.handler(JoinRequest.class, request -> state.join(request));
    connection.handler(ReconfigureRequest.class, request -> state.reconfigure(request));
    connection.handler(LeaveRequest.class, request -> state.leave(request));
    connection.handler(AppendRequest.class, request -> state.append(request));
    connection.handler(PollRequest.class, request -> state.poll(request));
    connection.handler(VoteRequest.class, request -> state.vote(request));
    connection.handler(CommandRequest.class, request -> state.command(request));
    connection.handler(QueryRequest.class, request -> state.query(request));

    connection.closeListener(stateMachine.executor().context().sessions()::unregisterConnection);
  }

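The comment in both methods ("the state variable changes over time") is about when the receiver is evaluated. A bound method reference such as state::command evaluates state once, when the reference is created, while the lambda request -> state.command(request) reads the field on every call. The sketch below demonstrates the difference; HandlerBindingSketch, State, FollowerState and LeaderState are hypothetical classes for illustration, not Copycat's server states.

```java
import java.util.function.Function;

// Hypothetical illustration of why connectClient()/connectServer() use lambdas, not method references.
class HandlerBindingSketch {
  interface State { String command(String request); }

  static class FollowerState implements State {
    public String command(String request) { return "follower forwards " + request; }
  }

  static class LeaderState implements State {
    public String command(String request) { return "leader applies " + request; }
  }

  // The server's current role; reassigned on every role transition.
  State state = new FollowerState();

  // Bound method reference: `state` is evaluated now, so the handler keeps this object forever.
  Function<String, String> methodReferenceHandler() { return state::command; }

  // Lambda: `state` is read on each call, so the handler always reaches the current role.
  Function<String, String> lambdaHandler() { return request -> state.command(request); }

  public static void main(String[] args) {
    HandlerBindingSketch server = new HandlerBindingSketch();
    Function<String, String> byRef = server.methodReferenceHandler();
    Function<String, String> byLambda = server.lambdaHandler();
    server.state = new LeaderState();           // e.g. the server wins an election
    System.out.println(byRef.apply("put"));     // follower forwards put
    System.out.println(byLambda.apply("put"));  // leader applies put
  }
}
```

Because a connection's handlers are registered once but the server changes role many times, only the lambda form routes requests to the right state.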
 

Joining an existing cluster

public CompletableFuture<CopycatServer> join(Collection<Address> cluster) {
    return start(() -> cluster().join(cluster));
  }

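ClusterState.join. The logic is similar to bootstrap, with three differences: any member type may join (bootstrap requires an ACTIVE member), the local member is not added to the initial member set, and an empty member list is rejected.
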
@Override
  public synchronized CompletableFuture<Void> join(Collection<Address> cluster) {

    // If no configuration was loaded from disk, create a new configuration.
    if (configuration == null) { // no configuration exists yet
      // Create a set of cluster members, excluding the local member which is joining a cluster.
      Set<Member> activeMembers = cluster.stream()
        .filter(m -> !m.equals(member.serverAddress())) // exclude the local member
        .map(m -> new ServerMember(Member.Type.ACTIVE, m, null, member.updated())) // create a ServerMember for each address
        .collect(Collectors.toSet());

      // If the set of members in the cluster is empty when the local member is excluded,
      // fail the join.
      if (activeMembers.isEmpty()) { // no other members in the cluster
        return Futures.exceptionalFuture(new IllegalStateException("cannot join empty cluster"));
      }

      // Create a new configuration and configure the cluster. Once the cluster is configured, the configuration
      // will be stored on disk to ensure the cluster can fall back to the provided configuration if necessary.
      configure(new Configuration(0, 0, member.updated().toEpochMilli(), activeMembers)); // apply the configuration
    }
    return join();
  }

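The join-side membership rule can be sketched in the same simplified style. JoinMembersSketch is hypothetical and members are plain "host:port" strings; it excludes the local address, does not add it back, and rejects an empty result.

```java
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical, simplified model of the membership ClusterState.join(cluster) creates when no
// configuration was loaded from disk. Members are "host:port" strings.
class JoinMembersSketch {
  static Set<String> joinMembers(String local, Collection<String> cluster) {
    // No member-type check: unlike bootstrap, any member type may join.
    // The local address is filtered out and not added back; the node learns the full
    // membership from the configuration returned in the JoinResponse.
    Set<String> active = cluster.stream()
        .filter(address -> !address.equals(local))
        .collect(Collectors.toSet());
    if (active.isEmpty())
      throw new IllegalStateException("cannot join empty cluster");
    return active;
  }

  public static void main(String[] args) {
    System.out.println(joinMembers("10.0.0.3:5000", List.of("10.0.0.1:5000", "10.0.0.3:5000"))); // [10.0.0.1:5000]
  }
}
```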
Like bootstrap, join(cluster) only has to build a configuration when none was loaded from disk; it then calls the private join():

/**
   * Starts the join to the cluster.
   */
  private synchronized CompletableFuture<Void> join() {
    joinFuture = new CompletableFuture<>();

    context.getThreadContext().executor().execute(() -> {
      // Transition the server to the appropriate state for the local member type.
      context.transition(member.type()); // move the local server into the state that matches its member type

      // Attempt to join the cluster. If the local member is ACTIVE then failing to join the cluster
      // will result in the member attempting to get elected. This allows initial clusters to form.
      List<MemberState> activeMembers = getActiveMemberStates();
      if (!activeMembers.isEmpty()) {
        join(getActiveMemberStates().iterator()); // try to join via the other active members
      } else {
        joinFuture.complete(null);
      }
    });

    return joinFuture.whenComplete((result, error) -> joinFuture = null);
  }
/**
   * Recursively attempts to join the cluster.
   */
  private void join(Iterator<MemberState> iterator) {
    if (iterator.hasNext()) {
      cancelJoinTimer();
      joinTimeout = context.getThreadContext().schedule(context.getElectionTimeout().multipliedBy(2), () -> {
        join(iterator); // timed out: move on to the next member; join keeps rescheduling itself until it succeeds
      });

      MemberState member = iterator.next();
      LOGGER.debug("{} - Attempting to join via {}", member().address(), member.getMember().serverAddress());

      context.getConnections().getConnection(member.getMember().serverAddress()).thenCompose(connection -> {
        JoinRequest request = JoinRequest.builder()
          .withMember(new ServerMember(member().type(), member().serverAddress(), member().clientAddress(), member().updated()))
          .build();
        return connection.<JoinRequest, JoinResponse>send(request); // send the JoinRequest
      }).whenComplete((response, error) -> {
        // Cancel the join timer.
        cancelJoinTimer(); // cancel the join timer first

        if (error == null) { // a response arrived; its status decides whether the join succeeded
          if (response.status() == Response.Status.OK) {
            LOGGER.info("{} - Successfully joined via {}", member().address(), member.getMember().serverAddress());

            Configuration configuration = new Configuration(response.index(), response.term(), response.timestamp(), response.members());

            // Configure the cluster with the join response.
            // Commit the configuration as we know it was committed via the successful join response.
            configure(configuration).commit(); // apply and commit the new configuration


          } else if (response.error() == null || response.error() == CopycatError.Type.CONFIGURATION_ERROR) {
            // If the response error is null, that indicates that no error occurred but the leader was
            // in a state that was incapable of handling the join request. Attempt to join the leader
            // again after an election timeout.
            LOGGER.debug("{} - Failed to join {}", member().address(), member.getMember().address());
            resetJoinTimer();
          } else {
            // If the response error was non-null, attempt to join via the next server in the members list.
            LOGGER.debug("{} - Failed to join {}", member().address(), member.getMember().address());
            join(iterator);
          }
        } else {
          LOGGER.debug("{} - Failed to join {}", member().address(), member.getMember().address());
          join(iterator);
        }
      });
    }
    // If join attempts remain, schedule another attempt after two election timeouts. This allows enough time
    // for servers to potentially timeout and elect a leader.
    else {
      LOGGER.debug("{} - Failed to join cluster, retrying...", member.address());
      resetJoinTimer(); // every member was tried without success: retry later
    }
  }

A single successful join through any member is enough, because a JoinRequest is forwarded to the leader no matter which server receives it.
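The retry policy of join(Iterator) is easier to follow without the timers. The sketch below is a synchronous, JDK-only model under two assumptions: JoinRetrySketch and its Reply enum are hypothetical, and each retry round (resetJoinTimer() in the original) starts again from the first active member.

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical, synchronous model of ClusterState.join(Iterator). The real code is asynchronous,
// arms a timer per attempt, and waits two election timeouts before starting a new round.
class JoinRetrySketch {
  // OK = Response.Status.OK; NOT_READY = error response with a null error or CONFIGURATION_ERROR;
  // FAILED = any other error response, a failed connection, or a timeout.
  enum Reply { OK, NOT_READY, FAILED }

  static Optional<String> join(List<String> members, Function<String, Reply> send, int maxRounds) {
    for (int round = 0; round < maxRounds; round++) {
      for (String member : members) {
        Reply reply = send.apply(member);
        if (reply == Reply.OK)
          return Optional.of(member);   // one success is enough: the request reaches the leader
        if (reply == Reply.NOT_READY)
          break;                        // leader cannot take joins yet: wait, then start over
        // FAILED: try the next member
      }
      // Members exhausted or leader not ready: the real code calls resetJoinTimer() here.
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    List<String> members = List.of("10.0.0.1:5000", "10.0.0.2:5000");
    // The first member is unreachable, the second accepts the join.
    System.out.println(join(members, m -> m.endsWith("2:5000") ? Reply.OK : Reply.FAILED, 1)); // Optional[10.0.0.2:5000]
  }
}
```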
