1.修改天气预报计算中模型预测功能完善适配flexpart模型逻辑
This commit is contained in:
panbaolin 2026-01-07 09:47:43 +08:00
parent 9e1fd90578
commit 0ee5dbde57
13 changed files with 302 additions and 158 deletions

View File

@ -54,10 +54,15 @@ public class SystemStorageProperties {
*/
private String cmaqPath;
/**
* grib文件格式化脚本使用的python环境
*/
private String formatScriptPythonEnv;
/**
* ai-models 安装地址
*/
private String aiModelsPath;
private String panguEnvPath;
/**
* 盘古模型执行路径
@ -68,4 +73,9 @@ public class SystemStorageProperties {
* graphcast模型执行路径
*/
private String graphcastModelExecPath;
/**
* 盘古数据格式化脚本路径格式化成flexpart能识别的
*/
private String panguFormatScriptPath;
}

View File

@ -172,7 +172,6 @@ public class SourceRebuildTask implements Serializable {
/**
* 结果存储地址
*/
@Null(message = "结果存储地址必须为空",groups = {InsertGroup.class, UpdateGroup.class})
@TableField(value = "result_address")
private String resultAddress;

View File

@ -56,6 +56,11 @@ public class AlarmRecordVO {
*/
private Integer sourceType;
/**
* 数据来源说明
*/
private String sourceTypeDescribe;
/**
* 数据表外接
*/

View File

@ -39,6 +39,11 @@
INNER JOIN ORIGINAL.GARDS_SAMPLE_DATA gsd ON gts.SAMPLE_ID = gsd.SAMPLE_ID
INNER JOIN CONFIGURATION.GARDS_STATIONS gst on gsd.STATION_ID = gst.STATION_ID
WHERE CLOSE_STATUS = 0
and (
(gsd.SAMPLE_TYPE = 'P' AND ga.CATEGORY IN (4, 5))
or
(gsd.SAMPLE_TYPE = 'B' AND ga.CATEGORY IN (3))
)
ORDER BY gts.MODDATE desc
</select>

View File

@ -83,9 +83,9 @@ public class SourceRebuildTaskController{
@AutoLog(value = "启动任务")
@Operation(summary = "启动任务")
@PutMapping("runTask")
public Result<?> runTask(@RequestBody @Validated(value = UpdateGroup.class) SourceRebuildTask sourceRebuildTask){
sourceRebuildTaskService.update(sourceRebuildTask);
sourceRebuildTaskService.runTask(sourceRebuildTask.getId());
public Result<?> runTask(@NotNull(message = "任务ID不能为空") Integer taskId){
// sourceRebuildTaskService.update(sourceRebuildTask);
sourceRebuildTaskService.runTask(taskId);
return Result.OK();
}
}

View File

@ -228,6 +228,7 @@ public class SourceRebuildTaskServiceImpl extends ServiceImpl<SourceRebuildTaskM
public void updateTaskTimeConsuming(Integer taskId, Double minute) {
SourceRebuildTask sourceRebuildTask = this.baseMapper.selectById(taskId);
sourceRebuildTask.setTimeConsuming(minute);
sourceRebuildTask.setTaskStatus(SourceRebuildTaskStatusEnum.COMPLETED.getValue());
this.updateById(sourceRebuildTask);
}
}

View File

@ -1,6 +1,6 @@
package org.jeecg.service;
import org.jeecg.modules.base.entity.configuration.GardsNuclearfacility;
import org.jeecg.modules.base.entity.configuration.GardsNuclearReactors;
import org.jeecg.modules.base.entity.configuration.GardsStations;
import java.util.Date;
import java.util.List;
@ -15,13 +15,13 @@ public interface StationDataService {
* 获取所有台站
* @return
*/
List<GardsStations> getAllStations();
List<Map<String,Object>> getAllStations();
/**
* 获取所有核设施
* @return
*/
List<GardsNuclearfacility> getAllNuclearfacility();
List<Map<String,Object>> getAllNuclearfacility();
/**
* 获取指定样品在指定时间范围内的核素监测结果

View File

@ -1,13 +1,16 @@
package org.jeecg.service.impl;
import cn.hutool.core.collection.CollUtil;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.RequiredArgsConstructor;
import org.jeecg.common.constant.CommonConstant;
import org.jeecg.common.util.CoordinateTransformUtil;
import org.jeecg.common.util.RedisUtil;
import org.jeecg.modules.base.entity.configuration.GardsNuclearReactors;
import org.jeecg.modules.base.entity.configuration.GardsNuclearfacility;
import org.jeecg.modules.base.entity.configuration.GardsStations;
import org.jeecg.modules.base.mapper.GardsNuclearReactorsMapper;
import org.jeecg.modules.base.mapper.GardsNuclearfacilityMapper;
import org.jeecg.modules.base.mapper.GardsStationsMapper;
import org.jeecg.modules.base.mapper.GardsXeResultMapper;
@ -15,9 +18,7 @@ import org.jeecg.service.StationDataService;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.*;
/**
* 台站数据服务
@ -28,7 +29,7 @@ import java.util.Map;
public class StationDataServiceImpl implements StationDataService {
private final GardsStationsMapper stationsMapper;
private final GardsNuclearfacilityMapper nuclearfacilityMapper;
private final GardsNuclearReactorsMapper nuclearReactorsMapper;
private final GardsXeResultMapper gardsXeResultMapper;
private final RedisUtil redisUtil;
@ -37,14 +38,27 @@ public class StationDataServiceImpl implements StationDataService {
* @return
*/
@Override
public List<GardsStations> getAllStations() {
public List<Map<String,Object>> getAllStations() {
List<GardsStations> stations;
if(redisUtil.hasKey(CommonConstant.ALL_STATIONS)){
return (List<GardsStations>) redisUtil.get(CommonConstant.ALL_STATIONS);
stations = (List<GardsStations>) redisUtil.get(CommonConstant.ALL_STATIONS);
}else {
List<GardsStations> stations = stationsMapper.selectList(new LambdaQueryWrapper<>());
stations = stationsMapper.selectList(new LambdaQueryWrapper<>());
redisUtil.set(CommonConstant.ALL_STATIONS,stations);
return stations;
}
if (CollUtil.isNotEmpty(stations)) {
List<Map<String,Object>> result = new ArrayList<>();
stations.forEach(station -> {
Map<String,Object> map = new HashMap<>();
map.put("id",station.getStationId());
map.put("stationCode",station.getStationCode());
map.put("lon",station.getLon());
map.put("lat",station.getLat());
result.add(map);
});
return result;
}
return List.of();
}
/**
@ -52,22 +66,27 @@ public class StationDataServiceImpl implements StationDataService {
* @return
*/
@Override
public List<GardsNuclearfacility> getAllNuclearfacility() {
public List<Map<String,Object>> getAllNuclearfacility() {
List<GardsNuclearReactors> nuclearReactors;
if(redisUtil.hasKey(CommonConstant.ALL_NUCLEARFACILITY)){
return (List<GardsNuclearfacility>) redisUtil.get(CommonConstant.ALL_NUCLEARFACILITY);
nuclearReactors = (List<GardsNuclearReactors>) redisUtil.get(CommonConstant.ALL_NUCLEARFACILITY);
}else {
LambdaQueryWrapper<GardsNuclearfacility> queryWrapper = new LambdaQueryWrapper<>();
List<GardsNuclearfacility> nuclearfacilities = nuclearfacilityMapper.selectList(queryWrapper);
nuclearfacilities.forEach(nuclearfacility -> {
//数据库经纬度存储的是反的所以这里反着处理
Double lon = CoordinateTransformUtil.lonAndLatConversion(nuclearfacility.getLatitude());
Double lat = CoordinateTransformUtil.lonAndLatConversion(nuclearfacility.getLongitude());
nuclearfacility.setLonValue(lon);
nuclearfacility.setLatValue(lat);
});
redisUtil.set(CommonConstant.ALL_NUCLEARFACILITY, nuclearfacilities);
return nuclearfacilities;
nuclearReactors = nuclearReactorsMapper.selectList(new LambdaQueryWrapper<>());
redisUtil.set(CommonConstant.ALL_NUCLEARFACILITY, nuclearReactors);
}
if (CollUtil.isNotEmpty(nuclearReactors)) {
List<Map<String,Object>> result = new ArrayList<>();
nuclearReactors.forEach(nuclearReactor -> {
Map<String,Object> map = new HashMap<>();
map.put("id",nuclearReactor.getId());
map.put("unitName",nuclearReactor.getUnitName());
map.put("lonValue",nuclearReactor.getLongitude());
map.put("latValue",nuclearReactor.getLatitude());
result.add(map);
});
return result;
}
return List.of();
}
/**

View File

@ -16,7 +16,7 @@ import org.jeecg.common.util.NcUtil;
import org.jeecg.common.util.RedisUtil;
import org.jeecg.modules.base.entity.TransportTask;
import org.jeecg.modules.base.entity.TransportTaskChild;
import org.jeecg.modules.base.entity.configuration.GardsNuclearfacility;
import org.jeecg.modules.base.entity.configuration.GardsNuclearReactors;
import org.jeecg.modules.base.entity.configuration.GardsStations;
import org.jeecg.modules.base.mapper.TransportTaskChildMapper;
import org.jeecg.modules.base.mapper.TransportTaskMapper;
@ -386,7 +386,7 @@ public class TransportResultDataServiceImpl implements TransportResultDataServic
List<TransportTaskChild> stationInfos = this.transportTaskChildMapper.selectList(queryWrapper);
//所以核设施数据
List<GardsNuclearfacility> facilitys = stationDataService.getAllNuclearfacility();
List<GardsNuclearReactors> facilitys = (List<GardsNuclearReactors>) redisUtil.get(CommonConstant.ALL_NUCLEARFACILITY);
NetcdfFile ncFile = null;
try {
@ -423,19 +423,19 @@ public class TransportResultDataServiceImpl implements TransportResultDataServic
//累加台站位置当前天的浓度数据
stationEveryDayConcDatas.put(dayStr,stationEveryDayConcDatas.get(dayStr)+stationConc);
for (GardsNuclearfacility facility : facilitys){
Double facilityConc = this.getTargetSiteConc(lonData,latData,facility.getLonValue(),facility.getLatValue(),k,spec001Mr);
for (GardsNuclearReactors facility : facilitys){
Double facilityConc = this.getTargetSiteConc(lonData,latData,facility.getLongitude(),facility.getLatitude(),k,spec001Mr);
if (facilityConc>0){
if (!facilityEveryDayConcDatas.containsKey(dayStr)) {
Map<String,Double> facilityConcMap = new HashMap<>();
facilityConcMap.put(facility.getFacilityName(),facilityConc);
facilityConcMap.put(facility.getUnitName(),facilityConc);
facilityEveryDayConcDatas.put(dayStr,facilityConcMap);
}else {
Map<String,Double> facilityConcMap = facilityEveryDayConcDatas.get(dayStr);
if (!facilityConcMap.containsKey(facility.getFacilityName())) {
facilityConcMap.put(facility.getFacilityName(),facilityConc);
if (!facilityConcMap.containsKey(facility.getUnitName())) {
facilityConcMap.put(facility.getUnitName(),facilityConc);
}else {
facilityConcMap.put(facility.getFacilityName(),facilityConcMap.get(facility.getFacilityName())+facilityConc);
facilityConcMap.put(facility.getUnitName(),facilityConcMap.get(facility.getUnitName())+facilityConc);
}
}
}
@ -498,7 +498,8 @@ public class TransportResultDataServiceImpl implements TransportResultDataServic
throw new RuntimeException("此任务站点信息不存在,请确认任务配置信息");
}
//获取所有台站数据优先缓存
Map<String, Integer> stationsMap = stationDataService.getAllStations().stream().collect(Collectors.toMap(GardsStations::getStationCode,GardsStations::getStationId));
List<GardsStations> stations = (List<GardsStations>) redisUtil.get(CommonConstant.ALL_STATIONS);
Map<String, Integer> stationsMap = stations.stream().collect(Collectors.toMap(GardsStations::getStationCode,GardsStations::getStationId));
//本任务模拟的台站数据
List<TaskStationsVO> taskStationsVOList = new ArrayList<>();
for (int i = 0; i<transportTaskChildren.size();i++) {
@ -626,7 +627,7 @@ public class TransportResultDataServiceImpl implements TransportResultDataServic
List<TransportTaskChild> stationInfos = this.transportTaskChildMapper.selectList(queryWrapper);
//所以核设施数据
List<GardsNuclearfacility> facilitys = stationDataService.getAllNuclearfacility();
List<GardsNuclearReactors> facilitys = (List<GardsNuclearReactors>) redisUtil.get(CommonConstant.ALL_NUCLEARFACILITY);
NetcdfFile ncFile = null;
try {
@ -640,7 +641,7 @@ public class TransportResultDataServiceImpl implements TransportResultDataServic
List<Double> latData = NcUtil.getNCList(ncFile, "latitude");
List<Double> timeData = NcUtil.getNCList(ncFile, "time");
Variable spec001Mr = ncFile.findVariable("spec001_mr");
for (GardsNuclearfacility facility : facilitys){
for (GardsNuclearReactors facility : facilitys){
//存储台站每步的浓度值数据
Map<String,Double> everyStepConcDatas = new LinkedHashMap<>();
for(int k=0;k<timeData.size();k++){
@ -650,10 +651,10 @@ public class TransportResultDataServiceImpl implements TransportResultDataServic
localDateTime = localDateTime.plusSeconds(timeData.get(k).intValue());
String dayTimeStr = LocalDateTimeUtil.format(localDateTime, "yyyy-MM-dd HH:mm:ss");
//获取台站点位取整后±1.5度内的点坐标及数据使用插值算法求台站点位的值
Double facilityConc = this.getTargetSiteConc(lonData,latData,facility.getLonValue(),facility.getLatValue(),k,spec001Mr);
Double facilityConc = this.getTargetSiteConc(lonData,latData,facility.getLongitude(),facility.getLatitude(),k,spec001Mr);
everyStepConcDatas.put(dayTimeStr,facilityConc);
}
everyFacilityConcDatas.put(facility.getFacilityName(),everyStepConcDatas);
everyFacilityConcDatas.put(facility.getUnitName(),everyStepConcDatas);
}
if(CollUtil.isNotEmpty(everyFacilityConcDatas)){
everyFacilityConcDatas.forEach((facilityName,facilityConcData)->{

View File

@ -46,7 +46,6 @@ public class WeatherTaskController {
@Operation(summary = "新增天气预测任务")
@PostMapping("create")
public Result<?> create(@Validated(value = InsertGroup.class) WeatherTask weatherTask){
System.out.println(weatherTask);
weatherTaskService.cteate(weatherTask);
return Result.OK();
}

View File

@ -82,10 +82,11 @@ public interface WeatherTaskService extends IService<WeatherTask> {
void deleteTaskLog(String taskId);
/**
* 修改任务状态为结束
* 修改任务状态为结束(完成或异常停止)
* @param taskId
* @param timeConsuming
* @param taskStatus
*/
void updateTaskStatusToCompleted(String taskId, Double timeConsuming);
void updateTaskStatusToCompleted(String taskId, Double timeConsuming,Integer taskStatus);
}

View File

@ -1,5 +1,6 @@
package org.jeecg.service.impl;
import cn.hutool.core.io.FileUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.IdWorker;
@ -102,15 +103,20 @@ public class WeatherTaskServiceImpl extends ServiceImpl<WeatherTaskMapper, Weath
fileName.append("_"+id+".");
fileName.append(WeatherFileSuffixEnum.GRIB.getValue());
//文件保存路径
//文件保存目录
StringBuilder filePath = new StringBuilder();
filePath.append(this.systemStorageProperties.getPanguModelExecPath());
filePath.append(File.separator);
filePath.append(fileName);
File storageFile = new File(filePath.toString());
if (storageFile.exists()){
storageFile.delete();
filePath.append(id);
//文件地址
File storageFile = new File(filePath+File.separator+fileName);
//如果不存在则创建
if(!FileUtil.exist(filePath.toString())){
FileUtil.mkdir(filePath.toString());
}else{
if (storageFile.exists()){
storageFile.delete();
}
}
file.transferTo(storageFile);
weatherTask.setInputFile(fileName.toString());
@ -279,16 +285,15 @@ public class WeatherTaskServiceImpl extends ServiceImpl<WeatherTaskMapper, Weath
}
/**
* 修改任务状态为结束
*
* 修改任务状态为结束(完成或异常停止)
* @param taskId
* @param timeConsuming
*/
@Transactional(rollbackFor = RuntimeException.class)
@Override
public void updateTaskStatusToCompleted(String taskId, Double timeConsuming) {
public void updateTaskStatusToCompleted(String taskId, Double timeConsuming,Integer taskStatus) {
WeatherTask weatherTask = this.baseMapper.selectById(taskId);
weatherTask.setTaskStatus(WeatherTaskStatusEnum.COMPLETED.getKey());
weatherTask.setTaskStatus(taskStatus);
weatherTask.setTimeConsuming(timeConsuming);
this.updateById(weatherTask);
}

View File

@ -1,6 +1,7 @@
package org.jeecg.task;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.date.LocalDateTimeUtil;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.StrUtil;
@ -28,6 +29,7 @@ import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
@ -74,20 +76,35 @@ public class WeatherTaskExec extends Thread{
this.weatherTaskService.deleteTaskLog(this.weatherTask.getId());
//执行模拟
this.execSimulation();
//执行完成
this.execComplete(stopWatch,WeatherTaskStatusEnum.COMPLETED.getKey(), "");
}catch (Exception e){
String taskErrorLog = "任务执行失败,原因:"+e.getMessage();
ProgressQueue.getInstance().offer(new ProgressEvent(this.weatherTask.getId(),taskErrorLog));
//执行失败
this.execComplete(stopWatch,WeatherTaskStatusEnum.FAILURE.getKey(), taskErrorLog);
throw e;
}finally {
//添加任务耗时
stopWatch.stop();
long seconds = stopWatch.getTime(TimeUnit.SECONDS);
double min = seconds/60D;
BigDecimal bgMin = new BigDecimal(min);
BigDecimal result = bgMin.setScale(2, RoundingMode.HALF_UP);
this.weatherTaskService.updateTaskStatusToCompleted(this.weatherTask.getId(),result.doubleValue());
}
}
/**
* 任务执行完成
* @param stopWatch
* @param taskStatus
* @param taskErrorLog
*/
private void execComplete(StopWatch stopWatch,Integer taskStatus,String taskErrorLog){
//添加任务耗时
stopWatch.stop();
long seconds = stopWatch.getTime(TimeUnit.SECONDS);
double min = seconds/60D;
BigDecimal bgMin = new BigDecimal(min);
BigDecimal result = bgMin.setScale(2, RoundingMode.HALF_UP);
this.weatherTaskService.updateTaskStatusToCompleted(this.weatherTask.getId(),result.doubleValue(),taskStatus);
if (WeatherTaskStatusEnum.COMPLETED.getKey().equals(taskStatus)){
String taskCompletedLog = "任务执行完成,耗时:"+result+"分钟";
ProgressQueue.getInstance().offer(new ProgressEvent(this.weatherTask.getId(),taskCompletedLog));
}else if (WeatherTaskStatusEnum.FAILURE.getKey().equals(taskStatus)){
ProgressQueue.getInstance().offer(new ProgressEvent(this.weatherTask.getId(),taskErrorLog));
}
}
@ -95,6 +112,51 @@ public class WeatherTaskExec extends Thread{
* 执行模拟
*/
private void execSimulation(){
try {
//输入文件
String inputFileName = this.weatherTask.getInputFile();
//预测文件
String outputFileName = "panguweather_output_"+this.weatherTask.getId()+".grib";
//定义名称后缀,执行grib_set命令时去除否则命名会冲突
String gribFileSuffix = "_not_grib_set";
//临时目录grib_copy,grib_set,python转换都在这个目录里最后删除
String tempDir = systemStorageProperties.getPanguModelExecPath()+File.separator+this.weatherTask.getId();
if(!FileUtil.exist(tempDir)){
FileUtil.mkdir(tempDir);
}
//预测气象数据
this.forecast(inputFileName,outputFileName);
//切割文件6小时一个
this.splitGrib(outputFileName,gribFileSuffix,tempDir);
//适配flexpart
this.convertGrib(gribFileSuffix,tempDir);
//保存文件数据入库
this.saveGribInfoToDB(tempDir,this.getPanguWeatherPath());
//删除预测文件
File inputFile = new File(systemStorageProperties.getPanguModelExecPath()+File.separator+inputFileName);
if (inputFile.exists()){
inputFile.delete();
}
//删除输出的总文件
File outputFile = new File(systemStorageProperties.getPanguModelExecPath()+File.separator+outputFileName);
if (outputFile.exists()){
outputFile.delete();
}
//适配flexpart结束删除目录
if(FileUtil.exist(tempDir)){
FileUtil.del(tempDir);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* 盘古预测
* @param inputFileAddr
* @param outputFileAddr
*/
private void forecast(String inputFileAddr,String outputFileAddr){
try {
//处理开始日志
String forecastModel = "";
@ -113,11 +175,10 @@ public class WeatherTaskExec extends Thread{
}
String startLog = String.format(startLogFormat,forecastModel,startTimeFormat,forecastTime);
ProgressQueue.getInstance().offer(new ProgressEvent(this.weatherTask.getId(),startLog));
//处理运行命令
List<String> command = new ArrayList<>();
String aiModelsPath = systemStorageProperties.getAiModelsPath()+File.separator+"ai-models";
String fileName = "panguweather_output_"+this.weatherTask.getId()+".grib";
String aiModelsPath = systemStorageProperties.getPanguEnvPath()+File.separator+"ai-models";
if(this.weatherTask.getDataSources().equals(WeatherForecastDatasourceEnum.CDS.getKey())){
command.add(aiModelsPath);
command.add("--input");
@ -129,7 +190,7 @@ public class WeatherTaskExec extends Thread{
command.add("--lead-time");
command.add(this.weatherTask.getLeadTime().toString());
command.add("--path");
command.add(fileName);
command.add(outputFileAddr);
command.add("--assets");
command.add("assets-panguweather");
command.add("panguweather");
@ -137,11 +198,11 @@ public class WeatherTaskExec extends Thread{
//离线文件还需处理
command.add(aiModelsPath);
command.add("--file");
command.add(this.weatherTask.getInputFile());
command.add(inputFileAddr);
command.add("--lead-time");
command.add(this.weatherTask.getLeadTime().toString());
command.add("--path");
command.add(fileName);
command.add(outputFileAddr);
command.add("--assets");
command.add("assets-panguweather");
command.add("panguweather");
@ -162,137 +223,175 @@ public class WeatherTaskExec extends Thread{
}
//等待进程结束
process.waitFor();
}catch (Exception e){
throw new RuntimeException("盘古模型预测异常",e);
}
}
String handleGribLog = "预测结束开始处理grib文件";
/**
* 分割grib文件
* @param outputFileAddr
* @param gribFileSuffix
* @param gribCopyTargetDir
*/
private void splitGrib(String outputFileAddr,String gribFileSuffix,String gribCopyTargetDir){
try {
String handleGribLog = "预测结束开始处理grib文件,切割为6小时一个文件";
ProgressQueue.getInstance().offer(new ProgressEvent(this.weatherTask.getId(),handleGribLog));
LocalDateTime startTime = this.weatherTask.getStartDate().atTime(this.weatherTask.getStartTime(), 0, 0);
//grib_copy命令
String gribCopyCommandPath = systemStorageProperties.getAiModelsPath()+File.separator+"grib_copy";
String gribCopyCommandPath = systemStorageProperties.getPanguEnvPath()+File.separator+"grib_copy";
//grib_set命令
String gribSetCommandPath = systemStorageProperties.getAiModelsPath()+File.separator+"grib_set";
//需删除文件
List<String> delFiles = new ArrayList<>();
//需移动文件
List<String> moveFiles = new ArrayList<>();
String gribSetCommandPath = systemStorageProperties.getPanguEnvPath()+File.separator+"grib_set";
//公用gribProcessBuilder
ProcessBuilder gribProcessBuilder = new ProcessBuilder();
gribProcessBuilder.directory(new File(systemStorageProperties.getPanguModelExecPath()));
//定义名称后缀,执行grib_set命令时去除否则命名会冲突
String gribFileSuffix = "_not_grib_set";
//把grib文件切割成每6小时一份
int step = 6;
int i=this.weatherTask.getStartTime();
while(i <= this.weatherTask.getLeadTime()){
String gribCopyFileName = "panguweather_"+LocalDateTimeUtil.format(startTime,"yyyyMMddHH")+gribFileSuffix+".grib";
delFiles.add(gribCopyFileName);
String gribCopyFileAddr = gribCopyTargetDir+File.separator+"panguweather_"+LocalDateTimeUtil.format(startTime,"yyyyMMddHH")+gribFileSuffix+".grib";
//切割grib文件命令
List<String> gribCopyCommand = new ArrayList<>();
gribCopyCommand.add(gribCopyCommandPath);
gribCopyCommand.add("-w");
gribCopyCommand.add("step="+i);
gribCopyCommand.add(fileName);
gribCopyCommand.add(gribCopyFileName);
gribCopyCommand.add(outputFileAddr);
gribCopyCommand.add(gribCopyFileAddr);
System.out.println("gribCopyCommand:"+gribCopyCommand);
gribProcessBuilder.command(gribCopyCommand);
Process gribCopyProcess = gribProcessBuilder.start();
gribCopyProcess.waitFor();
//重新设置reftime信息
String gribSetFileName = "panguweather_"+LocalDateTimeUtil.format(startTime,"yyyyMMddHH")+".grib";
moveFiles.add(gribSetFileName);
String gribSetFileAddr = gribCopyTargetDir+File.separator+"panguweather_"+LocalDateTimeUtil.format(startTime,"yyyyMMddHH")+".grib";
String date = LocalDateTimeUtil.format(startTime,"yyyyMMdd");
String time = LocalDateTimeUtil.format(startTime,"HHmm");
List<String> gribSetCommand = new ArrayList<>();
gribSetCommand.add(gribSetCommandPath);
gribSetCommand.add("-s");
gribSetCommand.add("dataDate="+date+",dataTime="+time+",endStep="+0);
gribSetCommand.add(gribCopyFileName);
gribSetCommand.add(gribSetFileName);
gribSetCommand.add(gribCopyFileAddr);
gribSetCommand.add(gribSetFileAddr);
System.out.println("gribSetCommand:"+gribSetCommand);
gribProcessBuilder.command(gribSetCommand);
Process gribSetProcess = gribProcessBuilder.start();
gribSetProcess.waitFor();
i+=step;
startTime = startTime.plusHours(step);
}
if (CollUtil.isNotEmpty(moveFiles)) {
for (String moveFile : moveFiles) {
File srcFile = new File(systemStorageProperties.getPanguModelExecPath()+File.separator+moveFile);
if (srcFile.exists()) {
File targetFile = new File(this.getPanguWeatherPath()+File.separator+File.separator+moveFile);
FileUtil.move(srcFile,targetFile,true);
//保存生成的气象数据存储到数据库
this.saveGribInfoToDB(targetFile);
}
}
}
//删除gribCopy产生的文件
for (String delFile : delFiles) {
File srcFile = new File(systemStorageProperties.getPanguModelExecPath()+File.separator+delFile);
if (srcFile.exists()) {
srcFile.delete();
}
}
//删除预测的结果文件
String outputFilePath = systemStorageProperties.getPanguModelExecPath()+File.separator+fileName;
File outputFile = new File(outputFilePath);
if(outputFile.exists()){
outputFile.delete();
}
}catch (Exception e){
throw new RuntimeException("分割grib文件异常",e);
}
}
//如果是本地文件进行预测预测结束后删除文件
if(this.weatherTask.getDataSources().equals(WeatherForecastDatasourceEnum.LOCATION_FILE.getKey())){
String inputFilePath = systemStorageProperties.getPanguModelExecPath()+File.separator+this.weatherTask.getInputFile();
File inputFile = new File(inputFilePath);
if(inputFile.exists()){
inputFile.delete();
/**
* 转换grib
* @param gribFileSuffix
* @param gribFilePath
*/
private void convertGrib(String gribFileSuffix,String gribFilePath){
try {
// python3 pangu_reformat.py --ini_dir /export/xe_bk_data/weather/pangu_format_script --output_dir /export/xe_bk_data/weather/pangu2 --input_dir /export/xe_bk_data/weather/pangu2 --start_date 20251120 --end_date 20251120
//调用python脚本转换flexpart能识别的气象数据文件
//删除带有_not_grib_set名称的文件此文件还未设置reftime属性
List<File> files = FileUtil.loopFiles(gribFilePath, file -> file.getName().contains(gribFileSuffix));
if(!files.isEmpty()){
for(File file:files){
file.delete();
}
}
} catch (Exception e) {
throw new RuntimeException(e);
String adapterGribLog = "文件切割结束开始调用python脚本适配flexpart模型";
ProgressQueue.getInstance().offer(new ProgressEvent(this.weatherTask.getId(),adapterGribLog));
//构建转换命令
String convertScriptPath = systemStorageProperties.getPanguFormatScriptPath();
String inputPath = systemStorageProperties.getPanguModelExecPath()+File.separator+this.weatherTask.getId();
String outPutPath = inputPath;
String startDate = this.weatherTask.getStartDate().format(DateTimeFormatter.ofPattern("yyyyMMdd"));
String endDate = this.weatherTask.getStartDate().atTime(this.weatherTask.getStartTime(), 0, 0).plusHours(this.weatherTask.getLeadTime()).format(DateTimeFormatter.ofPattern("yyyyMMdd"));
List<String> convertCommand = new ArrayList<>();
convertCommand.add(systemStorageProperties.getFormatScriptPythonEnv()+File.separator+"python3");
convertCommand.add("pangu_reformat.py");
convertCommand.add("--ini_dir");
convertCommand.add(convertScriptPath);
convertCommand.add("--output_dir");
convertCommand.add(outPutPath);
convertCommand.add("--input_dir");
convertCommand.add(inputPath);
convertCommand.add("--start_date");
convertCommand.add(startDate);
convertCommand.add("--end_date");
convertCommand.add(endDate);
System.out.println("convertCommand:"+convertCommand);
ProcessBuilder adapterGribProcessBuilder = new ProcessBuilder();
adapterGribProcessBuilder.directory(new File(convertScriptPath));
adapterGribProcessBuilder.command(convertCommand);
Process adapterGribProcess = adapterGribProcessBuilder.start();
adapterGribProcess.waitFor();
}catch (Exception e){
throw new RuntimeException("适配flexpart转换grib异常",e);
}
}
/**
* 保存生成的气象数据存储到数据库
* @param file
* @param inputPath
* @param targetPath
*/
private void saveGribInfoToDB(File file){
//获取文件数据开始日期
String reftime = NcUtil.getReftime(file.getAbsolutePath());
if(StringUtils.isBlank(reftime)) {
throw new JeecgFileUploadException("解析气象文件起始时间数据异常,此文件可能损坏");
private void saveGribInfoToDB(String inputPath,String targetPath){
String handleGribLog = "格式转换结束处理grib文件数据入库";
ProgressQueue.getInstance().offer(new ProgressEvent(this.weatherTask.getId(),handleGribLog));
List<File> files = FileUtil.loopFiles(inputPath,file -> file.getName().startsWith("pangu_"));
List<File> targetFiles = new ArrayList<>();
if(!files.isEmpty()){
for(File file:files){
File targetFile = new File(targetPath+File.separator+file.getName());
FileUtil.move(file,targetFile,true);
if(targetFile.exists()){
targetFiles.add(targetFile);
}
}
for(File targetFile:targetFiles){
//获取文件数据开始日期
String reftime = NcUtil.getReftime(targetFile.getAbsolutePath());
if(StringUtils.isBlank(reftime)) {
throw new JeecgFileUploadException("解析气象文件起始时间数据异常,此文件可能损坏");
}
//计算文件大小M
BigDecimal divideVal = new BigDecimal("1024");
BigDecimal bg = new BigDecimal(targetFile.length());
BigDecimal fileSize = bg.divide(divideVal).divide(divideVal).setScale(2, RoundingMode.HALF_UP);
//处理文件数据开始时间
Instant instant = Instant.parse(reftime);
LocalDateTime utcDateTime = LocalDateTime.ofInstant(instant, ZoneId.of("UTC"));
//计算文件MD5值
String md5Val = "";
try (InputStream is = new FileInputStream(targetFile.getAbsolutePath())) {
md5Val = DigestUtils.md5Hex(is);
}catch (Exception e){
throw new RuntimeException(targetFile.getName()+"MD5值计算失败");
}
//校验文件是否存在存在删除从新新增
LambdaQueryWrapper<WeatherData> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(WeatherData::getFileName,targetFile.getName());
WeatherData queryResult = weatherDataMapper.selectOne(queryWrapper);
if(Objects.nonNull(queryResult)){
weatherDataMapper.deleteById(queryResult.getId());
}
//构建文件信息
WeatherData weatherData = new WeatherData();
weatherData.setFileName(targetFile.getName());
weatherData.setFileSize(fileSize.doubleValue());
weatherData.setFileExt(targetFile.getName().substring(targetFile.getName().lastIndexOf(".")+1));
weatherData.setDataStartTime(utcDateTime);
weatherData.setDataSource(weatherTask.getPredictionModel());
weatherData.setFilePath(targetFile.getAbsolutePath());
weatherData.setMd5Value(md5Val);
weatherData.setShareTotal(1);
weatherDataMapper.insert(weatherData);
}
}
//计算文件大小M
BigDecimal divideVal = new BigDecimal("1024");
BigDecimal bg = new BigDecimal(file.length());
BigDecimal fileSize = bg.divide(divideVal).divide(divideVal).setScale(2, RoundingMode.HALF_UP);
//处理文件数据开始时间
Instant instant = Instant.parse(reftime);
LocalDateTime utcDateTime = LocalDateTime.ofInstant(instant, ZoneId.of("UTC"));
//计算文件MD5值
String md5Val = "";
try (InputStream is = new FileInputStream(file.getAbsolutePath())) {
md5Val = DigestUtils.md5Hex(is);
}catch (Exception e){
throw new RuntimeException(file.getName()+"MD5值计算失败");
}
//校验文件是否存在存在删除从新新增
LambdaQueryWrapper<WeatherData> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(WeatherData::getFileName,file.getName());
WeatherData queryResult = weatherDataMapper.selectOne(queryWrapper);
if(Objects.nonNull(queryResult)){
weatherDataMapper.deleteById(queryResult.getId());
}
//构建文件信息
WeatherData weatherData = new WeatherData();
weatherData.setFileName(file.getName());
weatherData.setFileSize(fileSize.doubleValue());
weatherData.setFileExt(file.getName().substring(file.getName().lastIndexOf(".")+1));
weatherData.setDataStartTime(utcDateTime);
weatherData.setDataSource(weatherTask.getPredictionModel());
weatherData.setFilePath(file.getAbsolutePath());
weatherData.setMd5Value(md5Val);
weatherData.setShareTotal(1);
weatherDataMapper.insert(weatherData);
}
/**
@ -308,7 +407,7 @@ public class WeatherTaskExec extends Thread{
}
/**
* 获取盘古数据存储路径
* 获取Graphcast数据存储路径
* @return
*/
private String getGraphcastWeatherPath(){