Logstash
Logstash 数据采集引擎
注:使用时,Logstash版本和Elasticsearch版本须保持一致!
- 数据采集
从数据库或者其它数据源中采集数据至es中
- 以id或update_time作为同步边界
基于定时任务调度实现以下两种方式的同步
根据id更新:适合增量同步,不适合修改同步
根据update_time:logstash会维护一个更新时间,从而比对数据源的更新时间,实现数据同步 - logstash-input-jdbc 插件
此插件用于同步Mysql数据源,已自带
- 预先创建索引
数据同步前,ES应先有一个索引库
环境准备
根据具体版本准备如下软件环境
我的当前目录:/home/software/
- jdk
jdk-8u191-linux-x64.tar.gz
- logstash
logstash-6.4.3.tar.gz,下载地址
- jdbc驱动
mysql-connector-java-5-1-41.jar
安装及配置logstash
解压
tar -zxvf logstash-6.4.3.tar.gz移动目录
mv logstash-6.4.3 /usr/local/ cd /usr/local/logstash-6.4.3/创建一个配置目录
mkdir sync cd sync创建一个配置文件
vim logstash-db-sync.conf :wq创建一个sql脚本文件
内容为数据同步的查询sql
vim items.sql :wq注:sql的where部分,可使用如
where tName.update_time >= :sql_last_value:sql_last_value是logstash维护一个数据最后更新时间的内置变量,每次同步完成后会更新的一个边界值拷贝mysql驱动至当前目录
cp /home/software/mysql-connector-java-5-1-41.jar .配置
logstash-db-sync.confinput { jdbc { # 设置 Mysql/MariaDB 数据库url及数据库名称 jdbc_connection_string => "jdbc:mysql://host:port/database?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true # 用户名和密码 jdbc_user => "root" jdbc_password => "root" # 数据库驱动所在位置,可以是绝对路径或者相对路径 jdbc_driver_library => "/usr/local/logstash-6.4.3/sync/mysql-connector-java-5-1-41.jar" # 驱动类名 jdbc_driver_class => "com.mysql.jdbc.Driver" # 开启分页,数据采集过程中 jdbc_paging_enabled => "true" # 分页每页数据,可以自定义 jdbc_page_size => "10000" # 执行的sql文件路径 statement_filepath => "/usr/local/lostash-6.4.3/sync/items.sql # 设置定时任务间隔 含义:分、时、天、月、年,全部为*默认含义为每分钟跑一次 schedule => "* * * * *" # 索引类型 type => "_doc" # 是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path 的文件 use_column_value => true # 记录上一次追踪的结果值,会自动创建trach_time这个文件 last_run_metadata_path => "/usr/local/logstash-6.4.3/sync/track_time" # 如果 use_column_value 为true,配置本参数,追踪的 column 名,可以是自增id或者时间 tracking_column => "update_time" # tracking_column 对应字段的类型 tracking_column_type => "timestamp" # 是否清除 last_tun_metadata_path 的记录,true则每次都从头开始查询 clean_run => false # 数据库字段名称大写转小写 lowercase_column_names => false } } output { elasticsearch { # es地址,集群可以配置多个 hosts => ["192.168.1.187.9200"] # 同步的索引名 index => "shop-items" # 设置_docID和数据相同 document_id => "%{id}" } # 日志输出 stdout { # 输出至控制台 codec => json_lines } }启动 logstash
cd /usr/local/logstash-6.4.3/bin/ ./logstash -f /usr/local/logstash-6.4.4/sync/logstash-db-sync.conf-f表示指定配置文件启动
观察日志,等待1分钟可见同步日志的输出
总结
以上,我们完成了 Logstash 实现ES数据同步
当数据新增、更新时,会自动两步数据至ES
注:在数据库删除数据,ES不会同步删除。解决:可以通过逻辑删除,在ES查询时带上删除标记达到”删除”效果