Copycat - Overview


Copycat’s primary role is as a framework for building highly consistent, fault-tolerant replicated state machines.

Copycat servers receive state machine operations from clients, log and replicate the operations as necessary, and apply them to a state machine on each server.

State machine operations are guaranteed to be applied in the same order on all servers, and Copycat handles the persistence and replication of the state machine state internally.

 

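Copycat manages a distributed state machine: every operation must be executed in the same order on each server, so that all servers arrive at a consistent state machine state.

For fault tolerance, so that a crashed state machine can be recovered, each operation is first written to a log, and the logs on all servers are kept consistent; replaying the log is then enough to reach a consistent state.

This replication technique is called operation transfer.

The other approach is state transfer.

For background, see: Data replication synchronization techniques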

 

See also the Kudu paper:

Kudu does not replicate the on-disk storage of a tablet, but rather just its operation log. 
The physical storage of each replica of a tablet is fully decoupled.

With this approach, the server's state machine (or, in Kudu's terms, the tablet storage) is unaware of the distribution: it is fully decoupled.
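
The idea of operation transfer can be sketched in plain Java. This is a minimal illustration and not Copycat code: the class OperationTransferSketch and its methods are hypothetical names, and a list of lambdas stands in for the replicated log. Each replica builds its own private state by replaying the same ordered log.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch: replicas share only an ordered operation log, never their storage.
class OperationTransferSketch {

  // Replays the log against a fresh, private state, as a recovering replica would.
  static Map<String, Integer> replay(List<Consumer<Map<String, Integer>>> log) {
    Map<String, Integer> state = new HashMap<>();
    for (Consumer<Map<String, Integer>> operation : log) {
      operation.accept(state);
    }
    return state;
  }

  // A small log: set x to 1, then add 10 to x.
  static List<Consumer<Map<String, Integer>>> sampleLog() {
    List<Consumer<Map<String, Integer>>> log = new ArrayList<>();
    log.add(state -> state.put("x", 1));
    log.add(state -> state.merge("x", 10, Integer::sum));
    return log;
  }

  // Returns a copy of the log in reverse order, to show that ordering matters.
  static List<Consumer<Map<String, Integer>>> reversed(List<Consumer<Map<String, Integer>>> log) {
    List<Consumer<Map<String, Integer>>> copy = new ArrayList<>(log);
    Collections.reverse(copy);
    return copy;
  }
}
```

Two replicas that replay sampleLog() in the same order end with x = 11, while replaying the reversed log ends with x = 1, which is why the log order has to be identical on every server.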

 

To use Copycat,

the user first creates a state machine class, which is the object to be replicated:

public class MapStateMachine extends StateMachine {
}

 

 

Copycat replicated state machines are modified and queried by defining operations through which a client and state machine can communicate.

Operations are replicated by the Copycat cluster and are translated into arguments to methods on the replicated state machine. 
Users must define the interface between the client and the cluster by implementing Operation classes that clients will submit to the replicated state machine.

Next, the user defines the operations on this StateMachine.

Operations fall into two categories:

Command: can modify the state machine

Query: read-only

 

Command

For example, in a map state machine some commands might include put and remove. To implement a state machine command, simply implement the Command interface.

public class PutCommand implements Command<Object> {
  private final Object key;
  private final Object value;

  public PutCommand(Object key, Object value) {
    this.key = key;
    this.value = value;
  }

  public Object key() {
    return key;
  }

  public Object value() {
    return value;
  }
}

The code above defines a put command, which puts a key:value pair into the state machine.

 

Query

Queries are state machine operations that read the system’s state but do not modify it. For example, in a map state machine some queries might include get, size, and isEmpty. To implement a state machine query, implement the Query interface.

public class GetQuery implements Query<Object> {
  private final Object key;

  public GetQuery(Object key) {
    this.key = key;
  }

  public Object key() {
    return key;
  }
}

 

Next, these operations have to be implemented on the state machine:

Implementing State Machine Operations

State machine operations are implemented as public methods on the state machine class which accept a single Commit parameter, where the generic argument for the commit is the operation accepted by the method. Copycat automatically detects the command or query that applies to a given state machine method based on the generic argument to the Commit parameter.

public class MapStateMachine extends StateMachine {
  private Map<Object, Object> map = new HashMap<>();

  public Object put(Commit<PutCommand> commit) {
    try {
      // Return the previous value for the key; the method is declared to return Object.
      return map.put(commit.operation().key(), commit.operation().value());
    } finally {
      commit.close();
    }
  }

  public Object get(Commit<GetQuery> commit) {
    try {
      return map.get(commit.operation().key());
    } finally {
      commit.close();
    }
  }
}

A Commit can be thought of as a wrapper around the command.
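
How a framework could detect handler methods from the generic argument of a Commit-style parameter can be sketched with standard Java reflection. This is an assumption about the general technique, not Copycat's actual implementation: SketchCommit, SketchPut, SketchGet, SketchMapStateMachine and SketchDispatcher are all hypothetical stand-ins defined here.

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;

// Hypothetical wrapper around a submitted operation (stand-in, not Copycat's Commit).
class SketchCommit<T> {
  private final T operation;
  SketchCommit(T operation) { this.operation = operation; }
  T operation() { return operation; }
}

class SketchPut {
  final Object key;
  final Object value;
  SketchPut(Object key, Object value) { this.key = key; this.value = value; }
}

class SketchGet {
  final Object key;
  SketchGet(Object key) { this.key = key; }
}

class SketchMapStateMachine {
  private final Map<Object, Object> map = new HashMap<>();

  public Object put(SketchCommit<SketchPut> commit) {
    return map.put(commit.operation().key, commit.operation().value);
  }

  public Object get(SketchCommit<SketchGet> commit) {
    return map.get(commit.operation().key);
  }
}

// Maps each operation class to the public method whose parameter is SketchCommit<thatClass>.
class SketchDispatcher {
  private final Object stateMachine;
  private final Map<Class<?>, Method> handlers = new HashMap<>();

  SketchDispatcher(Object stateMachine) {
    this.stateMachine = stateMachine;
    for (Method method : stateMachine.getClass().getMethods()) {
      Type[] params = method.getGenericParameterTypes();
      if (params.length != 1 || !(params[0] instanceof ParameterizedType)) {
        continue; // not a single generic parameter, so not a handler
      }
      ParameterizedType type = (ParameterizedType) params[0];
      Type argument = type.getActualTypeArguments()[0];
      if (type.getRawType() == SketchCommit.class && argument instanceof Class) {
        handlers.put((Class<?>) argument, method);
      }
    }
  }

  // Wraps the operation in a commit and invokes the matching handler.
  Object apply(Object operation) {
    Method handler = handlers.get(operation.getClass());
    if (handler == null) {
      throw new IllegalArgumentException("No handler for " + operation.getClass());
    }
    try {
      return handler.invoke(stateMachine, new SketchCommit<>(operation));
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

With this sketch, applying a SketchPut followed by a SketchGet for the same key routes each operation to put and get respectively, without any explicit registration of the methods.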

Implementing the snapshot logic:

State machine operations are replicated and written to a log on disk on each server in the cluster.

As commands are submitted to the cluster over time, the disk capacity will eventually be consumed. 
Copycat must periodically remove unneeded commands from the replicated log to conserve disk space. This is known as log compaction.

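As the log keeps growing, old log entries have to be deleted. To make sure no data is lost, the current state of the state machine is first stored as a snapshot; the log entries that precede that state can then be deleted.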

public class MapStateMachine extends StateMachine implements Snapshottable {
  private Map<Object, Object> map = new HashMap<>();

  @Override
  public void snapshot(SnapshotWriter writer) {
    writer.writeObject(map);
  }

  @Override
  public void install(SnapshotReader reader) {
    map = reader.readObject();
  }
}

For snapshottable state machines, Copycat will periodically request a binary snapshot of the state machine’s state and write the snapshot to disk. If the server is restarted, the state machine’s state will be recovered from the on-disk snapshot. When a new server joins the cluster, the snapshot of the state machine will be replicated to the joining server to catch up its state. This allows Copycat to remove commits that contributed to the snapshot from the replicated log, thus conserving disk space.
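
The relationship between the snapshot and the log can be illustrated with a small self-contained sketch. It is a simplified in-memory model under stated assumptions, not Copycat's storage layer: CompactingLogSketch, its Entry format and its method names are hypothetical, and real snapshots are written to disk rather than kept in a field.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a log that is compacted by taking snapshots.
class CompactingLogSketch {

  // One logged put operation with its position in the log.
  static final class Entry {
    final long index;
    final String key;
    final String value;

    Entry(long index, String key, String value) {
      this.index = index;
      this.key = key;
      this.value = value;
    }
  }

  private final List<Entry> log = new ArrayList<>();
  private Map<String, String> snapshot = new HashMap<>();
  private long snapshotIndex = 0; // index of the last entry covered by the snapshot
  private long nextIndex = 1;

  void append(String key, String value) {
    log.add(new Entry(nextIndex++, key, value));
  }

  // Captures the state produced by every entry so far, then drops those entries.
  void compact() {
    snapshot = recover();
    snapshotIndex = nextIndex - 1;
    log.clear();
  }

  // Rebuilds the state as a restarted server would: snapshot first, then the remaining log.
  Map<String, String> recover() {
    Map<String, String> state = new HashMap<>(snapshot);
    for (Entry entry : log) {
      state.put(entry.key, entry.value);
    }
    return state;
  }

  int logSize() {
    return log.size();
  }

  long snapshotIndex() {
    return snapshotIndex;
  }
}
```

After compact(), the log is empty but recover() still returns the same state, because the entries that were removed are already reflected in the snapshot.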

 

Finally, create the cluster.

1. First, build a server

Once a state machine and its operations have been defined, we can create a CopycatServer to manage the state machine.

Address address = new Address("123.456.789.0", 5000);
CopycatServer.Builder builder = CopycatServer.builder(address);
builder.withStateMachine(MapStateMachine::new);

The server is started with the MapStateMachine defined above.

builder.withTransport(NettyTransport.builder()
  .withThreads(4)
  .build());

builder.withStorage(Storage.builder()
  .withDirectory(new File("logs"))
  .withStorageLevel(StorageLevel.DISK)
  .build());

CopycatServer server = builder.build();

The transport and the storage can both be customized.

Register the operations we defined:

One final task is necessary to complete the configuration of the server. We’ve created two state machine operations - PutCommand and GetQuery - which are Serializable. By default, Copycat’s serialization framework will serialize these operations using Java’s serialization. However, users can explicitly register serializable classes and implement custom binary serializers for more efficient serialization.

server.serializer().register(PutCommand.class);
server.serializer().register(GetQuery.class);

The serializer defaults to Java’s serialization; if performance matters, you can implement your own serialization.
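
To give a feel for why a hand-written binary format can be more compact than Java's default serialization, here is a small standard-library sketch. It does not use Copycat's serializer API: SketchPutCommand is a hypothetical class, and its fields are assumed to be Strings (the PutCommand above uses Object) so that they can be written with DataOutputStream.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical command with String fields, used only to compare encodings.
class SketchPutCommand implements Serializable {
  private static final long serialVersionUID = 1L;

  final String key;
  final String value;

  SketchPutCommand(String key, String value) {
    this.key = key;
    this.value = value;
  }

  // Default route: Java serialization, which also writes class metadata.
  byte[] toJavaSerialized() {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(this);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // Custom route: write only the two fields, each as a length-prefixed string.
  byte[] toCompactBytes() {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeUTF(key);
      out.writeUTF(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // Reads the fields back in the same order they were written.
  static SketchPutCommand fromCompactBytes(byte[] data) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      String key = in.readUTF();
      String value = in.readUTF();
      return new SketchPutCommand(key, value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

For a command with short fields, the compact encoding holds only the field bytes plus two length prefixes, whereas the Java-serialized form also carries the stream header and class description on every message.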

 

2. Bootstrap the cluster

Bootstrapping the Cluster

Once the server has been built, we can bootstrap a new cluster by calling the bootstrap() method:

CompletableFuture<CopycatServer> future = server.bootstrap();
future.join();

When a server is bootstrapped, it forms a new single node cluster to which additional servers can be joined.

 

3. Join an existing cluster

Joining an Existing Cluster

Once an initial cluster has been bootstrapped, additional servers can be added to the cluster via the join() method. When joining an existing cluster, the existing cluster configuration must be provided to the join method:

Collection<Address> cluster = Collections.singleton(new Address("127.0.0.1", 8700));
server.join(cluster).join();
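
In the snippets above, bootstrap() hands back a CompletableFuture and the trailing join() blocks until it completes; server.join(cluster).join() follows the same pattern. The sketch below shows the blocking and the non-blocking style using only the JDK. AsyncStartSketch and its methods are hypothetical stand-ins for an asynchronous server start, not part of Copycat.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for an asynchronous start such as bootstrap() or join(cluster).
class AsyncStartSketch {

  // Completes asynchronously with a status message.
  static CompletableFuture<String> startAsync(String name) {
    return CompletableFuture.supplyAsync(() -> name + " started");
  }

  // Blocking style: wait for completion, as the examples above do with join().
  static String startAndWait(String name) {
    return startAsync(name).join();
  }

  // Non-blocking style: chain follow-up work instead of blocking the caller.
  static CompletableFuture<Integer> startThenMeasure(String name) {
    return startAsync(name).thenApply(String::length);
  }
}
```

Blocking with join() is convenient in a main method or a test; chaining with thenApply or thenAccept is usually preferable when the calling thread should not be held up.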