Revert "修改同步服务功能"

This reverts commit ab1dac0426.
This commit is contained in:
panbaolin 2026-05-21 16:11:08 +08:00
parent 26ad5cdbb4
commit 4134a749fb
14 changed files with 106 additions and 1499 deletions

View File

@ -10,15 +10,6 @@ public class DBUtil {
public static final String ORACLE_URL_PREFIX = "jdbc:oracle:thin:@"; public static final String ORACLE_URL_PREFIX = "jdbc:oracle:thin:@";
public static final String POSTGRES_URL_PREFIX = "jdbc:postgresql://"; public static final String POSTGRES_URL_PREFIX = "jdbc:postgresql://";
/** 默认Oracle连接超时毫秒 */
public static final int DEFAULT_ORACLE_CONNECT_TIMEOUT = 300000;
/** 默认Oracle读取超时毫秒 */
public static final int DEFAULT_ORACLE_READ_TIMEOUT = 1200000;
/** 默认PostgreSQL连接超时毫秒 */
public static final int DEFAULT_PG_CONNECT_TIMEOUT = 300000;
/** 默认PostgreSQL套接字超时毫秒 */
public static final int DEFAULT_PG_SOCKET_TIMEOUT = 1200000;
/*public static Connection getConnection(String url, String username, String password, String Driver) throws SQLException{ /*public static Connection getConnection(String url, String username, String password, String Driver) throws SQLException{
try { try {
Class.forName(Driver); Class.forName(Driver);
@ -43,63 +34,18 @@ public class DBUtil {
return String.format("%s%s:%s%s", DBUtil.POSTGRES_URL_PREFIX, ip, port, serveId); return String.format("%s%s:%s%s", DBUtil.POSTGRES_URL_PREFIX, ip, port, serveId);
} }
/** public static Connection getConnection(String url, String username, String password, String Driver,int type) throws SQLException{
* 构建PostgreSQL URL带tcpKeepAlive参数
*/
public static String getPgUrlWithKeepAlive(String ip, Integer port, String serveId){
String baseUrl = String.format("%s%s:%s%s", DBUtil.POSTGRES_URL_PREFIX, ip, port, serveId);
if (baseUrl.contains("?")) {
return baseUrl + "&tcpKeepAlive=true";
}
return baseUrl + "?tcpKeepAlive=true";
}
/**
* 获取数据库连接兼容旧接口使用默认超时
* @param type 0=PostgreSQL, 1=Oracle
*/
public static Connection getConnection(String url, String username, String password, String Driver, int type) throws SQLException{
if (type == 0) {
return getConnection(url, username, password, Driver, type,
DEFAULT_PG_CONNECT_TIMEOUT, DEFAULT_PG_SOCKET_TIMEOUT);
} else {
return getConnection(url, username, password, Driver, type,
DEFAULT_ORACLE_CONNECT_TIMEOUT, DEFAULT_ORACLE_READ_TIMEOUT);
}
}
/**
* 获取数据库连接自定义超时
* @param url JDBC连接URL
* @param username 用户名
* @param password 密码
* @param driver 驱动类名
* @param type 0=PostgreSQL, 1=Oracle
* @param connectTimeoutMs 连接超时毫秒null则使用默认值
* @param readTimeoutMs 读取超时毫秒null则使用默认值
*/
public static Connection getConnection(String url, String username, String password,
String driver, int type,
Integer connectTimeoutMs, Integer readTimeoutMs) throws SQLException{
try { try {
Class.forName(driver); Class.forName(Driver);
Properties props = new Properties(); Properties props = new Properties();
if(type == 0){
props.put("user", username); props.put("user", username);
props.put("password", password); props.put("password", password);
}else{
if (type == 0) { props.put("user", username);
// PostgreSQL props.put("password", password);
int connTimeout = (connectTimeoutMs != null) ? connectTimeoutMs : DEFAULT_PG_CONNECT_TIMEOUT; props.put("oracle.net.CONNECT_TIMEOUT", "300000");
int readTimeout = (readTimeoutMs != null) ? readTimeoutMs : DEFAULT_PG_SOCKET_TIMEOUT; props.put("oracle.jdbc.ReadTimeout", "1200000");
props.put("loginTimeout", String.valueOf(connTimeout / 1000));
props.put("socketTimeout", String.valueOf(readTimeout / 1000));
props.put("tcpKeepAlive", "true");
} else {
// Oracle
int connTimeout = (connectTimeoutMs != null) ? connectTimeoutMs : DEFAULT_ORACLE_CONNECT_TIMEOUT;
int readTimeout = (readTimeoutMs != null) ? readTimeoutMs : DEFAULT_ORACLE_READ_TIMEOUT;
props.put("oracle.net.CONNECT_TIMEOUT", String.valueOf(connTimeout));
props.put("oracle.jdbc.ReadTimeout", String.valueOf(readTimeout));
} }
return DriverManager.getConnection(url, props); return DriverManager.getConnection(url, props);
} catch(ClassNotFoundException e){ } catch(ClassNotFoundException e){

View File

@ -78,15 +78,7 @@ public class StasDataSource implements Serializable {
/**描述*/ /**描述*/
@Excel(name = "描述", width = 15) @Excel(name = "描述", width = 15)
private String description; private String description;
/**探测到的连接超时(秒)*/
@Excel(name = "连接超时(秒)", width = 15)
private Integer detectedConnectTimeout;
/**探测到的读取超时(秒)*/
@Excel(name = "读取超时(秒)", width = 15)
private Integer detectedReadTimeout;
/**上次探测时间*/
@JsonFormat(timezone = "GMT+8",pattern = "yyyy-MM-dd HH:mm:ss")
@DateTimeFormat(pattern="yyyy-MM-dd HH:mm:ss")
private Date lastProbeTime;
} }

View File

@ -40,16 +40,4 @@ public class StasSyncLog implements Serializable {
/**描述*/ /**描述*/
@Excel(name = "描述", width = 15) @Excel(name = "描述", width = 15)
private String description; private String description;
/**错误类型TIMEOUT/TRUNCATION/OTHER/SUCCESS*/
@Excel(name = "错误类型", width = 15)
private String errorType;
/**调整前批次大小*/
@Excel(name = "调整前批次大小", width = 15)
private Integer batchSizeBefore;
/**调整后批次大小*/
@Excel(name = "调整后批次大小", width = 15)
private Integer batchSizeAfter;
/**第几次重试*/
@Excel(name = "重试序号", width = 15)
private Integer retryIndex;
} }

View File

@ -63,22 +63,4 @@ public class StasSyncStrategy implements Serializable {
/**同步位置*/ /**同步位置*/
@Excel(name = "同步位置", width = 15) @Excel(name = "同步位置", width = 15)
private String syncOrigin; private String syncOrigin;
/**当前生效的批次天数自适应调整后null时使用taskConfig的syncDay*/
@Excel(name = "当前批次天数", width = 15)
private Integer currentSyncDay;
/**当前生效的批次数量自适应调整后null时使用taskConfig的syncCount*/
@Excel(name = "当前批次数量", width = 15)
private Integer currentSyncCount;
/**最近一次错误类型TIMEOUT/TRUNCATION/OTHER*/
@Excel(name = "错误类型", width = 15)
private String lastErrorType;
/**连续重试次数(成功后清零)*/
@Excel(name = "重试次数", width = 15)
private Integer retryCount;
/**批次天数最小值下限默认1*/
@Excel(name = "最小批次天数", width = 15)
private Integer minSyncDay;
/**批次数量最小值下限默认100*/
@Excel(name = "最小批次数量", width = 15)
private Integer minSyncCount;
} }

View File

@ -1,6 +1,5 @@
package org.jeecg.modules.base.mapper; package org.jeecg.modules.base.mapper;
import org.apache.ibatis.annotations.Param;
import org.jeecg.modules.base.entity.StasSyncLog; import org.jeecg.modules.base.entity.StasSyncLog;
import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper;
@ -12,18 +11,4 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper;
*/ */
public interface StasSyncLogMapper extends BaseMapper<StasSyncLog> { public interface StasSyncLogMapper extends BaseMapper<StasSyncLog> {
/**
* 统计指定数据源关联的最近N天超时/截断错误次数
*/
int countTimeoutErrorsByDataSource(@Param("dataSourceId") String dataSourceId, @Param("days") int days);
/**
* 统计指定数据源关联的最近N天成功同步的平均耗时毫秒
*/
Long avgSyncTimeByDataSource(@Param("dataSourceId") String dataSourceId, @Param("days") int days);
/**
* 统计指定数据源最近连续N条日志中无超时错误的次数
*/
int countConsecutiveNonTimeoutByDataSource(@Param("dataSourceId") String dataSourceId, @Param("limit") int limit);
} }

View File

@ -2,38 +2,4 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd"> <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.jeecg.modules.base.mapper.StasSyncLogMapper"> <mapper namespace="org.jeecg.modules.base.mapper.StasSyncLogMapper">
<!-- 统计指定数据源关联的最近N天超时/截断错误次数 -->
<select id="countTimeoutErrorsByDataSource" resultType="int">
SELECT COUNT(1)
FROM stas_sync_log l
JOIN stas_sync_record r ON l.record_id = r.id
WHERE (r.source_id = #{dataSourceId} OR r.target_id = #{dataSourceId})
AND l.error_type IN ('TIMEOUT', 'TRUNCATION')
AND l.start_time &gt;= (CURRENT_DATE - INTERVAL '#{days}' DAY)
</select>
<!-- 统计指定数据源关联的最近N天成功同步的平均耗时(毫秒) -->
<select id="avgSyncTimeByDataSource" resultType="java.lang.Long">
SELECT CAST(AVG(
EXTRACT(EPOCH FROM (r.end_time - r.start_time)) * 1000
) AS BIGINT)
FROM stas_sync_record r
WHERE (r.source_id = #{dataSourceId} OR r.target_id = #{dataSourceId})
AND r.end_time IS NOT NULL
AND r.start_time &gt;= (CURRENT_DATE - INTERVAL '#{days}' DAY)
</select>
<!-- 统计指定数据源最近连续N条日志中无超时错误的记录数 -->
<select id="countConsecutiveNonTimeoutByDataSource" resultType="int">
SELECT COUNT(1) FROM (
SELECT l.error_type
FROM stas_sync_log l
JOIN stas_sync_record r ON l.record_id = r.id
WHERE (r.source_id = #{dataSourceId} OR r.target_id = #{dataSourceId})
ORDER BY l.start_time DESC
LIMIT #{limit}
) sub
WHERE sub.error_type IS NULL OR sub.error_type NOT IN ('TIMEOUT', 'TRUNCATION')
</select>
</mapper> </mapper>

View File

@ -1,369 +0,0 @@
================================================================================
jeecg-module-sync 模块 — AI辅助代码编辑上下文文件
================================================================================
一、模块概述
--------------------------------------------------------------------------------
jeecg-module-sync 是源项分析系统STAS中的数据同步模块负责在Oracle和PostgreSQL
数据库之间进行数据同步。支持定时任务调度、增量同步(按日期/ID范围、同步策略管理、
同步日志记录和同步统计等功能。
父工程: jeecg-boot-parent 3.8.1
Java版本: 17
核心依赖: jeecg-boot-base-core提供实体、Mapper、工具类等基础能力
框架: Spring Boot + MyBatis-Plus + Quartz + Swagger/OpenAPI
二、目录结构与文件清单
--------------------------------------------------------------------------------
jeecg-module-sync/
├── pom.xml
└── src/main/java/org/jeecg/
├── OracleSync.java # 独立工具类Oracle-to-Oracle同步main方法入口
├── OracleToPgSync.java # 独立工具类Oracle-to-PostgreSQL建表同步main方法入口
├── dataSource/ # 数据源管理子模块
│ ├── controller/StasDataSourceController.java
│ ├── service/IStasDataSourceService.java
│ ├── service/impl/StasDataSourceServiceImpl.java
│ └── vo/TableColumnVO.java
├── stasSyncStrategy/ # 同步策略管理子模块
│ ├── controller/StasSyncStrategyController.java
│ ├── service/IStasSyncStrategyService.java
│ └── service/impl/StasSyncStrategyServiceImpl.java
├── syncLog/ # 同步日志子模块
│ ├── controller/StasSyncLogController.java
│ ├── service/IStasSyncLogService.java
│ └── service/impl/StasSyncLogServiceImpl.java
├── syncNum/ # 同步数量统计子模块
│ ├── controller/StasSyncNumController.java
│ ├── service/IStasSyncNumService.java
│ ├── service/impl/StasSyncNumServiceImpl.java
│ └── vo/PieChartVO.java
├── syncRecord/ # 同步记录子模块
│ ├── controller/StasSyncRecordController.java
│ ├── service/IStasSyncRecordService.java
│ ├── service/impl/StasSyncRecordServiceImpl.java
│ └── vo/SyncRecordVO.java, TaskHisStatsVO.java, TaskStatsResultVO.java
├── taskConfig/ # 任务配置子模块(核心)
│ ├── controller/StasTaskConfigController.java
│ ├── job/SyncDataJob.java # Quartz Job执行类
│ ├── service/IStasTaskConfigService.java
│ └── service/impl/StasTaskConfigServiceImpl.java
├── quartz/ # Quartz定时任务管理
│ ├── service/IQuartzJobService.java
│ └── service/impl/QuartzJobServiceImpl.java
└── vo/ # 模块内VO
├── DateRangeVO.java
├── IdRangeVO.java
├── MindMapVO.java
├── TableInfoVO.java
└── UserInfoVO.java
三、数据库实体(定义在 jeecg-boot-base-core 中)
--------------------------------------------------------------------------------
本模块的实体类统一定义在 jeecg-boot-base-core 的 org.jeecg.modules.base.entity 包中,
由 jeecg-boot-base-core 的 org.jeecg.modules.base.mapper 提供 MyBatis-Plus Mapper 接口。
1. StasDataSource表 stas_data_source— 数据源配置
字段: id(ASSIGN_ID), createBy, createTime, updateBy, updateTime,
instanceName(实例名), type(数据库类型: 0=Oracle,1=PostgreSQL),
port, serveId(服务名), dbLink, username, password, remark,
delFlag(逻辑删除), ipAddress, description
2. StasSyncStrategy表 stas_sync_strategy— 同步策略
字段: id(ASSIGN_ID), createBy, createTime, updateBy, updateTime,
taskId(关联任务ID), sourceOwner(源用户/Schema), targetOwner(目标用户/Schema),
tableName(同步表名), columnName(依据字段名), syncOrigin(同步位置/断点)
3. StasTaskConfig表 stas_task_config— 任务配置
字段: id(ASSIGN_ID), createBy, createTime, updateBy, updateTime,
taskName(任务名称), quartzId(关联Quartz任务ID), cron(Cron表达式),
sourceId(源库ID), targetId(目标库ID), syncCount(批次数量), syncDay(批次天数),
taskStatus(0=未开始,1=进行中), description,
sourceName(@TableField(exist=false)), targetName(@TableField(exist=false))
4. StasSyncLog表 stas_sync_log— 同步日志
字段: id(ASSIGN_ID), recordId(关联记录ID), startTime, description
5. StasSyncNum表 stas_sync_num— 同步数量
字段: id(ASSIGN_ID), recordId(关联记录ID), tableName, syncNum(同步条数)
6. StasSyncRecord表 stas_sync_record— 同步记录
字段: id(ASSIGN_ID), taskId, sourceId, targetId, startTime, endTime
7. QuartzJob表 sys_quartz_job— Quartz定时任务
字段: id(ASSIGN_ID), createBy, createTime, delFlag, updateBy, updateTime,
jobClassName, cronExpression, parameter, description, status(0=正常,-1=停止)
四、核心枚举与常量(定义在 jeecg-boot-base-core 中)
--------------------------------------------------------------------------------
1. SourceDataTypeEnum:
ORACLE(0, "ORACLE"), POSTGRES(1, "POSTGRES")
2. SyncTaskStatusEnum:
NOT_STARTED(0), IN_OPERATION(1)
3. QuartzJobConstant:
JOB_CLASS_NAME = "org.jeecg.taskConfig.job.SyncDataJob"
4. DBUtil 工具类:
- ORACLE_DRIVER = "oracle.jdbc.OracleDriver"
- POSTGRES_DRIVER = "org.postgresql.Driver"
- ORACLE_URL_PREFIX = "jdbc:oracle:thin:@"
- POSTGRES_URL_PREFIX = "jdbc:postgresql://"
- getUrl(ip, port, serveId) → 构建Oracle JDBC URL
- getPgUrl(ip, port, serveId) → 构建PostgreSQL JDBC URL
- getConnection(url, username, password, driver, type) → 获取JDBC连接
type=0: PostgreSQL(无超时设置), type=1: Oracle(设置CONNECT_TIMEOUT=300s, ReadTimeout=1200s)
- close(conn, stmt, rs) → 释放资源
五、Mapper XML定义在 jeecg-boot-base-core 中)
--------------------------------------------------------------------------------
StasSyncRecordMapper.xml — 同步记录统计查询:
- taskHistoryStats: 关联 stas_task_config + stas_sync_record + stas_sync_num查历史同步详情
- taskStatsDay: 按日统计同步数量TO_CHAR(start_time,'YYYY-MM-DD')
- taskStatsMonth: 按月统计同步数量TO_CHAR(start_time,'YYYY-MM-DD')
六、API接口清单
--------------------------------------------------------------------------------
1. 数据源管理 — StasDataSourceController (/dataSource)
GET /list — 分页查询数据源支持instanceName模糊查询
GET /userList — 查询数据源下所有用户Oracle: ALL_USERS; PG: information_schema.schemata
GET /targetUser — 查询目标数据源用户通过taskId关联查targetId
GET /tableList — 查询数据源下指定用户的所有表
GET /fieldList — 查询数据源表字段
GET /testConnection — 数据库连接测试
GET /checkInstanceName — 数据库名称唯一性校验
POST /add — 添加数据源
PUT /edit — 编辑数据源
DELETE /delete — 删除数据源(单个)
DELETE /deleteBatch — 批量删除数据源
GET /queryById — 通过ID查询
GET /queryall — 查询所有数据源
2. 同步策略 — StasSyncStrategyController (/syncStrategy)
GET /list — 分页查询同步策略
POST /createTargetTables — 在目标库创建表结构并保存策略
POST /add — 添加同步策略
PUT/POST /edit — 编辑同步策略
DELETE /delete — 删除同步策略
DELETE /deleteBatch — 批量删除同步策略
GET /queryById — 通过ID查询
GET /exportXls — 导出Excel
POST /importExcel — 导入Excel
3. 同步日志 — StasSyncLogController (/stasSyncLog)
GET /list — 分页查询同步日志按start_time升序
4. 同步数量 — StasSyncNumController (/stasSyncNum)
GET /list — 按表名分组统计同步数量返回PieChartVO饼图数据
5. 同步记录 — StasSyncRecordController (/stasSyncRecord)
GET /list — 分页查询同步记录(关联数据源名称,支持时间区间过滤)
GET /taskHisSta — 历史同步数量统计
GET /taskStatsDay — 按日统计最近一个月同步数量
GET /taskStatsMonth — 按月统计最近一年同步数量
6. 任务配置 — StasTaskConfigController (/taskConfig)
GET /list — 分页查询任务(关联数据源名称)
GET /getMindMap — 数据表思维导图(异步并行查询)
POST /add — 添加任务含创建Quartz Job
PUT/POST /edit — 编辑任务含更新Quartz Job
DELETE /delete — 删除任务含删除Quartz Job和关联策略
GET /pause — 暂停定时任务
GET /resume — 启动定时任务
GET /execute — 立即执行定时任务
GET /syncDataByFieldType — 手动触发同步
DELETE /deleteBatch — 批量删除任务
GET /queryById — 通过ID查询
GET /exportXls — 导出Excel
POST /importExcel — 导入Excel
七、核心业务逻辑详解
--------------------------------------------------------------------------------
7.1 数据同步流程SyncDataJob / StasTaskConfigServiceImpl.syncDataByFieldType
执行链路:
StasTaskConfigController.execute/手动触发
→ StasTaskConfigServiceImpl.syncDataByFieldType(taskId) [Service层版本仅支持Oracle]
→ 或通过 QuartzJobService.execute → SyncDataJob.execute → SyncDataJob.syncDataByFieldType [Job版本支持Oracle+PG]
核心步骤:
1. 根据taskId获取 StasTaskConfig → 获取源/目标 StasDataSource
2. 创建 StasSyncRecord 记录(记录同步开始时间)
3. 构建源/目标JDBC连接
4. 获取该任务下所有 StasSyncStrategy
5. 对每个策略:
a. 判断依据字段类型isDateColumn
b. 如果是日期类型 → syncByDateRange按syncDay天数为批次
c. 如果是ID类型 → syncByIdRange按syncCount条数为批次
6. 更新 StasSyncRecord 结束时间
7.2 按日期范围同步syncByDateRange
- 获取源表日期范围MIN/MAX
- 读取 syncOrigin 作为上次断点
- 从断点开始,每次同步 syncDay 天
- 若结束日期超过最大日期,先删除目标库中最后一天的数据(避免重复)
- Oracle条件: BETWEEN TO_DATE('...', 'YYYY-MM-DD HH24:MI:SS')
- PostgreSQL条件: BETWEEN TIMESTAMP '...'
- 同步完成后更新 syncOrigin 为当前结束日期
7.3 按ID范围同步syncByIdRange
- 获取源表ID范围MIN/MAX
- 读取 syncOrigin 作为上次断点(从断点+1开始
- 每次同步 syncCount 条
- 若结束ID超过最大ID先删除目标库中最后一条的数据
- 同步完成后更新 syncOrigin 为当前结束ID
7.4 批量数据同步syncDataBatch
- 源库 SELECT * FROM "OWNER"."TABLE" WHERE ...
- fetchSize: 5000优化大表读取
- 目标库 INSERT INTO ... VALUES (?, ?, ...)
- batchSize: 5000 提交一次
- 特殊处理: Oracle→PG时TIMESTAMP/DATE类型使用 setTimestamp 而非 setObject
7.5 Oracle类型到PostgreSQL映射mapOracleTypeToPg / convertOracleTypeToPg
VARCHAR2 → VARCHAR(n), NVARCHAR2 → VARCHAR(n)
CHAR → CHAR(n), NCHAR → CHAR(n)
NUMBER(p,s) → NUMERIC(p,s), NUMBER → NUMERIC
FLOAT → DOUBLE PRECISION
DATE → TIMESTAMP, TIMESTAMP → TIMESTAMP
CLOB → TEXT, BLOB → BYTEA, RAW → BYTEA
LONG → TEXT, LONG RAW → BYTEA
默认 → TEXT
7.6 同步策略创建createTargetTables
- 先删除同taskId的旧策略
- 对每个策略:
a. 获取源/目标数据源信息
b. 检查目标表是否已存在
c. 若不存在生成建表DDL支持Oracle和Oracle→PG两种模式
d. 在目标库执行建表
e. 保存策略记录
7.7 Quartz任务管理
- 添加任务: saveAndScheduleJob → 创建QuartzJobjobClassName固定为SyncDataJob
- 编辑任务: 编辑后重新调度
- 暂停: schedulerDelete + 更新状态为STATUS_DISABLE
- 恢复: schedulerDelete + 重新schedulerAdd + 更新状态为STATUS_NORMAL
- 立即执行: SimpleTrigger0.1秒后执行一次
- 删除: schedulerDelete + removeById
八、重要VO类
--------------------------------------------------------------------------------
- DateRangeVO: minDate, maxDate日期范围
- IdRangeVO: minId, maxIdID范围
- MindMapVO: databaseName, List<UserInfoVO>(思维导图数据)
- UserInfoVO: userName, List<TableInfoVO>
- TableInfoVO: tableName, columnName
- TableColumnVO: columnName, typeName, columnSize, decimalDigits, isNullable, columnDef
- PieChartVO: name(表名), value(同步数), percent(百分比)
- SyncRecordVO: id, taskName, sourceName, targetName, startTime, endTime
- TaskHisStatsVO: taskName, syncNum
- TaskStatsResultVO: dateTimes(List<String>), taskStatsInfos(List<TaskStatsInfo>)
- TaskStatsInfo: taskName, taskStats(List<TaskStatsVO>)
- TaskStatsVO(base-core): id, taskName, dateTime, syncNum
九、代码风格与约定
--------------------------------------------------------------------------------
1. Controller层:
- 继承 JeecgController<Entity, Service>
- 使用 @Tag + @Operation 做Swagger文档
- 使用 @AutoLog 记录操作日志
- 返回统一 Result<T> 对象
- 分页查询使用 QueryGenerator.initQueryWrapper + MyBatis-Plus Page
2. Service层:
- 接口继承 IService<Entity>
- 实现类继承 ServiceImpl<Mapper, Entity>
- 使用 @RequiredArgsConstructor 或 @Autowired 注入
- 事务注解 @Transactional(rollbackFor = Exception.class)
3. 实体类:
- 使用 @TableName + @TableId(type = IdType.ASSIGN_ID)
- 日期字段使用 @JsonFormat + @DateTimeFormat
- 非数据库字段标记 @TableField(exist = false)
- 逻辑删除 @TableLogic
4. 异常处理:
- 业务异常统一抛出 JeecgBootException
- 数据库异常 SQLException 传播或包装
5. 数据库区分:
- Oracle: owner/table名使用大写 + 双引号, 条件使用 TO_DATE/TO_CHAR
- PostgreSQL: schema/table名使用小写 + 双引号, 条件使用 TIMESTAMP 字面量
十、已知注意事项与潜在问题
--------------------------------------------------------------------------------
1. StasTaskConfigServiceImpl.syncDataByFieldType 仅支持OracleisDateColumn写死了Oracle SQL
而 SyncDataJob.syncDataByFieldType 支持Oracle+PG通过dbType参数区分
两个类中存在大量重复的同步逻辑代码。
2. StasDataSourceServiceImpl.queryDatabaseMetadata 方法中PostgreSQL数据源连接时
仍使用 ORACLE_DRIVER 而非 POSTGRES_DRIVER可能存在Bug。
3. OracleSync 和 OracleToPgSync 是独立工具类有main方法不属于Spring容器管理
数据库连接信息硬编码,仅用于一次性迁移脚本。
4. SyncDataJob 通过 @Resource 注入Mapper因Quartz Job不由Spring直接管理实例
需要Spring的Quartz集成支持依赖注入。
5. StasTaskConfigServiceImpl.getMindMap 使用 CompletableFuture.supplyAsync 但
使用默认线程池ForkJoinPool在高并发场景可能需要自定义线程池。
6. 同步策略的 createTargetTables 方法在事务中执行DDL操作CREATE TABLE
某些数据库驱动可能不支持DDL回滚。
7. syncByDateRange 中删除目标库数据的逻辑deleteByEquals在边界条件处理上
可能存在数据一致性问题(先删后插期间如有并发访问可能看到空数据)。
十一、模块间依赖关系
--------------------------------------------------------------------------------
jeecg-module-sync 依赖:
└── jeecg-boot-base-core
├── 实体类: StasDataSource, StasSyncStrategy, StasTaskConfig, StasSyncLog,
│ StasSyncNum, StasSyncRecord, QuartzJob
├── Mapper: StasDataSourceMapper, StasSyncStrategyMapper, StasTaskConfigMapper,
│ StasSyncLogMapper, StasSyncNumMapper, StasSyncRecordMapper, QuartzJobMapper
├── VO: TaskStatsVO
├── 枚举: SourceDataTypeEnum, SyncTaskStatusEnum
├── 常量: QuartzJobConstant, CommonConstant
├── 工具: DBUtil, DateUtils
└── 基础: JeecgController, Result, QueryGenerator, JeecgBootException
外部依赖(通过 jeecg-boot-base-core 间接引入):
- MyBatis-Plus (分页、条件构造、CRUD)
- Quartz Scheduler (定时任务)
- Swagger/OpenAPI (API文档)
- Lombok (代码简化)
- Apache Commons Lang (StringUtils)
- Oracle JDBC Driver / PostgreSQL JDBC Driver
十二、SQL数据库表结构参考
--------------------------------------------------------------------------------
核心表(定义在 db/stas.sql 中):
- stas_data_source: 数据源配置表
- stas_sync_strategy: 同步策略表
- stas_task_config: 任务配置表
- stas_sync_log: 同步日志表
- stas_sync_num: 同步数量统计表
- stas_sync_record: 同步记录表
Quartz表定义在 db/stas.sql 或 Quartz默认脚本中:
- sys_quartz_job: 定时任务表
================================================================================
文件生成时间: 2026-05-09
适用于: AI辅助代码编辑上下文参考
================================================================================

View File

@ -22,7 +22,6 @@ import org.jeecg.modules.base.entity.StasSyncStrategy;
import org.jeecg.modules.base.entity.StasTaskConfig; import org.jeecg.modules.base.entity.StasTaskConfig;
import org.jeecg.modules.base.service.BaseCommonService; import org.jeecg.modules.base.service.BaseCommonService;
import org.jeecg.taskConfig.service.IStasTaskConfigService; import org.jeecg.taskConfig.service.IStasTaskConfigService;
import org.jeecg.taskConfig.service.TimeoutProbeService;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import java.util.*; import java.util.*;
@ -45,8 +44,6 @@ public class StasDataSourceController extends JeecgController<StasDataSource, IS
private IStasDataSourceService stasDataSourceService; private IStasDataSourceService stasDataSourceService;
@Autowired @Autowired
private IStasTaskConfigService stasTaskConfigService; private IStasTaskConfigService stasTaskConfigService;
@Autowired
private TimeoutProbeService timeoutProbeService;
/** /**
* 分页列表查询 * 分页列表查询
@ -276,66 +273,4 @@ public class StasDataSourceController extends JeecgController<StasDataSource, IS
return result; return result;
} }
/**
* 手动触发超时探测
* @param id 数据源ID
* @return 探测结果
*/
@AutoLog(value = "数据源-手动超时探测")
@Operation(summary = "数据源-手动超时探测")
@GetMapping(value = "/probeTimeout")
public Result<Map<String, Object>> probeTimeout(@RequestParam(name="id", required=true) String id) {
try {
StasDataSource dataSource = stasDataSourceService.getById(id);
if (dataSource == null) {
return Result.error("未找到对应数据源");
}
timeoutProbeService.probeAndDetectTimeout(dataSource);
// 重新查询获取更新后的值
dataSource = stasDataSourceService.getById(id);
Map<String, Object> probeResult = new HashMap<>();
probeResult.put("id", dataSource.getId());
probeResult.put("instanceName", dataSource.getInstanceName());
probeResult.put("detectedConnectTimeout", dataSource.getDetectedConnectTimeout());
probeResult.put("detectedReadTimeout", dataSource.getDetectedReadTimeout());
probeResult.put("lastProbeTime", dataSource.getLastProbeTime());
return Result.OK("探测完成", probeResult);
} catch (Exception e) {
log.error("超时探测失败", e);
return Result.error("探测失败: " + e.getMessage());
}
}
/**
* 查询数据源当前生效的超时配置
* @param id 数据源ID
* @return 超时配置信息
*/
@AutoLog(value = "数据源-查询超时配置")
@Operation(summary = "数据源-查询超时配置")
@GetMapping(value = "/timeoutInfo")
public Result<Map<String, Object>> timeoutInfo(@RequestParam(name="id", required=true) String id) {
try {
StasDataSource dataSource = stasDataSourceService.getById(id);
if (dataSource == null) {
return Result.error("未找到对应数据源");
}
int[] effectiveTimeout = timeoutProbeService.getEffectiveTimeout(dataSource);
Map<String, Object> info = new HashMap<>();
info.put("id", dataSource.getId());
info.put("instanceName", dataSource.getInstanceName());
info.put("type", dataSource.getType());
info.put("detectedConnectTimeout", dataSource.getDetectedConnectTimeout());
info.put("detectedReadTimeout", dataSource.getDetectedReadTimeout());
info.put("lastProbeTime", dataSource.getLastProbeTime());
info.put("effectiveConnectTimeoutMs", effectiveTimeout[0]);
info.put("effectiveReadTimeoutMs", effectiveTimeout[1]);
info.put("isProbed", dataSource.getDetectedConnectTimeout() != null);
return Result.OK(info);
} catch (Exception e) {
log.error("查询超时配置失败", e);
return Result.error("查询失败: " + e.getMessage());
}
}
} }

View File

@ -143,9 +143,7 @@ public class StasDataSourceServiceImpl extends ServiceImpl<StasDataSourceMapper,
List<String> resultList = new ArrayList<>(); List<String> resultList = new ArrayList<>();
try (Connection conn = DBUtil.getConnection(urlSource, stasDataSource.getUsername(), try (Connection conn = DBUtil.getConnection(urlSource, stasDataSource.getUsername(),
stasDataSource.getPassword(), stasDataSource.getPassword(), DBUtil.ORACLE_DRIVER, 1);
SourceDataTypeEnum.POSTGRES.getKey().equals(stasDataSource.getType()) ? DBUtil.POSTGRES_DRIVER : DBUtil.ORACLE_DRIVER,
SourceDataTypeEnum.POSTGRES.getKey().equals(stasDataSource.getType()) ? 0 : 1);
PreparedStatement st = conn.prepareStatement(sql)) { PreparedStatement st = conn.prepareStatement(sql)) {
// 设置参数 // 设置参数

View File

@ -9,8 +9,6 @@ import org.jeecg.common.exception.JeecgBootException;
import org.jeecg.common.util.DBUtil; import org.jeecg.common.util.DBUtil;
import org.jeecg.modules.base.entity.*; import org.jeecg.modules.base.entity.*;
import org.jeecg.modules.base.mapper.*; import org.jeecg.modules.base.mapper.*;
import org.jeecg.taskConfig.service.SyncAdaptiveStrategy;
import org.jeecg.taskConfig.service.TimeoutProbeService;
import org.jeecg.vo.DateRangeVO; import org.jeecg.vo.DateRangeVO;
import org.jeecg.vo.IdRangeVO; import org.jeecg.vo.IdRangeVO;
import org.quartz.Job; import org.quartz.Job;
@ -25,7 +23,6 @@ import java.util.List;
/** /**
* 数据同步任务(支持Oracle和PostgreSQL) * 数据同步任务(支持Oracle和PostgreSQL)
* 增强功能超时自动探测批次自适应调整重试机制
*/ */
@Slf4j @Slf4j
public class SyncDataJob implements Job { public class SyncDataJob implements Job {
@ -42,10 +39,6 @@ public class SyncDataJob implements Job {
private StasSyncLogMapper stasSyncLogMapper; private StasSyncLogMapper stasSyncLogMapper;
@Resource @Resource
private StasSyncNumMapper stasSyncNumMapper; private StasSyncNumMapper stasSyncNumMapper;
@Resource
private SyncAdaptiveStrategy syncAdaptiveStrategy;
@Resource
private TimeoutProbeService timeoutProbeService;
private String parameter; private String parameter;
@ -63,7 +56,7 @@ public class SyncDataJob implements Job {
} }
/** /**
* 根据字段类型同步数据增强版自动探测超时 + 自适应批次 + 重试 * 根据字段类型同步数据
*/ */
public void syncDataByFieldType(String taskId) throws SQLException { public void syncDataByFieldType(String taskId) throws SQLException {
StasTaskConfig stasTaskConfig = stasTaskConfigMapper.selectById(taskId); StasTaskConfig stasTaskConfig = stasTaskConfigMapper.selectById(taskId);
@ -78,41 +71,16 @@ public class SyncDataJob implements Job {
stasSyncRecordMapper.insert(stasSyncRecord); stasSyncRecordMapper.insert(stasSyncRecord);
String recordId = stasSyncRecord.getId(); String recordId = stasSyncRecord.getId();
// 构建源/目标URL和Driver String sourceUrl = DBUtil.getUrl(sourceInfo.getIpAddress(), sourceInfo.getPort(), sourceInfo.getServeId());
String sourceUrl;
String sourceDriver;
int sourceType;
if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceInfo.getType())) {
sourceUrl = DBUtil.getUrl(sourceInfo.getIpAddress(), sourceInfo.getPort(), sourceInfo.getServeId());
sourceDriver = DBUtil.ORACLE_DRIVER;
sourceType = 1;
} else {
sourceUrl = DBUtil.getPgUrlWithKeepAlive(sourceInfo.getIpAddress(), sourceInfo.getPort(), sourceInfo.getServeId());
sourceDriver = DBUtil.POSTGRES_DRIVER;
sourceType = 0;
}
String targetUrl; String targetUrl;
String targetDriver; if(SourceDataTypeEnum.ORACLE.getKey().equals(targetInfo.getType())){
int targetType;
if (SourceDataTypeEnum.ORACLE.getKey().equals(targetInfo.getType())) {
targetUrl = DBUtil.getUrl(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId()); targetUrl = DBUtil.getUrl(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId());
targetDriver = DBUtil.ORACLE_DRIVER; }else{
targetType = 1; targetUrl = DBUtil.getPgUrl(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId());
} else {
targetUrl = DBUtil.getPgUrlWithKeepAlive(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId());
targetDriver = DBUtil.POSTGRES_DRIVER;
targetType = 0;
} }
// 获取自动探测的超时值 try (Connection sourceConn = DriverManager.getConnection(sourceUrl, sourceInfo.getUsername(), sourceInfo.getPassword());
int[] sourceTimeouts = timeoutProbeService.getEffectiveTimeout(sourceInfo); Connection targetConn = DriverManager.getConnection(targetUrl, targetInfo.getUsername(), targetInfo.getPassword())) {
int[] targetTimeouts = timeoutProbeService.getEffectiveTimeout(targetInfo);
try (Connection sourceConn = DBUtil.getConnection(sourceUrl, sourceInfo.getUsername(), sourceInfo.getPassword(),
sourceDriver, sourceType, sourceTimeouts[0], sourceTimeouts[1]);
Connection targetConn = DBUtil.getConnection(targetUrl, targetInfo.getUsername(), targetInfo.getPassword(),
targetDriver, targetType, targetTimeouts[0], targetTimeouts[1])) {
// 设置目标连接为批量提交模式 // 设置目标连接为批量提交模式
targetConn.setAutoCommit(false); targetConn.setAutoCommit(false);
@ -121,81 +89,25 @@ public class SyncDataJob implements Job {
.selectList(new LambdaQueryWrapper<StasSyncStrategy>().eq(StasSyncStrategy::getTaskId, taskId)); .selectList(new LambdaQueryWrapper<StasSyncStrategy>().eq(StasSyncStrategy::getTaskId, taskId));
for (StasSyncStrategy stasSyncStrategy : stasSyncStrategies) { for (StasSyncStrategy stasSyncStrategy : stasSyncStrategies) {
saveSyncLog(recordId, String.format("开始同步表: %s (依据字段: %s)", stasSyncStrategy.getTableName(), stasSyncStrategy.getColumnName()), null, null, null); saveSyncLog(recordId,String.format("开始同步表: %s (依据字段: %s)", stasSyncStrategy.getTableName(), stasSyncStrategy.getColumnName()));
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
boolean isDate = isDateColumn(sourceConn, stasSyncStrategy, sourceInfo.getType());
// 获取自适应批次大小
int effectiveSyncDay = syncAdaptiveStrategy.getEffectiveSyncDay(stasSyncStrategy, stasTaskConfig.getSyncDay());
int effectiveSyncCount = syncAdaptiveStrategy.getEffectiveSyncCount(stasSyncStrategy, stasTaskConfig.getSyncCount());
boolean syncSuccess = false;
int retryIndex = 0;
while (!syncSuccess) {
try { try {
if (isDate) { if (isDateColumn(sourceConn, stasSyncStrategy, sourceInfo.getType())) {
syncByDateRange(sourceConn, targetConn, stasSyncStrategy, syncByDateRange(sourceConn, targetConn, stasSyncStrategy,
effectiveSyncDay, sourceInfo.getType(), targetInfo.getType(), recordId, retryIndex); stasTaskConfig.getSyncDay(), sourceInfo.getType(), targetInfo.getType(), recordId);
} else { } else {
syncByIdRange(sourceConn, targetConn, stasSyncStrategy, syncByIdRange(sourceConn, targetConn, stasSyncStrategy,
effectiveSyncCount, sourceInfo.getType(), targetInfo.getType(), recordId, retryIndex); stasTaskConfig.getSyncCount(), sourceInfo.getType(), targetInfo.getType(), recordId);
} }
syncSuccess = true;
// 同步成功逐步恢复批次
syncAdaptiveStrategy.onSyncSuccess(stasSyncStrategy,
stasTaskConfig.getSyncDay(), stasTaskConfig.getSyncCount());
} catch (SQLException | ParseException e) { } catch (SQLException | ParseException e) {
SyncAdaptiveStrategy.ErrorType errorType = (e instanceof SQLException) saveSyncLog(recordId,String.format("同步表 %s 时出错: %s", stasSyncStrategy.getTableName(), e.getMessage()));
? syncAdaptiveStrategy.analyzeSQLException((SQLException) e) targetConn.rollback();
: SyncAdaptiveStrategy.ErrorType.OTHER; throw new JeecgBootException(e.getMessage());
retryIndex++;
saveSyncLog(recordId, String.format("同步表 %s 时出错(第%d次): %s [错误类型: %s]",
stasSyncStrategy.getTableName(), retryIndex, e.getMessage(), errorType.name()),
errorType.name(), isDate ? effectiveSyncDay : effectiveSyncCount, retryIndex);
// 检查超时配置是否需要调整
syncAdaptiveStrategy.checkAndAdjustTimeout(sourceInfo, errorType);
syncAdaptiveStrategy.checkAndAdjustTimeout(targetInfo, errorType);
if ((errorType == SyncAdaptiveStrategy.ErrorType.TIMEOUT
|| errorType == SyncAdaptiveStrategy.ErrorType.TRUNCATION)
&& syncAdaptiveStrategy.shouldRetry(stasSyncStrategy)) {
// 缩减批次大小并重试
int[] adjusted = syncAdaptiveStrategy.adjustBatchSize(stasSyncStrategy, isDate,
stasTaskConfig.getSyncDay(), stasTaskConfig.getSyncCount());
effectiveSyncDay = adjusted[0];
effectiveSyncCount = adjusted[1];
saveSyncLog(recordId, String.format("表 %s 批次已调整,准备第%d次重试: syncDay=%d, syncCount=%d",
stasSyncStrategy.getTableName(), retryIndex + 1, effectiveSyncDay, effectiveSyncCount),
errorType.name(), isDate ? effectiveSyncDay : effectiveSyncCount, retryIndex);
// 尝试回滚并重建连接
try { targetConn.rollback(); } catch (SQLException ignored) {}
// 如果连接已关闭需要重建
if (sourceConn.isClosed() || targetConn.isClosed()) {
saveSyncLog(recordId, String.format("表 %s 连接已断开,无法重试,跳过该表", stasSyncStrategy.getTableName()),
errorType.name(), null, retryIndex);
break;
}
continue;
} else {
// 非超时/截断错误或已达到重试上限
try { targetConn.rollback(); } catch (SQLException ignored) {}
saveSyncLog(recordId, String.format("表 %s 同步失败,跳过该表继续同步其他表: %s",
stasSyncStrategy.getTableName(), e.getMessage()),
errorType.name(), null, retryIndex);
break; // 跳过当前表继续下一个表
}
}
} }
long endTime = System.currentTimeMillis(); long endTime = System.currentTimeMillis();
saveSyncLog(recordId, String.format("表 %s 同步耗时: %s 毫秒%s", stasSyncStrategy.getTableName(), saveSyncLog(recordId,String.format("表 %s 同步耗时: %s 毫秒", stasSyncStrategy.getTableName(), (endTime - startTime)));
(endTime - startTime), syncSuccess ? "" : " (失败)"), null, null, null);
} }
} }
stasSyncRecord.setEndTime(new Date()); stasSyncRecord.setEndTime(new Date());
@ -224,6 +136,7 @@ public class SyncDataJob implements Job {
ResultSet rs = pstmt.executeQuery(); ResultSet rs = pstmt.executeQuery();
if (rs.next()) { if (rs.next()) {
String dataType = rs.getString("data_type").toUpperCase(); String dataType = rs.getString("data_type").toUpperCase();
// 判断是否为日期/时间类型
return dataType.contains("DATE") || dataType.contains("TIME"); return dataType.contains("DATE") || dataType.contains("TIME");
} }
} }
@ -234,29 +147,32 @@ public class SyncDataJob implements Job {
* 按日期范围同步数据 * 按日期范围同步数据
*/ */
public void syncByDateRange(Connection sourceConn, Connection targetConn, public void syncByDateRange(Connection sourceConn, Connection targetConn,
StasSyncStrategy stasSyncStrategy, Integer syncDay, StasSyncStrategy stasSyncStrategy, Integer syncCount,
Integer sourceDbType, Integer targetDbType, String recordId, int retryIndex) throws SQLException, ParseException { Integer sourceDbType, Integer targetDbType, String recordId) throws SQLException, ParseException {
// 获取最小和最大日期
DateRangeVO dateRange = getDateRange(sourceConn, stasSyncStrategy, sourceDbType); DateRangeVO dateRange = getDateRange(sourceConn, stasSyncStrategy, sourceDbType);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// 获取上次同步的位置
String syncOrigin = stasSyncStrategy.getSyncOrigin(); String syncOrigin = stasSyncStrategy.getSyncOrigin();
Date lastSyncedDate = StringUtils.isNotBlank(syncOrigin) ? sdf.parse(syncOrigin) : dateRange.getMinDate(); Date lastSyncedDate = StringUtils.isNotBlank(syncOrigin) ? sdf.parse(syncOrigin) : dateRange.getMinDate();
Date currentStart = (lastSyncedDate != null && lastSyncedDate.after(dateRange.getMinDate())) ? Date currentStart = (lastSyncedDate != null && lastSyncedDate.after(dateRange.getMinDate())) ?
lastSyncedDate : dateRange.getMinDate(); lastSyncedDate : dateRange.getMinDate();
Date currentEnd = addDays(currentStart, syncDay); Date currentEnd = addDays(currentStart, syncCount);
if (currentEnd.after(dateRange.getMaxDate())) { if (currentEnd.after(dateRange.getMaxDate())) {
currentEnd = dateRange.getMaxDate(); currentEnd = dateRange.getMaxDate();
StringBuilder deleteClause = new StringBuilder(); StringBuilder whereClause = new StringBuilder();
deleteClause.append("TO_CHAR(") whereClause.append("TO_CHAR(")
.append(stasSyncStrategy.getColumnName()) .append(stasSyncStrategy.getColumnName())
.append(", 'YYYY-MM-DD HH24:MI:SS') = '") .append(", 'YYYY-MM-DD HH24:MI:SS') = '")
.append(sdf.format(currentEnd)) .append(sdf.format(currentEnd))
.append("'"); .append("'");
deleteByEquals(targetConn, stasSyncStrategy, sourceDbType, deleteClause.toString()); deleteByEquals(targetConn, stasSyncStrategy, sourceDbType, whereClause.toString());
} }
// 根据数据库类型构建不同的日期条件
String whereClause; String whereClause;
if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceDbType)) { if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceDbType)) {
whereClause = stasSyncStrategy.getColumnName() + " BETWEEN TO_DATE('" + sdf.format(currentStart) + whereClause = stasSyncStrategy.getColumnName() + " BETWEEN TO_DATE('" + sdf.format(currentStart) +
@ -266,24 +182,20 @@ public class SyncDataJob implements Job {
"' AND TIMESTAMP '" + sdf.format(currentEnd) + "'"; "' AND TIMESTAMP '" + sdf.format(currentEnd) + "'";
} }
int batchSizeBefore = syncDay; saveSyncLog(recordId,String.format("%s表同步日期范围: %s 至 %s", stasSyncStrategy.getTableName(), sdf.format(currentStart), sdf.format(currentEnd)));
saveSyncLog(recordId, String.format("%s表同步日期范围: %s 至 %s (批次天数: %d)",
stasSyncStrategy.getTableName(), sdf.format(currentStart), sdf.format(currentEnd), syncDay),
null, batchSizeBefore, retryIndex > 0 ? retryIndex : null);
int rowsSynced = syncDataBatch(sourceConn, targetConn, stasSyncStrategy.getSourceOwner(), int rowsSynced = syncDataBatch(sourceConn, targetConn, stasSyncStrategy.getSourceOwner(),
stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(), stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(),
whereClause, sourceDbType, targetDbType); whereClause, sourceDbType, targetDbType);
saveSyncLog(recordId, String.format("%s表已同步 %s 行数据", stasSyncStrategy.getTableName(), rowsSynced), saveSyncLog(recordId,String.format("%s表已同步 %s 行数据", stasSyncStrategy.getTableName(), rowsSynced));
"SUCCESS", batchSizeBefore, retryIndex > 0 ? retryIndex : null);
saveSyncNum(recordId, stasSyncStrategy.getTableName(), rowsSynced); saveSyncNum(recordId, stasSyncStrategy.getTableName(), rowsSynced);
// 更新同步位置
if (currentEnd != null) { if (currentEnd != null) {
stasSyncStrategy.setSyncOrigin(sdf.format(currentEnd)); stasSyncStrategy.setSyncOrigin(sdf.format(currentEnd));
stasSyncStrategyMapper.updateById(stasSyncStrategy); stasSyncStrategyMapper.updateById(stasSyncStrategy);
saveSyncLog(recordId, String.format("%s表最终同步位置已更新为: %s", stasSyncStrategy.getTableName(), sdf.format(currentEnd)), saveSyncLog(recordId, String.format("%s表最终同步位置已更新为: %s", stasSyncStrategy.getTableName(), sdf.format(currentEnd)));
null, null, null);
} }
} }
@ -292,9 +204,11 @@ public class SyncDataJob implements Job {
*/ */
public void syncByIdRange(Connection sourceConn, Connection targetConn, public void syncByIdRange(Connection sourceConn, Connection targetConn,
StasSyncStrategy stasSyncStrategy, Integer syncCount, StasSyncStrategy stasSyncStrategy, Integer syncCount,
Integer sourceDbType, Integer targetDbType, String recordId, int retryIndex) throws SQLException { Integer sourceDbType, Integer targetDbType, String recordId) throws SQLException {
// 获取最小和最大ID
IdRangeVO idRange = getIdRange(sourceConn, stasSyncStrategy, sourceDbType); IdRangeVO idRange = getIdRange(sourceConn, stasSyncStrategy, sourceDbType);
// 获取上次同步的位置
String syncOrigin = stasSyncStrategy.getSyncOrigin(); String syncOrigin = stasSyncStrategy.getSyncOrigin();
long lastSyncedId = StringUtils.isNotBlank(syncOrigin) ? Long.parseLong(syncOrigin) : idRange.getMinId(); long lastSyncedId = StringUtils.isNotBlank(syncOrigin) ? Long.parseLong(syncOrigin) : idRange.getMinId();
@ -304,48 +218,45 @@ public class SyncDataJob implements Job {
long currentEnd = currentStart + syncCount - 1; long currentEnd = currentStart + syncCount - 1;
if (currentEnd > idRange.getMaxId()) { if (currentEnd > idRange.getMaxId()) {
currentEnd = idRange.getMaxId(); currentEnd = idRange.getMaxId();
StringBuilder deleteClause = new StringBuilder(); StringBuilder whereClause = new StringBuilder();
deleteClause.append(stasSyncStrategy.getColumnName()) whereClause.append(stasSyncStrategy.getColumnName())
.append(" = '") .append(" = '")
.append(currentEnd) .append(currentEnd)
.append("'"); .append("'");
deleteByEquals(targetConn, stasSyncStrategy, sourceDbType, deleteClause.toString()); deleteByEquals(targetConn, stasSyncStrategy, sourceDbType, whereClause.toString());
} }
// 确保起始位置不超过结束位置
if (currentStart > currentEnd) { if (currentStart > currentEnd) {
saveSyncLog(recordId, String.format("%s表已同步到最新无需继续同步", stasSyncStrategy.getTableName()), saveSyncLog(recordId, String.format("%s表已同步到最新无需继续同步", stasSyncStrategy.getTableName()));
null, null, null);
return; return;
} }
String whereClause = stasSyncStrategy.getColumnName() + " BETWEEN " + currentStart + " AND " + currentEnd; String whereClause = stasSyncStrategy.getColumnName() + " BETWEEN " + currentStart + " AND " + currentEnd;
int batchSizeBefore = syncCount; saveSyncLog(recordId, String.format("%s表同步ID范围: %s 至 %s", stasSyncStrategy.getTableName(), currentStart, currentEnd));
saveSyncLog(recordId, String.format("%s表同步ID范围: %s 至 %s (批次数量: %d)",
stasSyncStrategy.getTableName(), currentStart, currentEnd, syncCount),
null, batchSizeBefore, retryIndex > 0 ? retryIndex : null);
int rowsSynced = syncDataBatch(sourceConn, targetConn, stasSyncStrategy.getSourceOwner(), int rowsSynced = syncDataBatch(sourceConn, targetConn, stasSyncStrategy.getSourceOwner(),
stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(), stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(),
whereClause, sourceDbType, targetDbType); whereClause, sourceDbType, targetDbType);
saveSyncLog(recordId, String.format("%s表已同步 %s 行数据", stasSyncStrategy.getTableName(), rowsSynced), saveSyncLog(recordId, String.format("%s表已同步 %s 行数据", stasSyncStrategy.getTableName(), rowsSynced));
"SUCCESS", batchSizeBefore, retryIndex > 0 ? retryIndex : null);
saveSyncNum(recordId, stasSyncStrategy.getTableName(), rowsSynced); saveSyncNum(recordId, stasSyncStrategy.getTableName(), rowsSynced);
// 更新同步位置
if (currentEnd > 0) { if (currentEnd > 0) {
stasSyncStrategy.setSyncOrigin(String.valueOf(currentEnd)); stasSyncStrategy.setSyncOrigin(String.valueOf(currentEnd));
stasSyncStrategyMapper.updateById(stasSyncStrategy); stasSyncStrategyMapper.updateById(stasSyncStrategy);
saveSyncLog(recordId, String.format("%s表最终同步位置已更新为: %s", stasSyncStrategy.getTableName(), currentEnd), saveSyncLog(recordId, String.format("%s表最终同步位置已更新为: %s", stasSyncStrategy.getTableName(), currentEnd));
null, null, null);
} }
} }
/** /**
* 根据日期等于或ID等于删除数据 * 根据日期等于或ID等于删除数据使用yyyy-MM-dd格式比较
*/ */
public int deleteByEquals(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType, String whereClause) throws SQLException { public int deleteByEquals(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType, String whereClause) throws SQLException {
String sql; String sql;
// 构建完整的SQL语句
if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) { if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) {
sql = "DELETE FROM \"" + stasSyncStrategy.getTargetOwner().toUpperCase() + "\".\"" + sql = "DELETE FROM \"" + stasSyncStrategy.getTargetOwner().toUpperCase() + "\".\"" +
stasSyncStrategy.getTableName() + "\" WHERE " + whereClause; stasSyncStrategy.getTableName() + "\" WHERE " + whereClause;
@ -409,7 +320,6 @@ public class SyncDataJob implements Job {
/** /**
* 同步一批数据(支持跨数据库类型) * 同步一批数据(支持跨数据库类型)
* 增强版细粒度异常捕获截断时尝试提交已成功部分
*/ */
public int syncDataBatch(Connection sourceConn, Connection targetConn, public int syncDataBatch(Connection sourceConn, Connection targetConn,
String sourceOwner, String targetOwner, String sourceOwner, String targetOwner,
@ -418,6 +328,7 @@ public class SyncDataJob implements Job {
int totalRows = 0; int totalRows = 0;
String selectSql; String selectSql;
// 构建查询SQL(根据源数据库类型)
if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceDbType)) { if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceDbType)) {
selectSql = "SELECT * FROM \"" + sourceOwner.toUpperCase() + "\".\"" + tableName + selectSql = "SELECT * FROM \"" + sourceOwner.toUpperCase() + "\".\"" + tableName +
"\" WHERE " + whereClause; "\" WHERE " + whereClause;
@ -430,10 +341,11 @@ public class SyncDataJob implements Job {
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
ResultSet rs = sourceStmt.executeQuery(selectSql)) { ResultSet rs = sourceStmt.executeQuery(selectSql)) {
sourceStmt.setFetchSize(5000); sourceStmt.setFetchSize(5000); // 优化大表读取
ResultSetMetaData metaData = rs.getMetaData(); ResultSetMetaData metaData = rs.getMetaData();
int columnCount = metaData.getColumnCount(); int columnCount = metaData.getColumnCount();
// 构建插入SQL(根据目标数据库类型)
String insertSql; String insertSql;
if (SourceDataTypeEnum.ORACLE.getKey().equals(targetDbType)) { if (SourceDataTypeEnum.ORACLE.getKey().equals(targetDbType)) {
insertSql = "INSERT INTO \"" + targetOwner.toUpperCase() + "\".\"" + tableName + "\" VALUES ("; insertSql = "INSERT INTO \"" + targetOwner.toUpperCase() + "\".\"" + tableName + "\" VALUES (";
@ -447,14 +359,15 @@ public class SyncDataJob implements Job {
} }
insertSql += ")"; insertSql += ")";
// 批量插入
try (PreparedStatement pstmt = targetConn.prepareStatement(insertSql)) { try (PreparedStatement pstmt = targetConn.prepareStatement(insertSql)) {
int batchSize = 0; int batchSize = 0;
while (rs.next()) { while (rs.next()) {
try {
for (int i = 1; i <= columnCount; i++) { for (int i = 1; i <= columnCount; i++) {
Object value = rs.getObject(i); Object value = rs.getObject(i);
// 特殊处理Oracle的TIMESTAMP类型到PostgreSQL
if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceDbType) && SourceDataTypeEnum.POSTGRES.getKey().equals(targetDbType)) { if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceDbType) && SourceDataTypeEnum.POSTGRES.getKey().equals(targetDbType)) {
String columnType = metaData.getColumnTypeName(i); String columnType = metaData.getColumnTypeName(i);
if ("TIMESTAMP".equalsIgnoreCase(columnType) || "DATE".equalsIgnoreCase(columnType)) { if ("TIMESTAMP".equalsIgnoreCase(columnType) || "DATE".equalsIgnoreCase(columnType)) {
@ -476,22 +389,6 @@ public class SyncDataJob implements Job {
targetConn.commit(); targetConn.commit();
batchSize = 0; batchSize = 0;
} }
} catch (SQLException e) {
// 单行处理异常尝试提交已成功部分
SyncAdaptiveStrategy.ErrorType errorType = syncAdaptiveStrategy.analyzeSQLException(e);
if (errorType == SyncAdaptiveStrategy.ErrorType.TRUNCATION || errorType == SyncAdaptiveStrategy.ErrorType.TIMEOUT) {
// 截断/超时错误先提交已成功的数据
if (batchSize > 0) {
try {
pstmt.executeBatch();
targetConn.commit();
} catch (SQLException commitEx) {
log.warn("提交已成功部分失败: {}", commitEx.getMessage());
}
}
}
throw e; // 向上抛出由外层重试逻辑处理
}
} }
if (batchSize > 0) { if (batchSize > 0) {
pstmt.executeBatch(); pstmt.executeBatch();
@ -503,14 +400,11 @@ public class SyncDataJob implements Job {
} }
private void saveSyncLog(String recordId, String desc, String errorType, Integer batchSize, Integer retryIndex){ private void saveSyncLog(String recordId, String desc){
StasSyncLog stasSyncLog = new StasSyncLog(); StasSyncLog stasSyncLog = new StasSyncLog();
stasSyncLog.setRecordId(recordId); stasSyncLog.setRecordId(recordId);
stasSyncLog.setStartTime(new Date()); stasSyncLog.setStartTime(new Date());
stasSyncLog.setDescription(desc); stasSyncLog.setDescription(desc);
stasSyncLog.setErrorType(errorType);
stasSyncLog.setBatchSizeBefore(batchSize);
stasSyncLog.setRetryIndex(retryIndex);
stasSyncLogMapper.insert(stasSyncLog); stasSyncLogMapper.insert(stasSyncLog);
} }

View File

@ -1,261 +0,0 @@
package org.jeecg.taskConfig.service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.modules.base.entity.StasDataSource;
import org.jeecg.modules.base.entity.StasSyncStrategy;
import org.jeecg.modules.base.mapper.StasSyncStrategyMapper;
import org.springframework.stereotype.Component;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
/**
* 同步自适应策略组件
* 负责异常分析批次调整重试决策
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class SyncAdaptiveStrategy {
private final StasSyncStrategyMapper stasSyncStrategyMapper;
private final TimeoutProbeService timeoutProbeService;
/** 最大重试次数 */
private static final int MAX_RETRY_COUNT = 3;
/** 批次缩减比例 */
private static final double BATCH_REDUCTION_RATIO = 0.5;
/** 成功后批次恢复比例 */
private static final double BATCH_RECOVERY_RATIO = 1.2;
/** 默认最小批次天数 */
private static final int DEFAULT_MIN_SYNC_DAY = 1;
/** 默认最小批次数量 */
private static final int DEFAULT_MIN_SYNC_COUNT = 100;
// Oracle 超时/连接中断关键字
private static final List<String> ORACLE_TIMEOUT_KEYWORDS = Arrays.asList(
"ORA-02396", "ORA-03113", "ORA-03114", "ORA-04030",
"Closed Connection", "Connection timed out", "Read timed out",
"ORA-12571", "ORA-03135", "ORA-02396", "ORA-00054",
"No more data to read from socket", "Connection reset"
);
// PostgreSQL 超时/连接中断关键字
private static final List<String> PG_TIMEOUT_KEYWORDS = Arrays.asList(
"FATAL: terminating connection", "connection timed out",
"socket closed", "I/O error", "Connection refused",
"An I/O error occurred", "Software caused connection abort"
);
// 截断/语句超时关键字
private static final List<String> TRUNCATION_KEYWORDS = Arrays.asList(
"ORA-01013", "statement timeout", "canceling statement",
"query_canceled", "ORA-00039", "timeout",
"SQLTimeoutException", "The query was canceled"
);
/**
* 错误类型枚举
*/
public enum ErrorType {
TIMEOUT, TRUNCATION, OTHER
}
/**
* 分析SQLException类型识别超时/截断错误
*/
public ErrorType analyzeSQLException(SQLException e) {
if (e == null) {
return ErrorType.OTHER;
}
String message = e.getMessage();
if (message == null) {
// 检查cause
if (e.getCause() != null && e.getCause().getMessage() != null) {
message = e.getCause().getMessage();
} else {
return ErrorType.OTHER;
}
}
String upperMessage = message.toUpperCase();
// 优先检查截断更具体
for (String keyword : TRUNCATION_KEYWORDS) {
if (upperMessage.contains(keyword.toUpperCase())) {
return ErrorType.TRUNCATION;
}
}
// 检查Oracle超时
for (String keyword : ORACLE_TIMEOUT_KEYWORDS) {
if (upperMessage.contains(keyword.toUpperCase())) {
return ErrorType.TIMEOUT;
}
}
// 检查PG超时
for (String keyword : PG_TIMEOUT_KEYWORDS) {
if (upperMessage.contains(keyword.toUpperCase())) {
return ErrorType.TIMEOUT;
}
}
// 检查SQLState
if (e.getSQLState() != null) {
// 08xxx = 连接异常, 57014 = 语句超时
String state = e.getSQLState();
if (state.startsWith("08")) {
return ErrorType.TIMEOUT;
}
if ("57014".equals(state)) {
return ErrorType.TRUNCATION;
}
}
// 检查是否为SQLTimeoutException
if (e instanceof SQLTimeoutException) {
return ErrorType.TIMEOUT;
}
// 检查BatchUpdateException中的cause
if (e.getCause() instanceof SQLException) {
ErrorType causeType = analyzeSQLException((SQLException) e.getCause());
if (causeType != ErrorType.OTHER) {
return causeType;
}
}
return ErrorType.OTHER;
}
/**
* 根据错误类型自动缩减批次大小
* @param strategy 同步策略
* @param isDateColumn 是否按日期同步
* @param taskConfigSyncDay taskConfig原始批次天数
* @param taskConfigSyncCount taskConfig原始批次数量
* @return 调整后的[batchDay, batchCount]
*/
public int[] adjustBatchSize(StasSyncStrategy strategy, boolean isDateColumn,
int taskConfigSyncDay, int taskConfigSyncCount) {
int minSyncDay = strategy.getMinSyncDay() != null ? strategy.getMinSyncDay() : DEFAULT_MIN_SYNC_DAY;
int minSyncCount = strategy.getMinSyncCount() != null ? strategy.getMinSyncCount() : DEFAULT_MIN_SYNC_COUNT;
int currentSyncDay = strategy.getCurrentSyncDay() != null ? strategy.getCurrentSyncDay() : taskConfigSyncDay;
int currentSyncCount = strategy.getCurrentSyncCount() != null ? strategy.getCurrentSyncCount() : taskConfigSyncCount;
int newSyncDay = Math.max((int)(currentSyncDay * BATCH_REDUCTION_RATIO), minSyncDay);
int newSyncCount = Math.max((int)(currentSyncCount * BATCH_REDUCTION_RATIO), minSyncCount);
strategy.setCurrentSyncDay(newSyncDay);
strategy.setCurrentSyncCount(newSyncCount);
strategy.setRetryCount(strategy.getRetryCount() != null ? strategy.getRetryCount() + 1 : 1);
strategy.setLastErrorType(ErrorType.TIMEOUT.name());
stasSyncStrategyMapper.updateById(strategy);
log.info("策略[{}.{}]批次调整: syncDay {} -> {}, syncCount {} -> {}",
strategy.getSourceOwner(), strategy.getTableName(),
currentSyncDay, newSyncDay, currentSyncCount, newSyncCount);
return new int[]{newSyncDay, newSyncCount};
}
/**
* 同步成功后逐步恢复批次大小
* @param strategy 同步策略
* @param taskConfigSyncDay taskConfig原始批次天数
* @param taskConfigSyncCount taskConfig原始批次数量
*/
public void onSyncSuccess(StasSyncStrategy strategy, int taskConfigSyncDay, int taskConfigSyncCount) {
boolean needsUpdate = false;
// 恢复批次天数
if (strategy.getCurrentSyncDay() != null && strategy.getCurrentSyncDay() < taskConfigSyncDay) {
int newDay = Math.min((int)(strategy.getCurrentSyncDay() * BATCH_RECOVERY_RATIO), taskConfigSyncDay);
strategy.setCurrentSyncDay(newDay);
needsUpdate = true;
}
// 恢复批次数量
if (strategy.getCurrentSyncCount() != null && strategy.getCurrentSyncCount() < taskConfigSyncCount) {
int newCount = Math.min((int)(strategy.getCurrentSyncCount() * BATCH_RECOVERY_RATIO), taskConfigSyncCount);
strategy.setCurrentSyncCount(newCount);
needsUpdate = true;
}
// 清零重试计数和错误类型
if (strategy.getRetryCount() != null && strategy.getRetryCount() > 0) {
strategy.setRetryCount(0);
needsUpdate = true;
}
if (strategy.getLastErrorType() != null) {
strategy.setLastErrorType(null);
needsUpdate = true;
}
if (needsUpdate) {
stasSyncStrategyMapper.updateById(strategy);
log.info("策略[{}.{}]同步成功,批次恢复: syncDay={}, syncCount={}",
strategy.getSourceOwner(), strategy.getTableName(),
strategy.getCurrentSyncDay(), strategy.getCurrentSyncCount());
}
// 若已恢复到taskConfig原始值清空自适应字段
if (Integer.valueOf(taskConfigSyncDay).equals(strategy.getCurrentSyncDay())
&& Integer.valueOf(taskConfigSyncCount).equals(strategy.getCurrentSyncCount())) {
strategy.setCurrentSyncDay(null);
strategy.setCurrentSyncCount(null);
stasSyncStrategyMapper.updateById(strategy);
}
}
/**
* 判断是否应该重试
* @param strategy 同步策略
* @return 是否应该重试
*/
public boolean shouldRetry(StasSyncStrategy strategy) {
int retryCount = strategy.getRetryCount() != null ? strategy.getRetryCount() : 0;
if (retryCount >= MAX_RETRY_COUNT) {
return false;
}
// 检查批次是否还能缩减
int minSyncDay = strategy.getMinSyncDay() != null ? strategy.getMinSyncDay() : DEFAULT_MIN_SYNC_DAY;
int minSyncCount = strategy.getMinSyncCount() != null ? strategy.getMinSyncCount() : DEFAULT_MIN_SYNC_COUNT;
int currentSyncDay = strategy.getCurrentSyncDay() != null ? strategy.getCurrentSyncDay() : Integer.MAX_VALUE;
int currentSyncCount = strategy.getCurrentSyncCount() != null ? strategy.getCurrentSyncCount() : Integer.MAX_VALUE;
return currentSyncDay > minSyncDay || currentSyncCount > minSyncCount;
}
/**
* 获取当前生效的批次天数
*/
public int getEffectiveSyncDay(StasSyncStrategy strategy, int taskConfigSyncDay) {
return strategy.getCurrentSyncDay() != null ? strategy.getCurrentSyncDay() : taskConfigSyncDay;
}
/**
* 获取当前生效的批次数量
*/
public int getEffectiveSyncCount(StasSyncStrategy strategy, int taskConfigSyncCount) {
return strategy.getCurrentSyncCount() != null ? strategy.getCurrentSyncCount() : taskConfigSyncCount;
}
/**
* 同步失败后检查是否需要调整超时配置
*/
public void checkAndAdjustTimeout(StasDataSource dataSource, ErrorType errorType) {
if (errorType == ErrorType.TIMEOUT || errorType == ErrorType.TRUNCATION) {
try {
timeoutProbeService.adjustTimeoutFromLogs(dataSource);
} catch (Exception e) {
log.warn("调整数据源[{}]超时配置失败: {}", dataSource.getInstanceName(), e.getMessage());
}
}
}
}

View File

@ -1,333 +0,0 @@
package org.jeecg.taskConfig.service;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.constant.enums.SourceDataTypeEnum;
import org.jeecg.common.util.DBUtil;
import org.jeecg.modules.base.entity.StasDataSource;
import org.jeecg.modules.base.mapper.StasDataSourceMapper;
import org.jeecg.modules.base.mapper.StasSyncLogMapper;
import org.springframework.stereotype.Service;
import java.sql.*;
import java.util.Date;
import java.util.Properties;
/**
* 超时自动探测服务
* 采用探针探测 + 历史日志分析结合方案
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class TimeoutProbeService {
private final StasDataSourceMapper stasDataSourceMapper;
private final StasSyncLogMapper stasSyncLogMapper;
/** 探针重复次数 */
private static final int PROBE_REPEAT_COUNT = 3;
/** 最小连接超时(秒) */
private static final int MIN_CONNECT_TIMEOUT_SEC = 30;
/** 最小读取超时(秒) */
private static final int MIN_READ_TIMEOUT_SEC = 120;
/** 连接超时倍率(相对平均延迟) */
private static final int CONNECT_TIMEOUT_MULTIPLIER = 5;
/** 读取超时倍率(相对平均延迟) */
private static final int READ_TIMEOUT_MULTIPLIER = 20;
/** 探测结果过期天数 */
private static final int PROBE_EXPIRE_DAYS = 7;
/** 日志分析窗口天数 */
private static final int LOG_ANALYSIS_WINDOW_DAYS = 30;
/** 超时错误率阈值 */
private static final double TIMEOUT_ERROR_RATE_THRESHOLD = 0.2;
/** 连续无超时日志条数阈值(满足此条件则温和下调) */
private static final int CONSECUTIVE_NON_TIMEOUT_THRESHOLD = 5;
/**
* 获取生效超时值供DBUtil.getConnection使用
* 若未探测过或已过期自动触发探测
* @return [connectTimeoutMs, readTimeoutMs]
*/
public int[] getEffectiveTimeout(StasDataSource dataSource) {
// 检查是否需要重新探测
if (dataSource.getDetectedConnectTimeout() == null || isProbeExpired(dataSource)) {
probeAndDetectTimeout(dataSource);
}
// 基于日志动态微调
adjustTimeoutFromLogs(dataSource);
int connectTimeoutMs = dataSource.getDetectedConnectTimeout() != null
? dataSource.getDetectedConnectTimeout() * 1000
: DBUtil.DEFAULT_ORACLE_CONNECT_TIMEOUT;
int readTimeoutMs = dataSource.getDetectedReadTimeout() != null
? dataSource.getDetectedReadTimeout() * 1000
: DBUtil.DEFAULT_ORACLE_READ_TIMEOUT;
return new int[]{connectTimeoutMs, readTimeoutMs};
}
/**
* 首次连接自动探测
*/
public void probeAndDetectTimeout(StasDataSource dataSource) {
boolean isOracle = SourceDataTypeEnum.ORACLE.getKey().equals(dataSource.getType());
boolean isPg = SourceDataTypeEnum.POSTGRES.getKey().equals(dataSource.getType());
String url;
String driver;
String probeSql;
if (isOracle) {
url = DBUtil.getUrl(dataSource.getIpAddress(), dataSource.getPort(), dataSource.getServeId());
driver = DBUtil.ORACLE_DRIVER;
probeSql = "SELECT 1 FROM DUAL";
} else if (isPg) {
url = DBUtil.getPgUrlWithKeepAlive(dataSource.getIpAddress(), dataSource.getPort(), dataSource.getServeId());
driver = DBUtil.POSTGRES_DRIVER;
probeSql = "SELECT 1";
} else {
log.warn("不支持的数据源类型: {}, 跳过超时探测", dataSource.getType());
return;
}
long totalLatency = 0;
int successCount = 0;
for (int i = 0; i < PROBE_REPEAT_COUNT; i++) {
try {
long start = System.currentTimeMillis();
Class.forName(driver);
Properties props = new Properties();
props.put("user", dataSource.getUsername());
props.put("password", dataSource.getPassword());
// 探针自身使用短超时
if (isOracle) {
props.put("oracle.net.CONNECT_TIMEOUT", "10000");
props.put("oracle.jdbc.ReadTimeout", "30000");
} else {
props.put("loginTimeout", "10");
props.put("socketTimeout", "30");
}
try (Connection conn = DriverManager.getConnection(url, props);
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(probeSql)) {
if (rs.next()) {
long elapsed = System.currentTimeMillis() - start;
totalLatency += elapsed;
successCount++;
log.info("数据源[{}]探针第{}次响应: {}ms", dataSource.getInstanceName(), i + 1, elapsed);
}
}
} catch (Exception e) {
log.warn("数据源[{}]探针第{}次失败: {}", dataSource.getInstanceName(), i + 1, e.getMessage());
}
}
if (successCount == 0) {
log.error("数据源[{}]探针全部失败,使用默认超时", dataSource.getInstanceName());
return;
}
long avgLatencyMs = totalLatency / successCount;
// 转为秒计算
long avgLatencySec = Math.max(avgLatencyMs / 1000, 1);
int connectTimeoutSec = Math.max((int)(avgLatencySec * CONNECT_TIMEOUT_MULTIPLIER), MIN_CONNECT_TIMEOUT_SEC);
int readTimeoutSec = Math.max((int)(avgLatencySec * READ_TIMEOUT_MULTIPLIER), MIN_READ_TIMEOUT_SEC);
// 查询数据库自身超时配置作为上限参考
int[] dbLimits = queryDatabaseTimeoutLimits(dataSource, url, driver, isOracle);
if (dbLimits != null) {
int dbConnectLimit = dbLimits[0];
int dbReadLimit = dbLimits[1];
if (dbConnectLimit > 0 && connectTimeoutSec > dbConnectLimit) {
connectTimeoutSec = (int)(dbConnectLimit * 0.8);
}
if (dbReadLimit > 0 && readTimeoutSec > dbReadLimit) {
readTimeoutSec = (int)(dbReadLimit * 0.8);
}
}
log.info("数据源[{}]探测结果: avgLatency={}ms, connectTimeout={}s, readTimeout={}s",
dataSource.getInstanceName(), avgLatencyMs, connectTimeoutSec, readTimeoutSec);
dataSource.setDetectedConnectTimeout(connectTimeoutSec);
dataSource.setDetectedReadTimeout(readTimeoutSec);
dataSource.setLastProbeTime(new Date());
stasDataSourceMapper.updateById(dataSource);
}
/**
* 基于历史日志动态调整超时
*/
public void adjustTimeoutFromLogs(StasDataSource dataSource) {
try {
int timeoutErrors = stasSyncLogMapper.countTimeoutErrorsByDataSource(
dataSource.getId(), LOG_ANALYSIS_WINDOW_DAYS);
Long avgSyncTimeMs = stasSyncLogMapper.avgSyncTimeByDataSource(
dataSource.getId(), LOG_ANALYSIS_WINDOW_DAYS);
int consecutiveNonTimeout = stasSyncLogMapper.countConsecutiveNonTimeoutByDataSource(
dataSource.getId(), CONSECUTIVE_NON_TIMEOUT_THRESHOLD);
int currentReadTimeout = dataSource.getDetectedReadTimeout() != null
? dataSource.getDetectedReadTimeout() : MIN_READ_TIMEOUT_SEC;
// 计算超时错误率
int totalErrors = timeoutErrors;
double errorRate = totalErrors > 0 ? (double) timeoutErrors / Math.max(totalErrors, 1) : 0;
if (errorRate > TIMEOUT_ERROR_RATE_THRESHOLD && avgSyncTimeMs != null && avgSyncTimeMs > 0) {
// 超时错误率过高增大读取超时
int newReadTimeout = (int) Math.max(currentReadTimeout * 1.5,
(avgSyncTimeMs / 1000) * 3);
log.info("数据源[{}]超时错误率{}}过高,读取超时从{}s调整为{}s",
dataSource.getInstanceName(), String.format("%.2f", errorRate),
currentReadTimeout, newReadTimeout);
dataSource.setDetectedReadTimeout(newReadTimeout);
stasDataSourceMapper.updateById(dataSource);
} else if (consecutiveNonTimeout >= CONSECUTIVE_NON_TIMEOUT_THRESHOLD
&& avgSyncTimeMs != null && avgSyncTimeMs > 0) {
// 连续无超时错误温和下调
int newReadTimeout = (int) Math.max(currentReadTimeout * 0.8,
Math.max((avgSyncTimeMs / 1000) * 2, MIN_READ_TIMEOUT_SEC));
if (newReadTimeout < currentReadTimeout) {
log.info("数据源[{}]连续无超时错误,读取超时从{}s调整为{}s",
dataSource.getInstanceName(), currentReadTimeout, newReadTimeout);
dataSource.setDetectedReadTimeout(newReadTimeout);
stasDataSourceMapper.updateById(dataSource);
}
}
} catch (Exception e) {
log.warn("数据源[{}]日志分析调整超时失败: {}", dataSource.getInstanceName(), e.getMessage());
}
}
/**
* 查询数据库自身超时配置
* @return [connectTimeLimitSec, idleTimeLimitSec] null
*/
private int[] queryDatabaseTimeoutLimits(StasDataSource dataSource, String url, String driver, boolean isOracle) {
try {
Class.forName(driver);
Properties props = new Properties();
props.put("user", dataSource.getUsername());
props.put("password", dataSource.getPassword());
if (isOracle) {
props.put("oracle.net.CONNECT_TIMEOUT", "10000");
props.put("oracle.jdbc.ReadTimeout", "30000");
} else {
props.put("loginTimeout", "10");
props.put("socketTimeout", "30");
}
try (Connection conn = DriverManager.getConnection(url, props)) {
if (isOracle) {
return queryOracleTimeoutLimits(conn);
} else {
return queryPgTimeoutLimits(conn);
}
}
} catch (Exception e) {
log.warn("查询数据源[{}]超时配置失败: {}", dataSource.getInstanceName(), e.getMessage());
return null;
}
}
/**
* 查询Oracle数据库超时限制
*/
private int[] queryOracleTimeoutLimits(Connection conn) {
int connectTimeLimit = -1;
int idleTimeLimit = -1;
String sql = "SELECT resource_name, limit FROM dba_profiles " +
"WHERE profile = 'DEFAULT' AND resource_name IN ('CONNECT_TIME', 'IDLE_TIME')";
try (PreparedStatement pstmt = conn.prepareStatement(sql);
ResultSet rs = pstmt.executeQuery()) {
while (rs.next()) {
String resourceName = rs.getString("resource_name");
String limitStr = rs.getString("limit");
if ("DEFAULT".equalsIgnoreCase(limitStr) || "UNLIMITED".equalsIgnoreCase(limitStr)) {
continue;
}
try {
int limitVal = Integer.parseInt(limitStr);
if ("CONNECT_TIME".equals(resourceName)) {
connectTimeLimit = limitVal; // 分钟
} else if ("IDLE_TIME".equals(resourceName)) {
idleTimeLimit = limitVal; // 分钟
}
} catch (NumberFormatException ignored) {
}
}
} catch (SQLException e) {
log.warn("查询Oracle超时配置失败: {}", e.getMessage());
}
// 转为秒返回Oracle中CONNECT_TIME单位是分钟
return new int[]{
connectTimeLimit > 0 ? connectTimeLimit * 60 : -1,
idleTimeLimit > 0 ? idleTimeLimit * 60 : -1
};
}
/**
* 查询PostgreSQL数据库超时限制
*/
private int[] queryPgTimeoutLimits(Connection conn) {
int statementTimeout = -1;
int idleTimeout = -1;
try (Statement stmt = conn.createStatement()) {
try (ResultSet rs = stmt.executeQuery("SHOW statement_timeout")) {
if (rs.next()) {
statementTimeout = parsePgIntervalToSec(rs.getString(1));
}
}
try (ResultSet rs = stmt.executeQuery("SHOW idle_in_transaction_session_timeout")) {
if (rs.next()) {
idleTimeout = parsePgIntervalToSec(rs.getString(1));
}
}
} catch (SQLException e) {
log.warn("查询PG超时配置失败: {}", e.getMessage());
}
return new int[]{statementTimeout, idleTimeout};
}
/**
* 解析PG时间间隔字符串为秒数 "5min", "30s", "2h"
*/
private int parsePgIntervalToSec(String value) {
if (value == null || "0".equals(value) || "-1".equals(value) || value.startsWith("0")) {
return -1;
}
try {
value = value.trim().toLowerCase();
if (value.endsWith("ms")) {
return (int) (Long.parseLong(value.replace("ms", "")) / 1000);
} else if (value.endsWith("s")) {
return Integer.parseInt(value.replace("s", ""));
} else if (value.endsWith("min")) {
return Integer.parseInt(value.replace("min", "")) * 60;
} else if (value.endsWith("h")) {
return Integer.parseInt(value.replace("h", "")) * 3600;
} else if (value.endsWith("d")) {
return Integer.parseInt(value.replace("d", "")) * 86400;
}
return Integer.parseInt(value);
} catch (NumberFormatException e) {
return -1;
}
}
/**
* 判断探测结果是否过期超过7天
*/
private boolean isProbeExpired(StasDataSource dataSource) {
if (dataSource.getLastProbeTime() == null) {
return true;
}
long daysSinceProbe = (System.currentTimeMillis() - dataSource.getLastProbeTime().getTime())
/ (1000 * 60 * 60 * 24);
return daysSinceProbe >= PROBE_EXPIRE_DAYS;
}
}

View File

@ -2,11 +2,9 @@ package org.jeecg.taskConfig.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.jeecg.common.constant.CommonConstant; import org.jeecg.common.constant.CommonConstant;
import org.jeecg.common.constant.QuartzJobConstant; import org.jeecg.common.constant.QuartzJobConstant;
import org.jeecg.common.constant.enums.SourceDataTypeEnum;
import org.jeecg.common.exception.JeecgBootException; import org.jeecg.common.exception.JeecgBootException;
import org.jeecg.common.util.DBUtil; import org.jeecg.common.util.DBUtil;
import org.jeecg.modules.base.entity.StasDataSource; import org.jeecg.modules.base.entity.StasDataSource;
@ -18,8 +16,6 @@ import org.jeecg.modules.base.mapper.StasTaskConfigMapper;
import org.jeecg.modules.base.entity.QuartzJob; import org.jeecg.modules.base.entity.QuartzJob;
import org.jeecg.quartz.service.IQuartzJobService; import org.jeecg.quartz.service.IQuartzJobService;
import org.jeecg.taskConfig.service.IStasTaskConfigService; import org.jeecg.taskConfig.service.IStasTaskConfigService;
import org.jeecg.taskConfig.service.SyncAdaptiveStrategy;
import org.jeecg.taskConfig.service.TimeoutProbeService;
import org.jeecg.vo.*; import org.jeecg.vo.*;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -41,15 +37,12 @@ import java.util.stream.Collectors;
*/ */
@Service @Service
@RequiredArgsConstructor @RequiredArgsConstructor
@Slf4j
public class StasTaskConfigServiceImpl extends ServiceImpl<StasTaskConfigMapper, StasTaskConfig> implements IStasTaskConfigService { public class StasTaskConfigServiceImpl extends ServiceImpl<StasTaskConfigMapper, StasTaskConfig> implements IStasTaskConfigService {
private final StasTaskConfigMapper stasTaskConfigMapper; private final StasTaskConfigMapper stasTaskConfigMapper;
private final StasDataSourceMapper stasDataSourceMapper; private final StasDataSourceMapper stasDataSourceMapper;
private final StasSyncStrategyMapper stasSyncStrategyMapper; private final StasSyncStrategyMapper stasSyncStrategyMapper;
private final IQuartzJobService quartzJobService; private final IQuartzJobService quartzJobService;
private final SyncAdaptiveStrategy syncAdaptiveStrategy;
private final TimeoutProbeService timeoutProbeService;
/** /**
* 数据表思维导图 * 数据表思维导图
@ -169,7 +162,7 @@ public class StasTaskConfigServiceImpl extends ServiceImpl<StasTaskConfigMapper,
/** /**
* 根据字段类型同步数据增强版自动探测超时 + 自适应批次 + 重试 * 根据字段类型同步数据
* @param taskId 任务id * @param taskId 任务id
* @throws SQLException 数据库异常 * @throws SQLException 数据库异常
*/ */
@ -178,105 +171,36 @@ public class StasTaskConfigServiceImpl extends ServiceImpl<StasTaskConfigMapper,
StasTaskConfig stasTaskConfig = stasTaskConfigMapper.selectById(taskId); StasTaskConfig stasTaskConfig = stasTaskConfigMapper.selectById(taskId);
StasDataSource sourceInfo = stasDataSourceMapper.selectById(stasTaskConfig.getSourceId()); StasDataSource sourceInfo = stasDataSourceMapper.selectById(stasTaskConfig.getSourceId());
StasDataSource targetInfo = stasDataSourceMapper.selectById(stasTaskConfig.getTargetId()); StasDataSource targetInfo = stasDataSourceMapper.selectById(stasTaskConfig.getTargetId());
String sourceUrl = DBUtil.getUrl(sourceInfo.getIpAddress(), sourceInfo.getPort(), sourceInfo.getServeId());
String targetUrl = DBUtil.getUrl(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId());
try (Connection sourceConn = DriverManager.getConnection(sourceUrl, sourceInfo.getUsername(), sourceInfo.getPassword());
Connection targetConn = DriverManager.getConnection(targetUrl, targetInfo.getUsername(), targetInfo.getPassword())) {
// 构建源/目标URL和Driver // 设置目标连接为批量提交模式
String sourceUrl;
String sourceDriver;
int sourceType;
if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceInfo.getType())) {
sourceUrl = DBUtil.getUrl(sourceInfo.getIpAddress(), sourceInfo.getPort(), sourceInfo.getServeId());
sourceDriver = DBUtil.ORACLE_DRIVER;
sourceType = 1;
} else {
sourceUrl = DBUtil.getPgUrlWithKeepAlive(sourceInfo.getIpAddress(), sourceInfo.getPort(), sourceInfo.getServeId());
sourceDriver = DBUtil.POSTGRES_DRIVER;
sourceType = 0;
}
String targetUrl;
String targetDriver;
int targetType;
if (SourceDataTypeEnum.ORACLE.getKey().equals(targetInfo.getType())) {
targetUrl = DBUtil.getUrl(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId());
targetDriver = DBUtil.ORACLE_DRIVER;
targetType = 1;
} else {
targetUrl = DBUtil.getPgUrlWithKeepAlive(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId());
targetDriver = DBUtil.POSTGRES_DRIVER;
targetType = 0;
}
// 获取自动探测的超时值
int[] sourceTimeouts = timeoutProbeService.getEffectiveTimeout(sourceInfo);
int[] targetTimeouts = timeoutProbeService.getEffectiveTimeout(targetInfo);
try (Connection sourceConn = DBUtil.getConnection(sourceUrl, sourceInfo.getUsername(), sourceInfo.getPassword(),
sourceDriver, sourceType, sourceTimeouts[0], sourceTimeouts[1]);
Connection targetConn = DBUtil.getConnection(targetUrl, targetInfo.getUsername(), targetInfo.getPassword(),
targetDriver, targetType, targetTimeouts[0], targetTimeouts[1])) {
targetConn.setAutoCommit(false); targetConn.setAutoCommit(false);
List<StasSyncStrategy> stasSyncStrategies = stasSyncStrategyMapper. List<StasSyncStrategy> stasSyncStrategies = stasSyncStrategyMapper.
selectList(new LambdaQueryWrapper<StasSyncStrategy>().eq(StasSyncStrategy::getTaskId, taskId)); selectList(new LambdaQueryWrapper<StasSyncStrategy>().eq(StasSyncStrategy::getTaskId, taskId));
for (StasSyncStrategy stasSyncStrategy : stasSyncStrategies) { for (StasSyncStrategy stasSyncStrategy : stasSyncStrategies) {
log.info("开始同步表: {} (依据字段: {})", stasSyncStrategy.getTableName(), stasSyncStrategy.getColumnName()); System.out.println("开始同步表: " + stasSyncStrategy.getTableName() + " (依据字段: " + stasSyncStrategy.getColumnName() + ")");
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
boolean isDate = isDateColumn(sourceConn, stasSyncStrategy);
int effectiveSyncDay = syncAdaptiveStrategy.getEffectiveSyncDay(stasSyncStrategy, stasTaskConfig.getSyncDay());
int effectiveSyncCount = syncAdaptiveStrategy.getEffectiveSyncCount(stasSyncStrategy, stasTaskConfig.getSyncCount());
boolean syncSuccess = false;
int retryIndex = 0;
while (!syncSuccess) {
try { try {
if (isDate) { if (isDateColumn(sourceConn,stasSyncStrategy)) {
syncByDateRange(sourceConn, targetConn, stasSyncStrategy, effectiveSyncDay); syncByDateRange(sourceConn, targetConn, stasSyncStrategy, stasTaskConfig.getSyncDay());
} else { } else {
syncByIdRange(sourceConn, targetConn, stasSyncStrategy, effectiveSyncCount); syncByIdRange(sourceConn, targetConn, stasSyncStrategy, stasTaskConfig.getSyncCount());
} }
syncSuccess = true;
syncAdaptiveStrategy.onSyncSuccess(stasSyncStrategy,
stasTaskConfig.getSyncDay(), stasTaskConfig.getSyncCount());
} catch (SQLException e) { } catch (SQLException e) {
SyncAdaptiveStrategy.ErrorType errorType = syncAdaptiveStrategy.analyzeSQLException(e); System.err.println("同步表 " + stasSyncStrategy.getTableName() + " 时出错: " + e.getMessage());
retryIndex++; targetConn.rollback();
log.warn("同步表 {} 时出错(第{}次): {} [错误类型: {}]", throw new JeecgBootException(e.getMessage());
stasSyncStrategy.getTableName(), retryIndex, e.getMessage(), errorType.name());
syncAdaptiveStrategy.checkAndAdjustTimeout(sourceInfo, errorType);
syncAdaptiveStrategy.checkAndAdjustTimeout(targetInfo, errorType);
if ((errorType == SyncAdaptiveStrategy.ErrorType.TIMEOUT
|| errorType == SyncAdaptiveStrategy.ErrorType.TRUNCATION)
&& syncAdaptiveStrategy.shouldRetry(stasSyncStrategy)) {
int[] adjusted = syncAdaptiveStrategy.adjustBatchSize(stasSyncStrategy, isDate,
stasTaskConfig.getSyncDay(), stasTaskConfig.getSyncCount());
effectiveSyncDay = adjusted[0];
effectiveSyncCount = adjusted[1];
log.info("表 {} 批次已调整,准备第{}次重试: syncDay={}, syncCount={}",
stasSyncStrategy.getTableName(), retryIndex + 1, effectiveSyncDay, effectiveSyncCount);
try { targetConn.rollback(); } catch (SQLException ignored) {}
if (sourceConn.isClosed() || targetConn.isClosed()) {
log.warn("表 {} 连接已断开,无法重试,跳过该表", stasSyncStrategy.getTableName());
break;
}
continue;
} else {
try { targetConn.rollback(); } catch (SQLException ignored) {}
log.error("表 {} 同步失败,跳过: {}", stasSyncStrategy.getTableName(), e.getMessage());
break;
}
} catch (ParseException e) { } catch (ParseException e) {
throw new JeecgBootException(e.getMessage()); throw new JeecgBootException(e.getMessage());
} }
}
long endTime = System.currentTimeMillis(); long endTime = System.currentTimeMillis();
log.info("表 {} 同步耗时: {} 毫秒{}", stasSyncStrategy.getTableName(), System.out.println("" + stasSyncStrategy.getTableName() + " 同步耗时: " + (endTime - startTime) + " 毫秒");
(endTime - startTime), syncSuccess ? "" : " (失败)");
} }
} }
} }

View File

@ -1,40 +0,0 @@
-- ============================================================
-- 数据同步超时自适应与批次调整 - DDL更新脚本
-- 执行环境: PostgreSQL (目标数据库)
-- ============================================================
-- 1. stas_data_source 表新增超时探测字段
ALTER TABLE stas_data_source ADD COLUMN IF NOT EXISTS detected_connect_timeout INTEGER;
ALTER TABLE stas_data_source ADD COLUMN IF NOT EXISTS detected_read_timeout INTEGER;
ALTER TABLE stas_data_source ADD COLUMN IF NOT EXISTS last_probe_time TIMESTAMP;
-- 2. stas_sync_strategy 表新增自适应批次字段
ALTER TABLE stas_sync_strategy ADD COLUMN IF NOT EXISTS current_sync_day INTEGER;
ALTER TABLE stas_sync_strategy ADD COLUMN IF NOT EXISTS current_sync_count INTEGER;
ALTER TABLE stas_sync_strategy ADD COLUMN IF NOT EXISTS last_error_type VARCHAR(20);
ALTER TABLE stas_sync_strategy ADD COLUMN IF NOT EXISTS retry_count INTEGER DEFAULT 0;
ALTER TABLE stas_sync_strategy ADD COLUMN IF NOT EXISTS min_sync_day INTEGER DEFAULT 1;
ALTER TABLE stas_sync_strategy ADD COLUMN IF NOT EXISTS min_sync_count INTEGER DEFAULT 100;
-- 3. stas_sync_log 表新增错误详情字段
ALTER TABLE stas_sync_log ADD COLUMN IF NOT EXISTS error_type VARCHAR(20);
ALTER TABLE stas_sync_log ADD COLUMN IF NOT EXISTS batch_size_before INTEGER;
ALTER TABLE stas_sync_log ADD COLUMN IF NOT EXISTS batch_size_after INTEGER;
ALTER TABLE stas_sync_log ADD COLUMN IF NOT EXISTS retry_index INTEGER;
-- 4. 为新增字段添加注释PostgreSQL
COMMENT ON COLUMN stas_data_source.detected_connect_timeout IS '探测到的连接超时(秒)null表示未探测';
COMMENT ON COLUMN stas_data_source.detected_read_timeout IS '探测到的读取超时(秒)null表示未探测';
COMMENT ON COLUMN stas_data_source.last_probe_time IS '上次超时探测时间超过7天自动重新探测';
COMMENT ON COLUMN stas_sync_strategy.current_sync_day IS '当前生效的批次天数(自适应调整后)null时使用taskConfig的syncDay';
COMMENT ON COLUMN stas_sync_strategy.current_sync_count IS '当前生效的批次数量(自适应调整后)null时使用taskConfig的syncCount';
COMMENT ON COLUMN stas_sync_strategy.last_error_type IS '最近一次错误类型: TIMEOUT/TRUNCATION/OTHER';
COMMENT ON COLUMN stas_sync_strategy.retry_count IS '连续重试次数,同步成功后清零';
COMMENT ON COLUMN stas_sync_strategy.min_sync_day IS '批次天数最小值下限默认1';
COMMENT ON COLUMN stas_sync_strategy.min_sync_count IS '批次数量最小值下限默认100';
COMMENT ON COLUMN stas_sync_log.error_type IS '错误类型: TIMEOUT/TRUNCATION/OTHER/SUCCESS';
COMMENT ON COLUMN stas_sync_log.batch_size_before IS '调整前批次大小';
COMMENT ON COLUMN stas_sync_log.batch_size_after IS '调整后批次大小';
COMMENT ON COLUMN stas_sync_log.retry_index IS '第几次重试';