美文网首页ELK stack
使用Logstash同步mysql数据到Elasticsearc

使用Logstash同步mysql数据到Elasticsearc

作者: 为技术疯狂 | 来源:发表于2018-06-19 15:22 被阅读7次

为了使海量数据能够提供实时快速的查询,mysql很显然力不从心,于是我们需要利用es提供大数据搜索服务,典型的场景就是:产品或者商品搜索。        

Logstash是一个数据收集管道,配置输入输出,可将数据从一个地方传到另一个地方。同步mysql到Elasticsearch,这里的输入,指的是mysql,输出就是Elasticsearch。

首先是数据同步,将mysql数据同步到es的方式很多,经过测试,稳定且易用的是 logstash-input-jdbc

一、下载ElasticSearch、Logstash,并解压

二、安装logstash-input-jdbc插件,默认Logstash不包含读取数据库的jdbc插件,需要手动下载。进入Logstash的bin目录,执行:

./logstash-plugin install logstash-input-jdbc

因为网络原因,安装可能费点时间。

三、在某处建一个目录,放置配置文件(哪不重要,因为执行时会配置路径),我这放到bin下的mysql目录里。如:/home/lizh/devtools/logstash-5.6.0/bin/mysql下面

四、复制mysql jdbc驱动(mysql-connector-java-5.1.35.jar)到该mysql目录下

五、编写导出数据的sql,放到mysql目录下,命名为media.sql(文件名自己取),内容类似这样:

SELECT *, id AS media_id FROM media m WHERE m.operationTime >= : sql_last_value

这里需要介绍下,我的表里有个operationTime 字段,用于记录该条记录的最后修改时间,sql_last_value是Logstash查询后,保存的上次查询时间,第一次查询时,该值是1970/1/1,所以第一次导入,如果你的表中现有数据很多,可能会有点问题,后面会根据最后修改时间,更新修改过的数据。

六、编写输入输出配置文件jdbc.conf

input {

  jdbc {

    jdbc_driver_library => "/home/lizh/devtools/logstash-5.6.0/bin/mysql/mysql-connector-java-5.1.35.jar"

    jdbc_driver_class => "com.mysql.jdbc.Driver"

    jdbc_connection_string => "jdbc:mysql://172.10.10.19:3306/edu_cbh"

    jdbc_user => "root"

    jdbc_password => "123456"

    schedule => "* * * * *"

    jdbc_default_timezone => "Asia/Shanghai"

    statement_filepath => "/home/lizh/devtools/logstash-5.6.0/bin/mysql/media.sql"

    use_column_value  => false

    last_run_metadata_path => "/home/lizh/devtools/logstash-5.6.0/bin/mysql/last_run.txt"

  }

}

output {

    elasticsearch {

        hosts => ["172.10.10.69:9200"]

        index => "edu"

        document_id => "%{media_id}"

        document_type => "media"

    }

    stdout {

        codec => json_lines

    }

}

字段介绍:

input.jdbc.jdbc_driver_library jdbc驱动的位置

input.jdbc.jdbc_driver_class    驱动类名

input.jdbc.jdbc_connection_string  数据库连接字符串

input.jdbc.jdbc_user    用户名

input.jdbc.jdbc_password  密码

input.jdbc.schedule  更新计划(参考linux crontab)

input.jdbc.jdbc_default_timezone  时区,默认没有时区,日志里时间差8小时,中国需要用Asia/Shanghai

input.jdbc.statement_filepath 导出数据的sql文件,就是上面写的

input.jdbc.use_column_value  如果是true,sql_last_value是tracking_column指定字段的数字值,false就是时间,默认是false

input.jdbc.last_run_metadata_path  保存sql_last_value值文件的位置

output.elasticsearch.hosts elasticsearch服务器,填多个,请求会负载均衡。

output.elasticsearch.index 索引名

output.elasticsearch.document_id  生成文件的id,这里使用sql产生的media_id

output.elasticsearch.document_type  文档类型

output.stdout 配置的是命令行输出,使用json

六、配置完以后,启动

./logstash -f mysql/jdbc.conf

按上面的配置,logstash会每分钟查询一次表,看是否会更新,有更新则提交到Elasticsearch。

如果需要同时同步多个表,那么需要以下配置

input {

  jdbc {

    jdbc_driver_library => "/Users/logstash/mysql-connector-java-5.1.39-bin.jar"

    jdbc_driver_class => "com.mysql.jdbc.Driver"

    jdbc_connection_string => "jdbc:mysql://localhost:3306/database_name"

    jdbc_user => "root"

    jdbc_password => "password"

    schedule => "* * * * *"

    statement => "select * from table1"

    type => "table1"

  }

  jdbc {

    jdbc_driver_library => "/Users/logstash/mysql-connector-java-5.1.39-bin.jar"

    jdbc_driver_class => "com.mysql.jdbc.Driver"

    jdbc_connection_string => "jdbc:mysql://localhost:3306/database_name"

    jdbc_user => "root"

    jdbc_password => "password"

    schedule => "* * * * *"

    statement => "select * from table2"

    type => "table2"

  }

  # add more jdbc inputs to suit your needs

}

output {

    elasticsearch {

        index => "testdb"

        document_type => "%{type}"  # <- use the type from each input

        hosts => "localhost:9200"

    }

}

最终目录结构

最终目录结构

备注:

全量同步与增量同步

全量同步是指全部将数据同步到es,通常是刚建立es,第一次同步时使用。增量同步是指将后续的更新、插入记录同步到es。(删除记录没有办法同步,只能两边执行自己的删除命令)

根据公司内部实践,logstash-input-jdbc增量同步的原理很简单。我们做增量同步是需要知道插入和更新记录的,因此,进入ES提供搜索服务的表(要同步的标),都要加上update_time,每次插入和更新的时候更新这个字段,让logstash-input-jdbc知道即可。

参考:

https://www.pocketdigi.com/20171212/1585.html

https://blog.csdn.net/kingice1014/article/details/72413196

https://blog.csdn.net/u013055678/article/details/77882435

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html#_predefined_parameters

https://blog.csdn.net/yeyuma/article/details/50240595#quote

https://yq.aliyun.com/articles/276730

相关文章

网友评论

    本文标题:使用Logstash同步mysql数据到Elasticsearc

    本文链接:https://www.haomeiwen.com/subject/ibbaeftx.html