HBase+Spark社区 + 关注
手机版

HBase Coprocessor的实现与应用

  1. 云栖社区>
  2. HBase+Spark社区>
  3. 博客>
  4. 正文

HBase Coprocessor的实现与应用

hbase小能手 发布时间:2018-11-16 17:02:13 浏览702 评论0

摘要: 本文来自于中国HBase技术社区武汉站HBase MeetUp线下交流会的烽火大数据平台研发负责人叶铿(云端浪子)。 HBase Coprocessor的实现与应用PPT下载:http://hbase.

本文来自于中国HBase技术社区武汉站HBase MeetUp线下交流会的烽火大数据平台研发负责人叶铿(云端浪子)。

HBase Coprocessor的实现与应用PPT下载:http://hbase.group/slides/159


ef883f0bd0cc5164a20c21ec7e5ec28d78723bee

本次分享的内容主要分为以下五点:

  1. Coprocessor简介

  2. Endpoint服务端实现

  3. Endpoint客户端实现

  4. Observer实现二级索引

  5. Coprocessor应用场景


1.Coprocessor简介

HBase协处理器的灵感来自于Jeff Dean 09年的演讲,根据该演讲实现类似于Bigtable的协处理器,包括以下特性:每个表服务器的任意子表都可以运行代码客户端的高层调用接口(客户端能够直接访问数据表的行地址,多行读写会自动分片成多个并行的RPC调用),提供一个非常灵活的、可用于建立分布式服务的数据模型,能够自动化扩展、负载均衡、应用请求路由。HBase的协处理器灵感来自Bigtable,但是实现细节不尽相同。HBase建立框架为用户提供类库和运行时环境,使得代码能够在HBase Region Server和Master上面进行处理。


a5e6efe56123420dc5710262c59dccd84c6fbb1b



(1)实现目的

  • HBase无法轻易建立“二级索引”;

  • 执行求和、计数、排序等操作比较困难,必须通过MapReduce/Spark实现,对于简单的统计或聚合计算时,可能会因为网络与IO开销大而带来性能问题。


(2)灵感来源

灵感来源于Bigtable的协处理器,包含如下特性:

  • 每个表服务器的任意子表都可以运行代码;

  • 客户端能够直接访问数据表的行,多行读写会自动分片成多个并行的RPC调用。


(3)提供接口

  • RegionObserver:提供客户端的数据操纵事件钩子:Get、Put、Delete、Scan等;

  • WALObserver:提供WAL相关操作钩子;

  • MasterObserver:提供DDL-类型的操作钩子。如创建、删除、修改数据表等;

  • Endpoint:终端是动态RPC插件的接口,它的实现代码被安装在服务器端,能够通过HBase RPC调用唤醒。


(4)应用范围

  • 通过使用RegionObserver接口可以实现二级索引的创建和维护;

  • 通过使用Endpoint接口,在对数据进行简单排序和sum,count等统计操作时,能够极大提高性能。

本文将通过具体实例来演示两种协处理器的开发方法的详细实现过程。


2.Endpoint服务端实现

在传统关系型数据库里面,可以随时的对某列进行求和sum,但是目前HBase目前所提供的接口,直接求和是比较困难的,所以先编写好服务端代码,并加载到对应的Table上,加载协处理器有几种方法,可以通过HTableDescriptor的addCoprocessor方法直接加载,同理也可以通过removeCoprocessor方法卸载协处理器。

Endpoint协处理器类似传统数据库的存储过程,客户端调用Endpoint协处理器执行一段Server端代码,并将Server端代码的结果返回给Client进一步处理,最常见的用法就是进行聚合操作。举个例子说明:如果没有协处理器,当用户需要找出一张表中的最大数据即max聚合操作,必须进行全表扫描,客户端代码遍历扫描结果并执行求max操作,这样的方法无法利用底层集群的并发能力,而将所有计算都集中到Client端统一执行, 效率非常低。但是使用Coprocessor,用户将求max的代码部署到HBase Server端,HBase将利用底层Cluster的多个节点并行执行求max的操作即在每个Region范围内执行求最大值逻辑,将每个Region的最大值在Region Server端计算出,仅仅将该max值返回给客户端。客户端进一步将多个Region的max进一步处理而找到其中的max,这样整体执行效率提高很多。但是一定要注意的是Coprocessor一定要写正确,否则导致RegionServer宕机。

91b197c99d399fc4d4269ca5ee856d04f91d4367


Protobuf定义

如前所述,客户端和服务端之间需要进行RPC通信,所以两者间需要确定接口,当前版本的HBase的协处理器是通过Google Protobuf协议来实现数据交换的,所以需要通过Protobuf来定义接口。

如下所示:

67f79f94a008a7f5c90f2ed6d0f80a22322527d3


可以看到这里定义7个聚合服务RPC,名字分别叫做GetMax、GetMin、GetSum等,本文通过GetSum进行举例,其他的聚合RPC也是类似的内部实现。RPC有一个入口参数,用消息AggregateRequest表示;RPC的返回值用消息AggregateResponse表示。Service是一个抽象概念,RPC的Server端可以看作一个用来提供服务的Service。在HBase Coprocessor中Service就是Server端需要提供的Endpoint Coprocessor服务,主要用来给HBase的Client提供服务。AggregateService.java是由Protobuf软件通过终端命令“protoc filename.proto--java_out=OUT_DIR”自动生成的,其作用是将.proto文件定义的消息结构以及服务转换成对应接口的RPC实现,其中包括如何构建request消息和response响应以及消息包含的内容的处理方式,并且将AggregateService包装成一个抽象类,具体的服务以类的方法的形式提供。AggregateService.java定义Client端与Server端通信的协议,代码中包含请求信息结构AggregateRequest、响应信息结构AggregateResponse、提供的服务种类AggregateService,其中AggregateRequest中的interpreter_class_name指的是column interpreter的类名,此类的作用在于将数据格式从存储类型解析成所需类型。AggregateService.java由于代码太长,在这里就不贴出来了。

下面我们来讲一下服务端的架构,

首先,Endpoint Coprocessor是一个Protobuf Service的实现,因此需要它必须继承某个ProtobufService。我们在前面已经通过proto文件定义Service,命名为AggregateService,因此Server端代码需要重载该类,其次作为HBase的协处理器,Endpoint 还必须实现HBase定义的协处理器协议,用Java的接口来定义。具体来说就是CoprocessorService和Coprocessor,这些HBase接口负责将协处理器和HBase 的RegionServer等实例联系起来以便协同工作。Coprocessor接口定义两个接口函数:start和stop。

加载Coprocessor之后Region打开的时候被RegionServer自动加载,并会调用器start 接口完成初始化工作。一般情况该接口函数仅仅需要将协处理器的运行上下文环境变量CoprocessorEnvironment保存到本地即可。

CoprocessorEnvironment保存协处理器的运行环境,每个协处理器都是在一个RegionServer进程内运行并隶属于某个Region。通过该变量获取Region的实例等 HBase运行时环境对象。

Coprocessor接口还定义stop()接口函数,该函数在Region被关闭时调用,用来进行协处理器的清理工作。本文里我们没有进行任何清理工作,因此该函数什么也不干。

我们的协处理器还需要实现CoprocessorService接口。该接口仅仅定义一个接口函数 getService()。我们仅需要将本实例返回即可。HBase的Region Server在接收到客户端的调用请求时,将调用该接口获取实现RPCService的实例,因此本函数一般情况下就是返回自身实例即可。

完成以上三个接口函数之后,Endpoint的框架代码就已完成。每个Endpoint协处理器都必须实现这些框架代码而且写法雷同。


a1a8f66fa598582fc7653b4188b0563e0d49b5e5


Server端的代码就是一个Protobuf RPC的Service实现,即通过Protobuf提供的某种服务。其开发内容主要包括:

  • 实现Coprocessor的基本框架代码

  • 实现服务的RPC具体代码


Endpoint 协处理的基本框架

Endpoint 是一个Server端Service的具体实现,其实现有一些框架代码,这些框架代码与具体的业务需求逻辑无关。仅仅是为了和HBase运行时环境协同工作而必须遵循和完成的一些粘合代码。因此多数情况下仅仅需要从一个例子程序拷贝过来并进行命名修改即可。不过我们还是完整地对这些粘合代码进行粗略的讲解以便更好地理解代码。


412b21600c67e58fefe0d56f94e292c1882dcf41


Endpoint协处理器真正的业务代码都在每一个RPC函数的具体实现中。

在本文中,我们的Endpoint协处理器仅提供一个RPC函数即getSUM。我将分别介绍编写该函数的几个主要工作:了解函数的定义,参数列表;处理入口参数;实现业务逻辑;设置返回参数。


bed4bc7c6e6393c3b7466e77d0afd06507d703fe

f06576600f94bb6fef0cc98027063e3b9287e45d


Endpoint类比于数据库的存储过程,其触发服务端的基于Region的同步运行再将各个结果在客户端搜集后归并计算。特点类似于传统的MapReduce框架,服务端Map客户端Reduce。


3.Endpoint客户端实现

HBase提供客户端Java包org.apache.hadoop.hbase.client.HTable,提供以下三种方法来调用协处理器提供的服务:

  • coprocessorService(byte[])

  • coprocessorService(Class, byte[], byte[],Batch.Call),

  • coprocessorService(Class, byte[], byte[],Batch.Call, Batch.Callback)


f347b544f946ce72931c6018985da7a77b10e892

该方法采用rowkey指定Region。这是因为HBase客户端很少会直接操作Region,一般不需要知道Region的名字;况且在HBase中Region名会随时改变,所以用rowkey来指定Region是最合理的方式。使用rowkey可以指定唯一的一个Region,如果给定的Rowkey并不存在,只要在某个Region的rowkey范围内依然用来指定该Region。比如Region 1处理[row1, row100]这个区间内的数据,则rowkey=row1就由Region 1来负责处理,换句话说我们可以用row1来指定Region 1,无论rowkey等于”row1”的记录是否存在。CoprocessorService方法返回类型为CoprocessorRpcChannel的对象,该 RPC通道连接到由rowkey指定的Region上面,通过此通道可以调用该Region上面部署的协处理器RPC。

ad408a7e1a48f75d00f8cbc8212472d2653efd36

有时候客户端需要调用多个 Region上的同一个协处理器,比如需要统计整个Table的sum,在这种情况下,需要所有的Region都参与进来,分别统计自身Region内部的sum并返回客户端,最终客户端将所有Region的返回结果汇总,就可以得到整张表的sum。

这意味着该客户端同时和多个Region进行批处理交互。一个可行的方法是,收集每个 Region的startkey,然后循环调用第一种coprocessorService方法:用每一个Region的startkey 作为入口参数,获得RPC通道创建stub对象,进而逐一调用每个Region上的协处理器RPC。这种做法需要写很多的代码,为此HBase提供两种更加简单的 coprocessorService方法来处理多个Region的协处理器调用。先来看第一种方法 coprocessorService(Class, byte[],byte[],Batch.Call),

该方法有 4 个入口参数。第一个参数是实现RPC的Service 类,即前文中的AggregateService类。通过它,HBase就可以找到相应的部署在Region上的协处理器,一个Region上可以部署多个协处理器,客户端必须通过指定Service类来区分究竟需要调用哪个协处理器提供的服务。

要调用哪些Region上的服务则由startkey和endkey来确定,通过 rowkey范围即可确定多个 Region。为此,coprocessorService方法的第二个和第三个参数分别是 startkey和endkey,凡是落在[startkey,endkey]区间内的Region都会参与本次调用。

第四个参数是接口类Batch.Call。它定义了如何调用协处理器,用户通过重载该接口的call()方法来实现客户端的逻辑。在call()方法内,可以调用RPC,并对返回值进行任意处理。即前文代码清单1中所做的事情。coprocessorService将负责对每个 Region调用这个call()方法。

coprocessorService方法的返回值是一个Map类型的集合。该集合的key是Region名字,value是Batch.Call.call方法的返回值。该集合可以看作是所有Region的协处理器 RPC 返回的结果集。客户端代码可以遍历该集合对所有的结果进行汇总处理。

这种coprocessorService方法的大体工作流程如下。首先它分析startkey和 endkey,找到该区间内的所有Region,假设存放在regionList中。然后,遍历regionList,为每一个Region调用Batch.Call,在该接口内,用户定义具体的RPC调用逻辑。最后coprocessorService将所有Batch.Call.call()的返回值加入结果集合并返回。


8db09529d1eb770ca420ff83ec89c6670a94d2f2


coprocessorService的第三种方法比第二个方法多了一个参数callback。coprocessorService第二个方法内部使用HBase自带的缺省callback,该缺省 callback将每个Region的返回结果都添加到一个Map类型的结果集中,并将该集合作为coprocessorService方法的返回值。

HBase 提供第三种coprocessorService方法允许用户定义callback行为,coprocessorService 会为每一个RPC返回结果调用该callback,用户可以在callback 中执行需要的逻辑,比如执行sum累加。用第二种方法的情况下,每个Region协处理器RPC的返回结果先放入一个列表,所有的 Region 都返回后,用户代码再从该列表中取出每一个结果进行累加;用第三种方法,直接在callback中进行累加,省掉了创建结果集合和遍历该集合的开销,效率会更高一些。

因此我们只需要额外定义一个callback即可,callback是一个Batch.Callback接口类,用户需要重载其update方法。


eb6eaaa95801368378cbd71d0e76e8f0a0e82935


4.Observer实现二级索引

Observer类似于传统数据库中的触发器,当发生某些事件的时候这类协处理器会被 Server 端调用。Observer Coprocessor是一些散布在HBase Server端代码的 hook钩子, 在固定的事件发生时被调用。比如:put操作之前有钩子函数prePut,该函数在pu 操作执 行前会被Region Server调用;在put操作之后则有postPut 钩子函数。


069fdb2b8ebeb12ca7eeade13339dae4bd5e0a21


RegionObserver工作原理

RegionObserver提供客户端的数据操纵事件钩子,Get、Put、Delete、Scan,使用此功能能够解决主表以及多个索引表之间数据一致性的问题


c33d3f2355d3fc3237be1a3895640234c44909ff


  1. 客户端发出put请求;

  2. 该请求被分派给合适的RegionServer和Region;

  3. coprocessorHost拦截该请求,然后在该表上登记的每个 RegionObserver 上调用prePut();

  4. 如果没有被preGet()拦截,该请求继续送到 region,然后进行处理;

  5. Region产生的结果再次被CoprocessorHost拦截,调用postGet();

  6. 假如没有postGet()拦截该响应,最终结果被返回给客户端;


ebac9203abe59529cbdd446e8af1b01428d329c0

如上图所示,HBase可以根据rowkey很快的检索到数据,但是如果根据column检索数据,首先要根据rowkey减小范围,再通过列过滤器去过滤出数据,如果使用二级索引,可以先查基于column的索引表,获取到rowkey后再快速的检索到数据。

c2f225fa9fc89a6cd97d8d5c5a188857d0346b92

如图所示首先继承BaseRegionObserver类,重写postPut,postDelete方法,在postPut方法体内中写Put索引表数据的代码,在postDelete方法里面写Delete索引表数据,这样可以保持数据的一致性。

在Scan表的时候首先判断是否先查索引表,如果不查索引直接scan主表,如果走索引表通过索引表获取主表的rowkey再去查主表。

使用Elastic Search建立二级索引也是一样。

我们在同一个主机集群上同时建立了HBase集群和Elastic Search集群,存储到HBase的数据必须实时地同步到Elastic Search。而恰好HBase和Elastic Search都没有更新的概念,我们的需求可以简化为两步:

当一个新的Put操作产生时,将Put数据转化为json,索引到ElasticSearch,并把RowKey作为新文档的ID;

当一个新的Delete操作产生时获取Delete数据的rowkey删除Elastic Search中对应的ID。



5.协处理的主要应用场景

Observer允许集群在正常的客户端操作过程中可以有不同的行为表现;

Endpoint允许扩展集群的能力,对客户端应用开放新的运算命令;

Observer类似于RDBMS的触发器,主要在服务端工作;

Endpoint类似于RDBMS的存储过程,主要在服务端工作;

Observer可以实现权限管理、优先级设置、监控、ddl控制、二级索引等功能;

Endpoint可以实现min、max、avg、sum、distinct、group by等功能。

例如HBase源码org.apache.hadoop.hbase.security.access.AccessController利用Observer实现对HBase进行了权限控制,有兴趣的读者可以看看相关代码。

d3f2f0da5b6761a64c7049db7719525a2c492a0c

大家工作学习遇到HBase技术问题,把问题发布到HBase技术社区论坛http://hbase.group,欢迎大家论坛上面提问留言讨论。想了解更多HBase技术关注HBase技术社区公众号(微信号:hbasegroup),非常欢迎大家积极投稿。

096973d69f34b1380151180fd0a8ff2cade5bced

烽火大数据平台研发负责人叶铿,云端浪子的博客,主要分享Hadoop、HBase、TensorFlow技术干货


15dc5058a1e178995d574b18e2e4604b10adb98f
HBase技术交流社区 - 阿里官方“HBase生态+Spark社区大群”点击加入:https://dwz.cn/Fvqv066s



【云栖快讯】云栖专辑 | 阿里开发者们的20个感悟,一通百通  详情请点击

网友评论