diff --git a/jeecg-boot-base-core/pom.xml b/jeecg-boot-base-core/pom.xml index 887eda1..1858f4f 100644 --- a/jeecg-boot-base-core/pom.xml +++ b/jeecg-boot-base-core/pom.xml @@ -342,9 +342,10 @@ ${netcdfAll.version} - com.jcraft + com.github.mwiede jsch - 0.1.54 + 2.28.2 + compile diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/CommonConstant.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/CommonConstant.java index 6122d61..065eb5b 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/CommonConstant.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/CommonConstant.java @@ -318,4 +318,27 @@ public interface CommonConstant { String TRANSPORT_TIMING_ANALYSIS = "transport:timing_analysis:"; String DATA_SYNC_FREQUENCY = "data_sync:frequency"; + + /** + * 各服务器任务运行状态 + */ + String HOST_TASK_STATE = "host_task_state"; + + /** + * 输运任务状态前缀 + */ + String TRAN_TASK_STATE_PRE = "tran_task_"; + + /** + * 气象任务状态前缀 + */ + String WEATHER_TASK_STATE_PRE = "weather_task_"; + + /** + * 源项重建任务状态前缀 + */ + String BUILD_TASK_STATE_PRE = "build_task_"; + + + } diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/RocketMQTopConstant.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/RocketMQTopConstant.java index 8cbee06..ab6f7c7 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/RocketMQTopConstant.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/RocketMQTopConstant.java @@ -12,9 +12,4 @@ public interface RocketMQTopConstant { String SAMPLE_RESULT_TOPIC = "topic_abnormal_sample_list"; String ASSOCIATED_WAVEFORM_SAMPLE_TOPIC = "topic_associated_waveform_list"; - - /** - * 各服务器输运模拟任务运行状态 - */ - String HOST_TASK_STATE = "host_task_state"; } diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/enums/TransportTaskStatusEnum.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/enums/TransportTaskStatusEnum.java index 080c189..92b74f4 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/enums/TransportTaskStatusEnum.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/enums/TransportTaskStatusEnum.java @@ -24,7 +24,12 @@ public enum TransportTaskStatusEnum { /** * 已完成 */ - COMPLETED(3); + COMPLETED(3), + + /** + * 检查未通过 + */ + INSPECTION_FAILED(4); private Integer value; diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/enums/TransportTaskTopEnum.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/enums/TransportTaskTopEnum.java new file mode 100644 index 0000000..42f6b3d --- /dev/null +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/enums/TransportTaskTopEnum.java @@ -0,0 +1,27 @@ +package org.jeecg.common.constant.enums; + +/** + * 输运模拟任务置顶说明枚举 + */ +public enum TransportTaskTopEnum { + + /** + * 未置顶 + */ + NOT_TOP(0), + + /** + * 置顶 + */ + TOP(1); + + private Integer value; + + TransportTaskTopEnum(Integer value) { + this.value = value; + } + + public Integer getValue(){ + return this.value; + } +} diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/config/mybatis/MybatisInterceptor.java b/jeecg-boot-base-core/src/main/java/org/jeecg/config/mybatis/MybatisInterceptor.java index 864ecce..4946c8c 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/config/mybatis/MybatisInterceptor.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/config/mybatis/MybatisInterceptor.java @@ -11,6 +11,7 @@ import org.jeecg.common.util.oConvertUtils; import org.jeecg.config.security.utils.SecureUtil; import org.springframework.stereotype.Component; import java.lang.reflect.Field; +import java.time.LocalDateTime; import java.util.Date; import java.util.Properties; @@ -57,7 +58,7 @@ public class MybatisInterceptor implements Interceptor { } } // 注入创建时间 - if ("createTime".equals(field.getName())) { + if ("createTime".equals(field.getName()) && Date.class.equals(field.getType())) { field.setAccessible(true); Object localCreateDate = field.get(parameter); field.setAccessible(false); @@ -108,7 +109,7 @@ public class MybatisInterceptor implements Interceptor { field.setAccessible(false); } } - if ("updateTime".equals(field.getName())) { + if ("updateTime".equals(field.getName()) && Date.class.equals(field.getType())) { field.setAccessible(true); field.set(parameter, new Date()); field.setAccessible(false); diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/TransportTask.java b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/TransportTask.java index 6392550..723d8a1 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/TransportTask.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/TransportTask.java @@ -53,14 +53,7 @@ public class TransportTask{ private Integer taskType; /** - * 任务进度 - */ - @Null(message = "任务进度必须为空",groups = {InsertGroup.class, UpdateGroup.class}) - @TableField(value = "task_progress") - private Integer taskPprogress; - - /** - * 任务状态(-1执行失败,0未开始,1等待中,2运行中,3已完成) + * 任务状态(-1执行失败,0未开始,1等待中,2运行中,3已完成,4任务检查未通过) */ @Null(message = "任务状态必须为空",groups = {InsertGroup.class, UpdateGroup.class}) @TableField(value = "task_status") @@ -80,25 +73,13 @@ public class TransportTask{ @TableField(value = "time_consuming") private Double timeConsuming; - /** - * 创建人 - */ - @TableField(value = "create_by") - private String createBy; - /** * 创建时间 */ @TableField(value = "create_time") @JsonFormat(timezone = "Asia/Shanghai", pattern = "yyyy-MM-dd HH:mm:ss") @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") - private Date createTime; - - /** - * 更新人 - */ - @TableField(value = "update_by") - private String updateBy; + private LocalDateTime createTime; /** * 更新时间 @@ -106,7 +87,7 @@ public class TransportTask{ @TableField(value = "update_time") @JsonFormat(timezone = "Asia/Shanghai", pattern = "yyyy-MM-dd HH:mm:ss") @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") - private Date updateTime; + private LocalDateTime updateTime; /** * 模拟开始时间 @@ -153,6 +134,12 @@ public class TransportTask{ @TableField(value = "release_data_source") private Integer releaseDataSource; + /** + * 任务置顶 + */ + @TableField(value = "top_task") + private Integer topTask; + /** * 反演子表信息 */ diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/mapper/TransportTaskMapper.java b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/mapper/TransportTaskMapper.java index c2b6730..a3885cd 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/mapper/TransportTaskMapper.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/mapper/TransportTaskMapper.java @@ -1,7 +1,27 @@ package org.jeecg.modules.base.mapper; import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import org.apache.ibatis.annotations.Param; import org.jeecg.modules.base.entity.TransportTask; +import java.time.LocalDate; +import java.time.LocalDateTime; + public interface TransportTaskMapper extends BaseMapper { + + /** + * 分页查询任务数据 + * @param page + * @param taskName + * @param taskMode + * @param taskStatus + * @param metType + * @param startDateTime + * @param endDateTime + * @return + */ + IPage page(@Param("page") Page page,@Param("taskName") String taskName,@Param("taskMode") Integer taskMode, + @Param("taskStatus") Integer taskStatus,@Param("metType") Integer metType,@Param("startDateTime") LocalDateTime startDateTime,@Param("endDateTime") LocalDateTime endDateTime); } diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/mapper/xml/TransportTaskMapper.xml b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/mapper/xml/TransportTaskMapper.xml new file mode 100644 index 0000000..810eccb --- /dev/null +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/mapper/xml/TransportTaskMapper.xml @@ -0,0 +1,41 @@ + + + + + + \ No newline at end of file diff --git a/jeecg-model-consumer/src/main/java/org/jeecg/transport/consumer/TranTaskMessageConsumerHandler.java b/jeecg-model-consumer/src/main/java/org/jeecg/transport/consumer/TranTaskMessageConsumerHandler.java index 99fbdd0..fd82f38 100644 --- a/jeecg-model-consumer/src/main/java/org/jeecg/transport/consumer/TranTaskMessageConsumerHandler.java +++ b/jeecg-model-consumer/src/main/java/org/jeecg/transport/consumer/TranTaskMessageConsumerHandler.java @@ -2,17 +2,22 @@ package org.jeecg.transport.consumer; import cn.hutool.core.collection.CollUtil; import com.alibaba.fastjson2.JSON; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; +import org.jeecg.common.constant.CommonConstant; import org.jeecg.common.constant.RocketMQTopConstant; +import org.jeecg.common.constant.enums.TransportTaskStatusEnum; +import org.jeecg.common.constant.enums.TransportTaskTopEnum; import org.jeecg.common.properties.DataFusionProperties; import org.jeecg.common.properties.SystemStorageProperties; import org.jeecg.common.properties.TransportSimulationProperties; import org.jeecg.common.util.RedisUtil; import org.jeecg.modules.base.dto.TransportTaskDTO; +import org.jeecg.modules.base.entity.TransportTask; import org.jeecg.modules.base.mapper.*; import org.jeecg.properties.ServerProperties; import org.jeecg.transport.consumer.china.AbstractTaskMsgHandler; @@ -49,16 +54,47 @@ public class TranTaskMessageConsumerHandler{ private final TransportTaskForwardReleaseMapper taskForwardReleaseMapper; private final StationDataService stationDataService; private final StationsModValService stationsModValService; - @Value("${rocketmq.consumer.group}") - private String consumerGroup; - @Value("${rocketmq.name-server}") - private String nameServerAddress; public void startConsumerThread(){ MessageConsumerThread messageConsumerThread = new MessageConsumerThread(); - messageConsumerThread.setName("输运模拟任务处理线程"); + messageConsumerThread.setName("transport-task-thread"); messageConsumerThread.start(); - log.info("启动输运模拟任务处理线程----------------------"); + log.info("启动输运模拟任务消费线程----------------------"); + } + + /** + * 处理输运模拟任务 + */ + private void handlerTask(TransportTask message){ + AbstractTaskMsgHandler taskMsgHandler = new Server11TaskHandler(); + taskMsgHandler.init(transportTaskMapper,taskBackwardChildMapper,weatherDataMapper, + simulationProperties,systemStorageProperties,dataFusionProperties, + serverProperties,taskForwardSpeciesMapper,taskForwardChildMapper, + transportTaskService,taskForwardReleaseMapper,stationDataService, + stationsModValService,redisUtil); + taskMsgHandler.handler(message); + } + + /** + * 获取等待中的任务 + * @return + */ + private TransportTask getTopMessage(){ + LambdaQueryWrapper topQueryWrapper = new LambdaQueryWrapper<>(); + topQueryWrapper.eq(TransportTask::getTaskStatus, TransportTaskStatusEnum.WAITING.getValue()); + topQueryWrapper.orderByDesc(TransportTask::getTopTask); + topQueryWrapper.orderByDesc(TransportTask::getUpdateTime); + topQueryWrapper.last("LIMIT 1"); + TransportTask transportTask = transportTaskMapper.selectOne(topQueryWrapper); + //如果没有要置顶执行的任务,则按照普通创建时间排序查询 + if(Objects.isNull(transportTask)){ + LambdaQueryWrapper timeQueryWrapper = new LambdaQueryWrapper<>(); + timeQueryWrapper.eq(TransportTask::getTaskStatus, TransportTaskStatusEnum.WAITING.getValue()); + timeQueryWrapper.orderByAsc(TransportTask::getCreateTime); + timeQueryWrapper.last("LIMIT 1"); + transportTask = transportTaskMapper.selectOne(timeQueryWrapper); + } + return transportTask; } private class MessageConsumerThread extends Thread{ @@ -68,53 +104,23 @@ public class TranTaskMessageConsumerHandler{ while (true) { try { //获取本机项数据,如果为空,表示本机没有任务在运行 - boolean flag = redisUtil.hHasKey(RocketMQTopConstant.HOST_TASK_STATE, serverProperties.getHost()); + boolean flag = redisUtil.hHasKey(CommonConstant.HOST_TASK_STATE,CommonConstant.TRAN_TASK_STATE_PRE+serverProperties.getHost()); if (!flag) { - //如果不存在,则获取一条消息尝试执行 - DefaultLitePullConsumer consumer = new DefaultLitePullConsumer(); try { - consumer.setConsumerGroup(consumerGroup); - consumer.setNamesrvAddr(nameServerAddress); - consumer.setPullBatchSize(1); - consumer.subscribe(RocketMQTopConstant.BACKWARD_TRANSPORT_TASK_TOPIC); - consumer.setAutoCommit(false); - consumer.start(); - TransportTaskDTO transportTaskDTO = null; - List messages = consumer.poll(5000L); - if (CollUtil.isNotEmpty(messages)) { - transportTaskDTO = JSON.parseObject(messages.get(0).getBody(),TransportTaskDTO.class); - boolean matchFlag = handlerTask(transportTaskDTO); - if (matchFlag){ - consumer.commit(); - } - log.info("消费一条消息:{}",transportTaskDTO); + TransportTask topMessage = getTopMessage(); + if (Objects.nonNull(topMessage)) { + handlerTask(topMessage); } - }catch (MQClientException e){ - log.error("消费者启动异常,原因为:{}",e.getMessage(),e); - }finally { - consumer.shutdown(); + }catch (Exception e){ + log.error("消费输运任务数据异常,原因为:{}",e.getMessage(),e); } } - log.info("30秒钟处理一次"); + log.info("60秒钟处理一次"); TimeUnit.SECONDS.sleep(30); } catch (InterruptedException e) { throw new RuntimeException(e); } } } - - /** - * 处理输运模拟任务 - */ - private boolean handlerTask(TransportTaskDTO message){ - AbstractTaskMsgHandler taskMsgHandler = new Server11TaskHandler(); - taskMsgHandler.init(transportTaskMapper,taskBackwardChildMapper,weatherDataMapper, - simulationProperties,systemStorageProperties,dataFusionProperties, - serverProperties,taskForwardSpeciesMapper,taskForwardChildMapper, - transportTaskService,taskForwardReleaseMapper,stationDataService, - stationsModValService); - taskMsgHandler.handler(message); - return taskMsgHandler.isMatchFlag(); - } } } diff --git a/jeecg-model-consumer/src/main/java/org/jeecg/transport/consumer/china/AbstractChain.java b/jeecg-model-consumer/src/main/java/org/jeecg/transport/consumer/china/AbstractChain.java index fa31fff..08bd7e1 100644 --- a/jeecg-model-consumer/src/main/java/org/jeecg/transport/consumer/china/AbstractChain.java +++ b/jeecg-model-consumer/src/main/java/org/jeecg/transport/consumer/china/AbstractChain.java @@ -1,6 +1,5 @@ package org.jeecg.transport.consumer.china; -import lombok.Getter; import lombok.Setter; /** @@ -25,8 +24,4 @@ public abstract class AbstractChain { */ protected abstract void setChina(); - /** - * 是否匹配成功 - */ - protected boolean matchFlag = false; } diff --git a/jeecg-model-consumer/src/main/java/org/jeecg/transport/consumer/china/AbstractTaskMsgHandler.java b/jeecg-model-consumer/src/main/java/org/jeecg/transport/consumer/china/AbstractTaskMsgHandler.java index 7fcb0f4..7494153 100644 --- a/jeecg-model-consumer/src/main/java/org/jeecg/transport/consumer/china/AbstractTaskMsgHandler.java +++ b/jeecg-model-consumer/src/main/java/org/jeecg/transport/consumer/china/AbstractTaskMsgHandler.java @@ -1,11 +1,14 @@ package org.jeecg.transport.consumer.china; import lombok.extern.slf4j.Slf4j; +import org.jeecg.common.constant.CommonConstant; import org.jeecg.common.constant.enums.TransportTaskModeEnum; import org.jeecg.common.properties.DataFusionProperties; import org.jeecg.common.properties.SystemStorageProperties; import org.jeecg.common.properties.TransportSimulationProperties; +import org.jeecg.common.util.RedisUtil; import org.jeecg.modules.base.dto.TransportTaskDTO; +import org.jeecg.modules.base.entity.TransportTask; import org.jeecg.modules.base.mapper.*; import org.jeecg.properties.ServerProperties; import org.jeecg.transport.flexparttask.AbstractTaskExec; @@ -22,6 +25,7 @@ import java.util.Objects; @Slf4j public abstract class AbstractTaskMsgHandler extends AbstractChain{ + protected RedisUtil redisUtil; protected TransportTaskMapper transportTaskMapper; protected TransportTaskBackwardChildMapper taskBackwardChildMapper; protected WeatherDataMapper weatherDataMapper; @@ -51,7 +55,8 @@ public abstract class AbstractTaskMsgHandler extends AbstractChain{ TransportTaskService transportTaskService, TransportTaskForwardReleaseMapper taskForwardReleaseMapper, StationDataService stationDataService, - StationsModValService stationsModValService) { + StationsModValService stationsModValService, + RedisUtil redisUtil) { this.transportTaskMapper = transportTaskMapper; this.taskBackwardChildMapper = taskBackwardChildMapper; this.weatherDataMapper = weatherDataMapper; @@ -65,6 +70,7 @@ public abstract class AbstractTaskMsgHandler extends AbstractChain{ this.taskForwardReleaseMapper = taskForwardReleaseMapper; this.stationDataService = stationDataService; this.stationsModValService = stationsModValService; + this.redisUtil = redisUtil; this.setChina(); } @@ -80,59 +86,43 @@ public abstract class AbstractTaskMsgHandler extends AbstractChain{ /** * 处理消息 */ - public abstract void handler(TransportTaskDTO message); - - /** - * 设置匹配标记 - * @param matchFlag - */ - protected void setMatchFlag(boolean matchFlag) { - this.matchFlag = matchFlag; - if (Objects.nonNull(this.previous)) { - this.previous.setMatchFlag(matchFlag); - } - } - - /** - * 获取匹配标记 - * @return - */ - public boolean isMatchFlag() { - return this.matchFlag; - } + public abstract void handler(TransportTask message); /** * 运行任务 - * @param transportTaskDTO + * @param transportTask */ - protected void runTask(TransportTaskDTO transportTaskDTO){ - log.info("运行任务测试"); - if (TransportTaskModeEnum.BACK_FORWARD.getKey().equals(transportTaskDTO.getTaskMode())){ + protected void runTask(TransportTask transportTask,String ip){ + log.info("收到任务:"+transportTask.getTaskName()); + boolean flag = false; + if (TransportTaskModeEnum.BACK_FORWARD.getKey().equals(transportTask.getTaskMode())){ AbstractTaskExec taskExec = new BackwardTaskExec(); - taskExec.init(weatherDataMapper,transportTaskService,transportTaskDTO, + taskExec.init(weatherDataMapper,transportTaskService,transportTask, simulationProperties,systemStorageProperties, - dataFusionProperties,serverProperties,taskBackwardChildMapper); - taskExec.setName("大气输运反演任务执行线程"); + dataFusionProperties,serverProperties,taskBackwardChildMapper,redisUtil); + taskExec.setName("flexpart-backword-thread"); //匹配成功并且检查成功才能设置为true - boolean flag = taskExec.checkTask(); + flag = taskExec.checkTask(); if (flag){ - this.setMatchFlag(true); taskExec.start(); } - }else if (TransportTaskModeEnum.FORWARD.getKey().equals(transportTaskDTO.getTaskMode())){ + }else if (TransportTaskModeEnum.FORWARD.getKey().equals(transportTask.getTaskMode())){ AbstractTaskExec taskExec = new ForwardTaskExec(); - taskExec.init(weatherDataMapper,transportTaskService,transportTaskDTO, + taskExec.init(weatherDataMapper,transportTaskService,transportTask, simulationProperties,systemStorageProperties,serverProperties, stationDataService,stationsModValService,taskForwardSpeciesMapper, - taskForwardChildMapper,taskForwardReleaseMapper); - taskExec.setName("大气输运正演任务执行线程"); + taskForwardChildMapper,taskForwardReleaseMapper,redisUtil); + taskExec.setName("flexpart-fword-thread"); //匹配成功并且检查成功才能设置为true - boolean flag = taskExec.checkTask(); + flag = taskExec.checkTask(); if (flag){ - this.setMatchFlag(true); taskExec.start(); } - taskExec.start(); + } + if (flag){ + log.info("任务:{},匹配成功,当前在主机:{}上执行。", transportTask.getTaskName(),ip); + }else { + transportTaskService.setInpectionFailed(transportTask); } } } diff --git a/jeecg-model-consumer/src/main/java/org/jeecg/transport/consumer/china/Server11TaskHandler.java b/jeecg-model-consumer/src/main/java/org/jeecg/transport/consumer/china/Server11TaskHandler.java index 108fafa..495bb19 100644 --- a/jeecg-model-consumer/src/main/java/org/jeecg/transport/consumer/china/Server11TaskHandler.java +++ b/jeecg-model-consumer/src/main/java/org/jeecg/transport/consumer/china/Server11TaskHandler.java @@ -1,7 +1,7 @@ package org.jeecg.transport.consumer.china; import org.jeecg.common.constant.enums.TransportTaskModeEnum; -import org.jeecg.modules.base.dto.TransportTaskDTO; +import org.jeecg.modules.base.entity.TransportTask; /** * 只处理反演任务 @@ -24,11 +24,12 @@ public class Server11TaskHandler extends AbstractTaskMsgHandler { * 处理消息 */ @Override - public void handler(TransportTaskDTO message) { + public void handler(TransportTask message) { if(serverProperties.getHost().equals(serverProperties.getIp11())){ - if (TransportTaskModeEnum.BACK_FORWARD.getKey().equals(message.getTaskMode())){ - super.runTask(message); - } + super.runTask(message,serverProperties.getIp11()); +// if (TransportTaskModeEnum.BACK_FORWARD.getKey().equals(message.getTaskMode())){ +// super.runTask(message,serverProperties.getIp11()); +// } }else { super.next.handler(message); } diff --git a/jeecg-model-consumer/src/main/java/org/jeecg/transport/consumer/china/Server12TaskHandler.java b/jeecg-model-consumer/src/main/java/org/jeecg/transport/consumer/china/Server12TaskHandler.java index 0f53804..91656e3 100644 --- a/jeecg-model-consumer/src/main/java/org/jeecg/transport/consumer/china/Server12TaskHandler.java +++ b/jeecg-model-consumer/src/main/java/org/jeecg/transport/consumer/china/Server12TaskHandler.java @@ -1,6 +1,6 @@ package org.jeecg.transport.consumer.china; -import org.jeecg.modules.base.dto.TransportTaskDTO; +import org.jeecg.modules.base.entity.TransportTask; /** * 处理反演和正演任务 @@ -22,9 +22,9 @@ public class Server12TaskHandler extends AbstractTaskMsgHandler { * 处理消息 */ @Override - public void handler(TransportTaskDTO message) { + public void handler(TransportTask message) { if(serverProperties.getHost().equals(serverProperties.getIp12())){ - super.runTask(message); + super.runTask(message,serverProperties.getIp12()); }else { super.next.handler(message); } diff --git a/jeecg-model-consumer/src/main/java/org/jeecg/transport/consumer/china/Server13TaskHandler.java b/jeecg-model-consumer/src/main/java/org/jeecg/transport/consumer/china/Server13TaskHandler.java index 7eaa112..53c77dd 100644 --- a/jeecg-model-consumer/src/main/java/org/jeecg/transport/consumer/china/Server13TaskHandler.java +++ b/jeecg-model-consumer/src/main/java/org/jeecg/transport/consumer/china/Server13TaskHandler.java @@ -1,6 +1,6 @@ package org.jeecg.transport.consumer.china; -import org.jeecg.modules.base.dto.TransportTaskDTO; +import org.jeecg.modules.base.entity.TransportTask; /** * 处理反演和正演任务 @@ -22,9 +22,9 @@ public class Server13TaskHandler extends AbstractTaskMsgHandler { * 处理消息 */ @Override - public void handler(TransportTaskDTO message) { + public void handler(TransportTask message) { if(serverProperties.getHost().equals(serverProperties.getIp13())){ - super.runTask(message); + super.runTask(message,serverProperties.getIp12()); }else { super.next.handler(message); } diff --git a/jeecg-model-consumer/src/main/java/org/jeecg/transport/consumer/china/Server14TaskHandler.java b/jeecg-model-consumer/src/main/java/org/jeecg/transport/consumer/china/Server14TaskHandler.java index b680842..6b49a9e 100644 --- a/jeecg-model-consumer/src/main/java/org/jeecg/transport/consumer/china/Server14TaskHandler.java +++ b/jeecg-model-consumer/src/main/java/org/jeecg/transport/consumer/china/Server14TaskHandler.java @@ -1,6 +1,6 @@ package org.jeecg.transport.consumer.china; -import org.jeecg.modules.base.dto.TransportTaskDTO; +import org.jeecg.modules.base.entity.TransportTask; /** * 处理反演和永久型正演任务 @@ -19,11 +19,11 @@ public class Server14TaskHandler extends AbstractTaskMsgHandler { * 处理消息 */ @Override - public void handler(TransportTaskDTO message) { + public void handler(TransportTask message) { if(serverProperties.getHost().equals(serverProperties.getIp14())){ //这里需要再加个判断,14服务器处理反演和永久任务 // if (TransportTaskModeEnum.BACK_FORWARD.getKey().equals(message.getTaskMode())){ -// super.runTask(message); +// super.runTask(message,serverProperties.getIp11()); // } } } diff --git a/jeecg-model-consumer/src/main/java/org/jeecg/transport/flexparttask/AbstractTaskExec.java b/jeecg-model-consumer/src/main/java/org/jeecg/transport/flexparttask/AbstractTaskExec.java index 26f6a82..be7aee8 100644 --- a/jeecg-model-consumer/src/main/java/org/jeecg/transport/flexparttask/AbstractTaskExec.java +++ b/jeecg-model-consumer/src/main/java/org/jeecg/transport/flexparttask/AbstractTaskExec.java @@ -4,17 +4,15 @@ import cn.hutool.core.collection.CollUtil; import cn.hutool.core.date.LocalDateTimeUtil; import cn.hutool.core.util.StrUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; -import com.jcraft.jsch.ChannelExec; -import com.jcraft.jsch.JSch; -import com.jcraft.jsch.JSchException; -import com.jcraft.jsch.Session; -import lombok.Setter; +import com.jcraft.jsch.*; +import lombok.extern.slf4j.Slf4j; import org.apache.logging.log4j.util.Strings; +import org.jeecg.common.constant.CommonConstant; import org.jeecg.common.constant.enums.WeatherDataSourceEnum; import org.jeecg.common.properties.DataFusionProperties; import org.jeecg.common.properties.SystemStorageProperties; import org.jeecg.common.properties.TransportSimulationProperties; -import org.jeecg.modules.base.dto.TransportTaskDTO; +import org.jeecg.common.util.RedisUtil; import org.jeecg.modules.base.entity.TransportTask; import org.jeecg.modules.base.entity.WeatherData; import org.jeecg.modules.base.mapper.*; @@ -26,8 +24,8 @@ import java.io.*; import java.time.LocalDateTime; import java.time.temporal.ChronoUnit; import java.util.List; -import java.util.Properties; +@Slf4j public abstract class AbstractTaskExec extends Thread{ protected WeatherDataMapper weatherDataMapper; @@ -43,14 +41,15 @@ public abstract class AbstractTaskExec extends Thread{ protected TransportTaskForwardReleaseMapper taskForwardReleaseMapper; protected TransportTaskBackwardChildMapper taskBackwardChildMapper; protected TransportTask transportTask; - protected TransportTaskDTO transportTaskDTO; + protected RedisUtil redisUtil; + protected boolean taskRunError; /** * 初始化 */ public void init(WeatherDataMapper weatherDataMapper, TransportTaskService transportTaskService, - TransportTaskDTO transportTaskDTO, + TransportTask transportTask, TransportSimulationProperties simulationProperties, SystemStorageProperties systemStorageProperties, ServerProperties serverProperties, @@ -58,10 +57,11 @@ public abstract class AbstractTaskExec extends Thread{ StationsModValService stationsModValService, TransportTaskForwardSpeciesMapper taskForwardSpeciesMapper, TransportTaskForwardChildMapper taskForwardChildMapper, - TransportTaskForwardReleaseMapper taskForwardReleaseMapper){ + TransportTaskForwardReleaseMapper taskForwardReleaseMapper, + RedisUtil redisUtil){ this.weatherDataMapper = weatherDataMapper; this.transportTaskService = transportTaskService; - this.transportTaskDTO = transportTaskDTO; + this.transportTask = transportTask; this.simulationProperties = simulationProperties; this.systemStorageProperties = systemStorageProperties; this.serverProperties = serverProperties; @@ -70,6 +70,7 @@ public abstract class AbstractTaskExec extends Thread{ this.taskForwardSpeciesMapper = taskForwardSpeciesMapper; this.taskForwardChildMapper = taskForwardChildMapper; this.taskForwardReleaseMapper = taskForwardReleaseMapper; + this.redisUtil = redisUtil; } /** @@ -77,20 +78,22 @@ public abstract class AbstractTaskExec extends Thread{ */ public void init(WeatherDataMapper weatherDataMapper, TransportTaskService transportTaskService, - TransportTaskDTO transportTaskDTO, + TransportTask transportTask, TransportSimulationProperties simulationProperties, SystemStorageProperties systemStorageProperties, DataFusionProperties dataFusionProperties, ServerProperties serverProperties, - TransportTaskBackwardChildMapper taskBackwardChildMapper){ + TransportTaskBackwardChildMapper taskBackwardChildMapper, + RedisUtil redisUtil){ this.weatherDataMapper = weatherDataMapper; this.transportTaskService = transportTaskService; - this.transportTaskDTO = transportTaskDTO; + this.transportTask = transportTask; this.simulationProperties = simulationProperties; this.systemStorageProperties = systemStorageProperties; this.dataFusionProperties = dataFusionProperties; this.serverProperties = serverProperties; this.taskBackwardChildMapper = taskBackwardChildMapper; + this.redisUtil = redisUtil; } public abstract boolean checkTask(); @@ -99,6 +102,20 @@ public abstract class AbstractTaskExec extends Thread{ protected abstract void execSimulation(); + /** + * 设置任务运行标记 + */ + protected void setTaskRunFlag(){ + redisUtil.hset(CommonConstant.HOST_TASK_STATE,CommonConstant.TRAN_TASK_STATE_PRE+serverProperties.getHost(),transportTask.getId()); + } + + /** + * 任务运行失败,取消运行标记 + */ + protected void cancelTaskRunFlag(){ + redisUtil.hdel(CommonConstant.HOST_TASK_STATE,CommonConstant.TRAN_TASK_STATE_PRE+serverProperties.getHost()); + } + /** * 检查气象数据 */ @@ -193,71 +210,36 @@ public abstract class AbstractTaskExec extends Thread{ return path.toString(); } + /** + * 持续读取日志 + */ + protected void execCommand(String command,Session session){ + ChannelExec channel = null; + try{ + //打开一个执行通道 + channel = (ChannelExec) session.openChannel("exec"); + String fullCommand = command + " 2>&1"; + channel.setCommand(fullCommand); + // 获取脚本的标准输出流,包含错误输出流 + InputStream in = channel.getInputStream(); + BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + // 连接通道 + channel.connect(); - protected class JSchRemoteRunner{ - - private JSch jsch = new JSch(); - - private Session session = null; - - private ChannelExec channel = null; - - @Setter - private String command; - - /** - * 登录 - * - * @param host - * @param port - * @param username - * @param password - * @throws JSchException - */ - protected void login(String host, int port, String username, String password) throws JSchException { - Properties config = new Properties(); - config.put("StrictHostKeyChecking","no"); - - session = jsch.getSession(username, host, port); - session.setPassword(password); - session.setConfig(config); - session.setServerAliveInterval(30); - session.setServerAliveCountMax(3); - session.setTimeout(0); - session.connect(); - } - - /** - * 持续读取日志 - */ - protected void execCommand() throws JSchException, IOException { - try{ - //打开一个执行通道 - channel = (ChannelExec) session.openChannel("exec"); - String fullCommand = command + " 2>&1"; - channel.setCommand(fullCommand); - // 获取脚本的标准输出流,包含错误输出流 - InputStream in = channel.getInputStream(); - BufferedReader reader = new BufferedReader(new InputStreamReader(in)); - // 连接通道 - channel.connect(); - - String line; - while ((line = reader.readLine()) != null) { - if(StrUtil.isNotBlank(line)){ - ProgressQueue.getInstance().offer(new ProgressEvent(transportTask.getId(),line)); + String line; + while ((line = reader.readLine()) != null) { + if(StrUtil.isNotBlank(line)){ + if(line.trim().startsWith("ERROR STOP")){ + taskRunError = true; } + ProgressQueue.getInstance().offer(new ProgressEvent(transportTask.getId(),line)); } - }catch(JSchException |IOException e){ - throw new RuntimeException(e); - }finally { - // 关闭资源 - if (channel != null) { - channel.disconnect(); - } - if (session != null) { - session.disconnect(); - } + } + }catch(JSchException |IOException e){ + throw new RuntimeException(e); + }finally { + if (channel != null) { + channel.disconnect(); } } } diff --git a/jeecg-model-consumer/src/main/java/org/jeecg/transport/flexparttask/BackwardTaskExec.java b/jeecg-model-consumer/src/main/java/org/jeecg/transport/flexparttask/BackwardTaskExec.java index f4c4d92..4d080ee 100644 --- a/jeecg-model-consumer/src/main/java/org/jeecg/transport/flexparttask/BackwardTaskExec.java +++ b/jeecg-model-consumer/src/main/java/org/jeecg/transport/flexparttask/BackwardTaskExec.java @@ -1,14 +1,15 @@ package org.jeecg.transport.flexparttask; import cn.hutool.core.collection.CollUtil; -import cn.hutool.core.io.FileUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.time.StopWatch; import org.apache.logging.log4j.util.Strings; import org.jeecg.common.constant.enums.FlexpartSpeciesType; import org.jeecg.common.constant.enums.TransportTaskStatusEnum; import org.jeecg.common.constant.enums.WeatherDataSourceEnum; import org.jeecg.modules.base.entity.*; +import org.jeecg.transport.util.JSchRemoteRunner; import java.io.File; import java.math.BigDecimal; import java.math.RoundingMode; @@ -22,6 +23,7 @@ import java.util.concurrent.atomic.AtomicBoolean; /** * 反演任务线程 */ +@Slf4j public class BackwardTaskExec extends AbstractTaskExec { @Override @@ -38,10 +40,9 @@ public class BackwardTaskExec extends AbstractTaskExec { //校验任务 AtomicBoolean flag = new AtomicBoolean(true); List msgList = new ArrayList<>(); - super.transportTask = this.transportTaskService.getById(super.transportTaskDTO.getId()); //查询站点信息 LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); - queryWrapper.eq(TransportTaskBackwardChild::getTaskId,super.transportTaskDTO.getId()); + queryWrapper.eq(TransportTaskBackwardChild::getTaskId,super.transportTask.getId()); queryWrapper.orderByAsc(TransportTaskBackwardChild::getId); List transportTaskChildren = super.taskBackwardChildMapper.selectList(queryWrapper); if(CollUtil.isEmpty(transportTaskChildren)){ @@ -84,6 +85,8 @@ public class BackwardTaskExec extends AbstractTaskExec { StopWatch stopWatch = new StopWatch(); stopWatch.start(); try{ + //设置任务运行标记 + super.setTaskRunFlag(); //修改任务状态为执行中 super.transportTaskService.updateTaskStatus(super.transportTask.getId(), TransportTaskStatusEnum.IN_OPERATION.getValue()); //如果此任务已存在历史日志,先清除 @@ -91,12 +94,15 @@ public class BackwardTaskExec extends AbstractTaskExec { //执行模拟 this.execSimulation(); //生成SRS文件 - this.generateSRSFile(); +// this.generateSRSFile(); }catch (Exception e){ String taskErrorLog = "任务执行失败,原因:"+e.getMessage(); ProgressQueue.getInstance().offer(new ProgressEvent(super.transportTask.getId(),taskErrorLog)); + //如果还未执行到flexpart,java业务代码报错,修改状态 super.transportTaskService.updateTaskStatus(super.transportTask.getId(),TransportTaskStatusEnum.FAILURE.getValue()); - throw e; + super.cancelTaskRunFlag(); + log.error(taskErrorLog); + e.printStackTrace(); }finally { //添加任务耗时 stopWatch.stop(); @@ -104,7 +110,15 @@ public class BackwardTaskExec extends AbstractTaskExec { double min = seconds/60D; BigDecimal bgMin = new BigDecimal(min); BigDecimal result = bgMin.setScale(2, RoundingMode.HALF_UP); - super.transportTaskService.updateTaskStatusToCompleted(super.transportTask.getId(),result.doubleValue()); + + Integer taskState; + if (super.taskRunError){ + //如果super.taskRunError等于true,说明flexpart运行过程日志提示出错,这里把任务状态改为失败 + taskState = TransportTaskStatusEnum.FAILURE.getValue(); + }else{ + taskState = TransportTaskStatusEnum.COMPLETED.getValue(); + } + super.transportTaskService.updateTaskStatusToCompleted(super.transportTask.getId(),result.doubleValue(),taskState); String taskCompletedLog = "任务执行完成,耗时:"+min+"分钟"; ProgressQueue.getInstance().offer(new ProgressEvent(super.transportTask.getId(),taskCompletedLog)); } @@ -114,16 +128,19 @@ public class BackwardTaskExec extends AbstractTaskExec { * 执行模拟 */ protected void execSimulation(){ -// Process process = null; + //ssh连接宿主机调用flexpart + JSchRemoteRunner jschRemoteRunner = new JSchRemoteRunner(); try { + //登录ssh + jschRemoteRunner.login(super.serverProperties.getHost(),super.serverProperties.getPort(), + super.serverProperties.getUsername(),super.serverProperties.getPassword()); + String paramMsg = "生成flexpart所需参数文件:param.config,stations.config"; ProgressQueue.getInstance().offer(new ProgressEvent(super.transportTask.getId(),paramMsg)); //处理参数配置文件 String paramConfigPath = super.getParamConfigPath(super.transportTask.getUseMetType()); String metDataPath = super.getMetDataPath(super.transportTask.getUseMetType()); - if(!FileUtil.exist(paramConfigPath)){ - FileUtil.touch(paramConfigPath); - } + StringBuilder paramContent = new StringBuilder(); paramContent.append(super.transportTask.getStartTime().format(DateTimeFormatter.ofPattern("yyyyMMdd"))).append("\n"); paramContent.append(super.transportTask.getEndTime().format(DateTimeFormatter.ofPattern("yyyyMMdd"))).append("\n"); @@ -133,17 +150,15 @@ public class BackwardTaskExec extends AbstractTaskExec { paramContent.append(super.transportTask.getZ1()).append("\n"); paramContent.append(super.transportTask.getZ2()).append("\n"); paramContent.append(metDataPath).append("\n"); - paramContent.append(super.simulationProperties.getOutputPath()+File.separator+super.transportTask.getTaskName()).append("\n"); + paramContent.append(super.simulationProperties.getOutputPath()+ File.separator+super.transportTask.getTaskName()).append("\n"); paramContent.append(FlexpartSpeciesType.NOT_SPECIES.getId()).append("\n");//反演固定61,不显示具体核素,只显示Xe - FileUtil.writeString(paramContent.toString(),paramConfigPath,"UTF-8"); + jschRemoteRunner.writeFile(paramConfigPath,paramContent.toString()); //处理台站数据文件 List stationConfigInfo = new ArrayList<>(); String stationsConfigPath = super.getStationsConfigPath(super.transportTask.getUseMetType())+".backward"; - if(!FileUtil.exist(stationsConfigPath)){ - FileUtil.touch(stationsConfigPath); - } + super.transportTask.getBackwardChild().forEach(taskChild -> { - String format = "%s,%f,%f,%s,%s,%s,%s,%s"; + String format = "%s,%f,%f,%s,%s,%s,%s,%s\n"; BigDecimal srcReleaseAmount = new BigDecimal(taskChild.getReleaseAmount()); BigDecimal constant = new BigDecimal("1000"); DecimalFormat scientificFormat = new DecimalFormat("0.00E0"); @@ -156,21 +171,19 @@ public class BackwardTaskExec extends AbstractTaskExec { taskChild.getAcqEndTime().format(DateTimeFormatter.ofPattern("HHmmss"))); stationConfigInfo.add(row); }); - //最后一行需要换行,否则启动flexpart报错 - stationConfigInfo.add("\n"); - FileUtil.writeLines(stationConfigInfo,stationsConfigPath,"UTF-8"); + String stationConfigInfoStr = String.join("", stationConfigInfo); + jschRemoteRunner.writeFile(stationsConfigPath,stationConfigInfoStr); + //获取反演脚本路径 String scriptPath = this.getBackForwardScriptPath(super.transportTask.getUseMetType()); String execScriptMsg = "执行任务脚本,开始模拟,路径为:"+scriptPath; ProgressQueue.getInstance().offer(new ProgressEvent(super.transportTask.getId(),execScriptMsg)); - //ssh连接宿主机调用flexpart - JSchRemoteRunner jschRemoteRunner = new JSchRemoteRunner(); - jschRemoteRunner.setCommand(scriptPath); - jschRemoteRunner.login(super.serverProperties.getHost(),super.serverProperties.getPort(), - super.serverProperties.getUsername(),super.serverProperties.getPassword()); - jschRemoteRunner.execCommand(); + //执行命令 + super.execCommand(scriptPath,jschRemoteRunner.getSession()); } catch (Exception e) { throw new RuntimeException(e); + }finally { + jschRemoteRunner.close(); } } diff --git a/jeecg-model-consumer/src/main/java/org/jeecg/transport/flexparttask/ForwardTaskExec.java b/jeecg-model-consumer/src/main/java/org/jeecg/transport/flexparttask/ForwardTaskExec.java index 0244d7b..2e75041 100644 --- a/jeecg-model-consumer/src/main/java/org/jeecg/transport/flexparttask/ForwardTaskExec.java +++ b/jeecg-model-consumer/src/main/java/org/jeecg/transport/flexparttask/ForwardTaskExec.java @@ -1,13 +1,14 @@ package org.jeecg.transport.flexparttask; import cn.hutool.core.collection.CollUtil; -import cn.hutool.core.io.FileUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.time.StopWatch; import org.apache.logging.log4j.util.Strings; import org.jeecg.common.constant.enums.TransportTaskStatusEnum; import org.jeecg.common.constant.enums.WeatherDataSourceEnum; import org.jeecg.modules.base.entity.*; +import org.jeecg.transport.util.JSchRemoteRunner; import java.io.File; import java.math.BigDecimal; import java.math.RoundingMode; @@ -21,6 +22,7 @@ import java.util.stream.Collectors; /** * 正演任务线程 */ +@Slf4j public class ForwardTaskExec extends AbstractTaskExec { @Override @@ -37,7 +39,6 @@ public class ForwardTaskExec extends AbstractTaskExec { //校验任务 AtomicBoolean flag = new AtomicBoolean(true); List msgList = new ArrayList<>(); - super.transportTask = this.transportTaskService.getById(super.transportTaskDTO.getId()); //查询站点信息 LambdaQueryWrapper siteQueryWrapper = new LambdaQueryWrapper<>(); siteQueryWrapper.eq(TransportTaskForwardChild::getTaskId,transportTask.getId()); @@ -100,6 +101,8 @@ public class ForwardTaskExec extends AbstractTaskExec { StopWatch stopWatch = new StopWatch(); stopWatch.start(); try{ + //设置任务运行标记 + super.setTaskRunFlag(); //修改任务状态为执行中 super.transportTaskService.updateTaskStatus(super.transportTask.getId(), TransportTaskStatusEnum.IN_OPERATION.getValue()); //如果此任务已存在历史日志,先清除 @@ -115,7 +118,8 @@ public class ForwardTaskExec extends AbstractTaskExec { String taskErrorLog = "任务执行失败,原因:"+e.getMessage(); ProgressQueue.getInstance().offer(new ProgressEvent(super.transportTask.getId(),taskErrorLog)); this.transportTaskService.updateTaskStatus(super.transportTask.getId(),TransportTaskStatusEnum.FAILURE.getValue()); - throw e; + super.cancelTaskRunFlag(); + log.error(taskErrorLog); }finally { //添加任务耗时 stopWatch.stop(); @@ -123,7 +127,15 @@ public class ForwardTaskExec extends AbstractTaskExec { double min = seconds/60D; BigDecimal bgMin = new BigDecimal(min); BigDecimal result = bgMin.setScale(2, RoundingMode.HALF_UP); - super.transportTaskService.updateTaskStatusToCompleted(super.transportTask.getId(),result.doubleValue()); + + Integer taskState; + if (super.taskRunError){ + //如果super.taskRunError等于true,说明flexpart运行过程日志提示出错,这里把任务状态改为失败 + taskState = TransportTaskStatusEnum.FAILURE.getValue(); + }else{ + taskState = TransportTaskStatusEnum.COMPLETED.getValue(); + } + super.transportTaskService.updateTaskStatusToCompleted(super.transportTask.getId(),result.doubleValue(),taskState); String taskCompletedLog = "任务执行完成,耗时:"+min+"分钟"; ProgressQueue.getInstance().offer(new ProgressEvent(super.transportTask.getId(),taskCompletedLog)); } @@ -133,16 +145,19 @@ public class ForwardTaskExec extends AbstractTaskExec { * 执行模拟 */ protected void execSimulation(){ - Process process = null; + //ssh连接宿主机调用flexpart + JSchRemoteRunner jschRemoteRunner = new JSchRemoteRunner(); try { + //登录ssh + jschRemoteRunner.login(super.serverProperties.getHost(), super.serverProperties.getPort(), + super.serverProperties.getUsername(), super.serverProperties.getPassword()); + String paramMsg = "生成flexpart所需配置文件:param.config,stations.config,input_site_hour"; ProgressQueue.getInstance().offer(new ProgressEvent(super.transportTask.getId(),paramMsg)); //处理参数配置文件-param.config String paramConfigPath = super.getParamConfigPath(super.transportTask.getUseMetType()); String metDataPath = super.getMetDataPath(super.transportTask.getUseMetType()); - if(!FileUtil.exist(paramConfigPath)){ - FileUtil.touch(paramConfigPath); - } + StringBuilder paramContent = new StringBuilder(); paramContent.append(super.transportTask.getStartTime().format(DateTimeFormatter.ofPattern("yyyyMMdd"))).append("\n"); paramContent.append(super.transportTask.getEndTime().format(DateTimeFormatter.ofPattern("yyyyMMdd"))).append("\n"); @@ -170,7 +185,7 @@ public class ForwardTaskExec extends AbstractTaskExec { paramContent.append("\n"); } } - FileUtil.writeString(paramContent.toString(),paramConfigPath,"UTF-8"); + jschRemoteRunner.writeFile(paramConfigPath,paramContent.toString()); //处理台站数据文件-stations.config if (CollUtil.isEmpty(super.transportTask.getForwardChild())){ @@ -181,26 +196,19 @@ public class ForwardTaskExec extends AbstractTaskExec { } List stationConfigInfo = new ArrayList<>(); String stationsConfigPath = super.getStationsConfigPath(super.transportTask.getUseMetType()); - if(!FileUtil.exist(stationsConfigPath)){ - FileUtil.touch(stationsConfigPath); - } + super.transportTask.getForwardChild().forEach(station ->{ - String format = "%s,%f,%f,%s"; + String format = "%s,%f,%f,%s\n"; //最后0是以前版本的总释放量,改版后无用,给个默认值 String row = String.format(format,station.getStationCode(),station.getLat(),station.getLon(),"0"); stationConfigInfo.add(row); }); - //最后一行需要换行,否则启动flexpart报错 - stationConfigInfo.add("\n"); - FileUtil.writeLines(stationConfigInfo,stationsConfigPath,"UTF-8"); + String stationConfigInfoStr = String.join("", stationConfigInfo); + jschRemoteRunner.writeFile(stationsConfigPath,stationConfigInfoStr); //配置各站点排放数据 - //如果input_site_hour目录不存在先创建 - if(!FileUtil.exist(super.simulationProperties.getInputSiteHourPath())){ - FileUtil.mkdir(super.simulationProperties.getInputSiteHourPath()); - }else { - FileUtil.clean(super.simulationProperties.getInputSiteHourPath()); - } + //清空历史文件数据 + jschRemoteRunner.clearDir(super.simulationProperties.getInputSiteHourPath()); Map> releaseInfoMap = new HashMap<>(); for (TransportTaskForwardChild station : super.transportTask.getForwardChild()){ if (CollUtil.isEmpty(station.getForwardReleaseChild())){ @@ -212,11 +220,10 @@ public class ForwardTaskExec extends AbstractTaskExec { releaseInfoMap.put(station.getStationCode(),station.getForwardReleaseChild()); } releaseInfoMap .forEach((stationCode,releaseInfoList)->{ - String stationReleaseFile = super.simulationProperties.getInputSiteHourPath() + File.separator + stationCode + ".txt"; - FileUtil.touch(stationReleaseFile); + String stationReleaseFile = super.simulationProperties.getInputSiteHourPath() + "/" + stationCode + ".txt"; List stationReleaseInfo = new ArrayList<>(); releaseInfoList.forEach(releaseInfo ->{ - String format = "%s,%s,%s"; + String format = "%s,%s,%s\n"; String startTime = releaseInfo.getStartTime().format(DateTimeFormatter.ofPattern("yyyyMMddHH")); String endTime = releaseInfo.getEndTime().format(DateTimeFormatter.ofPattern("yyyyMMddHH")); BigDecimal releaseAmount = new BigDecimal(releaseInfo.getReleaseAmount()); @@ -227,20 +234,18 @@ public class ForwardTaskExec extends AbstractTaskExec { String row = String.format(format, startTime, endTime, releaseAmountFormat); stationReleaseInfo.add(row); }); - FileUtil.writeLines(stationReleaseInfo,stationReleaseFile,"UTF-8"); + jschRemoteRunner.writeFile(stationReleaseFile,String.join("",stationReleaseInfo)); }); //获取正演脚本路径 String scriptPath = this.getForwardScriptPath(super.transportTask.getUseMetType()); String execScriptMsg = "执行任务脚本,开始模拟,路径为:"+scriptPath; ProgressQueue.getInstance().offer(new ProgressEvent(super.transportTask.getId(),execScriptMsg)); - //ssh连接宿主机调用flexpart - JSchRemoteRunner jschRemoteRunner = new JSchRemoteRunner(); - jschRemoteRunner.setCommand(scriptPath); - jschRemoteRunner.login(super.serverProperties.getHost(), super.serverProperties.getPort(), - super.serverProperties.getUsername(), super.serverProperties.getPassword()); - jschRemoteRunner.execCommand(); + //执行命令 + super.execCommand(scriptPath,jschRemoteRunner.getSession()); } catch (Exception e) { throw new RuntimeException(e); + }finally { + jschRemoteRunner.close(); } } /** diff --git a/jeecg-model-consumer/src/main/java/org/jeecg/transport/flexparttask/ProgressMonitor.java b/jeecg-model-consumer/src/main/java/org/jeecg/transport/flexparttask/ProgressMonitor.java index 0ec0a02..bf0386f 100644 --- a/jeecg-model-consumer/src/main/java/org/jeecg/transport/flexparttask/ProgressMonitor.java +++ b/jeecg-model-consumer/src/main/java/org/jeecg/transport/flexparttask/ProgressMonitor.java @@ -36,10 +36,6 @@ public class ProgressMonitor{ try { ProgressEvent event = ProgressQueue.getInstance().take(); if(Objects.nonNull(event)) { - //flexpart固定报错信息 - if(event.getContent().trim().startsWith("ERROR STOP")){ - transportTaskService.updateTaskStatus(event.getTaskId(), TransportTaskStatusEnum.FAILURE.getValue()); - } TransportTaskLog log = new TransportTaskLog(); log.setTaskId(event.getTaskId()); log.setLogContent(event.getContent()); diff --git a/jeecg-model-consumer/src/main/java/org/jeecg/transport/service/TransportTaskService.java b/jeecg-model-consumer/src/main/java/org/jeecg/transport/service/TransportTaskService.java index 719adc4..58668bc 100644 --- a/jeecg-model-consumer/src/main/java/org/jeecg/transport/service/TransportTaskService.java +++ b/jeecg-model-consumer/src/main/java/org/jeecg/transport/service/TransportTaskService.java @@ -18,11 +18,11 @@ public interface TransportTaskService extends IService { void saveLog(TransportTaskLog transportTaskLog); /** - * 修改任务耗时 + * 修改任务运行完成 * @param taskId * @param minute */ - void updateTaskStatusToCompleted(Integer taskId, Double minute); + void updateTaskStatusToCompleted(Integer taskId, Double minute,Integer status); /** * 删除任务日志 @@ -43,4 +43,10 @@ public interface TransportTaskService extends IService { * @return */ Map getTaskSimulationSpecies(Integer taskId); + + /** + * 设置检查失败 + * @param transportTask + */ + void setInpectionFailed(TransportTask transportTask); } diff --git a/jeecg-model-consumer/src/main/java/org/jeecg/transport/service/impl/TransportTaskServiceImpl.java b/jeecg-model-consumer/src/main/java/org/jeecg/transport/service/impl/TransportTaskServiceImpl.java index c9ca9bd..b15ddee 100644 --- a/jeecg-model-consumer/src/main/java/org/jeecg/transport/service/impl/TransportTaskServiceImpl.java +++ b/jeecg-model-consumer/src/main/java/org/jeecg/transport/service/impl/TransportTaskServiceImpl.java @@ -4,13 +4,17 @@ import cn.hutool.core.collection.CollUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import lombok.RequiredArgsConstructor; +import org.jeecg.common.constant.CommonConstant; import org.jeecg.common.constant.enums.*; +import org.jeecg.common.util.RedisUtil; import org.jeecg.modules.base.entity.*; import org.jeecg.modules.base.mapper.*; -import org.jeecg.transport.service.StationDataService; +import org.jeecg.properties.ServerProperties; import org.jeecg.transport.service.TransportTaskService; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; + +import java.time.LocalDateTime; import java.util.*; /** @@ -22,7 +26,8 @@ public class TransportTaskServiceImpl extends ServiceImpl { - private final TransportTaskProducer transportTaskProducer; private final StationDataService stationDataService; private final TransportTaskService transportTaskService; private final TransportSimulationProperties simulationProperties; @@ -48,6 +45,7 @@ public class SampleMessageConsumer implements RocketMQListener setTaskTop(@NotNull(message = "任务ID不能为空") Integer taskId){ + transportTaskService.setTaskTop(taskId); + return Result.OK(); + } + + @AutoLog(value = "设置任务置顶") + @Operation(summary = "设置任务置顶") + @PutMapping("setCancelTaskTop") + public Result setCancelTaskTop(@NotNull(message = "任务ID不能为空") Integer taskId){ + transportTaskService.setCancelTaskTop(taskId); + return Result.OK(); + } + @AutoLog(value = "获取单个站点的排放数据") @Operation(summary = "获取单个站点的排放数据") @GetMapping("getReleaseDataByForwardId") diff --git a/jeecg-module-transport/src/main/java/org/jeecg/service/StationsModValService.java b/jeecg-module-transport/src/main/java/org/jeecg/service/StationsModValService.java index 0d6d334..f2c42b2 100644 --- a/jeecg-module-transport/src/main/java/org/jeecg/service/StationsModValService.java +++ b/jeecg-module-transport/src/main/java/org/jeecg/service/StationsModValService.java @@ -19,9 +19,4 @@ public interface StationsModValService extends IService { */ List getStationsModVal(Integer taskId,Integer stationId,Integer speciesId); - /** - * 保存台站模拟值 - * @param list - */ - void saveStationsModVal(List list); } diff --git a/jeecg-module-transport/src/main/java/org/jeecg/service/TransportTaskService.java b/jeecg-module-transport/src/main/java/org/jeecg/service/TransportTaskService.java index 1173c28..d50d4cb 100644 --- a/jeecg-module-transport/src/main/java/org/jeecg/service/TransportTaskService.java +++ b/jeecg-module-transport/src/main/java/org/jeecg/service/TransportTaskService.java @@ -63,6 +63,12 @@ public interface TransportTaskService extends IService { */ boolean checkTaskByName(String taskName); + /** + * 设置任务置顶 + * @param taskId + */ + void setTaskTop(Integer taskId); + /** * 运行任务 * @param id @@ -77,32 +83,6 @@ public interface TransportTaskService extends IService { */ IPage getTaskLog(Integer taskId,PageRequest pageRequest); - /** - * 保存任务日志 - * @param transportTaskLog - */ - void saveLog(TransportTaskLog transportTaskLog); - - /** - * 修改任务耗时 - * @param taskId - * @param minute - */ - void updateTaskStatusToCompleted(Integer taskId, Double minute); - - /** - * 删除任务日志 - * @param taskId - */ - void deleteTaskLog(Integer taskId); - - /** - * 修改任务状态 - * @param taskId - * @param status - */ - void updateTaskStatus(Integer taskId, Integer status); - void handleExcelReleaseData(HttpServletResponse res) throws Exception; /** @@ -136,4 +116,10 @@ public interface TransportTaskService extends IService { * 处理赵老师给的2014年的排放数据,生成sql文件 */ void handlExcelReleaseData(); + + /** + * 取消置顶 + * @param taskId + */ + void setCancelTaskTop(Integer taskId); } diff --git a/jeecg-module-transport/src/main/java/org/jeecg/service/impl/StationsModValServiceImpl.java b/jeecg-module-transport/src/main/java/org/jeecg/service/impl/StationsModValServiceImpl.java index 0d39467..0a58cac 100644 --- a/jeecg-module-transport/src/main/java/org/jeecg/service/impl/StationsModValServiceImpl.java +++ b/jeecg-module-transport/src/main/java/org/jeecg/service/impl/StationsModValServiceImpl.java @@ -7,8 +7,6 @@ import org.jeecg.modules.base.entity.rnauto.GardsStationsModVal; import org.jeecg.modules.base.mapper.GardsStationsModValMapper; import org.jeecg.service.StationsModValService; import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; - import java.util.List; @DS("ora") @@ -31,14 +29,4 @@ public class StationsModValServiceImpl extends ServiceImpl list) { - this.baseMapper.insert(list); - } } diff --git a/jeecg-module-transport/src/main/java/org/jeecg/service/impl/TransportTaskServiceImpl.java b/jeecg-module-transport/src/main/java/org/jeecg/service/impl/TransportTaskServiceImpl.java index 430aab1..c9fc2d9 100644 --- a/jeecg-module-transport/src/main/java/org/jeecg/service/impl/TransportTaskServiceImpl.java +++ b/jeecg-module-transport/src/main/java/org/jeecg/service/impl/TransportTaskServiceImpl.java @@ -14,22 +14,21 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import jakarta.servlet.http.HttpServletResponse; import lombok.RequiredArgsConstructor; -import org.apache.commons.lang3.StringUtils; +import lombok.extern.slf4j.Slf4j; import org.apache.poi.ss.usermodel.*; import org.apache.poi.xssf.usermodel.XSSFWorkbook; import org.jeecg.common.constant.enums.*; import org.jeecg.common.properties.TransportSimulationProperties; import org.jeecg.common.system.query.PageRequest; -import org.jeecg.modules.base.dto.TransportTaskDTO; import org.jeecg.modules.base.entity.*; import org.jeecg.modules.base.entity.configuration.GardsDetectors; import org.jeecg.modules.base.entity.configuration.GardsStations; import org.jeecg.modules.base.entity.original.GardsSampleData; import org.jeecg.modules.base.entity.rnauto.GardsXeResults; import org.jeecg.modules.base.mapper.*; -import org.jeecg.producer.TransportTaskProducer; import org.jeecg.service.StationDataService; import org.jeecg.service.TransportTaskService; +import org.jeecg.util.ExcelUtils; import org.jeecg.vo.SiteAndReleaseDataVO; import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.stereotype.Service; @@ -49,6 +48,7 @@ import java.util.stream.Collectors; /** * 输运模拟任务管理 */ +@Slf4j @RequiredArgsConstructor @Service public class TransportTaskServiceImpl extends ServiceImpl implements TransportTaskService { @@ -62,7 +62,6 @@ public class TransportTaskServiceImpl extends ServiceImpl page(PageRequest pageRequest, String taskName, Integer taskMode,Integer taskStatus,Integer metType, LocalDate startDate, LocalDate endDate) { - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); - queryWrapper.eq(Objects.nonNull(taskMode),TransportTask::getTaskMode, taskMode); - queryWrapper.eq(Objects.nonNull(taskStatus),TransportTask::getTaskStatus, taskStatus); - queryWrapper.eq(Objects.nonNull(metType),TransportTask::getUseMetType, metType); + LocalDateTime startDateTime = null; + LocalDateTime endDateTime = null; if(Objects.nonNull(startDate) && Objects.nonNull(endDate)){ - LocalDateTime startDateTime = startDate.atTime(0, 0, 0); - LocalDateTime endDateTime = endDate.atTime(23, 59, 59); - queryWrapper.between(TransportTask::getCreateTime,startDateTime,endDateTime); + startDateTime = startDate.atTime(0, 0, 0); + endDateTime = endDate.atTime(23, 59, 59); } - queryWrapper.like(StringUtils.isNotBlank(taskName),TransportTask::getTaskName,taskName); - queryWrapper.orderByDesc(TransportTask::getCreateTime); - queryWrapper.orderByAsc(TransportTask::getTaskStatus); - IPage iPage = new Page<>(pageRequest.getPageNum(), pageRequest.getPageSize()); - return this.page(iPage, queryWrapper); + Page page = new Page<>(pageRequest.getPageNum(), pageRequest.getPageSize()); + return this.baseMapper.page(page,taskName,taskMode,taskStatus,metType,startDateTime,endDateTime); } /** @@ -107,10 +100,16 @@ public class TransportTaskServiceImpl extends ServiceImpl queryWrapper = new LambdaQueryWrapper<>(); - queryWrapper.eq(TransportTaskLog::getTaskId,taskId); - transportTaskLogMapper.delete(queryWrapper); - } - - /** - * 修改任务状态 - * @param taskId - * @param status - */ - @Transactional(rollbackFor = RuntimeException.class) - @Override - public void updateTaskStatus(Integer taskId, Integer status) { - TransportTask transportTask = this.baseMapper.selectById(taskId); - if(Objects.isNull(transportTask)){ - throw new RuntimeException("此任务不存在"); - } - transportTask.setTaskStatus(status); - this.baseMapper.updateById(transportTask); - } - /** * @param res */ @@ -475,8 +423,6 @@ public class TransportTaskServiceImpl extends ServiceImpl species = new ArrayList<>(); @@ -1222,7 +1175,7 @@ public class TransportTaskServiceImpl extends ServiceImpl cell.getStringCellValue(); + case NUMERIC -> String.valueOf(cell.getNumericCellValue()); + default -> throw new RuntimeException("列类型数据格式化错误"); + }; + } + + /** + * 获取时间字段 + * @param cell + * @return + */ + public static LocalDateTime getDateValue(Cell cell) { + if (Objects.isNull(cell)) { + return null; + } + switch (cell.getCellType()) { + case NUMERIC: + if (DateUtil.isCellDateFormatted(cell)) { + return cell.getLocalDateTimeCellValue(); + } + case STRING: + return LocalDateTime.parse(cell.getStringCellValue(), DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); + default: + throw new RuntimeException("列类型数据格式化错误"); + } + } +} diff --git a/jeecg-module-transport/src/main/java/org/jeecg/vo/SRSRecord.java b/jeecg-module-transport/src/main/java/org/jeecg/vo/SRSRecord.java deleted file mode 100644 index 2ab7300..0000000 --- a/jeecg-module-transport/src/main/java/org/jeecg/vo/SRSRecord.java +++ /dev/null @@ -1,40 +0,0 @@ -package org.jeecg.vo; - -import lombok.Data; - -/** - * srs记录数据 - */ -@Data -public class SRSRecord { - - /** - * 经度 - */ - private Double lon; - - /** - * 纬度 - */ - private Double lat; - - /** - * 处于的小时 - */ - private Integer hour; - - /** - * 根据样品时间-hour得到的开始时间 - */ - private Long startSecond; - - /** - * 测量结束时间就是模拟结束时间 - */ - private Long endSecond; - - /** - * 浓度值(单位暂未知) - */ - private String conc; -}