parent
bfaf27c054
commit
644fab9430
|
|
@ -100,10 +100,7 @@ public class StasDataSourceServiceImpl extends ServiceImpl<StasDataSourceMapper,
|
||||||
public List<String> queryTableList(String sourceId, String username) {
|
public List<String> queryTableList(String sourceId, String username) {
|
||||||
StasDataSource stasDataSource = this.baseMapper.selectById(sourceId);
|
StasDataSource stasDataSource = this.baseMapper.selectById(sourceId);
|
||||||
if(SourceDataTypeEnum.ORACLE.getKey() == stasDataSource.getType()){
|
if(SourceDataTypeEnum.ORACLE.getKey() == stasDataSource.getType()){
|
||||||
// 如果配置了DBLink,则在元数据视图后追加@dblink以查询远程数据库的所有表
|
String sql = "SELECT TABLE_NAME FROM ALL_TABLES WHERE OWNER = ?";
|
||||||
String dbLinkSuffix = StringUtils.isNotBlank(stasDataSource.getDbLink())
|
|
||||||
? "@" + stasDataSource.getDbLink().trim() : "";
|
|
||||||
String sql = "SELECT TABLE_NAME FROM ALL_TABLES" + dbLinkSuffix + " WHERE OWNER = ?";
|
|
||||||
return queryDatabaseMetadata(stasDataSource, sql, "TABLE_NAME", username);
|
return queryDatabaseMetadata(stasDataSource, sql, "TABLE_NAME", username);
|
||||||
} else {
|
} else {
|
||||||
String sql = "SELECT tablename AS table_name FROM pg_tables WHERE schemaname = ?";
|
String sql = "SELECT tablename AS table_name FROM pg_tables WHERE schemaname = ?";
|
||||||
|
|
@ -115,9 +112,7 @@ public class StasDataSourceServiceImpl extends ServiceImpl<StasDataSourceMapper,
|
||||||
public List<String> queryColumnList(String sourceId, String username, String tableName) {
|
public List<String> queryColumnList(String sourceId, String username, String tableName) {
|
||||||
StasDataSource stasDataSource = this.baseMapper.selectById(sourceId);
|
StasDataSource stasDataSource = this.baseMapper.selectById(sourceId);
|
||||||
if(SourceDataTypeEnum.ORACLE.getKey() == stasDataSource.getType()){
|
if(SourceDataTypeEnum.ORACLE.getKey() == stasDataSource.getType()){
|
||||||
String dbLinkSuffix = StringUtils.isNotBlank(stasDataSource.getDbLink())
|
String sql = "SELECT COLUMN_NAME FROM ALL_TAB_COLUMNS WHERE OWNER = ? AND TABLE_NAME = ?";
|
||||||
? "@" + stasDataSource.getDbLink().trim() : "";
|
|
||||||
String sql = "SELECT COLUMN_NAME FROM ALL_TAB_COLUMNS" + dbLinkSuffix + " WHERE OWNER = ? AND TABLE_NAME = ?";
|
|
||||||
return queryDatabaseMetadata(stasDataSource, sql, "COLUMN_NAME", username, tableName);
|
return queryDatabaseMetadata(stasDataSource, sql, "COLUMN_NAME", username, tableName);
|
||||||
} else {
|
} else {
|
||||||
String sql = "SELECT column_name FROM information_schema.columns WHERE table_schema = ? AND table_name = ?";
|
String sql = "SELECT column_name FROM information_schema.columns WHERE table_schema = ? AND table_name = ?";
|
||||||
|
|
|
||||||
|
|
@ -105,14 +105,6 @@ public class SyncDataJob implements Job {
|
||||||
targetType = 0;
|
targetType = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 计算 Oracle DBLink 后缀(仅当数据源为 Oracle 且配置了 DBLink 时生效),用于 SQL 透明拼接 @<dblink>
|
|
||||||
String sourceDbLinkSuffix = (SourceDataTypeEnum.ORACLE.getKey().equals(sourceInfo.getType())
|
|
||||||
&& StringUtils.isNotBlank(sourceInfo.getDbLink()))
|
|
||||||
? "@" + sourceInfo.getDbLink().trim() : "";
|
|
||||||
String targetDbLinkSuffix = (SourceDataTypeEnum.ORACLE.getKey().equals(targetInfo.getType())
|
|
||||||
&& StringUtils.isNotBlank(targetInfo.getDbLink()))
|
|
||||||
? "@" + targetInfo.getDbLink().trim() : "";
|
|
||||||
|
|
||||||
// 获取自动探测的超时值
|
// 获取自动探测的超时值
|
||||||
int[] sourceTimeouts = timeoutProbeService.getEffectiveTimeout(sourceInfo);
|
int[] sourceTimeouts = timeoutProbeService.getEffectiveTimeout(sourceInfo);
|
||||||
int[] targetTimeouts = timeoutProbeService.getEffectiveTimeout(targetInfo);
|
int[] targetTimeouts = timeoutProbeService.getEffectiveTimeout(targetInfo);
|
||||||
|
|
@ -131,7 +123,7 @@ public class SyncDataJob implements Job {
|
||||||
for (StasSyncStrategy stasSyncStrategy : stasSyncStrategies) {
|
for (StasSyncStrategy stasSyncStrategy : stasSyncStrategies) {
|
||||||
saveSyncLog(recordId, String.format("开始同步表: %s (依据字段: %s)", stasSyncStrategy.getTableName(), stasSyncStrategy.getColumnName()), null, null, null);
|
saveSyncLog(recordId, String.format("开始同步表: %s (依据字段: %s)", stasSyncStrategy.getTableName(), stasSyncStrategy.getColumnName()), null, null, null);
|
||||||
long startTime = System.currentTimeMillis();
|
long startTime = System.currentTimeMillis();
|
||||||
boolean isDate = isDateColumn(sourceConn, stasSyncStrategy, sourceInfo.getType(), sourceDbLinkSuffix);
|
boolean isDate = isDateColumn(sourceConn, stasSyncStrategy, sourceInfo.getType());
|
||||||
|
|
||||||
// 获取自适应批次大小
|
// 获取自适应批次大小
|
||||||
int effectiveSyncDay = syncAdaptiveStrategy.getEffectiveSyncDay(stasSyncStrategy, stasTaskConfig.getSyncDay());
|
int effectiveSyncDay = syncAdaptiveStrategy.getEffectiveSyncDay(stasSyncStrategy, stasTaskConfig.getSyncDay());
|
||||||
|
|
@ -144,12 +136,10 @@ public class SyncDataJob implements Job {
|
||||||
try {
|
try {
|
||||||
if (isDate) {
|
if (isDate) {
|
||||||
syncByDateRange(sourceConn, targetConn, stasSyncStrategy,
|
syncByDateRange(sourceConn, targetConn, stasSyncStrategy,
|
||||||
effectiveSyncDay, sourceInfo.getType(), targetInfo.getType(), recordId, retryIndex,
|
effectiveSyncDay, sourceInfo.getType(), targetInfo.getType(), recordId, retryIndex);
|
||||||
sourceDbLinkSuffix, targetDbLinkSuffix);
|
|
||||||
} else {
|
} else {
|
||||||
syncByIdRange(sourceConn, targetConn, stasSyncStrategy,
|
syncByIdRange(sourceConn, targetConn, stasSyncStrategy,
|
||||||
effectiveSyncCount, sourceInfo.getType(), targetInfo.getType(), recordId, retryIndex,
|
effectiveSyncCount, sourceInfo.getType(), targetInfo.getType(), recordId, retryIndex);
|
||||||
sourceDbLinkSuffix, targetDbLinkSuffix);
|
|
||||||
}
|
}
|
||||||
syncSuccess = true;
|
syncSuccess = true;
|
||||||
// 同步成功,逐步恢复批次
|
// 同步成功,逐步恢复批次
|
||||||
|
|
@ -215,10 +205,10 @@ public class SyncDataJob implements Job {
|
||||||
/**
|
/**
|
||||||
* 判断字段是否为日期类型
|
* 判断字段是否为日期类型
|
||||||
*/
|
*/
|
||||||
public boolean isDateColumn(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType, String dbLinkSuffix) throws SQLException {
|
public boolean isDateColumn(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType) throws SQLException {
|
||||||
String sql;
|
String sql;
|
||||||
if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) {
|
if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) {
|
||||||
sql = "SELECT data_type FROM all_tab_columns" + dbLinkSuffix + " WHERE owner = ? AND table_name = ? AND column_name = ?";
|
sql = "SELECT data_type FROM all_tab_columns WHERE owner = ? AND table_name = ? AND column_name = ?";
|
||||||
} else if (SourceDataTypeEnum.POSTGRES.getKey().equals(dbType)) {
|
} else if (SourceDataTypeEnum.POSTGRES.getKey().equals(dbType)) {
|
||||||
sql = "SELECT data_type FROM information_schema.columns " +
|
sql = "SELECT data_type FROM information_schema.columns " +
|
||||||
"WHERE table_schema = ? AND table_name = ? AND column_name = ?";
|
"WHERE table_schema = ? AND table_name = ? AND column_name = ?";
|
||||||
|
|
@ -245,9 +235,8 @@ public class SyncDataJob implements Job {
|
||||||
*/
|
*/
|
||||||
public void syncByDateRange(Connection sourceConn, Connection targetConn,
|
public void syncByDateRange(Connection sourceConn, Connection targetConn,
|
||||||
StasSyncStrategy stasSyncStrategy, Integer syncDay,
|
StasSyncStrategy stasSyncStrategy, Integer syncDay,
|
||||||
Integer sourceDbType, Integer targetDbType, String recordId, int retryIndex,
|
Integer sourceDbType, Integer targetDbType, String recordId, int retryIndex) throws SQLException, ParseException {
|
||||||
String sourceDbLinkSuffix, String targetDbLinkSuffix) throws SQLException, ParseException {
|
DateRangeVO dateRange = getDateRange(sourceConn, stasSyncStrategy, sourceDbType);
|
||||||
DateRangeVO dateRange = getDateRange(sourceConn, stasSyncStrategy, sourceDbType, sourceDbLinkSuffix);
|
|
||||||
|
|
||||||
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
||||||
|
|
||||||
|
|
@ -265,7 +254,7 @@ public class SyncDataJob implements Job {
|
||||||
.append(", 'YYYY-MM-DD HH24:MI:SS') = '")
|
.append(", 'YYYY-MM-DD HH24:MI:SS') = '")
|
||||||
.append(sdf.format(currentEnd))
|
.append(sdf.format(currentEnd))
|
||||||
.append("'");
|
.append("'");
|
||||||
deleteByEquals(targetConn, stasSyncStrategy, sourceDbType, deleteClause.toString(), targetDbLinkSuffix);
|
deleteByEquals(targetConn, stasSyncStrategy, sourceDbType, deleteClause.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
String whereClause;
|
String whereClause;
|
||||||
|
|
@ -284,7 +273,7 @@ public class SyncDataJob implements Job {
|
||||||
|
|
||||||
int rowsSynced = syncDataBatch(sourceConn, targetConn, stasSyncStrategy.getSourceOwner(),
|
int rowsSynced = syncDataBatch(sourceConn, targetConn, stasSyncStrategy.getSourceOwner(),
|
||||||
stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(),
|
stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(),
|
||||||
whereClause, sourceDbType, targetDbType, sourceDbLinkSuffix, targetDbLinkSuffix);
|
whereClause, sourceDbType, targetDbType);
|
||||||
|
|
||||||
saveSyncLog(recordId, String.format("%s表已同步 %s 行数据", stasSyncStrategy.getTableName(), rowsSynced),
|
saveSyncLog(recordId, String.format("%s表已同步 %s 行数据", stasSyncStrategy.getTableName(), rowsSynced),
|
||||||
"SUCCESS", batchSizeBefore, retryIndex > 0 ? retryIndex : null);
|
"SUCCESS", batchSizeBefore, retryIndex > 0 ? retryIndex : null);
|
||||||
|
|
@ -303,9 +292,8 @@ public class SyncDataJob implements Job {
|
||||||
*/
|
*/
|
||||||
public void syncByIdRange(Connection sourceConn, Connection targetConn,
|
public void syncByIdRange(Connection sourceConn, Connection targetConn,
|
||||||
StasSyncStrategy stasSyncStrategy, Integer syncCount,
|
StasSyncStrategy stasSyncStrategy, Integer syncCount,
|
||||||
Integer sourceDbType, Integer targetDbType, String recordId, int retryIndex,
|
Integer sourceDbType, Integer targetDbType, String recordId, int retryIndex) throws SQLException {
|
||||||
String sourceDbLinkSuffix, String targetDbLinkSuffix) throws SQLException {
|
IdRangeVO idRange = getIdRange(sourceConn, stasSyncStrategy, sourceDbType);
|
||||||
IdRangeVO idRange = getIdRange(sourceConn, stasSyncStrategy, sourceDbType, sourceDbLinkSuffix);
|
|
||||||
|
|
||||||
String syncOrigin = stasSyncStrategy.getSyncOrigin();
|
String syncOrigin = stasSyncStrategy.getSyncOrigin();
|
||||||
long lastSyncedId = StringUtils.isNotBlank(syncOrigin) ? Long.parseLong(syncOrigin) : idRange.getMinId();
|
long lastSyncedId = StringUtils.isNotBlank(syncOrigin) ? Long.parseLong(syncOrigin) : idRange.getMinId();
|
||||||
|
|
@ -321,7 +309,7 @@ public class SyncDataJob implements Job {
|
||||||
.append(" = '")
|
.append(" = '")
|
||||||
.append(currentEnd)
|
.append(currentEnd)
|
||||||
.append("'");
|
.append("'");
|
||||||
deleteByEquals(targetConn, stasSyncStrategy, sourceDbType, deleteClause.toString(), targetDbLinkSuffix);
|
deleteByEquals(targetConn, stasSyncStrategy, sourceDbType, deleteClause.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (currentStart > currentEnd) {
|
if (currentStart > currentEnd) {
|
||||||
|
|
@ -339,7 +327,7 @@ public class SyncDataJob implements Job {
|
||||||
|
|
||||||
int rowsSynced = syncDataBatch(sourceConn, targetConn, stasSyncStrategy.getSourceOwner(),
|
int rowsSynced = syncDataBatch(sourceConn, targetConn, stasSyncStrategy.getSourceOwner(),
|
||||||
stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(),
|
stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(),
|
||||||
whereClause, sourceDbType, targetDbType, sourceDbLinkSuffix, targetDbLinkSuffix);
|
whereClause, sourceDbType, targetDbType);
|
||||||
|
|
||||||
saveSyncLog(recordId, String.format("%s表已同步 %s 行数据", stasSyncStrategy.getTableName(), rowsSynced),
|
saveSyncLog(recordId, String.format("%s表已同步 %s 行数据", stasSyncStrategy.getTableName(), rowsSynced),
|
||||||
"SUCCESS", batchSizeBefore, retryIndex > 0 ? retryIndex : null);
|
"SUCCESS", batchSizeBefore, retryIndex > 0 ? retryIndex : null);
|
||||||
|
|
@ -356,11 +344,11 @@ public class SyncDataJob implements Job {
|
||||||
/**
|
/**
|
||||||
* 根据日期等于或ID等于删除数据
|
* 根据日期等于或ID等于删除数据
|
||||||
*/
|
*/
|
||||||
public int deleteByEquals(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType, String whereClause, String dbLinkSuffix) throws SQLException {
|
public int deleteByEquals(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType, String whereClause) throws SQLException {
|
||||||
String sql;
|
String sql;
|
||||||
if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) {
|
if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) {
|
||||||
sql = "DELETE FROM \"" + stasSyncStrategy.getTargetOwner().toUpperCase() + "\".\"" +
|
sql = "DELETE FROM \"" + stasSyncStrategy.getTargetOwner().toUpperCase() + "\".\"" +
|
||||||
stasSyncStrategy.getTableName() + "\"" + dbLinkSuffix + " WHERE " + whereClause;
|
stasSyncStrategy.getTableName() + "\" WHERE " + whereClause;
|
||||||
} else {
|
} else {
|
||||||
sql = "DELETE FROM \"" + stasSyncStrategy.getTargetOwner().toLowerCase() + "\".\"" +
|
sql = "DELETE FROM \"" + stasSyncStrategy.getTargetOwner().toLowerCase() + "\".\"" +
|
||||||
stasSyncStrategy.getTableName() + "\" WHERE " + whereClause;
|
stasSyncStrategy.getTableName() + "\" WHERE " + whereClause;
|
||||||
|
|
@ -374,12 +362,12 @@ public class SyncDataJob implements Job {
|
||||||
/**
|
/**
|
||||||
* 获取表的日期范围
|
* 获取表的日期范围
|
||||||
*/
|
*/
|
||||||
public DateRangeVO getDateRange(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType, String dbLinkSuffix) throws SQLException {
|
public DateRangeVO getDateRange(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType) throws SQLException {
|
||||||
String sql;
|
String sql;
|
||||||
if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) {
|
if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) {
|
||||||
sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_date, " +
|
sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_date, " +
|
||||||
"MAX(" + stasSyncStrategy.getColumnName() + ") as max_date " +
|
"MAX(" + stasSyncStrategy.getColumnName() + ") as max_date " +
|
||||||
"FROM \"" + stasSyncStrategy.getSourceOwner().toUpperCase() + "\".\"" + stasSyncStrategy.getTableName() + "\"" + dbLinkSuffix;
|
"FROM \"" + stasSyncStrategy.getSourceOwner().toUpperCase() + "\".\"" + stasSyncStrategy.getTableName() + "\"";
|
||||||
} else {
|
} else {
|
||||||
sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_date, " +
|
sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_date, " +
|
||||||
"MAX(" + stasSyncStrategy.getColumnName() + ") as max_date " +
|
"MAX(" + stasSyncStrategy.getColumnName() + ") as max_date " +
|
||||||
|
|
@ -398,12 +386,12 @@ public class SyncDataJob implements Job {
|
||||||
/**
|
/**
|
||||||
* 获取表的ID范围
|
* 获取表的ID范围
|
||||||
*/
|
*/
|
||||||
public IdRangeVO getIdRange(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType, String dbLinkSuffix) throws SQLException {
|
public IdRangeVO getIdRange(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType) throws SQLException {
|
||||||
String sql;
|
String sql;
|
||||||
if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) {
|
if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) {
|
||||||
sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_id, " +
|
sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_id, " +
|
||||||
"MAX(" + stasSyncStrategy.getColumnName() + ") as max_id " +
|
"MAX(" + stasSyncStrategy.getColumnName() + ") as max_id " +
|
||||||
"FROM \"" + stasSyncStrategy.getSourceOwner().toUpperCase() + "\".\"" + stasSyncStrategy.getTableName() + "\"" + dbLinkSuffix;
|
"FROM \"" + stasSyncStrategy.getSourceOwner().toUpperCase() + "\".\"" + stasSyncStrategy.getTableName() + "\"";
|
||||||
} else {
|
} else {
|
||||||
sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_id, " +
|
sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_id, " +
|
||||||
"MAX(" + stasSyncStrategy.getColumnName() + ") as max_id " +
|
"MAX(" + stasSyncStrategy.getColumnName() + ") as max_id " +
|
||||||
|
|
@ -426,14 +414,13 @@ public class SyncDataJob implements Job {
|
||||||
public int syncDataBatch(Connection sourceConn, Connection targetConn,
|
public int syncDataBatch(Connection sourceConn, Connection targetConn,
|
||||||
String sourceOwner, String targetOwner,
|
String sourceOwner, String targetOwner,
|
||||||
String tableName, String whereClause,
|
String tableName, String whereClause,
|
||||||
Integer sourceDbType, Integer targetDbType,
|
Integer sourceDbType, Integer targetDbType) throws SQLException {
|
||||||
String sourceDbLinkSuffix, String targetDbLinkSuffix) throws SQLException {
|
|
||||||
int totalRows = 0;
|
int totalRows = 0;
|
||||||
String selectSql;
|
String selectSql;
|
||||||
|
|
||||||
if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceDbType)) {
|
if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceDbType)) {
|
||||||
selectSql = "SELECT * FROM \"" + sourceOwner.toUpperCase() + "\".\"" + tableName +
|
selectSql = "SELECT * FROM \"" + sourceOwner.toUpperCase() + "\".\"" + tableName +
|
||||||
"\"" + sourceDbLinkSuffix + " WHERE " + whereClause;
|
"\" WHERE " + whereClause;
|
||||||
} else {
|
} else {
|
||||||
selectSql = "SELECT * FROM \"" + sourceOwner.toLowerCase() + "\".\"" + tableName +
|
selectSql = "SELECT * FROM \"" + sourceOwner.toLowerCase() + "\".\"" + tableName +
|
||||||
"\" WHERE " + whereClause;
|
"\" WHERE " + whereClause;
|
||||||
|
|
@ -449,7 +436,7 @@ public class SyncDataJob implements Job {
|
||||||
|
|
||||||
String insertSql;
|
String insertSql;
|
||||||
if (SourceDataTypeEnum.ORACLE.getKey().equals(targetDbType)) {
|
if (SourceDataTypeEnum.ORACLE.getKey().equals(targetDbType)) {
|
||||||
insertSql = "INSERT INTO \"" + targetOwner.toUpperCase() + "\".\"" + tableName + "\"" + targetDbLinkSuffix + " VALUES (";
|
insertSql = "INSERT INTO \"" + targetOwner.toUpperCase() + "\".\"" + tableName + "\" VALUES (";
|
||||||
} else {
|
} else {
|
||||||
insertSql = "INSERT INTO \"" + tableName + "\" VALUES (";
|
insertSql = "INSERT INTO \"" + tableName + "\" VALUES (";
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue
Block a user