Test environment: CentOS 7.6, MySQL 5.5.1, JDK 1.8, Elasticsearch 6.6.0, Logstash 6.6.0

For installing Elasticsearch itself, see the separate Elasticsearch installation guide.

Most tutorials online use elasticsearch-jdbc, but it only supports old Elasticsearch versions, so Logstash was chosen instead, at the release matching Elasticsearch (see the Logstash download page). Elasticsearch is abbreviated as ES below.

Install Logstash

  cd /usr/local/src
  wget https://artifacts.elastic.co/downloads/logstash/logstash-6.6.0.tar.gz
  tar -xvf logstash-6.6.0.tar.gz
  cd logstash-6.6.0

Install the logstash-input-jdbc plugin

  [root@localhost logstash-6.6.0]# ./bin/logstash-plugin install logstash-input-jdbc
  Validating logstash-input-jdbc
  Installing logstash-input-jdbc
  Installation successful

Install the MySQL JDBC driver

Download page: https://dev.mysql.com/downloads/connector/j

  cd config
  wget https://cdn.mysql.com/Downloads/Connector-J/mysql-connector-java-5.1.47.tar.gz
  tar -xvf mysql-connector-java-5.1.47.tar.gz
  # copy the driver jar into the config directory
  cp -b mysql-connector-java-5.1.47/mysql-connector-java-5.1.47.jar mysql-connector-java-5.1.47.jar

Create the configuration file

We want to sync the data in the test table of the test database into the demo index in ES. Create a configuration file jdbc.conf with the following content:

  input {
      jdbc {
          # MySQL JDBC settings; test is the database name
          jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/test"
          jdbc_user => "root"
          jdbc_password => "123456"
          # path to the MySQL JDBC driver jar
          jdbc_driver_library => "/usr/local/src/logstash-6.6.0/config/mysql-connector-java-5.1.47.jar"
          jdbc_driver_class => "com.mysql.jdbc.Driver"
          jdbc_paging_enabled => "true"
          jdbc_page_size => "50000"
          jdbc_default_timezone => "Asia/Shanghai"
          # SQL statement; it can also be placed in a separate file
          statement => "select * from test"
          #statement_filepath => "/usr/local/src/logstash-6.6.0/config/jdbc.sql"
          # crontab-like schedule (minute hour day month weekday); this runs the sync every minute
          schedule => "* * * * *"
          # document type in ES
          type => "jdbc"
      }
  }
  output {
      elasticsearch {
          # ES host and port
          hosts => "127.0.0.1:9200"
          # index name
          index => "demo"
          # use the table's auto-increment id as the document _id
          document_id => "%{id}"
      }
      # debug output as JSON lines; comment out in production
      stdout {
          codec => json_lines
      }
  }
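The configuration above re-imports the whole table on every run. For larger tables, the jdbc input also supports incremental sync via a tracking column; below is a minimal sketch, assuming the table's auto-increment id column as in the test table (the last_run_metadata_path value is an assumption, not from the original post):

```
  # hypothetical incremental variant of the jdbc input above
  jdbc {
      jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/test"
      jdbc_user => "root"
      jdbc_password => "123456"
      jdbc_driver_library => "/usr/local/src/logstash-6.6.0/config/mysql-connector-java-5.1.47.jar"
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      # only fetch rows added since the last run
      statement => "select * from test where id > :sql_last_value"
      use_column_value => true
      tracking_column => "id"
      tracking_column_type => "numeric"
      # where the last seen id is persisted between runs (path is an assumption)
      last_run_metadata_path => "/usr/local/src/logstash-6.6.0/config/.jdbc_last_run"
      schedule => "* * * * *"
      type => "jdbc"
  }
```

Note that this variant only picks up new rows; the full-table statement used in jdbc.conf is what allows the update test later in this post to propagate.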

The structure and data of the test table:

  DROP TABLE IF EXISTS `test`;
  CREATE TABLE `test` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `num` int(11) DEFAULT NULL,
      `text` text,
      PRIMARY KEY (`id`)
  ) ENGINE=MyISAM AUTO_INCREMENT=11 DEFAULT CHARSET=utf8;
  INSERT INTO `test` VALUES ('1', '1234', 'php是世界上最好的语言');
  INSERT INTO `test` VALUES ('2', '2345', 'javascript也是最好的语言');
  INSERT INTO `test` VALUES ('3', '3456', 'golang也是一门语言');
  INSERT INTO `test` VALUES ('4', '4567', '欢迎来到codehi.net');
  INSERT INTO `test` VALUES ('5', '5678', 'codehui主要是分享技术');
  INSERT INTO `test` VALUES ('6', '6789', '每一种语言都是很好');
  INSERT INTO `test` VALUES ('7', '7890', 'php,javascript,codehui,golang等');
  INSERT INTO `test` VALUES ('8', '8901', '学习php');
  INSERT INTO `test` VALUES ('9', '9012', '学习golang');
  INSERT INTO `test` VALUES ('10', '1012', '今天你学习了吗?');

Start syncing data

Run Logstash with the configuration file just created. It prints a lot of output; as long as it does not stop with an error, the data will appear in ES after a short wait, and from then on the test table is re-synced once a minute.

  [root@localhost logstash-6.6.0]# ./bin/logstash -f ./config/jdbc.conf

Once the sync completes, the data can be viewed in ES.

View the data

You can open ip:port/demo/_search in a browser, or query ES with curl; either way, all 10 rows from MySQL have been imported into ES.

  [root@localhost ~]# curl 127.0.0.1:9200/demo/_search
  {"took":2,"timed_out":false,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0},"hits":{"total":10,"max_score":1.0,"hits":[{"_index":"demo","_type":"doc","_id":"8","_score":1.0,"_source":{"type":"jdbc","num":8901,"@version":"1","@timestamp":"2019-02-26T07:26:00.047Z","id":8,"text":"学习php"}},{"_index":"demo","_type":"doc","_id":"10","_score":1.0,"_source":{"type":"jdbc","num":1012,"@version":"1","@timestamp":"2019-02-26T07:26:00.047Z","id":10,"text":"今天你学习了吗?"}},{"_index":"demo","_type":"doc","_id":"5","_score":1.0,"_source":{"type":"jdbc","num":5678,"@version":"1","@timestamp":"2019-02-26T07:26:00.046Z","id":5,"text":"codehui主要是分享技术"}},{"_index":"demo","_type":"doc","_id":"9","_score":1.0,"_source":{"type":"jdbc","num":9012,"@version":"1","@timestamp":"2019-02-26T07:26:00.047Z","id":9,"text":"学习golang"}},{"_index":"demo","_type":"doc","_id":"4","_score":1.0,"_source":{"type":"jdbc","num":4567,"@version":"1","@timestamp":"2019-02-26T07:26:00.046Z","id":4,"text":"欢迎来到codehi.net"}},{"_index":"demo","_type":"doc","_id":"2","_score":1.0,"_source":{"type":"jdbc","num":2345,"@version":"1","@timestamp":"2019-02-26T07:26:00.046Z","id":2,"text":"javascript也是最好的语言"}},{"_index":"demo","_type":"doc","_id":"6","_score":1.0,"_source":{"type":"jdbc","num":6789,"@version":"1","@timestamp":"2019-02-26T07:26:00.046Z","id":6,"text":"每一种语言都是很好"}},{"_index":"demo","_type":"doc","_id":"7","_score":1.0,"_source":{"type":"jdbc","num":7890,"@version":"1","@timestamp":"2019-02-26T07:26:00.047Z","id":7,"text":"php,javascript,codehui,golang等"}},{"_index":"demo","_type":"doc","_id":"1","_score":1.0,"_source":{"type":"jdbc","num":1234,"@version":"1","@timestamp":"2019-02-26T07:26:00.045Z","id":1,"text":"php是世界上最好的语言"}},{"_index":"demo","_type":"doc","_id":"3","_score":1.0,"_source":{"type":"jdbc","num":3456,"@version":"1","@timestamp":"2019-02-26T07:26:00.046Z","id":3,"text":"golang也是一门语言"}}]}}
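A _search response like the one above can also be post-processed in a script instead of read by eye. A minimal Python sketch (the sample response below is an abbreviated stand-in for the real one, not copied from it) that pulls out the total hit count and each hit's text field:

```python
import json

# abbreviated stand-in for an ES 6.x _search response
raw = '''
{"took": 2, "timed_out": false,
 "hits": {"total": 2, "max_score": 1.0,
          "hits": [
            {"_index": "demo", "_id": "1", "_score": 1.0,
             "_source": {"id": 1, "num": 1234, "text": "php"}},
            {"_index": "demo", "_id": "3", "_score": 1.0,
             "_source": {"id": 3, "num": 3456, "text": "golang"}}]}}
'''

resp = json.loads(raw)
# in ES 6.x, hits.total is a plain integer (it becomes an object in 7.x)
total = resp["hits"]["total"]
texts = [h["_source"]["text"] for h in resp["hits"]["hits"]]
print(total)   # 2
print(texts)   # ['php', 'golang']
```

The same parsing works on the live output of curl 127.0.0.1:9200/demo/_search once the sync has run.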

Test the data sync

With the schedule set in jdbc.conf, the sync runs once a minute. First, search the demo index for documents containing codehui, piping the result through Python for pretty-printing; two hits come back:

  [root@localhost ~]# curl 127.0.0.1:9200/demo/_search?q=codehui | python -m json.tool
    % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                   Dload  Upload   Total   Spent    Left  Speed
  100   543  100   543    0     0  84737      0 --:--:-- --:--:-- --:--:-- 90500
  {
      "_shards": {
          "failed": 0,
          "skipped": 0,
          "successful": 5,
          "total": 5
      },
      "hits": {
          "hits": [
              {
                  "_id": "5",
                  "_index": "demo",
                  "_score": 0.991507,
                  "_source": {
                      "@timestamp": "2019-02-26T07:32:00.112Z",
                      "@version": "1",
                      "id": 5,
                      "num": 5678,
                      "text": "codehui主要是分享技术",
                      "type": "jdbc"
                  },
                  "_type": "doc"
              },
              {
                  "_id": "7",
                  "_index": "demo",
                  "_score": 0.80259144,
                  "_source": {
                      "@timestamp": "2019-02-26T07:32:00.112Z",
                      "@version": "1",
                      "id": 7,
                      "num": 7890,
                      "text": "php,javascript,codehui,golang等",
                      "type": "jdbc"
                  },
                  "_type": "doc"
              }
          ],
          "max_score": 0.991507,
          "total": 2
      },
      "timed_out": false,
      "took": 3
  }

Then, update one row in the test table:

  UPDATE test SET `text`='php,javascript,golang等' WHERE `id` = 7;

Wait for the next sync and search again: only one hit remains. Since document_id => "%{id}" keys each ES document by the table's primary key, every sync overwrites the existing documents, so updates in MySQL are reflected in ES (row deletions, however, are not propagated this way).

  [root@localhost ~]# curl 127.0.0.1:9200/demo/_search?q=codehui | python -m json.tool
    % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                   Dload  Upload   Total   Spent    Left  Speed
  100   335  100   335    0     0   5271      0 --:--:-- --:--:-- --:--:--  5317
  {
      "_shards": {
          "failed": 0,
          "skipped": 0,
          "successful": 5,
          "total": 5
      },
      "hits": {
          "hits": [
              {
                  "_id": "5",
                  "_index": "demo",
                  "_score": 0.991507,
                  "_source": {
                      "@timestamp": "2019-02-26T07:40:00.313Z",
                      "@version": "1",
                      "id": 5,
                      "num": 5678,
                      "text": "codehui主要是分享技术",
                      "type": "jdbc"
                  },
                  "_type": "doc"
              }
          ],
          "max_score": 0.991507,
          "total": 1
      },
      "timed_out": false,
      "took": 27
  }

At this point the basic data sync is working; follow-up posts will build on it. Stay tuned.