elasticSearch数据导入工具logstash-input-jdbc 同步原理及相关问题解读

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,高可用系列 2核4GB
简介: 前言:基于logstash-input-jdbc较其他插件的稳定性、易用性、版本和ES同步更新的特点,以下研究主要针对 logstash-input-jdbc 展开。

前言:

基于logstash-input-jdbc较其他插件的稳定性、易用性、版本和ES同步更新的特点,以下研究主要针对 logstash-input-jdbc 展开。 
针对logstash-input-jdbc常见的几个疑难问题,部分问题也在git和stackoverflow进行了激烈讨论,以下统一给出验证和解答。

1、logstash-input-jdbc 的同步原理是什么?

(1)、对于全量同步依据

配置文件jdbc.sql的sql语句的进行同步。

(2)、对于增量实时同步依据

1)设定的定时策略。

如最小更新间隔每分钟更新一次设定:schedule => “* * * * *”,目前最小更新间隔为1分钟,验证发现,不支持60s以内的秒级更新。

2)设定的sql语句。

如jdbc.sql, 决定同步哪些内容及同步更新的条件。

{"id":10,"name":"10test","@version":"1","@timestamp":"2016-06-29T03:18:00.177Z","type":"132c_type"}
AI 代码解读
  • 1

2:logstash-input-jdbc 只支持基于时间同步吗?

验证表名:同步更新除了支持根据时间同步外,还支持根据某自增列(如:自增ID)字段的变化进行同步。

上次举例只是举了同步时间变化的例子,设定条件:

[root@5b9dbaaa148a logstash_jdbc_test]# cat jdbc.sql_bak

select
        *
from
        cc
where   cc.modified_at > :sql_last_value
AI 代码解读
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

实际进一步研究发现,在配置文件中有use_column_value字段决定,是否需要记录某个column 的值,如果 record_last_run 为真,可以自定义我们需要 track 的 column 名称,此时该参数就要为 true. 否则默认 track 的是 timestamp 的值.

举例:以下即是设定以id的变化作为同步条件的。

[root@5b9dbaaa148a logstash_jdbc_test]# cat jdbc_xm.sql
select
        *
from
        cc
where   cc.id >= :sql_last_value
AI 代码解读
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

我们可以指定文件,来记录上次执行到的 tracking_column 字段的值 比如上次数据库有 12 条记录,查询完后该文件中就会有数字 12 这样的记录,下次执行 SQL 查询可以从 13 条处开始.

我们只需要在 SQL 语句中 WHERE MY_ID > :last_sql_value 即可. 其中 :last_sql_value 取得就是该文件中的值(12).

last_run_metadata_path => “/etc/logstash/run_metadata.d/my_info”

如:

[root@5b9 run_metadata.d]# cat /etc/logstash/run_metadata.d/my_info

--- 12
AI 代码解读
  • 1
  • 2
  • 3

已全局代码搜索,没有触发器trigger相关处理操作。

3:mysql和ES分别存储在两台服务器,且时间不一致,能否实现同步?

(1). 设定对于以时间作为判定条件的增量同步,可以以设定的时间为基准点进行同步。

验证发现:

显示的时间戳timestamp为ES上的UTC时间值(不论ES机器是什么时区,都会修改为UTC时间存入ES),显示的modified_at时间值为同步过来的mysql时间值转化为UTC的结果值。

更新的前提是必须满足: cc.modified_at >= :sql_last_value。即如果mysql的时间修改为小于sql_last_value的时刻值,是无法进行同步的。

   如:
AI 代码解读
  • 1
  • 2
[elasticsearch@5b9dbaaa148a run_metadata.d]$ cat my_info

--- 2016-06-29 02:19:00.182000000 Z
AI 代码解读
  • 1
  • 2
  • 3

(2). 对于选定某列作为判定条件(如自增ID),两者(mysql和ES)时间不一致,实际是也可以同步更新的。

验证发现:

测试设定的时间是mysql比ES早一天或者晚一天的时刻值,都可以实现同步更新操作。

4:如何支持实时同步mysql的delete操作到ES中?

logstash-input-jdbc插件不支持物理删除的同步更新。详见:

http://stackoverflow.com/questions/35813923/sync-postgresql-data-with-elasticsearch/35823497#35823497

https://github.com/logstash-plugins/logstash-input-jdbc/issues/145

解决方案:

同步删除操作改为同步update更新操作实现。

第一步:进行软件删除,而不是物理删除操作。

先不物理删除记录,而是软件删除,即新增一个 flag 列,标识记录是否已经被删除(默认为false,设置为true或者deleted代表已经被删除,业界通用方法),这样,通过已有的同步机制,相同的标记记录该行数据会同步更新到Elasticsearch。

第二步:ES中检索flag标记为true或者deleted的字段信息。

在ES可以执行简单的term查询操作,检索出已经删除的数据信息。

第三步:定时物理删除。

设置定时事件,间隔一段时间物理删除掉mysql和ES中的flag字段标记为true或deleted的记录,即完成物理删除操。

原文网址:http://blog.csdn.net/laoyang360/article/details/51793301


相关实践学习
使用阿里云Elasticsearch体验信息检索加速
通过创建登录阿里云Elasticsearch集群,使用DataWorks将MySQL数据同步至Elasticsearch,体验多条件检索效果,简单展示数据同步和信息检索加速的过程和操作。
ElasticSearch 入门精讲
ElasticSearch是一个开源的、基于Lucene的、分布式、高扩展、高实时的搜索与数据分析引擎。根据DB-Engines的排名显示,Elasticsearch是最受欢迎的企业搜索引擎,其次是Apache Solr(也是基于Lucene)。 ElasticSearch的实现原理主要分为以下几个步骤: 用户将数据提交到Elastic Search 数据库中 通过分词控制器去将对应的语句分词,将其权重和分词结果一并存入数据 当用户搜索数据时候,再根据权重将结果排名、打分 将返回结果呈现给用户 Elasticsearch可以用于搜索各种文档。它提供可扩展的搜索,具有接近实时的搜索,并支持多租户。
目录
打赏
0
0
0
0
3
分享
相关文章
【YashanDB知识库】jdbc查询st_geometry类型的数据时抛出YAS-00101 cannot allocate 0 bytes for anlHeapMalloc异常
【YashanDB知识库】jdbc查询st_geometry类型的数据时抛出YAS-00101 cannot allocate 0 bytes for anlHeapMalloc异常
【YashanDB知识库】如何使用jdbc向YashanDB批量插入gis数据
本文以GIS表为例,介绍通过Java代码向数据库插入POINT类型地理数据的方法。首先创建包含ID和POS字段的GIS表,POS字段为ST_GEOMETRY类型。接着利用Java的PreparedStatement批量插入10条经纬度相同的POINT数据,最后查询结果显示成功插入10条记录,验证了操作的正确性。
51 19
【YashanDB知识库】如何使用jdbc向YashanDB批量插入gis数据
【YashanDB知识库】如何使用jdbc向YashanDB批量插入gis数据
【YashanDB知识库】如何使用jdbc向YashanDB批量插入gis数据
【YashanDB知识库】如何使用jdbc向YashanDB批量插入gis数据
【YashanDB 知识库】jdbc 查询 st_geometry 类型的数据时抛出 YAS-00101 cannot allocate 0 bytes for anlHeapMalloc 异常
**简介:** 客户在使用 YashanDB JDBC 驱动查询含 st_geometry 列的数据时,遇到 YAS-00101 错误,提示无法分配内存。该问题影响所有版本的 YashanDB,导致业务中断。原因是用户缺少对 st_geometry 类型的 execute 权限。解决方法是为用户赋权:`grant execute any type to <username>;` 以恢复正常运行。
【YashanDB 知识库】jdbc 查询 st_geometry 类型的数据时抛出 YAS-00101 cannot allocate 0 bytes for anlHeapMalloc 异常
**问题简介:** 客户使用 YashanDB JDBC 驱动查询含 st_geometry 列的数据时,出现 YAS-00101 错误,提示无法分配 0 字节内存。该问题影响所有 YashanDB 版本,导致业务中断。原因是数据库用户缺少 st_geometry 类型的 execute 权限。解决方法是为用户赋权:`grant execute any type to <username>;`。
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
194 0
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
81 0
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-22 Sqoop 数据MySQL到HDFS(全量) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
123 0

热门文章

最新文章