From 7d2846af185ce354f990910a8a8c2fea83d74c96 Mon Sep 17 00:00:00 2001 From: panbaolin Date: Thu, 28 May 2026 14:37:50 +0800 Subject: [PATCH] =?UTF-8?q?fix:1.=E5=AE=8C=E6=88=90=E9=87=8D=E6=9E=84?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=90=8C=E6=AD=A5=E5=90=8E=E5=8F=B0=E5=8A=9F?= =?UTF-8?q?=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../jeecg/common/constant/CommonConstant.java | 2 + .../common/properties/DataSyncProperties.java | 23 ++ .../modules/base/entity/StasSyncStrategy.java | 4 + .../controller/StasDataSourceController.java | 3 +- .../service/impl/QuartzJobServiceImpl.java | 3 +- .../impl/StasSyncStrategyServiceImpl.java | 145 ++------ .../controller/StasSyncLogController.java | 6 +- .../controller/StasTaskConfigController.java | 19 - .../taskConfig/job/AbstractSyncDataJob.java | 223 ++++++++++++ .../taskConfig/job/CustomThreadFactory.java | 20 + .../job/SyncDataByDateColumnJob.java | 103 ++++++ .../job/SyncDataByIncrementColumnJob.java | 107 ++++++ .../org/jeecg/taskConfig/job/SyncDataJob.java | 344 +++++++----------- .../service/IStasTaskConfigService.java | 6 - .../impl/StasTaskConfigServiceImpl.java | 285 +-------------- 15 files changed, 661 insertions(+), 632 deletions(-) create mode 100644 jeecg-boot-base-core/src/main/java/org/jeecg/common/properties/DataSyncProperties.java create mode 100644 jeecg-module-sync/src/main/java/org/jeecg/taskConfig/job/AbstractSyncDataJob.java create mode 100644 jeecg-module-sync/src/main/java/org/jeecg/taskConfig/job/CustomThreadFactory.java create mode 100644 jeecg-module-sync/src/main/java/org/jeecg/taskConfig/job/SyncDataByDateColumnJob.java create mode 100644 jeecg-module-sync/src/main/java/org/jeecg/taskConfig/job/SyncDataByIncrementColumnJob.java diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/CommonConstant.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/CommonConstant.java index 7ecd802..6122d61 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/CommonConstant.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/CommonConstant.java @@ -316,4 +316,6 @@ public interface CommonConstant { * 输运模拟时序分析数据KEY */ String TRANSPORT_TIMING_ANALYSIS = "transport:timing_analysis:"; + + String DATA_SYNC_FREQUENCY = "data_sync:frequency"; } diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/properties/DataSyncProperties.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/properties/DataSyncProperties.java new file mode 100644 index 0000000..f911f9c --- /dev/null +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/properties/DataSyncProperties.java @@ -0,0 +1,23 @@ +package org.jeecg.common.properties; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +/** + * 数据同步配置 + */ +@Data +@Component +@ConfigurationProperties(prefix = "data-sync") +public class DataSyncProperties { + + /** + * 最小间隔(分钟) + */ + private Integer minMinute; + /** + * 同时最大执行数据表数量 + */ + private Integer maxExecNum; +} diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/StasSyncStrategy.java b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/StasSyncStrategy.java index b2ff227..effafb4 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/StasSyncStrategy.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/StasSyncStrategy.java @@ -55,4 +55,8 @@ public class StasSyncStrategy implements Serializable { private String columnName; /**同步位置*/ private String syncOrigin; + /** + * 同步数量/天数,优先级高于同步任务的数据 + */ + private Integer syncCount; } diff --git a/jeecg-module-sync/src/main/java/org/jeecg/dataSource/controller/StasDataSourceController.java b/jeecg-module-sync/src/main/java/org/jeecg/dataSource/controller/StasDataSourceController.java index 46967ab..da355dd 100644 --- a/jeecg-module-sync/src/main/java/org/jeecg/dataSource/controller/StasDataSourceController.java +++ b/jeecg-module-sync/src/main/java/org/jeecg/dataSource/controller/StasDataSourceController.java @@ -1,6 +1,7 @@ package org.jeecg.dataSource.controller; +import cn.hutool.core.collection.CollUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; @@ -114,7 +115,7 @@ public class StasDataSourceController extends JeecgController tableList(String sourceId, String username) { List tableNameList= stasDataSourceService.queryTableList(sourceId, username); - if (tableNameList != null && !"".equals(tableNameList)){ + if (CollUtil.isNotEmpty(tableNameList)){ return Result.OK(tableNameList); } return Result.error("获取源端用户为空"); diff --git a/jeecg-module-sync/src/main/java/org/jeecg/quartz/service/impl/QuartzJobServiceImpl.java b/jeecg-module-sync/src/main/java/org/jeecg/quartz/service/impl/QuartzJobServiceImpl.java index 3d47d69..4812e5a 100644 --- a/jeecg-module-sync/src/main/java/org/jeecg/quartz/service/impl/QuartzJobServiceImpl.java +++ b/jeecg-module-sync/src/main/java/org/jeecg/quartz/service/impl/QuartzJobServiceImpl.java @@ -137,7 +137,8 @@ public class QuartzJobServiceImpl extends ServiceImpl stasSyncStrategys) { if(null != stasSyncStrategys && stasSyncStrategys.size() > 0) { @@ -144,7 +144,7 @@ public class StasSyncStrategyServiceImpl extends ServiceImpl 0) { - // 数字类型:NUMBER(p,s), NUMERIC(p,s) 等 ddl.append("(").append(dataPrecision); if (dataScale > 0) { ddl.append(",").append(dataScale); @@ -218,9 +214,10 @@ public class StasSyncStrategyServiceImpl extends ServiceImpl pkColumns = new ArrayList<>(); - - while (rs.next()) { - pkColumns.add(rs.getString("column_name")); - } - - if (!pkColumns.isEmpty()) { - sqlBuilder.append(",\n PRIMARY KEY ("); - for (int i = 0; i < pkColumns.size(); i++) { - if (i > 0) { - sqlBuilder.append(", "); - } - sqlBuilder.append("\"").append(pkColumns.get(i)).append("\""); - } - sqlBuilder.append(")"); - } - } - sqlBuilder.append("\n)"); return sqlBuilder.toString(); } @@ -336,41 +298,35 @@ public class StasSyncStrategyServiceImpl extends ServiceImpl 0 ? length : 255) + ")"; + return "varchar(" + (length > 0 ? length : 255) + ")"; case "NVARCHAR2": - return "VARCHAR(" + (length > 0 ? length : 255) + ")"; + return "varchar(" + (length > 0 ? length : 255) + ")"; case "CHAR": - return "CHAR(" + (length > 0 ? length : 1) + ")"; + return "char(" + (length > 0 ? length : 1) + ")"; case "NCHAR": - return "CHAR(" + (length > 0 ? length : 1) + ")"; + return "char(" + (length > 0 ? length : 1) + ")"; case "NUMBER": - if (precision == 0 && scale == 0) { - return "NUMERIC"; // 未指定精度的NUMBER - } else if (scale == 0) { - return "NUMERIC(" + precision + ")"; // 整数 - } else { - return "NUMERIC(" + precision + "," + scale + ")"; // 小数 - } + return "numeric"; case "FLOAT": - return "DOUBLE PRECISION"; + return "float4"; case "DATE": return "TIMESTAMP"; case "TIMESTAMP": - return "TIMESTAMP"; + return "timestamp"; case "CLOB": - return "TEXT"; + return "text"; case "BLOB": - return "BYTEA"; + return "bytea"; case "RAW": - return "BYTEA"; + return "bytea"; case "LONG": - return "TEXT"; + return "text"; case "LONG RAW": - return "BYTEA"; + return "bytea"; default: - return "TEXT"; // 默认转换为TEXT + return "text"; // 默认转换为TEXT } } @@ -389,55 +345,20 @@ public class StasSyncStrategyServiceImpl extends ServiceImpl 0; - } - } - } - - /** - * 检查列是否存在 - * @param conn 数据库连接 - * @param schema 模式名 - * @param tableName 表名 - * @param columnName 列名 - * @param dbType 数据库类型 - * @return 列是否存在 - * @throws SQLException 数据库异常 - */ - private boolean isColumnExists(Connection conn, String schema,String dbLink, String tableName, - String columnName, Integer dbType) throws SQLException { - String sql; - if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) { - dbLink = StrUtil.isNotBlank(dbLink)?dbLink:""; - sql = "SELECT COUNT(*) FROM all_tab_columns"+dbLink+" WHERE owner = ? AND table_name = ? AND column_name = ?"; - } else if (SourceDataTypeEnum.POSTGRES.getKey().equals(dbType)) { - sql = "SELECT COUNT(*) FROM information_schema.columns WHERE table_schema = ? AND table_name = ? AND column_name = ?"; - } else { - throw new SQLException("不支持的数据库类型: " + dbType); - } - - try (PreparedStatement pstmt = conn.prepareStatement(sql)) { - pstmt.setString(1, schema.toUpperCase()); - pstmt.setString(2, tableName.toUpperCase()); - pstmt.setString(3, columnName.toUpperCase()); - try (ResultSet rs = pstmt.executeQuery()) { return rs.next() && rs.getInt(1) > 0; } diff --git a/jeecg-module-sync/src/main/java/org/jeecg/syncLog/controller/StasSyncLogController.java b/jeecg-module-sync/src/main/java/org/jeecg/syncLog/controller/StasSyncLogController.java index f7362c9..afc8193 100644 --- a/jeecg-module-sync/src/main/java/org/jeecg/syncLog/controller/StasSyncLogController.java +++ b/jeecg-module-sync/src/main/java/org/jeecg/syncLog/controller/StasSyncLogController.java @@ -36,7 +36,7 @@ public class StasSyncLogController extends JeecgController> queryPageList(StasSyncLog stasSyncLog, - @RequestParam(name="pageNo", defaultValue="1") Integer pageNo, + @RequestParam(name="pageNum", defaultValue="1") Integer pageNum, @RequestParam(name="pageSize", defaultValue="10") Integer pageSize, HttpServletRequest req) { QueryWrapper queryWrapper = QueryGenerator.initQueryWrapper(stasSyncLog, req.getParameterMap()); queryWrapper.orderByAsc("start_time"); - Page page = new Page(pageNo, pageSize); + Page page = new Page(pageNum, pageSize); IPage pageList = stasSyncLogService.page(page, queryWrapper); return Result.OK(pageList); } diff --git a/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/controller/StasTaskConfigController.java b/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/controller/StasTaskConfigController.java index 8741a15..49b743f 100644 --- a/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/controller/StasTaskConfigController.java +++ b/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/controller/StasTaskConfigController.java @@ -1,21 +1,15 @@ package org.jeecg.taskConfig.controller; -import java.sql.SQLException; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.stream.Collectors; -import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import jakarta.servlet.http.HttpServletRequest; import org.apache.commons.lang.StringUtils; import org.jeecg.common.api.vo.Result; -import org.jeecg.common.constant.enums.SyncTaskStatusEnum; import org.jeecg.common.system.query.QueryGenerator; import org.jeecg.dataSource.service.IStasDataSourceService; import org.jeecg.modules.base.entity.StasDataSource; -import org.jeecg.modules.base.entity.StasSyncStrategy; import org.jeecg.modules.base.entity.StasTaskConfig; -import org.jeecg.stasSyncStrategy.service.IStasSyncStrategyService; import org.jeecg.taskConfig.service.IStasTaskConfigService; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; @@ -178,19 +172,6 @@ public class StasTaskConfigController extends JeecgController 1) insertSql += ", "; + insertSql += "?"; + } + insertSql += ")"; + + // 批量插入 + try (PreparedStatement pstmt = this.targetConn.prepareStatement(insertSql)) { + int batchSize = 0; + while (rs.next()) { + for (int i = 1; i <= columnCount; i++) { + Object value = rs.getObject(i); + + // 特殊处理Oracle的TIMESTAMP类型到PostgreSQL + if (SourceDataTypeEnum.ORACLE.getKey().equals(this.sourceInfo.getType()) && SourceDataTypeEnum.POSTGRES.getKey().equals(this.targetInfo.getType())) { + String columnType = metaData.getColumnTypeName(i); + if ("TIMESTAMP".equalsIgnoreCase(columnType) || "DATE".equalsIgnoreCase(columnType)) { + Timestamp ts = rs.getTimestamp(i); + if (ts != null) { + pstmt.setTimestamp(i, ts); + continue; + } + } + } + pstmt.setObject(i, value); + } + pstmt.addBatch(); + batchSize++; + totalRows++; + + if (batchSize % 5000 == 0) { + pstmt.executeBatch(); + this.targetConn.commit(); + batchSize = 0; + } + } + if (batchSize > 0) { + pstmt.executeBatch(); + this.targetConn.commit(); + } + } + } + return totalRows; + } + + /** + * 同步数据前先走一次删除操作,防止上次回滚失败,本次插入历史数据 + * @param whereClause + * @throws SQLException + */ + protected void confirmDeletion(String whereClause) throws SQLException { + String delSql; + if (SourceDataTypeEnum.ORACLE.getKey().equals(this.sourceInfo.getType())) { + delSql = "DELETE FROM \"" + this.syncStrategy.getTargetOwner().toUpperCase() + "\".\"" + this.syncStrategy.getTableName().toUpperCase() + + "\"" + this.targetDBLink + " WHERE " + whereClause; + } else { + delSql = "delete from \"" + this.syncStrategy.getTargetOwner().toLowerCase() + "\".\"" + this.syncStrategy.getTableName().toLowerCase() + + "\" where " + whereClause; + } + try (PreparedStatement pstmt = this.targetConn.prepareStatement(delSql)) { + pstmt.execute(); + this.targetConn.commit(); + } + } + + /** + * 保存同步日志 + * @param desc + */ + protected void saveSyncLog(String desc){ + StasSyncLog stasSyncLog = new StasSyncLog(); + stasSyncLog.setRecordId(this.recordId); + stasSyncLog.setStartTime(new Date()); + stasSyncLog.setDescription(desc.length()>300?desc.substring(0,300):desc); + stasSyncLog.setTaskId(this.taskConfig.getId()); + syncLogMapper.insert(stasSyncLog); + } + + /** + * 保存表批次同步数量 + * @param num + */ + protected void saveSyncNum(Integer num){ + StasSyncNum stasSyncNum = new StasSyncNum(); + stasSyncNum.setRecordId(this.recordId); + stasSyncNum.setTableName(this.syncStrategy.getTableName()); + stasSyncNum.setSyncNum(num); + stasSyncNum.setTaskId(this.taskConfig.getId()); + this.syncNumMapper.insert(stasSyncNum); + } + + /** + * 处理同步过程报错问题 + * + * @param e the exception + */ + public void uncaughtException(Throwable e) { + String tableName = this.syncStrategy.getSourceOwner()+"."+this.syncStrategy.getTableName(); + String errLog = String.format("%s表数据同步出现错误,原因为:%s",tableName, e.getMessage()); + log.error(errLog); + saveSyncLog(errLog); + try { + targetConn.rollback(); + } catch (SQLException ex) { + log.error("数据回滚出现错误,原因为:{}",ex.getMessage()); + } + } +} diff --git a/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/job/CustomThreadFactory.java b/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/job/CustomThreadFactory.java new file mode 100644 index 0000000..8b58218 --- /dev/null +++ b/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/job/CustomThreadFactory.java @@ -0,0 +1,20 @@ +package org.jeecg.taskConfig.job; + +import org.springframework.scheduling.concurrent.CustomizableThreadFactory; +import java.util.concurrent.ThreadFactory; + +public class CustomThreadFactory extends CustomizableThreadFactory implements ThreadFactory { + + private final String threadNamePrefix; + + public CustomThreadFactory(String threadNamePrefix) { + this.threadNamePrefix = threadNamePrefix; + } + + @Override + public Thread newThread(Runnable runnable) { + Thread thread = new Thread(runnable); + thread.setName(this.threadNamePrefix +"_"+ thread.getName()); + return thread; + } +} diff --git a/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/job/SyncDataByDateColumnJob.java b/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/job/SyncDataByDateColumnJob.java new file mode 100644 index 0000000..0baead2 --- /dev/null +++ b/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/job/SyncDataByDateColumnJob.java @@ -0,0 +1,103 @@ +package org.jeecg.taskConfig.job; + +import cn.hutool.core.date.StopWatch; +import org.apache.commons.lang3.StringUtils; +import org.jeecg.vo.DateRangeVO; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.time.Duration; +import java.util.Date; +import java.util.Objects; + +public class SyncDataByDateColumnJob extends AbstractSyncDataJob { + + + @Override + public void run() { + StopWatch stopWatch = new StopWatch(); + stopWatch.start(); + + String tableName = super.syncStrategy.getSourceOwner()+"."+super.syncStrategy.getTableName(); + String columnName = super.syncStrategy.getColumnName(); + try{ + super.handleCollDBLink(); + super.handleConn(); + + super.saveSyncLog(String.format("开始同步表: %s (依据字段: %s)",tableName,columnName)); + //开始同步过程 + this.syncByDateColumnData(); + } catch (Exception e) { + super.uncaughtException(e); + } finally { + stopWatch.stop(); + super.saveSyncLog(String.format("表 %s 同步耗时: %s 秒",tableName,stopWatch.getTotalTimeSeconds())); + try { + if (null != super.sourceConn) { + super.sourceConn.close(); + } + if (null != super.targetConn) { + super.targetConn.close(); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + } + + /** + * 按日期范围同步数据 + */ + public void syncByDateColumnData() throws SQLException, ParseException { + String tableName = super.syncStrategy.getSourceOwner()+"."+super.syncStrategy.getTableName(); + //获取数据表时间范围 + DateRangeVO dateRange = getDateRange(); + + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + String syncOrigin = super.syncStrategy.getSyncOrigin(); + //得到上次同步的最后时间 + Date lastSyncedDate = StringUtils.isNotBlank(syncOrigin) ? sdf.parse(syncOrigin) : dateRange.getMinDate(); + //得到当前应该开始的时间 + Date currentStart = (lastSyncedDate != null && lastSyncedDate.after(dateRange.getMinDate())) ? + lastSyncedDate : dateRange.getMinDate(); + //得到本次应同步到的结束时间 + int syncCount = (Objects.nonNull(super.syncStrategy.getSyncCount()) && super.syncStrategy.getSyncCount()>0)?super.syncStrategy.getSyncCount():super.taskConfig.getSyncDay(); + Date currentEnd = new Date(currentStart.getTime() + Duration.ofDays(syncCount).toMillis()); + //得到拼接条件 + String whereClause = super.syncStrategy.getColumnName() + " BETWEEN TO_DATE('" + sdf.format(currentStart) + + "', 'YYYY-MM-DD HH24:MI:SS') AND TO_DATE('" + sdf.format(currentEnd) + "', 'YYYY-MM-DD HH24:MI:SS')"; + + super.saveSyncLog(String.format("%s表同步日期范围: %s 至 %s (批次天数: %d)", + tableName, sdf.format(currentStart), sdf.format(currentEnd), super.taskConfig.getSyncDay())); + //确保如果上次同步失败时没有垃圾数据,同步前先走一下删除逻辑 + super.confirmDeletion(super.syncStrategy.getColumnName() + "> TO_DATE('"+ sdf.format(currentStart) +"', 'YYYY-MM-DD HH24:MI:SS')"); + //获取并保存数据 + int rowsSynced = syncDataBatch(whereClause); + //保存日志 + super.saveSyncLog(String.format("%s表已同步 %s 行数据",tableName, rowsSynced)); + //保存表批次同步数量 + super.saveSyncNum(rowsSynced); + //保存同步日志并修改同步位置 + super.syncStrategy.setSyncOrigin(sdf.format(currentEnd)); + super.syncStrategyMapper.updateById(super.syncStrategy); + super.saveSyncLog(String.format("%s表最终同步位置已更新为: %s",tableName, sdf.format(currentEnd))); + } + + /** + * 获取oracle表的日期范围 + */ + public DateRangeVO getDateRange() throws SQLException { + String sql = "SELECT MIN(" + super.syncStrategy.getColumnName() + ") as min_date, " + + "MAX(" + super.syncStrategy.getColumnName() + ") as max_date " + + "FROM \"" + super.syncStrategy.getSourceOwner().toUpperCase() + "\".\"" + super.syncStrategy.getTableName() + "\"" + super.sourceDBLink; + try (Statement stmt = super.sourceConn.createStatement(); + ResultSet rs = stmt.executeQuery(sql)) { + if (rs.next()) { + return new DateRangeVO(rs.getTimestamp("min_date"), rs.getTimestamp("max_date")); + } + } + throw new SQLException("无法获取日期范围"); + } +} diff --git a/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/job/SyncDataByIncrementColumnJob.java b/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/job/SyncDataByIncrementColumnJob.java new file mode 100644 index 0000000..5bc574d --- /dev/null +++ b/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/job/SyncDataByIncrementColumnJob.java @@ -0,0 +1,107 @@ +package org.jeecg.taskConfig.job; + +import cn.hutool.core.date.StopWatch; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.jeecg.vo.IdRangeVO; +import java.sql.*; +import java.util.Objects; + +@Slf4j +public class SyncDataByIncrementColumnJob extends AbstractSyncDataJob { + + /** + * 执行同步 + */ + @Override + public void run() { + StopWatch stopWatch = new StopWatch(); + stopWatch.start(); + + String tableName = super.syncStrategy.getSourceOwner()+"."+super.syncStrategy.getTableName(); + String columnName = super.syncStrategy.getColumnName(); + try{ + super.handleCollDBLink(); + super.handleConn(); + super.saveSyncLog(String.format("开始同步表: %s (依据字段: %s)",tableName,columnName)); + //开始同步过程 + this.syncByIncrementColumnData(); + } catch (Exception e) { + super.uncaughtException(e); + } finally { + stopWatch.stop(); + super.saveSyncLog(String.format("表 %s 同步耗时: %s 秒",tableName,stopWatch.getTotalTimeSeconds())); + try { + if (null != super.sourceConn) { + super.sourceConn.close(); + } + if (null != super.targetConn) { + super.targetConn.close(); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } + } + } + + /** + * 按ID范围同步数据 + */ + public void syncByIncrementColumnData() throws SQLException { + String tableName = super.syncStrategy.getSourceOwner()+"."+super.syncStrategy.getTableName(); + // 获取最小和最大ID + IdRangeVO idRange = this.getIncrementColumnRange(); + + // 获取上次同步的位置 + String syncOrigin = super.syncStrategy.getSyncOrigin(); + long lastSyncedId = StringUtils.isNotBlank(syncOrigin) ? Long.parseLong(syncOrigin) : idRange.getMinId(); + + long currentStart = (lastSyncedId > 0 && lastSyncedId > idRange.getMinId()) ? + lastSyncedId + 1 : idRange.getMinId(); + + // 确保起始位置不超过结束位置 + if (currentStart > idRange.getMaxId()) { + super.saveSyncLog(String.format("%s表已同步到最新,无需继续同步",tableName)); + return; + } + //得到当前同步的结束条件 + //策略里配置的同步数量优先级高于任务中配置的同步数量 + int syncCount = (Objects.nonNull(super.syncStrategy.getSyncCount()) && super.syncStrategy.getSyncCount()>0)?super.syncStrategy.getSyncCount():super.taskConfig.getSyncCount(); + long currentEnd = currentStart + syncCount - 1; + //拼接条件 + String whereClause = super.syncStrategy.getColumnName() + " BETWEEN " + currentStart + " AND " + currentEnd; + //保存日志 + super.saveSyncLog(String.format("%s表同步ID范围: %s 至 %s",tableName, currentStart, currentEnd)); + //确保如果上次同步失败时没有垃圾数据,同步前先走一下删除逻辑 + super.confirmDeletion(super.syncStrategy.getColumnName() + ">=" + currentStart); + //查询并保存数据 + int rowsSynced = super.syncDataBatch(whereClause); + //保存日志 + super.saveSyncLog(String.format("%s表已同步 %s 行数据",tableName, rowsSynced)); + //保存表批次同步数量 + super.saveSyncNum(rowsSynced); + + // 更新同步位置 + if (currentEnd > 0) { + super.syncStrategy.setSyncOrigin(String.valueOf(currentEnd)); + super.syncStrategyMapper.updateById(syncStrategy); + super.saveSyncLog(String.format("%s表最终同步位置已更新为: %s",tableName, currentEnd)); + } + } + + /** + * 获取oracle表的ID范围 + */ + private IdRangeVO getIncrementColumnRange() throws SQLException { + String sql = "SELECT MIN(" + super.syncStrategy.getColumnName() + ") as min_id, " + + "MAX(" + super.syncStrategy.getColumnName() + ") as max_id " + + "FROM \"" + super.syncStrategy.getSourceOwner().toUpperCase() + "\".\"" + super.syncStrategy.getTableName() + "\"" + super.sourceDBLink; + try (Statement stmt = super.sourceConn.createStatement(); + ResultSet rs = stmt.executeQuery(sql)) { + if (rs.next()) { + return new IdRangeVO(rs.getLong("min_id"), rs.getLong("max_id")); + } + } + throw new SQLException("无法获取ID范围"); + } +} \ No newline at end of file diff --git a/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/job/SyncDataJob.java b/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/job/SyncDataJob.java index 55259f5..0b140f1 100644 --- a/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/job/SyncDataJob.java +++ b/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/job/SyncDataJob.java @@ -1,258 +1,186 @@ package org.jeecg.taskConfig.job; +import cn.hutool.core.date.DateUtil; +import cn.hutool.core.date.StopWatch; import cn.hutool.core.util.StrUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import jakarta.annotation.Resource; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; import org.jeecg.common.constant.enums.SourceDataTypeEnum; -import org.jeecg.common.exception.JeecgBootException; +import org.jeecg.common.properties.DataSyncProperties; import org.jeecg.common.util.DBUtil; +import org.jeecg.common.util.RedisUtil; import org.jeecg.modules.base.entity.*; import org.jeecg.modules.base.mapper.*; -import org.jeecg.vo.IdRangeVO; -import org.quartz.Job; -import org.quartz.JobExecutionContext; +import org.quartz.*; import java.sql.*; import java.util.Date; import java.util.List; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * 数据同步任务(支持Oracle和PostgreSQL) */ @Slf4j +@DisallowConcurrentExecution public class SyncDataJob implements Job { @Resource - private StasTaskConfigMapper stasTaskConfigMapper; + private StasTaskConfigMapper taskConfigMapper; @Resource - private StasDataSourceMapper stasDataSourceMapper; + private StasDataSourceMapper dataSourceMapper; @Resource - private StasSyncStrategyMapper stasSyncStrategyMapper; + private StasSyncStrategyMapper syncStrategyMapper; @Resource - private StasSyncRecordMapper stasSyncRecordMapper; + private StasSyncRecordMapper syncRecordMapper; @Resource - private StasSyncLogMapper stasSyncLogMapper; + private StasSyncLogMapper syncLogMapper; @Resource - private StasSyncNumMapper stasSyncNumMapper; - + private StasSyncNumMapper syncNumMapper; + @Resource + private RedisUtil redisUtil; + @Resource + private DataSyncProperties dataSyncProperties; + private ThreadPoolExecutor threadPoolExecutor; @Getter @Setter private String parameter; @Override - public void execute(JobExecutionContext jobExecutionContext) { - try { - syncDataByFieldType(parameter); - } catch (SQLException e) { - log.error("执行数据同步任务出现错误,原因为:{}",e.getMessage()); + public void execute(JobExecutionContext context) { + Trigger trigger = context.getTrigger(); + if(trigger instanceof CronTrigger){ + Date nextFireTime = trigger.getNextFireTime(); + Date currentFireTime = context.getFireTime(); + if(nextFireTime != null && currentFireTime != null){ + Long intervalMs = nextFireTime.getTime() - currentFireTime.getTime(); + multiTaskSync(parameter,intervalMs,currentFireTime,nextFireTime); + } } } /** * 根据字段类型同步数据 */ - public void syncDataByFieldType(String taskId) throws SQLException { - StasTaskConfig stasTaskConfig = stasTaskConfigMapper.selectById(taskId); - StasDataSource sourceInfo = stasDataSourceMapper.selectById(stasTaskConfig.getSourceId()); - StasDataSource targetInfo = stasDataSourceMapper.selectById(stasTaskConfig.getTargetId()); - - StasSyncRecord stasSyncRecord = new StasSyncRecord(); - stasSyncRecord.setTaskId(taskId); - stasSyncRecord.setSourceId(stasTaskConfig.getSourceId()); - stasSyncRecord.setTargetId(stasTaskConfig.getTargetId()); - stasSyncRecord.setStartTime(new Date()); - stasSyncRecordMapper.insert(stasSyncRecord); - String recordId = stasSyncRecord.getId(); + public void multiTaskSync(String taskId,Long intervalMs,Date currentFireTime,Date nextFireTime){ + StopWatch stopWatch = new StopWatch(); + stopWatch.start(); + StasTaskConfig taskConfig = taskConfigMapper.selectById(taskId); + StasDataSource sourceInfo = dataSourceMapper.selectById(taskConfig.getSourceId()); + StasDataSource targetInfo = dataSourceMapper.selectById(taskConfig.getTargetId()); + log.info("任务:\"{}\",在{}时间批次开始执行,下次任务预计时间为:{}。",taskConfig.getTaskName(),DateUtil.format(currentFireTime,"yyyy-MM-dd HH:mm:ss"), + DateUtil.format(nextFireTime,"yyyy-MM-dd HH:mm:ss")); + //保存同步记录 + String recordId = this.saveSyncRecord(taskId,taskConfig.getSourceId(),taskConfig.getTargetId()); + //查询所有需要同步的数据表 + List stasSyncStrategies = syncStrategyMapper + .selectList(new LambdaQueryWrapper().eq(StasSyncStrategy::getTaskId, taskId)); + //获取源数据库连接,用于测试同步数据列 + String sourceDBLink = StrUtil.isNotBlank(sourceInfo.getDbLink())?sourceInfo.getDbLink():""; String sourceUrl = DBUtil.getUrl(sourceInfo.getIpAddress(), sourceInfo.getPort(), sourceInfo.getServeId()); - String targetUrl; - if(SourceDataTypeEnum.ORACLE.getKey().equals(targetInfo.getType())){ - targetUrl = DBUtil.getUrl(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId()); - }else{ - targetUrl = DBUtil.getPgUrl(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId()); - } - - try (Connection sourceConn = DBUtil.getConnection(sourceUrl, sourceInfo.getUsername(), sourceInfo.getPassword(),sourceInfo.getType()); - Connection targetConn = DBUtil.getConnection(targetUrl, targetInfo.getUsername(), targetInfo.getPassword(),targetInfo.getType())) { - - // 设置目标连接为批量提交模式 - targetConn.setAutoCommit(false); - - List stasSyncStrategies = stasSyncStrategyMapper - .selectList(new LambdaQueryWrapper().eq(StasSyncStrategy::getTaskId, taskId)); - - //处理DBlink - String sourceDBLink = StrUtil.isNotBlank(sourceInfo.getDbLink())?sourceInfo.getDbLink():""; - String targetDBLink = StrUtil.isNotBlank(targetInfo.getDbLink())?targetInfo.getDbLink():""; - for (StasSyncStrategy stasSyncStrategy : stasSyncStrategies) { - saveSyncLog(recordId,String.format("开始同步表: %s (依据字段: %s)", stasSyncStrategy.getTableName(), stasSyncStrategy.getColumnName())); - long startTime = System.currentTimeMillis(); - - try { - syncByIdRange(sourceConn, targetConn, stasSyncStrategy, - stasTaskConfig.getSyncCount(), sourceInfo.getType(), targetInfo.getType(), recordId,sourceDBLink,targetDBLink); - } catch (SQLException e) { - saveSyncLog(recordId,String.format("同步表 %s 时出错: %s", stasSyncStrategy.getTableName(), e.getMessage())); - targetConn.rollback(); - throw new SQLException(e.getMessage()); + try(Connection sourceConn = DBUtil.getConnection(sourceUrl, sourceInfo.getUsername(), sourceInfo.getPassword(), sourceInfo.getType())){ + //循环启动线程开始同步 + this.initThreadPool(); + for (StasSyncStrategy syncStrategy : stasSyncStrategies) { + boolean dateColumn = this.isDateColumn(sourceConn, syncStrategy, sourceInfo.getType(), sourceDBLink); + AbstractSyncDataJob syncDataJob; + if (dateColumn) { + syncDataJob = new SyncDataByDateColumnJob(); + }else { + syncDataJob = new SyncDataByIncrementColumnJob(); } - long endTime = System.currentTimeMillis(); - saveSyncLog(recordId,String.format("表 %s 同步耗时: %s 毫秒", stasSyncStrategy.getTableName(), (endTime - startTime))); + syncDataJob.init(taskConfigMapper,dataSourceMapper,syncStrategyMapper,syncRecordMapper, + syncLogMapper,syncNumMapper,taskConfig,syncStrategy,sourceInfo,targetInfo,recordId); + //下面开始线程池执行,然后总体等待时间为,设置的时间间隔时长,超出即所有都停止 + threadPoolExecutor.execute(syncDataJob); + } + threadPoolExecutor.shutdown(); + boolean awaitResult = threadPoolExecutor.awaitTermination(intervalMs, TimeUnit.MILLISECONDS); + if(!awaitResult){ + this.closeThreadPool(recordId,taskConfig.getId()); + } + //修改同步记录 + this.updateSyncRecordById(recordId); + } catch (SQLException | InterruptedException e) { + String errLog = "执行数据同步任务出现错误,原因为:"+e.getMessage(); + log.error(errLog); + this.saveSyncLog(recordId,errLog,taskConfig.getId()); + }finally { + stopWatch.stop(); + log.info("任务:\"{}\",在{}时间批次执行完毕,耗时:{}秒。", taskConfig.getTaskName(),DateUtil.format(currentFireTime,"yyyy-MM-dd HH:mm:ss"),stopWatch.getTotalTimeSeconds()); + } + } + + /** + * 优雅关闭线程池 + * @throws InterruptedException + */ + private void closeThreadPool(String recordId,String taskId) throws InterruptedException { + //再次多等待2分钟,如果还未全部关闭,则强制停止 + if (!threadPoolExecutor.awaitTermination(120, TimeUnit.SECONDS)) { + List droppedTasks = threadPoolExecutor.shutdownNow(); + log.warn("超时!执行强制关闭,丢弃了 " + droppedTasks.size() + " 个未执行任务"); + if (!threadPoolExecutor.awaitTermination(60, TimeUnit.SECONDS)) { + String errLog = "线程池无法正常终止,可能存在死锁或线程未响应中断"; + log.error(errLog); + this.saveSyncLog(recordId,errLog,taskId); } } - stasSyncRecord.setEndTime(new Date()); - stasSyncRecordMapper.updateById(stasSyncRecord); } /** - * 按ID范围同步数据 + * 判断字段是否为日期类型 */ - public void syncByIdRange(Connection sourceConn, Connection targetConn, - StasSyncStrategy stasSyncStrategy, Integer syncCount, - Integer sourceDbType, Integer targetDbType, String recordId, - String sourceDbLink, String targetDbLink) throws SQLException { - // 获取最小和最大ID - IdRangeVO idRange = getIdRange(sourceConn, stasSyncStrategy, sourceDbType,sourceDbLink); - - // 获取上次同步的位置 - String syncOrigin = stasSyncStrategy.getSyncOrigin(); - long lastSyncedId = StringUtils.isNotBlank(syncOrigin) ? Long.parseLong(syncOrigin) : idRange.getMinId(); - - long currentStart = (lastSyncedId > 0 && lastSyncedId > idRange.getMinId()) ? - lastSyncedId + 1 : idRange.getMinId(); - - // 确保起始位置不超过结束位置 - if (currentStart > idRange.getMaxId()) { - saveSyncLog(recordId, String.format("%s表已同步到最新,无需继续同步", stasSyncStrategy.getTableName())); - return; - } - long currentEnd = currentStart + syncCount - 1; - - String whereClause = stasSyncStrategy.getColumnName() + " BETWEEN " + currentStart + " AND " + currentEnd; - - saveSyncLog(recordId, String.format("%s表同步ID范围: %s 至 %s", stasSyncStrategy.getTableName(), currentStart, currentEnd)); - - int rowsSynced = syncDataBatch(sourceConn, targetConn, stasSyncStrategy.getSourceOwner(), - stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(), - whereClause, sourceDbType, targetDbType,sourceDbLink,targetDbLink); - - saveSyncLog(recordId, String.format("%s表已同步 %s 行数据", stasSyncStrategy.getTableName(), rowsSynced)); - saveSyncNum(recordId, stasSyncStrategy.getTableName(), rowsSynced); - - // 更新同步位置 - if (currentEnd > 0) { - stasSyncStrategy.setSyncOrigin(String.valueOf(currentEnd)); - stasSyncStrategyMapper.updateById(stasSyncStrategy); - saveSyncLog(recordId, String.format("%s表最终同步位置已更新为: %s", stasSyncStrategy.getTableName(), currentEnd)); - } - } - - /** - * 获取表的ID范围 - */ - public IdRangeVO getIdRange(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType,String sourceDbLink) throws SQLException { + private boolean isDateColumn(Connection conn, StasSyncStrategy syncStrategy, Integer dbType, String dbLink) throws SQLException { String sql; if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) { - sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_id, " + - "MAX(" + stasSyncStrategy.getColumnName() + ") as max_id " + - "FROM \"" + stasSyncStrategy.getSourceOwner().toUpperCase() + "\".\"" + stasSyncStrategy.getTableName() + "\"" + sourceDbLink; - } else { - sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_id, " + - "MAX(" + stasSyncStrategy.getColumnName() + ") as max_id " + - "FROM \"" + stasSyncStrategy.getSourceOwner().toLowerCase() + "\".\"" + stasSyncStrategy.getTableName() + "\""; + sql = "SELECT data_type FROM all_tab_columns" + dbLink + " WHERE owner = ? AND table_name = ? AND column_name = ?"; + }else { + throw new SQLException("不支持的数据库类型: " + dbType); } + try (PreparedStatement pstmt = conn.prepareStatement(sql)) { + pstmt.setString(1, syncStrategy.getSourceOwner().toUpperCase()); + pstmt.setString(2, syncStrategy.getTableName().toUpperCase()); + pstmt.setString(3, syncStrategy.getColumnName().toUpperCase()); - try (Statement stmt = conn.createStatement(); - ResultSet rs = stmt.executeQuery(sql)) { + ResultSet rs = pstmt.executeQuery(); if (rs.next()) { - return new IdRangeVO(rs.getLong("min_id"), rs.getLong("max_id")); + String dataType = rs.getString("data_type").toUpperCase(); + return dataType.contains("DATE") || dataType.contains("TIME"); } } - throw new SQLException("无法获取ID范围"); + return false; } /** - * 同步一批数据(支持跨数据库类型) + * 保存同步记录 + * @param taskId 任务id + * @param sourceId 源数据库id + * @param targetId 目标数据库id + * @return */ - public int syncDataBatch(Connection sourceConn, Connection targetConn, - String sourceOwner, String targetOwner, - String tableName, String whereClause, - Integer sourceDbType, Integer targetDbType, - String sourceDbLink, String targetDbLink) throws SQLException { - int totalRows = 0; - String selectSql; + private String saveSyncRecord(String taskId,String sourceId,String targetId){ + StasSyncRecord stasSyncRecord = new StasSyncRecord(); + stasSyncRecord.setTaskId(taskId); + stasSyncRecord.setSourceId(sourceId); + stasSyncRecord.setTargetId(targetId); + stasSyncRecord.setStartTime(new Date()); + syncRecordMapper.insert(stasSyncRecord); + return stasSyncRecord.getId(); + } - // 构建查询SQL(根据源数据库类型) - if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceDbType)) { - selectSql = "SELECT * FROM \"" + sourceOwner.toUpperCase() + "\".\"" + tableName + - "\"" + sourceDbLink + " WHERE " + whereClause; - } else { - selectSql = "SELECT * FROM \"" + sourceOwner.toLowerCase() + "\".\"" + tableName + - "\" WHERE " + whereClause; - } - - try (Statement sourceStmt = sourceConn.createStatement( - ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); - ResultSet rs = sourceStmt.executeQuery(selectSql)) { - - sourceStmt.setFetchSize(5000); // 优化大表读取 - ResultSetMetaData metaData = rs.getMetaData(); - int columnCount = metaData.getColumnCount(); - - // 构建插入SQL(根据目标数据库类型) - String insertSql; - if (SourceDataTypeEnum.ORACLE.getKey().equals(targetDbType)) { - insertSql = "INSERT INTO \"" + targetOwner.toUpperCase() + "\".\"" + tableName + "\"" + targetDbLink + " VALUES ("; - } else { - insertSql = "INSERT INTO \"" + tableName + "\" VALUES ("; - } - - for (int i = 1; i <= columnCount; i++) { - if (i > 1) insertSql += ", "; - insertSql += "?"; - } - insertSql += ")"; - - // 批量插入 - try (PreparedStatement pstmt = targetConn.prepareStatement(insertSql)) { - int batchSize = 0; - - while (rs.next()) { - for (int i = 1; i <= columnCount; i++) { - Object value = rs.getObject(i); - - // 特殊处理Oracle的TIMESTAMP类型到PostgreSQL - if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceDbType) && SourceDataTypeEnum.POSTGRES.getKey().equals(targetDbType)) { - String columnType = metaData.getColumnTypeName(i); - if ("TIMESTAMP".equalsIgnoreCase(columnType) || "DATE".equalsIgnoreCase(columnType)) { - Timestamp ts = rs.getTimestamp(i); - if (ts != null) { - pstmt.setTimestamp(i, ts); - continue; - } - } - } - pstmt.setObject(i, value); - } - pstmt.addBatch(); - batchSize++; - totalRows++; - - if (batchSize % 5000 == 0) { - pstmt.executeBatch(); - targetConn.commit(); - batchSize = 0; - } - } - if (batchSize > 0) { - pstmt.executeBatch(); - targetConn.commit(); - } - } - } - return totalRows; + /** + * 修改同步记录 + * @param recordId + */ + private void updateSyncRecordById(String recordId){ + StasSyncRecord stasSyncRecord = syncRecordMapper.selectById(recordId); + stasSyncRecord.setEndTime(new Date()); + syncRecordMapper.updateById(stasSyncRecord); } /** @@ -260,25 +188,25 @@ public class SyncDataJob implements Job { * @param recordId * @param desc */ - private void saveSyncLog(String recordId, String desc){ + protected void saveSyncLog(String recordId, String desc,String taskId){ StasSyncLog stasSyncLog = new StasSyncLog(); stasSyncLog.setRecordId(recordId); stasSyncLog.setStartTime(new Date()); stasSyncLog.setDescription(desc); - stasSyncLogMapper.insert(stasSyncLog); + stasSyncLog.setTaskId(taskId); + syncLogMapper.insert(stasSyncLog); } /** - * 保存同步记录 - * @param recordId - * @param tableName - * @param num + * 初始化线程池 */ - private void saveSyncNum(String recordId, String tableName, Integer num){ - StasSyncNum stasSyncNum = new StasSyncNum(); - stasSyncNum.setRecordId(recordId); - stasSyncNum.setTableName(tableName); - stasSyncNum.setSyncNum(num); - stasSyncNumMapper.insert(stasSyncNum); + protected void initThreadPool(){ + //获取机器可用核心数 + int maximumPoolSize = Runtime.getRuntime().availableProcessors(); + int maxExecSize = Math.min(dataSyncProperties.getMaxExecNum(), maximumPoolSize); + String threadNamePrefix = "data_sync"; + int queueCapacity = maxExecSize * 2; + CustomThreadFactory threadFactory = new CustomThreadFactory(threadNamePrefix); + threadPoolExecutor = new ThreadPoolExecutor(maxExecSize,maximumPoolSize,5, TimeUnit.SECONDS,new LinkedBlockingQueue<>(queueCapacity),threadFactory); } } \ No newline at end of file diff --git a/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/service/IStasTaskConfigService.java b/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/service/IStasTaskConfigService.java index bd1bd17..eccadbe 100644 --- a/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/service/IStasTaskConfigService.java +++ b/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/service/IStasTaskConfigService.java @@ -53,10 +53,4 @@ public interface IStasTaskConfigService extends IService { */ void executeQuartzJob(String quartzId); - /** - * 根据字段类型同步数据 - * @param taskId 任务id - * @throws SQLException 数据库异常 - */ - void syncDataByFieldType(String taskId) throws SQLException; } diff --git a/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/service/impl/StasTaskConfigServiceImpl.java b/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/service/impl/StasTaskConfigServiceImpl.java index 49baa24..5e70f87 100644 --- a/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/service/impl/StasTaskConfigServiceImpl.java +++ b/jeecg-module-sync/src/main/java/org/jeecg/taskConfig/service/impl/StasTaskConfigServiceImpl.java @@ -2,12 +2,10 @@ package org.jeecg.taskConfig.service.impl; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.RequiredArgsConstructor; -import org.apache.commons.lang3.StringUtils; import org.jeecg.common.constant.CommonConstant; import org.jeecg.common.constant.QuartzJobConstant; import org.jeecg.common.constant.enums.SyncTaskStatusEnum; import org.jeecg.common.exception.JeecgBootException; -import org.jeecg.common.util.DBUtil; import org.jeecg.modules.base.entity.*; import org.jeecg.modules.base.mapper.StasDataSourceMapper; import org.jeecg.modules.base.mapper.StasSyncStrategyMapper; @@ -19,17 +17,10 @@ import org.jeecg.syncNum.service.IStasSyncNumService; import org.jeecg.syncRecord.service.IStasSyncRecordService; import org.jeecg.taskConfig.service.IStasTaskConfigService; import org.jeecg.vo.*; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; - import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import org.springframework.transaction.annotation.Transactional; - -import java.sql.*; -import java.text.ParseException; -import java.text.SimpleDateFormat; import java.util.*; -import java.util.Date; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; @@ -134,6 +125,9 @@ public class StasTaskConfigServiceImpl extends ServiceImpl stasSyncStrategies = stasSyncStrategyMapper. - selectList(new LambdaQueryWrapper().eq(StasSyncStrategy::getTaskId, taskId)); - for (StasSyncStrategy stasSyncStrategy : stasSyncStrategies) { - System.out.println("开始同步表: " + stasSyncStrategy.getTableName() + " (依据字段: " + stasSyncStrategy.getColumnName() + ")"); - long startTime = System.currentTimeMillis(); - - try { - if (isDateColumn(sourceConn,stasSyncStrategy)) { - syncByDateRange(sourceConn, targetConn, stasSyncStrategy, stasTaskConfig.getSyncDay()); - } else { - syncByIdRange(sourceConn, targetConn, stasSyncStrategy, stasTaskConfig.getSyncCount()); - } - } catch (SQLException e) { - System.err.println("同步表 " + stasSyncStrategy.getTableName() + " 时出错: " + e.getMessage()); - targetConn.rollback(); - throw new JeecgBootException(e.getMessage()); - } catch (ParseException e) { - throw new JeecgBootException(e.getMessage()); - } - - long endTime = System.currentTimeMillis(); - System.out.println("表 " + stasSyncStrategy.getTableName() + " 同步耗时: " + (endTime - startTime) + " 毫秒"); - } - } - } - - public boolean isDateColumn(Connection sourceConn, StasSyncStrategy stasSyncStrategy) throws SQLException { - String sql = "SELECT data_type FROM all_tab_columns " - + "WHERE owner = ? AND table_name = ? AND column_name = ?"; - - try (PreparedStatement pstmt = sourceConn.prepareStatement(sql)) { - pstmt.setString(1, stasSyncStrategy.getSourceOwner().toUpperCase()); - pstmt.setString(2, stasSyncStrategy.getTableName().toUpperCase()); - pstmt.setString(3, stasSyncStrategy.getColumnName().toUpperCase()); - - ResultSet rs = pstmt.executeQuery(); - if (rs.next()) { - String dataType = rs.getString("data_type"); - // 判断是否为日期/时间类型 - return dataType != null && - (dataType.equalsIgnoreCase("DATE") || - dataType.equalsIgnoreCase("TIMESTAMP") || - dataType.equalsIgnoreCase("TIMESTAMP WITH TIME ZONE") || - dataType.equalsIgnoreCase("TIMESTAMP WITH LOCAL TIME ZONE")); - } - } catch (SQLException e) { - e.printStackTrace(); - throw e; // 重新抛出异常,让调用方处理 - } - - return false; - } - - /** - * 按日期范围同步数据 - * @param sourceConn 源数据库连接 - * @param targetConn 目标数据库连接 - * @param stasSyncStrategy 表配置 - * @param syncCount 同步起始位置 - * @throws SQLException 数据库异常 - */ - public void syncByDateRange(Connection sourceConn, Connection targetConn, - StasSyncStrategy stasSyncStrategy, Integer syncCount) throws SQLException, ParseException { - // 获取最小和最大日期 - DateRangeVO dateRange = getDateRange(sourceConn, stasSyncStrategy); - System.out.println("日期范围: " + dateRange.getMinDate() + " 至 " + dateRange.getMaxDate()); - - SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - - // 获取上次同步的位置 - String syncOrigin = stasSyncStrategy.getSyncOrigin(); - Date lastSyncedDate = StringUtils.isNotBlank(syncOrigin) ? sdf.parse(stasSyncStrategy.getSyncOrigin()) : dateRange.getMinDate(); - Date currentStart = (lastSyncedDate != null && lastSyncedDate.after(dateRange.getMinDate())) ? - lastSyncedDate : dateRange.getMinDate(); - - Date currentEnd = addDays(currentStart, syncCount); - if (currentEnd.after(dateRange.getMaxDate())) { - currentEnd = dateRange.getMaxDate(); - } - - String whereClause = stasSyncStrategy.getColumnName() + " BETWEEN TO_DATE('" + sdf.format(currentStart) + - "', 'YYYY-MM-DD HH24:MI:SS') AND TO_DATE('" + sdf.format(currentEnd) + "', 'YYYY-MM-DD HH24:MI:SS')"; - - System.out.println("同步日期范围: " + sdf.format(currentStart) + " 至 " + sdf.format(currentEnd)); - - int rowsSynced = syncDataBatch(sourceConn, targetConn, stasSyncStrategy.getSourceOwner(), - stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(), whereClause); - System.out.println("已同步 " + rowsSynced + " 行数据"); - - // 只在最后记录同步位置 - if (currentEnd != null) { - stasSyncStrategy.setSyncOrigin(sdf.format(currentEnd)); - stasSyncStrategyMapper.updateById(stasSyncStrategy); - System.out.println("最终同步位置已更新为: " + sdf.format(currentEnd)); - } - } - - /** - * 按ID范围同步数据 - * @param sourceConn 源数据库连接 - * @param targetConn 目标数据库连接 - * @param stasSyncStrategy 表配置 - * @param syncCount 每次同步的记录数 - * @throws SQLException 数据库异常 - */ - public void syncByIdRange(Connection sourceConn, Connection targetConn, - StasSyncStrategy stasSyncStrategy, Integer syncCount) throws SQLException { - // 获取最小和最大ID - IdRangeVO idRange = getIdRange(sourceConn, stasSyncStrategy); - System.out.println("ID范围: " + idRange.getMinId() + " 至 " + idRange.getMaxId()); - - // 获取上次同步的位置 - String syncOrigin = stasSyncStrategy.getSyncOrigin(); - long lastSyncedId = StringUtils.isNotBlank(syncOrigin) ? Long.parseLong(syncOrigin) : idRange.getMinId(); - long currentStart = (lastSyncedId > 0 && lastSyncedId > idRange.getMinId()) ? - lastSyncedId : idRange.getMinId(); - - long currentEnd = currentStart + syncCount; - if (currentEnd > idRange.getMaxId()) { - currentEnd = idRange.getMaxId(); - } - - String whereClause = stasSyncStrategy.getColumnName() + " BETWEEN " + currentStart + " AND " + currentEnd; - - System.out.println("同步ID范围: " + currentStart + " 至 " + currentEnd); - - int rowsSynced = syncDataBatch(sourceConn, targetConn, stasSyncStrategy.getSourceOwner(), - stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(), whereClause); - System.out.println("已同步 " + rowsSynced + " 行数据"); - - // 只在最后记录同步位置 - if (currentEnd > 0) { - stasSyncStrategy.setSyncOrigin(String.valueOf(currentEnd)); - stasSyncStrategyMapper.updateById(stasSyncStrategy); - System.out.println("最终同步位置已更新为: " + currentEnd); - } - } - - /** - * 获取表的日期范围 - * @param conn 数据库连接 - * @param stasSyncStrategy 表配置 - * @return 日期范围对象 - * @throws SQLException 数据库异常 - */ - public DateRangeVO getDateRange(Connection conn, StasSyncStrategy stasSyncStrategy) throws SQLException { - String sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_date, MAX(" + stasSyncStrategy.getColumnName() + ") as max_date FROM " + - stasSyncStrategy.getSourceOwner() + "." + stasSyncStrategy.getTableName(); - - try (Statement stmt = conn.createStatement(); - ResultSet rs = stmt.executeQuery(sql)) { - if (rs.next()) { - return new DateRangeVO(rs.getDate("min_date"), rs.getDate("max_date")); - } - } - throw new SQLException("无法获取日期范围"); - } - - /** - * 获取表的ID范围 - * @param conn 数据库连接 - * @param stasSyncStrategy 表配置 - * @return ID范围对象 - * @throws SQLException 数据库异常 - */ - public IdRangeVO getIdRange(Connection conn, StasSyncStrategy stasSyncStrategy) throws SQLException { - String sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_id, MAX(" + stasSyncStrategy.getColumnName() + ") as max_id FROM " + - stasSyncStrategy.getSourceOwner() + "." + stasSyncStrategy.getTableName(); - - try (Statement stmt = conn.createStatement(); - ResultSet rs = stmt.executeQuery(sql)) { - if (rs.next()) { - return new IdRangeVO(rs.getLong("min_id"), rs.getLong("max_id")); - } - } - throw new SQLException("无法获取ID范围"); - } - - /** - * 同步一批数据 - * @param sourceConn 源数据库连接 - * @param targetConn 目标数据库连接 - * @param tableName 表名 - * @param whereClause 条件子句 - * @return 同步的行数 - * @throws SQLException 数据库异常 - */ - public int syncDataBatch(Connection sourceConn, Connection targetConn, - String sourceOwner, String targetOwner, - String tableName, String whereClause) throws SQLException { - int totalRows = 0; - - // 从源表读取数据(使用带引号的表名保持大小写) - String selectSql = "SELECT * FROM \"" + sourceOwner.toUpperCase() + "\".\"" + tableName + - "\" WHERE " + whereClause; - - try (Statement sourceStmt = sourceConn.createStatement( - ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); - ResultSet rs = sourceStmt.executeQuery(selectSql)) { - - // 设置获取大小为5000,优化大表读取 - sourceStmt.setFetchSize(5000); - - // 获取列信息 - ResultSetMetaData metaData = rs.getMetaData(); - int columnCount = metaData.getColumnCount(); - - // 准备插入语句(使用带引号的表名保持大小写) - StringBuilder insertSql = new StringBuilder("INSERT INTO \"") - .append(targetOwner.toUpperCase()).append("\".\"").append(tableName).append("\" VALUES ("); - for (int i = 1; i <= columnCount; i++) { - if (i > 1) insertSql.append(", "); - insertSql.append("?"); - } - insertSql.append(")"); - - // 批量插入 - try (PreparedStatement pstmt = targetConn.prepareStatement(insertSql.toString())) { - int batchSize = 0; - - while (rs.next()) { - for (int i = 1; i <= columnCount; i++) { - pstmt.setObject(i, rs.getObject(i)); - } - pstmt.addBatch(); - batchSize++; - totalRows++; - - if (batchSize % 5000 == 0) { - pstmt.executeBatch(); - targetConn.commit(); - batchSize = 0; - } - } - if (batchSize > 0) { - pstmt.executeBatch(); - targetConn.commit(); - } - } - } - return totalRows; - } - - /** - * 计算指定日期加上指定天数后的日期 - * @param date 基准日期 - * @param days 要添加的天数 - * @return 计算后的日期 - */ - private static Date addDays(Date date, int days) { - long time = date.getTime() + (long)days * 24 * 60 * 60 * 1000; - return new Date(time); - } }