下载、合并、记录、显示T1H数据
This commit is contained in:
		
							parent
							
								
									c4b49cfd2b
								
							
						
					
					
						commit
						b54df8b5f8
					
				|  | @ -5,4 +5,5 @@ public class WeatherPrefixConstants { | |||
|     public static final String PANGU_PREFIX = "panguweather_"; | ||||
|     public static final String CRA40_PREFIX = "CRA40_"; | ||||
|     public static final String NCEP_PREFIX = "cdas1"; | ||||
|     public static final String T1H_PREFIX = "T1H_"; | ||||
| } | ||||
|  |  | |||
|  | @ -0,0 +1,51 @@ | |||
| package org.jeecg.common.constant.enums; | ||||
| 
 | ||||
| import java.util.Arrays; | ||||
| import java.util.List; | ||||
| import java.util.stream.Collectors; | ||||
| 
 | ||||
| /** | ||||
|  * 文件类型说明枚举 | ||||
|  */ | ||||
| public enum T1hFilePrefixEnum { | ||||
| 
 | ||||
| 
 | ||||
|     T2MZ(0, "t2mz"),     //2米气温 | ||||
|     PSZ(1, "psz"),      //地表气压 | ||||
|     RH2M(2, "rh2m"),     //2米相对湿度 | ||||
|     VGRD10M(3, "VGRD10m"),  //10米经向风 | ||||
|     UGRD10M(4, "UGRD10m");  //10米纬向风 | ||||
| 
 | ||||
|     private Integer key; | ||||
| 
 | ||||
|     private String value; | ||||
| 
 | ||||
|     T1hFilePrefixEnum(Integer key, String value) { | ||||
|         this.key = key; | ||||
|         this.value = value; | ||||
|     } | ||||
| 
 | ||||
|     public Integer getKey(){ | ||||
|         return this.key; | ||||
|     } | ||||
| 
 | ||||
|     public String getValue(){ | ||||
|         return this.value; | ||||
|     } | ||||
| 
 | ||||
|     public static String getValueByKey(int key) { | ||||
|         for (T1hFilePrefixEnum prefix : T1hFilePrefixEnum.values()) { | ||||
|             if (prefix.getKey() == key) { | ||||
|                 return prefix.getValue(); | ||||
|             } | ||||
|         } | ||||
|         return null; | ||||
|     } | ||||
| 
 | ||||
|     public static List<String> getAllValues() { | ||||
|         return Arrays.stream(values()) | ||||
|                 .map(T1hFilePrefixEnum::getValue) | ||||
|                 .collect(Collectors.toList()); | ||||
|     } | ||||
| 
 | ||||
| } | ||||
|  | @ -21,7 +21,12 @@ public enum WeatherDataSourceEnum { | |||
|     /** | ||||
|      * NCEP | ||||
|      */ | ||||
|     NCEP(4,"NCEP"); | ||||
|     NCEP(4,"NCEP"), | ||||
| 
 | ||||
|     /** | ||||
|      * T1H | ||||
|      */ | ||||
|     T1H(5,"T1H"); | ||||
| 
 | ||||
|     private Integer key; | ||||
| 
 | ||||
|  |  | |||
|  | @ -6,7 +6,7 @@ package org.jeecg.common.constant.enums; | |||
| public enum WeatherFileSuffixEnum { | ||||
| 
 | ||||
| 
 | ||||
|     GRIB("grib"),GRIB2("grib2"),GRB2("grb2");; | ||||
|     GRIB("grib"),GRIB2("grib2"),GRB2("grb2"),NC("nc"); | ||||
| 
 | ||||
|     private String value; | ||||
| 
 | ||||
|  |  | |||
|  | @ -20,7 +20,12 @@ public enum WeatherVariableNameEnum { | |||
|     NCEP_P(4, 1, "Pressure_msl"), | ||||
|     NCEP_H(4, 2, "Relative_humidity_height_above_ground"), | ||||
|     NCEP_U(4, 3, "u-component_of_wind_height_above_ground"), | ||||
|     NCEP_V(4, 4, "v-component_of_wind_height_above_ground"); | ||||
|     NCEP_V(4, 4, "v-component_of_wind_height_above_ground"), | ||||
|     T1H_T(5, 0, "t2mz"), | ||||
|     T1H_P(5, 1, "psz"), | ||||
|     T1H_H(5, 2, "rh2m"), | ||||
|     T1H_U(5, 3, "UGRD10m"), | ||||
|     T1H_V(5, 4, "VGRD10m"); | ||||
| 
 | ||||
|     private Integer type; | ||||
| 
 | ||||
|  |  | |||
|  | @ -29,6 +29,11 @@ public class SystemStorageProperties { | |||
|      */ | ||||
|     private String ncep; | ||||
| 
 | ||||
|     /** | ||||
|      * T1H数据存储路径 | ||||
|      */ | ||||
|     private String t1h; | ||||
| 
 | ||||
|     /** | ||||
|      * graphcast模型预测数据存储路径 | ||||
|      */ | ||||
|  |  | |||
|  | @ -0,0 +1,42 @@ | |||
| package org.jeecg.common.properties; | ||||
| 
 | ||||
| import lombok.Data; | ||||
| import org.springframework.boot.context.properties.ConfigurationProperties; | ||||
| import org.springframework.stereotype.Component; | ||||
| 
 | ||||
| @Data | ||||
| @Component | ||||
| @ConfigurationProperties(prefix = "t1h-download") | ||||
| public class T1hDownloadProperties { | ||||
| 
 | ||||
|     /** | ||||
|      * 下载T1H的Py脚本路径 | ||||
|      */ | ||||
|     private String downloadT1hPy; | ||||
| 
 | ||||
|     /** | ||||
|      * 合并T1H的Py脚本路径 | ||||
|      */ | ||||
|     private String mergeT1hPy; | ||||
| 
 | ||||
|     /** | ||||
|      * 未合并T1H的文件目录 | ||||
|      */ | ||||
|     private String t1hPath; | ||||
| 
 | ||||
|     /** | ||||
|      * T1H下载Key | ||||
|      */ | ||||
|     private String t1hKey; | ||||
| 
 | ||||
|     /** | ||||
|      * TJ1HCN(中国区域) 或 TJ1GB(全球区域) | ||||
|      */ | ||||
|     private String dataType; | ||||
| 
 | ||||
|     /** | ||||
|      * 下载小时数 | ||||
|      */ | ||||
|     private String hour; | ||||
| 
 | ||||
| } | ||||
|  | @ -1,14 +1,22 @@ | |||
| package org.jeecg.controller; | ||||
| 
 | ||||
| import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; | ||||
| import com.baomidou.mybatisplus.core.metadata.IPage; | ||||
| import io.swagger.v3.oas.annotations.Operation; | ||||
| import jakarta.annotation.Resource; | ||||
| import jakarta.validation.constraints.NotBlank; | ||||
| import lombok.RequiredArgsConstructor; | ||||
| import lombok.extern.slf4j.Slf4j; | ||||
| import org.apache.commons.codec.digest.DigestUtils; | ||||
| import org.jeecg.common.api.vo.Result; | ||||
| import org.jeecg.common.aspect.annotation.AutoLog; | ||||
| import org.jeecg.common.constant.enums.T1hFilePrefixEnum; | ||||
| import org.jeecg.common.constant.enums.WeatherDataSourceEnum; | ||||
| import org.jeecg.common.constant.enums.WeatherFileSuffixEnum; | ||||
| import org.jeecg.common.properties.SystemStorageProperties; | ||||
| import org.jeecg.common.properties.T1hDownloadProperties; | ||||
| import org.jeecg.common.system.query.PageRequest; | ||||
| import org.jeecg.job.DownloadT1hJob; | ||||
| import org.jeecg.modules.base.entity.WeatherData; | ||||
| import org.jeecg.service.WeatherDataService; | ||||
| import org.jeecg.vo.FileExistVo; | ||||
|  | @ -16,21 +24,31 @@ import org.jeecg.vo.FileUploadResultVo; | |||
| import org.jeecg.vo.FileVo; | ||||
| import org.jeecg.vo.WeatherResultVO; | ||||
| import org.springframework.format.annotation.DateTimeFormat; | ||||
| import org.springframework.scheduling.annotation.Scheduled; | ||||
| import org.springframework.util.StopWatch; | ||||
| import org.springframework.validation.annotation.Validated; | ||||
| import org.springframework.web.bind.annotation.*; | ||||
| import ucar.ma2.Array; | ||||
| import ucar.nc2.NetcdfFile; | ||||
| import ucar.nc2.Variable; | ||||
| 
 | ||||
| import java.io.FileInputStream; | ||||
| import java.io.IOException; | ||||
| import java.io.InputStream; | ||||
| import java.io.*; | ||||
| import java.nio.charset.StandardCharsets; | ||||
| import java.nio.file.Files; | ||||
| import java.nio.file.Path; | ||||
| import java.nio.file.Paths; | ||||
| import java.time.LocalDate; | ||||
| import java.time.LocalDateTime; | ||||
| import java.util.HashMap; | ||||
| import java.util.List; | ||||
| import java.util.Map; | ||||
| import java.time.Year; | ||||
| import java.time.format.DateTimeFormatter; | ||||
| import java.util.*; | ||||
| import java.util.concurrent.CompletableFuture; | ||||
| import java.util.concurrent.TimeUnit; | ||||
| import java.util.concurrent.TimeoutException; | ||||
| import java.util.stream.Collectors; | ||||
| import java.util.stream.Stream; | ||||
| 
 | ||||
| @Slf4j | ||||
| @Validated | ||||
| @RestController | ||||
| @RequestMapping("weatherData") | ||||
|  | @ -39,6 +57,9 @@ public class WeatherDataController { | |||
| 
 | ||||
|     private final WeatherDataService weatherDataService; | ||||
| 
 | ||||
|     @Resource | ||||
|     private DownloadT1hJob downloadT1hJob; | ||||
| 
 | ||||
| 
 | ||||
|     @AutoLog(value = "分页查询气象文件数据") | ||||
|     @Operation(summary = "分页查询气象文件数据") | ||||
|  | @ -117,42 +138,8 @@ public class WeatherDataController { | |||
|         return Result.OK(); | ||||
|     } | ||||
| 
 | ||||
|     public static String calculateMD5(String filePath) throws IOException { | ||||
|         try (InputStream is = new FileInputStream(filePath)) { | ||||
|             return DigestUtils.md5Hex(is); | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     public static void main(String[] args) { | ||||
|         //reftime_ISO | ||||
|         String filePath = "E:\\runtimeEnv\\fileSystem\\weather\\pangu1\\panguweather_2025073118.grib"; | ||||
|         try { | ||||
|             String md5 = calculateMD5(filePath); | ||||
|             System.out.println("MD5: " + md5); | ||||
|         } catch (IOException e) { | ||||
|             e.printStackTrace(); | ||||
|         } | ||||
| //        try (NetcdfFile ncFile = NetcdfFile.open(filePath2)) { | ||||
| //            Variable variable = ncFile.findVariable("reftime_ISO"); | ||||
| //            if (variable != null) { | ||||
| //                Array data = variable.read(); | ||||
| //                System.out.println(variable.getFullName()); | ||||
| //                System.out.println(data.getObject(0)); | ||||
| //            } | ||||
| //            int index = 0; | ||||
| //            for (Variable variable : ncFile.getVariables()) { | ||||
| //                if (variable != null) { | ||||
| //                    Array data = variable.read(); | ||||
| //                    System.out.println(variable.getFullName()); | ||||
| //                    System.out.println(data); | ||||
| //                    if (index == 7) { | ||||
| //                        break; | ||||
| //                    } | ||||
| //                    index++; | ||||
| //                } | ||||
| //            } | ||||
| //        }catch (Exception e){ | ||||
| // | ||||
| //        } | ||||
|     @GetMapping("downloadT1HFile") | ||||
|     public void downloadT1HFile() { | ||||
|         downloadT1hJob.downloadT1HFile(); | ||||
|     } | ||||
| } | ||||
|  |  | |||
|  | @ -0,0 +1,314 @@ | |||
| package org.jeecg.job; | ||||
| 
 | ||||
| import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; | ||||
| import lombok.RequiredArgsConstructor; | ||||
| import lombok.extern.slf4j.Slf4j; | ||||
| import org.apache.commons.codec.digest.DigestUtils; | ||||
| import org.jeecg.common.constant.enums.T1hFilePrefixEnum; | ||||
| import org.jeecg.common.constant.enums.WeatherDataSourceEnum; | ||||
| import org.jeecg.common.properties.SystemStorageProperties; | ||||
| import org.jeecg.common.properties.T1hDownloadProperties; | ||||
| import org.jeecg.modules.base.entity.WeatherData; | ||||
| import org.jeecg.service.WeatherDataService; | ||||
| import org.springframework.scheduling.annotation.Scheduled; | ||||
| import org.springframework.util.StopWatch; | ||||
| import org.springframework.web.bind.annotation.RestController; | ||||
| 
 | ||||
| import java.io.*; | ||||
| import java.nio.charset.StandardCharsets; | ||||
| import java.nio.file.Files; | ||||
| import java.nio.file.Path; | ||||
| import java.nio.file.Paths; | ||||
| import java.time.LocalDateTime; | ||||
| import java.time.format.DateTimeFormatter; | ||||
| import java.util.ArrayList; | ||||
| import java.util.Arrays; | ||||
| import java.util.Date; | ||||
| import java.util.List; | ||||
| import java.util.concurrent.CompletableFuture; | ||||
| import java.util.concurrent.TimeUnit; | ||||
| import java.util.concurrent.TimeoutException; | ||||
| import java.util.stream.Collectors; | ||||
| import java.util.stream.Stream; | ||||
| 
 | ||||
| @Slf4j | ||||
| @RestController | ||||
| @RequiredArgsConstructor | ||||
| public class DownloadT1hJob { | ||||
|     private final WeatherDataService weatherDataService; | ||||
|     private final SystemStorageProperties systemStorageProperties; | ||||
|     private final T1hDownloadProperties t1hDownloadProperties; | ||||
| 
 | ||||
|     private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMdd"); | ||||
|     private static final int PROCESS_TIMEOUT_SECONDS = 3600; // 30分钟超时 | ||||
| 
 | ||||
|     @Scheduled(cron = "0 0 1 * * ?") | ||||
|     public void downloadT1HFile() { | ||||
|         log.info("开始执行T1H文件下载任务"); | ||||
|         StopWatch stopWatch = new StopWatch(); | ||||
|         stopWatch.start(); | ||||
|         try { | ||||
|             String baseTime = getBaseTime(); | ||||
|             // 第一阶段:下载文件 | ||||
| //            downloadAllT1hFiles(baseTime); | ||||
|             // 第二阶段:合并文件 | ||||
| //            mergeT1hFiles(baseTime); | ||||
|             // 合并后删除原始文件 | ||||
|             Arrays.stream(new File(getFullPath(t1hDownloadProperties.getT1hPath())).listFiles()).filter(File::isFile).forEach(File::delete); | ||||
|             // 更新气象文件信息 | ||||
|             saveWeatherData(); | ||||
|             log.info("T1H文件下载任务执行完成"); | ||||
|         } catch (Exception e) { | ||||
|             log.error("T1H文件下载任务执行失败", e); | ||||
|             throw new RuntimeException("T1H文件下载任务失败", e); | ||||
|         } finally { | ||||
|             stopWatch.stop(); | ||||
|             log.info("T1H文件下载任务执行耗时: {} 毫秒", stopWatch.getTotalTimeMillis()); | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     /** | ||||
|      * 下载所有T1H文件 | ||||
|      */ | ||||
|     private void downloadAllT1hFiles(String baseTime) { | ||||
|         for (T1hFilePrefixEnum prefixEnum : T1hFilePrefixEnum.values()) { | ||||
|             String element = prefixEnum.getValue(); | ||||
|             log.info("开始处理元素: {}", element); | ||||
|             String[] FORECAST_HOURS_CONFIGS = {"(1, 1, 1)", "(6, 6, " + t1hDownloadProperties.getHour() + ")"}; | ||||
|             for (String forecastHours : FORECAST_HOURS_CONFIGS) { | ||||
|                 executePythonScript(element, baseTime, forecastHours); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     /** | ||||
|      * 合并T1H文件 | ||||
|      */ | ||||
|     private void mergeT1hFiles(String baseTime) { | ||||
|         List<String> variables = T1hFilePrefixEnum.getAllValues(); | ||||
|         List<String> forecastTimes = generateForecastTimes(); | ||||
| 
 | ||||
|         ProcessBuilder processBuilder = new ProcessBuilder( | ||||
|                 "python", | ||||
|                 getPythonScriptPath(t1hDownloadProperties.getMergeT1hPy()), | ||||
|                 "--indir", getFullPath(t1hDownloadProperties.getT1hPath()), | ||||
|                 "--output_dir", getFullPath(systemStorageProperties.getT1h()), | ||||
|                 "--variables", String.join(",", variables), | ||||
|                 "--forecast_times", String.join(",", forecastTimes), | ||||
|                 "--base_date", baseTime | ||||
|         ); | ||||
| 
 | ||||
|         executePythonProcess(processBuilder, "文件合并"); | ||||
|     } | ||||
| 
 | ||||
|     private void saveWeatherData(){ | ||||
|         //删除一个月前的文件 | ||||
|         LocalDateTime oneMonthAgo = LocalDateTime.now().minusMonths(1); | ||||
|         List<WeatherData> weatherDatas = weatherDataService.list(new LambdaQueryWrapper<WeatherData>().lt(WeatherData::getDataStartTime, oneMonthAgo)); | ||||
|         // 遍历删除文件 | ||||
|         weatherDatas.forEach(data -> { | ||||
|             if (data.getFilePath() != null && !data.getFilePath().isEmpty()) { | ||||
|                 File file = new File(data.getFilePath()); | ||||
|                 if (file.exists()) { | ||||
|                     file.delete(); | ||||
|                 } | ||||
|             } | ||||
|         }); | ||||
|         // 删除数据库记录 | ||||
|         weatherDataService.remove(new LambdaQueryWrapper<WeatherData>().eq(WeatherData::getDataSource, WeatherDataSourceEnum.T1H.getKey())); | ||||
|         // 读取目录文件信息 | ||||
|         List<WeatherData> weatherFileInfos = readFolderFiles(getFullPath(systemStorageProperties.getT1h())); | ||||
|         //保存文件信息 | ||||
|         weatherDataService.saveBatch(weatherFileInfos); | ||||
|     } | ||||
| 
 | ||||
|     /** | ||||
|      * 执行Python脚本 | ||||
|      */ | ||||
|     private void executePythonScript(String element, String baseTime, String forecastHours) { | ||||
|         String[] command = buildPythonCommand(element, baseTime, forecastHours); | ||||
|         log.info("执行Python脚本: {}", Arrays.toString(command)); | ||||
| 
 | ||||
|         ProcessBuilder processBuilder = new ProcessBuilder(command); | ||||
|         executePythonProcess(processBuilder, | ||||
|                 String.format("元素%s预报时长%s下载", element, forecastHours)); | ||||
|     } | ||||
| 
 | ||||
|     /** | ||||
|      * 构建Python命令 | ||||
|      */ | ||||
|     private String[] buildPythonCommand(String element, String baseTime, String forecastHours) { | ||||
|         return new String[]{ | ||||
|                 "python", | ||||
|                 getPythonScriptPath(t1hDownloadProperties.getDownloadT1hPy()), | ||||
|                 "--base-time", baseTime, | ||||
|                 "--element", element, | ||||
|                 "--output-dir", getFullPath(t1hDownloadProperties.getT1hPath()), | ||||
|                 "--forecast-hours", forecastHours, | ||||
|                 "--auth-token", t1hDownloadProperties.getT1hKey(), | ||||
|                 "--data-type", t1hDownloadProperties.getDataType() | ||||
|         }; | ||||
|     } | ||||
| 
 | ||||
|     /** | ||||
|      * 执行Python进程 | ||||
|      */ | ||||
|     private void executePythonProcess(ProcessBuilder processBuilder, String processDescription) { | ||||
|         StopWatch processWatch = new StopWatch(); | ||||
|         processWatch.start(); | ||||
| 
 | ||||
|         Process process = null; | ||||
|         try { | ||||
|             process = processBuilder.start(); | ||||
| 
 | ||||
|             // 异步处理输出流 | ||||
|             CompletableFuture<Void> outputFuture = readStreamAsync(process.getInputStream(), "OUTPUT"); | ||||
|             CompletableFuture<Void> errorFuture = readStreamAsync(process.getErrorStream(), "ERROR"); | ||||
| 
 | ||||
|             // 等待进程完成(带超时) | ||||
|             boolean finished = process.waitFor(PROCESS_TIMEOUT_SECONDS, TimeUnit.SECONDS); | ||||
|             if (!finished) { | ||||
|                 process.destroyForcibly(); | ||||
|                 throw new RuntimeException(processDescription + "脚本执行超时"); | ||||
|             } | ||||
| 
 | ||||
|             // 等待输出处理完成 | ||||
|             CompletableFuture.allOf(outputFuture, errorFuture) | ||||
|                     .get(10, TimeUnit.SECONDS); | ||||
| 
 | ||||
|             int exitCode = process.exitValue(); | ||||
|             processWatch.stop(); | ||||
| 
 | ||||
|             if (exitCode == 0) { | ||||
|                 log.info("{}脚本执行成功,耗时: {} 毫秒", | ||||
|                         processDescription, processWatch.getTotalTimeMillis()); | ||||
|             } else { | ||||
|                 throw new RuntimeException(processDescription + | ||||
|                         "脚本执行失败,退出码: " + exitCode); | ||||
|             } | ||||
| 
 | ||||
|         } catch (TimeoutException e) { | ||||
|             throw new RuntimeException(processDescription + "脚本执行超时", e); | ||||
|         } catch (InterruptedException e) { | ||||
|             Thread.currentThread().interrupt(); | ||||
|             throw new RuntimeException(processDescription + "脚本执行被中断", e); | ||||
|         } catch (Exception e) { | ||||
|             throw new RuntimeException(processDescription + "脚本执行失败", e); | ||||
|         } finally { | ||||
|             if (process != null) { | ||||
|                 process.destroy(); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     /** | ||||
|      * 异步读取流 | ||||
|      */ | ||||
|     private CompletableFuture<Void> readStreamAsync(InputStream inputStream, String type) { | ||||
|         return CompletableFuture.runAsync(() -> { | ||||
|             try (BufferedReader reader = new BufferedReader( | ||||
|                     new InputStreamReader(inputStream, StandardCharsets.UTF_8))) { | ||||
| 
 | ||||
|                 String line; | ||||
|                 while ((line = reader.readLine()) != null) { | ||||
|                     if ("ERROR".equals(type)) { | ||||
|                         log.error("[Python {}] {}", type, line); | ||||
|                     } else { | ||||
|                         log.info("[Python {}] {}", type, line); | ||||
|                     } | ||||
|                 } | ||||
|             } catch (IOException e) { | ||||
|                 log.error("处理Python{}流失败", type, e); | ||||
|             } | ||||
|         }); | ||||
|     } | ||||
| 
 | ||||
|     /** | ||||
|      * 生成预报时间列表 | ||||
|      */ | ||||
|     private List<String> generateForecastTimes() { | ||||
|         List<String> forecastTimes = new ArrayList<>(); | ||||
|         int hour = Integer.parseInt(t1hDownloadProperties.getHour()); | ||||
| 
 | ||||
|         for (int i = 0; i <= hour; i += 6) { | ||||
|             if (i == 0) { | ||||
|                 forecastTimes.add(String.format("f%03d", i + 1)); | ||||
|             } else { | ||||
|                 forecastTimes.add(String.format("f%03d", i)); | ||||
|             } | ||||
|         } | ||||
|         return forecastTimes; | ||||
|     } | ||||
| 
 | ||||
|     public List<WeatherData> readFolderFiles(String folderPath) { | ||||
|         try (Stream<Path> paths = Files.list(Paths.get(folderPath))) { | ||||
|             return paths.filter(Files::isRegularFile) | ||||
|                     .map(Path::toFile) | ||||
|                     .map(this::extractFileInfo) | ||||
|                     .collect(Collectors.toList()); | ||||
|         } catch (IOException e) { | ||||
|             throw new RuntimeException("读取文件夹失败: " + folderPath, e); | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     private WeatherData extractFileInfo(File file) { | ||||
|         WeatherData data = new WeatherData(); | ||||
|         data.setFileName(file.getName()); | ||||
|         data.setFileSize((double) file.length() / 1024 / 1024); // MB | ||||
|         data.setFileExt(getFileExtension(file.getName())); | ||||
|         data.setFilePath(file.getAbsolutePath()); | ||||
|         data.setDataSource(WeatherDataSourceEnum.T1H.getKey()); | ||||
|         data.setDataStartTime(parseStartTimeFromFileName(file.getName())); | ||||
|         data.setCreateTime(new Date()); | ||||
|         data.setMd5Value(calculateMD5(file.getAbsolutePath())); | ||||
|         data.setShareTotal(1); | ||||
|         return data; | ||||
|     } | ||||
| 
 | ||||
|     private LocalDateTime parseStartTimeFromFileName(String fileName) { | ||||
|         // 从文件名解析时间 示例:"T1H_20251029_00.nc" 这样的格式 | ||||
|         try { | ||||
|             String dateStr = fileName.substring(4, 12);  // 20251029 | ||||
|             String hourStr = fileName.substring(13, 15); // 00 | ||||
|             // 解析为 LocalDateTime(时分秒设置为 00:00:00) | ||||
|             return LocalDateTime.parse(dateStr + hourStr + "0000",DateTimeFormatter.ofPattern("yyyyMMddHHmmss")); | ||||
|         } catch (Exception e) { | ||||
|             e.printStackTrace(); | ||||
|         } | ||||
|         return null; // 解析失败返回当前时间 | ||||
|     } | ||||
| 
 | ||||
|     public String calculateMD5(String filePath){ | ||||
|         try (InputStream is = new FileInputStream(filePath)) { | ||||
|             return DigestUtils.md5Hex(is); | ||||
|         } catch (IOException e) { | ||||
|             throw new RuntimeException(e); | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     private String getFileExtension(String fileName) { | ||||
|         int lastDot = fileName.lastIndexOf("."); | ||||
|         return lastDot > 0 ? fileName.substring(lastDot + 1).toLowerCase() : ""; | ||||
|     } | ||||
| 
 | ||||
|     /** | ||||
|      * 获取完整路径 | ||||
|      */ | ||||
|     private String getFullPath(String relativePath) { | ||||
|         return systemStorageProperties.getRootPath() + File.separator + relativePath; | ||||
|     } | ||||
| 
 | ||||
|     /** | ||||
|      * 获取Python脚本路径 | ||||
|      */ | ||||
|     private String getPythonScriptPath(String scriptName) { | ||||
|         return systemStorageProperties.getRootPath() + File.separator + scriptName; | ||||
|     } | ||||
| 
 | ||||
|     /** | ||||
|      * 获取基准时间 | ||||
|      */ | ||||
|     private String getBaseTime() { | ||||
|         return LocalDateTime.now().format(DATE_FORMATTER) + "00"; | ||||
|     } | ||||
| } | ||||
|  | @ -1,7 +1,9 @@ | |||
| package org.jeecg.service; | ||||
| 
 | ||||
| import com.baomidou.mybatisplus.core.metadata.IPage; | ||||
| import com.baomidou.mybatisplus.extension.service.IService; | ||||
| import org.jeecg.common.system.query.PageRequest; | ||||
| import org.jeecg.modules.base.entity.StasDataSource; | ||||
| import org.jeecg.modules.base.entity.WeatherData; | ||||
| import org.jeecg.vo.FileExistVo; | ||||
| import org.jeecg.vo.FileUploadResultVo; | ||||
|  | @ -13,7 +15,7 @@ import java.time.LocalDateTime; | |||
| import java.util.List; | ||||
| import java.util.Map; | ||||
| 
 | ||||
| public interface WeatherDataService { | ||||
| public interface WeatherDataService extends IService<WeatherData> { | ||||
| 
 | ||||
|     WeatherResultVO getWeatherData(Integer dataType, Integer weatherType, LocalDateTime startTime, int hour); | ||||
|     WeatherResultVO getWeatherDataPreview(String weatherId, Integer weatherType); | ||||
|  |  | |||
|  | @ -81,6 +81,8 @@ public class WeatherDataServiceImpl extends ServiceImpl<WeatherDataMapper, Weath | |||
|                 return processWeatherData(weatherType, targetTime, WeatherDataSourceEnum.CRA40); | ||||
|             } else if (WeatherDataSourceEnum.NCEP.getKey() == dataType){ | ||||
|                 return processWeatherData(weatherType, targetTime, WeatherDataSourceEnum.NCEP); | ||||
|             } else if (WeatherDataSourceEnum.T1H.getKey() == dataType){ | ||||
|                 return processWeatherData(weatherType, targetTime, WeatherDataSourceEnum.T1H); | ||||
|             } | ||||
|         } catch (JeecgBootException e) { | ||||
|             throw e; | ||||
|  | @ -110,6 +112,8 @@ public class WeatherDataServiceImpl extends ServiceImpl<WeatherDataMapper, Weath | |||
|                 return processWeatherData(weatherType, targetTime, WeatherDataSourceEnum.CRA40); | ||||
|             } else if (WeatherDataSourceEnum.NCEP.getKey() == dataType){ | ||||
|                 return processWeatherData(weatherType, targetTime, WeatherDataSourceEnum.NCEP); | ||||
|             } else if (WeatherDataSourceEnum.T1H.getKey() == dataType){ | ||||
|                 return processWeatherData(weatherType, targetTime, WeatherDataSourceEnum.T1H); | ||||
|             } | ||||
|         } catch (JeecgBootException e) { | ||||
|             throw e; | ||||
|  | @ -152,26 +156,19 @@ public class WeatherDataServiceImpl extends ServiceImpl<WeatherDataMapper, Weath | |||
|         // 循环处理每个时间点的数据 | ||||
|         LocalDateTime currentTime = startTime; | ||||
|         while (!currentTime.isAfter(endTime)) { | ||||
|             try { | ||||
|                 String filePath = buildFilePath(currentTime, | ||||
|                         WeatherTypeEnum.TEMPERATURE.getKey(), // 使用温度作为基础类型 | ||||
|                         WeatherDataSourceEnum.getInfoByKey(dataType)); | ||||
| 
 | ||||
|                 if (!isFileValid(filePath)) { | ||||
|                     log.warn("文件无效或不存在: {}", filePath); | ||||
|                     currentTime = currentTime.plusHours(hourInterval); | ||||
|                     continue; | ||||
|             String filePath = buildFilePath(currentTime, WeatherTypeEnum.TEMPERATURE.getKey(), | ||||
|                     WeatherDataSourceEnum.getInfoByKey(dataType)); | ||||
|             if (isFileValid(filePath)) { | ||||
|                 try { | ||||
|                     processCommonData(filePath, gridIndex, currentTime, timeList, temperatureList, | ||||
|                             pressureList, humidityList, windSpeedList, dataType); | ||||
|                 } catch (Exception e) { | ||||
|                     log.error("处理时间点 {} 数据时出错: {}", currentTime, e.getMessage(), e); | ||||
|                 } | ||||
| 
 | ||||
|                 // 使用重构后的通用处理方法 | ||||
|                 processCommonData(filePath, gridIndex, currentTime, timeList, temperatureList, | ||||
|                         pressureList, humidityList, windSpeedList, dataType); | ||||
| 
 | ||||
|             } catch (Exception e) { | ||||
|                 log.error("处理时间点 {} 数据时出错: {}", currentTime, e.getMessage(), e); | ||||
|             } finally { | ||||
|                 currentTime = currentTime.plusHours(hourInterval); | ||||
|             } else { | ||||
|                 log.warn("文件无效或不存在: {}", filePath); | ||||
|             } | ||||
|             currentTime = currentTime.plusHours(hourInterval); | ||||
|         } | ||||
| 
 | ||||
|         // 构建结果 | ||||
|  | @ -461,10 +458,12 @@ public class WeatherDataServiceImpl extends ServiceImpl<WeatherDataMapper, Weath | |||
|             storagePath.append(this.systemStorageProperties.getPangu()); | ||||
|         } else if (WeatherDataSourceEnum.GRAPHCAST.getKey().equals(dataSource)) { | ||||
|             storagePath.append(this.systemStorageProperties.getGraphcast()); | ||||
|         }else if (WeatherDataSourceEnum.CRA40.getKey().equals(dataSource)) { | ||||
|         } else if (WeatherDataSourceEnum.CRA40.getKey().equals(dataSource)) { | ||||
|             storagePath.append(this.systemStorageProperties.getCra40()); | ||||
|         }else if (WeatherDataSourceEnum.NCEP.getKey().equals(dataSource)) { | ||||
|         } else if (WeatherDataSourceEnum.NCEP.getKey().equals(dataSource)) { | ||||
|             storagePath.append(this.systemStorageProperties.getNcep()); | ||||
|         } else if (WeatherDataSourceEnum.T1H.getKey().equals(dataSource)) { | ||||
|             storagePath.append(this.systemStorageProperties.getT1h()); | ||||
|         } | ||||
|         storagePath.append(File.separator); | ||||
|         storagePath.append(fileName.substring(0,fileName.lastIndexOf(StringPool.DOT))); | ||||
|  | @ -483,7 +482,7 @@ public class WeatherDataServiceImpl extends ServiceImpl<WeatherDataMapper, Weath | |||
| 
 | ||||
|         // 使用与第一个方法相同的变量名获取方式 | ||||
|         Map<String, String> variables = getVariableNames(dataType); | ||||
|         if(WeatherDataSourceEnum.PANGU.getKey() == dataType || WeatherDataSourceEnum.NCEP.getKey() == dataType){ | ||||
|         if(WeatherDataSourceEnum.CRA40.getKey() != dataType){ | ||||
|             try (NetcdfFile ncFile = NetcdfFile.open(filePath)) { | ||||
|                 // 读取数据(使用通用NcUtil方法) | ||||
|                 List<List<Double>> tData = getVariableData(ncFile, variables.get("temperature")); | ||||
|  | @ -564,6 +563,12 @@ public class WeatherDataServiceImpl extends ServiceImpl<WeatherDataMapper, Weath | |||
|             variables.put("humidity", WeatherVariableNameEnum.NCEP_H.getValue()); | ||||
|             variables.put("windU", WeatherVariableNameEnum.NCEP_U.getValue()); | ||||
|             variables.put("windV", WeatherVariableNameEnum.NCEP_V.getValue()); | ||||
|         } else if (WeatherDataSourceEnum.T1H.getKey() == dataType){ | ||||
|             variables.put("temperature", WeatherVariableNameEnum.T1H_T.getValue()); | ||||
|             variables.put("pressure", WeatherVariableNameEnum.T1H_P.getValue()); | ||||
|             variables.put("humidity", WeatherVariableNameEnum.T1H_H.getValue()); | ||||
|             variables.put("windU", WeatherVariableNameEnum.T1H_U.getValue()); | ||||
|             variables.put("windV", WeatherVariableNameEnum.T1H_V.getValue()); | ||||
|         } | ||||
|         return variables; | ||||
|     } | ||||
|  | @ -693,7 +698,7 @@ public class WeatherDataServiceImpl extends ServiceImpl<WeatherDataMapper, Weath | |||
|      */ | ||||
|     private WeatherResultVO processWindData(Integer weatherType, LocalDateTime targetTime, | ||||
|                                             WeatherDataSourceEnum dataTypeEnum, WeatherResultVO weatherResultVO) { | ||||
|         if (WeatherDataSourceEnum.PANGU.equals(dataTypeEnum) || WeatherDataSourceEnum.NCEP.equals(dataTypeEnum)) { | ||||
|         if (!WeatherDataSourceEnum.CRA40.equals(dataTypeEnum)) { | ||||
|             String filePath = buildFilePath(targetTime, weatherType, dataTypeEnum); | ||||
|             validateFile(filePath); | ||||
| 
 | ||||
|  | @ -820,7 +825,15 @@ public class WeatherDataServiceImpl extends ServiceImpl<WeatherDataMapper, Weath | |||
|                     .append(WeatherSuffixConstants.NCEP_SUFFIX) | ||||
|                     .append(".") | ||||
|                     .append(WeatherFileSuffixEnum.GRB2.getValue()); | ||||
|         } else if (WeatherDataSourceEnum.T1H.equals(dataTypeEnum)) { | ||||
|             storagePath.append(systemStorageProperties.getT1h()) | ||||
|                     .append(File.separator) | ||||
|                     .append(WeatherPrefixConstants.T1H_PREFIX) | ||||
|                     .append(targetTime.format(DateTimeFormatter.ofPattern("yyyyMMdd_HH"))) | ||||
|                     .append(".") | ||||
|                     .append(WeatherFileSuffixEnum.NC.getValue()); | ||||
|         } | ||||
| 
 | ||||
|         return storagePath.toString(); | ||||
|     } | ||||
| 
 | ||||
|  |  | |||
|  | @ -0,0 +1,39 @@ | |||
| package org.jeecg.utils; | ||||
| 
 | ||||
| import lombok.extern.slf4j.Slf4j; | ||||
| 
 | ||||
| import java.io.BufferedReader; | ||||
| import java.io.IOException; | ||||
| import java.io.InputStream; | ||||
| import java.io.InputStreamReader; | ||||
| 
 | ||||
| /** | ||||
|  * 专门的进程输出处理类 | ||||
|  */ | ||||
| @Slf4j | ||||
| public class ProcessOutputHandler extends Thread { | ||||
|     private final InputStream inputStream; | ||||
|     private final String type; | ||||
| 
 | ||||
|     public ProcessOutputHandler(InputStream inputStream, String type) { | ||||
|         this.inputStream = inputStream; | ||||
|         this.type = type; | ||||
|         this.setDaemon(true); | ||||
|     } | ||||
| 
 | ||||
|     @Override | ||||
|     public void run() { | ||||
|         try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, "GBK"))) { | ||||
|             String line; | ||||
|             while ((line = reader.readLine()) != null) { | ||||
|                 if ("ERROR".equals(type)) { | ||||
|                     log.error("Python脚本错误输出: {}", line); | ||||
|                 } else { | ||||
|                     log.info("Python脚本输出: {}", line); | ||||
|                 } | ||||
|             } | ||||
|         } catch (IOException e) { | ||||
|             log.error("读取Python脚本{}流失败", type, e); | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | @ -0,0 +1,86 @@ | |||
| package org.jeecg.utils; | ||||
| 
 | ||||
| import java.io.File; | ||||
| import java.time.LocalDate; | ||||
| import java.time.format.DateTimeFormatter; | ||||
| import java.util.*; | ||||
| import java.util.stream.Collectors; | ||||
| 
 | ||||
| public class T1hUtils { | ||||
| 
 | ||||
|     public static void main(String[] args) { | ||||
|         String folderPath = "D:\\runtimeEnv\\fileSystem\\weather\\T1HDownload"; | ||||
|         File folder = new File(folderPath); | ||||
| 
 | ||||
|         if (!folder.exists() || !folder.isDirectory()) { | ||||
|             System.out.println("文件夹不存在或路径不正确"); | ||||
|             return; | ||||
|         } | ||||
| 
 | ||||
|         LocalDate targetDate = LocalDate.now(); // 使用当天日期 | ||||
| 
 | ||||
|         Map<String, List<String>> groupedFiles = groupAndSortFilesByDate(folder, targetDate); | ||||
|         printGroupedFiles(groupedFiles, targetDate); | ||||
|     } | ||||
| 
 | ||||
|     public static Map<String, List<String>> groupAndSortFilesByDate(File folder, LocalDate targetDate) { | ||||
|         File[] files = folder.listFiles(); | ||||
|         if (files == null) return new HashMap<>(); | ||||
| 
 | ||||
|         String datePattern = targetDate.format(DateTimeFormatter.ofPattern("yyyyMMdd")); | ||||
| 
 | ||||
|         return Arrays.stream(files) | ||||
|                 .filter(File::isFile) | ||||
|                 .map(File::getName) | ||||
|                 .filter(name -> name.matches("\\d{10}_[a-zA-Z0-9]+_f\\d{3}\\.nc")) | ||||
|                 .filter(name -> isFileForTargetDate(name, datePattern)) | ||||
|                 .collect(Collectors.groupingBy( | ||||
|                         fileName -> fileName.split("_")[1], | ||||
|                         Collectors.collectingAndThen( | ||||
|                                 Collectors.toList(), | ||||
|                                 list -> list.stream() | ||||
|                                         .sorted(Comparator.comparingInt(fileName -> extractForecastTime(fileName))) // 使用lambda表达式 | ||||
|                                         .collect(Collectors.toList()) | ||||
|                         ) | ||||
|                 )); | ||||
|     } | ||||
| 
 | ||||
|     private static boolean isFileForTargetDate(String fileName, String targetDate) { | ||||
|         try { | ||||
|             String fileDate = fileName.substring(0, 8); | ||||
|             return fileDate.equals(targetDate); | ||||
|         } catch (Exception e) { | ||||
|             return false; | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     private static int extractForecastTime(String fileName) { | ||||
|         try { | ||||
|             String timeStr = fileName.split("_")[2].replace("f", "").replace(".nc", ""); | ||||
|             return Integer.parseInt(timeStr); | ||||
|         } catch (Exception e) { | ||||
|             return 999; // 如果解析失败,放到最后 | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     private static void printGroupedFiles(Map<String, List<String>> groupedFiles, LocalDate targetDate) { | ||||
|         System.out.println("=== " + targetDate.format(DateTimeFormatter.ofPattern("yyyy年MM月dd日")) + " 的文件 ==="); | ||||
| 
 | ||||
|         if (groupedFiles.isEmpty()) { | ||||
|             System.out.println("未找到符合日期条件的文件"); | ||||
|             return; | ||||
|         } | ||||
| 
 | ||||
|         groupedFiles.entrySet().stream() | ||||
|                 .sorted(Map.Entry.comparingByKey()) | ||||
|                 .forEach(entry -> { | ||||
|                     System.out.println("变量: " + entry.getKey()); | ||||
|                     entry.getValue().forEach(file -> System.out.println("  " + file)); | ||||
|                     System.out.println(); | ||||
|                 }); | ||||
| 
 | ||||
|         // 统计信息 | ||||
|         int totalFiles = groupedFiles.values().stream().mapToInt(List::size).sum(); | ||||
|         System.out.println("总计: " + groupedFiles.size() + " 个变量组, " + totalFiles + " 个文件"); | ||||
|     } | ||||
| } | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user
	 hekaiyu
						hekaiyu