保存表事务提交

This commit is contained in:
hekaiyu 2025-10-23 10:20:20 +08:00
parent 83f027d1dd
commit e929c61f3b
3 changed files with 52 additions and 51 deletions

View File

@ -71,15 +71,7 @@ public class StasSyncStrategyController extends JeecgController<StasSyncStrategy
@Operation(summary="同步策略表-在目标库创建表结构") @Operation(summary="同步策略表-在目标库创建表结构")
@PostMapping(value = "/createTargetTables") @PostMapping(value = "/createTargetTables")
public Result<String> createTargetTables(@RequestBody List<StasSyncStrategy> stasSyncStrategys) { public Result<String> createTargetTables(@RequestBody List<StasSyncStrategy> stasSyncStrategys) {
if(null != stasSyncStrategys && stasSyncStrategys.size() > 0) { stasSyncStrategyService.createTargetTables(stasSyncStrategys);
String taskId = stasSyncStrategys.get(0).getTaskId();
stasSyncStrategyService.remove(new LambdaQueryWrapper<StasSyncStrategy>().eq(StasSyncStrategy::getTaskId, taskId));
for (StasSyncStrategy stasSyncStrategy : stasSyncStrategys) {
if(stasSyncStrategyService.validateTables(stasSyncStrategy)){
stasSyncStrategyService.createTargetTables(stasSyncStrategy);
}
}
}
return Result.OK("添加成功!"); return Result.OK("添加成功!");
} }

View File

@ -4,6 +4,7 @@ import org.jeecg.modules.base.entity.StasSyncStrategy;
import com.baomidou.mybatisplus.extension.service.IService; import com.baomidou.mybatisplus.extension.service.IService;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.List;
/** /**
* @Description: 同步策略表 * @Description: 同步策略表
@ -23,8 +24,8 @@ public interface IStasSyncStrategyService extends IService<StasSyncStrategy> {
/** /**
* 在目标库创建表结构 * 在目标库创建表结构
* @param stasSyncStrategy 同步策略信息 * @param stasSyncStrategys 同步策略信息
* @throws SQLException 数据库异常 * @throws SQLException 数据库异常
*/ */
void createTargetTables(StasSyncStrategy stasSyncStrategy); void createTargetTables(List<StasSyncStrategy> stasSyncStrategys);
} }

View File

@ -1,5 +1,6 @@
package org.jeecg.stasSyncStrategy.service.impl; package org.jeecg.stasSyncStrategy.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.jeecg.common.constant.enums.SourceDataTypeEnum; import org.jeecg.common.constant.enums.SourceDataTypeEnum;
import org.jeecg.common.exception.JeecgBootException; import org.jeecg.common.exception.JeecgBootException;
@ -32,6 +33,7 @@ public class StasSyncStrategyServiceImpl extends ServiceImpl<StasSyncStrategyMap
private final StasTaskConfigMapper stasTaskConfigMapper; private final StasTaskConfigMapper stasTaskConfigMapper;
private final StasDataSourceMapper stasDataSourceMapper; private final StasDataSourceMapper stasDataSourceMapper;
private final StasSyncStrategyMapper stasSyncStrategyService;
/** /**
@ -67,53 +69,59 @@ public class StasSyncStrategyServiceImpl extends ServiceImpl<StasSyncStrategyMap
/** /**
* 在目标库创建表结构 * 在目标库创建表结构
* @param stasSyncStrategy 同步策略信息 * @param stasSyncStrategys 同步策略信息
* @throws SQLException 数据库异常 * @throws SQLException 数据库异常
*/ */
@Transactional @Transactional
@Override @Override
public void createTargetTables(StasSyncStrategy stasSyncStrategy) { public void createTargetTables(List<StasSyncStrategy> stasSyncStrategys) {
StasTaskConfig stasTaskConfig = stasTaskConfigMapper.selectById(stasSyncStrategy.getTaskId()); if(null != stasSyncStrategys && stasSyncStrategys.size() > 0) {
StasDataSource sourceInfo = stasDataSourceMapper.selectById(stasTaskConfig.getSourceId()); String taskId = stasSyncStrategys.get(0).getTaskId();
StasDataSource targetInfo = stasDataSourceMapper.selectById(stasTaskConfig.getTargetId()); stasSyncStrategyService.delete(new LambdaQueryWrapper<StasSyncStrategy>().eq(StasSyncStrategy::getTaskId, taskId));
for (StasSyncStrategy stasSyncStrategy : stasSyncStrategys) {
StasTaskConfig stasTaskConfig = stasTaskConfigMapper.selectById(stasSyncStrategy.getTaskId());
StasDataSource sourceInfo = stasDataSourceMapper.selectById(stasTaskConfig.getSourceId());
StasDataSource targetInfo = stasDataSourceMapper.selectById(stasTaskConfig.getTargetId());
String sourceUrl = DBUtil.getUrl(sourceInfo.getIpAddress(), sourceInfo.getPort(), sourceInfo.getServeId()); String sourceUrl = DBUtil.getUrl(sourceInfo.getIpAddress(), sourceInfo.getPort(), sourceInfo.getServeId());
String targetUrl; String targetUrl;
if(SourceDataTypeEnum.ORACLE.getKey().equals(targetInfo.getType())){ if(SourceDataTypeEnum.ORACLE.getKey().equals(targetInfo.getType())){
targetUrl = DBUtil.getUrl(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId()); targetUrl = DBUtil.getUrl(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId());
}else{ }else{
targetUrl = DBUtil.getPgUrl(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId()); targetUrl = DBUtil.getPgUrl(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId());
} }
try (Connection sourceConn = DriverManager.getConnection(sourceUrl, sourceInfo.getUsername(), sourceInfo.getPassword()); try (Connection sourceConn = DriverManager.getConnection(sourceUrl, sourceInfo.getUsername(), sourceInfo.getPassword());
Connection targetConn = DriverManager.getConnection(targetUrl, targetInfo.getUsername(), targetInfo.getPassword())) { Connection targetConn = DriverManager.getConnection(targetUrl, targetInfo.getUsername(), targetInfo.getPassword())) {
try { try {
// 检查表是否已存在 // 检查表是否已存在
if (!isTableExists(targetConn, stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(), targetInfo.getType())) { if (!isTableExists(targetConn, stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(), targetInfo.getType())) {
// 获取源表结构 // 获取源表结构
String createSql = getCreateTableSql(sourceConn, stasSyncStrategy.getSourceOwner(), String createSql = getCreateTableSql(sourceConn, stasSyncStrategy.getSourceOwner(),
stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(), stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(),
sourceInfo.getType(), targetInfo.getType()); sourceInfo.getType(), targetInfo.getType());
// 在目标库创建表 // 在目标库创建表
try (Statement stmt = targetConn.createStatement()) { try (Statement stmt = targetConn.createStatement()) {
stmt.execute(createSql); stmt.execute(createSql);
this.baseMapper.insert(stasSyncStrategy); this.baseMapper.insert(stasSyncStrategy);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
throw new JeecgBootException(e.getMessage()); throw new JeecgBootException(e.getMessage());
} }
} else { } else {
this.baseMapper.insert(stasSyncStrategy); this.baseMapper.insert(stasSyncStrategy);
// throw new JeecgBootException(String.format("在用户%s中%s表已存在请删除已存在的表", // throw new JeecgBootException(String.format("在用户%s中%s表已存在请删除已存在的表",
// stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName())); // stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName()));
}
} catch (SQLException e) {
throw new JeecgBootException(String.format("处理表: %s时出错", stasSyncStrategy.getTableName()));
}
} catch (SQLException e) {
throw new JeecgBootException(e.getMessage());
} }
} catch (SQLException e) {
throw new JeecgBootException(String.format("处理表: %s时出错", stasSyncStrategy.getTableName()));
} }
} catch (SQLException e) {
throw new JeecgBootException(e.getMessage());
} }
} }