HBase建模、使用以及优化

  1. 云栖社区>
  2. 博客列表>
  3. 正文

HBase建模、使用以及优化

友德 2018-07-11 09:32:07 浏览5190 评论0

摘要: HBase建表的基本准则 常见rowKey设计遇到的问题以及相应的解决方案 建模案例分析:电商中宝贝点击量建模示例 HBase客户端的使用 HBase优化 HBase连续分页问题查询的参考解决方案 分享的目的: 合理地使用HBase,发挥HBase本身所不具有的功能,提高HBase的执行效率 一、HBase建表的基本准则 1.

HBase建表的基本准则
常见rowKey设计遇到的问题以及相应的解决方案
建模案例分析:电商中宝贝点击量建模示例
HBase客户端的使用
HBase优化
HBase连续分页问题查询的参考解决方案

分享的目的:

合理地使用HBase,发挥HBase本身所不具有的功能,提高HBase的执行效率

一、HBase建表的基本准则

1. family数量的控制:

不能太多,建议不要超过2个,一般的情况 下,一张表,一个family最好, 可以避免在刷缓存的时候一个Region下多个Store的相互影响,减少不必要的IO操作;

2. rowKey的设计:

  1. rowKey写入不能连续,尽量散开,避免写入region热点的问题, 导致regionServer负载不均衡,不能充分发挥HBase集群高并发写入的性能,极端情况下会出现regionServer宕机下线;
  2. 在满足业务的情况下,rowKey的长度越短越好,rowkey的长度越短,越节约内存和硬盘的存储空间;
  3. 设置好rowKey的分割符,多个业务字段拼接,设置好分隔符,如#, $(方便前缀范围查询, 又没有热点问题)

3. 版本数量的控制:

业务上没有特别要求的情况 下,用一个版本,即最大版本和最小版本一样,均为1;

4. 失效时间的控制:

根据具体业务的需求,合理的设置好失效时间,从节约存储空间的情况考虑,当然是在满足业务的情况下存储的时间越短越好,永久保存的情况除外

二、常见rowKey设计遇到的问题以及相应的解决方案

1. 连续rowKey导致的region热点问题:

解决方案:

  1. rowKey整个逆序, 主要针对流水数据,前缀的范围查询变得不可用

               逆序前分别是: 20170227204355331和20170227204355339  (同一个region)
               逆序后分别是: 13355340272207102 和93355340272207102 (不同的region)
  2. rowKey的部分逆序, 主要针对一些特征的流水数据,而且还可以实现部分前缀的范围查询

               逆序前分别是: 20170227204355TNG和20170227204355TFF  (同一个region)
               逆序后分别是: GNT20170227204355 和FFT20170227204355 (不同的region)
  3. 对rowKey取MD5 Hash, 相领的值做md5 Hash之后,完全不同
  4. 直接使用UUID , 完全散列开

2. 相同rowKey的问题, 即业务 上没唯一字段:

  1. 加随机字符串后缀;
  2. 加时间戳后缀, 根据需要实现按时间递增或递减;
    注意:加前缀和后缀,最好设置好分隔符,方便业务上的处理,因为大部分用是的是26个大小写字母和数字,非常适合常见的分割符如 $, #

三、建模案例分析:电商中宝贝点击量建模示例

需求: 统计电商中某个宝贝最近一周的点击量
表的设计:

  1. 以分钟为单位的近实时统计表 rowKey=itemId + ‘#’+’yyyyMMddHHmm’
  2. 历史点击量汇总表 rowKey = itemId;
  3. 最近一周点击量近似汇总表 rowKey = itemId;

在业务上层每半个小时做定时更新
1.新的一周点击量= 最近的半个小时量-7天前的半小时量 + 原来一周的历史量
2.新的历史点击量 += 最近半个小时的点击量;

四、HBase客户端的使用

1 原生客户端Api

1.1连接

Configuration conf = HBaseConfiguraton.create(); 
conf 设置 HBase  zk 和其它参数; 
HTable table = new HTable(conf, “test.testTable”);

1.2 单个操作

Put put = new Put(Bytes.toBytes(“row1”));
put.add(Bytes.toBytes(“colfam1”), Bytes.toBytes(“qual1”), Bytes.toBytes(“val1”));
put.add(Bytes.toBytes(“colfam1”), Bytes.toBytes(“qual2”), Bytes.toBytes(“val1”));
table.put(put);

// delete操作
Delete delete = new Delete(toBytes(“row2”));
table.delete(delete);

// Get操作
Get get = new Get(toBytes(“row1”));
Result result = table.get(get);

1.3 批量操作

List<Put> puts = new ArrayList<Put>(2);
Put put2 = new Put(Bytes.toBytes(“row2”));
put2.add(Bytes.toBytes(“colfam1”), Bytes.toBytes(“qual1”), Bytes.toBytes(“val2”));
put2.add(Bytes.toBytes(“colfam1”), Bytes.toBytes(“qual2”), Bytes.toBytes(“val2”));
Put put3 = new Put(Bytes.toBytes(“row3”));
put3.add(Bytes.toBytes(“colfam1”), Bytes.toBytes(“qual1”), Bytes.toBytes(“val3”));
put3.add(Bytes.toBytes(“colfam1”), Bytes.toBytes(“qual2”), Bytes.toBytes(“val3”));
puts.add(put2);
puts.add(put3);
table.put(puts);

1.4 原子自增操作

Long result = table.incrementColumnValue(toBytes(“row1”), toBytes(“colfam1”),  

1.5 过滤器的使用

Bytes.toBytes(“qual3”), 3);
PageFilter pageFilter = new PageFilter(1);
Scan scan = new Scan();
scan.setFilter(pageFilter);
Long start = System.currentTimeMillis();
String rowKey = hbaseTemplate.find(tableName, scan, new MinRowKeyMapperExtractor());

注意: 不到万不得已,不要使用Filter, 有的Filter的查询 效率很低,最好是结合rowKey范围扫描进行查询

1.6 HBase 管理接口API

管理API, 查找一个HBase集群所有表

HBaseAdmin admin = new HBaseAdmin(configuration);
HTableDescriptor[] htds = admin.listTables();

2 Spring集成HBase客户端

1. 添加maven依赖

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>0.98.6-hadoop2</version>
</dependency>
<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-jpa</artifactId>
    <version>1.6.0.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-hadoop</artifactId>
    <version>2.0.2.RELEASE</version>
</dependency>

2.引入Spring HBase工具类

HbaseConfigurationSpringFactoryBean (二次开发的封装类)

/**
 * hbase工厂类
 * @author tiandesheng
 *
 */
public class HbaseConfigurationSpringFactoryBean implements InitializingBean, FactoryBean<Configuration> {
 
    private static Logger logger = LoggerFactory.getLogger(HbaseConfigurationSpringFactoryBean.class);
    
    private Configuration configuration;
    
    private Properties properties;
    
    public Configuration getObject() throws Exception {
        return configuration;
    }

    public Class<Configuration> getObjectType() {
        return Configuration.class;
    }

    public boolean isSingleton() {
        return true;
    }

    public void afterPropertiesSet() throws Exception {
        
        configuration = HBaseConfiguration.create();
        addProperties(configuration, properties);
        if (logger.isInfoEnabled()) {
            logger.info("Hbase连接初始化完毕!");
        }
    } 
    
    public void addProperties(Configuration configuration, Properties properties) {
        
        if (properties != null) {
            for (Entry<Object, Object> entry : properties.entrySet()) {
                String key = entry.getKey().toString();
                String value = entry.getValue().toString();
                configuration.set(key, value);
            }
        }
    }

    public void setConfiguration(Configuration configuration) {
        this.configuration = configuration;
    }

    public Configuration getConfiguration() {
        return configuration;
    }

    public void setProperties(Properties properties) {
        this.properties = properties;
    }
}

HTableInterfacePoolFactory (二次开发的封装类)

/**
 * 基于线程池的HTable实现工厂
 */
@SuppressWarnings("deprecation")
public class HTableInterfacePoolFactory implements HTableInterfaceFactory, DisposableBean, InitializingBean {

    private static final Logger        logger   = LoggerFactory.getLogger(HTableInterfacePoolFactory.class);

    private int                        poolSize = 50;
    private HTablePool                 pool     = null;
    private Configuration              configuration;
    private Map<String, AtomicInteger> initLock;
    
    public void releaseHTableInterface(HTableInterface table) {
        close(table);
    }
    
    public HTableInterface createHTableInterface(Configuration config, byte[] tableName) {
        
        if (tableName == null) {
            return null;
        }
        if (initLock != null) {
            AtomicInteger tlock = initLock.get(new String(tableName).trim());
            if (tlock != null && tlock.get() == 0) { 
                if (logger.isInfoEnabled()) {
                    logger.info("get Htable:{} connection lock", new String(tableName));
                }
                tlock.getAndAdd(1);
                return pool.getTable(tableName);
            }
        }
        return pool.getTable(tableName);
    }

    public void afterPropertiesSet() throws Exception {
        // 初始化HTablePool
        if (pool == null) {
            pool = new HTablePool(configuration, poolSize);
            if (logger.isInfoEnabled()) {
                logger.info("hbase 连接池创建并初始化完毕!");
            }
        }
    }
    
    private void close(HTableInterface hTableInterface) {
        if(hTableInterface != null) {
            try {
                hTableInterface.close();
            } catch(Throwable t) {
                logger.error("close异常 {},", t);
            }
        }
    }

    public void destroy() throws Exception {
        
        initLock.clear();
        if (logger.isInfoEnabled()) {
            logger.info("Hbase连接池已经关闭!");
        }
    }

    public void setPoolSize(int poolSize) {
        this.poolSize = poolSize;
    }

    public void setConfiguration(Configuration configuration) {
        this.configuration = configuration;
    }
}

HbaseTemplate (核心类)

     Spring的模板设置模式, 用法类型于JdbcTemplate, JmsTemplate, TransactionTemplate)
 通过以Spring 配置文件方式和纯代码的方式都可以实现Spring集成HBase客户端,不管以那种

试,都是要首先设置HBase连接的几个参数,其中zk和zk的端口这两个参数一定要包含,然后依次
初始化上面三个类的对象,最终得到的HbaseTemplate对象就是我们要直接要对HBase进行操作对
象, spring配置文件 和纯代码实现方式如下:

1. spring配置文件的方式:

<bean id="hBaseConfiguration" class="com.xxx.hbase.client.HbaseConfigurationSpringFactoryBean>
      <property name="properties">
          <props>
               <prop key="hbase.zookeeper.quorum", value="${hbase.zookeeper.quorum}" />
               <prop key="hbase.zookeeper.property.clientPort" ,
                               value="${hbase.zookeeper.property.clientPort}" />
               <prop key="hbase.master.port",value="${hbase.master.port}" />
          </props>
     </property>
</bean>

<bean id="hTableInterfacePoolFactory" class="com.xxx.hbase.client.HTableInterfacePoolFactory" >
       <property name="configuration" ref="hBaseConfiguration" />
       <property name="poolSize" value="${poolSize}" />
</bean>

<bean id="hbaseTemplate" class="org.springframework.data.hadoop.hbase.HbaseTemplate"> 
       <property name="encoding" value="utf-8" />
       <property name="configuration" ref="hBaseConfiguration" />
       <property name="tableFactory" ref="hTableInterfacePoolFactory" />
</bean>

2. 纯代码的方式:

public HbaseTemplate getHbaseTemplate(String hbaseClusterName) {
    HbaseTemplate hbaseTemplate = hbaseTemplates.get(hbaseClusterName);
    if (hbaseTemplate == null ) {
          DcHbaseCluster dcHbaseCluster = 
                dcHbaseClusterDao.selectByHbaseClusterName(hbaseClusterName);
         if (dcHbaseCluster == null) {
                throw new RuntimeException("getHbaseTemplate(String hbaseClusterName1) 出错
                                                                hbaseClusterName=" + hbaseClusterName);
         }
        String hbaseZookeeperQuorum = dcHbaseCluster.getHbaseZookeeperQuorum();
        String hbaseZookeeperPropertyClientPort = 
                          dcHbaseCluster.getHbaseZookeeperPropertyClientPort();
        String hbaseMasterPort = dcHbaseCluster.getHbaseMasterPort();
        Integer poolSize = dcHbaseCluster.getPoolSize();
        this.addHbaseTemplate(hbaseClusterName, hbaseZookeeperQuorum,
                              hbaseZookeeperPropertyClientPort, hbaseMasterPort, poolSize);
        hbaseTemplate = hbaseTemplates.get(hbaseClusterName);
        if (hbaseTemplate == null) {
              throw new RuntimeException("getHbaseTemplate(String hbaseClusterName2) 出错
                                                             hbaseClusterName=" + hbaseClusterName);
    }
    }
   return hbaseTemplate;
}


public boolean addHbaseTemplate(String hbaseClusterName,String hbaseZookeeperQuorum,
                                                        String hbaseZookeeperPropertyClientPort,
                                                        String hbaseMasterPort,
                                                        Integer poolSize) {
     HbaseConfigurationSpringFactoryBean hbaseConfigurationSpringFactoryBean =
                                                                           new HbaseConfigurationSpringFactoryBean();
     Properties properties = new Properties();
     //设置zk地址
     properties.put(“hbase.zookeeper.quorum”, hbaseZookeeperQuorum); 
     // 设置zk 端口
     properties.put(“hbase.zookeeper.property.clientPort”, hbaseZookeeperPropertyClientPort);
        // 设置Hmaster端口 
    properties.put(“hbase.master.port”, hbaseMasterPort);   
    hbaseConfigurationSpringFactoryBean.setProperties(properties);
    try {
                 // 初始化
                hbaseConfigurationSpringFactoryBean.afterPropertiesSet(); 
                hbaseConfigurationSpringFactoryBean
     } catch (Exception e) {
              e.printStackTrace();
     throw new RuntimeException("hbaseConfigurationSpringFactoryBean.afterPropertiesSet()出错");
    }
    Configuration configuration = null;
    try {
                 // 得到连接Configuration对象
                 configuration = (Configuration)hbaseConfigurationSpringFactoryBean.getObject();  
    } catch (Exception e) {
                e.printStackTrace();
               throw new RuntimeException("hbaseConfigurationSpringFactoryBean.getObject()出错!");
    }
    HTableInterfacePoolFactory hTableInterfacePoolFactory = new HTableInterfacePoolFactory();
    hTableInterfacePoolFactory.setConfiguration(configuration);
    hTableInterfacePoolFactory.setPoolSize(poolSize);
    try {
          hTableInterfacePoolFactory.afterPropertiesSet();   // 初始化hTableInterfacePoolFactory对象
    } catch (Exception e) {
          e.printStackTrace();
          throw new RuntimeException("hTableInterfacePoolFactory.afterPropertiesSet()出错!");
    }
    HbaseTemplate hbaseTemplate = new HbaseTemplate();
    hbaseTemplate.setEncoding("UTF-8");
    hbaseTemplate.setConfiguration(configuration);
    hbaseTemplate.setTableFactory(hTableInterfacePoolFactory);
    try {
          hbaseTemplate.afterPropertiesSet();   // 初始化hbaseTemplate 对象
         } catch (Exception e) {
         e.printStackTrace();
          throw new RuntimeException("hbaseTemplate.afterPropertiesSet()出错!");
    }
    configurations.put(hbaseClusterName, configuration);
    hbaseTemplates.put(hbaseClusterName, hbaseTemplate);
    return true;
}

单个Get读数据操作代码示例

public Map<String, String> find(HbaseTemplate hbaseTemplate, 
                            String tableName, 
                    final String rowKey, 
                            final List<String> columns) {
    
       if (logger.isInfoEnabled()) {
              logger.info("find method tableName={}, rowKey={}", tableName, rowKey);
    }
    Result result = hbaseTemplate.execute(tableName, new TableCallback<Result>() {

                 public Result doInTable(HTableInterface table) throws Throwable {
        // 1.产生Get对象
        Get get = getObjectGenerator.generateGet(rowKey, columns);
        // 2.单个查询hbase
        Long start = System.currentTimeMillis();
        Result result = table.get(get);
        if (logger.isInfoEnabled()) {
               logger.info("find method 消耗时间:{}ms", System.currentTimeMillis() - start);
        }
        return result;
                }
    });
    if (result == null) {
               return null;
    }
    Map<String, String> res = mapFrom(result);
    return res;
}

批量Get读数据操作代码示例

public List<Map<String, String>> batchFind(HbaseTemplate hbaseTemplate, 
                                                       String tableName, 
                                                       final List<String> columns, 
                                                       final List<String> rowKeys) {
     if (logger.isInfoEnabled()) {
      logger.info("batchFind method tableName={}, rowKey={}", tableName, rowKeys);
     }
     Object object = hbaseTemplate.execute(tableName, new TableCallback<Object>() {
          public Object doInTable(HTableInterface table) throws Throwable {
            List<Get> listGets = new ArrayList<Get>(10);
            for (String rowKey : rowKeys) {
                     // 1.产生Get对象
                     Get get = getObjectGenerator.generateGet(rowKey, columns);
                      listGets.add(get);
             }
             // 2.批量查询hbase
            Long start = System.currentTimeMillis();
            Result[] results = table.get(listGets);
            if (logger.isInfoEnabled()) {
             logger.info("batchFind method 消耗时间:{}ms", System.currentTimeMillis() - start);
            }
           return results;
       }
       });
      if (object == null) {
              return null;
      }
      Result[] results = (Result[])object;
      List<Map<String, String>> res = new ArrayList<Map<String, String>>(10);
      for (Result result : results) {
            Map<String, String> map = mapFrom(result);
            if (map != null) {
                 res.add(map);
             }
       }
     return res;
}

Spring集成HBase的本质

从上面代码,可以 看出,Spring集成HBase客户端的本质还是对原始的Spring Client进行封装, 从HBase Template的execute方法主要有三个重要的步骤:

  1. 根据表名,获得或创建HBase表连接实例 HTableInterface;
  2. 执行业务 上实现的doInTable回调方法, 即执行真正读写HBase操作;
  3. 把HBase连接实例放回到表池中;

五、HBase优化

1.JVM参数优化:

  1. –Xmn=12G –Xms=24G  -Xmx=24G  根据实际机器情况调整,一般为整个机器内存的一半,同时建议regionServer的堆内存建议不要超过32G ;
  2. -XX:PermSize=512M -XX:MaxPermSize=512M;
  3. -XX:+UseConcMarkSweepGC -XX:+UseParNewGC (建议使用CMS垃圾收集器, 其它JVM参数使用默认比较好)

2.HBase客户端使用优化:

  1. 用表池或创建多个客户端,适当提高HBase客户端写入的并发性;
  2. 尽可能批量写和批量读,减少RPC交互的次数;
  3. 不要读取不必要的列数据, 限定scan的数据范围;
  4. 读取到客户端的结果后,一定要关闭结果集, 即关闭Results和ReultsScanner;
  5. 根据HBase CPU的利用率,合时设置压缩算法,前提是要服务需要安装压缩算法包;
    6.关闭AutoFlush , 设置setAutoToFlush(false) 不建议,有丢数据的风险;

7.关闭WAL Flag,设置setWriteToWAL(false), 不建议,有丢数据的风险;

3. 建表时优化:

1.根据不同的业务,合理地设置好HBase表命名空间;

  1. 创建预分区, 减少region分裂的次数,并且写入的负载也比较 好;
  2. 动态添加分区, 在HBase后面新的稳定版本中会有这个功能,华为、阿里、facebook公司内部二次开发的HBase已经添加了这个功能;

4. 运维时优化:

  1. 业务低峰期时,手动或定进均衡;
  2. 业务 高峰期时,关闭自动负载(不建议使用);

5 .配置参数优化:

  1. 设置StoreFile的大小: 根据业务场景适当增大hbase.hregion.max.filesize减少写入过程中split的次数,同时也减少了Region的数量,新版本默认是10G,老版本默认是512M,新版本建议用默认值;
  2. 设置memstore固定缓冲区块的大小:hbase.hregion.memstore.mslab.chunksize, 默认是2M, 最好是根据一次写入数据大小进行评估,建议用默认值;
  3. 减少Zookeeper超时的发生: zookeeper.session.timeout, 默认是3分钟, 可以修改为一分钟或半分钟, 加快HMaster发现故障的regionServer;
  4. 增加处理线程: hbase.regionserver.handler.cout, 默认值为10, 如果批量写,这个值可以设置小些,如果是单 个读写,这个值可以适当设置大些;
  5. 启用数据压缩: 推荐使用Snappy或者LZO压缩, 前提是需要安装这个压缩算法的jar包,然后再进行配置,重启;
  6. 适当增加块缓存的大小: perf.hfile.block.cache.size 默认为0.2,这个需要查看内存刷写到磁盘的频率,如果不是很频繁,可以适当增加这个值的设置,建议0.2 ~ 0.3之间;
  7. 调整memStore限制: hbase.regionsever.global.memstore.upperLimit 默认为0.4 hbase.regionsever.global.memstore.lowerLimit 默认为0.35, 建议把这两个值设置近些或相等;
  8. 增加阻塞时存储 文件数目: hbase.hstore.blockingStoreFiles 默认值 为7,当一个region的StoreFile的个数超过值的时候,更新就会阻塞, 在高并写的情况 下,设置 为10左右比较 为合理;
  9. 增加阻塞倍率 : hbase.region.memstore.block.multiplier 默认值是2, 当memstore达到multiplier 乘以flush的大小时,写入就会阻塞, 对于写压力比较大,可以增加这个值,一般 为设置为2-4;
  10. 减少最大日志 文件 限制: hbase.regionserver.maxlogs 默认是32, 对于写压力比较大的情况 ,可以减少这个值的设置, 加快后台异步线程的定时清理工作;

六、HBase连续分页问题查询的参考解决方案

在实际的应用中,有很多少分页查询显示的功能,但HBase中分页过滤器在跨region的时候,会出现各种无法预测的问题,导致读取的数据丢失,为了解决这个问题,需要在业务端进行多次连续查询 ,在基于region根据rowKey连续递增的情况下,给出如下的解决方案:

10.jpg

源代码如实现如下:

public Map<String, Map<String, String>> prefixScanOfPageFilter(HbaseTemplate hbaseTemplate, 
                                                                         String tableName,
                                                                          List<String> columns, 
                                                                          String prefix, 
                                                                          String startRow, 
                                                                          int pageSize, 
                                                                          final String isProcessedNostringColumn, 
                                                                          final Set<String> tableColumnSet) {
        
        if (logger.isInfoEnabled()) {
            logger.info("prefixScanOfPageFilter method tableName={}", tableName);
        }
        
        // 每次都要重新查询
        TreeSet<HbaseRegionInfo> hbaseRegionInfos = new TreeSet<HbaseRegionInfo>(new Comparator<HbaseRegionInfo>() {
            @Override
            public int compare(HbaseRegionInfo o1, HbaseRegionInfo o2) {
                
                if (o1 == null && o2 == null) {
                    return 0;
                } else if (o1 != null && o2 == null) {
                    return 1;
                } else if (o1 == null && o2 != null) {
                    return -1;
                }
                return Bytes.compareTo(o1.getStartRow(), o2.getStartRow());
            }
        });
        
        try {
            HBaseAdmin admin = new HBaseAdmin(hbaseTemplate.getConfiguration());
            List<HRegionInfo> regionInfos = admin.getTableRegions(tableName.getBytes());
            if (regionInfos == null) {
                return null;
            }
            
            for (HRegionInfo hRegionInfo : regionInfos) {
            HbaseRegionInfo hbaseRegionInfo = new HbaseRegionInfo(hRegionInfo.getStartKey(), hRegionInfo.getEndKey() );
                hbaseRegionInfos.add(hbaseRegionInfo);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        
        StringBuffer buffer = new StringBuffer(2);
        String endRow = buffer.append((char)((int)prefix.charAt(0) + 1)).toString();
        
        // 确定region区间范围
        TreeSet<HbaseRegionInfo> regionInfoRanges = decideRegionRange(hbaseRegionInfos, startRow, endRow);

        // 返回结果
        Map<String, Map<String, String>> result = new HashMap<String, Map<String, String>>(pageSize+1);

        Long start = System.currentTimeMillis();

        byte[] byteBegin = toBytes(startRow);
        byte[] byteEnd = toBytes(endRow);

        HbaseRegionInfo first = regionInfoRanges.first();
        
        while (result.size() < pageSize + 1 ) {
            
            // 分页过滤器
            PageFilter pageFilter = new PageFilter(pageSize + 1 - result.size());
            // 当前查询的结果
            Map<String, Map<String, String>> currentResult = null;
            
            byte[] firstEndRow = first.getEndRow();
            
            // 考虑为空的情况  1 >> 结束
            if (firstEndRow == null || firstEndRow.length == 0) {
                Scan scan = new Scan( byteBegin, byteEnd  );
                scan = getObjectGenerator.generateScan(scan, columns);
                scan.setFilter(pageFilter);
                currentResult = hbaseTemplate.find(tableName, scan, 
                      new MapperAndRowKeyDetailExtractor(isProcessedNostringColumn, tableColumnSet));
                result.putAll(currentResult);
                if (logger.isInfoEnabled()) {
                    logger.info("findByPageFilter2 method 消耗时间:{}ms", System.currentTimeMillis() - start);
                }
                return result;
            }
            
            // 比较确定区间
            int cmp = Bytes.compareTo( byteEnd, firstEndRow );
            
            if ( cmp < 0 ) { 
                // 在region范围之内, 1 >> 结束
                Scan scan = new Scan( byteBegin, byteEnd );
                scan = getObjectGenerator.generateScan(scan, columns);
                scan.setFilter(pageFilter);
                currentResult = hbaseTemplate.find(tableName, scan, 
                      new MapperAndRowKeyDetailExtractor(isProcessedNostringColumn, tableColumnSet));
                result.putAll(currentResult);
                if (logger.isInfoEnabled()) {
                    logger.info("findByPageFilter2 method 消耗时间:{}ms", System.currentTimeMillis() - start);
                }
                return result;
            } else if ( cmp == 0 ) { // 还有特殊处理(暂时不要和1的情况合并)
                // 刚好在region范围之内, 2 >> 结束
                Scan scan = new Scan( byteBegin, byteEnd );
                scan = getObjectGenerator.generateScan(scan, columns);
                scan.setFilter(pageFilter);
                currentResult = hbaseTemplate.find(tableName, scan, 
                     new MapperAndRowKeyDetailExtractor(isProcessedNostringColumn, tableColumnSet));
                result.putAll(currentResult);
                if (logger.isInfoEnabled()) {
                    logger.info("findByPageFilter2 method 消耗时间:{}ms", System.currentTimeMillis() - start);
                }
                return result;
            } else {
                // 在范围之外(跨region), 需要再次查询
                Scan scan = new Scan( byteBegin, firstEndRow );
                scan = getObjectGenerator.generateScan(scan, columns);
                scan.setFilter(pageFilter);
                currentResult = hbaseTemplate.find(tableName, scan, 
                     new MapperAndRowKeyDetailExtractor(isProcessedNostringColumn, tableColumnSet));
                result.putAll(currentResult);
                if (result.size() < pageSize + 1) {
                    if ( first.getStartRow() == null || first.getStartRow().length == 0 ) {
                        HbaseRegionInfo tmp = new HbaseRegionInfo(first.getEndRow(), first.getEndRow());
                        first = regionInfoRanges.ceiling(tmp);
                    } else {
                        first = regionInfoRanges.higher(first);
                    }
                    if (first == null) {
                        return result;
                    }
                    byteBegin = first.getStartRow();
                } else {
                    // 数量满足要求  4 >> 结束
                    if (logger.isInfoEnabled()) {
                        logger.info("findByPageFilter2 method 消耗时间:{}ms", System.currentTimeMillis() - start);
                    }
                    return result;
                }
            }
        }
    return null;
    }
/**
     * 确定region的范围
     * @param sets
     * @param startRow
     * @param endRow
     * @return
     */
private TreeSet<HbaseRegionInfo> decideRegionRange(TreeSet<HbaseRegionInfo> sets, String startRow, String endRow) {
        
        HbaseRegionInfo first = new HbaseRegionInfo(toBytes(startRow), toBytes(startRow));
        HbaseRegionInfo second = new HbaseRegionInfo(toBytes(endRow), toBytes(endRow));
        
        HbaseRegionInfo firstLower = sets.lower(first);
        HbaseRegionInfo firstHigher = sets.ceiling(first);
        HbaseRegionInfo secondLower = sets.lower(second);
        HbaseRegionInfo secondHigher = sets.ceiling(second);
        
        HbaseRegionInfo firstRegion = null;
        HbaseRegionInfo secondRegion = null;
        
        if ( (firstLower != null  &&  (firstLower.getEndRow() == null || firstLower.getEndRow().length == 0) ) &&  
                      firstHigher == null) {
            TreeSet<HbaseRegionInfo> result = new TreeSet<HbaseRegionInfo>(new Comparator<HbaseRegionInfo>() {
                @Override
                public int compare(HbaseRegionInfo o1, HbaseRegionInfo o2) {
                    
                    if (o1 == null && o2 == null) {
                        return 0;
                    } else if (o1 != null && o2 == null) {
                        return 1;
                    } else if (o1 == null && o2 != null) {
                        return -1;
                    }
                    return Bytes.compareTo(o1.getStartRow(), o2.getStartRow());
                }
            });
            result.add(firstLower);
            return result;
        }
        
        // 连续相连的情况
//        if (firstLower.getEndRow().equals(firstHigher.getStartRow())) {
            if (startRow.equals(firstHigher.getStartRow())) {
                // 在region边缘,即第一个值
                firstRegion = firstHigher;
            } else {
                // 在region区间
                firstRegion = firstLower;
            }
//        } 
        
        if ((secondLower != null && (secondLower.getEndRow() == null || secondLower.getEndRow().length == 0) ) &&  
                     secondHigher == null) {
            secondRegion = secondLower;
        }
        
//        if (secondLower.getEndRow().equals(secondHigher.getStartRow())) {
            if (endRow.equals(secondHigher.getStartRow())) {
                secondRegion = secondHigher;
            } else {
                secondRegion = secondLower;
            }
//        }
        
        return (TreeSet<HbaseRegionInfo>)sets.subSet(firstRegion, true, secondRegion, true);
    }

因为region一般都比较大,在实践中,我们发现绝大数查询,是不存在跨页查询,跨页查询的情况下,大部分也是跨1 个页,跨2个以上页的情况非常罕见,因此查询效率没有感觉到明显变慢

【云栖快讯】阿里云栖开发者沙龙(Java技术专场)火热来袭!快来报名参与吧!  详情请点击

网友评论

友德
文章10篇 | 关注16
关注
云数据库 HBase 版(ApsaraDB for HBase)是基于 Hadoop 的一个... 查看详情
提供海量、安全和高可靠的云存储服务。RESTful API的平台无关性,容量和处理能力的弹性... 查看详情
快速、完全托管的TB/PB级数据仓库解决方案,向用户提供了完善的数据导入方案以及多种经典的分... 查看详情
为您提供简单高效、处理能力可弹性伸缩的计算服务,帮助您快速构建更稳定、安全的应用,提升运维效... 查看详情
阿里云总监课正式启航

阿里云总监课正式启航