修改同步服务功能

This commit is contained in:
rencheng 2026-05-09 10:56:21 +08:00
parent 31a3f6cb14
commit ab1dac0426
14 changed files with 1501 additions and 108 deletions

View File

@ -10,6 +10,15 @@ public class DBUtil {
public static final String ORACLE_URL_PREFIX = "jdbc:oracle:thin:@";
public static final String POSTGRES_URL_PREFIX = "jdbc:postgresql://";
/** 默认Oracle连接超时毫秒 */
public static final int DEFAULT_ORACLE_CONNECT_TIMEOUT = 300000;
/** 默认Oracle读取超时毫秒 */
public static final int DEFAULT_ORACLE_READ_TIMEOUT = 1200000;
/** 默认PostgreSQL连接超时毫秒 */
public static final int DEFAULT_PG_CONNECT_TIMEOUT = 300000;
/** 默认PostgreSQL套接字超时毫秒 */
public static final int DEFAULT_PG_SOCKET_TIMEOUT = 1200000;
/*public static Connection getConnection(String url, String username, String password, String Driver) throws SQLException{
try {
Class.forName(Driver);
@ -34,18 +43,63 @@ public class DBUtil {
return String.format("%s%s:%s%s", DBUtil.POSTGRES_URL_PREFIX, ip, port, serveId);
}
public static Connection getConnection(String url, String username, String password, String Driver,int type) throws SQLException{
/**
* 构建PostgreSQL URL带tcpKeepAlive参数
*/
public static String getPgUrlWithKeepAlive(String ip, Integer port, String serveId){
String baseUrl = String.format("%s%s:%s%s", DBUtil.POSTGRES_URL_PREFIX, ip, port, serveId);
if (baseUrl.contains("?")) {
return baseUrl + "&tcpKeepAlive=true";
}
return baseUrl + "?tcpKeepAlive=true";
}
/**
* 获取数据库连接兼容旧接口使用默认超时
* @param type 0=PostgreSQL, 1=Oracle
*/
public static Connection getConnection(String url, String username, String password, String Driver, int type) throws SQLException{
if (type == 0) {
return getConnection(url, username, password, Driver, type,
DEFAULT_PG_CONNECT_TIMEOUT, DEFAULT_PG_SOCKET_TIMEOUT);
} else {
return getConnection(url, username, password, Driver, type,
DEFAULT_ORACLE_CONNECT_TIMEOUT, DEFAULT_ORACLE_READ_TIMEOUT);
}
}
/**
* 获取数据库连接自定义超时
* @param url JDBC连接URL
* @param username 用户名
* @param password 密码
* @param driver 驱动类名
* @param type 0=PostgreSQL, 1=Oracle
* @param connectTimeoutMs 连接超时毫秒null则使用默认值
* @param readTimeoutMs 读取超时毫秒null则使用默认值
*/
public static Connection getConnection(String url, String username, String password,
String driver, int type,
Integer connectTimeoutMs, Integer readTimeoutMs) throws SQLException{
try {
Class.forName(Driver);
Class.forName(driver);
Properties props = new Properties();
if(type == 0){
props.put("user", username);
props.put("password", password);
}else{
props.put("user", username);
props.put("password", password);
props.put("oracle.net.CONNECT_TIMEOUT", "300000");
props.put("oracle.jdbc.ReadTimeout", "1200000");
props.put("user", username);
props.put("password", password);
if (type == 0) {
// PostgreSQL
int connTimeout = (connectTimeoutMs != null) ? connectTimeoutMs : DEFAULT_PG_CONNECT_TIMEOUT;
int readTimeout = (readTimeoutMs != null) ? readTimeoutMs : DEFAULT_PG_SOCKET_TIMEOUT;
props.put("loginTimeout", String.valueOf(connTimeout / 1000));
props.put("socketTimeout", String.valueOf(readTimeout / 1000));
props.put("tcpKeepAlive", "true");
} else {
// Oracle
int connTimeout = (connectTimeoutMs != null) ? connectTimeoutMs : DEFAULT_ORACLE_CONNECT_TIMEOUT;
int readTimeout = (readTimeoutMs != null) ? readTimeoutMs : DEFAULT_ORACLE_READ_TIMEOUT;
props.put("oracle.net.CONNECT_TIMEOUT", String.valueOf(connTimeout));
props.put("oracle.jdbc.ReadTimeout", String.valueOf(readTimeout));
}
return DriverManager.getConnection(url, props);
} catch(ClassNotFoundException e){

View File

@ -78,7 +78,15 @@ public class StasDataSource implements Serializable {
/**描述*/
@Excel(name = "描述", width = 15)
private String description;
/**探测到的连接超时(秒)*/
@Excel(name = "连接超时(秒)", width = 15)
private Integer detectedConnectTimeout;
/**探测到的读取超时(秒)*/
@Excel(name = "读取超时(秒)", width = 15)
private Integer detectedReadTimeout;
/**上次探测时间*/
@JsonFormat(timezone = "GMT+8",pattern = "yyyy-MM-dd HH:mm:ss")
@DateTimeFormat(pattern="yyyy-MM-dd HH:mm:ss")
private Date lastProbeTime;
}

View File

@ -40,4 +40,16 @@ public class StasSyncLog implements Serializable {
/**描述*/
@Excel(name = "描述", width = 15)
private String description;
/**错误类型TIMEOUT/TRUNCATION/OTHER/SUCCESS*/
@Excel(name = "错误类型", width = 15)
private String errorType;
/**调整前批次大小*/
@Excel(name = "调整前批次大小", width = 15)
private Integer batchSizeBefore;
/**调整后批次大小*/
@Excel(name = "调整后批次大小", width = 15)
private Integer batchSizeAfter;
/**第几次重试*/
@Excel(name = "重试序号", width = 15)
private Integer retryIndex;
}

View File

@ -63,4 +63,22 @@ public class StasSyncStrategy implements Serializable {
/**同步位置*/
@Excel(name = "同步位置", width = 15)
private String syncOrigin;
/**当前生效的批次天数自适应调整后null时使用taskConfig的syncDay*/
@Excel(name = "当前批次天数", width = 15)
private Integer currentSyncDay;
/**当前生效的批次数量自适应调整后null时使用taskConfig的syncCount*/
@Excel(name = "当前批次数量", width = 15)
private Integer currentSyncCount;
/**最近一次错误类型TIMEOUT/TRUNCATION/OTHER*/
@Excel(name = "错误类型", width = 15)
private String lastErrorType;
/**连续重试次数(成功后清零)*/
@Excel(name = "重试次数", width = 15)
private Integer retryCount;
/**批次天数最小值下限默认1*/
@Excel(name = "最小批次天数", width = 15)
private Integer minSyncDay;
/**批次数量最小值下限默认100*/
@Excel(name = "最小批次数量", width = 15)
private Integer minSyncCount;
}

View File

@ -1,5 +1,6 @@
package org.jeecg.modules.base.mapper;
import org.apache.ibatis.annotations.Param;
import org.jeecg.modules.base.entity.StasSyncLog;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
@ -11,4 +12,18 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper;
*/
public interface StasSyncLogMapper extends BaseMapper<StasSyncLog> {
/**
* 统计指定数据源关联的最近N天超时/截断错误次数
*/
int countTimeoutErrorsByDataSource(@Param("dataSourceId") String dataSourceId, @Param("days") int days);
/**
* 统计指定数据源关联的最近N天成功同步的平均耗时毫秒
*/
Long avgSyncTimeByDataSource(@Param("dataSourceId") String dataSourceId, @Param("days") int days);
/**
* 统计指定数据源最近连续N条日志中无超时错误的次数
*/
int countConsecutiveNonTimeoutByDataSource(@Param("dataSourceId") String dataSourceId, @Param("limit") int limit);
}

View File

@ -2,4 +2,38 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.jeecg.modules.base.mapper.StasSyncLogMapper">
<!-- 统计指定数据源关联的最近N天超时/截断错误次数 -->
<select id="countTimeoutErrorsByDataSource" resultType="int">
SELECT COUNT(1)
FROM stas_sync_log l
JOIN stas_sync_record r ON l.record_id = r.id
WHERE (r.source_id = #{dataSourceId} OR r.target_id = #{dataSourceId})
AND l.error_type IN ('TIMEOUT', 'TRUNCATION')
AND l.start_time &gt;= (CURRENT_DATE - INTERVAL '#{days}' DAY)
</select>
<!-- 统计指定数据源关联的最近N天成功同步的平均耗时(毫秒) -->
<select id="avgSyncTimeByDataSource" resultType="java.lang.Long">
SELECT CAST(AVG(
EXTRACT(EPOCH FROM (r.end_time - r.start_time)) * 1000
) AS BIGINT)
FROM stas_sync_record r
WHERE (r.source_id = #{dataSourceId} OR r.target_id = #{dataSourceId})
AND r.end_time IS NOT NULL
AND r.start_time &gt;= (CURRENT_DATE - INTERVAL '#{days}' DAY)
</select>
<!-- 统计指定数据源最近连续N条日志中无超时错误的记录数 -->
<select id="countConsecutiveNonTimeoutByDataSource" resultType="int">
SELECT COUNT(1) FROM (
SELECT l.error_type
FROM stas_sync_log l
JOIN stas_sync_record r ON l.record_id = r.id
WHERE (r.source_id = #{dataSourceId} OR r.target_id = #{dataSourceId})
ORDER BY l.start_time DESC
LIMIT #{limit}
) sub
WHERE sub.error_type IS NULL OR sub.error_type NOT IN ('TIMEOUT', 'TRUNCATION')
</select>
</mapper>

View File

@ -0,0 +1,369 @@
================================================================================
jeecg-module-sync 模块 — AI辅助代码编辑上下文文件
================================================================================
一、模块概述
--------------------------------------------------------------------------------
jeecg-module-sync 是源项分析系统STAS中的数据同步模块负责在Oracle和PostgreSQL
数据库之间进行数据同步。支持定时任务调度、增量同步(按日期/ID范围、同步策略管理、
同步日志记录和同步统计等功能。
父工程: jeecg-boot-parent 3.8.1
Java版本: 17
核心依赖: jeecg-boot-base-core提供实体、Mapper、工具类等基础能力
框架: Spring Boot + MyBatis-Plus + Quartz + Swagger/OpenAPI
二、目录结构与文件清单
--------------------------------------------------------------------------------
jeecg-module-sync/
├── pom.xml
└── src/main/java/org/jeecg/
├── OracleSync.java # 独立工具类Oracle-to-Oracle同步main方法入口
├── OracleToPgSync.java # 独立工具类Oracle-to-PostgreSQL建表同步main方法入口
├── dataSource/ # 数据源管理子模块
│ ├── controller/StasDataSourceController.java
│ ├── service/IStasDataSourceService.java
│ ├── service/impl/StasDataSourceServiceImpl.java
│ └── vo/TableColumnVO.java
├── stasSyncStrategy/ # 同步策略管理子模块
│ ├── controller/StasSyncStrategyController.java
│ ├── service/IStasSyncStrategyService.java
│ └── service/impl/StasSyncStrategyServiceImpl.java
├── syncLog/ # 同步日志子模块
│ ├── controller/StasSyncLogController.java
│ ├── service/IStasSyncLogService.java
│ └── service/impl/StasSyncLogServiceImpl.java
├── syncNum/ # 同步数量统计子模块
│ ├── controller/StasSyncNumController.java
│ ├── service/IStasSyncNumService.java
│ ├── service/impl/StasSyncNumServiceImpl.java
│ └── vo/PieChartVO.java
├── syncRecord/ # 同步记录子模块
│ ├── controller/StasSyncRecordController.java
│ ├── service/IStasSyncRecordService.java
│ ├── service/impl/StasSyncRecordServiceImpl.java
│ └── vo/SyncRecordVO.java, TaskHisStatsVO.java, TaskStatsResultVO.java
├── taskConfig/ # 任务配置子模块(核心)
│ ├── controller/StasTaskConfigController.java
│ ├── job/SyncDataJob.java # Quartz Job执行类
│ ├── service/IStasTaskConfigService.java
│ └── service/impl/StasTaskConfigServiceImpl.java
├── quartz/ # Quartz定时任务管理
│ ├── service/IQuartzJobService.java
│ └── service/impl/QuartzJobServiceImpl.java
└── vo/ # 模块内VO
├── DateRangeVO.java
├── IdRangeVO.java
├── MindMapVO.java
├── TableInfoVO.java
└── UserInfoVO.java
三、数据库实体(定义在 jeecg-boot-base-core 中)
--------------------------------------------------------------------------------
本模块的实体类统一定义在 jeecg-boot-base-core 的 org.jeecg.modules.base.entity 包中,
由 jeecg-boot-base-core 的 org.jeecg.modules.base.mapper 提供 MyBatis-Plus Mapper 接口。
1. StasDataSource表 stas_data_source— 数据源配置
字段: id(ASSIGN_ID), createBy, createTime, updateBy, updateTime,
instanceName(实例名), type(数据库类型: 0=Oracle,1=PostgreSQL),
port, serveId(服务名), dbLink, username, password, remark,
delFlag(逻辑删除), ipAddress, description
2. StasSyncStrategy表 stas_sync_strategy— 同步策略
字段: id(ASSIGN_ID), createBy, createTime, updateBy, updateTime,
taskId(关联任务ID), sourceOwner(源用户/Schema), targetOwner(目标用户/Schema),
tableName(同步表名), columnName(依据字段名), syncOrigin(同步位置/断点)
3. StasTaskConfig表 stas_task_config— 任务配置
字段: id(ASSIGN_ID), createBy, createTime, updateBy, updateTime,
taskName(任务名称), quartzId(关联Quartz任务ID), cron(Cron表达式),
sourceId(源库ID), targetId(目标库ID), syncCount(批次数量), syncDay(批次天数),
taskStatus(0=未开始,1=进行中), description,
sourceName(@TableField(exist=false)), targetName(@TableField(exist=false))
4. StasSyncLog表 stas_sync_log— 同步日志
字段: id(ASSIGN_ID), recordId(关联记录ID), startTime, description
5. StasSyncNum表 stas_sync_num— 同步数量
字段: id(ASSIGN_ID), recordId(关联记录ID), tableName, syncNum(同步条数)
6. StasSyncRecord表 stas_sync_record— 同步记录
字段: id(ASSIGN_ID), taskId, sourceId, targetId, startTime, endTime
7. QuartzJob表 sys_quartz_job— Quartz定时任务
字段: id(ASSIGN_ID), createBy, createTime, delFlag, updateBy, updateTime,
jobClassName, cronExpression, parameter, description, status(0=正常,-1=停止)
四、核心枚举与常量(定义在 jeecg-boot-base-core 中)
--------------------------------------------------------------------------------
1. SourceDataTypeEnum:
ORACLE(0, "ORACLE"), POSTGRES(1, "POSTGRES")
2. SyncTaskStatusEnum:
NOT_STARTED(0), IN_OPERATION(1)
3. QuartzJobConstant:
JOB_CLASS_NAME = "org.jeecg.taskConfig.job.SyncDataJob"
4. DBUtil 工具类:
- ORACLE_DRIVER = "oracle.jdbc.OracleDriver"
- POSTGRES_DRIVER = "org.postgresql.Driver"
- ORACLE_URL_PREFIX = "jdbc:oracle:thin:@"
- POSTGRES_URL_PREFIX = "jdbc:postgresql://"
- getUrl(ip, port, serveId) → 构建Oracle JDBC URL
- getPgUrl(ip, port, serveId) → 构建PostgreSQL JDBC URL
- getConnection(url, username, password, driver, type) → 获取JDBC连接
type=0: PostgreSQL(无超时设置), type=1: Oracle(设置CONNECT_TIMEOUT=300s, ReadTimeout=1200s)
- close(conn, stmt, rs) → 释放资源
五、Mapper XML定义在 jeecg-boot-base-core 中)
--------------------------------------------------------------------------------
StasSyncRecordMapper.xml — 同步记录统计查询:
- taskHistoryStats: 关联 stas_task_config + stas_sync_record + stas_sync_num查历史同步详情
- taskStatsDay: 按日统计同步数量TO_CHAR(start_time,'YYYY-MM-DD')
- taskStatsMonth: 按月统计同步数量TO_CHAR(start_time,'YYYY-MM-DD')
六、API接口清单
--------------------------------------------------------------------------------
1. 数据源管理 — StasDataSourceController (/dataSource)
GET /list — 分页查询数据源支持instanceName模糊查询
GET /userList — 查询数据源下所有用户Oracle: ALL_USERS; PG: information_schema.schemata
GET /targetUser — 查询目标数据源用户通过taskId关联查targetId
GET /tableList — 查询数据源下指定用户的所有表
GET /fieldList — 查询数据源表字段
GET /testConnection — 数据库连接测试
GET /checkInstanceName — 数据库名称唯一性校验
POST /add — 添加数据源
PUT /edit — 编辑数据源
DELETE /delete — 删除数据源(单个)
DELETE /deleteBatch — 批量删除数据源
GET /queryById — 通过ID查询
GET /queryall — 查询所有数据源
2. 同步策略 — StasSyncStrategyController (/syncStrategy)
GET /list — 分页查询同步策略
POST /createTargetTables — 在目标库创建表结构并保存策略
POST /add — 添加同步策略
PUT/POST /edit — 编辑同步策略
DELETE /delete — 删除同步策略
DELETE /deleteBatch — 批量删除同步策略
GET /queryById — 通过ID查询
GET /exportXls — 导出Excel
POST /importExcel — 导入Excel
3. 同步日志 — StasSyncLogController (/stasSyncLog)
GET /list — 分页查询同步日志按start_time升序
4. 同步数量 — StasSyncNumController (/stasSyncNum)
GET /list — 按表名分组统计同步数量返回PieChartVO饼图数据
5. 同步记录 — StasSyncRecordController (/stasSyncRecord)
GET /list — 分页查询同步记录(关联数据源名称,支持时间区间过滤)
GET /taskHisSta — 历史同步数量统计
GET /taskStatsDay — 按日统计最近一个月同步数量
GET /taskStatsMonth — 按月统计最近一年同步数量
6. 任务配置 — StasTaskConfigController (/taskConfig)
GET /list — 分页查询任务(关联数据源名称)
GET /getMindMap — 数据表思维导图(异步并行查询)
POST /add — 添加任务含创建Quartz Job
PUT/POST /edit — 编辑任务含更新Quartz Job
DELETE /delete — 删除任务含删除Quartz Job和关联策略
GET /pause — 暂停定时任务
GET /resume — 启动定时任务
GET /execute — 立即执行定时任务
GET /syncDataByFieldType — 手动触发同步
DELETE /deleteBatch — 批量删除任务
GET /queryById — 通过ID查询
GET /exportXls — 导出Excel
POST /importExcel — 导入Excel
七、核心业务逻辑详解
--------------------------------------------------------------------------------
7.1 数据同步流程SyncDataJob / StasTaskConfigServiceImpl.syncDataByFieldType
执行链路:
StasTaskConfigController.execute/手动触发
→ StasTaskConfigServiceImpl.syncDataByFieldType(taskId) [Service层版本仅支持Oracle]
→ 或通过 QuartzJobService.execute → SyncDataJob.execute → SyncDataJob.syncDataByFieldType [Job版本支持Oracle+PG]
核心步骤:
1. 根据taskId获取 StasTaskConfig → 获取源/目标 StasDataSource
2. 创建 StasSyncRecord 记录(记录同步开始时间)
3. 构建源/目标JDBC连接
4. 获取该任务下所有 StasSyncStrategy
5. 对每个策略:
a. 判断依据字段类型isDateColumn
b. 如果是日期类型 → syncByDateRange按syncDay天数为批次
c. 如果是ID类型 → syncByIdRange按syncCount条数为批次
6. 更新 StasSyncRecord 结束时间
7.2 按日期范围同步syncByDateRange
- 获取源表日期范围MIN/MAX
- 读取 syncOrigin 作为上次断点
- 从断点开始,每次同步 syncDay 天
- 若结束日期超过最大日期,先删除目标库中最后一天的数据(避免重复)
- Oracle条件: BETWEEN TO_DATE('...', 'YYYY-MM-DD HH24:MI:SS')
- PostgreSQL条件: BETWEEN TIMESTAMP '...'
- 同步完成后更新 syncOrigin 为当前结束日期
7.3 按ID范围同步syncByIdRange
- 获取源表ID范围MIN/MAX
- 读取 syncOrigin 作为上次断点(从断点+1开始
- 每次同步 syncCount 条
- 若结束ID超过最大ID先删除目标库中最后一条的数据
- 同步完成后更新 syncOrigin 为当前结束ID
7.4 批量数据同步syncDataBatch
- 源库 SELECT * FROM "OWNER"."TABLE" WHERE ...
- fetchSize: 5000优化大表读取
- 目标库 INSERT INTO ... VALUES (?, ?, ...)
- batchSize: 5000 提交一次
- 特殊处理: Oracle→PG时TIMESTAMP/DATE类型使用 setTimestamp 而非 setObject
7.5 Oracle类型到PostgreSQL映射mapOracleTypeToPg / convertOracleTypeToPg
VARCHAR2 → VARCHAR(n), NVARCHAR2 → VARCHAR(n)
CHAR → CHAR(n), NCHAR → CHAR(n)
NUMBER(p,s) → NUMERIC(p,s), NUMBER → NUMERIC
FLOAT → DOUBLE PRECISION
DATE → TIMESTAMP, TIMESTAMP → TIMESTAMP
CLOB → TEXT, BLOB → BYTEA, RAW → BYTEA
LONG → TEXT, LONG RAW → BYTEA
默认 → TEXT
7.6 同步策略创建createTargetTables
- 先删除同taskId的旧策略
- 对每个策略:
a. 获取源/目标数据源信息
b. 检查目标表是否已存在
c. 若不存在生成建表DDL支持Oracle和Oracle→PG两种模式
d. 在目标库执行建表
e. 保存策略记录
7.7 Quartz任务管理
- 添加任务: saveAndScheduleJob → 创建QuartzJobjobClassName固定为SyncDataJob
- 编辑任务: 编辑后重新调度
- 暂停: schedulerDelete + 更新状态为STATUS_DISABLE
- 恢复: schedulerDelete + 重新schedulerAdd + 更新状态为STATUS_NORMAL
- 立即执行: SimpleTrigger0.1秒后执行一次
- 删除: schedulerDelete + removeById
八、重要VO类
--------------------------------------------------------------------------------
- DateRangeVO: minDate, maxDate日期范围
- IdRangeVO: minId, maxIdID范围
- MindMapVO: databaseName, List<UserInfoVO>(思维导图数据)
- UserInfoVO: userName, List<TableInfoVO>
- TableInfoVO: tableName, columnName
- TableColumnVO: columnName, typeName, columnSize, decimalDigits, isNullable, columnDef
- PieChartVO: name(表名), value(同步数), percent(百分比)
- SyncRecordVO: id, taskName, sourceName, targetName, startTime, endTime
- TaskHisStatsVO: taskName, syncNum
- TaskStatsResultVO: dateTimes(List<String>), taskStatsInfos(List<TaskStatsInfo>)
- TaskStatsInfo: taskName, taskStats(List<TaskStatsVO>)
- TaskStatsVO(base-core): id, taskName, dateTime, syncNum
九、代码风格与约定
--------------------------------------------------------------------------------
1. Controller层:
- 继承 JeecgController<Entity, Service>
- 使用 @Tag + @Operation 做Swagger文档
- 使用 @AutoLog 记录操作日志
- 返回统一 Result<T> 对象
- 分页查询使用 QueryGenerator.initQueryWrapper + MyBatis-Plus Page
2. Service层:
- 接口继承 IService<Entity>
- 实现类继承 ServiceImpl<Mapper, Entity>
- 使用 @RequiredArgsConstructor 或 @Autowired 注入
- 事务注解 @Transactional(rollbackFor = Exception.class)
3. 实体类:
- 使用 @TableName + @TableId(type = IdType.ASSIGN_ID)
- 日期字段使用 @JsonFormat + @DateTimeFormat
- 非数据库字段标记 @TableField(exist = false)
- 逻辑删除 @TableLogic
4. 异常处理:
- 业务异常统一抛出 JeecgBootException
- 数据库异常 SQLException 传播或包装
5. 数据库区分:
- Oracle: owner/table名使用大写 + 双引号, 条件使用 TO_DATE/TO_CHAR
- PostgreSQL: schema/table名使用小写 + 双引号, 条件使用 TIMESTAMP 字面量
十、已知注意事项与潜在问题
--------------------------------------------------------------------------------
1. StasTaskConfigServiceImpl.syncDataByFieldType 仅支持OracleisDateColumn写死了Oracle SQL
而 SyncDataJob.syncDataByFieldType 支持Oracle+PG通过dbType参数区分
两个类中存在大量重复的同步逻辑代码。
2. StasDataSourceServiceImpl.queryDatabaseMetadata 方法中PostgreSQL数据源连接时
仍使用 ORACLE_DRIVER 而非 POSTGRES_DRIVER可能存在Bug。
3. OracleSync 和 OracleToPgSync 是独立工具类有main方法不属于Spring容器管理
数据库连接信息硬编码,仅用于一次性迁移脚本。
4. SyncDataJob 通过 @Resource 注入Mapper因Quartz Job不由Spring直接管理实例
需要Spring的Quartz集成支持依赖注入。
5. StasTaskConfigServiceImpl.getMindMap 使用 CompletableFuture.supplyAsync 但
使用默认线程池ForkJoinPool在高并发场景可能需要自定义线程池。
6. 同步策略的 createTargetTables 方法在事务中执行DDL操作CREATE TABLE
某些数据库驱动可能不支持DDL回滚。
7. syncByDateRange 中删除目标库数据的逻辑deleteByEquals在边界条件处理上
可能存在数据一致性问题(先删后插期间如有并发访问可能看到空数据)。
十一、模块间依赖关系
--------------------------------------------------------------------------------
jeecg-module-sync 依赖:
└── jeecg-boot-base-core
├── 实体类: StasDataSource, StasSyncStrategy, StasTaskConfig, StasSyncLog,
│ StasSyncNum, StasSyncRecord, QuartzJob
├── Mapper: StasDataSourceMapper, StasSyncStrategyMapper, StasTaskConfigMapper,
│ StasSyncLogMapper, StasSyncNumMapper, StasSyncRecordMapper, QuartzJobMapper
├── VO: TaskStatsVO
├── 枚举: SourceDataTypeEnum, SyncTaskStatusEnum
├── 常量: QuartzJobConstant, CommonConstant
├── 工具: DBUtil, DateUtils
└── 基础: JeecgController, Result, QueryGenerator, JeecgBootException
外部依赖(通过 jeecg-boot-base-core 间接引入):
- MyBatis-Plus (分页、条件构造、CRUD)
- Quartz Scheduler (定时任务)
- Swagger/OpenAPI (API文档)
- Lombok (代码简化)
- Apache Commons Lang (StringUtils)
- Oracle JDBC Driver / PostgreSQL JDBC Driver
十二、SQL数据库表结构参考
--------------------------------------------------------------------------------
核心表(定义在 db/stas.sql 中):
- stas_data_source: 数据源配置表
- stas_sync_strategy: 同步策略表
- stas_task_config: 任务配置表
- stas_sync_log: 同步日志表
- stas_sync_num: 同步数量统计表
- stas_sync_record: 同步记录表
Quartz表定义在 db/stas.sql 或 Quartz默认脚本中:
- sys_quartz_job: 定时任务表
================================================================================
文件生成时间: 2026-05-09
适用于: AI辅助代码编辑上下文参考
================================================================================

View File

@ -22,6 +22,7 @@ import org.jeecg.modules.base.entity.StasSyncStrategy;
import org.jeecg.modules.base.entity.StasTaskConfig;
import org.jeecg.modules.base.service.BaseCommonService;
import org.jeecg.taskConfig.service.IStasTaskConfigService;
import org.jeecg.taskConfig.service.TimeoutProbeService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.*;
@ -44,6 +45,8 @@ public class StasDataSourceController extends JeecgController<StasDataSource, IS
private IStasDataSourceService stasDataSourceService;
@Autowired
private IStasTaskConfigService stasTaskConfigService;
@Autowired
private TimeoutProbeService timeoutProbeService;
/**
* 分页列表查询
@ -272,5 +275,67 @@ public class StasDataSourceController extends JeecgController<StasDataSource, IS
}
return result;
}
/**
* 手动触发超时探测
* @param id 数据源ID
* @return 探测结果
*/
@AutoLog(value = "数据源-手动超时探测")
@Operation(summary = "数据源-手动超时探测")
@GetMapping(value = "/probeTimeout")
public Result<Map<String, Object>> probeTimeout(@RequestParam(name="id", required=true) String id) {
try {
StasDataSource dataSource = stasDataSourceService.getById(id);
if (dataSource == null) {
return Result.error("未找到对应数据源");
}
timeoutProbeService.probeAndDetectTimeout(dataSource);
// 重新查询获取更新后的值
dataSource = stasDataSourceService.getById(id);
Map<String, Object> probeResult = new HashMap<>();
probeResult.put("id", dataSource.getId());
probeResult.put("instanceName", dataSource.getInstanceName());
probeResult.put("detectedConnectTimeout", dataSource.getDetectedConnectTimeout());
probeResult.put("detectedReadTimeout", dataSource.getDetectedReadTimeout());
probeResult.put("lastProbeTime", dataSource.getLastProbeTime());
return Result.OK("探测完成", probeResult);
} catch (Exception e) {
log.error("超时探测失败", e);
return Result.error("探测失败: " + e.getMessage());
}
}
/**
* 查询数据源当前生效的超时配置
* @param id 数据源ID
* @return 超时配置信息
*/
@AutoLog(value = "数据源-查询超时配置")
@Operation(summary = "数据源-查询超时配置")
@GetMapping(value = "/timeoutInfo")
public Result<Map<String, Object>> timeoutInfo(@RequestParam(name="id", required=true) String id) {
try {
StasDataSource dataSource = stasDataSourceService.getById(id);
if (dataSource == null) {
return Result.error("未找到对应数据源");
}
int[] effectiveTimeout = timeoutProbeService.getEffectiveTimeout(dataSource);
Map<String, Object> info = new HashMap<>();
info.put("id", dataSource.getId());
info.put("instanceName", dataSource.getInstanceName());
info.put("type", dataSource.getType());
info.put("detectedConnectTimeout", dataSource.getDetectedConnectTimeout());
info.put("detectedReadTimeout", dataSource.getDetectedReadTimeout());
info.put("lastProbeTime", dataSource.getLastProbeTime());
info.put("effectiveConnectTimeoutMs", effectiveTimeout[0]);
info.put("effectiveReadTimeoutMs", effectiveTimeout[1]);
info.put("isProbed", dataSource.getDetectedConnectTimeout() != null);
return Result.OK(info);
} catch (Exception e) {
log.error("查询超时配置失败", e);
return Result.error("查询失败: " + e.getMessage());
}
}
}

View File

@ -143,7 +143,9 @@ public class StasDataSourceServiceImpl extends ServiceImpl<StasDataSourceMapper,
List<String> resultList = new ArrayList<>();
try (Connection conn = DBUtil.getConnection(urlSource, stasDataSource.getUsername(),
stasDataSource.getPassword(), DBUtil.ORACLE_DRIVER, 1);
stasDataSource.getPassword(),
SourceDataTypeEnum.POSTGRES.getKey().equals(stasDataSource.getType()) ? DBUtil.POSTGRES_DRIVER : DBUtil.ORACLE_DRIVER,
SourceDataTypeEnum.POSTGRES.getKey().equals(stasDataSource.getType()) ? 0 : 1);
PreparedStatement st = conn.prepareStatement(sql)) {
// 设置参数

View File

@ -9,6 +9,8 @@ import org.jeecg.common.exception.JeecgBootException;
import org.jeecg.common.util.DBUtil;
import org.jeecg.modules.base.entity.*;
import org.jeecg.modules.base.mapper.*;
import org.jeecg.taskConfig.service.SyncAdaptiveStrategy;
import org.jeecg.taskConfig.service.TimeoutProbeService;
import org.jeecg.vo.DateRangeVO;
import org.jeecg.vo.IdRangeVO;
import org.quartz.Job;
@ -23,6 +25,7 @@ import java.util.List;
/**
* 数据同步任务(支持Oracle和PostgreSQL)
* 增强功能超时自动探测批次自适应调整重试机制
*/
@Slf4j
public class SyncDataJob implements Job {
@ -39,6 +42,10 @@ public class SyncDataJob implements Job {
private StasSyncLogMapper stasSyncLogMapper;
@Resource
private StasSyncNumMapper stasSyncNumMapper;
@Resource
private SyncAdaptiveStrategy syncAdaptiveStrategy;
@Resource
private TimeoutProbeService timeoutProbeService;
private String parameter;
@ -56,7 +63,7 @@ public class SyncDataJob implements Job {
}
/**
* 根据字段类型同步数据
* 根据字段类型同步数据增强版自动探测超时 + 自适应批次 + 重试
*/
public void syncDataByFieldType(String taskId) throws SQLException {
StasTaskConfig stasTaskConfig = stasTaskConfigMapper.selectById(taskId);
@ -71,16 +78,41 @@ public class SyncDataJob implements Job {
stasSyncRecordMapper.insert(stasSyncRecord);
String recordId = stasSyncRecord.getId();
String sourceUrl = DBUtil.getUrl(sourceInfo.getIpAddress(), sourceInfo.getPort(), sourceInfo.getServeId());
String targetUrl;
if(SourceDataTypeEnum.ORACLE.getKey().equals(targetInfo.getType())){
targetUrl = DBUtil.getUrl(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId());
}else{
targetUrl = DBUtil.getPgUrl(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId());
// 构建源/目标URL和Driver
String sourceUrl;
String sourceDriver;
int sourceType;
if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceInfo.getType())) {
sourceUrl = DBUtil.getUrl(sourceInfo.getIpAddress(), sourceInfo.getPort(), sourceInfo.getServeId());
sourceDriver = DBUtil.ORACLE_DRIVER;
sourceType = 1;
} else {
sourceUrl = DBUtil.getPgUrlWithKeepAlive(sourceInfo.getIpAddress(), sourceInfo.getPort(), sourceInfo.getServeId());
sourceDriver = DBUtil.POSTGRES_DRIVER;
sourceType = 0;
}
try (Connection sourceConn = DriverManager.getConnection(sourceUrl, sourceInfo.getUsername(), sourceInfo.getPassword());
Connection targetConn = DriverManager.getConnection(targetUrl, targetInfo.getUsername(), targetInfo.getPassword())) {
String targetUrl;
String targetDriver;
int targetType;
if (SourceDataTypeEnum.ORACLE.getKey().equals(targetInfo.getType())) {
targetUrl = DBUtil.getUrl(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId());
targetDriver = DBUtil.ORACLE_DRIVER;
targetType = 1;
} else {
targetUrl = DBUtil.getPgUrlWithKeepAlive(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId());
targetDriver = DBUtil.POSTGRES_DRIVER;
targetType = 0;
}
// 获取自动探测的超时值
int[] sourceTimeouts = timeoutProbeService.getEffectiveTimeout(sourceInfo);
int[] targetTimeouts = timeoutProbeService.getEffectiveTimeout(targetInfo);
try (Connection sourceConn = DBUtil.getConnection(sourceUrl, sourceInfo.getUsername(), sourceInfo.getPassword(),
sourceDriver, sourceType, sourceTimeouts[0], sourceTimeouts[1]);
Connection targetConn = DBUtil.getConnection(targetUrl, targetInfo.getUsername(), targetInfo.getPassword(),
targetDriver, targetType, targetTimeouts[0], targetTimeouts[1])) {
// 设置目标连接为批量提交模式
targetConn.setAutoCommit(false);
@ -89,25 +121,81 @@ public class SyncDataJob implements Job {
.selectList(new LambdaQueryWrapper<StasSyncStrategy>().eq(StasSyncStrategy::getTaskId, taskId));
for (StasSyncStrategy stasSyncStrategy : stasSyncStrategies) {
saveSyncLog(recordId,String.format("开始同步表: %s (依据字段: %s)", stasSyncStrategy.getTableName(), stasSyncStrategy.getColumnName()));
saveSyncLog(recordId, String.format("开始同步表: %s (依据字段: %s)", stasSyncStrategy.getTableName(), stasSyncStrategy.getColumnName()), null, null, null);
long startTime = System.currentTimeMillis();
boolean isDate = isDateColumn(sourceConn, stasSyncStrategy, sourceInfo.getType());
try {
if (isDateColumn(sourceConn, stasSyncStrategy, sourceInfo.getType())) {
syncByDateRange(sourceConn, targetConn, stasSyncStrategy,
stasTaskConfig.getSyncDay(), sourceInfo.getType(), targetInfo.getType(), recordId);
} else {
syncByIdRange(sourceConn, targetConn, stasSyncStrategy,
stasTaskConfig.getSyncCount(), sourceInfo.getType(), targetInfo.getType(), recordId);
// 获取自适应批次大小
int effectiveSyncDay = syncAdaptiveStrategy.getEffectiveSyncDay(stasSyncStrategy, stasTaskConfig.getSyncDay());
int effectiveSyncCount = syncAdaptiveStrategy.getEffectiveSyncCount(stasSyncStrategy, stasTaskConfig.getSyncCount());
boolean syncSuccess = false;
int retryIndex = 0;
while (!syncSuccess) {
try {
if (isDate) {
syncByDateRange(sourceConn, targetConn, stasSyncStrategy,
effectiveSyncDay, sourceInfo.getType(), targetInfo.getType(), recordId, retryIndex);
} else {
syncByIdRange(sourceConn, targetConn, stasSyncStrategy,
effectiveSyncCount, sourceInfo.getType(), targetInfo.getType(), recordId, retryIndex);
}
syncSuccess = true;
// 同步成功逐步恢复批次
syncAdaptiveStrategy.onSyncSuccess(stasSyncStrategy,
stasTaskConfig.getSyncDay(), stasTaskConfig.getSyncCount());
} catch (SQLException | ParseException e) {
SyncAdaptiveStrategy.ErrorType errorType = (e instanceof SQLException)
? syncAdaptiveStrategy.analyzeSQLException((SQLException) e)
: SyncAdaptiveStrategy.ErrorType.OTHER;
retryIndex++;
saveSyncLog(recordId, String.format("同步表 %s 时出错(第%d次): %s [错误类型: %s]",
stasSyncStrategy.getTableName(), retryIndex, e.getMessage(), errorType.name()),
errorType.name(), isDate ? effectiveSyncDay : effectiveSyncCount, retryIndex);
// 检查超时配置是否需要调整
syncAdaptiveStrategy.checkAndAdjustTimeout(sourceInfo, errorType);
syncAdaptiveStrategy.checkAndAdjustTimeout(targetInfo, errorType);
if ((errorType == SyncAdaptiveStrategy.ErrorType.TIMEOUT
|| errorType == SyncAdaptiveStrategy.ErrorType.TRUNCATION)
&& syncAdaptiveStrategy.shouldRetry(stasSyncStrategy)) {
// 缩减批次大小并重试
int[] adjusted = syncAdaptiveStrategy.adjustBatchSize(stasSyncStrategy, isDate,
stasTaskConfig.getSyncDay(), stasTaskConfig.getSyncCount());
effectiveSyncDay = adjusted[0];
effectiveSyncCount = adjusted[1];
saveSyncLog(recordId, String.format("表 %s 批次已调整,准备第%d次重试: syncDay=%d, syncCount=%d",
stasSyncStrategy.getTableName(), retryIndex + 1, effectiveSyncDay, effectiveSyncCount),
errorType.name(), isDate ? effectiveSyncDay : effectiveSyncCount, retryIndex);
// 尝试回滚并重建连接
try { targetConn.rollback(); } catch (SQLException ignored) {}
// 如果连接已关闭需要重建
if (sourceConn.isClosed() || targetConn.isClosed()) {
saveSyncLog(recordId, String.format("表 %s 连接已断开,无法重试,跳过该表", stasSyncStrategy.getTableName()),
errorType.name(), null, retryIndex);
break;
}
continue;
} else {
// 非超时/截断错误或已达到重试上限
try { targetConn.rollback(); } catch (SQLException ignored) {}
saveSyncLog(recordId, String.format("表 %s 同步失败,跳过该表继续同步其他表: %s",
stasSyncStrategy.getTableName(), e.getMessage()),
errorType.name(), null, retryIndex);
break; // 跳过当前表继续下一个表
}
}
} catch (SQLException | ParseException e) {
saveSyncLog(recordId,String.format("同步表 %s 时出错: %s", stasSyncStrategy.getTableName(), e.getMessage()));
targetConn.rollback();
throw new JeecgBootException(e.getMessage());
}
long endTime = System.currentTimeMillis();
saveSyncLog(recordId,String.format("表 %s 同步耗时: %s 毫秒", stasSyncStrategy.getTableName(), (endTime - startTime)));
saveSyncLog(recordId, String.format("表 %s 同步耗时: %s 毫秒%s", stasSyncStrategy.getTableName(),
(endTime - startTime), syncSuccess ? "" : " (失败)"), null, null, null);
}
}
stasSyncRecord.setEndTime(new Date());
@ -136,7 +224,6 @@ public class SyncDataJob implements Job {
ResultSet rs = pstmt.executeQuery();
if (rs.next()) {
String dataType = rs.getString("data_type").toUpperCase();
// 判断是否为日期/时间类型
return dataType.contains("DATE") || dataType.contains("TIME");
}
}
@ -147,32 +234,29 @@ public class SyncDataJob implements Job {
* 按日期范围同步数据
*/
public void syncByDateRange(Connection sourceConn, Connection targetConn,
StasSyncStrategy stasSyncStrategy, Integer syncCount,
Integer sourceDbType, Integer targetDbType, String recordId) throws SQLException, ParseException {
// 获取最小和最大日期
StasSyncStrategy stasSyncStrategy, Integer syncDay,
Integer sourceDbType, Integer targetDbType, String recordId, int retryIndex) throws SQLException, ParseException {
DateRangeVO dateRange = getDateRange(sourceConn, stasSyncStrategy, sourceDbType);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// 获取上次同步的位置
String syncOrigin = stasSyncStrategy.getSyncOrigin();
Date lastSyncedDate = StringUtils.isNotBlank(syncOrigin) ? sdf.parse(syncOrigin) : dateRange.getMinDate();
Date currentStart = (lastSyncedDate != null && lastSyncedDate.after(dateRange.getMinDate())) ?
lastSyncedDate : dateRange.getMinDate();
Date currentEnd = addDays(currentStart, syncCount);
Date currentEnd = addDays(currentStart, syncDay);
if (currentEnd.after(dateRange.getMaxDate())) {
currentEnd = dateRange.getMaxDate();
StringBuilder whereClause = new StringBuilder();
whereClause.append("TO_CHAR(")
StringBuilder deleteClause = new StringBuilder();
deleteClause.append("TO_CHAR(")
.append(stasSyncStrategy.getColumnName())
.append(", 'YYYY-MM-DD HH24:MI:SS') = '")
.append(sdf.format(currentEnd))
.append("'");
deleteByEquals(targetConn, stasSyncStrategy, sourceDbType, whereClause.toString());
deleteByEquals(targetConn, stasSyncStrategy, sourceDbType, deleteClause.toString());
}
// 根据数据库类型构建不同的日期条件
String whereClause;
if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceDbType)) {
whereClause = stasSyncStrategy.getColumnName() + " BETWEEN TO_DATE('" + sdf.format(currentStart) +
@ -182,20 +266,24 @@ public class SyncDataJob implements Job {
"' AND TIMESTAMP '" + sdf.format(currentEnd) + "'";
}
saveSyncLog(recordId,String.format("%s表同步日期范围: %s 至 %s", stasSyncStrategy.getTableName(), sdf.format(currentStart), sdf.format(currentEnd)));
int batchSizeBefore = syncDay;
saveSyncLog(recordId, String.format("%s表同步日期范围: %s 至 %s (批次天数: %d)",
stasSyncStrategy.getTableName(), sdf.format(currentStart), sdf.format(currentEnd), syncDay),
null, batchSizeBefore, retryIndex > 0 ? retryIndex : null);
int rowsSynced = syncDataBatch(sourceConn, targetConn, stasSyncStrategy.getSourceOwner(),
stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(),
whereClause, sourceDbType, targetDbType);
saveSyncLog(recordId,String.format("%s表已同步 %s 行数据", stasSyncStrategy.getTableName(), rowsSynced));
saveSyncLog(recordId, String.format("%s表已同步 %s 行数据", stasSyncStrategy.getTableName(), rowsSynced),
"SUCCESS", batchSizeBefore, retryIndex > 0 ? retryIndex : null);
saveSyncNum(recordId, stasSyncStrategy.getTableName(), rowsSynced);
// 更新同步位置
if (currentEnd != null) {
stasSyncStrategy.setSyncOrigin(sdf.format(currentEnd));
stasSyncStrategyMapper.updateById(stasSyncStrategy);
saveSyncLog(recordId, String.format("%s表最终同步位置已更新为: %s", stasSyncStrategy.getTableName(), sdf.format(currentEnd)));
saveSyncLog(recordId, String.format("%s表最终同步位置已更新为: %s", stasSyncStrategy.getTableName(), sdf.format(currentEnd)),
null, null, null);
}
}
@ -204,11 +292,9 @@ public class SyncDataJob implements Job {
*/
public void syncByIdRange(Connection sourceConn, Connection targetConn,
StasSyncStrategy stasSyncStrategy, Integer syncCount,
Integer sourceDbType, Integer targetDbType, String recordId) throws SQLException {
// 获取最小和最大ID
Integer sourceDbType, Integer targetDbType, String recordId, int retryIndex) throws SQLException {
IdRangeVO idRange = getIdRange(sourceConn, stasSyncStrategy, sourceDbType);
// 获取上次同步的位置
String syncOrigin = stasSyncStrategy.getSyncOrigin();
long lastSyncedId = StringUtils.isNotBlank(syncOrigin) ? Long.parseLong(syncOrigin) : idRange.getMinId();
@ -218,45 +304,48 @@ public class SyncDataJob implements Job {
long currentEnd = currentStart + syncCount - 1;
if (currentEnd > idRange.getMaxId()) {
currentEnd = idRange.getMaxId();
StringBuilder whereClause = new StringBuilder();
whereClause.append(stasSyncStrategy.getColumnName())
StringBuilder deleteClause = new StringBuilder();
deleteClause.append(stasSyncStrategy.getColumnName())
.append(" = '")
.append(currentEnd)
.append("'");
deleteByEquals(targetConn, stasSyncStrategy, sourceDbType, whereClause.toString());
deleteByEquals(targetConn, stasSyncStrategy, sourceDbType, deleteClause.toString());
}
// 确保起始位置不超过结束位置
if (currentStart > currentEnd) {
saveSyncLog(recordId, String.format("%s表已同步到最新无需继续同步", stasSyncStrategy.getTableName()));
saveSyncLog(recordId, String.format("%s表已同步到最新无需继续同步", stasSyncStrategy.getTableName()),
null, null, null);
return;
}
String whereClause = stasSyncStrategy.getColumnName() + " BETWEEN " + currentStart + " AND " + currentEnd;
saveSyncLog(recordId, String.format("%s表同步ID范围: %s 至 %s", stasSyncStrategy.getTableName(), currentStart, currentEnd));
int batchSizeBefore = syncCount;
saveSyncLog(recordId, String.format("%s表同步ID范围: %s 至 %s (批次数量: %d)",
stasSyncStrategy.getTableName(), currentStart, currentEnd, syncCount),
null, batchSizeBefore, retryIndex > 0 ? retryIndex : null);
int rowsSynced = syncDataBatch(sourceConn, targetConn, stasSyncStrategy.getSourceOwner(),
stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(),
whereClause, sourceDbType, targetDbType);
saveSyncLog(recordId, String.format("%s表已同步 %s 行数据", stasSyncStrategy.getTableName(), rowsSynced));
saveSyncLog(recordId, String.format("%s表已同步 %s 行数据", stasSyncStrategy.getTableName(), rowsSynced),
"SUCCESS", batchSizeBefore, retryIndex > 0 ? retryIndex : null);
saveSyncNum(recordId, stasSyncStrategy.getTableName(), rowsSynced);
// 更新同步位置
if (currentEnd > 0) {
stasSyncStrategy.setSyncOrigin(String.valueOf(currentEnd));
stasSyncStrategyMapper.updateById(stasSyncStrategy);
saveSyncLog(recordId, String.format("%s表最终同步位置已更新为: %s", stasSyncStrategy.getTableName(), currentEnd));
saveSyncLog(recordId, String.format("%s表最终同步位置已更新为: %s", stasSyncStrategy.getTableName(), currentEnd),
null, null, null);
}
}
/**
* 根据日期等于或ID等于删除数据使用yyyy-MM-dd格式比较
* 根据日期等于或ID等于删除数据
*/
public int deleteByEquals(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType, String whereClause) throws SQLException {
String sql;
// 构建完整的SQL语句
if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) {
sql = "DELETE FROM \"" + stasSyncStrategy.getTargetOwner().toUpperCase() + "\".\"" +
stasSyncStrategy.getTableName() + "\" WHERE " + whereClause;
@ -320,6 +409,7 @@ public class SyncDataJob implements Job {
/**
* 同步一批数据(支持跨数据库类型)
* 增强版细粒度异常捕获截断时尝试提交已成功部分
*/
public int syncDataBatch(Connection sourceConn, Connection targetConn,
String sourceOwner, String targetOwner,
@ -328,7 +418,6 @@ public class SyncDataJob implements Job {
int totalRows = 0;
String selectSql;
// 构建查询SQL(根据源数据库类型)
if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceDbType)) {
selectSql = "SELECT * FROM \"" + sourceOwner.toUpperCase() + "\".\"" + tableName +
"\" WHERE " + whereClause;
@ -341,11 +430,10 @@ public class SyncDataJob implements Job {
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
ResultSet rs = sourceStmt.executeQuery(selectSql)) {
sourceStmt.setFetchSize(5000); // 优化大表读取
sourceStmt.setFetchSize(5000);
ResultSetMetaData metaData = rs.getMetaData();
int columnCount = metaData.getColumnCount();
// 构建插入SQL(根据目标数据库类型)
String insertSql;
if (SourceDataTypeEnum.ORACLE.getKey().equals(targetDbType)) {
insertSql = "INSERT INTO \"" + targetOwner.toUpperCase() + "\".\"" + tableName + "\" VALUES (";
@ -359,35 +447,50 @@ public class SyncDataJob implements Job {
}
insertSql += ")";
// 批量插入
try (PreparedStatement pstmt = targetConn.prepareStatement(insertSql)) {
int batchSize = 0;
while (rs.next()) {
for (int i = 1; i <= columnCount; i++) {
Object value = rs.getObject(i);
try {
for (int i = 1; i <= columnCount; i++) {
Object value = rs.getObject(i);
// 特殊处理Oracle的TIMESTAMP类型到PostgreSQL
if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceDbType) && SourceDataTypeEnum.POSTGRES.getKey().equals(targetDbType)) {
String columnType = metaData.getColumnTypeName(i);
if ("TIMESTAMP".equalsIgnoreCase(columnType) || "DATE".equalsIgnoreCase(columnType)) {
Timestamp ts = rs.getTimestamp(i);
if (ts != null) {
pstmt.setTimestamp(i, ts);
continue;
if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceDbType) && SourceDataTypeEnum.POSTGRES.getKey().equals(targetDbType)) {
String columnType = metaData.getColumnTypeName(i);
if ("TIMESTAMP".equalsIgnoreCase(columnType) || "DATE".equalsIgnoreCase(columnType)) {
Timestamp ts = rs.getTimestamp(i);
if (ts != null) {
pstmt.setTimestamp(i, ts);
continue;
}
}
}
pstmt.setObject(i, value);
}
pstmt.addBatch();
batchSize++;
totalRows++;
if (batchSize % 5000 == 0) {
pstmt.executeBatch();
targetConn.commit();
batchSize = 0;
}
} catch (SQLException e) {
// 单行处理异常尝试提交已成功部分
SyncAdaptiveStrategy.ErrorType errorType = syncAdaptiveStrategy.analyzeSQLException(e);
if (errorType == SyncAdaptiveStrategy.ErrorType.TRUNCATION || errorType == SyncAdaptiveStrategy.ErrorType.TIMEOUT) {
// 截断/超时错误先提交已成功的数据
if (batchSize > 0) {
try {
pstmt.executeBatch();
targetConn.commit();
} catch (SQLException commitEx) {
log.warn("提交已成功部分失败: {}", commitEx.getMessage());
}
}
}
pstmt.setObject(i, value);
}
pstmt.addBatch();
batchSize++;
totalRows++;
if (batchSize % 5000 == 0) {
pstmt.executeBatch();
targetConn.commit();
batchSize = 0;
throw e; // 向上抛出由外层重试逻辑处理
}
}
if (batchSize > 0) {
@ -400,11 +503,14 @@ public class SyncDataJob implements Job {
}
private void saveSyncLog(String recordId, String desc){
private void saveSyncLog(String recordId, String desc, String errorType, Integer batchSize, Integer retryIndex){
StasSyncLog stasSyncLog = new StasSyncLog();
stasSyncLog.setRecordId(recordId);
stasSyncLog.setStartTime(new Date());
stasSyncLog.setDescription(desc);
stasSyncLog.setErrorType(errorType);
stasSyncLog.setBatchSizeBefore(batchSize);
stasSyncLog.setRetryIndex(retryIndex);
stasSyncLogMapper.insert(stasSyncLog);
}
@ -423,4 +529,4 @@ public class SyncDataJob implements Job {
long time = date.getTime() + (long)days * 24 * 60 * 60 * 1000;
return new Date(time);
}
}
}

View File

@ -0,0 +1,261 @@
package org.jeecg.taskConfig.service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.modules.base.entity.StasDataSource;
import org.jeecg.modules.base.entity.StasSyncStrategy;
import org.jeecg.modules.base.mapper.StasSyncStrategyMapper;
import org.springframework.stereotype.Component;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
/**
* 同步自适应策略组件
* 负责异常分析批次调整重试决策
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class SyncAdaptiveStrategy {
private final StasSyncStrategyMapper stasSyncStrategyMapper;
private final TimeoutProbeService timeoutProbeService;
/** 最大重试次数 */
private static final int MAX_RETRY_COUNT = 3;
/** 批次缩减比例 */
private static final double BATCH_REDUCTION_RATIO = 0.5;
/** 成功后批次恢复比例 */
private static final double BATCH_RECOVERY_RATIO = 1.2;
/** 默认最小批次天数 */
private static final int DEFAULT_MIN_SYNC_DAY = 1;
/** 默认最小批次数量 */
private static final int DEFAULT_MIN_SYNC_COUNT = 100;
// Oracle 超时/连接中断关键字
private static final List<String> ORACLE_TIMEOUT_KEYWORDS = Arrays.asList(
"ORA-02396", "ORA-03113", "ORA-03114", "ORA-04030",
"Closed Connection", "Connection timed out", "Read timed out",
"ORA-12571", "ORA-03135", "ORA-02396", "ORA-00054",
"No more data to read from socket", "Connection reset"
);
// PostgreSQL 超时/连接中断关键字
private static final List<String> PG_TIMEOUT_KEYWORDS = Arrays.asList(
"FATAL: terminating connection", "connection timed out",
"socket closed", "I/O error", "Connection refused",
"An I/O error occurred", "Software caused connection abort"
);
// 截断/语句超时关键字
private static final List<String> TRUNCATION_KEYWORDS = Arrays.asList(
"ORA-01013", "statement timeout", "canceling statement",
"query_canceled", "ORA-00039", "timeout",
"SQLTimeoutException", "The query was canceled"
);
/**
* 错误类型枚举
*/
public enum ErrorType {
TIMEOUT, TRUNCATION, OTHER
}
/**
* 分析SQLException类型识别超时/截断错误
*/
public ErrorType analyzeSQLException(SQLException e) {
if (e == null) {
return ErrorType.OTHER;
}
String message = e.getMessage();
if (message == null) {
// 检查cause
if (e.getCause() != null && e.getCause().getMessage() != null) {
message = e.getCause().getMessage();
} else {
return ErrorType.OTHER;
}
}
String upperMessage = message.toUpperCase();
// 优先检查截断更具体
for (String keyword : TRUNCATION_KEYWORDS) {
if (upperMessage.contains(keyword.toUpperCase())) {
return ErrorType.TRUNCATION;
}
}
// 检查Oracle超时
for (String keyword : ORACLE_TIMEOUT_KEYWORDS) {
if (upperMessage.contains(keyword.toUpperCase())) {
return ErrorType.TIMEOUT;
}
}
// 检查PG超时
for (String keyword : PG_TIMEOUT_KEYWORDS) {
if (upperMessage.contains(keyword.toUpperCase())) {
return ErrorType.TIMEOUT;
}
}
// 检查SQLState
if (e.getSQLState() != null) {
// 08xxx = 连接异常, 57014 = 语句超时
String state = e.getSQLState();
if (state.startsWith("08")) {
return ErrorType.TIMEOUT;
}
if ("57014".equals(state)) {
return ErrorType.TRUNCATION;
}
}
// 检查是否为SQLTimeoutException
if (e instanceof SQLTimeoutException) {
return ErrorType.TIMEOUT;
}
// 检查BatchUpdateException中的cause
if (e.getCause() instanceof SQLException) {
ErrorType causeType = analyzeSQLException((SQLException) e.getCause());
if (causeType != ErrorType.OTHER) {
return causeType;
}
}
return ErrorType.OTHER;
}
/**
* 根据错误类型自动缩减批次大小
* @param strategy 同步策略
* @param isDateColumn 是否按日期同步
* @param taskConfigSyncDay taskConfig原始批次天数
* @param taskConfigSyncCount taskConfig原始批次数量
* @return 调整后的[batchDay, batchCount]
*/
public int[] adjustBatchSize(StasSyncStrategy strategy, boolean isDateColumn,
int taskConfigSyncDay, int taskConfigSyncCount) {
int minSyncDay = strategy.getMinSyncDay() != null ? strategy.getMinSyncDay() : DEFAULT_MIN_SYNC_DAY;
int minSyncCount = strategy.getMinSyncCount() != null ? strategy.getMinSyncCount() : DEFAULT_MIN_SYNC_COUNT;
int currentSyncDay = strategy.getCurrentSyncDay() != null ? strategy.getCurrentSyncDay() : taskConfigSyncDay;
int currentSyncCount = strategy.getCurrentSyncCount() != null ? strategy.getCurrentSyncCount() : taskConfigSyncCount;
int newSyncDay = Math.max((int)(currentSyncDay * BATCH_REDUCTION_RATIO), minSyncDay);
int newSyncCount = Math.max((int)(currentSyncCount * BATCH_REDUCTION_RATIO), minSyncCount);
strategy.setCurrentSyncDay(newSyncDay);
strategy.setCurrentSyncCount(newSyncCount);
strategy.setRetryCount(strategy.getRetryCount() != null ? strategy.getRetryCount() + 1 : 1);
strategy.setLastErrorType(ErrorType.TIMEOUT.name());
stasSyncStrategyMapper.updateById(strategy);
log.info("策略[{}.{}]批次调整: syncDay {} -> {}, syncCount {} -> {}",
strategy.getSourceOwner(), strategy.getTableName(),
currentSyncDay, newSyncDay, currentSyncCount, newSyncCount);
return new int[]{newSyncDay, newSyncCount};
}
/**
* 同步成功后逐步恢复批次大小
* @param strategy 同步策略
* @param taskConfigSyncDay taskConfig原始批次天数
* @param taskConfigSyncCount taskConfig原始批次数量
*/
public void onSyncSuccess(StasSyncStrategy strategy, int taskConfigSyncDay, int taskConfigSyncCount) {
boolean needsUpdate = false;
// 恢复批次天数
if (strategy.getCurrentSyncDay() != null && strategy.getCurrentSyncDay() < taskConfigSyncDay) {
int newDay = Math.min((int)(strategy.getCurrentSyncDay() * BATCH_RECOVERY_RATIO), taskConfigSyncDay);
strategy.setCurrentSyncDay(newDay);
needsUpdate = true;
}
// 恢复批次数量
if (strategy.getCurrentSyncCount() != null && strategy.getCurrentSyncCount() < taskConfigSyncCount) {
int newCount = Math.min((int)(strategy.getCurrentSyncCount() * BATCH_RECOVERY_RATIO), taskConfigSyncCount);
strategy.setCurrentSyncCount(newCount);
needsUpdate = true;
}
// 清零重试计数和错误类型
if (strategy.getRetryCount() != null && strategy.getRetryCount() > 0) {
strategy.setRetryCount(0);
needsUpdate = true;
}
if (strategy.getLastErrorType() != null) {
strategy.setLastErrorType(null);
needsUpdate = true;
}
if (needsUpdate) {
stasSyncStrategyMapper.updateById(strategy);
log.info("策略[{}.{}]同步成功,批次恢复: syncDay={}, syncCount={}",
strategy.getSourceOwner(), strategy.getTableName(),
strategy.getCurrentSyncDay(), strategy.getCurrentSyncCount());
}
// 若已恢复到taskConfig原始值清空自适应字段
if (Integer.valueOf(taskConfigSyncDay).equals(strategy.getCurrentSyncDay())
&& Integer.valueOf(taskConfigSyncCount).equals(strategy.getCurrentSyncCount())) {
strategy.setCurrentSyncDay(null);
strategy.setCurrentSyncCount(null);
stasSyncStrategyMapper.updateById(strategy);
}
}
/**
* 判断是否应该重试
* @param strategy 同步策略
* @return 是否应该重试
*/
public boolean shouldRetry(StasSyncStrategy strategy) {
int retryCount = strategy.getRetryCount() != null ? strategy.getRetryCount() : 0;
if (retryCount >= MAX_RETRY_COUNT) {
return false;
}
// 检查批次是否还能缩减
int minSyncDay = strategy.getMinSyncDay() != null ? strategy.getMinSyncDay() : DEFAULT_MIN_SYNC_DAY;
int minSyncCount = strategy.getMinSyncCount() != null ? strategy.getMinSyncCount() : DEFAULT_MIN_SYNC_COUNT;
int currentSyncDay = strategy.getCurrentSyncDay() != null ? strategy.getCurrentSyncDay() : Integer.MAX_VALUE;
int currentSyncCount = strategy.getCurrentSyncCount() != null ? strategy.getCurrentSyncCount() : Integer.MAX_VALUE;
return currentSyncDay > minSyncDay || currentSyncCount > minSyncCount;
}
/**
* 获取当前生效的批次天数
*/
public int getEffectiveSyncDay(StasSyncStrategy strategy, int taskConfigSyncDay) {
return strategy.getCurrentSyncDay() != null ? strategy.getCurrentSyncDay() : taskConfigSyncDay;
}
/**
* 获取当前生效的批次数量
*/
public int getEffectiveSyncCount(StasSyncStrategy strategy, int taskConfigSyncCount) {
return strategy.getCurrentSyncCount() != null ? strategy.getCurrentSyncCount() : taskConfigSyncCount;
}
/**
* 同步失败后检查是否需要调整超时配置
*/
public void checkAndAdjustTimeout(StasDataSource dataSource, ErrorType errorType) {
if (errorType == ErrorType.TIMEOUT || errorType == ErrorType.TRUNCATION) {
try {
timeoutProbeService.adjustTimeoutFromLogs(dataSource);
} catch (Exception e) {
log.warn("调整数据源[{}]超时配置失败: {}", dataSource.getInstanceName(), e.getMessage());
}
}
}
}

View File

@ -0,0 +1,333 @@
package org.jeecg.taskConfig.service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.constant.enums.SourceDataTypeEnum;
import org.jeecg.common.util.DBUtil;
import org.jeecg.modules.base.entity.StasDataSource;
import org.jeecg.modules.base.mapper.StasDataSourceMapper;
import org.jeecg.modules.base.mapper.StasSyncLogMapper;
import org.springframework.stereotype.Service;
import java.sql.*;
import java.util.Date;
import java.util.Properties;
/**
* 超时自动探测服务
* 采用探针探测 + 历史日志分析结合方案
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class TimeoutProbeService {
private final StasDataSourceMapper stasDataSourceMapper;
private final StasSyncLogMapper stasSyncLogMapper;
/** 探针重复次数 */
private static final int PROBE_REPEAT_COUNT = 3;
/** 最小连接超时(秒) */
private static final int MIN_CONNECT_TIMEOUT_SEC = 30;
/** 最小读取超时(秒) */
private static final int MIN_READ_TIMEOUT_SEC = 120;
/** 连接超时倍率(相对平均延迟) */
private static final int CONNECT_TIMEOUT_MULTIPLIER = 5;
/** 读取超时倍率(相对平均延迟) */
private static final int READ_TIMEOUT_MULTIPLIER = 20;
/** 探测结果过期天数 */
private static final int PROBE_EXPIRE_DAYS = 7;
/** 日志分析窗口天数 */
private static final int LOG_ANALYSIS_WINDOW_DAYS = 30;
/** 超时错误率阈值 */
private static final double TIMEOUT_ERROR_RATE_THRESHOLD = 0.2;
/** 连续无超时日志条数阈值(满足此条件则温和下调) */
private static final int CONSECUTIVE_NON_TIMEOUT_THRESHOLD = 5;
/**
* 获取生效超时值供DBUtil.getConnection使用
* 若未探测过或已过期自动触发探测
* @return [connectTimeoutMs, readTimeoutMs]
*/
public int[] getEffectiveTimeout(StasDataSource dataSource) {
// 检查是否需要重新探测
if (dataSource.getDetectedConnectTimeout() == null || isProbeExpired(dataSource)) {
probeAndDetectTimeout(dataSource);
}
// 基于日志动态微调
adjustTimeoutFromLogs(dataSource);
int connectTimeoutMs = dataSource.getDetectedConnectTimeout() != null
? dataSource.getDetectedConnectTimeout() * 1000
: DBUtil.DEFAULT_ORACLE_CONNECT_TIMEOUT;
int readTimeoutMs = dataSource.getDetectedReadTimeout() != null
? dataSource.getDetectedReadTimeout() * 1000
: DBUtil.DEFAULT_ORACLE_READ_TIMEOUT;
return new int[]{connectTimeoutMs, readTimeoutMs};
}
/**
* 首次连接自动探测
*/
public void probeAndDetectTimeout(StasDataSource dataSource) {
boolean isOracle = SourceDataTypeEnum.ORACLE.getKey().equals(dataSource.getType());
boolean isPg = SourceDataTypeEnum.POSTGRES.getKey().equals(dataSource.getType());
String url;
String driver;
String probeSql;
if (isOracle) {
url = DBUtil.getUrl(dataSource.getIpAddress(), dataSource.getPort(), dataSource.getServeId());
driver = DBUtil.ORACLE_DRIVER;
probeSql = "SELECT 1 FROM DUAL";
} else if (isPg) {
url = DBUtil.getPgUrlWithKeepAlive(dataSource.getIpAddress(), dataSource.getPort(), dataSource.getServeId());
driver = DBUtil.POSTGRES_DRIVER;
probeSql = "SELECT 1";
} else {
log.warn("不支持的数据源类型: {}, 跳过超时探测", dataSource.getType());
return;
}
long totalLatency = 0;
int successCount = 0;
for (int i = 0; i < PROBE_REPEAT_COUNT; i++) {
try {
long start = System.currentTimeMillis();
Class.forName(driver);
Properties props = new Properties();
props.put("user", dataSource.getUsername());
props.put("password", dataSource.getPassword());
// 探针自身使用短超时
if (isOracle) {
props.put("oracle.net.CONNECT_TIMEOUT", "10000");
props.put("oracle.jdbc.ReadTimeout", "30000");
} else {
props.put("loginTimeout", "10");
props.put("socketTimeout", "30");
}
try (Connection conn = DriverManager.getConnection(url, props);
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(probeSql)) {
if (rs.next()) {
long elapsed = System.currentTimeMillis() - start;
totalLatency += elapsed;
successCount++;
log.info("数据源[{}]探针第{}次响应: {}ms", dataSource.getInstanceName(), i + 1, elapsed);
}
}
} catch (Exception e) {
log.warn("数据源[{}]探针第{}次失败: {}", dataSource.getInstanceName(), i + 1, e.getMessage());
}
}
if (successCount == 0) {
log.error("数据源[{}]探针全部失败,使用默认超时", dataSource.getInstanceName());
return;
}
long avgLatencyMs = totalLatency / successCount;
// 转为秒计算
long avgLatencySec = Math.max(avgLatencyMs / 1000, 1);
int connectTimeoutSec = Math.max((int)(avgLatencySec * CONNECT_TIMEOUT_MULTIPLIER), MIN_CONNECT_TIMEOUT_SEC);
int readTimeoutSec = Math.max((int)(avgLatencySec * READ_TIMEOUT_MULTIPLIER), MIN_READ_TIMEOUT_SEC);
// 查询数据库自身超时配置作为上限参考
int[] dbLimits = queryDatabaseTimeoutLimits(dataSource, url, driver, isOracle);
if (dbLimits != null) {
int dbConnectLimit = dbLimits[0];
int dbReadLimit = dbLimits[1];
if (dbConnectLimit > 0 && connectTimeoutSec > dbConnectLimit) {
connectTimeoutSec = (int)(dbConnectLimit * 0.8);
}
if (dbReadLimit > 0 && readTimeoutSec > dbReadLimit) {
readTimeoutSec = (int)(dbReadLimit * 0.8);
}
}
log.info("数据源[{}]探测结果: avgLatency={}ms, connectTimeout={}s, readTimeout={}s",
dataSource.getInstanceName(), avgLatencyMs, connectTimeoutSec, readTimeoutSec);
dataSource.setDetectedConnectTimeout(connectTimeoutSec);
dataSource.setDetectedReadTimeout(readTimeoutSec);
dataSource.setLastProbeTime(new Date());
stasDataSourceMapper.updateById(dataSource);
}
/**
* 基于历史日志动态调整超时
*/
public void adjustTimeoutFromLogs(StasDataSource dataSource) {
try {
int timeoutErrors = stasSyncLogMapper.countTimeoutErrorsByDataSource(
dataSource.getId(), LOG_ANALYSIS_WINDOW_DAYS);
Long avgSyncTimeMs = stasSyncLogMapper.avgSyncTimeByDataSource(
dataSource.getId(), LOG_ANALYSIS_WINDOW_DAYS);
int consecutiveNonTimeout = stasSyncLogMapper.countConsecutiveNonTimeoutByDataSource(
dataSource.getId(), CONSECUTIVE_NON_TIMEOUT_THRESHOLD);
int currentReadTimeout = dataSource.getDetectedReadTimeout() != null
? dataSource.getDetectedReadTimeout() : MIN_READ_TIMEOUT_SEC;
// 计算超时错误率
int totalErrors = timeoutErrors;
double errorRate = totalErrors > 0 ? (double) timeoutErrors / Math.max(totalErrors, 1) : 0;
if (errorRate > TIMEOUT_ERROR_RATE_THRESHOLD && avgSyncTimeMs != null && avgSyncTimeMs > 0) {
// 超时错误率过高增大读取超时
int newReadTimeout = (int) Math.max(currentReadTimeout * 1.5,
(avgSyncTimeMs / 1000) * 3);
log.info("数据源[{}]超时错误率{}}过高,读取超时从{}s调整为{}s",
dataSource.getInstanceName(), String.format("%.2f", errorRate),
currentReadTimeout, newReadTimeout);
dataSource.setDetectedReadTimeout(newReadTimeout);
stasDataSourceMapper.updateById(dataSource);
} else if (consecutiveNonTimeout >= CONSECUTIVE_NON_TIMEOUT_THRESHOLD
&& avgSyncTimeMs != null && avgSyncTimeMs > 0) {
// 连续无超时错误温和下调
int newReadTimeout = (int) Math.max(currentReadTimeout * 0.8,
Math.max((avgSyncTimeMs / 1000) * 2, MIN_READ_TIMEOUT_SEC));
if (newReadTimeout < currentReadTimeout) {
log.info("数据源[{}]连续无超时错误,读取超时从{}s调整为{}s",
dataSource.getInstanceName(), currentReadTimeout, newReadTimeout);
dataSource.setDetectedReadTimeout(newReadTimeout);
stasDataSourceMapper.updateById(dataSource);
}
}
} catch (Exception e) {
log.warn("数据源[{}]日志分析调整超时失败: {}", dataSource.getInstanceName(), e.getMessage());
}
}
/**
* 查询数据库自身超时配置
* @return [connectTimeLimitSec, idleTimeLimitSec] null
*/
private int[] queryDatabaseTimeoutLimits(StasDataSource dataSource, String url, String driver, boolean isOracle) {
try {
Class.forName(driver);
Properties props = new Properties();
props.put("user", dataSource.getUsername());
props.put("password", dataSource.getPassword());
if (isOracle) {
props.put("oracle.net.CONNECT_TIMEOUT", "10000");
props.put("oracle.jdbc.ReadTimeout", "30000");
} else {
props.put("loginTimeout", "10");
props.put("socketTimeout", "30");
}
try (Connection conn = DriverManager.getConnection(url, props)) {
if (isOracle) {
return queryOracleTimeoutLimits(conn);
} else {
return queryPgTimeoutLimits(conn);
}
}
} catch (Exception e) {
log.warn("查询数据源[{}]超时配置失败: {}", dataSource.getInstanceName(), e.getMessage());
return null;
}
}
/**
* 查询Oracle数据库超时限制
*/
private int[] queryOracleTimeoutLimits(Connection conn) {
int connectTimeLimit = -1;
int idleTimeLimit = -1;
String sql = "SELECT resource_name, limit FROM dba_profiles " +
"WHERE profile = 'DEFAULT' AND resource_name IN ('CONNECT_TIME', 'IDLE_TIME')";
try (PreparedStatement pstmt = conn.prepareStatement(sql);
ResultSet rs = pstmt.executeQuery()) {
while (rs.next()) {
String resourceName = rs.getString("resource_name");
String limitStr = rs.getString("limit");
if ("DEFAULT".equalsIgnoreCase(limitStr) || "UNLIMITED".equalsIgnoreCase(limitStr)) {
continue;
}
try {
int limitVal = Integer.parseInt(limitStr);
if ("CONNECT_TIME".equals(resourceName)) {
connectTimeLimit = limitVal; // 分钟
} else if ("IDLE_TIME".equals(resourceName)) {
idleTimeLimit = limitVal; // 分钟
}
} catch (NumberFormatException ignored) {
}
}
} catch (SQLException e) {
log.warn("查询Oracle超时配置失败: {}", e.getMessage());
}
// 转为秒返回Oracle中CONNECT_TIME单位是分钟
return new int[]{
connectTimeLimit > 0 ? connectTimeLimit * 60 : -1,
idleTimeLimit > 0 ? idleTimeLimit * 60 : -1
};
}
/**
* 查询PostgreSQL数据库超时限制
*/
private int[] queryPgTimeoutLimits(Connection conn) {
int statementTimeout = -1;
int idleTimeout = -1;
try (Statement stmt = conn.createStatement()) {
try (ResultSet rs = stmt.executeQuery("SHOW statement_timeout")) {
if (rs.next()) {
statementTimeout = parsePgIntervalToSec(rs.getString(1));
}
}
try (ResultSet rs = stmt.executeQuery("SHOW idle_in_transaction_session_timeout")) {
if (rs.next()) {
idleTimeout = parsePgIntervalToSec(rs.getString(1));
}
}
} catch (SQLException e) {
log.warn("查询PG超时配置失败: {}", e.getMessage());
}
return new int[]{statementTimeout, idleTimeout};
}
/**
* 解析PG时间间隔字符串为秒数 "5min", "30s", "2h"
*/
private int parsePgIntervalToSec(String value) {
if (value == null || "0".equals(value) || "-1".equals(value) || value.startsWith("0")) {
return -1;
}
try {
value = value.trim().toLowerCase();
if (value.endsWith("ms")) {
return (int) (Long.parseLong(value.replace("ms", "")) / 1000);
} else if (value.endsWith("s")) {
return Integer.parseInt(value.replace("s", ""));
} else if (value.endsWith("min")) {
return Integer.parseInt(value.replace("min", "")) * 60;
} else if (value.endsWith("h")) {
return Integer.parseInt(value.replace("h", "")) * 3600;
} else if (value.endsWith("d")) {
return Integer.parseInt(value.replace("d", "")) * 86400;
}
return Integer.parseInt(value);
} catch (NumberFormatException e) {
return -1;
}
}
/**
* 判断探测结果是否过期超过7天
*/
private boolean isProbeExpired(StasDataSource dataSource) {
if (dataSource.getLastProbeTime() == null) {
return true;
}
long daysSinceProbe = (System.currentTimeMillis() - dataSource.getLastProbeTime().getTime())
/ (1000 * 60 * 60 * 24);
return daysSinceProbe >= PROBE_EXPIRE_DAYS;
}
}

View File

@ -2,9 +2,11 @@ package org.jeecg.taskConfig.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.jeecg.common.constant.CommonConstant;
import org.jeecg.common.constant.QuartzJobConstant;
import org.jeecg.common.constant.enums.SourceDataTypeEnum;
import org.jeecg.common.exception.JeecgBootException;
import org.jeecg.common.util.DBUtil;
import org.jeecg.modules.base.entity.StasDataSource;
@ -16,6 +18,8 @@ import org.jeecg.modules.base.mapper.StasTaskConfigMapper;
import org.jeecg.modules.base.entity.QuartzJob;
import org.jeecg.quartz.service.IQuartzJobService;
import org.jeecg.taskConfig.service.IStasTaskConfigService;
import org.jeecg.taskConfig.service.SyncAdaptiveStrategy;
import org.jeecg.taskConfig.service.TimeoutProbeService;
import org.jeecg.vo.*;
import org.springframework.stereotype.Service;
@ -37,12 +41,15 @@ import java.util.stream.Collectors;
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class StasTaskConfigServiceImpl extends ServiceImpl<StasTaskConfigMapper, StasTaskConfig> implements IStasTaskConfigService {
private final StasTaskConfigMapper stasTaskConfigMapper;
private final StasDataSourceMapper stasDataSourceMapper;
private final StasSyncStrategyMapper stasSyncStrategyMapper;
private final IQuartzJobService quartzJobService;
private final SyncAdaptiveStrategy syncAdaptiveStrategy;
private final TimeoutProbeService timeoutProbeService;
/**
* 数据表思维导图
@ -162,7 +169,7 @@ public class StasTaskConfigServiceImpl extends ServiceImpl<StasTaskConfigMapper,
/**
* 根据字段类型同步数据
* 根据字段类型同步数据增强版自动探测超时 + 自适应批次 + 重试
* @param taskId 任务id
* @throws SQLException 数据库异常
*/
@ -171,36 +178,105 @@ public class StasTaskConfigServiceImpl extends ServiceImpl<StasTaskConfigMapper,
StasTaskConfig stasTaskConfig = stasTaskConfigMapper.selectById(taskId);
StasDataSource sourceInfo = stasDataSourceMapper.selectById(stasTaskConfig.getSourceId());
StasDataSource targetInfo = stasDataSourceMapper.selectById(stasTaskConfig.getTargetId());
String sourceUrl = DBUtil.getUrl(sourceInfo.getIpAddress(), sourceInfo.getPort(), sourceInfo.getServeId());
String targetUrl = DBUtil.getUrl(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId());
try (Connection sourceConn = DriverManager.getConnection(sourceUrl, sourceInfo.getUsername(), sourceInfo.getPassword());
Connection targetConn = DriverManager.getConnection(targetUrl, targetInfo.getUsername(), targetInfo.getPassword())) {
// 设置目标连接为批量提交模式
// 构建源/目标URL和Driver
String sourceUrl;
String sourceDriver;
int sourceType;
if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceInfo.getType())) {
sourceUrl = DBUtil.getUrl(sourceInfo.getIpAddress(), sourceInfo.getPort(), sourceInfo.getServeId());
sourceDriver = DBUtil.ORACLE_DRIVER;
sourceType = 1;
} else {
sourceUrl = DBUtil.getPgUrlWithKeepAlive(sourceInfo.getIpAddress(), sourceInfo.getPort(), sourceInfo.getServeId());
sourceDriver = DBUtil.POSTGRES_DRIVER;
sourceType = 0;
}
String targetUrl;
String targetDriver;
int targetType;
if (SourceDataTypeEnum.ORACLE.getKey().equals(targetInfo.getType())) {
targetUrl = DBUtil.getUrl(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId());
targetDriver = DBUtil.ORACLE_DRIVER;
targetType = 1;
} else {
targetUrl = DBUtil.getPgUrlWithKeepAlive(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId());
targetDriver = DBUtil.POSTGRES_DRIVER;
targetType = 0;
}
// 获取自动探测的超时值
int[] sourceTimeouts = timeoutProbeService.getEffectiveTimeout(sourceInfo);
int[] targetTimeouts = timeoutProbeService.getEffectiveTimeout(targetInfo);
try (Connection sourceConn = DBUtil.getConnection(sourceUrl, sourceInfo.getUsername(), sourceInfo.getPassword(),
sourceDriver, sourceType, sourceTimeouts[0], sourceTimeouts[1]);
Connection targetConn = DBUtil.getConnection(targetUrl, targetInfo.getUsername(), targetInfo.getPassword(),
targetDriver, targetType, targetTimeouts[0], targetTimeouts[1])) {
targetConn.setAutoCommit(false);
List<StasSyncStrategy> stasSyncStrategies = stasSyncStrategyMapper.
selectList(new LambdaQueryWrapper<StasSyncStrategy>().eq(StasSyncStrategy::getTaskId, taskId));
for (StasSyncStrategy stasSyncStrategy : stasSyncStrategies) {
System.out.println("开始同步表: " + stasSyncStrategy.getTableName() + " (依据字段: " + stasSyncStrategy.getColumnName() + ")");
log.info("开始同步表: {} (依据字段: {})", stasSyncStrategy.getTableName(), stasSyncStrategy.getColumnName());
long startTime = System.currentTimeMillis();
boolean isDate = isDateColumn(sourceConn, stasSyncStrategy);
try {
if (isDateColumn(sourceConn,stasSyncStrategy)) {
syncByDateRange(sourceConn, targetConn, stasSyncStrategy, stasTaskConfig.getSyncDay());
} else {
syncByIdRange(sourceConn, targetConn, stasSyncStrategy, stasTaskConfig.getSyncCount());
int effectiveSyncDay = syncAdaptiveStrategy.getEffectiveSyncDay(stasSyncStrategy, stasTaskConfig.getSyncDay());
int effectiveSyncCount = syncAdaptiveStrategy.getEffectiveSyncCount(stasSyncStrategy, stasTaskConfig.getSyncCount());
boolean syncSuccess = false;
int retryIndex = 0;
while (!syncSuccess) {
try {
if (isDate) {
syncByDateRange(sourceConn, targetConn, stasSyncStrategy, effectiveSyncDay);
} else {
syncByIdRange(sourceConn, targetConn, stasSyncStrategy, effectiveSyncCount);
}
syncSuccess = true;
syncAdaptiveStrategy.onSyncSuccess(stasSyncStrategy,
stasTaskConfig.getSyncDay(), stasTaskConfig.getSyncCount());
} catch (SQLException e) {
SyncAdaptiveStrategy.ErrorType errorType = syncAdaptiveStrategy.analyzeSQLException(e);
retryIndex++;
log.warn("同步表 {} 时出错(第{}次): {} [错误类型: {}]",
stasSyncStrategy.getTableName(), retryIndex, e.getMessage(), errorType.name());
syncAdaptiveStrategy.checkAndAdjustTimeout(sourceInfo, errorType);
syncAdaptiveStrategy.checkAndAdjustTimeout(targetInfo, errorType);
if ((errorType == SyncAdaptiveStrategy.ErrorType.TIMEOUT
|| errorType == SyncAdaptiveStrategy.ErrorType.TRUNCATION)
&& syncAdaptiveStrategy.shouldRetry(stasSyncStrategy)) {
int[] adjusted = syncAdaptiveStrategy.adjustBatchSize(stasSyncStrategy, isDate,
stasTaskConfig.getSyncDay(), stasTaskConfig.getSyncCount());
effectiveSyncDay = adjusted[0];
effectiveSyncCount = adjusted[1];
log.info("表 {} 批次已调整,准备第{}次重试: syncDay={}, syncCount={}",
stasSyncStrategy.getTableName(), retryIndex + 1, effectiveSyncDay, effectiveSyncCount);
try { targetConn.rollback(); } catch (SQLException ignored) {}
if (sourceConn.isClosed() || targetConn.isClosed()) {
log.warn("表 {} 连接已断开,无法重试,跳过该表", stasSyncStrategy.getTableName());
break;
}
continue;
} else {
try { targetConn.rollback(); } catch (SQLException ignored) {}
log.error("表 {} 同步失败,跳过: {}", stasSyncStrategy.getTableName(), e.getMessage());
break;
}
} catch (ParseException e) {
throw new JeecgBootException(e.getMessage());
}
} catch (SQLException e) {
System.err.println("同步表 " + stasSyncStrategy.getTableName() + " 时出错: " + e.getMessage());
targetConn.rollback();
throw new JeecgBootException(e.getMessage());
} catch (ParseException e) {
throw new JeecgBootException(e.getMessage());
}
long endTime = System.currentTimeMillis();
System.out.println("" + stasSyncStrategy.getTableName() + " 同步耗时: " + (endTime - startTime) + " 毫秒");
log.info("表 {} 同步耗时: {} 毫秒{}", stasSyncStrategy.getTableName(),
(endTime - startTime), syncSuccess ? "" : " (失败)");
}
}
}

View File

@ -0,0 +1,40 @@
-- ============================================================
-- 数据同步超时自适应与批次调整 - DDL更新脚本
-- 执行环境: PostgreSQL (目标数据库)
-- ============================================================
-- 1. stas_data_source 表新增超时探测字段
ALTER TABLE stas_data_source ADD COLUMN IF NOT EXISTS detected_connect_timeout INTEGER;
ALTER TABLE stas_data_source ADD COLUMN IF NOT EXISTS detected_read_timeout INTEGER;
ALTER TABLE stas_data_source ADD COLUMN IF NOT EXISTS last_probe_time TIMESTAMP;
-- 2. stas_sync_strategy 表新增自适应批次字段
ALTER TABLE stas_sync_strategy ADD COLUMN IF NOT EXISTS current_sync_day INTEGER;
ALTER TABLE stas_sync_strategy ADD COLUMN IF NOT EXISTS current_sync_count INTEGER;
ALTER TABLE stas_sync_strategy ADD COLUMN IF NOT EXISTS last_error_type VARCHAR(20);
ALTER TABLE stas_sync_strategy ADD COLUMN IF NOT EXISTS retry_count INTEGER DEFAULT 0;
ALTER TABLE stas_sync_strategy ADD COLUMN IF NOT EXISTS min_sync_day INTEGER DEFAULT 1;
ALTER TABLE stas_sync_strategy ADD COLUMN IF NOT EXISTS min_sync_count INTEGER DEFAULT 100;
-- 3. stas_sync_log 表新增错误详情字段
ALTER TABLE stas_sync_log ADD COLUMN IF NOT EXISTS error_type VARCHAR(20);
ALTER TABLE stas_sync_log ADD COLUMN IF NOT EXISTS batch_size_before INTEGER;
ALTER TABLE stas_sync_log ADD COLUMN IF NOT EXISTS batch_size_after INTEGER;
ALTER TABLE stas_sync_log ADD COLUMN IF NOT EXISTS retry_index INTEGER;
-- 4. 为新增字段添加注释PostgreSQL
COMMENT ON COLUMN stas_data_source.detected_connect_timeout IS '探测到的连接超时(秒)null表示未探测';
COMMENT ON COLUMN stas_data_source.detected_read_timeout IS '探测到的读取超时(秒)null表示未探测';
COMMENT ON COLUMN stas_data_source.last_probe_time IS '上次超时探测时间超过7天自动重新探测';
COMMENT ON COLUMN stas_sync_strategy.current_sync_day IS '当前生效的批次天数(自适应调整后)null时使用taskConfig的syncDay';
COMMENT ON COLUMN stas_sync_strategy.current_sync_count IS '当前生效的批次数量(自适应调整后)null时使用taskConfig的syncCount';
COMMENT ON COLUMN stas_sync_strategy.last_error_type IS '最近一次错误类型: TIMEOUT/TRUNCATION/OTHER';
COMMENT ON COLUMN stas_sync_strategy.retry_count IS '连续重试次数,同步成功后清零';
COMMENT ON COLUMN stas_sync_strategy.min_sync_day IS '批次天数最小值下限默认1';
COMMENT ON COLUMN stas_sync_strategy.min_sync_count IS '批次数量最小值下限默认100';
COMMENT ON COLUMN stas_sync_log.error_type IS '错误类型: TIMEOUT/TRUNCATION/OTHER/SUCCESS';
COMMENT ON COLUMN stas_sync_log.batch_size_before IS '调整前批次大小';
COMMENT ON COLUMN stas_sync_log.batch_size_after IS '调整后批次大小';
COMMENT ON COLUMN stas_sync_log.retry_index IS '第几次重试';