【最佳实践】如何使用 Elasticsearch ingest 节点来丰富日志和指标

本文涉及的产品
检索分析服务 Elasticsearch 版,2核4GB开发者规格 1个月
简介: 丰富化是将权威来源的数据合并到文档中的过程,当将这些数据导入到 Elasticsearch 中时,并用其他信息丰富文档,通常可以帮助我们更好的对信息进行搜索或查看数据。

从历史上看,数据丰富功能仅在 Logstash 中可用,但由于 Elasticsearch 7.5.0 中引入了 enrich 处理器,因此可以直接在 Elasticsearch 中进行丰富,而无需配置单独的服务/系统。如果你想知道在 Logstash 中是如何实现,那么请参阅我之前的文章 “Logstash:运用 jdbc_streaming 来丰富我们的数据”。

由于通常用于丰富的主数据通常是在 CSV 文件中创建的,因此在此博客中,我们将逐步说明如何使用 CSV 文件中的数据将在摄取节点上运行的 enrich 处理器用于丰富数据。

样本 CSV 数据

我们可以使用阿里云Elasticsearch中的 Kibana ,或通过 ECS 自建 ELK,导入以下 CSV 格式的示例主数据,然后在将文档吸收到 Elasticsearch 中时用于丰富文档。 对于本博客中给出的示例,我们将主数据存储在一个名为 test.csv 的文件中。 此数据代表组织清单中的设备。

test.csv

"Device ID","Device Location","Device Owner","Device Type"
"device1","London","Engineering","Computer"
"device2","Toronto","Consulting","Mouse"
"device3","Winnipeg","Sales","Computer"
"device4","Barcelona","Engineering","Phone"
"device5","Toronto","Consulting","Computer"
"device6","London","Consulting","Computer"

请注意,CSV 数据不应包含任何其他空格,因为当前版本的 Data Visualizer 需要精确格式化数据。 在此 github 问题 中对此进行了记录。

将 CSV 数据导入 Elasticsearch

我们可以直接使用 Kibana 来把数据导入。打开 Kibana:

image.png

点击上面的 Import a CSV, NDJSON, or log file 链接:、

image.png

点击 Select or drag and drop a file, 然后选择刚才我们创建的 test.csv 文件:

image.png


点击 Import 按钮:

image.png

我们把导入的这个 index 的名字叫做 master_data_from_csv。点击 Import 按钮:

image.png

这样就完成了我们的 master_data_from_csv 的索引创建。我们可以在上面屏幕的下方的四个选项中任意选择一个来查看导入的数据。

利用我们的主数据丰富文档

在本节中,我们演示如何使用 enrich Processor 将主数据合并到输入数据流中的文档中。关于 enrich processor,我之前有另外一篇文章有详述“Elasticsearch:enrich processor (7.5发行版新功能)”。

image.png

第一步是创建一个丰富的策略,该策略定义我们将使用哪个字段将主数据与输入数据流中的文档进行匹配。 下面提供了适用于我们的数据的示例策略:

PUT /_enrich/policy/enrich-devices-policy
{
  "match": {
    "indices": "master_data_from_csv",
    "match_field": "Device ID",
    "enrich_fields": [
      "Device Location",
      "Device Owner",
      "Device Type"
    ]
  }
}

运行上面的策略。然后,我们使用 execute enrich policy API 为该策略创建 enrich 索引:

PUT /_enrich/policy/enrich-devices-policy/_execute

接下来,我们创建一个使用我们的丰富策略的 ingest pipeline。

PUT /_ingest/pipeline/device_lookup
{
  "description": "Enrich device information",
  "processors": [
    {
      "enrich": {
        "policy_name": "enrich-devices-policy",
        "field": "device_id",
        "target_field": "my_enriched_data",
        "max_matches": "1"
      }
    }
  ]
}

我们插入一个文档,并让它使用上面定义的 ingest pipeline,如下所示:

PUT /device_index/_doc/1?pipeline=device_lookup
{
  "device_id": "device1",
  "other_field": "some value"
}

我们可以使用 GET API 查看导入的文档,如下所示:

GET device_index/_doc/1

{
  "_index" : "device_index",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 1,
  "_seq_no" : 0,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "my_enriched_data" : {
      "Device Location" : "London",
      "Device Owner" : "Engineering",
      "Device ID" : "device1",
      "Device Type" : "Computer"
    },
    "device_id" : "device1",
    "other_field" : "some value"
  }
}

在上面,我们可以看出来在返回的文档信息中,有多一个叫做 my_enriched_data 的字段。它包含了 Device Location, Device Owner, Device ID 及 Device Type。这些信息都来自于我们之前导入的 test.csv 文档信息。enrich processor 通过关联 device_id 为 device1 而从 master_data_from_csv 索引中获得这些信息。也就是说我们的数据变得更多了,这也就是我们之前说的丰富了。

在索引设置中指定pipeline

在上面,我们通过导入时指定的 pipeline 来进行调用 enrich processor,但是在实际的应用场景中,我们更加倾向于把这个配置与 index 的设置中,而不是在请求的url中来指定特定的 pipeline。我们可以通过在 index 的配置中添加 index.default_pipeline 来完成。

PUT device_index/_settings
{
  "index.default_pipeline": "device_lookup"
}

现在,发送到 device_index 的所有文档都将通过 device_lookup 管道,而无需在 URL 中使用 pipeline=device_lookup。 我们可以使用下面的 PUT 命令来验证它是否正常工作。

PUT /device_index/_doc/2
{
  "device_id": "device2",
  "other_field": "some value"
}

执行以下命令以查看我们刚刚提取的文档:

GET device_index/_doc/2

那么你可以看到如下所示的文档:

{
  "_index" : "device_index",
  "_type" : "_doc",
  "_id" : "2",
  "_version" : 1,
  "_seq_no" : 1,
  "_primary_term" : 1,
  "found" : true,
  "_source" : {
    "my_enriched_data" : {
      "Device Location" : "Toronto",
      "Device Owner" : "Consulting",
      "Device ID" : "device2",
      "Device Type" : "Mouse"
    },
    "device_id" : "device2",
    "other_field" : "some value"
  }
}

结论

通常需要在导入时丰富文档,以确保 Elasticsearch 中的文档包含搜索或查看它们所需的信息。 在此博客中,我们演示了在 ingest 节点上运行的 enrich 处理器如何使用 CSV 数据进行扩充,这对于在将主数据吸收到 Elasticsearch 中时将主数据合并到文档中非常有用。

声明:本文由原文作者“ Elastic 中国社区布道师——刘晓国”授权转载,对未经许可擅自使用者,保留追究其法律责任的权利。
出处链接:https://elasticstack.blog.csdn.net/.


image.png

阿里云Elastic Stack】100%兼容开源ES,独有9大能力,提供免费 X-pack服务(单节点价值$6000)

相关活动


更多折扣活动,请访问阿里云 Elasticsearch 官网

阿里云 Elasticsearch 商业通用版,1核2G ,SSD 20G首月免费
阿里云 Logstash 2核4G首月免费


image.png

image.png

相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
相关文章
|
2月前
|
存储 监控 搜索推荐
在生产环境中部署Elasticsearch:最佳实践和故障排除技巧——安装篇(一)
在生产环境中部署Elasticsearch:最佳实践和故障排除技巧——安装篇(一)
|
2月前
电子书阅读分享《Elasticsearch全观测技术解析与应用(构建日志、指标、APM统一观测平台)》
电子书阅读分享《Elasticsearch全观测技术解析与应用(构建日志、指标、APM统一观测平台)》
234 1
|
2月前
|
JSON Prometheus Cloud Native
Grafana 系列 - 统一展示 -8-ElasticSearch 日志快速搜索仪表板
Grafana 系列 - 统一展示 -8-ElasticSearch 日志快速搜索仪表板
|
2月前
|
JSON 监控 Java
Java Web开发中的异常处理与日志记录最佳实践
Java Web开发中的异常处理与日志记录最佳实践
|
2月前
|
缓存 Java API
在生产环境中部署Elasticsearch:最佳实践和故障排除技巧——聚合与搜索(三)
在生产环境中部署Elasticsearch:最佳实践和故障排除技巧——聚合与搜索(三)
|
2月前
|
存储 Java API
在生产环境中部署Elasticsearch:最佳实践和故障排除技巧———索引与数据上传(二)
在生产环境中部署Elasticsearch:最佳实践和故障排除技巧———索引与数据上传(二)
|
3月前
dsl语句查询elasticsearch集群节点分布和资源使用情况
dsl语句查询elasticsearch集群节点分布和资源使用情况
118 0
|
4月前
|
SQL 存储 监控
使用CloudLens for SLS监控Project资源配额最佳实践
本文主要介绍如何使用CloudLens for SLS中全局错误日志、监控指标做Project 资源配额的水位监控 、超限监控 以及 如何提交资源配额提升申请。
79337 19
使用CloudLens for SLS监控Project资源配额最佳实践
|
4月前
|
存储 SQL 监控
从 Elasticsearch 到 SelectDB,观测云实现日志存储与分析的 10 倍性价比提升
SelectDB 助力观测云完成日志数据存储和分析架构升级,实现整体性价比 10 倍提升,为日志存储和分析场景服务提供强大动力。
|
5月前
|
canal 负载均衡 关系型数据库
Flink CDC如何获得增量binlog,可能是跟canal一样,伪装成从节点获取日志?
Flink CDC如何获得增量binlog,可能是跟canal一样,伪装成从节点获取日志?
97 1

相关产品

  • 检索分析服务 Elasticsearch版