From 4134a749fb00f7fe1d02a65294590b38cfa16048 Mon Sep 17 00:00:00 2001 From: panbaolin Date: Thu, 21 May 2026 16:11:08 +0800 Subject: [PATCH] =?UTF-8?q?Revert=20"=E4=BF=AE=E6=94=B9=E5=90=8C=E6=AD=A5?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E5=8A=9F=E8=83=BD"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit ab1dac042623f20a3be38869eefd433f57730b00. --- .../java/org/jeecg/common/util/DBUtil.java | 74 +--- .../modules/base/entity/StasDataSource.java | 12 +- .../modules/base/entity/StasSyncLog.java | 12 - .../modules/base/entity/StasSyncStrategy.java | 18 - .../base/mapper/StasSyncLogMapper.java | 15 - .../base/mapper/xml/StasSyncLogMapper.xml | 34 -- jeecg-module-sync/SYNC_MODULE_CONTEXT.txt | 369 ------------------ .../controller/StasDataSourceController.java | 65 --- .../impl/StasDataSourceServiceImpl.java | 4 +- .../org/jeecg/taskConfig/job/SyncDataJob.java | 254 ++++-------- .../service/SyncAdaptiveStrategy.java | 261 ------------- .../service/TimeoutProbeService.java | 333 ---------------- .../impl/StasTaskConfigServiceImpl.java | 114 +----- .../main/resources/sql/sync_adaptive_ddl.sql | 40 -- 14 files changed, 106 insertions(+), 1499 deletions(-) delete mode 100644 jeecg-module-sync/SYNC_MODULE_CONTEXT.txt delete mode 100644 jeecg-module-sync/src/main/java/org/jeecg/taskConfig/service/SyncAdaptiveStrategy.java delete mode 100644 jeecg-module-sync/src/main/java/org/jeecg/taskConfig/service/TimeoutProbeService.java delete mode 100644 jeecg-module-sync/src/main/resources/sql/sync_adaptive_ddl.sql diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/util/DBUtil.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/util/DBUtil.java index 982ef46..218dc18 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/common/util/DBUtil.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/util/DBUtil.java @@ -10,15 +10,6 @@ public class DBUtil { public static final String ORACLE_URL_PREFIX = "jdbc:oracle:thin:@"; public static final String POSTGRES_URL_PREFIX = "jdbc:postgresql://"; - /** 默认Oracle连接超时(毫秒) */ - public static final int DEFAULT_ORACLE_CONNECT_TIMEOUT = 300000; - /** 默认Oracle读取超时(毫秒) */ - public static final int DEFAULT_ORACLE_READ_TIMEOUT = 1200000; - /** 默认PostgreSQL连接超时(毫秒) */ - public static final int DEFAULT_PG_CONNECT_TIMEOUT = 300000; - /** 默认PostgreSQL套接字超时(毫秒) */ - public static final int DEFAULT_PG_SOCKET_TIMEOUT = 1200000; - /*public static Connection getConnection(String url, String username, String password, String Driver) throws SQLException{ try { Class.forName(Driver); @@ -43,63 +34,18 @@ public class DBUtil { return String.format("%s%s:%s%s", DBUtil.POSTGRES_URL_PREFIX, ip, port, serveId); } - /** - * 构建PostgreSQL URL(带tcpKeepAlive参数) - */ - public static String getPgUrlWithKeepAlive(String ip, Integer port, String serveId){ - String baseUrl = String.format("%s%s:%s%s", DBUtil.POSTGRES_URL_PREFIX, ip, port, serveId); - if (baseUrl.contains("?")) { - return baseUrl + "&tcpKeepAlive=true"; - } - return baseUrl + "?tcpKeepAlive=true"; - } - - /** - * 获取数据库连接(兼容旧接口,使用默认超时) - * @param type 0=PostgreSQL, 1=Oracle - */ - public static Connection getConnection(String url, String username, String password, String Driver, int type) throws SQLException{ - if (type == 0) { - return getConnection(url, username, password, Driver, type, - DEFAULT_PG_CONNECT_TIMEOUT, DEFAULT_PG_SOCKET_TIMEOUT); - } else { - return getConnection(url, username, password, Driver, type, - DEFAULT_ORACLE_CONNECT_TIMEOUT, DEFAULT_ORACLE_READ_TIMEOUT); - } - } - - /** - * 获取数据库连接(自定义超时) - * @param url JDBC连接URL - * @param username 用户名 - * @param password 密码 - * @param driver 驱动类名 - * @param type 0=PostgreSQL, 1=Oracle - * @param connectTimeoutMs 连接超时(毫秒),null则使用默认值 - * @param readTimeoutMs 读取超时(毫秒),null则使用默认值 - */ - public static Connection getConnection(String url, String username, String password, - String driver, int type, - Integer connectTimeoutMs, Integer readTimeoutMs) throws SQLException{ + public static Connection getConnection(String url, String username, String password, String Driver,int type) throws SQLException{ try { - Class.forName(driver); + Class.forName(Driver); Properties props = new Properties(); - props.put("user", username); - props.put("password", password); - - if (type == 0) { - // PostgreSQL - int connTimeout = (connectTimeoutMs != null) ? connectTimeoutMs : DEFAULT_PG_CONNECT_TIMEOUT; - int readTimeout = (readTimeoutMs != null) ? readTimeoutMs : DEFAULT_PG_SOCKET_TIMEOUT; - props.put("loginTimeout", String.valueOf(connTimeout / 1000)); - props.put("socketTimeout", String.valueOf(readTimeout / 1000)); - props.put("tcpKeepAlive", "true"); - } else { - // Oracle - int connTimeout = (connectTimeoutMs != null) ? connectTimeoutMs : DEFAULT_ORACLE_CONNECT_TIMEOUT; - int readTimeout = (readTimeoutMs != null) ? readTimeoutMs : DEFAULT_ORACLE_READ_TIMEOUT; - props.put("oracle.net.CONNECT_TIMEOUT", String.valueOf(connTimeout)); - props.put("oracle.jdbc.ReadTimeout", String.valueOf(readTimeout)); + if(type == 0){ + props.put("user", username); + props.put("password", password); + }else{ + props.put("user", username); + props.put("password", password); + props.put("oracle.net.CONNECT_TIMEOUT", "300000"); + props.put("oracle.jdbc.ReadTimeout", "1200000"); } return DriverManager.getConnection(url, props); } catch(ClassNotFoundException e){ diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/StasDataSource.java b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/StasDataSource.java index b46b45b..0436241 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/StasDataSource.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/StasDataSource.java @@ -78,15 +78,7 @@ public class StasDataSource implements Serializable { /**描述*/ @Excel(name = "描述", width = 15) private String description; - /**探测到的连接超时(秒)*/ - @Excel(name = "连接超时(秒)", width = 15) - private Integer detectedConnectTimeout; - /**探测到的读取超时(秒)*/ - @Excel(name = "读取超时(秒)", width = 15) - private Integer detectedReadTimeout; - /**上次探测时间*/ - @JsonFormat(timezone = "GMT+8",pattern = "yyyy-MM-dd HH:mm:ss") - @DateTimeFormat(pattern="yyyy-MM-dd HH:mm:ss") - private Date lastProbeTime; + + } diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/StasSyncLog.java b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/StasSyncLog.java index c443080..a7b8bf8 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/StasSyncLog.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/StasSyncLog.java @@ -40,16 +40,4 @@ public class StasSyncLog implements Serializable { /**描述*/ @Excel(name = "描述", width = 15) private String description; - /**错误类型(TIMEOUT/TRUNCATION/OTHER/SUCCESS)*/ - @Excel(name = "错误类型", width = 15) - private String errorType; - /**调整前批次大小*/ - @Excel(name = "调整前批次大小", width = 15) - private Integer batchSizeBefore; - /**调整后批次大小*/ - @Excel(name = "调整后批次大小", width = 15) - private Integer batchSizeAfter; - /**第几次重试*/ - @Excel(name = "重试序号", width = 15) - private Integer retryIndex; } diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/StasSyncStrategy.java b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/StasSyncStrategy.java index a11e1c4..d50d894 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/StasSyncStrategy.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/StasSyncStrategy.java @@ -63,22 +63,4 @@ public class StasSyncStrategy implements Serializable { /**同步位置*/ @Excel(name = "同步位置", width = 15) private String syncOrigin; - /**当前生效的批次天数(自适应调整后,null时使用taskConfig的syncDay)*/ - @Excel(name = "当前批次天数", width = 15) - private Integer currentSyncDay; - /**当前生效的批次数量(自适应调整后,null时使用taskConfig的syncCount)*/ - @Excel(name = "当前批次数量", width = 15) - private Integer currentSyncCount; - /**最近一次错误类型(TIMEOUT/TRUNCATION/OTHER)*/ - @Excel(name = "错误类型", width = 15) - private String lastErrorType; - /**连续重试次数(成功后清零)*/ - @Excel(name = "重试次数", width = 15) - private Integer retryCount; - /**批次天数最小值下限(默认1)*/ - @Excel(name = "最小批次天数", width = 15) - private Integer minSyncDay; - /**批次数量最小值下限(默认100)*/ - @Excel(name = "最小批次数量", width = 15) - private Integer minSyncCount; } diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/mapper/StasSyncLogMapper.java b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/mapper/StasSyncLogMapper.java index 775a06c..f4f3dd3 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/mapper/StasSyncLogMapper.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/mapper/StasSyncLogMapper.java @@ -1,6 +1,5 @@ package org.jeecg.modules.base.mapper; -import org.apache.ibatis.annotations.Param; import org.jeecg.modules.base.entity.StasSyncLog; import com.baomidou.mybatisplus.core.mapper.BaseMapper; @@ -12,18 +11,4 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper; */ public interface StasSyncLogMapper extends BaseMapper { - /** - * 统计指定数据源关联的最近N天超时/截断错误次数 - */ - int countTimeoutErrorsByDataSource(@Param("dataSourceId") String dataSourceId, @Param("days") int days); - - /** - * 统计指定数据源关联的最近N天成功同步的平均耗时(毫秒) - */ - Long avgSyncTimeByDataSource(@Param("dataSourceId") String dataSourceId, @Param("days") int days); - - /** - * 统计指定数据源最近连续N条日志中无超时错误的次数 - */ - int countConsecutiveNonTimeoutByDataSource(@Param("dataSourceId") String dataSourceId, @Param("limit") int limit); } diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/mapper/xml/StasSyncLogMapper.xml b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/mapper/xml/StasSyncLogMapper.xml index 51bd4ac..8927f33 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/mapper/xml/StasSyncLogMapper.xml +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/mapper/xml/StasSyncLogMapper.xml @@ -2,38 +2,4 @@ - - - - - - - - - \ No newline at end of file diff --git a/jeecg-module-sync/SYNC_MODULE_CONTEXT.txt b/jeecg-module-sync/SYNC_MODULE_CONTEXT.txt deleted file mode 100644 index 185aa8c..0000000 --- a/jeecg-module-sync/SYNC_MODULE_CONTEXT.txt +++ /dev/null @@ -1,369 +0,0 @@ -================================================================================ - jeecg-module-sync 模块 — AI辅助代码编辑上下文文件 -================================================================================ - -一、模块概述 --------------------------------------------------------------------------------- -jeecg-module-sync 是源项分析系统(STAS)中的数据同步模块,负责在Oracle和PostgreSQL -数据库之间进行数据同步。支持定时任务调度、增量同步(按日期/ID范围)、同步策略管理、 -同步日志记录和同步统计等功能。 - -父工程: jeecg-boot-parent 3.8.1 -Java版本: 17 -核心依赖: jeecg-boot-base-core(提供实体、Mapper、工具类等基础能力) -框架: Spring Boot + MyBatis-Plus + Quartz + Swagger/OpenAPI - - -二、目录结构与文件清单 --------------------------------------------------------------------------------- -jeecg-module-sync/ -├── pom.xml -└── src/main/java/org/jeecg/ - ├── OracleSync.java # 独立工具类:Oracle-to-Oracle同步(main方法入口) - ├── OracleToPgSync.java # 独立工具类:Oracle-to-PostgreSQL建表同步(main方法入口) - ├── dataSource/ # 数据源管理子模块 - │ ├── controller/StasDataSourceController.java - │ ├── service/IStasDataSourceService.java - │ ├── service/impl/StasDataSourceServiceImpl.java - │ └── vo/TableColumnVO.java - ├── stasSyncStrategy/ # 同步策略管理子模块 - │ ├── controller/StasSyncStrategyController.java - │ ├── service/IStasSyncStrategyService.java - │ └── service/impl/StasSyncStrategyServiceImpl.java - ├── syncLog/ # 同步日志子模块 - │ ├── controller/StasSyncLogController.java - │ ├── service/IStasSyncLogService.java - │ └── service/impl/StasSyncLogServiceImpl.java - ├── syncNum/ # 同步数量统计子模块 - │ ├── controller/StasSyncNumController.java - │ ├── service/IStasSyncNumService.java - │ ├── service/impl/StasSyncNumServiceImpl.java - │ └── vo/PieChartVO.java - ├── syncRecord/ # 同步记录子模块 - │ ├── controller/StasSyncRecordController.java - │ ├── service/IStasSyncRecordService.java - │ ├── service/impl/StasSyncRecordServiceImpl.java - │ └── vo/SyncRecordVO.java, TaskHisStatsVO.java, TaskStatsResultVO.java - ├── taskConfig/ # 任务配置子模块(核心) - │ ├── controller/StasTaskConfigController.java - │ ├── job/SyncDataJob.java # Quartz Job执行类 - │ ├── service/IStasTaskConfigService.java - │ └── service/impl/StasTaskConfigServiceImpl.java - ├── quartz/ # Quartz定时任务管理 - │ ├── service/IQuartzJobService.java - │ └── service/impl/QuartzJobServiceImpl.java - └── vo/ # 模块内VO - ├── DateRangeVO.java - ├── IdRangeVO.java - ├── MindMapVO.java - ├── TableInfoVO.java - └── UserInfoVO.java - - -三、数据库实体(定义在 jeecg-boot-base-core 中) --------------------------------------------------------------------------------- -本模块的实体类统一定义在 jeecg-boot-base-core 的 org.jeecg.modules.base.entity 包中, -由 jeecg-boot-base-core 的 org.jeecg.modules.base.mapper 提供 MyBatis-Plus Mapper 接口。 - -1. StasDataSource(表 stas_data_source)— 数据源配置 - 字段: id(ASSIGN_ID), createBy, createTime, updateBy, updateTime, - instanceName(实例名), type(数据库类型: 0=Oracle,1=PostgreSQL), - port, serveId(服务名), dbLink, username, password, remark, - delFlag(逻辑删除), ipAddress, description - -2. StasSyncStrategy(表 stas_sync_strategy)— 同步策略 - 字段: id(ASSIGN_ID), createBy, createTime, updateBy, updateTime, - taskId(关联任务ID), sourceOwner(源用户/Schema), targetOwner(目标用户/Schema), - tableName(同步表名), columnName(依据字段名), syncOrigin(同步位置/断点) - -3. StasTaskConfig(表 stas_task_config)— 任务配置 - 字段: id(ASSIGN_ID), createBy, createTime, updateBy, updateTime, - taskName(任务名称), quartzId(关联Quartz任务ID), cron(Cron表达式), - sourceId(源库ID), targetId(目标库ID), syncCount(批次数量), syncDay(批次天数), - taskStatus(0=未开始,1=进行中), description, - sourceName(@TableField(exist=false)), targetName(@TableField(exist=false)) - -4. StasSyncLog(表 stas_sync_log)— 同步日志 - 字段: id(ASSIGN_ID), recordId(关联记录ID), startTime, description - -5. StasSyncNum(表 stas_sync_num)— 同步数量 - 字段: id(ASSIGN_ID), recordId(关联记录ID), tableName, syncNum(同步条数) - -6. StasSyncRecord(表 stas_sync_record)— 同步记录 - 字段: id(ASSIGN_ID), taskId, sourceId, targetId, startTime, endTime - -7. QuartzJob(表 sys_quartz_job)— Quartz定时任务 - 字段: id(ASSIGN_ID), createBy, createTime, delFlag, updateBy, updateTime, - jobClassName, cronExpression, parameter, description, status(0=正常,-1=停止) - - -四、核心枚举与常量(定义在 jeecg-boot-base-core 中) --------------------------------------------------------------------------------- -1. SourceDataTypeEnum: - ORACLE(0, "ORACLE"), POSTGRES(1, "POSTGRES") - -2. SyncTaskStatusEnum: - NOT_STARTED(0), IN_OPERATION(1) - -3. QuartzJobConstant: - JOB_CLASS_NAME = "org.jeecg.taskConfig.job.SyncDataJob" - -4. DBUtil 工具类: - - ORACLE_DRIVER = "oracle.jdbc.OracleDriver" - - POSTGRES_DRIVER = "org.postgresql.Driver" - - ORACLE_URL_PREFIX = "jdbc:oracle:thin:@" - - POSTGRES_URL_PREFIX = "jdbc:postgresql://" - - getUrl(ip, port, serveId) → 构建Oracle JDBC URL - - getPgUrl(ip, port, serveId) → 构建PostgreSQL JDBC URL - - getConnection(url, username, password, driver, type) → 获取JDBC连接 - type=0: PostgreSQL(无超时设置), type=1: Oracle(设置CONNECT_TIMEOUT=300s, ReadTimeout=1200s) - - close(conn, stmt, rs) → 释放资源 - - -五、Mapper XML(定义在 jeecg-boot-base-core 中) --------------------------------------------------------------------------------- -StasSyncRecordMapper.xml — 同步记录统计查询: -- taskHistoryStats: 关联 stas_task_config + stas_sync_record + stas_sync_num,查历史同步详情 -- taskStatsDay: 按日统计同步数量(TO_CHAR(start_time,'YYYY-MM-DD')) -- taskStatsMonth: 按月统计同步数量(TO_CHAR(start_time,'YYYY-MM-DD')) - - -六、API接口清单 --------------------------------------------------------------------------------- -1. 数据源管理 — StasDataSourceController (/dataSource) - GET /list — 分页查询数据源(支持instanceName模糊查询) - GET /userList — 查询数据源下所有用户(Oracle: ALL_USERS; PG: information_schema.schemata) - GET /targetUser — 查询目标数据源用户(通过taskId关联查targetId) - GET /tableList — 查询数据源下指定用户的所有表 - GET /fieldList — 查询数据源表字段 - GET /testConnection — 数据库连接测试 - GET /checkInstanceName — 数据库名称唯一性校验 - POST /add — 添加数据源 - PUT /edit — 编辑数据源 - DELETE /delete — 删除数据源(单个) - DELETE /deleteBatch — 批量删除数据源 - GET /queryById — 通过ID查询 - GET /queryall — 查询所有数据源 - -2. 同步策略 — StasSyncStrategyController (/syncStrategy) - GET /list — 分页查询同步策略 - POST /createTargetTables — 在目标库创建表结构并保存策略 - POST /add — 添加同步策略 - PUT/POST /edit — 编辑同步策略 - DELETE /delete — 删除同步策略 - DELETE /deleteBatch — 批量删除同步策略 - GET /queryById — 通过ID查询 - GET /exportXls — 导出Excel - POST /importExcel — 导入Excel - -3. 同步日志 — StasSyncLogController (/stasSyncLog) - GET /list — 分页查询同步日志(按start_time升序) - -4. 同步数量 — StasSyncNumController (/stasSyncNum) - GET /list — 按表名分组统计同步数量(返回PieChartVO饼图数据) - -5. 同步记录 — StasSyncRecordController (/stasSyncRecord) - GET /list — 分页查询同步记录(关联数据源名称,支持时间区间过滤) - GET /taskHisSta — 历史同步数量统计 - GET /taskStatsDay — 按日统计最近一个月同步数量 - GET /taskStatsMonth — 按月统计最近一年同步数量 - -6. 任务配置 — StasTaskConfigController (/taskConfig) - GET /list — 分页查询任务(关联数据源名称) - GET /getMindMap — 数据表思维导图(异步并行查询) - POST /add — 添加任务(含创建Quartz Job) - PUT/POST /edit — 编辑任务(含更新Quartz Job) - DELETE /delete — 删除任务(含删除Quartz Job和关联策略) - GET /pause — 暂停定时任务 - GET /resume — 启动定时任务 - GET /execute — 立即执行定时任务 - GET /syncDataByFieldType — 手动触发同步 - DELETE /deleteBatch — 批量删除任务 - GET /queryById — 通过ID查询 - GET /exportXls — 导出Excel - POST /importExcel — 导入Excel - - -七、核心业务逻辑详解 --------------------------------------------------------------------------------- -7.1 数据同步流程(SyncDataJob / StasTaskConfigServiceImpl.syncDataByFieldType) - -执行链路: - StasTaskConfigController.execute/手动触发 - → StasTaskConfigServiceImpl.syncDataByFieldType(taskId) [Service层版本,仅支持Oracle] - → 或通过 QuartzJobService.execute → SyncDataJob.execute → SyncDataJob.syncDataByFieldType [Job版本,支持Oracle+PG] - -核心步骤: - 1. 根据taskId获取 StasTaskConfig → 获取源/目标 StasDataSource - 2. 创建 StasSyncRecord 记录(记录同步开始时间) - 3. 构建源/目标JDBC连接 - 4. 获取该任务下所有 StasSyncStrategy - 5. 对每个策略: - a. 判断依据字段类型(isDateColumn) - b. 如果是日期类型 → syncByDateRange(按syncDay天数为批次) - c. 如果是ID类型 → syncByIdRange(按syncCount条数为批次) - 6. 更新 StasSyncRecord 结束时间 - -7.2 按日期范围同步(syncByDateRange) - - 获取源表日期范围(MIN/MAX) - - 读取 syncOrigin 作为上次断点 - - 从断点开始,每次同步 syncDay 天 - - 若结束日期超过最大日期,先删除目标库中最后一天的数据(避免重复) - - Oracle条件: BETWEEN TO_DATE('...', 'YYYY-MM-DD HH24:MI:SS') - - PostgreSQL条件: BETWEEN TIMESTAMP '...' - - 同步完成后更新 syncOrigin 为当前结束日期 - -7.3 按ID范围同步(syncByIdRange) - - 获取源表ID范围(MIN/MAX) - - 读取 syncOrigin 作为上次断点(从断点+1开始) - - 每次同步 syncCount 条 - - 若结束ID超过最大ID,先删除目标库中最后一条的数据 - - 同步完成后更新 syncOrigin 为当前结束ID - -7.4 批量数据同步(syncDataBatch) - - 源库 SELECT * FROM "OWNER"."TABLE" WHERE ... - - fetchSize: 5000(优化大表读取) - - 目标库 INSERT INTO ... VALUES (?, ?, ...) - - batchSize: 5000 提交一次 - - 特殊处理: Oracle→PG时,TIMESTAMP/DATE类型使用 setTimestamp 而非 setObject - -7.5 Oracle类型到PostgreSQL映射(mapOracleTypeToPg / convertOracleTypeToPg) - VARCHAR2 → VARCHAR(n), NVARCHAR2 → VARCHAR(n) - CHAR → CHAR(n), NCHAR → CHAR(n) - NUMBER(p,s) → NUMERIC(p,s), NUMBER → NUMERIC - FLOAT → DOUBLE PRECISION - DATE → TIMESTAMP, TIMESTAMP → TIMESTAMP - CLOB → TEXT, BLOB → BYTEA, RAW → BYTEA - LONG → TEXT, LONG RAW → BYTEA - 默认 → TEXT - -7.6 同步策略创建(createTargetTables) - - 先删除同taskId的旧策略 - - 对每个策略: - a. 获取源/目标数据源信息 - b. 检查目标表是否已存在 - c. 若不存在,生成建表DDL(支持Oracle和Oracle→PG两种模式) - d. 在目标库执行建表 - e. 保存策略记录 - -7.7 Quartz任务管理 - - 添加任务: saveAndScheduleJob → 创建QuartzJob,jobClassName固定为SyncDataJob - - 编辑任务: 编辑后重新调度 - - 暂停: schedulerDelete + 更新状态为STATUS_DISABLE - - 恢复: schedulerDelete + 重新schedulerAdd + 更新状态为STATUS_NORMAL - - 立即执行: SimpleTrigger,0.1秒后执行一次 - - 删除: schedulerDelete + removeById - - -八、重要VO类 --------------------------------------------------------------------------------- -- DateRangeVO: minDate, maxDate(日期范围) -- IdRangeVO: minId, maxId(ID范围) -- MindMapVO: databaseName, List(思维导图数据) - - UserInfoVO: userName, List - - TableInfoVO: tableName, columnName -- TableColumnVO: columnName, typeName, columnSize, decimalDigits, isNullable, columnDef -- PieChartVO: name(表名), value(同步数), percent(百分比) -- SyncRecordVO: id, taskName, sourceName, targetName, startTime, endTime -- TaskHisStatsVO: taskName, syncNum -- TaskStatsResultVO: dateTimes(List), taskStatsInfos(List) - - TaskStatsInfo: taskName, taskStats(List) -- TaskStatsVO(base-core): id, taskName, dateTime, syncNum - - -九、代码风格与约定 --------------------------------------------------------------------------------- -1. Controller层: - - 继承 JeecgController - - 使用 @Tag + @Operation 做Swagger文档 - - 使用 @AutoLog 记录操作日志 - - 返回统一 Result 对象 - - 分页查询使用 QueryGenerator.initQueryWrapper + MyBatis-Plus Page - -2. Service层: - - 接口继承 IService - - 实现类继承 ServiceImpl - - 使用 @RequiredArgsConstructor 或 @Autowired 注入 - - 事务注解 @Transactional(rollbackFor = Exception.class) - -3. 实体类: - - 使用 @TableName + @TableId(type = IdType.ASSIGN_ID) - - 日期字段使用 @JsonFormat + @DateTimeFormat - - 非数据库字段标记 @TableField(exist = false) - - 逻辑删除 @TableLogic - -4. 异常处理: - - 业务异常统一抛出 JeecgBootException - - 数据库异常 SQLException 传播或包装 - -5. 数据库区分: - - Oracle: owner/table名使用大写 + 双引号, 条件使用 TO_DATE/TO_CHAR - - PostgreSQL: schema/table名使用小写 + 双引号, 条件使用 TIMESTAMP 字面量 - - -十、已知注意事项与潜在问题 --------------------------------------------------------------------------------- -1. StasTaskConfigServiceImpl.syncDataByFieldType 仅支持Oracle(isDateColumn写死了Oracle SQL), - 而 SyncDataJob.syncDataByFieldType 支持Oracle+PG(通过dbType参数区分)。 - 两个类中存在大量重复的同步逻辑代码。 - -2. StasDataSourceServiceImpl.queryDatabaseMetadata 方法中,PostgreSQL数据源连接时 - 仍使用 ORACLE_DRIVER 而非 POSTGRES_DRIVER,可能存在Bug。 - -3. OracleSync 和 OracleToPgSync 是独立工具类(有main方法),不属于Spring容器管理, - 数据库连接信息硬编码,仅用于一次性迁移脚本。 - -4. SyncDataJob 通过 @Resource 注入Mapper(因Quartz Job不由Spring直接管理实例), - 需要Spring的Quartz集成支持依赖注入。 - -5. StasTaskConfigServiceImpl.getMindMap 使用 CompletableFuture.supplyAsync 但 - 使用默认线程池(ForkJoinPool),在高并发场景可能需要自定义线程池。 - -6. 同步策略的 createTargetTables 方法在事务中执行DDL操作(CREATE TABLE), - 某些数据库驱动可能不支持DDL回滚。 - -7. syncByDateRange 中删除目标库数据的逻辑(deleteByEquals)在边界条件处理上 - 可能存在数据一致性问题(先删后插期间如有并发访问可能看到空数据)。 - - -十一、模块间依赖关系 --------------------------------------------------------------------------------- -jeecg-module-sync 依赖: - └── jeecg-boot-base-core - ├── 实体类: StasDataSource, StasSyncStrategy, StasTaskConfig, StasSyncLog, - │ StasSyncNum, StasSyncRecord, QuartzJob - ├── Mapper: StasDataSourceMapper, StasSyncStrategyMapper, StasTaskConfigMapper, - │ StasSyncLogMapper, StasSyncNumMapper, StasSyncRecordMapper, QuartzJobMapper - ├── VO: TaskStatsVO - ├── 枚举: SourceDataTypeEnum, SyncTaskStatusEnum - ├── 常量: QuartzJobConstant, CommonConstant - ├── 工具: DBUtil, DateUtils - └── 基础: JeecgController, Result, QueryGenerator, JeecgBootException - -外部依赖(通过 jeecg-boot-base-core 间接引入): - - MyBatis-Plus (分页、条件构造、CRUD) - - Quartz Scheduler (定时任务) - - Swagger/OpenAPI (API文档) - - Lombok (代码简化) - - Apache Commons Lang (StringUtils) - - Oracle JDBC Driver / PostgreSQL JDBC Driver - - -十二、SQL数据库表结构参考 --------------------------------------------------------------------------------- -核心表(定义在 db/stas.sql 中): -- stas_data_source: 数据源配置表 -- stas_sync_strategy: 同步策略表 -- stas_task_config: 任务配置表 -- stas_sync_log: 同步日志表 -- stas_sync_num: 同步数量统计表 -- stas_sync_record: 同步记录表 - -Quartz表(定义在 db/stas.sql 或 Quartz默认脚本中): -- sys_quartz_job: 定时任务表 - -================================================================================ - 文件生成时间: 2026-05-09 - 适用于: AI辅助代码编辑上下文参考 -================================================================================ - diff --git a/jeecg-module-sync/src/main/java/org/jeecg/dataSource/controller/StasDataSourceController.java b/jeecg-module-sync/src/main/java/org/jeecg/dataSource/controller/StasDataSourceController.java index 5552f3c..8ab9e21 100644 --- a/jeecg-module-sync/src/main/java/org/jeecg/dataSource/controller/StasDataSourceController.java +++ b/jeecg-module-sync/src/main/java/org/jeecg/dataSource/controller/StasDataSourceController.java @@ -22,7 +22,6 @@ import org.jeecg.modules.base.entity.StasSyncStrategy; import org.jeecg.modules.base.entity.StasTaskConfig; import org.jeecg.modules.base.service.BaseCommonService; import org.jeecg.taskConfig.service.IStasTaskConfigService; -import org.jeecg.taskConfig.service.TimeoutProbeService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; import java.util.*; @@ -45,8 +44,6 @@ public class StasDataSourceController extends JeecgController> probeTimeout(@RequestParam(name="id", required=true) String id) { - try { - StasDataSource dataSource = stasDataSourceService.getById(id); - if (dataSource == null) { - return Result.error("未找到对应数据源"); - } - timeoutProbeService.probeAndDetectTimeout(dataSource); - // 重新查询获取更新后的值 - dataSource = stasDataSourceService.getById(id); - Map probeResult = new HashMap<>(); - probeResult.put("id", dataSource.getId()); - probeResult.put("instanceName", dataSource.getInstanceName()); - probeResult.put("detectedConnectTimeout", dataSource.getDetectedConnectTimeout()); - probeResult.put("detectedReadTimeout", dataSource.getDetectedReadTimeout()); - probeResult.put("lastProbeTime", dataSource.getLastProbeTime()); - return Result.OK("探测完成", probeResult); - } catch (Exception e) { - log.error("超时探测失败", e); - return Result.error("探测失败: " + e.getMessage()); - } - } - - /** - * 查询数据源当前生效的超时配置 - * @param id 数据源ID - * @return 超时配置信息 - */ - @AutoLog(value = "数据源-查询超时配置") - @Operation(summary = "数据源-查询超时配置") - @GetMapping(value = "/timeoutInfo") - public Result> timeoutInfo(@RequestParam(name="id", required=true) String id) { - try { - StasDataSource dataSource = stasDataSourceService.getById(id); - if (dataSource == null) { - return Result.error("未找到对应数据源"); - } - int[] effectiveTimeout = timeoutProbeService.getEffectiveTimeout(dataSource); - Map info = new HashMap<>(); - info.put("id", dataSource.getId()); - info.put("instanceName", dataSource.getInstanceName()); - info.put("type", dataSource.getType()); - info.put("detectedConnectTimeout", dataSource.getDetectedConnectTimeout()); - info.put("detectedReadTimeout", dataSource.getDetectedReadTimeout()); - info.put("lastProbeTime", dataSource.getLastProbeTime()); - info.put("effectiveConnectTimeoutMs", effectiveTimeout[0]); - info.put("effectiveReadTimeoutMs", effectiveTimeout[1]); - info.put("isProbed", dataSource.getDetectedConnectTimeout() != null); - return Result.OK(info); - } catch (Exception e) { - log.error("查询超时配置失败", e); - return Result.error("查询失败: " + e.getMessage()); - } - } } diff --git a/jeecg-module-sync/src/main/java/org/jeecg/dataSource/service/impl/StasDataSourceServiceImpl.java b/jeecg-module-sync/src/main/java/org/jeecg/dataSource/service/impl/StasDataSourceServiceImpl.java index 4f1f593..0fb6357 100644 --- a/jeecg-module-sync/src/main/java/org/jeecg/dataSource/service/impl/StasDataSourceServiceImpl.java +++ b/jeecg-module-sync/src/main/java/org/jeecg/dataSource/service/impl/StasDataSourceServiceImpl.java @@ -143,9 +143,7 @@ public class StasDataSourceServiceImpl extends ServiceImpl resultList = new ArrayList<>(); try (Connection conn = DBUtil.getConnection(urlSource, stasDataSource.getUsername(), - stasDataSource.getPassword(), - SourceDataTypeEnum.POSTGRES.getKey().equals(stasDataSource.getType()) ? DBUtil.POSTGRES_DRIVER : DBUtil.ORACLE_DRIVER, - SourceDataTypeEnum.POSTGRES.getKey().equals(stasDataSource.getType()) ? 0 : 1); + stasDataSource.getPassword(), DBUtil.ORACLE_DRIVER, 1); PreparedStatement st = conn.prepareStatement(sql)) { // 设置参数 diff --git a/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/job/SyncDataJob.java b/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/job/SyncDataJob.java index 0cdf2a9..fdfb9ae 100644 --- a/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/job/SyncDataJob.java +++ b/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/job/SyncDataJob.java @@ -9,8 +9,6 @@ import org.jeecg.common.exception.JeecgBootException; import org.jeecg.common.util.DBUtil; import org.jeecg.modules.base.entity.*; import org.jeecg.modules.base.mapper.*; -import org.jeecg.taskConfig.service.SyncAdaptiveStrategy; -import org.jeecg.taskConfig.service.TimeoutProbeService; import org.jeecg.vo.DateRangeVO; import org.jeecg.vo.IdRangeVO; import org.quartz.Job; @@ -25,7 +23,6 @@ import java.util.List; /** * 数据同步任务(支持Oracle和PostgreSQL) - * 增强功能:超时自动探测、批次自适应调整、重试机制 */ @Slf4j public class SyncDataJob implements Job { @@ -42,10 +39,6 @@ public class SyncDataJob implements Job { private StasSyncLogMapper stasSyncLogMapper; @Resource private StasSyncNumMapper stasSyncNumMapper; - @Resource - private SyncAdaptiveStrategy syncAdaptiveStrategy; - @Resource - private TimeoutProbeService timeoutProbeService; private String parameter; @@ -63,7 +56,7 @@ public class SyncDataJob implements Job { } /** - * 根据字段类型同步数据(增强版:自动探测超时 + 自适应批次 + 重试) + * 根据字段类型同步数据 */ public void syncDataByFieldType(String taskId) throws SQLException { StasTaskConfig stasTaskConfig = stasTaskConfigMapper.selectById(taskId); @@ -78,41 +71,16 @@ public class SyncDataJob implements Job { stasSyncRecordMapper.insert(stasSyncRecord); String recordId = stasSyncRecord.getId(); - // 构建源/目标URL和Driver - String sourceUrl; - String sourceDriver; - int sourceType; - if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceInfo.getType())) { - sourceUrl = DBUtil.getUrl(sourceInfo.getIpAddress(), sourceInfo.getPort(), sourceInfo.getServeId()); - sourceDriver = DBUtil.ORACLE_DRIVER; - sourceType = 1; - } else { - sourceUrl = DBUtil.getPgUrlWithKeepAlive(sourceInfo.getIpAddress(), sourceInfo.getPort(), sourceInfo.getServeId()); - sourceDriver = DBUtil.POSTGRES_DRIVER; - sourceType = 0; - } - + String sourceUrl = DBUtil.getUrl(sourceInfo.getIpAddress(), sourceInfo.getPort(), sourceInfo.getServeId()); String targetUrl; - String targetDriver; - int targetType; - if (SourceDataTypeEnum.ORACLE.getKey().equals(targetInfo.getType())) { + if(SourceDataTypeEnum.ORACLE.getKey().equals(targetInfo.getType())){ targetUrl = DBUtil.getUrl(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId()); - targetDriver = DBUtil.ORACLE_DRIVER; - targetType = 1; - } else { - targetUrl = DBUtil.getPgUrlWithKeepAlive(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId()); - targetDriver = DBUtil.POSTGRES_DRIVER; - targetType = 0; + }else{ + targetUrl = DBUtil.getPgUrl(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId()); } - // 获取自动探测的超时值 - int[] sourceTimeouts = timeoutProbeService.getEffectiveTimeout(sourceInfo); - int[] targetTimeouts = timeoutProbeService.getEffectiveTimeout(targetInfo); - - try (Connection sourceConn = DBUtil.getConnection(sourceUrl, sourceInfo.getUsername(), sourceInfo.getPassword(), - sourceDriver, sourceType, sourceTimeouts[0], sourceTimeouts[1]); - Connection targetConn = DBUtil.getConnection(targetUrl, targetInfo.getUsername(), targetInfo.getPassword(), - targetDriver, targetType, targetTimeouts[0], targetTimeouts[1])) { + try (Connection sourceConn = DriverManager.getConnection(sourceUrl, sourceInfo.getUsername(), sourceInfo.getPassword()); + Connection targetConn = DriverManager.getConnection(targetUrl, targetInfo.getUsername(), targetInfo.getPassword())) { // 设置目标连接为批量提交模式 targetConn.setAutoCommit(false); @@ -121,81 +89,25 @@ public class SyncDataJob implements Job { .selectList(new LambdaQueryWrapper().eq(StasSyncStrategy::getTaskId, taskId)); for (StasSyncStrategy stasSyncStrategy : stasSyncStrategies) { - saveSyncLog(recordId, String.format("开始同步表: %s (依据字段: %s)", stasSyncStrategy.getTableName(), stasSyncStrategy.getColumnName()), null, null, null); + saveSyncLog(recordId,String.format("开始同步表: %s (依据字段: %s)", stasSyncStrategy.getTableName(), stasSyncStrategy.getColumnName())); long startTime = System.currentTimeMillis(); - boolean isDate = isDateColumn(sourceConn, stasSyncStrategy, sourceInfo.getType()); - // 获取自适应批次大小 - int effectiveSyncDay = syncAdaptiveStrategy.getEffectiveSyncDay(stasSyncStrategy, stasTaskConfig.getSyncDay()); - int effectiveSyncCount = syncAdaptiveStrategy.getEffectiveSyncCount(stasSyncStrategy, stasTaskConfig.getSyncCount()); - - boolean syncSuccess = false; - int retryIndex = 0; - - while (!syncSuccess) { - try { - if (isDate) { - syncByDateRange(sourceConn, targetConn, stasSyncStrategy, - effectiveSyncDay, sourceInfo.getType(), targetInfo.getType(), recordId, retryIndex); - } else { - syncByIdRange(sourceConn, targetConn, stasSyncStrategy, - effectiveSyncCount, sourceInfo.getType(), targetInfo.getType(), recordId, retryIndex); - } - syncSuccess = true; - // 同步成功,逐步恢复批次 - syncAdaptiveStrategy.onSyncSuccess(stasSyncStrategy, - stasTaskConfig.getSyncDay(), stasTaskConfig.getSyncCount()); - } catch (SQLException | ParseException e) { - SyncAdaptiveStrategy.ErrorType errorType = (e instanceof SQLException) - ? syncAdaptiveStrategy.analyzeSQLException((SQLException) e) - : SyncAdaptiveStrategy.ErrorType.OTHER; - - retryIndex++; - saveSyncLog(recordId, String.format("同步表 %s 时出错(第%d次): %s [错误类型: %s]", - stasSyncStrategy.getTableName(), retryIndex, e.getMessage(), errorType.name()), - errorType.name(), isDate ? effectiveSyncDay : effectiveSyncCount, retryIndex); - - // 检查超时配置是否需要调整 - syncAdaptiveStrategy.checkAndAdjustTimeout(sourceInfo, errorType); - syncAdaptiveStrategy.checkAndAdjustTimeout(targetInfo, errorType); - - if ((errorType == SyncAdaptiveStrategy.ErrorType.TIMEOUT - || errorType == SyncAdaptiveStrategy.ErrorType.TRUNCATION) - && syncAdaptiveStrategy.shouldRetry(stasSyncStrategy)) { - // 缩减批次大小并重试 - int[] adjusted = syncAdaptiveStrategy.adjustBatchSize(stasSyncStrategy, isDate, - stasTaskConfig.getSyncDay(), stasTaskConfig.getSyncCount()); - effectiveSyncDay = adjusted[0]; - effectiveSyncCount = adjusted[1]; - - saveSyncLog(recordId, String.format("表 %s 批次已调整,准备第%d次重试: syncDay=%d, syncCount=%d", - stasSyncStrategy.getTableName(), retryIndex + 1, effectiveSyncDay, effectiveSyncCount), - errorType.name(), isDate ? effectiveSyncDay : effectiveSyncCount, retryIndex); - - // 尝试回滚并重建连接 - try { targetConn.rollback(); } catch (SQLException ignored) {} - - // 如果连接已关闭,需要重建 - if (sourceConn.isClosed() || targetConn.isClosed()) { - saveSyncLog(recordId, String.format("表 %s 连接已断开,无法重试,跳过该表", stasSyncStrategy.getTableName()), - errorType.name(), null, retryIndex); - break; - } - continue; - } else { - // 非超时/截断错误,或已达到重试上限 - try { targetConn.rollback(); } catch (SQLException ignored) {} - saveSyncLog(recordId, String.format("表 %s 同步失败,跳过该表继续同步其他表: %s", - stasSyncStrategy.getTableName(), e.getMessage()), - errorType.name(), null, retryIndex); - break; // 跳过当前表,继续下一个表 - } + try { + if (isDateColumn(sourceConn, stasSyncStrategy, sourceInfo.getType())) { + syncByDateRange(sourceConn, targetConn, stasSyncStrategy, + stasTaskConfig.getSyncDay(), sourceInfo.getType(), targetInfo.getType(), recordId); + } else { + syncByIdRange(sourceConn, targetConn, stasSyncStrategy, + stasTaskConfig.getSyncCount(), sourceInfo.getType(), targetInfo.getType(), recordId); } + } catch (SQLException | ParseException e) { + saveSyncLog(recordId,String.format("同步表 %s 时出错: %s", stasSyncStrategy.getTableName(), e.getMessage())); + targetConn.rollback(); + throw new JeecgBootException(e.getMessage()); } long endTime = System.currentTimeMillis(); - saveSyncLog(recordId, String.format("表 %s 同步耗时: %s 毫秒%s", stasSyncStrategy.getTableName(), - (endTime - startTime), syncSuccess ? "" : " (失败)"), null, null, null); + saveSyncLog(recordId,String.format("表 %s 同步耗时: %s 毫秒", stasSyncStrategy.getTableName(), (endTime - startTime))); } } stasSyncRecord.setEndTime(new Date()); @@ -224,6 +136,7 @@ public class SyncDataJob implements Job { ResultSet rs = pstmt.executeQuery(); if (rs.next()) { String dataType = rs.getString("data_type").toUpperCase(); + // 判断是否为日期/时间类型 return dataType.contains("DATE") || dataType.contains("TIME"); } } @@ -234,29 +147,32 @@ public class SyncDataJob implements Job { * 按日期范围同步数据 */ public void syncByDateRange(Connection sourceConn, Connection targetConn, - StasSyncStrategy stasSyncStrategy, Integer syncDay, - Integer sourceDbType, Integer targetDbType, String recordId, int retryIndex) throws SQLException, ParseException { + StasSyncStrategy stasSyncStrategy, Integer syncCount, + Integer sourceDbType, Integer targetDbType, String recordId) throws SQLException, ParseException { + // 获取最小和最大日期 DateRangeVO dateRange = getDateRange(sourceConn, stasSyncStrategy, sourceDbType); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + // 获取上次同步的位置 String syncOrigin = stasSyncStrategy.getSyncOrigin(); Date lastSyncedDate = StringUtils.isNotBlank(syncOrigin) ? sdf.parse(syncOrigin) : dateRange.getMinDate(); Date currentStart = (lastSyncedDate != null && lastSyncedDate.after(dateRange.getMinDate())) ? lastSyncedDate : dateRange.getMinDate(); - Date currentEnd = addDays(currentStart, syncDay); + Date currentEnd = addDays(currentStart, syncCount); if (currentEnd.after(dateRange.getMaxDate())) { currentEnd = dateRange.getMaxDate(); - StringBuilder deleteClause = new StringBuilder(); - deleteClause.append("TO_CHAR(") + StringBuilder whereClause = new StringBuilder(); + whereClause.append("TO_CHAR(") .append(stasSyncStrategy.getColumnName()) .append(", 'YYYY-MM-DD HH24:MI:SS') = '") .append(sdf.format(currentEnd)) .append("'"); - deleteByEquals(targetConn, stasSyncStrategy, sourceDbType, deleteClause.toString()); + deleteByEquals(targetConn, stasSyncStrategy, sourceDbType, whereClause.toString()); } + // 根据数据库类型构建不同的日期条件 String whereClause; if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceDbType)) { whereClause = stasSyncStrategy.getColumnName() + " BETWEEN TO_DATE('" + sdf.format(currentStart) + @@ -266,24 +182,20 @@ public class SyncDataJob implements Job { "' AND TIMESTAMP '" + sdf.format(currentEnd) + "'"; } - int batchSizeBefore = syncDay; - saveSyncLog(recordId, String.format("%s表同步日期范围: %s 至 %s (批次天数: %d)", - stasSyncStrategy.getTableName(), sdf.format(currentStart), sdf.format(currentEnd), syncDay), - null, batchSizeBefore, retryIndex > 0 ? retryIndex : null); + saveSyncLog(recordId,String.format("%s表同步日期范围: %s 至 %s", stasSyncStrategy.getTableName(), sdf.format(currentStart), sdf.format(currentEnd))); int rowsSynced = syncDataBatch(sourceConn, targetConn, stasSyncStrategy.getSourceOwner(), stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(), whereClause, sourceDbType, targetDbType); - saveSyncLog(recordId, String.format("%s表已同步 %s 行数据", stasSyncStrategy.getTableName(), rowsSynced), - "SUCCESS", batchSizeBefore, retryIndex > 0 ? retryIndex : null); + saveSyncLog(recordId,String.format("%s表已同步 %s 行数据", stasSyncStrategy.getTableName(), rowsSynced)); saveSyncNum(recordId, stasSyncStrategy.getTableName(), rowsSynced); + // 更新同步位置 if (currentEnd != null) { stasSyncStrategy.setSyncOrigin(sdf.format(currentEnd)); stasSyncStrategyMapper.updateById(stasSyncStrategy); - saveSyncLog(recordId, String.format("%s表最终同步位置已更新为: %s", stasSyncStrategy.getTableName(), sdf.format(currentEnd)), - null, null, null); + saveSyncLog(recordId, String.format("%s表最终同步位置已更新为: %s", stasSyncStrategy.getTableName(), sdf.format(currentEnd))); } } @@ -292,9 +204,11 @@ public class SyncDataJob implements Job { */ public void syncByIdRange(Connection sourceConn, Connection targetConn, StasSyncStrategy stasSyncStrategy, Integer syncCount, - Integer sourceDbType, Integer targetDbType, String recordId, int retryIndex) throws SQLException { + Integer sourceDbType, Integer targetDbType, String recordId) throws SQLException { + // 获取最小和最大ID IdRangeVO idRange = getIdRange(sourceConn, stasSyncStrategy, sourceDbType); + // 获取上次同步的位置 String syncOrigin = stasSyncStrategy.getSyncOrigin(); long lastSyncedId = StringUtils.isNotBlank(syncOrigin) ? Long.parseLong(syncOrigin) : idRange.getMinId(); @@ -304,48 +218,45 @@ public class SyncDataJob implements Job { long currentEnd = currentStart + syncCount - 1; if (currentEnd > idRange.getMaxId()) { currentEnd = idRange.getMaxId(); - StringBuilder deleteClause = new StringBuilder(); - deleteClause.append(stasSyncStrategy.getColumnName()) + StringBuilder whereClause = new StringBuilder(); + whereClause.append(stasSyncStrategy.getColumnName()) .append(" = '") .append(currentEnd) .append("'"); - deleteByEquals(targetConn, stasSyncStrategy, sourceDbType, deleteClause.toString()); + deleteByEquals(targetConn, stasSyncStrategy, sourceDbType, whereClause.toString()); } + // 确保起始位置不超过结束位置 if (currentStart > currentEnd) { - saveSyncLog(recordId, String.format("%s表已同步到最新,无需继续同步", stasSyncStrategy.getTableName()), - null, null, null); + saveSyncLog(recordId, String.format("%s表已同步到最新,无需继续同步", stasSyncStrategy.getTableName())); return; } String whereClause = stasSyncStrategy.getColumnName() + " BETWEEN " + currentStart + " AND " + currentEnd; - int batchSizeBefore = syncCount; - saveSyncLog(recordId, String.format("%s表同步ID范围: %s 至 %s (批次数量: %d)", - stasSyncStrategy.getTableName(), currentStart, currentEnd, syncCount), - null, batchSizeBefore, retryIndex > 0 ? retryIndex : null); + saveSyncLog(recordId, String.format("%s表同步ID范围: %s 至 %s", stasSyncStrategy.getTableName(), currentStart, currentEnd)); int rowsSynced = syncDataBatch(sourceConn, targetConn, stasSyncStrategy.getSourceOwner(), stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(), whereClause, sourceDbType, targetDbType); - saveSyncLog(recordId, String.format("%s表已同步 %s 行数据", stasSyncStrategy.getTableName(), rowsSynced), - "SUCCESS", batchSizeBefore, retryIndex > 0 ? retryIndex : null); + saveSyncLog(recordId, String.format("%s表已同步 %s 行数据", stasSyncStrategy.getTableName(), rowsSynced)); saveSyncNum(recordId, stasSyncStrategy.getTableName(), rowsSynced); + // 更新同步位置 if (currentEnd > 0) { stasSyncStrategy.setSyncOrigin(String.valueOf(currentEnd)); stasSyncStrategyMapper.updateById(stasSyncStrategy); - saveSyncLog(recordId, String.format("%s表最终同步位置已更新为: %s", stasSyncStrategy.getTableName(), currentEnd), - null, null, null); + saveSyncLog(recordId, String.format("%s表最终同步位置已更新为: %s", stasSyncStrategy.getTableName(), currentEnd)); } } /** - * 根据日期等于或ID等于删除数据 + * 根据日期等于或ID等于删除数据(使用yyyy-MM-dd格式比较) */ public int deleteByEquals(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType, String whereClause) throws SQLException { String sql; + // 构建完整的SQL语句 if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) { sql = "DELETE FROM \"" + stasSyncStrategy.getTargetOwner().toUpperCase() + "\".\"" + stasSyncStrategy.getTableName() + "\" WHERE " + whereClause; @@ -409,7 +320,6 @@ public class SyncDataJob implements Job { /** * 同步一批数据(支持跨数据库类型) - * 增强版:细粒度异常捕获,截断时尝试提交已成功部分 */ public int syncDataBatch(Connection sourceConn, Connection targetConn, String sourceOwner, String targetOwner, @@ -418,6 +328,7 @@ public class SyncDataJob implements Job { int totalRows = 0; String selectSql; + // 构建查询SQL(根据源数据库类型) if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceDbType)) { selectSql = "SELECT * FROM \"" + sourceOwner.toUpperCase() + "\".\"" + tableName + "\" WHERE " + whereClause; @@ -430,10 +341,11 @@ public class SyncDataJob implements Job { ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); ResultSet rs = sourceStmt.executeQuery(selectSql)) { - sourceStmt.setFetchSize(5000); + sourceStmt.setFetchSize(5000); // 优化大表读取 ResultSetMetaData metaData = rs.getMetaData(); int columnCount = metaData.getColumnCount(); + // 构建插入SQL(根据目标数据库类型) String insertSql; if (SourceDataTypeEnum.ORACLE.getKey().equals(targetDbType)) { insertSql = "INSERT INTO \"" + targetOwner.toUpperCase() + "\".\"" + tableName + "\" VALUES ("; @@ -447,50 +359,35 @@ public class SyncDataJob implements Job { } insertSql += ")"; + // 批量插入 try (PreparedStatement pstmt = targetConn.prepareStatement(insertSql)) { int batchSize = 0; while (rs.next()) { - try { - for (int i = 1; i <= columnCount; i++) { - Object value = rs.getObject(i); + for (int i = 1; i <= columnCount; i++) { + Object value = rs.getObject(i); - if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceDbType) && SourceDataTypeEnum.POSTGRES.getKey().equals(targetDbType)) { - String columnType = metaData.getColumnTypeName(i); - if ("TIMESTAMP".equalsIgnoreCase(columnType) || "DATE".equalsIgnoreCase(columnType)) { - Timestamp ts = rs.getTimestamp(i); - if (ts != null) { - pstmt.setTimestamp(i, ts); - continue; - } - } - } - pstmt.setObject(i, value); - } - pstmt.addBatch(); - batchSize++; - totalRows++; - - if (batchSize % 5000 == 0) { - pstmt.executeBatch(); - targetConn.commit(); - batchSize = 0; - } - } catch (SQLException e) { - // 单行处理异常,尝试提交已成功部分 - SyncAdaptiveStrategy.ErrorType errorType = syncAdaptiveStrategy.analyzeSQLException(e); - if (errorType == SyncAdaptiveStrategy.ErrorType.TRUNCATION || errorType == SyncAdaptiveStrategy.ErrorType.TIMEOUT) { - // 截断/超时错误:先提交已成功的数据 - if (batchSize > 0) { - try { - pstmt.executeBatch(); - targetConn.commit(); - } catch (SQLException commitEx) { - log.warn("提交已成功部分失败: {}", commitEx.getMessage()); + // 特殊处理Oracle的TIMESTAMP类型到PostgreSQL + if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceDbType) && SourceDataTypeEnum.POSTGRES.getKey().equals(targetDbType)) { + String columnType = metaData.getColumnTypeName(i); + if ("TIMESTAMP".equalsIgnoreCase(columnType) || "DATE".equalsIgnoreCase(columnType)) { + Timestamp ts = rs.getTimestamp(i); + if (ts != null) { + pstmt.setTimestamp(i, ts); + continue; } } } - throw e; // 向上抛出,由外层重试逻辑处理 + pstmt.setObject(i, value); + } + pstmt.addBatch(); + batchSize++; + totalRows++; + + if (batchSize % 5000 == 0) { + pstmt.executeBatch(); + targetConn.commit(); + batchSize = 0; } } if (batchSize > 0) { @@ -503,14 +400,11 @@ public class SyncDataJob implements Job { } - private void saveSyncLog(String recordId, String desc, String errorType, Integer batchSize, Integer retryIndex){ + private void saveSyncLog(String recordId, String desc){ StasSyncLog stasSyncLog = new StasSyncLog(); stasSyncLog.setRecordId(recordId); stasSyncLog.setStartTime(new Date()); stasSyncLog.setDescription(desc); - stasSyncLog.setErrorType(errorType); - stasSyncLog.setBatchSizeBefore(batchSize); - stasSyncLog.setRetryIndex(retryIndex); stasSyncLogMapper.insert(stasSyncLog); } @@ -529,4 +423,4 @@ public class SyncDataJob implements Job { long time = date.getTime() + (long)days * 24 * 60 * 60 * 1000; return new Date(time); } -} +} \ No newline at end of file diff --git a/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/service/SyncAdaptiveStrategy.java b/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/service/SyncAdaptiveStrategy.java deleted file mode 100644 index 912f44e..0000000 --- a/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/service/SyncAdaptiveStrategy.java +++ /dev/null @@ -1,261 +0,0 @@ -package org.jeecg.taskConfig.service; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.jeecg.modules.base.entity.StasDataSource; -import org.jeecg.modules.base.entity.StasSyncStrategy; -import org.jeecg.modules.base.mapper.StasSyncStrategyMapper; -import org.springframework.stereotype.Component; - -import java.sql.SQLException; -import java.util.Arrays; -import java.util.List; - -/** - * 同步自适应策略组件 - * 负责异常分析、批次调整、重试决策 - */ -@Slf4j -@Component -@RequiredArgsConstructor -public class SyncAdaptiveStrategy { - - private final StasSyncStrategyMapper stasSyncStrategyMapper; - private final TimeoutProbeService timeoutProbeService; - - /** 最大重试次数 */ - private static final int MAX_RETRY_COUNT = 3; - /** 批次缩减比例 */ - private static final double BATCH_REDUCTION_RATIO = 0.5; - /** 成功后批次恢复比例 */ - private static final double BATCH_RECOVERY_RATIO = 1.2; - /** 默认最小批次天数 */ - private static final int DEFAULT_MIN_SYNC_DAY = 1; - /** 默认最小批次数量 */ - private static final int DEFAULT_MIN_SYNC_COUNT = 100; - - // Oracle 超时/连接中断关键字 - private static final List ORACLE_TIMEOUT_KEYWORDS = Arrays.asList( - "ORA-02396", "ORA-03113", "ORA-03114", "ORA-04030", - "Closed Connection", "Connection timed out", "Read timed out", - "ORA-12571", "ORA-03135", "ORA-02396", "ORA-00054", - "No more data to read from socket", "Connection reset" - ); - - // PostgreSQL 超时/连接中断关键字 - private static final List PG_TIMEOUT_KEYWORDS = Arrays.asList( - "FATAL: terminating connection", "connection timed out", - "socket closed", "I/O error", "Connection refused", - "An I/O error occurred", "Software caused connection abort" - ); - - // 截断/语句超时关键字 - private static final List TRUNCATION_KEYWORDS = Arrays.asList( - "ORA-01013", "statement timeout", "canceling statement", - "query_canceled", "ORA-00039", "timeout", - "SQLTimeoutException", "The query was canceled" - ); - - /** - * 错误类型枚举 - */ - public enum ErrorType { - TIMEOUT, TRUNCATION, OTHER - } - - /** - * 分析SQLException类型,识别超时/截断错误 - */ - public ErrorType analyzeSQLException(SQLException e) { - if (e == null) { - return ErrorType.OTHER; - } - String message = e.getMessage(); - if (message == null) { - // 检查cause - if (e.getCause() != null && e.getCause().getMessage() != null) { - message = e.getCause().getMessage(); - } else { - return ErrorType.OTHER; - } - } - String upperMessage = message.toUpperCase(); - - // 优先检查截断(更具体) - for (String keyword : TRUNCATION_KEYWORDS) { - if (upperMessage.contains(keyword.toUpperCase())) { - return ErrorType.TRUNCATION; - } - } - - // 检查Oracle超时 - for (String keyword : ORACLE_TIMEOUT_KEYWORDS) { - if (upperMessage.contains(keyword.toUpperCase())) { - return ErrorType.TIMEOUT; - } - } - - // 检查PG超时 - for (String keyword : PG_TIMEOUT_KEYWORDS) { - if (upperMessage.contains(keyword.toUpperCase())) { - return ErrorType.TIMEOUT; - } - } - - // 检查SQLState - if (e.getSQLState() != null) { - // 08xxx = 连接异常, 57014 = 语句超时 - String state = e.getSQLState(); - if (state.startsWith("08")) { - return ErrorType.TIMEOUT; - } - if ("57014".equals(state)) { - return ErrorType.TRUNCATION; - } - } - - // 检查是否为SQLTimeoutException - if (e instanceof SQLTimeoutException) { - return ErrorType.TIMEOUT; - } - - // 检查BatchUpdateException中的cause - if (e.getCause() instanceof SQLException) { - ErrorType causeType = analyzeSQLException((SQLException) e.getCause()); - if (causeType != ErrorType.OTHER) { - return causeType; - } - } - - return ErrorType.OTHER; - } - - /** - * 根据错误类型自动缩减批次大小 - * @param strategy 同步策略 - * @param isDateColumn 是否按日期同步 - * @param taskConfigSyncDay taskConfig原始批次天数 - * @param taskConfigSyncCount taskConfig原始批次数量 - * @return 调整后的[batchDay, batchCount] - */ - public int[] adjustBatchSize(StasSyncStrategy strategy, boolean isDateColumn, - int taskConfigSyncDay, int taskConfigSyncCount) { - int minSyncDay = strategy.getMinSyncDay() != null ? strategy.getMinSyncDay() : DEFAULT_MIN_SYNC_DAY; - int minSyncCount = strategy.getMinSyncCount() != null ? strategy.getMinSyncCount() : DEFAULT_MIN_SYNC_COUNT; - - int currentSyncDay = strategy.getCurrentSyncDay() != null ? strategy.getCurrentSyncDay() : taskConfigSyncDay; - int currentSyncCount = strategy.getCurrentSyncCount() != null ? strategy.getCurrentSyncCount() : taskConfigSyncCount; - - int newSyncDay = Math.max((int)(currentSyncDay * BATCH_REDUCTION_RATIO), minSyncDay); - int newSyncCount = Math.max((int)(currentSyncCount * BATCH_REDUCTION_RATIO), minSyncCount); - - strategy.setCurrentSyncDay(newSyncDay); - strategy.setCurrentSyncCount(newSyncCount); - strategy.setRetryCount(strategy.getRetryCount() != null ? strategy.getRetryCount() + 1 : 1); - strategy.setLastErrorType(ErrorType.TIMEOUT.name()); - - stasSyncStrategyMapper.updateById(strategy); - - log.info("策略[{}.{}]批次调整: syncDay {} -> {}, syncCount {} -> {}", - strategy.getSourceOwner(), strategy.getTableName(), - currentSyncDay, newSyncDay, currentSyncCount, newSyncCount); - - return new int[]{newSyncDay, newSyncCount}; - } - - /** - * 同步成功后,逐步恢复批次大小 - * @param strategy 同步策略 - * @param taskConfigSyncDay taskConfig原始批次天数 - * @param taskConfigSyncCount taskConfig原始批次数量 - */ - public void onSyncSuccess(StasSyncStrategy strategy, int taskConfigSyncDay, int taskConfigSyncCount) { - boolean needsUpdate = false; - - // 恢复批次天数 - if (strategy.getCurrentSyncDay() != null && strategy.getCurrentSyncDay() < taskConfigSyncDay) { - int newDay = Math.min((int)(strategy.getCurrentSyncDay() * BATCH_RECOVERY_RATIO), taskConfigSyncDay); - strategy.setCurrentSyncDay(newDay); - needsUpdate = true; - } - - // 恢复批次数量 - if (strategy.getCurrentSyncCount() != null && strategy.getCurrentSyncCount() < taskConfigSyncCount) { - int newCount = Math.min((int)(strategy.getCurrentSyncCount() * BATCH_RECOVERY_RATIO), taskConfigSyncCount); - strategy.setCurrentSyncCount(newCount); - needsUpdate = true; - } - - // 清零重试计数和错误类型 - if (strategy.getRetryCount() != null && strategy.getRetryCount() > 0) { - strategy.setRetryCount(0); - needsUpdate = true; - } - if (strategy.getLastErrorType() != null) { - strategy.setLastErrorType(null); - needsUpdate = true; - } - - if (needsUpdate) { - stasSyncStrategyMapper.updateById(strategy); - log.info("策略[{}.{}]同步成功,批次恢复: syncDay={}, syncCount={}", - strategy.getSourceOwner(), strategy.getTableName(), - strategy.getCurrentSyncDay(), strategy.getCurrentSyncCount()); - } - - // 若已恢复到taskConfig原始值,清空自适应字段 - if (Integer.valueOf(taskConfigSyncDay).equals(strategy.getCurrentSyncDay()) - && Integer.valueOf(taskConfigSyncCount).equals(strategy.getCurrentSyncCount())) { - strategy.setCurrentSyncDay(null); - strategy.setCurrentSyncCount(null); - stasSyncStrategyMapper.updateById(strategy); - } - } - - /** - * 判断是否应该重试 - * @param strategy 同步策略 - * @return 是否应该重试 - */ - public boolean shouldRetry(StasSyncStrategy strategy) { - int retryCount = strategy.getRetryCount() != null ? strategy.getRetryCount() : 0; - if (retryCount >= MAX_RETRY_COUNT) { - return false; - } - - // 检查批次是否还能缩减 - int minSyncDay = strategy.getMinSyncDay() != null ? strategy.getMinSyncDay() : DEFAULT_MIN_SYNC_DAY; - int minSyncCount = strategy.getMinSyncCount() != null ? strategy.getMinSyncCount() : DEFAULT_MIN_SYNC_COUNT; - int currentSyncDay = strategy.getCurrentSyncDay() != null ? strategy.getCurrentSyncDay() : Integer.MAX_VALUE; - int currentSyncCount = strategy.getCurrentSyncCount() != null ? strategy.getCurrentSyncCount() : Integer.MAX_VALUE; - - return currentSyncDay > minSyncDay || currentSyncCount > minSyncCount; - } - - /** - * 获取当前生效的批次天数 - */ - public int getEffectiveSyncDay(StasSyncStrategy strategy, int taskConfigSyncDay) { - return strategy.getCurrentSyncDay() != null ? strategy.getCurrentSyncDay() : taskConfigSyncDay; - } - - /** - * 获取当前生效的批次数量 - */ - public int getEffectiveSyncCount(StasSyncStrategy strategy, int taskConfigSyncCount) { - return strategy.getCurrentSyncCount() != null ? strategy.getCurrentSyncCount() : taskConfigSyncCount; - } - - /** - * 同步失败后检查是否需要调整超时配置 - */ - public void checkAndAdjustTimeout(StasDataSource dataSource, ErrorType errorType) { - if (errorType == ErrorType.TIMEOUT || errorType == ErrorType.TRUNCATION) { - try { - timeoutProbeService.adjustTimeoutFromLogs(dataSource); - } catch (Exception e) { - log.warn("调整数据源[{}]超时配置失败: {}", dataSource.getInstanceName(), e.getMessage()); - } - } - } -} diff --git a/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/service/TimeoutProbeService.java b/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/service/TimeoutProbeService.java deleted file mode 100644 index d1ed87b..0000000 --- a/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/service/TimeoutProbeService.java +++ /dev/null @@ -1,333 +0,0 @@ -package org.jeecg.taskConfig.service; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.jeecg.common.constant.enums.SourceDataTypeEnum; -import org.jeecg.common.util.DBUtil; -import org.jeecg.modules.base.entity.StasDataSource; -import org.jeecg.modules.base.mapper.StasDataSourceMapper; -import org.jeecg.modules.base.mapper.StasSyncLogMapper; -import org.springframework.stereotype.Service; - -import java.sql.*; -import java.util.Date; -import java.util.Properties; - -/** - * 超时自动探测服务 - * 采用探针探测 + 历史日志分析结合方案 - */ -@Slf4j -@Service -@RequiredArgsConstructor -public class TimeoutProbeService { - - private final StasDataSourceMapper stasDataSourceMapper; - private final StasSyncLogMapper stasSyncLogMapper; - - /** 探针重复次数 */ - private static final int PROBE_REPEAT_COUNT = 3; - /** 最小连接超时(秒) */ - private static final int MIN_CONNECT_TIMEOUT_SEC = 30; - /** 最小读取超时(秒) */ - private static final int MIN_READ_TIMEOUT_SEC = 120; - /** 连接超时倍率(相对平均延迟) */ - private static final int CONNECT_TIMEOUT_MULTIPLIER = 5; - /** 读取超时倍率(相对平均延迟) */ - private static final int READ_TIMEOUT_MULTIPLIER = 20; - /** 探测结果过期天数 */ - private static final int PROBE_EXPIRE_DAYS = 7; - /** 日志分析窗口天数 */ - private static final int LOG_ANALYSIS_WINDOW_DAYS = 30; - /** 超时错误率阈值 */ - private static final double TIMEOUT_ERROR_RATE_THRESHOLD = 0.2; - /** 连续无超时日志条数阈值(满足此条件则温和下调) */ - private static final int CONSECUTIVE_NON_TIMEOUT_THRESHOLD = 5; - - /** - * 获取生效超时值(供DBUtil.getConnection使用) - * 若未探测过或已过期,自动触发探测 - * @return [connectTimeoutMs, readTimeoutMs] - */ - public int[] getEffectiveTimeout(StasDataSource dataSource) { - // 检查是否需要重新探测 - if (dataSource.getDetectedConnectTimeout() == null || isProbeExpired(dataSource)) { - probeAndDetectTimeout(dataSource); - } - // 基于日志动态微调 - adjustTimeoutFromLogs(dataSource); - - int connectTimeoutMs = dataSource.getDetectedConnectTimeout() != null - ? dataSource.getDetectedConnectTimeout() * 1000 - : DBUtil.DEFAULT_ORACLE_CONNECT_TIMEOUT; - int readTimeoutMs = dataSource.getDetectedReadTimeout() != null - ? dataSource.getDetectedReadTimeout() * 1000 - : DBUtil.DEFAULT_ORACLE_READ_TIMEOUT; - - return new int[]{connectTimeoutMs, readTimeoutMs}; - } - - /** - * 首次连接自动探测 - */ - public void probeAndDetectTimeout(StasDataSource dataSource) { - boolean isOracle = SourceDataTypeEnum.ORACLE.getKey().equals(dataSource.getType()); - boolean isPg = SourceDataTypeEnum.POSTGRES.getKey().equals(dataSource.getType()); - - String url; - String driver; - String probeSql; - - if (isOracle) { - url = DBUtil.getUrl(dataSource.getIpAddress(), dataSource.getPort(), dataSource.getServeId()); - driver = DBUtil.ORACLE_DRIVER; - probeSql = "SELECT 1 FROM DUAL"; - } else if (isPg) { - url = DBUtil.getPgUrlWithKeepAlive(dataSource.getIpAddress(), dataSource.getPort(), dataSource.getServeId()); - driver = DBUtil.POSTGRES_DRIVER; - probeSql = "SELECT 1"; - } else { - log.warn("不支持的数据源类型: {}, 跳过超时探测", dataSource.getType()); - return; - } - - long totalLatency = 0; - int successCount = 0; - - for (int i = 0; i < PROBE_REPEAT_COUNT; i++) { - try { - long start = System.currentTimeMillis(); - Class.forName(driver); - Properties props = new Properties(); - props.put("user", dataSource.getUsername()); - props.put("password", dataSource.getPassword()); - // 探针自身使用短超时 - if (isOracle) { - props.put("oracle.net.CONNECT_TIMEOUT", "10000"); - props.put("oracle.jdbc.ReadTimeout", "30000"); - } else { - props.put("loginTimeout", "10"); - props.put("socketTimeout", "30"); - } - try (Connection conn = DriverManager.getConnection(url, props); - Statement stmt = conn.createStatement(); - ResultSet rs = stmt.executeQuery(probeSql)) { - if (rs.next()) { - long elapsed = System.currentTimeMillis() - start; - totalLatency += elapsed; - successCount++; - log.info("数据源[{}]探针第{}次响应: {}ms", dataSource.getInstanceName(), i + 1, elapsed); - } - } - } catch (Exception e) { - log.warn("数据源[{}]探针第{}次失败: {}", dataSource.getInstanceName(), i + 1, e.getMessage()); - } - } - - if (successCount == 0) { - log.error("数据源[{}]探针全部失败,使用默认超时", dataSource.getInstanceName()); - return; - } - - long avgLatencyMs = totalLatency / successCount; - // 转为秒计算 - long avgLatencySec = Math.max(avgLatencyMs / 1000, 1); - - int connectTimeoutSec = Math.max((int)(avgLatencySec * CONNECT_TIMEOUT_MULTIPLIER), MIN_CONNECT_TIMEOUT_SEC); - int readTimeoutSec = Math.max((int)(avgLatencySec * READ_TIMEOUT_MULTIPLIER), MIN_READ_TIMEOUT_SEC); - - // 查询数据库自身超时配置作为上限参考 - int[] dbLimits = queryDatabaseTimeoutLimits(dataSource, url, driver, isOracle); - if (dbLimits != null) { - int dbConnectLimit = dbLimits[0]; - int dbReadLimit = dbLimits[1]; - if (dbConnectLimit > 0 && connectTimeoutSec > dbConnectLimit) { - connectTimeoutSec = (int)(dbConnectLimit * 0.8); - } - if (dbReadLimit > 0 && readTimeoutSec > dbReadLimit) { - readTimeoutSec = (int)(dbReadLimit * 0.8); - } - } - - log.info("数据源[{}]探测结果: avgLatency={}ms, connectTimeout={}s, readTimeout={}s", - dataSource.getInstanceName(), avgLatencyMs, connectTimeoutSec, readTimeoutSec); - - dataSource.setDetectedConnectTimeout(connectTimeoutSec); - dataSource.setDetectedReadTimeout(readTimeoutSec); - dataSource.setLastProbeTime(new Date()); - stasDataSourceMapper.updateById(dataSource); - } - - /** - * 基于历史日志动态调整超时 - */ - public void adjustTimeoutFromLogs(StasDataSource dataSource) { - try { - int timeoutErrors = stasSyncLogMapper.countTimeoutErrorsByDataSource( - dataSource.getId(), LOG_ANALYSIS_WINDOW_DAYS); - Long avgSyncTimeMs = stasSyncLogMapper.avgSyncTimeByDataSource( - dataSource.getId(), LOG_ANALYSIS_WINDOW_DAYS); - int consecutiveNonTimeout = stasSyncLogMapper.countConsecutiveNonTimeoutByDataSource( - dataSource.getId(), CONSECUTIVE_NON_TIMEOUT_THRESHOLD); - - int currentReadTimeout = dataSource.getDetectedReadTimeout() != null - ? dataSource.getDetectedReadTimeout() : MIN_READ_TIMEOUT_SEC; - - // 计算超时错误率 - int totalErrors = timeoutErrors; - double errorRate = totalErrors > 0 ? (double) timeoutErrors / Math.max(totalErrors, 1) : 0; - - if (errorRate > TIMEOUT_ERROR_RATE_THRESHOLD && avgSyncTimeMs != null && avgSyncTimeMs > 0) { - // 超时错误率过高,增大读取超时 - int newReadTimeout = (int) Math.max(currentReadTimeout * 1.5, - (avgSyncTimeMs / 1000) * 3); - log.info("数据源[{}]超时错误率{}}过高,读取超时从{}s调整为{}s", - dataSource.getInstanceName(), String.format("%.2f", errorRate), - currentReadTimeout, newReadTimeout); - dataSource.setDetectedReadTimeout(newReadTimeout); - stasDataSourceMapper.updateById(dataSource); - } else if (consecutiveNonTimeout >= CONSECUTIVE_NON_TIMEOUT_THRESHOLD - && avgSyncTimeMs != null && avgSyncTimeMs > 0) { - // 连续无超时错误,温和下调 - int newReadTimeout = (int) Math.max(currentReadTimeout * 0.8, - Math.max((avgSyncTimeMs / 1000) * 2, MIN_READ_TIMEOUT_SEC)); - if (newReadTimeout < currentReadTimeout) { - log.info("数据源[{}]连续无超时错误,读取超时从{}s调整为{}s", - dataSource.getInstanceName(), currentReadTimeout, newReadTimeout); - dataSource.setDetectedReadTimeout(newReadTimeout); - stasDataSourceMapper.updateById(dataSource); - } - } - } catch (Exception e) { - log.warn("数据源[{}]日志分析调整超时失败: {}", dataSource.getInstanceName(), e.getMessage()); - } - } - - /** - * 查询数据库自身超时配置 - * @return [connectTimeLimitSec, idleTimeLimitSec] 或 null - */ - private int[] queryDatabaseTimeoutLimits(StasDataSource dataSource, String url, String driver, boolean isOracle) { - try { - Class.forName(driver); - Properties props = new Properties(); - props.put("user", dataSource.getUsername()); - props.put("password", dataSource.getPassword()); - if (isOracle) { - props.put("oracle.net.CONNECT_TIMEOUT", "10000"); - props.put("oracle.jdbc.ReadTimeout", "30000"); - } else { - props.put("loginTimeout", "10"); - props.put("socketTimeout", "30"); - } - - try (Connection conn = DriverManager.getConnection(url, props)) { - if (isOracle) { - return queryOracleTimeoutLimits(conn); - } else { - return queryPgTimeoutLimits(conn); - } - } - } catch (Exception e) { - log.warn("查询数据源[{}]超时配置失败: {}", dataSource.getInstanceName(), e.getMessage()); - return null; - } - } - - /** - * 查询Oracle数据库超时限制 - */ - private int[] queryOracleTimeoutLimits(Connection conn) { - int connectTimeLimit = -1; - int idleTimeLimit = -1; - String sql = "SELECT resource_name, limit FROM dba_profiles " + - "WHERE profile = 'DEFAULT' AND resource_name IN ('CONNECT_TIME', 'IDLE_TIME')"; - try (PreparedStatement pstmt = conn.prepareStatement(sql); - ResultSet rs = pstmt.executeQuery()) { - while (rs.next()) { - String resourceName = rs.getString("resource_name"); - String limitStr = rs.getString("limit"); - if ("DEFAULT".equalsIgnoreCase(limitStr) || "UNLIMITED".equalsIgnoreCase(limitStr)) { - continue; - } - try { - int limitVal = Integer.parseInt(limitStr); - if ("CONNECT_TIME".equals(resourceName)) { - connectTimeLimit = limitVal; // 分钟 - } else if ("IDLE_TIME".equals(resourceName)) { - idleTimeLimit = limitVal; // 分钟 - } - } catch (NumberFormatException ignored) { - } - } - } catch (SQLException e) { - log.warn("查询Oracle超时配置失败: {}", e.getMessage()); - } - // 转为秒返回(Oracle中CONNECT_TIME单位是分钟) - return new int[]{ - connectTimeLimit > 0 ? connectTimeLimit * 60 : -1, - idleTimeLimit > 0 ? idleTimeLimit * 60 : -1 - }; - } - - /** - * 查询PostgreSQL数据库超时限制 - */ - private int[] queryPgTimeoutLimits(Connection conn) { - int statementTimeout = -1; - int idleTimeout = -1; - try (Statement stmt = conn.createStatement()) { - try (ResultSet rs = stmt.executeQuery("SHOW statement_timeout")) { - if (rs.next()) { - statementTimeout = parsePgIntervalToSec(rs.getString(1)); - } - } - try (ResultSet rs = stmt.executeQuery("SHOW idle_in_transaction_session_timeout")) { - if (rs.next()) { - idleTimeout = parsePgIntervalToSec(rs.getString(1)); - } - } - } catch (SQLException e) { - log.warn("查询PG超时配置失败: {}", e.getMessage()); - } - return new int[]{statementTimeout, idleTimeout}; - } - - /** - * 解析PG时间间隔字符串为秒数(如 "5min", "30s", "2h") - */ - private int parsePgIntervalToSec(String value) { - if (value == null || "0".equals(value) || "-1".equals(value) || value.startsWith("0")) { - return -1; - } - try { - value = value.trim().toLowerCase(); - if (value.endsWith("ms")) { - return (int) (Long.parseLong(value.replace("ms", "")) / 1000); - } else if (value.endsWith("s")) { - return Integer.parseInt(value.replace("s", "")); - } else if (value.endsWith("min")) { - return Integer.parseInt(value.replace("min", "")) * 60; - } else if (value.endsWith("h")) { - return Integer.parseInt(value.replace("h", "")) * 3600; - } else if (value.endsWith("d")) { - return Integer.parseInt(value.replace("d", "")) * 86400; - } - return Integer.parseInt(value); - } catch (NumberFormatException e) { - return -1; - } - } - - /** - * 判断探测结果是否过期(超过7天) - */ - private boolean isProbeExpired(StasDataSource dataSource) { - if (dataSource.getLastProbeTime() == null) { - return true; - } - long daysSinceProbe = (System.currentTimeMillis() - dataSource.getLastProbeTime().getTime()) - / (1000 * 60 * 60 * 24); - return daysSinceProbe >= PROBE_EXPIRE_DAYS; - } -} diff --git a/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/service/impl/StasTaskConfigServiceImpl.java b/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/service/impl/StasTaskConfigServiceImpl.java index a015477..bb021dd 100644 --- a/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/service/impl/StasTaskConfigServiceImpl.java +++ b/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/service/impl/StasTaskConfigServiceImpl.java @@ -2,11 +2,9 @@ package org.jeecg.taskConfig.service.impl; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.jeecg.common.constant.CommonConstant; import org.jeecg.common.constant.QuartzJobConstant; -import org.jeecg.common.constant.enums.SourceDataTypeEnum; import org.jeecg.common.exception.JeecgBootException; import org.jeecg.common.util.DBUtil; import org.jeecg.modules.base.entity.StasDataSource; @@ -18,8 +16,6 @@ import org.jeecg.modules.base.mapper.StasTaskConfigMapper; import org.jeecg.modules.base.entity.QuartzJob; import org.jeecg.quartz.service.IQuartzJobService; import org.jeecg.taskConfig.service.IStasTaskConfigService; -import org.jeecg.taskConfig.service.SyncAdaptiveStrategy; -import org.jeecg.taskConfig.service.TimeoutProbeService; import org.jeecg.vo.*; import org.springframework.stereotype.Service; @@ -41,15 +37,12 @@ import java.util.stream.Collectors; */ @Service @RequiredArgsConstructor -@Slf4j public class StasTaskConfigServiceImpl extends ServiceImpl implements IStasTaskConfigService { private final StasTaskConfigMapper stasTaskConfigMapper; private final StasDataSourceMapper stasDataSourceMapper; private final StasSyncStrategyMapper stasSyncStrategyMapper; private final IQuartzJobService quartzJobService; - private final SyncAdaptiveStrategy syncAdaptiveStrategy; - private final TimeoutProbeService timeoutProbeService; /** * 数据表思维导图 @@ -169,7 +162,7 @@ public class StasTaskConfigServiceImpl extends ServiceImpl stasSyncStrategies = stasSyncStrategyMapper. selectList(new LambdaQueryWrapper().eq(StasSyncStrategy::getTaskId, taskId)); for (StasSyncStrategy stasSyncStrategy : stasSyncStrategies) { - log.info("开始同步表: {} (依据字段: {})", stasSyncStrategy.getTableName(), stasSyncStrategy.getColumnName()); + System.out.println("开始同步表: " + stasSyncStrategy.getTableName() + " (依据字段: " + stasSyncStrategy.getColumnName() + ")"); long startTime = System.currentTimeMillis(); - boolean isDate = isDateColumn(sourceConn, stasSyncStrategy); - int effectiveSyncDay = syncAdaptiveStrategy.getEffectiveSyncDay(stasSyncStrategy, stasTaskConfig.getSyncDay()); - int effectiveSyncCount = syncAdaptiveStrategy.getEffectiveSyncCount(stasSyncStrategy, stasTaskConfig.getSyncCount()); - - boolean syncSuccess = false; - int retryIndex = 0; - - while (!syncSuccess) { - try { - if (isDate) { - syncByDateRange(sourceConn, targetConn, stasSyncStrategy, effectiveSyncDay); - } else { - syncByIdRange(sourceConn, targetConn, stasSyncStrategy, effectiveSyncCount); - } - syncSuccess = true; - syncAdaptiveStrategy.onSyncSuccess(stasSyncStrategy, - stasTaskConfig.getSyncDay(), stasTaskConfig.getSyncCount()); - } catch (SQLException e) { - SyncAdaptiveStrategy.ErrorType errorType = syncAdaptiveStrategy.analyzeSQLException(e); - retryIndex++; - log.warn("同步表 {} 时出错(第{}次): {} [错误类型: {}]", - stasSyncStrategy.getTableName(), retryIndex, e.getMessage(), errorType.name()); - - syncAdaptiveStrategy.checkAndAdjustTimeout(sourceInfo, errorType); - syncAdaptiveStrategy.checkAndAdjustTimeout(targetInfo, errorType); - - if ((errorType == SyncAdaptiveStrategy.ErrorType.TIMEOUT - || errorType == SyncAdaptiveStrategy.ErrorType.TRUNCATION) - && syncAdaptiveStrategy.shouldRetry(stasSyncStrategy)) { - int[] adjusted = syncAdaptiveStrategy.adjustBatchSize(stasSyncStrategy, isDate, - stasTaskConfig.getSyncDay(), stasTaskConfig.getSyncCount()); - effectiveSyncDay = adjusted[0]; - effectiveSyncCount = adjusted[1]; - log.info("表 {} 批次已调整,准备第{}次重试: syncDay={}, syncCount={}", - stasSyncStrategy.getTableName(), retryIndex + 1, effectiveSyncDay, effectiveSyncCount); - try { targetConn.rollback(); } catch (SQLException ignored) {} - if (sourceConn.isClosed() || targetConn.isClosed()) { - log.warn("表 {} 连接已断开,无法重试,跳过该表", stasSyncStrategy.getTableName()); - break; - } - continue; - } else { - try { targetConn.rollback(); } catch (SQLException ignored) {} - log.error("表 {} 同步失败,跳过: {}", stasSyncStrategy.getTableName(), e.getMessage()); - break; - } - } catch (ParseException e) { - throw new JeecgBootException(e.getMessage()); + try { + if (isDateColumn(sourceConn,stasSyncStrategy)) { + syncByDateRange(sourceConn, targetConn, stasSyncStrategy, stasTaskConfig.getSyncDay()); + } else { + syncByIdRange(sourceConn, targetConn, stasSyncStrategy, stasTaskConfig.getSyncCount()); } + } catch (SQLException e) { + System.err.println("同步表 " + stasSyncStrategy.getTableName() + " 时出错: " + e.getMessage()); + targetConn.rollback(); + throw new JeecgBootException(e.getMessage()); + } catch (ParseException e) { + throw new JeecgBootException(e.getMessage()); } long endTime = System.currentTimeMillis(); - log.info("表 {} 同步耗时: {} 毫秒{}", stasSyncStrategy.getTableName(), - (endTime - startTime), syncSuccess ? "" : " (失败)"); + System.out.println("表 " + stasSyncStrategy.getTableName() + " 同步耗时: " + (endTime - startTime) + " 毫秒"); } } } diff --git a/jeecg-module-sync/src/main/resources/sql/sync_adaptive_ddl.sql b/jeecg-module-sync/src/main/resources/sql/sync_adaptive_ddl.sql deleted file mode 100644 index d72f768..0000000 --- a/jeecg-module-sync/src/main/resources/sql/sync_adaptive_ddl.sql +++ /dev/null @@ -1,40 +0,0 @@ --- ============================================================ --- 数据同步超时自适应与批次调整 - DDL更新脚本 --- 执行环境: PostgreSQL (目标数据库) --- ============================================================ - --- 1. stas_data_source 表新增超时探测字段 -ALTER TABLE stas_data_source ADD COLUMN IF NOT EXISTS detected_connect_timeout INTEGER; -ALTER TABLE stas_data_source ADD COLUMN IF NOT EXISTS detected_read_timeout INTEGER; -ALTER TABLE stas_data_source ADD COLUMN IF NOT EXISTS last_probe_time TIMESTAMP; - --- 2. stas_sync_strategy 表新增自适应批次字段 -ALTER TABLE stas_sync_strategy ADD COLUMN IF NOT EXISTS current_sync_day INTEGER; -ALTER TABLE stas_sync_strategy ADD COLUMN IF NOT EXISTS current_sync_count INTEGER; -ALTER TABLE stas_sync_strategy ADD COLUMN IF NOT EXISTS last_error_type VARCHAR(20); -ALTER TABLE stas_sync_strategy ADD COLUMN IF NOT EXISTS retry_count INTEGER DEFAULT 0; -ALTER TABLE stas_sync_strategy ADD COLUMN IF NOT EXISTS min_sync_day INTEGER DEFAULT 1; -ALTER TABLE stas_sync_strategy ADD COLUMN IF NOT EXISTS min_sync_count INTEGER DEFAULT 100; - --- 3. stas_sync_log 表新增错误详情字段 -ALTER TABLE stas_sync_log ADD COLUMN IF NOT EXISTS error_type VARCHAR(20); -ALTER TABLE stas_sync_log ADD COLUMN IF NOT EXISTS batch_size_before INTEGER; -ALTER TABLE stas_sync_log ADD COLUMN IF NOT EXISTS batch_size_after INTEGER; -ALTER TABLE stas_sync_log ADD COLUMN IF NOT EXISTS retry_index INTEGER; - --- 4. 为新增字段添加注释(PostgreSQL) -COMMENT ON COLUMN stas_data_source.detected_connect_timeout IS '探测到的连接超时(秒),null表示未探测'; -COMMENT ON COLUMN stas_data_source.detected_read_timeout IS '探测到的读取超时(秒),null表示未探测'; -COMMENT ON COLUMN stas_data_source.last_probe_time IS '上次超时探测时间,超过7天自动重新探测'; - -COMMENT ON COLUMN stas_sync_strategy.current_sync_day IS '当前生效的批次天数(自适应调整后),null时使用taskConfig的syncDay'; -COMMENT ON COLUMN stas_sync_strategy.current_sync_count IS '当前生效的批次数量(自适应调整后),null时使用taskConfig的syncCount'; -COMMENT ON COLUMN stas_sync_strategy.last_error_type IS '最近一次错误类型: TIMEOUT/TRUNCATION/OTHER'; -COMMENT ON COLUMN stas_sync_strategy.retry_count IS '连续重试次数,同步成功后清零'; -COMMENT ON COLUMN stas_sync_strategy.min_sync_day IS '批次天数最小值下限,默认1'; -COMMENT ON COLUMN stas_sync_strategy.min_sync_count IS '批次数量最小值下限,默认100'; - -COMMENT ON COLUMN stas_sync_log.error_type IS '错误类型: TIMEOUT/TRUNCATION/OTHER/SUCCESS'; -COMMENT ON COLUMN stas_sync_log.batch_size_before IS '调整前批次大小'; -COMMENT ON COLUMN stas_sync_log.batch_size_after IS '调整后批次大小'; -COMMENT ON COLUMN stas_sync_log.retry_index IS '第几次重试';