From db391d7519895843e07999df9286b89e9a77fc78 Mon Sep 17 00:00:00 2001 From: panbaolin Date: Fri, 22 May 2026 11:54:06 +0800 Subject: [PATCH] =?UTF-8?q?fix:1.=E6=A2=B3=E7=90=86=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E5=90=8C=E6=AD=A5=E5=8A=9F=E8=83=BD=EF=BC=8C=E5=8E=BB=E9=99=A4?= =?UTF-8?q?=E6=97=A0=E7=94=A8=E7=9A=84=E5=90=8C=E6=AD=A5=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=202.=E4=BF=AE=E6=94=B9=E4=BB=BB=E5=8A=A1=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E5=8F=8A=E7=AD=96=E7=95=A5=E9=85=8D=E7=BD=AE=E7=AE=A1=E7=90=86?= =?UTF-8?q?=E5=8A=9F=E8=83=BD=EF=BC=8C=E6=B7=BB=E5=8A=A0=E4=BA=8B=E5=8A=A1?= =?UTF-8?q?=EF=BC=8C=E9=98=B2=E6=AD=A2=E6=95=B0=E6=8D=AE=E6=B1=A1=E6=9F=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../constant/enums/SourceDataTypeEnum.java | 4 +- .../java/org/jeecg/common/util/DBUtil.java | 37 +++ .../jeecg/modules/base/entity/QuartzJob.java | 2 - .../modules/base/entity/StasSyncLog.java | 2 + .../modules/base/entity/StasSyncNum.java | 2 + .../controller/StasDataSourceController.java | 22 -- .../impl/StasDataSourceServiceImpl.java | 35 ++- .../service/impl/QuartzJobServiceImpl.java | 1 - .../StasSyncStrategyController.java | 59 +---- .../service/IStasSyncStrategyService.java | 27 ++- .../impl/StasSyncStrategyServiceImpl.java | 101 +++++---- .../controller/StasTaskConfigController.java | 87 +------ .../org/jeecg/taskConfig/job/SyncDataJob.java | 214 ++++-------------- .../service/IStasTaskConfigService.java | 21 +- .../impl/StasTaskConfigServiceImpl.java | 96 +++++--- 15 files changed, 273 insertions(+), 437 deletions(-) diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/enums/SourceDataTypeEnum.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/enums/SourceDataTypeEnum.java index 42c12c1..cfe0893 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/enums/SourceDataTypeEnum.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/enums/SourceDataTypeEnum.java @@ -5,9 +5,9 @@ public enum SourceDataTypeEnum { ORACLE(0, "ORACLE"), POSTGRES(1, "POSTGRES"); - private Integer key; + private final Integer key; - private String value; + private final String value; SourceDataTypeEnum(Integer key, String value) { this.key = key; diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/util/DBUtil.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/util/DBUtil.java index 704f867..aed4cd1 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/common/util/DBUtil.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/util/DBUtil.java @@ -1,5 +1,7 @@ package org.jeecg.common.util; +import org.jeecg.common.constant.enums.SourceDataTypeEnum; + import java.sql.*; import java.util.Properties; @@ -18,6 +20,41 @@ public class DBUtil { return String.format("%s%s:%s%s", DBUtil.POSTGRES_URL_PREFIX, ip, port, serveId); } + /** + * 获取数据库连接(自定义超时) + * @param url JDBC连接URL + * @param username 用户名 + * @param password 密码 + * @param type 0=PostgreSQL, 1=Oracle + */ + public static Connection getConnection(String url, String username, String password, + int type) throws SQLException{ + try { + Properties props = new Properties(); + props.put("user", username); + props.put("password", password); + + if (SourceDataTypeEnum.POSTGRES.getKey().equals(type)) { + // PostgreSQL + Class.forName(POSTGRES_DRIVER); + + props.put("loginTimeout", 30); + props.put("socketTimeout", 300000); + props.put("tcpKeepAlive", "true"); + } else { + // Oracle + Class.forName(ORACLE_DRIVER); + + props.put("oracle.net.CONNECT_TIMEOUT",30000); + props.put("oracle.jdbc.ReadTimeout",300000); + } + return DriverManager.getConnection(url, props); + } catch(ClassNotFoundException e){ + e.printStackTrace() ; + } + return null; + } + public static Connection getConnection(String url, String username, String password, String Driver,int type) throws SQLException{ try { Class.forName(Driver); diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/QuartzJob.java b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/QuartzJob.java index 3b92bed..e2489da 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/QuartzJob.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/QuartzJob.java @@ -33,8 +33,6 @@ public class QuartzJob implements Serializable { @JsonFormat(timezone = "GMT+8",pattern = "yyyy-MM-dd HH:mm:ss") @DateTimeFormat(pattern="yyyy-MM-dd HH:mm:ss") private java.util.Date createTime; - /**删除状态*/ - private Integer delFlag; /**修改人*/ private String updateBy; /**修改时间*/ diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/StasSyncLog.java b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/StasSyncLog.java index 6592455..e69204d 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/StasSyncLog.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/StasSyncLog.java @@ -29,6 +29,8 @@ public class StasSyncLog implements Serializable { /**主键*/ @TableId(type = IdType.ASSIGN_ID) private String id; + /**任务id*/ + private String taskId; /**记录id*/ private String recordId; /**开始时间*/ diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/StasSyncNum.java b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/StasSyncNum.java index 97dfeef..01fc059 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/StasSyncNum.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/StasSyncNum.java @@ -25,6 +25,8 @@ public class StasSyncNum implements Serializable { /**主键*/ @TableId(type = IdType.ASSIGN_ID) private String id; + /**任务id*/ + private String taskId; /**记录id*/ private String recordId; /**同步表名*/ diff --git a/jeecg-module-sync/src/main/java/org/jeecg/dataSource/controller/StasDataSourceController.java b/jeecg-module-sync/src/main/java/org/jeecg/dataSource/controller/StasDataSourceController.java index 8ab9e21..46967ab 100644 --- a/jeecg-module-sync/src/main/java/org/jeecg/dataSource/controller/StasDataSourceController.java +++ b/jeecg-module-sync/src/main/java/org/jeecg/dataSource/controller/StasDataSourceController.java @@ -7,7 +7,6 @@ import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.tags.Tag; -import jakarta.annotation.Resource; import jakarta.servlet.http.HttpServletRequest; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; @@ -15,12 +14,9 @@ import org.jeecg.common.api.vo.Result; import org.jeecg.common.aspect.annotation.AutoLog; import org.jeecg.common.system.base.controller.JeecgController; import org.jeecg.common.system.query.QueryGenerator; -import org.jeecg.common.util.oConvertUtils; import org.jeecg.modules.base.entity.StasDataSource; import org.jeecg.dataSource.service.IStasDataSourceService; -import org.jeecg.modules.base.entity.StasSyncStrategy; import org.jeecg.modules.base.entity.StasTaskConfig; -import org.jeecg.modules.base.service.BaseCommonService; import org.jeecg.taskConfig.service.IStasTaskConfigService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; @@ -224,24 +220,6 @@ public class StasDataSourceController extends JeecgController deleteBatch(@RequestParam(name="ids",required=true) String ids) { - Result result = new Result(); - if(oConvertUtils.isEmpty(ids)) { - result.error500("未选中数据源!"); - }else { - stasDataSourceService.deleteBatchDataSource(ids.split(",")); - result.success("删除数据源成功!"); - } - return result; - } - /** * 通过id查询 * @param id diff --git a/jeecg-module-sync/src/main/java/org/jeecg/dataSource/service/impl/StasDataSourceServiceImpl.java b/jeecg-module-sync/src/main/java/org/jeecg/dataSource/service/impl/StasDataSourceServiceImpl.java index 0fb6357..4e61670 100644 --- a/jeecg-module-sync/src/main/java/org/jeecg/dataSource/service/impl/StasDataSourceServiceImpl.java +++ b/jeecg-module-sync/src/main/java/org/jeecg/dataSource/service/impl/StasDataSourceServiceImpl.java @@ -1,17 +1,19 @@ package org.jeecg.dataSource.service.impl; +import cn.hutool.core.collection.CollUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import lombok.RequiredArgsConstructor; import org.apache.commons.lang3.StringUtils; -import org.jeecg.common.constant.CommonConstant; import org.jeecg.common.constant.enums.SourceDataTypeEnum; import org.jeecg.common.util.DBUtil; import org.jeecg.modules.base.entity.StasDataSource; +import org.jeecg.modules.base.entity.StasTaskConfig; import org.jeecg.modules.base.mapper.StasDataSourceMapper; import org.jeecg.dataSource.service.IStasDataSourceService; +import org.jeecg.taskConfig.service.IStasTaskConfigService; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; - import java.sql.*; import java.util.*; import java.util.Date; @@ -25,8 +27,11 @@ import java.util.Date; * @since 2020-12-12 */ @Service +@RequiredArgsConstructor public class StasDataSourceServiceImpl extends ServiceImpl implements IStasDataSourceService { + private final IStasTaskConfigService stasTaskConfigService; + @Override @Transactional(rollbackFor = Exception.class) public boolean saveDataSource(StasDataSource dataSource) { @@ -36,15 +41,24 @@ public class StasDataSourceServiceImpl extends ServiceImpl 0){ throw new RuntimeException("数据库名称已存在!"); } - dataSource.setDelFlag(CommonConstant.DEL_FLAG_0); dataSource.setCreateTime(new Date()); -// dataSource.setCreateBy(setCreateBy) return this.save(dataSource); } @Override @Transactional(rollbackFor = Exception.class) public boolean deleteDataSource(String dataSourceId) { + //此数据源被任务配置作为源数据库使用 + LambdaQueryWrapper useSrcQueryWrapper = new LambdaQueryWrapper<>(); + useSrcQueryWrapper.eq(StasTaskConfig::getSourceId, dataSourceId); + List useSrcTasks = stasTaskConfigService.list(useSrcQueryWrapper); + //此数据源被任务配置作为目标数据库使用 + LambdaQueryWrapper useTargetQueryWrapper = new LambdaQueryWrapper<>(); + useTargetQueryWrapper.eq(StasTaskConfig::getTargetId, dataSourceId); + List useTargetTasks = stasTaskConfigService.list(useTargetQueryWrapper); + if (CollUtil.isNotEmpty(useSrcTasks) || CollUtil.isNotEmpty(useTargetTasks)){ + throw new RuntimeException("此数据源已被任务配置数据所占用,请先清除任务配置数据"); + } this.removeById(dataSourceId); return true; } @@ -99,8 +113,11 @@ public class StasDataSourceServiceImpl extends ServiceImpl queryTableList(String sourceId, String username) { StasDataSource stasDataSource = this.baseMapper.selectById(sourceId); - if(SourceDataTypeEnum.ORACLE.getKey() == stasDataSource.getType()){ - String sql = "SELECT TABLE_NAME FROM ALL_TABLES WHERE OWNER = ?"; + if(Objects.equals(SourceDataTypeEnum.ORACLE.getKey(), stasDataSource.getType())){ + // 如果配置了DBLink,则在元数据视图后追加@dblink以查询远程数据库的所有表 + String dbLinkSuffix = StringUtils.isNotBlank(stasDataSource.getDbLink()) + ? stasDataSource.getDbLink().trim() : ""; + String sql = "SELECT TABLE_NAME FROM ALL_TABLES" + dbLinkSuffix + " WHERE OWNER = ?"; return queryDatabaseMetadata(stasDataSource, sql, "TABLE_NAME", username); } else { String sql = "SELECT tablename AS table_name FROM pg_tables WHERE schemaname = ?"; @@ -111,8 +128,10 @@ public class StasDataSourceServiceImpl extends ServiceImpl queryColumnList(String sourceId, String username, String tableName) { StasDataSource stasDataSource = this.baseMapper.selectById(sourceId); - if(SourceDataTypeEnum.ORACLE.getKey() == stasDataSource.getType()){ - String sql = "SELECT COLUMN_NAME FROM ALL_TAB_COLUMNS WHERE OWNER = ? AND TABLE_NAME = ?"; + if(Objects.equals(SourceDataTypeEnum.ORACLE.getKey(), stasDataSource.getType())){ + String dbLinkSuffix = StringUtils.isNotBlank(stasDataSource.getDbLink()) + ? stasDataSource.getDbLink().trim() : ""; + String sql = "SELECT COLUMN_NAME FROM ALL_TAB_COLUMNS" + dbLinkSuffix + " WHERE OWNER = ? AND TABLE_NAME = ?"; return queryDatabaseMetadata(stasDataSource, sql, "COLUMN_NAME", username, tableName); } else { String sql = "SELECT column_name FROM information_schema.columns WHERE table_schema = ? AND table_name = ?"; diff --git a/jeecg-module-sync/src/main/java/org/jeecg/quartz/service/impl/QuartzJobServiceImpl.java b/jeecg-module-sync/src/main/java/org/jeecg/quartz/service/impl/QuartzJobServiceImpl.java index e7c6800..3d47d69 100644 --- a/jeecg-module-sync/src/main/java/org/jeecg/quartz/service/impl/QuartzJobServiceImpl.java +++ b/jeecg-module-sync/src/main/java/org/jeecg/quartz/service/impl/QuartzJobServiceImpl.java @@ -41,7 +41,6 @@ public class QuartzJobServiceImpl extends ServiceImpl> queryPageList(StasSyncStrategy stasSyncStrategy, @@ -85,7 +76,7 @@ public class StasSyncStrategyController extends JeecgController add(@RequestBody StasSyncStrategy stasSyncStrategy) { - stasSyncStrategyService.save(stasSyncStrategy); + stasSyncStrategyService.add(stasSyncStrategy); return Result.OK("添加成功!"); } @@ -99,7 +90,7 @@ public class StasSyncStrategyController extends JeecgController edit(@RequestBody StasSyncStrategy stasSyncStrategy) { - stasSyncStrategyService.updateById(stasSyncStrategy); + stasSyncStrategyService.edit(stasSyncStrategy); return Result.OK("编辑成功!"); } @@ -113,31 +104,17 @@ public class StasSyncStrategyController extends JeecgController delete(@RequestParam(name="id",required=true) String id) { - stasSyncStrategyService.removeById(id); + stasSyncStrategyService.delete(id); return Result.OK("删除成功!"); } - - /** - * 批量删除 - * - * @param ids - * @return - */ - @AutoLog(value = "同步策略表-批量删除") - @Operation(summary="同步策略表-批量删除") - @DeleteMapping(value = "/deleteBatch") - public Result deleteBatch(@RequestParam(name="ids",required=true) String ids) { - this.stasSyncStrategyService.removeByIds(Arrays.asList(ids.split(","))); - return Result.OK("批量删除成功!"); - } - + /** * 通过id查询 * * @param id * @return */ - //@AutoLog(value = "同步策略表-通过id查询") + @AutoLog(value = "同步策略表-通过id查询") @Operation(summary="同步策略表-通过id查询") @GetMapping(value = "/queryById") public Result queryById(@RequestParam(name="id",required=true) String id) { @@ -147,28 +124,4 @@ public class StasSyncStrategyController extends JeecgController importExcel(HttpServletRequest request, HttpServletResponse response) { - return super.importExcel(request, response, StasSyncStrategy.class); - } - } diff --git a/jeecg-module-sync/src/main/java/org/jeecg/stasSyncStrategy/service/IStasSyncStrategyService.java b/jeecg-module-sync/src/main/java/org/jeecg/stasSyncStrategy/service/IStasSyncStrategyService.java index 461d9c4..a2653fa 100644 --- a/jeecg-module-sync/src/main/java/org/jeecg/stasSyncStrategy/service/IStasSyncStrategyService.java +++ b/jeecg-module-sync/src/main/java/org/jeecg/stasSyncStrategy/service/IStasSyncStrategyService.java @@ -2,7 +2,6 @@ package org.jeecg.stasSyncStrategy.service; import org.jeecg.modules.base.entity.StasSyncStrategy; import com.baomidou.mybatisplus.extension.service.IService; - import java.sql.SQLException; import java.util.List; @@ -14,18 +13,28 @@ import java.util.List; */ public interface IStasSyncStrategyService extends IService { - /** - * 验证表是否存在且包含指定字段 - * @param stasSyncStrategy 同步策略信息 - * @return 是否有效 - * @throws SQLException 数据库异常 - */ - boolean validateTables(StasSyncStrategy stasSyncStrategy); - /** * 在目标库创建表结构 * @param stasSyncStrategys 同步策略信息 * @throws SQLException 数据库异常 */ void createTargetTables(List stasSyncStrategys); + + /** + * 保存策略 + * @param stasSyncStrategy + */ + void add(StasSyncStrategy stasSyncStrategy); + + /** + * 修改策略 + * @param stasSyncStrategy + */ + void edit(StasSyncStrategy stasSyncStrategy); + + /** + * 删除策略 + * @param id + */ + void delete(String id); } diff --git a/jeecg-module-sync/src/main/java/org/jeecg/stasSyncStrategy/service/impl/StasSyncStrategyServiceImpl.java b/jeecg-module-sync/src/main/java/org/jeecg/stasSyncStrategy/service/impl/StasSyncStrategyServiceImpl.java index 515eed1..efa972c 100644 --- a/jeecg-module-sync/src/main/java/org/jeecg/stasSyncStrategy/service/impl/StasSyncStrategyServiceImpl.java +++ b/jeecg-module-sync/src/main/java/org/jeecg/stasSyncStrategy/service/impl/StasSyncStrategyServiceImpl.java @@ -1,5 +1,6 @@ package org.jeecg.stasSyncStrategy.service.impl; +import cn.hutool.core.util.StrUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.RequiredArgsConstructor; import org.jeecg.common.constant.enums.SourceDataTypeEnum; @@ -35,38 +36,6 @@ public class StasSyncStrategyServiceImpl extends ServiceImpl add(@RequestBody StasTaskConfig stasTaskConfig) { - stasTaskConfig.setTaskStatus(SyncTaskStatusEnum.NOT_STARTED.getKey()); - stasTaskConfigService.save(stasTaskConfig); - stasTaskConfigService.saveQuartzJob(stasTaskConfig); + stasTaskConfigService.add(stasTaskConfig); return Result.OK("添加成功!"); } /** - * 编辑 - * + *编辑 * @param stasTaskConfig * @return */ @AutoLog(value = "任务配置表-编辑") @Operation(summary="任务配置表-编辑") - @RequestMapping(value = "/edit", method = {RequestMethod.PUT,RequestMethod.POST}) + @PutMapping(value = "/edit") public Result edit(@RequestBody StasTaskConfig stasTaskConfig) { - stasTaskConfigService.updateById(stasTaskConfig); - stasTaskConfigService.editQuartzJob(stasTaskConfig); + stasTaskConfigService.edit(stasTaskConfig); return Result.OK("编辑成功!"); } @@ -149,10 +133,7 @@ public class StasTaskConfigController extends JeecgController delete(@RequestParam(name="id",required=true) String id) { - StasTaskConfig byId = stasTaskConfigService.getById(id); - stasTaskConfigService.delQuartzJob(byId.getQuartzId()); - stasSyncStrategyService.remove(new LambdaQueryWrapper().eq(StasSyncStrategy::getTaskId,id)); - stasTaskConfigService.removeById(id); + stasTaskConfigService.delete(id); return Result.OK("删除成功!"); } @@ -166,16 +147,12 @@ public class StasTaskConfigController extends JeecgController pause(@RequestParam(name="taskId",required=true) String taskId) { - StasTaskConfig stasTaskConfig = stasTaskConfigService.getById(taskId); - stasTaskConfig.setTaskStatus(SyncTaskStatusEnum.NOT_STARTED.getKey()); - stasTaskConfigService.updateById(stasTaskConfig); - stasTaskConfigService.pauseQuartzJob(stasTaskConfig.getQuartzId()); + stasTaskConfigService.pause(taskId); return Result.OK("暂停成功!"); } /** - * 启动定时任务 - * + *启动定时任务 * @param taskId * @return */ @@ -183,16 +160,12 @@ public class StasTaskConfigController extends JeecgController resume(@RequestParam(name="taskId",required=true) String taskId) { - StasTaskConfig stasTaskConfig = stasTaskConfigService.getById(taskId); - stasTaskConfig.setTaskStatus(SyncTaskStatusEnum.IN_OPERATION.getKey()); - stasTaskConfigService.updateById(stasTaskConfig); - stasTaskConfigService.resumeQuartzJob(stasTaskConfig.getQuartzId()); + stasTaskConfigService.resume(taskId); return Result.OK("启动成功!"); } /** - * 立即执行定时任务 - * + *立即执行定时任务 * @param taskId * @return */ @@ -217,21 +190,7 @@ public class StasTaskConfigController extends JeecgController deleteBatch(@RequestParam(name="ids",required=true) String ids) { - this.stasTaskConfigService.removeByIds(Arrays.asList(ids.split(","))); - return Result.OK("批量删除成功!"); - } - + /** * 通过id查询 * @@ -248,28 +207,4 @@ public class StasTaskConfigController extends JeecgController importExcel(HttpServletRequest request, HttpServletResponse response) { - return super.importExcel(request, response, StasTaskConfig.class); - } - } diff --git a/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/job/SyncDataJob.java b/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/job/SyncDataJob.java index edcfaee..55259f5 100644 --- a/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/job/SyncDataJob.java +++ b/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/job/SyncDataJob.java @@ -1,7 +1,10 @@ package org.jeecg.taskConfig.job; +import cn.hutool.core.util.StrUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import jakarta.annotation.Resource; +import lombok.Getter; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.jeecg.common.constant.enums.SourceDataTypeEnum; @@ -9,15 +12,10 @@ import org.jeecg.common.exception.JeecgBootException; import org.jeecg.common.util.DBUtil; import org.jeecg.modules.base.entity.*; import org.jeecg.modules.base.mapper.*; -import org.jeecg.vo.DateRangeVO; import org.jeecg.vo.IdRangeVO; import org.quartz.Job; import org.quartz.JobExecutionContext; -import org.quartz.JobExecutionException; - import java.sql.*; -import java.text.ParseException; -import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; @@ -40,14 +38,15 @@ public class SyncDataJob implements Job { @Resource private StasSyncNumMapper stasSyncNumMapper; + @Getter @Setter private String parameter; @Override - public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { + public void execute(JobExecutionContext jobExecutionContext) { try { syncDataByFieldType(parameter); } catch (SQLException e) { - throw new JeecgBootException(e.getMessage()); + log.error("执行数据同步任务出现错误,原因为:{}",e.getMessage()); } } @@ -75,8 +74,8 @@ public class SyncDataJob implements Job { targetUrl = DBUtil.getPgUrl(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId()); } - try (Connection sourceConn = DriverManager.getConnection(sourceUrl, sourceInfo.getUsername(), sourceInfo.getPassword()); - Connection targetConn = DriverManager.getConnection(targetUrl, targetInfo.getUsername(), targetInfo.getPassword())) { + try (Connection sourceConn = DBUtil.getConnection(sourceUrl, sourceInfo.getUsername(), sourceInfo.getPassword(),sourceInfo.getType()); + Connection targetConn = DBUtil.getConnection(targetUrl, targetInfo.getUsername(), targetInfo.getPassword(),targetInfo.getType())) { // 设置目标连接为批量提交模式 targetConn.setAutoCommit(false); @@ -84,24 +83,21 @@ public class SyncDataJob implements Job { List stasSyncStrategies = stasSyncStrategyMapper .selectList(new LambdaQueryWrapper().eq(StasSyncStrategy::getTaskId, taskId)); + //处理DBlink + String sourceDBLink = StrUtil.isNotBlank(sourceInfo.getDbLink())?sourceInfo.getDbLink():""; + String targetDBLink = StrUtil.isNotBlank(targetInfo.getDbLink())?targetInfo.getDbLink():""; for (StasSyncStrategy stasSyncStrategy : stasSyncStrategies) { saveSyncLog(recordId,String.format("开始同步表: %s (依据字段: %s)", stasSyncStrategy.getTableName(), stasSyncStrategy.getColumnName())); long startTime = System.currentTimeMillis(); try { - if (isDateColumn(sourceConn, stasSyncStrategy, sourceInfo.getType())) { - syncByDateRange(sourceConn, targetConn, stasSyncStrategy, - stasTaskConfig.getSyncDay(), sourceInfo.getType(), targetInfo.getType(), recordId); - } else { - syncByIdRange(sourceConn, targetConn, stasSyncStrategy, - stasTaskConfig.getSyncCount(), sourceInfo.getType(), targetInfo.getType(), recordId); - } - } catch (SQLException | ParseException e) { + syncByIdRange(sourceConn, targetConn, stasSyncStrategy, + stasTaskConfig.getSyncCount(), sourceInfo.getType(), targetInfo.getType(), recordId,sourceDBLink,targetDBLink); + } catch (SQLException e) { saveSyncLog(recordId,String.format("同步表 %s 时出错: %s", stasSyncStrategy.getTableName(), e.getMessage())); targetConn.rollback(); - throw new JeecgBootException(e.getMessage()); + throw new SQLException(e.getMessage()); } - long endTime = System.currentTimeMillis(); saveSyncLog(recordId,String.format("表 %s 同步耗时: %s 毫秒", stasSyncStrategy.getTableName(), (endTime - startTime))); } @@ -110,99 +106,15 @@ public class SyncDataJob implements Job { stasSyncRecordMapper.updateById(stasSyncRecord); } - /** - * 判断字段是否为日期类型 - */ - public boolean isDateColumn(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType) throws SQLException { - String sql; - if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) { - sql = "SELECT data_type FROM all_tab_columns WHERE owner = ? AND table_name = ? AND column_name = ?"; - } else if (SourceDataTypeEnum.POSTGRES.getKey().equals(dbType)) { - sql = "SELECT data_type FROM information_schema.columns " + - "WHERE table_schema = ? AND table_name = ? AND column_name = ?"; - } else { - throw new SQLException("不支持的数据库类型: " + dbType); - } - - try (PreparedStatement pstmt = conn.prepareStatement(sql)) { - pstmt.setString(1, stasSyncStrategy.getSourceOwner().toUpperCase()); - pstmt.setString(2, stasSyncStrategy.getTableName().toUpperCase()); - pstmt.setString(3, stasSyncStrategy.getColumnName().toUpperCase()); - - ResultSet rs = pstmt.executeQuery(); - if (rs.next()) { - String dataType = rs.getString("data_type").toUpperCase(); - // 判断是否为日期/时间类型 - return dataType.contains("DATE") || dataType.contains("TIME"); - } - } - return false; - } - - /** - * 按日期范围同步数据 - */ - public void syncByDateRange(Connection sourceConn, Connection targetConn, - StasSyncStrategy stasSyncStrategy, Integer syncCount, - Integer sourceDbType, Integer targetDbType, String recordId) throws SQLException, ParseException { - // 获取最小和最大日期 - DateRangeVO dateRange = getDateRange(sourceConn, stasSyncStrategy, sourceDbType); - - SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - - // 获取上次同步的位置 - String syncOrigin = stasSyncStrategy.getSyncOrigin(); - Date lastSyncedDate = StringUtils.isNotBlank(syncOrigin) ? sdf.parse(syncOrigin) : dateRange.getMinDate(); - Date currentStart = (lastSyncedDate != null && lastSyncedDate.after(dateRange.getMinDate())) ? - lastSyncedDate : dateRange.getMinDate(); - - Date currentEnd = addDays(currentStart, syncCount); - if (currentEnd.after(dateRange.getMaxDate())) { - currentEnd = dateRange.getMaxDate(); - StringBuilder whereClause = new StringBuilder(); - whereClause.append("TO_CHAR(") - .append(stasSyncStrategy.getColumnName()) - .append(", 'YYYY-MM-DD HH24:MI:SS') = '") - .append(sdf.format(currentEnd)) - .append("'"); - deleteByEquals(targetConn, stasSyncStrategy, sourceDbType, whereClause.toString()); - } - - // 根据数据库类型构建不同的日期条件 - String whereClause; - if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceDbType)) { - whereClause = stasSyncStrategy.getColumnName() + " BETWEEN TO_DATE('" + sdf.format(currentStart) + - "', 'YYYY-MM-DD HH24:MI:SS') AND TO_DATE('" + sdf.format(currentEnd) + "', 'YYYY-MM-DD HH24:MI:SS')"; - } else { - whereClause = stasSyncStrategy.getColumnName() + " BETWEEN TIMESTAMP '" + sdf.format(currentStart) + - "' AND TIMESTAMP '" + sdf.format(currentEnd) + "'"; - } - - saveSyncLog(recordId,String.format("%s表同步日期范围: %s 至 %s", stasSyncStrategy.getTableName(), sdf.format(currentStart), sdf.format(currentEnd))); - - int rowsSynced = syncDataBatch(sourceConn, targetConn, stasSyncStrategy.getSourceOwner(), - stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(), - whereClause, sourceDbType, targetDbType); - - saveSyncLog(recordId,String.format("%s表已同步 %s 行数据", stasSyncStrategy.getTableName(), rowsSynced)); - saveSyncNum(recordId, stasSyncStrategy.getTableName(), rowsSynced); - - // 更新同步位置 - if (currentEnd != null) { - stasSyncStrategy.setSyncOrigin(sdf.format(currentEnd)); - stasSyncStrategyMapper.updateById(stasSyncStrategy); - saveSyncLog(recordId, String.format("%s表最终同步位置已更新为: %s", stasSyncStrategy.getTableName(), sdf.format(currentEnd))); - } - } - /** * 按ID范围同步数据 */ public void syncByIdRange(Connection sourceConn, Connection targetConn, StasSyncStrategy stasSyncStrategy, Integer syncCount, - Integer sourceDbType, Integer targetDbType, String recordId) throws SQLException { + Integer sourceDbType, Integer targetDbType, String recordId, + String sourceDbLink, String targetDbLink) throws SQLException { // 获取最小和最大ID - IdRangeVO idRange = getIdRange(sourceConn, stasSyncStrategy, sourceDbType); + IdRangeVO idRange = getIdRange(sourceConn, stasSyncStrategy, sourceDbType,sourceDbLink); // 获取上次同步的位置 String syncOrigin = stasSyncStrategy.getSyncOrigin(); @@ -211,22 +123,12 @@ public class SyncDataJob implements Job { long currentStart = (lastSyncedId > 0 && lastSyncedId > idRange.getMinId()) ? lastSyncedId + 1 : idRange.getMinId(); - long currentEnd = currentStart + syncCount - 1; - if (currentEnd > idRange.getMaxId()) { - currentEnd = idRange.getMaxId(); - StringBuilder whereClause = new StringBuilder(); - whereClause.append(stasSyncStrategy.getColumnName()) - .append(" = '") - .append(currentEnd) - .append("'"); - deleteByEquals(targetConn, stasSyncStrategy, sourceDbType, whereClause.toString()); - } - // 确保起始位置不超过结束位置 - if (currentStart > currentEnd) { + if (currentStart > idRange.getMaxId()) { saveSyncLog(recordId, String.format("%s表已同步到最新,无需继续同步", stasSyncStrategy.getTableName())); return; } + long currentEnd = currentStart + syncCount - 1; String whereClause = stasSyncStrategy.getColumnName() + " BETWEEN " + currentStart + " AND " + currentEnd; @@ -234,7 +136,7 @@ public class SyncDataJob implements Job { int rowsSynced = syncDataBatch(sourceConn, targetConn, stasSyncStrategy.getSourceOwner(), stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(), - whereClause, sourceDbType, targetDbType); + whereClause, sourceDbType, targetDbType,sourceDbLink,targetDbLink); saveSyncLog(recordId, String.format("%s表已同步 %s 行数据", stasSyncStrategy.getTableName(), rowsSynced)); saveSyncNum(recordId, stasSyncStrategy.getTableName(), rowsSynced); @@ -247,58 +149,15 @@ public class SyncDataJob implements Job { } } - /** - * 根据日期等于或ID等于删除数据(使用yyyy-MM-dd格式比较) - */ - public int deleteByEquals(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType, String whereClause) throws SQLException { - String sql; - // 构建完整的SQL语句 - if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) { - sql = "DELETE FROM \"" + stasSyncStrategy.getTargetOwner().toUpperCase() + "\".\"" + - stasSyncStrategy.getTableName() + "\" WHERE " + whereClause; - } else { - sql = "DELETE FROM \"" + stasSyncStrategy.getTargetOwner().toLowerCase() + "\".\"" + - stasSyncStrategy.getTableName() + "\" WHERE " + whereClause; - } - - try (PreparedStatement pstmt = conn.prepareStatement(sql)) { - return pstmt.executeUpdate(); - } - } - - /** - * 获取表的日期范围 - */ - public DateRangeVO getDateRange(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType) throws SQLException { - String sql; - if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) { - sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_date, " + - "MAX(" + stasSyncStrategy.getColumnName() + ") as max_date " + - "FROM \"" + stasSyncStrategy.getSourceOwner().toUpperCase() + "\".\"" + stasSyncStrategy.getTableName() + "\""; - } else { - sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_date, " + - "MAX(" + stasSyncStrategy.getColumnName() + ") as max_date " + - "FROM \"" + stasSyncStrategy.getSourceOwner().toLowerCase() + "\".\"" + stasSyncStrategy.getTableName() + "\""; - } - - try (Statement stmt = conn.createStatement(); - ResultSet rs = stmt.executeQuery(sql)) { - if (rs.next()) { - return new DateRangeVO(rs.getTimestamp("min_date"), rs.getTimestamp("max_date")); - } - } - throw new SQLException("无法获取日期范围"); - } - /** * 获取表的ID范围 */ - public IdRangeVO getIdRange(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType) throws SQLException { + public IdRangeVO getIdRange(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType,String sourceDbLink) throws SQLException { String sql; if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) { sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_id, " + "MAX(" + stasSyncStrategy.getColumnName() + ") as max_id " + - "FROM \"" + stasSyncStrategy.getSourceOwner().toUpperCase() + "\".\"" + stasSyncStrategy.getTableName() + "\""; + "FROM \"" + stasSyncStrategy.getSourceOwner().toUpperCase() + "\".\"" + stasSyncStrategy.getTableName() + "\"" + sourceDbLink; } else { sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_id, " + "MAX(" + stasSyncStrategy.getColumnName() + ") as max_id " + @@ -320,14 +179,15 @@ public class SyncDataJob implements Job { public int syncDataBatch(Connection sourceConn, Connection targetConn, String sourceOwner, String targetOwner, String tableName, String whereClause, - Integer sourceDbType, Integer targetDbType) throws SQLException { + Integer sourceDbType, Integer targetDbType, + String sourceDbLink, String targetDbLink) throws SQLException { int totalRows = 0; String selectSql; // 构建查询SQL(根据源数据库类型) if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceDbType)) { selectSql = "SELECT * FROM \"" + sourceOwner.toUpperCase() + "\".\"" + tableName + - "\" WHERE " + whereClause; + "\"" + sourceDbLink + " WHERE " + whereClause; } else { selectSql = "SELECT * FROM \"" + sourceOwner.toLowerCase() + "\".\"" + tableName + "\" WHERE " + whereClause; @@ -344,7 +204,7 @@ public class SyncDataJob implements Job { // 构建插入SQL(根据目标数据库类型) String insertSql; if (SourceDataTypeEnum.ORACLE.getKey().equals(targetDbType)) { - insertSql = "INSERT INTO \"" + targetOwner.toUpperCase() + "\".\"" + tableName + "\" VALUES ("; + insertSql = "INSERT INTO \"" + targetOwner.toUpperCase() + "\".\"" + tableName + "\"" + targetDbLink + " VALUES ("; } else { insertSql = "INSERT INTO \"" + tableName + "\" VALUES ("; } @@ -395,7 +255,11 @@ public class SyncDataJob implements Job { return totalRows; } - + /** + * 保存同步日志 + * @param recordId + * @param desc + */ private void saveSyncLog(String recordId, String desc){ StasSyncLog stasSyncLog = new StasSyncLog(); stasSyncLog.setRecordId(recordId); @@ -404,6 +268,12 @@ public class SyncDataJob implements Job { stasSyncLogMapper.insert(stasSyncLog); } + /** + * 保存同步记录 + * @param recordId + * @param tableName + * @param num + */ private void saveSyncNum(String recordId, String tableName, Integer num){ StasSyncNum stasSyncNum = new StasSyncNum(); stasSyncNum.setRecordId(recordId); @@ -411,12 +281,4 @@ public class SyncDataJob implements Job { stasSyncNum.setSyncNum(num); stasSyncNumMapper.insert(stasSyncNum); } - - /** - * 计算指定日期加上指定天数后的日期 - */ - private Date addDays(Date date, int days) { - long time = date.getTime() + (long)days * 24 * 60 * 60 * 1000; - return new Date(time); - } } \ No newline at end of file diff --git a/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/service/IStasTaskConfigService.java b/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/service/IStasTaskConfigService.java index f4cddfa..bd1bd17 100644 --- a/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/service/IStasTaskConfigService.java +++ b/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/service/IStasTaskConfigService.java @@ -21,34 +21,31 @@ public interface IStasTaskConfigService extends IService { MindMapVO getMindMap(String taskId); /** - * 添加定时任务 - * @param stasTaskConfig + * 保存任务 */ - void saveQuartzJob(StasTaskConfig stasTaskConfig); + void add(StasTaskConfig stasTaskConfig); /** - * 编辑定时任务 - * @param stasTaskConfig + * 编辑任务 */ - void editQuartzJob(StasTaskConfig stasTaskConfig); + void edit(StasTaskConfig stasTaskConfig); /** * 删除定时任务 - * @param quartzId + * @param id */ - void delQuartzJob(String quartzId); + void delete(String id); /** * 暂停定时任务 - * @param quartzId */ - void pauseQuartzJob(String quartzId); + void pause(String taskId); /** * 启动定时任务 - * @param quartzId + * @param taskId */ - void resumeQuartzJob(String quartzId); + void resume(String taskId); /** * 立即执行定时任务 diff --git a/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/service/impl/StasTaskConfigServiceImpl.java b/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/service/impl/StasTaskConfigServiceImpl.java index bb021dd..49baa24 100644 --- a/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/service/impl/StasTaskConfigServiceImpl.java +++ b/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/service/impl/StasTaskConfigServiceImpl.java @@ -5,21 +5,25 @@ import lombok.RequiredArgsConstructor; import org.apache.commons.lang3.StringUtils; import org.jeecg.common.constant.CommonConstant; import org.jeecg.common.constant.QuartzJobConstant; +import org.jeecg.common.constant.enums.SyncTaskStatusEnum; import org.jeecg.common.exception.JeecgBootException; import org.jeecg.common.util.DBUtil; -import org.jeecg.modules.base.entity.StasDataSource; +import org.jeecg.modules.base.entity.*; import org.jeecg.modules.base.mapper.StasDataSourceMapper; -import org.jeecg.modules.base.entity.StasSyncStrategy; import org.jeecg.modules.base.mapper.StasSyncStrategyMapper; -import org.jeecg.modules.base.entity.StasTaskConfig; import org.jeecg.modules.base.mapper.StasTaskConfigMapper; -import org.jeecg.modules.base.entity.QuartzJob; import org.jeecg.quartz.service.IQuartzJobService; +import org.jeecg.stasSyncStrategy.service.IStasSyncStrategyService; +import org.jeecg.syncLog.service.IStasSyncLogService; +import org.jeecg.syncNum.service.IStasSyncNumService; +import org.jeecg.syncRecord.service.IStasSyncRecordService; import org.jeecg.taskConfig.service.IStasTaskConfigService; import org.jeecg.vo.*; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import org.springframework.transaction.annotation.Transactional; import java.sql.*; import java.text.ParseException; @@ -43,6 +47,10 @@ public class StasTaskConfigServiceImpl extends ServiceImpl().eq(StasSyncStrategy::getTaskId,id)); + //删除同步记录 + syncRecordService.remove(new LambdaQueryWrapper().eq(StasSyncRecord::getTaskId,id)); + //删除同步数量记录 + stasSyncNumService.remove(new LambdaQueryWrapper().eq(StasSyncNum::getTaskId,id)); + //删除同步任务产生的日志 + stasSyncLogService.remove(new LambdaQueryWrapper().eq(StasSyncLog::getTaskId,id)); + //删除本任务数据 + this.removeById(id); } + /** + * 暂停定时任务 + * @param taskId + */ + @Transactional(rollbackFor = Exception.class) @Override - public void pauseQuartzJob(String quartzId){ - try { - QuartzJob quartzJob = quartzJobService.getById(quartzId); - quartzJobService.pause(quartzJob); - } catch (Exception e) { - throw new JeecgBootException(e.getMessage()); - } + public void pause(String taskId) { + StasTaskConfig stasTaskConfig = this.getById(taskId); + stasTaskConfig.setTaskStatus(SyncTaskStatusEnum.NOT_STARTED.getKey()); + this.updateById(stasTaskConfig); + //暂停定时任务 + QuartzJob quartzJob = quartzJobService.getById(stasTaskConfig.getQuartzId()); + quartzJobService.pause(quartzJob); } + /** + * 启动定时任务 + * @param taskId + */ + @Transactional(rollbackFor = Exception.class) @Override - public void resumeQuartzJob(String quartzId){ - try { - QuartzJob quartzJob = quartzJobService.getById(quartzId); - quartzJobService.resumeJob(quartzJob); - } catch (Exception e) { - throw new JeecgBootException(e.getMessage()); - } + public void resume(String taskId) { + StasTaskConfig stasTaskConfig = this.getById(taskId); + stasTaskConfig.setTaskStatus(SyncTaskStatusEnum.IN_OPERATION.getKey()); + this.updateById(stasTaskConfig); + //启动定时任务 + QuartzJob quartzJob = quartzJobService.getById(stasTaskConfig.getQuartzId()); + quartzJobService.resumeJob(quartzJob); } @Override