From d276b60d4d6a2ebae760e3262286c65132770f0d Mon Sep 17 00:00:00 2001 From: hekaiyu <13673834656@163.com> Date: Fri, 14 Nov 2025 09:30:00 +0800 Subject: [PATCH] =?UTF-8?q?=E5=89=82=E9=87=8F=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../jeecg/common/constant/EventConstants.java | 30 ++++ .../properties/DiffusionProperties.java | 29 ---- .../org/jeecg/common/util/ExecutePyUtils.java | 91 +++++++++++ .../java/org/jeecg/common/util/NcUtil.java | 11 +- .../service/impl/BaseAPIServiceImpl.java | 1 - .../controller/DoseDataController.java | 31 ++++ .../diffusion/service/DoseDataService.java | 9 ++ .../impl/DiffusionDataServiceImpl.java | 52 ++----- .../service/impl/DoseDataServiceImpl.java | 146 ++++++++++++++++++ .../org/jeecg/diffusion/vo/DoseResultVO.java | 14 ++ .../jeecg/runProcess/controller/TestMain.java | 104 +++++++++++++ .../service/impl/RunProcessServiceImpl.java | 124 +++++++++++---- .../org/jeecg/wrf/service/WrfService.java | 2 +- .../wrf/service/impl/WrfServiceImpl.java | 2 +- .../java/org/jeecg/job/DownloadT1hJob.java | 82 +--------- 15 files changed, 544 insertions(+), 184 deletions(-) delete mode 100644 jeecg-boot-base-core/src/main/java/org/jeecg/common/properties/DiffusionProperties.java create mode 100644 jeecg-boot-base-core/src/main/java/org/jeecg/common/util/ExecutePyUtils.java create mode 100644 jeecg-module-event/src/main/java/org/jeecg/diffusion/controller/DoseDataController.java create mode 100644 jeecg-module-event/src/main/java/org/jeecg/diffusion/service/DoseDataService.java create mode 100644 jeecg-module-event/src/main/java/org/jeecg/diffusion/service/impl/DoseDataServiceImpl.java create mode 100644 jeecg-module-event/src/main/java/org/jeecg/diffusion/vo/DoseResultVO.java create mode 100644 jeecg-module-event/src/main/java/org/jeecg/runProcess/controller/TestMain.java diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/EventConstants.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/EventConstants.java index 7f372f8..4adbb03 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/EventConstants.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/EventConstants.java @@ -3,4 +3,34 @@ package org.jeecg.common.constant; public class EventConstants { public static final String WRF_DIR = "wrf"; + public static final String CONC_DIR = "cctm"; + public static final String METCR03D_DIR = "mcip"; + public static final String EMIS_DIR = "data/emis/d03"; + public static final String GK_NAME = "data_48.xlsx"; + + public static final String MERGE_RESULTS = "merge_results_to_nc.py"; + public static final String SUM_DRY_WET = "sum_dry_wet_to_nc.py"; + public static final String CONVERT_CONC_TO_DOSE = "convert_conc_to_dose.py"; + + public static final String DEP_VNAMES = "ASIJ"; + public static final String WRF_VNAMES = "XLAT,XLONG,HGT,U,V,W"; + public static final String CONC_VNAMES = "CO,ASIJ"; + public static final String METCR03D_VNAMES = "TA,PRES"; + public static final String EMIS_VNAMES = "CO,PSI"; + + public static final String WRF_PATTERN = "wrfout_d03_{YYYY-MM-DD}_00_00_00"; + public static final String CONC_PATTERN = "CCTM.CONC.d03.{YYYYMMDD"; + public static final String METCR03D_PATTERN = "METCR03D_d03_{YYYYMMDD}"; + public static final String EMIS_PATTERN = "emis_{YYYYMMDD}"; + + public static final String DEP_OUT_PREFIX = "out_dep_"; + public static final String WRF_OUT_PREFIX = "out_wrf_"; + public static final String CONC_OUT_PREFIX = "out_conc_"; + public static final String METCR03D_OUT_PREFIX = "out_metcr03d_"; + public static final String EMIS_OUT_PREFIX = "out_emis_"; + public static final String DOSE_OUT_PREFIX = "out_dose_"; + + + + } diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/properties/DiffusionProperties.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/properties/DiffusionProperties.java deleted file mode 100644 index 59cbf3e..0000000 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/common/properties/DiffusionProperties.java +++ /dev/null @@ -1,29 +0,0 @@ -package org.jeecg.common.properties; - -import lombok.Data; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.stereotype.Component; - -@Data -@Component -@ConfigurationProperties(prefix = "diffusion") -public class DiffusionProperties { - - /** - * 扩散环境目录 - */ - private String diffusionPath; - - /** - * wrf数据目录 - */ - private String wrfPath; - - - /** - * cmaq数据目录 - */ - private String cmaqPath; - - -} diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/util/ExecutePyUtils.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/util/ExecutePyUtils.java new file mode 100644 index 0000000..24e2a35 --- /dev/null +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/util/ExecutePyUtils.java @@ -0,0 +1,91 @@ +package org.jeecg.common.util; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.util.StopWatch; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +@Slf4j +public class ExecutePyUtils { + + private static final int PROCESS_TIMEOUT_SECONDS = 3600; // 30分钟超时 + /** + * 执行Python进程 + */ + public static void executePythonProcess(ProcessBuilder processBuilder, String processDescription) { + StopWatch processWatch = new StopWatch(); + processWatch.start(); + Process process = null; + try { + + process = processBuilder.start(); + + // 异步处理输出流 + CompletableFuture outputFuture = readStreamAsync(process.getInputStream(), "OUTPUT"); + CompletableFuture errorFuture = readStreamAsync(process.getErrorStream(), "ERROR"); + + // 等待进程完成(带超时) + boolean finished = process.waitFor(PROCESS_TIMEOUT_SECONDS, TimeUnit.SECONDS); + if (!finished) { + process.destroyForcibly(); + throw new RuntimeException(processDescription + "脚本执行超时"); + } + + // 等待输出处理完成 + CompletableFuture.allOf(outputFuture, errorFuture) + .get(10, TimeUnit.SECONDS); + + int exitCode = process.exitValue(); + processWatch.stop(); + + if (exitCode == 0) { + log.info("{}脚本执行完成,耗时: {} 毫秒", + processDescription, processWatch.getTotalTimeMillis()); + } else { + throw new RuntimeException(processDescription + + "脚本执行失败,退出码: " + exitCode); + } + + } catch (TimeoutException e) { + throw new RuntimeException(processDescription + "脚本执行超时", e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(processDescription + "脚本执行被中断", e); + } catch (Exception e) { + throw new RuntimeException(processDescription + "脚本执行失败", e); + } finally { + if (process != null) { + process.destroy(); + } + } + } + + /** + * 异步读取流 + */ + private static CompletableFuture readStreamAsync(InputStream inputStream, String type) { + return CompletableFuture.runAsync(() -> { + try (BufferedReader reader = new BufferedReader( + new InputStreamReader(inputStream, StandardCharsets.UTF_8))) { + + String line; + while ((line = reader.readLine()) != null) { + if ("ERROR".equals(type)) { + log.error("[Python {}] {}", type, line); + } else { + log.info("[Python {}] {}", type, line); + } + } + } catch (IOException e) { + log.error("处理Python{}流失败", type, e); + } + }); + } +} diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/util/NcUtil.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/util/NcUtil.java index 8213643..240b671 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/common/util/NcUtil.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/util/NcUtil.java @@ -155,7 +155,16 @@ public final class NcUtil { private static List> process2DData(Array data, Index index, int[] shape, String name, Integer layer, Integer hour) { List> resultAll; - if (shape.length == 3) { + if (shape.length == 2) { + resultAll = new ArrayList<>(shape[0]); + for (int j = 0; j < shape[0]; j++) { + List strings = new ArrayList<>(shape[1]); + for (int k = 0; k < shape[1]; k++) { + strings.add(processValue(data.getDouble(index.set(j, k)), name)); + } + resultAll.add(strings); + } + }else if (shape.length == 3) { resultAll = new ArrayList<>(shape[1]); for (int j = 0; j < shape[1]; j++) { List strings = new ArrayList<>(shape[2]); diff --git a/jeecg-module-event/src/main/java/org/jeecg/baseAPI/service/impl/BaseAPIServiceImpl.java b/jeecg-module-event/src/main/java/org/jeecg/baseAPI/service/impl/BaseAPIServiceImpl.java index 4204a87..89981b6 100644 --- a/jeecg-module-event/src/main/java/org/jeecg/baseAPI/service/impl/BaseAPIServiceImpl.java +++ b/jeecg-module-event/src/main/java/org/jeecg/baseAPI/service/impl/BaseAPIServiceImpl.java @@ -2,7 +2,6 @@ package org.jeecg.baseAPI.service.impl; import lombok.RequiredArgsConstructor; import org.jeecg.baseAPI.service.BaseAPIService; -import org.jeecg.common.properties.DiffusionProperties; import org.jeecg.common.properties.EventServerProperties; import org.springframework.stereotype.Service; diff --git a/jeecg-module-event/src/main/java/org/jeecg/diffusion/controller/DoseDataController.java b/jeecg-module-event/src/main/java/org/jeecg/diffusion/controller/DoseDataController.java new file mode 100644 index 0000000..4530340 --- /dev/null +++ b/jeecg-module-event/src/main/java/org/jeecg/diffusion/controller/DoseDataController.java @@ -0,0 +1,31 @@ +package org.jeecg.diffusion.controller; + +import io.swagger.v3.oas.annotations.Operation; +import lombok.RequiredArgsConstructor; +import org.jeecg.common.api.vo.Result; +import org.jeecg.common.aspect.annotation.AutoLog; +import org.jeecg.diffusion.service.DiffusionDataService; +import org.jeecg.diffusion.service.DoseDataService; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + + +@Validated +@RestController +@RequestMapping("doseData") +@RequiredArgsConstructor +public class DoseDataController { + + private final DoseDataService doseDataService; + + + @AutoLog(value = "查询剂量数据") + @Operation(summary = "查询剂量数据") + @GetMapping("getDoseResult") + public Result getDoseResult(String enginId, int hour, int layer){ + return Result.ok(doseDataService.getDoseResult(enginId, hour, layer)); + } + +} diff --git a/jeecg-module-event/src/main/java/org/jeecg/diffusion/service/DoseDataService.java b/jeecg-module-event/src/main/java/org/jeecg/diffusion/service/DoseDataService.java new file mode 100644 index 0000000..83980e7 --- /dev/null +++ b/jeecg-module-event/src/main/java/org/jeecg/diffusion/service/DoseDataService.java @@ -0,0 +1,9 @@ +package org.jeecg.diffusion.service; + +import org.jeecg.diffusion.vo.DoseResultVO; + +public interface DoseDataService { + + DoseResultVO getDoseResult(String enginId, int hour, int layer); + +} diff --git a/jeecg-module-event/src/main/java/org/jeecg/diffusion/service/impl/DiffusionDataServiceImpl.java b/jeecg-module-event/src/main/java/org/jeecg/diffusion/service/impl/DiffusionDataServiceImpl.java index 36b353a..b4456e3 100644 --- a/jeecg-module-event/src/main/java/org/jeecg/diffusion/service/impl/DiffusionDataServiceImpl.java +++ b/jeecg-module-event/src/main/java/org/jeecg/diffusion/service/impl/DiffusionDataServiceImpl.java @@ -1,35 +1,25 @@ package org.jeecg.diffusion.service.impl; -import cn.hutool.core.date.DateUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.jeecg.baseAPI.service.BaseAPIService; import org.jeecg.baseAPI.utils.DateTimeUtils; -import org.jeecg.common.api.vo.Result; -import org.jeecg.common.aspect.annotation.AutoLog; import org.jeecg.common.constant.DiffusionPrefixConstants; +import org.jeecg.common.constant.EventConstants; import org.jeecg.common.exception.JeecgBootException; -import org.jeecg.common.properties.DiffusionProperties; import org.jeecg.common.properties.EventServerProperties; -import org.jeecg.common.properties.SystemStorageProperties; import org.jeecg.common.util.DateUtils; import org.jeecg.common.util.NcUtil; import org.jeecg.diffusion.service.DiffusionDataService; import org.jeecg.diffusion.vo.DiffusionResultVO; -import org.jeecg.modules.base.entity.Cmaq; import org.jeecg.modules.base.entity.Engineering; import org.jeecg.modules.base.entity.Wrf; import org.jeecg.modules.base.mapper.CmaqMapper; import org.jeecg.modules.base.mapper.EngineeringMapper; import org.jeecg.modules.base.mapper.WrfMapper; -import org.springframework.jdbc.datasource.DataSourceTransactionManager; import org.springframework.stereotype.Service; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RequestParam; import ucar.nc2.NetcdfFile; -import ucar.nc2.dataset.NetcdfDataset; import java.io.File; import java.io.IOException; @@ -48,42 +38,22 @@ public class DiffusionDataServiceImpl implements DiffusionDataService { private final EventServerProperties eventServerProperties; private final BaseAPIService baseAPIService; private final WrfMapper wrfMapper; - private final CmaqMapper cmaqMapper; private final EngineeringMapper engineeringMapper; @Override public DiffusionResultVO getDiffusionResult(String enginId, int hour, int layer) { Engineering engineering = engineeringMapper.selectById(enginId); Wrf wrf = wrfMapper.selectOne(new LambdaQueryWrapper().eq(Wrf::getEnginId, enginId)); - String wrfFilePath = baseAPIService.buildEngineeringFilePath(eventServerProperties.getResultFilePrefix(), engineering.getCreateBy(), engineering.getEngineeringName()); - String cmaqFilePath = baseAPIService.buildEngineeringFilePath(eventServerProperties.getResultFilePrefix(), engineering.getCreateBy(), engineering.getEngineeringName()); + String resultFilePath = baseAPIService.buildEngineeringFilePath(eventServerProperties.getResultFilePrefix(), engineering.getCreateBy(), engineering.getEngineeringName()); - // 处理hour大于等于24的情况 - int actualHour = hour % 24; - int daysToAdd = hour / 24; - Date startTimeDate = null; - try { - startTimeDate = DateUtils.parseDate(wrf.getStartTime(),"yyyy-MM-dd HH:mm:ss"); - } catch (ParseException e) { - throw new RuntimeException(e); - } - - // 如果hour大于等于24,需要调整日期 - if (daysToAdd > 0) { - Calendar calendar = Calendar.getInstance(); - calendar.setTime(startTimeDate); - calendar.add(Calendar.DAY_OF_MONTH, daysToAdd); - startTimeDate = calendar.getTime(); - } - - String startTime = DateUtils.formatDate(startTimeDate, "yyyy-MM-dd HH:mm:ss"); - try (NetcdfFile wrfNcFile = getWrfNetcdfFile(wrfFilePath, startTime); - NetcdfFile cmaqNcFile = getCmaqNetcdfFile(cmaqFilePath, startTime)) { + String startTime = wrf.getStartTime().substring(0, 10); + try (NetcdfFile wrfNcFile = getWrfNetcdfFile(resultFilePath, startTime); + NetcdfFile cmaqNcFile = getCmaqNetcdfFile(resultFilePath, startTime)) { DiffusionResultVO diffusionResultVO = new DiffusionResultVO(); - List> values = NcUtil.get2DNCByName(cmaqNcFile, "CO", layer, actualHour); - List> xlats = NcUtil.get2DNCByName(wrfNcFile, "XLAT", layer, actualHour); - List> xlons = NcUtil.get2DNCByName(wrfNcFile, "XLONG", layer, actualHour); + List> values = NcUtil.get2DNCByName(cmaqNcFile, "CO", layer, hour); + List> xlats = NcUtil.get2DNCByName(wrfNcFile, "XLAT", layer, hour); + List> xlons = NcUtil.get2DNCByName(wrfNcFile, "XLONG", layer, hour); getDataInfo(xlats, xlons, values, 1, diffusionResultVO); diffusionResultVO.setSn(values.size()); diffusionResultVO.setWe(values.get(0).size()); @@ -95,8 +65,7 @@ public class DiffusionDataServiceImpl implements DiffusionDataService { } public NetcdfFile getWrfNetcdfFile(String localFilePath, String startTime){ - String newStartTime = DateTimeUtils.standardToFileSafe(startTime); - String wrfNcPath = localFilePath + DiffusionPrefixConstants.WRF_PREFIX + newStartTime; + String wrfNcPath = localFilePath + EventConstants.WRF_OUT_PREFIX + startTime; File file = new File(wrfNcPath); if(!file.exists()){ throw new JeecgBootException("WRF文件不存在,文件地址" + wrfNcPath); @@ -111,8 +80,7 @@ public class DiffusionDataServiceImpl implements DiffusionDataService { } public NetcdfFile getCmaqNetcdfFile(String localFilePath, String startTime){ - String newStartTime = DateTimeUtils.standardToCompactDate(startTime); - String cmaqNcPath = localFilePath + DiffusionPrefixConstants.CMAQ_PREFIX + newStartTime; + String cmaqNcPath = localFilePath + EventConstants.CONC_OUT_PREFIX + startTime; File file = new File(cmaqNcPath); if(!file.exists()){ throw new JeecgBootException("CMAQ文件不存在,文件地址" + cmaqNcPath); diff --git a/jeecg-module-event/src/main/java/org/jeecg/diffusion/service/impl/DoseDataServiceImpl.java b/jeecg-module-event/src/main/java/org/jeecg/diffusion/service/impl/DoseDataServiceImpl.java new file mode 100644 index 0000000..bc0be1d --- /dev/null +++ b/jeecg-module-event/src/main/java/org/jeecg/diffusion/service/impl/DoseDataServiceImpl.java @@ -0,0 +1,146 @@ +package org.jeecg.diffusion.service.impl; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.jeecg.baseAPI.service.BaseAPIService; +import org.jeecg.baseAPI.utils.DateTimeUtils; +import org.jeecg.common.constant.DiffusionPrefixConstants; +import org.jeecg.common.constant.EventConstants; +import org.jeecg.common.exception.JeecgBootException; +import org.jeecg.common.properties.EventServerProperties; +import org.jeecg.common.util.DateUtils; +import org.jeecg.common.util.NcUtil; +import org.jeecg.diffusion.service.DiffusionDataService; +import org.jeecg.diffusion.service.DoseDataService; +import org.jeecg.diffusion.vo.DiffusionResultVO; +import org.jeecg.diffusion.vo.DoseResultVO; +import org.jeecg.modules.base.entity.Engineering; +import org.jeecg.modules.base.entity.Wrf; +import org.jeecg.modules.base.mapper.CmaqMapper; +import org.jeecg.modules.base.mapper.EngineeringMapper; +import org.jeecg.modules.base.mapper.WrfMapper; +import org.springframework.stereotype.Service; +import ucar.nc2.NetcdfFile; + +import java.io.File; +import java.io.IOException; +import java.text.ParseException; +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Date; +import java.util.List; + + +@Slf4j +@Service +@RequiredArgsConstructor +public class DoseDataServiceImpl implements DoseDataService { + + private final EventServerProperties eventServerProperties; + private final BaseAPIService baseAPIService; + private final WrfMapper wrfMapper; + private final EngineeringMapper engineeringMapper; + + + @Override + public DoseResultVO getDoseResult(String enginId, int hour, int layer) { + Engineering engineering = engineeringMapper.selectById(enginId); + Wrf wrf = wrfMapper.selectOne(new LambdaQueryWrapper().eq(Wrf::getEnginId, enginId)); + String resultFilePath = baseAPIService.buildEngineeringFilePath(eventServerProperties.getResultFilePrefix(), engineering.getCreateBy(), engineering.getEngineeringName()); + + String startTime = wrf.getStartTime().substring(0, 10); + try (NetcdfFile doseNcFile = getDoseNetcdfFile(resultFilePath, startTime)) { + DoseResultVO doseResultVO = new DoseResultVO(); + + List> values = NcUtil.get2DNCByName(doseNcFile, "total_acc", layer, hour); + List> xlats = NcUtil.get2DNCByName(doseNcFile, "lat", layer, hour); + List> xlons = NcUtil.get2DNCByName(doseNcFile, "lon", layer, hour); + getDataInfo(xlats, xlons, values, doseResultVO); + doseResultVO.setSn(values.size()); + doseResultVO.setWe(values.get(0).size()); + + return doseResultVO; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public NetcdfFile getDoseNetcdfFile(String localFilePath, String startTime){ + String ncPath = localFilePath + EventConstants.DOSE_OUT_PREFIX + startTime; + File file = new File(ncPath); + if(!file.exists()){ + throw new JeecgBootException("Dose文件不存在,文件地址" + ncPath); + } + + try{ + return NetcdfFile.open(ncPath); + }catch (IOException e) { + log.error("WRF文件数据处理失败", e); + throw new JeecgBootException("文件读取失败", e); + } + } + + /** + * 封装数据 + * + * @param xlats 纬度二维列表 + * @param xlongs 经度二维列表 + * @param valueHours 数值二维列表 + * @return 处理后的数据列表 + */ + public void getDataInfo(List> xlats, + List> xlongs, + List> valueHours, + DoseResultVO doseResultVO) { + if (xlats == null || xlongs == null || valueHours == null) { + throw new IllegalArgumentException("输入列表不能为null"); + } + + List> resultAll = new ArrayList<>(); + + Double max = -Double.MAX_VALUE; + Double min = Double.MAX_VALUE; + + // 主循环处理 + for (int i = 0; i < xlats.size(); i++) { + List xlongsRow = xlongs.get(i); + List xlatsRow = xlats.get(i); + List valuesRow = valueHours.get(i); + + for (int j = 0; j < xlongsRow.size(); j++) { + double value = valuesRow.get(j); + double longitude = xlongsRow.get(j); + double latitude = xlatsRow.get(j); + + // 更新最大值 + if (value > max) { + max = value; + } + + // 更新最小值 + if (value < min) { + min = value; + } + + // 创建结果条目 + resultAll.add(createResultEntry(longitude, latitude, value)); + } + } + doseResultVO.setDataList(resultAll); + doseResultVO.setMax(max); + doseResultVO.setMin(min); + + } + + /** + * 创建结果条目 + */ + private List createResultEntry(double longitude, double latitude, double value) { + List entry = new ArrayList<>(3); + entry.add(longitude); + entry.add(latitude); + entry.add(value); + return entry; + } +} \ No newline at end of file diff --git a/jeecg-module-event/src/main/java/org/jeecg/diffusion/vo/DoseResultVO.java b/jeecg-module-event/src/main/java/org/jeecg/diffusion/vo/DoseResultVO.java new file mode 100644 index 0000000..77baced --- /dev/null +++ b/jeecg-module-event/src/main/java/org/jeecg/diffusion/vo/DoseResultVO.java @@ -0,0 +1,14 @@ +package org.jeecg.diffusion.vo; + +import lombok.Data; + +import java.util.List; + +@Data +public class DoseResultVO { + private List> dataList; + private double max; + private double min; + private int sn; + private int we; +} diff --git a/jeecg-module-event/src/main/java/org/jeecg/runProcess/controller/TestMain.java b/jeecg-module-event/src/main/java/org/jeecg/runProcess/controller/TestMain.java new file mode 100644 index 0000000..180a667 --- /dev/null +++ b/jeecg-module-event/src/main/java/org/jeecg/runProcess/controller/TestMain.java @@ -0,0 +1,104 @@ +package org.jeecg.runProcess.controller; + +import lombok.extern.slf4j.Slf4j; +import org.jeecg.common.util.ExecutePyUtils; + +import java.util.Arrays; + +@Slf4j +public class TestMain { + + + public static void main(String[] args) { + String[] cmd_dep = { + "python", + "D:\\hky_word\\Projectlibrary\\resultFile\\dose\\py\\sum_dry_wet_to_nc.py", + "ASIJ", + "D:\\hky_word\\Projectlibrary\\resultFile\\admin\\1113", + "D:\\hky_word\\Projectlibrary\\resultFile\\dose\\1113", + "out_dep_20181113", + "20181113", + "20181114" + }; + ProcessBuilder depBuilder = new ProcessBuilder(cmd_dep); + ExecutePyUtils.executePythonProcess(depBuilder, "dep"); + + String[] cmd_wrf = { + "python", + "D:\\hky_word\\Projectlibrary\\resultFile\\dose\\py\\merge_results_to_nc.py", + "XLAT,XLONG,HGT,U,V,W", + "D:\\hky_word\\Projectlibrary\\resultFile\\admin\\1113", + "D:\\hky_word\\Projectlibrary\\resultFile\\dose\\1113", + "out_wrf_20181113", + "20181113", + "20181114", + "wrfout_d03_{YYYY-MM-DD}_00_00_00" // 使用占位符 + }; + ProcessBuilder wrfBuilder = new ProcessBuilder(cmd_wrf); + ExecutePyUtils.executePythonProcess(wrfBuilder, "wrf"); + + String[] cmd_cmaq = { + "python", + "D:\\hky_word\\Projectlibrary\\resultFile\\dose\\py\\merge_results_to_nc.py", + "CO,ASIJ", + "D:\\hky_word\\Projectlibrary\\resultFile\\admin\\1113", + "D:\\hky_word\\Projectlibrary\\resultFile\\dose\\1113", + "out_conc_20181113", + "20181113", + "20181114", + "CCTM.CONC.d03.{YYYYMMDD}" // 使用占位符 + }; + ProcessBuilder concBuilder = new ProcessBuilder(cmd_cmaq); + ExecutePyUtils.executePythonProcess(concBuilder, "conc"); + + String[] cmd_metcr03d = { + "python", + "D:\\hky_word\\Projectlibrary\\resultFile\\dose\\py\\merge_results_to_nc.py", + "TA,PRES", + "D:\\hky_word\\Projectlibrary\\resultFile\\admin\\1113", + "D:\\hky_word\\Projectlibrary\\resultFile\\dose\\1113", + "out_metcr03d_20181113", + "20181113", + "20181114", + "METCRO3D_d03_{YYYYMMDD}" // 使用占位符 + }; + ProcessBuilder metcr03dBuilder = new ProcessBuilder(cmd_metcr03d); + ExecutePyUtils.executePythonProcess(metcr03dBuilder, "metcr03d"); + + String[] cmd_emis = { + "python", + "D:\\hky_word\\Projectlibrary\\resultFile\\dose\\py\\merge_results_to_nc.py", + "CO,PSI", + "D:\\hky_word\\Projectlibrary\\resultFile\\admin\\1113", + "D:\\hky_word\\Projectlibrary\\resultFile\\dose\\1113", + "out_emis_20181113", + "20181113", + "20181114", + "emis_{YYYYMMDD}" // 使用占位符 + }; + ProcessBuilder emisBuilder = new ProcessBuilder(cmd_emis); + ExecutePyUtils.executePythonProcess(emisBuilder, "emis"); + + String[] cmd_dose = { + "python", + "D:\\hky_word\\Projectlibrary\\resultFile\\dose\\py\\convert_conc_to_dose.py", + "20181113", + "20181114", + "D:\\hky_word\\Projectlibrary\\resultFile\\dose\\1113", + "D:\\hky_word\\Projectlibrary\\resultFile\\dose\\1113", + "D:\\hky_word\\Projectlibrary\\resultFile\\dose\\data_48.xlsx", + "out_dep_20181113", + "out_wrf_20181113", + "out_conc_20181113", + "out_metcr03d_20181113", + "out_emis_20181113", + "out_dose_20181113", + }; + ProcessBuilder doseBuilder = new ProcessBuilder(cmd_dose); + ExecutePyUtils.executePythonProcess(doseBuilder, "emis"); + } + + + + +} diff --git a/jeecg-module-event/src/main/java/org/jeecg/runProcess/service/impl/RunProcessServiceImpl.java b/jeecg-module-event/src/main/java/org/jeecg/runProcess/service/impl/RunProcessServiceImpl.java index 888d511..91cd29d 100644 --- a/jeecg-module-event/src/main/java/org/jeecg/runProcess/service/impl/RunProcessServiceImpl.java +++ b/jeecg-module-event/src/main/java/org/jeecg/runProcess/service/impl/RunProcessServiceImpl.java @@ -5,7 +5,9 @@ import lombok.RequiredArgsConstructor; import org.jeecg.baseAPI.service.BaseAPIService; import org.jeecg.cmaq.service.CmaqService; import org.jeecg.common.api.vo.Result; +import org.jeecg.common.constant.EventConstants; import org.jeecg.common.properties.EventServerProperties; +import org.jeecg.common.util.ExecutePyUtils; import org.jeecg.engineering.service.EngineeringService; import org.jeecg.modules.base.entity.Engineering; import org.jeecg.modules.base.entity.Wrf; @@ -49,7 +51,7 @@ public class RunProcessServiceImpl implements RunProcessService { //生成WRF脚本 genWrfRunShell(allRunPath, scriptsPath, wrf); //运行WRF程序 - Result wrfResult = wrfService.runAllWrf(engineeringId,scriptsPath, workDirPath, resultFilePath); + Result wrfResult = wrfService.runAllWrf(engineeringId,scriptsPath, workDirPath); if(!wrfResult.isSuccess()){ return wrfResult; } @@ -61,37 +63,31 @@ public class RunProcessServiceImpl implements RunProcessService { // return cmaqResult; // } + String sDate = wrf.getStartTime().substring(0, 10); + String eDate = wrf.getEndTime().substring(0, 10); -// //下载wrf文件 -// for (Integer i = 0; i <= sumDay; i++) { -// String newStartTime = DateUtil.format(new Date(startTime.getTime() + oneDaySecs * i), format); -// String ncName = "wrfout_d03_" + newStartTime; -// -// String targetName = "wrfout_d03_" + DateUtil.format(new Date(startTime.getTime() + oneDaySecs * i), "yyyy-MM-dd_HH_mm_ss"); -// sftpAPIService.download(allRunPath + "workdir/WRF/",ncName,resultFilePath + targetName); -// } -// -// //下载fnl文件 -// String fnlStartTime = DateUtil.format(DateUtil.parse(wrf.getStartTime(), "yyyy-MM-dd_HH:mm:ss"), "yyyyMMdd_HH"); -// DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd_HH"); -// LocalDateTime dateTime = LocalDateTime.parse(fnlStartTime, formatter); -// for (int t = 0; t < sumDay * 24; t= t+6) { -// // 增加t小时 -// LocalDateTime newDateTime = dateTime.plusHours(t); -// String fnlFileName = "fnl_"+ newDateTime.format(formatter) +"_00.grib2"; -// sftpAPIService.download(eventServerProperties.getWeatherDataPath() ,fnlFileName,resultFilePath + fnlFileName); -// } + exeDepPy(sDate, eDate, EventConstants.DEP_VNAMES, + workDirPath + EventConstants.CONC_DIR, resultFilePath, + EventConstants.DEP_OUT_PREFIX, "相加DEP结果数据"); + + exePyScript(sDate, eDate, EventConstants.WRF_VNAMES, + workDirPath + EventConstants.WRF_DIR, resultFilePath, + EventConstants.WRF_OUT_PREFIX, EventConstants.WRF_PATTERN, "合并WRF结果数据"); + + exePyScript(sDate, eDate, EventConstants.CONC_VNAMES, + workDirPath + EventConstants.CONC_DIR, resultFilePath, + EventConstants.CONC_OUT_PREFIX, EventConstants.CONC_PATTERN, "合并CONC结果数据"); + + exePyScript(sDate, eDate, EventConstants.METCR03D_VNAMES, + workDirPath + EventConstants.METCR03D_DIR, resultFilePath, + EventConstants.METCR03D_OUT_PREFIX, EventConstants.METCR03D_PATTERN, "合并METCR03D结果数据"); + + exePyScript(sDate, eDate, EventConstants.EMIS_VNAMES, + workDirPath + EventConstants.EMIS_DIR, resultFilePath, + EventConstants.EMIS_OUT_PREFIX, EventConstants.EMIS_PATTERN, "合并EMIS结果数据"); + + exeDosePy(sDate, eDate, resultFilePath, resultFilePath, scriptsPath,"合并EMIS结果数据"); - //下载cmaq文件 -// for (Integer i = 1; i <= sumDay; i++) { -// String dataString = DateUtil.format(new Date(startTimeSecs + oneDaySecs * i), yyyymdFormat); -// String ncName = "CCTM.CONC.d03."+ dataString; -// String metcr03dName = "METCRO3D_d03_"+ dataString; -// String emisName = "emis_"+ dataString; -// sftpAPIService.download(allRunPath + "workdir/CCTM/",ncName,resultFilePath + ncName); -// sftpAPIService.download(mcipPath,metcr03dName,resultFilePath + metcr03dName); -// sftpAPIService.download(allRunPath + "data/emis/d03/",emisName,resultFilePath + emisName); -// } }catch (Exception e){ e.printStackTrace(); return Result.error(e.getMessage()); @@ -99,6 +95,9 @@ public class RunProcessServiceImpl implements RunProcessService { return Result.OK("运行成功!"); } +// private void executePythonScript(String sDate, String eDate, String vNames, String inputPath, +// String outputPath, String outFileName, String pattern, String desc) { + public Result genWrfRunShell(String allRunPath, String scriptsPath, Wrf wrf) { try { wrfService.genWpsShell(scriptsPath, "run_wps.sh", wrf); @@ -160,4 +159,69 @@ public class RunProcessServiceImpl implements RunProcessService { // return fileName; // } + /** + * 执行Python脚本 + */ + private void exePyScript(String sDate, String eDate, String vNames, String inputPath, + String outputPath, String outFileName, String pattern, String desc) { + String[] command = { + "python", + eventServerProperties.getBaseHome() + File.separator + + eventServerProperties.getPythonPath() + File.separator + EventConstants.MERGE_RESULTS, + vNames, + eventServerProperties.getBaseHome() + File.separator + inputPath, + outputPath, + outFileName, + sDate, + eDate, + pattern // 使用占位符 + }; + ProcessBuilder processBuilder = new ProcessBuilder(command); + ExecutePyUtils.executePythonProcess(processBuilder, desc); + } + + /** + * 执行沉降Python脚本 + */ + private void exeDepPy(String sDate, String eDate, String vNames, String inputPath, + String outputPath, String outFileName, String desc) { + String[] command = { + "python", + eventServerProperties.getBaseHome() + File.separator + + eventServerProperties.getPythonPath() + File.separator + EventConstants.SUM_DRY_WET, + vNames, + eventServerProperties.getBaseHome() + File.separator + inputPath, + outputPath, + outFileName, + sDate, + eDate + }; + ProcessBuilder processBuilder = new ProcessBuilder(command); + ExecutePyUtils.executePythonProcess(processBuilder, desc); + } + + /** + * 执行浓度转剂量Python脚本 + */ + private void exeDosePy(String sDate, String eDate, String inputPath, String outputPath, String gkPath, String desc) { + String[] command = { + "python", + eventServerProperties.getBaseHome() + File.separator + + eventServerProperties.getPythonPath() + File.separator + EventConstants.CONVERT_CONC_TO_DOSE, + sDate, + eDate, + eventServerProperties.getBaseHome() + File.separator + inputPath, + eventServerProperties.getBaseHome() + File.separator + outputPath, + gkPath + File.separator + EventConstants.GK_NAME, + EventConstants.DEP_OUT_PREFIX + sDate, + EventConstants.WRF_OUT_PREFIX + sDate, + EventConstants.CONC_OUT_PREFIX + sDate, + EventConstants.METCR03D_OUT_PREFIX + sDate, + EventConstants.EMIS_OUT_PREFIX + sDate, + EventConstants.DOSE_OUT_PREFIX + sDate + }; + ProcessBuilder processBuilder = new ProcessBuilder(command); + ExecutePyUtils.executePythonProcess(processBuilder, desc); + } + } diff --git a/jeecg-module-event/src/main/java/org/jeecg/wrf/service/WrfService.java b/jeecg-module-event/src/main/java/org/jeecg/wrf/service/WrfService.java index f582fa4..c5f767c 100644 --- a/jeecg-module-event/src/main/java/org/jeecg/wrf/service/WrfService.java +++ b/jeecg-module-event/src/main/java/org/jeecg/wrf/service/WrfService.java @@ -17,6 +17,6 @@ public interface WrfService extends IService { Result updateWrfInfo(RunProcessParamVO paramVO); String genWpsShell(String scriptsPath, String fileName, Wrf wrf) throws IOException; - Result runAllWrf(String engId, String scriptsPath, String resultFilePath, String localFilePath) throws IOException; + Result runAllWrf(String engId, String scriptsPath, String resultFilePath) throws IOException; } diff --git a/jeecg-module-event/src/main/java/org/jeecg/wrf/service/impl/WrfServiceImpl.java b/jeecg-module-event/src/main/java/org/jeecg/wrf/service/impl/WrfServiceImpl.java index be4ae75..e8c5097 100644 --- a/jeecg-module-event/src/main/java/org/jeecg/wrf/service/impl/WrfServiceImpl.java +++ b/jeecg-module-event/src/main/java/org/jeecg/wrf/service/impl/WrfServiceImpl.java @@ -115,7 +115,7 @@ public class WrfServiceImpl extends ServiceImpl implements WrfSe } @Override - public Result runAllWrf(String engId, String scriptsPath, String resultFilePath, String localFilePath) throws IOException { + public Result runAllWrf(String engId, String scriptsPath, String resultFilePath) throws IOException { String cdShRunPath = "cd " + scriptsPath + ";"; boolean wpsSuccess = runProjectWps(cdShRunPath + "chmod +x run_project_wps.sh;./run_project_wps.sh", resultFilePath); if (!wpsSuccess){ diff --git a/jeecg-module-weather/src/main/java/org/jeecg/job/DownloadT1hJob.java b/jeecg-module-weather/src/main/java/org/jeecg/job/DownloadT1hJob.java index 83ebadc..09ff135 100644 --- a/jeecg-module-weather/src/main/java/org/jeecg/job/DownloadT1hJob.java +++ b/jeecg-module-weather/src/main/java/org/jeecg/job/DownloadT1hJob.java @@ -8,6 +8,7 @@ import org.jeecg.common.constant.enums.T1hFilePrefixEnum; import org.jeecg.common.constant.enums.WeatherDataSourceEnum; import org.jeecg.common.properties.SystemStorageProperties; import org.jeecg.common.properties.T1hDownloadProperties; +import org.jeecg.common.util.ExecutePyUtils; import org.jeecg.modules.base.entity.WeatherData; import org.jeecg.service.WeatherDataService; import org.springframework.scheduling.annotation.Scheduled; @@ -15,7 +16,6 @@ import org.springframework.util.StopWatch; import org.springframework.web.bind.annotation.RestController; import java.io.*; -import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -25,9 +25,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Date; import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -98,7 +95,7 @@ public class DownloadT1hJob { "--base_date", baseTime ); - executePythonProcess(processBuilder, "文件合并"); + ExecutePyUtils.executePythonProcess(processBuilder, "文件合并"); } private void saveWeatherData(){ @@ -130,7 +127,7 @@ public class DownloadT1hJob { log.info("执行Python脚本: {}", Arrays.toString(command)); ProcessBuilder processBuilder = new ProcessBuilder(command); - executePythonProcess(processBuilder, + ExecutePyUtils.executePythonProcess(processBuilder, String.format("元素%s预报时长%s下载", element, forecastHours)); } @@ -150,79 +147,6 @@ public class DownloadT1hJob { }; } - /** - * 执行Python进程 - */ - private void executePythonProcess(ProcessBuilder processBuilder, String processDescription) { - StopWatch processWatch = new StopWatch(); - processWatch.start(); - - Process process = null; - try { - process = processBuilder.start(); - - // 异步处理输出流 - CompletableFuture outputFuture = readStreamAsync(process.getInputStream(), "OUTPUT"); - CompletableFuture errorFuture = readStreamAsync(process.getErrorStream(), "ERROR"); - - // 等待进程完成(带超时) - boolean finished = process.waitFor(PROCESS_TIMEOUT_SECONDS, TimeUnit.SECONDS); - if (!finished) { - process.destroyForcibly(); - throw new RuntimeException(processDescription + "脚本执行超时"); - } - - // 等待输出处理完成 - CompletableFuture.allOf(outputFuture, errorFuture) - .get(10, TimeUnit.SECONDS); - - int exitCode = process.exitValue(); - processWatch.stop(); - - if (exitCode == 0) { - log.info("{}脚本执行完成,耗时: {} 毫秒", - processDescription, processWatch.getTotalTimeMillis()); - } else { - throw new RuntimeException(processDescription + - "脚本执行失败,退出码: " + exitCode); - } - - } catch (TimeoutException e) { - throw new RuntimeException(processDescription + "脚本执行超时", e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(processDescription + "脚本执行被中断", e); - } catch (Exception e) { - throw new RuntimeException(processDescription + "脚本执行失败", e); - } finally { - if (process != null) { - process.destroy(); - } - } - } - - /** - * 异步读取流 - */ - private CompletableFuture readStreamAsync(InputStream inputStream, String type) { - return CompletableFuture.runAsync(() -> { - try (BufferedReader reader = new BufferedReader( - new InputStreamReader(inputStream, StandardCharsets.UTF_8))) { - - String line; - while ((line = reader.readLine()) != null) { - if ("ERROR".equals(type)) { - log.error("[Python {}] {}", type, line); - } else { - log.info("[Python {}] {}", type, line); - } - } - } catch (IOException e) { - log.error("处理Python{}流失败", type, e); - } - }); - } - /** * 生成预报时间列表 */