MySQL同步数据至elasticsearch
作者:快盘下载 人气:经过各种测试本文采用binlog方式 中间件选用go_mysql_elasticsearch
使用docker方式部署(镜像已经制作好了上传至阿里云)
docker run -it -v /root/mysql.toml:/mysql-river-es5.toml -v /mysqlmaster/:/go/src/github.com/siddontang/go-mysql-elasticsearch/var registry.cn-hangzhou.aliyuncs.com/yanfulei/gomysqles5:5.5.3
docker run -it -v /root/mysql.toml:/mysql-river-es5.toml -v /mysqlmaster/:/go/src/github.com/siddontang/go-mysql-elasticsearch/var registry.cn-hangzhou.aliyuncs.com/yanfulei/gomysqles5:5.5.3
挂载文件/root/mysql.toml为go_mysql_elasticsearch的配置文件,内容直接贴出
# MySQL address, user and password # user must have replication privilege in MySQL. my_addr = "xx.xx.xx.xxx:3306" my_user = "root" my_pass = "xxxxx" my_charset = "utf8" # Set true when elasticsearch use https #es_https = false # Elasticsearch address es_addr = "xx.xx.xxx.xxxx:9200" # Elasticsearch user and password, maybe set by shield, nginx, or x-pack es_user = "elastic" es_pass = "changeme" # Path to store data, like master.info, if not set or empty, # we must use this to support breakpoint resume syncing. # TODO: support other storage, like etcd. data_dir = "./var" # Inner Http status address stat_addr = "127.0.0.1:12800" # pseudo server id like a slave server_id = 1001 # mysql or mariadb flavor = "mysql" # mysqldump execution path # if not set or empty, ignore mysqldump. mysqldump = "mysqldump" # if we have no privilege to use mysqldump with --master-data, # we must skip it. #skip_master_data = false # minimal items to be inserted in one bulk bulk_size = 128 # force flush the pending requests if we don't have enough items >= bulk_size flush_bulk_time = "200ms" # Ignore table without primary key skip_no_pk_table = false # MySQL data source [[source]] schema = "smh_orders" # Only below tables will be synced into Elasticsearch. # "t_[0-9]{4}" is a wildcard table format, you can use it if you have many sub tables, like table_0000 - table_1023 # I don't think it is necessary to sync all tables in a database. tables = ["cl_cashlog", "cl_cashlogdetails", "od_orderdiscount", "od_orderexpress", "od_orderitems", "od_orders", "py_paymethod", "py_payparam", "py_storepaymethod", "py_weapppayparam"] # Below is for special rule mapping # Very simple example # # desc t; # +-------+--------------+------+-----+---------+-------+ # | Field | Type | Null | Key | Default | Extra | # +-------+--------------+------+-----+---------+-------+ # | id | int(11) | NO | PRI | NULL | | # | name | varchar(256) | YES | | NULL | | # +-------+--------------+------+-----+---------+-------+ # # The table `t` will be synced to ES index `test` and type `t`. #[[rule]] #schema = "test" #table = "t" #index = "test" #type = "t" # Wildcard table rule, the wildcard table must be in source tables # All tables which match the wildcard format will be synced to ES index `test` and type `t`. # In this example, all tables must have same schema with above table `t`; #[[rule]] #schema = "test" #table = "t_[0-9]{4}" #index = "test" #type = "t" # Simple field rule # # desc tfield; # +----------+--------------+------+-----+---------+-------+ # | Field | Type | Null | Key | Default | Extra | # +----------+--------------+------+-----+---------+-------+ # | id | int(11) | NO | PRI | NULL | | # | tags | varchar(256) | YES | | NULL | | # | keywords | varchar(256) | YES | | NULL | | # +----------+--------------+------+-----+---------+-------+ # #[[rule]] #schema = "test" ##table = "tfield" #index = "test" #type = "tfield" #[rule.field] # Map column `id` to ES field `es_id` #id="es_id" # Map column `tags` to ES field `es_tags` with array type #tags="es_tags,list" # Map column `keywords` to ES with array type #keywords=",list" # Filter rule # # desc tfilter; # +-------+--------------+------+-----+---------+-------+ # | Field | Type | Null | Key | Default | Extra | # +-------+--------------+------+-----+---------+-------+ # | id | int(11) | NO | PRI | NULL | | # | c1 | int(11) | YES | | 0 | | # | c2 | int(11) | YES | | 0 | | # | name | varchar(256) | YES | | NULL | | # +-------+--------------+------+-----+---------+-------+ # #[[rule]] #schema = "test" #table = "tfilter" #index = "test" #type = "tfilter" # Only sync following columns #filter = ["id", "name"] # id rule # # desc tid_[0-9]{4}; # +----------+--------------+------+-----+---------+-------+ # | Field | Type | Null | Key | Default | Extra | # +----------+--------------+------+-----+---------+-------+ # | id | int(11) | NO | PRI | NULL | | # | tag | varchar(256) | YES | | NULL | | # | desc | varchar(256) | YES | | NULL | | # +----------+--------------+------+-----+---------+-------+ # #[[rule]] #schema = "test" #table = "tid_[0-9]{4}" #index = "test" #type = "t" # The es doc's id will be `id`:`tag` # It is useful for merge muliple table into one type while theses tables have same PK #id = ["id", "tag"]
挂载文件夹/mysqlmaster/:/go/src/github.com/siddontang/go-mysql-elasticsearch/var为重启后上次同步信息备份 完成
加载全部内容