13、ES实战:分组聚合之一

分组聚和不像度量聚合那样通过字段进行计算,而是根据文档创建分组。每个聚合都关联一个标准(取决于聚合的类型),决定了一个文档在当前的条件下是否会“划入”分组中。
换句话说,分组实际上定义了一个文档集。除了这些分组之外,分组聚和也会计算和返回“划入”每个分组中文档的数量。
与度量聚合不同,分组聚合可以拥有子聚合。这些子聚合可以聚合由它们的“父”聚合创建分组。
分组集合有不同的类型,对应着不同的“分组”策略。有的策略定义一个分组(单分组聚合),有的策略定义固定数量的分组(多分组聚合),还有的策略在聚合执行过程中动态的创建分组。

1、 日期直方图聚合;
2、 时间范围聚合;
3、 嵌套聚合;

1、日期直方图聚合

日期直方图集合是一个多分组聚合,除了只能应用于日期的值之外,和“直方图聚合”的作用一样的。
自从Elasticsearch将日期类型作为内置类型之后,在日期上进行普通的直方图聚合得到了很好的支持。但是这么做会在精度上有所失真,这是因为时间间隔不是固定的(思考一下,一年或者一个月有多少天)。因为这个原因,我们需要对基于数据的时间进行忒别支持。从功能性的角度来说,最主要的不同就是可以通过日期/时间表达式指定间隔。
以一个月为间隔请求:

POST /sales/_search?size=0
{
   
     
    "aggs" : {
   
     
        "sales_over_time" : {
   
     
            "date_histogram" : {
   
     
                "field" : "date",
                "interval" : "month"
            }
        }
    }
}

可用的时间间隔表达式:year、quarter、month、week、day、hour、minute、second(年份、季度、月、周、日、小时、分钟、秒)

时间值也可以通过由时间单元解析支持的缩写来指定。注意:不支持分数的时间值,但是你可以通过转移到另一个时间单位地址本(例如:1.5h可以被指定为90m)。

POST /sales/_search?size=0
{
   
     
    "aggs" : {
   
     
        "sales_over_time" : {
   
     
            "date_histogram" : {
   
     
                "field" : "date",
                "interval" : "90m"
            }
        }
    }
}

1.1、时间键值:

本质上,日期表示为一个 64 位的时间戳数字,代表从纪元开始到现在的毫秒数。这些时间戳为分组的键返回。key_as_string 是相同的时间戳转换为格式化的日期字符串,格式通过 format 参数指定。
Note:如果没有指定格式,则它将使用字段映射中指定的第一日期格式。
请求示例:

POST /sales/_search?size=0
{
   
     
    "aggs" : {
   
     
        "sales_over_time" : {
   
     
            "date_histogram" : {
   
     
                "field" : "date",
                "interval" : "1M",
                "format" : "yyyy-MM-dd" 
            }
        }
    }
}

返回值:

{
   
     
    ...
    "aggregations": {
   
     
        "sales_over_time": {
   
     
            "buckets": [
                {
   
     
                    "key_as_string": "2015-01-01",
                    "key": 1420070400000,
                    "doc_count": 3
                },
                {
   
     
                    "key_as_string": "2015-02-01",
                    "key": 1422748800000,
                    "doc_count": 2
                },
                {
   
     
                    "key_as_string": "2015-03-01",
                    "key": 1425168000000,
                    "doc_count": 2
                }
            ]
        }
    }
}

1.2、时间区间

日期时间以世界标准时区(UTC)存储在Elasticsearch中。默认情况下,所有的分组和取整的操作也都是以世界标准时完成的。time_zone参数用于表示分组时需要使用不同的时间区间。
时间区间也可以被指定为一个ISO 8601 UTC偏移(如+ 01:00或08:00)或作为一个互联网编码数据库中的时间区间标识符,比如America/Los_Angeles。
考虑下面的例子:

PUT my_index/log/1?refresh
{
   
     
  "date": "2015-10-01T00:30:00Z"
}

PUT my_index/log/2?refresh
{
   
     
  "date": "2015-10-01T01:30:00Z"
}

GET my_index/_search?size=0
{
   
     
  "aggs": {
   
     
    "by_day": {
   
     
      "date_histogram": {
   
     
        "field":     "date",
        "interval":  "day"
      }
    }
  }
}

如果没有指定时间区间,就会使用世界标准时间,结果就是这些文档会放置在同一天的分组里,这个分组开始于2015年10月1日UTC午夜开始:

{
   
     
  ...
  "aggregations": {
   
     
    "by_day": {
   
     
      "buckets": [
        {
   
     
          "key_as_string": "2015-10-01T00:00:00.000Z",
          "key":           1443657600000,
          "doc_count":     2
        }
      ]
    }
  }
}

如果指定一个 - 01:00的时间分区,午夜时间就会比标准时间的午夜提前一个小时:

GET my_index/_search?size=0
{
   
     
  "aggs": {
   
     
    "by_day": {
   
     
      "date_histogram": {
   
     
        "field":     "date",
        "interval":  "day",
        "time_zone": "-01:00"
      }
    }
  }
}

现在,第一份文档就会划分入2015年9月30日的分组里,第二份文档划入2015年10月1日的分组了:

{
   
     
  ...
  "aggregations": {
   
     
    "by_day": {
   
     
      "buckets": [
        {
   
     
          "key_as_string": "2015-09-30T00:00:00.000-01:00", 
          "key": 1443574800000,
          "doc_count": 1
        },
        {
   
     
          "key_as_string": "2015-10-01T00:00:00.000-01:00", 
          "key": 1443661200000,
          "doc_count": 1
        }
      ]
    }
  }
}

key_as_string值代表指定的时区中每一天的午夜。

Warning:当使用DST(夏时制)更改的时区时,接近这些更改发生时的桶的大小与所使用的间隔所期望的大小略有不同。例如,考虑在CET时间区DST开始:2016年3月27日凌晨两点,时钟转了1小时,当地时间凌晨3点。当使用一天作为间隔时,那一天的桶盖只会保存23小时的数据,而不是其他桶通常的24小时。相同的时间间隔越短如12h是真的。在这里,我们将在3月27日上午在DST的转变发生只有一11h。

1.3、偏移

offset参数通过指定正(+)或负偏移(-)时间来修改每个分组的时间起始值,比如1h代表一小时,1M代表一个月。
例如,档用day作为时间间隔,每个分组的时间范围是从午夜到午夜。设置offset参数为+6h,会修改每个分组的时间范围从6点到6点。

PUT my_index/log/1?refresh
{
   
     
  "date": "2015-10-01T05:30:00Z"
}

PUT my_index/log/2?refresh
{
   
     
  "date": "2015-10-01T06:30:00Z"
}

GET my_index/_search?size=0
{
   
     
  "aggs": {
   
     
    "by_day": {
   
     
      "date_histogram": {
   
     
        "field":     "date",
        "interval":  "day",
        "offset":    "+6h"
      }
    }
  }
}

而不是一个单一的开始时间在午夜开始,上述请求组文件转换成开始时间从早上6点:

{
   
     
  ...
  "aggregations": {
   
     
    "by_day": {
   
     
      "buckets": [
        {
   
     
          "key_as_string": "2015-09-30T06:00:00.000Z",
          "key": 1443592800000,
          "doc_count": 1
        },
        {
   
     
          "key_as_string": "2015-10-01T06:00:00.000Z",
          "key": 1443679200000,
          "doc_count": 1
        }
      ]
    }
  }
}

Note:开始时间偏移量在时间区间做出调整后是计算在内的。

1.4、返回键控

将键控标志设置为true将与每个开始时间关联唯一的字符串键,并将范围返回为散列而不是数组:

POST /sales/_search?size=0
{
   
     
    "aggs" : {
   
     
        "sales_over_time" : {
   
     
            "date_histogram" : {
   
     
                "field" : "date",
                "interval" : "1M",
                "format" : "yyyy-MM-dd",
                "keyed": true
            }
        }
    }
}

返回值:

{
   
     
    ...
    "aggregations": {
   
     
        "sales_over_time": {
   
     
            "buckets": {
   
     
                "2015-01-01": {
   
     
                    "key_as_string": "2015-01-01",
                    "key": 1420070400000,
                    "doc_count": 3
                },
                "2015-02-01": {
   
     
                    "key_as_string": "2015-02-01",
                    "key": 1422748800000,
                    "doc_count": 2
                },
                "2015-03-01": {
   
     
                    "key_as_string": "2015-03-01",
                    "key": 1425168000000,
                    "doc_count": 2
                }
            }
        }
    }
}

Java API调用
导入聚合定义类:

import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;

下面是一个如何创建聚合请求的示例:

AggregationBuilder aggregation = AggregationBuilders
                .dateHistogram("agg")
                .field("dateOfBirth")
                .dateHistogramInterval(DateHistogramInterval.YEAR);
SearchRequestBuilder srb = client.prepareSearch(indexName)
                .setTypes(typeName);
srb.addAggregation(aggregation);

SearchResponse response =  srb.execute().actionGet();

Histogram agg = response.getAggregations().get("agg");

// For each entry
for (Histogram.Bucket entry : agg.getBuckets()) {
   
     
    DateTime key = (DateTime) entry.getKey();    // Key
    String keyAsString = entry.getKeyAsString(); // Key as String
    long docCount = entry.getDocCount();         // Doc count

    logger.info("key [{}], date [{}], doc_count [{}]", keyAsString, key.getYear(), docCount);
}

或者如果你想设定10天的间隔:

AggregationBuilder aggregation = AggregationBuilders
                .dateHistogram("agg")
                .field("dateOfBirth")
                .dateHistogramInterval(DateHistogramInterval.days(10));
earchRequestBuilder srb = client.prepareSearch(indexName)
                .setTypes(typeName);
srb.addAggregation(aggregation);

SearchResponse response =  srb.execute().actionGet();

Histogram agg = response.getAggregations().get("agg");

// For each entry
for (Histogram.Bucket entry : agg.getBuckets()) {
   
     
    DateTime key = (DateTime) entry.getKey();    // Key
    String keyAsString = entry.getKeyAsString(); // Key as String
    long docCount = entry.getDocCount();         // Doc count

    logger.info("key [{}], date [{}], doc_count [{}]", keyAsString, key.getYear(), docCount);
}
2、时间范围聚合

时间范围聚合是一个专门用于时间类型数据的范围聚合,与普通的范围聚合最大的区别在于,from和to参数值可以使用日期数值表达式,也可以指定返回的响应中from和to字段的日期格式。注意时间范围聚合的每个范围里面包含from值但排除to值。
示例:

POST /sales/_search?size=0
{
   
     
    "aggs": {
   
     
        "range": {
   
     
            "date_range": {
   
     
                "field": "date",
                "format": "MM-yyy",
                "ranges": [
                    {
   
      "to": "now-10M/M" }, 
                    {
   
      "from": "now-10M/M" } 
                ]
            }
        }
    }
}

说明:现在减去10个月,并跳转到月份的开始
在上面的例子里,我们创建了两个范围分组,第一个会包含日期在10个月之前的所有文件,第二个会包含从10个月前到现在的所有文件。
返回值:

{
   
     
    ...
    "aggregations": {
   
     
        "range": {
   
     
            "buckets": [
                {
   
     
                    "to": 1.4436576E12,
                    "to_as_string": "10-2015",
                    "doc_count": 7,
                    "key": "*-10-2015"
                },
                {
   
     
                    "from": 1.4436576E12,
                    "from_as_string": "10-2015",
                    "doc_count": 0,
                    "key": "10-2015-*"
                }
            ]
        }
    }
}

2.1、时间格式

所有ASCII字母都保留为格式模式字母,其定义如下:

符号 含义 描述 示例
G era text AD
C century of era (>=0) number 20
Y year of era (>=0) year 1996
x weekyear year 1996
w week of weekyear number 27
e day of week number 2
E day of week text Tuesday; Tue
y year year 1996
D day of year number 189
M month of year month July; Jul; 07
d day of month number 10
a halfday of day text PM
K hour of halfday (0~11) number 0
h clockhour of halfday (1~12) number 12
H hour of day (0~23) number 0
k clockhour of day (1~24) number 24
m minute of hour number 30
s second of minute number 55
S fraction of second number 978
z time zone text Pacific Standard Time; PST
Z time zone offset/id zone -0800; -08:00; America/Los_Angeles
escape for text delimiter ‘’

模式字母的计数决定格式。

  • Text:如果模式字母的数目是4或更多,则使用完整形式;否则,如果可用,则使用缩写或缩写形式。
  • Number:最小位数。较短的数字是零填充到这个数额。
  • Year:年与weekyear场数值表示的特殊处理。例如,如果y的计数为2,则该年份将显示为本世纪的零基年份,即两位数。
  • Month:使用3或以上,使用text;否则使用number。
  • Zone:Z的输出偏移没有冒号,ZZ输出偏移一个冒号,ZZZ或多个输出区域ID。
  • Zone names:时区名称(z)无法解析。

模式中的任何字符不在[a…z]或者[A…Z]的范围内,将被视为引用文本。
例如,人物如:,,,“#和?将出现在结果时间文本,即使他们不接受单引号内。

2.2、时间范围聚合中的时间区间

日期可以从另一个时区的时间转换为UTC通过指定time_zone参数。
时间区间参数也适用于数据的数学表达式的舍入。举个例子,在CET时间段开始时,你可以做以下几件事:

POST /sales/_search?size=0
{
   
     
   "aggs": {
   
     
       "range": {
   
     
           "date_range": {
   
     
               "field": "date",
               "time_zone": "CET",
               "ranges": [
                  {
   
      "to": "2016/02/01" }, 
                  {
   
      "from": "2016/02/01", "to" : "now/d" },
                  {
   
      "from": "now/d" }
              ]
          }
      }
   }
}

该日期将被转换为2016-02-15t00:00:00.000 + 01:00。
now/d 将在CET时间段结束。

2.3、返回键控

将键控标志设置为 true 将与每个开始时间关联唯一的字符串键,并将范围返回为散列而不是数组:

POST /sales/_search?size=0
{
   
     
    "aggs": {
   
     
        "range": {
   
     
            "date_range": {
   
     
                "field": "date",
                "format": "MM-yyy",
                "ranges": [
                    {
   
      "to": "now-10M/M" },
                    {
   
      "from": "now-10M/M" }
                ],
                "keyed": true
            }
        }
    }
}

返回值:

{
   
     
    ...
    "aggregations": {
   
     
        "range": {
   
     
            "buckets": {
   
     
                "*-10-2015": {
   
     
                    "to": 1.4436576E12,
                    "to_as_string": "10-2015",
                    "doc_count": 7
                },
                "10-2015-*": {
   
     
                    "from": 1.4436576E12,
                    "from_as_string": "10-2015",
                    "doc_count": 0
                }
            }
        }
    }
}

还可以自定义每个范围的键:

POST /sales/_search?size=0
{
   
     
    "aggs": {
   
     
        "range": {
   
     
            "date_range": {
   
     
                "field": "date",
                "format": "MM-yyy",
                "ranges": [
                    {
   
      "from": "01-2015",  "to": "03-2015", "key": "quarter_01" },
                    {
   
      "from": "03-2015", "to": "06-2015", "key": "quarter_02" }
                ],
                "keyed": true
            }
        }
    }
}

返回值:

{
   
     
    ...
    "aggregations": {
   
     
        "range": {
   
     
            "buckets": {
   
     
                "quarter_01": {
   
     
                    "from": 1.4200704E12,
                    "from_as_string": "01-2015",
                    "to": 1.425168E12,
                    "to_as_string": "03-2015",
                    "doc_count": 5
                },
                "quarter_02": {
   
     
                    "from": 1.425168E12,
                    "from_as_string": "03-2015",
                    "to": 1.4331168E12,
                    "to_as_string": "06-2015",
                    "doc_count": 2
                }
            }
        }
    }
}

Java API调用
导入聚合定义类:

import org.elasticsearch.search.aggregations.bucket.range.Range;

下面是一个如何创建聚合请求的示例:

AggregationBuilder aggregation = AggregationBuilders
                .dateRange("agg")
                .field("dateOfBirth")
                .format("yyyy")
                .addUnboundedTo("1950")    // from -infinity to 1950 (excluded)
                .addRange("1950", "1960")  // from 1950 to 1960 (excluded)
                .addUnboundedFrom("1960"); // from 1960 to +infinity
SearchRequestBuilder srb = client.prepareSearch(indexName)
                .setTypes(typeName);
srb.addAggregation(aggregation);

SearchResponse response =  srb.execute().actionGet();

// sr is here your SearchResponse object
Range agg = sr.getAggregations().get("agg");

// For each entry
for (Range.Bucket entry : agg.getBuckets()) {
   
     
    String key = entry.getKeyAsString();                // Date range as key
    DateTime fromAsDate = (DateTime) entry.getFrom();   // Date bucket from as a Date
    DateTime toAsDate = (DateTime) entry.getTo();       // Date bucket to as a Date
    long docCount = entry.getDocCount();                // Doc count

    logger.info("key [{}], from [{}], to [{}], doc_count [{}]", key, fromAsDate, toAsDate, docCount);
}

这基本上会产生:

key [*-1950], from [null], to [1950-01-01T00:00:00.000Z], doc_count [8]
key [1950-1960], from [1950-01-01T00:00:00.000Z], to [1960-01-01T00:00:00.000Z], doc_count [5]
key [1960-*], from [1960-01-01T00:00:00.000Z], to [null], doc_count [37]

3、嵌套聚合

嵌套聚合是一种特殊的单分组聚合,可以聚合嵌套的文档。
例如,假设我们有一个产品索引,每个产品来自不同的经销商——有着不同的价格。索引的映射如下:

{
   
     
	...
	"product":{
   
     
		"properties":{
   
     
			"resellers":{
   
     
				"type":"nested",
				“properties”:{
   
     
					"name":{
   
     "type":"string"},
					"price":{
   
     "type":"double"}
				}
			}
		}
	}
}

product对象下面的resellers是一个包含嵌套文档的数组:

{
   
     
	"query" : {
   
     "match" : {
   
     "name" : "led tv"}},
	"agg" : {
   
     
		"resellers" : {
   
     
			"nested" : {
   
     "path" : "resellers"},
			"agg" : {
   
     "min_price" : {
   
     "min" : {
   
     "field" : "resellers.price"}}}
		}
	}
}

这个示例会返回可以买到的产品的最低价。
嵌入聚合需要定义嵌入文档在顶级文档中的路径(path)。我们可以在这些嵌入文档上定义任何类型的聚合。
返回值:

{
   
     
	"aggregations" : {
   
     
		"resellers" : {
   
     
			“min_price” : {
   
     “value” : “350”}
		}
	}
}

Java API调用
下面是一个如何创建聚合请求的示例:

//时间范围聚合
AggregationBuilder  DateRange  = AggregationBuilders
			                .dateRange("agg")
			                .field("tm")
			                .format("yyyy/MM/dd HH:mm:ss")
			                .addRange("2017/11/13 10:35:24", "2017/11/17");
			
//日期直方图聚合
AggregationBuilder DateHistogram = AggregationBuilders
			                .dateHistogram("agg")
			                .field("tm")
			                .dateHistogramInterval(DateHistogramInterval.DAY);

//统计聚合
StatsAggregationBuilder Stats = AggregationBuilders
			                .stats("stats")
			                .field("value");

DateRange.subAggregation(DateHistogram.subAggregation(Stats));

SearchRequestBuilder srb = client.prepareSearch(indexName)
				.setTypes(indexType);
//过滤条件(先过滤后聚合)
srb.setQuery(QueryBuilders.boolQuery()
			.must(QueryBuilders.matchPhraseQuery("uid", "0"))
			.must(QueryBuilders.rangeQuery("value").gte(15)));
srb.addAggregation(DateRange);
			
SearchResponse response =  srb.execute().actionGet();

Range agg = response.getAggregations().get("agg");
			
for (Range.Bucket entry : agg.getBuckets()) {
   
     
		Histogram agg2 = (Histogram) entry.getAggregations().asMap().get("agg");
		for(Histogram.Bucket entry2 : agg2.getBuckets()){
   
     
			DateTime key = (DateTime) entry2.getKey();    // Key
			String keyAsString = entry2.getKeyAsString(); // Key as String
			double min = (double) entry2.getAggregations().asMap()
									.get("stats").getProperty("min");
			double max = (double) entry2.getAggregations().asMap()
									.get("stats").getProperty("max");
			double sum = (double) entry2.getAggregations().asMap()
									.get("stats").getProperty("sum");
			double avg  = (double) entry2.getAggregations().asMap()
									.get("stats").getProperty("avg");
			double count = (double) entry2.getAggregations().asMap()
									.get("stats").getProperty("count");
				    
			if(count>0){
   
     
				 System.out.println("key [{"+keyAsString+"}], date [{"+key.getDayOfMonth()+"}], doc_count [{"+count+"}, min_value[{"+min+"}], max_value[{"+max+"}], sum_value[{"+sum+"}], avg_value[{"+avg+"}]]");
				    }
		}
}

版权声明:本文不是「本站」原创文章,版权归原作者所有 | 原文地址: