
HBase batch get / batch put fails with org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException

org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException

at org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.makeException(AsyncProcess.java:228)
at org.apache.hadoop.hbase.client.AsyncProcess$BatchErrors.access$1700(AsyncProcess.java:208)
at org.apache.hadoop.hbase.client.AsyncProcess.waitForAllPreviousOpsAndReset(AsyncProcess.java:1700)
at org.apache.hadoop.hbase.client.BufferedMutatorImpl.backgroundFlushCommits(BufferedMutatorImpl.java:208)
at org.apache.hadoop.hbase.client.BufferedMutatorImpl.flush(BufferedMutatorImpl.java:183)
at org.apache.hadoop.hbase.client.HTable.flushCommits(HTable.java:1449)
at org.apache.hadoop.hbase.client.HTable.put(HTable.java:1052)
at com.pdd.service.medoc.dao.hbase.dao.impl.BaseDAO.lambda$batchInsertV3$15(BaseDAO.java:353)
at com.pdd.service.medoc.dao.hbase.config.HbaseTemplateV2.execute(HbaseTemplateV2.java:27)
at com.pdd.service.medoc.dao.hbase.dao.impl.BaseDAO.batchInsertV3(BaseDAO.java:339)
at com.pdd.service.medoc.dao.hbase.dao.impl.GoodsExtensionDAOV3Impl.batchInsert(GoodsExtensionDAOV3Impl.java:83)
at com.pdd.service.medoc.dao.hbase.dao.impl.GoodsExtensionDAOV3Impl$$FastClassBySpringCGLIB$$f025277.invoke()
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:720)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:136)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:655)
at com.pdd.service.medoc.dao.hbase.dao.impl.GoodsExtensionDAOV3Impl$$EnhancerBySpringCGLIB$$499b33c1.batchInsert()
at com.pdd.service.medoc.build.sharding.ShardingGoodsHbaseLogic.processGoodsById(ShardingGoodsHbaseLogic.java:758)
at com.pdd.service.medoc.build.sharding.ShardingGoodsHbaseLogic.insertGoodsInfoIntoHbase(ShardingGoodsHbaseLogic.java:152)
at com.pdd.service.medoc.build.sync.SyncGoodsInfo.lambda$syncAll$0(SyncGoodsInfo.java:85)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
The stack trace above is the error reported during our batch reads or batch writes; a sketch for unpacking its per-row details follows below.
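This exception is an aggregate: it carries one (row, server, cause) entry per failed operation in the batch. A minimal sketch, assuming the HBase 1.x client API, for logging those details; putWithDiagnostics and its parameters are illustrative names, not from the original code:

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

private void putWithDiagnostics(Table table, List<Put> puts) throws IOException {
    try {
        table.put(puts);
    } catch (RetriesExhaustedWithDetailsException e) {
        // one entry per failed mutation: which row, on which region server, and why
        for (int i = 0; i < e.getNumExceptions(); i++) {
            log.error("failed row=" + Bytes.toString(e.getRow(i).getRow())
                    + " server=" + e.getHostnamePort(i), e.getCause(i));
        }
        // heuristic flag: true suggests a cluster-side problem rather than bad input rows
        log.error("mayHaveClusterIssues=" + e.mayHaveClusterIssues());
        throw e;
    }
}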
public HbaseTemplateV2 hbaseTemplateV2() {

    HbaseTemplateV2 hbaseTemplateV2 = new HbaseTemplateV2();

    HbaseTemplateConfig config = new HbaseTemplateConfig();
    Configuration hBaseConfiguration = HBaseConfiguration.create();
    hBaseConfiguration.set("hbase.zookeeper.quorum", config.getZookeeperQuorumV2());
    hBaseConfiguration.set("hbase.zookeeper.property.clientPort", config.getZookeeperPortV2());
    hBaseConfiguration.set("zookeeper.znode.parent", config.getZookeeperParentV2());

    hBaseConfiguration.setInt("hbase.client.retries.number",3);
    hBaseConfiguration.setInt("ipc.socket.timeout",3000);//socket建立链接的超时时间
    hBaseConfiguration.setInt("hbase.rpc.timeout",3000);//单次rpc请求超时时间
    hBaseConfiguration.setInt("hbase.client.operation.timeout",18000);//单次数据操作总时间 不包括scan
    hBaseConfiguration.setInt("hbase.client.scanner.timeout.period",3000);//scan操作单次rpc请求超时时间
    hbaseTemplateV2.setConf(hBaseConfiguration);

    return hbaseTemplateV2;
}

The code above is the parameter configuration applied when the HBase client is initialized.
protected List<T> batchGetV2(String tableName, List<String> rowNames, String familyName) {

    return hbaseTemplateV2.execute(conn -> {
        Table table = conn.getTable(TableName.valueOf(tableName));
        Transaction t1 = Cat.newTransaction("transaction_hbase_batch_get", tableName);
        try {
            List<Get> getList = Lists.newArrayList();
            rowNames.forEach(rowName -> {
                Get get = new Get(Bytes.toBytes(rowName));
                get.addFamily(Bytes.toBytes(familyName));
                getList.add(get);
            });
            Result[] results = table.get(getList);   // one Result per Get, in order
            List<T> fetchResult = Lists.newArrayList();
            for (Result result : results) {
                T t = convertResult2DTO(result);
                if (t != null) {
                    fetchResult.add(t);
                }
            }
            t1.setStatus(Transaction.SUCCESS);
            return fetchResult;
        } catch (Exception e) {
            Cat.logError("#medoc-hbase# failed to batch get tablename." + tableName + ".rowNames." + JsonUtils.toJson(rowNames), e);
            log.error("#medoc-hbase# failed to batch get tablename." + tableName + ".rowNames." + JsonUtils.toJson(rowNames), e);
            t1.setStatus(e);
            return null;
        } finally {
            if (Objects.nonNull(table)) {
                table.close();
            }
            // status is already set in the try/catch above; completing last
            // avoids overwriting a failure status with SUCCESS
            t1.complete();
        }
    });
}

The code above is my batch read path; a cheaper existence check is sketched below.
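Since this path only needs to decide which rows to write back, a cheaper membership check than a full multi-get is Table#existsAll, which has the region servers report presence without shipping any cell data back. A minimal sketch under the same imports as the code above; the variable names are illustrative:

List<Get> gets = Lists.newArrayList();
for (String rowName : rowNames) {
    gets.add(new Get(Bytes.toBytes(rowName)).addFamily(Bytes.toBytes(familyName)));
}
boolean[] exists = table.existsAll(gets);   // aligned index-for-index with gets
for (int i = 0; i < exists.length; i++) {
    if (!exists[i]) {
        // rowNames.get(i) is absent from HBase and is a candidate for insertion
    }
}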
The concrete background: when the job starts, the program reads a batch of rows from the DB, then reads the corresponding rows from HBase, compares and filters them, and writes the result back into HBase. Sometimes the batch read fails and sometimes the batch write fails, always with the error type shown above.
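One common mitigation for this exception under load is to cap the size of each batch call so that a single flush cannot swamp one region server. A sketch using Guava's Lists.partition (Guava is evidently already on the classpath here); the chunk size of 200 is an assumed starting point to tune, not a value from the post:

private static final int CHUNK_SIZE = 200;   // assumption: tune against region server capacity

private void chunkedPut(Table table, List<Put> puts) throws IOException {
    // each table.put(List<Put>) call now flushes at most CHUNK_SIZE mutations
    for (List<Put> chunk : Lists.partition(puts, CHUNK_SIZE)) {
        table.put(chunk);
    }
}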

HBase client version:
"org.apache.hbase:hbase-client:$hbaseVersion"
hbaseVersion = '1.1.2.2.4.2.0-258'

Class declaring the invoked methods:
org.apache.hadoop.hbase.client.Table

hbase小能手 · 2018-11-07 15:59:18
2 replies
  • Hit the same problem: when a batch read pulls a large volume of data, HBase simply goes down, all subsequent requests sit waiting until they time out, and new connections can't get in at all. OP, apart from lowering the read/write volume, is there any other way to raise the load capacity?

    2021-06-09 09:59:26
  • Community admin

    We later reduced the frequency of the write jobs and also cut the volume of data written per run. The service now stays stable a little longer, but monitoring shows the error comes straight back whenever a GC fires. Can I take this to mean that any GC will trigger this error? If so, how do I keep the server's load capacity from degrading so much while GC is running?

    2019-07-17 23:12:41
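If the failures really do line up with GC pauses, one client-side lever is to give each operation more headroom than a typical pause, so that a stalled RPC is retried instead of exhausting its budget. A sketch with looser values than the 3 s settings in the question; the exact numbers are assumptions to tune against the observed pause lengths:

hBaseConfiguration.setInt("hbase.rpc.timeout", 10000);              // let a single RPC outlive a ~10 s pause
hBaseConfiguration.setInt("hbase.client.operation.timeout", 60000); // total budget for one operation, retries included
hBaseConfiguration.setInt("hbase.client.retries.number", 5);        // a few more retries to ride out pauses
hBaseConfiguration.setInt("hbase.client.pause", 200);               // base backoff between retries, in ms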