fix:1.完成输运模拟正演和反演子模块功能

This commit is contained in:
panbaolin 2026-06-09 14:25:34 +08:00
parent bbad39f6c5
commit e963667e0b
31 changed files with 689 additions and 511 deletions

View File

@ -342,9 +342,10 @@
<version>${netcdfAll.version}</version> <version>${netcdfAll.version}</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.jcraft</groupId> <groupId>com.github.mwiede</groupId>
<artifactId>jsch</artifactId> <artifactId>jsch</artifactId>
<version>0.1.54</version> <version>2.28.2</version>
<scope>compile</scope>
</dependency> </dependency>
<!-- ssh2远程连接 --> <!-- ssh2远程连接 -->
<dependency> <dependency>

View File

@ -318,4 +318,27 @@ public interface CommonConstant {
String TRANSPORT_TIMING_ANALYSIS = "transport:timing_analysis:"; String TRANSPORT_TIMING_ANALYSIS = "transport:timing_analysis:";
String DATA_SYNC_FREQUENCY = "data_sync:frequency"; String DATA_SYNC_FREQUENCY = "data_sync:frequency";
/**
* 各服务器任务运行状态
*/
String HOST_TASK_STATE = "host_task_state";
/**
* 输运任务状态前缀
*/
String TRAN_TASK_STATE_PRE = "tran_task_";
/**
* 气象任务状态前缀
*/
String WEATHER_TASK_STATE_PRE = "weather_task_";
/**
* 源项重建任务状态前缀
*/
String BUILD_TASK_STATE_PRE = "build_task_";
} }

View File

@ -12,9 +12,4 @@ public interface RocketMQTopConstant {
String SAMPLE_RESULT_TOPIC = "topic_abnormal_sample_list"; String SAMPLE_RESULT_TOPIC = "topic_abnormal_sample_list";
String ASSOCIATED_WAVEFORM_SAMPLE_TOPIC = "topic_associated_waveform_list"; String ASSOCIATED_WAVEFORM_SAMPLE_TOPIC = "topic_associated_waveform_list";
/**
* 各服务器输运模拟任务运行状态
*/
String HOST_TASK_STATE = "host_task_state";
} }

View File

@ -24,7 +24,12 @@ public enum TransportTaskStatusEnum {
/** /**
* 已完成 * 已完成
*/ */
COMPLETED(3); COMPLETED(3),
/**
* 检查未通过
*/
INSPECTION_FAILED(4);
private Integer value; private Integer value;

View File

@ -0,0 +1,27 @@
package org.jeecg.common.constant.enums;
/**
* 输运模拟任务置顶说明枚举
*/
public enum TransportTaskTopEnum {
/**
* 未置顶
*/
NOT_TOP(0),
/**
* 置顶
*/
TOP(1);
private Integer value;
TransportTaskTopEnum(Integer value) {
this.value = value;
}
public Integer getValue(){
return this.value;
}
}

View File

@ -11,6 +11,7 @@ import org.jeecg.common.util.oConvertUtils;
import org.jeecg.config.security.utils.SecureUtil; import org.jeecg.config.security.utils.SecureUtil;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.time.LocalDateTime;
import java.util.Date; import java.util.Date;
import java.util.Properties; import java.util.Properties;
@ -57,7 +58,7 @@ public class MybatisInterceptor implements Interceptor {
} }
} }
// 注入创建时间 // 注入创建时间
if ("createTime".equals(field.getName())) { if ("createTime".equals(field.getName()) && Date.class.equals(field.getType())) {
field.setAccessible(true); field.setAccessible(true);
Object localCreateDate = field.get(parameter); Object localCreateDate = field.get(parameter);
field.setAccessible(false); field.setAccessible(false);
@ -108,7 +109,7 @@ public class MybatisInterceptor implements Interceptor {
field.setAccessible(false); field.setAccessible(false);
} }
} }
if ("updateTime".equals(field.getName())) { if ("updateTime".equals(field.getName()) && Date.class.equals(field.getType())) {
field.setAccessible(true); field.setAccessible(true);
field.set(parameter, new Date()); field.set(parameter, new Date());
field.setAccessible(false); field.setAccessible(false);

View File

@ -53,14 +53,7 @@ public class TransportTask{
private Integer taskType; private Integer taskType;
/** /**
* 任务进度 * 任务状态-1执行失败0未开始1等待中2运行中3已完成4任务检查未通过
*/
@Null(message = "任务进度必须为空",groups = {InsertGroup.class, UpdateGroup.class})
@TableField(value = "task_progress")
private Integer taskPprogress;
/**
* 任务状态-1执行失败0未开始1等待中2运行中3已完成
*/ */
@Null(message = "任务状态必须为空",groups = {InsertGroup.class, UpdateGroup.class}) @Null(message = "任务状态必须为空",groups = {InsertGroup.class, UpdateGroup.class})
@TableField(value = "task_status") @TableField(value = "task_status")
@ -80,25 +73,13 @@ public class TransportTask{
@TableField(value = "time_consuming") @TableField(value = "time_consuming")
private Double timeConsuming; private Double timeConsuming;
/**
* 创建人
*/
@TableField(value = "create_by")
private String createBy;
/** /**
* 创建时间 * 创建时间
*/ */
@TableField(value = "create_time") @TableField(value = "create_time")
@JsonFormat(timezone = "Asia/Shanghai", pattern = "yyyy-MM-dd HH:mm:ss") @JsonFormat(timezone = "Asia/Shanghai", pattern = "yyyy-MM-dd HH:mm:ss")
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date createTime; private LocalDateTime createTime;
/**
* 更新人
*/
@TableField(value = "update_by")
private String updateBy;
/** /**
* 更新时间 * 更新时间
@ -106,7 +87,7 @@ public class TransportTask{
@TableField(value = "update_time") @TableField(value = "update_time")
@JsonFormat(timezone = "Asia/Shanghai", pattern = "yyyy-MM-dd HH:mm:ss") @JsonFormat(timezone = "Asia/Shanghai", pattern = "yyyy-MM-dd HH:mm:ss")
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private Date updateTime; private LocalDateTime updateTime;
/** /**
* 模拟开始时间 * 模拟开始时间
@ -153,6 +134,12 @@ public class TransportTask{
@TableField(value = "release_data_source") @TableField(value = "release_data_source")
private Integer releaseDataSource; private Integer releaseDataSource;
/**
* 任务置顶
*/
@TableField(value = "top_task")
private Integer topTask;
/** /**
* 反演子表信息 * 反演子表信息
*/ */

View File

@ -1,7 +1,27 @@
package org.jeecg.modules.base.mapper; package org.jeecg.modules.base.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import org.apache.ibatis.annotations.Param;
import org.jeecg.modules.base.entity.TransportTask; import org.jeecg.modules.base.entity.TransportTask;
import java.time.LocalDate;
import java.time.LocalDateTime;
public interface TransportTaskMapper extends BaseMapper<TransportTask> { public interface TransportTaskMapper extends BaseMapper<TransportTask> {
/**
* 分页查询任务数据
* @param page
* @param taskName
* @param taskMode
* @param taskStatus
* @param metType
* @param startDateTime
* @param endDateTime
* @return
*/
IPage<TransportTask> page(@Param("page") Page<TransportTask> page,@Param("taskName") String taskName,@Param("taskMode") Integer taskMode,
@Param("taskStatus") Integer taskStatus,@Param("metType") Integer metType,@Param("startDateTime") LocalDateTime startDateTime,@Param("endDateTime") LocalDateTime endDateTime);
} }

View File

@ -0,0 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.jeecg.modules.base.mapper.TransportTaskMapper">
<select id="page" resultType="org.jeecg.modules.base.entity.TransportTask">
select
t.id,
t.task_name as taskName,
t.task_mode as taskMode,
t.task_type as taskType,
t.task_status as taskStatus,
t.use_met_type as useMetType,
t.time_consuming as timeConsuming,
t.create_by as createBy,
t.create_time as createTime,
t.top_task as topTask
from stas_transport_task t
<where>
<if test="taskMode !=null">
t.task_mode = #{taskMode}
</if>
<if test="taskStatus !=null">
and t.task_status = #{taskStatus}
</if>
<if test="metType !=null">
and t.use_met_type = #{metType}
</if>
<if test="startDateTime != null and endDateTime != null">
and t.create_time between #{startDateTime} and #{endDateTime}
</if>
</where>
ORDER BY
CASE t.task_status
WHEN 1 THEN 0 <!-- 等待中 -->
WHEN -1 THEN 1 <!-- 执行失败 -->
WHEN 2 THEN 2
WHEN 0 THEN 3
ELSE 4 <!-- 其他状态 -->
END ASC,t.update_time desc
</select>
</mapper>

View File

@ -2,17 +2,22 @@ package org.jeecg.transport.consumer;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollUtil;
import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson2.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageExt;
import org.jeecg.common.constant.CommonConstant;
import org.jeecg.common.constant.RocketMQTopConstant; import org.jeecg.common.constant.RocketMQTopConstant;
import org.jeecg.common.constant.enums.TransportTaskStatusEnum;
import org.jeecg.common.constant.enums.TransportTaskTopEnum;
import org.jeecg.common.properties.DataFusionProperties; import org.jeecg.common.properties.DataFusionProperties;
import org.jeecg.common.properties.SystemStorageProperties; import org.jeecg.common.properties.SystemStorageProperties;
import org.jeecg.common.properties.TransportSimulationProperties; import org.jeecg.common.properties.TransportSimulationProperties;
import org.jeecg.common.util.RedisUtil; import org.jeecg.common.util.RedisUtil;
import org.jeecg.modules.base.dto.TransportTaskDTO; import org.jeecg.modules.base.dto.TransportTaskDTO;
import org.jeecg.modules.base.entity.TransportTask;
import org.jeecg.modules.base.mapper.*; import org.jeecg.modules.base.mapper.*;
import org.jeecg.properties.ServerProperties; import org.jeecg.properties.ServerProperties;
import org.jeecg.transport.consumer.china.AbstractTaskMsgHandler; import org.jeecg.transport.consumer.china.AbstractTaskMsgHandler;
@ -49,16 +54,47 @@ public class TranTaskMessageConsumerHandler{
private final TransportTaskForwardReleaseMapper taskForwardReleaseMapper; private final TransportTaskForwardReleaseMapper taskForwardReleaseMapper;
private final StationDataService stationDataService; private final StationDataService stationDataService;
private final StationsModValService stationsModValService; private final StationsModValService stationsModValService;
@Value("${rocketmq.consumer.group}")
private String consumerGroup;
@Value("${rocketmq.name-server}")
private String nameServerAddress;
public void startConsumerThread(){ public void startConsumerThread(){
MessageConsumerThread messageConsumerThread = new MessageConsumerThread(); MessageConsumerThread messageConsumerThread = new MessageConsumerThread();
messageConsumerThread.setName("输运模拟任务处理线程"); messageConsumerThread.setName("transport-task-thread");
messageConsumerThread.start(); messageConsumerThread.start();
log.info("启动输运模拟任务处理线程----------------------"); log.info("启动输运模拟任务消费线程----------------------");
}
/**
* 处理输运模拟任务
*/
private void handlerTask(TransportTask message){
AbstractTaskMsgHandler taskMsgHandler = new Server11TaskHandler();
taskMsgHandler.init(transportTaskMapper,taskBackwardChildMapper,weatherDataMapper,
simulationProperties,systemStorageProperties,dataFusionProperties,
serverProperties,taskForwardSpeciesMapper,taskForwardChildMapper,
transportTaskService,taskForwardReleaseMapper,stationDataService,
stationsModValService,redisUtil);
taskMsgHandler.handler(message);
}
/**
* 获取等待中的任务
* @return
*/
private TransportTask getTopMessage(){
LambdaQueryWrapper<TransportTask> topQueryWrapper = new LambdaQueryWrapper<>();
topQueryWrapper.eq(TransportTask::getTaskStatus, TransportTaskStatusEnum.WAITING.getValue());
topQueryWrapper.orderByDesc(TransportTask::getTopTask);
topQueryWrapper.orderByDesc(TransportTask::getUpdateTime);
topQueryWrapper.last("LIMIT 1");
TransportTask transportTask = transportTaskMapper.selectOne(topQueryWrapper);
//如果没有要置顶执行的任务则按照普通创建时间排序查询
if(Objects.isNull(transportTask)){
LambdaQueryWrapper<TransportTask> timeQueryWrapper = new LambdaQueryWrapper<>();
timeQueryWrapper.eq(TransportTask::getTaskStatus, TransportTaskStatusEnum.WAITING.getValue());
timeQueryWrapper.orderByAsc(TransportTask::getCreateTime);
timeQueryWrapper.last("LIMIT 1");
transportTask = transportTaskMapper.selectOne(timeQueryWrapper);
}
return transportTask;
} }
private class MessageConsumerThread extends Thread{ private class MessageConsumerThread extends Thread{
@ -68,53 +104,23 @@ public class TranTaskMessageConsumerHandler{
while (true) { while (true) {
try { try {
//获取本机项数据如果为空表示本机没有任务在运行 //获取本机项数据如果为空表示本机没有任务在运行
boolean flag = redisUtil.hHasKey(RocketMQTopConstant.HOST_TASK_STATE, serverProperties.getHost()); boolean flag = redisUtil.hHasKey(CommonConstant.HOST_TASK_STATE,CommonConstant.TRAN_TASK_STATE_PRE+serverProperties.getHost());
if (!flag) { if (!flag) {
//如果不存在则获取一条消息尝试执行
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer();
try { try {
consumer.setConsumerGroup(consumerGroup); TransportTask topMessage = getTopMessage();
consumer.setNamesrvAddr(nameServerAddress); if (Objects.nonNull(topMessage)) {
consumer.setPullBatchSize(1); handlerTask(topMessage);
consumer.subscribe(RocketMQTopConstant.BACKWARD_TRANSPORT_TASK_TOPIC);
consumer.setAutoCommit(false);
consumer.start();
TransportTaskDTO transportTaskDTO = null;
List<MessageExt> messages = consumer.poll(5000L);
if (CollUtil.isNotEmpty(messages)) {
transportTaskDTO = JSON.parseObject(messages.get(0).getBody(),TransportTaskDTO.class);
boolean matchFlag = handlerTask(transportTaskDTO);
if (matchFlag){
consumer.commit();
}
log.info("消费一条消息:{}",transportTaskDTO);
} }
}catch (MQClientException e){ }catch (Exception e){
log.error("消费者启动异常,原因为:{}",e.getMessage(),e); log.error("消费输运任务数据异常,原因为:{}",e.getMessage(),e);
}finally {
consumer.shutdown();
} }
} }
log.info("30秒钟处理一次"); log.info("60秒钟处理一次");
TimeUnit.SECONDS.sleep(30); TimeUnit.SECONDS.sleep(30);
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
} }
/**
* 处理输运模拟任务
*/
private boolean handlerTask(TransportTaskDTO message){
AbstractTaskMsgHandler taskMsgHandler = new Server11TaskHandler();
taskMsgHandler.init(transportTaskMapper,taskBackwardChildMapper,weatherDataMapper,
simulationProperties,systemStorageProperties,dataFusionProperties,
serverProperties,taskForwardSpeciesMapper,taskForwardChildMapper,
transportTaskService,taskForwardReleaseMapper,stationDataService,
stationsModValService);
taskMsgHandler.handler(message);
return taskMsgHandler.isMatchFlag();
}
} }
} }

View File

@ -1,6 +1,5 @@
package org.jeecg.transport.consumer.china; package org.jeecg.transport.consumer.china;
import lombok.Getter;
import lombok.Setter; import lombok.Setter;
/** /**
@ -25,8 +24,4 @@ public abstract class AbstractChain {
*/ */
protected abstract void setChina(); protected abstract void setChina();
/**
* 是否匹配成功
*/
protected boolean matchFlag = false;
} }

View File

@ -1,11 +1,14 @@
package org.jeecg.transport.consumer.china; package org.jeecg.transport.consumer.china;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.constant.CommonConstant;
import org.jeecg.common.constant.enums.TransportTaskModeEnum; import org.jeecg.common.constant.enums.TransportTaskModeEnum;
import org.jeecg.common.properties.DataFusionProperties; import org.jeecg.common.properties.DataFusionProperties;
import org.jeecg.common.properties.SystemStorageProperties; import org.jeecg.common.properties.SystemStorageProperties;
import org.jeecg.common.properties.TransportSimulationProperties; import org.jeecg.common.properties.TransportSimulationProperties;
import org.jeecg.common.util.RedisUtil;
import org.jeecg.modules.base.dto.TransportTaskDTO; import org.jeecg.modules.base.dto.TransportTaskDTO;
import org.jeecg.modules.base.entity.TransportTask;
import org.jeecg.modules.base.mapper.*; import org.jeecg.modules.base.mapper.*;
import org.jeecg.properties.ServerProperties; import org.jeecg.properties.ServerProperties;
import org.jeecg.transport.flexparttask.AbstractTaskExec; import org.jeecg.transport.flexparttask.AbstractTaskExec;
@ -22,6 +25,7 @@ import java.util.Objects;
@Slf4j @Slf4j
public abstract class AbstractTaskMsgHandler extends AbstractChain{ public abstract class AbstractTaskMsgHandler extends AbstractChain{
protected RedisUtil redisUtil;
protected TransportTaskMapper transportTaskMapper; protected TransportTaskMapper transportTaskMapper;
protected TransportTaskBackwardChildMapper taskBackwardChildMapper; protected TransportTaskBackwardChildMapper taskBackwardChildMapper;
protected WeatherDataMapper weatherDataMapper; protected WeatherDataMapper weatherDataMapper;
@ -51,7 +55,8 @@ public abstract class AbstractTaskMsgHandler extends AbstractChain{
TransportTaskService transportTaskService, TransportTaskService transportTaskService,
TransportTaskForwardReleaseMapper taskForwardReleaseMapper, TransportTaskForwardReleaseMapper taskForwardReleaseMapper,
StationDataService stationDataService, StationDataService stationDataService,
StationsModValService stationsModValService) { StationsModValService stationsModValService,
RedisUtil redisUtil) {
this.transportTaskMapper = transportTaskMapper; this.transportTaskMapper = transportTaskMapper;
this.taskBackwardChildMapper = taskBackwardChildMapper; this.taskBackwardChildMapper = taskBackwardChildMapper;
this.weatherDataMapper = weatherDataMapper; this.weatherDataMapper = weatherDataMapper;
@ -65,6 +70,7 @@ public abstract class AbstractTaskMsgHandler extends AbstractChain{
this.taskForwardReleaseMapper = taskForwardReleaseMapper; this.taskForwardReleaseMapper = taskForwardReleaseMapper;
this.stationDataService = stationDataService; this.stationDataService = stationDataService;
this.stationsModValService = stationsModValService; this.stationsModValService = stationsModValService;
this.redisUtil = redisUtil;
this.setChina(); this.setChina();
} }
@ -80,59 +86,43 @@ public abstract class AbstractTaskMsgHandler extends AbstractChain{
/** /**
* 处理消息 * 处理消息
*/ */
public abstract void handler(TransportTaskDTO message); public abstract void handler(TransportTask message);
/**
* 设置匹配标记
* @param matchFlag
*/
protected void setMatchFlag(boolean matchFlag) {
this.matchFlag = matchFlag;
if (Objects.nonNull(this.previous)) {
this.previous.setMatchFlag(matchFlag);
}
}
/**
* 获取匹配标记
* @return
*/
public boolean isMatchFlag() {
return this.matchFlag;
}
/** /**
* 运行任务 * 运行任务
* @param transportTaskDTO * @param transportTask
*/ */
protected void runTask(TransportTaskDTO transportTaskDTO){ protected void runTask(TransportTask transportTask,String ip){
log.info("运行任务测试"); log.info("收到任务:"+transportTask.getTaskName());
if (TransportTaskModeEnum.BACK_FORWARD.getKey().equals(transportTaskDTO.getTaskMode())){ boolean flag = false;
if (TransportTaskModeEnum.BACK_FORWARD.getKey().equals(transportTask.getTaskMode())){
AbstractTaskExec taskExec = new BackwardTaskExec(); AbstractTaskExec taskExec = new BackwardTaskExec();
taskExec.init(weatherDataMapper,transportTaskService,transportTaskDTO, taskExec.init(weatherDataMapper,transportTaskService,transportTask,
simulationProperties,systemStorageProperties, simulationProperties,systemStorageProperties,
dataFusionProperties,serverProperties,taskBackwardChildMapper); dataFusionProperties,serverProperties,taskBackwardChildMapper,redisUtil);
taskExec.setName("大气输运反演任务执行线程"); taskExec.setName("flexpart-backword-thread");
//匹配成功并且检查成功才能设置为true //匹配成功并且检查成功才能设置为true
boolean flag = taskExec.checkTask(); flag = taskExec.checkTask();
if (flag){ if (flag){
this.setMatchFlag(true);
taskExec.start(); taskExec.start();
} }
}else if (TransportTaskModeEnum.FORWARD.getKey().equals(transportTaskDTO.getTaskMode())){ }else if (TransportTaskModeEnum.FORWARD.getKey().equals(transportTask.getTaskMode())){
AbstractTaskExec taskExec = new ForwardTaskExec(); AbstractTaskExec taskExec = new ForwardTaskExec();
taskExec.init(weatherDataMapper,transportTaskService,transportTaskDTO, taskExec.init(weatherDataMapper,transportTaskService,transportTask,
simulationProperties,systemStorageProperties,serverProperties, simulationProperties,systemStorageProperties,serverProperties,
stationDataService,stationsModValService,taskForwardSpeciesMapper, stationDataService,stationsModValService,taskForwardSpeciesMapper,
taskForwardChildMapper,taskForwardReleaseMapper); taskForwardChildMapper,taskForwardReleaseMapper,redisUtil);
taskExec.setName("大气输运正演任务执行线程"); taskExec.setName("flexpart-fword-thread");
//匹配成功并且检查成功才能设置为true //匹配成功并且检查成功才能设置为true
boolean flag = taskExec.checkTask(); flag = taskExec.checkTask();
if (flag){ if (flag){
this.setMatchFlag(true);
taskExec.start(); taskExec.start();
} }
taskExec.start(); }
if (flag){
log.info("任务:{},匹配成功,当前在主机:{}上执行。", transportTask.getTaskName(),ip);
}else {
transportTaskService.setInpectionFailed(transportTask);
} }
} }
} }

View File

@ -1,7 +1,7 @@
package org.jeecg.transport.consumer.china; package org.jeecg.transport.consumer.china;
import org.jeecg.common.constant.enums.TransportTaskModeEnum; import org.jeecg.common.constant.enums.TransportTaskModeEnum;
import org.jeecg.modules.base.dto.TransportTaskDTO; import org.jeecg.modules.base.entity.TransportTask;
/** /**
* 只处理反演任务 * 只处理反演任务
@ -24,11 +24,12 @@ public class Server11TaskHandler extends AbstractTaskMsgHandler {
* 处理消息 * 处理消息
*/ */
@Override @Override
public void handler(TransportTaskDTO message) { public void handler(TransportTask message) {
if(serverProperties.getHost().equals(serverProperties.getIp11())){ if(serverProperties.getHost().equals(serverProperties.getIp11())){
if (TransportTaskModeEnum.BACK_FORWARD.getKey().equals(message.getTaskMode())){ super.runTask(message,serverProperties.getIp11());
super.runTask(message); // if (TransportTaskModeEnum.BACK_FORWARD.getKey().equals(message.getTaskMode())){
} // super.runTask(message,serverProperties.getIp11());
// }
}else { }else {
super.next.handler(message); super.next.handler(message);
} }

View File

@ -1,6 +1,6 @@
package org.jeecg.transport.consumer.china; package org.jeecg.transport.consumer.china;
import org.jeecg.modules.base.dto.TransportTaskDTO; import org.jeecg.modules.base.entity.TransportTask;
/** /**
* 处理反演和正演任务 * 处理反演和正演任务
@ -22,9 +22,9 @@ public class Server12TaskHandler extends AbstractTaskMsgHandler {
* 处理消息 * 处理消息
*/ */
@Override @Override
public void handler(TransportTaskDTO message) { public void handler(TransportTask message) {
if(serverProperties.getHost().equals(serverProperties.getIp12())){ if(serverProperties.getHost().equals(serverProperties.getIp12())){
super.runTask(message); super.runTask(message,serverProperties.getIp12());
}else { }else {
super.next.handler(message); super.next.handler(message);
} }

View File

@ -1,6 +1,6 @@
package org.jeecg.transport.consumer.china; package org.jeecg.transport.consumer.china;
import org.jeecg.modules.base.dto.TransportTaskDTO; import org.jeecg.modules.base.entity.TransportTask;
/** /**
* 处理反演和正演任务 * 处理反演和正演任务
@ -22,9 +22,9 @@ public class Server13TaskHandler extends AbstractTaskMsgHandler {
* 处理消息 * 处理消息
*/ */
@Override @Override
public void handler(TransportTaskDTO message) { public void handler(TransportTask message) {
if(serverProperties.getHost().equals(serverProperties.getIp13())){ if(serverProperties.getHost().equals(serverProperties.getIp13())){
super.runTask(message); super.runTask(message,serverProperties.getIp12());
}else { }else {
super.next.handler(message); super.next.handler(message);
} }

View File

@ -1,6 +1,6 @@
package org.jeecg.transport.consumer.china; package org.jeecg.transport.consumer.china;
import org.jeecg.modules.base.dto.TransportTaskDTO; import org.jeecg.modules.base.entity.TransportTask;
/** /**
* 处理反演和永久型正演任务 * 处理反演和永久型正演任务
@ -19,11 +19,11 @@ public class Server14TaskHandler extends AbstractTaskMsgHandler {
* 处理消息 * 处理消息
*/ */
@Override @Override
public void handler(TransportTaskDTO message) { public void handler(TransportTask message) {
if(serverProperties.getHost().equals(serverProperties.getIp14())){ if(serverProperties.getHost().equals(serverProperties.getIp14())){
//这里需要再加个判断14服务器处理反演和永久任务 //这里需要再加个判断14服务器处理反演和永久任务
// if (TransportTaskModeEnum.BACK_FORWARD.getKey().equals(message.getTaskMode())){ // if (TransportTaskModeEnum.BACK_FORWARD.getKey().equals(message.getTaskMode())){
// super.runTask(message); // super.runTask(message,serverProperties.getIp11());
// } // }
} }
} }

View File

@ -4,17 +4,15 @@ import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.LocalDateTimeUtil; import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.jcraft.jsch.ChannelExec; import com.jcraft.jsch.*;
import com.jcraft.jsch.JSch; import lombok.extern.slf4j.Slf4j;
import com.jcraft.jsch.JSchException;
import com.jcraft.jsch.Session;
import lombok.Setter;
import org.apache.logging.log4j.util.Strings; import org.apache.logging.log4j.util.Strings;
import org.jeecg.common.constant.CommonConstant;
import org.jeecg.common.constant.enums.WeatherDataSourceEnum; import org.jeecg.common.constant.enums.WeatherDataSourceEnum;
import org.jeecg.common.properties.DataFusionProperties; import org.jeecg.common.properties.DataFusionProperties;
import org.jeecg.common.properties.SystemStorageProperties; import org.jeecg.common.properties.SystemStorageProperties;
import org.jeecg.common.properties.TransportSimulationProperties; import org.jeecg.common.properties.TransportSimulationProperties;
import org.jeecg.modules.base.dto.TransportTaskDTO; import org.jeecg.common.util.RedisUtil;
import org.jeecg.modules.base.entity.TransportTask; import org.jeecg.modules.base.entity.TransportTask;
import org.jeecg.modules.base.entity.WeatherData; import org.jeecg.modules.base.entity.WeatherData;
import org.jeecg.modules.base.mapper.*; import org.jeecg.modules.base.mapper.*;
@ -26,8 +24,8 @@ import java.io.*;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit; import java.time.temporal.ChronoUnit;
import java.util.List; import java.util.List;
import java.util.Properties;
@Slf4j
public abstract class AbstractTaskExec extends Thread{ public abstract class AbstractTaskExec extends Thread{
protected WeatherDataMapper weatherDataMapper; protected WeatherDataMapper weatherDataMapper;
@ -43,14 +41,15 @@ public abstract class AbstractTaskExec extends Thread{
protected TransportTaskForwardReleaseMapper taskForwardReleaseMapper; protected TransportTaskForwardReleaseMapper taskForwardReleaseMapper;
protected TransportTaskBackwardChildMapper taskBackwardChildMapper; protected TransportTaskBackwardChildMapper taskBackwardChildMapper;
protected TransportTask transportTask; protected TransportTask transportTask;
protected TransportTaskDTO transportTaskDTO; protected RedisUtil redisUtil;
protected boolean taskRunError;
/** /**
* 初始化 * 初始化
*/ */
public void init(WeatherDataMapper weatherDataMapper, public void init(WeatherDataMapper weatherDataMapper,
TransportTaskService transportTaskService, TransportTaskService transportTaskService,
TransportTaskDTO transportTaskDTO, TransportTask transportTask,
TransportSimulationProperties simulationProperties, TransportSimulationProperties simulationProperties,
SystemStorageProperties systemStorageProperties, SystemStorageProperties systemStorageProperties,
ServerProperties serverProperties, ServerProperties serverProperties,
@ -58,10 +57,11 @@ public abstract class AbstractTaskExec extends Thread{
StationsModValService stationsModValService, StationsModValService stationsModValService,
TransportTaskForwardSpeciesMapper taskForwardSpeciesMapper, TransportTaskForwardSpeciesMapper taskForwardSpeciesMapper,
TransportTaskForwardChildMapper taskForwardChildMapper, TransportTaskForwardChildMapper taskForwardChildMapper,
TransportTaskForwardReleaseMapper taskForwardReleaseMapper){ TransportTaskForwardReleaseMapper taskForwardReleaseMapper,
RedisUtil redisUtil){
this.weatherDataMapper = weatherDataMapper; this.weatherDataMapper = weatherDataMapper;
this.transportTaskService = transportTaskService; this.transportTaskService = transportTaskService;
this.transportTaskDTO = transportTaskDTO; this.transportTask = transportTask;
this.simulationProperties = simulationProperties; this.simulationProperties = simulationProperties;
this.systemStorageProperties = systemStorageProperties; this.systemStorageProperties = systemStorageProperties;
this.serverProperties = serverProperties; this.serverProperties = serverProperties;
@ -70,6 +70,7 @@ public abstract class AbstractTaskExec extends Thread{
this.taskForwardSpeciesMapper = taskForwardSpeciesMapper; this.taskForwardSpeciesMapper = taskForwardSpeciesMapper;
this.taskForwardChildMapper = taskForwardChildMapper; this.taskForwardChildMapper = taskForwardChildMapper;
this.taskForwardReleaseMapper = taskForwardReleaseMapper; this.taskForwardReleaseMapper = taskForwardReleaseMapper;
this.redisUtil = redisUtil;
} }
/** /**
@ -77,20 +78,22 @@ public abstract class AbstractTaskExec extends Thread{
*/ */
public void init(WeatherDataMapper weatherDataMapper, public void init(WeatherDataMapper weatherDataMapper,
TransportTaskService transportTaskService, TransportTaskService transportTaskService,
TransportTaskDTO transportTaskDTO, TransportTask transportTask,
TransportSimulationProperties simulationProperties, TransportSimulationProperties simulationProperties,
SystemStorageProperties systemStorageProperties, SystemStorageProperties systemStorageProperties,
DataFusionProperties dataFusionProperties, DataFusionProperties dataFusionProperties,
ServerProperties serverProperties, ServerProperties serverProperties,
TransportTaskBackwardChildMapper taskBackwardChildMapper){ TransportTaskBackwardChildMapper taskBackwardChildMapper,
RedisUtil redisUtil){
this.weatherDataMapper = weatherDataMapper; this.weatherDataMapper = weatherDataMapper;
this.transportTaskService = transportTaskService; this.transportTaskService = transportTaskService;
this.transportTaskDTO = transportTaskDTO; this.transportTask = transportTask;
this.simulationProperties = simulationProperties; this.simulationProperties = simulationProperties;
this.systemStorageProperties = systemStorageProperties; this.systemStorageProperties = systemStorageProperties;
this.dataFusionProperties = dataFusionProperties; this.dataFusionProperties = dataFusionProperties;
this.serverProperties = serverProperties; this.serverProperties = serverProperties;
this.taskBackwardChildMapper = taskBackwardChildMapper; this.taskBackwardChildMapper = taskBackwardChildMapper;
this.redisUtil = redisUtil;
} }
public abstract boolean checkTask(); public abstract boolean checkTask();
@ -99,6 +102,20 @@ public abstract class AbstractTaskExec extends Thread{
protected abstract void execSimulation(); protected abstract void execSimulation();
/**
* 设置任务运行标记
*/
protected void setTaskRunFlag(){
redisUtil.hset(CommonConstant.HOST_TASK_STATE,CommonConstant.TRAN_TASK_STATE_PRE+serverProperties.getHost(),transportTask.getId());
}
/**
* 任务运行失败取消运行标记
*/
protected void cancelTaskRunFlag(){
redisUtil.hdel(CommonConstant.HOST_TASK_STATE,CommonConstant.TRAN_TASK_STATE_PRE+serverProperties.getHost());
}
/** /**
* 检查气象数据 * 检查气象数据
*/ */
@ -193,71 +210,36 @@ public abstract class AbstractTaskExec extends Thread{
return path.toString(); return path.toString();
} }
/**
* 持续读取日志
*/
protected void execCommand(String command,Session session){
ChannelExec channel = null;
try{
//打开一个执行通道
channel = (ChannelExec) session.openChannel("exec");
String fullCommand = command + " 2>&1";
channel.setCommand(fullCommand);
// 获取脚本的标准输出流包含错误输出流
InputStream in = channel.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
// 连接通道
channel.connect();
protected class JSchRemoteRunner{ String line;
while ((line = reader.readLine()) != null) {
private JSch jsch = new JSch(); if(StrUtil.isNotBlank(line)){
if(line.trim().startsWith("ERROR STOP")){
private Session session = null; taskRunError = true;
private ChannelExec channel = null;
@Setter
private String command;
/**
* 登录
*
* @param host
* @param port
* @param username
* @param password
* @throws JSchException
*/
protected void login(String host, int port, String username, String password) throws JSchException {
Properties config = new Properties();
config.put("StrictHostKeyChecking","no");
session = jsch.getSession(username, host, port);
session.setPassword(password);
session.setConfig(config);
session.setServerAliveInterval(30);
session.setServerAliveCountMax(3);
session.setTimeout(0);
session.connect();
}
/**
* 持续读取日志
*/
protected void execCommand() throws JSchException, IOException {
try{
//打开一个执行通道
channel = (ChannelExec) session.openChannel("exec");
String fullCommand = command + " 2>&1";
channel.setCommand(fullCommand);
// 获取脚本的标准输出流包含错误输出流
InputStream in = channel.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(in));
// 连接通道
channel.connect();
String line;
while ((line = reader.readLine()) != null) {
if(StrUtil.isNotBlank(line)){
ProgressQueue.getInstance().offer(new ProgressEvent(transportTask.getId(),line));
} }
ProgressQueue.getInstance().offer(new ProgressEvent(transportTask.getId(),line));
} }
}catch(JSchException |IOException e){ }
throw new RuntimeException(e); }catch(JSchException |IOException e){
}finally { throw new RuntimeException(e);
// 关闭资源 }finally {
if (channel != null) { if (channel != null) {
channel.disconnect(); channel.disconnect();
}
if (session != null) {
session.disconnect();
}
} }
} }
} }

View File

@ -1,14 +1,15 @@
package org.jeecg.transport.flexparttask; package org.jeecg.transport.flexparttask;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.io.FileUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.StopWatch; import org.apache.commons.lang3.time.StopWatch;
import org.apache.logging.log4j.util.Strings; import org.apache.logging.log4j.util.Strings;
import org.jeecg.common.constant.enums.FlexpartSpeciesType; import org.jeecg.common.constant.enums.FlexpartSpeciesType;
import org.jeecg.common.constant.enums.TransportTaskStatusEnum; import org.jeecg.common.constant.enums.TransportTaskStatusEnum;
import org.jeecg.common.constant.enums.WeatherDataSourceEnum; import org.jeecg.common.constant.enums.WeatherDataSourceEnum;
import org.jeecg.modules.base.entity.*; import org.jeecg.modules.base.entity.*;
import org.jeecg.transport.util.JSchRemoteRunner;
import java.io.File; import java.io.File;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.math.RoundingMode; import java.math.RoundingMode;
@ -22,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* 反演任务线程 * 反演任务线程
*/ */
@Slf4j
public class BackwardTaskExec extends AbstractTaskExec { public class BackwardTaskExec extends AbstractTaskExec {
@Override @Override
@ -38,10 +40,9 @@ public class BackwardTaskExec extends AbstractTaskExec {
//校验任务 //校验任务
AtomicBoolean flag = new AtomicBoolean(true); AtomicBoolean flag = new AtomicBoolean(true);
List<String> msgList = new ArrayList<>(); List<String> msgList = new ArrayList<>();
super.transportTask = this.transportTaskService.getById(super.transportTaskDTO.getId());
//查询站点信息 //查询站点信息
LambdaQueryWrapper<TransportTaskBackwardChild> queryWrapper = new LambdaQueryWrapper<>(); LambdaQueryWrapper<TransportTaskBackwardChild> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(TransportTaskBackwardChild::getTaskId,super.transportTaskDTO.getId()); queryWrapper.eq(TransportTaskBackwardChild::getTaskId,super.transportTask.getId());
queryWrapper.orderByAsc(TransportTaskBackwardChild::getId); queryWrapper.orderByAsc(TransportTaskBackwardChild::getId);
List<TransportTaskBackwardChild> transportTaskChildren = super.taskBackwardChildMapper.selectList(queryWrapper); List<TransportTaskBackwardChild> transportTaskChildren = super.taskBackwardChildMapper.selectList(queryWrapper);
if(CollUtil.isEmpty(transportTaskChildren)){ if(CollUtil.isEmpty(transportTaskChildren)){
@ -84,6 +85,8 @@ public class BackwardTaskExec extends AbstractTaskExec {
StopWatch stopWatch = new StopWatch(); StopWatch stopWatch = new StopWatch();
stopWatch.start(); stopWatch.start();
try{ try{
//设置任务运行标记
super.setTaskRunFlag();
//修改任务状态为执行中 //修改任务状态为执行中
super.transportTaskService.updateTaskStatus(super.transportTask.getId(), TransportTaskStatusEnum.IN_OPERATION.getValue()); super.transportTaskService.updateTaskStatus(super.transportTask.getId(), TransportTaskStatusEnum.IN_OPERATION.getValue());
//如果此任务已存在历史日志先清除 //如果此任务已存在历史日志先清除
@ -91,12 +94,15 @@ public class BackwardTaskExec extends AbstractTaskExec {
//执行模拟 //执行模拟
this.execSimulation(); this.execSimulation();
//生成SRS文件 //生成SRS文件
this.generateSRSFile(); // this.generateSRSFile();
}catch (Exception e){ }catch (Exception e){
String taskErrorLog = "任务执行失败,原因:"+e.getMessage(); String taskErrorLog = "任务执行失败,原因:"+e.getMessage();
ProgressQueue.getInstance().offer(new ProgressEvent(super.transportTask.getId(),taskErrorLog)); ProgressQueue.getInstance().offer(new ProgressEvent(super.transportTask.getId(),taskErrorLog));
//如果还未执行到flexpartjava业务代码报错修改状态
super.transportTaskService.updateTaskStatus(super.transportTask.getId(),TransportTaskStatusEnum.FAILURE.getValue()); super.transportTaskService.updateTaskStatus(super.transportTask.getId(),TransportTaskStatusEnum.FAILURE.getValue());
throw e; super.cancelTaskRunFlag();
log.error(taskErrorLog);
e.printStackTrace();
}finally { }finally {
//添加任务耗时 //添加任务耗时
stopWatch.stop(); stopWatch.stop();
@ -104,7 +110,15 @@ public class BackwardTaskExec extends AbstractTaskExec {
double min = seconds/60D; double min = seconds/60D;
BigDecimal bgMin = new BigDecimal(min); BigDecimal bgMin = new BigDecimal(min);
BigDecimal result = bgMin.setScale(2, RoundingMode.HALF_UP); BigDecimal result = bgMin.setScale(2, RoundingMode.HALF_UP);
super.transportTaskService.updateTaskStatusToCompleted(super.transportTask.getId(),result.doubleValue());
Integer taskState;
if (super.taskRunError){
//如果super.taskRunError等于true说明flexpart运行过程日志提示出错这里把任务状态改为失败
taskState = TransportTaskStatusEnum.FAILURE.getValue();
}else{
taskState = TransportTaskStatusEnum.COMPLETED.getValue();
}
super.transportTaskService.updateTaskStatusToCompleted(super.transportTask.getId(),result.doubleValue(),taskState);
String taskCompletedLog = "任务执行完成,耗时:"+min+"分钟"; String taskCompletedLog = "任务执行完成,耗时:"+min+"分钟";
ProgressQueue.getInstance().offer(new ProgressEvent(super.transportTask.getId(),taskCompletedLog)); ProgressQueue.getInstance().offer(new ProgressEvent(super.transportTask.getId(),taskCompletedLog));
} }
@ -114,16 +128,19 @@ public class BackwardTaskExec extends AbstractTaskExec {
* 执行模拟 * 执行模拟
*/ */
protected void execSimulation(){ protected void execSimulation(){
// Process process = null; //ssh连接宿主机调用flexpart
JSchRemoteRunner jschRemoteRunner = new JSchRemoteRunner();
try { try {
//登录ssh
jschRemoteRunner.login(super.serverProperties.getHost(),super.serverProperties.getPort(),
super.serverProperties.getUsername(),super.serverProperties.getPassword());
String paramMsg = "生成flexpart所需参数文件param.config,stations.config"; String paramMsg = "生成flexpart所需参数文件param.config,stations.config";
ProgressQueue.getInstance().offer(new ProgressEvent(super.transportTask.getId(),paramMsg)); ProgressQueue.getInstance().offer(new ProgressEvent(super.transportTask.getId(),paramMsg));
//处理参数配置文件 //处理参数配置文件
String paramConfigPath = super.getParamConfigPath(super.transportTask.getUseMetType()); String paramConfigPath = super.getParamConfigPath(super.transportTask.getUseMetType());
String metDataPath = super.getMetDataPath(super.transportTask.getUseMetType()); String metDataPath = super.getMetDataPath(super.transportTask.getUseMetType());
if(!FileUtil.exist(paramConfigPath)){
FileUtil.touch(paramConfigPath);
}
StringBuilder paramContent = new StringBuilder(); StringBuilder paramContent = new StringBuilder();
paramContent.append(super.transportTask.getStartTime().format(DateTimeFormatter.ofPattern("yyyyMMdd"))).append("\n"); paramContent.append(super.transportTask.getStartTime().format(DateTimeFormatter.ofPattern("yyyyMMdd"))).append("\n");
paramContent.append(super.transportTask.getEndTime().format(DateTimeFormatter.ofPattern("yyyyMMdd"))).append("\n"); paramContent.append(super.transportTask.getEndTime().format(DateTimeFormatter.ofPattern("yyyyMMdd"))).append("\n");
@ -133,17 +150,15 @@ public class BackwardTaskExec extends AbstractTaskExec {
paramContent.append(super.transportTask.getZ1()).append("\n"); paramContent.append(super.transportTask.getZ1()).append("\n");
paramContent.append(super.transportTask.getZ2()).append("\n"); paramContent.append(super.transportTask.getZ2()).append("\n");
paramContent.append(metDataPath).append("\n"); paramContent.append(metDataPath).append("\n");
paramContent.append(super.simulationProperties.getOutputPath()+File.separator+super.transportTask.getTaskName()).append("\n"); paramContent.append(super.simulationProperties.getOutputPath()+ File.separator+super.transportTask.getTaskName()).append("\n");
paramContent.append(FlexpartSpeciesType.NOT_SPECIES.getId()).append("\n");//反演固定61不显示具体核素只显示Xe paramContent.append(FlexpartSpeciesType.NOT_SPECIES.getId()).append("\n");//反演固定61不显示具体核素只显示Xe
FileUtil.writeString(paramContent.toString(),paramConfigPath,"UTF-8"); jschRemoteRunner.writeFile(paramConfigPath,paramContent.toString());
//处理台站数据文件 //处理台站数据文件
List<String> stationConfigInfo = new ArrayList<>(); List<String> stationConfigInfo = new ArrayList<>();
String stationsConfigPath = super.getStationsConfigPath(super.transportTask.getUseMetType())+".backward"; String stationsConfigPath = super.getStationsConfigPath(super.transportTask.getUseMetType())+".backward";
if(!FileUtil.exist(stationsConfigPath)){
FileUtil.touch(stationsConfigPath);
}
super.transportTask.getBackwardChild().forEach(taskChild -> { super.transportTask.getBackwardChild().forEach(taskChild -> {
String format = "%s,%f,%f,%s,%s,%s,%s,%s"; String format = "%s,%f,%f,%s,%s,%s,%s,%s\n";
BigDecimal srcReleaseAmount = new BigDecimal(taskChild.getReleaseAmount()); BigDecimal srcReleaseAmount = new BigDecimal(taskChild.getReleaseAmount());
BigDecimal constant = new BigDecimal("1000"); BigDecimal constant = new BigDecimal("1000");
DecimalFormat scientificFormat = new DecimalFormat("0.00E0"); DecimalFormat scientificFormat = new DecimalFormat("0.00E0");
@ -156,21 +171,19 @@ public class BackwardTaskExec extends AbstractTaskExec {
taskChild.getAcqEndTime().format(DateTimeFormatter.ofPattern("HHmmss"))); taskChild.getAcqEndTime().format(DateTimeFormatter.ofPattern("HHmmss")));
stationConfigInfo.add(row); stationConfigInfo.add(row);
}); });
//最后一行需要换行否则启动flexpart报错 String stationConfigInfoStr = String.join("", stationConfigInfo);
stationConfigInfo.add("\n"); jschRemoteRunner.writeFile(stationsConfigPath,stationConfigInfoStr);
FileUtil.writeLines(stationConfigInfo,stationsConfigPath,"UTF-8");
//获取反演脚本路径 //获取反演脚本路径
String scriptPath = this.getBackForwardScriptPath(super.transportTask.getUseMetType()); String scriptPath = this.getBackForwardScriptPath(super.transportTask.getUseMetType());
String execScriptMsg = "执行任务脚本,开始模拟,路径为:"+scriptPath; String execScriptMsg = "执行任务脚本,开始模拟,路径为:"+scriptPath;
ProgressQueue.getInstance().offer(new ProgressEvent(super.transportTask.getId(),execScriptMsg)); ProgressQueue.getInstance().offer(new ProgressEvent(super.transportTask.getId(),execScriptMsg));
//ssh连接宿主机调用flexpart //执行命令
JSchRemoteRunner jschRemoteRunner = new JSchRemoteRunner(); super.execCommand(scriptPath,jschRemoteRunner.getSession());
jschRemoteRunner.setCommand(scriptPath);
jschRemoteRunner.login(super.serverProperties.getHost(),super.serverProperties.getPort(),
super.serverProperties.getUsername(),super.serverProperties.getPassword());
jschRemoteRunner.execCommand();
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
}finally {
jschRemoteRunner.close();
} }
} }

View File

@ -1,13 +1,14 @@
package org.jeecg.transport.flexparttask; package org.jeecg.transport.flexparttask;
import cn.hutool.core.collection.CollUtil; import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.io.FileUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.StopWatch; import org.apache.commons.lang3.time.StopWatch;
import org.apache.logging.log4j.util.Strings; import org.apache.logging.log4j.util.Strings;
import org.jeecg.common.constant.enums.TransportTaskStatusEnum; import org.jeecg.common.constant.enums.TransportTaskStatusEnum;
import org.jeecg.common.constant.enums.WeatherDataSourceEnum; import org.jeecg.common.constant.enums.WeatherDataSourceEnum;
import org.jeecg.modules.base.entity.*; import org.jeecg.modules.base.entity.*;
import org.jeecg.transport.util.JSchRemoteRunner;
import java.io.File; import java.io.File;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.math.RoundingMode; import java.math.RoundingMode;
@ -21,6 +22,7 @@ import java.util.stream.Collectors;
/** /**
* 正演任务线程 * 正演任务线程
*/ */
@Slf4j
public class ForwardTaskExec extends AbstractTaskExec { public class ForwardTaskExec extends AbstractTaskExec {
@Override @Override
@ -37,7 +39,6 @@ public class ForwardTaskExec extends AbstractTaskExec {
//校验任务 //校验任务
AtomicBoolean flag = new AtomicBoolean(true); AtomicBoolean flag = new AtomicBoolean(true);
List<String> msgList = new ArrayList<>(); List<String> msgList = new ArrayList<>();
super.transportTask = this.transportTaskService.getById(super.transportTaskDTO.getId());
//查询站点信息 //查询站点信息
LambdaQueryWrapper<TransportTaskForwardChild> siteQueryWrapper = new LambdaQueryWrapper<>(); LambdaQueryWrapper<TransportTaskForwardChild> siteQueryWrapper = new LambdaQueryWrapper<>();
siteQueryWrapper.eq(TransportTaskForwardChild::getTaskId,transportTask.getId()); siteQueryWrapper.eq(TransportTaskForwardChild::getTaskId,transportTask.getId());
@ -100,6 +101,8 @@ public class ForwardTaskExec extends AbstractTaskExec {
StopWatch stopWatch = new StopWatch(); StopWatch stopWatch = new StopWatch();
stopWatch.start(); stopWatch.start();
try{ try{
//设置任务运行标记
super.setTaskRunFlag();
//修改任务状态为执行中 //修改任务状态为执行中
super.transportTaskService.updateTaskStatus(super.transportTask.getId(), TransportTaskStatusEnum.IN_OPERATION.getValue()); super.transportTaskService.updateTaskStatus(super.transportTask.getId(), TransportTaskStatusEnum.IN_OPERATION.getValue());
//如果此任务已存在历史日志先清除 //如果此任务已存在历史日志先清除
@ -115,7 +118,8 @@ public class ForwardTaskExec extends AbstractTaskExec {
String taskErrorLog = "任务执行失败,原因:"+e.getMessage(); String taskErrorLog = "任务执行失败,原因:"+e.getMessage();
ProgressQueue.getInstance().offer(new ProgressEvent(super.transportTask.getId(),taskErrorLog)); ProgressQueue.getInstance().offer(new ProgressEvent(super.transportTask.getId(),taskErrorLog));
this.transportTaskService.updateTaskStatus(super.transportTask.getId(),TransportTaskStatusEnum.FAILURE.getValue()); this.transportTaskService.updateTaskStatus(super.transportTask.getId(),TransportTaskStatusEnum.FAILURE.getValue());
throw e; super.cancelTaskRunFlag();
log.error(taskErrorLog);
}finally { }finally {
//添加任务耗时 //添加任务耗时
stopWatch.stop(); stopWatch.stop();
@ -123,7 +127,15 @@ public class ForwardTaskExec extends AbstractTaskExec {
double min = seconds/60D; double min = seconds/60D;
BigDecimal bgMin = new BigDecimal(min); BigDecimal bgMin = new BigDecimal(min);
BigDecimal result = bgMin.setScale(2, RoundingMode.HALF_UP); BigDecimal result = bgMin.setScale(2, RoundingMode.HALF_UP);
super.transportTaskService.updateTaskStatusToCompleted(super.transportTask.getId(),result.doubleValue());
Integer taskState;
if (super.taskRunError){
//如果super.taskRunError等于true说明flexpart运行过程日志提示出错这里把任务状态改为失败
taskState = TransportTaskStatusEnum.FAILURE.getValue();
}else{
taskState = TransportTaskStatusEnum.COMPLETED.getValue();
}
super.transportTaskService.updateTaskStatusToCompleted(super.transportTask.getId(),result.doubleValue(),taskState);
String taskCompletedLog = "任务执行完成,耗时:"+min+"分钟"; String taskCompletedLog = "任务执行完成,耗时:"+min+"分钟";
ProgressQueue.getInstance().offer(new ProgressEvent(super.transportTask.getId(),taskCompletedLog)); ProgressQueue.getInstance().offer(new ProgressEvent(super.transportTask.getId(),taskCompletedLog));
} }
@ -133,16 +145,19 @@ public class ForwardTaskExec extends AbstractTaskExec {
* 执行模拟 * 执行模拟
*/ */
protected void execSimulation(){ protected void execSimulation(){
Process process = null; //ssh连接宿主机调用flexpart
JSchRemoteRunner jschRemoteRunner = new JSchRemoteRunner();
try { try {
//登录ssh
jschRemoteRunner.login(super.serverProperties.getHost(), super.serverProperties.getPort(),
super.serverProperties.getUsername(), super.serverProperties.getPassword());
String paramMsg = "生成flexpart所需配置文件param.config,stations.config,input_site_hour"; String paramMsg = "生成flexpart所需配置文件param.config,stations.config,input_site_hour";
ProgressQueue.getInstance().offer(new ProgressEvent(super.transportTask.getId(),paramMsg)); ProgressQueue.getInstance().offer(new ProgressEvent(super.transportTask.getId(),paramMsg));
//处理参数配置文件-param.config //处理参数配置文件-param.config
String paramConfigPath = super.getParamConfigPath(super.transportTask.getUseMetType()); String paramConfigPath = super.getParamConfigPath(super.transportTask.getUseMetType());
String metDataPath = super.getMetDataPath(super.transportTask.getUseMetType()); String metDataPath = super.getMetDataPath(super.transportTask.getUseMetType());
if(!FileUtil.exist(paramConfigPath)){
FileUtil.touch(paramConfigPath);
}
StringBuilder paramContent = new StringBuilder(); StringBuilder paramContent = new StringBuilder();
paramContent.append(super.transportTask.getStartTime().format(DateTimeFormatter.ofPattern("yyyyMMdd"))).append("\n"); paramContent.append(super.transportTask.getStartTime().format(DateTimeFormatter.ofPattern("yyyyMMdd"))).append("\n");
paramContent.append(super.transportTask.getEndTime().format(DateTimeFormatter.ofPattern("yyyyMMdd"))).append("\n"); paramContent.append(super.transportTask.getEndTime().format(DateTimeFormatter.ofPattern("yyyyMMdd"))).append("\n");
@ -170,7 +185,7 @@ public class ForwardTaskExec extends AbstractTaskExec {
paramContent.append("\n"); paramContent.append("\n");
} }
} }
FileUtil.writeString(paramContent.toString(),paramConfigPath,"UTF-8"); jschRemoteRunner.writeFile(paramConfigPath,paramContent.toString());
//处理台站数据文件-stations.config //处理台站数据文件-stations.config
if (CollUtil.isEmpty(super.transportTask.getForwardChild())){ if (CollUtil.isEmpty(super.transportTask.getForwardChild())){
@ -181,26 +196,19 @@ public class ForwardTaskExec extends AbstractTaskExec {
} }
List<String> stationConfigInfo = new ArrayList<>(); List<String> stationConfigInfo = new ArrayList<>();
String stationsConfigPath = super.getStationsConfigPath(super.transportTask.getUseMetType()); String stationsConfigPath = super.getStationsConfigPath(super.transportTask.getUseMetType());
if(!FileUtil.exist(stationsConfigPath)){
FileUtil.touch(stationsConfigPath);
}
super.transportTask.getForwardChild().forEach(station ->{ super.transportTask.getForwardChild().forEach(station ->{
String format = "%s,%f,%f,%s"; String format = "%s,%f,%f,%s\n";
//最后0是以前版本的总释放量改版后无用给个默认值 //最后0是以前版本的总释放量改版后无用给个默认值
String row = String.format(format,station.getStationCode(),station.getLat(),station.getLon(),"0"); String row = String.format(format,station.getStationCode(),station.getLat(),station.getLon(),"0");
stationConfigInfo.add(row); stationConfigInfo.add(row);
}); });
//最后一行需要换行否则启动flexpart报错 String stationConfigInfoStr = String.join("", stationConfigInfo);
stationConfigInfo.add("\n"); jschRemoteRunner.writeFile(stationsConfigPath,stationConfigInfoStr);
FileUtil.writeLines(stationConfigInfo,stationsConfigPath,"UTF-8");
//配置各站点排放数据 //配置各站点排放数据
//如果input_site_hour目录不存在先创建 //清空历史文件数据
if(!FileUtil.exist(super.simulationProperties.getInputSiteHourPath())){ jschRemoteRunner.clearDir(super.simulationProperties.getInputSiteHourPath());
FileUtil.mkdir(super.simulationProperties.getInputSiteHourPath());
}else {
FileUtil.clean(super.simulationProperties.getInputSiteHourPath());
}
Map<String,List<TransportTaskForwardRelease>> releaseInfoMap = new HashMap<>(); Map<String,List<TransportTaskForwardRelease>> releaseInfoMap = new HashMap<>();
for (TransportTaskForwardChild station : super.transportTask.getForwardChild()){ for (TransportTaskForwardChild station : super.transportTask.getForwardChild()){
if (CollUtil.isEmpty(station.getForwardReleaseChild())){ if (CollUtil.isEmpty(station.getForwardReleaseChild())){
@ -212,11 +220,10 @@ public class ForwardTaskExec extends AbstractTaskExec {
releaseInfoMap.put(station.getStationCode(),station.getForwardReleaseChild()); releaseInfoMap.put(station.getStationCode(),station.getForwardReleaseChild());
} }
releaseInfoMap .forEach((stationCode,releaseInfoList)->{ releaseInfoMap .forEach((stationCode,releaseInfoList)->{
String stationReleaseFile = super.simulationProperties.getInputSiteHourPath() + File.separator + stationCode + ".txt"; String stationReleaseFile = super.simulationProperties.getInputSiteHourPath() + "/" + stationCode + ".txt";
FileUtil.touch(stationReleaseFile);
List<String> stationReleaseInfo = new ArrayList<>(); List<String> stationReleaseInfo = new ArrayList<>();
releaseInfoList.forEach(releaseInfo ->{ releaseInfoList.forEach(releaseInfo ->{
String format = "%s,%s,%s"; String format = "%s,%s,%s\n";
String startTime = releaseInfo.getStartTime().format(DateTimeFormatter.ofPattern("yyyyMMddHH")); String startTime = releaseInfo.getStartTime().format(DateTimeFormatter.ofPattern("yyyyMMddHH"));
String endTime = releaseInfo.getEndTime().format(DateTimeFormatter.ofPattern("yyyyMMddHH")); String endTime = releaseInfo.getEndTime().format(DateTimeFormatter.ofPattern("yyyyMMddHH"));
BigDecimal releaseAmount = new BigDecimal(releaseInfo.getReleaseAmount()); BigDecimal releaseAmount = new BigDecimal(releaseInfo.getReleaseAmount());
@ -227,20 +234,18 @@ public class ForwardTaskExec extends AbstractTaskExec {
String row = String.format(format, startTime, endTime, releaseAmountFormat); String row = String.format(format, startTime, endTime, releaseAmountFormat);
stationReleaseInfo.add(row); stationReleaseInfo.add(row);
}); });
FileUtil.writeLines(stationReleaseInfo,stationReleaseFile,"UTF-8"); jschRemoteRunner.writeFile(stationReleaseFile,String.join("",stationReleaseInfo));
}); });
//获取正演脚本路径 //获取正演脚本路径
String scriptPath = this.getForwardScriptPath(super.transportTask.getUseMetType()); String scriptPath = this.getForwardScriptPath(super.transportTask.getUseMetType());
String execScriptMsg = "执行任务脚本,开始模拟,路径为:"+scriptPath; String execScriptMsg = "执行任务脚本,开始模拟,路径为:"+scriptPath;
ProgressQueue.getInstance().offer(new ProgressEvent(super.transportTask.getId(),execScriptMsg)); ProgressQueue.getInstance().offer(new ProgressEvent(super.transportTask.getId(),execScriptMsg));
//ssh连接宿主机调用flexpart //执行命令
JSchRemoteRunner jschRemoteRunner = new JSchRemoteRunner(); super.execCommand(scriptPath,jschRemoteRunner.getSession());
jschRemoteRunner.setCommand(scriptPath);
jschRemoteRunner.login(super.serverProperties.getHost(), super.serverProperties.getPort(),
super.serverProperties.getUsername(), super.serverProperties.getPassword());
jschRemoteRunner.execCommand();
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
}finally {
jschRemoteRunner.close();
} }
} }
/** /**

View File

@ -36,10 +36,6 @@ public class ProgressMonitor{
try { try {
ProgressEvent event = ProgressQueue.getInstance().take(); ProgressEvent event = ProgressQueue.getInstance().take();
if(Objects.nonNull(event)) { if(Objects.nonNull(event)) {
//flexpart固定报错信息
if(event.getContent().trim().startsWith("ERROR STOP")){
transportTaskService.updateTaskStatus(event.getTaskId(), TransportTaskStatusEnum.FAILURE.getValue());
}
TransportTaskLog log = new TransportTaskLog(); TransportTaskLog log = new TransportTaskLog();
log.setTaskId(event.getTaskId()); log.setTaskId(event.getTaskId());
log.setLogContent(event.getContent()); log.setLogContent(event.getContent());

View File

@ -18,11 +18,11 @@ public interface TransportTaskService extends IService<TransportTask> {
void saveLog(TransportTaskLog transportTaskLog); void saveLog(TransportTaskLog transportTaskLog);
/** /**
* 修改任务耗时 * 修改任务运行完成
* @param taskId * @param taskId
* @param minute * @param minute
*/ */
void updateTaskStatusToCompleted(Integer taskId, Double minute); void updateTaskStatusToCompleted(Integer taskId, Double minute,Integer status);
/** /**
* 删除任务日志 * 删除任务日志
@ -43,4 +43,10 @@ public interface TransportTaskService extends IService<TransportTask> {
* @return * @return
*/ */
Map<Integer,String> getTaskSimulationSpecies(Integer taskId); Map<Integer,String> getTaskSimulationSpecies(Integer taskId);
/**
* 设置检查失败
* @param transportTask
*/
void setInpectionFailed(TransportTask transportTask);
} }

View File

@ -4,13 +4,17 @@ import cn.hutool.core.collection.CollUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.jeecg.common.constant.CommonConstant;
import org.jeecg.common.constant.enums.*; import org.jeecg.common.constant.enums.*;
import org.jeecg.common.util.RedisUtil;
import org.jeecg.modules.base.entity.*; import org.jeecg.modules.base.entity.*;
import org.jeecg.modules.base.mapper.*; import org.jeecg.modules.base.mapper.*;
import org.jeecg.transport.service.StationDataService; import org.jeecg.properties.ServerProperties;
import org.jeecg.transport.service.TransportTaskService; import org.jeecg.transport.service.TransportTaskService;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDateTime;
import java.util.*; import java.util.*;
/** /**
@ -22,7 +26,8 @@ public class TransportTaskServiceImpl extends ServiceImpl<TransportTaskMapper,Tr
private final TransportTaskLogMapper transportTaskLogMapper; private final TransportTaskLogMapper transportTaskLogMapper;
private final TransportTaskForwardSpeciesMapper taskForwardSpeciesMapper; private final TransportTaskForwardSpeciesMapper taskForwardSpeciesMapper;
private final StationDataService stationDataService; private final ServerProperties serverProperties;
private final RedisUtil redisUtil;
/** /**
* 保存任务日志 * 保存任务日志
@ -36,18 +41,21 @@ public class TransportTaskServiceImpl extends ServiceImpl<TransportTaskMapper,Tr
} }
/** /**
* 修改任务耗时 * 修改任务运行完成
* *
* @param taskId * @param taskId
* @param minute * @param minute
*/ */
@Transactional(rollbackFor = RuntimeException.class) @Transactional(rollbackFor = RuntimeException.class)
@Override @Override
public void updateTaskStatusToCompleted(Integer taskId, Double minute) { public void updateTaskStatusToCompleted(Integer taskId, Double minute,Integer status) {
TransportTask transportTask = this.baseMapper.selectById(taskId); TransportTask transportTask = this.baseMapper.selectById(taskId);
transportTask.setTaskStatus(TransportTaskStatusEnum.COMPLETED.getValue()); transportTask.setTaskStatus(status);
transportTask.setTimeConsuming(minute); transportTask.setTimeConsuming(minute);
//任务执行完成如果之前任务有设置置顶则修改为默认值
transportTask.setTopTask(TransportTaskTopEnum.NOT_TOP.getValue());
this.baseMapper.updateById(transportTask); this.baseMapper.updateById(transportTask);
redisUtil.hdel(CommonConstant.HOST_TASK_STATE,CommonConstant.TRAN_TASK_STATE_PRE+serverProperties.getHost());
} }
/** /**
@ -108,4 +116,17 @@ public class TransportTaskServiceImpl extends ServiceImpl<TransportTaskMapper,Tr
} }
return result; return result;
} }
/**
* 设置检查失败
*
* @param transportTask
*/
@Transactional(rollbackFor = RuntimeException.class)
@Override
public void setInpectionFailed(TransportTask transportTask) {
transportTask.setTaskStatus(TransportTaskStatusEnum.INSPECTION_FAILED.getValue());
transportTask.setUpdateTime(LocalDateTime.now());
this.baseMapper.updateById(transportTask);
}
} }

View File

@ -0,0 +1,125 @@
package org.jeecg.transport.util;
import cn.hutool.core.util.StrUtil;
import com.jcraft.jsch.*;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.transport.flexparttask.ProgressEvent;
import org.jeecg.transport.flexparttask.ProgressQueue;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
/**
* ssh远程操作工具
*/
@Slf4j
public class JSchRemoteRunner {
private final JSch jsch = new JSch();
@Getter
private Session session = null;
/**
* 登录
*
* @param host
* @param port
* @param username
* @param password
* @throws JSchException
*/
public void login(String host, int port, String username, String password) throws JSchException {
Properties config = new Properties();
config.put("StrictHostKeyChecking","no");
config.put("PreferredAuthentications", "password");//只用秘密登录
session = jsch.getSession(username, host, port);
session.setUserInfo(new UserInfo() {
@Override
public String getPassphrase() {
return null;
}
@Override
public String getPassword() {
return password;
}
@Override
public boolean promptPassword(String s) {
return true;
}
@Override
public boolean promptPassphrase(String s) {
return false;
}
@Override
public boolean promptYesNo(String s) {
return true;
}
@Override
public void showMessage(String message) {
log.info("SSH Message: {}", message);
}
});
session.setConfig(config);
session.setServerAliveInterval(30);
session.setServerAliveCountMax(3);
session.setTimeout(0);
session.connect();
}
/**
* 写入文件内容如果文件不存在则创建
* @param path
* @param content
*/
public void writeFile(String path, String content) {
ChannelSftp sftp = null;
try {
sftp = (ChannelSftp)session.openChannel("sftp");
sftp.connect();
sftp.put(new ByteArrayInputStream(content.getBytes(StandardCharsets.UTF_8)),path,ChannelSftp.OVERWRITE);
} catch (JSchException | SftpException e) {
throw new RuntimeException(e);
}finally {
if (sftp != null) {
sftp.disconnect();
}
}
}
/**
* 清空目录下所有文件
* @param path
*/
public void clearDir(String path){
if (StrUtil.isBlank(path) || StrUtil.equals("/", path)){
throw new RuntimeException("需要清空的目录路径不能为空");
}
ChannelExec channel = null;
try{
//打开一个执行通道
channel = (ChannelExec) session.openChannel("exec");
String fullCommand = "rm -f "+path+"/*";
channel.setCommand(fullCommand);
// 连接通道
channel.connect();
}catch(JSchException e){
throw new RuntimeException(e);
}finally {
if (channel != null) {
channel.disconnect();
}
}
}
/**
* 关闭会话
*/
public void close(){
if (session != null) {
session.disconnect();
}
}
}

View File

@ -1,6 +1,6 @@
package org.jeecg.consumer; package org.jeecg.consumer;
import com.alibaba.fastjson2.JSON; import com.alibaba.fastjson.JSON;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode; import org.apache.rocketmq.spring.annotation.ConsumeMode;
@ -12,11 +12,9 @@ import org.jeecg.common.constant.enums.TransportTaskStatusEnum;
import org.jeecg.common.constant.enums.TransportTaskTypeEnum; import org.jeecg.common.constant.enums.TransportTaskTypeEnum;
import org.jeecg.common.properties.TransportSimulationProperties; import org.jeecg.common.properties.TransportSimulationProperties;
import org.jeecg.modules.base.dto.GardsSampleResultDTO; import org.jeecg.modules.base.dto.GardsSampleResultDTO;
import org.jeecg.modules.base.dto.TransportTaskDTO;
import org.jeecg.modules.base.entity.TransportTask; import org.jeecg.modules.base.entity.TransportTask;
import org.jeecg.modules.base.entity.TransportTaskBackwardChild; import org.jeecg.modules.base.entity.TransportTaskBackwardChild;
import org.jeecg.modules.base.entity.configuration.GardsStations; import org.jeecg.modules.base.entity.configuration.GardsStations;
import org.jeecg.producer.TransportTaskProducer;
import org.jeecg.service.StationDataService; import org.jeecg.service.StationDataService;
import org.jeecg.service.TransportTaskService; import org.jeecg.service.TransportTaskService;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -32,12 +30,11 @@ import java.util.Objects;
@Slf4j @Slf4j
@Component @Component
@RequiredArgsConstructor @RequiredArgsConstructor
@RocketMQMessageListener(consumerGroup = "consumer-group", @RocketMQMessageListener(consumerGroup = "${rocketmq.consumer.group}",
topic = RocketMQTopConstant.SAMPLE_RESULT_TOPIC, topic = RocketMQTopConstant.SAMPLE_RESULT_TOPIC,
consumeMode = ConsumeMode.ORDERLY) consumeMode = ConsumeMode.ORDERLY)
public class SampleMessageConsumer implements RocketMQListener<GardsSampleResultDTO> { public class SampleMessageConsumer implements RocketMQListener<GardsSampleResultDTO> {
private final TransportTaskProducer transportTaskProducer;
private final StationDataService stationDataService; private final StationDataService stationDataService;
private final TransportTaskService transportTaskService; private final TransportTaskService transportTaskService;
private final TransportSimulationProperties simulationProperties; private final TransportSimulationProperties simulationProperties;
@ -48,6 +45,7 @@ public class SampleMessageConsumer implements RocketMQListener<GardsSampleResult
*/ */
@Override @Override
public void onMessage(GardsSampleResultDTO sampleResultDTO) { public void onMessage(GardsSampleResultDTO sampleResultDTO) {
log.info("收到消息:"+ JSON.toJSONString(sampleResultDTO));
//查询台站信息 //查询台站信息
GardsStations stationInfo = stationDataService.getStationById(sampleResultDTO.getStationId()); GardsStations stationInfo = stationDataService.getStationById(sampleResultDTO.getStationId());
if (Objects.isNull(stationInfo)) { if (Objects.isNull(stationInfo)) {
@ -64,7 +62,6 @@ public class SampleMessageConsumer implements RocketMQListener<GardsSampleResult
//设置基础参数 //设置基础参数
TransportTask transportTask = new TransportTask(); TransportTask transportTask = new TransportTask();
transportTask.setTaskName(taskName); transportTask.setTaskName(taskName);
transportTask.setTaskPprogress(0);
transportTask.setTaskStatus(TransportTaskStatusEnum.WAITING.getValue()); transportTask.setTaskStatus(TransportTaskStatusEnum.WAITING.getValue());
transportTask.setTaskType(TransportTaskTypeEnum.AUTO.getKey()); transportTask.setTaskType(TransportTaskTypeEnum.AUTO.getKey());
transportTask.setTaskMode(TransportTaskModeEnum.BACK_FORWARD.getKey()); transportTask.setTaskMode(TransportTaskModeEnum.BACK_FORWARD.getKey());
@ -91,30 +88,6 @@ public class SampleMessageConsumer implements RocketMQListener<GardsSampleResult
transportTask.setBackwardChild(backwardChild); transportTask.setBackwardChild(backwardChild);
//保存反演任务 //保存反演任务
transportTaskService.cteate(transportTask); transportTaskService.cteate(transportTask);
//发送消息到消息队列
if (TransportTaskModeEnum.BACK_FORWARD.getKey().equals(transportTask.getTaskMode())){
transportTaskProducer.sendBackwardMessage(buildMessage(transportTask));
}else if (TransportTaskModeEnum.FORWARD.getKey().equals(transportTask.getTaskMode())){
transportTaskProducer.sendForwardMessage(buildMessage(transportTask));
}
log.info("消费消息id为{}",sampleResultDTO.getSampleId());
} }
} }
/**
* 构造消息
* @param transportTask
* @return
*/
private TransportTaskDTO buildMessage(TransportTask transportTask){
TransportTaskDTO transportTaskDTO = new TransportTaskDTO();
transportTaskDTO.setId(transportTask.getId());
transportTaskDTO.setTaskName(transportTask.getTaskName());
transportTaskDTO.setTaskType(transportTask.getTaskType());
transportTaskDTO.setTaskMode(transportTask.getTaskMode());
transportTaskDTO.setUseMetType(transportTask.getUseMetType());
transportTaskDTO.setStartTime(transportTask.getStartTime());
transportTaskDTO.setEndTime(transportTask.getEndTime());
return transportTaskDTO;
}
} }

View File

@ -91,6 +91,22 @@ public class TransportTaskController {
return Result.OK(); return Result.OK();
} }
@AutoLog(value = "设置任务置顶")
@Operation(summary = "设置任务置顶")
@PutMapping("setTaskTop")
public Result<?> setTaskTop(@NotNull(message = "任务ID不能为空") Integer taskId){
transportTaskService.setTaskTop(taskId);
return Result.OK();
}
@AutoLog(value = "设置任务置顶")
@Operation(summary = "设置任务置顶")
@PutMapping("setCancelTaskTop")
public Result<?> setCancelTaskTop(@NotNull(message = "任务ID不能为空") Integer taskId){
transportTaskService.setCancelTaskTop(taskId);
return Result.OK();
}
@AutoLog(value = "获取单个站点的排放数据") @AutoLog(value = "获取单个站点的排放数据")
@Operation(summary = "获取单个站点的排放数据") @Operation(summary = "获取单个站点的排放数据")
@GetMapping("getReleaseDataByForwardId") @GetMapping("getReleaseDataByForwardId")

View File

@ -19,9 +19,4 @@ public interface StationsModValService extends IService <GardsStationsModVal>{
*/ */
List<GardsStationsModVal> getStationsModVal(Integer taskId,Integer stationId,Integer speciesId); List<GardsStationsModVal> getStationsModVal(Integer taskId,Integer stationId,Integer speciesId);
/**
* 保存台站模拟值
* @param list
*/
void saveStationsModVal(List<GardsStationsModVal> list);
} }

View File

@ -63,6 +63,12 @@ public interface TransportTaskService extends IService<TransportTask> {
*/ */
boolean checkTaskByName(String taskName); boolean checkTaskByName(String taskName);
/**
* 设置任务置顶
* @param taskId
*/
void setTaskTop(Integer taskId);
/** /**
* 运行任务 * 运行任务
* @param id * @param id
@ -77,32 +83,6 @@ public interface TransportTaskService extends IService<TransportTask> {
*/ */
IPage<TransportTaskLog> getTaskLog(Integer taskId,PageRequest pageRequest); IPage<TransportTaskLog> getTaskLog(Integer taskId,PageRequest pageRequest);
/**
* 保存任务日志
* @param transportTaskLog
*/
void saveLog(TransportTaskLog transportTaskLog);
/**
* 修改任务耗时
* @param taskId
* @param minute
*/
void updateTaskStatusToCompleted(Integer taskId, Double minute);
/**
* 删除任务日志
* @param taskId
*/
void deleteTaskLog(Integer taskId);
/**
* 修改任务状态
* @param taskId
* @param status
*/
void updateTaskStatus(Integer taskId, Integer status);
void handleExcelReleaseData(HttpServletResponse res) throws Exception; void handleExcelReleaseData(HttpServletResponse res) throws Exception;
/** /**
@ -136,4 +116,10 @@ public interface TransportTaskService extends IService<TransportTask> {
* 处理赵老师给的2014年的排放数据生成sql文件 * 处理赵老师给的2014年的排放数据生成sql文件
*/ */
void handlExcelReleaseData(); void handlExcelReleaseData();
/**
* 取消置顶
* @param taskId
*/
void setCancelTaskTop(Integer taskId);
} }

View File

@ -7,8 +7,6 @@ import org.jeecg.modules.base.entity.rnauto.GardsStationsModVal;
import org.jeecg.modules.base.mapper.GardsStationsModValMapper; import org.jeecg.modules.base.mapper.GardsStationsModValMapper;
import org.jeecg.service.StationsModValService; import org.jeecg.service.StationsModValService;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.List; import java.util.List;
@DS("ora") @DS("ora")
@ -31,14 +29,4 @@ public class StationsModValServiceImpl extends ServiceImpl<GardsStationsModValMa
queryWrapper.orderByAsc(GardsStationsModVal::getTimeSeries); queryWrapper.orderByAsc(GardsStationsModVal::getTimeSeries);
return this.baseMapper.selectList(queryWrapper); return this.baseMapper.selectList(queryWrapper);
} }
/**
* 保存台站模拟值
* @param list
*/
@Transactional(rollbackFor = RuntimeException.class)
@Override
public void saveStationsModVal(List<GardsStationsModVal> list) {
this.baseMapper.insert(list);
}
} }

View File

@ -14,22 +14,21 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import jakarta.servlet.http.HttpServletResponse; import jakarta.servlet.http.HttpServletResponse;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.apache.commons.lang3.StringUtils; import lombok.extern.slf4j.Slf4j;
import org.apache.poi.ss.usermodel.*; import org.apache.poi.ss.usermodel.*;
import org.apache.poi.xssf.usermodel.XSSFWorkbook; import org.apache.poi.xssf.usermodel.XSSFWorkbook;
import org.jeecg.common.constant.enums.*; import org.jeecg.common.constant.enums.*;
import org.jeecg.common.properties.TransportSimulationProperties; import org.jeecg.common.properties.TransportSimulationProperties;
import org.jeecg.common.system.query.PageRequest; import org.jeecg.common.system.query.PageRequest;
import org.jeecg.modules.base.dto.TransportTaskDTO;
import org.jeecg.modules.base.entity.*; import org.jeecg.modules.base.entity.*;
import org.jeecg.modules.base.entity.configuration.GardsDetectors; import org.jeecg.modules.base.entity.configuration.GardsDetectors;
import org.jeecg.modules.base.entity.configuration.GardsStations; import org.jeecg.modules.base.entity.configuration.GardsStations;
import org.jeecg.modules.base.entity.original.GardsSampleData; import org.jeecg.modules.base.entity.original.GardsSampleData;
import org.jeecg.modules.base.entity.rnauto.GardsXeResults; import org.jeecg.modules.base.entity.rnauto.GardsXeResults;
import org.jeecg.modules.base.mapper.*; import org.jeecg.modules.base.mapper.*;
import org.jeecg.producer.TransportTaskProducer;
import org.jeecg.service.StationDataService; import org.jeecg.service.StationDataService;
import org.jeecg.service.TransportTaskService; import org.jeecg.service.TransportTaskService;
import org.jeecg.util.ExcelUtils;
import org.jeecg.vo.SiteAndReleaseDataVO; import org.jeecg.vo.SiteAndReleaseDataVO;
import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -49,6 +48,7 @@ import java.util.stream.Collectors;
/** /**
* 输运模拟任务管理 * 输运模拟任务管理
*/ */
@Slf4j
@RequiredArgsConstructor @RequiredArgsConstructor
@Service @Service
public class TransportTaskServiceImpl extends ServiceImpl<TransportTaskMapper,TransportTask> implements TransportTaskService { public class TransportTaskServiceImpl extends ServiceImpl<TransportTaskMapper,TransportTask> implements TransportTaskService {
@ -62,7 +62,6 @@ public class TransportTaskServiceImpl extends ServiceImpl<TransportTaskMapper,Tr
private final DataSourceTransactionManager transactionManager; private final DataSourceTransactionManager transactionManager;
private final TransactionDefinition transactionDefinition; private final TransactionDefinition transactionDefinition;
private final StationDataService stationDataService; private final StationDataService stationDataService;
private final TransportTaskProducer transportTaskProducer;
/** /**
* 分页查询任务列表 * 分页查询任务列表
@ -77,20 +76,14 @@ public class TransportTaskServiceImpl extends ServiceImpl<TransportTaskMapper,Tr
*/ */
@Override @Override
public IPage<TransportTask> page(PageRequest pageRequest, String taskName, Integer taskMode,Integer taskStatus,Integer metType, LocalDate startDate, LocalDate endDate) { public IPage<TransportTask> page(PageRequest pageRequest, String taskName, Integer taskMode,Integer taskStatus,Integer metType, LocalDate startDate, LocalDate endDate) {
LambdaQueryWrapper<TransportTask> queryWrapper = new LambdaQueryWrapper<>(); LocalDateTime startDateTime = null;
queryWrapper.eq(Objects.nonNull(taskMode),TransportTask::getTaskMode, taskMode); LocalDateTime endDateTime = null;
queryWrapper.eq(Objects.nonNull(taskStatus),TransportTask::getTaskStatus, taskStatus);
queryWrapper.eq(Objects.nonNull(metType),TransportTask::getUseMetType, metType);
if(Objects.nonNull(startDate) && Objects.nonNull(endDate)){ if(Objects.nonNull(startDate) && Objects.nonNull(endDate)){
LocalDateTime startDateTime = startDate.atTime(0, 0, 0); startDateTime = startDate.atTime(0, 0, 0);
LocalDateTime endDateTime = endDate.atTime(23, 59, 59); endDateTime = endDate.atTime(23, 59, 59);
queryWrapper.between(TransportTask::getCreateTime,startDateTime,endDateTime);
} }
queryWrapper.like(StringUtils.isNotBlank(taskName),TransportTask::getTaskName,taskName); Page<TransportTask> page = new Page<>(pageRequest.getPageNum(), pageRequest.getPageSize());
queryWrapper.orderByDesc(TransportTask::getCreateTime); return this.baseMapper.page(page,taskName,taskMode,taskStatus,metType,startDateTime,endDateTime);
queryWrapper.orderByAsc(TransportTask::getTaskStatus);
IPage<TransportTask> iPage = new Page<>(pageRequest.getPageNum(), pageRequest.getPageSize());
return this.page(iPage, queryWrapper);
} }
/** /**
@ -107,10 +100,16 @@ public class TransportTaskServiceImpl extends ServiceImpl<TransportTaskMapper,Tr
if(Objects.nonNull(checkNameResult)){ if(Objects.nonNull(checkNameResult)){
throw new RuntimeException("此任务已存在,请核对任务名称信息"); throw new RuntimeException("此任务已存在,请核对任务名称信息");
} }
transportTask.setTaskPprogress(0); if(Objects.isNull(transportTask.getTaskStatus())){
transportTask.setTaskStatus(TransportTaskStatusEnum.NOT_STARTED.getValue()); transportTask.setTaskStatus(TransportTaskStatusEnum.NOT_STARTED.getValue());
transportTask.setTaskType(TransportTaskTypeEnum.MANUALLY.getKey()); }
if(Objects.isNull(transportTask.getTaskType())){
transportTask.setTaskType(TransportTaskTypeEnum.MANUALLY.getKey());
}
transportTask.setTimeConsuming(0D); transportTask.setTimeConsuming(0D);
transportTask.setTopTask(TransportTaskTopEnum.NOT_TOP.getValue());
transportTask.setCreateTime(LocalDateTime.now());
transportTask.setUpdateTime(LocalDateTime.now());
this.baseMapper.insert(transportTask); this.baseMapper.insert(transportTask);
if (TransportTaskModeEnum.BACK_FORWARD.getKey().equals(transportTask.getTaskMode()) && if (TransportTaskModeEnum.BACK_FORWARD.getKey().equals(transportTask.getTaskMode()) &&
CollUtil.isNotEmpty(transportTask.getBackwardChild())) { CollUtil.isNotEmpty(transportTask.getBackwardChild())) {
@ -211,6 +210,7 @@ public class TransportTaskServiceImpl extends ServiceImpl<TransportTaskMapper,Tr
checkIdResult.setZ2(transportTask.getZ2()); checkIdResult.setZ2(transportTask.getZ2());
checkIdResult.setParticleCount(transportTask.getParticleCount()); checkIdResult.setParticleCount(transportTask.getParticleCount());
checkIdResult.setReleaseDataSource(transportTask.getReleaseDataSource()); checkIdResult.setReleaseDataSource(transportTask.getReleaseDataSource());
checkIdResult.setUpdateTime(LocalDateTime.now());
this.baseMapper.updateById(checkIdResult); this.baseMapper.updateById(checkIdResult);
//先删除再保存 //先删除再保存
if (TransportTaskModeEnum.BACK_FORWARD.getKey().equals(transportTask.getTaskMode())) { if (TransportTaskModeEnum.BACK_FORWARD.getKey().equals(transportTask.getTaskMode())) {
@ -303,6 +303,22 @@ public class TransportTaskServiceImpl extends ServiceImpl<TransportTaskMapper,Tr
return Objects.isNull(transportTask); return Objects.isNull(transportTask);
} }
/**
* 设置任务置顶
* @param taskId
*/
@Transactional(rollbackFor = RuntimeException.class)
@Override
public void setTaskTop(Integer taskId) {
TransportTask transportTask = this.baseMapper.selectById(taskId);
if(Objects.isNull(transportTask)){
throw new RuntimeException("此任务不存在");
}
transportTask.setTopTask(TransportTaskTopEnum.TOP.getValue());
transportTask.setUpdateTime(LocalDateTime.now());
this.baseMapper.updateById(transportTask);
}
/** /**
* 运行任务 * 运行任务
* *
@ -358,22 +374,9 @@ public class TransportTaskServiceImpl extends ServiceImpl<TransportTaskMapper,Tr
throw new RuntimeException("站点释放信息缺失,请确认"); throw new RuntimeException("站点释放信息缺失,请确认");
} }
} }
//构造消息
TransportTaskDTO transportTaskDTO = new TransportTaskDTO();
transportTaskDTO.setId(checkIdResult.getId());
transportTaskDTO.setTaskName(checkIdResult.getTaskName());
transportTaskDTO.setTaskMode(checkIdResult.getTaskMode());
transportTaskDTO.setTaskType(checkIdResult.getTaskType());
transportTaskDTO.setUseMetType(checkIdResult.getUseMetType());
transportTaskDTO.setStartTime(checkIdResult.getStartTime());
transportTaskDTO.setEndTime(checkIdResult.getEndTime());
if (TransportTaskModeEnum.BACK_FORWARD.getKey().equals(transportTaskDTO.getTaskMode())){
transportTaskProducer.sendBackwardMessage(transportTaskDTO);
}else if (TransportTaskModeEnum.FORWARD.getKey().equals(transportTaskDTO.getTaskMode())){
transportTaskProducer.sendForwardMessage(transportTaskDTO);
}
//修改状态为等待中 //修改状态为等待中
checkIdResult.setTaskStatus(TransportTaskStatusEnum.WAITING.getValue()); checkIdResult.setTaskStatus(TransportTaskStatusEnum.WAITING.getValue());
checkIdResult.setUpdateTime(LocalDateTime.now());
this.baseMapper.updateById(checkIdResult); this.baseMapper.updateById(checkIdResult);
} }
@ -394,61 +397,6 @@ public class TransportTaskServiceImpl extends ServiceImpl<TransportTaskMapper,Tr
return result; return result;
} }
/**
* 保存任务日志
*
* @param transportTaskLog
*/
@Transactional(rollbackFor = RuntimeException.class)
@Override
public void saveLog(TransportTaskLog transportTaskLog) {
transportTaskLogMapper.insert(transportTaskLog);
}
/**
* 修改任务耗时
*
* @param taskId
* @param minute
*/
@Transactional(rollbackFor = RuntimeException.class)
@Override
public void updateTaskStatusToCompleted(Integer taskId, Double minute) {
TransportTask transportTask = this.baseMapper.selectById(taskId);
transportTask.setTaskStatus(TransportTaskStatusEnum.COMPLETED.getValue());
transportTask.setTimeConsuming(minute);
this.baseMapper.updateById(transportTask);
}
/**
* 删除任务日志
*
* @param taskId
*/
@Transactional(rollbackFor = RuntimeException.class)
@Override
public void deleteTaskLog(Integer taskId) {
LambdaQueryWrapper<TransportTaskLog> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(TransportTaskLog::getTaskId,taskId);
transportTaskLogMapper.delete(queryWrapper);
}
/**
* 修改任务状态
* @param taskId
* @param status
*/
@Transactional(rollbackFor = RuntimeException.class)
@Override
public void updateTaskStatus(Integer taskId, Integer status) {
TransportTask transportTask = this.baseMapper.selectById(taskId);
if(Objects.isNull(transportTask)){
throw new RuntimeException("此任务不存在");
}
transportTask.setTaskStatus(status);
this.baseMapper.updateById(transportTask);
}
/** /**
* @param res * @param res
*/ */
@ -475,8 +423,6 @@ public class TransportTaskServiceImpl extends ServiceImpl<TransportTaskMapper,Tr
} }
/** /**
* 处理基础信息 * 处理基础信息
* @param outputWorkBook * @param outputWorkBook
@ -876,7 +822,8 @@ public class TransportTaskServiceImpl extends ServiceImpl<TransportTaskMapper,Tr
if (Objects.nonNull(transactionStatus)){ if (Objects.nonNull(transactionStatus)){
transactionManager.rollback(transactionStatus); transactionManager.rollback(transactionStatus);
} }
e.printStackTrace(); info.add(e.getMessage());
log.error("任务数据导入错误",e);
}finally { }finally {
if (Objects.nonNull(workBook)){ if (Objects.nonNull(workBook)){
workBook.close(); workBook.close();
@ -1111,10 +1058,23 @@ public class TransportTaskServiceImpl extends ServiceImpl<TransportTaskMapper,Tr
}catch (Exception e){ }catch (Exception e){
e.printStackTrace(); e.printStackTrace();
} }
}
/**
* 取消置顶
*
* @param taskId
*/
@Transactional(rollbackFor = RuntimeException.class)
@Override
public void setCancelTaskTop(Integer taskId) {
TransportTask transportTask = this.baseMapper.selectById(taskId);
if(Objects.isNull(transportTask)){
throw new RuntimeException("此任务不存在");
}
transportTask.setTopTask(TransportTaskTopEnum.NOT_TOP.getValue());
transportTask.setUpdateTime(LocalDateTime.now());
this.baseMapper.updateById(transportTask);
} }
/** /**
@ -1127,7 +1087,7 @@ public class TransportTaskServiceImpl extends ServiceImpl<TransportTaskMapper,Tr
Sheet sheet = workBook.getSheetAt(0); Sheet sheet = workBook.getSheetAt(0);
//设置任务名称 //设置任务名称
Cell taskCell = sheet.getRow(1).getCell(1); Cell taskCell = sheet.getRow(1).getCell(1);
String taskCellValue = taskCell.getStringCellValue(); String taskCellValue = ExcelUtils.getStringValue(taskCell);
if (StrUtil.isBlank(taskCellValue)) { if (StrUtil.isBlank(taskCellValue)) {
info.add("任务名称不能为空"); info.add("任务名称不能为空");
}else { }else {
@ -1135,7 +1095,7 @@ public class TransportTaskServiceImpl extends ServiceImpl<TransportTaskMapper,Tr
} }
//设置任务模式 //设置任务模式
Cell taskModeCell = sheet.getRow(1).getCell(3); Cell taskModeCell = sheet.getRow(1).getCell(3);
Integer taskModeCellValue = (int)taskModeCell.getNumericCellValue(); Integer taskModeCellValue = ExcelUtils.getIntValue(taskModeCell);
if (!TransportTaskModeEnum.FORWARD.getKey().equals(taskModeCellValue) && if (!TransportTaskModeEnum.FORWARD.getKey().equals(taskModeCellValue) &&
!TransportTaskModeEnum.BACK_FORWARD.getKey().equals(taskModeCellValue)) { !TransportTaskModeEnum.BACK_FORWARD.getKey().equals(taskModeCellValue)) {
info.add("任务模式配置错误"); info.add("任务模式配置错误");
@ -1144,7 +1104,7 @@ public class TransportTaskServiceImpl extends ServiceImpl<TransportTaskMapper,Tr
} }
//设置气象数据来源 //设置气象数据来源
Cell metDataSourceCell = sheet.getRow(1).getCell(5); Cell metDataSourceCell = sheet.getRow(1).getCell(5);
Integer metDataSourceCellValue = (int)metDataSourceCell.getNumericCellValue(); Integer metDataSourceCellValue = ExcelUtils.getIntValue(metDataSourceCell);
if (!WeatherDataSourceEnum.PANGU.getKey().equals(metDataSourceCellValue) && if (!WeatherDataSourceEnum.PANGU.getKey().equals(metDataSourceCellValue) &&
!WeatherDataSourceEnum.GRAPHCAST.getKey().equals(metDataSourceCellValue) && !WeatherDataSourceEnum.GRAPHCAST.getKey().equals(metDataSourceCellValue) &&
!WeatherDataSourceEnum.T1H.getKey().equals(metDataSourceCellValue) && !WeatherDataSourceEnum.T1H.getKey().equals(metDataSourceCellValue) &&
@ -1157,24 +1117,22 @@ public class TransportTaskServiceImpl extends ServiceImpl<TransportTaskMapper,Tr
} }
//设置释放下部高度 //设置释放下部高度
Cell z1Cell = sheet.getRow(2).getCell(1); Cell z1Cell = sheet.getRow(2).getCell(1);
transportTask.setZ1(z1Cell.getNumericCellValue()); transportTask.setZ1(ExcelUtils.getNumericValue(z1Cell));
//设置释放上部高度 //设置释放上部高度
Cell z2Cell = sheet.getRow(2).getCell(3); Cell z2Cell = sheet.getRow(2).getCell(3);
transportTask.setZ2(z2Cell.getNumericCellValue()); transportTask.setZ2(ExcelUtils.getNumericValue(z2Cell));
//设置粒子数量 //设置粒子数量
Cell particleCountCell = sheet.getRow(2).getCell(5); Cell particleCountCell = sheet.getRow(2).getCell(5);
transportTask.setParticleCount((int)particleCountCell.getNumericCellValue()); transportTask.setParticleCount(ExcelUtils.getIntValue(particleCountCell));
//设置开始时间 //设置开始时间
Cell startTimeCell = sheet.getRow(3).getCell(1); Cell startTimeCell = sheet.getRow(3).getCell(1);
String startTimeCellValue = startTimeCell.getStringCellValue(); transportTask.setStartTime(ExcelUtils.getDateValue(startTimeCell));
transportTask.setStartTime(LocalDateTime.parse(startTimeCellValue, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
//设置结束时间 //设置结束时间
Cell endTimeCell = sheet.getRow(3).getCell(3); Cell endTimeCell = sheet.getRow(3).getCell(3);
String endTimeCellValue = endTimeCell.getStringCellValue(); transportTask.setEndTime(ExcelUtils.getDateValue(startTimeCell));
transportTask.setEndTime(LocalDateTime.parse(endTimeCellValue,DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
//设置释放数据来源 //设置释放数据来源
Cell releaseDataSourceCell = sheet.getRow(3).getCell(5); Cell releaseDataSourceCell = sheet.getRow(3).getCell(5);
Integer releaseDataSourceCellValue = (int)releaseDataSourceCell.getNumericCellValue(); Integer releaseDataSourceCellValue = ExcelUtils.getIntValue(releaseDataSourceCell);
if (!TransportReleaseDataSource.AUTO_SELECT.getKey().equals(releaseDataSourceCellValue) && if (!TransportReleaseDataSource.AUTO_SELECT.getKey().equals(releaseDataSourceCellValue) &&
!TransportReleaseDataSource.MANUAL_ENTRY.getKey().equals(releaseDataSourceCellValue)) { !TransportReleaseDataSource.MANUAL_ENTRY.getKey().equals(releaseDataSourceCellValue)) {
info.add("释放数据来源配置错误"); info.add("释放数据来源配置错误");
@ -1183,12 +1141,7 @@ public class TransportTaskServiceImpl extends ServiceImpl<TransportTaskMapper,Tr
} }
//设置释放核素 //设置释放核素
Cell releaseNuclideCell = sheet.getRow(4).getCell(1); Cell releaseNuclideCell = sheet.getRow(4).getCell(1);
String releaseNuclideCellValue = ""; String releaseNuclideCellValue = ExcelUtils.getStringValue(releaseNuclideCell);
if (releaseNuclideCell.getCellType() == CellType.NUMERIC) {
releaseNuclideCellValue = String.valueOf((int)releaseNuclideCell.getNumericCellValue());
}else if (releaseNuclideCell.getCellType() == CellType.STRING) {
releaseNuclideCellValue = releaseNuclideCell.getStringCellValue();
}
String[] nuclides = releaseNuclideCellValue.split(","); String[] nuclides = releaseNuclideCellValue.split(",");
if (ArrayUtil.isNotEmpty(nuclides)){ if (ArrayUtil.isNotEmpty(nuclides)){
List<TransportTaskForwardSpecies> species = new ArrayList<>(); List<TransportTaskForwardSpecies> species = new ArrayList<>();
@ -1222,7 +1175,7 @@ public class TransportTaskServiceImpl extends ServiceImpl<TransportTaskMapper,Tr
if (TransportReleaseDataSource.AUTO_SELECT.getKey().equals(transportTask.getReleaseDataSource())) { if (TransportReleaseDataSource.AUTO_SELECT.getKey().equals(transportTask.getReleaseDataSource())) {
Sheet sheet = workBook.getSheetAt(0); Sheet sheet = workBook.getSheetAt(0);
Cell sitesCell = sheet.getRow(5).getCell(1); Cell sitesCell = sheet.getRow(5).getCell(1);
String sitesCellValue = sitesCell.getStringCellValue(); String sitesCellValue = ExcelUtils.getStringValue(sitesCell);
String[] site = sitesCellValue.split(","); String[] site = sitesCellValue.split(",");
if (ArrayUtil.isNotEmpty(site)) { if (ArrayUtil.isNotEmpty(site)) {
for (String siteName : site) { for (String siteName : site) {
@ -1270,25 +1223,20 @@ public class TransportTaskServiceImpl extends ServiceImpl<TransportTaskMapper,Tr
Row row = sheet.getRow(j); Row row = sheet.getRow(j);
//获取开始时间 //获取开始时间
Cell startTimeCell = row.getCell(0); Cell startTimeCell = row.getCell(0);
String startTimeCellValue = startTimeCell.getStringCellValue(); LocalDateTime startTimeCellValue = ExcelUtils.getDateValue(startTimeCell);
//获取结束时间 //获取结束时间
Cell endTimeCell = row.getCell(1); Cell endTimeCell = row.getCell(1);
String endTimeCellValue = endTimeCell.getStringCellValue(); LocalDateTime endTimeCellValue = ExcelUtils.getDateValue(endTimeCell);
//获取释放数据 //获取释放数据
Cell releaseDataCell = row.getCell(2); Cell releaseDataCell = row.getCell(2);
String releaseCellValue = ""; String releaseCellValue = ExcelUtils.getStringValue(releaseDataCell);
if (releaseDataCell.getCellType() == CellType.NUMERIC) { if(Objects.isNull(startTimeCellValue) || Objects.isNull(endTimeCellValue) || StrUtil.isBlank(releaseCellValue)){
releaseCellValue = String.valueOf(releaseDataCell.getNumericCellValue());
}else if (releaseDataCell.getCellType() == CellType.STRING) {
releaseCellValue = releaseDataCell.getStringCellValue();
}
if(StrUtil.isBlank(startTimeCellValue) || StrUtil.isBlank(endTimeCellValue)){
info.add(sheet.getSheetName()+"设施所在sheet的排放数据有缺失"); info.add(sheet.getSheetName()+"设施所在sheet的排放数据有缺失");
break; break;
} }
TransportTaskForwardRelease release = new TransportTaskForwardRelease(); TransportTaskForwardRelease release = new TransportTaskForwardRelease();
release.setStartTime(LocalDateTime.parse(startTimeCellValue,DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); release.setStartTime(startTimeCellValue);
release.setEndTime(LocalDateTime.parse(endTimeCellValue,DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))); release.setEndTime(endTimeCellValue);
release.setReleaseAmount(releaseCellValue); release.setReleaseAmount(releaseCellValue);
releases.add(release); releases.add(release);
} }

View File

@ -0,0 +1,72 @@
package org.jeecg.util;
import cn.hutool.core.util.StrUtil;
import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.DateUtil;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Objects;
public class ExcelUtils {
/**
* 安全获取数值型单元格值支持数字/字符串含带逗号空格的数字文本
* @param cell 单元格
* @return 解析后的 double
*/
public static double getNumericValue(Cell cell) {
switch (cell.getCellType()) {
case NUMERIC:
return cell.getNumericCellValue();
case STRING:
String str = cell.getStringCellValue().trim();
if (StrUtil.isBlank(str)) {
throw new RuntimeException("列内容为空");
}
// 尝试解析去除千分位逗号空格等
str = str.replaceAll("[,\\s]", "");
return Double.parseDouble(str);
default:
throw new RuntimeException("列类型数据格式化错误");
}
}
/**
* 安全获取整数用于 ID数量等
*/
public static int getIntValue(Cell cell) {
return (int) Math.round(getNumericValue(cell));
}
/**
* 安全获取字符串防空指针
*/
public static String getStringValue(Cell cell) {
return switch (cell.getCellType()) {
case STRING -> cell.getStringCellValue();
case NUMERIC -> String.valueOf(cell.getNumericCellValue());
default -> throw new RuntimeException("列类型数据格式化错误");
};
}
/**
* 获取时间字段
* @param cell
* @return
*/
public static LocalDateTime getDateValue(Cell cell) {
if (Objects.isNull(cell)) {
return null;
}
switch (cell.getCellType()) {
case NUMERIC:
if (DateUtil.isCellDateFormatted(cell)) {
return cell.getLocalDateTimeCellValue();
}
case STRING:
return LocalDateTime.parse(cell.getStringCellValue(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
default:
throw new RuntimeException("列类型数据格式化错误");
}
}
}

View File

@ -1,40 +0,0 @@
package org.jeecg.vo;
import lombok.Data;
/**
* srs记录数据
*/
@Data
public class SRSRecord {
/**
* 经度
*/
private Double lon;
/**
* 纬度
*/
private Double lat;
/**
* 处于的小时
*/
private Integer hour;
/**
* 根据样品时间-hour得到的开始时间
*/
private Long startSecond;
/**
* 测量结束时间就是模拟结束时间
*/
private Long endSecond;
/**
* 浓度值单位暂未知
*/
private String conc;
}