测试环境:centos7.6
,mysql5.5.1
,jdk1.8
,elasticsearch6.6.0
,logstash-6.6.0
elasticsearch 安装请参见 elasticsearch安装
本来网上查看资料用elasticsearch-jdbc
的比较多,但是版本支持太低,然后就选择了logstash
,选择和elasticsearch对应的版本,logstash下载地址。下文中的elasticsearch使用es代替。
安装logstash
cd /usr/local/src
wget https://artifacts.elastic.co/downloads/logstash/logstash-6.6.0.tar.gz
tar -xvf logstash-6.6.0.tar.gz
cd logstash-6.6.0
安装logstash-input-jdbc插件
[root@localhost logstash-6.6.0]# ./bin/logstash-plugin install logstash-input-jdbc
Validating logstash-input-jdbc
Installing logstash-input-jdbc
Installation successful
安装mysql的jdbc驱动
下载地址:https://dev.mysql.com/downloads/connector/j
cd config
wget https://cdn.mysql.com/Downloads/Connector-J/mysql-connector-java-5.1.47.tar.gz
tar -xvf mysql-connector-java-5.1.47.tar.gz
# 复制驱动到config目录下
cp -b mysql-connector-java-5.1.47/mysql-connector-java-5.1.47.jar mysql-connector-java-5.1.47.jar
创建配置文件
我们需要把test数据库中test表的数据同步到es的demo索引中。创建配置文件 jdbc.conf,内容如下:
input {
jdbc {
# mysql相关jdbc配置 test是数据库名
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/test"
jdbc_user => "root"
jdbc_password => "123456"
# jdbc连接mysql驱动的文件地址
jdbc_driver_library => "/usr/local/src/logstash-6.6.0/config/mysql-connector-java-5.1.47.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
jdbc_default_timezone => "Asia/Shanghai"
# mysql语句,也可以写到sql文件里面
statement => "select * from test"
#statement_filepath => "/usr/local/src/logstash-6.6.0/config/jdbc.sql"
# 这里类似crontab,可以定制定时操作,默认每分钟执行一次同步(分 时 天 月 年)
schedule => "* * * * *"
# es的索引类型
type => "jdbc"
}
}
output {
elasticsearch {
# es的ip地址和端口
hosts => "127.0.0.1:9200"
# 索引名称
index => "demo"
#自增ID编号
document_id => "%{id}"
}
# 输出调试,正式运行时可以注释掉 以JSON格式输出
stdout {
codec => json_lines
}
}
test表结构数据如下:
DROP TABLE IF EXISTS `test`;
CREATE TABLE `test` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`num` int(11) DEFAULT NULL,
`text` text,
PRIMARY KEY (`id`)
) ENGINE=MyISAM AUTO_INCREMENT=11 DEFAULT CHARSET=utf8;
INSERT INTO `test` VALUES ('1', '1234', 'php是世界上最好的语言');
INSERT INTO `test` VALUES ('2', '2345', 'javascript也是最好的语言');
INSERT INTO `test` VALUES ('3', '3456', 'golang也是一门语言');
INSERT INTO `test` VALUES ('4', '4567', '欢迎来到codehi.net');
INSERT INTO `test` VALUES ('5', '5678', 'codehui主要是分享技术');
INSERT INTO `test` VALUES ('6', '6789', '每一种语言都是很好');
INSERT INTO `test` VALUES ('7', '7890', 'php,javascript,codehui,golang等');
INSERT INTO `test` VALUES ('8', '8901', '学习php');
INSERT INTO `test` VALUES ('9', '9012', '学习golang');
INSERT INTO `test` VALUES ('10', '1012', '今天你学习了吗?');
开始同步数据
使用logstash执行刚才创建的配置文件,会显示一大推内容,只要没报错停止,等待一会数据就会同步至es中,然后进入每一分钟就会同步一次test表中的数据。
[root@localhost logstash-6.6.0]# ./bin/logstash -f ./config/jdbc.conf
数据同步完成就可以在es中进行查看。
查看数据
可以使用 ip:端口/demo/_search
的形式在浏览器中查看,也可以使用curl请求查看es中的数据,可以看到mysql中的10条数据都导入到了es中。
[root@localhost ~]# curl 127.0.0.1:9200/demo/_search
{"took":2,"timed_out":false,"_shards":{"total":5,"successful":5,"skipped":0,"failed":0},"hits":{"total":10,"max_score":1.0,"hits":[{"_index":"demo","_type":"doc","_id":"8","_score":1.0,"_source":{"type":"jdbc","num":8901,"@version":"1","@timestamp":"2019-02-26T07:26:00.047Z","id":8,"text":"学习php"}},{"_index":"demo","_type":"doc","_id":"10","_score":1.0,"_source":{"type":"jdbc","num":1012,"@version":"1","@timestamp":"2019-02-26T07:26:00.047Z","id":10,"text":"今天你学习了吗?"}},{"_index":"demo","_type":"doc","_id":"5","_score":1.0,"_source":{"type":"jdbc","num":5678,"@version":"1","@timestamp":"2019-02-26T07:26:00.046Z","id":5,"text":"codehui主要是分享技术"}},{"_index":"demo","_type":"doc","_id":"9","_score":1.0,"_source":{"type":"jdbc","num":9012,"@version":"1","@timestamp":"2019-02-26T07:26:00.047Z","id":9,"text":"学习golang"}},{"_index":"demo","_type":"doc","_id":"4","_score":1.0,"_source":{"type":"jdbc","num":4567,"@version":"1","@timestamp":"2019-02-26T07:26:00.046Z","id":4,"text":"欢迎来到codehi.net"}},{"_index":"demo","_type":"doc","_id":"2","_score":1.0,"_source":{"type":"jdbc","num":2345,"@version":"1","@timestamp":"2019-02-26T07:26:00.046Z","id":2,"text":"javascript也是最好的语言"}},{"_index":"demo","_type":"doc","_id":"6","_score":1.0,"_source":{"type":"jdbc","num":6789,"@version":"1","@timestamp":"2019-02-26T07:26:00.046Z","id":6,"text":"每一种语言都是很好"}},{"_index":"demo","_type":"doc","_id":"7","_score":1.0,"_source":{"type":"jdbc","num":7890,"@version":"1","@timestamp":"2019-02-26T07:26:00.047Z","id":7,"text":"php,javascript,codehui,golang等"}},{"_index":"demo","_type":"doc","_id":"1","_score":1.0,"_source":{"type":"jdbc","num":1234,"@version":"1","@timestamp":"2019-02-26T07:26:00.045Z","id":1,"text":"php是世界上最好的语言"}},{"_index":"demo","_type":"doc","_id":"3","_score":1.0,"_source":{"type":"jdbc","num":3456,"@version":"1","@timestamp":"2019-02-26T07:26:00.046Z","id":3,"text":"golang也是一门语言"}}]}}
测试数据同步
刚才配置jdbc.conf文件中,schedule参数默认是1分钟同步一次。我们先查询一下demo索引中包含codehui的数据,并使用python进行格式化,看到了两条数据如下
[root@localhost ~]# curl 127.0.0.1:9200/demo/_search?q=codehui | python -m json.tool
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 543 100 543 0 0 84737 0 --:--:-- --:--:-- --:--:-- 90500
{
"_shards": {
"failed": 0,
"skipped": 0,
"successful": 5,
"total": 5
},
"hits": {
"hits": [
{
"_id": "5",
"_index": "demo",
"_score": 0.991507,
"_source": {
"@timestamp": "2019-02-26T07:32:00.112Z",
"@version": "1",
"id": 5,
"num": 5678,
"text": "codehui主要是分享技术",
"type": "jdbc"
},
"_type": "doc"
},
{
"_id": "7",
"_index": "demo",
"_score": 0.80259144,
"_source": {
"@timestamp": "2019-02-26T07:32:00.112Z",
"@version": "1",
"id": 7,
"num": 7890,
"text": "php,javascript,codehui,golang等",
"type": "jdbc"
},
"_type": "doc"
}
],
"max_score": 0.991507,
"total": 2
},
"timed_out": false,
"took": 3
}
然后我们在数据库test中,修改一条数据
UPDATE test SET text='php,javascript,golang等' WHERE ('id'='7')
等待一会数据同步过去然后在进行搜索,只有了一条数据。
[root@localhost ~]# curl 127.0.0.1:9200/demo/_search?q=codehui | python -m json.tool
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 335 100 335 0 0 5271 0 --:--:-- --:--:-- --:--:-- 5317
{
"_shards": {
"failed": 0,
"skipped": 0,
"successful": 5,
"total": 5
},
"hits": {
"hits": [
{
"_id": "5",
"_index": "demo",
"_score": 0.991507,
"_source": {
"@timestamp": "2019-02-26T07:40:00.313Z",
"@version": "1",
"id": 5,
"num": 5678,
"text": "codehui主要是分享技术",
"type": "jdbc"
},
"_type": "doc"
}
],
"max_score": 0.991507,
"total": 1
},
"timed_out": false,
"took": 27
}
目前数据同步算是基本成功了,下面更新会继续介绍:
- 《elasticsearch在php中的使用》
- 《elasticsearch搜索中文分词ik插件的安装以及使用》
敬请关注。