springboot集成ElasticsearchBboss调用Elasticsearch

一、搭建一个springboot项目

1)、使用的是idea工具,所以直接选中File–>New–>Project–>Spring Assistant–>Next(编写项目相关配置信息) -->Next–>Web–>Spring Web–>Next即可快速创建一个springboot的web项目;

2)、习惯使用yml文件的可以将创建项目的application.properties修改为application.yml文件

二、引入相关依赖

1)、修改maven仓库为自己配置的maven仓库;

2)、引入ElasticsearchBboss相关maven依赖。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.es</groupId><artifactId>es_bboss_web</artifactId><version>0.0.1-SNAPSHOT</version><packaging>jar</packaging><name>es_bboss_web</name><description>Demo project for Spring Boot</description><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.6.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.bbossgroups.plugins</groupId><artifactId>bboss-elasticsearch-spring-boot-starter</artifactId><version>6.1.0</version><exclusions><exclusion><artifactId>slf4j-log4j12</artifactId><groupId>org.slf4j</groupId></exclusion></exclusions></dependency><!--导入db-elasticsearch数据同步依赖包开始--><dependency><groupId>com.bbossgroups.plugins</groupId><artifactId>bboss-elasticsearch-rest-jdbc</artifactId><version>6.1.0</version><exclusions><exclusion><artifactId>slf4j-log4j12</artifactId><groupId>org.slf4j</groupId></exclusion></exclusions></dependency><dependency><groupId>com.bbossgroups.plugins</groupId><artifactId>bboss-elasticsearch-rest-hbase</artifactId><version>6.1.0</version><exclusions><exclusion><artifactId>slf4j-log4j12</artifactId><groupId>org.slf4j</groupId></exclusion></exclusions></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-shaded-client</artifactId><version>2.2.3</version><exclusions><exclusion><artifactId>slf4j-log4j12</artifactId><groupId>org.slf4j</groupId></exclusion></exclusions></dependency><dependency><groupId>org.xerial</groupId><artifactId>sqlite-jdbc</artifactId><version>3.30.1</version><scope>compile</scope><exclusions><exclusion><artifactId>slf4j-log4j12</artifactId><groupId>org.slf4j</groupId></exclusion></exclusions></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.40</version></dependency><!--导入db-elasticsearch数据同步依赖包结束--></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

三、修改配置文件

配置es连接的相关信息:


下面是官方文档的相关配置信息:

server.port=808
logging.level.org.bboss=INFO
logging.level.bboss=INFO
logging.level.com.frameworkset=INFO
logging.level.org.frameworkset=INFOlogging.level.org.apache=INFO# DSL configuration file hot load scan interval, in milliseconds, 5 seconds scan by default, turn off scan mechanism when <= 0
spring.elasticsearch.bboss.dslfile.refreshInterval = -1
##ES authentication configuration, support for x-pack and searchguard
spring.elasticsearch.bboss.elasticUser=
spring.elasticsearch.bboss.elasticPassword=spring.elasticsearch.bboss.elasticsearch.includeTypeName = false
spring.elasticsearch.bboss.elasticsearch.rest.hostNames=localhost:9200
#spring.elasticsearch.bboss.elasticsearch.rest.hostNames=10.180.211.27:9280,10.180.211.27:9281,10.180.211.27:9282
##HTTPS configuration, add the https:// protocol header
#spring.elasticsearch.bboss.default.elasticsearch.rest.hostNames=https://10.180.211.27:9280,https://10.180.211.27:9281,https://10.180.211.27:9282
spring.elasticsearch.bboss.elasticsearch.dateFormat=yyyy.MM.dd
spring.elasticsearch.bboss.elasticsearch.timeZone=Asia/Shanghai
#Debug switch to output DSL statement on console: showTemplate,false off, true on, log4j at least info level
spring.elasticsearch.bboss.elasticsearch.showTemplate=true
spring.elasticsearch.bboss.elasticsearch.discoverHost=falsespring.elasticsearch.bboss.elasticsearch.sliceScrollThreadCount=20spring.elasticsearch.bboss.elasticsearch.sliceScrollThreadQueue=20
spring.elasticsearch.bboss.elasticsearch.scrollThreadCount=10spring.elasticsearch.bboss.elasticsearch.scrollThreadQueue=10
#spring.elasticsearch.bboss.elasticsearch.slowDslThreshold = 1000
#spring.elasticsearch.bboss.elasticsearch.slowDslCallback=org.bboss.elasticsearchtest.crud.TestSlowDslCallback
##Elasticsearch restclient HTTP connection pool configuration
spring.elasticsearch.bboss.http.timeoutConnection = 5000
spring.elasticsearch.bboss.http.timeoutSocket = 50000
spring.elasticsearch.bboss.http.connectionRequestTimeout=5000
spring.elasticsearch.bboss.http.retryTime = 1
spring.elasticsearch.bboss.http.maxLineLength = -1
spring.elasticsearch.bboss.http.maxHeaderCount = 200
spring.elasticsearch.bboss.http.maxTotal = 4
spring.elasticsearch.bboss.http.defaultMaxPerRoute = 2
spring.elasticsearch.bboss.http.soReuseAddress = false
spring.elasticsearch.bboss.http.soKeepAlive = false
spring.elasticsearch.bboss.http.timeToLive = 3600000
spring.elasticsearch.bboss.http.keepAlive = 3600000
spring.elasticsearch.bboss.http.keystore =
spring.elasticsearch.bboss.http.keyPassword =
# SSL host name validation, whether default configuration is used,
# If the specified as the default, use DefaultHostnameVerifier, otherwise use SSLConnectionSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER
spring.elasticsearch.bboss.http.hostnameVerifier =#The idle connection is verified every few milliseconds,
# and the invalid connection is automatically released, with <=0 uncheckedspring.elasticsearch.bboss.http.validateAfterInactivity=2000
# There is a performance overhead to validate connections every time a connection is acquired,
# true:check, false:no checkspring.elasticsearch.bboss.http.staleConnectionCheckEnabled=false
#* Custom retry control interface. Interface methods must be implemented
#* public interface CustomHttpRequestRetryHandler  {
#*  public boolean retryRequest(IOException exception, int executionCount, HttpContext context,ClientConfiguration configuration);
#* }
#* Method returns true and retries false without retrying
spring.elasticsearch.bboss.http.customHttpRequestRetryHandler=org.frameworkset.spi.remote.http.ConnectionResetHttpRequestRetryHandler# 演示数据库数据导入elasticsearch源配置
# ip地址信息库配置
spring.elasticsearch.bboss.ip.cachesize = 2000
# 库下载地址https://dev.maxmind.com/geoip/geoip2/geolite2/
spring.elasticsearch.bboss.ip.database = D:/workspace/hnai/terminal/geolite2/GeoLite2-City.mmdb
spring.elasticsearch.bboss.ip.asnDatabase = D:/workspace/hnai/terminal/geolite2/GeoLite2-ASN.mmdb# 演示数据库数据导入elasticsearch源配置
spring.elasticsearch.bboss.db.name = test
spring.elasticsearch.bboss.db.user = root
spring.elasticsearch.bboss.db.password = 123456
spring.elasticsearch.bboss.db.driver = com.mysql.jdbc.Driver
#db.url = jdbc:mysql://192.168.137.1:3306/test?useCursorFetch=true&useUnicode=true&characterEncoding=utf-8&useSSL=false
spring.elasticsearch.bboss.db.url = jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false
spring.elasticsearch.bboss.db.usePool = falsespring.elasticsearch.bboss.db.initSize=100
spring.elasticsearch.bboss.db.minIdleSize=100
spring.elasticsearch.bboss.db.maxSize=100spring.elasticsearch.bboss.db.validateSQL = select 1
spring.elasticsearch.bboss.db.jdbcFetchSize = 3000
spring.elasticsearch.bboss.db.showsql = true

四、创建一个数据导入的定时任务类

import org.frameworkset.tran.DataStream;
import org.frameworkset.tran.ExportResultHandler;
import org.frameworkset.tran.db.input.es.DB2ESImportBuilder;
import org.frameworkset.tran.metrics.TaskMetrics;
import org.frameworkset.tran.schedule.ImportIncreamentConfig;
import org.frameworkset.tran.task.TaskCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;import java.text.SimpleDateFormat;
import java.util.Date;/*** <p>Description: </p>* <p></p>* <p>Copyright (c) 2018</p>* @Date 2020/1/5 12:03* @author biaoping.yin* @version 1.0*/
@Service
public class DataTran {@AutowiredEnvironment environment;private Logger logger = LoggerFactory.getLogger(DataTran.class);private DB2ESImportBuilder db2ESImportBuilder;private DataStream dataStream;public String stopDB2ESJob(){if(dataStream != null) {synchronized (this) {if (dataStream != null) {dataStream.destroy();dataStream = null;db2ESImportBuilder = null;return "db2ESImport job stopped.";} else {return "db2ESImport job has stopped.";}}}else {return "db2ESImport job has stopped.";}}public  String scheduleDB2ESJob(){if (db2ESImportBuilder == null) {synchronized (this) {if (db2ESImportBuilder == null) {DB2ESImportBuilder importBuilder = DB2ESImportBuilder.newInstance();//数据源相关配置,可选项,可以在外部启动数据源importBuilder.setDbName(environment.getProperty("spring.elasticsearch.bboss.db.name")).setDbDriver(environment.getProperty("spring.elasticsearch.bboss.db.driver")) //数据库驱动程序,必须导入相关数据库的驱动jar包.setJdbcFetchSize(Integer.valueOf(environment.getProperty("spring.elasticsearch.bboss.db.jdbcFetchSize")))//启用mysql stream机制1,设置jdbcfetchsize大小为3000.setDbUrl(environment.getProperty("spring.elasticsearch.bboss.db.url")).setJdbcFetchSize(Integer.MIN_VALUE)//启用mysql stream机制二,设置jdbcfetchsize大小为Integer.MIN_VALUE.setDbUser(environment.getProperty("spring.elasticsearch.bboss.db.user")).setDbPassword(environment.getProperty("spring.elasticsearch.bboss.db.password")).setValidateSQL(environment.getProperty("spring.elasticsearch.bboss.db.validateSQL")).setUsePool(Boolean.valueOf(environment.getProperty("spring.elasticsearch.bboss.db.usePool")));//是否使用连接池importBuilder.setSql("SELECT * FROM activity_cfg_tbl");/*** es相关配置*/importBuilder.setTargetElasticsearch("default");importBuilder.setIndex("activity_cfg_tbl") //必填项.setRefreshOption("refresh")//可选项,null表示不实时刷新,importBuilder.setRefreshOption("refresh");表示实时刷新.setUseJavaName(false) //可选项,将数据库字段名称转换为java驼峰规范的名称,true转换,false不转换,默认false,例如:doc_id -> docId.setUseLowcase(false)  //可选项,true 列名称转小写,false列名称不转换小写,默认false,只要在UseJavaName为false的情况下,配置才起作用.setPrintTaskLog(true) //可选项,true 打印任务执行日志(耗时,处理记录数) false 不打印,默认值false.setBatchSize(100);  //可选项,批量导入es的记录数,默认为-1,逐条处理,> 0时批量处理//定时任务配置,importBuilder.setFixedRate(false)//参考jdk timer task文档对fixedRate的说明.setDeyLay(1000L) // 任务延迟执行deylay毫秒后执行.setPeriod(5000L); //每隔period毫秒执行,如果不设置,只执行一次importBuilder.setFromFirst(true);//任务重启时,重新开始采集数据,true 重新开始,false不重新开始,适合于每次全量导入数据的情况,如果是全量导入,可以先删除原来的索引数据importBuilder.setLastValueStorePath("activity_cfg_tbl");//记录上次采集的增量字段值的文件路径,作为下次增量(或者重启后)采集数据的起点,不同的任务这个路径要不一样importBuilder.setLastValueType(ImportIncreamentConfig.TIMESTAMP_TYPE);//如果没有指定增量查询字段名称,则需要指定字段类型:ImportIncreamentConfig.NUMBER_TYPE 数字类型SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");try {Date date = format.parse("2000-01-01");importBuilder.setLastValue(date);//增量起始值配置} catch (Exception e) {e.printStackTrace();}/*** 一次、作业创建一个内置的线程池,实现多线程并行数据导入elasticsearch功能,作业完毕后关闭线程池*/importBuilder.setParallel(true);//设置为多线程并行批量导入,false串行importBuilder.setQueue(10);//设置批量导入线程池等待队列长度importBuilder.setThreadCount(5);//设置批量导入线程池工作线程数量importBuilder.setContinueOnError(true);//任务出现异常,是否继续执行作业:true(默认值)继续执行 false 中断作业执行importBuilder.setAsyn(false);//true 异步方式执行,不等待所有导入作业任务结束,方法快速返回;false(默认值) 同步方式执行,等待所有导入作业任务结束,所有作业结束后方法才返回
//                    importBuilder.setEsIdField("id");//设置文档主键,不设置,则自动产生文档idimportBuilder.setDebugResponse(false);//设置是否将每次处理的reponse打印到日志文件中,默认falseimportBuilder.setDiscardBulkResponse(false);//设置是否需要批量处理的响应报文,不需要设置为false,true为需要,默认falseimportBuilder.setExportResultHandler(new ExportResultHandler<String, String>() {@Overridepublic void success(TaskCommand<String, String> taskCommand, String result) {TaskMetrics taskMetrics = taskCommand.getTaskMetrics();logger.info(taskMetrics.toString());logger.info(result);}@Overridepublic void error(TaskCommand<String, String> taskCommand, String result) {TaskMetrics taskMetrics = taskCommand.getTaskMetrics();logger.info(taskMetrics.toString());logger.info(result);}@Overridepublic void exception(TaskCommand<String, String> taskCommand, Exception exception) {TaskMetrics taskMetrics = taskCommand.getTaskMetrics();logger.info(taskMetrics.toString());}@Overridepublic int getMaxRetry() {return 0;}});/*** 执行数据库表数据导入es操作*/DataStream dataStream = importBuilder.builder();dataStream.execute();//执行导入操作db2ESImportBuilder = importBuilder;this.dataStream = dataStream;return "db2ESImport job started.";}else{return "db2ESImport job has started.";}}}else{return "db2ESImport job has started.";}}}

五、创建一个Controller类来开启或关闭同步数据的定时任务

import com.es.service.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;@RestController
public class DataTranController {@AutowiredDataTran dataTran;/*** 启动作业* @return*/@RequestMapping("/scheduleDB2ESJob")@ResponseBodypublic String scheduleDB2ESJob(){dataTran.scheduleDB2ESJob();return "db2ESImport job started.";}/*** 停止作业* @return*/@ResponseBody@RequestMapping("/stopDB2ESJob")public String stopDB2ESJob(){dataTran.stopDB2ESJob();return "db2ESImport job stopped.";}}

六、启动项目进行访问http://localhost:808/scheduleDB2ESJob或http://localhost:808/stopDB2ESJob即可开启或关闭定时任务

ElasticsearchBboss MySQL 同步数据到Elasticsearch相关推荐

  1. elastic如何和mysql同步数据_MySQL数据库之mysql 同步数据到 ElasticSearch 的方案

    本文主要向大家介绍了MySQL数据库之mysql 同步数据到 ElasticSearch 的方案 ,通过具体的内容向大家展现,希望对大家学习MySQL数据库有所帮助. MySQL Binlog 要通过 ...

  2. Mysql同步数据到Elasticsearch(实时Canal)

    这里只是作为一个想法,Canal有监听binlog文件的功能.所以简单看了一下Canal的入门使用. 后续Canal实时数据同步的功能希望不会被我阉割......当然有大佬已经实现或者有其他方法实现m ...

  3. datax从mysql同步数据到elasticsearch(使用es的动态模板)

    elasticsearch中设置动态模板 PUT _template/hkey_transferbill { "order":0 //order表示模板的优先级,值越大优先级越高, ...

  4. NodeJS同步MySQL上游数据到ElasticSearch数据库中

    NodeJS同步MySQL上游数据到ElasticSearch数据库中 项目地址: https://github.com/Miazzy/xdata-elasticsearchs-service.git ...

  5. ktl工具实现mysql向mysql同步数据方法

    使用ktl工具实现mysql向mysql同步数据 1.新建作业步骤 2.完善作业步骤,主要是完成作业中的转换工作 3.首先要确定数据来源库,也就是数据输出库,这里是mysql 4.其次要确定数据接收库 ...

  6. MySQL从零到一解读增量同步数据到elasticsearch canal adapter方式(binlog)实现

    本文是作者在单机上面从零到一实现增量同步MySQL数据到elasticsearch canal adapter方式(binlog)实现. 实现步骤 (1)安装MySQL (2)开启MySQL binl ...

  7. 【技术实验】表格存储Tablestore准实时同步数据到Elasticsearch

    实验背景 图书馆Q是一家大型图书馆,图书馆藏书众多,纸质图书600多万册,电子图书7000多万册,总数有八千多万册,这些图书之前都是人工检索维护的,现在需要做一个系统来存储管理这些图书信息. 需求如下 ...

  8. mysql同步数据_实现MySQL数据库数据的同步方法介绍

    做开发的时候要做MySQL的数据库同步,两台安装一样的系统,都是FreeBSD5.4,安装了Apache 2.0.55和PHP 4.4.0,MySQL的版本是4.1.15,都是目前最新的版本. 1.安 ...

  9. Mysql大批量数据导入ElasticSearch

    注:笔者环境 ES6.6.2.linux centos6.9.mysql8.0.三个节点.节点内存64G.八核CPU 场景: 目前Mysql 数据库数据量约10亿,有几张大表1亿左右,直接在Mysql ...

最新文章

  1. C语言SQLite3基本操作Demo
  2. 马云:梭梭树就是企业家精神
  3. 播种数据MVC 6 .NET Core应用程序
  4. php单选框点击取消,取消选中单选框radio的三种方式
  5. 湘苗培优|从入门到精通
  6. 洛谷——P1146 硬币翻转
  7. Facebook 推机器视觉方案,能轻易读懂图片信息
  8. 运算优先级、类型转换
  9. 斗战神 拳猴刷图加点
  10. Centos7+LVS+Keepalived实现Exchange2016高可用性
  11. 关于c语言打印图案的解析,c语言星号打印矩形、三角形、菱形等图案和答案解析.doc...
  12. 人力资源管理:理论与实务第七章
  13. CVPR2020 Rotate-and-Render: Unsupervised Photorealistic Face Rotation from Single-View Images论文笔记
  14. 获取少女资源.html,AI少女资源一般在哪获取比较好?AI少女全地图资源获取地址一览...
  15. 微信小程序 ---在Vscode上编辑,微信开发者工具上预览,快速上手
  16. 一张图看懂阿里云ACK
  17. 第十六章:开发工具-compileall:字节编译源文件-编译单个文件
  18. hud 1560 DNA sequence(IDA* 迭代加深搜索+估值函数)
  19. 非支配排序遗传算法c语言,第三代非支配排序遗传算法(NSGA-III)
  20. 用Floyd算法解决选址问题(附完整matlab代码)

热门文章

  1. Codevs 侦探推理
  2. TiDB 在北京银行交易场景中的应用实践
  3. 旷视研究院张祥雨:3年看1800篇论文,28岁掌舵旷视基础模型研究
  4. angularjs2学习教程
  5. html5源码笔记【爱创课堂专业前端培训】
  6. 牛客网-《刷C语言百题》第三期
  7. P1551 亲戚 并查集
  8. android模拟器设置静态ip,静态IP地址版EVE模拟器部署和使用说明
  9. EVE-NG模拟器教程(三)——Lab平台初探
  10. 错误跳转html页面模板,404错误页面模板代码大全 - 搜外SEO问答