From bfaf27c0542d480cab66ad241b44528f8403b9a3 Mon Sep 17 00:00:00 2001 From: rencheng Date: Wed, 20 May 2026 15:31:10 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0Oracle=20DBlink=E6=83=85?= =?UTF-8?q?=E5=86=B5=E4=B8=8B=E6=95=B0=E6=8D=AE=E5=90=8C=E6=AD=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../impl/StasDataSourceServiceImpl.java | 9 ++- .../org/jeecg/taskConfig/job/SyncDataJob.java | 57 ++++++++++++------- 2 files changed, 42 insertions(+), 24 deletions(-) diff --git a/jeecg-module-sync/src/main/java/org/jeecg/dataSource/service/impl/StasDataSourceServiceImpl.java b/jeecg-module-sync/src/main/java/org/jeecg/dataSource/service/impl/StasDataSourceServiceImpl.java index 4f1f593..b9a4c28 100644 --- a/jeecg-module-sync/src/main/java/org/jeecg/dataSource/service/impl/StasDataSourceServiceImpl.java +++ b/jeecg-module-sync/src/main/java/org/jeecg/dataSource/service/impl/StasDataSourceServiceImpl.java @@ -100,7 +100,10 @@ public class StasDataSourceServiceImpl extends ServiceImpl queryTableList(String sourceId, String username) { StasDataSource stasDataSource = this.baseMapper.selectById(sourceId); if(SourceDataTypeEnum.ORACLE.getKey() == stasDataSource.getType()){ - String sql = "SELECT TABLE_NAME FROM ALL_TABLES WHERE OWNER = ?"; + // 如果配置了DBLink,则在元数据视图后追加@dblink以查询远程数据库的所有表 + String dbLinkSuffix = StringUtils.isNotBlank(stasDataSource.getDbLink()) + ? "@" + stasDataSource.getDbLink().trim() : ""; + String sql = "SELECT TABLE_NAME FROM ALL_TABLES" + dbLinkSuffix + " WHERE OWNER = ?"; return queryDatabaseMetadata(stasDataSource, sql, "TABLE_NAME", username); } else { String sql = "SELECT tablename AS table_name FROM pg_tables WHERE schemaname = ?"; @@ -112,7 +115,9 @@ public class StasDataSourceServiceImpl extends ServiceImpl queryColumnList(String sourceId, String username, String tableName) { StasDataSource stasDataSource = this.baseMapper.selectById(sourceId); if(SourceDataTypeEnum.ORACLE.getKey() == stasDataSource.getType()){ - String sql = "SELECT COLUMN_NAME FROM ALL_TAB_COLUMNS WHERE OWNER = ? AND TABLE_NAME = ?"; + String dbLinkSuffix = StringUtils.isNotBlank(stasDataSource.getDbLink()) + ? "@" + stasDataSource.getDbLink().trim() : ""; + String sql = "SELECT COLUMN_NAME FROM ALL_TAB_COLUMNS" + dbLinkSuffix + " WHERE OWNER = ? AND TABLE_NAME = ?"; return queryDatabaseMetadata(stasDataSource, sql, "COLUMN_NAME", username, tableName); } else { String sql = "SELECT column_name FROM information_schema.columns WHERE table_schema = ? AND table_name = ?"; diff --git a/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/job/SyncDataJob.java b/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/job/SyncDataJob.java index add3580..bde97cb 100644 --- a/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/job/SyncDataJob.java +++ b/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/job/SyncDataJob.java @@ -105,6 +105,14 @@ public class SyncDataJob implements Job { targetType = 0; } + // 计算 Oracle DBLink 后缀(仅当数据源为 Oracle 且配置了 DBLink 时生效),用于 SQL 透明拼接 @ + String sourceDbLinkSuffix = (SourceDataTypeEnum.ORACLE.getKey().equals(sourceInfo.getType()) + && StringUtils.isNotBlank(sourceInfo.getDbLink())) + ? "@" + sourceInfo.getDbLink().trim() : ""; + String targetDbLinkSuffix = (SourceDataTypeEnum.ORACLE.getKey().equals(targetInfo.getType()) + && StringUtils.isNotBlank(targetInfo.getDbLink())) + ? "@" + targetInfo.getDbLink().trim() : ""; + // 获取自动探测的超时值 int[] sourceTimeouts = timeoutProbeService.getEffectiveTimeout(sourceInfo); int[] targetTimeouts = timeoutProbeService.getEffectiveTimeout(targetInfo); @@ -123,7 +131,7 @@ public class SyncDataJob implements Job { for (StasSyncStrategy stasSyncStrategy : stasSyncStrategies) { saveSyncLog(recordId, String.format("开始同步表: %s (依据字段: %s)", stasSyncStrategy.getTableName(), stasSyncStrategy.getColumnName()), null, null, null); long startTime = System.currentTimeMillis(); - boolean isDate = isDateColumn(sourceConn, stasSyncStrategy, sourceInfo.getType()); + boolean isDate = isDateColumn(sourceConn, stasSyncStrategy, sourceInfo.getType(), sourceDbLinkSuffix); // 获取自适应批次大小 int effectiveSyncDay = syncAdaptiveStrategy.getEffectiveSyncDay(stasSyncStrategy, stasTaskConfig.getSyncDay()); @@ -136,10 +144,12 @@ public class SyncDataJob implements Job { try { if (isDate) { syncByDateRange(sourceConn, targetConn, stasSyncStrategy, - effectiveSyncDay, sourceInfo.getType(), targetInfo.getType(), recordId, retryIndex); + effectiveSyncDay, sourceInfo.getType(), targetInfo.getType(), recordId, retryIndex, + sourceDbLinkSuffix, targetDbLinkSuffix); } else { syncByIdRange(sourceConn, targetConn, stasSyncStrategy, - effectiveSyncCount, sourceInfo.getType(), targetInfo.getType(), recordId, retryIndex); + effectiveSyncCount, sourceInfo.getType(), targetInfo.getType(), recordId, retryIndex, + sourceDbLinkSuffix, targetDbLinkSuffix); } syncSuccess = true; // 同步成功,逐步恢复批次 @@ -205,10 +215,10 @@ public class SyncDataJob implements Job { /** * 判断字段是否为日期类型 */ - public boolean isDateColumn(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType) throws SQLException { + public boolean isDateColumn(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType, String dbLinkSuffix) throws SQLException { String sql; if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) { - sql = "SELECT data_type FROM all_tab_columns WHERE owner = ? AND table_name = ? AND column_name = ?"; + sql = "SELECT data_type FROM all_tab_columns" + dbLinkSuffix + " WHERE owner = ? AND table_name = ? AND column_name = ?"; } else if (SourceDataTypeEnum.POSTGRES.getKey().equals(dbType)) { sql = "SELECT data_type FROM information_schema.columns " + "WHERE table_schema = ? AND table_name = ? AND column_name = ?"; @@ -235,8 +245,9 @@ public class SyncDataJob implements Job { */ public void syncByDateRange(Connection sourceConn, Connection targetConn, StasSyncStrategy stasSyncStrategy, Integer syncDay, - Integer sourceDbType, Integer targetDbType, String recordId, int retryIndex) throws SQLException, ParseException { - DateRangeVO dateRange = getDateRange(sourceConn, stasSyncStrategy, sourceDbType); + Integer sourceDbType, Integer targetDbType, String recordId, int retryIndex, + String sourceDbLinkSuffix, String targetDbLinkSuffix) throws SQLException, ParseException { + DateRangeVO dateRange = getDateRange(sourceConn, stasSyncStrategy, sourceDbType, sourceDbLinkSuffix); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @@ -254,7 +265,7 @@ public class SyncDataJob implements Job { .append(", 'YYYY-MM-DD HH24:MI:SS') = '") .append(sdf.format(currentEnd)) .append("'"); - deleteByEquals(targetConn, stasSyncStrategy, sourceDbType, deleteClause.toString()); + deleteByEquals(targetConn, stasSyncStrategy, sourceDbType, deleteClause.toString(), targetDbLinkSuffix); } String whereClause; @@ -273,7 +284,7 @@ public class SyncDataJob implements Job { int rowsSynced = syncDataBatch(sourceConn, targetConn, stasSyncStrategy.getSourceOwner(), stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(), - whereClause, sourceDbType, targetDbType); + whereClause, sourceDbType, targetDbType, sourceDbLinkSuffix, targetDbLinkSuffix); saveSyncLog(recordId, String.format("%s表已同步 %s 行数据", stasSyncStrategy.getTableName(), rowsSynced), "SUCCESS", batchSizeBefore, retryIndex > 0 ? retryIndex : null); @@ -292,8 +303,9 @@ public class SyncDataJob implements Job { */ public void syncByIdRange(Connection sourceConn, Connection targetConn, StasSyncStrategy stasSyncStrategy, Integer syncCount, - Integer sourceDbType, Integer targetDbType, String recordId, int retryIndex) throws SQLException { - IdRangeVO idRange = getIdRange(sourceConn, stasSyncStrategy, sourceDbType); + Integer sourceDbType, Integer targetDbType, String recordId, int retryIndex, + String sourceDbLinkSuffix, String targetDbLinkSuffix) throws SQLException { + IdRangeVO idRange = getIdRange(sourceConn, stasSyncStrategy, sourceDbType, sourceDbLinkSuffix); String syncOrigin = stasSyncStrategy.getSyncOrigin(); long lastSyncedId = StringUtils.isNotBlank(syncOrigin) ? Long.parseLong(syncOrigin) : idRange.getMinId(); @@ -309,7 +321,7 @@ public class SyncDataJob implements Job { .append(" = '") .append(currentEnd) .append("'"); - deleteByEquals(targetConn, stasSyncStrategy, sourceDbType, deleteClause.toString()); + deleteByEquals(targetConn, stasSyncStrategy, sourceDbType, deleteClause.toString(), targetDbLinkSuffix); } if (currentStart > currentEnd) { @@ -327,7 +339,7 @@ public class SyncDataJob implements Job { int rowsSynced = syncDataBatch(sourceConn, targetConn, stasSyncStrategy.getSourceOwner(), stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(), - whereClause, sourceDbType, targetDbType); + whereClause, sourceDbType, targetDbType, sourceDbLinkSuffix, targetDbLinkSuffix); saveSyncLog(recordId, String.format("%s表已同步 %s 行数据", stasSyncStrategy.getTableName(), rowsSynced), "SUCCESS", batchSizeBefore, retryIndex > 0 ? retryIndex : null); @@ -344,11 +356,11 @@ public class SyncDataJob implements Job { /** * 根据日期等于或ID等于删除数据 */ - public int deleteByEquals(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType, String whereClause) throws SQLException { + public int deleteByEquals(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType, String whereClause, String dbLinkSuffix) throws SQLException { String sql; if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) { sql = "DELETE FROM \"" + stasSyncStrategy.getTargetOwner().toUpperCase() + "\".\"" + - stasSyncStrategy.getTableName() + "\" WHERE " + whereClause; + stasSyncStrategy.getTableName() + "\"" + dbLinkSuffix + " WHERE " + whereClause; } else { sql = "DELETE FROM \"" + stasSyncStrategy.getTargetOwner().toLowerCase() + "\".\"" + stasSyncStrategy.getTableName() + "\" WHERE " + whereClause; @@ -362,12 +374,12 @@ public class SyncDataJob implements Job { /** * 获取表的日期范围 */ - public DateRangeVO getDateRange(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType) throws SQLException { + public DateRangeVO getDateRange(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType, String dbLinkSuffix) throws SQLException { String sql; if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) { sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_date, " + "MAX(" + stasSyncStrategy.getColumnName() + ") as max_date " + - "FROM \"" + stasSyncStrategy.getSourceOwner().toUpperCase() + "\".\"" + stasSyncStrategy.getTableName() + "\""; + "FROM \"" + stasSyncStrategy.getSourceOwner().toUpperCase() + "\".\"" + stasSyncStrategy.getTableName() + "\"" + dbLinkSuffix; } else { sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_date, " + "MAX(" + stasSyncStrategy.getColumnName() + ") as max_date " + @@ -386,12 +398,12 @@ public class SyncDataJob implements Job { /** * 获取表的ID范围 */ - public IdRangeVO getIdRange(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType) throws SQLException { + public IdRangeVO getIdRange(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType, String dbLinkSuffix) throws SQLException { String sql; if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) { sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_id, " + "MAX(" + stasSyncStrategy.getColumnName() + ") as max_id " + - "FROM \"" + stasSyncStrategy.getSourceOwner().toUpperCase() + "\".\"" + stasSyncStrategy.getTableName() + "\""; + "FROM \"" + stasSyncStrategy.getSourceOwner().toUpperCase() + "\".\"" + stasSyncStrategy.getTableName() + "\"" + dbLinkSuffix; } else { sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_id, " + "MAX(" + stasSyncStrategy.getColumnName() + ") as max_id " + @@ -414,13 +426,14 @@ public class SyncDataJob implements Job { public int syncDataBatch(Connection sourceConn, Connection targetConn, String sourceOwner, String targetOwner, String tableName, String whereClause, - Integer sourceDbType, Integer targetDbType) throws SQLException { + Integer sourceDbType, Integer targetDbType, + String sourceDbLinkSuffix, String targetDbLinkSuffix) throws SQLException { int totalRows = 0; String selectSql; if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceDbType)) { selectSql = "SELECT * FROM \"" + sourceOwner.toUpperCase() + "\".\"" + tableName + - "\" WHERE " + whereClause; + "\"" + sourceDbLinkSuffix + " WHERE " + whereClause; } else { selectSql = "SELECT * FROM \"" + sourceOwner.toLowerCase() + "\".\"" + tableName + "\" WHERE " + whereClause; @@ -436,7 +449,7 @@ public class SyncDataJob implements Job { String insertSql; if (SourceDataTypeEnum.ORACLE.getKey().equals(targetDbType)) { - insertSql = "INSERT INTO \"" + targetOwner.toUpperCase() + "\".\"" + tableName + "\" VALUES ("; + insertSql = "INSERT INTO \"" + targetOwner.toUpperCase() + "\".\"" + tableName + "\"" + targetDbLinkSuffix + " VALUES ("; } else { insertSql = "INSERT INTO \"" + tableName + "\" VALUES ("; }