Datax安装及基本使用请查看上一篇文章:

文章目录

  • Datax概述
    • 1.概述
    • 2.功能清单
    • 3.==说明==:本项目只支持mysql及hbase之间的数据同步
  • 代码模块
    • 配置文件
    • pom.xml
    • DataxDolphinschedulerController
    • ConfigAddForm
    • ConfigUpdateForm
    • ProcessDto
    • SyncConfigDto
    • SyncConfigService
    • SyncConfigServiceImpl
    • SyncConfigMapper
    • SyncConfigSqlProvider
    • JdbcDataSourceService
    • JdbcDataSourceServiceImpl
    • DataSourceMapper
    • DataSourceSqlProvider
    • MetaDataSourceMapper
    • DataSourceSelectForm
    • DataSourceAddForm
    • DataSourceUpdateForm
    • YAPI测试用例
      • 5.1查询全部同步任务配置(分页)
      • 5.2 创建同步任务配置-mysql->mysql
      • 5.6 创建同步任务配置-hbase->hbase
      • 5.7 创建同步任务配置-mysql->hbase
      • 5.8 创建同步任务配置-hbase->mysql
      • 5.3 更新同步任务配置
      • 5.5 删除同步任务配置
      • 5.4 查询同步任务配置
      • 3.3 执行数据同步任务
      • 3.4 停止数据同步任务
  • 三、本人相关其他文章链接

Datax概述

1.概述

2.功能清单

功能清单
CRUD增删改查 、启动任务、停止任务

3.说明:本项目只支持mysql及hbase之间的数据同步

代码模块

配置文件

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>com.geespace.microservices.bd-platform</groupId><artifactId>all</artifactId><version>1.0-SNAPSHOT</version></parent><artifactId>data-sync-config</artifactId><version>1.0-SNAPSHOT</version><properties><java.version>1.8</java.version><gson.version>2.8.1</gson.version></properties><dependencies><dependency><groupId>com.github.pagehelper</groupId><artifactId>pagehelper-spring-boot-starter</artifactId></dependency><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>${gson.version}</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--elasticsearch--><dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch</artifactId><version>6.8.12</version></dependency><dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-client</artifactId><version>6.8.12</version></dependency><dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>6.8.12</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><scope>runtime</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-configuration-processor</artifactId><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope><exclusions><exclusion><groupId>org.junit.vintage</groupId><artifactId>junit-vintage-engine</artifactId></exclusion></exclusions></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency><dependency><groupId>org.codehaus.jackson</groupId><artifactId>jackson-core-asl</artifactId><version>1.9.13</version><scope>compile</scope></dependency><dependency><groupId>com.geespace.microservices.bd-platform</groupId><artifactId>data-config</artifactId><version>1.0-SNAPSHOT</version><scope>compile</scope></dependency><!--httpclient--><dependency><groupId>commons-httpclient</groupId><artifactId>commons-httpclient</artifactId><version>3.1</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.2</version><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>2.1.4.RELEASE</version></dependency></dependencies><configuration><keepDependenciesWithProvidedScope>false</keepDependenciesWithProvidedScope><createDependencyReducedPom>true</createDependencyReducedPom><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters></configuration><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>META-INF/spring.handlers</resource></transformer><transformerimplementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer"><resource>META-INF/spring.factories</resource></transformer><transformerimplementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>META-INF/spring.schemas</resource></transformer><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.geespace.microservices.dispatcher.DispatchApplication</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins></build></project>

DataxDolphinschedulerController

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;import javax.servlet.http.HttpServletRequest;import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.geespace.microservices.builder.dto.ProcessDto;
import com.geespace.microservices.builder.dto.SyncConfigDto;
import com.geespace.microservices.builder.enums.DictionaryEnum;
import com.geespace.microservices.builder.request.ConfigAddForm;
import com.geespace.microservices.builder.request.ConfigSelectForm;
import com.geespace.microservices.builder.request.ConfigUpdateForm;
import com.geespace.microservices.builder.response.BizCode;
import com.geespace.microservices.builder.response.DolphinschedulerResponse;
import com.geespace.microservices.builder.response.Msg;
import com.geespace.microservices.builder.response.PageResult;
import com.geespace.microservices.builder.response.ReturnResult;
import com.geespace.microservices.builder.service.SyncConfigService;
import com.geespace.microservices.builder.tools.JsonTools;import lombok.extern.slf4j.Slf4j;import org.apache.commons.httpclient.HttpException;
import org.apache.commons.httpclient.NameValuePair;
import org.apache.commons.httpclient.methods.PostMethod;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.ResponseEntity;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.StringUtils;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;/*** 迁移dolphinscheduler调度器* * @author: liudz* @date: 2021/5/7*/
@Slf4j
@RestController
@RequestMapping("/dolphinscheduler/v1")
public class DataxDolphinschedulerController {@Autowiredprivate RestTemplate restTemplate;@Value("${dolphinscheduler.token}")String token;@Value("${dolphinscheduler.address}")String address;public static final int ZERO = 0;public static final int SUCCESS = 200;public static final String CREATE = "create";public static final String UPDATE = "update";public static final String ADD = "add";public static final String DELETE = "delete";public static final String ONLINE = "ONLINE";public static final String OFFLINE = "OFFLINE";public static final int ONE_THOUSAND_AND_FIVE_HUNDRED = 1500;public static final int SIX = 6;public static final int EIGHTY = 80;public static final int THREE = 3;@Autowiredprivate SyncConfigService syncConfigService;/*** 创建任务-创建用户下唯一工作流,无则创建有则并排添加* @param request request* @param form 任务参数* @author liudz* @date 2021/5/8* @return 执行结果**/@PostMapping("/project/process/datax")@Transactional(rollbackFor = Exception.class)public ReturnResult operatorDataxTask(HttpServletRequest request, @RequestBody @Validated ConfigAddForm form) {Long userId = Long.valueOf(request.getUserPrincipal().getName());form.setUserId(userId);ReturnResult<SyncConfigDto> dataxTaskReturnResult = syncConfigService.addConfig(form);if (dataxTaskReturnResult.getCode() != SUCCESS) {return dataxTaskReturnResult;}log.info("--(1)addDataxTaskResult--success");form.setId(dataxTaskReturnResult.getData().getId());if (dataxTaskReturnResult.getCode() == SUCCESS) {Boolean verifyResult = verifyProcessExist(userId + "-dataxTask", form.getProjectName());log.info("--(2)verifyProcessExist--success:{}", verifyResult);if (!verifyResult) {ProcessDto processDto = packageProcessParam("create", userId + "-dataxTask", dataxTaskReturnResult.getData(), null);log.info("--(3)packageProcessParam--success");processDto.setProjectName(form.getProjectName());processDto.setProjectId(form.getProjectId());dataxTaskReturnResult =  createProcess(processDto);} else {//获取用户下唯一工作流IDDolphinschedulerResponse processInfoList = getUserProcess(form.getProjectName());JSONObject processJson = new JSONObject();log.info("--(3)getUserProcess--success:{}", processInfoList);List<Map<String, Object>> list = (List<Map<String, Object>>) processInfoList.getData();for (Map<String, Object> map : list) {if (map.get("name").equals(userId + "-dataxTask")) {processJson.fluentPutAll(map);}}ProcessDto processDto = packageProcessParam("add", userId + "-dataxTask", dataxTaskReturnResult.getData(), processJson);processDto.setId(processJson.getInteger("id"));log.info("--(4)packageProcessParam--success");if (processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()).equals(ONLINE)) {releaseProcessDefinition(form.getProjectName(), userId + "-dataxTask",processDto.getId(), 0);log.info("--(5)releaseProcessDefinition--OFFLINE--success");}dataxTaskReturnResult =  updateProcess(form, processDto);}}return dataxTaskReturnResult;}/*** 更新任务* @param request request* @param form 任务参数* @author liudz* @date 2021/5/8* @return 执行结果**/@PutMapping("/project/process/datax")@Transactional(rollbackFor = Exception.class)public ReturnResult updateDataxTask(HttpServletRequest request, @RequestBody @Validated ConfigUpdateForm form) {Long userId = Long.valueOf(request.getUserPrincipal().getName());form.setUserId(userId);ReturnResult<SyncConfigDto> dataxTaskReturnResult = syncConfigService.updateConfig(form);log.info("--(1)updateDataxTaskResult--mysql--success");if (dataxTaskReturnResult.getCode() == SUCCESS) {//获取用户下唯一工作流IDDolphinschedulerResponse processInfoList = getUserProcess(form.getProjectName());JSONObject processJson = new JSONObject();log.info("--(2)getUserProcess--success:{}", processInfoList);List<Map<String, Object>> list = (List<Map<String, Object>>) processInfoList.getData();for (Map<String, Object> map : list) {if (map.get("name").equals(userId + "-dataxTask")) {processJson.fluentPutAll(map);}}ProcessDto processDto = packageProcessParam("update", userId + "-dataxTask", dataxTaskReturnResult.getData(), processJson);processDto.setProjectName(form.getProjectName());processDto.setProjectId(form.getProjectId());processDto.setId(processJson.getInteger("id"));log.info("--(3)packageProcessParam--success");if (processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()).equals(ONLINE)) {releaseProcessDefinition(form.getProjectName(), userId + "-dataxTask",processDto.getId(), 0);log.info("--(4)releaseProcessDefinition--OFFLINE--success");}ConfigAddForm configAddForm = new ConfigAddForm();BeanUtils.copyProperties(form, configAddForm);return updateProcess(configAddForm, processDto);}return dataxTaskReturnResult;}/*** 删除任务* @param request request* @param projectName 项目名称* @param id 任务ID* @author liudz* @date 2021/5/8* @return 执行结果**/@DeleteMapping("/project/process/datax/{projectName}/{id}")@Transactional(rollbackFor = Exception.class)public ReturnResult deleteDataxTask(HttpServletRequest request, @PathVariable("projectName") String projectName,@PathVariable("id") Long id) {Long userId = Long.valueOf(request.getUserPrincipal().getName());SyncConfigDto syncConfigDto = new SyncConfigDto();syncConfigDto.setId(id);ConfigAddForm configAddForm = new ConfigAddForm();configAddForm.setProjectName(projectName);ReturnResult dataxTaskReturnResult = syncConfigService.delete(id, userId);log.info("--(1)deleteDataxTask--mysql--success");if (dataxTaskReturnResult.getCode() == SUCCESS) {//获取用户下唯一工作流IDDolphinschedulerResponse processInfoList = getUserProcess(projectName);JSONObject processJson = new JSONObject();log.info("--(2)getUserProcess--success:{}", processInfoList);List<Map<String, Object>> list = (List<Map<String, Object>>) processInfoList.getData();for (Map<String, Object> map : list) {if (map.get("name").equals(userId + "-dataxTask")) {processJson.fluentPutAll(map);}}ProcessDto processDto = packageProcessParam("delete", userId + "-dataxTask", syncConfigDto, processJson);processDto.setId(processJson.getInteger("id"));log.info("--(3)packageProcessParam--success");if (processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()).equals(ONLINE)) {releaseProcessDefinition(projectName, userId + "-dataxTask",processDto.getId(), 0);log.info("--(4)releaseProcessDefinition--OFFLINE--success");}if (JSONObject.parseObject(processDto.getLocations()).keySet().size() == 0) {//删除工作流deleteProcess(configAddForm, processDto);} else {//更新工作流updateProcess(configAddForm, processDto);}}return dataxTaskReturnResult;}/*** 校验工作流是否存在* * @param processName*            工作流名称* @param projectName 项目名称* @author liudz* @date 2021/5/8* @return boolean**/public Boolean verifyProcessExist(String processName, String projectName) {HttpHeaders headers = new HttpHeaders();headers.set("token", token);headers.set("Content-Type", "application/json");HttpEntity requestEntity = new HttpEntity(headers);ResponseEntity<DolphinschedulerResponse> returnResult =restTemplate.exchange(address + "/dolphinscheduler/projects/" + projectName+ "/process/verify-name?name=" + processName,HttpMethod.GET, requestEntity, DolphinschedulerResponse.class);if (returnResult.getBody().getCode() == ZERO) {return false;}return true;}/*** 创建工作流* @param processDto processDto* @author liudz* @date 2021/5/7* @return 执行结果**/public ReturnResult createProcess(ProcessDto processDto) {try {String postURL = address + "/dolphinscheduler/projects/"+ URLEncoder.encode(processDto.getProjectName(), "utf-8") + "/process/save";PostMethod postMethod = new PostMethod(postURL);postMethod.setRequestHeader("Content-Type","application/x-www-form-urlencoded;charset=utf-8");postMethod.setRequestHeader("token", token);NameValuePair[] data = {new NameValuePair("connects", processDto.getConnects()),new NameValuePair("name", processDto.getName()),new NameValuePair("locations", processDto.getLocations()),new NameValuePair("processDefinitionJson", processDto.getProcessDefinitionJson())};postMethod.setRequestBody(data);org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();httpClient.executeMethod(postMethod);JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString());log.info("--(5)httpCreateProcess:{}", result);if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) {return ReturnResult.error(BizCode.DB_INSERT_ERROR, result.getString("msg"));}} catch (Exception e) {log.info("请求异常:{}", e);}return ReturnResult.success();}/*** 更新工作流* @param vo vo* @param processDto processDto* @author liudz* @date 2021/5/7* @return 执行结果**/public ReturnResult updateProcess(ConfigAddForm vo, ProcessDto processDto) {try {String postURL = address + "/dolphinscheduler/projects/"+ URLEncoder.encode(vo.getProjectName(), "utf-8") + "/process/update";PostMethod postMethod = new PostMethod(postURL);postMethod.setRequestHeader("Content-Type","application/x-www-form-urlencoded;charset=utf-8");postMethod.setRequestHeader("token", token);// 参数设置,需要注意的就是里边不能传NULL,要传空字符串NameValuePair[] data = {new NameValuePair("connects", processDto.getConnects()),new NameValuePair("name", processDto.getName()),new NameValuePair("locations", processDto.getLocations()),new NameValuePair("id", processDto.getId().toString()),new NameValuePair("processDefinitionJson", processDto.getProcessDefinitionJson())};postMethod.setRequestBody(data);org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();httpClient.executeMethod(postMethod);JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString());log.info("--(5)httpUpdateProcess:{}", result);if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) {return ReturnResult.error(BizCode.DB_INSERT_ERROR, result.getString("msg"));}} catch (Exception e) {log.info("请求异常:{}", e);}return ReturnResult.success();}/*** 删除工作流* @param dto dto* @param processDto processDto* @author liudz* @date 2021/5/7* @return 执行结果**/public DolphinschedulerResponse deleteProcess(ConfigAddForm dto, ProcessDto processDto) {HttpHeaders headers = new HttpHeaders();headers.set("token", token);headers.set("Content-Type", "application/json");HttpEntity requestEntity = new HttpEntity(headers);ResponseEntity<DolphinschedulerResponse> returnResult =restTemplate.exchange(address + "/dolphinscheduler/projects/" + dto.getProjectName()+ "/process/delete?processDefinitionId=" + processDto.getId(),HttpMethod.GET, requestEntity, DolphinschedulerResponse.class);log.info("--(5)httpDeleteProcess:{}", returnResult);return returnResult.getBody();}/*** 获取dolphinscheduler上的资源spark可拖拽jar的id* * @author liudz* @date 2021/5/8* @return id**/public Integer getSparkResourceJarId() {Integer resourceId = null;HttpHeaders headers = new HttpHeaders();headers.set("token", token);headers.set("Content-Type", "application/json");HttpEntity requestEntity = new HttpEntity(headers);ResponseEntity<DolphinschedulerResponse> returnResult =restTemplate.exchange(address + "/dolphinscheduler/resources/authorize-resource-tree?userId=1",HttpMethod.GET, requestEntity, DolphinschedulerResponse.class);List<Map<String, Object>> list = (List<Map<String, Object>>) returnResult.getBody().getData();for (Map<String, Object> map : list) {if (map.get("name").equals("big_data02.jar")) {resourceId = Integer.valueOf(map.get("id").toString());}}return resourceId;}/*** 获取dolphinscheduler上的某用户下唯一工作流* @param projectName 项目名称* @author liudz* @date 2021/5/8* @return id**/public DolphinschedulerResponse getUserProcess(String projectName) {HttpHeaders headers = new HttpHeaders();headers.set("token", token);headers.set("Content-Type", "application/json");HttpEntity requestEntity = new HttpEntity(headers);ResponseEntity<DolphinschedulerResponse> returnResult =restTemplate.exchange(address + "/dolphinscheduler/projects/" + projectName + "/process/list",HttpMethod.GET, requestEntity, DolphinschedulerResponse.class);return returnResult.getBody();}/***  封装参数* @param type 操作类型* @param processName 用户工作流名称* @param dto 任务参数* @param processJson 工作流json* @author liudz* @date 2021/5/13* @return ProcessDto**/public ProcessDto packageProcessParam(String type, String processName, SyncConfigDto dto, JSONObject processJson) {ProcessDto processDto = new ProcessDto();processDto.setConnects("[]");processDto.setName(processName);JSONObject locationsOne = new JSONObject();JSONObject locationsTwo = new JSONObject();locationsTwo.fluentPut("name", "datax-" + dto.getId()).fluentPut("targetarr", "").fluentPut("nodenumber", "0");locationsTwo.fluentPut("x", 0).fluentPut("y", 0);locationsOne.put("datax-" + dto.getId(), locationsTwo);// 创建工作流if (CREATE.equals(type)) {processDto = packageProcessParamOfCreate(processDto, dto, locationsOne);} else if (ADD.equals(type)) {//工作流添加节点processDto = packageProcessParamOfAdd(processDto, dto, processJson, locationsOne, locationsTwo);} else if (UPDATE.equals(type)) {//更新工作流-只更新参数processDefinitionJson的tasks参数processDto = packageProcessParamOfUpdate(processDto, processJson, dto);} else if (DELETE.equals(type)) {//更新工作流或删除工作流-更新则删除参数processDefinitionJson的tasks参数processDto = packageProcessParamOfDelete(processDto, processJson, dto);}return processDto;}/*** packageProcessParamOfCreate* @param processDto 工作流参数* @param dto 任务参数* @param locationsOne locationsOne* @author liudz* @date 2021/5/7* @return ProcessDto**/public ProcessDto packageProcessParamOfCreate(ProcessDto processDto, SyncConfigDto dto, JSONObject locationsOne) {processDto.setLocations(locationsOne.toString());JSONObject processOne = new JSONObject();processOne.fluentPut("globalParams", new ArrayList<>()).fluentPut("tenantId", THREE).fluentPut("timeout", 0);JSONObject processTwo = new JSONObject();processTwo.fluentPut("type", "DATAX").fluentPut("id", "datax-" + dto.getId());processTwo.fluentPut("name", "datax-" + dto.getId()).fluentPut("description", "");String taskJsonString = dto.getContent().toString();processTwo.put("params", JSONObject.parseObject("{\"localParams\":[],\"customConfig\":1,"+ "\"json\":\"" + taskJsonString.replace("\"", "\\\"") + "\"}"));JSONObject jsonTimeout = new JSONObject();jsonTimeout.fluentPut("strategy", "").fluentPut("interval", null).fluentPut("enable", false);processTwo.fluentPut("timeout", jsonTimeout).fluentPut("runFlag", "NORMAL");JSONObject processTree = new JSONObject();processTree.fluentPut("successNode", new ArrayList<>()).fluentPut("failedNode", new ArrayList<>());JSONObject jsonconditionResult = new JSONObject();jsonconditionResult.put("successNode", new ArrayList<>());jsonconditionResult.put("failedNode", new ArrayList<>());processTwo.fluentPut("conditionResult", jsonconditionResult).fluentPut("dependence", new JSONObject());processTwo.fluentPut("maxRetryTimes", "0").fluentPut("retryInterval", "1");processTwo.fluentPut("taskInstancePriority", "MEDIUM").fluentPut("workerGroup", "default");processTwo.fluentPut("preTasks", new ArrayList<>());JSONArray processTaskArray = new JSONArray();processTaskArray.add(processTwo);processOne.put("tasks", processTaskArray);processDto.setProcessDefinitionJson(processOne.toString());return processDto;}/*** packageProcessParamOfAdd* @param processDto 工作流参数* @param locationsOne locationsOne* @param locationsTwo locationsTwo* @param dto 任务参数* @param processJson 工作流json* @author liudz* @date 2021/5/7* @return ProcessDto**/public ProcessDto packageProcessParamOfAdd(ProcessDto processDto, SyncConfigDto dto, JSONObject processJson,JSONObject locationsOne, JSONObject locationsTwo) {String maxTaskKey = JsonTools.getJsonStringMaxKey(processJson.getString("locations"));Integer x = processJson.getJSONObject("locations").getJSONObject(maxTaskKey).getInteger("x");Integer y = processJson.getJSONObject("locations").getJSONObject(maxTaskKey).getInteger("y");if (x < ONE_THOUSAND_AND_FIVE_HUNDRED) {locationsTwo.fluentPut("x", x + EIGHTY).fluentPut("y", y);} else if (x >= ONE_THOUSAND_AND_FIVE_HUNDRED) {locationsTwo.fluentPut("y", y + EIGHTY).fluentPut("x", 0);}locationsOne = processJson.getJSONObject("locations").fluentPut("datax-" + dto.getId(), locationsTwo);processDto.setLocations(locationsOne.toString());processDto.setId(processJson.getInteger("id"));JSONObject processTwo = new JSONObject();processTwo.fluentPut("type", "DATAX").fluentPut("id", "datax-" + dto.getId());processTwo.fluentPut("name", "datax-" + dto.getId()).fluentPut("description", "");String taskJsonString = dto.getContent().toString().replace("}}", "} }").replace("{{", "{ {");processTwo.put("params", JSONObject.parseObject("{\"localParams\":[],\"customConfig\":1,"+ "\"json\":\"" + taskJsonString.replace("\"", "\\\"") + "\"}"));JSONObject jsonTimeout = new JSONObject();jsonTimeout.fluentPut("strategy", "").fluentPut("interval", null).fluentPut("enable", false);processTwo.fluentPut("timeout", jsonTimeout).fluentPut("runFlag", "NORMAL");JSONObject processTree = new JSONObject();processTree.fluentPut("successNode", new ArrayList<>()).fluentPut("failedNode", new ArrayList<>());JSONObject jsonconditionResult = new JSONObject();jsonconditionResult.put("successNode", new ArrayList<>());jsonconditionResult.put("failedNode", new ArrayList<>());processTwo.fluentPut("conditionResult", jsonconditionResult).fluentPut("dependence", new JSONObject());processTwo.fluentPut("maxRetryTimes", "0").fluentPut("retryInterval", "1");processTwo.fluentPut("taskInstancePriority", "MEDIUM").fluentPut("workerGroup", "default");processTwo.fluentPut("preTasks", new ArrayList<>());JSONObject jsonNew = processJson.getJSONObject("processDefinitionJson");JSONArray jsonArray = jsonNew.getJSONArray("tasks");jsonArray.add(processTwo);jsonNew.put("tasks", jsonArray);processDto.setProcessDefinitionJson(jsonNew.toString());return processDto;}/*** packageProcessParamOfUpdate* @param processDto 工作流参数* @param dto 任务参数* @param processJson 工作流json* @author liudz* @date 2021/5/7* @return ProcessDto**/public ProcessDto packageProcessParamOfUpdate(ProcessDto processDto, JSONObject processJson, SyncConfigDto dto) {processDto.setLocations(processJson.getString("locations"));processDto.setId(processJson.getInteger("id"));JSONArray jsonTasksArray = processJson.getJSONObject("processDefinitionJson").getJSONArray("tasks");JSONArray copyJsonTasksArray = new JSONArray();copyJsonTasksArray.addAll(jsonTasksArray);JSONObject processDefinitionJson = new JSONObject();String taskJsonString = dto.getContent().toString();for (Object object : jsonTasksArray) {JSONObject jsonObject = JSONObject.parseObject(object.toString());if (Long.valueOf(jsonObject.getString("id").substring(SIX)) == dto.getId()) {String json = jsonObject.getString("json");json = taskJsonString;copyJsonTasksArray.remove(jsonObject);jsonObject.getJSONObject("params").put("json", json);copyJsonTasksArray.add(jsonObject);processDefinitionJson = processJson.getJSONObject("processDefinitionJson");processDefinitionJson.put("tasks", copyJsonTasksArray);}}processDto.setProcessDefinitionJson(processDefinitionJson.toString());return processDto;}/*** packageProcessParamOfDelete* @param processDto 工作流参数* @param dto 任务参数* @param processJson 工作流json* @author liudz* @date 2021/5/7* @return ProcessDto**/public ProcessDto packageProcessParamOfDelete(ProcessDto processDto, JSONObject processJson, SyncConfigDto dto) {processDto.setId(processJson.getInteger("id"));JSONObject locationsJson = processJson.getJSONObject("locations");JSONObject processDefinitionJson = processJson.getJSONObject("processDefinitionJson");JSONArray processDefinitionArray = processDefinitionJson.getJSONArray("tasks");JSONArray copyProcessDefinitionArray = new JSONArray();copyProcessDefinitionArray.addAll(processDefinitionArray);if (locationsJson.containsKey(DictionaryEnum.DATAX.getFiledString() + dto.getId())) {locationsJson.remove("datax-" + dto.getId());for (Object object : processDefinitionArray) {if (JSONObject.parseObject(object.toString()).getString("id").equals("datax-" + dto.getId())) {copyProcessDefinitionArray.remove(object);}}processDefinitionJson.put("tasks", copyProcessDefinitionArray);}processDto.setLocations(locationsJson.toString());processDto.setProcessDefinitionJson(processDefinitionJson.toString());return processDto;}/*** 工作流【上线或者下线】* @param projectName 项目名称* @param processName 用户工作流名称* @param processId 工作流ID* @param releaseState 上下线状态操作【0:下线,1:上线】* @author liudz* @date 2021/5/7* @return 执行结果**/public ReturnResult releaseProcessDefinition(String projectName, String processName, Integer processId,Integer releaseState) {try {String postURL = address + "/dolphinscheduler/projects/"+ URLEncoder.encode(projectName, "utf-8") + "/process/release";PostMethod postMethod = new PostMethod(postURL);postMethod.setRequestHeader("Content-Type","application/x-www-form-urlencoded;charset=utf-8");postMethod.setRequestHeader("token", token);// 参数设置,需要注意的就是里边不能传NULL,要传空字符串NameValuePair[] data = {new NameValuePair("name", processName),new NameValuePair("processId", processId.toString()),new NameValuePair("releaseState", releaseState.toString())};postMethod.setRequestBody(data);org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();httpClient.executeMethod(postMethod);JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString());if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) {return ReturnResult.error(BizCode.DB_INSERT_ERROR, result.getString("msg"));}} catch (Exception e) {log.info("请求异常:{}", e);}return ReturnResult.success();}/*** 运行流程实例* @param projectName 项目名称* @param request request* @param id 数据同步任务ID* @author liudz* @date 2021/5/7* @return 执行结果**/@GetMapping("/project/process/datax/start")public DolphinschedulerResponse startProcessDataxTask(@RequestParam("projectName") String projectName, @RequestParam("id") Integer id,HttpServletRequest request) {try {Long userId = Long.valueOf(request.getUserPrincipal().getName());DolphinschedulerResponse processInfoList = getUserProcess(projectName);if (processInfoList.getCode() != ZERO) {return processInfoList;}JSONObject processJson = new JSONObject();log.info("--(1)getUserProcess--success:{}", processInfoList);List<Map<String, Object>> list = (List<Map<String, Object>>) processInfoList.getData();for (Map<String, Object> map : list) {if (map.get("name").equals(userId + "-dataxTask")) {processJson.fluentPutAll(map);}}if (processJson.getString(DictionaryEnum.RELEASE_STATE.getFiledString()).equals(OFFLINE)) {releaseProcessDefinition(projectName, userId + "-dataxTask",processJson.getInteger("id"), 1);log.info("--(2)releaseProcessDefinition--ONLINE--success");}String postURL = address + "/dolphinscheduler/projects/" + URLEncoder.encode(projectName, "utf-8")+ "/executors/start-process-instance";PostMethod postMethod = new PostMethod(postURL);postMethod.setRequestHeader("Content-Type","application/x-www-form-urlencoded;charset=utf-8");postMethod.setRequestHeader("token", token);// 参数设置,需要注意的就是里边不能传NULL,要传空字符串NameValuePair[] data = packageNameValuePair(processJson, id);postMethod.setRequestBody(data);org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();httpClient.executeMethod(postMethod);JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString());log.info("--(2)startProcessInstance--result:{}", result);if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) {return DolphinschedulerResponse.error(result.getInteger("code"), result.getString("msg"));}} catch (Exception e) {log.info("请求异常:{}", e);}return DolphinschedulerResponse.success();}/***  packageNameValuePair封装参数* @param processJson 工作流json* @param dragSparkTaskId 任务ID* @author liudz* @date 2021/5/14* @return NameValuePair**/public NameValuePair[] packageNameValuePair(JSONObject processJson, Integer dragSparkTaskId) {NameValuePair[] data = {new NameValuePair("failureStrategy", "CONTINUE"),new NameValuePair("processDefinitionId", processJson.getString("id")),new NameValuePair("processInstancePriority", "MEDIUM"),new NameValuePair("warningGroupId", "0"),new NameValuePair("warningType", "NONE"),new NameValuePair("runMode", "RUN_MODE_SERIAL"),new NameValuePair("startNodeList", "datax-" + dragSparkTaskId),new NameValuePair("taskDependType", "TASK_POST"),new NameValuePair("workerGroup", "default")};return data;}/*** stopProcessDataxTask* @param id id* @param executeType executeType* @param projectName 项目名称* @return ReturnResult* @author: liudz* @author: lty update 2020/5/27* @date: 2020/4/28 10:31*/@GetMapping(value = "/project/process/datax/execute/{projectName}/{id}/{executeType}")public DolphinschedulerResponse<String> stopProcessDataxTask(@PathVariable("projectName") String projectName,@PathVariable("id") Long id, @PathVariable("executeType") String executeType) {log.info("--(1)stopProcessDataxTask--begin--projectName:{},id:{},executeType:{}", projectName, id, executeType);try {HttpHeaders headers = new HttpHeaders();headers.set("token", token);headers.set("Content-Type", "application/json");HttpEntity requestEntity = new HttpEntity(headers);ResponseEntity<JSONObject> returnResult = restTemplate.exchange(address + "/"+ "dolphinscheduler/projects/" + projectName + "/task-instance/list-paging?"+ "pageNo=1&pageSize=100&taskName=datax-" + id, HttpMethod.GET, requestEntity, JSONObject.class);List<Map<String, Object>> list =(List<Map<String, Object>>) returnResult.getBody().getJSONObject("data").get("totalList");Integer processInstanceId = null;for (Map<String, Object> map : list) {if (map.get("state").equals("RUNNING_EXEUTION")) {processInstanceId = Integer.valueOf(map.get("processInstanceId").toString());}}if (StringUtils.isEmpty(processInstanceId)) {return DolphinschedulerResponse.error(Msg.TASK_HAS_BEEN_STOPPED);}log.info("--(2)getProcessInstanceId--success--:{}", processInstanceId);String postURL = address + "/dolphinscheduler/projects/"+ URLEncoder.encode(projectName, "utf-8") + "/executors/execute";PostMethod postMethod = new PostMethod(postURL);postMethod.setRequestHeader("Content-Type","application/x-www-form-urlencoded;charset=utf-8");postMethod.setRequestHeader("token", token);NameValuePair[] data = {new NameValuePair("executeType", executeType),new NameValuePair("processInstanceId", processInstanceId.toString())};postMethod.setRequestBody(data);org.apache.commons.httpclient.HttpClient httpClient = new org.apache.commons.httpclient.HttpClient();httpClient.executeMethod(postMethod);JSONObject result = JSONObject.parseObject(postMethod.getResponseBodyAsString());if (!result.get(DictionaryEnum.CODE.getFiledString()).equals(ZERO)) {return DolphinschedulerResponse.error(result.getInteger("code"), result.getString("msg"));}log.info("--(3)stopProcessSparkTask--success--:{}", result);} catch (UnsupportedEncodingException e) {log.info("UnsupportedEncodingException:{}", e);} catch (HttpException e) {log.info("HttpException:{}", e);} catch (IOException e) {log.info("IOException:{}", e);}return DolphinschedulerResponse.success();}/*** 查询全部同步任务配置(分页)** @param form*            name* @param request*            含有用户id* @return 分页结果*/@RequestMapping(value = "/project/process/datax/list", method = RequestMethod.POST)public ReturnResult<PageResult<SyncConfigDto>> findAll(@RequestBody @Validated ConfigSelectForm form,HttpServletRequest request) {Long userId = Long.valueOf(request.getUserPrincipal().getName());return syncConfigService.list(form, userId);}/*** 获取同步任务配置** @param id*            配置id* @param request*            用户id* @return 添加结果*/@RequestMapping(value = "/project/process/datax", method = RequestMethod.GET)public ReturnResult<SyncConfigDto> findById(@RequestParam Long id, HttpServletRequest request) {Long userId = Long.valueOf(request.getUserPrincipal().getName());return syncConfigService.findById(id, userId);}
}

ConfigAddForm

package com.geespace.microservices.builder.request;import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;import lombok.Data;/*** @Author: zjr* @Date: 2020-05-06 09:42* @Version 1.0*/
@Data
public class ConfigAddForm {/*** 配置名称*/@NotEmpty(message = "name不能为空")private String name;/*** 配置描述*/private String description;/*** 实时/全量/增量*/@NotNull(message = "同步方式不能为空")private int syncType;/*** reader 选择的数据源id*/@NotNull(message = "读取数据源id不能为空")private Long readerConfigId;/*** reader*/@NotEmpty(message = "读取参数不能为空")private JSONObject readerParam;/*** writer 选择的数据源id*/@NotNull(message = "写入数据源id不能为空")private Long writerConfigId;/*** writer*/@NotEmpty(message = "写入参数不能为空")private JSONObject writerParam;/*** reader:column left,writer:column right*/@NotEmpty(message = "字段对照表不能为空")private JSONArray columnMap;private Long userId;/***  项目名称**/String projectName;/***  项目id**/@NotNull(message = "projectId not null")Long projectId;Long id;
}

ConfigUpdateForm

package com.geespace.microservices.builder.request;import javax.validation.constraints.NotEmpty;
import javax.validation.constraints.NotNull;import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;import lombok.Data;/*** @Author: zjr* @Date: 2020-05-06 09:42* @Version 1.0*/
@Data
public class ConfigUpdateForm {@NotNull(message = "同步配置id不能为空")private Long id;/*** 配置名称*/@NotEmpty(message = "name不能为空")private String name;/*** 配置描述*/private String description;/*** 实时/全量/增量*/@NotNull(message = "同步方式不能为空")private int syncType;/*** reader 选择的数据源id*/@NotNull(message = "读取数据源id不能为空")private Long readerConfigId;/*** reader*/@NotEmpty(message = "读取参数不能为空")private JSONObject readerParam;/*** writer 选择的数据源id*/@NotNull(message = "写入数据源id不能为空")private Long writerConfigId;/*** writer*/@NotEmpty(message = "写入参数不能为空")private JSONObject writerParam;/*** reader:column left,writer:column right*/@NotEmpty(message = "字段对照表不能为空")private JSONArray columnMap;private Long userId;/***  项目id**/@NotNull(message = "projectId not null")Long projectId;/***  项目名称**/String projectName;}

ProcessDto

package com.geespace.microservices.builder.dto;import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;/*** dolphinscheduler调度器中工作流参数* @Author: liudz* @Date: 2020-03-23**/
@Data
@EqualsAndHashCode(callSuper = false)
@ToString(callSuper = true)
public class ProcessDto {/*** 流程定义ID**/private Integer id;/*** 流程定义节点图标连接信息(json格式)**/private String connects;/*** 流程定义节点坐标位置信息(json格式)**/private String locations;/*** 流程定义名称**/private String name;/*** 流程定义详细信息(json格式)**/private String processDefinitionJson;/***  项目名称**/String projectName;/***  项目id**/Long projectId;
}

SyncConfigDto

package com.geespace.microservices.builder.dto;import com.alibaba.fastjson.JSONObject;import lombok.Data;/*** @Author: zjr* @Date: 2020-05-05 17:03* @Version 1.0*/
@Data
public class SyncConfigDto {private Long id;/*** 配置名称*/private String name;/*** 配置描述*/private String description;/*** 实时/全量/增量*/private int syncType;/*** json base64*/private JSONObject content;/***  项目名称**/String projectName;/***  项目id**/Long projectId;
}

SyncConfigService

package com.geespace.microservices.builder.service;import com.geespace.microservices.builder.dto.SyncConfigDto;
import com.geespace.microservices.builder.request.ConfigAddForm;
import com.geespace.microservices.builder.request.ConfigSelectForm;
import com.geespace.microservices.builder.request.ConfigUpdateForm;
import com.geespace.microservices.builder.response.PageResult;
import com.geespace.microservices.builder.response.ReturnResult;/*** @Author: zjr* @Date: 2020-05-05 13:59* @Version 1.0*/
public interface SyncConfigService {/*** 添加同步任务配置* * @param form*            任务配置参数* @return 添加结果*/ReturnResult<SyncConfigDto> addConfig(ConfigAddForm form);/*** 修改同步任务配置* * @param form*            任务配置参数(含id)* @return 修改结果*/ReturnResult<SyncConfigDto> updateConfig(ConfigUpdateForm form);/*** 查找同步任务配置* * @param id*            同步任务配置id* @param userId*            用户id* @return 查询结果*/ReturnResult<SyncConfigDto> findById(Long id, Long userId);/*** 删除同步任务配置* * @param id*            任务配置id* @param userId*            用户id* @return 删除结果*/ReturnResult delete(Long id, Long userId);/*** 查询全部同步任务配置(分页)** @param form*            name* @param userId*            用户id* @return 分页结果*/ReturnResult<PageResult<SyncConfigDto>> list(ConfigSelectForm form, Long userId);
}

SyncConfigServiceImpl

package com.geespace.microservices.builder.service.impl;import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.geespace.microservices.builder.biz.Contants;
import com.geespace.microservices.builder.dao.SyncConfigMapper;
import com.geespace.microservices.builder.dto.ColumnMap;
import com.geespace.microservices.builder.dto.SyncConfigDto;
import com.geespace.microservices.builder.entity.SyncConfig;
import com.geespace.microservices.builder.factory.BaseParamTool;
import com.geespace.microservices.builder.factory.ParamToolFactory;
import com.geespace.microservices.builder.request.ConfigAddForm;
import com.geespace.microservices.builder.request.ConfigSelectForm;
import com.geespace.microservices.builder.request.ConfigUpdateForm;
import com.geespace.microservices.builder.response.BizCode;
import com.geespace.microservices.builder.response.PageResult;
import com.geespace.microservices.builder.response.ReturnResult;
import com.geespace.microservices.builder.service.SyncConfigService;
import com.geespace.microservices.datasource.dto.JdbcDataSourceDto;
import com.geespace.microservices.datasource.response.Response;
import com.geespace.microservices.datasource.service.JdbcDataSourceService;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;import lombok.extern.slf4j.Slf4j;import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;/*** @Author: zjr* @Date: 2020-05-05 13:59* @Version 1.0*/
@Service
@Slf4j
public class SyncConfigServiceImpl implements SyncConfigService {public static final int ZERO = 0;public static final String HBASE = "hbase";@Autowiredprivate SyncConfigMapper syncConfigMapper;@Autowiredprivate JdbcDataSourceService dataSourceService;@Overridepublic ReturnResult<SyncConfigDto> addConfig(ConfigAddForm form) {Integer checkResult = syncConfigMapper.checkNameUnique(form.getUserId(), form.getName(), null);if (checkResult == ZERO) {ColumnMap columnMap = makeColumnMap(form.getColumnMap());if (columnMap == null) {return ReturnResult.error(BizCode.COLUMN_MATCHING_ERROR);}// 查询reader数据源 填充readerJSONObject reader = packageReader(form.getReaderConfigId(), form.getReaderParam(), columnMap.getReader());// 查询writer数据源 填充writerJSONObject writer = packageWriter(form.getWriterConfigId(), form.getWriterParam(), columnMap.getWriter());JSONArray contentArray = new JSONArray();JSONObject content = new JSONObject();content.put("reader", reader);content.put("writer", writer);contentArray.add(content);SyncConfig syncConfig = new SyncConfig();syncConfig.setContent(packageJob(contentArray));syncConfig.setName(form.getName());syncConfig.setDescription(form.getDescription());syncConfig.setSyncType(form.getSyncType());syncConfig.setCreatedTimestamp(System.currentTimeMillis());syncConfig.setCreatedUser(form.getUserId());syncConfig.setModifiedTimestamp(System.currentTimeMillis());syncConfig.setProjectName(form.getProjectName());syncConfig.setProjectId(form.getProjectId());syncConfigMapper.insert(syncConfig);return ReturnResult.success(entityToDto(syncConfig));}log.error("SyncConfigServiceImpl--addConfig--NAME_IS_EXIST!");return ReturnResult.error(BizCode.NAME_IS_EXIST);}@Overridepublic ReturnResult<SyncConfigDto> updateConfig(ConfigUpdateForm form) {SyncConfig syncConfig = syncConfigMapper.findById(form.getId());if (syncConfig == null || syncConfig.getCreatedUser() != form.getUserId()) {return ReturnResult.error(BizCode.UPDATE_OBJECT_NOT_EXIST);}Integer checkResult = syncConfigMapper.checkNameUnique(form.getUserId(), form.getName(), form.getId());if (checkResult == ZERO) {ColumnMap columnMap = makeColumnMap(form.getColumnMap());JSONObject reader = packageReader(form.getReaderConfigId(), form.getReaderParam(), columnMap.getReader());JSONObject writer = packageWriter(form.getWriterConfigId(), form.getWriterParam(), columnMap.getWriter());JSONArray contentArray = new JSONArray();JSONObject content = new JSONObject();content.put("reader", reader);content.put("writer", writer);contentArray.add(content);syncConfig.setContent(packageJob(contentArray));syncConfig.setName(form.getName());syncConfig.setDescription(form.getDescription());syncConfig.setSyncType(form.getSyncType());syncConfig.setModifiedTimestamp(System.currentTimeMillis());syncConfig.setProjectName(form.getProjectName());syncConfig.setProjectId(form.getProjectId());syncConfigMapper.update(syncConfig);return ReturnResult.success(entityToDto(syncConfig));}log.error("SyncConfigServiceImpl--updateConfig--NAME_IS_EXIST!");return ReturnResult.error(BizCode.NAME_IS_EXIST);}@Overridepublic ReturnResult<SyncConfigDto> findById(Long id, Long userId) {SyncConfig syncConfig = syncConfigMapper.findById(id);if (syncConfig == null || syncConfig.getCreatedUser() != userId) {return ReturnResult.success(new SyncConfigDto());}return ReturnResult.success(entityToDto(syncConfig));}@Overridepublic ReturnResult delete(Long id, Long userId) {log.debug("****id:{},userId:{}****", id, userId);SyncConfig syncConfig = syncConfigMapper.findById(id);log.debug("****syncConfig:{}****", syncConfig);log.debug("****syncConfig != null:{}", syncConfig != null);log.debug("****syncConfig.getCreatedUser():{},userId:{},syncConfig.getCreatedUser().equals(userId):{}",syncConfig.getCreatedUser(), userId, syncConfig.getCreatedUser().equals(userId));if (syncConfig != null && syncConfig.getCreatedUser().equals(userId)) {syncConfigMapper.delete(id);log.debug("****delete success!");}return ReturnResult.success();}@Overridepublic ReturnResult<PageResult<SyncConfigDto>> list(ConfigSelectForm form, Long userId) {SyncConfig syncConfig = new SyncConfig();syncConfig.setCreatedUser(userId);syncConfig.setName(form.getName());syncConfig.setProjectId(form.getProjectId());PageHelper.startPage(form.getPageNum(), form.getPageSize());PageInfo<SyncConfig> configPageInfo = new PageInfo<>(syncConfigMapper.list(syncConfig));PageResult<SyncConfigDto> result = new PageResult<>();result.setPageNum(configPageInfo.getPageNum());result.setPageSize(configPageInfo.getPageSize());result.setTotalCount(configPageInfo.getTotal());result.setTotalPage(configPageInfo.getPages());List<SyncConfigDto> dtoList =configPageInfo.getList().stream().map(this::entityToDto).collect(Collectors.toList());result.setList(dtoList);return ReturnResult.success(result);}/*** 将reader writer对照list查分成2个独立list(保持顺序)** @param columnMap*            [{"reader":"col l1","writer":"col r1"},{"reader":"col l2","writer":"col r2"}]* @return object contants reader(list<String>) and writer(list<String>)*/private ColumnMap makeColumnMap(JSONArray columnMap) {List<String> readerColumns = new ArrayList<>();List<String> writerColumns = new ArrayList<>();for (int i = 0; i < columnMap.size(); i++) {JSONObject column = columnMap.getJSONObject(i);readerColumns.add(column.getString("reader"));writerColumns.add(column.getString("writer"));}if (CollectionUtils.isEmpty(readerColumns) || CollectionUtils.isEmpty(writerColumns)) {return null;}ColumnMap column = new ColumnMap();column.setReader(readerColumns);column.setWriter(writerColumns);return column;}/*** 封装reader json** @param readerConfigId*            数据源id* @param readerParam*            页面填写reader 配置属性信息(table、where...)* @param readerColumns*            选择的数据字段* @return reader json*/private JSONObject packageReader(Long readerConfigId, JSONObject readerParam, List<String> readerColumns) {Response<JdbcDataSourceDto> descrypt = dataSourceService.findDescrypt(readerConfigId);if (!descrypt.responseSuccess()) {return null;}JdbcDataSourceDto jdbcDataSource = descrypt.getInfo();String sourceType = jdbcDataSource.getSourceType();BaseParamTool baseParamTool = ParamToolFactory.getByType(sourceType);JSONObject reader = baseParamTool.makeReaderJson(jdbcDataSource, readerParam, readerColumns);return reader;}/*** 封装writer json** @param writerConfigId*            数据源id* @param writerParam*            页面填写writer 配置属性信息(table、where...)* @param writerColumns*            选择的映射字段* @return writer json*/private JSONObject packageWriter(Long writerConfigId, JSONObject writerParam, List<String> writerColumns) {Response<JdbcDataSourceDto> descrypt = dataSourceService.findDescrypt(writerConfigId);if (!descrypt.responseSuccess()) {return null;}JdbcDataSourceDto jdbcDataSource = descrypt.getInfo();String sourceType = jdbcDataSource.getSourceType();BaseParamTool baseParamTool = ParamToolFactory.getByType(sourceType);JSONObject writer = baseParamTool.makeWriterJson(jdbcDataSource, writerParam, writerColumns);return writer;}/*** 封装执行job json** @param content*            reader and writer* @return job*/private JSONObject packageJob(JSONArray content) {JSONObject job = new JSONObject();JSONObject setting = new JSONObject();JSONObject speed = new JSONObject();speed.put("channel", 1);JSONObject errorLimit = new JSONObject();errorLimit.put("record", 0);errorLimit.put("percentage", Contants.PERCENTAGE);setting.put("speed", speed);setting.put("errorLimit", errorLimit);job.put("setting", setting);job.put("content", content);JSONObject jobContent = new JSONObject();jobContent.put("job", job);return jobContent;}/*** entity转dto** @param syncConfig*            entity* @return dto*/private SyncConfigDto entityToDto(SyncConfig syncConfig) {SyncConfigDto configDto = new SyncConfigDto();BeanUtils.copyProperties(syncConfig, configDto);return configDto;}
}

SyncConfigMapper

package com.geespace.microservices.builder.dao;import java.util.List;import com.geespace.microservices.builder.entity.SyncConfig;import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Result;
import org.apache.ibatis.annotations.ResultMap;
import org.apache.ibatis.annotations.Results;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.SelectKey;
import org.apache.ibatis.annotations.SelectProvider;
import org.apache.ibatis.annotations.Update;
import org.apache.ibatis.type.JdbcType;/*** @Author: zjr* @Date: 2020-05-05 10:40* @Version 1.0*/
@Mapper
public interface SyncConfigMapper {/*** 插入一条数据** @param syncConfig*            插入对象* @return 结果*/@Insert({"insert into sync_config (name, description, content, sync_type, created_timestamp, created_user, ","modified_timestamp,project_id,project_name) values (#{name,jdbcType=VARCHAR},#{description,jdbcType=VARCHAR},","#{content,jdbcType=OTHER, typeHandler=com.geespace.microservices.builder.handler.MySqlJsonHandler}, ","#{syncType,jdbcType=TINYINT}, #{createdTimestamp,jdbcType=BIGINT}, #{createdUser,jdbcType=BIGINT}, ","#{modifiedTimestamp,jdbcType=BIGINT},#{projectId},#{projectName})"})@SelectKey(statement = "SELECT LAST_INSERT_ID()", keyProperty = "id", before = false, resultType = Long.class)Long insert(SyncConfig syncConfig);/*** 更新数据** @param syncConfig*            插入对象* @return 结果*/@Update({"update sync_config set ","name = #{name,jdbcType=VARCHAR}, description = #{description,jdbcType=VARCHAR}, ","sync_type = #{syncType,jdbcType=TINYINT}, content = #{content,jdbcType=OTHER,","typeHandler=com.geespace.microservices.builder.handler.MySqlJsonHandler},project_id=#{projectId}, ","created_timestamp = #{createdTimestamp,jdbcType=BIGINT}, created_user = #{createdUser,jdbcType=BIGINT}, ","modified_timestamp = #{modifiedTimestamp,jdbcType=BIGINT},project_name=#{projectName},project_id=#{projectId}"," where id = #{id,jdbcType=BIGINT}"})int update(SyncConfig syncConfig);/*** 删除数据** @param id*            config id* @return 影响行数*/@Delete("delete from sync_config where id = #{id,jdbcType=BIGINT}")int delete(Long id);/*** 查询** @param syncConfig*            name* @return list结果*/@SelectProvider(type = SyncConfigSqlProvider.class, method = "select")@Results(id = "resultMap",value = {@Result(column = "id", property = "id", jdbcType = JdbcType.BIGINT, id = true),@Result(column = "name", property = "name", jdbcType = JdbcType.VARCHAR),@Result(column = "description", property = "description", jdbcType = JdbcType.VARCHAR),@Result(column = "sync_type", property = "syncType", jdbcType = JdbcType.TINYINT),@Result(column = "content", property = "content", jdbcType = JdbcType.OTHER,typeHandler = com.geespace.microservices.builder.handler.MySqlJsonHandler.class),@Result(column = "created_timestamp", property = "createdTimestamp", jdbcType = JdbcType.BIGINT),@Result(column = "created_user", property = "createdUser", jdbcType = JdbcType.BIGINT),@Result(column = "project_id", property = "projectId", jdbcType = JdbcType.BIGINT),@Result(column = "project_name", property = "projectName", jdbcType = JdbcType.VARCHAR),@Result(column = "modified_timestamp", property = "modifiedTimestamp", jdbcType = JdbcType.BIGINT)})List<SyncConfig> list(SyncConfig syncConfig);/*** id 查询** @param id*            id* @return 结果*/@Select({"select id,project_id,project_name,name, description, sync_type, content,"+ " created_timestamp, created_user, modified_timestamp ","from sync_config where id = #{id,jdbcType=BIGINT}"})@ResultMap("resultMap")SyncConfig findById(Long id);/*** 校验任务名称唯一性,用于新增功能* @author: liudz* @param createdUser 用户ID* @param name 任务名称* @param id 任务ID* @date: 2020/7/23* @return SparkTask*/@SelectProvider(type = SyncConfigSqlProvider.class, method = "checkNameUnique")Integer checkNameUnique(Long createdUser, String name, Long id);
}

SyncConfigSqlProvider

package com.geespace.microservices.builder.dao;import com.geespace.microservices.builder.entity.SyncConfig;import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.jdbc.SQL;/*** @Author: zjr* @Date: 2020-05-22 13:35* @Version 1.0*/
public class SyncConfigSqlProvider {/*** 条件查询** @param syncConfig*            name* @return sql*/public String select(SyncConfig syncConfig) {SQL sql = new SQL();sql.SELECT("id,project_id,project_name,name, description, sync_type, content, created_timestamp,"+ " created_user, modified_timestamp");sql.FROM("sync_config");sql.WHERE("created_user = #{createdUser,jdbcType=BIGINT}");if (!org.springframework.util.StringUtils.isEmpty(syncConfig.getProjectId())) {sql.WHERE("project_id=#{projectId}");}if (!StringUtils.isBlank(syncConfig.getName())) {sql.WHERE("name like concat('%', #{name,jdbcType=VARCHAR}, '%')");}sql.ORDER_BY("id desc");return sql.toString();}/*** 校验任务名称唯一性,用于新增功能** @author: liudz* @date 2019/12/3* @author: liudz* @param createdUser 用户ID* @param name 任务名称* @param id 任务ID* @return sql*/public String checkNameUnique(Long createdUser, String name, Long id) {SQL sql = new SQL();sql.SELECT("COUNT(name)");sql.FROM("sync_config");if (!org.springframework.util.StringUtils.isEmpty(id)) {sql.WHERE("id != #{id}");}sql.WHERE("created_user=#{createdUser} and name=#{name}");return sql.toString();}
}

JdbcDataSourceService

package com.geespace.microservices.datasource.service;import java.util.List;import com.geespace.microservices.datasource.dto.JdbcDataSourceDto;
import com.geespace.microservices.datasource.form.datasource.DataSourceAddForm;
import com.geespace.microservices.datasource.form.datasource.DataSourceSelectForm;
import com.geespace.microservices.datasource.form.datasource.DataSourceUpdateForm;
import com.geespace.microservices.datasource.response.PageResult;
import com.geespace.microservices.datasource.response.Response;/*** @Author: zjr* @Date: 2020-04-07 17:44* @Version 1.0*/
public interface JdbcDataSourceService {/*** 添加数据源信息* * @param dataSourceAddForm*            数据源信息* @return 添加成功的信息*/Response<JdbcDataSourceDto> addDataSource(DataSourceAddForm dataSourceAddForm);/*** 修改数据源信息* * @param dataSourceUpdateForm*            数据源信息* @return 修改后的信息*/Response<JdbcDataSourceDto> updateDataSource(DataSourceUpdateForm dataSourceUpdateForm);/*** 删除数据源信息* * @param id*            数据源id* @return 删除是否成功*/Response deleteDataSource(Long id);/*** 数据源列表查询-全量* * @param creator*            创建者* @return 全量列表*/Response<List<JdbcDataSourceDto>> list(Long creator);/*** 数据源列表查询-按类型查询** @param type*            数据源类型* @param creator*            创建者* @return 全量列表*/Response<List<JdbcDataSourceDto>> listByType(Long creator, List<String> type);/*** 内部数据源列表查询-全量** @return 全量列表*/Response<List<JdbcDataSourceDto>> listMeta();/*** 数据源列表查询-分页** @param form*            查询条件* @return 分页列表*/Response<PageResult<JdbcDataSourceDto>> select(DataSourceSelectForm form);/*** 通过id查找数据源* * @param id*            数据源id* @return 查询结果*/Response<JdbcDataSourceDto> find(Long id);/*** 通过id查找元数据源** @param id*            数据源id* @return 查询结果*/Response<JdbcDataSourceDto> findMetaDataSource(Long id);/*** 通过id查找数据源-明文** @param id*            数据源id* @return 查询结果*/Response<JdbcDataSourceDto> findDescrypt(Long id);
}

JdbcDataSourceServiceImpl

package com.geespace.microservices.datasource.service.impl;import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;import com.geespace.microservices.datasource.dao.DataSourceMapper;
import com.geespace.microservices.datasource.dao.MetaDataSourceMapper;
import com.geespace.microservices.datasource.dto.JdbcDataSourceDto;
import com.geespace.microservices.datasource.entity.JdbcDataSource;
import com.geespace.microservices.datasource.enums.JdbcDataSourceStatusEnum;
import com.geespace.microservices.datasource.form.datasource.DataSourceAddForm;
import com.geespace.microservices.datasource.form.datasource.DataSourceSelectForm;
import com.geespace.microservices.datasource.form.datasource.DataSourceUpdateForm;
import com.geespace.microservices.datasource.response.Msg;
import com.geespace.microservices.datasource.response.PageResult;
import com.geespace.microservices.datasource.response.Response;
import com.geespace.microservices.datasource.service.JdbcDataSourceService;
import com.geespace.microservices.datasource.util.AesUtil;
import com.geespace.microservices.datasource.util.LocalCacheUtil;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;import lombok.extern.slf4j.Slf4j;import org.apache.commons.lang.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;/*** @Author: zjr* @Date: 2020-04-07 17:45* @Version 1.0*/
@Slf4j
@Service
public class JdbcDataSourceServiceImpl implements JdbcDataSourceService {@Autowiredprivate DataSourceMapper dataSourceMapper;@Autowiredprivate MetaDataSourceMapper metaDataSourceMapper;@Overridepublic Response<JdbcDataSourceDto> addDataSource(DataSourceAddForm dataSourceAddForm) {JdbcDataSource dataSource = new JdbcDataSource();BeanUtils.copyProperties(dataSourceAddForm, dataSource);JdbcDataSource exist = dataSourceMapper.nameExist(dataSource);if (exist != null) {return Response.error(Msg.DATASOURCE_NAME_ALREAD_EXIST);}String userName = AesUtil.decrypt(dataSource.getUserName());// 判断账密是否为密文if (userName == null) {dataSource.setUserName(AesUtil.encrypt(dataSource.getUserName()));}String pwd = AesUtil.decrypt(dataSource.getPassword());if (pwd == null) {dataSource.setPassword(AesUtil.encrypt(dataSource.getPassword()));}dataSource.setCreateTime(new Date());dataSource.setUpdateTime(new Date());dataSource.setStatus(JdbcDataSourceStatusEnum.USING.getStatus());dataSourceMapper.insert(dataSource);JdbcDataSourceDto dataSourceDto = new JdbcDataSourceDto();BeanUtils.copyProperties(dataSource, dataSourceDto);return Response.success(dataSourceDto);}@Overridepublic Response<JdbcDataSourceDto> updateDataSource(DataSourceUpdateForm dataSourceUpdateForm) {JdbcDataSource dataSource = dataSourceMapper.find(dataSourceUpdateForm.getId());if (dataSource == null || dataSourceUpdateForm.getCreator() != dataSource.getCreator()) {return Response.error(Msg.DATASOURCE_NOT_EXIST);}String userName = AesUtil.decrypt(dataSourceUpdateForm.getUserName());// 判断账密是否为密文if (userName == null) {dataSourceUpdateForm.setUserName(AesUtil.encrypt(dataSourceUpdateForm.getUserName()));}String pwd = AesUtil.decrypt(dataSourceUpdateForm.getPassword());if (pwd == null) {dataSourceUpdateForm.setPassword(AesUtil.encrypt(dataSourceUpdateForm.getPassword()));}String originName = dataSource.getSourceName();// 注意copyProperties是将source中的属性全部copy到target中BeanUtils.copyProperties(dataSourceUpdateForm, dataSource);JdbcDataSource exist = dataSourceMapper.nameExist(dataSource);if (exist != null && !exist.getSourceName().equals(originName)) {return Response.error(Msg.DATASOURCE_NAME_ALREAD_EXIST);}dataSource.setUpdateTime(new Date());dataSourceMapper.update(dataSource);LocalCacheUtil.remove(dataSource.getCreator() + originName);JdbcDataSourceDto dataSourceDto = new JdbcDataSourceDto();BeanUtils.copyProperties(dataSource, dataSourceDto);return Response.success(dataSourceDto);}@Overridepublic Response deleteDataSource(Long id) {dataSourceMapper.delete(id);return Response.success();}@Overridepublic Response<List<JdbcDataSourceDto>> list(Long creator) {List<JdbcDataSource> list = dataSourceMapper.list(creator);List<JdbcDataSourceDto> listDto = list.stream().map(this::getDto).collect(Collectors.toList());return Response.success(listDto);}@Overridepublic Response<List<JdbcDataSourceDto>> listByType(Long creator, List<String> type) {List<JdbcDataSource> list = dataSourceMapper.listByType(creator, type);List<JdbcDataSourceDto> listDto = list.stream().map(this::getDto).collect(Collectors.toList());return Response.success(listDto);}@Overridepublic Response<List<JdbcDataSourceDto>> listMeta() {List<JdbcDataSource> list = metaDataSourceMapper.list();List<JdbcDataSourceDto> listDto = list.stream().map(this::getDto).collect(Collectors.toList());return Response.success(listDto);}@Overridepublic Response<PageResult<JdbcDataSourceDto>> select(DataSourceSelectForm form) {JdbcDataSource select = new JdbcDataSource();select.setSourceName(form.getSourceName());select.setCreator(form.getCreator());PageHelper.startPage(form.getPageNum(), form.getPageSize());List<JdbcDataSource> list = dataSourceMapper.select(select);PageInfo<JdbcDataSource> pageInfo = new PageInfo<>(list);PageResult<JdbcDataSourceDto> pageResult = new PageResult<>();pageResult.setPageNum(pageInfo.getPageNum());pageResult.setPageSize(pageInfo.getPageSize());pageResult.setTotalPage(pageInfo.getPages());pageResult.setTotalCount(pageInfo.getTotal());pageResult.setList(pageInfo.getList().stream().map(this::getDto).collect(Collectors.toList()));return Response.success(pageResult);}@Overridepublic Response<JdbcDataSourceDto> find(Long id) {JdbcDataSource jdbcDataSource = dataSourceMapper.find(id);if (jdbcDataSource == null) {return Response.error(Msg.DATASOURCE_NOT_EXIST);}return Response.success(getDto(jdbcDataSource));}@Overridepublic Response<JdbcDataSourceDto> findMetaDataSource(Long id) {JdbcDataSource jdbcDataSource = this.metaDataSourceMapper.find(id);if (jdbcDataSource == null) {return Response.error(Msg.DATASOURCE_NOT_EXIST);}return Response.success(getDto(jdbcDataSource));}@Overridepublic Response<JdbcDataSourceDto> findDescrypt(Long id) {JdbcDataSource jdbcDataSource = dataSourceMapper.find(id);if (jdbcDataSource == null) {return Response.error(Msg.DATASOURCE_NOT_EXIST);}if (!StringUtils.isBlank(jdbcDataSource.getUserName())) {jdbcDataSource.setUserName(AesUtil.decrypt(jdbcDataSource.getUserName()));}if (!StringUtils.isBlank(jdbcDataSource.getPassword())) {jdbcDataSource.setPassword(AesUtil.decrypt(jdbcDataSource.getPassword()));}return Response.success(getDto(jdbcDataSource));}/*** 获取dto* * @param jdbcDataSource*            source* @return dto*/private JdbcDataSourceDto getDto(JdbcDataSource jdbcDataSource) {JdbcDataSourceDto dto = new JdbcDataSourceDto();BeanUtils.copyProperties(jdbcDataSource, dto);return dto;}
}

DataSourceMapper

package com.geespace.microservices.datasource.dao;import java.util.List;import com.geespace.microservices.datasource.entity.JdbcDataSource;import org.apache.ibatis.annotations.Delete;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Options;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Result;
import org.apache.ibatis.annotations.ResultMap;
import org.apache.ibatis.annotations.Results;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.annotations.SelectKey;
import org.apache.ibatis.annotations.SelectProvider;
import org.apache.ibatis.annotations.Update;
import org.apache.ibatis.type.JdbcType;/*** @Author: zjr* @Date: 2020-04-07 17:05* @Version 1.0*/
@Mapper
public interface DataSourceMapper {/*** 添加数据源信息* * @param source*            数据源* @return id*/@Insert({"insert into ge_jdbc_datasource (source_type, source_name, jdbc_url, user_name, password, zk_address, znode, ","database_name, jdbc_driver_class, remark, creator, create_time, update_time, status)","values (#{sourceType,jdbcType=VARCHAR}, #{sourceName,jdbcType=VARCHAR}, #{jdbcUrl,jdbcType=VARCHAR}, ","#{userName,jdbcType=VARCHAR}, #{password,jdbcType=VARCHAR}, #{zkAddress,jdbcType=VARCHAR}, ","#{znode,jdbcType=VARCHAR}, #{databaseName,jdbcType=VARCHAR}, #{jdbcDriverClass,jdbcType=VARCHAR}, ","#{remark,jdbcType=VARCHAR}, #{creator,jdbcType=BIGINT}, #{createTime,jdbcType=TIMESTAMP}, ","#{updateTime,jdbcType=TIMESTAMP}, #{status,jdbcType=TINYINT})"})@SelectKey(statement = "SELECT LAST_INSERT_ID()", keyProperty = "id", before = false, resultType = Long.class)int insert(JdbcDataSource source);/*** 修改数据源* * @param source*            全量修改* @return 影响行数*/@Update({"update ge_jdbc_datasource","set source_type = #{sourceType,jdbcType=VARCHAR}, source_name = #{sourceName,jdbcType=VARCHAR}, ","jdbc_url = #{jdbcUrl,jdbcType=VARCHAR}, user_name = #{userName,jdbcType=VARCHAR}, ","password = #{password,jdbcType=VARCHAR}, zk_address = #{zkAddress,jdbcType=VARCHAR}, ","znode = #{znode,jdbcType=VARCHAR}, database_name = #{databaseName,jdbcType=VARCHAR}, ","jdbc_driver_class = #{jdbcDriverClass,jdbcType=VARCHAR}, remark = #{remark,jdbcType=VARCHAR}, ","update_time = #{updateTime,jdbcType=TIMESTAMP}, status = #{status,jdbcType=TINYINT}","where id = #{id,jdbcType=BIGINT} and status = 1"})int update(JdbcDataSource source);/*** 删除数据源配置* * @param id*            数据源id* @return 影响行数*/@Delete("delete from ge_jdbc_datasource where id = #{id,jdbcType=BIGINT}")int delete(Long id);/*** 查询用户数据源配置* * @param creator*            创建者id* @return 数据源列表*/@Select({"select * from ge_jdbc_datasource where creator = #{creator,jdbcType=BIGINT} and status = 1"," order by id desc"})@Results(id = "resultMap",value = {@Result(column = "id", property = "id", jdbcType = JdbcType.BIGINT, id = true),@Result(column = "source_type", property = "sourceType", jdbcType = JdbcType.VARCHAR),@Result(column = "source_name", property = "sourceName", jdbcType = JdbcType.VARCHAR),@Result(column = "jdbc_url", property = "jdbcUrl", jdbcType = JdbcType.VARCHAR),@Result(column = "user_name", property = "userName", jdbcType = JdbcType.VARCHAR),@Result(column = "password", property = "password", jdbcType = JdbcType.VARCHAR),@Result(column = "zk_address", property = "zkAddress", jdbcType = JdbcType.VARCHAR),@Result(column = "znode", property = "znode", jdbcType = JdbcType.VARCHAR),@Result(column = "database_name", property = "databaseName", jdbcType = JdbcType.VARCHAR),@Result(column = "jdbc_driver_class", property = "jdbcDriverClass", jdbcType = JdbcType.VARCHAR),@Result(column = "remark", property = "remark", jdbcType = JdbcType.VARCHAR),@Result(column = "creator", property = "creator", jdbcType = JdbcType.BIGINT),@Result(column = "create_time", property = "createTime", jdbcType = JdbcType.TIMESTAMP),@Result(column = "update_time", property = "updateTime", jdbcType = JdbcType.TIMESTAMP),@Result(column = "status", property = "status", jdbcType = JdbcType.TINYINT)})List<JdbcDataSource> list(Long creator);/*** 查询用户数据源配置** @param creator*            创建者id* @param type*            数据源类型* @return 数据源列表*/@Select({"<script>", "select * from ge_jdbc_datasource where creator = #{creator,jdbcType=BIGINT} ","and status = 1 and source_type in ","<foreach item='item' index='index' collection='type' open='(' separator=',' close=')'>","#{item,jdbcType=VARCHAR}", "</foreach>", " order by id desc", "</script>"})@ResultMap("resultMap")List<JdbcDataSource> listByType(@Param("creator") Long creator, @Param("type") List<String> type);/*** 条件查询数据源* * @param jdbcDataSource*            查询条件* @return 查询结果*/@SelectProvider(type = DataSourceSqlProvider.class, method = "select")@ResultMap("resultMap")List<JdbcDataSource> select(JdbcDataSource jdbcDataSource);/*** id 查找* * @param id*            id* @return 数据源*/@Options(flushCache = Options.FlushCachePolicy.TRUE)@Select("select * from ge_jdbc_datasource where id = #{id,jdbcType=BIGINT} and status = 1")@ResultMap("resultMap")JdbcDataSource find(Long id);/*** 数据源名称是否存在* * @param jdbcDataSource*            数据源名称* @return 数据源*/@Select({"select * from ge_jdbc_datasource ","where source_name = #{sourceName,jdbcType=VARCHAR} and creator = #{creator,jdbcType=BIGINT}"})@ResultMap("resultMap")JdbcDataSource nameExist(JdbcDataSource jdbcDataSource);
}

DataSourceSqlProvider

package com.geespace.microservices.datasource.dao;import com.geespace.microservices.datasource.entity.JdbcDataSource;import org.apache.commons.lang.StringUtils;
import org.apache.ibatis.jdbc.SQL;/*** @Author: zjr* @Date: 2020-04-09 14:20* @Version 1.0*/
public class DataSourceSqlProvider {/*** 条件查询sql语句生成* * @param jdbcDataSource*            查询条件* @return sql语句*/public String select(JdbcDataSource jdbcDataSource) {SQL sql = new SQL();sql.SELECT("*");sql.FROM("ge_jdbc_datasource");sql.WHERE("status = 1");if (jdbcDataSource.getCreator() != null) {sql.WHERE("creator = #{creator,jdbcType=BIGINT}");}if (!StringUtils.isBlank(jdbcDataSource.getSourceName())) {sql.WHERE("source_name like concat('%', #{sourceName,jdbcType=VARCHAR}, '%')");}sql.ORDER_BY("id desc");return sql.toString();}
}

MetaDataSourceMapper

package com.geespace.microservices.datasource.dao;import java.util.List;import com.geespace.microservices.datasource.entity.JdbcDataSource;import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Result;
import org.apache.ibatis.annotations.ResultMap;
import org.apache.ibatis.annotations.Results;
import org.apache.ibatis.annotations.Select;
import org.apache.ibatis.type.JdbcType;/*** 内部数据源,系统配置,和外部数据源保持一致* @Author: zjr* @Date: 2020-04-07 17:05* @Version 1.0*/
@Mapper
public interface MetaDataSourceMapper {/*** 查询用户数据源配置* * @return 数据源列表*/@Select("select * from ge_meta_datasource ")@Results(id = "resultMap",value = {@Result(column = "id", property = "id", jdbcType = JdbcType.BIGINT, id = true),@Result(column = "source_type", property = "sourceType", jdbcType = JdbcType.VARCHAR),@Result(column = "source_name", property = "sourceName", jdbcType = JdbcType.VARCHAR),@Result(column = "jdbc_url", property = "jdbcUrl", jdbcType = JdbcType.VARCHAR),@Result(column = "user_name", property = "userName", jdbcType = JdbcType.VARCHAR),@Result(column = "password", property = "password", jdbcType = JdbcType.VARCHAR),@Result(column = "zk_address", property = "zkAddress", jdbcType = JdbcType.VARCHAR),@Result(column = "znode", property = "znode", jdbcType = JdbcType.VARCHAR),@Result(column = "database_name", property = "databaseName", jdbcType = JdbcType.VARCHAR),@Result(column = "jdbc_driver_class", property = "jdbcDriverClass", jdbcType = JdbcType.VARCHAR),@Result(column = "remark", property = "remark", jdbcType = JdbcType.VARCHAR),@Result(column = "creator", property = "creator", jdbcType = JdbcType.BIGINT),@Result(column = "create_time", property = "createTime", jdbcType = JdbcType.TIMESTAMP),@Result(column = "update_time", property = "updateTime", jdbcType = JdbcType.TIMESTAMP),@Result(column = "status", property = "status", jdbcType = JdbcType.TINYINT)})List<JdbcDataSource> list();/*** id 查找* * @param id*            id* @return 数据源*/@Select("select * from ge_meta_datasource where id = #{id,jdbcType=BIGINT} ")@ResultMap("resultMap")JdbcDataSource find(Long id);
}

DataSourceSelectForm

package com.geespace.microservices.datasource.form.datasource;import javax.validation.constraints.NotNull;import lombok.Data;/*** @Author: zjr* @Date: 2020-04-09 14:13* @Version 1.0*/
@Data
public class DataSourceSelectForm {/*** 数据源名称模糊查询*/private String sourceName;/*** 创建者*/private Long creator;/*** 页码*/@NotNull(message = "pageSize不能为空")private int pageSize;/*** 每页数据量*/@NotNull(message = "pageNum不能为空")private int pageNum;
}

DataSourceAddForm

package com.geespace.microservices.datasource.form.datasource;import javax.validation.constraints.NotBlank;import lombok.Data;/*** @Author: zjr* @Date: 2020-04-07 17:46* @Version 1.0*/
@Data
public class DataSourceAddForm {/*** 数据源类型*/@NotBlank(message = "数据源类型不能为空")private String sourceType;/*** 数据源名称*/@NotBlank(message = "数据源名称不能为空")private String sourceName;/*** jdbc url*/private String jdbcUrl;/*** 用户名*/private String userName;/*** 密码*/private String password;/*** zk地址*/private String zkAddress;/*** hbase znode*/private String znode;/*** 数据库名称*/private String databaseName;/*** 驱动类*/private String jdbcDriverClass;/*** 备注*/private String remark;/*** 创建者*/private Long creator;}

DataSourceUpdateForm

package com.geespace.microservices.datasource.form.datasource;import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;import lombok.Data;/*** @Author: zjr* @Date: 2020-04-07 17:46* @Version 1.0*/
@Data
public class DataSourceUpdateForm {/*** id*/@NotNull(message = "id不能为空")private Long id;/*** 数据源类型*/@NotBlank(message = "数据源类型不能为空")private String sourceType;/*** 数据源名称*/@NotBlank(message = "数据源名称不能为空")private String sourceName;/*** jdbc url*/private String jdbcUrl;/*** 用户名*/private String userName;/*** 密码*/private String password;/*** zk地址*/private String zkAddress;/*** hbase znode*/private String znode;/*** 数据库名称*/private String databaseName;/*** 驱动类*/private String jdbcDriverClass;/*** 备注*/private String remark;/*** 创建者*/private Long creator;
}

YAPI测试用例

5.1查询全部同步任务配置(分页)

{"pageNum": 1,"pageSize": 10,"projectId": 28,"name": "测试"
}

5.2 创建同步任务配置-mysql->mysql

{"name": "测试同步任务配置-mysql-mysql-1","description": "测试同步任务配置-mysql-mysql-1","projectName": "test测试1","projectId": 28,"syncType": 2,"readerConfigId": 1,"readerParam": {"table": "test_test"},"writerConfigId": 1,"writerParam": {"table": "test_test_1"},"columnMap": [{"reader": "id","writer": "id"},{"reader": "name","writer": "name"}]
}

5.6 创建同步任务配置-hbase->hbase

{"name": "测试同步任务配置-hbase-hbase-1","description": "测试同步任务配置-hbase-hbase-1","projectName": "test测试1","projectId": 28,"syncType": 2,"readerConfigId": 130,"readerParam": {"table": "test_test"},"writerConfigId": 130,"writerParam": {"table": "test_test_1","rowkeyColumns": ["f:id","f:name"]},"columnMap": [{"reader": "f:id","writer": "f:id"},{"reader": "f:name","writer": "f:name"}]
}

5.7 创建同步任务配置-mysql->hbase

{"name": "测试同步任务配置-mysql-hbase-1","description": "测试同步任务配置mysql-hbase-1","projectName": "test测试1","projectId": 28,"syncType": 2,"readerConfigId": 1,"readerParam": {"table": "test_test"},"writerConfigId": 130,"writerParam": {"table": "test_test_1","rowkeyColumns": ["f:id","f:name"]},"columnMap": [{"reader": "id","writer": "f:id"},{"reader": "name","writer": "f:name"}]
}

5.8 创建同步任务配置-hbase->mysql

{"name": "测试同步任务配置-hbase-mysql-1","description": "测试同步任务配置-hbase-mysql-1","projectName": "test测试1","projectId": 28,"syncType": 2,"readerConfigId": 130,"readerParam": {"table": "test_test","rowkeyColumns": ["f:id","f:name"]},"writerConfigId": 1,"writerParam": {"table": "test_test_1"},"columnMap": [{"reader": "f:id","writer": "id"},{"reader": "f:name","writer": "name"}]
}

5.3 更新同步任务配置

{"id": 82,"name": "测试同步任务配置-mysql-3","description": "测试同步任务配置-mysql-3","projectName": "数据同步任务","projectId": 19,"syncType": 2,"readerConfigId": 1,"readerParam": {"table": "test_test"},"writerConfigId": 1,"writerParam": {"table": "test_test_1"},"columnMap": [{"reader": "id","writer": "id"},{"reader": "name","writer": "name"}]
}

5.5 删除同步任务配置

5.4 查询同步任务配置

3.3 执行数据同步任务

3.4 停止数据同步任务

三、本人相关其他文章链接

1.springboot项目集成dolphinscheduler调度器 可拖拽spark任务管理:
https://blog.csdn.net/a924382407/article/details/117119831

2.springboot项目集成dolphinscheduler调度器 实现datax数据同步任务:
https://blog.csdn.net/a924382407/article/details/120951230

3.springboot项目集成dolphinscheduler调度器 项目管理:
https://blog.csdn.net/a924382407/article/details/117118931

4.springboot项目集成大数据第三方dolphinscheduler调度器 执行/停止任务
https://blog.csdn.net/a924382407/article/details/117121181

5.springboot项目集成大数据第三方dolphinscheduler调度器
https://blog.csdn.net/a924382407/article/details/117113848

springboot项目集成dolphinscheduler调度器 实现datax数据同步任务相关推荐

  1. springboot项目集成大数据第三方dolphinscheduler调度器

    文章目录 摘要 项目背景 功能要求 功能说明 1.1用例图 1.2业务流程分析 1.3业务ER图 1.4 管理任务流程图 1.5功能设计详细说明点 1.6页面原型 三.本人相关其他文章链接 摘要 ①d ...

  2. 一、springboot项目集成大众点评cat

    一. 什么是CAT 1.cat简介 Cat是基于Java开发的实时应用监控平台,为美团点评提供了全面的实时监控告警服务 • CAT作为服务端项目基础组件,提供了java, c/c++, node, p ...

  3. springboot项目集成log4j2打成jar包 ,引入外边的log4j2文件,运行日志无法打印

    springboot项目集成log4j2打成jar包 ,引入外边的log4j2文件,运行日志无法打印 问题描述 因为项目中已经有log4j2文件,所以打的jar中也有,在application.yml ...

  4. springboot项目集成docker

    文章目录 一.docker常用命令 0.拉取镜像到本地仓库 1.查看所有镜像 2.创建一个新的容器并运行,返回的是容器的ID -- CONTAINER ID: 3.查看运行中的docker实例 4.查 ...

  5. springboot项目配置视图解析器无效的问题

    springboot项目配置视图解析器无效的问题 今天springboot尝试配置视图解析器的时候,如图: 一切正常,视图解析器却始终无效.后面发现问题. 在控制器的注解要使用:@Controller ...

  6. Springboot项目集成Minio文件服务器(下)

    Springboot项目集成Minio文件服务器(下) 1.配置依赖 在pom文件里面配置Minio的相关依赖. <!--添加minio的依赖--><dependency>&l ...

  7. 在SpringBoot项目中整合拦截器

    拦截器在Web系统中非常常见,对于某些全局统一的操作,我们可以把它提取到拦截器中实现.总结起来,拦截器大致有以下几种使用场景: 1.权限检查:如登录检测,进入处理程序检测用户是否登录,如果没有,则直接 ...

  8. 网络云存储技术Windows server 2012 (项目十五 存储服务间的数据同步)

    网络云存储技术Windows server 2012 (项目十五 存储服务间的数据同步) 目录 前言 一.项目背景 二. 项目实训题 前言 网络存储技术,是以互联网为载体实现数据的传输与存储,它采用面 ...

  9. datax数据同步问题(mysql2hive)汇总

    文章摘要: 1.代码 2.搭建spark 3.使用datax 4.常见问题 5.指正补充 前言: git代码,有需要的可以参考 ![GitHub contributors](https://img.s ...

最新文章

  1. 在Ubuntu kylin 14 64位上flashplayer 插件
  2. 《动手玩转Arduino》——2.4 展望
  3. LeetCode:Count Primes
  4. java中的locksupport_java中线程的停止以及LockSupport工具类
  5. R 升级到 4 之后的悲剧
  6. 数据挖掘之自然语言处理
  7. McAfee Agent漏洞可导致黑客以Windows 系统权限运行代码
  8. Stage3D 入门资源汇总
  9. 程序阅读:简单C++学生信息管理系统
  10. matlab将函数展开成幂级数,解析函数展开成幂级数的方法分析.doc
  11. Java算法与数据结构、设计模式、高并发视频教程免费下载
  12. 无源晶振(Crystal)的负载电容
  13. 在oracle里面查询视图,oracle查询所有视图
  14. box-shadow兼容IE8浏览器写法
  15. 导出DNS服务器上的记录
  16. 轴承特征频率计算公式
  17. C语言程序课程设计任务书
  18. 仿QQ好友列表,QListWidget!
  19. realme怎么互传_Realme X刷MIUI11系统后通过普通电脑实现“小米互传”攻略
  20. 通过iptable进行流量转发

热门文章

  1. 07 python 要点 (正则化)
  2. 从基础到进阶,100道测试开发面试题,进大厂涨薪必备
  3. 新浪微博开发之三十五(微博frame)
  4. 你家乡的美食和特色小吃有哪些?
  5. Webots教程(根据官网教程)
  6. 局域网arp攻击_谈谈电子欺骗中的ARP欺骗
  7. Adaptec by PMC 联合希捷公司现场演示高性能端到端12Gb/s SAS存储解决方案
  8. PHP curl 获取PHPSESSID,PHP curl_init() 方法伪造HTTP头信息 (采集用)
  9. 关于Debug版正常运行,release版运行崩溃的问题
  10. STM32F103RC串口发送数据1使LED闪烁---串口助手作为主机发送数据1,单片机作为从机接收