hbase协处理器Coprocessor(简介)

  1. 云栖社区>
  2. 博客>
  3. 正文

hbase协处理器Coprocessor(简介)

sunt_dota 2017-04-13 10:38:00 浏览950
展开阅读全文

一:介绍

把一部分计算也移动到数据的存放端;允许用户执行region级的操作;可以动态加载。

二:使用场景:

1、使用钩子来关联行修改操作来维护辅助索引,或维护一些数据间的引用完整性。

2.权限控制

三:coprocessor两大类:observer和endpoint介绍

3.1Observer

与触发器类似;

regionobserver处理数据修改事件,表region联系紧密;

MasterObserver集群级事件操作,管理或DDL类型操作;

WALObserver控制WAL。

3.2Endpoint

用户自定义操作添加到服务端添加一些远程过程调用动态拓展RPC协议;与RDBMS存储类似;

3.3Coprocessor

所有的协处理器都必须实现这个接口,定义了协处理器的基本约定。

两个被应用于框架的枚举类:priority与state。

3.3.1Coprocessor.priority

定义的优先级

SYSTEM 最先被执行的协处理器;

USER 其他协处理器,按顺序执行;

3.3.2Coprocessor类接口提供的方法

void start(CoprocessorEnviroment env)throws IOException;

void stop(CoprocessorEnviroment env)throws IOException;

3.3.3CoprocessorEnviroment

(协处理器实例一直保存在提供的环境中)

3.3.4Coprocessor.state

定义的状态;

uninstalled:协处理器最初的状态

installed:实例装载了他的环境参数

starting:开始工作

active:start方法调用,处于active状态

stopping:stop方法被调用之前stopped:

stop方法将控制权交给框架

3.3.5CoprocessorHost类

维护协处理实例和他们专用环境

Coprocessor,CoprocessorEnviroment,CoprocessorHost形成协处理器类基础。

四:协处理器的加载:

1.从配置文件中加载(hbase-site.xml)

2.表描述符中加载HTableDescriptor.setValue()

五:RegionObserver类

region级别操作发生,会触发钩子函数

分为两类操作:region生命周期变化;客户端api调用

5.1RegionCoprocessorEnviroment

RegionObserver类的协处理器环境的实例;实现了CoprocessorEnviroment接口;

5.2ObserverContext

特殊的上下文:提供访问当前系统环境入口;提供关键功能通知协处理器在毁掉函数完成时需要做什么;

5.3BaseRegionObserver

所有用户实现监听类型协处理器基类,可以重载自己感兴趣的方法实现自己功能

六:MasterObserver类

处理master服务器的所有回调函数

6.1MasterCoprocessorEnviroment

封装了一个masterobserver实例;实现了CoprocessorEnviroment接口;

6.2BaseMasterObserver

扩展此类,实现自己功能;选择对应的pre.post方法;

七:endpoint:

hbase0.98之前:实现一个endpoint两个步骤:

1.拓展CoprocessorProtocol接口

给客户端提供自定义的rpc协议;

定义了客户端服务端的通信协议

2.拓展BaseEndpointCoprocessor类

hbase 0.98以上版本对endpoint的版本作了修改,修改后的使用

地址:90.2. Endpoint Example

(1)创建通信协议

一个proto文件,使用protoc工具来生成协议类文件。这个文件需要在服务端及客户端存在。

proto文件:

option java_package = "org.myname.hbase.coprocessor.autogenerated";

option java_outer_classname = "Sum";

message SumRequest {

  required string family = 1;

}

message SumResponse {

  required int64 sum = 1 [default = 0];

}

service SumService {

  rpc getSum(SumRequest)

      returns (SumResponse);

}

生成协议类文件:

$ protoc --java_out=src ./sum.proto

(2)创建一个Service类,实现具体的业务逻辑

例如官网例子:

public class SumEndPoint extends Sum.SumService implements Coprocessor, CoprocessorService {

@Override

public Service getService() {

                return this;

}

@Override

public void start(CoprocessorEnvironment env) throws IOException {

               if (env instanceof RegionCoprocessorEnvironment) {

                    .................

}

@Override

public void stop(CoprocessorEnvironment env) throws IOException {

       // do nothing

}

@Override

public void getSum(RpcController controller, Sum.SumRequest request, RpcCallback done) {

            Scan scan = new Scan();

            .............

          try {

             ............

}

(3)创建表时指定使用这个EndPoint,或者是全局配置

(4)创建一个Client类,调用这个RPC方法

例如:

try{

          Map results = table.coprocessorService(        

                    Sum.SumService.class,

                     null, /* start key */ 

                      null,/* end  key */

                       newBatch.Call() {

                        @Override

                          publicLongcall(Sum.SumService aggregate)throwsIOException{

                                BlockingRpcCallback rpcCallback =newBlockingRpcCallback<>();

                                aggregate.getSum(null, request, rpcCallback);

                                Sum.SumResponse response = rpcCallback.get();

                                  returnresponse.hasSum() ? response.getSum() :0L; 

         }       

 }    );

八:未完待续.....................................

网友评论

登录后评论
0/500
评论
sunt_dota
+ 关注