Revert "对同步服务功能进行性能优化"

This reverts commit b7b3158592.
This commit is contained in:
panbaolin 2026-05-21 16:11:07 +08:00
parent 644fab9430
commit 26ad5cdbb4
6 changed files with 19 additions and 18 deletions

1
.gitignore vendored
View File

@ -14,4 +14,3 @@ os_del_doc.cmd
.svn .svn
derby.log derby.log
*.log *.log
.qoder/

View File

@ -9,7 +9,7 @@
JOIN stas_sync_record r ON l.record_id = r.id JOIN stas_sync_record r ON l.record_id = r.id
WHERE (r.source_id = #{dataSourceId} OR r.target_id = #{dataSourceId}) WHERE (r.source_id = #{dataSourceId} OR r.target_id = #{dataSourceId})
AND l.error_type IN ('TIMEOUT', 'TRUNCATION') AND l.error_type IN ('TIMEOUT', 'TRUNCATION')
AND l.start_time >= (CURRENT_DATE - (#{days} * INTERVAL '1 day')) AND l.start_time >= (CURRENT_DATE - INTERVAL '#{days}' DAY)
</select> </select>
<!-- 统计指定数据源关联的最近N天成功同步的平均耗时(毫秒) --> <!-- 统计指定数据源关联的最近N天成功同步的平均耗时(毫秒) -->
@ -20,7 +20,7 @@
FROM stas_sync_record r FROM stas_sync_record r
WHERE (r.source_id = #{dataSourceId} OR r.target_id = #{dataSourceId}) WHERE (r.source_id = #{dataSourceId} OR r.target_id = #{dataSourceId})
AND r.end_time IS NOT NULL AND r.end_time IS NOT NULL
AND r.start_time &gt;= (CURRENT_DATE - (#{days} * INTERVAL '1 day')) AND r.start_time &gt;= (CURRENT_DATE - INTERVAL '#{days}' DAY)
</select> </select>
<!-- 统计指定数据源最近连续N条日志中无超时错误的记录数 --> <!-- 统计指定数据源最近连续N条日志中无超时错误的记录数 -->

View File

@ -164,7 +164,7 @@ public class SyncDataJob implements Job {
&& syncAdaptiveStrategy.shouldRetry(stasSyncStrategy)) { && syncAdaptiveStrategy.shouldRetry(stasSyncStrategy)) {
// 缩减批次大小并重试 // 缩减批次大小并重试
int[] adjusted = syncAdaptiveStrategy.adjustBatchSize(stasSyncStrategy, isDate, int[] adjusted = syncAdaptiveStrategy.adjustBatchSize(stasSyncStrategy, isDate,
stasTaskConfig.getSyncDay(), stasTaskConfig.getSyncCount(), errorType); stasTaskConfig.getSyncDay(), stasTaskConfig.getSyncCount());
effectiveSyncDay = adjusted[0]; effectiveSyncDay = adjusted[0];
effectiveSyncCount = adjusted[1]; effectiveSyncCount = adjusted[1];

View File

@ -8,7 +8,6 @@ import org.jeecg.modules.base.mapper.StasSyncStrategyMapper;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.SQLTimeoutException;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
@ -39,7 +38,7 @@ public class SyncAdaptiveStrategy {
private static final List<String> ORACLE_TIMEOUT_KEYWORDS = Arrays.asList( private static final List<String> ORACLE_TIMEOUT_KEYWORDS = Arrays.asList(
"ORA-02396", "ORA-03113", "ORA-03114", "ORA-04030", "ORA-02396", "ORA-03113", "ORA-03114", "ORA-04030",
"Closed Connection", "Connection timed out", "Read timed out", "Closed Connection", "Connection timed out", "Read timed out",
"ORA-12571", "ORA-03135", "ORA-00054", "ORA-12571", "ORA-03135", "ORA-02396", "ORA-00054",
"No more data to read from socket", "Connection reset" "No more data to read from socket", "Connection reset"
); );
@ -140,8 +139,7 @@ public class SyncAdaptiveStrategy {
* @return 调整后的[batchDay, batchCount] * @return 调整后的[batchDay, batchCount]
*/ */
public int[] adjustBatchSize(StasSyncStrategy strategy, boolean isDateColumn, public int[] adjustBatchSize(StasSyncStrategy strategy, boolean isDateColumn,
int taskConfigSyncDay, int taskConfigSyncCount, int taskConfigSyncDay, int taskConfigSyncCount) {
ErrorType errorType) {
int minSyncDay = strategy.getMinSyncDay() != null ? strategy.getMinSyncDay() : DEFAULT_MIN_SYNC_DAY; int minSyncDay = strategy.getMinSyncDay() != null ? strategy.getMinSyncDay() : DEFAULT_MIN_SYNC_DAY;
int minSyncCount = strategy.getMinSyncCount() != null ? strategy.getMinSyncCount() : DEFAULT_MIN_SYNC_COUNT; int minSyncCount = strategy.getMinSyncCount() != null ? strategy.getMinSyncCount() : DEFAULT_MIN_SYNC_COUNT;
@ -154,7 +152,7 @@ public class SyncAdaptiveStrategy {
strategy.setCurrentSyncDay(newSyncDay); strategy.setCurrentSyncDay(newSyncDay);
strategy.setCurrentSyncCount(newSyncCount); strategy.setCurrentSyncCount(newSyncCount);
strategy.setRetryCount(strategy.getRetryCount() != null ? strategy.getRetryCount() + 1 : 1); strategy.setRetryCount(strategy.getRetryCount() != null ? strategy.getRetryCount() + 1 : 1);
strategy.setLastErrorType(errorType != null ? errorType.name() : ErrorType.OTHER.name()); strategy.setLastErrorType(ErrorType.TIMEOUT.name());
stasSyncStrategyMapper.updateById(strategy); stasSyncStrategyMapper.updateById(strategy);

View File

@ -39,8 +39,8 @@ public class TimeoutProbeService {
private static final int PROBE_EXPIRE_DAYS = 7; private static final int PROBE_EXPIRE_DAYS = 7;
/** 日志分析窗口天数 */ /** 日志分析窗口天数 */
private static final int LOG_ANALYSIS_WINDOW_DAYS = 30; private static final int LOG_ANALYSIS_WINDOW_DAYS = 30;
/** 超时错误次数阈值(最近窗口期内出现此次数及以上则认为偏高) */ /** 超时错误率阈值 */
private static final int TIMEOUT_ERROR_COUNT_THRESHOLD = 3; private static final double TIMEOUT_ERROR_RATE_THRESHOLD = 0.2;
/** 连续无超时日志条数阈值(满足此条件则温和下调) */ /** 连续无超时日志条数阈值(满足此条件则温和下调) */
private static final int CONSECUTIVE_NON_TIMEOUT_THRESHOLD = 5; private static final int CONSECUTIVE_NON_TIMEOUT_THRESHOLD = 5;
@ -173,12 +173,16 @@ public class TimeoutProbeService {
int currentReadTimeout = dataSource.getDetectedReadTimeout() != null int currentReadTimeout = dataSource.getDetectedReadTimeout() != null
? dataSource.getDetectedReadTimeout() : MIN_READ_TIMEOUT_SEC; ? dataSource.getDetectedReadTimeout() : MIN_READ_TIMEOUT_SEC;
if (timeoutErrors >= TIMEOUT_ERROR_COUNT_THRESHOLD && avgSyncTimeMs != null && avgSyncTimeMs > 0) { // 计算超时错误率
// 窗口期内超时次数偏高增大读取超时 int totalErrors = timeoutErrors;
double errorRate = totalErrors > 0 ? (double) timeoutErrors / Math.max(totalErrors, 1) : 0;
if (errorRate > TIMEOUT_ERROR_RATE_THRESHOLD && avgSyncTimeMs != null && avgSyncTimeMs > 0) {
// 超时错误率过高增大读取超时
int newReadTimeout = (int) Math.max(currentReadTimeout * 1.5, int newReadTimeout = (int) Math.max(currentReadTimeout * 1.5,
(avgSyncTimeMs / 1000) * 3); (avgSyncTimeMs / 1000) * 3);
log.info("数据源[{}]最近{}天超时错误{}次偏高,读取超时从{}s调整为{}s", log.info("数据源[{}]超时错误率{}}过高,读取超时从{}s调整为{}s",
dataSource.getInstanceName(), LOG_ANALYSIS_WINDOW_DAYS, timeoutErrors, dataSource.getInstanceName(), String.format("%.2f", errorRate),
currentReadTimeout, newReadTimeout); currentReadTimeout, newReadTimeout);
dataSource.setDetectedReadTimeout(newReadTimeout); dataSource.setDetectedReadTimeout(newReadTimeout);
stasDataSourceMapper.updateById(dataSource); stasDataSourceMapper.updateById(dataSource);
@ -293,7 +297,7 @@ public class TimeoutProbeService {
* 解析PG时间间隔字符串为秒数 "5min", "30s", "2h" * 解析PG时间间隔字符串为秒数 "5min", "30s", "2h"
*/ */
private int parsePgIntervalToSec(String value) { private int parsePgIntervalToSec(String value) {
if (value == null || "0".equals(value.trim()) || "-1".equals(value.trim())) { if (value == null || "0".equals(value) || "-1".equals(value) || value.startsWith("0")) {
return -1; return -1;
} }
try { try {

View File

@ -253,7 +253,7 @@ public class StasTaskConfigServiceImpl extends ServiceImpl<StasTaskConfigMapper,
|| errorType == SyncAdaptiveStrategy.ErrorType.TRUNCATION) || errorType == SyncAdaptiveStrategy.ErrorType.TRUNCATION)
&& syncAdaptiveStrategy.shouldRetry(stasSyncStrategy)) { && syncAdaptiveStrategy.shouldRetry(stasSyncStrategy)) {
int[] adjusted = syncAdaptiveStrategy.adjustBatchSize(stasSyncStrategy, isDate, int[] adjusted = syncAdaptiveStrategy.adjustBatchSize(stasSyncStrategy, isDate,
stasTaskConfig.getSyncDay(), stasTaskConfig.getSyncCount(), errorType); stasTaskConfig.getSyncDay(), stasTaskConfig.getSyncCount());
effectiveSyncDay = adjusted[0]; effectiveSyncDay = adjusted[0];
effectiveSyncCount = adjusted[1]; effectiveSyncCount = adjusted[1];
log.info("表 {} 批次已调整,准备第{}次重试: syncDay={}, syncCount={}", log.info("表 {} 批次已调整,准备第{}次重试: syncDay={}, syncCount={}",