添加Oracle DBlink情况下数据同步

This commit is contained in:
rencheng 2026-05-20 15:31:10 +08:00
parent b7b3158592
commit bfaf27c054
2 changed files with 42 additions and 24 deletions

View File

@ -100,7 +100,10 @@ public class StasDataSourceServiceImpl extends ServiceImpl<StasDataSourceMapper,
public List<String> queryTableList(String sourceId, String username) { public List<String> queryTableList(String sourceId, String username) {
StasDataSource stasDataSource = this.baseMapper.selectById(sourceId); StasDataSource stasDataSource = this.baseMapper.selectById(sourceId);
if(SourceDataTypeEnum.ORACLE.getKey() == stasDataSource.getType()){ if(SourceDataTypeEnum.ORACLE.getKey() == stasDataSource.getType()){
String sql = "SELECT TABLE_NAME FROM ALL_TABLES WHERE OWNER = ?"; // 如果配置了DBLink则在元数据视图后追加@dblink以查询远程数据库的所有表
String dbLinkSuffix = StringUtils.isNotBlank(stasDataSource.getDbLink())
? "@" + stasDataSource.getDbLink().trim() : "";
String sql = "SELECT TABLE_NAME FROM ALL_TABLES" + dbLinkSuffix + " WHERE OWNER = ?";
return queryDatabaseMetadata(stasDataSource, sql, "TABLE_NAME", username); return queryDatabaseMetadata(stasDataSource, sql, "TABLE_NAME", username);
} else { } else {
String sql = "SELECT tablename AS table_name FROM pg_tables WHERE schemaname = ?"; String sql = "SELECT tablename AS table_name FROM pg_tables WHERE schemaname = ?";
@ -112,7 +115,9 @@ public class StasDataSourceServiceImpl extends ServiceImpl<StasDataSourceMapper,
public List<String> queryColumnList(String sourceId, String username, String tableName) { public List<String> queryColumnList(String sourceId, String username, String tableName) {
StasDataSource stasDataSource = this.baseMapper.selectById(sourceId); StasDataSource stasDataSource = this.baseMapper.selectById(sourceId);
if(SourceDataTypeEnum.ORACLE.getKey() == stasDataSource.getType()){ if(SourceDataTypeEnum.ORACLE.getKey() == stasDataSource.getType()){
String sql = "SELECT COLUMN_NAME FROM ALL_TAB_COLUMNS WHERE OWNER = ? AND TABLE_NAME = ?"; String dbLinkSuffix = StringUtils.isNotBlank(stasDataSource.getDbLink())
? "@" + stasDataSource.getDbLink().trim() : "";
String sql = "SELECT COLUMN_NAME FROM ALL_TAB_COLUMNS" + dbLinkSuffix + " WHERE OWNER = ? AND TABLE_NAME = ?";
return queryDatabaseMetadata(stasDataSource, sql, "COLUMN_NAME", username, tableName); return queryDatabaseMetadata(stasDataSource, sql, "COLUMN_NAME", username, tableName);
} else { } else {
String sql = "SELECT column_name FROM information_schema.columns WHERE table_schema = ? AND table_name = ?"; String sql = "SELECT column_name FROM information_schema.columns WHERE table_schema = ? AND table_name = ?";

View File

@ -105,6 +105,14 @@ public class SyncDataJob implements Job {
targetType = 0; targetType = 0;
} }
// 计算 Oracle DBLink 后缀仅当数据源为 Oracle 且配置了 DBLink 时生效用于 SQL 透明拼接 @<dblink>
String sourceDbLinkSuffix = (SourceDataTypeEnum.ORACLE.getKey().equals(sourceInfo.getType())
&& StringUtils.isNotBlank(sourceInfo.getDbLink()))
? "@" + sourceInfo.getDbLink().trim() : "";
String targetDbLinkSuffix = (SourceDataTypeEnum.ORACLE.getKey().equals(targetInfo.getType())
&& StringUtils.isNotBlank(targetInfo.getDbLink()))
? "@" + targetInfo.getDbLink().trim() : "";
// 获取自动探测的超时值 // 获取自动探测的超时值
int[] sourceTimeouts = timeoutProbeService.getEffectiveTimeout(sourceInfo); int[] sourceTimeouts = timeoutProbeService.getEffectiveTimeout(sourceInfo);
int[] targetTimeouts = timeoutProbeService.getEffectiveTimeout(targetInfo); int[] targetTimeouts = timeoutProbeService.getEffectiveTimeout(targetInfo);
@ -123,7 +131,7 @@ public class SyncDataJob implements Job {
for (StasSyncStrategy stasSyncStrategy : stasSyncStrategies) { for (StasSyncStrategy stasSyncStrategy : stasSyncStrategies) {
saveSyncLog(recordId, String.format("开始同步表: %s (依据字段: %s)", stasSyncStrategy.getTableName(), stasSyncStrategy.getColumnName()), null, null, null); saveSyncLog(recordId, String.format("开始同步表: %s (依据字段: %s)", stasSyncStrategy.getTableName(), stasSyncStrategy.getColumnName()), null, null, null);
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
boolean isDate = isDateColumn(sourceConn, stasSyncStrategy, sourceInfo.getType()); boolean isDate = isDateColumn(sourceConn, stasSyncStrategy, sourceInfo.getType(), sourceDbLinkSuffix);
// 获取自适应批次大小 // 获取自适应批次大小
int effectiveSyncDay = syncAdaptiveStrategy.getEffectiveSyncDay(stasSyncStrategy, stasTaskConfig.getSyncDay()); int effectiveSyncDay = syncAdaptiveStrategy.getEffectiveSyncDay(stasSyncStrategy, stasTaskConfig.getSyncDay());
@ -136,10 +144,12 @@ public class SyncDataJob implements Job {
try { try {
if (isDate) { if (isDate) {
syncByDateRange(sourceConn, targetConn, stasSyncStrategy, syncByDateRange(sourceConn, targetConn, stasSyncStrategy,
effectiveSyncDay, sourceInfo.getType(), targetInfo.getType(), recordId, retryIndex); effectiveSyncDay, sourceInfo.getType(), targetInfo.getType(), recordId, retryIndex,
sourceDbLinkSuffix, targetDbLinkSuffix);
} else { } else {
syncByIdRange(sourceConn, targetConn, stasSyncStrategy, syncByIdRange(sourceConn, targetConn, stasSyncStrategy,
effectiveSyncCount, sourceInfo.getType(), targetInfo.getType(), recordId, retryIndex); effectiveSyncCount, sourceInfo.getType(), targetInfo.getType(), recordId, retryIndex,
sourceDbLinkSuffix, targetDbLinkSuffix);
} }
syncSuccess = true; syncSuccess = true;
// 同步成功逐步恢复批次 // 同步成功逐步恢复批次
@ -205,10 +215,10 @@ public class SyncDataJob implements Job {
/** /**
* 判断字段是否为日期类型 * 判断字段是否为日期类型
*/ */
public boolean isDateColumn(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType) throws SQLException { public boolean isDateColumn(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType, String dbLinkSuffix) throws SQLException {
String sql; String sql;
if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) { if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) {
sql = "SELECT data_type FROM all_tab_columns WHERE owner = ? AND table_name = ? AND column_name = ?"; sql = "SELECT data_type FROM all_tab_columns" + dbLinkSuffix + " WHERE owner = ? AND table_name = ? AND column_name = ?";
} else if (SourceDataTypeEnum.POSTGRES.getKey().equals(dbType)) { } else if (SourceDataTypeEnum.POSTGRES.getKey().equals(dbType)) {
sql = "SELECT data_type FROM information_schema.columns " + sql = "SELECT data_type FROM information_schema.columns " +
"WHERE table_schema = ? AND table_name = ? AND column_name = ?"; "WHERE table_schema = ? AND table_name = ? AND column_name = ?";
@ -235,8 +245,9 @@ public class SyncDataJob implements Job {
*/ */
public void syncByDateRange(Connection sourceConn, Connection targetConn, public void syncByDateRange(Connection sourceConn, Connection targetConn,
StasSyncStrategy stasSyncStrategy, Integer syncDay, StasSyncStrategy stasSyncStrategy, Integer syncDay,
Integer sourceDbType, Integer targetDbType, String recordId, int retryIndex) throws SQLException, ParseException { Integer sourceDbType, Integer targetDbType, String recordId, int retryIndex,
DateRangeVO dateRange = getDateRange(sourceConn, stasSyncStrategy, sourceDbType); String sourceDbLinkSuffix, String targetDbLinkSuffix) throws SQLException, ParseException {
DateRangeVO dateRange = getDateRange(sourceConn, stasSyncStrategy, sourceDbType, sourceDbLinkSuffix);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@ -254,7 +265,7 @@ public class SyncDataJob implements Job {
.append(", 'YYYY-MM-DD HH24:MI:SS') = '") .append(", 'YYYY-MM-DD HH24:MI:SS') = '")
.append(sdf.format(currentEnd)) .append(sdf.format(currentEnd))
.append("'"); .append("'");
deleteByEquals(targetConn, stasSyncStrategy, sourceDbType, deleteClause.toString()); deleteByEquals(targetConn, stasSyncStrategy, sourceDbType, deleteClause.toString(), targetDbLinkSuffix);
} }
String whereClause; String whereClause;
@ -273,7 +284,7 @@ public class SyncDataJob implements Job {
int rowsSynced = syncDataBatch(sourceConn, targetConn, stasSyncStrategy.getSourceOwner(), int rowsSynced = syncDataBatch(sourceConn, targetConn, stasSyncStrategy.getSourceOwner(),
stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(), stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(),
whereClause, sourceDbType, targetDbType); whereClause, sourceDbType, targetDbType, sourceDbLinkSuffix, targetDbLinkSuffix);
saveSyncLog(recordId, String.format("%s表已同步 %s 行数据", stasSyncStrategy.getTableName(), rowsSynced), saveSyncLog(recordId, String.format("%s表已同步 %s 行数据", stasSyncStrategy.getTableName(), rowsSynced),
"SUCCESS", batchSizeBefore, retryIndex > 0 ? retryIndex : null); "SUCCESS", batchSizeBefore, retryIndex > 0 ? retryIndex : null);
@ -292,8 +303,9 @@ public class SyncDataJob implements Job {
*/ */
public void syncByIdRange(Connection sourceConn, Connection targetConn, public void syncByIdRange(Connection sourceConn, Connection targetConn,
StasSyncStrategy stasSyncStrategy, Integer syncCount, StasSyncStrategy stasSyncStrategy, Integer syncCount,
Integer sourceDbType, Integer targetDbType, String recordId, int retryIndex) throws SQLException { Integer sourceDbType, Integer targetDbType, String recordId, int retryIndex,
IdRangeVO idRange = getIdRange(sourceConn, stasSyncStrategy, sourceDbType); String sourceDbLinkSuffix, String targetDbLinkSuffix) throws SQLException {
IdRangeVO idRange = getIdRange(sourceConn, stasSyncStrategy, sourceDbType, sourceDbLinkSuffix);
String syncOrigin = stasSyncStrategy.getSyncOrigin(); String syncOrigin = stasSyncStrategy.getSyncOrigin();
long lastSyncedId = StringUtils.isNotBlank(syncOrigin) ? Long.parseLong(syncOrigin) : idRange.getMinId(); long lastSyncedId = StringUtils.isNotBlank(syncOrigin) ? Long.parseLong(syncOrigin) : idRange.getMinId();
@ -309,7 +321,7 @@ public class SyncDataJob implements Job {
.append(" = '") .append(" = '")
.append(currentEnd) .append(currentEnd)
.append("'"); .append("'");
deleteByEquals(targetConn, stasSyncStrategy, sourceDbType, deleteClause.toString()); deleteByEquals(targetConn, stasSyncStrategy, sourceDbType, deleteClause.toString(), targetDbLinkSuffix);
} }
if (currentStart > currentEnd) { if (currentStart > currentEnd) {
@ -327,7 +339,7 @@ public class SyncDataJob implements Job {
int rowsSynced = syncDataBatch(sourceConn, targetConn, stasSyncStrategy.getSourceOwner(), int rowsSynced = syncDataBatch(sourceConn, targetConn, stasSyncStrategy.getSourceOwner(),
stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(), stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(),
whereClause, sourceDbType, targetDbType); whereClause, sourceDbType, targetDbType, sourceDbLinkSuffix, targetDbLinkSuffix);
saveSyncLog(recordId, String.format("%s表已同步 %s 行数据", stasSyncStrategy.getTableName(), rowsSynced), saveSyncLog(recordId, String.format("%s表已同步 %s 行数据", stasSyncStrategy.getTableName(), rowsSynced),
"SUCCESS", batchSizeBefore, retryIndex > 0 ? retryIndex : null); "SUCCESS", batchSizeBefore, retryIndex > 0 ? retryIndex : null);
@ -344,11 +356,11 @@ public class SyncDataJob implements Job {
/** /**
* 根据日期等于或ID等于删除数据 * 根据日期等于或ID等于删除数据
*/ */
public int deleteByEquals(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType, String whereClause) throws SQLException { public int deleteByEquals(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType, String whereClause, String dbLinkSuffix) throws SQLException {
String sql; String sql;
if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) { if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) {
sql = "DELETE FROM \"" + stasSyncStrategy.getTargetOwner().toUpperCase() + "\".\"" + sql = "DELETE FROM \"" + stasSyncStrategy.getTargetOwner().toUpperCase() + "\".\"" +
stasSyncStrategy.getTableName() + "\" WHERE " + whereClause; stasSyncStrategy.getTableName() + "\"" + dbLinkSuffix + " WHERE " + whereClause;
} else { } else {
sql = "DELETE FROM \"" + stasSyncStrategy.getTargetOwner().toLowerCase() + "\".\"" + sql = "DELETE FROM \"" + stasSyncStrategy.getTargetOwner().toLowerCase() + "\".\"" +
stasSyncStrategy.getTableName() + "\" WHERE " + whereClause; stasSyncStrategy.getTableName() + "\" WHERE " + whereClause;
@ -362,12 +374,12 @@ public class SyncDataJob implements Job {
/** /**
* 获取表的日期范围 * 获取表的日期范围
*/ */
public DateRangeVO getDateRange(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType) throws SQLException { public DateRangeVO getDateRange(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType, String dbLinkSuffix) throws SQLException {
String sql; String sql;
if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) { if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) {
sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_date, " + sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_date, " +
"MAX(" + stasSyncStrategy.getColumnName() + ") as max_date " + "MAX(" + stasSyncStrategy.getColumnName() + ") as max_date " +
"FROM \"" + stasSyncStrategy.getSourceOwner().toUpperCase() + "\".\"" + stasSyncStrategy.getTableName() + "\""; "FROM \"" + stasSyncStrategy.getSourceOwner().toUpperCase() + "\".\"" + stasSyncStrategy.getTableName() + "\"" + dbLinkSuffix;
} else { } else {
sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_date, " + sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_date, " +
"MAX(" + stasSyncStrategy.getColumnName() + ") as max_date " + "MAX(" + stasSyncStrategy.getColumnName() + ") as max_date " +
@ -386,12 +398,12 @@ public class SyncDataJob implements Job {
/** /**
* 获取表的ID范围 * 获取表的ID范围
*/ */
public IdRangeVO getIdRange(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType) throws SQLException { public IdRangeVO getIdRange(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType, String dbLinkSuffix) throws SQLException {
String sql; String sql;
if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) { if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) {
sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_id, " + sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_id, " +
"MAX(" + stasSyncStrategy.getColumnName() + ") as max_id " + "MAX(" + stasSyncStrategy.getColumnName() + ") as max_id " +
"FROM \"" + stasSyncStrategy.getSourceOwner().toUpperCase() + "\".\"" + stasSyncStrategy.getTableName() + "\""; "FROM \"" + stasSyncStrategy.getSourceOwner().toUpperCase() + "\".\"" + stasSyncStrategy.getTableName() + "\"" + dbLinkSuffix;
} else { } else {
sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_id, " + sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_id, " +
"MAX(" + stasSyncStrategy.getColumnName() + ") as max_id " + "MAX(" + stasSyncStrategy.getColumnName() + ") as max_id " +
@ -414,13 +426,14 @@ public class SyncDataJob implements Job {
public int syncDataBatch(Connection sourceConn, Connection targetConn, public int syncDataBatch(Connection sourceConn, Connection targetConn,
String sourceOwner, String targetOwner, String sourceOwner, String targetOwner,
String tableName, String whereClause, String tableName, String whereClause,
Integer sourceDbType, Integer targetDbType) throws SQLException { Integer sourceDbType, Integer targetDbType,
String sourceDbLinkSuffix, String targetDbLinkSuffix) throws SQLException {
int totalRows = 0; int totalRows = 0;
String selectSql; String selectSql;
if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceDbType)) { if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceDbType)) {
selectSql = "SELECT * FROM \"" + sourceOwner.toUpperCase() + "\".\"" + tableName + selectSql = "SELECT * FROM \"" + sourceOwner.toUpperCase() + "\".\"" + tableName +
"\" WHERE " + whereClause; "\"" + sourceDbLinkSuffix + " WHERE " + whereClause;
} else { } else {
selectSql = "SELECT * FROM \"" + sourceOwner.toLowerCase() + "\".\"" + tableName + selectSql = "SELECT * FROM \"" + sourceOwner.toLowerCase() + "\".\"" + tableName +
"\" WHERE " + whereClause; "\" WHERE " + whereClause;
@ -436,7 +449,7 @@ public class SyncDataJob implements Job {
String insertSql; String insertSql;
if (SourceDataTypeEnum.ORACLE.getKey().equals(targetDbType)) { if (SourceDataTypeEnum.ORACLE.getKey().equals(targetDbType)) {
insertSql = "INSERT INTO \"" + targetOwner.toUpperCase() + "\".\"" + tableName + "\" VALUES ("; insertSql = "INSERT INTO \"" + targetOwner.toUpperCase() + "\".\"" + tableName + "\"" + targetDbLinkSuffix + " VALUES (";
} else { } else {
insertSql = "INSERT INTO \"" + tableName + "\" VALUES ("; insertSql = "INSERT INTO \"" + tableName + "\" VALUES (";
} }