下文给大家带来有关通过logstash增量采集mysql的数据内容,相信大家一定看过类似的文章。我们给大家带来的有何不同呢?一起来看看正文部分吧,相信看完通过logstash增量采集mysql的数据你一定会有所收获。
最近有一项需求需要把mysql中一个表中的数据同步到es中,分析后使用logstash的jdbc插件获取mysql中的数据,output到es中,采集的情况分两种:开始是全量的采集,之后是增量采集。
已下式验证的过程:
1.安装logstash和mysql
mysql和logstash的安装就不多说了,可以在网上自行查找。
需要注意的事项是:
1)可能需要安装:bin/plugin install logstash-input-jdbc
2)mysql的driver下载https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.48.tar.gz
2.创建mysql表和数据
create table test.zy (
id int,
str varchar(20)
) ;
insert into test.zy values('1','a1');
insert into test.zy values('2','a2');
insert into test.zy values('3','a3');
insert into test.zy values('4','a4');
insert into test.zy values('5','a5');
insert into test.zy values('6','a6');
insert into test.zy values('7','a7');
insert into test.zy values('8','a8');
insert into test.zy values('9','a9');
insert into test.zy values('10','a10');
insert into test.zy values('11','a11');
insert into test.zy values('12','a12');
insert into test.zy values('13','a13');
insert into test.zy values('14','a14');
#增量采集验证插入的数据
insert into test.zy values('15','a15');
insert into test.zy values('16','a16');
3.logstash采集mysql的配置文件
#根据字段增量采集
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://ip:3306/test"
jdbc_user => "root"
jdbc_password => "123456"
jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
statement => "select * from zy where id > :sql_last_value"
use_column_value => true
tracking_column => "id"
record_last_run => true
last_run_metadata_path => "/root/test.log"
schedule => "*/2 * * * *"
}
}
output {
file {
path => "./mysql/test-%{+YYYY-MM-dd}.txt"
}
}
#根据时间戳增量采集
input {
jdbc {
jdbc_connection_string => "jdbc:mysql://ip:port/test"
jdbc_user => "root"
jdbc_password => "123456"
jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_default_timezone =>"Asia/Shanghai" #设置sql_last_value记录的时间时区,不然会影响增量采集的效果
statement => "select * from zy1 where time > :sql_last_value"
use_column_value => false
record_last_run => true
last_run_metadata_path => "/root/test.log"
schedule => "*/2 * * * *"
}
}
output {
file {
path => "./db2/test-%{+YYYY-MM-dd}.txt"
}
}
input的配置解析
statement 执行myqsl的语句,也可以是statementpath 后跟sql文件的路径
use_column_value
是否使用列值作为依据,进行上次运行位置的记录。
如果设置为true,则使用tracking_column定义的列,作为:sql_last_value.
如果设置为false,则:sql_last_value反映的是上次SQL的运行时间。
tracking_column 增量采集依据的字段名 如果 use_colomn_value为false时,可以不写
record_last_run 是否记录本次采集数据的位置
last_run_metadata_path 设置记录采集数据位置的文件
schedule sql脚本执行的频率
4.采集验证
启动logstash
cd /usr/local/logstash
bin/logstash -f conf/logtash.conf
##logstash 自动加载配置文件 bin/logstash -f conf/logtash.conf --config.reload.automatic
测试logtash是否正常使用
bin/logstash -e 'input { stdin{}} output { output{}}'
测试并退出
bin/logstash -f conf/logtash.conf --config.test_and_exit
logstash采集输出日志:
输出文件内容查看
sql_last_value的记录
对于上文关于通过logstash增量采集mysql的数据,大家觉得是自己想要的吗?如果想要了解更多相关,可以继续关注我们的行业资讯板块。
亿速云「云数据库 MySQL」免部署即开即用,比自行安装部署数据库高出1倍以上的性能,双节点冗余防止单节点故障,数据自动定期备份随时恢复。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。