oracle同步到pg数据库

This commit is contained in:
hekaiyu 2025-10-13 18:16:45 +08:00
parent 5effd1ead3
commit 268cacd2c1
4 changed files with 339 additions and 135 deletions

View File

@ -30,6 +30,10 @@ public class DBUtil {
return String.format("%s%s:%s%s", DBUtil.ORACLE_URL_PREFIX, ip, port, serveId);
}
public static String getPgUrl(String ip, Integer port, String serveId){
return String.format("%s%s:%s%s", DBUtil.POSTGRES_URL_PREFIX, ip, port, serveId);
}
public static Connection getConnection(String url, String username, String password, String Driver,int type) throws SQLException{
try {
Class.forName(Driver);

View File

@ -1,6 +1,7 @@
package org.jeecg.stasSyncStrategy.service.impl;
import lombok.RequiredArgsConstructor;
import org.jeecg.common.constant.enums.SourceDataTypeEnum;
import org.jeecg.common.exception.JeecgBootException;
import org.jeecg.common.util.DBUtil;
import org.jeecg.modules.base.entity.StasDataSource;
@ -16,6 +17,8 @@ import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.transaction.annotation.Transactional;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
/**
* @Description: 同步策略表
@ -48,14 +51,14 @@ public class StasSyncStrategyServiceImpl extends ServiceImpl<StasSyncStrategyMap
String tableName = stasSyncStrategy.getTableName();
String columnName = stasSyncStrategy.getColumnName();
if (isTableExists(sourceConn, username, tableName)) {
if (isColumnExists(sourceConn, username, tableName, columnName)) {
if (isTableExists(sourceConn, username, tableName, sourceInfo.getType())) {
if (isColumnExists(sourceConn, username, tableName, columnName, sourceInfo.getType())) {
return true;
} else {
throw new JeecgBootException(String.format("警告: 表%s中依据字段%s不存在!", tableName, columnName));
}
} else {
throw new JeecgBootException(String.format("警告: %s用户下%s表不存在!", username, tableName)); // 修正了这里
throw new JeecgBootException(String.format("警告: %s用户下%s表不存在!", username, tableName));
}
} catch (SQLException e) {
throw new JeecgBootException(e.getMessage());
@ -75,15 +78,23 @@ public class StasSyncStrategyServiceImpl extends ServiceImpl<StasSyncStrategyMap
StasDataSource targetInfo = stasDataSourceMapper.selectById(stasTaskConfig.getTargetId());
String sourceUrl = DBUtil.getUrl(sourceInfo.getIpAddress(), sourceInfo.getPort(), sourceInfo.getServeId());
String targetUrl = DBUtil.getUrl(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId());
String targetUrl;
if(SourceDataTypeEnum.ORACLE.getKey().equals(targetInfo.getType())){
targetUrl = DBUtil.getUrl(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId());
}else{
targetUrl = DBUtil.getPgUrl(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId());
}
try (Connection sourceConn = DriverManager.getConnection(sourceUrl, sourceInfo.getUsername(), sourceInfo.getPassword());
Connection targetConn = DriverManager.getConnection(targetUrl, targetInfo.getUsername(), targetInfo.getPassword())) {
try {
// 检查表是否已存在区分大小写
if (!isTableExists(targetConn, stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName())) {
// 检查表是否已存在
if (!isTableExists(targetConn, stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(), targetInfo.getType())) {
// 获取源表结构
String createSql = getCreateTableSql(sourceConn, stasSyncStrategy.getSourceOwner(), stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName());
String createSql = getCreateTableSql(sourceConn, stasSyncStrategy.getSourceOwner(),
stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(),
sourceInfo.getType(), targetInfo.getType());
// 在目标库创建表
try (Statement stmt = targetConn.createStatement()) {
@ -93,8 +104,9 @@ public class StasSyncStrategyServiceImpl extends ServiceImpl<StasSyncStrategyMap
e.printStackTrace();
throw new JeecgBootException(e.getMessage());
}
}else{
throw new JeecgBootException(String.format("在用户%s中%s表已存在请删除已存在的表", stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName()));
} else {
throw new JeecgBootException(String.format("在用户%s中%s表已存在请删除已存在的表",
stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName()));
}
} catch (SQLException e) {
throw new JeecgBootException(String.format("处理表: %s时出错", stasSyncStrategy.getTableName()));
@ -107,17 +119,151 @@ public class StasSyncStrategyServiceImpl extends ServiceImpl<StasSyncStrategyMap
/**
* 获取创建表的SQL语句
* @param conn 数据库连接
* @param sourceOwner 源模式名
* @param targetOwner 目标模式名
* @param table 表名
* @param sourceDbType 源数据库类型
* @param targetDbType 目标数据库类型
* @return 创建表的SQL语句
* @throws SQLException 数据库异常
*/
private static String getCreateTableSql(Connection conn, String sourceOwner, String targetOwner, String table) throws SQLException {
if (!isTableExists(conn, sourceOwner, table)) {
private static String getCreateTableSql(Connection conn, String sourceOwner, String targetOwner,
String table, Integer sourceDbType, Integer targetDbType) throws SQLException {
if (!isTableExists(conn, sourceOwner, table, sourceDbType)) {
throw new SQLException("" + table + " 在源数据库中不存在");
}
// 更明确地处理用户名/模式名
return String.format("CREATE TABLE \"%s\".\"%s\" AS SELECT * FROM \"%s\".\"%s\" WHERE 1=0",
targetOwner.toUpperCase(), table, sourceOwner.toUpperCase(), table);
if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceDbType) && SourceDataTypeEnum.POSTGRES.getKey().equals(targetDbType)) {
// Oracle转PostgreSQL的特殊处理
return generatePgCreateTableFromOracle(conn, sourceOwner, targetOwner, table);
} else {
// 其他情况使用原始方式
return String.format("CREATE TABLE \"%s\".\"%s\" AS SELECT * FROM \"%s\".\"%s\" WHERE 1=0",
targetOwner.toUpperCase(), table, sourceOwner.toUpperCase(), table);
}
}
/**
* 生成从Oracle到PostgreSQL的建表语句
*/
private static String generatePgCreateTableFromOracle(Connection conn, String sourceOwner,
String targetOwner, String tableName) throws SQLException {
StringBuilder sqlBuilder = new StringBuilder();
//模式名称
// sqlBuilder.append("CREATE TABLE \"").append(targetOwner).append("\".\"").append(tableName).append("\" (\n");
sqlBuilder.append("CREATE TABLE \"").append(tableName).append("\" (\n");
// 获取列信息
String columnSql = "SELECT column_name, data_type, data_length, data_precision, data_scale, nullable " +
"FROM all_tab_columns " +
"WHERE owner = ? AND table_name = ? " +
"ORDER BY column_id";
try (PreparedStatement pstmt = conn.prepareStatement(columnSql)) {
pstmt.setString(1, sourceOwner.toUpperCase());
pstmt.setString(2, tableName.toUpperCase());
ResultSet rs = pstmt.executeQuery();
boolean first = true;
while (rs.next()) {
if (!first) {
sqlBuilder.append(",\n");
}
first = false;
String columnName = rs.getString("column_name");
String dataType = rs.getString("data_type");
int dataLength = rs.getInt("data_length");
int dataPrecision = rs.getInt("data_precision");
int dataScale = rs.getInt("data_scale");
String nullable = rs.getString("nullable");
sqlBuilder.append(" \"").append(columnName).append("\" ");
sqlBuilder.append(mapOracleTypeToPg(dataType, dataLength, dataPrecision, dataScale));
if ("N".equalsIgnoreCase(nullable)) {
sqlBuilder.append(" NOT NULL");
}
}
}
// 获取主键信息
String pkSql = "SELECT cols.column_name " +
"FROM all_constraints cons, all_cons_columns cols " +
"WHERE cons.owner = ? AND cons.table_name = ? " +
"AND cons.constraint_type = 'P' " +
"AND cons.constraint_name = cols.constraint_name " +
"AND cons.owner = cols.owner " +
"ORDER BY cols.position";
try (PreparedStatement pstmt = conn.prepareStatement(pkSql)) {
pstmt.setString(1, sourceOwner.toUpperCase());
pstmt.setString(2, tableName.toUpperCase());
ResultSet rs = pstmt.executeQuery();
List<String> pkColumns = new ArrayList<>();
while (rs.next()) {
pkColumns.add(rs.getString("column_name"));
}
if (!pkColumns.isEmpty()) {
sqlBuilder.append(",\n PRIMARY KEY (");
for (int i = 0; i < pkColumns.size(); i++) {
if (i > 0) {
sqlBuilder.append(", ");
}
sqlBuilder.append("\"").append(pkColumns.get(i)).append("\"");
}
sqlBuilder.append(")");
}
}
sqlBuilder.append("\n)");
return sqlBuilder.toString();
}
/**
* Oracle数据类型到PostgreSQL的映射
*/
private static String mapOracleTypeToPg(String oracleType, int length, int precision, int scale) {
switch (oracleType.toUpperCase()) {
case "VARCHAR2":
return "VARCHAR(" + (length > 0 ? length : 255) + ")";
case "NVARCHAR2":
return "VARCHAR(" + (length > 0 ? length : 255) + ")";
case "CHAR":
return "CHAR(" + (length > 0 ? length : 1) + ")";
case "NCHAR":
return "CHAR(" + (length > 0 ? length : 1) + ")";
case "NUMBER":
if (precision == 0 && scale == 0) {
return "NUMERIC"; // 未指定精度的NUMBER
} else if (scale == 0) {
return "NUMERIC(" + precision + ")"; // 整数
} else {
return "NUMERIC(" + precision + "," + scale + ")"; // 小数
}
case "FLOAT":
return "DOUBLE PRECISION";
case "DATE":
return "TIMESTAMP";
case "TIMESTAMP":
return "TIMESTAMP";
case "CLOB":
return "TEXT";
case "BLOB":
return "BYTEA";
case "RAW":
return "BYTEA";
case "LONG":
return "TEXT";
case "LONG RAW":
return "BYTEA";
default:
return "TEXT"; // 默认转换为TEXT
}
}
/**
@ -125,15 +271,28 @@ public class StasSyncStrategyServiceImpl extends ServiceImpl<StasSyncStrategyMap
* @param conn 数据库连接
* @param schema 模式名
* @param tableName 表名
* @param dbType 数据库类型
* @return 表是否存在
* @throws SQLException 数据库异常
*/
private static boolean isTableExists(Connection conn, String schema, String tableName) throws SQLException {
String sql = "SELECT COUNT(*) FROM all_tables WHERE owner = ? AND (table_name = ? OR table_name = ?)";
private static boolean isTableExists(Connection conn, String schema, String tableName, Integer dbType) throws SQLException {
String sql;
if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) {
sql = "SELECT COUNT(*) FROM all_tables WHERE owner = ? AND (table_name = ? OR table_name = ?)";
} else if (SourceDataTypeEnum.POSTGRES.getKey().equals(dbType)) {
sql = "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = ? AND table_name = ?";
} else {
throw new SQLException("不支持的数据库类型: " + dbType);
}
try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
pstmt.setString(1, schema.toUpperCase());
pstmt.setString(2, tableName.toUpperCase());
pstmt.setString(3, tableName);
if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) {
pstmt.setString(3, tableName); // Oracle需要检查大小写两种形式
}
try (ResultSet rs = pstmt.executeQuery()) {
return rs.next() && rs.getInt(1) > 0;
}
@ -146,15 +305,26 @@ public class StasSyncStrategyServiceImpl extends ServiceImpl<StasSyncStrategyMap
* @param schema 模式名
* @param tableName 表名
* @param columnName 列名
* @param dbType 数据库类型
* @return 列是否存在
* @throws SQLException 数据库异常
*/
private static boolean isColumnExists(Connection conn, String schema, String tableName, String columnName) throws SQLException {
String sql = "SELECT COUNT(*) FROM all_tab_columns WHERE owner = ? AND table_name = ? AND column_name = ?";
private static boolean isColumnExists(Connection conn, String schema, String tableName,
String columnName, Integer dbType) throws SQLException {
String sql;
if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) {
sql = "SELECT COUNT(*) FROM all_tab_columns WHERE owner = ? AND table_name = ? AND column_name = ?";
} else if (SourceDataTypeEnum.POSTGRES.getKey().equals(dbType)) {
sql = "SELECT COUNT(*) FROM information_schema.columns WHERE table_schema = ? AND table_name = ? AND column_name = ?";
} else {
throw new SQLException("不支持的数据库类型: " + dbType);
}
try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
pstmt.setString(1, schema.toUpperCase());
pstmt.setString(2, tableName.toUpperCase());
pstmt.setString(3, columnName.toUpperCase());
try (ResultSet rs = pstmt.executeQuery()) {
return rs.next() && rs.getInt(1) > 0;
}

View File

@ -168,8 +168,10 @@ public class StasTaskConfigController extends JeecgController<StasTaskConfig, IS
@Operation(summary="任务配置表-暂停定时任务")
@GetMapping(value = "/pause")
public Result<String> pause(@RequestParam(name="taskId",required=true) String taskId) {
StasTaskConfig byId = stasTaskConfigService.getById(taskId);
stasTaskConfigService.pauseQuartzJob(byId.getQuartzId());
StasTaskConfig stasTaskConfig = stasTaskConfigService.getById(taskId);
stasTaskConfig.setTaskStatus(SyncTaskStatusEnum.NOT_STARTED.getKey());
stasTaskConfigService.updateById(stasTaskConfig);
stasTaskConfigService.pauseQuartzJob(stasTaskConfig.getQuartzId());
return Result.OK("暂停成功!");
}
@ -183,8 +185,10 @@ public class StasTaskConfigController extends JeecgController<StasTaskConfig, IS
@Operation(summary="任务配置表-启动定时任务")
@GetMapping(value = "/resume")
public Result<String> resume(@RequestParam(name="taskId",required=true) String taskId) {
StasTaskConfig byId = stasTaskConfigService.getById(taskId);
stasTaskConfigService.resumeQuartzJob(byId.getQuartzId());
StasTaskConfig stasTaskConfig = stasTaskConfigService.getById(taskId);
stasTaskConfig.setTaskStatus(SyncTaskStatusEnum.IN_OPERATION.getKey());
stasTaskConfigService.updateById(stasTaskConfig);
stasTaskConfigService.resumeQuartzJob(stasTaskConfig.getQuartzId());
return Result.OK("启动成功!");
}

View File

@ -4,6 +4,7 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.jeecg.common.constant.enums.SourceDataTypeEnum;
import org.jeecg.common.exception.JeecgBootException;
import org.jeecg.common.util.DBUtil;
import org.jeecg.modules.base.entity.StasDataSource;
@ -25,9 +26,7 @@ import java.util.Date;
import java.util.List;
/**
* 示例带参定时任务
*
* @Author Scott
* 数据同步任务(支持Oracle和PostgreSQL)
*/
@Slf4j
public class SyncDataJob implements Job {
@ -39,9 +38,6 @@ public class SyncDataJob implements Job {
@Resource
private StasSyncStrategyMapper stasSyncStrategyMapper;
/**
* 若参数变量名修改 QuartzJobController中也需对应修改
*/
private String parameter;
public void setParameter(String parameter) {
@ -50,102 +46,106 @@ public class SyncDataJob implements Job {
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
try {
syncDataByFieldType(parameter);
} catch (SQLException e) {
throw new JeecgBootException(e.getMessage());
}
}
try {
syncDataByFieldType(parameter);
} catch (SQLException e) {
throw new JeecgBootException(e.getMessage());
}
}
/**
* 根据字段类型同步数据
* @param taskId 任务id
* @throws SQLException 数据库异常
*/
public void syncDataByFieldType(String taskId) throws SQLException {
StasTaskConfig stasTaskConfig = stasTaskConfigMapper.selectById(taskId);
StasDataSource sourceInfo = stasDataSourceMapper.selectById(stasTaskConfig.getSourceId());
StasDataSource targetInfo = stasDataSourceMapper.selectById(stasTaskConfig.getTargetId());
String sourceUrl = DBUtil.getUrl(sourceInfo.getIpAddress(), sourceInfo.getPort(), sourceInfo.getServeId());
String targetUrl = DBUtil.getUrl(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId());
String targetUrl;
if(SourceDataTypeEnum.ORACLE.getKey().equals(targetInfo.getType())){
targetUrl = DBUtil.getUrl(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId());
}else{
targetUrl = DBUtil.getPgUrl(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId());
}
try (Connection sourceConn = DriverManager.getConnection(sourceUrl, sourceInfo.getUsername(), sourceInfo.getPassword());
Connection targetConn = DriverManager.getConnection(targetUrl, targetInfo.getUsername(), targetInfo.getPassword())) {
// 设置目标连接为批量提交模式
targetConn.setAutoCommit(false);
List<StasSyncStrategy> stasSyncStrategies = stasSyncStrategyMapper.
selectList(new LambdaQueryWrapper<StasSyncStrategy>().eq(StasSyncStrategy::getTaskId, taskId));
List<StasSyncStrategy> stasSyncStrategies = stasSyncStrategyMapper
.selectList(new LambdaQueryWrapper<StasSyncStrategy>().eq(StasSyncStrategy::getTaskId, taskId));
for (StasSyncStrategy stasSyncStrategy : stasSyncStrategies) {
System.out.println("开始同步表: " + stasSyncStrategy.getTableName() + " (依据字段: " + stasSyncStrategy.getColumnName() + ")");
log.info("开始同步表: {} (依据字段: {})", stasSyncStrategy.getTableName(), stasSyncStrategy.getColumnName());
long startTime = System.currentTimeMillis();
try {
if (isDateColumn(sourceConn,stasSyncStrategy)) {
syncByDateRange(sourceConn, targetConn, stasSyncStrategy, stasTaskConfig.getSyncDay());
if (isDateColumn(sourceConn, stasSyncStrategy, sourceInfo.getType())) {
syncByDateRange(sourceConn, targetConn, stasSyncStrategy,
stasTaskConfig.getSyncDay(), sourceInfo.getType(), targetInfo.getType());
} else {
syncByIdRange(sourceConn, targetConn, stasSyncStrategy, stasTaskConfig.getSyncCount());
syncByIdRange(sourceConn, targetConn, stasSyncStrategy,
stasTaskConfig.getSyncCount(), sourceInfo.getType(), targetInfo.getType());
}
} catch (SQLException e) {
System.err.println("同步表 " + stasSyncStrategy.getTableName() + " 时出错: " + e.getMessage());
} catch (SQLException | ParseException e) {
log.error("同步表 {} 时出错: {}", stasSyncStrategy.getTableName(), e.getMessage());
targetConn.rollback();
throw new JeecgBootException(e.getMessage());
} catch (ParseException e) {
throw new JeecgBootException(e.getMessage());
}
long endTime = System.currentTimeMillis();
System.out.println("" + stasSyncStrategy.getTableName() + " 同步耗时: " + (endTime - startTime) + " 毫秒");
log.info("表 {} 同步耗时: {} 毫秒", stasSyncStrategy.getTableName(), (endTime - startTime));
}
}
}
public boolean isDateColumn(Connection sourceConn, StasSyncStrategy stasSyncStrategy) throws SQLException {
String sql = "SELECT data_type FROM all_tab_columns "
+ "WHERE owner = ? AND table_name = ? AND column_name = ?";
/**
* 判断字段是否为日期类型
*/
public boolean isDateColumn(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType) throws SQLException {
String sql;
if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) {
sql = "SELECT data_type FROM all_tab_columns WHERE owner = ? AND table_name = ? AND column_name = ?";
} else if (SourceDataTypeEnum.POSTGRES.getKey().equals(dbType)) {
sql = "SELECT data_type FROM information_schema.columns " +
"WHERE table_schema = ? AND table_name = ? AND column_name = ?";
} else {
throw new SQLException("不支持的数据库类型: " + dbType);
}
try (PreparedStatement pstmt = sourceConn.prepareStatement(sql)) {
try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
pstmt.setString(1, stasSyncStrategy.getSourceOwner().toUpperCase());
pstmt.setString(2, stasSyncStrategy.getTableName().toUpperCase());
pstmt.setString(3, stasSyncStrategy.getColumnName().toUpperCase());
ResultSet rs = pstmt.executeQuery();
if (rs.next()) {
String dataType = rs.getString("data_type");
String dataType = rs.getString("data_type").toUpperCase();
// 判断是否为日期/时间类型
return dataType != null &&
(dataType.equalsIgnoreCase("DATE") ||
dataType.equalsIgnoreCase("TIMESTAMP") ||
dataType.equalsIgnoreCase("TIMESTAMP WITH TIME ZONE") ||
dataType.equalsIgnoreCase("TIMESTAMP WITH LOCAL TIME ZONE"));
return dataType.contains("DATE") || dataType.contains("TIME");
}
} catch (SQLException e) {
e.printStackTrace();
throw e; // 重新抛出异常让调用方处理
}
return false;
}
/**
* 按日期范围同步数据
* @param sourceConn 源数据库连接
* @param targetConn 目标数据库连接
* @param stasSyncStrategy 表配置
* @param syncCount 同步起始位置
* @throws SQLException 数据库异常
*/
public void syncByDateRange(Connection sourceConn, Connection targetConn,
StasSyncStrategy stasSyncStrategy, Integer syncCount) throws SQLException, ParseException {
StasSyncStrategy stasSyncStrategy, Integer syncCount,
Integer sourceDbType, Integer targetDbType) throws SQLException, ParseException {
// 获取最小和最大日期
DateRangeVO dateRange = getDateRange(sourceConn, stasSyncStrategy);
System.out.println("日期范围: " + dateRange.getMinDate() + "" + dateRange.getMaxDate());
DateRangeVO dateRange = getDateRange(sourceConn, stasSyncStrategy, sourceDbType);
log.info("日期范围: {} 至 {}", dateRange.getMinDate(), dateRange.getMaxDate());
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// 获取上次同步的位置
String syncOrigin = stasSyncStrategy.getSyncOrigin();
Date lastSyncedDate = StringUtils.isNotBlank(syncOrigin) ? sdf.parse(stasSyncStrategy.getSyncOrigin()) : dateRange.getMinDate();
Date lastSyncedDate = StringUtils.isNotBlank(syncOrigin) ? sdf.parse(syncOrigin) : dateRange.getMinDate();
Date currentStart = (lastSyncedDate != null && lastSyncedDate.after(dateRange.getMinDate())) ?
lastSyncedDate : dateRange.getMinDate();
@ -154,36 +154,40 @@ public class SyncDataJob implements Job {
currentEnd = dateRange.getMaxDate();
}
String whereClause = stasSyncStrategy.getColumnName() + " BETWEEN TO_DATE('" + sdf.format(currentStart) +
"', 'YYYY-MM-DD HH24:MI:SS') AND TO_DATE('" + sdf.format(currentEnd) + "', 'YYYY-MM-DD HH24:MI:SS')";
// 根据数据库类型构建不同的日期条件
String whereClause;
if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceDbType)) {
whereClause = stasSyncStrategy.getColumnName() + " BETWEEN TO_DATE('" + sdf.format(currentStart) +
"', 'YYYY-MM-DD HH24:MI:SS') AND TO_DATE('" + sdf.format(currentEnd) + "', 'YYYY-MM-DD HH24:MI:SS')";
} else {
whereClause = stasSyncStrategy.getColumnName() + " BETWEEN TIMESTAMP '" + sdf.format(currentStart) +
"' AND TIMESTAMP '" + sdf.format(currentEnd) + "'";
}
System.out.println("同步日期范围: " + sdf.format(currentStart) + "" + sdf.format(currentEnd));
log.info("同步日期范围: {} 至 {}", sdf.format(currentStart), sdf.format(currentEnd));
int rowsSynced = syncDataBatch(sourceConn, targetConn, stasSyncStrategy.getSourceOwner(),
stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(), whereClause);
System.out.println("已同步 " + rowsSynced + " 行数据");
stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(),
whereClause, sourceDbType, targetDbType);
log.info("已同步 {} 行数据", rowsSynced);
// 只在最后记录同步位置
// 更新同步位置
if (currentEnd != null) {
stasSyncStrategy.setSyncOrigin(sdf.format(currentEnd));
stasSyncStrategyMapper.updateById(stasSyncStrategy);
System.out.println("最终同步位置已更新为: " + sdf.format(currentEnd));
log.info("最终同步位置已更新为: {}", sdf.format(currentEnd));
}
}
/**
* 按ID范围同步数据
* @param sourceConn 源数据库连接
* @param targetConn 目标数据库连接
* @param stasSyncStrategy 表配置
* @param syncCount 每次同步的记录数
* @throws SQLException 数据库异常
*/
public void syncByIdRange(Connection sourceConn, Connection targetConn,
StasSyncStrategy stasSyncStrategy, Integer syncCount) throws SQLException {
StasSyncStrategy stasSyncStrategy, Integer syncCount,
Integer sourceDbType, Integer targetDbType) throws SQLException {
// 获取最小和最大ID
IdRangeVO idRange = getIdRange(sourceConn, stasSyncStrategy);
System.out.println("ID范围: " + idRange.getMinId() + "" + idRange.getMaxId());
IdRangeVO idRange = getIdRange(sourceConn, stasSyncStrategy, sourceDbType);
log.info("ID范围: {} 至 {}", idRange.getMinId(), idRange.getMaxId());
// 获取上次同步的位置
String syncOrigin = stasSyncStrategy.getSyncOrigin();
@ -198,35 +202,40 @@ public class SyncDataJob implements Job {
String whereClause = stasSyncStrategy.getColumnName() + " BETWEEN " + currentStart + " AND " + currentEnd;
System.out.println("同步ID范围: " + currentStart + "" + currentEnd);
log.info("同步ID范围: {} 至 {}", currentStart, currentEnd);
int rowsSynced = syncDataBatch(sourceConn, targetConn, stasSyncStrategy.getSourceOwner(),
stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(), whereClause);
System.out.println("已同步 " + rowsSynced + " 行数据");
stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(),
whereClause, sourceDbType, targetDbType);
log.info("已同步 {} 行数据", rowsSynced);
// 只在最后记录同步位置
// 更新同步位置
if (currentEnd > 0) {
stasSyncStrategy.setSyncOrigin(String.valueOf(currentEnd));
stasSyncStrategyMapper.updateById(stasSyncStrategy);
System.out.println("最终同步位置已更新为: " + currentEnd);
log.info("最终同步位置已更新为: {}", currentEnd);
}
}
/**
* 获取表的日期范围
* @param conn 数据库连接
* @param stasSyncStrategy 表配置
* @return 日期范围对象
* @throws SQLException 数据库异常
*/
public DateRangeVO getDateRange(Connection conn, StasSyncStrategy stasSyncStrategy) throws SQLException {
String sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_date, MAX(" + stasSyncStrategy.getColumnName() + ") as max_date FROM " +
stasSyncStrategy.getSourceOwner() + "." + stasSyncStrategy.getTableName();
public DateRangeVO getDateRange(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType) throws SQLException {
String sql;
if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) {
sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_date, " +
"MAX(" + stasSyncStrategy.getColumnName() + ") as max_date " +
"FROM \"" + stasSyncStrategy.getSourceOwner().toUpperCase() + "\".\"" + stasSyncStrategy.getTableName() + "\"";
} else {
sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_date, " +
"MAX(" + stasSyncStrategy.getColumnName() + ") as max_date " +
"FROM \"" + stasSyncStrategy.getSourceOwner().toLowerCase() + "\".\"" + stasSyncStrategy.getTableName() + "\"";
}
try (Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(sql)) {
if (rs.next()) {
return new DateRangeVO(rs.getDate("min_date"), rs.getDate("max_date"));
return new DateRangeVO(rs.getTimestamp("min_date"), rs.getTimestamp("max_date"));
}
}
throw new SQLException("无法获取日期范围");
@ -234,14 +243,18 @@ public class SyncDataJob implements Job {
/**
* 获取表的ID范围
* @param conn 数据库连接
* @param stasSyncStrategy 表配置
* @return ID范围对象
* @throws SQLException 数据库异常
*/
public IdRangeVO getIdRange(Connection conn, StasSyncStrategy stasSyncStrategy) throws SQLException {
String sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_id, MAX(" + stasSyncStrategy.getColumnName() + ") as max_id FROM " +
stasSyncStrategy.getSourceOwner() + "." + stasSyncStrategy.getTableName();
public IdRangeVO getIdRange(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType) throws SQLException {
String sql;
if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) {
sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_id, " +
"MAX(" + stasSyncStrategy.getColumnName() + ") as max_id " +
"FROM \"" + stasSyncStrategy.getSourceOwner().toUpperCase() + "\".\"" + stasSyncStrategy.getTableName() + "\"";
} else {
sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_id, " +
"MAX(" + stasSyncStrategy.getColumnName() + ") as max_id " +
"FROM \"" + stasSyncStrategy.getSourceOwner().toLowerCase() + "\".\"" + stasSyncStrategy.getTableName() + "\"";
}
try (Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(sql)) {
@ -253,50 +266,66 @@ public class SyncDataJob implements Job {
}
/**
* 同步一批数据
* @param sourceConn 源数据库连接
* @param targetConn 目标数据库连接
* @param tableName 表名
* @param whereClause 条件子句
* @return 同步的行数
* @throws SQLException 数据库异常
* 同步一批数据(支持跨数据库类型)
*/
public int syncDataBatch(Connection sourceConn, Connection targetConn,
String sourceOwner, String targetOwner,
String tableName, String whereClause) throws SQLException {
String tableName, String whereClause,
Integer sourceDbType, Integer targetDbType) throws SQLException {
int totalRows = 0;
String selectSql;
// 从源表读取数据使用带引号的表名保持大小写
String selectSql = "SELECT * FROM \"" + sourceOwner.toUpperCase() + "\".\"" + tableName +
"\" WHERE " + whereClause;
// 构建查询SQL(根据源数据库类型)
if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceDbType)) {
selectSql = "SELECT * FROM \"" + sourceOwner.toUpperCase() + "\".\"" + tableName +
"\" WHERE " + whereClause;
} else {
selectSql = "SELECT * FROM \"" + sourceOwner.toLowerCase() + "\".\"" + tableName +
"\" WHERE " + whereClause;
}
try (Statement sourceStmt = sourceConn.createStatement(
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
ResultSet rs = sourceStmt.executeQuery(selectSql)) {
// 设置获取大小为5000优化大表读取
sourceStmt.setFetchSize(5000);
// 获取列信息
sourceStmt.setFetchSize(5000); // 优化大表读取
ResultSetMetaData metaData = rs.getMetaData();
int columnCount = metaData.getColumnCount();
// 准备插入语句使用带引号的表名保持大小写
StringBuilder insertSql = new StringBuilder("INSERT INTO \"")
.append(targetOwner.toUpperCase()).append("\".\"").append(tableName).append("\" VALUES (");
for (int i = 1; i <= columnCount; i++) {
if (i > 1) insertSql.append(", ");
insertSql.append("?");
// 构建插入SQL(根据目标数据库类型)
String insertSql;
if (SourceDataTypeEnum.ORACLE.getKey().equals(targetDbType)) {
insertSql = "INSERT INTO \"" + targetOwner.toUpperCase() + "\".\"" + tableName + "\" VALUES (";
} else {
insertSql = "INSERT INTO \"" + tableName + "\" VALUES (";
}
insertSql.append(")");
for (int i = 1; i <= columnCount; i++) {
if (i > 1) insertSql += ", ";
insertSql += "?";
}
insertSql += ")";
// 批量插入
try (PreparedStatement pstmt = targetConn.prepareStatement(insertSql.toString())) {
try (PreparedStatement pstmt = targetConn.prepareStatement(insertSql)) {
int batchSize = 0;
while (rs.next()) {
for (int i = 1; i <= columnCount; i++) {
pstmt.setObject(i, rs.getObject(i));
Object value = rs.getObject(i);
// 特殊处理Oracle的TIMESTAMP类型到PostgreSQL
if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceDbType) && SourceDataTypeEnum.POSTGRES.getKey().equals(targetDbType)) {
String columnType = metaData.getColumnTypeName(i);
if ("TIMESTAMP".equalsIgnoreCase(columnType) || "DATE".equalsIgnoreCase(columnType)) {
Timestamp ts = rs.getTimestamp(i);
if (ts != null) {
pstmt.setTimestamp(i, ts);
continue;
}
}
}
pstmt.setObject(i, value);
}
pstmt.addBatch();
batchSize++;
@ -319,12 +348,9 @@ public class SyncDataJob implements Job {
/**
* 计算指定日期加上指定天数后的日期
* @param date 基准日期
* @param days 要添加的天数
* @return 计算后的日期
*/
private static Date addDays(Date date, int days) {
long time = date.getTime() + (long)days * 24 * 60 * 60 * 1000;
return new Date(time);
}
}
}