剂量功能

This commit is contained in:
hekaiyu 2025-11-14 09:30:00 +08:00
parent 85f3a3fe8a
commit d276b60d4d
15 changed files with 544 additions and 184 deletions

View File

@ -3,4 +3,34 @@ package org.jeecg.common.constant;
public class EventConstants { public class EventConstants {
public static final String WRF_DIR = "wrf"; public static final String WRF_DIR = "wrf";
public static final String CONC_DIR = "cctm";
public static final String METCR03D_DIR = "mcip";
public static final String EMIS_DIR = "data/emis/d03";
public static final String GK_NAME = "data_48.xlsx";
public static final String MERGE_RESULTS = "merge_results_to_nc.py";
public static final String SUM_DRY_WET = "sum_dry_wet_to_nc.py";
public static final String CONVERT_CONC_TO_DOSE = "convert_conc_to_dose.py";
public static final String DEP_VNAMES = "ASIJ";
public static final String WRF_VNAMES = "XLAT,XLONG,HGT,U,V,W";
public static final String CONC_VNAMES = "CO,ASIJ";
public static final String METCR03D_VNAMES = "TA,PRES";
public static final String EMIS_VNAMES = "CO,PSI";
public static final String WRF_PATTERN = "wrfout_d03_{YYYY-MM-DD}_00_00_00";
public static final String CONC_PATTERN = "CCTM.CONC.d03.{YYYYMMDD";
public static final String METCR03D_PATTERN = "METCR03D_d03_{YYYYMMDD}";
public static final String EMIS_PATTERN = "emis_{YYYYMMDD}";
public static final String DEP_OUT_PREFIX = "out_dep_";
public static final String WRF_OUT_PREFIX = "out_wrf_";
public static final String CONC_OUT_PREFIX = "out_conc_";
public static final String METCR03D_OUT_PREFIX = "out_metcr03d_";
public static final String EMIS_OUT_PREFIX = "out_emis_";
public static final String DOSE_OUT_PREFIX = "out_dose_";
} }

View File

@ -1,29 +0,0 @@
package org.jeecg.common.properties;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
@Data
@Component
@ConfigurationProperties(prefix = "diffusion")
public class DiffusionProperties {
/**
* 扩散环境目录
*/
private String diffusionPath;
/**
* wrf数据目录
*/
private String wrfPath;
/**
* cmaq数据目录
*/
private String cmaqPath;
}

View File

@ -0,0 +1,91 @@
package org.jeecg.common.util;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.StopWatch;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@Slf4j
public class ExecutePyUtils {
private static final int PROCESS_TIMEOUT_SECONDS = 3600; // 30分钟超时
/**
* 执行Python进程
*/
public static void executePythonProcess(ProcessBuilder processBuilder, String processDescription) {
StopWatch processWatch = new StopWatch();
processWatch.start();
Process process = null;
try {
process = processBuilder.start();
// 异步处理输出流
CompletableFuture<Void> outputFuture = readStreamAsync(process.getInputStream(), "OUTPUT");
CompletableFuture<Void> errorFuture = readStreamAsync(process.getErrorStream(), "ERROR");
// 等待进程完成带超时
boolean finished = process.waitFor(PROCESS_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (!finished) {
process.destroyForcibly();
throw new RuntimeException(processDescription + "脚本执行超时");
}
// 等待输出处理完成
CompletableFuture.allOf(outputFuture, errorFuture)
.get(10, TimeUnit.SECONDS);
int exitCode = process.exitValue();
processWatch.stop();
if (exitCode == 0) {
log.info("{}脚本执行完成,耗时: {} 毫秒",
processDescription, processWatch.getTotalTimeMillis());
} else {
throw new RuntimeException(processDescription +
"脚本执行失败,退出码: " + exitCode);
}
} catch (TimeoutException e) {
throw new RuntimeException(processDescription + "脚本执行超时", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(processDescription + "脚本执行被中断", e);
} catch (Exception e) {
throw new RuntimeException(processDescription + "脚本执行失败", e);
} finally {
if (process != null) {
process.destroy();
}
}
}
/**
* 异步读取流
*/
private static CompletableFuture<Void> readStreamAsync(InputStream inputStream, String type) {
return CompletableFuture.runAsync(() -> {
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
if ("ERROR".equals(type)) {
log.error("[Python {}] {}", type, line);
} else {
log.info("[Python {}] {}", type, line);
}
}
} catch (IOException e) {
log.error("处理Python{}流失败", type, e);
}
});
}
}

View File

@ -155,7 +155,16 @@ public final class NcUtil {
private static List<List<Double>> process2DData(Array data, Index index, int[] shape, String name, private static List<List<Double>> process2DData(Array data, Index index, int[] shape, String name,
Integer layer, Integer hour) { Integer layer, Integer hour) {
List<List<Double>> resultAll; List<List<Double>> resultAll;
if (shape.length == 3) { if (shape.length == 2) {
resultAll = new ArrayList<>(shape[0]);
for (int j = 0; j < shape[0]; j++) {
List<Double> strings = new ArrayList<>(shape[1]);
for (int k = 0; k < shape[1]; k++) {
strings.add(processValue(data.getDouble(index.set(j, k)), name));
}
resultAll.add(strings);
}
}else if (shape.length == 3) {
resultAll = new ArrayList<>(shape[1]); resultAll = new ArrayList<>(shape[1]);
for (int j = 0; j < shape[1]; j++) { for (int j = 0; j < shape[1]; j++) {
List<Double> strings = new ArrayList<>(shape[2]); List<Double> strings = new ArrayList<>(shape[2]);

View File

@ -2,7 +2,6 @@ package org.jeecg.baseAPI.service.impl;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import org.jeecg.baseAPI.service.BaseAPIService; import org.jeecg.baseAPI.service.BaseAPIService;
import org.jeecg.common.properties.DiffusionProperties;
import org.jeecg.common.properties.EventServerProperties; import org.jeecg.common.properties.EventServerProperties;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;

View File

@ -0,0 +1,31 @@
package org.jeecg.diffusion.controller;
import io.swagger.v3.oas.annotations.Operation;
import lombok.RequiredArgsConstructor;
import org.jeecg.common.api.vo.Result;
import org.jeecg.common.aspect.annotation.AutoLog;
import org.jeecg.diffusion.service.DiffusionDataService;
import org.jeecg.diffusion.service.DoseDataService;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@Validated
@RestController
@RequestMapping("doseData")
@RequiredArgsConstructor
public class DoseDataController {
private final DoseDataService doseDataService;
@AutoLog(value = "查询剂量数据")
@Operation(summary = "查询剂量数据")
@GetMapping("getDoseResult")
public Result<?> getDoseResult(String enginId, int hour, int layer){
return Result.ok(doseDataService.getDoseResult(enginId, hour, layer));
}
}

View File

@ -0,0 +1,9 @@
package org.jeecg.diffusion.service;
import org.jeecg.diffusion.vo.DoseResultVO;
public interface DoseDataService {
DoseResultVO getDoseResult(String enginId, int hour, int layer);
}

View File

@ -1,35 +1,25 @@
package org.jeecg.diffusion.service.impl; package org.jeecg.diffusion.service.impl;
import cn.hutool.core.date.DateUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.jeecg.baseAPI.service.BaseAPIService; import org.jeecg.baseAPI.service.BaseAPIService;
import org.jeecg.baseAPI.utils.DateTimeUtils; import org.jeecg.baseAPI.utils.DateTimeUtils;
import org.jeecg.common.api.vo.Result;
import org.jeecg.common.aspect.annotation.AutoLog;
import org.jeecg.common.constant.DiffusionPrefixConstants; import org.jeecg.common.constant.DiffusionPrefixConstants;
import org.jeecg.common.constant.EventConstants;
import org.jeecg.common.exception.JeecgBootException; import org.jeecg.common.exception.JeecgBootException;
import org.jeecg.common.properties.DiffusionProperties;
import org.jeecg.common.properties.EventServerProperties; import org.jeecg.common.properties.EventServerProperties;
import org.jeecg.common.properties.SystemStorageProperties;
import org.jeecg.common.util.DateUtils; import org.jeecg.common.util.DateUtils;
import org.jeecg.common.util.NcUtil; import org.jeecg.common.util.NcUtil;
import org.jeecg.diffusion.service.DiffusionDataService; import org.jeecg.diffusion.service.DiffusionDataService;
import org.jeecg.diffusion.vo.DiffusionResultVO; import org.jeecg.diffusion.vo.DiffusionResultVO;
import org.jeecg.modules.base.entity.Cmaq;
import org.jeecg.modules.base.entity.Engineering; import org.jeecg.modules.base.entity.Engineering;
import org.jeecg.modules.base.entity.Wrf; import org.jeecg.modules.base.entity.Wrf;
import org.jeecg.modules.base.mapper.CmaqMapper; import org.jeecg.modules.base.mapper.CmaqMapper;
import org.jeecg.modules.base.mapper.EngineeringMapper; import org.jeecg.modules.base.mapper.EngineeringMapper;
import org.jeecg.modules.base.mapper.WrfMapper; import org.jeecg.modules.base.mapper.WrfMapper;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import ucar.nc2.NetcdfFile; import ucar.nc2.NetcdfFile;
import ucar.nc2.dataset.NetcdfDataset;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -48,42 +38,22 @@ public class DiffusionDataServiceImpl implements DiffusionDataService {
private final EventServerProperties eventServerProperties; private final EventServerProperties eventServerProperties;
private final BaseAPIService baseAPIService; private final BaseAPIService baseAPIService;
private final WrfMapper wrfMapper; private final WrfMapper wrfMapper;
private final CmaqMapper cmaqMapper;
private final EngineeringMapper engineeringMapper; private final EngineeringMapper engineeringMapper;
@Override @Override
public DiffusionResultVO getDiffusionResult(String enginId, int hour, int layer) { public DiffusionResultVO getDiffusionResult(String enginId, int hour, int layer) {
Engineering engineering = engineeringMapper.selectById(enginId); Engineering engineering = engineeringMapper.selectById(enginId);
Wrf wrf = wrfMapper.selectOne(new LambdaQueryWrapper<Wrf>().eq(Wrf::getEnginId, enginId)); Wrf wrf = wrfMapper.selectOne(new LambdaQueryWrapper<Wrf>().eq(Wrf::getEnginId, enginId));
String wrfFilePath = baseAPIService.buildEngineeringFilePath(eventServerProperties.getResultFilePrefix(), engineering.getCreateBy(), engineering.getEngineeringName()); String resultFilePath = baseAPIService.buildEngineeringFilePath(eventServerProperties.getResultFilePrefix(), engineering.getCreateBy(), engineering.getEngineeringName());
String cmaqFilePath = baseAPIService.buildEngineeringFilePath(eventServerProperties.getResultFilePrefix(), engineering.getCreateBy(), engineering.getEngineeringName());
// 处理hour大于等于24的情况 String startTime = wrf.getStartTime().substring(0, 10);
int actualHour = hour % 24; try (NetcdfFile wrfNcFile = getWrfNetcdfFile(resultFilePath, startTime);
int daysToAdd = hour / 24; NetcdfFile cmaqNcFile = getCmaqNetcdfFile(resultFilePath, startTime)) {
Date startTimeDate = null;
try {
startTimeDate = DateUtils.parseDate(wrf.getStartTime(),"yyyy-MM-dd HH:mm:ss");
} catch (ParseException e) {
throw new RuntimeException(e);
}
// 如果hour大于等于24需要调整日期
if (daysToAdd > 0) {
Calendar calendar = Calendar.getInstance();
calendar.setTime(startTimeDate);
calendar.add(Calendar.DAY_OF_MONTH, daysToAdd);
startTimeDate = calendar.getTime();
}
String startTime = DateUtils.formatDate(startTimeDate, "yyyy-MM-dd HH:mm:ss");
try (NetcdfFile wrfNcFile = getWrfNetcdfFile(wrfFilePath, startTime);
NetcdfFile cmaqNcFile = getCmaqNetcdfFile(cmaqFilePath, startTime)) {
DiffusionResultVO diffusionResultVO = new DiffusionResultVO(); DiffusionResultVO diffusionResultVO = new DiffusionResultVO();
List<List<Double>> values = NcUtil.get2DNCByName(cmaqNcFile, "CO", layer, actualHour); List<List<Double>> values = NcUtil.get2DNCByName(cmaqNcFile, "CO", layer, hour);
List<List<Double>> xlats = NcUtil.get2DNCByName(wrfNcFile, "XLAT", layer, actualHour); List<List<Double>> xlats = NcUtil.get2DNCByName(wrfNcFile, "XLAT", layer, hour);
List<List<Double>> xlons = NcUtil.get2DNCByName(wrfNcFile, "XLONG", layer, actualHour); List<List<Double>> xlons = NcUtil.get2DNCByName(wrfNcFile, "XLONG", layer, hour);
getDataInfo(xlats, xlons, values, 1, diffusionResultVO); getDataInfo(xlats, xlons, values, 1, diffusionResultVO);
diffusionResultVO.setSn(values.size()); diffusionResultVO.setSn(values.size());
diffusionResultVO.setWe(values.get(0).size()); diffusionResultVO.setWe(values.get(0).size());
@ -95,8 +65,7 @@ public class DiffusionDataServiceImpl implements DiffusionDataService {
} }
public NetcdfFile getWrfNetcdfFile(String localFilePath, String startTime){ public NetcdfFile getWrfNetcdfFile(String localFilePath, String startTime){
String newStartTime = DateTimeUtils.standardToFileSafe(startTime); String wrfNcPath = localFilePath + EventConstants.WRF_OUT_PREFIX + startTime;
String wrfNcPath = localFilePath + DiffusionPrefixConstants.WRF_PREFIX + newStartTime;
File file = new File(wrfNcPath); File file = new File(wrfNcPath);
if(!file.exists()){ if(!file.exists()){
throw new JeecgBootException("WRF文件不存在文件地址" + wrfNcPath); throw new JeecgBootException("WRF文件不存在文件地址" + wrfNcPath);
@ -111,8 +80,7 @@ public class DiffusionDataServiceImpl implements DiffusionDataService {
} }
public NetcdfFile getCmaqNetcdfFile(String localFilePath, String startTime){ public NetcdfFile getCmaqNetcdfFile(String localFilePath, String startTime){
String newStartTime = DateTimeUtils.standardToCompactDate(startTime); String cmaqNcPath = localFilePath + EventConstants.CONC_OUT_PREFIX + startTime;
String cmaqNcPath = localFilePath + DiffusionPrefixConstants.CMAQ_PREFIX + newStartTime;
File file = new File(cmaqNcPath); File file = new File(cmaqNcPath);
if(!file.exists()){ if(!file.exists()){
throw new JeecgBootException("CMAQ文件不存在文件地址" + cmaqNcPath); throw new JeecgBootException("CMAQ文件不存在文件地址" + cmaqNcPath);

View File

@ -0,0 +1,146 @@
package org.jeecg.diffusion.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.baseAPI.service.BaseAPIService;
import org.jeecg.baseAPI.utils.DateTimeUtils;
import org.jeecg.common.constant.DiffusionPrefixConstants;
import org.jeecg.common.constant.EventConstants;
import org.jeecg.common.exception.JeecgBootException;
import org.jeecg.common.properties.EventServerProperties;
import org.jeecg.common.util.DateUtils;
import org.jeecg.common.util.NcUtil;
import org.jeecg.diffusion.service.DiffusionDataService;
import org.jeecg.diffusion.service.DoseDataService;
import org.jeecg.diffusion.vo.DiffusionResultVO;
import org.jeecg.diffusion.vo.DoseResultVO;
import org.jeecg.modules.base.entity.Engineering;
import org.jeecg.modules.base.entity.Wrf;
import org.jeecg.modules.base.mapper.CmaqMapper;
import org.jeecg.modules.base.mapper.EngineeringMapper;
import org.jeecg.modules.base.mapper.WrfMapper;
import org.springframework.stereotype.Service;
import ucar.nc2.NetcdfFile;
import java.io.File;
import java.io.IOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
@Slf4j
@Service
@RequiredArgsConstructor
public class DoseDataServiceImpl implements DoseDataService {
private final EventServerProperties eventServerProperties;
private final BaseAPIService baseAPIService;
private final WrfMapper wrfMapper;
private final EngineeringMapper engineeringMapper;
@Override
public DoseResultVO getDoseResult(String enginId, int hour, int layer) {
Engineering engineering = engineeringMapper.selectById(enginId);
Wrf wrf = wrfMapper.selectOne(new LambdaQueryWrapper<Wrf>().eq(Wrf::getEnginId, enginId));
String resultFilePath = baseAPIService.buildEngineeringFilePath(eventServerProperties.getResultFilePrefix(), engineering.getCreateBy(), engineering.getEngineeringName());
String startTime = wrf.getStartTime().substring(0, 10);
try (NetcdfFile doseNcFile = getDoseNetcdfFile(resultFilePath, startTime)) {
DoseResultVO doseResultVO = new DoseResultVO();
List<List<Double>> values = NcUtil.get2DNCByName(doseNcFile, "total_acc", layer, hour);
List<List<Double>> xlats = NcUtil.get2DNCByName(doseNcFile, "lat", layer, hour);
List<List<Double>> xlons = NcUtil.get2DNCByName(doseNcFile, "lon", layer, hour);
getDataInfo(xlats, xlons, values, doseResultVO);
doseResultVO.setSn(values.size());
doseResultVO.setWe(values.get(0).size());
return doseResultVO;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public NetcdfFile getDoseNetcdfFile(String localFilePath, String startTime){
String ncPath = localFilePath + EventConstants.DOSE_OUT_PREFIX + startTime;
File file = new File(ncPath);
if(!file.exists()){
throw new JeecgBootException("Dose文件不存在文件地址" + ncPath);
}
try{
return NetcdfFile.open(ncPath);
}catch (IOException e) {
log.error("WRF文件数据处理失败", e);
throw new JeecgBootException("文件读取失败", e);
}
}
/**
* 封装数据
*
* @param xlats 纬度二维列表
* @param xlongs 经度二维列表
* @param valueHours 数值二维列表
* @return 处理后的数据列表
*/
public void getDataInfo(List<List<Double>> xlats,
List<List<Double>> xlongs,
List<List<Double>> valueHours,
DoseResultVO doseResultVO) {
if (xlats == null || xlongs == null || valueHours == null) {
throw new IllegalArgumentException("输入列表不能为null");
}
List<List<Double>> resultAll = new ArrayList<>();
Double max = -Double.MAX_VALUE;
Double min = Double.MAX_VALUE;
// 主循环处理
for (int i = 0; i < xlats.size(); i++) {
List<Double> xlongsRow = xlongs.get(i);
List<Double> xlatsRow = xlats.get(i);
List<Double> valuesRow = valueHours.get(i);
for (int j = 0; j < xlongsRow.size(); j++) {
double value = valuesRow.get(j);
double longitude = xlongsRow.get(j);
double latitude = xlatsRow.get(j);
// 更新最大值
if (value > max) {
max = value;
}
// 更新最小值
if (value < min) {
min = value;
}
// 创建结果条目
resultAll.add(createResultEntry(longitude, latitude, value));
}
}
doseResultVO.setDataList(resultAll);
doseResultVO.setMax(max);
doseResultVO.setMin(min);
}
/**
* 创建结果条目
*/
private List<Double> createResultEntry(double longitude, double latitude, double value) {
List<Double> entry = new ArrayList<>(3);
entry.add(longitude);
entry.add(latitude);
entry.add(value);
return entry;
}
}

View File

@ -0,0 +1,14 @@
package org.jeecg.diffusion.vo;
import lombok.Data;
import java.util.List;
@Data
public class DoseResultVO {
private List<List<Double>> dataList;
private double max;
private double min;
private int sn;
private int we;
}

View File

@ -0,0 +1,104 @@
package org.jeecg.runProcess.controller;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.util.ExecutePyUtils;
import java.util.Arrays;
@Slf4j
public class TestMain {
public static void main(String[] args) {
String[] cmd_dep = {
"python",
"D:\\hky_word\\Projectlibrary\\resultFile\\dose\\py\\sum_dry_wet_to_nc.py",
"ASIJ",
"D:\\hky_word\\Projectlibrary\\resultFile\\admin\\1113",
"D:\\hky_word\\Projectlibrary\\resultFile\\dose\\1113",
"out_dep_20181113",
"20181113",
"20181114"
};
ProcessBuilder depBuilder = new ProcessBuilder(cmd_dep);
ExecutePyUtils.executePythonProcess(depBuilder, "dep");
String[] cmd_wrf = {
"python",
"D:\\hky_word\\Projectlibrary\\resultFile\\dose\\py\\merge_results_to_nc.py",
"XLAT,XLONG,HGT,U,V,W",
"D:\\hky_word\\Projectlibrary\\resultFile\\admin\\1113",
"D:\\hky_word\\Projectlibrary\\resultFile\\dose\\1113",
"out_wrf_20181113",
"20181113",
"20181114",
"wrfout_d03_{YYYY-MM-DD}_00_00_00" // 使用占位符
};
ProcessBuilder wrfBuilder = new ProcessBuilder(cmd_wrf);
ExecutePyUtils.executePythonProcess(wrfBuilder, "wrf");
String[] cmd_cmaq = {
"python",
"D:\\hky_word\\Projectlibrary\\resultFile\\dose\\py\\merge_results_to_nc.py",
"CO,ASIJ",
"D:\\hky_word\\Projectlibrary\\resultFile\\admin\\1113",
"D:\\hky_word\\Projectlibrary\\resultFile\\dose\\1113",
"out_conc_20181113",
"20181113",
"20181114",
"CCTM.CONC.d03.{YYYYMMDD}" // 使用占位符
};
ProcessBuilder concBuilder = new ProcessBuilder(cmd_cmaq);
ExecutePyUtils.executePythonProcess(concBuilder, "conc");
String[] cmd_metcr03d = {
"python",
"D:\\hky_word\\Projectlibrary\\resultFile\\dose\\py\\merge_results_to_nc.py",
"TA,PRES",
"D:\\hky_word\\Projectlibrary\\resultFile\\admin\\1113",
"D:\\hky_word\\Projectlibrary\\resultFile\\dose\\1113",
"out_metcr03d_20181113",
"20181113",
"20181114",
"METCRO3D_d03_{YYYYMMDD}" // 使用占位符
};
ProcessBuilder metcr03dBuilder = new ProcessBuilder(cmd_metcr03d);
ExecutePyUtils.executePythonProcess(metcr03dBuilder, "metcr03d");
String[] cmd_emis = {
"python",
"D:\\hky_word\\Projectlibrary\\resultFile\\dose\\py\\merge_results_to_nc.py",
"CO,PSI",
"D:\\hky_word\\Projectlibrary\\resultFile\\admin\\1113",
"D:\\hky_word\\Projectlibrary\\resultFile\\dose\\1113",
"out_emis_20181113",
"20181113",
"20181114",
"emis_{YYYYMMDD}" // 使用占位符
};
ProcessBuilder emisBuilder = new ProcessBuilder(cmd_emis);
ExecutePyUtils.executePythonProcess(emisBuilder, "emis");
String[] cmd_dose = {
"python",
"D:\\hky_word\\Projectlibrary\\resultFile\\dose\\py\\convert_conc_to_dose.py",
"20181113",
"20181114",
"D:\\hky_word\\Projectlibrary\\resultFile\\dose\\1113",
"D:\\hky_word\\Projectlibrary\\resultFile\\dose\\1113",
"D:\\hky_word\\Projectlibrary\\resultFile\\dose\\data_48.xlsx",
"out_dep_20181113",
"out_wrf_20181113",
"out_conc_20181113",
"out_metcr03d_20181113",
"out_emis_20181113",
"out_dose_20181113",
};
ProcessBuilder doseBuilder = new ProcessBuilder(cmd_dose);
ExecutePyUtils.executePythonProcess(doseBuilder, "emis");
}
}

View File

@ -5,7 +5,9 @@ import lombok.RequiredArgsConstructor;
import org.jeecg.baseAPI.service.BaseAPIService; import org.jeecg.baseAPI.service.BaseAPIService;
import org.jeecg.cmaq.service.CmaqService; import org.jeecg.cmaq.service.CmaqService;
import org.jeecg.common.api.vo.Result; import org.jeecg.common.api.vo.Result;
import org.jeecg.common.constant.EventConstants;
import org.jeecg.common.properties.EventServerProperties; import org.jeecg.common.properties.EventServerProperties;
import org.jeecg.common.util.ExecutePyUtils;
import org.jeecg.engineering.service.EngineeringService; import org.jeecg.engineering.service.EngineeringService;
import org.jeecg.modules.base.entity.Engineering; import org.jeecg.modules.base.entity.Engineering;
import org.jeecg.modules.base.entity.Wrf; import org.jeecg.modules.base.entity.Wrf;
@ -49,7 +51,7 @@ public class RunProcessServiceImpl implements RunProcessService {
//生成WRF脚本 //生成WRF脚本
genWrfRunShell(allRunPath, scriptsPath, wrf); genWrfRunShell(allRunPath, scriptsPath, wrf);
//运行WRF程序 //运行WRF程序
Result<String> wrfResult = wrfService.runAllWrf(engineeringId,scriptsPath, workDirPath, resultFilePath); Result<String> wrfResult = wrfService.runAllWrf(engineeringId,scriptsPath, workDirPath);
if(!wrfResult.isSuccess()){ if(!wrfResult.isSuccess()){
return wrfResult; return wrfResult;
} }
@ -61,37 +63,31 @@ public class RunProcessServiceImpl implements RunProcessService {
// return cmaqResult; // return cmaqResult;
// } // }
String sDate = wrf.getStartTime().substring(0, 10);
String eDate = wrf.getEndTime().substring(0, 10);
// //下载wrf文件 exeDepPy(sDate, eDate, EventConstants.DEP_VNAMES,
// for (Integer i = 0; i <= sumDay; i++) { workDirPath + EventConstants.CONC_DIR, resultFilePath,
// String newStartTime = DateUtil.format(new Date(startTime.getTime() + oneDaySecs * i), format); EventConstants.DEP_OUT_PREFIX, "相加DEP结果数据");
// String ncName = "wrfout_d03_" + newStartTime;
// exePyScript(sDate, eDate, EventConstants.WRF_VNAMES,
// String targetName = "wrfout_d03_" + DateUtil.format(new Date(startTime.getTime() + oneDaySecs * i), "yyyy-MM-dd_HH_mm_ss"); workDirPath + EventConstants.WRF_DIR, resultFilePath,
// sftpAPIService.download(allRunPath + "workdir/WRF/",ncName,resultFilePath + targetName); EventConstants.WRF_OUT_PREFIX, EventConstants.WRF_PATTERN, "合并WRF结果数据");
// }
// exePyScript(sDate, eDate, EventConstants.CONC_VNAMES,
// //下载fnl文件 workDirPath + EventConstants.CONC_DIR, resultFilePath,
// String fnlStartTime = DateUtil.format(DateUtil.parse(wrf.getStartTime(), "yyyy-MM-dd_HH:mm:ss"), "yyyyMMdd_HH"); EventConstants.CONC_OUT_PREFIX, EventConstants.CONC_PATTERN, "合并CONC结果数据");
// DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd_HH");
// LocalDateTime dateTime = LocalDateTime.parse(fnlStartTime, formatter); exePyScript(sDate, eDate, EventConstants.METCR03D_VNAMES,
// for (int t = 0; t < sumDay * 24; t= t+6) { workDirPath + EventConstants.METCR03D_DIR, resultFilePath,
// // 增加t小时 EventConstants.METCR03D_OUT_PREFIX, EventConstants.METCR03D_PATTERN, "合并METCR03D结果数据");
// LocalDateTime newDateTime = dateTime.plusHours(t);
// String fnlFileName = "fnl_"+ newDateTime.format(formatter) +"_00.grib2"; exePyScript(sDate, eDate, EventConstants.EMIS_VNAMES,
// sftpAPIService.download(eventServerProperties.getWeatherDataPath() ,fnlFileName,resultFilePath + fnlFileName); workDirPath + EventConstants.EMIS_DIR, resultFilePath,
// } EventConstants.EMIS_OUT_PREFIX, EventConstants.EMIS_PATTERN, "合并EMIS结果数据");
exeDosePy(sDate, eDate, resultFilePath, resultFilePath, scriptsPath,"合并EMIS结果数据");
//下载cmaq文件
// for (Integer i = 1; i <= sumDay; i++) {
// String dataString = DateUtil.format(new Date(startTimeSecs + oneDaySecs * i), yyyymdFormat);
// String ncName = "CCTM.CONC.d03."+ dataString;
// String metcr03dName = "METCRO3D_d03_"+ dataString;
// String emisName = "emis_"+ dataString;
// sftpAPIService.download(allRunPath + "workdir/CCTM/",ncName,resultFilePath + ncName);
// sftpAPIService.download(mcipPath,metcr03dName,resultFilePath + metcr03dName);
// sftpAPIService.download(allRunPath + "data/emis/d03/",emisName,resultFilePath + emisName);
// }
}catch (Exception e){ }catch (Exception e){
e.printStackTrace(); e.printStackTrace();
return Result.error(e.getMessage()); return Result.error(e.getMessage());
@ -99,6 +95,9 @@ public class RunProcessServiceImpl implements RunProcessService {
return Result.OK("运行成功!"); return Result.OK("运行成功!");
} }
// private void executePythonScript(String sDate, String eDate, String vNames, String inputPath,
// String outputPath, String outFileName, String pattern, String desc) {
public Result<String> genWrfRunShell(String allRunPath, String scriptsPath, Wrf wrf) { public Result<String> genWrfRunShell(String allRunPath, String scriptsPath, Wrf wrf) {
try { try {
wrfService.genWpsShell(scriptsPath, "run_wps.sh", wrf); wrfService.genWpsShell(scriptsPath, "run_wps.sh", wrf);
@ -160,4 +159,69 @@ public class RunProcessServiceImpl implements RunProcessService {
// return fileName; // return fileName;
// } // }
/**
* 执行Python脚本
*/
private void exePyScript(String sDate, String eDate, String vNames, String inputPath,
String outputPath, String outFileName, String pattern, String desc) {
String[] command = {
"python",
eventServerProperties.getBaseHome() + File.separator +
eventServerProperties.getPythonPath() + File.separator + EventConstants.MERGE_RESULTS,
vNames,
eventServerProperties.getBaseHome() + File.separator + inputPath,
outputPath,
outFileName,
sDate,
eDate,
pattern // 使用占位符
};
ProcessBuilder processBuilder = new ProcessBuilder(command);
ExecutePyUtils.executePythonProcess(processBuilder, desc);
}
/**
* 执行沉降Python脚本
*/
private void exeDepPy(String sDate, String eDate, String vNames, String inputPath,
String outputPath, String outFileName, String desc) {
String[] command = {
"python",
eventServerProperties.getBaseHome() + File.separator +
eventServerProperties.getPythonPath() + File.separator + EventConstants.SUM_DRY_WET,
vNames,
eventServerProperties.getBaseHome() + File.separator + inputPath,
outputPath,
outFileName,
sDate,
eDate
};
ProcessBuilder processBuilder = new ProcessBuilder(command);
ExecutePyUtils.executePythonProcess(processBuilder, desc);
}
/**
* 执行浓度转剂量Python脚本
*/
private void exeDosePy(String sDate, String eDate, String inputPath, String outputPath, String gkPath, String desc) {
String[] command = {
"python",
eventServerProperties.getBaseHome() + File.separator +
eventServerProperties.getPythonPath() + File.separator + EventConstants.CONVERT_CONC_TO_DOSE,
sDate,
eDate,
eventServerProperties.getBaseHome() + File.separator + inputPath,
eventServerProperties.getBaseHome() + File.separator + outputPath,
gkPath + File.separator + EventConstants.GK_NAME,
EventConstants.DEP_OUT_PREFIX + sDate,
EventConstants.WRF_OUT_PREFIX + sDate,
EventConstants.CONC_OUT_PREFIX + sDate,
EventConstants.METCR03D_OUT_PREFIX + sDate,
EventConstants.EMIS_OUT_PREFIX + sDate,
EventConstants.DOSE_OUT_PREFIX + sDate
};
ProcessBuilder processBuilder = new ProcessBuilder(command);
ExecutePyUtils.executePythonProcess(processBuilder, desc);
}
} }

View File

@ -17,6 +17,6 @@ public interface WrfService extends IService<Wrf> {
Result<String> updateWrfInfo(RunProcessParamVO paramVO); Result<String> updateWrfInfo(RunProcessParamVO paramVO);
String genWpsShell(String scriptsPath, String fileName, Wrf wrf) throws IOException; String genWpsShell(String scriptsPath, String fileName, Wrf wrf) throws IOException;
Result<String> runAllWrf(String engId, String scriptsPath, String resultFilePath, String localFilePath) throws IOException; Result<String> runAllWrf(String engId, String scriptsPath, String resultFilePath) throws IOException;
} }

View File

@ -115,7 +115,7 @@ public class WrfServiceImpl extends ServiceImpl<WrfMapper, Wrf> implements WrfSe
} }
@Override @Override
public Result<String> runAllWrf(String engId, String scriptsPath, String resultFilePath, String localFilePath) throws IOException { public Result<String> runAllWrf(String engId, String scriptsPath, String resultFilePath) throws IOException {
String cdShRunPath = "cd " + scriptsPath + ";"; String cdShRunPath = "cd " + scriptsPath + ";";
boolean wpsSuccess = runProjectWps(cdShRunPath + "chmod +x run_project_wps.sh;./run_project_wps.sh", resultFilePath); boolean wpsSuccess = runProjectWps(cdShRunPath + "chmod +x run_project_wps.sh;./run_project_wps.sh", resultFilePath);
if (!wpsSuccess){ if (!wpsSuccess){

View File

@ -8,6 +8,7 @@ import org.jeecg.common.constant.enums.T1hFilePrefixEnum;
import org.jeecg.common.constant.enums.WeatherDataSourceEnum; import org.jeecg.common.constant.enums.WeatherDataSourceEnum;
import org.jeecg.common.properties.SystemStorageProperties; import org.jeecg.common.properties.SystemStorageProperties;
import org.jeecg.common.properties.T1hDownloadProperties; import org.jeecg.common.properties.T1hDownloadProperties;
import org.jeecg.common.util.ExecutePyUtils;
import org.jeecg.modules.base.entity.WeatherData; import org.jeecg.modules.base.entity.WeatherData;
import org.jeecg.service.WeatherDataService; import org.jeecg.service.WeatherDataService;
import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.annotation.Scheduled;
@ -15,7 +16,6 @@ import org.springframework.util.StopWatch;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import java.io.*; import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
@ -25,9 +25,6 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Date; import java.util.Date;
import java.util.List; import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import java.util.stream.Stream; import java.util.stream.Stream;
@ -98,7 +95,7 @@ public class DownloadT1hJob {
"--base_date", baseTime "--base_date", baseTime
); );
executePythonProcess(processBuilder, "文件合并"); ExecutePyUtils.executePythonProcess(processBuilder, "文件合并");
} }
private void saveWeatherData(){ private void saveWeatherData(){
@ -130,7 +127,7 @@ public class DownloadT1hJob {
log.info("执行Python脚本: {}", Arrays.toString(command)); log.info("执行Python脚本: {}", Arrays.toString(command));
ProcessBuilder processBuilder = new ProcessBuilder(command); ProcessBuilder processBuilder = new ProcessBuilder(command);
executePythonProcess(processBuilder, ExecutePyUtils.executePythonProcess(processBuilder,
String.format("元素%s预报时长%s下载", element, forecastHours)); String.format("元素%s预报时长%s下载", element, forecastHours));
} }
@ -150,79 +147,6 @@ public class DownloadT1hJob {
}; };
} }
/**
* 执行Python进程
*/
private void executePythonProcess(ProcessBuilder processBuilder, String processDescription) {
StopWatch processWatch = new StopWatch();
processWatch.start();
Process process = null;
try {
process = processBuilder.start();
// 异步处理输出流
CompletableFuture<Void> outputFuture = readStreamAsync(process.getInputStream(), "OUTPUT");
CompletableFuture<Void> errorFuture = readStreamAsync(process.getErrorStream(), "ERROR");
// 等待进程完成带超时
boolean finished = process.waitFor(PROCESS_TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (!finished) {
process.destroyForcibly();
throw new RuntimeException(processDescription + "脚本执行超时");
}
// 等待输出处理完成
CompletableFuture.allOf(outputFuture, errorFuture)
.get(10, TimeUnit.SECONDS);
int exitCode = process.exitValue();
processWatch.stop();
if (exitCode == 0) {
log.info("{}脚本执行完成,耗时: {} 毫秒",
processDescription, processWatch.getTotalTimeMillis());
} else {
throw new RuntimeException(processDescription +
"脚本执行失败,退出码: " + exitCode);
}
} catch (TimeoutException e) {
throw new RuntimeException(processDescription + "脚本执行超时", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(processDescription + "脚本执行被中断", e);
} catch (Exception e) {
throw new RuntimeException(processDescription + "脚本执行失败", e);
} finally {
if (process != null) {
process.destroy();
}
}
}
/**
* 异步读取流
*/
private CompletableFuture<Void> readStreamAsync(InputStream inputStream, String type) {
return CompletableFuture.runAsync(() -> {
try (BufferedReader reader = new BufferedReader(
new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
String line;
while ((line = reader.readLine()) != null) {
if ("ERROR".equals(type)) {
log.error("[Python {}] {}", type, line);
} else {
log.info("[Python {}] {}", type, line);
}
}
} catch (IOException e) {
log.error("处理Python{}流失败", type, e);
}
});
}
/** /**
* 生成预报时间列表 * 生成预报时间列表
*/ */