fix:1.梳理数据同步功能,去除无用的同步代码
2.修改任务配置及策略配置管理功能,添加事务,防止数据污染
This commit is contained in:
parent
204df4954e
commit
db391d7519
|
|
@ -5,9 +5,9 @@ public enum SourceDataTypeEnum {
|
|||
ORACLE(0, "ORACLE"),
|
||||
POSTGRES(1, "POSTGRES");
|
||||
|
||||
private Integer key;
|
||||
private final Integer key;
|
||||
|
||||
private String value;
|
||||
private final String value;
|
||||
|
||||
SourceDataTypeEnum(Integer key, String value) {
|
||||
this.key = key;
|
||||
|
|
|
|||
|
|
@ -1,5 +1,7 @@
|
|||
package org.jeecg.common.util;
|
||||
|
||||
import org.jeecg.common.constant.enums.SourceDataTypeEnum;
|
||||
|
||||
import java.sql.*;
|
||||
import java.util.Properties;
|
||||
|
||||
|
|
@ -18,6 +20,41 @@ public class DBUtil {
|
|||
return String.format("%s%s:%s%s", DBUtil.POSTGRES_URL_PREFIX, ip, port, serveId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取数据库连接(自定义超时)
|
||||
* @param url JDBC连接URL
|
||||
* @param username 用户名
|
||||
* @param password 密码
|
||||
* @param type 0=PostgreSQL, 1=Oracle
|
||||
*/
|
||||
public static Connection getConnection(String url, String username, String password,
|
||||
int type) throws SQLException{
|
||||
try {
|
||||
Properties props = new Properties();
|
||||
props.put("user", username);
|
||||
props.put("password", password);
|
||||
|
||||
if (SourceDataTypeEnum.POSTGRES.getKey().equals(type)) {
|
||||
// PostgreSQL
|
||||
Class.forName(POSTGRES_DRIVER);
|
||||
|
||||
props.put("loginTimeout", 30);
|
||||
props.put("socketTimeout", 300000);
|
||||
props.put("tcpKeepAlive", "true");
|
||||
} else {
|
||||
// Oracle
|
||||
Class.forName(ORACLE_DRIVER);
|
||||
|
||||
props.put("oracle.net.CONNECT_TIMEOUT",30000);
|
||||
props.put("oracle.jdbc.ReadTimeout",300000);
|
||||
}
|
||||
return DriverManager.getConnection(url, props);
|
||||
} catch(ClassNotFoundException e){
|
||||
e.printStackTrace() ;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public static Connection getConnection(String url, String username, String password, String Driver,int type) throws SQLException{
|
||||
try {
|
||||
Class.forName(Driver);
|
||||
|
|
|
|||
|
|
@ -33,8 +33,6 @@ public class QuartzJob implements Serializable {
|
|||
@JsonFormat(timezone = "GMT+8",pattern = "yyyy-MM-dd HH:mm:ss")
|
||||
@DateTimeFormat(pattern="yyyy-MM-dd HH:mm:ss")
|
||||
private java.util.Date createTime;
|
||||
/**删除状态*/
|
||||
private Integer delFlag;
|
||||
/**修改人*/
|
||||
private String updateBy;
|
||||
/**修改时间*/
|
||||
|
|
|
|||
|
|
@ -29,6 +29,8 @@ public class StasSyncLog implements Serializable {
|
|||
/**主键*/
|
||||
@TableId(type = IdType.ASSIGN_ID)
|
||||
private String id;
|
||||
/**任务id*/
|
||||
private String taskId;
|
||||
/**记录id*/
|
||||
private String recordId;
|
||||
/**开始时间*/
|
||||
|
|
|
|||
|
|
@ -25,6 +25,8 @@ public class StasSyncNum implements Serializable {
|
|||
/**主键*/
|
||||
@TableId(type = IdType.ASSIGN_ID)
|
||||
private String id;
|
||||
/**任务id*/
|
||||
private String taskId;
|
||||
/**记录id*/
|
||||
private String recordId;
|
||||
/**同步表名*/
|
||||
|
|
|
|||
|
|
@ -7,7 +7,6 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
|
|||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||
import jakarta.annotation.Resource;
|
||||
import jakarta.servlet.http.HttpServletRequest;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
|
|
@ -15,12 +14,9 @@ import org.jeecg.common.api.vo.Result;
|
|||
import org.jeecg.common.aspect.annotation.AutoLog;
|
||||
import org.jeecg.common.system.base.controller.JeecgController;
|
||||
import org.jeecg.common.system.query.QueryGenerator;
|
||||
import org.jeecg.common.util.oConvertUtils;
|
||||
import org.jeecg.modules.base.entity.StasDataSource;
|
||||
import org.jeecg.dataSource.service.IStasDataSourceService;
|
||||
import org.jeecg.modules.base.entity.StasSyncStrategy;
|
||||
import org.jeecg.modules.base.entity.StasTaskConfig;
|
||||
import org.jeecg.modules.base.service.BaseCommonService;
|
||||
import org.jeecg.taskConfig.service.IStasTaskConfigService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
|
|
@ -224,24 +220,6 @@ public class StasDataSourceController extends JeecgController<StasDataSource, IS
|
|||
return Result.ok("删除成功!");
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量删除
|
||||
* @param ids
|
||||
* @return
|
||||
*/
|
||||
//@RequiresRoles({"admin"})
|
||||
@RequestMapping(value = "/deleteBatch", method = RequestMethod.DELETE)
|
||||
public Result<StasDataSource> deleteBatch(@RequestParam(name="ids",required=true) String ids) {
|
||||
Result<StasDataSource> result = new Result<StasDataSource>();
|
||||
if(oConvertUtils.isEmpty(ids)) {
|
||||
result.error500("未选中数据源!");
|
||||
}else {
|
||||
stasDataSourceService.deleteBatchDataSource(ids.split(","));
|
||||
result.success("删除数据源成功!");
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* 通过id查询
|
||||
* @param id
|
||||
|
|
|
|||
|
|
@ -1,17 +1,19 @@
|
|||
package org.jeecg.dataSource.service.impl;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.jeecg.common.constant.CommonConstant;
|
||||
import org.jeecg.common.constant.enums.SourceDataTypeEnum;
|
||||
import org.jeecg.common.util.DBUtil;
|
||||
import org.jeecg.modules.base.entity.StasDataSource;
|
||||
import org.jeecg.modules.base.entity.StasTaskConfig;
|
||||
import org.jeecg.modules.base.mapper.StasDataSourceMapper;
|
||||
import org.jeecg.dataSource.service.IStasDataSourceService;
|
||||
import org.jeecg.taskConfig.service.IStasTaskConfigService;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.sql.*;
|
||||
import java.util.*;
|
||||
import java.util.Date;
|
||||
|
|
@ -25,8 +27,11 @@ import java.util.Date;
|
|||
* @since 2020-12-12
|
||||
*/
|
||||
@Service
|
||||
@RequiredArgsConstructor
|
||||
public class StasDataSourceServiceImpl extends ServiceImpl<StasDataSourceMapper, StasDataSource> implements IStasDataSourceService {
|
||||
|
||||
private final IStasTaskConfigService stasTaskConfigService;
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public boolean saveDataSource(StasDataSource dataSource) {
|
||||
|
|
@ -36,15 +41,24 @@ public class StasDataSourceServiceImpl extends ServiceImpl<StasDataSourceMapper,
|
|||
if(null != selectCount && selectCount > 0){
|
||||
throw new RuntimeException("数据库名称已存在!");
|
||||
}
|
||||
dataSource.setDelFlag(CommonConstant.DEL_FLAG_0);
|
||||
dataSource.setCreateTime(new Date());
|
||||
// dataSource.setCreateBy(setCreateBy)
|
||||
return this.save(dataSource);
|
||||
}
|
||||
|
||||
@Override
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public boolean deleteDataSource(String dataSourceId) {
|
||||
//此数据源被任务配置作为源数据库使用
|
||||
LambdaQueryWrapper<StasTaskConfig> useSrcQueryWrapper = new LambdaQueryWrapper<>();
|
||||
useSrcQueryWrapper.eq(StasTaskConfig::getSourceId, dataSourceId);
|
||||
List<StasTaskConfig> useSrcTasks = stasTaskConfigService.list(useSrcQueryWrapper);
|
||||
//此数据源被任务配置作为目标数据库使用
|
||||
LambdaQueryWrapper<StasTaskConfig> useTargetQueryWrapper = new LambdaQueryWrapper<>();
|
||||
useTargetQueryWrapper.eq(StasTaskConfig::getTargetId, dataSourceId);
|
||||
List<StasTaskConfig> useTargetTasks = stasTaskConfigService.list(useTargetQueryWrapper);
|
||||
if (CollUtil.isNotEmpty(useSrcTasks) || CollUtil.isNotEmpty(useTargetTasks)){
|
||||
throw new RuntimeException("此数据源已被任务配置数据所占用,请先清除任务配置数据");
|
||||
}
|
||||
this.removeById(dataSourceId);
|
||||
return true;
|
||||
}
|
||||
|
|
@ -99,8 +113,11 @@ public class StasDataSourceServiceImpl extends ServiceImpl<StasDataSourceMapper,
|
|||
@Override
|
||||
public List<String> queryTableList(String sourceId, String username) {
|
||||
StasDataSource stasDataSource = this.baseMapper.selectById(sourceId);
|
||||
if(SourceDataTypeEnum.ORACLE.getKey() == stasDataSource.getType()){
|
||||
String sql = "SELECT TABLE_NAME FROM ALL_TABLES WHERE OWNER = ?";
|
||||
if(Objects.equals(SourceDataTypeEnum.ORACLE.getKey(), stasDataSource.getType())){
|
||||
// 如果配置了DBLink,则在元数据视图后追加@dblink以查询远程数据库的所有表
|
||||
String dbLinkSuffix = StringUtils.isNotBlank(stasDataSource.getDbLink())
|
||||
? stasDataSource.getDbLink().trim() : "";
|
||||
String sql = "SELECT TABLE_NAME FROM ALL_TABLES" + dbLinkSuffix + " WHERE OWNER = ?";
|
||||
return queryDatabaseMetadata(stasDataSource, sql, "TABLE_NAME", username);
|
||||
} else {
|
||||
String sql = "SELECT tablename AS table_name FROM pg_tables WHERE schemaname = ?";
|
||||
|
|
@ -111,8 +128,10 @@ public class StasDataSourceServiceImpl extends ServiceImpl<StasDataSourceMapper,
|
|||
@Override
|
||||
public List<String> queryColumnList(String sourceId, String username, String tableName) {
|
||||
StasDataSource stasDataSource = this.baseMapper.selectById(sourceId);
|
||||
if(SourceDataTypeEnum.ORACLE.getKey() == stasDataSource.getType()){
|
||||
String sql = "SELECT COLUMN_NAME FROM ALL_TAB_COLUMNS WHERE OWNER = ? AND TABLE_NAME = ?";
|
||||
if(Objects.equals(SourceDataTypeEnum.ORACLE.getKey(), stasDataSource.getType())){
|
||||
String dbLinkSuffix = StringUtils.isNotBlank(stasDataSource.getDbLink())
|
||||
? stasDataSource.getDbLink().trim() : "";
|
||||
String sql = "SELECT COLUMN_NAME FROM ALL_TAB_COLUMNS" + dbLinkSuffix + " WHERE OWNER = ? AND TABLE_NAME = ?";
|
||||
return queryDatabaseMetadata(stasDataSource, sql, "COLUMN_NAME", username, tableName);
|
||||
} else {
|
||||
String sql = "SELECT column_name FROM information_schema.columns WHERE table_schema = ? AND table_name = ?";
|
||||
|
|
|
|||
|
|
@ -41,7 +41,6 @@ public class QuartzJobServiceImpl extends ServiceImpl<QuartzJobMapper, QuartzJob
|
|||
@Transactional(rollbackFor = JeecgBootException.class)
|
||||
public boolean saveAndScheduleJob(QuartzJob quartzJob) {
|
||||
// DB设置修改
|
||||
quartzJob.setDelFlag(CommonConstant.DEL_FLAG_0);
|
||||
boolean success = this.saveOrUpdate(quartzJob);
|
||||
if (success) {
|
||||
if (CommonConstant.STATUS_NORMAL.equals(quartzJob.getStatus())) {
|
||||
|
|
|
|||
|
|
@ -1,34 +1,25 @@
|
|||
package org.jeecg.stasSyncStrategy.controller;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import jakarta.servlet.http.HttpServletRequest;
|
||||
import jakarta.servlet.http.HttpServletResponse;
|
||||
import org.jeecg.common.api.vo.Result;
|
||||
import org.jeecg.common.system.query.QueryGenerator;
|
||||
import org.jeecg.modules.base.entity.StasSyncStrategy;
|
||||
import org.jeecg.stasSyncStrategy.service.IStasSyncStrategyService;
|
||||
|
||||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
||||
import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import org.jeecg.common.system.base.controller.JeecgController;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
import org.springframework.web.servlet.ModelAndView;
|
||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import org.jeecg.common.aspect.annotation.AutoLog;
|
||||
|
||||
/**
|
||||
* @Description: 同步策略表
|
||||
* @Author: jeecg-boot
|
||||
* @Date: 2025-09-11
|
||||
* @Version: V1.0
|
||||
*/
|
||||
@Tag(name="同步策略表")
|
||||
@RestController
|
||||
|
|
@ -48,7 +39,7 @@ public class StasSyncStrategyController extends JeecgController<StasSyncStrategy
|
|||
* @param req
|
||||
* @return
|
||||
*/
|
||||
//@AutoLog(value = "同步策略表-分页列表查询")
|
||||
@AutoLog(value = "同步策略表-分页列表查询")
|
||||
@Operation(summary="同步策略表-分页列表查询")
|
||||
@GetMapping(value = "/list")
|
||||
public Result<IPage<StasSyncStrategy>> queryPageList(StasSyncStrategy stasSyncStrategy,
|
||||
|
|
@ -85,7 +76,7 @@ public class StasSyncStrategyController extends JeecgController<StasSyncStrategy
|
|||
@Operation(summary="同步策略表-添加")
|
||||
@PostMapping(value = "/add")
|
||||
public Result<String> add(@RequestBody StasSyncStrategy stasSyncStrategy) {
|
||||
stasSyncStrategyService.save(stasSyncStrategy);
|
||||
stasSyncStrategyService.add(stasSyncStrategy);
|
||||
return Result.OK("添加成功!");
|
||||
}
|
||||
|
||||
|
|
@ -99,7 +90,7 @@ public class StasSyncStrategyController extends JeecgController<StasSyncStrategy
|
|||
@Operation(summary="同步策略表-编辑")
|
||||
@RequestMapping(value = "/edit", method = {RequestMethod.PUT,RequestMethod.POST})
|
||||
public Result<String> edit(@RequestBody StasSyncStrategy stasSyncStrategy) {
|
||||
stasSyncStrategyService.updateById(stasSyncStrategy);
|
||||
stasSyncStrategyService.edit(stasSyncStrategy);
|
||||
return Result.OK("编辑成功!");
|
||||
}
|
||||
|
||||
|
|
@ -113,31 +104,17 @@ public class StasSyncStrategyController extends JeecgController<StasSyncStrategy
|
|||
@Operation(summary="同步策略表-通过id删除")
|
||||
@DeleteMapping(value = "/delete")
|
||||
public Result<String> delete(@RequestParam(name="id",required=true) String id) {
|
||||
stasSyncStrategyService.removeById(id);
|
||||
stasSyncStrategyService.delete(id);
|
||||
return Result.OK("删除成功!");
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量删除
|
||||
*
|
||||
* @param ids
|
||||
* @return
|
||||
*/
|
||||
@AutoLog(value = "同步策略表-批量删除")
|
||||
@Operation(summary="同步策略表-批量删除")
|
||||
@DeleteMapping(value = "/deleteBatch")
|
||||
public Result<String> deleteBatch(@RequestParam(name="ids",required=true) String ids) {
|
||||
this.stasSyncStrategyService.removeByIds(Arrays.asList(ids.split(",")));
|
||||
return Result.OK("批量删除成功!");
|
||||
}
|
||||
|
||||
/**
|
||||
* 通过id查询
|
||||
*
|
||||
* @param id
|
||||
* @return
|
||||
*/
|
||||
//@AutoLog(value = "同步策略表-通过id查询")
|
||||
@AutoLog(value = "同步策略表-通过id查询")
|
||||
@Operation(summary="同步策略表-通过id查询")
|
||||
@GetMapping(value = "/queryById")
|
||||
public Result<StasSyncStrategy> queryById(@RequestParam(name="id",required=true) String id) {
|
||||
|
|
@ -147,28 +124,4 @@ public class StasSyncStrategyController extends JeecgController<StasSyncStrategy
|
|||
}
|
||||
return Result.OK(stasSyncStrategy);
|
||||
}
|
||||
|
||||
/**
|
||||
* 导出excel
|
||||
*
|
||||
* @param request
|
||||
* @param stasSyncStrategy
|
||||
*/
|
||||
@RequestMapping(value = "/exportXls")
|
||||
public ModelAndView exportXls(HttpServletRequest request, StasSyncStrategy stasSyncStrategy) {
|
||||
return super.exportXls(request, stasSyncStrategy, StasSyncStrategy.class, "同步策略表");
|
||||
}
|
||||
|
||||
/**
|
||||
* 通过excel导入数据
|
||||
*
|
||||
* @param request
|
||||
* @param response
|
||||
* @return
|
||||
*/
|
||||
@RequestMapping(value = "/importExcel", method = RequestMethod.POST)
|
||||
public Result<?> importExcel(HttpServletRequest request, HttpServletResponse response) {
|
||||
return super.importExcel(request, response, StasSyncStrategy.class);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,7 +2,6 @@ package org.jeecg.stasSyncStrategy.service;
|
|||
|
||||
import org.jeecg.modules.base.entity.StasSyncStrategy;
|
||||
import com.baomidou.mybatisplus.extension.service.IService;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.util.List;
|
||||
|
||||
|
|
@ -14,18 +13,28 @@ import java.util.List;
|
|||
*/
|
||||
public interface IStasSyncStrategyService extends IService<StasSyncStrategy> {
|
||||
|
||||
/**
|
||||
* 验证表是否存在且包含指定字段
|
||||
* @param stasSyncStrategy 同步策略信息
|
||||
* @return 是否有效
|
||||
* @throws SQLException 数据库异常
|
||||
*/
|
||||
boolean validateTables(StasSyncStrategy stasSyncStrategy);
|
||||
|
||||
/**
|
||||
* 在目标库创建表结构
|
||||
* @param stasSyncStrategys 同步策略信息
|
||||
* @throws SQLException 数据库异常
|
||||
*/
|
||||
void createTargetTables(List<StasSyncStrategy> stasSyncStrategys);
|
||||
|
||||
/**
|
||||
* 保存策略
|
||||
* @param stasSyncStrategy
|
||||
*/
|
||||
void add(StasSyncStrategy stasSyncStrategy);
|
||||
|
||||
/**
|
||||
* 修改策略
|
||||
* @param stasSyncStrategy
|
||||
*/
|
||||
void edit(StasSyncStrategy stasSyncStrategy);
|
||||
|
||||
/**
|
||||
* 删除策略
|
||||
* @param id
|
||||
*/
|
||||
void delete(String id);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,6 @@
|
|||
package org.jeecg.stasSyncStrategy.service.impl;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.jeecg.common.constant.enums.SourceDataTypeEnum;
|
||||
|
|
@ -35,38 +36,6 @@ public class StasSyncStrategyServiceImpl extends ServiceImpl<StasSyncStrategyMap
|
|||
private final StasDataSourceMapper stasDataSourceMapper;
|
||||
private final StasSyncStrategyMapper stasSyncStrategyService;
|
||||
|
||||
|
||||
/**
|
||||
* 验证表是否存在且包含指定字段
|
||||
* @param stasSyncStrategy 同步策略信息
|
||||
* @return 是否有效
|
||||
* @throws SQLException 数据库异常
|
||||
*/
|
||||
@Override
|
||||
public boolean validateTables(StasSyncStrategy stasSyncStrategy) {
|
||||
StasTaskConfig stasTaskConfig = stasTaskConfigMapper.selectById(stasSyncStrategy.getTaskId());
|
||||
StasDataSource sourceInfo = stasDataSourceMapper.selectById(stasTaskConfig.getSourceId());
|
||||
|
||||
String sourceUrl = DBUtil.getUrl(sourceInfo.getIpAddress(), sourceInfo.getPort(), sourceInfo.getServeId());
|
||||
try (Connection sourceConn = DriverManager.getConnection(sourceUrl, sourceInfo.getUsername(), sourceInfo.getPassword())) {
|
||||
String username = stasSyncStrategy.getSourceOwner();
|
||||
String tableName = stasSyncStrategy.getTableName();
|
||||
String columnName = stasSyncStrategy.getColumnName();
|
||||
|
||||
if (isTableExists(sourceConn, username, tableName, sourceInfo.getType())) {
|
||||
if (isColumnExists(sourceConn, username, tableName, columnName, sourceInfo.getType())) {
|
||||
return true;
|
||||
} else {
|
||||
throw new JeecgBootException(String.format("警告: 表%s中,依据字段%s不存在!", tableName, columnName));
|
||||
}
|
||||
} else {
|
||||
throw new JeecgBootException(String.format("警告: %s用户下,%s表不存在!", username, tableName));
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
throw new JeecgBootException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 在目标库创建表结构
|
||||
* @param stasSyncStrategys 同步策略信息
|
||||
|
|
@ -96,9 +65,9 @@ public class StasSyncStrategyServiceImpl extends ServiceImpl<StasSyncStrategyMap
|
|||
|
||||
try {
|
||||
// 检查表是否已存在
|
||||
if (!isTableExists(targetConn, stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(), targetInfo.getType())) {
|
||||
if (!isTableExists(targetConn, stasSyncStrategy.getTargetOwner(),targetInfo.getDbLink(), stasSyncStrategy.getTableName(), targetInfo.getType())) {
|
||||
// 获取源表结构
|
||||
String createSql = getCreateTableSql(sourceConn, stasSyncStrategy.getSourceOwner(),
|
||||
String createSql = getCreateTableSql(sourceConn, stasSyncStrategy.getSourceOwner(),sourceInfo.getDbLink(),
|
||||
stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(),
|
||||
sourceInfo.getType(), targetInfo.getType());
|
||||
|
||||
|
|
@ -107,15 +76,13 @@ public class StasSyncStrategyServiceImpl extends ServiceImpl<StasSyncStrategyMap
|
|||
stmt.execute(createSql);
|
||||
this.baseMapper.insert(stasSyncStrategy);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
throw new JeecgBootException(e.getMessage());
|
||||
}
|
||||
} else {
|
||||
this.baseMapper.insert(stasSyncStrategy);
|
||||
// throw new JeecgBootException(String.format("在用户%s中,%s表已存在,请删除已存在的表!",
|
||||
// stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName()));
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
e.printStackTrace();
|
||||
throw new JeecgBootException(String.format("处理表: %s时出错", stasSyncStrategy.getTableName()));
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
|
|
@ -125,6 +92,39 @@ public class StasSyncStrategyServiceImpl extends ServiceImpl<StasSyncStrategyMap
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 保存策略
|
||||
*
|
||||
* @param stasSyncStrategy
|
||||
*/
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
@Override
|
||||
public void add(StasSyncStrategy stasSyncStrategy) {
|
||||
this.baseMapper.insert(stasSyncStrategy);
|
||||
}
|
||||
|
||||
/**
|
||||
* 修改策略
|
||||
*
|
||||
* @param stasSyncStrategy
|
||||
*/
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
@Override
|
||||
public void edit(StasSyncStrategy stasSyncStrategy) {
|
||||
this.baseMapper.updateById(stasSyncStrategy);
|
||||
}
|
||||
|
||||
/**
|
||||
* 删除策略
|
||||
*
|
||||
* @param id
|
||||
*/
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
@Override
|
||||
public void delete(String id) {
|
||||
this.baseMapper.deleteById(id);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取创建表的SQL语句
|
||||
* @param conn 数据库连接
|
||||
|
|
@ -136,9 +136,9 @@ public class StasSyncStrategyServiceImpl extends ServiceImpl<StasSyncStrategyMap
|
|||
* @return 创建表的SQL语句
|
||||
* @throws SQLException 数据库异常
|
||||
*/
|
||||
private String getCreateTableSql(Connection conn, String sourceOwner, String targetOwner,
|
||||
private String getCreateTableSql(Connection conn, String sourceOwner,String sourceDBLink, String targetOwner,
|
||||
String table, Integer sourceDbType, Integer targetDbType) throws SQLException {
|
||||
if (!isTableExists(conn, sourceOwner, table, sourceDbType)) {
|
||||
if (!isTableExists(conn, sourceOwner,sourceDBLink, table, sourceDbType)) {
|
||||
throw new SQLException("表 " + table + " 在源数据库中不存在");
|
||||
}
|
||||
|
||||
|
|
@ -146,12 +146,12 @@ public class StasSyncStrategyServiceImpl extends ServiceImpl<StasSyncStrategyMap
|
|||
// Oracle转PostgreSQL的特殊处理
|
||||
return generatePgCreateTableFromOracle(conn, sourceOwner, targetOwner, table);
|
||||
} else {
|
||||
return generateOracleCreateTable(conn, sourceOwner, targetOwner, table);
|
||||
return generateOracleCreateTable(conn, sourceOwner,sourceDBLink, targetOwner, table);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public String generateOracleCreateTable(Connection conn, String sourceOwner, String targetOwner, String tableName) {
|
||||
public String generateOracleCreateTable(Connection conn, String sourceOwner,String sourceDBLink, String targetOwner, String tableName) {
|
||||
try {
|
||||
DatabaseMetaData metaData = conn.getMetaData();
|
||||
StringBuilder ddl = new StringBuilder();
|
||||
|
|
@ -159,11 +159,12 @@ public class StasSyncStrategyServiceImpl extends ServiceImpl<StasSyncStrategyMap
|
|||
// 使用双引号括起targetOwner和tableName
|
||||
ddl.append("CREATE TABLE \"").append(targetOwner).append("\".\"").append(tableName).append("\" (\n");
|
||||
|
||||
sourceDBLink = StrUtil.isNotBlank(sourceDBLink)?sourceDBLink:"";
|
||||
// 获取列信息
|
||||
String columnSql = "SELECT column_name, data_type, data_length, data_precision, data_scale, nullable " +
|
||||
"FROM all_tab_columns " +
|
||||
"WHERE owner = ? AND table_name = ? " +
|
||||
"ORDER BY column_id";
|
||||
String columnSql = "SELECT column_name, data_type, data_length, data_precision, data_scale, nullable" +
|
||||
" FROM all_tab_columns"+sourceDBLink+
|
||||
" WHERE owner = ? AND table_name = ?" +
|
||||
" ORDER BY column_id";
|
||||
|
||||
try (PreparedStatement pstmt = conn.prepareStatement(columnSql)) {
|
||||
pstmt.setString(1, sourceOwner.toUpperCase());
|
||||
|
|
@ -382,10 +383,11 @@ public class StasSyncStrategyServiceImpl extends ServiceImpl<StasSyncStrategyMap
|
|||
* @return 表是否存在
|
||||
* @throws SQLException 数据库异常
|
||||
*/
|
||||
private boolean isTableExists(Connection conn, String schema, String tableName, Integer dbType) throws SQLException {
|
||||
private boolean isTableExists(Connection conn, String schema,String dbLink, String tableName, Integer dbType) throws SQLException {
|
||||
String sql;
|
||||
if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) {
|
||||
sql = "SELECT COUNT(*) FROM all_tables WHERE owner = ? AND (table_name = ? OR table_name = ?)";
|
||||
dbLink = StrUtil.isNotBlank(dbLink)?dbLink:"";
|
||||
sql = "SELECT COUNT(*) FROM all_tables"+dbLink+" WHERE owner = ? AND (table_name = ? OR table_name = ?)";
|
||||
} else if (SourceDataTypeEnum.POSTGRES.getKey().equals(dbType)) {
|
||||
sql = "SELECT COUNT(*) FROM information_schema.tables WHERE table_name = ?";
|
||||
} else {
|
||||
|
|
@ -419,11 +421,12 @@ public class StasSyncStrategyServiceImpl extends ServiceImpl<StasSyncStrategyMap
|
|||
* @return 列是否存在
|
||||
* @throws SQLException 数据库异常
|
||||
*/
|
||||
private boolean isColumnExists(Connection conn, String schema, String tableName,
|
||||
private boolean isColumnExists(Connection conn, String schema,String dbLink, String tableName,
|
||||
String columnName, Integer dbType) throws SQLException {
|
||||
String sql;
|
||||
if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) {
|
||||
sql = "SELECT COUNT(*) FROM all_tab_columns WHERE owner = ? AND table_name = ? AND column_name = ?";
|
||||
dbLink = StrUtil.isNotBlank(dbLink)?dbLink:"";
|
||||
sql = "SELECT COUNT(*) FROM all_tab_columns"+dbLink+" WHERE owner = ? AND table_name = ? AND column_name = ?";
|
||||
} else if (SourceDataTypeEnum.POSTGRES.getKey().equals(dbType)) {
|
||||
sql = "SELECT COUNT(*) FROM information_schema.columns WHERE table_schema = ? AND table_name = ? AND column_name = ?";
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -5,10 +5,8 @@ import java.util.Arrays;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import jakarta.servlet.http.HttpServletRequest;
|
||||
import jakarta.servlet.http.HttpServletResponse;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.jeecg.common.api.vo.Result;
|
||||
import org.jeecg.common.constant.enums.SyncTaskStatusEnum;
|
||||
|
|
@ -19,26 +17,19 @@ import org.jeecg.modules.base.entity.StasSyncStrategy;
|
|||
import org.jeecg.modules.base.entity.StasTaskConfig;
|
||||
import org.jeecg.stasSyncStrategy.service.IStasSyncStrategyService;
|
||||
import org.jeecg.taskConfig.service.IStasTaskConfigService;
|
||||
|
||||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
||||
import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import org.jeecg.common.system.base.controller.JeecgController;
|
||||
import org.jeecg.vo.MindMapVO;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
import org.springframework.web.servlet.ModelAndView;
|
||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import org.jeecg.common.aspect.annotation.AutoLog;
|
||||
|
||||
/**
|
||||
* @Description: 任务配置表
|
||||
* @Author: jeecg-boot
|
||||
* @Date: 2025-09-11
|
||||
* @Version: V1.0
|
||||
*/
|
||||
@Tag(name="任务配置表")
|
||||
@RestController
|
||||
|
|
@ -49,8 +40,6 @@ public class StasTaskConfigController extends JeecgController<StasTaskConfig, IS
|
|||
@Autowired
|
||||
private IStasTaskConfigService stasTaskConfigService;
|
||||
@Autowired
|
||||
private IStasSyncStrategyService stasSyncStrategyService;
|
||||
@Autowired
|
||||
private IStasDataSourceService stasDataSourceService;
|
||||
|
||||
/**
|
||||
|
|
@ -109,8 +98,7 @@ public class StasTaskConfigController extends JeecgController<StasTaskConfig, IS
|
|||
}
|
||||
|
||||
/**
|
||||
* 添加
|
||||
*
|
||||
* 添加任务
|
||||
* @param stasTaskConfig
|
||||
* @return
|
||||
*/
|
||||
|
|
@ -118,24 +106,20 @@ public class StasTaskConfigController extends JeecgController<StasTaskConfig, IS
|
|||
@Operation(summary="任务配置表-添加")
|
||||
@PostMapping(value = "/add")
|
||||
public Result<String> add(@RequestBody StasTaskConfig stasTaskConfig) {
|
||||
stasTaskConfig.setTaskStatus(SyncTaskStatusEnum.NOT_STARTED.getKey());
|
||||
stasTaskConfigService.save(stasTaskConfig);
|
||||
stasTaskConfigService.saveQuartzJob(stasTaskConfig);
|
||||
stasTaskConfigService.add(stasTaskConfig);
|
||||
return Result.OK("添加成功!");
|
||||
}
|
||||
|
||||
/**
|
||||
* 编辑
|
||||
*
|
||||
*编辑
|
||||
* @param stasTaskConfig
|
||||
* @return
|
||||
*/
|
||||
@AutoLog(value = "任务配置表-编辑")
|
||||
@Operation(summary="任务配置表-编辑")
|
||||
@RequestMapping(value = "/edit", method = {RequestMethod.PUT,RequestMethod.POST})
|
||||
@PutMapping(value = "/edit")
|
||||
public Result<String> edit(@RequestBody StasTaskConfig stasTaskConfig) {
|
||||
stasTaskConfigService.updateById(stasTaskConfig);
|
||||
stasTaskConfigService.editQuartzJob(stasTaskConfig);
|
||||
stasTaskConfigService.edit(stasTaskConfig);
|
||||
return Result.OK("编辑成功!");
|
||||
}
|
||||
|
||||
|
|
@ -149,10 +133,7 @@ public class StasTaskConfigController extends JeecgController<StasTaskConfig, IS
|
|||
@Operation(summary="任务配置表-通过id删除")
|
||||
@DeleteMapping(value = "/delete")
|
||||
public Result<String> delete(@RequestParam(name="id",required=true) String id) {
|
||||
StasTaskConfig byId = stasTaskConfigService.getById(id);
|
||||
stasTaskConfigService.delQuartzJob(byId.getQuartzId());
|
||||
stasSyncStrategyService.remove(new LambdaQueryWrapper<StasSyncStrategy>().eq(StasSyncStrategy::getTaskId,id));
|
||||
stasTaskConfigService.removeById(id);
|
||||
stasTaskConfigService.delete(id);
|
||||
return Result.OK("删除成功!");
|
||||
}
|
||||
|
||||
|
|
@ -166,16 +147,12 @@ public class StasTaskConfigController extends JeecgController<StasTaskConfig, IS
|
|||
@Operation(summary="任务配置表-暂停定时任务")
|
||||
@GetMapping(value = "/pause")
|
||||
public Result<String> pause(@RequestParam(name="taskId",required=true) String taskId) {
|
||||
StasTaskConfig stasTaskConfig = stasTaskConfigService.getById(taskId);
|
||||
stasTaskConfig.setTaskStatus(SyncTaskStatusEnum.NOT_STARTED.getKey());
|
||||
stasTaskConfigService.updateById(stasTaskConfig);
|
||||
stasTaskConfigService.pauseQuartzJob(stasTaskConfig.getQuartzId());
|
||||
stasTaskConfigService.pause(taskId);
|
||||
return Result.OK("暂停成功!");
|
||||
}
|
||||
|
||||
/**
|
||||
* 启动定时任务
|
||||
*
|
||||
*启动定时任务
|
||||
* @param taskId
|
||||
* @return
|
||||
*/
|
||||
|
|
@ -183,16 +160,12 @@ public class StasTaskConfigController extends JeecgController<StasTaskConfig, IS
|
|||
@Operation(summary="任务配置表-启动定时任务")
|
||||
@GetMapping(value = "/resume")
|
||||
public Result<String> resume(@RequestParam(name="taskId",required=true) String taskId) {
|
||||
StasTaskConfig stasTaskConfig = stasTaskConfigService.getById(taskId);
|
||||
stasTaskConfig.setTaskStatus(SyncTaskStatusEnum.IN_OPERATION.getKey());
|
||||
stasTaskConfigService.updateById(stasTaskConfig);
|
||||
stasTaskConfigService.resumeQuartzJob(stasTaskConfig.getQuartzId());
|
||||
stasTaskConfigService.resume(taskId);
|
||||
return Result.OK("启动成功!");
|
||||
}
|
||||
|
||||
/**
|
||||
* 立即执行定时任务
|
||||
*
|
||||
*立即执行定时任务
|
||||
* @param taskId
|
||||
* @return
|
||||
*/
|
||||
|
|
@ -218,20 +191,6 @@ public class StasTaskConfigController extends JeecgController<StasTaskConfig, IS
|
|||
stasTaskConfigService.syncDataByFieldType(taskId);
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量删除
|
||||
*
|
||||
* @param ids
|
||||
* @return
|
||||
*/
|
||||
@AutoLog(value = "任务配置表-批量删除")
|
||||
@Operation(summary="任务配置表-批量删除")
|
||||
@DeleteMapping(value = "/deleteBatch")
|
||||
public Result<String> deleteBatch(@RequestParam(name="ids",required=true) String ids) {
|
||||
this.stasTaskConfigService.removeByIds(Arrays.asList(ids.split(",")));
|
||||
return Result.OK("批量删除成功!");
|
||||
}
|
||||
|
||||
/**
|
||||
* 通过id查询
|
||||
*
|
||||
|
|
@ -248,28 +207,4 @@ public class StasTaskConfigController extends JeecgController<StasTaskConfig, IS
|
|||
}
|
||||
return Result.OK(stasTaskConfig);
|
||||
}
|
||||
|
||||
/**
|
||||
* 导出excel
|
||||
*
|
||||
* @param request
|
||||
* @param stasTaskConfig
|
||||
*/
|
||||
@RequestMapping(value = "/exportXls")
|
||||
public ModelAndView exportXls(HttpServletRequest request, StasTaskConfig stasTaskConfig) {
|
||||
return super.exportXls(request, stasTaskConfig, StasTaskConfig.class, "任务配置表");
|
||||
}
|
||||
|
||||
/**
|
||||
* 通过excel导入数据
|
||||
*
|
||||
* @param request
|
||||
* @param response
|
||||
* @return
|
||||
*/
|
||||
@RequestMapping(value = "/importExcel", method = RequestMethod.POST)
|
||||
public Result<?> importExcel(HttpServletRequest request, HttpServletResponse response) {
|
||||
return super.importExcel(request, response, StasTaskConfig.class);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,7 +1,10 @@
|
|||
package org.jeecg.taskConfig.job;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.jeecg.common.constant.enums.SourceDataTypeEnum;
|
||||
|
|
@ -9,15 +12,10 @@ import org.jeecg.common.exception.JeecgBootException;
|
|||
import org.jeecg.common.util.DBUtil;
|
||||
import org.jeecg.modules.base.entity.*;
|
||||
import org.jeecg.modules.base.mapper.*;
|
||||
import org.jeecg.vo.DateRangeVO;
|
||||
import org.jeecg.vo.IdRangeVO;
|
||||
import org.quartz.Job;
|
||||
import org.quartz.JobExecutionContext;
|
||||
import org.quartz.JobExecutionException;
|
||||
|
||||
import java.sql.*;
|
||||
import java.text.ParseException;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
|
|
@ -40,14 +38,15 @@ public class SyncDataJob implements Job {
|
|||
@Resource
|
||||
private StasSyncNumMapper stasSyncNumMapper;
|
||||
|
||||
@Getter @Setter
|
||||
private String parameter;
|
||||
|
||||
@Override
|
||||
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
|
||||
public void execute(JobExecutionContext jobExecutionContext) {
|
||||
try {
|
||||
syncDataByFieldType(parameter);
|
||||
} catch (SQLException e) {
|
||||
throw new JeecgBootException(e.getMessage());
|
||||
log.error("执行数据同步任务出现错误,原因为:{}",e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -75,8 +74,8 @@ public class SyncDataJob implements Job {
|
|||
targetUrl = DBUtil.getPgUrl(targetInfo.getIpAddress(), targetInfo.getPort(), targetInfo.getServeId());
|
||||
}
|
||||
|
||||
try (Connection sourceConn = DriverManager.getConnection(sourceUrl, sourceInfo.getUsername(), sourceInfo.getPassword());
|
||||
Connection targetConn = DriverManager.getConnection(targetUrl, targetInfo.getUsername(), targetInfo.getPassword())) {
|
||||
try (Connection sourceConn = DBUtil.getConnection(sourceUrl, sourceInfo.getUsername(), sourceInfo.getPassword(),sourceInfo.getType());
|
||||
Connection targetConn = DBUtil.getConnection(targetUrl, targetInfo.getUsername(), targetInfo.getPassword(),targetInfo.getType())) {
|
||||
|
||||
// 设置目标连接为批量提交模式
|
||||
targetConn.setAutoCommit(false);
|
||||
|
|
@ -84,24 +83,21 @@ public class SyncDataJob implements Job {
|
|||
List<StasSyncStrategy> stasSyncStrategies = stasSyncStrategyMapper
|
||||
.selectList(new LambdaQueryWrapper<StasSyncStrategy>().eq(StasSyncStrategy::getTaskId, taskId));
|
||||
|
||||
//处理DBlink
|
||||
String sourceDBLink = StrUtil.isNotBlank(sourceInfo.getDbLink())?sourceInfo.getDbLink():"";
|
||||
String targetDBLink = StrUtil.isNotBlank(targetInfo.getDbLink())?targetInfo.getDbLink():"";
|
||||
for (StasSyncStrategy stasSyncStrategy : stasSyncStrategies) {
|
||||
saveSyncLog(recordId,String.format("开始同步表: %s (依据字段: %s)", stasSyncStrategy.getTableName(), stasSyncStrategy.getColumnName()));
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
try {
|
||||
if (isDateColumn(sourceConn, stasSyncStrategy, sourceInfo.getType())) {
|
||||
syncByDateRange(sourceConn, targetConn, stasSyncStrategy,
|
||||
stasTaskConfig.getSyncDay(), sourceInfo.getType(), targetInfo.getType(), recordId);
|
||||
} else {
|
||||
syncByIdRange(sourceConn, targetConn, stasSyncStrategy,
|
||||
stasTaskConfig.getSyncCount(), sourceInfo.getType(), targetInfo.getType(), recordId);
|
||||
}
|
||||
} catch (SQLException | ParseException e) {
|
||||
stasTaskConfig.getSyncCount(), sourceInfo.getType(), targetInfo.getType(), recordId,sourceDBLink,targetDBLink);
|
||||
} catch (SQLException e) {
|
||||
saveSyncLog(recordId,String.format("同步表 %s 时出错: %s", stasSyncStrategy.getTableName(), e.getMessage()));
|
||||
targetConn.rollback();
|
||||
throw new JeecgBootException(e.getMessage());
|
||||
throw new SQLException(e.getMessage());
|
||||
}
|
||||
|
||||
long endTime = System.currentTimeMillis();
|
||||
saveSyncLog(recordId,String.format("表 %s 同步耗时: %s 毫秒", stasSyncStrategy.getTableName(), (endTime - startTime)));
|
||||
}
|
||||
|
|
@ -110,99 +106,15 @@ public class SyncDataJob implements Job {
|
|||
stasSyncRecordMapper.updateById(stasSyncRecord);
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断字段是否为日期类型
|
||||
*/
|
||||
public boolean isDateColumn(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType) throws SQLException {
|
||||
String sql;
|
||||
if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) {
|
||||
sql = "SELECT data_type FROM all_tab_columns WHERE owner = ? AND table_name = ? AND column_name = ?";
|
||||
} else if (SourceDataTypeEnum.POSTGRES.getKey().equals(dbType)) {
|
||||
sql = "SELECT data_type FROM information_schema.columns " +
|
||||
"WHERE table_schema = ? AND table_name = ? AND column_name = ?";
|
||||
} else {
|
||||
throw new SQLException("不支持的数据库类型: " + dbType);
|
||||
}
|
||||
|
||||
try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
|
||||
pstmt.setString(1, stasSyncStrategy.getSourceOwner().toUpperCase());
|
||||
pstmt.setString(2, stasSyncStrategy.getTableName().toUpperCase());
|
||||
pstmt.setString(3, stasSyncStrategy.getColumnName().toUpperCase());
|
||||
|
||||
ResultSet rs = pstmt.executeQuery();
|
||||
if (rs.next()) {
|
||||
String dataType = rs.getString("data_type").toUpperCase();
|
||||
// 判断是否为日期/时间类型
|
||||
return dataType.contains("DATE") || dataType.contains("TIME");
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* 按日期范围同步数据
|
||||
*/
|
||||
public void syncByDateRange(Connection sourceConn, Connection targetConn,
|
||||
StasSyncStrategy stasSyncStrategy, Integer syncCount,
|
||||
Integer sourceDbType, Integer targetDbType, String recordId) throws SQLException, ParseException {
|
||||
// 获取最小和最大日期
|
||||
DateRangeVO dateRange = getDateRange(sourceConn, stasSyncStrategy, sourceDbType);
|
||||
|
||||
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
||||
|
||||
// 获取上次同步的位置
|
||||
String syncOrigin = stasSyncStrategy.getSyncOrigin();
|
||||
Date lastSyncedDate = StringUtils.isNotBlank(syncOrigin) ? sdf.parse(syncOrigin) : dateRange.getMinDate();
|
||||
Date currentStart = (lastSyncedDate != null && lastSyncedDate.after(dateRange.getMinDate())) ?
|
||||
lastSyncedDate : dateRange.getMinDate();
|
||||
|
||||
Date currentEnd = addDays(currentStart, syncCount);
|
||||
if (currentEnd.after(dateRange.getMaxDate())) {
|
||||
currentEnd = dateRange.getMaxDate();
|
||||
StringBuilder whereClause = new StringBuilder();
|
||||
whereClause.append("TO_CHAR(")
|
||||
.append(stasSyncStrategy.getColumnName())
|
||||
.append(", 'YYYY-MM-DD HH24:MI:SS') = '")
|
||||
.append(sdf.format(currentEnd))
|
||||
.append("'");
|
||||
deleteByEquals(targetConn, stasSyncStrategy, sourceDbType, whereClause.toString());
|
||||
}
|
||||
|
||||
// 根据数据库类型构建不同的日期条件
|
||||
String whereClause;
|
||||
if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceDbType)) {
|
||||
whereClause = stasSyncStrategy.getColumnName() + " BETWEEN TO_DATE('" + sdf.format(currentStart) +
|
||||
"', 'YYYY-MM-DD HH24:MI:SS') AND TO_DATE('" + sdf.format(currentEnd) + "', 'YYYY-MM-DD HH24:MI:SS')";
|
||||
} else {
|
||||
whereClause = stasSyncStrategy.getColumnName() + " BETWEEN TIMESTAMP '" + sdf.format(currentStart) +
|
||||
"' AND TIMESTAMP '" + sdf.format(currentEnd) + "'";
|
||||
}
|
||||
|
||||
saveSyncLog(recordId,String.format("%s表同步日期范围: %s 至 %s", stasSyncStrategy.getTableName(), sdf.format(currentStart), sdf.format(currentEnd)));
|
||||
|
||||
int rowsSynced = syncDataBatch(sourceConn, targetConn, stasSyncStrategy.getSourceOwner(),
|
||||
stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(),
|
||||
whereClause, sourceDbType, targetDbType);
|
||||
|
||||
saveSyncLog(recordId,String.format("%s表已同步 %s 行数据", stasSyncStrategy.getTableName(), rowsSynced));
|
||||
saveSyncNum(recordId, stasSyncStrategy.getTableName(), rowsSynced);
|
||||
|
||||
// 更新同步位置
|
||||
if (currentEnd != null) {
|
||||
stasSyncStrategy.setSyncOrigin(sdf.format(currentEnd));
|
||||
stasSyncStrategyMapper.updateById(stasSyncStrategy);
|
||||
saveSyncLog(recordId, String.format("%s表最终同步位置已更新为: %s", stasSyncStrategy.getTableName(), sdf.format(currentEnd)));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 按ID范围同步数据
|
||||
*/
|
||||
public void syncByIdRange(Connection sourceConn, Connection targetConn,
|
||||
StasSyncStrategy stasSyncStrategy, Integer syncCount,
|
||||
Integer sourceDbType, Integer targetDbType, String recordId) throws SQLException {
|
||||
Integer sourceDbType, Integer targetDbType, String recordId,
|
||||
String sourceDbLink, String targetDbLink) throws SQLException {
|
||||
// 获取最小和最大ID
|
||||
IdRangeVO idRange = getIdRange(sourceConn, stasSyncStrategy, sourceDbType);
|
||||
IdRangeVO idRange = getIdRange(sourceConn, stasSyncStrategy, sourceDbType,sourceDbLink);
|
||||
|
||||
// 获取上次同步的位置
|
||||
String syncOrigin = stasSyncStrategy.getSyncOrigin();
|
||||
|
|
@ -211,22 +123,12 @@ public class SyncDataJob implements Job {
|
|||
long currentStart = (lastSyncedId > 0 && lastSyncedId > idRange.getMinId()) ?
|
||||
lastSyncedId + 1 : idRange.getMinId();
|
||||
|
||||
long currentEnd = currentStart + syncCount - 1;
|
||||
if (currentEnd > idRange.getMaxId()) {
|
||||
currentEnd = idRange.getMaxId();
|
||||
StringBuilder whereClause = new StringBuilder();
|
||||
whereClause.append(stasSyncStrategy.getColumnName())
|
||||
.append(" = '")
|
||||
.append(currentEnd)
|
||||
.append("'");
|
||||
deleteByEquals(targetConn, stasSyncStrategy, sourceDbType, whereClause.toString());
|
||||
}
|
||||
|
||||
// 确保起始位置不超过结束位置
|
||||
if (currentStart > currentEnd) {
|
||||
if (currentStart > idRange.getMaxId()) {
|
||||
saveSyncLog(recordId, String.format("%s表已同步到最新,无需继续同步", stasSyncStrategy.getTableName()));
|
||||
return;
|
||||
}
|
||||
long currentEnd = currentStart + syncCount - 1;
|
||||
|
||||
String whereClause = stasSyncStrategy.getColumnName() + " BETWEEN " + currentStart + " AND " + currentEnd;
|
||||
|
||||
|
|
@ -234,7 +136,7 @@ public class SyncDataJob implements Job {
|
|||
|
||||
int rowsSynced = syncDataBatch(sourceConn, targetConn, stasSyncStrategy.getSourceOwner(),
|
||||
stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(),
|
||||
whereClause, sourceDbType, targetDbType);
|
||||
whereClause, sourceDbType, targetDbType,sourceDbLink,targetDbLink);
|
||||
|
||||
saveSyncLog(recordId, String.format("%s表已同步 %s 行数据", stasSyncStrategy.getTableName(), rowsSynced));
|
||||
saveSyncNum(recordId, stasSyncStrategy.getTableName(), rowsSynced);
|
||||
|
|
@ -247,58 +149,15 @@ public class SyncDataJob implements Job {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据日期等于或ID等于删除数据(使用yyyy-MM-dd格式比较)
|
||||
*/
|
||||
public int deleteByEquals(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType, String whereClause) throws SQLException {
|
||||
String sql;
|
||||
// 构建完整的SQL语句
|
||||
if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) {
|
||||
sql = "DELETE FROM \"" + stasSyncStrategy.getTargetOwner().toUpperCase() + "\".\"" +
|
||||
stasSyncStrategy.getTableName() + "\" WHERE " + whereClause;
|
||||
} else {
|
||||
sql = "DELETE FROM \"" + stasSyncStrategy.getTargetOwner().toLowerCase() + "\".\"" +
|
||||
stasSyncStrategy.getTableName() + "\" WHERE " + whereClause;
|
||||
}
|
||||
|
||||
try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
|
||||
return pstmt.executeUpdate();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取表的日期范围
|
||||
*/
|
||||
public DateRangeVO getDateRange(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType) throws SQLException {
|
||||
String sql;
|
||||
if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) {
|
||||
sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_date, " +
|
||||
"MAX(" + stasSyncStrategy.getColumnName() + ") as max_date " +
|
||||
"FROM \"" + stasSyncStrategy.getSourceOwner().toUpperCase() + "\".\"" + stasSyncStrategy.getTableName() + "\"";
|
||||
} else {
|
||||
sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_date, " +
|
||||
"MAX(" + stasSyncStrategy.getColumnName() + ") as max_date " +
|
||||
"FROM \"" + stasSyncStrategy.getSourceOwner().toLowerCase() + "\".\"" + stasSyncStrategy.getTableName() + "\"";
|
||||
}
|
||||
|
||||
try (Statement stmt = conn.createStatement();
|
||||
ResultSet rs = stmt.executeQuery(sql)) {
|
||||
if (rs.next()) {
|
||||
return new DateRangeVO(rs.getTimestamp("min_date"), rs.getTimestamp("max_date"));
|
||||
}
|
||||
}
|
||||
throw new SQLException("无法获取日期范围");
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取表的ID范围
|
||||
*/
|
||||
public IdRangeVO getIdRange(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType) throws SQLException {
|
||||
public IdRangeVO getIdRange(Connection conn, StasSyncStrategy stasSyncStrategy, Integer dbType,String sourceDbLink) throws SQLException {
|
||||
String sql;
|
||||
if (SourceDataTypeEnum.ORACLE.getKey().equals(dbType)) {
|
||||
sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_id, " +
|
||||
"MAX(" + stasSyncStrategy.getColumnName() + ") as max_id " +
|
||||
"FROM \"" + stasSyncStrategy.getSourceOwner().toUpperCase() + "\".\"" + stasSyncStrategy.getTableName() + "\"";
|
||||
"FROM \"" + stasSyncStrategy.getSourceOwner().toUpperCase() + "\".\"" + stasSyncStrategy.getTableName() + "\"" + sourceDbLink;
|
||||
} else {
|
||||
sql = "SELECT MIN(" + stasSyncStrategy.getColumnName() + ") as min_id, " +
|
||||
"MAX(" + stasSyncStrategy.getColumnName() + ") as max_id " +
|
||||
|
|
@ -320,14 +179,15 @@ public class SyncDataJob implements Job {
|
|||
public int syncDataBatch(Connection sourceConn, Connection targetConn,
|
||||
String sourceOwner, String targetOwner,
|
||||
String tableName, String whereClause,
|
||||
Integer sourceDbType, Integer targetDbType) throws SQLException {
|
||||
Integer sourceDbType, Integer targetDbType,
|
||||
String sourceDbLink, String targetDbLink) throws SQLException {
|
||||
int totalRows = 0;
|
||||
String selectSql;
|
||||
|
||||
// 构建查询SQL(根据源数据库类型)
|
||||
if (SourceDataTypeEnum.ORACLE.getKey().equals(sourceDbType)) {
|
||||
selectSql = "SELECT * FROM \"" + sourceOwner.toUpperCase() + "\".\"" + tableName +
|
||||
"\" WHERE " + whereClause;
|
||||
"\"" + sourceDbLink + " WHERE " + whereClause;
|
||||
} else {
|
||||
selectSql = "SELECT * FROM \"" + sourceOwner.toLowerCase() + "\".\"" + tableName +
|
||||
"\" WHERE " + whereClause;
|
||||
|
|
@ -344,7 +204,7 @@ public class SyncDataJob implements Job {
|
|||
// 构建插入SQL(根据目标数据库类型)
|
||||
String insertSql;
|
||||
if (SourceDataTypeEnum.ORACLE.getKey().equals(targetDbType)) {
|
||||
insertSql = "INSERT INTO \"" + targetOwner.toUpperCase() + "\".\"" + tableName + "\" VALUES (";
|
||||
insertSql = "INSERT INTO \"" + targetOwner.toUpperCase() + "\".\"" + tableName + "\"" + targetDbLink + " VALUES (";
|
||||
} else {
|
||||
insertSql = "INSERT INTO \"" + tableName + "\" VALUES (";
|
||||
}
|
||||
|
|
@ -395,7 +255,11 @@ public class SyncDataJob implements Job {
|
|||
return totalRows;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* 保存同步日志
|
||||
* @param recordId
|
||||
* @param desc
|
||||
*/
|
||||
private void saveSyncLog(String recordId, String desc){
|
||||
StasSyncLog stasSyncLog = new StasSyncLog();
|
||||
stasSyncLog.setRecordId(recordId);
|
||||
|
|
@ -404,6 +268,12 @@ public class SyncDataJob implements Job {
|
|||
stasSyncLogMapper.insert(stasSyncLog);
|
||||
}
|
||||
|
||||
/**
|
||||
* 保存同步记录
|
||||
* @param recordId
|
||||
* @param tableName
|
||||
* @param num
|
||||
*/
|
||||
private void saveSyncNum(String recordId, String tableName, Integer num){
|
||||
StasSyncNum stasSyncNum = new StasSyncNum();
|
||||
stasSyncNum.setRecordId(recordId);
|
||||
|
|
@ -411,12 +281,4 @@ public class SyncDataJob implements Job {
|
|||
stasSyncNum.setSyncNum(num);
|
||||
stasSyncNumMapper.insert(stasSyncNum);
|
||||
}
|
||||
|
||||
/**
|
||||
* 计算指定日期加上指定天数后的日期
|
||||
*/
|
||||
private Date addDays(Date date, int days) {
|
||||
long time = date.getTime() + (long)days * 24 * 60 * 60 * 1000;
|
||||
return new Date(time);
|
||||
}
|
||||
}
|
||||
|
|
@ -21,34 +21,31 @@ public interface IStasTaskConfigService extends IService<StasTaskConfig> {
|
|||
MindMapVO getMindMap(String taskId);
|
||||
|
||||
/**
|
||||
* 添加定时任务
|
||||
* @param stasTaskConfig
|
||||
* 保存任务
|
||||
*/
|
||||
void saveQuartzJob(StasTaskConfig stasTaskConfig);
|
||||
void add(StasTaskConfig stasTaskConfig);
|
||||
|
||||
/**
|
||||
* 编辑定时任务
|
||||
* @param stasTaskConfig
|
||||
* 编辑任务
|
||||
*/
|
||||
void editQuartzJob(StasTaskConfig stasTaskConfig);
|
||||
void edit(StasTaskConfig stasTaskConfig);
|
||||
|
||||
/**
|
||||
* 删除定时任务
|
||||
* @param quartzId
|
||||
* @param id
|
||||
*/
|
||||
void delQuartzJob(String quartzId);
|
||||
void delete(String id);
|
||||
|
||||
/**
|
||||
* 暂停定时任务
|
||||
* @param quartzId
|
||||
*/
|
||||
void pauseQuartzJob(String quartzId);
|
||||
void pause(String taskId);
|
||||
|
||||
/**
|
||||
* 启动定时任务
|
||||
* @param quartzId
|
||||
* @param taskId
|
||||
*/
|
||||
void resumeQuartzJob(String quartzId);
|
||||
void resume(String taskId);
|
||||
|
||||
/**
|
||||
* 立即执行定时任务
|
||||
|
|
|
|||
|
|
@ -5,21 +5,25 @@ import lombok.RequiredArgsConstructor;
|
|||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.jeecg.common.constant.CommonConstant;
|
||||
import org.jeecg.common.constant.QuartzJobConstant;
|
||||
import org.jeecg.common.constant.enums.SyncTaskStatusEnum;
|
||||
import org.jeecg.common.exception.JeecgBootException;
|
||||
import org.jeecg.common.util.DBUtil;
|
||||
import org.jeecg.modules.base.entity.StasDataSource;
|
||||
import org.jeecg.modules.base.entity.*;
|
||||
import org.jeecg.modules.base.mapper.StasDataSourceMapper;
|
||||
import org.jeecg.modules.base.entity.StasSyncStrategy;
|
||||
import org.jeecg.modules.base.mapper.StasSyncStrategyMapper;
|
||||
import org.jeecg.modules.base.entity.StasTaskConfig;
|
||||
import org.jeecg.modules.base.mapper.StasTaskConfigMapper;
|
||||
import org.jeecg.modules.base.entity.QuartzJob;
|
||||
import org.jeecg.quartz.service.IQuartzJobService;
|
||||
import org.jeecg.stasSyncStrategy.service.IStasSyncStrategyService;
|
||||
import org.jeecg.syncLog.service.IStasSyncLogService;
|
||||
import org.jeecg.syncNum.service.IStasSyncNumService;
|
||||
import org.jeecg.syncRecord.service.IStasSyncRecordService;
|
||||
import org.jeecg.taskConfig.service.IStasTaskConfigService;
|
||||
import org.jeecg.vo.*;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.sql.*;
|
||||
import java.text.ParseException;
|
||||
|
|
@ -43,6 +47,10 @@ public class StasTaskConfigServiceImpl extends ServiceImpl<StasTaskConfigMapper,
|
|||
private final StasDataSourceMapper stasDataSourceMapper;
|
||||
private final StasSyncStrategyMapper stasSyncStrategyMapper;
|
||||
private final IQuartzJobService quartzJobService;
|
||||
private final IStasSyncStrategyService stasSyncStrategyService;
|
||||
private final IStasSyncRecordService syncRecordService;
|
||||
private final IStasSyncNumService stasSyncNumService;
|
||||
private final IStasSyncLogService stasSyncLogService;
|
||||
|
||||
/**
|
||||
* 数据表思维导图
|
||||
|
|
@ -98,8 +106,16 @@ public class StasTaskConfigServiceImpl extends ServiceImpl<StasTaskConfigMapper,
|
|||
return mindMapVO;
|
||||
}
|
||||
|
||||
/**
|
||||
* 保存任务
|
||||
* @param stasTaskConfig
|
||||
*/
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
@Override
|
||||
public void saveQuartzJob(StasTaskConfig stasTaskConfig){
|
||||
public void add(StasTaskConfig stasTaskConfig) {
|
||||
stasTaskConfig.setTaskStatus(SyncTaskStatusEnum.NOT_STARTED.getKey());
|
||||
this.save(stasTaskConfig);
|
||||
//添加定时任务
|
||||
QuartzJob quartzJob = new QuartzJob();
|
||||
quartzJob.setCronExpression(stasTaskConfig.getCron());
|
||||
quartzJob.setParameter(stasTaskConfig.getId());
|
||||
|
|
@ -111,8 +127,15 @@ public class StasTaskConfigServiceImpl extends ServiceImpl<StasTaskConfigMapper,
|
|||
stasTaskConfigMapper.updateById(stasTaskConfig);
|
||||
}
|
||||
|
||||
/**
|
||||
* 编辑任务
|
||||
* @param stasTaskConfig
|
||||
*/
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
@Override
|
||||
public void editQuartzJob(StasTaskConfig stasTaskConfig){
|
||||
public void edit(StasTaskConfig stasTaskConfig) {
|
||||
this.updateById(stasTaskConfig);
|
||||
//编辑定时任务
|
||||
StasTaskConfig taskConfig = stasTaskConfigMapper.selectById(stasTaskConfig.getId());
|
||||
QuartzJob quartzJob = quartzJobService.getById(taskConfig.getQuartzId());
|
||||
quartzJob.setCronExpression(stasTaskConfig.getCron());
|
||||
|
|
@ -120,34 +143,53 @@ public class StasTaskConfigServiceImpl extends ServiceImpl<StasTaskConfigMapper,
|
|||
quartzJobService.saveAndScheduleJob(quartzJob);
|
||||
}
|
||||
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
@Override
|
||||
public void delQuartzJob(String quartzId){
|
||||
try {
|
||||
QuartzJob quartzJob = quartzJobService.getById(quartzId);
|
||||
public void delete(String id) {
|
||||
StasTaskConfig byId = this.getById(id);
|
||||
//删除并停止定时任务
|
||||
QuartzJob quartzJob = quartzJobService.getById(byId.getQuartzId());
|
||||
quartzJobService.deleteAndStopJob(quartzJob);
|
||||
} catch (Exception e) {
|
||||
throw new JeecgBootException(e.getMessage());
|
||||
}
|
||||
//删除策略
|
||||
stasSyncStrategyService.remove(new LambdaQueryWrapper<StasSyncStrategy>().eq(StasSyncStrategy::getTaskId,id));
|
||||
//删除同步记录
|
||||
syncRecordService.remove(new LambdaQueryWrapper<StasSyncRecord>().eq(StasSyncRecord::getTaskId,id));
|
||||
//删除同步数量记录
|
||||
stasSyncNumService.remove(new LambdaQueryWrapper<StasSyncNum>().eq(StasSyncNum::getTaskId,id));
|
||||
//删除同步任务产生的日志
|
||||
stasSyncLogService.remove(new LambdaQueryWrapper<StasSyncLog>().eq(StasSyncLog::getTaskId,id));
|
||||
//删除本任务数据
|
||||
this.removeById(id);
|
||||
}
|
||||
|
||||
/**
|
||||
* 暂停定时任务
|
||||
* @param taskId
|
||||
*/
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
@Override
|
||||
public void pauseQuartzJob(String quartzId){
|
||||
try {
|
||||
QuartzJob quartzJob = quartzJobService.getById(quartzId);
|
||||
public void pause(String taskId) {
|
||||
StasTaskConfig stasTaskConfig = this.getById(taskId);
|
||||
stasTaskConfig.setTaskStatus(SyncTaskStatusEnum.NOT_STARTED.getKey());
|
||||
this.updateById(stasTaskConfig);
|
||||
//暂停定时任务
|
||||
QuartzJob quartzJob = quartzJobService.getById(stasTaskConfig.getQuartzId());
|
||||
quartzJobService.pause(quartzJob);
|
||||
} catch (Exception e) {
|
||||
throw new JeecgBootException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 启动定时任务
|
||||
* @param taskId
|
||||
*/
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
@Override
|
||||
public void resumeQuartzJob(String quartzId){
|
||||
try {
|
||||
QuartzJob quartzJob = quartzJobService.getById(quartzId);
|
||||
public void resume(String taskId) {
|
||||
StasTaskConfig stasTaskConfig = this.getById(taskId);
|
||||
stasTaskConfig.setTaskStatus(SyncTaskStatusEnum.IN_OPERATION.getKey());
|
||||
this.updateById(stasTaskConfig);
|
||||
//启动定时任务
|
||||
QuartzJob quartzJob = quartzJobService.getById(stasTaskConfig.getQuartzId());
|
||||
quartzJobService.resumeJob(quartzJob);
|
||||
} catch (Exception e) {
|
||||
throw new JeecgBootException(e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user