部署 MySQL -> ES 数据同步

(mysql 同步到 ES 是支持 多表查询 后把结果同步到ES 中的同一个索引中的)

1.服务器:

内网ip:192.168.0.60
登录name+key
实现功能: canal、kafka、es7;canal同步预生产polardb数据到es7

2.ES7 kafka服务

es web 管理url:
http://[internet ip]:9800
es:
公网: [internet ip] 9201
内网: 192.168.0.60 9201

kafka:
kafka-manage
http://[internet ip]:9001

kafka:
公网: [internet ip] 9092
内网: 192.168.0.60 9092
zookeeper:
192.168.0.60 2181

##3.ES 同步相关文件目录如下:

见文章最后

服务器部署列表:

application ip: port install dir user/psd
mysql [mysql_server_ip]:3306 rds canal / 123456
zookeeper 192.168.0.60:2181 /opt/app/zookeeper-3.4.12 #zookeeper
canal.deployer 192.168.0.60:1111 /opt/app/canal.deployer #canal
canal.adapter 192.168.0.60:8081 /opt/app/canal.adapter #canal
ES 192.168.0.60:9201 DOCKER es
kafka 192.168.0.60:9092 DOCKER kafka

1.安装zookeeper.

配置文件:
vi conf/zoo.cfg
主要参数:initLimit=10syncLimit=5clientPort=2181dataDir=/opt/app/zookeeper-3.4.12/datavi conf/log4j.properties #日志类配置zookeeper.log.dir=.zookeeper.log.file=zookeeper.logzookeeper.log.threshold=DEBUGzookeeper.tracelog.dir=.zookeeper.tracelog.file=zookeeper_trace.log启动zookeeper# ./zkServer.sh start ../conf/zoo.cfgZooKeeper JMX enabled by defaultUsing config: ../conf/zoo.cfgStarting zookeeper ... STARTED在zookeeper 中查看同步canal 的信息:zkCli.sh -server localhost:2181ls /otterget /otter/canal/destinations/crm_canal/2/cursor

2.安装配置canal.deployer

wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.adapter-1.1.5.tar.gzcanal,adapter 最好下载同版本。adapter1.1.5 插件需要更新为:client-adapter.es7x-1.1.5-jar-with-dependencies.jar
相关说明如下: canal 1.1.5本身有个Bug存在(https://github.com/alibaba/canal/issues/3636),需要手动替换一个插件mkdir /opt/app/canal.adapter /opt/app/canal.deployer
tar zxvf /opt/download/canal.deployer-1.1.5.tar.gz -C /opt/app/canal.deployer
tar zxvf /opt/download/canal.adapter-1.1.5.tar.gz -C /opt/app/canal.adapter

2.1 canal deployer 配置文件主要参数:

vi conf/canal.properties #   canal.id = 1   #如果是集群,编号要不相同canal.ip = 192.168.0.60    #本地IPcanal.port = 11111    #端口canal.metrics.pull.port = 11112canal.zkServers = 192.168.0.60  #zookeeper 服务器,这里是本地canal.serverMode = kafka    #同步方式,kafka 数据流的方式canal.destinations = shop     #同步后的目的地名称(在mq中可以查看到)kafka.bootstrap.servers = 192.168.0.60:9092

2.1 canal instance 配置文件主要参数:

cd /opt/app/canal.deplyer/conf
cp example/instance.properties ./shop/
vi shop/instance.properties

      # enable gtid use true/falsecanal.instance.gtidon=true# position infocanal.instance.master.address=192.168.0.25:3306canal.instance.master.journal.name= canal.instance.master.position= canal.instance.master.timestamp=canal.instance.master.gtid=truecanal.instance.dbUsername=canalcanal.instance.dbPassword=123456canal.instance.connectionCharset = UTF-8# mq configcanal.mq.topic=SYNC_ES_shop# dynamic topic route by schema or table regex#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*canal.mq.partition=0

mysql数据库相关配置

canal.instance.master.address=mysql_server_ip:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword= [密码查看其它文件]
待同步数据表
canal.instance.filter.regex=shop.tb_building,shop.tb_article,shop.tb_home_news,shop.tb_home_store_product,shop.
tb_travel_product
指定topic
canal.mq.topic=SYNC_ES_SHOP

2.2 adapter 参数配置

 vi conf/application.yml

canal.conf:
mode: kafka #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
consumerProperties:
# canal tcp consumer
canal.tcp.server.host: 127.0.0.1:11111
canal.tcp.zookeeper.hosts:127.0.0.1:2181
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
# kafka consumer
kafka.bootstrap.servers: 127.0.0.1:9092
kafka.enable.auto.commit: false
kafka.auto.commit.interval.ms: 1000
kafka.auto.offset.reset: latest
kafka.request.timeout.ms: 40000
kafka.session.timeout.ms: 30000
kafka.isolation.level: read_committed
kafka.max.poll.records: 1000
# rocketMQ consumer
rocketmq.namespace:
rocketmq.namesrv.addr: 127.0.0.1:9876
rocketmq.batch.size: 1000
rocketmq.enable.message.trace: false
rocketmq.customized.trace.topic:
rocketmq.access.channel:
rocketmq.subscribe.filter:
# rabbitMQ consumer
rabbitmq.host:
rabbitmq.virtual.host:
rabbitmq.username:
rabbitmq.password:
rabbitmq.resource.ownerId:

  srcDataSources:defaultDS:url: jdbc:mysql://mysql_server_ip:3306/mytest?useUnicode=trueusername: sync_user password: 123456canalAdapters:- instance: SYNC_ES_shop # canal instance Name or mq topic namegroups:- groupId: g1outerAdapters:- name: es7key: exampleKey#hosts: localhost:9201 # 127.0.0.1:9200 for rest mode ~~#**注意: 此方式无法访问ES   ERROR: Illegal character in scheme name at index 0: 127.0.0.1:9201****~~hosts: http://192.168.0.60:9201 # 127.0.0.1:9200 for rest modeproperties:mode: rest # or restcluster.name: docker-cluster
3.2 开启实时同步、全量同步数据到ES

1.启动canal-server
2.启动canal-adapter

3.建立ES 索引

 cd /opt/json/es_index_json_file/curl -XPUT -H "Content-Type: application/json" http://127.0.0.1:9201/tb_home_store_product?include_type_name=true -d  "@tb_home_store_product.json"curl -XPUT -H "Content-Type: application/json" http://127.0.0.1:9201/tb_travel_product?include_type_name=true -d  "@tb_travel_product.json"#查看ES中的索引curl -XGET http://127.0.0.1:9201/_cat/indices?v#删除索引#curl -XDELETE http://127.0.0.1:9201/tb_building

4.全量同步数据到ES

curl -X POST http://127.0.0.1:8081/etl/es7/exampleKey/tb_home_store_product.yml
curl -X POST http://127.0.0.1:8081/etl/es7/exampleKey/tb_travel_product.yml

ES web 查询 页面:

http://[internet ip]:9800/
连接内部 ESDH IP: http://192.168.0.60:9201

#以下文件内容在进行同步时,拷贝另存为对应的文件即可

****************************************** tb_home_store_product.json file ***************************************************************

tb_home_store_product.json file


{"mappings":{"home_store_product_doc":{"properties":{"id": {"type": "integer"},"store_name": {"type": "text","analyzer":"ik_max_word","search_analyzer":"ik_max_word"},"store_info": {"type": "text","analyzer":"ik_max_word","search_analyzer":"ik_max_word"},"content": {"type": "text","analyzer":"ik_max_word","search_analyzer":"ik_max_word"},"mark_price": {"type": "double"},"ot_price": {"type": "double"},"sales": {"type": "long"},"ficti": {"type": "long"},"create_by": {"type": "keyword"},"create_time": {"type": "date"},"update_by": {"type": "keyword"},"update_time": {"type": "date"}
}
}
}
}

****************************************** tb_home_store_product.yml file ***************************************************************

dataSourceKey: defaultDS
destination: SYNC_ES_SHOP_TEST
outerAdapterKey: exampleKey
groupId: g1
esMapping:_index: tb_home_store_product_id: idsql: "select id,store_name,store_info,content,mark_price,ot_price,sales,ficti,is_show,del_flag,img_size,image,create_by,create_time,update_by,update_time
from tb_home_store_product t"etlCondition: "where t.id>={0} and t.id<={1}"commitBatch: 3000

****************************************** tb_travel_product.json file ***************************************************************

{"mappings":{"travel_product_doc":{"properties":{"id": {"type": "integer"},"on_sale": {"type": "integer"},"product_name": {"type": "text","analyzer":"ik_max_word","search_analyzer":"ik_max_word"},"product_shortname": {"type": "text","analyzer":"ik_max_word","search_analyzer":"ik_max_word"},"product_price": {"type": "double"},"product_img": {"type": "keyword"},"sale_count": {"type": "long"},"create_by": {"type": "keyword"},"create_time": {"type": "date"},"update_by": {"type": "keyword"},"update_time": {"type": "date"}
}
}
}
}

************************************************** tb_travel_product.yml file *******************************************************

###说明:这里可以看出,这个查询是2个表join 后的结果,说明是可以2个表导出到ES 为一个索引的。

dataSourceKey: defaultDS
destination: SYNC_ES_SHOP_TEST
outerAdapterKey: exampleKey
groupId: g1
esMapping:_index: tb_travel_product_id: idsql: "SELECT t.id as id,t.product_no, t.on_sale, t.product_name, t.product_shortname,t.product_price, i. product_img,       t.sale_count,   t.create_by,    t.create_time,  t.update_by,    t.update_time from      tb_travel_product t left join tb_travel_product_info i on i.product_no = t.product_no"etlCondition: "where t.id>={0} and t.id<={1}"commitBatch: 3000

MySQL -> ES 数据同步 配置步骤相关推荐

  1. 基于Canal的MySQL=>ES数据同步方案

    文章目录 1.MySQL和ES的主要区别? 1.1 功能性 1.2 性能指标 1.3 在搜索业务上的区别 1.3.1 查询 1.3.2 检索 2.为什么要做数据同步 2.1 检索性能 2.2 写入性能 ...

  2. mysql主主同步配置_MySQL 主主同步配置步骤

    MySQL 主主同步配置 服务器名 IP 系统 MySQL odd.example.com 192.168.1.116 rhel-5.8 5.5.16 even.example.com 192.168 ...

  3. Linux下mysql支持中文,linux下mysql环境支持中文配置步骤

    sql脚本执行前加上: CREATE DATABASE IF NOT EXISTS mydatabase DEFAULT CHARSET utf8 COLLATE UTF8_GENERAL_CI; u ...

  4. ES和MYSQL实现数据同步

    elasticsearch中的酒店数据来自于mysql数据库,因此mysql数据发生改变时,elasticsearch也必须跟着改变,这个就是elasticsearch与mysql之间的数据同步. 1 ...

  5. mysql 与 es 数据同步常见方案

    mysql 与 es 数据同步常见方案 说明 @author JellyfishMIX - github / blog.jellyfishmix.com LICENSE GPL-2.0 问题背景 最近 ...

  6. MySQL 到 MySQL 实时数据同步实操分享

    摘要:很多 DBA 和开发同学经常会遇到要从一个数据库实时同步到另一个数据库的问题,同构数据还相对容易,遇上异构数据.表多.数据量大等情况就难以同步.最近了解到一款实时数据同步工具 Tapdata C ...

  7. Linux下MySQL数据库主从同步配置

    操作系统:CentOS 6.x 64位 MySQL数据库版本:mysql-5.5.35 MySQL主服务器:192.168.21.128 MySQL从服务器:192.168.21.129 准备篇: 说 ...

  8. mysql goldengate_Goldengate完成Mysql到Mysql的数据同步

    文档参考地址:http://blog.csdn.net/u010587433/article/details/49305019 需求: 使用Goldengate完成Mysql到Mysql的数据同步,源 ...

  9. mysql ogg kafka_Oracle goldengate 实现mysql到kafka同步配置

    一.oracle goldengate技术架构 Oracle GoldenGate 实现原理是通过抽取源端的redo log 或者 archive log ,然后通过TCP/IP投递到目标端,最后解析 ...

最新文章

  1. 干货丨人工智能、机器学习和认知计算入门指南
  2. go 调用dll char*传输
  3. Android 使用OpenCV的三种方式(Android Studio)
  4. [组合数]求组合数的几种方法总结
  5. 连号区间数(2013年第四届c/c++ b组第10题)
  6. 解决IE6、IE7、Firefox兼容最简单的CSS Hack
  7. 命运2服务器维护时间2019,《命运2》今晚将停机维护 为多平台共用存档做准备...
  8. php输出多行多列,数据库查询记录php 多行多列显示
  9. linux使用rename批量修改文件扩展名
  10. 为什么事业单位公务员的公积金比企业里要缴得多?
  11. Config文件的使用:通过程序修改Config文件
  12. Get value from agent failed: cannot connect to [[192.168.186.130]:10050]: [113]No route to host
  13. visual studio 2013 快速安全ocx(ActiveX控件)开发
  14. 软件无法安装时怎么办
  15. 计算机系统运行费,关于同意计算机离港系统实行收费的批复
  16. 运维必备——Zabbix监控系统
  17. 初学者入门阿里云haas510开板式DTU(2.0版本)--510-AS
  18. Cg语言学习笔记(1)
  19. 【TensorFlow学习笔记】完美解决 pip3 install tensorflow 没有models库,读取PTB数据
  20. 亿级万物互联新时代的物联网消息中间件EMQX调研

热门文章

  1. 如何查看自己Oracle的版本
  2. Android -ui控件
  3. SqlServer 算法 :Nested Loops Join(嵌套连接)
  4. php system()和exec()差别
  5. Jmeter连接SqlServer数据库进行压力测试
  6. React和设计良好的jQuery插件并没有什么不兼容的问题。
  7. 关于字符串排序的别的规则
  8. 数据结构与算法分析资源总结
  9. thinkphp验证码功能
  10. JavaScript对Json的增删改属性