同步日志模块

This commit is contained in:
hekaiyu 2025-10-14 15:57:26 +08:00
parent 268cacd2c1
commit e50c4662dc
21 changed files with 651 additions and 22 deletions

View File

@ -0,0 +1,43 @@
package org.jeecg.modules.base.entity;
import java.io.Serializable;
import java.util.Date;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import com.fasterxml.jackson.annotation.JsonFormat;
import org.springframework.format.annotation.DateTimeFormat;
import org.jeecgframework.poi.excel.annotation.Excel;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
/**
* @Description: 同步日志信息
* @Author: jeecg-boot
* @Date: 2025-10-14
* @Version: V1.0
*/
@Data
@TableName("stas_sync_log")
@Accessors(chain = true)
@EqualsAndHashCode(callSuper = false)
public class StasSyncLog implements Serializable {
private static final long serialVersionUID = 1L;
/**主键*/
@TableId(type = IdType.ASSIGN_ID)
private String id;
/**记录id*/
@Excel(name = "记录id", width = 15)
private String recordId;
/**开始时间*/
@Excel(name = "开始时间", width = 20, format = "yyyy-MM-dd HH:mm:ss")
@JsonFormat(timezone = "GMT+8",pattern = "yyyy-MM-dd HH:mm:ss")
@DateTimeFormat(pattern="yyyy-MM-dd HH:mm:ss")
private Date startTime;
/**描述*/
@Excel(name = "描述", width = 15)
private String description;
}

View File

@ -0,0 +1,37 @@
package org.jeecg.modules.base.entity;
import java.io.Serializable;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import org.jeecgframework.poi.excel.annotation.Excel;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
/**
* @Description: 同步数量
* @Author: jeecg-boot
* @Date: 2025-10-14
* @Version: V1.0
*/
@Data
@TableName("stas_sync_num")
@Accessors(chain = true)
@EqualsAndHashCode(callSuper = false)
public class StasSyncNum implements Serializable {
private static final long serialVersionUID = 1L;
/**主键*/
@TableId(type = IdType.ASSIGN_ID)
private String id;
/**记录id*/
@Excel(name = "记录id", width = 15)
private String recordId;
/**同步表名*/
@Excel(name = "同步表名", width = 15)
private String tableName;
/**同步数据条数*/
@Excel(name = "同步数据条数", width = 15)
private Integer syncNum;
}

View File

@ -0,0 +1,51 @@
package org.jeecg.modules.base.entity;
import java.io.Serializable;
import java.util.Date;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import com.fasterxml.jackson.annotation.JsonFormat;
import org.springframework.format.annotation.DateTimeFormat;
import org.jeecgframework.poi.excel.annotation.Excel;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
/**
* @Description: 同步记录表
* @Author: jeecg-boot
* @Date: 2025-10-14
* @Version: V1.0
*/
@Data
@TableName("stas_sync_record")
@Accessors(chain = true)
@EqualsAndHashCode(callSuper = false)
public class StasSyncRecord implements Serializable {
private static final long serialVersionUID = 1L;
/**主键*/
@TableId(type = IdType.ASSIGN_ID)
private String id;
/**任务id*/
@Excel(name = "任务id", width = 15)
private String taskId;
/**源库id*/
@Excel(name = "源库id", width = 15)
private String sourceId;
/**目标库id*/
@Excel(name = "目标库id", width = 15)
private String targetId;
/**开始时间*/
@Excel(name = "开始时间", width = 20, format = "yyyy-MM-dd HH:mm:ss")
@JsonFormat(timezone = "GMT+8",pattern = "yyyy-MM-dd HH:mm:ss")
@DateTimeFormat(pattern="yyyy-MM-dd HH:mm:ss")
private Date startTime;
/**结束时间*/
@Excel(name = "结束时间", width = 20, format = "yyyy-MM-dd HH:mm:ss")
@JsonFormat(timezone = "GMT+8",pattern = "yyyy-MM-dd HH:mm:ss")
@DateTimeFormat(pattern="yyyy-MM-dd HH:mm:ss")
private Date endTime;
}

View File

@ -0,0 +1,14 @@
package org.jeecg.modules.base.mapper;
import org.jeecg.modules.base.entity.StasSyncLog;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
* @Description: 同步日志信息
* @Author: jeecg-boot
* @Date: 2025-10-14
* @Version: V1.0
*/
public interface StasSyncLogMapper extends BaseMapper<StasSyncLog> {
}

View File

@ -0,0 +1,14 @@
package org.jeecg.modules.base.mapper;
import org.jeecg.modules.base.entity.StasSyncNum;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
* @Description: 同步数量
* @Author: jeecg-boot
* @Date: 2025-10-14
* @Version: V1.0
*/
public interface StasSyncNumMapper extends BaseMapper<StasSyncNum> {
}

View File

@ -0,0 +1,14 @@
package org.jeecg.modules.base.mapper;
import org.jeecg.modules.base.entity.StasSyncRecord;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
/**
* @Description: 同步记录表
* @Author: jeecg-boot
* @Date: 2025-10-14
* @Version: V1.0
*/
public interface StasSyncRecordMapper extends BaseMapper<StasSyncRecord> {
}

View File

@ -0,0 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.jeecg.modules.base.mapper.StasSyncLogMapper">
</mapper>

View File

@ -0,0 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.jeecg.modules.base.mapper.StasSyncNumMapper">
</mapper>

View File

@ -0,0 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.jeecg.modules.base.mapper.StasSyncRecordMapper">
</mapper>

View File

@ -0,0 +1,57 @@
package org.jeecg.syncLog.controller;
import jakarta.servlet.http.HttpServletRequest;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.jeecg.common.api.vo.Result;
import org.jeecg.common.system.query.QueryGenerator;
import org.jeecg.modules.base.entity.StasSyncLog;
import org.jeecg.syncLog.service.IStasSyncLogService;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.system.base.controller.JeecgController;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
/**
* @Description: 同步日志信息
* @Author: jeecg-boot
* @Date: 2025-10-14
* @Version: V1.0
*/
@Tag(name="同步日志信息")
@RestController
@RequestMapping("/stasSyncLog")
@Slf4j
public class StasSyncLogController extends JeecgController<StasSyncLog, IStasSyncLogService> {
@Autowired
private IStasSyncLogService stasSyncLogService;
/**
* 分页列表查询
*
* @param stasSyncLog
* @param pageNo
* @param pageSize
* @param req
* @return
*/
@Operation(summary = "同步日志信息-分页列表查询")
@GetMapping(value = "/list")
public Result<IPage<StasSyncLog>> queryPageList(StasSyncLog stasSyncLog,
@RequestParam(name="pageNo", defaultValue="1") Integer pageNo,
@RequestParam(name="pageSize", defaultValue="10") Integer pageSize,
HttpServletRequest req) {
QueryWrapper<StasSyncLog> queryWrapper = QueryGenerator.initQueryWrapper(stasSyncLog, req.getParameterMap());
queryWrapper.orderByAsc("start_time");
Page<StasSyncLog> page = new Page<StasSyncLog>(pageNo, pageSize);
IPage<StasSyncLog> pageList = stasSyncLogService.page(page, queryWrapper);
return Result.OK(pageList);
}
}

View File

@ -0,0 +1,14 @@
package org.jeecg.syncLog.service;
import org.jeecg.modules.base.entity.StasSyncLog;
import com.baomidou.mybatisplus.extension.service.IService;
/**
* @Description: 同步日志信息
* @Author: jeecg-boot
* @Date: 2025-10-14
* @Version: V1.0
*/
public interface IStasSyncLogService extends IService<StasSyncLog> {
}

View File

@ -0,0 +1,21 @@
package org.jeecg.syncLog.service.impl;
import org.jeecg.modules.base.entity.StasSyncLog;
import org.jeecg.modules.base.mapper.StasSyncLogMapper;
import org.jeecg.syncLog.service.IStasSyncLogService;
import org.springframework.stereotype.Service;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
/**
* @Description: 同步日志信息
* @Author: jeecg-boot
* @Date: 2025-10-14
* @Version: V1.0
*/
@Service
public class StasSyncLogServiceImpl extends ServiceImpl<StasSyncLogMapper, StasSyncLog> implements IStasSyncLogService {
}

View File

@ -0,0 +1,84 @@
package org.jeecg.syncNum.controller;
import jakarta.servlet.http.HttpServletRequest;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.jeecg.common.api.vo.Result;
import org.jeecg.common.system.query.QueryGenerator;
import org.jeecg.modules.base.entity.StasSyncNum;
import org.jeecg.syncNum.service.IStasSyncNumService;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.system.base.controller.JeecgController;
import org.jeecg.syncNum.vo.PieChartVO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* @Description: 同步数量
* @Author: jeecg-boot
* @Date: 2025-10-14
* @Version: V1.0
*/
@Tag(name="同步数量")
@RestController
@RequestMapping("/stasSyncNum")
@Slf4j
public class StasSyncNumController extends JeecgController<StasSyncNum, IStasSyncNumService> {
@Autowired
private IStasSyncNumService stasSyncNumService;
/**
* 分页列表查询
*
* @param stasSyncNum
* @param req
* @return
*/
//@AutoLog(value = "同步数量-分页列表查询")
@Operation(summary = "同步数量-分页列表查询")
@GetMapping(value = "/list")
public Result<List<PieChartVO>> queryPageList(StasSyncNum stasSyncNum,
HttpServletRequest req) {
// 构建查询条件
QueryWrapper<StasSyncNum> queryWrapper = QueryGenerator.initQueryWrapper(stasSyncNum, req.getParameterMap());
// 查询数据并按表名分组统计
List<StasSyncNum> records = stasSyncNumService.list(queryWrapper);
// 按表名分组统计总同步量
Map<String, Integer> tableSyncMap = records.stream()
.collect(Collectors.groupingBy(
StasSyncNum::getTableName,
Collectors.summingInt(StasSyncNum::getSyncNum)
));
// 计算总同步量用于计算百分比
int total = tableSyncMap.values().stream().mapToInt(Integer::intValue).sum();
// 转换为饼图数据
List<PieChartVO> pieData = tableSyncMap.entrySet().stream()
.map(entry -> {
PieChartVO vo = new PieChartVO();
vo.setName(entry.getKey());
vo.setValue(entry.getValue());
if (total > 0) {
double percent = (entry.getValue() * 100.0) / total;
vo.setPercent(String.format("%.2f%%", percent));
}
return vo;
})
.collect(Collectors.toList());
return Result.OK(pieData);
}
}

View File

@ -0,0 +1,14 @@
package org.jeecg.syncNum.service;
import org.jeecg.modules.base.entity.StasSyncNum;
import com.baomidou.mybatisplus.extension.service.IService;
/**
* @Description: 同步数量
* @Author: jeecg-boot
* @Date: 2025-10-14
* @Version: V1.0
*/
public interface IStasSyncNumService extends IService<StasSyncNum> {
}

View File

@ -0,0 +1,19 @@
package org.jeecg.syncNum.service.impl;
import org.jeecg.modules.base.entity.StasSyncNum;
import org.jeecg.modules.base.mapper.StasSyncNumMapper;
import org.jeecg.syncNum.service.IStasSyncNumService;
import org.springframework.stereotype.Service;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
/**
* @Description: 同步数量
* @Author: jeecg-boot
* @Date: 2025-10-14
* @Version: V1.0
*/
@Service
public class StasSyncNumServiceImpl extends ServiceImpl<StasSyncNumMapper, StasSyncNum> implements IStasSyncNumService {
}

View File

@ -0,0 +1,13 @@
package org.jeecg.syncNum.vo;
import lombok.Data;
@Data
public class PieChartVO {
// 表名饼图的每块名称
private String name;
// 同步数量饼图的每块值
private Integer value;
// 可选百分比
private String percent;
}

View File

@ -0,0 +1,113 @@
package org.jeecg.syncRecord.controller;
import jakarta.servlet.http.HttpServletRequest;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.apache.commons.lang3.StringUtils;
import org.jeecg.common.api.vo.Result;
import org.jeecg.common.system.query.QueryGenerator;
import org.jeecg.dataSource.service.IStasDataSourceService;
import org.jeecg.modules.base.entity.StasDataSource;
import org.jeecg.modules.base.entity.StasSyncRecord;
import org.jeecg.modules.base.entity.StasTaskConfig;
import org.jeecg.syncRecord.service.IStasSyncRecordService;
import org.jeecg.syncRecord.vo.SyncRecordVO;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.system.base.controller.JeecgController;
import org.jeecg.taskConfig.service.IStasTaskConfigService;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* @Description: 同步记录表
* @Author: jeecg-boot
* @Date: 2025-10-14
* @Version: V1.0
*/
@Tag(name="同步记录表")
@RestController
@RequestMapping("/stasSyncRecord")
@Slf4j
public class StasSyncRecordController extends JeecgController<StasSyncRecord, IStasSyncRecordService> {
@Autowired
private IStasSyncRecordService stasSyncRecordService;
@Autowired
private IStasDataSourceService stasDataSourceService;
@Autowired
private IStasTaskConfigService stasTaskConfigService;
/**
* 分页列表查询
*
* @param stasSyncRecord
* @param pageNo
* @param pageSize
* @param req
* @return
*/
//@AutoLog(value = "同步记录表-分页列表查询")
@Operation(summary = "同步记录表-分页列表查询")
@GetMapping(value = "/list")
public Result<IPage<SyncRecordVO>> queryPageList(StasSyncRecord stasSyncRecord,
@RequestParam(name="pageNo", defaultValue="1") Integer pageNo,
@RequestParam(name="pageSize", defaultValue="10") Integer pageSize,
HttpServletRequest req) {
stasSyncRecord.setStartTime(null);
stasSyncRecord.setEndTime(null);
QueryWrapper<StasSyncRecord> queryWrapper = QueryGenerator.initQueryWrapper(stasSyncRecord, req.getParameterMap());
// 处理开始时间区间查询
String beginStartTime = req.getParameter("startTime");
String endStartTime = req.getParameter("endTime");
if (StringUtils.isNotBlank(beginStartTime)) {
queryWrapper.ge("start_time", beginStartTime);
}
if (StringUtils.isNotBlank(endStartTime)) {
queryWrapper.le("start_time", endStartTime);
}
queryWrapper.orderByAsc("start_time");
Page<StasSyncRecord> page = new Page<>(pageNo, pageSize);
IPage<StasSyncRecord> pageList = stasSyncRecordService.page(page, queryWrapper);
List<StasSyncRecord> records = pageList.getRecords();
// 准备数据源和任务名称映射
Map<String, String> dataSourceMap = stasDataSourceService.list().stream()
.collect(Collectors.toMap(StasDataSource::getId, StasDataSource::getInstanceName));
Map<String, String> taskMap = stasTaskConfigService.list().stream()
.collect(Collectors.toMap(StasTaskConfig::getId, StasTaskConfig::getTaskName));
// 转换为VO列表
List<SyncRecordVO> syncRecordVOS = records.stream().map(record -> {
SyncRecordVO vo = new SyncRecordVO();
vo.setId(record.getId());
vo.setTaskName(taskMap.get(record.getTaskId()));
vo.setSourceName(dataSourceMap.get(record.getSourceId()));
vo.setTargetName(dataSourceMap.get(record.getTargetId()));
vo.setStartTime(record.getStartTime());
vo.setEndTime(record.getEndTime());
return vo;
}).collect(Collectors.toList());
// 创建新的分页对象
Page<SyncRecordVO> resultPage = new Page<>();
BeanUtils.copyProperties(pageList, resultPage, "records");
resultPage.setRecords(syncRecordVOS);
return Result.OK(resultPage);
}
}

View File

@ -0,0 +1,14 @@
package org.jeecg.syncRecord.service;
import org.jeecg.modules.base.entity.StasSyncRecord;
import com.baomidou.mybatisplus.extension.service.IService;
/**
* @Description: 同步记录表
* @Author: jeecg-boot
* @Date: 2025-10-14
* @Version: V1.0
*/
public interface IStasSyncRecordService extends IService<StasSyncRecord> {
}

View File

@ -0,0 +1,19 @@
package org.jeecg.syncRecord.service.impl;
import org.jeecg.modules.base.entity.StasSyncRecord;
import org.jeecg.modules.base.mapper.StasSyncRecordMapper;
import org.jeecg.syncRecord.service.IStasSyncRecordService;
import org.springframework.stereotype.Service;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
/**
* @Description: 同步记录表
* @Author: jeecg-boot
* @Date: 2025-10-14
* @Version: V1.0
*/
@Service
public class StasSyncRecordServiceImpl extends ServiceImpl<StasSyncRecordMapper, StasSyncRecord> implements IStasSyncRecordService {
}

View File

@ -0,0 +1,42 @@
package org.jeecg.syncRecord.vo;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
import org.jeecgframework.poi.excel.annotation.Excel;
import org.springframework.format.annotation.DateTimeFormat;
import java.io.Serializable;
import java.util.Date;
/**
* @Description: 同步记录表
* @Author: jeecg-boot
* @Date: 2025-10-14
* @Version: V1.0
*/
@Data
public class SyncRecordVO implements Serializable {
private static final long serialVersionUID = 1L;
/**任务名称*/
private String id;
/**任务名称*/
private String taskName;
/**源库名称*/
private String sourceName;
/**目标库名称*/
private String targetName;
/**开始时间*/
@JsonFormat(timezone = "GMT+8",pattern = "yyyy-MM-dd HH:mm:ss")
@DateTimeFormat(pattern="yyyy-MM-dd HH:mm:ss")
private Date startTime;
/**结束时间*/
@JsonFormat(timezone = "GMT+8",pattern = "yyyy-MM-dd HH:mm:ss")
@DateTimeFormat(pattern="yyyy-MM-dd HH:mm:ss")
private Date endTime;
}

View File

@ -7,12 +7,8 @@ import org.apache.commons.lang3.StringUtils;
import org.jeecg.common.constant.enums.SourceDataTypeEnum;
import org.jeecg.common.exception.JeecgBootException;
import org.jeecg.common.util.DBUtil;
import org.jeecg.modules.base.entity.StasDataSource;
import org.jeecg.modules.base.entity.StasSyncStrategy;
import org.jeecg.modules.base.entity.StasTaskConfig;
import org.jeecg.modules.base.mapper.StasDataSourceMapper;
import org.jeecg.modules.base.mapper.StasSyncStrategyMapper;
import org.jeecg.modules.base.mapper.StasTaskConfigMapper;
import org.jeecg.modules.base.entity.*;
import org.jeecg.modules.base.mapper.*;
import org.jeecg.vo.DateRangeVO;
import org.jeecg.vo.IdRangeVO;
import org.quartz.Job;
@ -37,6 +33,12 @@ public class SyncDataJob implements Job {
private StasDataSourceMapper stasDataSourceMapper;
@Resource
private StasSyncStrategyMapper stasSyncStrategyMapper;
@Resource
private StasSyncRecordMapper stasSyncRecordMapper;
@Resource
private StasSyncLogMapper stasSyncLogMapper;
@Resource
private StasSyncNumMapper stasSyncNumMapper;
private String parameter;
@ -61,6 +63,14 @@ public class SyncDataJob implements Job {
StasDataSource sourceInfo = stasDataSourceMapper.selectById(stasTaskConfig.getSourceId());
StasDataSource targetInfo = stasDataSourceMapper.selectById(stasTaskConfig.getTargetId());
StasSyncRecord stasSyncRecord = new StasSyncRecord();
stasSyncRecord.setTaskId(taskId);
stasSyncRecord.setSourceId(stasTaskConfig.getSourceId());
stasSyncRecord.setTargetId(stasTaskConfig.getTargetId());
stasSyncRecord.setStartTime(new Date());
stasSyncRecordMapper.insert(stasSyncRecord);
String recordId = stasSyncRecord.getId();
String sourceUrl = DBUtil.getUrl(sourceInfo.getIpAddress(), sourceInfo.getPort(), sourceInfo.getServeId());
String targetUrl;
if(SourceDataTypeEnum.ORACLE.getKey().equals(targetInfo.getType())){
@ -79,27 +89,29 @@ public class SyncDataJob implements Job {
.selectList(new LambdaQueryWrapper<StasSyncStrategy>().eq(StasSyncStrategy::getTaskId, taskId));
for (StasSyncStrategy stasSyncStrategy : stasSyncStrategies) {
log.info("开始同步表: {} (依据字段: {})", stasSyncStrategy.getTableName(), stasSyncStrategy.getColumnName());
saveSyncLog(recordId,String.format("开始同步表: %s (依据字段: %s)", stasSyncStrategy.getTableName(), stasSyncStrategy.getColumnName()));
long startTime = System.currentTimeMillis();
try {
if (isDateColumn(sourceConn, stasSyncStrategy, sourceInfo.getType())) {
syncByDateRange(sourceConn, targetConn, stasSyncStrategy,
stasTaskConfig.getSyncDay(), sourceInfo.getType(), targetInfo.getType());
stasTaskConfig.getSyncDay(), sourceInfo.getType(), targetInfo.getType(), recordId);
} else {
syncByIdRange(sourceConn, targetConn, stasSyncStrategy,
stasTaskConfig.getSyncCount(), sourceInfo.getType(), targetInfo.getType());
stasTaskConfig.getSyncCount(), sourceInfo.getType(), targetInfo.getType(), recordId);
}
} catch (SQLException | ParseException e) {
log.error("同步表 {} 时出错: {}", stasSyncStrategy.getTableName(), e.getMessage());
saveSyncLog(recordId,String.format("同步表 %s 时出错: %s", stasSyncStrategy.getTableName(), e.getMessage()));
targetConn.rollback();
throw new JeecgBootException(e.getMessage());
}
long endTime = System.currentTimeMillis();
log.info("表 {} 同步耗时: {} 毫秒", stasSyncStrategy.getTableName(), (endTime - startTime));
saveSyncLog(recordId,String.format("表 %s 同步耗时: %s 毫秒", stasSyncStrategy.getTableName(), (endTime - startTime)));
}
}
stasSyncRecord.setEndTime(new Date());
stasSyncRecordMapper.updateById(stasSyncRecord);
}
/**
@ -136,10 +148,9 @@ public class SyncDataJob implements Job {
*/
public void syncByDateRange(Connection sourceConn, Connection targetConn,
StasSyncStrategy stasSyncStrategy, Integer syncCount,
Integer sourceDbType, Integer targetDbType) throws SQLException, ParseException {
Integer sourceDbType, Integer targetDbType, String recordId) throws SQLException, ParseException {
// 获取最小和最大日期
DateRangeVO dateRange = getDateRange(sourceConn, stasSyncStrategy, sourceDbType);
log.info("日期范围: {} 至 {}", dateRange.getMinDate(), dateRange.getMaxDate());
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@ -164,18 +175,20 @@ public class SyncDataJob implements Job {
"' AND TIMESTAMP '" + sdf.format(currentEnd) + "'";
}
log.info("同步日期范围: {} 至 {}", sdf.format(currentStart), sdf.format(currentEnd));
saveSyncLog(recordId,String.format("%s表同步日期范围: %s 至 %s", stasSyncStrategy.getTableName(), sdf.format(currentStart), sdf.format(currentEnd)));
int rowsSynced = syncDataBatch(sourceConn, targetConn, stasSyncStrategy.getSourceOwner(),
stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(),
whereClause, sourceDbType, targetDbType);
log.info("已同步 {} 行数据", rowsSynced);
saveSyncLog(recordId,String.format("%s表已同步 %s 行数据", stasSyncStrategy.getTableName(), rowsSynced));
saveSyncNum(recordId, stasSyncStrategy.getTableName(), rowsSynced);
// 更新同步位置
if (currentEnd != null) {
stasSyncStrategy.setSyncOrigin(sdf.format(currentEnd));
stasSyncStrategyMapper.updateById(stasSyncStrategy);
log.info("最终同步位置已更新为: {}", sdf.format(currentEnd));
saveSyncLog(recordId, String.format("%s表最终同步位置已更新为: %s", stasSyncStrategy.getTableName(), sdf.format(currentEnd)));
}
}
@ -184,10 +197,9 @@ public class SyncDataJob implements Job {
*/
public void syncByIdRange(Connection sourceConn, Connection targetConn,
StasSyncStrategy stasSyncStrategy, Integer syncCount,
Integer sourceDbType, Integer targetDbType) throws SQLException {
Integer sourceDbType, Integer targetDbType, String recordId) throws SQLException {
// 获取最小和最大ID
IdRangeVO idRange = getIdRange(sourceConn, stasSyncStrategy, sourceDbType);
log.info("ID范围: {} 至 {}", idRange.getMinId(), idRange.getMaxId());
// 获取上次同步的位置
String syncOrigin = stasSyncStrategy.getSyncOrigin();
@ -202,18 +214,20 @@ public class SyncDataJob implements Job {
String whereClause = stasSyncStrategy.getColumnName() + " BETWEEN " + currentStart + " AND " + currentEnd;
log.info("同步ID范围: {} 至 {}", currentStart, currentEnd);
saveSyncLog(recordId,String.format("%s表同步ID范围: %s 至 %s", stasSyncStrategy.getTableName(), currentStart, currentEnd));
int rowsSynced = syncDataBatch(sourceConn, targetConn, stasSyncStrategy.getSourceOwner(),
stasSyncStrategy.getTargetOwner(), stasSyncStrategy.getTableName(),
whereClause, sourceDbType, targetDbType);
log.info("已同步 {} 行数据", rowsSynced);
saveSyncLog(recordId,String.format("%s表已同步 %s 行数据", stasSyncStrategy.getTableName(), rowsSynced));
saveSyncNum(recordId, stasSyncStrategy.getTableName(), rowsSynced);
// 更新同步位置
if (currentEnd > 0) {
stasSyncStrategy.setSyncOrigin(String.valueOf(currentEnd));
stasSyncStrategyMapper.updateById(stasSyncStrategy);
log.info("最终同步位置已更新为: {}", currentEnd);
saveSyncLog(recordId,String.format("%s表最终同步位置已更新为: %s", stasSyncStrategy.getTableName(), currentEnd));
}
}
@ -346,10 +360,27 @@ public class SyncDataJob implements Job {
return totalRows;
}
private void saveSyncLog(String recordId, String desc){
StasSyncLog stasSyncLog = new StasSyncLog();
stasSyncLog.setRecordId(recordId);
stasSyncLog.setStartTime(new Date());
stasSyncLog.setDescription(desc);
stasSyncLogMapper.insert(stasSyncLog);
}
private void saveSyncNum(String recordId, String tableName, Integer num){
StasSyncNum stasSyncNum = new StasSyncNum();
stasSyncNum.setRecordId(recordId);
stasSyncNum.setTableName(tableName);
stasSyncNum.setSyncNum(num);
stasSyncNumMapper.insert(stasSyncNum);
}
/**
* 计算指定日期加上指定天数后的日期
*/
private static Date addDays(Date date, int days) {
private Date addDays(Date date, int days) {
long time = date.getTime() + (long)days * 24 * 60 * 60 * 1000;
return new Date(time);
}