From 0ee5dbde57d3add6eb6e69ce3d9c99bd8d1fd55d Mon Sep 17 00:00:00 2001 From: panbaolin <13071138970@163.com> Date: Wed, 7 Jan 2026 09:47:43 +0800 Subject: [PATCH] =?UTF-8?q?fix:=201.=E4=BF=AE=E6=94=B9=E5=A4=A9=E6=B0=94?= =?UTF-8?q?=E9=A2=84=E6=8A=A5=E8=AE=A1=E7=AE=97=E4=B8=AD=E6=A8=A1=E5=9E=8B?= =?UTF-8?q?=E9=A2=84=E6=B5=8B=E5=8A=9F=E8=83=BD=E5=AE=8C=E5=96=84=E9=80=82?= =?UTF-8?q?=E9=85=8Dflexpart=E6=A8=A1=E5=9E=8B=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../properties/SystemStorageProperties.java | 12 +- .../base/entity/SourceRebuildTask.java | 1 - .../java/org/jeecg/gis/vo/AlarmRecordVO.java | 5 + .../mapper/xml/IMSSampleAnalysesMapper.xml | 5 + .../SourceRebuildTaskController.java | 6 +- .../impl/SourceRebuildTaskServiceImpl.java | 1 + .../org/jeecg/service/StationDataService.java | 6 +- .../service/impl/StationDataServiceImpl.java | 61 ++-- .../impl/TransportResultDataServiceImpl.java | 27 +- .../controller/WeatherTaskController.java | 1 - .../org/jeecg/service/WeatherTaskService.java | 5 +- .../service/impl/WeatherTaskServiceImpl.java | 25 +- .../java/org/jeecg/task/WeatherTaskExec.java | 305 ++++++++++++------ 13 files changed, 302 insertions(+), 158 deletions(-) diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/properties/SystemStorageProperties.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/properties/SystemStorageProperties.java index 4d9bb0c..907a213 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/common/properties/SystemStorageProperties.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/properties/SystemStorageProperties.java @@ -54,10 +54,15 @@ public class SystemStorageProperties { */ private String cmaqPath; + /** + * grib文件格式化脚本使用的python环境 + */ + private String formatScriptPythonEnv; + /** * ai-models 安装地址 */ - private String aiModelsPath; + private String panguEnvPath; /** * 盘古模型执行路径 @@ -68,4 +73,9 @@ public class SystemStorageProperties { * graphcast模型执行路径 */ private String graphcastModelExecPath; + + /** + * 盘古数据格式化脚本路径(格式化成flexpart能识别的) + */ + private String panguFormatScriptPath; } diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/SourceRebuildTask.java b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/SourceRebuildTask.java index 73c163a..fa1f7ba 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/SourceRebuildTask.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/SourceRebuildTask.java @@ -172,7 +172,6 @@ public class SourceRebuildTask implements Serializable { /** * 结果存储地址 */ - @Null(message = "结果存储地址必须为空",groups = {InsertGroup.class, UpdateGroup.class}) @TableField(value = "result_address") private String resultAddress; diff --git a/jeecg-module-large-screen/src/main/java/org/jeecg/gis/vo/AlarmRecordVO.java b/jeecg-module-large-screen/src/main/java/org/jeecg/gis/vo/AlarmRecordVO.java index 172230a..9fc0023 100644 --- a/jeecg-module-large-screen/src/main/java/org/jeecg/gis/vo/AlarmRecordVO.java +++ b/jeecg-module-large-screen/src/main/java/org/jeecg/gis/vo/AlarmRecordVO.java @@ -56,6 +56,11 @@ public class AlarmRecordVO { */ private Integer sourceType; + /** + * 数据来源说明 + */ + private String sourceTypeDescribe; + /** * 数据表外接 */ diff --git a/jeecg-module-large-screen/src/main/java/org/jeecg/sample/mapper/xml/IMSSampleAnalysesMapper.xml b/jeecg-module-large-screen/src/main/java/org/jeecg/sample/mapper/xml/IMSSampleAnalysesMapper.xml index c21e413..9c597e9 100644 --- a/jeecg-module-large-screen/src/main/java/org/jeecg/sample/mapper/xml/IMSSampleAnalysesMapper.xml +++ b/jeecg-module-large-screen/src/main/java/org/jeecg/sample/mapper/xml/IMSSampleAnalysesMapper.xml @@ -39,6 +39,11 @@ INNER JOIN ORIGINAL.GARDS_SAMPLE_DATA gsd ON gts.SAMPLE_ID = gsd.SAMPLE_ID INNER JOIN CONFIGURATION.GARDS_STATIONS gst on gsd.STATION_ID = gst.STATION_ID WHERE CLOSE_STATUS = 0 + and ( + (gsd.SAMPLE_TYPE = 'P' AND ga.CATEGORY IN (4, 5)) + or + (gsd.SAMPLE_TYPE = 'B' AND ga.CATEGORY IN (3)) + ) ORDER BY gts.MODDATE desc diff --git a/jeecg-module-source-rebuild/src/main/java/org/jeecg/controller/SourceRebuildTaskController.java b/jeecg-module-source-rebuild/src/main/java/org/jeecg/controller/SourceRebuildTaskController.java index 30ddbee..4092ff0 100644 --- a/jeecg-module-source-rebuild/src/main/java/org/jeecg/controller/SourceRebuildTaskController.java +++ b/jeecg-module-source-rebuild/src/main/java/org/jeecg/controller/SourceRebuildTaskController.java @@ -83,9 +83,9 @@ public class SourceRebuildTaskController{ @AutoLog(value = "启动任务") @Operation(summary = "启动任务") @PutMapping("runTask") - public Result runTask(@RequestBody @Validated(value = UpdateGroup.class) SourceRebuildTask sourceRebuildTask){ - sourceRebuildTaskService.update(sourceRebuildTask); - sourceRebuildTaskService.runTask(sourceRebuildTask.getId()); + public Result runTask(@NotNull(message = "任务ID不能为空") Integer taskId){ +// sourceRebuildTaskService.update(sourceRebuildTask); + sourceRebuildTaskService.runTask(taskId); return Result.OK(); } } diff --git a/jeecg-module-source-rebuild/src/main/java/org/jeecg/service/impl/SourceRebuildTaskServiceImpl.java b/jeecg-module-source-rebuild/src/main/java/org/jeecg/service/impl/SourceRebuildTaskServiceImpl.java index 508f6d7..cd758c3 100644 --- a/jeecg-module-source-rebuild/src/main/java/org/jeecg/service/impl/SourceRebuildTaskServiceImpl.java +++ b/jeecg-module-source-rebuild/src/main/java/org/jeecg/service/impl/SourceRebuildTaskServiceImpl.java @@ -228,6 +228,7 @@ public class SourceRebuildTaskServiceImpl extends ServiceImpl getAllStations(); + List> getAllStations(); /** * 获取所有核设施 * @return */ - List getAllNuclearfacility(); + List> getAllNuclearfacility(); /** * 获取指定样品在指定时间范围内的核素监测结果 diff --git a/jeecg-module-transport/src/main/java/org/jeecg/service/impl/StationDataServiceImpl.java b/jeecg-module-transport/src/main/java/org/jeecg/service/impl/StationDataServiceImpl.java index 479f924..fe64a68 100644 --- a/jeecg-module-transport/src/main/java/org/jeecg/service/impl/StationDataServiceImpl.java +++ b/jeecg-module-transport/src/main/java/org/jeecg/service/impl/StationDataServiceImpl.java @@ -1,13 +1,16 @@ package org.jeecg.service.impl; +import cn.hutool.core.collection.CollUtil; import com.baomidou.dynamic.datasource.annotation.DS; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.RequiredArgsConstructor; import org.jeecg.common.constant.CommonConstant; import org.jeecg.common.util.CoordinateTransformUtil; import org.jeecg.common.util.RedisUtil; +import org.jeecg.modules.base.entity.configuration.GardsNuclearReactors; import org.jeecg.modules.base.entity.configuration.GardsNuclearfacility; import org.jeecg.modules.base.entity.configuration.GardsStations; +import org.jeecg.modules.base.mapper.GardsNuclearReactorsMapper; import org.jeecg.modules.base.mapper.GardsNuclearfacilityMapper; import org.jeecg.modules.base.mapper.GardsStationsMapper; import org.jeecg.modules.base.mapper.GardsXeResultMapper; @@ -15,9 +18,7 @@ import org.jeecg.service.StationDataService; import org.springframework.stereotype.Service; import java.math.BigDecimal; import java.math.RoundingMode; -import java.util.Date; -import java.util.List; -import java.util.Map; +import java.util.*; /** * 台站数据服务 @@ -28,7 +29,7 @@ import java.util.Map; public class StationDataServiceImpl implements StationDataService { private final GardsStationsMapper stationsMapper; - private final GardsNuclearfacilityMapper nuclearfacilityMapper; + private final GardsNuclearReactorsMapper nuclearReactorsMapper; private final GardsXeResultMapper gardsXeResultMapper; private final RedisUtil redisUtil; @@ -37,14 +38,27 @@ public class StationDataServiceImpl implements StationDataService { * @return */ @Override - public List getAllStations() { + public List> getAllStations() { + List stations; if(redisUtil.hasKey(CommonConstant.ALL_STATIONS)){ - return (List) redisUtil.get(CommonConstant.ALL_STATIONS); + stations = (List) redisUtil.get(CommonConstant.ALL_STATIONS); }else { - List stations = stationsMapper.selectList(new LambdaQueryWrapper<>()); + stations = stationsMapper.selectList(new LambdaQueryWrapper<>()); redisUtil.set(CommonConstant.ALL_STATIONS,stations); - return stations; } + if (CollUtil.isNotEmpty(stations)) { + List> result = new ArrayList<>(); + stations.forEach(station -> { + Map map = new HashMap<>(); + map.put("id",station.getStationId()); + map.put("stationCode",station.getStationCode()); + map.put("lon",station.getLon()); + map.put("lat",station.getLat()); + result.add(map); + }); + return result; + } + return List.of(); } /** @@ -52,22 +66,27 @@ public class StationDataServiceImpl implements StationDataService { * @return */ @Override - public List getAllNuclearfacility() { + public List> getAllNuclearfacility() { + List nuclearReactors; if(redisUtil.hasKey(CommonConstant.ALL_NUCLEARFACILITY)){ - return (List) redisUtil.get(CommonConstant.ALL_NUCLEARFACILITY); + nuclearReactors = (List) redisUtil.get(CommonConstant.ALL_NUCLEARFACILITY); }else { - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); - List nuclearfacilities = nuclearfacilityMapper.selectList(queryWrapper); - nuclearfacilities.forEach(nuclearfacility -> { - //数据库经纬度存储的是反的,所以这里反着处理 - Double lon = CoordinateTransformUtil.lonAndLatConversion(nuclearfacility.getLatitude()); - Double lat = CoordinateTransformUtil.lonAndLatConversion(nuclearfacility.getLongitude()); - nuclearfacility.setLonValue(lon); - nuclearfacility.setLatValue(lat); - }); - redisUtil.set(CommonConstant.ALL_NUCLEARFACILITY, nuclearfacilities); - return nuclearfacilities; + nuclearReactors = nuclearReactorsMapper.selectList(new LambdaQueryWrapper<>()); + redisUtil.set(CommonConstant.ALL_NUCLEARFACILITY, nuclearReactors); } + if (CollUtil.isNotEmpty(nuclearReactors)) { + List> result = new ArrayList<>(); + nuclearReactors.forEach(nuclearReactor -> { + Map map = new HashMap<>(); + map.put("id",nuclearReactor.getId()); + map.put("unitName",nuclearReactor.getUnitName()); + map.put("lonValue",nuclearReactor.getLongitude()); + map.put("latValue",nuclearReactor.getLatitude()); + result.add(map); + }); + return result; + } + return List.of(); } /** diff --git a/jeecg-module-transport/src/main/java/org/jeecg/service/impl/TransportResultDataServiceImpl.java b/jeecg-module-transport/src/main/java/org/jeecg/service/impl/TransportResultDataServiceImpl.java index 5a57cdd..e589f17 100644 --- a/jeecg-module-transport/src/main/java/org/jeecg/service/impl/TransportResultDataServiceImpl.java +++ b/jeecg-module-transport/src/main/java/org/jeecg/service/impl/TransportResultDataServiceImpl.java @@ -16,7 +16,7 @@ import org.jeecg.common.util.NcUtil; import org.jeecg.common.util.RedisUtil; import org.jeecg.modules.base.entity.TransportTask; import org.jeecg.modules.base.entity.TransportTaskChild; -import org.jeecg.modules.base.entity.configuration.GardsNuclearfacility; +import org.jeecg.modules.base.entity.configuration.GardsNuclearReactors; import org.jeecg.modules.base.entity.configuration.GardsStations; import org.jeecg.modules.base.mapper.TransportTaskChildMapper; import org.jeecg.modules.base.mapper.TransportTaskMapper; @@ -386,7 +386,7 @@ public class TransportResultDataServiceImpl implements TransportResultDataServic List stationInfos = this.transportTaskChildMapper.selectList(queryWrapper); //所以核设施数据 - List facilitys = stationDataService.getAllNuclearfacility(); + List facilitys = (List) redisUtil.get(CommonConstant.ALL_NUCLEARFACILITY); NetcdfFile ncFile = null; try { @@ -423,19 +423,19 @@ public class TransportResultDataServiceImpl implements TransportResultDataServic //累加台站位置当前天的浓度数据 stationEveryDayConcDatas.put(dayStr,stationEveryDayConcDatas.get(dayStr)+stationConc); - for (GardsNuclearfacility facility : facilitys){ - Double facilityConc = this.getTargetSiteConc(lonData,latData,facility.getLonValue(),facility.getLatValue(),k,spec001Mr); + for (GardsNuclearReactors facility : facilitys){ + Double facilityConc = this.getTargetSiteConc(lonData,latData,facility.getLongitude(),facility.getLatitude(),k,spec001Mr); if (facilityConc>0){ if (!facilityEveryDayConcDatas.containsKey(dayStr)) { Map facilityConcMap = new HashMap<>(); - facilityConcMap.put(facility.getFacilityName(),facilityConc); + facilityConcMap.put(facility.getUnitName(),facilityConc); facilityEveryDayConcDatas.put(dayStr,facilityConcMap); }else { Map facilityConcMap = facilityEveryDayConcDatas.get(dayStr); - if (!facilityConcMap.containsKey(facility.getFacilityName())) { - facilityConcMap.put(facility.getFacilityName(),facilityConc); + if (!facilityConcMap.containsKey(facility.getUnitName())) { + facilityConcMap.put(facility.getUnitName(),facilityConc); }else { - facilityConcMap.put(facility.getFacilityName(),facilityConcMap.get(facility.getFacilityName())+facilityConc); + facilityConcMap.put(facility.getUnitName(),facilityConcMap.get(facility.getUnitName())+facilityConc); } } } @@ -498,7 +498,8 @@ public class TransportResultDataServiceImpl implements TransportResultDataServic throw new RuntimeException("此任务站点信息不存在,请确认任务配置信息"); } //获取所有台站数据,优先缓存 - Map stationsMap = stationDataService.getAllStations().stream().collect(Collectors.toMap(GardsStations::getStationCode,GardsStations::getStationId)); + List stations = (List) redisUtil.get(CommonConstant.ALL_STATIONS); + Map stationsMap = stations.stream().collect(Collectors.toMap(GardsStations::getStationCode,GardsStations::getStationId)); //本任务模拟的台站数据 List taskStationsVOList = new ArrayList<>(); for (int i = 0; i stationInfos = this.transportTaskChildMapper.selectList(queryWrapper); //所以核设施数据 - List facilitys = stationDataService.getAllNuclearfacility(); + List facilitys = (List) redisUtil.get(CommonConstant.ALL_NUCLEARFACILITY); NetcdfFile ncFile = null; try { @@ -640,7 +641,7 @@ public class TransportResultDataServiceImpl implements TransportResultDataServic List latData = NcUtil.getNCList(ncFile, "latitude"); List timeData = NcUtil.getNCList(ncFile, "time"); Variable spec001Mr = ncFile.findVariable("spec001_mr"); - for (GardsNuclearfacility facility : facilitys){ + for (GardsNuclearReactors facility : facilitys){ //存储台站每步的浓度值数据 Map everyStepConcDatas = new LinkedHashMap<>(); for(int k=0;k{ diff --git a/jeecg-module-weather/src/main/java/org/jeecg/controller/WeatherTaskController.java b/jeecg-module-weather/src/main/java/org/jeecg/controller/WeatherTaskController.java index b5b9e7f..cc0c012 100644 --- a/jeecg-module-weather/src/main/java/org/jeecg/controller/WeatherTaskController.java +++ b/jeecg-module-weather/src/main/java/org/jeecg/controller/WeatherTaskController.java @@ -46,7 +46,6 @@ public class WeatherTaskController { @Operation(summary = "新增天气预测任务") @PostMapping("create") public Result create(@Validated(value = InsertGroup.class) WeatherTask weatherTask){ - System.out.println(weatherTask); weatherTaskService.cteate(weatherTask); return Result.OK(); } diff --git a/jeecg-module-weather/src/main/java/org/jeecg/service/WeatherTaskService.java b/jeecg-module-weather/src/main/java/org/jeecg/service/WeatherTaskService.java index 2158ca8..b30477e 100644 --- a/jeecg-module-weather/src/main/java/org/jeecg/service/WeatherTaskService.java +++ b/jeecg-module-weather/src/main/java/org/jeecg/service/WeatherTaskService.java @@ -82,10 +82,11 @@ public interface WeatherTaskService extends IService { void deleteTaskLog(String taskId); /** - * 修改任务状态为结束 + * 修改任务状态为结束(完成或异常停止) * @param taskId * @param timeConsuming + * @param taskStatus */ - void updateTaskStatusToCompleted(String taskId, Double timeConsuming); + void updateTaskStatusToCompleted(String taskId, Double timeConsuming,Integer taskStatus); } diff --git a/jeecg-module-weather/src/main/java/org/jeecg/service/impl/WeatherTaskServiceImpl.java b/jeecg-module-weather/src/main/java/org/jeecg/service/impl/WeatherTaskServiceImpl.java index 0d7f882..9f86058 100644 --- a/jeecg-module-weather/src/main/java/org/jeecg/service/impl/WeatherTaskServiceImpl.java +++ b/jeecg-module-weather/src/main/java/org/jeecg/service/impl/WeatherTaskServiceImpl.java @@ -1,5 +1,6 @@ package org.jeecg.service.impl; +import cn.hutool.core.io.FileUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.core.toolkit.IdWorker; @@ -102,15 +103,20 @@ public class WeatherTaskServiceImpl extends ServiceImpl command = new ArrayList<>(); - String aiModelsPath = systemStorageProperties.getAiModelsPath()+File.separator+"ai-models"; - String fileName = "panguweather_output_"+this.weatherTask.getId()+".grib"; + String aiModelsPath = systemStorageProperties.getPanguEnvPath()+File.separator+"ai-models"; + if(this.weatherTask.getDataSources().equals(WeatherForecastDatasourceEnum.CDS.getKey())){ command.add(aiModelsPath); command.add("--input"); @@ -129,7 +190,7 @@ public class WeatherTaskExec extends Thread{ command.add("--lead-time"); command.add(this.weatherTask.getLeadTime().toString()); command.add("--path"); - command.add(fileName); + command.add(outputFileAddr); command.add("--assets"); command.add("assets-panguweather"); command.add("panguweather"); @@ -137,11 +198,11 @@ public class WeatherTaskExec extends Thread{ //离线文件还需处理 command.add(aiModelsPath); command.add("--file"); - command.add(this.weatherTask.getInputFile()); + command.add(inputFileAddr); command.add("--lead-time"); command.add(this.weatherTask.getLeadTime().toString()); command.add("--path"); - command.add(fileName); + command.add(outputFileAddr); command.add("--assets"); command.add("assets-panguweather"); command.add("panguweather"); @@ -162,137 +223,175 @@ public class WeatherTaskExec extends Thread{ } //等待进程结束 process.waitFor(); + }catch (Exception e){ + throw new RuntimeException("盘古模型预测异常",e); + } + } - String handleGribLog = "预测结束,开始处理grib文件"; + /** + * 分割grib文件 + * @param outputFileAddr + * @param gribFileSuffix + * @param gribCopyTargetDir + */ + private void splitGrib(String outputFileAddr,String gribFileSuffix,String gribCopyTargetDir){ + try { + String handleGribLog = "预测结束,开始处理grib文件,切割为6小时一个文件"; ProgressQueue.getInstance().offer(new ProgressEvent(this.weatherTask.getId(),handleGribLog)); LocalDateTime startTime = this.weatherTask.getStartDate().atTime(this.weatherTask.getStartTime(), 0, 0); //grib_copy命令 - String gribCopyCommandPath = systemStorageProperties.getAiModelsPath()+File.separator+"grib_copy"; + String gribCopyCommandPath = systemStorageProperties.getPanguEnvPath()+File.separator+"grib_copy"; //grib_set命令 - String gribSetCommandPath = systemStorageProperties.getAiModelsPath()+File.separator+"grib_set"; - //需删除文件 - List delFiles = new ArrayList<>(); - //需移动文件 - List moveFiles = new ArrayList<>(); + String gribSetCommandPath = systemStorageProperties.getPanguEnvPath()+File.separator+"grib_set"; //公用gribProcessBuilder ProcessBuilder gribProcessBuilder = new ProcessBuilder(); gribProcessBuilder.directory(new File(systemStorageProperties.getPanguModelExecPath())); - //定义名称后缀,执行grib_set命令时去除,否则命名会冲突 - String gribFileSuffix = "_not_grib_set"; //把grib文件切割成每6小时一份 int step = 6; int i=this.weatherTask.getStartTime(); while(i <= this.weatherTask.getLeadTime()){ - String gribCopyFileName = "panguweather_"+LocalDateTimeUtil.format(startTime,"yyyyMMddHH")+gribFileSuffix+".grib"; - delFiles.add(gribCopyFileName); + String gribCopyFileAddr = gribCopyTargetDir+File.separator+"panguweather_"+LocalDateTimeUtil.format(startTime,"yyyyMMddHH")+gribFileSuffix+".grib"; //切割grib文件命令 List gribCopyCommand = new ArrayList<>(); gribCopyCommand.add(gribCopyCommandPath); gribCopyCommand.add("-w"); gribCopyCommand.add("step="+i); - gribCopyCommand.add(fileName); - gribCopyCommand.add(gribCopyFileName); + gribCopyCommand.add(outputFileAddr); + gribCopyCommand.add(gribCopyFileAddr); + System.out.println("gribCopyCommand:"+gribCopyCommand); gribProcessBuilder.command(gribCopyCommand); Process gribCopyProcess = gribProcessBuilder.start(); gribCopyProcess.waitFor(); //重新设置reftime信息 - String gribSetFileName = "panguweather_"+LocalDateTimeUtil.format(startTime,"yyyyMMddHH")+".grib"; - moveFiles.add(gribSetFileName); + String gribSetFileAddr = gribCopyTargetDir+File.separator+"panguweather_"+LocalDateTimeUtil.format(startTime,"yyyyMMddHH")+".grib"; String date = LocalDateTimeUtil.format(startTime,"yyyyMMdd"); String time = LocalDateTimeUtil.format(startTime,"HHmm"); List gribSetCommand = new ArrayList<>(); gribSetCommand.add(gribSetCommandPath); gribSetCommand.add("-s"); gribSetCommand.add("dataDate="+date+",dataTime="+time+",endStep="+0); - gribSetCommand.add(gribCopyFileName); - gribSetCommand.add(gribSetFileName); + gribSetCommand.add(gribCopyFileAddr); + gribSetCommand.add(gribSetFileAddr); + System.out.println("gribSetCommand:"+gribSetCommand); gribProcessBuilder.command(gribSetCommand); Process gribSetProcess = gribProcessBuilder.start(); gribSetProcess.waitFor(); i+=step; startTime = startTime.plusHours(step); } - if (CollUtil.isNotEmpty(moveFiles)) { - for (String moveFile : moveFiles) { - File srcFile = new File(systemStorageProperties.getPanguModelExecPath()+File.separator+moveFile); - if (srcFile.exists()) { - File targetFile = new File(this.getPanguWeatherPath()+File.separator+File.separator+moveFile); - FileUtil.move(srcFile,targetFile,true); - //保存生成的气象数据存储到数据库 - this.saveGribInfoToDB(targetFile); - } - } - } - //删除gribCopy产生的文件 - for (String delFile : delFiles) { - File srcFile = new File(systemStorageProperties.getPanguModelExecPath()+File.separator+delFile); - if (srcFile.exists()) { - srcFile.delete(); - } - } - //删除预测的结果文件 - String outputFilePath = systemStorageProperties.getPanguModelExecPath()+File.separator+fileName; - File outputFile = new File(outputFilePath); - if(outputFile.exists()){ - outputFile.delete(); - } + }catch (Exception e){ + throw new RuntimeException("分割grib文件异常",e); + } + } - //如果是本地文件进行预测,预测结束后删除文件 - if(this.weatherTask.getDataSources().equals(WeatherForecastDatasourceEnum.LOCATION_FILE.getKey())){ - String inputFilePath = systemStorageProperties.getPanguModelExecPath()+File.separator+this.weatherTask.getInputFile(); - File inputFile = new File(inputFilePath); - if(inputFile.exists()){ - inputFile.delete(); + /** + * 转换grib + * @param gribFileSuffix + * @param gribFilePath + */ + private void convertGrib(String gribFileSuffix,String gribFilePath){ + try { + // python3 pangu_reformat.py --ini_dir /export/xe_bk_data/weather/pangu_format_script --output_dir /export/xe_bk_data/weather/pangu2 --input_dir /export/xe_bk_data/weather/pangu2 --start_date 20251120 --end_date 20251120 + //调用python脚本转换flexpart能识别的气象数据文件 + //删除带有_not_grib_set名称的文件,此文件还未设置reftime属性 + List files = FileUtil.loopFiles(gribFilePath, file -> file.getName().contains(gribFileSuffix)); + if(!files.isEmpty()){ + for(File file:files){ + file.delete(); } } - } catch (Exception e) { - throw new RuntimeException(e); + String adapterGribLog = "文件切割结束,开始调用python脚本适配flexpart模型"; + ProgressQueue.getInstance().offer(new ProgressEvent(this.weatherTask.getId(),adapterGribLog)); + //构建转换命令 + String convertScriptPath = systemStorageProperties.getPanguFormatScriptPath(); + String inputPath = systemStorageProperties.getPanguModelExecPath()+File.separator+this.weatherTask.getId(); + String outPutPath = inputPath; + String startDate = this.weatherTask.getStartDate().format(DateTimeFormatter.ofPattern("yyyyMMdd")); + String endDate = this.weatherTask.getStartDate().atTime(this.weatherTask.getStartTime(), 0, 0).plusHours(this.weatherTask.getLeadTime()).format(DateTimeFormatter.ofPattern("yyyyMMdd")); + List convertCommand = new ArrayList<>(); + convertCommand.add(systemStorageProperties.getFormatScriptPythonEnv()+File.separator+"python3"); + convertCommand.add("pangu_reformat.py"); + convertCommand.add("--ini_dir"); + convertCommand.add(convertScriptPath); + convertCommand.add("--output_dir"); + convertCommand.add(outPutPath); + convertCommand.add("--input_dir"); + convertCommand.add(inputPath); + convertCommand.add("--start_date"); + convertCommand.add(startDate); + convertCommand.add("--end_date"); + convertCommand.add(endDate); + System.out.println("convertCommand:"+convertCommand); + ProcessBuilder adapterGribProcessBuilder = new ProcessBuilder(); + adapterGribProcessBuilder.directory(new File(convertScriptPath)); + adapterGribProcessBuilder.command(convertCommand); + Process adapterGribProcess = adapterGribProcessBuilder.start(); + adapterGribProcess.waitFor(); + }catch (Exception e){ + throw new RuntimeException("适配flexpart,转换grib异常",e); } } /** * 保存生成的气象数据存储到数据库 - * @param file + * @param inputPath + * @param targetPath */ - private void saveGribInfoToDB(File file){ - //获取文件数据开始日期 - String reftime = NcUtil.getReftime(file.getAbsolutePath()); - if(StringUtils.isBlank(reftime)) { - throw new JeecgFileUploadException("解析气象文件起始时间数据异常,此文件可能损坏"); + private void saveGribInfoToDB(String inputPath,String targetPath){ + String handleGribLog = "格式转换结束,处理grib文件数据入库"; + ProgressQueue.getInstance().offer(new ProgressEvent(this.weatherTask.getId(),handleGribLog)); + + List files = FileUtil.loopFiles(inputPath,file -> file.getName().startsWith("pangu_")); + List targetFiles = new ArrayList<>(); + if(!files.isEmpty()){ + for(File file:files){ + File targetFile = new File(targetPath+File.separator+file.getName()); + FileUtil.move(file,targetFile,true); + if(targetFile.exists()){ + targetFiles.add(targetFile); + } + } + for(File targetFile:targetFiles){ + //获取文件数据开始日期 + String reftime = NcUtil.getReftime(targetFile.getAbsolutePath()); + if(StringUtils.isBlank(reftime)) { + throw new JeecgFileUploadException("解析气象文件起始时间数据异常,此文件可能损坏"); + } + //计算文件大小M + BigDecimal divideVal = new BigDecimal("1024"); + BigDecimal bg = new BigDecimal(targetFile.length()); + BigDecimal fileSize = bg.divide(divideVal).divide(divideVal).setScale(2, RoundingMode.HALF_UP); + //处理文件数据开始时间 + Instant instant = Instant.parse(reftime); + LocalDateTime utcDateTime = LocalDateTime.ofInstant(instant, ZoneId.of("UTC")); + //计算文件MD5值 + String md5Val = ""; + try (InputStream is = new FileInputStream(targetFile.getAbsolutePath())) { + md5Val = DigestUtils.md5Hex(is); + }catch (Exception e){ + throw new RuntimeException(targetFile.getName()+"MD5值计算失败"); + } + //校验文件是否存在,存在删除从新新增 + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); + queryWrapper.eq(WeatherData::getFileName,targetFile.getName()); + WeatherData queryResult = weatherDataMapper.selectOne(queryWrapper); + if(Objects.nonNull(queryResult)){ + weatherDataMapper.deleteById(queryResult.getId()); + } + //构建文件信息 + WeatherData weatherData = new WeatherData(); + weatherData.setFileName(targetFile.getName()); + weatherData.setFileSize(fileSize.doubleValue()); + weatherData.setFileExt(targetFile.getName().substring(targetFile.getName().lastIndexOf(".")+1)); + weatherData.setDataStartTime(utcDateTime); + weatherData.setDataSource(weatherTask.getPredictionModel()); + weatherData.setFilePath(targetFile.getAbsolutePath()); + weatherData.setMd5Value(md5Val); + weatherData.setShareTotal(1); + weatherDataMapper.insert(weatherData); + } } - //计算文件大小M - BigDecimal divideVal = new BigDecimal("1024"); - BigDecimal bg = new BigDecimal(file.length()); - BigDecimal fileSize = bg.divide(divideVal).divide(divideVal).setScale(2, RoundingMode.HALF_UP); - //处理文件数据开始时间 - Instant instant = Instant.parse(reftime); - LocalDateTime utcDateTime = LocalDateTime.ofInstant(instant, ZoneId.of("UTC")); - //计算文件MD5值 - String md5Val = ""; - try (InputStream is = new FileInputStream(file.getAbsolutePath())) { - md5Val = DigestUtils.md5Hex(is); - }catch (Exception e){ - throw new RuntimeException(file.getName()+"MD5值计算失败"); - } - //校验文件是否存在,存在删除从新新增 - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); - queryWrapper.eq(WeatherData::getFileName,file.getName()); - WeatherData queryResult = weatherDataMapper.selectOne(queryWrapper); - if(Objects.nonNull(queryResult)){ - weatherDataMapper.deleteById(queryResult.getId()); - } - //构建文件信息 - WeatherData weatherData = new WeatherData(); - weatherData.setFileName(file.getName()); - weatherData.setFileSize(fileSize.doubleValue()); - weatherData.setFileExt(file.getName().substring(file.getName().lastIndexOf(".")+1)); - weatherData.setDataStartTime(utcDateTime); - weatherData.setDataSource(weatherTask.getPredictionModel()); - weatherData.setFilePath(file.getAbsolutePath()); - weatherData.setMd5Value(md5Val); - weatherData.setShareTotal(1); - weatherDataMapper.insert(weatherData); } /** @@ -308,7 +407,7 @@ public class WeatherTaskExec extends Thread{ } /** - * 获取盘古数据存储路径 + * 获取Graphcast数据存储路径 * @return */ private String getGraphcastWeatherPath(){