为了使海量数据能够提供实时快速的查询,mysql很显然力不从心,于是我们需要利用es提供大数据搜索服务,典型的场景就是:产品或者商品搜索。
Logstash是一个数据收集管道,配置输入输出,可将数据从一个地方传到另一个地方。同步mysql到Elasticsearch,这里的输入,指的是mysql,输出就是Elasticsearch。
首先是数据同步,将mysql数据同步到es的方式很多,经过测试,稳定且易用的是 logstash-input-jdbc
一、下载ElasticSearch、Logstash,并解压
二、安装logstash-input-jdbc插件,默认Logstash不包含读取数据库的jdbc插件,需要手动下载。进入Logstash的bin目录,执行:
./logstash-plugin install logstash-input-jdbc
因为网络原因,安装可能费点时间。
三、在某处建一个目录,放置配置文件(哪不重要,因为执行时会配置路径),我这放到bin下的mysql目录里。如:/home/lizh/devtools/logstash-5.6.0/bin/mysql下面
四、复制mysql jdbc驱动(mysql-connector-java-5.1.35.jar)到该mysql目录下
五、编写导出数据的sql,放到mysql目录下,命名为media.sql(文件名自己取),内容类似这样:
SELECT *, id AS media_id FROM media m WHERE m.operationTime >= : sql_last_value
这里需要介绍下,我的表里有个operationTime 字段,用于记录该条记录的最后修改时间,sql_last_value是Logstash查询后,保存的上次查询时间,第一次查询时,该值是1970/1/1,所以第一次导入,如果你的表中现有数据很多,可能会有点问题,后面会根据最后修改时间,更新修改过的数据。
六、编写输入输出配置文件jdbc.conf
input {
jdbc {
jdbc_driver_library => "/home/lizh/devtools/logstash-5.6.0/bin/mysql/mysql-connector-java-5.1.35.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://172.10.10.19:3306/edu_cbh"
jdbc_user => "root"
jdbc_password => "123456"
schedule => "* * * * *"
jdbc_default_timezone => "Asia/Shanghai"
statement_filepath => "/home/lizh/devtools/logstash-5.6.0/bin/mysql/media.sql"
use_column_value => false
last_run_metadata_path => "/home/lizh/devtools/logstash-5.6.0/bin/mysql/last_run.txt"
}
}
output {
elasticsearch {
hosts => ["172.10.10.69:9200"]
index => "edu"
document_id => "%{media_id}"
document_type => "media"
}
stdout {
codec => json_lines
}
}
字段介绍:
input.jdbc.jdbc_driver_library jdbc驱动的位置
input.jdbc.jdbc_driver_class 驱动类名
input.jdbc.jdbc_connection_string 数据库连接字符串
input.jdbc.jdbc_user 用户名
input.jdbc.jdbc_password 密码
input.jdbc.schedule 更新计划(参考linux crontab)
input.jdbc.jdbc_default_timezone 时区,默认没有时区,日志里时间差8小时,中国需要用Asia/Shanghai
input.jdbc.statement_filepath 导出数据的sql文件,就是上面写的
input.jdbc.use_column_value 如果是true,sql_last_value是tracking_column指定字段的数字值,false就是时间,默认是false
input.jdbc.last_run_metadata_path 保存sql_last_value值文件的位置
output.elasticsearch.hosts elasticsearch服务器,填多个,请求会负载均衡。
output.elasticsearch.index 索引名
output.elasticsearch.document_id 生成文件的id,这里使用sql产生的media_id
output.elasticsearch.document_type 文档类型
output.stdout 配置的是命令行输出,使用json
六、配置完以后,启动
./logstash -f mysql/jdbc.conf
按上面的配置,logstash会每分钟查询一次表,看是否会更新,有更新则提交到Elasticsearch。
如果需要同时同步多个表,那么需要以下配置
input {
jdbc {
jdbc_driver_library => "/Users/logstash/mysql-connector-java-5.1.39-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/database_name"
jdbc_user => "root"
jdbc_password => "password"
schedule => "* * * * *"
statement => "select * from table1"
type => "table1"
}
jdbc {
jdbc_driver_library => "/Users/logstash/mysql-connector-java-5.1.39-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/database_name"
jdbc_user => "root"
jdbc_password => "password"
schedule => "* * * * *"
statement => "select * from table2"
type => "table2"
}
# add more jdbc inputs to suit your needs
}
output {
elasticsearch {
index => "testdb"
document_type => "%{type}" # <- use the type from each input
hosts => "localhost:9200"
}
}
最终目录结构

备注:
全量同步与增量同步
全量同步是指全部将数据同步到es,通常是刚建立es,第一次同步时使用。增量同步是指将后续的更新、插入记录同步到es。(删除记录没有办法同步,只能两边执行自己的删除命令)
根据公司内部实践,logstash-input-jdbc增量同步的原理很简单。我们做增量同步是需要知道插入和更新记录的,因此,进入ES提供搜索服务的表(要同步的标),都要加上update_time,每次插入和更新的时候更新这个字段,让logstash-input-jdbc知道即可。
参考:
https://www.pocketdigi.com/20171212/1585.html
https://blog.csdn.net/kingice1014/article/details/72413196
https://blog.csdn.net/u013055678/article/details/77882435
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html#_predefined_parameters
https://blog.csdn.net/yeyuma/article/details/50240595#quote
https://yq.aliyun.com/articles/276730
网友评论