Copycat - AppendRequest

简介:

对于Command,Configuration都要通过appendEntries的方式,把Entries同步给follower

LeaderState.configure

复制代码
/**
   * Commits the given configuration.
   */
  protected CompletableFuture<Long> configure(Collection<Member> members) {
    final long index;
    try (ConfigurationEntry entry = context.getLog().create(ConfigurationEntry.class)) {
      entry.setTerm(context.getTerm())
        .setTimestamp(System.currentTimeMillis())
        .setMembers(members);
      index = context.getLog().append(entry); //先把configuration写入local log
      LOGGER.debug("{} - Appended {}", context.getCluster().member().address(), entry);

      // Store the index of the configuration entry in order to prevent other configurations from
      // being logged and committed concurrently. This is an important safety property of Raft.
      configuring = index; //configuring用于互斥
      context.getClusterState().configure(new Configuration(entry.getIndex(), entry.getTerm(), entry.getTimestamp(), entry.getMembers())); //更新ClusterState
    }

    return appender.appendEntries(index).whenComplete((commitIndex, commitError) -> { //appendEntries,把configuration发给follower
      context.checkThread();
      if (isOpen()) {
        // Reset the configuration index to allow new configuration changes to be committed.
        configuring = 0; //configuring完成
      }
    });
  }
复制代码

 

appendCommand

复制代码
 /**
   * Sends append requests for a command to followers.
   */
  private void appendCommand(long index, CompletableFuture<CommandResponse> future) {
    appender.appendEntries(index).whenComplete((commitIndex, commitError) -> { //appendEntries到该index
      context.checkThread();
      if (isOpen()) {
        if (commitError == null) {
          applyCommand(index, future); //如果成功,applyCommand
        } else {
          future.complete(logResponse(CommandResponse.builder()
            .withStatus(Response.Status.ERROR)
            .withError(CopycatError.Type.INTERNAL_ERROR)
            .build()));
        }
      }
    });
  }
复制代码

 

LeaderAppender

通过LeaderAppender来完成appendEntries,

复制代码
/**
 * The leader appender is responsible for sending {@link AppendRequest}s on behalf of a leader to followers.
 * Append requests are sent by the leader only to other active members of the cluster.
 *
 * @author <a href="http://github.com/kuujo>Jordan Halterman</a>
 */
final class LeaderAppender extends AbstractAppender {
复制代码

 

append指定的index

复制代码
/**
   * Registers a commit handler for the given commit index.
   *
   * @param index The index for which to register the handler.
   * @return A completable future to be completed once the given log index has been committed.
   */
  public CompletableFuture<Long> appendEntries(long index) {
    if (index == 0) //如果index=0,等同于heartbeat
      return appendEntries();

    if (index <= context.getCommitIndex()) //如果index小于commit index,说明不需要commit
      return CompletableFuture.completedFuture(index);

    // If there are no other stateful servers in the cluster, immediately commit the index.
    if (context.getClusterState().getActiveMemberStates().isEmpty() && context.getClusterState().getPassiveMemberStates().isEmpty()) {
      long previousCommitIndex = context.getCommitIndex();
      context.setCommitIndex(index);
      context.setGlobalIndex(index);
      completeCommits(previousCommitIndex, index);
      return CompletableFuture.completedFuture(index);
    }
    // If there are no other active members in the cluster, update the commit index and complete the commit.
    // The updated commit index will be sent to passive/reserve members on heartbeats.
    else if (context.getClusterState().getActiveMemberStates().isEmpty()) {
      long previousCommitIndex = context.getCommitIndex();
      context.setCommitIndex(index);
      completeCommits(previousCommitIndex, index);
      return CompletableFuture.completedFuture(index);
    }

    // Only send entry-specific AppendRequests to active members of the cluster.
    return appendFutures.computeIfAbsent(index, i –> { //computeIfAbsent,如果map中没有key为index的item,执行后面的函数
      for (MemberState member : context.getClusterState().getActiveMemberStates()) {
        appendEntries(member);
      }
      return new CompletableFuture<>();
    });
  }
复制代码

appendFutures

private final Map<Long, CompletableFuture<Long>> appendFutures = new HashMap<>();

每个index对应的future都cache在appendFutures中,那么什么时候这个future会被complete

 

继续看,对于每个member调用

appendEntries

复制代码
@Override
  protected void appendEntries(MemberState member) {

    // If the member term is less than the current term or the member's configuration index is less
    // than the local configuration index, send a configuration update to the member.
    // Ensure that only one configuration attempt per member is attempted at any given time by storing the
    // member state in a set of configuring members.
    // Once the configuration is complete sendAppendRequest will be called recursively.
    else if (member.getConfigTerm() < context.getTerm() || member.getConfigIndex() < context.getClusterState().getConfiguration().index()) {
      if (member.canConfigure()) {
        sendConfigureRequest(member, buildConfigureRequest(member));
      }
    }
    // If the member is a reserve or passive member, send an empty AppendRequest to it.
    else if (member.getMember().type() == Member.Type.RESERVE || member.getMember().type() == Member.Type.PASSIVE) {
      if (member.canAppend()) {
        sendAppendRequest(member, buildAppendEmptyRequest(member)); //如果是reserve或passive,只需要发heartbeat,即emptyAppend
      }
    }
    // If the member's current snapshot index is less than the latest snapshot index and the latest snapshot index
    // is less than the nextIndex, send a snapshot request.
    else if (member.getMember().type() == Member.Type.ACTIVE && context.getSnapshotStore().currentSnapshot() != null
      && context.getSnapshotStore().currentSnapshot().index() >= member.getNextIndex()
      && context.getSnapshotStore().currentSnapshot().index() > member.getSnapshotIndex()) {
      if (member.canInstall()) {
        sendInstallRequest(member, buildInstallRequest(member));
      }
    }
    // If no AppendRequest is already being sent, send an AppendRequest.
    else if (member.canAppend()) {
      sendAppendRequest(member, buildAppendRequest(member, context.getLog().lastIndex())); //发送AppendRequest
    }
  }
复制代码

 

buildAppendRequest

复制代码
protected AppendRequest buildAppendRequest(MemberState member, long lastIndex) {
    // If the log is empty then send an empty commit.
    // If the next index hasn't yet been set then we send an empty commit first.
    // If the next index is greater than the last index then send an empty commit.
    // If the member failed to respond to recent communication send an empty commit. This
    // helps avoid doing expensive work until we can ascertain the member is back up.
    if (context.getLog().isEmpty() || member.getNextIndex() > lastIndex || member.getFailureCount() > 0) {
      return buildAppendEmptyRequest(member); //这里如果是上面的appendEntries(),hb,会直接发送emptyAppend
    } else {
      return buildAppendEntriesRequest(member, lastIndex);
    }
  }
复制代码

 

buildAppendEntriesRequest

复制代码
/**
   * Builds a populated AppendEntries request.
   */
  protected AppendRequest buildAppendEntriesRequest(MemberState member, long lastIndex) {
    Entry prevEntry = getPrevEntry(member); //查找该member的上一个entry index

    ServerMember leader = context.getLeader();
    AppendRequest.Builder builder = AppendRequest.builder()
      .withTerm(context.getTerm())
      .withLeader(leader != null ? leader.id() : 0)
      .withLogIndex(prevEntry != null ? prevEntry.getIndex() : 0)
      .withLogTerm(prevEntry != null ? prevEntry.getTerm() : 0)
      .withCommitIndex(context.getCommitIndex())
      .withGlobalIndex(context.getGlobalIndex());

    // Calculate the starting index of the list of entries.
    final long index = prevEntry != null ? prevEntry.getIndex() + 1 : context.getLog().firstIndex(); //如果有preEntry,就从+1开始读,如果没有,就从first开始

    // Build a list of entries to send to the member.
    List<Entry> entries = new ArrayList<>((int) Math.min(8, lastIndex - index + 1)); //意思最多8个

    // Build a list of entries up to the MAX_BATCH_SIZE. Note that entries in the log may
    // be null if they've been compacted and the member to which we're sending entries is just
    // joining the cluster or is otherwise far behind. Null entries are simply skipped and not
    // counted towards the size of the batch.
    // If there exists an entry in the log with size >= MAX_BATCH_SIZE the logic ensures that
    // entry will be sent in a batch of size one
    int size = 0;

    // Iterate through remaining entries in the log up to the last index.
    for (long i = index; i <= lastIndex; i++) {
      // Get the entry from the log and append it if it's not null. Entries in the log can be null
      // if they've been cleaned or compacted from the log. Each entry sent in the append request
      // has a unique index to handle gaps in the log.
      Entry entry = context.getLog().get(i);
      if (entry != null) {
        if (!entries.isEmpty() && size + entry.size() > MAX_BATCH_SIZE) { //用MAX_BATCH_SIZE控制batch的大小
          break;
        }
        size += entry.size();
        entries.add(entry);
      }
    }

    // Release the previous entry back to the entry pool.
    if (prevEntry != null) {
      prevEntry.release();
    }

    // Add the entries to the request builder and build the request.
    return builder.withEntries(entries).build();
  }
复制代码

 

sendAppendRequest

复制代码
/**
   * Connects to the member and sends a commit message.
   */
  protected void sendAppendRequest(MemberState member, AppendRequest request) {
    // Set the start time of the member's current commit. This will be used to associate responses
    // with the current commit request.
    member.setHeartbeatStartTime(heartbeatTime);

    super.sendAppendRequest(member, request);
  }
复制代码

 

AbstractAppender

复制代码
/**
   * Connects to the member and sends a commit message.
   */
  protected void sendAppendRequest(MemberState member, AppendRequest request) {
    // Start the append to the member.
    member.startAppend();

    LOGGER.debug("{} - Sent {} to {}", context.getCluster().member().address(), request, member.getMember().address());
    context.getConnections().getConnection(member.getMember().address()).whenComplete((connection, error) –> { // 试着去connect
      context.checkThread();

      if (open) {
        if (error == null) {
          sendAppendRequest(connection, member, request); // 如果连接成功,sendAppendRequest
        } else {
          // Complete the append to the member.
          member.completeAppend();

          // Trigger reactions to the request failure.
          handleAppendRequestFailure(member, request, error);
        }
      }
    });
  }
复制代码

 

复制代码
/**
   * Sends a commit message.
   */
  protected void sendAppendRequest(Connection connection, MemberState member, AppendRequest request) {
    long timestamp = System.nanoTime();
    connection.<AppendRequest, AppendResponse>send(request).whenComplete((response, error) -> { //send
      context.checkThread();

      // Complete the append to the member.
      if (!request.entries().isEmpty()) {
        member.completeAppend(System.nanoTime() - timestamp);
      } else {
        member.completeAppend();
      }

      if (open) {
        if (error == null) {
          LOGGER.debug("{} - Received {} from {}", context.getCluster().member().address(), response, member.getMember().address());
          handleAppendResponse(member, request, response); //如果发送成功
        } else {
          handleAppendResponseFailure(member, request, error); //如果发送失败
        }
      }
    });

    updateNextIndex(member, request);
    if (!request.entries().isEmpty() && hasMoreEntries(member)) {
      appendEntries(member);
    }
  }
复制代码

 

LeaderAppender

复制代码
/**
   * Handles an append response.
   */
  protected void handleAppendResponse(MemberState member, AppendRequest request, AppendResponse response) {
    // Trigger commit futures if necessary.
    updateHeartbeatTime(member, null);

    super.handleAppendResponse(member, request, response);
  }
复制代码
复制代码
/**
   * Handles an append response.
   */
  protected void handleAppendResponse(MemberState member, AppendRequest request, AppendResponse response) {
    if (response.status() == Response.Status.OK) {
      handleAppendResponseOk(member, request, response);
    } else {
      handleAppendResponseError(member, request, response);
    }
  }
复制代码

 

复制代码
/**
   * Handles a {@link Response.Status#OK} response.
   */
  protected void handleAppendResponseOk(MemberState member, AppendRequest request, AppendResponse response) {
    // Reset the member failure count and update the member's availability status if necessary.
    succeedAttempt(member); //更新hb

    // If replication succeeded then trigger commit futures.
    if (response.succeeded()) {
      updateMatchIndex(member, response); //更新match index 

      // If entries were committed to the replica then check commit indexes.
      if (!request.entries().isEmpty()) {
        commitEntries(); //试图commit entries
      }

      // If there are more entries to send then attempt to send another commit.
      if (hasMoreEntries(member)) {
        appendEntries(member);
      }
    }
    // If we've received a greater term, update the term and transition back to follower.
    else if (response.term() > context.getTerm()) {
      context.setTerm(response.term()).setLeader(0);
      context.transition(CopycatServer.State.FOLLOWER);
    }
    // If the response failed, the follower should have provided the correct last index in their log. This helps
    // us converge on the matchIndex faster than by simply decrementing nextIndex one index at a time.
    else {
      resetMatchIndex(member, response);
      resetNextIndex(member);

      // If there are more entries to send then attempt to send another commit.
      if (hasMoreEntries(member)) {
        appendEntries(member);
      }
    }
  }
复制代码

 

关键是commitEntries

复制代码
/**
   * Checks whether any futures can be completed.
   */
  private void commitEntries() {

    // Sort the list of replicas, order by the last index that was replicated
    // to the replica. This will allow us to determine the median index
    // for all known replicated entries across all cluster members.
    List<MemberState> members = context.getClusterState().getActiveMemberStates((m1, m2) -> //将ActiveMembers按照matchIndex排序
      Long.compare(m2.getMatchIndex() != 0 ? m2.getMatchIndex() : 0l, m1.getMatchIndex() != 0 ? m1.getMatchIndex() : 0l));

    // Calculate the current commit index as the median matchIndex.
    long commitIndex = members.get(quorumIndex()).getMatchIndex(); //取出quorum个member的最小match index

    // If the commit index has increased then update the commit index. Note that in order to ensure
    // the leader completeness property holds, we verify that the commit index is greater than or equal to
    // the index of the leader's no-op entry. Update the commit index and trigger commit futures.
    long previousCommitIndex = context.getCommitIndex();
    if (commitIndex > 0 && commitIndex > previousCommitIndex && (leaderIndex > 0 && commitIndex >= leaderIndex)) { //如果quorum个member都已经完成同步
      context.setCommitIndex(commitIndex);
      completeCommits(previousCommitIndex, commitIndex); //commit这次append
    }
  }
复制代码

 

可以看到这里,就是将appendFutures中的future拿出来complete掉,表示这次append已经完成

复制代码
/**
   * Completes append entries attempts up to the given index.
   */
  private void completeCommits(long previousCommitIndex, long commitIndex) {
    for (long i = previousCommitIndex + 1; i <= commitIndex; i++) {
      CompletableFuture<Long> future = appendFutures.remove(i);
      if (future != null) {
        future.complete(i);
      }
    }
  }
复制代码

 

这里发送出AppendRequest,大家是如果处理的?

public void connectServer(Connection connection) {
  connection.handler(AppendRequest.class, request -> state.append(request));

 

可以看到append也只能在server间request,client是不能直接append的

 

可以看到append,在每个state中都有实现,上面的append只会发到所有的active memeber

ActiveState 
复制代码
@Override
  public CompletableFuture<AppendResponse> append(final AppendRequest request) {
    context.checkThread();
    logRequest(request);

    // If the request indicates a term that is greater than the current term then
    // assign that term and leader to the current context and transition to follower.
    boolean transition = updateTermAndLeader(request.term(), request.leader());

    CompletableFuture<AppendResponse> future = CompletableFuture.completedFuture(logResponse(handleAppend(request)));

    // If a transition is required then transition back to the follower state.
    // If the node is already a follower then the transition will be ignored.
    if (transition) {
      context.transition(CopycatServer.State.FOLLOWER);
    }
    return future;
  }
复制代码

最终会调用到,

复制代码
@Override
  protected AppendResponse appendEntries(AppendRequest request) {
    // Get the last entry index or default to the request log index.
    long lastEntryIndex = request.logIndex();
    if (!request.entries().isEmpty()) {
      lastEntryIndex = request.entries().get(request.entries().size() - 1).getIndex();
    }

    // Ensure the commitIndex is not increased beyond the index of the last entry in the request.
    long commitIndex = Math.max(context.getCommitIndex(), Math.min(request.commitIndex(), lastEntryIndex));

    // Iterate through request entries and append them to the log.
    for (Entry entry : request.entries()) {
      // If the entry index is greater than the last log index, skip missing entries.
      if (context.getLog().lastIndex() < entry.getIndex()) {
        context.getLog().skip(entry.getIndex() - context.getLog().lastIndex() - 1).append(entry);
        LOGGER.debug("{} - Appended {} to log at index {}", context.getCluster().member().address(), entry, entry.getIndex());
      } else if (context.getCommitIndex() >= entry.getIndex()) {
        continue;
      } else {
        // Compare the term of the received entry with the matching entry in the log.
        long term = context.getLog().term(entry.getIndex());
        if (term != 0) {
          if (entry.getTerm() != term) {
            // We found an invalid entry in the log. Remove the invalid entry and append the new entry.
            // If appending to the log fails, apply commits and reply false to the append request.
            LOGGER.debug("{} - Appended entry term does not match local log, removing incorrect entries", context.getCluster().member().address());
            context.getLog().truncate(entry.getIndex() - 1).append(entry);
            LOGGER.debug("{} - Appended {} to log at index {}", context.getCluster().member().address(), entry, entry.getIndex());
          }
        } else {
          context.getLog().truncate(entry.getIndex() - 1).append(entry);
          LOGGER.debug("{} - Appended {} to log at index {}", context.getCluster().member().address(), entry, entry.getIndex());
        }
      }
    }

    // If we've made it this far, apply commits and send a successful response.
    LOGGER.debug("{} - Committed entries up to index {}", context.getCluster().member().address(), commitIndex);
    context.setCommitIndex(commitIndex);
    context.setGlobalIndex(request.globalIndex());

    // Apply commits to the local state machine.
    context.getStateMachine().applyAll(context.getCommitIndex()); //apply commit

    return AppendResponse.builder()
      .withStatus(Response.Status.OK)
      .withTerm(context.getTerm())
      .withSucceeded(true)
      .withLogIndex(context.getLog().lastIndex())
      .build();
  }
复制代码

可以看到逻辑,就是把entry加到log中,然后apply这个commit

 

FollowerState
复制代码
@Override
  public CompletableFuture<AppendResponse> append(AppendRequest request) {
    CompletableFuture<AppendResponse> future = super.append(request);

    // Reset the heartbeat timeout.
    resetHeartbeatTimeout();

    // Send AppendEntries requests to passive members if necessary.
    appender.appendEntries(); //同步到passive
    return future;
  }
复制代码

只是多了对passive的同步

 

心跳

一种特殊的append,

heartbeat,append空的entries

LeaderAppender
final class LeaderAppender extends AbstractAppender {
  private CompletableFuture<Long> heartbeatFuture;
  private CompletableFuture<Long> nextHeartbeatFuture;
 
复制代码
/**
   * Triggers a heartbeat to a majority of the cluster.
   * <p>
   * For followers to which no AppendRequest is currently being sent, a new empty AppendRequest will be
   * created and sent. For followers to which an AppendRequest is already being sent, the appendEntries()
   * call will piggyback on the *next* AppendRequest. Thus, multiple calls to this method will only ever
   * result in a single AppendRequest to each follower at any given time, and the returned future will be
   * shared by all concurrent calls.
   *
   * @return A completable future to be completed the next time a heartbeat is received by a majority of the cluster.
   */
  public CompletableFuture<Long> appendEntries() {
    // If there are no other active members in the cluster, simply complete the append operation.
    if (context.getClusterState().getRemoteMemberStates().isEmpty())
      return CompletableFuture.completedFuture(null);

    // If no heartbeat future already exists, that indicates there's no heartbeat currently under way.
    // Create a new heartbeat future and commit to all members in the cluster.
    if (heartbeatFuture == null) {
      CompletableFuture<Long> newHeartbeatFuture = new CompletableFuture<>();
      heartbeatFuture = newHeartbeatFuture;
      heartbeatTime = System.currentTimeMillis();
      for (MemberState member : context.getClusterState().getRemoteMemberStates()) {
        appendEntries(member);
      }
      return newHeartbeatFuture;
    }
    // If a heartbeat future already exists, that indicates there is a heartbeat currently underway.
    // We don't want to allow callers to be completed by a heartbeat that may already almost be done.
    // So, we create the next heartbeat future if necessary and return that. Once the current heartbeat
    // completes the next future will be used to do another heartbeat. This ensures that only one
    // heartbeat can be outstanding at any given point in time.
    else if (nextHeartbeatFuture == null) {
      nextHeartbeatFuture = new CompletableFuture<>();
      return nextHeartbeatFuture;
    } else {
      return nextHeartbeatFuture;
    }
  }
复制代码

注释很清楚,heartbeat是发往getRemoteMemberStates,所有members的

heartbeatFuture 和 nextHeartbeatFuture分别表示本次和下次hb的future

 

LeaderState

心跳是被定期触发的,

startAppendTimer

复制代码
/**
   * Starts sending AppendEntries requests to all cluster members.
   */
  private void startAppendTimer() {
    // Set a timer that will be used to periodically synchronize with other nodes
    // in the cluster. This timer acts as a heartbeat to ensure this node remains
    // the leader.
    LOGGER.debug("{} - Starting append timer", context.getCluster().member().address());
    appendTimer = context.getThreadContext().schedule(Duration.ZERO, context.getHeartbeatInterval(), this::appendMembers);
  }
复制代码
向所有的member定期发送心跳

复制代码
/**
   * Sends AppendEntries requests to members of the cluster that haven't heard from the leader in a while.
   */
  private void appendMembers() {
    context.checkThread();
    if (isOpen()) {
      appender.appendEntries();
    }
  }
复制代码

 

heartbeatFuture 和 nextHeartbeatFuture是什么时候被complete的?

复制代码
@Override
  protected void handleConfigureResponse(MemberState member, ConfigureRequest request, ConfigureResponse response) {
    // Trigger commit futures if necessary.
    updateHeartbeatTime(member, null);

    super.handleConfigureResponse(member, request, response);
  }
复制代码

 

updateHeartbeatTime

复制代码
/**
   * Sets a commit time or fails the commit if a quorum of successful responses cannot be achieved.
   */
  private void updateHeartbeatTime(MemberState member, Throwable error) {
    if (heartbeatFuture == null) {
      return;
    }

    if (error != null && member.getHeartbeatStartTime() == heartbeatTime) { //如果hb response有error
      int votingMemberSize = context.getClusterState().getActiveMemberStates().size() + (context.getCluster().member().type() == Member.Type.ACTIVE ? 1 : 0);
      int quorumSize = (int) Math.floor(votingMemberSize / 2) + 1;
      // If a quorum of successful responses cannot be achieved, fail this heartbeat. Ensure that only
      // ACTIVE members are considered. A member could have been transitioned to another state while the
      // heartbeat was being sent.
      if (member.getMember().type() == Member.Type.ACTIVE && ++heartbeatFailures > votingMemberSize - quorumSize) { //如果超过votingMemberSize - quorumSize的member hb失败
        heartbeatFuture.completeExceptionally(new InternalException("Failed to reach consensus")); //此次hb失败
        completeHeartbeat();
      }
    } else {
      member.setHeartbeatTime(System.currentTimeMillis()); //更新该member的hb

      // Sort the list of commit times. Use the quorum index to get the last time the majority of the cluster
      // was contacted. If the current heartbeatFuture's time is less than the commit time then trigger the
      // commit future and reset it to the next commit future.
      if (heartbeatTime <= heartbeatTime()) { //如果大于quorum member的hb都比上次hb的时间新
        heartbeatFuture.complete(null); //hb成功,complete heartbeatFuture
        completeHeartbeat();
      }
    }
  }

  /**
   * Returns the last time a majority of the cluster was contacted.
   * <p>
   * This is calculated by sorting the list of active members and getting the last time the majority of
   * the cluster was contacted based on the index of a majority of the members. So, in a list of 3 ACTIVE
   * members, index 1 (the second member) will be used to determine the commit time in a sorted members list.
   */
  private long heartbeatTime() {
    int quorumIndex = quorumIndex();
    if (quorumIndex >= 0) {
      return context.getClusterState().getActiveMemberStates((m1, m2)-> Long.compare(m2.getHeartbeatTime(), m1.getHeartbeatTime())).get(quorumIndex).getHeartbeatTime();
    }
    return System.currentTimeMillis();
  }
复制代码

 

可以看到这里,会把heartbeatFuture complete掉,无论是completeExceptionally还是complete

 

completeHeartbeat

复制代码
/**
   * Completes a heartbeat by replacing the current heartbeatFuture with the nextHeartbeatFuture, updating the
   * current heartbeatTime, and starting a new {@link AppendRequest} to all active members.
   */
  private void completeHeartbeat() {
    heartbeatFailures = 0;
    heartbeatFuture = nextHeartbeatFuture; //把nextHeartbeatFuture给heartbeatFuture
    nextHeartbeatFuture = null;
    updateGlobalIndex();
    if (heartbeatFuture != null) {
      heartbeatTime = System.currentTimeMillis(); //更新heartbeatTime
      for (MemberState member : context.getClusterState().getRemoteMemberStates()) {
        appendEntries(member); //再来一轮
      }
    }
  }
复制代码

 

前面定期触发时,并不会关系heartbeat是否成功,所以没有处理返回的future

 

但在appendLinearizableQuery时,需要hb成功,才能query

原因是,如果没有majority,相应我的hb,可能已经发生partition,这时已经无法保证LinearizableQuery

LeaderState

复制代码
private void appendLinearizableQuery(QueryEntry entry, ServerSessionContext session, CompletableFuture<QueryResponse> future) {
    appender.appendEntries().whenComplete((commitIndex, commitError) -> {
      context.checkThread();
      if (isOpen()) {
        if (commitError == null) {
          entry.acquire();
          sequenceLinearizableQuery(entry, future);
        } else {
          future.complete(logResponse(QueryResponse.builder()
            .withStatus(Response.Status.ERROR)
            .withError(CopycatError.Type.QUERY_ERROR)
            .build()));
        }
      }
      entry.release();
    });
  }
复制代码
相关文章
|
3天前
|
SQL 关系型数据库 分布式数据库
Doodle Jump — 使用Flutter&Flame开发游戏真不错!
用Flutter&Flame开发游戏是一种什么体验?最近网上冲浪的时候,我偶然发现了一个国外的游戏网站,类似于国内的4399。在浏览时,我遇到了一款经典的小游戏:Doodle Jump...
|
10天前
|
弹性计算 运维 安全
访问控制(RAM)|云上程序使用临时凭证的最佳实践
STS临时访问凭证是阿里云提供的一种临时访问权限管理服务,通过STS获取可以自定义时效和访问权限的临时身份凭证,减少长期访问密钥(AccessKey)泄露的风险。本文将为您介绍产品原理,以及具体的使用步骤。
151031 3
|
9天前
|
数据采集 存储 运维
提升团队工程交付能力,从“看见”工程活动和研发模式开始
本文从统一工程交付的概念模型开始,介绍了如何将应用交付的模式显式地定义出来,并通过工具平台落地。
119967 48
|
9天前
|
监控 负载均衡 Java
深入探究Java微服务架构:Spring Cloud概论
**摘要:** 本文深入探讨了Java微服务架构中的Spring Cloud,解释了微服务架构如何解决传统单体架构的局限性,如松耦合、独立部署、可伸缩性和容错性。Spring Cloud作为一个基于Spring Boot的开源框架,提供了服务注册与发现、负载均衡、断路器、配置中心、API网关等组件,简化了微服务的开发、部署和管理。文章详细介绍了Spring Cloud的核心模块,如Eureka、Ribbon、Hystrix、Config、Zuul和Sleuth,并通过一个电商微服务系统的实战案例展示了如何使用Spring Cloud构建微服务应用。
103493 8
|
10天前
|
人工智能 Serverless 对象存储
让你的文档从静态展示到一键部署可操作验证
通过函数计算的能力让阿里云的文档从静态展示升级为动态可操作验证,用户在文档中单击一键部署可快速完成代码的部署及测试。这一改变已在函数计算的活动沙龙中得到用户的认可。
120388 194
|
10天前
|
SQL 存储 数据可视化
Ganos H3地理网格能力解析与最佳实践
本文介绍了Ganos H3的相关功能,帮助读者快速了解Ganos地理网格的重要特性与应用实践。H3是Uber研发的一种覆盖全球表面的二维地理网格,采用了一种全球统一的、多层次的六边形网格体系来表示地球表面,这种地理网格技术在诸多业务场景中得到广泛应用。Ganos不仅提供了H3网格的全套功能,还支持与其它Ganos时空数据类型进行跨模联合分析,极大程度提升了客户对于时空数据的挖掘分析能力。
|
10天前
|
存储 缓存 安全
深度解析JVM世界:JVM内存结构
深度解析JVM世界:JVM内存结构
|
17天前
|
人工智能 编解码 对象存储
一键生成视频!用 PAI-EAS 部署 AI 视频生成模型 SVD 工作流
本教程将带领大家免费领取阿里云PAI-EAS的免费试用资源,并且带领大家在 ComfyUI 环境下使用 SVD的模型,根据任何图片生成一个小短视频。
|
15天前
|
数据采集 运维 监控
DataphinV4.0来啦:自定义全局角色 ,实时研发覆盖全部署场景,个性化企业配置看本期
本次V4.0版本升级,Dataphin支持自定义全局角色、自定义逻辑表命名规范、Flink on K8s的部署模式,提升企业级适配能力,灵活匹配企业特色;将集成任务快速从组件模式切换为脚本模式、支持外部触发类型节点等,提升研发平台易用性,助力高效开发便捷运维。
90970 1