ceph monitor paxos的实现(一)

简介: ceph monitor的一个主要功能是使用paxos分布式式协议维护一个key/value数据库的一致性。所使用的数据库引擎一般是leveldb。 数据库接口抽象 ----------- 为了适应不同的数据库引擎, ceph定义一个MonitorDBStore类来抽象对k/v数据库的操作。对后端数据库要求是支持事务或者原子性的key/value批量更新。它定义个一 Transa

ceph monitor的一个主要功能是使用paxos分布式式协议维护一个key/value数据库的一致性。所使用的数据库引擎一般是leveldb。

数据库接口抽象

为了适应不同的数据库引擎, ceph定义一个MonitorDBStore类来抽象对k/v数据库的操作。对后端数据库要求是支持事务或者原子性的key/value批量更新。它定义个一 Transaction类来说明一个事务包含的所有操作,并且这个类是可以序列化和反序列化的,以便在服务器之间传送:

struct Op {
    uint8_t type;
    string prefix;
    string key, endkey;
    bufferlist bl;
}

struct Transaction {
    list<Op> ops;
    uint64_t bytes, keys;
                                                                              
    Transaction() : bytes(0), keys(0) {}
                                                                                
    enum { 
      OP_PUT    = 1,
      OP_ERASE  = 2,
      OP_COMPACT = 3,
    };

例如它定义个put、erase的成员函数操作:


// 设置一个key的value
void put(string prefix, string key, bufferlist& bl) {
      ops.push_back(Op(OP_PUT, prefix, key, bl));
      ++keys;
      bytes += prefix.length() + key.length() + bl.length();
}
   
// 删除一个key
void erase(string prefix, string key) {
      ops.push_back(Op(OP_ERASE, prefix, key));
      ++keys;
      bytes += prefix.length() + key.length();
}
    

而序列化、反序列化函数:

void encode(bufferlist& bl) const { 
      ENCODE_START(2, 1,bl);
      ::encode(ops,bl);
      ::encode(bytes, bl);
      ::encode(keys, bl);
      ENCODE_FINISH(bl);
}
                                                                                
void decode(bufferlist::iterator& bl) {
      DECODE_START(2, bl);
      ::decode(ops, bl);
      if (struct_v >= 2) {
        ::decode(bytes,bl);
        ::decode(keys, bl);
      }
      DECODE_FINISH(bl);
}
    

ceph 主要用这个MonitorDBStore来为各个需要使用paxos的模块提供存储,为了各个模块不相互干扰,每个模块会选择一个前缀, 所有属于这个模块的数据都使用这个prefix再加上 一个key,才构成后端数据库真正的key, 具体结构时这样的:

prefix + '\0' + key

MonitorDBStore的API 主要是

int apply_transaction(MonitorDBStore::TransactionRef t)

负责把Transaction的每一条操作以原子方式在后端数据库执行,是一个同步操作,而

queue_transaction(MonitorDBStore::TransactionRef t,Context *oncommit)

是一个异步操作,事务完成后会回调一个从Context导出的类对象,类似于C语言中的回调函数。

除此以外,MonitorDBStore还有get操作

int get(const string& prefix, const string& key, bufferlist& bl);
int get(const string& prefix, const version_t ver, bufferlist& bl);

定义迭代器用来批量获取数据,它可以指定几个prefix, 并批量把数据追加到一个Transaction里面,以便在服务器见批量传数据, 可以预见加进去的数据操作是put操作

class WholeStoreIteratorImpl : public StoreIteratorImpl {
    KeyValueDB::WholeSpaceIterator iter;
    set<string> sync_prefixes;
public:
    WholeStoreIteratorImpl(KeyValueDB::WholeSpaceIterator(
    set<string> &prefixes) : StoreIteratorImpl(), iter(iter),
   sync_prefixes(prefixes)
    { }
    bool add_chunk_entry(TransactionRef tx
                         string &prefix,
                         string &key,
                         bufferlist &value,
                         uint64_t max); 
}
    

paxos数据在MonitorDB上的存放格式

ceph内部使用了log来记录最近一段时间的操作,log存放在leveldb中,key的前缀‘paxos’被paxos核心模块保留。每一条log一个key, key的组成是paxos前缀+
index, index是用整数来表示的,顺序增加。为了加快log的查询, 还用"first_committed" "last_committed", 两个key来表示这段log, 前者是第一条log,后者是最后一条log。

monitor启动时的数据同步

每次monitor server启动时都会按照monmap中的服务器地址去连接其他monitor服务器,并同步数据。这个过程叫做bootstrap(). bootstrap的第一个目的是补全数据,从其他服务拉缺失的paxos log或者全量复制数据库,其次是在必要时形成多数派建立一个paxos集群或者加入到已有的多数派中。

启动时将自己加入到一个外部法人集合,因为刚开始自己肯定不是在多数派中:

// i'm outside the quorum
if (monmap->contains(name))
    outside_quorum.insert(name);

然后给其它所有它知道的服务器发送探测包:

  // probe monitors
  dout(10) << "probing other monitors" << dendl;
  for (unsigned i = 0; i < monmap->size(); i++) {
    if ((int)i != rank)
      messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined),
                  monmap->get_inst(i));
  }
  for (set<entity_addr_t>::iterator p = extra_probe_peers.begin();
       p != extra_probe_peers.end();
       ++p) {
    if (*p != messenger->get_myaddr()) {
      entity_inst_t i;
      i.name = entity_name_t::MON(-1);
      i.addr = *p;
      messenger->send_message(new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined), i);
    }
  }

任何一个服务器收到探测包都会比较自己的最后一次修改数据的版本是否落后于正在探测的服务器的数据:

 if (!is_probing() && !is_synchronizing()) {
    // If the probing mon is way ahead of us, we need to re-bootstrap.
    // Normally we capture this case when we initially bootstrap, but
    // it is possible we pass those checks (we overlap with
    // quorum-to-be) but fail to join a quorum before it moves past
    // us.  We need to be kicked back to bootstrap so we can
    // synchonize, not keep calling elections.
    if (paxos->get_version() + 1 < m->paxos_first_version) {
      dout(1) << " peer " << m->get_source_addr() << " has first_committed " << "ahead of us, re-bootstrapping" << dendl;
      bootstrap();
      goto out;

    }
  }

对于被探测的服务器,如果最后一条log的index number都跟不上对方的第一条记录的index number,意味着已经落后太多了,中间log记录已经缺失,不可能让paxos核心部分通过log来传播数据到本进程以获得数据的最终版本,本进程需要重启bootstrap从对方主动拉数据。此时不会带对方的探测包返回应答。正常情况,我们会报告本服务器的paxos状态:

r = new MMonProbe(monmap->fsid, MMonProbe::OP_REPLY, name, has_ever_joined);
  r->name = name;
  r->quorum = quorum;
  monmap->encode(r->monmap_bl, m->get_connection()->get_features());
  r->paxos_first_version = paxos->get_first_committed();
  r->paxos_last_version = paxos->get_version();
  m->get_connection()->send_message(r);

  // did we discover a peer here?
  if (!monmap->contains(m->get_source_addr())) {
    dout(1) << " adding peer " << m->get_source_addr()
        << " to list of hints" << dendl;
    extra_probe_peers.insert(m->get_source_addr());
  }

主要内容包括我们是否是多数派的一员(通过返回多数派成员列表),以及我的paxos log的第一条记录号和最后一条记录号。

一旦一个发出探测包的服务器收到一个应答也会检查paxos log是否过时:

if (paxos->get_version() < m->paxos_first_version &&
    m->paxos_first_version > 1) {  // no need to sync if we're 0 and they start at 1.
      dout(10) << " peer paxos versions [" << m->paxos_first_version
           << "," << m->paxos_last_version << "]"
           << " vs my version " << paxos->get_version()
           << " (too far ahead)"
           << dendl;
      cancel_probe_timeout();
      sync_start(other, true);
      m->put();
      return;
    }
    if (paxos->get_version() + g_conf->paxos_max_join_drift < m->paxos_last_version) {
      dout(10) << " peer paxos version " << m->paxos_last_version
           << " vs my version " << paxos->get_version()
           << " (too far ahead)"
           << dendl;
      cancel_probe_timeout();
      sync_start(other, false);
      m->put();
      return;
    }

一种情况是我的最后一条log记录和对方的第一条log记录之间有空隙,中间有缺失,只能主动从对方拉数据,道理与上面相同。还有一种是根据配置变量paxos_max_join_drift,数据并没有缺失,但是要传的log超过一个阀值,不如全量从对方复制数据。

输入探测方发现不需要在这个阶段复制数据,并且对方就是多数派的一员,那么可以肯定它的数据是和其他服务器同步的,至少应该乐观的认为,:-) ,所以直接加入到多数派去:

if (m->quorum.size()) { // 多数派列表非空
   if (monmap->contains(name) &&
        !monmap->get_addr(name).is_blank_ip()) {
      // i'm part of the cluster; just initiate a new election
      // 我的地址他们都知道了, 通过start_election选举后可以加入多数派
      start_election();
   } else {
    // 需要通知leader把我的地址修改了,然后会probe time会超时后重启bootstrap
    dout(10) << " ready to join, but i'm not in the monmap or my addr is blank, trying to join" << dendl;
      messenger->send_message(new MMonJoin(monmap->fsid, name, messenger->get_myaddr()),
                              monmap->get_inst(*m->quorum.begin()));
   }
}
else {
    //如果对方也不是当前多数派的一员,并且是属于monmap的一员,那么把它列入到在多数派外面的人
    if (monmap->contains(m->name)) {
      dout(10) << " mon." << m->name << " is outside the quorum" << dendl;
      outside_quorum.insert(m->name);
    } else {
      dout(10) << " mostly ignoring mon." << m->name << ", not part of monmap" << dendl;
      m->put();
      return;
    }

    //一旦发现不在多数派的人数超过2F + 1 (包括自己), 说明集群不存在多数派,就可以通过选举来形成多数派
    unsigned need = monmap->size() / 2 + 1;
    dout(10) << " outside_quorum now " << outside_quorum << ", need " << need << dendl;
    if (outside_quorum.size() >= need) {
      if (outside_quorum.count(name)) {
        dout(10) << " that's enough to form a new quorum, calling election" << dendl;
        start_election();
      } else {
        dout(10) << " that's enough to form a new quorum, but it does not include me; waiting" << dendl;
      }
    } else {
      dout(10) << " that's not yet enough for a new quorum, waiting" << dendl;
    }
  }

本章总结

ceph monitor通过bootstrap过程,探测服务器列表中的各个服务器,比对log的最小记录号和最大记录号,直到本机数据的log历史(第一条记录和最后一条记录)都与所有其他服务器有交集,说明本机没有漏掉数据,从而进入多数派的形成过程,为paxos核心部分只通过传播log就可以同步数据创造条件。在boostrap阶段,服务器分析是否存在一个多数派,必要是通过进入竞选形成多数派。在这个阶段的全量同步和部分数据传输,没有介绍,因为相对简单,可以通过阅读ceph源码获得。
本章并未涉及ceph paxos设计最核心部分,有时间再介绍。

目录
相关文章
|
分布式计算 资源调度 Spark
Spark Master资源调度--SparkContext向所有master注册
Spark Master资源调度–SparkContext向所有master注册 更多资源 github: https://github.
745 0
|
JavaScript 前端开发
|
Java 分布式数据库 Apache
Hbase集群master.HMasterCommandLine: Master exiting
2016-12-15 17:01:57,473 INFO [main] impl.MetricsSystemImpl: HBase metrics system started 2016-12-15 17:01:59,649 ERROR [main] master.
2542 0
|
JSON 监控 关系型数据库