fix:1.完成重构数据同步后台功能

This commit is contained in:
panbaolin 2026-05-28 14:37:50 +08:00
parent db391d7519
commit 7d2846af18
15 changed files with 661 additions and 632 deletions

View File

@ -316,4 +316,6 @@ public interface CommonConstant {
* 输运模拟时序分析数据KEY
*/
String TRANSPORT_TIMING_ANALYSIS = "transport:timing_analysis:";
String DATA_SYNC_FREQUENCY = "data_sync:frequency";
}

View File

@ -0,0 +1,23 @@
package org.jeecg.common.properties;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* 数据同步配置
*/
@Data
@Component
@ConfigurationProperties(prefix = "data-sync")
public class DataSyncProperties {
/**
* 最小间隔分钟
*/
private Integer minMinute;
/**
* 同时最大执行数据表数量
*/
private Integer maxExecNum;
}

View File

@ -55,4 +55,8 @@ public class StasSyncStrategy implements Serializable {
private String columnName;
/**同步位置*/
private String syncOrigin;
/**
* 同步数量/天数优先级高于同步任务的数据
*/
private Integer syncCount;
}

View File

@ -1,6 +1,7 @@
package org.jeecg.dataSource.controller;
import cn.hutool.core.collection.CollUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
@ -114,7 +115,7 @@ public class StasDataSourceController extends JeecgController<StasDataSource, IS
@GetMapping(value = "/tableList")
public Result<?> tableList(String sourceId, String username) {
List<String> tableNameList= stasDataSourceService.queryTableList(sourceId, username);
if (tableNameList != null && !"".equals(tableNameList)){
if (CollUtil.isNotEmpty(tableNameList)){
return Result.OK(tableNameList);
}
return Result.error("获取源端用户为空");

View File

@ -137,7 +137,8 @@ public class QuartzJobServiceImpl extends ServiceImpl<QuartzJobMapper, QuartzJob
JobDetail jobDetail = JobBuilder.newJob(getClass(jobClassName).getClass()).withIdentity(id).usingJobData("parameter", parameter).build();
// 表达式调度构建器(即任务执行的时间)
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression);
//withMisfireHandlingInstructionDoNothing 设置错过就放弃等待下次触发
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression).withMisfireHandlingInstructionDoNothing();
// 按新的cronExpression表达式构建一个新的trigger
CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(id).withSchedule(scheduleBuilder).build();

View File

@ -41,7 +41,7 @@ public class StasSyncStrategyServiceImpl extends ServiceImpl<StasSyncStrategyMap
* @param stasSyncStrategys 同步策略信息
* @throws SQLException 数据库异常
*/
@Transactional
@Transactional(rollbackFor = Exception.class)
@Override
public void createTargetTables(List<StasSyncStrategy> stasSyncStrategys) {
if(null != stasSyncStrategys && stasSyncStrategys.size() > 0) {
@ -144,7 +144,7 @@ public class StasSyncStrategyServiceImpl extends ServiceImpl<StasSyncStrategyMap
if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceDbType) && SourceDataTypeEnum.POSTGRES.getKey().equals(targetDbType)) {
// Oracle转PostgreSQL的特殊处理
return generatePgCreateTableFromOracle(conn, sourceOwner, targetOwner, table);
return generatePgCreateTableFromOracle(conn, sourceOwner,sourceDBLink, targetOwner, table);
} else {
return generateOracleCreateTable(conn, sourceOwner,sourceDBLink, targetOwner, table);
}
@ -169,10 +169,8 @@ public class StasSyncStrategyServiceImpl extends ServiceImpl<StasSyncStrategyMap
try (PreparedStatement pstmt = conn.prepareStatement(columnSql)) {
pstmt.setString(1, sourceOwner.toUpperCase());
pstmt.setString(2, tableName.toUpperCase());
ResultSet rs = pstmt.executeQuery();
boolean first = true;
while (rs.next()) {
if (!first) {
ddl.append(",\n");
@ -189,13 +187,11 @@ public class StasSyncStrategyServiceImpl extends ServiceImpl<StasSyncStrategyMap
if ("N".equalsIgnoreCase(nullable)) {
isNullable = " NOT NULL";
}
// 列名也用双引号括起来
ddl.append(" \"").append(columnName).append("\" ").append(dataType);
// 处理数据类型长度/精度
if (dataPrecision > 0) {
// 数字类型NUMBER(p,s), NUMERIC(p,s)
ddl.append("(").append(dataPrecision);
if (dataScale > 0) {
ddl.append(",").append(dataScale);
@ -218,9 +214,10 @@ public class StasSyncStrategyServiceImpl extends ServiceImpl<StasSyncStrategyMap
// 这些类型不需要长度/精度信息
} else {
// 其他可能需要长度的类型
ddl.append("(").append(dataLength).append(")");
if(!dataType.equalsIgnoreCase("NUMBER") && !dataType.equalsIgnoreCase("NUMERIC")) {
ddl.append("(").append(dataLength).append(")");
}
}
ddl.append(isNullable);
first = false;
}
@ -244,7 +241,6 @@ public class StasSyncStrategyServiceImpl extends ServiceImpl<StasSyncStrategyMap
// 移除末尾的分号只保留右括号
ddl.append("\n)");
return ddl.toString();
} catch (SQLException e) {
throw new RuntimeException("生成表DDL失败: " + e.getMessage(), e);
@ -254,19 +250,18 @@ public class StasSyncStrategyServiceImpl extends ServiceImpl<StasSyncStrategyMap
/**
* 生成从Oracle到PostgreSQL的建表语句
*/
private String generatePgCreateTableFromOracle(Connection conn, String sourceOwner,
private String generatePgCreateTableFromOracle(Connection conn, String sourceOwner,String sourceDBLink,
String targetOwner, String tableName) throws SQLException {
StringBuilder sqlBuilder = new StringBuilder();
//模式名称
sqlBuilder.append("CREATE TABLE \"").append(targetOwner).append("\".\"").append(tableName).append("\" (\n");
// sqlBuilder.append("CREATE TABLE \"").append(tableName).append("\" (\n");
// 获取列信息
String columnSql = "SELECT column_name, data_type, data_length, data_precision, data_scale, nullable " +
"FROM all_tab_columns " +
"WHERE owner = ? AND table_name = ? " +
"ORDER BY column_id";
sqlBuilder.append("create table \"").append(targetOwner.toLowerCase()).append("\".\"").append(tableName.toLowerCase()).append("\" (\n");
// 获取oracle列信息
String columnSql = "SELECT column_name, data_type, data_length, data_precision, data_scale, nullable" +
" FROM all_tab_columns"+sourceDBLink+
" WHERE owner = ? AND table_name = ?" +
" ORDER BY column_id";
System.out.println(columnSql);
try (PreparedStatement pstmt = conn.prepareStatement(columnSql)) {
pstmt.setString(1, sourceOwner.toUpperCase());
pstmt.setString(2, tableName.toUpperCase());
@ -287,47 +282,14 @@ public class StasSyncStrategyServiceImpl extends ServiceImpl<StasSyncStrategyMap
int dataScale = rs.getInt("data_scale");
String nullable = rs.getString("nullable");
sqlBuilder.append(" \"").append(columnName).append("\" ");
sqlBuilder.append(mapOracleTypeToPg(dataType, dataLength, dataPrecision, dataScale));
sqlBuilder.append(" \"").append(columnName.toLowerCase()).append("\" ");
sqlBuilder.append(this.mapOracleTypeToPg(dataType, dataLength, dataPrecision, dataScale));
if ("N".equalsIgnoreCase(nullable)) {
sqlBuilder.append(" NOT NULL");
}
}
}
// 获取主键信息
String pkSql = "SELECT cols.column_name " +
"FROM all_constraints cons, all_cons_columns cols " +
"WHERE cons.owner = ? AND cons.table_name = ? " +
"AND cons.constraint_type = 'P' " +
"AND cons.constraint_name = cols.constraint_name " +
"AND cons.owner = cols.owner " +
"ORDER BY cols.position";
try (PreparedStatement pstmt = conn.prepareStatement(pkSql)) {
pstmt.setString(1, sourceOwner.toUpperCase());
pstmt.setString(2, tableName.toUpperCase());
ResultSet rs = pstmt.executeQuery();
List<String> pkColumns = new ArrayList<>();
while (rs.next()) {
pkColumns.add(rs.getString("column_name"));
}
if (!pkColumns.isEmpty()) {
sqlBuilder.append(",\n PRIMARY KEY (");
for (int i = 0; i < pkColumns.size(); i++) {
if (i > 0) {
sqlBuilder.append(", ");
}
sqlBuilder.append("\"").append(pkColumns.get(i)).append("\"");
}
sqlBuilder.append(")");
}
}
sqlBuilder.append("\n)");
return sqlBuilder.toString();
}
@ -336,41 +298,35 @@ public class StasSyncStrategyServiceImpl extends ServiceImpl<StasSyncStrategyMap
* Oracle数据类型到PostgreSQL的映射
*/
private String mapOracleTypeToPg(String oracleType, int length, int precision, int scale) {
switch (oracleType.toUpperCase()) {
switch (oracleType) {
case "VARCHAR2":
return "VARCHAR(" + (length > 0 ? length : 255) + ")";
return "varchar(" + (length > 0 ? length : 255) + ")";
case "NVARCHAR2":
return "VARCHAR(" + (length > 0 ? length : 255) + ")";
return "varchar(" + (length > 0 ? length : 255) + ")";
case "CHAR":
return "CHAR(" + (length > 0 ? length : 1) + ")";
return "char(" + (length > 0 ? length : 1) + ")";
case "NCHAR":
return "CHAR(" + (length > 0 ? length : 1) + ")";
return "char(" + (length > 0 ? length : 1) + ")";
case "NUMBER":
if (precision == 0 && scale == 0) {
return "NUMERIC"; // 未指定精度的NUMBER
} else if (scale == 0) {
return "NUMERIC(" + precision + ")"; // 整数
} else {
return "NUMERIC(" + precision + "," + scale + ")"; // 小数
}
return "numeric";
case "FLOAT":
return "DOUBLE PRECISION";
return "float4";
case "DATE":
return "TIMESTAMP";
case "TIMESTAMP":
return "TIMESTAMP";
return "timestamp";
case "CLOB":
return "TEXT";
return "text";
case "BLOB":
return "BYTEA";
return "bytea";
case "RAW":
return "BYTEA";
return "bytea";
case "LONG":
return "TEXT";
return "text";
case "LONG RAW":
return "BYTEA";
return "bytea";
default:
return "TEXT"; // 默认转换为TEXT
return "text"; // 默认转换为TEXT
}
}
@ -389,55 +345,20 @@ public class StasSyncStrategyServiceImpl extends ServiceImpl<StasSyncStrategyMap
dbLink = StrUtil.isNotBlank(dbLink)?dbLink:"";
sql = "SELECT COUNT(*) FROM all_tables"+dbLink+" WHERE owner = ? AND (table_name = ? OR table_name = ?)";
} else if (SourceDataTypeEnum.POSTGRES.getKey().equals(dbType)) {
sql = "SELECT COUNT(*) FROM information_schema.tables WHERE table_name = ?";
sql = "select count(*) from information_schema.tables where table_schema = ? and table_name = ?";
} else {
throw new SQLException("不支持的数据库类型: " + dbType);
}
try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) {
pstmt.setString(1, schema.toUpperCase());
pstmt.setString(2, tableName.toUpperCase());
pstmt.setString(3, tableName); // Oracle需要检查大小写两种形式
}else if (SourceDataTypeEnum.POSTGRES.getKey().equals(dbType)) {
pstmt.setString(1, tableName.toUpperCase());
}else {
pstmt.setString(1, schema.toLowerCase());
pstmt.setString(2, tableName.toLowerCase());
}
try (ResultSet rs = pstmt.executeQuery()) {
return rs.next() && rs.getInt(1) > 0;
}
}
}
/**
* 检查列是否存在
* @param conn 数据库连接
* @param schema 模式名
* @param tableName 表名
* @param columnName 列名
* @param dbType 数据库类型
* @return 列是否存在
* @throws SQLException 数据库异常
*/
private boolean isColumnExists(Connection conn, String schema,String dbLink, String tableName,
String columnName, Integer dbType) throws SQLException {
String sql;
if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) {
dbLink = StrUtil.isNotBlank(dbLink)?dbLink:"";
sql = "SELECT COUNT(*) FROM all_tab_columns"+dbLink+" WHERE owner = ? AND table_name = ? AND column_name = ?";
} else if (SourceDataTypeEnum.POSTGRES.getKey().equals(dbType)) {
sql = "SELECT COUNT(*) FROM information_schema.columns WHERE table_schema = ? AND table_name = ? AND column_name = ?";
} else {
throw new SQLException("不支持的数据库类型: " + dbType);
}
try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
pstmt.setString(1, schema.toUpperCase());
pstmt.setString(2, tableName.toUpperCase());
pstmt.setString(3, columnName.toUpperCase());
try (ResultSet rs = pstmt.executeQuery()) {
return rs.next() && rs.getInt(1) > 0;
}

View File

@ -36,7 +36,7 @@ public class StasSyncLogController extends JeecgController<StasSyncLog, IStasSyn
* 分页列表查询
*
* @param stasSyncLog
* @param pageNo
* @param pageNum
* @param pageSize
* @param req
* @return
@ -44,12 +44,12 @@ public class StasSyncLogController extends JeecgController<StasSyncLog, IStasSyn
@Operation(summary = "同步日志信息-分页列表查询")
@GetMapping(value = "/list")
public Result<IPage<StasSyncLog>> queryPageList(StasSyncLog stasSyncLog,
@RequestParam(name="pageNo", defaultValue="1") Integer pageNo,
@RequestParam(name="pageNum", defaultValue="1") Integer pageNum,
@RequestParam(name="pageSize", defaultValue="10") Integer pageSize,
HttpServletRequest req) {
QueryWrapper<StasSyncLog> queryWrapper = QueryGenerator.initQueryWrapper(stasSyncLog, req.getParameterMap());
queryWrapper.orderByAsc("start_time");
Page<StasSyncLog> page = new Page<StasSyncLog>(pageNo, pageSize);
Page<StasSyncLog> page = new Page<StasSyncLog>(pageNum, pageSize);
IPage<StasSyncLog> pageList = stasSyncLogService.page(page, queryWrapper);
return Result.OK(pageList);
}

View File

@ -1,21 +1,15 @@
package org.jeecg.taskConfig.controller;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import jakarta.servlet.http.HttpServletRequest;
import org.apache.commons.lang.StringUtils;
import org.jeecg.common.api.vo.Result;
import org.jeecg.common.constant.enums.SyncTaskStatusEnum;
import org.jeecg.common.system.query.QueryGenerator;
import org.jeecg.dataSource.service.IStasDataSourceService;
import org.jeecg.modules.base.entity.StasDataSource;
import org.jeecg.modules.base.entity.StasSyncStrategy;
import org.jeecg.modules.base.entity.StasTaskConfig;
import org.jeecg.stasSyncStrategy.service.IStasSyncStrategyService;
import org.jeecg.taskConfig.service.IStasTaskConfigService;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
@ -178,19 +172,6 @@ public class StasTaskConfigController extends JeecgController<StasTaskConfig, IS
return Result.OK("执行成功!");
}
/**
* 根据字段类型同步数据
*
* @param taskId
* @return
*/
@AutoLog(value = "任务配置表-根据字段类型同步数据")
@Operation(summary="任务配置表-根据字段类型同步数据")
@GetMapping(value = "/syncDataByFieldType")
public void syncDataByFieldType(String taskId) throws SQLException {
stasTaskConfigService.syncDataByFieldType(taskId);
}
/**
* 通过id查询
*

View File

@ -0,0 +1,223 @@
package org.jeecg.taskConfig.job;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.constant.enums.SourceDataTypeEnum;
import org.jeecg.common.util.DBUtil;
import org.jeecg.modules.base.entity.*;
import org.jeecg.modules.base.mapper.*;
import java.sql.*;
import java.util.Date;
import java.util.Objects;
@Slf4j
public abstract class AbstractSyncDataJob implements Runnable{
protected StasTaskConfigMapper taskConfigMapper;
protected StasDataSourceMapper dataSourceMapper;
protected StasSyncStrategyMapper syncStrategyMapper;
protected StasSyncRecordMapper syncRecordMapper;
protected StasSyncLogMapper syncLogMapper;
protected StasSyncNumMapper syncNumMapper;
protected StasTaskConfig taskConfig;
protected StasSyncStrategy syncStrategy;
protected StasDataSource sourceInfo;
protected StasDataSource targetInfo;
protected String recordId;
protected String sourceDBLink;
protected String targetDBLink;
protected Connection sourceConn;
protected Connection targetConn;
protected void init(StasTaskConfigMapper taskConfigMapper,
StasDataSourceMapper dataSourceMapper,
StasSyncStrategyMapper syncStrategyMapper,
StasSyncRecordMapper syncRecordMapper,
StasSyncLogMapper syncLogMapper,
StasSyncNumMapper syncNumMapper,
StasTaskConfig taskConfig,
StasSyncStrategy syncStrategy,
StasDataSource sourceInfo,
StasDataSource targetInfo,
String recordId) {
this.taskConfigMapper = taskConfigMapper;
this.dataSourceMapper = dataSourceMapper;
this.syncStrategyMapper = syncStrategyMapper;
this.syncRecordMapper = syncRecordMapper;
this.syncLogMapper = syncLogMapper;
this.syncNumMapper = syncNumMapper;
this.taskConfig = taskConfig;
this.syncStrategy = syncStrategy;
this.sourceInfo = sourceInfo;
this.targetInfo = targetInfo;
this.recordId = recordId;
}
/**
* 处理数据库查询dblink
*/
protected void handleCollDBLink(){
this.sourceDBLink = StrUtil.isNotBlank(sourceInfo.getDbLink())?sourceInfo.getDbLink():"";
this.targetDBLink = StrUtil.isNotBlank(targetInfo.getDbLink())?targetInfo.getDbLink():"";
}
/**
* 处理数据库链接
* @throws SQLException
*/
protected void handleConn() throws SQLException {
//只有oracle到oracleoracle到pg数据库同步需求所以sourceUrl只需要处理oracle的url
String sourceUrl = DBUtil.getUrl(sourceInfo.getIpAddress(), sourceInfo.getPort(), sourceInfo.getServeId());
String targetUrl;
if(SourceDataTypeEnum.ORACLE.getKey().equals(targetInfo.getType())){
targetUrl = DBUtil.getUrl(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId());
}else{
targetUrl = DBUtil.getPgUrl(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId());
}
this.sourceConn = DBUtil.getConnection(sourceUrl, sourceInfo.getUsername(), sourceInfo.getPassword(),sourceInfo.getType());
this.targetConn = DBUtil.getConnection(targetUrl, targetInfo.getUsername(), targetInfo.getPassword(),targetInfo.getType());
// 设置目标连接为手动提交模式
if(Objects.nonNull(this.targetConn)){
this.targetConn.setAutoCommit(false);
}
}
/**
* 同步一批数据(支持Oracle和PG)
*/
protected int syncDataBatch(String whereClause) throws SQLException {
int totalRows = 0;
String selectSql;
// 构建查询SQL(根据源数据库类型)
if (SourceDataTypeEnum.ORACLE.getKey().equals(this.sourceInfo.getType())) {
selectSql = "SELECT * FROM \"" + this.syncStrategy.getSourceOwner().toUpperCase() + "\".\"" + this.syncStrategy.getTableName() +
"\"" + this.sourceDBLink + " WHERE " + whereClause;
} else {
selectSql = "select * from \"" + this.syncStrategy.getSourceOwner().toLowerCase() + "\".\"" + this.syncStrategy.getTableName() +
"\" where " + whereClause;
}
try (Statement sourceStmt = this.sourceConn.createStatement(
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
ResultSet rs = sourceStmt.executeQuery(selectSql)) {
sourceStmt.setFetchSize(5000); // 优化大表读取
ResultSetMetaData metaData = rs.getMetaData();
int columnCount = metaData.getColumnCount();
// 构建插入SQL(根据目标数据库类型)
String insertSql;
if (SourceDataTypeEnum.ORACLE.getKey().equals(this.targetInfo.getType())) {
insertSql = "INSERT INTO \"" + this.syncStrategy.getTargetOwner().toUpperCase() + "\".\"" + this.syncStrategy.getTableName() + "\"" + this.targetDBLink + " VALUES (";
} else {
insertSql = "insert into \"" + this.syncStrategy.getTargetOwner().toLowerCase() + "\".\"" + this.syncStrategy.getTableName().toLowerCase() + "\" values (";
}
for (int i = 1; i <= columnCount; i++) {
if (i > 1) insertSql += ", ";
insertSql += "?";
}
insertSql += ")";
// 批量插入
try (PreparedStatement pstmt = this.targetConn.prepareStatement(insertSql)) {
int batchSize = 0;
while (rs.next()) {
for (int i = 1; i <= columnCount; i++) {
Object value = rs.getObject(i);
// 特殊处理Oracle的TIMESTAMP类型到PostgreSQL
if (SourceDataTypeEnum.ORACLE.getKey().equals(this.sourceInfo.getType()) && SourceDataTypeEnum.POSTGRES.getKey().equals(this.targetInfo.getType())) {
String columnType = metaData.getColumnTypeName(i);
if ("TIMESTAMP".equalsIgnoreCase(columnType) || "DATE".equalsIgnoreCase(columnType)) {
Timestamp ts = rs.getTimestamp(i);
if (ts != null) {
pstmt.setTimestamp(i, ts);
continue;
}
}
}
pstmt.setObject(i, value);
}
pstmt.addBatch();
batchSize++;
totalRows++;
if (batchSize % 5000 == 0) {
pstmt.executeBatch();
this.targetConn.commit();
batchSize = 0;
}
}
if (batchSize > 0) {
pstmt.executeBatch();
this.targetConn.commit();
}
}
}
return totalRows;
}
/**
* 同步数据前先走一次删除操作防止上次回滚失败本次插入历史数据
* @param whereClause
* @throws SQLException
*/
protected void confirmDeletion(String whereClause) throws SQLException {
String delSql;
if (SourceDataTypeEnum.ORACLE.getKey().equals(this.sourceInfo.getType())) {
delSql = "DELETE FROM \"" + this.syncStrategy.getTargetOwner().toUpperCase() + "\".\"" + this.syncStrategy.getTableName().toUpperCase() +
"\"" + this.targetDBLink + " WHERE " + whereClause;
} else {
delSql = "delete from \"" + this.syncStrategy.getTargetOwner().toLowerCase() + "\".\"" + this.syncStrategy.getTableName().toLowerCase() +
"\" where " + whereClause;
}
try (PreparedStatement pstmt = this.targetConn.prepareStatement(delSql)) {
pstmt.execute();
this.targetConn.commit();
}
}
/**
* 保存同步日志
* @param desc
*/
protected void saveSyncLog(String desc){
StasSyncLog stasSyncLog = new StasSyncLog();
stasSyncLog.setRecordId(this.recordId);
stasSyncLog.setStartTime(new Date());
stasSyncLog.setDescription(desc.length()>300?desc.substring(0,300):desc);
stasSyncLog.setTaskId(this.taskConfig.getId());
syncLogMapper.insert(stasSyncLog);
}
/**
* 保存表批次同步数量
* @param num
*/
protected void saveSyncNum(Integer num){
StasSyncNum stasSyncNum = new StasSyncNum();
stasSyncNum.setRecordId(this.recordId);
stasSyncNum.setTableName(this.syncStrategy.getTableName());
stasSyncNum.setSyncNum(num);
stasSyncNum.setTaskId(this.taskConfig.getId());
this.syncNumMapper.insert(stasSyncNum);
}
/**
* 处理同步过程报错问题
*
* @param e the exception
*/
public void uncaughtException(Throwable e) {
String tableName = this.syncStrategy.getSourceOwner()+"."+this.syncStrategy.getTableName();
String errLog = String.format("%s表数据同步出现错误原因为%s",tableName, e.getMessage());
log.error(errLog);
saveSyncLog(errLog);
try {
targetConn.rollback();
} catch (SQLException ex) {
log.error("数据回滚出现错误,原因为:{}",ex.getMessage());
}
}
}

View File

@ -0,0 +1,20 @@
package org.jeecg.taskConfig.job;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import java.util.concurrent.ThreadFactory;
public class CustomThreadFactory extends CustomizableThreadFactory implements ThreadFactory {
private final String threadNamePrefix;
public CustomThreadFactory(String threadNamePrefix) {
this.threadNamePrefix = threadNamePrefix;
}
@Override
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable);
thread.setName(this.threadNamePrefix +"_"+ thread.getName());
return thread;
}
}

View File

@ -0,0 +1,103 @@
package org.jeecg.taskConfig.job;
import cn.hutool.core.date.StopWatch;
import org.apache.commons.lang3.StringUtils;
import org.jeecg.vo.DateRangeVO;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;
import java.util.Objects;
public class SyncDataByDateColumnJob extends AbstractSyncDataJob {
@Override
public void run() {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
String tableName = super.syncStrategy.getSourceOwner()+"."+super.syncStrategy.getTableName();
String columnName = super.syncStrategy.getColumnName();
try{
super.handleCollDBLink();
super.handleConn();
super.saveSyncLog(String.format("开始同步表: %s (依据字段: %s)",tableName,columnName));
//开始同步过程
this.syncByDateColumnData();
} catch (Exception e) {
super.uncaughtException(e);
} finally {
stopWatch.stop();
super.saveSyncLog(String.format("表 %s 同步耗时: %s 秒",tableName,stopWatch.getTotalTimeSeconds()));
try {
if (null != super.sourceConn) {
super.sourceConn.close();
}
if (null != super.targetConn) {
super.targetConn.close();
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
/**
* 按日期范围同步数据
*/
public void syncByDateColumnData() throws SQLException, ParseException {
String tableName = super.syncStrategy.getSourceOwner()+"."+super.syncStrategy.getTableName();
//获取数据表时间范围
DateRangeVO dateRange = getDateRange();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String syncOrigin = super.syncStrategy.getSyncOrigin();
//得到上次同步的最后时间
Date lastSyncedDate = StringUtils.isNotBlank(syncOrigin) ? sdf.parse(syncOrigin) : dateRange.getMinDate();
//得到当前应该开始的时间
Date currentStart = (lastSyncedDate != null && lastSyncedDate.after(dateRange.getMinDate())) ?
lastSyncedDate : dateRange.getMinDate();
//得到本次应同步到的结束时间
int syncCount = (Objects.nonNull(super.syncStrategy.getSyncCount()) && super.syncStrategy.getSyncCount()>0)?super.syncStrategy.getSyncCount():super.taskConfig.getSyncDay();
Date currentEnd = new Date(currentStart.getTime() + Duration.ofDays(syncCount).toMillis());
//得到拼接条件
String whereClause = super.syncStrategy.getColumnName() + " BETWEEN TO_DATE('" + sdf.format(currentStart) +
"', 'YYYY-MM-DD HH24:MI:SS') AND TO_DATE('" + sdf.format(currentEnd) + "', 'YYYY-MM-DD HH24:MI:SS')";
super.saveSyncLog(String.format("%s表同步日期范围: %s 至 %s (批次天数: %d)",
tableName, sdf.format(currentStart), sdf.format(currentEnd), super.taskConfig.getSyncDay()));
//确保如果上次同步失败时没有垃圾数据同步前先走一下删除逻辑
super.confirmDeletion(super.syncStrategy.getColumnName() + "> TO_DATE('"+ sdf.format(currentStart) +"', 'YYYY-MM-DD HH24:MI:SS')");
//获取并保存数据
int rowsSynced = syncDataBatch(whereClause);
//保存日志
super.saveSyncLog(String.format("%s表已同步 %s 行数据",tableName, rowsSynced));
//保存表批次同步数量
super.saveSyncNum(rowsSynced);
//保存同步日志并修改同步位置
super.syncStrategy.setSyncOrigin(sdf.format(currentEnd));
super.syncStrategyMapper.updateById(super.syncStrategy);
super.saveSyncLog(String.format("%s表最终同步位置已更新为: %s",tableName, sdf.format(currentEnd)));
}
/**
* 获取oracle表的日期范围
*/
public DateRangeVO getDateRange() throws SQLException {
String sql = "SELECT MIN(" + super.syncStrategy.getColumnName() + ") as min_date, " +
"MAX(" + super.syncStrategy.getColumnName() + ") as max_date " +
"FROM \"" + super.syncStrategy.getSourceOwner().toUpperCase() + "\".\"" + super.syncStrategy.getTableName() + "\"" + super.sourceDBLink;
try (Statement stmt = super.sourceConn.createStatement();
ResultSet rs = stmt.executeQuery(sql)) {
if (rs.next()) {
return new DateRangeVO(rs.getTimestamp("min_date"), rs.getTimestamp("max_date"));
}
}
throw new SQLException("无法获取日期范围");
}
}

View File

@ -0,0 +1,107 @@
package org.jeecg.taskConfig.job;
import cn.hutool.core.date.StopWatch;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.jeecg.vo.IdRangeVO;
import java.sql.*;
import java.util.Objects;
@Slf4j
public class SyncDataByIncrementColumnJob extends AbstractSyncDataJob {
/**
* 执行同步
*/
@Override
public void run() {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
String tableName = super.syncStrategy.getSourceOwner()+"."+super.syncStrategy.getTableName();
String columnName = super.syncStrategy.getColumnName();
try{
super.handleCollDBLink();
super.handleConn();
super.saveSyncLog(String.format("开始同步表: %s (依据字段: %s)",tableName,columnName));
//开始同步过程
this.syncByIncrementColumnData();
} catch (Exception e) {
super.uncaughtException(e);
} finally {
stopWatch.stop();
super.saveSyncLog(String.format("表 %s 同步耗时: %s 秒",tableName,stopWatch.getTotalTimeSeconds()));
try {
if (null != super.sourceConn) {
super.sourceConn.close();
}
if (null != super.targetConn) {
super.targetConn.close();
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
}
/**
* 按ID范围同步数据
*/
public void syncByIncrementColumnData() throws SQLException {
String tableName = super.syncStrategy.getSourceOwner()+"."+super.syncStrategy.getTableName();
// 获取最小和最大ID
IdRangeVO idRange = this.getIncrementColumnRange();
// 获取上次同步的位置
String syncOrigin = super.syncStrategy.getSyncOrigin();
long lastSyncedId = StringUtils.isNotBlank(syncOrigin) ? Long.parseLong(syncOrigin) : idRange.getMinId();
long currentStart = (lastSyncedId > 0 && lastSyncedId > idRange.getMinId()) ?
lastSyncedId + 1 : idRange.getMinId();
// 确保起始位置不超过结束位置
if (currentStart > idRange.getMaxId()) {
super.saveSyncLog(String.format("%s表已同步到最新无需继续同步",tableName));
return;
}
//得到当前同步的结束条件
//策略里配置的同步数量优先级高于任务中配置的同步数量
int syncCount = (Objects.nonNull(super.syncStrategy.getSyncCount()) && super.syncStrategy.getSyncCount()>0)?super.syncStrategy.getSyncCount():super.taskConfig.getSyncCount();
long currentEnd = currentStart + syncCount - 1;
//拼接条件
String whereClause = super.syncStrategy.getColumnName() + " BETWEEN " + currentStart + " AND " + currentEnd;
//保存日志
super.saveSyncLog(String.format("%s表同步ID范围: %s 至 %s",tableName, currentStart, currentEnd));
//确保如果上次同步失败时没有垃圾数据同步前先走一下删除逻辑
super.confirmDeletion(super.syncStrategy.getColumnName() + ">=" + currentStart);
//查询并保存数据
int rowsSynced = super.syncDataBatch(whereClause);
//保存日志
super.saveSyncLog(String.format("%s表已同步 %s 行数据",tableName, rowsSynced));
//保存表批次同步数量
super.saveSyncNum(rowsSynced);
// 更新同步位置
if (currentEnd > 0) {
super.syncStrategy.setSyncOrigin(String.valueOf(currentEnd));
super.syncStrategyMapper.updateById(syncStrategy);
super.saveSyncLog(String.format("%s表最终同步位置已更新为: %s",tableName, currentEnd));
}
}
/**
* 获取oracle表的ID范围
*/
private IdRangeVO getIncrementColumnRange() throws SQLException {
String sql = "SELECT MIN(" + super.syncStrategy.getColumnName() + ") as min_id, " +
"MAX(" + super.syncStrategy.getColumnName() + ") as max_id " +
"FROM \"" + super.syncStrategy.getSourceOwner().toUpperCase() + "\".\"" + super.syncStrategy.getTableName() + "\"" + super.sourceDBLink;
try (Statement stmt = super.sourceConn.createStatement();
ResultSet rs = stmt.executeQuery(sql)) {
if (rs.next()) {
return new IdRangeVO(rs.getLong("min_id"), rs.getLong("max_id"));
}
}
throw new SQLException("无法获取ID范围");
}
}

View File

@ -1,258 +1,186 @@
package org.jeecg.taskConfig.job;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.StopWatch;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import jakarta.annotation.Resource;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.jeecg.common.constant.enums.SourceDataTypeEnum;
import org.jeecg.common.exception.JeecgBootException;
import org.jeecg.common.properties.DataSyncProperties;
import org.jeecg.common.util.DBUtil;
import org.jeecg.common.util.RedisUtil;
import org.jeecg.modules.base.entity.*;
import org.jeecg.modules.base.mapper.*;
import org.jeecg.vo.IdRangeVO;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.*;
import java.sql.*;
import java.util.Date;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 数据同步任务(支持Oracle和PostgreSQL)
*/
@Slf4j
@DisallowConcurrentExecution
public class SyncDataJob implements Job {
@Resource
private StasTaskConfigMapper stasTaskConfigMapper;
private StasTaskConfigMapper taskConfigMapper;
@Resource
private StasDataSourceMapper stasDataSourceMapper;
private StasDataSourceMapper dataSourceMapper;
@Resource
private StasSyncStrategyMapper stasSyncStrategyMapper;
private StasSyncStrategyMapper syncStrategyMapper;
@Resource
private StasSyncRecordMapper stasSyncRecordMapper;
private StasSyncRecordMapper syncRecordMapper;
@Resource
private StasSyncLogMapper stasSyncLogMapper;
private StasSyncLogMapper syncLogMapper;
@Resource
private StasSyncNumMapper stasSyncNumMapper;
private StasSyncNumMapper syncNumMapper;
@Resource
private RedisUtil redisUtil;
@Resource
private DataSyncProperties dataSyncProperties;
private ThreadPoolExecutor threadPoolExecutor;
@Getter @Setter
private String parameter;
@Override
public void execute(JobExecutionContext jobExecutionContext) {
try {
syncDataByFieldType(parameter);
} catch (SQLException e) {
log.error("执行数据同步任务出现错误,原因为:{}",e.getMessage());
public void execute(JobExecutionContext context) {
Trigger trigger = context.getTrigger();
if(trigger instanceof CronTrigger){
Date nextFireTime = trigger.getNextFireTime();
Date currentFireTime = context.getFireTime();
if(nextFireTime != null && currentFireTime != null){
Long intervalMs = nextFireTime.getTime() - currentFireTime.getTime();
multiTaskSync(parameter,intervalMs,currentFireTime,nextFireTime);
}
}
}
/**
* 根据字段类型同步数据
*/
public void syncDataByFieldType(String taskId) throws SQLException {
StasTaskConfig stasTaskConfig = stasTaskConfigMapper.selectById(taskId);
StasDataSource sourceInfo = stasDataSourceMapper.selectById(stasTaskConfig.getSourceId());
StasDataSource targetInfo = stasDataSourceMapper.selectById(stasTaskConfig.getTargetId());
StasSyncRecord stasSyncRecord = new StasSyncRecord();
stasSyncRecord.setTaskId(taskId);
stasSyncRecord.setSourceId(stasTaskConfig.getSourceId());
stasSyncRecord.setTargetId(stasTaskConfig.getTargetId());
stasSyncRecord.setStartTime(new Date());
stasSyncRecordMapper.insert(stasSyncRecord);
String recordId = stasSyncRecord.getId();
public void multiTaskSync(String taskId,Long intervalMs,Date currentFireTime,Date nextFireTime){
StopWatch stopWatch = new StopWatch();
stopWatch.start();
StasTaskConfig taskConfig = taskConfigMapper.selectById(taskId);
StasDataSource sourceInfo = dataSourceMapper.selectById(taskConfig.getSourceId());
StasDataSource targetInfo = dataSourceMapper.selectById(taskConfig.getTargetId());
log.info("任务:\"{}\",在{}时间批次开始执行,下次任务预计时间为:{}。",taskConfig.getTaskName(),DateUtil.format(currentFireTime,"yyyy-MM-dd HH:mm:ss"),
DateUtil.format(nextFireTime,"yyyy-MM-dd HH:mm:ss"));
//保存同步记录
String recordId = this.saveSyncRecord(taskId,taskConfig.getSourceId(),taskConfig.getTargetId());
//查询所有需要同步的数据表
List<StasSyncStrategy> stasSyncStrategies = syncStrategyMapper
.selectList(new LambdaQueryWrapper<StasSyncStrategy>().eq(StasSyncStrategy::getTaskId, taskId));
//获取源数据库连接用于测试同步数据列
String sourceDBLink = StrUtil.isNotBlank(sourceInfo.getDbLink())?sourceInfo.getDbLink():"";
String sourceUrl = DBUtil.getUrl(sourceInfo.getIpAddress(), sourceInfo.getPort(), sourceInfo.getServeId());
String targetUrl;
if(SourceDataTypeEnum.ORACLE.getKey().equals(targetInfo.getType())){
targetUrl = DBUtil.getUrl(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId());
}else{
targetUrl = DBUtil.getPgUrl(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId());
}
try (Connection sourceConn = DBUtil.getConnection(sourceUrl, sourceInfo.getUsername(), sourceInfo.getPassword(),sourceInfo.getType());
Connection targetConn = DBUtil.getConnection(targetUrl, targetInfo.getUsername(), targetInfo.getPassword(),targetInfo.getType())) {
// 设置目标连接为批量提交模式
targetConn.setAutoCommit(false);
List<StasSyncStrategy> stasSyncStrategies = stasSyncStrategyMapper
.selectList(new LambdaQueryWrapper<StasSyncStrategy>().eq(StasSyncStrategy::getTaskId, taskId));
//处理DBlink
String sourceDBLink = StrUtil.isNotBlank(sourceInfo.getDbLink())?sourceInfo.getDbLink():"";
String targetDBLink = StrUtil.isNotBlank(targetInfo.getDbLink())?targetInfo.getDbLink():"";
for (StasSyncStrategy stasSyncStrategy : stasSyncStrategies) {
saveSyncLog(recordId,String.format("开始同步表: %s (依据字段: %s)", stasSyncStrategy.getTableName(), stasSyncStrategy.getColumnName()));
long startTime = System.currentTimeMillis();
try {
syncByIdRange(sourceConn, targetConn, stasSyncStrategy,
stasTaskConfig.getSyncCount(), sourceInfo.getType(), targetInfo.getType(), recordId,sourceDBLink,targetDBLink);
} catch (SQLException e) {
saveSyncLog(recordId,String.format("同步表 %s 时出错: %s", stasSyncStrategy.getTableName(), e.getMessage()));
targetConn.rollback();
throw new SQLException(e.getMessage());
try(Connection sourceConn = DBUtil.getConnection(sourceUrl, sourceInfo.getUsername(), sourceInfo.getPassword(), sourceInfo.getType())){
//循环启动线程开始同步
this.initThreadPool();
for (StasSyncStrategy syncStrategy : stasSyncStrategies) {
boolean dateColumn = this.isDateColumn(sourceConn, syncStrategy, sourceInfo.getType(), sourceDBLink);
AbstractSyncDataJob syncDataJob;
if (dateColumn) {
syncDataJob = new SyncDataByDateColumnJob();
}else {
syncDataJob = new SyncDataByIncrementColumnJob();
}
long endTime = System.currentTimeMillis();
saveSyncLog(recordId,String.format("表 %s 同步耗时: %s 毫秒", stasSyncStrategy.getTableName(), (endTime - startTime)));
syncDataJob.init(taskConfigMapper,dataSourceMapper,syncStrategyMapper,syncRecordMapper,
syncLogMapper,syncNumMapper,taskConfig,syncStrategy,sourceInfo,targetInfo,recordId);
//下面开始线程池执行然后总体等待时间为设置的时间间隔时长超出即所有都停止
threadPoolExecutor.execute(syncDataJob);
}
threadPoolExecutor.shutdown();
boolean awaitResult = threadPoolExecutor.awaitTermination(intervalMs, TimeUnit.MILLISECONDS);
if(!awaitResult){
this.closeThreadPool(recordId,taskConfig.getId());
}
//修改同步记录
this.updateSyncRecordById(recordId);
} catch (SQLException | InterruptedException e) {
String errLog = "执行数据同步任务出现错误,原因为:"+e.getMessage();
log.error(errLog);
this.saveSyncLog(recordId,errLog,taskConfig.getId());
}finally {
stopWatch.stop();
log.info("任务:\"{}\",在{}时间批次执行完毕,耗时:{}秒。", taskConfig.getTaskName(),DateUtil.format(currentFireTime,"yyyy-MM-dd HH:mm:ss"),stopWatch.getTotalTimeSeconds());
}
}
/**
* 优雅关闭线程池
* @throws InterruptedException
*/
private void closeThreadPool(String recordId,String taskId) throws InterruptedException {
//再次多等待2分钟如果还未全部关闭则强制停止
if (!threadPoolExecutor.awaitTermination(120, TimeUnit.SECONDS)) {
List<Runnable> droppedTasks = threadPoolExecutor.shutdownNow();
log.warn("超时!执行强制关闭,丢弃了 " + droppedTasks.size() + " 个未执行任务");
if (!threadPoolExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
String errLog = "线程池无法正常终止,可能存在死锁或线程未响应中断";
log.error(errLog);
this.saveSyncLog(recordId,errLog,taskId);
}
}
stasSyncRecord.setEndTime(new Date());
stasSyncRecordMapper.updateById(stasSyncRecord);
}
/**
* 按ID范围同步数据
* 判断字段是否为日期类型
*/
public void syncByIdRange(Connection sourceConn, Connection targetConn,
StasSyncStrategy stasSyncStrategy, Integer syncCount,
Integer sourceDbType, Integer targetDbType, String recordId,
String sourceDbLink, String targetDbLink) throws SQLException {
// 获取最小和最大ID
IdRangeVO idRange = getIdRange(sourceConn, stasSyncStrategy, sourceDbType,sourceDbLink);
// 获取上次同步的位置
String syncOrigin = stasSyncStrategy.getSyncOrigin();
long lastSyncedId = StringUtils.isNotBlank(syncOrigin) ? Long.parseLong(syncOrigin) : idRange.getMinId();
long currentStart = (lastSyncedId > 0 && lastSyncedId > idRange.getMinId()) ?
lastSyncedId + 1 : idRange.getMinId();
// 确保起始位置不超过结束位置
if (currentStart > idRange.getMaxId()) {
saveSyncLog(recordId, String.format("%s表已同步到最新无需继续同步", stasSyncStrategy.getTableName()));
return;
}
long currentEnd = currentStart + syncCount - 1;
String whereClause = stasSyncStrategy.getColumnName() + " BETWEEN " + currentStart + " AND " + currentEnd;
saveSyncLog(recordId, String.format("%s表同步ID范围: %s 至 %s", stasSyncStrategy.getTableName(), currentStart, currentEnd));
int rowsSynced = syncDataBatch(sourceConn, targetConn, stasSyncStrategy.getSourceOwner(),
stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(),
whereClause, sourceDbType, targetDbType,sourceDbLink,targetDbLink);
saveSyncLog(recordId, String.format("%s表已同步 %s 行数据", stasSyncStrategy.getTableName(), rowsSynced));
saveSyncNum(recordId, stasSyncStrategy.getTableName(), rowsSynced);
// 更新同步位置
if (currentEnd > 0) {
stasSyncStrategy.setSyncOrigin(String.valueOf(currentEnd));
stasSyncStrategyMapper.updateById(stasSyncStrategy);
saveSyncLog(recordId, String.format("%s表最终同步位置已更新为: %s", stasSyncStrategy.getTableName(), currentEnd));
}
}
/**
* 获取表的ID范围
*/
public IdRangeVO getIdRange(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType,String sourceDbLink) throws SQLException {
private boolean isDateColumn(Connection conn, StasSyncStrategy syncStrategy, Integer dbType, String dbLink) throws SQLException {
String sql;
if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) {
sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_id, " +
"MAX(" + stasSyncStrategy.getColumnName() + ") as max_id " +
"FROM \"" + stasSyncStrategy.getSourceOwner().toUpperCase() + "\".\"" + stasSyncStrategy.getTableName() + "\"" + sourceDbLink;
} else {
sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_id, " +
"MAX(" + stasSyncStrategy.getColumnName() + ") as max_id " +
"FROM \"" + stasSyncStrategy.getSourceOwner().toLowerCase() + "\".\"" + stasSyncStrategy.getTableName() + "\"";
sql = "SELECT data_type FROM all_tab_columns" + dbLink + " WHERE owner = ? AND table_name = ? AND column_name = ?";
}else {
throw new SQLException("不支持的数据库类型: " + dbType);
}
try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
pstmt.setString(1, syncStrategy.getSourceOwner().toUpperCase());
pstmt.setString(2, syncStrategy.getTableName().toUpperCase());
pstmt.setString(3, syncStrategy.getColumnName().toUpperCase());
try (Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(sql)) {
ResultSet rs = pstmt.executeQuery();
if (rs.next()) {
return new IdRangeVO(rs.getLong("min_id"), rs.getLong("max_id"));
String dataType = rs.getString("data_type").toUpperCase();
return dataType.contains("DATE") || dataType.contains("TIME");
}
}
throw new SQLException("无法获取ID范围");
return false;
}
/**
* 同步一批数据(支持跨数据库类型)
* 保存同步记录
* @param taskId 任务id
* @param sourceId 源数据库id
* @param targetId 目标数据库id
* @return
*/
public int syncDataBatch(Connection sourceConn, Connection targetConn,
String sourceOwner, String targetOwner,
String tableName, String whereClause,
Integer sourceDbType, Integer targetDbType,
String sourceDbLink, String targetDbLink) throws SQLException {
int totalRows = 0;
String selectSql;
private String saveSyncRecord(String taskId,String sourceId,String targetId){
StasSyncRecord stasSyncRecord = new StasSyncRecord();
stasSyncRecord.setTaskId(taskId);
stasSyncRecord.setSourceId(sourceId);
stasSyncRecord.setTargetId(targetId);
stasSyncRecord.setStartTime(new Date());
syncRecordMapper.insert(stasSyncRecord);
return stasSyncRecord.getId();
}
// 构建查询SQL(根据源数据库类型)
if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceDbType)) {
selectSql = "SELECT * FROM \"" + sourceOwner.toUpperCase() + "\".\"" + tableName +
"\"" + sourceDbLink + " WHERE " + whereClause;
} else {
selectSql = "SELECT * FROM \"" + sourceOwner.toLowerCase() + "\".\"" + tableName +
"\" WHERE " + whereClause;
}
try (Statement sourceStmt = sourceConn.createStatement(
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
ResultSet rs = sourceStmt.executeQuery(selectSql)) {
sourceStmt.setFetchSize(5000); // 优化大表读取
ResultSetMetaData metaData = rs.getMetaData();
int columnCount = metaData.getColumnCount();
// 构建插入SQL(根据目标数据库类型)
String insertSql;
if (SourceDataTypeEnum.ORACLE.getKey().equals(targetDbType)) {
insertSql = "INSERT INTO \"" + targetOwner.toUpperCase() + "\".\"" + tableName + "\"" + targetDbLink + " VALUES (";
} else {
insertSql = "INSERT INTO \"" + tableName + "\" VALUES (";
}
for (int i = 1; i <= columnCount; i++) {
if (i > 1) insertSql += ", ";
insertSql += "?";
}
insertSql += ")";
// 批量插入
try (PreparedStatement pstmt = targetConn.prepareStatement(insertSql)) {
int batchSize = 0;
while (rs.next()) {
for (int i = 1; i <= columnCount; i++) {
Object value = rs.getObject(i);
// 特殊处理Oracle的TIMESTAMP类型到PostgreSQL
if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceDbType) && SourceDataTypeEnum.POSTGRES.getKey().equals(targetDbType)) {
String columnType = metaData.getColumnTypeName(i);
if ("TIMESTAMP".equalsIgnoreCase(columnType) || "DATE".equalsIgnoreCase(columnType)) {
Timestamp ts = rs.getTimestamp(i);
if (ts != null) {
pstmt.setTimestamp(i, ts);
continue;
}
}
}
pstmt.setObject(i, value);
}
pstmt.addBatch();
batchSize++;
totalRows++;
if (batchSize % 5000 == 0) {
pstmt.executeBatch();
targetConn.commit();
batchSize = 0;
}
}
if (batchSize > 0) {
pstmt.executeBatch();
targetConn.commit();
}
}
}
return totalRows;
/**
* 修改同步记录
* @param recordId
*/
private void updateSyncRecordById(String recordId){
StasSyncRecord stasSyncRecord = syncRecordMapper.selectById(recordId);
stasSyncRecord.setEndTime(new Date());
syncRecordMapper.updateById(stasSyncRecord);
}
/**
@ -260,25 +188,25 @@ public class SyncDataJob implements Job {
* @param recordId
* @param desc
*/
private void saveSyncLog(String recordId, String desc){
protected void saveSyncLog(String recordId, String desc,String taskId){
StasSyncLog stasSyncLog = new StasSyncLog();
stasSyncLog.setRecordId(recordId);
stasSyncLog.setStartTime(new Date());
stasSyncLog.setDescription(desc);
stasSyncLogMapper.insert(stasSyncLog);
stasSyncLog.setTaskId(taskId);
syncLogMapper.insert(stasSyncLog);
}
/**
* 保存同步记录
* @param recordId
* @param tableName
* @param num
* 初始化线程池
*/
private void saveSyncNum(String recordId, String tableName, Integer num){
StasSyncNum stasSyncNum = new StasSyncNum();
stasSyncNum.setRecordId(recordId);
stasSyncNum.setTableName(tableName);
stasSyncNum.setSyncNum(num);
stasSyncNumMapper.insert(stasSyncNum);
protected void initThreadPool(){
//获取机器可用核心数
int maximumPoolSize = Runtime.getRuntime().availableProcessors();
int maxExecSize = Math.min(dataSyncProperties.getMaxExecNum(), maximumPoolSize);
String threadNamePrefix = "data_sync";
int queueCapacity = maxExecSize * 2;
CustomThreadFactory threadFactory = new CustomThreadFactory(threadNamePrefix);
threadPoolExecutor = new ThreadPoolExecutor(maxExecSize,maximumPoolSize,5, TimeUnit.SECONDS,new LinkedBlockingQueue<>(queueCapacity),threadFactory);
}
}

View File

@ -53,10 +53,4 @@ public interface IStasTaskConfigService extends IService<StasTaskConfig> {
*/
void executeQuartzJob(String quartzId);
/**
* 根据字段类型同步数据
* @param taskId 任务id
* @throws SQLException 数据库异常
*/
void syncDataByFieldType(String taskId) throws SQLException;
}

View File

@ -2,12 +2,10 @@ package org.jeecg.taskConfig.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils;
import org.jeecg.common.constant.CommonConstant;
import org.jeecg.common.constant.QuartzJobConstant;
import org.jeecg.common.constant.enums.SyncTaskStatusEnum;
import org.jeecg.common.exception.JeecgBootException;
import org.jeecg.common.util.DBUtil;
import org.jeecg.modules.base.entity.*;
import org.jeecg.modules.base.mapper.StasDataSourceMapper;
import org.jeecg.modules.base.mapper.StasSyncStrategyMapper;
@ -19,17 +17,10 @@ import org.jeecg.syncNum.service.IStasSyncNumService;
import org.jeecg.syncRecord.service.IStasSyncRecordService;
import org.jeecg.taskConfig.service.IStasTaskConfigService;
import org.jeecg.vo.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.springframework.transaction.annotation.Transactional;
import java.sql.*;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
@ -134,6 +125,9 @@ public class StasTaskConfigServiceImpl extends ServiceImpl<StasTaskConfigMapper,
@Transactional(rollbackFor = Exception.class)
@Override
public void edit(StasTaskConfig stasTaskConfig) {
if(SyncTaskStatusEnum.IN_OPERATION.getKey().equals(stasTaskConfig.getTaskStatus())){
throw new RuntimeException("当前任务正在进行中,请停止后再次进行修改");
}
this.updateById(stasTaskConfig);
//编辑定时任务
StasTaskConfig taskConfig = stasTaskConfigMapper.selectById(stasTaskConfig.getId());
@ -201,277 +195,4 @@ public class StasTaskConfigServiceImpl extends ServiceImpl<StasTaskConfigMapper,
throw new JeecgBootException(e.getMessage());
}
}
/**
* 根据字段类型同步数据
* @param taskId 任务id
* @throws SQLException 数据库异常
*/
@Override
public void syncDataByFieldType(String taskId) throws SQLException {
StasTaskConfig stasTaskConfig = stasTaskConfigMapper.selectById(taskId);
StasDataSource sourceInfo = stasDataSourceMapper.selectById(stasTaskConfig.getSourceId());
StasDataSource targetInfo = stasDataSourceMapper.selectById(stasTaskConfig.getTargetId());
String sourceUrl = DBUtil.getUrl(sourceInfo.getIpAddress(), sourceInfo.getPort(), sourceInfo.getServeId());
String targetUrl = DBUtil.getUrl(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId());
try (Connection sourceConn = DriverManager.getConnection(sourceUrl, sourceInfo.getUsername(), sourceInfo.getPassword());
Connection targetConn = DriverManager.getConnection(targetUrl, targetInfo.getUsername(), targetInfo.getPassword())) {
// 设置目标连接为批量提交模式
targetConn.setAutoCommit(false);
List<StasSyncStrategy> stasSyncStrategies = stasSyncStrategyMapper.
selectList(new LambdaQueryWrapper<StasSyncStrategy>().eq(StasSyncStrategy::getTaskId, taskId));
for (StasSyncStrategy stasSyncStrategy : stasSyncStrategies) {
System.out.println("开始同步表: " + stasSyncStrategy.getTableName() + " (依据字段: " + stasSyncStrategy.getColumnName() + ")");
long startTime = System.currentTimeMillis();
try {
if (isDateColumn(sourceConn,stasSyncStrategy)) {
syncByDateRange(sourceConn, targetConn, stasSyncStrategy, stasTaskConfig.getSyncDay());
} else {
syncByIdRange(sourceConn, targetConn, stasSyncStrategy, stasTaskConfig.getSyncCount());
}
} catch (SQLException e) {
System.err.println("同步表 " + stasSyncStrategy.getTableName() + " 时出错: " + e.getMessage());
targetConn.rollback();
throw new JeecgBootException(e.getMessage());
} catch (ParseException e) {
throw new JeecgBootException(e.getMessage());
}
long endTime = System.currentTimeMillis();
System.out.println("" + stasSyncStrategy.getTableName() + " 同步耗时: " + (endTime - startTime) + " 毫秒");
}
}
}
public boolean isDateColumn(Connection sourceConn, StasSyncStrategy stasSyncStrategy) throws SQLException {
String sql = "SELECT data_type FROM all_tab_columns "
+ "WHERE owner = ? AND table_name = ? AND column_name = ?";
try (PreparedStatement pstmt = sourceConn.prepareStatement(sql)) {
pstmt.setString(1, stasSyncStrategy.getSourceOwner().toUpperCase());
pstmt.setString(2, stasSyncStrategy.getTableName().toUpperCase());
pstmt.setString(3, stasSyncStrategy.getColumnName().toUpperCase());
ResultSet rs = pstmt.executeQuery();
if (rs.next()) {
String dataType = rs.getString("data_type");
// 判断是否为日期/时间类型
return dataType != null &&
(dataType.equalsIgnoreCase("DATE") ||
dataType.equalsIgnoreCase("TIMESTAMP") ||
dataType.equalsIgnoreCase("TIMESTAMP WITH TIME ZONE") ||
dataType.equalsIgnoreCase("TIMESTAMP WITH LOCAL TIME ZONE"));
}
} catch (SQLException e) {
e.printStackTrace();
throw e; // 重新抛出异常让调用方处理
}
return false;
}
/**
* 按日期范围同步数据
* @param sourceConn 源数据库连接
* @param targetConn 目标数据库连接
* @param stasSyncStrategy 表配置
* @param syncCount 同步起始位置
* @throws SQLException 数据库异常
*/
public void syncByDateRange(Connection sourceConn, Connection targetConn,
StasSyncStrategy stasSyncStrategy, Integer syncCount) throws SQLException, ParseException {
// 获取最小和最大日期
DateRangeVO dateRange = getDateRange(sourceConn, stasSyncStrategy);
System.out.println("日期范围: " + dateRange.getMinDate() + "" + dateRange.getMaxDate());
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// 获取上次同步的位置
String syncOrigin = stasSyncStrategy.getSyncOrigin();
Date lastSyncedDate = StringUtils.isNotBlank(syncOrigin) ? sdf.parse(stasSyncStrategy.getSyncOrigin()) : dateRange.getMinDate();
Date currentStart = (lastSyncedDate != null && lastSyncedDate.after(dateRange.getMinDate())) ?
lastSyncedDate : dateRange.getMinDate();
Date currentEnd = addDays(currentStart, syncCount);
if (currentEnd.after(dateRange.getMaxDate())) {
currentEnd = dateRange.getMaxDate();
}
String whereClause = stasSyncStrategy.getColumnName() + " BETWEEN TO_DATE('" + sdf.format(currentStart) +
"', 'YYYY-MM-DD HH24:MI:SS') AND TO_DATE('" + sdf.format(currentEnd) + "', 'YYYY-MM-DD HH24:MI:SS')";
System.out.println("同步日期范围: " + sdf.format(currentStart) + "" + sdf.format(currentEnd));
int rowsSynced = syncDataBatch(sourceConn, targetConn, stasSyncStrategy.getSourceOwner(),
stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(), whereClause);
System.out.println("已同步 " + rowsSynced + " 行数据");
// 只在最后记录同步位置
if (currentEnd != null) {
stasSyncStrategy.setSyncOrigin(sdf.format(currentEnd));
stasSyncStrategyMapper.updateById(stasSyncStrategy);
System.out.println("最终同步位置已更新为: " + sdf.format(currentEnd));
}
}
/**
* 按ID范围同步数据
* @param sourceConn 源数据库连接
* @param targetConn 目标数据库连接
* @param stasSyncStrategy 表配置
* @param syncCount 每次同步的记录数
* @throws SQLException 数据库异常
*/
public void syncByIdRange(Connection sourceConn, Connection targetConn,
StasSyncStrategy stasSyncStrategy, Integer syncCount) throws SQLException {
// 获取最小和最大ID
IdRangeVO idRange = getIdRange(sourceConn, stasSyncStrategy);
System.out.println("ID范围: " + idRange.getMinId() + "" + idRange.getMaxId());
// 获取上次同步的位置
String syncOrigin = stasSyncStrategy.getSyncOrigin();
long lastSyncedId = StringUtils.isNotBlank(syncOrigin) ? Long.parseLong(syncOrigin) : idRange.getMinId();
long currentStart = (lastSyncedId > 0 && lastSyncedId > idRange.getMinId()) ?
lastSyncedId : idRange.getMinId();
long currentEnd = currentStart + syncCount;
if (currentEnd > idRange.getMaxId()) {
currentEnd = idRange.getMaxId();
}
String whereClause = stasSyncStrategy.getColumnName() + " BETWEEN " + currentStart + " AND " + currentEnd;
System.out.println("同步ID范围: " + currentStart + "" + currentEnd);
int rowsSynced = syncDataBatch(sourceConn, targetConn, stasSyncStrategy.getSourceOwner(),
stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(), whereClause);
System.out.println("已同步 " + rowsSynced + " 行数据");
// 只在最后记录同步位置
if (currentEnd > 0) {
stasSyncStrategy.setSyncOrigin(String.valueOf(currentEnd));
stasSyncStrategyMapper.updateById(stasSyncStrategy);
System.out.println("最终同步位置已更新为: " + currentEnd);
}
}
/**
* 获取表的日期范围
* @param conn 数据库连接
* @param stasSyncStrategy 表配置
* @return 日期范围对象
* @throws SQLException 数据库异常
*/
public DateRangeVO getDateRange(Connection conn, StasSyncStrategy stasSyncStrategy) throws SQLException {
String sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_date, MAX(" + stasSyncStrategy.getColumnName() + ") as max_date FROM " +
stasSyncStrategy.getSourceOwner() + "." + stasSyncStrategy.getTableName();
try (Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(sql)) {
if (rs.next()) {
return new DateRangeVO(rs.getDate("min_date"), rs.getDate("max_date"));
}
}
throw new SQLException("无法获取日期范围");
}
/**
* 获取表的ID范围
* @param conn 数据库连接
* @param stasSyncStrategy 表配置
* @return ID范围对象
* @throws SQLException 数据库异常
*/
public IdRangeVO getIdRange(Connection conn, StasSyncStrategy stasSyncStrategy) throws SQLException {
String sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_id, MAX(" + stasSyncStrategy.getColumnName() + ") as max_id FROM " +
stasSyncStrategy.getSourceOwner() + "." + stasSyncStrategy.getTableName();
try (Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery(sql)) {
if (rs.next()) {
return new IdRangeVO(rs.getLong("min_id"), rs.getLong("max_id"));
}
}
throw new SQLException("无法获取ID范围");
}
/**
* 同步一批数据
* @param sourceConn 源数据库连接
* @param targetConn 目标数据库连接
* @param tableName 表名
* @param whereClause 条件子句
* @return 同步的行数
* @throws SQLException 数据库异常
*/
public int syncDataBatch(Connection sourceConn, Connection targetConn,
String sourceOwner, String targetOwner,
String tableName, String whereClause) throws SQLException {
int totalRows = 0;
// 从源表读取数据使用带引号的表名保持大小写
String selectSql = "SELECT * FROM \"" + sourceOwner.toUpperCase() + "\".\"" + tableName +
"\" WHERE " + whereClause;
try (Statement sourceStmt = sourceConn.createStatement(
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
ResultSet rs = sourceStmt.executeQuery(selectSql)) {
// 设置获取大小为5000优化大表读取
sourceStmt.setFetchSize(5000);
// 获取列信息
ResultSetMetaData metaData = rs.getMetaData();
int columnCount = metaData.getColumnCount();
// 准备插入语句使用带引号的表名保持大小写
StringBuilder insertSql = new StringBuilder("INSERT INTO \"")
.append(targetOwner.toUpperCase()).append("\".\"").append(tableName).append("\" VALUES (");
for (int i = 1; i <= columnCount; i++) {
if (i > 1) insertSql.append(", ");
insertSql.append("?");
}
insertSql.append(")");
// 批量插入
try (PreparedStatement pstmt = targetConn.prepareStatement(insertSql.toString())) {
int batchSize = 0;
while (rs.next()) {
for (int i = 1; i <= columnCount; i++) {
pstmt.setObject(i, rs.getObject(i));
}
pstmt.addBatch();
batchSize++;
totalRows++;
if (batchSize % 5000 == 0) {
pstmt.executeBatch();
targetConn.commit();
batchSize = 0;
}
}
if (batchSize > 0) {
pstmt.executeBatch();
targetConn.commit();
}
}
}
return totalRows;
}
/**
* 计算指定日期加上指定天数后的日期
* @param date 基准日期
* @param days 要添加的天数
* @return 计算后的日期
*/
private static Date addDays(Date date, int days) {
long time = date.getTime() + (long)days * 24 * 60 * 60 * 1000;
return new Date(time);
}
}