对同步服务功能进行性能优化

This commit is contained in:
rencheng 2026-05-09 15:50:31 +08:00
parent ab1dac0426
commit b7b3158592
6 changed files with 18 additions and 19 deletions

3
.gitignore vendored
View File

@ -13,4 +13,5 @@ os_del.cmd
os_del_doc.cmd
.svn
derby.log
*.log
*.log
.qoder/

View File

@ -9,7 +9,7 @@
JOIN stas_sync_record r ON l.record_id = r.id
WHERE (r.source_id = #{dataSourceId} OR r.target_id = #{dataSourceId})
AND l.error_type IN ('TIMEOUT', 'TRUNCATION')
AND l.start_time >= (CURRENT_DATE - INTERVAL '#{days}' DAY)
AND l.start_time >= (CURRENT_DATE - (#{days} * INTERVAL '1 day'))
</select>
<!-- 统计指定数据源关联的最近N天成功同步的平均耗时(毫秒) -->
@ -20,7 +20,7 @@
FROM stas_sync_record r
WHERE (r.source_id = #{dataSourceId} OR r.target_id = #{dataSourceId})
AND r.end_time IS NOT NULL
AND r.start_time &gt;= (CURRENT_DATE - INTERVAL '#{days}' DAY)
AND r.start_time &gt;= (CURRENT_DATE - (#{days} * INTERVAL '1 day'))
</select>
<!-- 统计指定数据源最近连续N条日志中无超时错误的记录数 -->

View File

@ -164,7 +164,7 @@ public class SyncDataJob implements Job {
&& syncAdaptiveStrategy.shouldRetry(stasSyncStrategy)) {
// 缩减批次大小并重试
int[] adjusted = syncAdaptiveStrategy.adjustBatchSize(stasSyncStrategy, isDate,
stasTaskConfig.getSyncDay(), stasTaskConfig.getSyncCount());
stasTaskConfig.getSyncDay(), stasTaskConfig.getSyncCount(), errorType);
effectiveSyncDay = adjusted[0];
effectiveSyncCount = adjusted[1];

View File

@ -8,6 +8,7 @@ import org.jeecg.modules.base.mapper.StasSyncStrategyMapper;
import org.springframework.stereotype.Component;
import java.sql.SQLException;
import java.sql.SQLTimeoutException;
import java.util.Arrays;
import java.util.List;
@ -38,7 +39,7 @@ public class SyncAdaptiveStrategy {
private static final List<String> ORACLE_TIMEOUT_KEYWORDS = Arrays.asList(
"ORA-02396", "ORA-03113", "ORA-03114", "ORA-04030",
"Closed Connection", "Connection timed out", "Read timed out",
"ORA-12571", "ORA-03135", "ORA-02396", "ORA-00054",
"ORA-12571", "ORA-03135", "ORA-00054",
"No more data to read from socket", "Connection reset"
);
@ -139,7 +140,8 @@ public class SyncAdaptiveStrategy {
* @return 调整后的[batchDay, batchCount]
*/
public int[] adjustBatchSize(StasSyncStrategy strategy, boolean isDateColumn,
int taskConfigSyncDay, int taskConfigSyncCount) {
int taskConfigSyncDay, int taskConfigSyncCount,
ErrorType errorType) {
int minSyncDay = strategy.getMinSyncDay() != null ? strategy.getMinSyncDay() : DEFAULT_MIN_SYNC_DAY;
int minSyncCount = strategy.getMinSyncCount() != null ? strategy.getMinSyncCount() : DEFAULT_MIN_SYNC_COUNT;
@ -152,7 +154,7 @@ public class SyncAdaptiveStrategy {
strategy.setCurrentSyncDay(newSyncDay);
strategy.setCurrentSyncCount(newSyncCount);
strategy.setRetryCount(strategy.getRetryCount() != null ? strategy.getRetryCount() + 1 : 1);
strategy.setLastErrorType(ErrorType.TIMEOUT.name());
strategy.setLastErrorType(errorType != null ? errorType.name() : ErrorType.OTHER.name());
stasSyncStrategyMapper.updateById(strategy);

View File

@ -39,8 +39,8 @@ public class TimeoutProbeService {
private static final int PROBE_EXPIRE_DAYS = 7;
/** 日志分析窗口天数 */
private static final int LOG_ANALYSIS_WINDOW_DAYS = 30;
/** 超时错误率阈值 */
private static final double TIMEOUT_ERROR_RATE_THRESHOLD = 0.2;
/** 超时错误次数阈值(最近窗口期内出现此次数及以上则认为偏高) */
private static final int TIMEOUT_ERROR_COUNT_THRESHOLD = 3;
/** 连续无超时日志条数阈值(满足此条件则温和下调) */
private static final int CONSECUTIVE_NON_TIMEOUT_THRESHOLD = 5;
@ -173,16 +173,12 @@ public class TimeoutProbeService {
int currentReadTimeout = dataSource.getDetectedReadTimeout() != null
? dataSource.getDetectedReadTimeout() : MIN_READ_TIMEOUT_SEC;
// 计算超时错误率
int totalErrors = timeoutErrors;
double errorRate = totalErrors > 0 ? (double) timeoutErrors / Math.max(totalErrors, 1) : 0;
if (errorRate > TIMEOUT_ERROR_RATE_THRESHOLD && avgSyncTimeMs != null && avgSyncTimeMs > 0) {
// 超时错误率过高增大读取超时
if (timeoutErrors >= TIMEOUT_ERROR_COUNT_THRESHOLD && avgSyncTimeMs != null && avgSyncTimeMs > 0) {
// 窗口期内超时次数偏高增大读取超时
int newReadTimeout = (int) Math.max(currentReadTimeout * 1.5,
(avgSyncTimeMs / 1000) * 3);
log.info("数据源[{}]超时错误率{}}过高,读取超时从{}s调整为{}s",
dataSource.getInstanceName(), String.format("%.2f", errorRate),
log.info("数据源[{}]最近{}天超时错误{}次偏高,读取超时从{}s调整为{}s",
dataSource.getInstanceName(), LOG_ANALYSIS_WINDOW_DAYS, timeoutErrors,
currentReadTimeout, newReadTimeout);
dataSource.setDetectedReadTimeout(newReadTimeout);
stasDataSourceMapper.updateById(dataSource);
@ -297,7 +293,7 @@ public class TimeoutProbeService {
* 解析PG时间间隔字符串为秒数 "5min", "30s", "2h"
*/
private int parsePgIntervalToSec(String value) {
if (value == null || "0".equals(value) || "-1".equals(value) || value.startsWith("0")) {
if (value == null || "0".equals(value.trim()) || "-1".equals(value.trim())) {
return -1;
}
try {

View File

@ -253,7 +253,7 @@ public class StasTaskConfigServiceImpl extends ServiceImpl<StasTaskConfigMapper,
|| errorType == SyncAdaptiveStrategy.ErrorType.TRUNCATION)
&& syncAdaptiveStrategy.shouldRetry(stasSyncStrategy)) {
int[] adjusted = syncAdaptiveStrategy.adjustBatchSize(stasSyncStrategy, isDate,
stasTaskConfig.getSyncDay(), stasTaskConfig.getSyncCount());
stasTaskConfig.getSyncDay(), stasTaskConfig.getSyncCount(), errorType);
effectiveSyncDay = adjusted[0];
effectiveSyncCount = adjusted[1];
log.info("表 {} 批次已调整,准备第{}次重试: syncDay={}, syncCount={}",