Merge remote-tracking branch 'origin/master'

This commit is contained in:
hekaiyu 2026-01-12 18:30:32 +08:00
commit aca2e0983c
24 changed files with 1149 additions and 6 deletions

View File

@ -0,0 +1,38 @@
package org.jeecg.common.constant.enums;
/**
* 关联波形任务状态枚举
*/
public enum AssociatedWaveformTaskEnum {
/**
* 关联未成功
*/
UNSUCCESSFUL(-1),
/**
* 未关联
*/
UNASSOCIATED(0),
/**
* 排队中
*/
IN_LINE(1),
/**
* 执行中
*/
IN_PROCESS(2),
/**
* 执行中
*/
COMPLETE(3);
private Integer value;
AssociatedWaveformTaskEnum(Integer value) {
this.value = value;
}
public Integer getValue(){
return this.value;
}
}

View File

@ -0,0 +1,19 @@
package org.jeecg.common.properties;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import java.util.LinkedHashMap;
import java.util.Map;
@Data
@Component
@ConfigurationProperties(prefix = "armd")
public class ArmdReportProperties {
/**
* armd 报告父路径 服务地址
*/
private String reportBasePath;
}

View File

@ -0,0 +1,28 @@
package org.jeecg.common.properties;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Data
@Component
@ConfigurationProperties(prefix = "data-fusion")
public class DataFusionProperties {
/**
* idc srs文件存储路径
*/
private String idcSRSPath;
/**
* ndc srs文件存储路径
*/
private String ndcSRSPath;
/**
* srs文件的上级目录有可能是flexpart.x.ecmwf.l1或flexpart.x.ncep.l1
*/
private String srmParentDir;
/**
* 溯源时间
*/
private Integer traceabilityTime;
}

View File

@ -11,6 +11,8 @@ public class DataSourceSwitcher {
private final static String AUTH = "authSource";
private final static String ORA = "ora";
public static void switchToMaster(){
DynamicDataSourceContextHolder.push(MASTER);
}
@ -19,6 +21,10 @@ public class DataSourceSwitcher {
DynamicDataSourceContextHolder.push(AUTH);
}
public static void switchToOracle(){
DynamicDataSourceContextHolder.push(ORA);
}
public static void clearDataSource(){
DynamicDataSourceContextHolder.clear();
}

View File

@ -0,0 +1,88 @@
package org.jeecg.modules.base.entity;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import java.util.Date;
@Data
@TableName("COPSDB.ORIGIN")
public class Origin {
@TableField("LAT")
private Double lat;
@TableField("LON")
private Double lon;
@TableField("DEPTH")
private Double depth;
@TableField("TIME")
private Double time;
@TableField("ORID")
private Integer orid;
@TableField("EVID")
private Integer evid;
@TableField("JDATE")
private Integer jdate;
@TableField("NASS")
private Integer nass;
@TableField("NDEF")
private Integer ndef;
@TableField("NDP")
private Integer ndp;
@TableField("GRN")
private Integer grn;
@TableField("SRN")
private Integer srn;
@TableField("ETYPE")
private String eType;
@TableField("DEPDP")
private Double depdp;
@TableField("DTYPE")
private String dType;
@TableField("MB")
private Double mb;
@TableField("MBID")
private Integer mbid;
@TableField("MS")
private Double ms;
@TableField("MSID")
private Integer msid;
@TableField("ML")
private Double ml;
@TableField("MLID")
private Integer mlid;
@TableField("ALGORITHM")
private String algorithm;
@TableField("AUTH")
private String auth;
@TableField("COMMID")
private Integer commid;
@TableField("LDDATE")
private Date lddate;
}

View File

@ -0,0 +1,53 @@
package org.jeecg.modules.base.entity.rnauto;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import java.util.Date;
/**
* 关联波形事件数据表
*/
@Data
@TableName("RNAUTO.GARDS_WAVEFORM_EVENT")
public class GardsWaveformEvent {
/**
* 主键
*/
@TableId(value = "SAMPLE_ID",type = IdType.INPUT)
private Integer sampleId;
/**
* idc事件
*/
@TableField(value = "IDC_EVENTS")
private Integer idcEvents;
/**
* ndc事件
*/
@TableField(value = "NDC_EVENTS")
private Integer ndcEvents;
/**
* 说明
*/
@TableField(value = "DESCRIPTION")
private String description;
/**
* -1关联未成功0未关联1排队中2执行中3已关联
*/
@TableField(value = "STATUS")
private Integer status;
/**
* 更新时间
*/
@JsonFormat(timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
private Date moddate;
}

View File

@ -0,0 +1,19 @@
package org.jeecg.modules.base.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Param;
public interface GardsAnalysesMapper extends BaseMapper {
/**
* 获取Arr报告地址
* @return
*/
String getArrReportPath(@Param("sampleId") Integer sampleId);
/**
* 获取Rrr报告地址
* @return
*/
String getRrrReportPath(@Param("sampleId") Integer sampleId);
}

View File

@ -0,0 +1,39 @@
package org.jeecg.modules.base.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import org.apache.ibatis.annotations.Param;
import org.jeecg.modules.base.entity.rnauto.GardsWaveformEvent;
import org.jeecg.modules.base.vo.GardsSampleInfoVO;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.Map;
public interface GardsWaveformEventMapper extends BaseMapper<GardsWaveformEvent> {
/**
* 分页查询
* @param page
* @param sampleId
* @param status
* @param sampleType
* @param startTime
* @param endTime
* @return
*/
IPage<Map<String,Object>> page(IPage<Map<String,Object>> page, @Param("sampleId") Integer sampleId,
@Param("status") Integer status,
@Param("sampleType") String sampleType,
@Param("startTime") Date startTime,
@Param("endTime") Date endTime);
/**
* 根据样品id查询样品信息
* @param sampleId
* @return
*/
GardsSampleInfoVO selectSampleInfoBySampleId(@Param("sampleId") Integer sampleId);
}

View File

@ -0,0 +1,12 @@
package org.jeecg.modules.base.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Param;
import org.jeecg.modules.base.entity.Origin;
import java.util.List;
public interface OriginMapper extends BaseMapper<Origin> {
List<Origin> selectOriginByTime(@Param("startTime") Long startTime,@Param("endTime") Long endTime);
}

View File

@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.jeecg.modules.base.mapper.GardsAnalysesMapper">
<select id="getArrReportPath" resultType="string">
SELECT
ga.REPORT_PAHT
FROM
RNAUTO.GARDS_ANALYSES ga
<where>
<if test="sampleId != null">
ga.SAMPLE_ID = #{sampleId}
</if>
</where>
</select>
<select id="getRrrReportPath" resultType="string">
SELECT
ga.REPORT_PAHT
FROM
RNMAN.GARDS_ANALYSES ga
<where>
<if test="sampleId != null">
ga.SAMPLE_ID = #{sampleId}
</if>
</where>
</select>
</mapper>

View File

@ -0,0 +1,52 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.jeecg.modules.base.mapper.GardsWaveformEventMapper">
<select id="page" resultType="java.util.Map">
SELECT
gwe.SAMPLE_ID as "sampleId",
gs.STATION_CODE as "stationCode",
gsd.SAMPLE_TYPE as "sampleType",
gsd.ACQUISITION_START as "acqStart",
gsd.ACQUISITION_STOP as "acqEnd",
ga.CATEGORY as "category",
gwe.STATUS as "status",
gwe.IDC_EVENTS as "idcEvents",
gwe.NDC_EVENTS as "ndcEvents",
gwe.DESCRIPTION as "description"
FROM
RNAUTO.GARDS_WAVEFORM_EVENT gwe
INNER JOIN ORIGINAL.GARDS_SAMPLE_DATA gsd ON gwe.SAMPLE_ID = gsd.SAMPLE_ID
INNER JOIN CONFIGURATION.GARDS_STATIONS gs ON gsd.STATION_ID = gs.STATION_ID
INNER JOIN RNAUTO.GARDS_ANALYSES ga ON gwe.SAMPLE_ID = ga.SAMPLE_ID
<where>
<if test="sampleId != null">
gwe.SAMPLE_ID = #{sampleId}
</if>
<if test="status != null">
and gwe.STATUS = #{status}
</if>
<if test="sampleType != null and sampleType !=''">
and gsd.SAMPLE_TYPE = #{sampleType}
</if>
<if test="startTime != null and endTime != null">
and gsd.ACQUISITION_STOP between #{startTime} and #{endTime}
</if>
</where>
order by gwe.STATUS asc
</select>
<select id="selectSampleInfoBySampleId" resultType="org.jeecg.modules.base.vo.GardsSampleInfoVO">
SELECT
gwe.SAMPLE_ID as "sampleId",
gs.STATION_CODE as "stationCode",
gsd.ACQUISITION_STOP as "acqEndTime"
FROM
RNAUTO.GARDS_WAVEFORM_EVENT gwe
INNER JOIN ORIGINAL.GARDS_SAMPLE_DATA gsd ON gwe.SAMPLE_ID = gsd.SAMPLE_ID
INNER JOIN CONFIGURATION.GARDS_STATIONS gs ON gsd.STATION_ID = gs.STATION_ID
INNER JOIN RNAUTO.GARDS_ANALYSES ga ON gwe.SAMPLE_ID = ga.SAMPLE_ID
where gwe.SAMPLE_ID = #{sampleId}
</select>
</mapper>

View File

@ -0,0 +1,16 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.jeecg.modules.base.mapper.OriginMapper">
<select id="selectOriginByTime" resultType="org.jeecg.modules.base.entity.Origin">
SELECT
t.LAT as "lat",
t.LON as "lon",
t.TIME as "time",
t.ORID as "orid"
FROM
COPSDB.ORIGIN t
where t.TIME between #{startTime} and #{endTime}
</select>
</mapper>

View File

@ -0,0 +1,22 @@
package org.jeecg.modules.base.vo;
import lombok.Data;
import java.util.Date;
@Data
public class GardsSampleInfoVO {
/**
* 样品id
*/
private Integer sampleId;
/**
* 台站编码
*/
private String stationCode;
/**
* 测量结束时间
*/
private Date acqEndTime;
}

View File

@ -84,7 +84,6 @@ public class SourceRebuildTaskController{
@Operation(summary = "启动任务")
@PutMapping("runTask")
public Result<?> runTask(@NotNull(message = "任务ID不能为空") Integer taskId){
// sourceRebuildTaskService.update(sourceRebuildTask);
sourceRebuildTaskService.runTask(taskId);
return Result.OK();
}

View File

@ -0,0 +1,61 @@
package org.jeecg.controller;
import io.swagger.v3.oas.annotations.Operation;
import jakarta.servlet.http.HttpServletResponse;
import jakarta.validation.constraints.NotNull;
import lombok.RequiredArgsConstructor;
import org.jeecg.common.api.vo.Result;
import org.jeecg.common.aspect.annotation.AutoLog;
import org.jeecg.common.system.query.PageRequest;
import org.jeecg.service.AssociatedWaveformService;
import org.springframework.format.annotation.DateTimeFormat;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDate;
/**
* 数据融合-关联波形
*/
@Validated
@RestController
@RequestMapping("waveform")
@RequiredArgsConstructor
public class AssociatedWaveformController {
private final AssociatedWaveformService associatedWaveformService;
@AutoLog(value = "分页查询样品关联的波形数据")
@Operation(summary = "分页查询样品关联的波形数据")
@GetMapping("getSampleWaveforms")
public Result<?> getSampleWaveforms(PageRequest pageRequest, Integer sampleId, Integer status, String sampleType,
@DateTimeFormat(iso = DateTimeFormat.ISO.DATE) LocalDate startDate,
@DateTimeFormat(iso = DateTimeFormat.ISO.DATE) LocalDate endDate) {
return Result.OK(associatedWaveformService.page(pageRequest,sampleId,status,sampleType,startDate,endDate));
}
@AutoLog(value = "执行关联")
@Operation(summary = "执行关联")
@PutMapping("execAssociated")
public Result<?> execAssociated(@NotNull(message = "样品id不能为空") Integer sampleId) {
associatedWaveformService.execAssociated(sampleId);
return Result.OK();
}
@AutoLog(value = "查看ARR报告")
@Operation(summary = "查看ARR报告")
@GetMapping("viewARR")
public void viewARR(@NotNull(message = "样品id不能为空") Integer sampleId, HttpServletResponse response) {
associatedWaveformService.viewARR(sampleId,response);
}
@AutoLog(value = "查看RRR报告")
@Operation(summary = "查看RRR报告")
@GetMapping("viewRRR")
public void viewRRR(@NotNull(message = "样品id不能为空") Integer sampleId, HttpServletResponse response) {
associatedWaveformService.viewRRR(sampleId,response);
}
}

View File

@ -0,0 +1,45 @@
package org.jeecg.service;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.IService;
import jakarta.servlet.http.HttpServletResponse;
import org.jeecg.common.system.query.PageRequest;
import org.jeecg.modules.base.entity.rnauto.GardsWaveformEvent;
import java.time.LocalDate;
import java.util.Map;
/**
* 数据融合-关联波形
*/
public interface AssociatedWaveformService extends IService<GardsWaveformEvent> {
/**
* 分页查询关联波形数据
* @param pageRequest
* @param sampleId
* @param status
* @param sampleType
* @param startDate
* @param endDate
* @return
*/
IPage<Map<String,Object>> page(PageRequest pageRequest, Integer sampleId, Integer status, String sampleType, LocalDate startDate, LocalDate endDate);
/**
* 样品id不能为空
* @param sampleId
*/
void execAssociated(Integer sampleId);
/**
* 查看自动处理报告
* @param sampleId
*/
void viewARR(Integer sampleId, HttpServletResponse response);
/**
* 查看人工交互分析报告
* @param sampleId
*/
void viewRRR(Integer sampleId, HttpServletResponse response);
}

View File

@ -0,0 +1,189 @@
package org.jeecg.service.impl;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.ObjectUtil;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import jakarta.servlet.ServletOutputStream;
import jakarta.servlet.http.HttpServletResponse;
import lombok.RequiredArgsConstructor;
import org.jeecg.common.properties.ArmdReportProperties;
import org.jeecg.common.properties.DataFusionProperties;
import org.jeecg.common.system.query.PageRequest;
import org.jeecg.modules.base.entity.rnauto.GardsWaveformEvent;
import org.jeecg.modules.base.mapper.GardsAnalysesMapper;
import org.jeecg.modules.base.mapper.GardsWaveformEventMapper;
import org.jeecg.modules.base.mapper.OriginMapper;
import org.jeecg.modules.base.vo.GardsSampleInfoVO;
import org.jeecg.service.AssociatedWaveformService;
import org.jeecg.task.waveformtask.AssociatedWaveformTaskExec;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Service;
import org.springframework.transaction.TransactionDefinition;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Date;
import java.util.Map;
import java.util.Objects;
/**
* 数据融合-关联波形
*/
@Service
@RequiredArgsConstructor
public class AssociatedWaveformServiceImpl extends ServiceImpl<GardsWaveformEventMapper, GardsWaveformEvent> implements AssociatedWaveformService {
private final ArmdReportProperties armdReportProperties;
private final GardsAnalysesMapper analysesMapper;
private final DataFusionProperties dataFusionProperties;
private final OriginMapper originMapper;
private final DataSourceTransactionManager transactionManager;
private final TransactionDefinition transactionDefinition;
/**
* 分页查询关联波形数据
*
* @param pageRequest
* @param sampleId
* @param status
* @param sampleType
* @param startDate
* @param endDate
* @return
*/
@DS(value = "ora")
@Override
public IPage<Map<String, Object>> page(PageRequest pageRequest, Integer sampleId, Integer status, String sampleType, LocalDate startDate, LocalDate endDate) {
IPage<Map<String, Object>> page = new Page<>(pageRequest.getPageNum(), pageRequest.getPageSize());
Date startTime = null;
Date endTime = null;
if(Objects.nonNull(startDate) && Objects.nonNull(endDate)){
startTime = Date.from(startDate.atTime(0,0,0).atZone(ZoneId.of("Asia/Shanghai")).toInstant());
endTime = Date.from(endDate.atTime(0,0,0).atZone(ZoneId.of("Asia/Shanghai")).toInstant());
}
return this.baseMapper.page(page,sampleId,status,sampleType,startTime,endTime);
}
/**
* 样品id不能为空
*
* @param sampleId
*/
@DS(value = "ora")
@Override
public void execAssociated(Integer sampleId) {
//把消息写到消息队列中
//暂且先把手动关联代码写这里
GardsSampleInfoVO sampleInfo = this.baseMapper.selectSampleInfoBySampleId(sampleId);
AssociatedWaveformTaskExec associatedWaveformTaskExec = new AssociatedWaveformTaskExec();
associatedWaveformTaskExec.init(
transactionManager,
transactionDefinition,
dataFusionProperties,
this.baseMapper,
originMapper,
sampleInfo.getSampleId(),
sampleInfo.getAcqEndTime(),
sampleInfo.getStationCode());
associatedWaveformTaskExec.setName("sampleAssociatedThread_"+sampleId);
associatedWaveformTaskExec.start();
}
/**
* 查看自动处理报告
*
* @param sampleId
*/
@DS(value = "ora")
@Override
public void viewARR(Integer sampleId, HttpServletResponse response) {
response.setContentType("text/plain");
String reportBasePath = armdReportProperties.getReportBasePath();
String arrReportPath = analysesMapper.getArrReportPath(sampleId);
String filePath = reportBasePath + File.separator + arrReportPath +".txt";
InputStream inputStream = null;
ServletOutputStream outputStream = null;
try {
inputStream = new FileInputStream(filePath);
if (Objects.nonNull(inputStream)) {
outputStream = response.getOutputStream();
byte[] buffer = new byte[1024];
int bytesRead;
// 将文件输出流写入到输出流中
while ((bytesRead = inputStream.read(buffer)) != -1) {
outputStream.write(buffer, 0, bytesRead);
}
}
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
try {
if (ObjectUtil.isNotNull(inputStream)) {
inputStream.close();
}
if (ObjectUtil.isNotNull(outputStream)) {
outputStream.close();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
/**
* 查看人工交互分析报告
*
* @param sampleId
*/
@DS(value = "ora")
@Override
public void viewRRR(Integer sampleId,HttpServletResponse response) {
String reportBasePath = armdReportProperties.getReportBasePath();
String rrrReportPath = analysesMapper.getRrrReportPath(sampleId);
String fullPath = reportBasePath + File.separator + rrrReportPath;
String filePath;
if(FileUtil.exist(fullPath) && FileUtil.isDirectory(fullPath)){
response.setContentType("text/html");
String fileName = fullPath.substring(fullPath.lastIndexOf("/")) + ".html";
filePath = fullPath + fileName;
}else {
response.setContentType("text/plain");
filePath = fullPath + ".txt";
}
InputStream inputStream = null;
ServletOutputStream outputStream = null;
try {
inputStream = new FileInputStream(filePath);
if (Objects.nonNull(inputStream)) {
outputStream = response.getOutputStream();
byte[] buffer = new byte[1024];
int bytesRead;
// 将文件输出流写入到输出流中
while ((bytesRead = inputStream.read(buffer)) != -1) {
outputStream.write(buffer, 0, bytesRead);
}
}
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
try {
if (ObjectUtil.isNotNull(inputStream)) {
inputStream.close();
}
if (ObjectUtil.isNotNull(outputStream)) {
outputStream.close();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}

View File

@ -22,7 +22,7 @@ import org.jeecg.modules.base.mapper.TransportTaskLogMapper;
import org.jeecg.modules.base.mapper.TransportTaskMapper;
import org.jeecg.modules.base.mapper.WeatherDataMapper;
import org.jeecg.service.TransportTaskService;
import org.jeecg.task.TransportTaskExec;
import org.jeecg.task.flexparttask.TransportTaskExec;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDate;

View File

@ -1,4 +1,4 @@
package org.jeecg.task;
package org.jeecg.task.flexparttask;
import lombok.Data;
import lombok.NoArgsConstructor;

View File

@ -1,4 +1,4 @@
package org.jeecg.task;
package org.jeecg.task.flexparttask;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;

View File

@ -1,4 +1,4 @@
package org.jeecg.task;
package org.jeecg.task.flexparttask;
import java.util.LinkedList;

View File

@ -1,4 +1,4 @@
package org.jeecg.task;
package org.jeecg.task.flexparttask;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.io.FileUtil;

View File

@ -0,0 +1,386 @@
package org.jeecg.task.waveformtask;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.logging.log4j.util.Strings;
import org.jeecg.common.constant.enums.*;
import org.jeecg.common.properties.DataFusionProperties;
import org.jeecg.config.datasource.DataSourceSwitcher;
import org.jeecg.modules.base.entity.Origin;
import org.jeecg.modules.base.entity.rnauto.GardsWaveformEvent;
import org.jeecg.modules.base.mapper.GardsWaveformEventMapper;
import org.jeecg.modules.base.mapper.OriginMapper;
import org.jeecg.vo.SRSRecord;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import java.io.*;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
@Slf4j
public class AssociatedWaveformTaskExec extends Thread{
private DataSourceTransactionManager transactionManager;
private TransactionDefinition transactionDefinition;
private DataFusionProperties dataFusionProperties;
private GardsWaveformEventMapper waveformEventMapper;
private OriginMapper originMapper;
private Integer sampleId;
private Date acqEndTime;
private String stationCode;
private File idcSrsFile = null;
private File ndcSrsFile = null;
private List<String> idcSrsContents;
private List<String> ndcSrsContents;
//idc srs关联到的波形事件信息
private Integer idcEvents;
//ndc srs关联到的波形事件信息
private Integer ndcEvents;
/**
* 初始化
*/
public void init(
DataSourceTransactionManager transactionManager,
TransactionDefinition transactionDefinition,
DataFusionProperties dataFusionProperties,
GardsWaveformEventMapper waveformEventMapper,
OriginMapper originMapper,
Integer sampleId,
Date acqEndTime,
String stationCode){
this.transactionManager = transactionManager;
this.transactionDefinition = transactionDefinition;
this.dataFusionProperties = dataFusionProperties;
this.waveformEventMapper = waveformEventMapper;
this.originMapper = originMapper;
this.sampleId = sampleId;
this.acqEndTime = acqEndTime;
this.stationCode = stationCode;
}
@Override
public void run() {
this.execute();
}
/**
* 执行任务
*/
public void execute() {
StopWatch stopWatch = new StopWatch();
stopWatch.start();
try{
//修改任务状态为执行中
this.updateTaskStatus(this.sampleId, AssociatedWaveformTaskEnum.IN_PROCESS.getValue());
//检查srs文件是否存在
this.checkSRSExist();
//解析SRS文件数据
this.parseSrsFile();
//关联波形数据
this.associationWaveform();
this.execComplete(stopWatch,AssociatedWaveformTaskEnum.COMPLETE.getValue(),null);
}catch (Exception e){
String taskErrorLog = "任务执行失败,原因:"+e.getMessage();
this.execComplete(stopWatch,AssociatedWaveformTaskEnum.UNSUCCESSFUL.getValue(),taskErrorLog);
throw e;
}
}
/**
* 检查IDC和NDC SRS文件是否存在
*/
private void checkSRSExist(){
String acqEndDate = DateUtil.format(this.acqEndTime,"yyyyMMdd");
this.checkIdcSrsExist(acqEndDate);
this.checkNdcSrsExist(acqEndDate);
}
/**
* 解析SRS文件
*/
private void parseSrsFile(){
if(Objects.nonNull(this.idcSrsFile)){
this.idcSrsContents = this.readSrmFromGzStreaming(this.idcSrsFile);
}
if(Objects.nonNull(this.ndcSrsFile)){
this.ndcSrsContents = this.readSrmFromGzStreaming(this.ndcSrsFile);
}
}
/**
* 关联波形数据
*/
private void associationWaveform(){
Integer idcEvents = this.associationSrs(this.idcSrsContents);
Integer ndcEvents = this.associationSrs(this.ndcSrsContents);
DataSourceSwitcher.switchToOracle();
final TransactionStatus transactionStatus = this.transactionManager.getTransaction(this.transactionDefinition);
try {
GardsWaveformEvent waveformEvent = this.waveformEventMapper.selectById(sampleId);
waveformEvent.setIdcEvents(idcEvents);
waveformEvent.setNdcEvents(ndcEvents);
waveformEvent.setDescription(Strings.EMPTY);
waveformEvent.setModdate(new Date());
waveformEvent.setStatus(AssociatedWaveformTaskEnum.COMPLETE.getValue());
this.waveformEventMapper.updateById(waveformEvent);
this.transactionManager.commit(transactionStatus);
}finally {
DataSourceSwitcher.clearDataSource();
}
}
/**
* 关联idc srs文件
*/
private Integer associationSrs(List<String> srsContents){
if (CollUtil.isEmpty(srsContents)) {
return 0;
}
// 台站经度 台站纬度 开始测量日期 小时 结束测量时间 小时 系数 总共模拟时长 小时数 网格大小 台站编码
// 139.08 36.30 20241203 12 20241203 18 0.1300000E+16 336 1 1 0.50 0.50 "JPX38"
String[] firstLine = srsContents.get(0).split("\\s+");
Integer totalHour = Integer.parseInt(firstLine[8]);
Integer hourlyCoefficient = Integer.parseInt(firstLine[9]);
Double gridSize = Double.parseDouble(firstLine[11]);
//根据测量结束时间生成时间范围用于查询范围内的波形事件
LocalDateTime endTime = LocalDateTime.ofInstant(this.acqEndTime.toInstant(), ZoneId.of("Asia/Shanghai"));
endTime = endTime.withMinute(0).withSecond(0);
LocalDateTime startTime = endTime.minusHours(totalHour).minusHours(dataFusionProperties.getTraceabilityTime()*24);
//查询范围内波形事件
List<Origin> origins = this.selectOriginByTime(startTime,endTime);
//处理srs文件中的坐标记录格式化成对象
List<SRSRecord> srsRecords = buildSRSRecord(srsContents, hourlyCoefficient,endTime);
//怎么对srsRecords进行按经纬度分组分组后对每组经纬度只求一个网格边界然后再根据时间做origins查询统计关联数
Map<Map.Entry<Double, Double>, List<SRSRecord>> srsRecordsGroup = srsRecords.stream()
.collect(Collectors.groupingBy(record ->
new AbstractMap.SimpleImmutableEntry<>(record.getLon(), record.getLat())
));
Integer totalEvents = this.queryWaveform(origins, srsRecordsGroup, gridSize);
return totalEvents;
}
/**
* 检查IDC SRS文件是否存在
*/
private void checkIdcSrsExist(String acqEndDate){
//构建srs文件路径
StringBuilder srsFilePath = new StringBuilder();
srsFilePath.append(dataFusionProperties.getIdcSRSPath());
srsFilePath.append(File.separator);
srsFilePath.append(acqEndDate);
srsFilePath.append(File.separator);
srsFilePath.append(dataFusionProperties.getSrmParentDir());
//构建srm.gz文件名称
StringBuilder srmFileName = new StringBuilder();
srmFileName.append(stationCode);
srmFileName.append(".fp.");
srmFileName.append(DateUtil.format(this.acqEndTime,"yyyyMMddHH"));
srmFileName.append(".f9.srm.gz");
String fullFilePath = srsFilePath + File.separator + srmFileName;
File idcSrsFile = new File(fullFilePath);
if (idcSrsFile.exists()){
this.idcSrsFile = idcSrsFile;
}else {
this.updateTaskNotSuccessful("IDC SRS文件不存在");
}
}
/**
* 检查NDC SRS文件是否存在
*/
private void checkNdcSrsExist(String acqEndDate){
//构建srs文件路径
StringBuilder srsFilePath = new StringBuilder();
srsFilePath.append(dataFusionProperties.getNdcSRSPath());
srsFilePath.append(File.separator);
srsFilePath.append(acqEndDate);
srsFilePath.append(File.separator);
srsFilePath.append(dataFusionProperties.getSrmParentDir());
//构建srm.gz文件名称
StringBuilder srmFileName = new StringBuilder();
srmFileName.append(stationCode);
srmFileName.append(".fp.");
srmFileName.append(DateUtil.format(this.acqEndTime,"yyyyMMddHH"));
srmFileName.append(".f9.srm.gz");
String fullFilePath = srsFilePath + File.separator + srmFileName;
File ndcSrsFile = new File(fullFilePath);
if (ndcSrsFile.exists()){
this.ndcSrsFile = idcSrsFile;
}else {
this.updateTaskNotSuccessful("NDC SRS文件不存在");
}
}
/**
* 处理srs文件中的坐标记录格式化成对象
* @param contents
* @param hourlyCoefficient
* @param acqEndTime 测量结束时间
* @return
*/
private List<SRSRecord> buildSRSRecord(List<String> contents,Integer hourlyCoefficient,LocalDateTime acqEndTime){
List<SRSRecord> list = contents.parallelStream().skip(1).map(content->{
String[] line = content.split("\\s+");
SRSRecord srsRecord = new SRSRecord();
srsRecord.setLat(Double.parseDouble(line[1]));
srsRecord.setLon(Double.parseDouble(line[2]));
srsRecord.setHour(Integer.parseInt(line[3])*hourlyCoefficient);
srsRecord.setConc(line[4]);
LocalDateTime endTime = acqEndTime.minusHours(srsRecord.getHour());
LocalDateTime startTime = acqEndTime.minusHours(srsRecord.getHour()+dataFusionProperties.getTraceabilityTime()*24);
long startSecond = startTime.atZone(ZoneId.of("Asia/Shanghai")).toEpochSecond();
long endSecond = endTime.atZone(ZoneId.of("Asia/Shanghai")).toEpochSecond();
srsRecord.setStartSecond(startSecond);
srsRecord.setEndSecond(endSecond);
return srsRecord;
}).toList();
return list;
}
/**
* 根据经纬度及时间查询范围内的波形数据并进行计数最后的计数就是关联的波形结果
* @param srsRecordsGroup
*/
private Integer queryWaveform(List<Origin> origins,Map<Map.Entry<Double, Double>, List<SRSRecord>> srsRecordsGroup,Double gridSize){
Set<Integer> idcOrids = ConcurrentHashMap.newKeySet();
if (CollUtil.isNotEmpty(srsRecordsGroup) && CollUtil.isNotEmpty(origins)){
srsRecordsGroup.forEach((key,srsRecords)->{
double leftLon = Math.floor(key.getKey() / gridSize) * gridSize;
double bottomLat = Math.floor(key.getValue() / gridSize) * gridSize;
double rightLon = leftLon + gridSize;
double topLat = bottomLat + gridSize;
//统计srs中每条记录内关联的波形数据
srsRecords.parallelStream().forEach(record->{
Set<Integer> orids = origins.stream()
.filter(origin ->origin.getLon() >= leftLon && origin.getLon() < rightLon &&
origin.getLat() >= bottomLat && origin.getLat() < topLat &&
origin.getTime() >= record.getStartSecond() && origin.getTime() <= record.getEndSecond())
.map(Origin::getOrid).collect(Collectors.toSet());
idcOrids.addAll(orids);
});
});
return idcOrids.size();
}
return idcOrids.size();
}
/**
* 修改任务状态
* @param sampleId
* @param status
*/
private void updateTaskStatus(Integer sampleId, Integer status) {
DataSourceSwitcher.switchToOracle();
final TransactionStatus transactionStatus = this.transactionManager.getTransaction(this.transactionDefinition);
try {
GardsWaveformEvent waveformEvent = this.waveformEventMapper.selectById(sampleId);
waveformEvent.setStatus(status);
waveformEvent.setModdate(new Date());
this.waveformEventMapper.updateById(waveformEvent);
this.transactionManager.commit(transactionStatus);
}finally {
DataSourceSwitcher.clearDataSource();
}
}
/**
* 修改任务关联未成功
* @param description
*/
private void updateTaskNotSuccessful(String description) {
DataSourceSwitcher.switchToOracle();
final TransactionStatus transactionStatus = this.transactionManager.getTransaction(this.transactionDefinition);
try {
GardsWaveformEvent waveformEvent = this.waveformEventMapper.selectById(this.sampleId);
waveformEvent.setStatus(AssociatedWaveformTaskEnum.UNSUCCESSFUL.getValue());
waveformEvent.setDescription(waveformEvent.getDescription()+"\n"+description);
waveformEvent.setModdate(new Date());
this.waveformEventMapper.updateById(waveformEvent);
this.transactionManager.commit(transactionStatus);
}finally {
DataSourceSwitcher.clearDataSource();
}
}
/**
* 获取时间范围内的波形数据
* @param startTime
* @param endTime
* @return
*/
private List<Origin> selectOriginByTime(LocalDateTime startTime,LocalDateTime endTime){
DataSourceSwitcher.switchToOracle();
try {
long startSecond = startTime.atZone(ZoneId.of("Asia/Shanghai")).toEpochSecond();
long endSecond = endTime.atZone(ZoneId.of("Asia/Shanghai")).toEpochSecond();
return originMapper.selectOriginByTime(startSecond,endSecond);
}finally {
DataSourceSwitcher.clearDataSource();
}
}
/**
* 读取srm文件
* @param gzFile
* @return
*/
public List<String> readSrmFromGzStreaming(File gzFile){
try {
List<String> contents = new ArrayList<>();
try (FileInputStream fis = new FileInputStream(gzFile);
GZIPInputStream gis = new GZIPInputStream(fis);
BufferedReader reader = new BufferedReader(new InputStreamReader(gis, StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
contents.add(line);
}
}
return contents;
}catch (Exception e){
log.error("SRS文件解析异常路径为{},原因为:{}",gzFile.getAbsolutePath(),e.getMessage());
return List.of();
}
}
/**
* 任务执行完成
* @param stopWatch
* @param taskStatus
* @param taskErrorLog
*/
private void execComplete(StopWatch stopWatch,Integer taskStatus,String taskErrorLog){
//添加任务耗时
stopWatch.stop();
long seconds = stopWatch.getTime(TimeUnit.SECONDS);
double min = seconds/60D;
BigDecimal bgMin = new BigDecimal(min);
BigDecimal result = bgMin.setScale(2, RoundingMode.HALF_UP);
if (AssociatedWaveformTaskEnum.COMPLETE.getValue().equals(taskStatus)){
log.info("任务执行完成,耗时{}分钟",result);
}else if (AssociatedWaveformTaskEnum.UNSUCCESSFUL.getValue().equals(taskStatus)){
log.error(taskErrorLog);
}
this.updateTaskStatus(this.sampleId,taskStatus);
}
}

View File

@ -0,0 +1,42 @@
package org.jeecg.vo;
import lombok.Data;
import java.time.LocalDateTime;
/**
* srs记录数据
*/
@Data
public class SRSRecord {
/**
* 经度
*/
private Double lon;
/**
* 纬度
*/
private Double lat;
/**
* 处于的小时
*/
private Integer hour;
/**
* 根据样品时间-hour得到的开始时间
*/
private Long startSecond;
/**
* 测量结束时间就是模拟结束时间
*/
private Long endSecond;
/**
* 浓度值单位暂未知
*/
private String conc;
}