ES度量聚合(ElasticSearch Metric Aggregations)

  1. 云栖社区>
  2. 中间件兴趣圈>
  3. 博客>
  4. 正文

ES度量聚合(ElasticSearch Metric Aggregations)

丁威 2019-09-02 20:33:05 浏览95
展开阅读全文

从本篇将开始进入ES系列的聚合部分(Aggregations)。

本篇重点介绍Elasticsearch Metric Aggregations(度量聚合)。

Metric聚合,主要针对数值类型的字段,类似于关系型数据库中的sum、avg、max、min等聚合类型。

本例基于如下索引进行试验:

public static void createMapping_agregations() {
        RestHighLevelClient client = EsClient.getClient();
        try {
            CreateIndexRequest request = new CreateIndexRequest("aggregations_index02");
            XContentBuilder jsonBuilder = XContentFactory.jsonBuilder()
                                            .startObject()
                                                .startObject("properties")
                                                    .startObject("orderId")
                                                        .field("type", "integer")
                                                    .endObject()
                                                    .startObject("orderNo")
                                                        .field("type", "keyword")
                                                    .endObject()
                                                    .startObject("totalPrice")
                                                        .field("type", "double")
                                                    .endObject()
                                                    .startObject("sellerId")
                                                        .field("type", "integer")
                                                    .endObject()
                                                    .startObject("sellerName")
                                                        .field("type", "keyword")
                                                    .endObject()
                                                    .startObject("buyerId")
                                                        .field("type", "integer")
                                                    .endObject()
                                                    .startObject("buyerName")
                                                        .field("type", "keyword")
                                                    .endObject()
                                                    .startObject("createTime")
                                                        .field("type", "date")
                                                        .field("format", "yyyy-MM-dd HH:mm:ss")
                                                    .endObject()
                                                    .startObject("status")
                                                        .field("type", "integer")
                                                    .endObject()
                                                    .startObject("reciveAddressId")
                                                        .field("type", "integer")
                                                    .endObject()
                                                    .startObject("reciveName")
                                                        .field("type", "keyword")
                                                    .endObject()
                                                    .startObject("phone")
                                                        .field("type", "keyword")
                                                    .endObject()
                                                    .startObject("skuId")
                                                        .field("type", "integer")
                                                    .endObject()
                                                    .startObject("skuNo")
                                                        .field("type", "keyword")
                                                    .endObject()
                                                    .startObject("goodsId")
                                                        .field("type", "integer")
                                                    .endObject()
                                                    .startObject("goodsName")
                                                        .field("type", "keyword")
                                                    .endObject()
                                                    .startObject("num")
                                                        .field("type", "integer")
                                                    .endObject()
                                                .endObject()
                                            .endObject();
            request.mapping("_doc", jsonBuilder);
            System.out.println(client.indices().create(request, RequestOptions.DEFAULT));
        } catch (Throwable e) {
            e.printStackTrace();
        } finally {
            EsClient.close(client);
        }
    }

对应的SQL表结构如下:

CREATE TABLE `es_order_tmp` (
  `orderId` int(11) NOT NULL DEFAULT '0' COMMENT '主键',
  `orderNo` varchar(30) DEFAULT NULL COMMENT '订单编号',
  `totalPrice` decimal(10,2) DEFAULT NULL COMMENT '订单总价,跟支付中心返回金额相等,包括了雅豆,余额,第三方支付的金额。运费包含在内,优惠券抵扣的金额不含在内',
  `sellerId` int(11) DEFAULT NULL COMMENT '商家ID',
  `selerName` varchar(50) DEFAULT NULL COMMENT '商家名称',
  `buyerId` int(11) DEFAULT NULL COMMENT '创建者,购买者',
  `buyerName` varchar(255) DEFAULT NULL COMMENT '业主姓名',
  `createTime` varchar(22) DEFAULT NULL,
  `status` int(11) DEFAULT NULL COMMENT '订单状态,0:待付款,1:待发货,2:待收货,3:待评价,4:订单完成,5:订单取消,6:退款处理中,7:拒绝退货,8:同意退货,9:退款成功,10:退款关闭,11:订单支付超时,12:半支付状态',
  `reciveAddressId` int(11) DEFAULT NULL COMMENT '收货地址ID',
  `reciveName` varchar(50) DEFAULT NULL,
  `phone` varchar(30) DEFAULT NULL COMMENT '联系号码',
  `skuId` int(11) DEFAULT NULL COMMENT '货品ID',
  `skuNo` varchar(100) DEFAULT NULL COMMENT 'SKU编号',
  `goodsId` int(11) DEFAULT NULL COMMENT '商品ID',
  `goodsName` varchar(100) DEFAULT NULL COMMENT '商品名称',
  `num` int(11) DEFAULT NULL COMMENT '数量'
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

avg 平均值

POST /exams/_search?size=0
{
    "aggs" : {
        "avg_grade" : { "avg" : { "field" : "grade" } }
    }
}

对字段grade取平均值。

对应的java示例如下:

public static void testMatchQuery() {
        RestHighLevelClient client = EsClient.getClient();
        try {
            SearchRequest searchRequest = new SearchRequest();
            searchRequest.indices("aggregations_index02");
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            AggregationBuilder avg = AggregationBuilders.avg("avg-aggregation").field("num").missing(0);    // @1
            sourceBuilder.aggregation(avg);
            sourceBuilder.size(0);
            sourceBuilder.query(
                    QueryBuilders.termQuery("sellerId", 24)
            );
            searchRequest.source(sourceBuilder);
            SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);
            System.out.println(result);
        } catch (Throwable e) {
            e.printStackTrace();
        } finally {
            EsClient.close(client);
        }
    }

其中代码@1:missing(0)表示如果文档中没有取平均值的字段时,则使用该值进行计算,本例中使用0参与计算。

其返回结果如下:

{
    "took":2,
    "timed_out":false,
    "_shards":{
        "total":5,
        "successful":5,
        "skipped":0,
        "failed":0
    },
    "hits":{
        "total":39,
        "max_score":0,
        "hits":[

        ]
    },
    "aggregations":{
        "avg#avg-aggregation":{
            "value":1.2820512820512822
        }
    }
}

Weighted Avg Aggregation 加权平均聚合

加权平均算法,∑(value * weight) / ∑(weight)。

加权平均(weghted_avg)支持的参数列表:

  • value
    提供值的字段或脚本的配置。例如定义计算哪个字段的平均值,该值支持如下子参数:
  • field
    用来定义平均值的字段名称。
  • missing
    用来定义如果匹配到的文档没有avg字段,使用该值来参与计算。
  • weight
    用来定义权重的对象,其可选属性如下:
  • field
    定义权重来源的字段。
  • missing
    如果文档缺失权重来源字段,以该值来代表该文档的权重值。
  • format
    数值类型格式化。
  • value_type
    用来指定value的类型,例如ValueType.DATE、ValueType.IP等。

示例如下:

POST /exams/_search
{
    "size": 0,
    "aggs" : {
        "weighted_grade": {
            "weighted_avg": {
                "value": {
                    "field": "grade"
                },
                "weight": {
                    "field": "weight"            // @2
                }
            }
        }
    }
}

从文档中抽取属性为weight的字段的值来当权重值。
其JAVA示例如下:

public static void test_weight_avg_aggregation() {
        RestHighLevelClient client = EsClient.getClient();
        try {
            SearchRequest searchRequest = new SearchRequest();
            searchRequest.indices("aggregations_index02");
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            WeightedAvgAggregationBuilder avg = AggregationBuilders.weightedAvg("avg-aggregation")
                    
                                        .value( 
                                                (new MultiValuesSourceFieldConfig.Builder())
                                                   .setFieldName("num")
                                                   .setMissing(0)
                                                   .build()
                                              )
                                        .weight(
                                                (new MultiValuesSourceFieldConfig.Builder())
                                                   .setFieldName("num")
                                                   .setMissing(1)
                                                   .build()
                                               )
    //                                    .valueType(ValueType.LONG)
                                        
                                       ;
            
            avg.toString();
            
            sourceBuilder.aggregation(avg);
            sourceBuilder.size(0);
            sourceBuilder.query(
                    QueryBuilders.termQuery("sellerId", 24)
            );
            searchRequest.source(sourceBuilder);
            SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);
            System.out.println(result);
        } catch (Throwable e) {
            e.printStackTrace();
        } finally {
            EsClient.close(client);
        }
    }

Cardinality Aggregation

基数聚合,先distinct,再聚合,类似关系型数据库(count(distinct))。

示例如下:

POST /sales/_search?size=0
{
    "aggs" : {
        "type_count" : {
            "cardinality" : {
                "field" : "type"
            }
        }
    }
}

对应的JAVA示例如下:

public static void test_Cardinality_Aggregation() {
        RestHighLevelClient client = EsClient.getClient();
        try {
            SearchRequest searchRequest = new SearchRequest();
            searchRequest.indices("aggregations_index02");
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            AggregationBuilder aggregationBuild = AggregationBuilders.cardinality("buyerid_count").field("buyerId");
            sourceBuilder.aggregation(aggregationBuild);
            sourceBuilder.size(0);
            sourceBuilder.query(
                    QueryBuilders.termQuery("sellerId", 24)
            );
            searchRequest.source(sourceBuilder);
            SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);
            System.out.println(result);
        } catch (Throwable e) {
            e.printStackTrace();
        } finally {
            EsClient.close(client);
        }
    }

返回结果如下:

{
    "took":30,
    "timed_out":false,
    "_shards":{
        "total":5,
        "successful":5,
        "skipped":0,
        "failed":0
    },
    "hits":{
        "total":39,
        "max_score":0,
        "hits":[

        ]
    },
    "aggregations":{
        "cardinality#type_count":{
            "value":11
        }
    }
}

上述实现与SQL:SELECT COUNT(DISTINCT buyerId) from es_order_tmp where sellerId=24; 效果类似,表示购买了商家id为24的买家个数。

其核心参数如下:

  • precision_threshold
    精确度控制。在此计数之下,期望计数接近准确。在这个值之上,计数可能会变得更加模糊(不准确)。支持的最大值是40000,超过此值的阈值与40000的阈值具有相同的效果。默认值是3000。

上述示例中返回的11是精确值,如果改写成下面的代码,结果将变的不准确:

field("buyerId").precisionThreshold(5)

其返回结果如下:

{
    "took":5,
    "timed_out":false,
    "_shards":{
        "total":5,
        "successful":5,
        "skipped":0,
        "failed":0
    },
    "hits":{
        "total":39,
        "max_score":0,
        "hits":[

        ]
    },
    "aggregations":{
        "cardinality#buyerid_count":{
            "value":9
        }
    }
}
  • Pre-computed hashes
    一个比较好的实践是需要对字符串类型的字段进行基数聚合的话,可以提前索引该字符串的hash值,通过对hash值的聚合,提高效率。
  • Missing Value
    missing参数定义了应该如何处理缺少值的文档。默认情况下,它们将被忽略,但也可以将它们视为具有一个值,通过missing value来设置。

Extended Stats Aggregation

stats聚合的扩展版本,示例如下:

GET /exams/_search
{
    "size": 0,
    "aggs" : {
        "grades_stats" : { "extended_stats" : { "field" : "grade" } }
    }
}

对应的JAVA示例如下:

public static void test_Extended_Stats_Aggregation() {
        RestHighLevelClient client = EsClient.getClient();
        try {
            SearchRequest searchRequest = new SearchRequest();
            searchRequest.indices("aggregations_index02");
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            AggregationBuilder aggregationBuild = AggregationBuilders.extendedStats("extended_stats")
                                                     .field("num")
                                                  ;
            sourceBuilder.aggregation(aggregationBuild);
            sourceBuilder.size(0);
            sourceBuilder.query(
                    QueryBuilders.termQuery("sellerId", 24)
            );
            searchRequest.source(sourceBuilder);
            SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);
            System.out.println(result);
        } catch (Throwable e) {
            e.printStackTrace();
        } finally {
            EsClient.close(client);
        }
    }

返回的结果如下:

{
    "took":13,
    "timed_out":false,
    "_shards":{
        "total":5,
        "successful":5,
        "skipped":0,
        "failed":0
    },
    "hits":{
        "total":39,
        "max_score":0,
        "hits":[

        ]
    },
    "aggregations":{
        "extended_stats#extended_stats":{
            "count":39,                   //   @1
            "min":1,                        //   @2
            "max":11,                     //    @3
            "avg":1.2820512820512822,    // @4
            "sum":50,                                 // @5
            "sum_of_squares":162,           // @6
            "variance":2.5101906640368177,    // @7
            "std_deviation":1.5843581236692725,  // @8
            "std_deviation_bounds":{                       // @9
                "upper":4.450767529389827,
                "lower":-1.886664965287263
            }
        }
    }
}

将所能支持的聚合类型都返回。
@1:返回符合条件的总条数。
@2:该属性在符合条件中的最小值。
@3:该属性在符合条件中的最大值。
@4:该属性在符合条件的文档中的平均值。
@5:该属性在符合条件的文档中的sum求和。
@6-9:暂未理解其含义。

同样支持missing属性。

max Aggregation

求最大值,与avg Aggregation聚合类似,不再重复介绍。

min Aggregation

求最小值,与avg Aggregation聚合类似,不再重复介绍。

Percentiles Aggregation

百分位计算,ES提供的另外一种近似度量方式。主要用于展现以具体百分比下观察到的数值,例如,第95个百分位上的数值,是高于 95% 的数据总和。百分位聚合通常用来找出异常,适用与使用统计学中正态分布来观察问题。
官方文档:https://www.elastic.co/guide/cn/elasticsearch/guide/current/percentiles.html

例如:

GET latency/_search
{
    "size": 0,
    "aggs" : {
        "load_time_outlier" : {
            "percentiles" : {
                "field" : "load_time" 
            }
        }
    }
}

load_time,在官方文档中的字段含义为字段加载时间,其返回值如下:

{
    ...

   "aggregations": {
      "load_time_outlier": {
         "values" : {
            "1.0": 5.0,
            "5.0": 25.0,
            "25.0": 165.0,
            "50.0": 445.0,
            "75.0": 725.0,
            "95.0": 945.0,
            "99.0": 985.0
         }
      }
   }
}

默认的百分比key为[ 1, 5, 25, 50, 75, 95, 99 ]。
按照官方的解读,可以这样理解上述返回结果:
"1.0": 5.0;表示(100-1)%的数据都大于5.0;也表示1%的数据小于5.0。
"5.0": 25.0 表示,95%的请求的加载时间大于等于25。
"99.0": 985.0 表示1%的请求的加载时间大于985.0。

  • percentile
    用来定义其百分比,例如percents:[10,50,95,99]
  • keyed
    默认情况下,keyed参数为true,其结果的返回格式如上:
"values" : {
            "1.0": 5.0,
            "5.0": 25.0,
            "25.0": 165.0,
            "50.0": 445.0,
            "75.0": 725.0,
            "95.0": 945.0,
            "99.0": 985.0
         }

如果设置keyed=false,则返回值的格式如下:

"aggregations": {
        "load_time_outlier": {
            "values": [
                {
                    "key": 1.0,
                    "value": 5.0
                },
                {
                    "key": 5.0,
                    "value": 25.0
                },
           ...
            ]
        }
    }
  • 百分位使用场景
    百分位通常使用近似统计。

计算百分位数有许多不同的算法。简单实现只是将所有值存储在一个排序数组中。要找到第50个百分位,只需找到my_array[count(my_array) * 0.5]处的值。

显然,这种简单的实现没有伸缩性——排序数组随数据集中值的数量线性增长。为了计算es集群中可能存在的数十亿个值的百分位数,兼顾性能的需求,故ES通常使用计算近似百分位数。近似百分位通常使用TDigest 算法。

在使用近似百分位时,通常需要考虑这些:

  1. 准确度与q(1-q)成正比。这意味着极端百分位数(如99%)比不那么极端的百分位数(如中位数)更准确
  2. 对于较小的值集,百分位数是非常准确的(如果数据足够小,可能是100%准确)。
  3. 当桶中值的数量增加时,算法开始近似百分位数。它有效地以准确性换取内存节省。准确的不准确程度很难一概而论,因为它取决于您的数据分布和聚合的数据量。
  • Compression
    近似算法必须平衡内存利用率和估计精度。这个平衡可以使用参数compression来控制。

TDigest算法使用许多“节点”来近似百分位数——可用节点越多,与数据量成比例的准确性(和大内存占用)就越高。压缩参数将节点的最大数量限制为20 * compression。
因此,通过增加压缩值,可以以增加内存为代价来提高百分位数的准确性。较大的压缩值也会使算法变慢,因为底层树数据结构的大小会增加,从而导致更昂贵的操作。默认压缩值是100。
一个“节点”使用大约32字节的内存,因此在最坏的情况下(大量数据按顺序到达),默认设置将产生大约64KB(32 20 100)大小的TDigest。实际上,数据往往更随机,TDigest使用的内存更少。

HDR Histogram(直方图)

HDR直方图(High Dynamic Range Histogram,高动态范围直方图)是一种替代实现,在计算延迟度量的百分位数时非常有用,因为它比t-digest实现更快,但需要更大的内存占用。此实现维护一个固定的最坏情况百分比错误(指定为有效数字的数量)。这意味着如果数据记录值从1微秒到1小时(3600000000毫秒)直方图设置为3位有效数字,它将维持一个价值1微秒的分辨率值1毫秒,3.6秒(或更好的)最大跟踪值(1小时)。

GET latency/_search
{
    "size": 0,
    "aggs" : {
        "load_time_outlier" : {
            "percentiles" : {
                "field" : "load_time",
                "percents" : [95, 99, 99.9],
                "hdr": { 
                  "number_of_significant_value_digits" : 3 
                }
            }
        }
    }
}
  1. hdr
    通过hdr属性指定直方图相关的参数。
  2. number_of_significant_value_digits
    指定以有效位数为单位的直方图值的分辨率。

注意:hdr直方图只支持正值,如果传递负值,则会出错。如果值的范围是未知的,那么使用HDRHistogram也不是一个好主意,因为这可能会导致内存的大量使用。

  • Missing value
    missing参数定义了应该如何处理缺少值的文档。默认情况下,它们将被忽略,但也可以将它们视为具有一个值。

Percentiles Aggregation示例(Java Demo):

public static void test_Percentiles_Aggregation() {
        RestHighLevelClient client = EsClient.getClient();
        try {
            SearchRequest searchRequest = new SearchRequest();
            searchRequest.indices("aggregations_index02");
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            AggregationBuilder aggregationBuild = AggregationBuilders.percentiles("percentiles")
                                                     .field("load_time")
                                                     .percentiles(75,90,99.9)
                                                     .compression(100)
                                                     .method(PercentilesMethod.HDR)
                                                     .numberOfSignificantValueDigits(3)
                                                  ;
            sourceBuilder.aggregation(aggregationBuild);
            sourceBuilder.size(0);
            sourceBuilder.query(
                    QueryBuilders.termQuery("sellerId", 24)
            );
            searchRequest.source(sourceBuilder);
            SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT);
            System.out.println(result);
        } catch (Throwable e) {
            e.printStackTrace();
        } finally {
            EsClient.close(client);
        }
    }

Percentile Ranks Aggregation

百分位范围表示观察值低于某一值的百分比。例如,如果一个值大于或等于观察值的95%,那么它就属于第95百分位。
假设您的数据包含网站加载时间。您可能有一个服务协议,95%的页面加载完全在500ms内完成,99%的页面加载完全在600ms内完成。

示例:

GET latency/_search
{
    "size": 0,
    "aggs" : {
        "load_time_ranks" : {                         // @1
            "percentile_ranks" : {                     // @2
                "field" : "load_time",                   // @3
                "values" : [500, 600]                  // @4
            }
        }
    }
}

代码@1:聚合的名称。
代码@2:聚合的类型,这里使用percentile_ranks。
代码@3:用于聚合的字段。
代码@5:设置观察值。

其他的使用与1.7 Percentiles Aggregation类似,就不单独给出JAVA示例了。

Stats Aggregation

返回的统计信息包括:min、max、sum、count和avg。
其示例如下:

POST /exams/_search?size=0
{
    "aggs" : {
        "grades_stats" : { "stats" : { "field" : "grade" } }
    }
}

对应的返回结果为:

{
    ...
    "aggregations": {
        "grades_stats": {
            "count": 2,
            "min": 50.0,
            "max": 100.0,
            "avg": 75.0,
            "sum": 150.0
        }
    }
}

因为与avg的使用类似,故JAVA示例就不重复给出。

Sum Aggregation

求和聚合。类似于关系型数据库的sum函数,其使用与avg类似,故只是简单罗列一下restful的使用方式:

POST /sales/_search?size=0
{
    "query" : {
        "constant_score" : {
            "filter" : {
                "match" : { "type" : "hat" }
            }
        }
    },
    "aggs" : {
        "hat_prices" : { "sum" : { "field" : "price" } }
    }
}

value count aggregation

值个数聚合,主要是统计一个字段有多少个不同的值,例如关系型数据库中一张用户表user中一个性别字段sex,其取值为0,1,2,那不管这个表有多少行数据,sex的value count最多为3。

示例如下:

POST /sales/_search?size=0
{
    "aggs" : {
        "types_count" : { "value_count" : { "field" : "type" } }
    }
}

其响应结果如下:

{
    ...
    "aggregations": {
        "types_count": {
            "value": 7
        }
    }
}

median absolute deviation aggregation

中位绝对偏差聚合。由于这部分内容与统计学关系密切,但这并不是我的特长,故对该统计的含义做深入解读,在实际场景中,我们只需要知道ES提供了中位数偏差统计的功能,如果有这方面的需求,我们知道如何使用ES的中位数统计即可。

官方场景:
假设我们收集了商品评价数据(1星到5星之间的数值)。在实际使用过程中通常会使用平均值来展示商品的整体评价等级。中位绝对偏差聚合可以帮助我们了解评审之间的差异有多大。

在这个例子中,我们有一个平均评级为3星的产品。让我们看看它的评级的绝对偏差中值,以确定它们的变化有多大。按照我的理解,中位绝对偏差聚合 ,聚合的数据来源于(原始数据 - 所有原始数值的平均值 的绝对值进行聚合)。
例如评论原始数据如下:
1、2、5、5、4、3、5、5、5、5
其平均值:4
那中位数绝对偏差值聚合的数据为:
3、2、1、1、0、1、1、1、1、1

其Restfull示例如下:

GET reviews/_search
{
  "size": 0,
  "aggs": {
    "review_average": {     // @1
      "avg": {                
        "field": "rating"
      }
    },
    "review_variability": {    // @2
      "median_absolute_deviation": {
        "field": "rating" 
      }
    }
  }
}

该聚合包含两部分。
代码@1:针对字段rating使用AVG进行聚合(平均聚合,求出中位数)
代码@2:针对字段rating进行中位数绝对偏差聚合。

备注:在es high rest api中未封装(median absolute deviation aggregation)聚合。

ES 关于 Metric聚合就介绍到这里了,接下来将重点分析Es Buket聚合。


原文发布时间为:2019-03-10
本文作者:丁威,《RocketMQ技术内幕》作者。
本文来自中间件兴趣圈,了解相关信息可以关注中间件兴趣圈

网友评论

登录后评论
0/500
评论
丁威
+ 关注
所属云栖号: 中间件兴趣圈