2023-10-28 09:54:33 +08:00
|
|
|
|
package org.jeecg;
|
|
|
|
|
|
|
|
|
|
import cn.hutool.core.util.StrUtil;
|
|
|
|
|
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
|
|
|
|
|
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
|
|
|
|
|
import lombok.RequiredArgsConstructor;
|
|
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
import org.jeecg.common.CacheName;
|
2023-10-28 14:06:45 +08:00
|
|
|
|
import org.jeecg.common.config.mqtoken.UserTokenContext;
|
2024-01-09 14:36:32 +08:00
|
|
|
|
import org.jeecg.common.properties.MaximumPoolSizeProperties;
|
2024-01-03 19:30:22 +08:00
|
|
|
|
import org.jeecg.common.util.DateUtils;
|
2023-10-28 09:54:33 +08:00
|
|
|
|
import org.jeecg.common.util.RedisUtil;
|
2024-01-23 15:43:23 +08:00
|
|
|
|
import org.jeecg.modules.base.entity.configuration.GardsStations;
|
2023-10-28 09:54:33 +08:00
|
|
|
|
import org.jeecg.modules.base.entity.original.GardsMetData;
|
|
|
|
|
import org.jeecg.modules.base.entity.original.GardsSampleData;
|
|
|
|
|
import org.jeecg.modules.base.entity.original.GardsSohData;
|
2024-01-23 15:43:23 +08:00
|
|
|
|
import org.jeecg.modules.base.enums.DetectorStatus;
|
2023-10-28 09:54:33 +08:00
|
|
|
|
import org.jeecg.modules.entity.data.*;
|
|
|
|
|
import org.jeecg.modules.mapper.StationMetDataMapper;
|
|
|
|
|
import org.jeecg.modules.mapper.StationSampleDataMapper;
|
|
|
|
|
import org.jeecg.modules.mapper.StationSohDataMapper;
|
|
|
|
|
import org.jeecg.modules.service.ICacheTimeService;
|
|
|
|
|
import org.jeecg.modules.system.entity.GardsDetectorsSystem;
|
|
|
|
|
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
|
|
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
|
|
|
|
import java.time.LocalDateTime;
|
|
|
|
|
import java.time.format.DateTimeFormatter;
|
|
|
|
|
import java.util.*;
|
|
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
|
|
import java.util.concurrent.ThreadFactory;
|
|
|
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
2023-10-28 14:06:45 +08:00
|
|
|
|
import static org.jeecg.common.util.TokenUtils.getTempToken;
|
|
|
|
|
|
2023-10-28 09:54:33 +08:00
|
|
|
|
@Slf4j
|
|
|
|
|
@Component
|
|
|
|
|
@RequiredArgsConstructor
|
|
|
|
|
public class DataReceivingStatusManager {
|
|
|
|
|
|
|
|
|
|
private final RedisUtil redisUtil;
|
|
|
|
|
|
|
|
|
|
private final ICacheTimeService cacheTimeService;
|
|
|
|
|
|
|
|
|
|
private final StationMetDataMapper stationMetDataMapper;
|
|
|
|
|
|
|
|
|
|
private final StationSampleDataMapper stationSampleDataMapper;
|
|
|
|
|
|
|
|
|
|
private final StationSohDataMapper stationSohDataMapper;
|
|
|
|
|
|
2024-01-09 14:36:32 +08:00
|
|
|
|
private final MaximumPoolSizeProperties maximumPoolSizeProperties;
|
|
|
|
|
|
2023-10-28 09:54:33 +08:00
|
|
|
|
public void start() {
|
|
|
|
|
ReceivingStatusThreadManager receivingStatusManager = new ReceivingStatusThreadManager();
|
|
|
|
|
receivingStatusManager.init();
|
|
|
|
|
receivingStatusManager.start();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* 台站接收状态线程管理器
|
|
|
|
|
*/
|
|
|
|
|
private class ReceivingStatusThreadManager extends Thread{
|
|
|
|
|
|
|
|
|
|
private ThreadPoolExecutor poolExecutor;
|
|
|
|
|
|
|
|
|
|
public void init(){
|
|
|
|
|
//获取机器可用核心数
|
2024-01-09 14:36:32 +08:00
|
|
|
|
int systemCores = maximumPoolSizeProperties.getStation();//Runtime.getRuntime().availableProcessors();
|
2023-10-28 09:54:33 +08:00
|
|
|
|
//初始化线程池
|
|
|
|
|
ThreadFactory threadFactory = new CustomizableThreadFactory("undeal-file-parsing-");
|
|
|
|
|
poolExecutor = new ThreadPoolExecutor(systemCores-1,systemCores,5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),threadFactory);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void run() {
|
|
|
|
|
for(;;){
|
|
|
|
|
long start = System.currentTimeMillis();
|
2023-10-28 14:06:45 +08:00
|
|
|
|
// start:生成临时Token到线程中
|
|
|
|
|
UserTokenContext.setToken(getTempToken());
|
2023-10-28 09:54:33 +08:00
|
|
|
|
//获取四项缓存数据的对应内容
|
|
|
|
|
List<Map<String, String>> cacheList = cacheTimeService.findCacheTime();
|
|
|
|
|
//缓存时间
|
|
|
|
|
String cacheTime = "";
|
|
|
|
|
for (int i=0; i< cacheList.size(); i++) {
|
|
|
|
|
if ( StringUtils.isNotBlank(cacheList.get(i).get(CacheName.cacheTime)) ) {
|
|
|
|
|
cacheTime = cacheList.get(i).get(CacheName.cacheTime);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
//从redis中获取台站信息
|
2024-01-23 15:43:23 +08:00
|
|
|
|
Map<String, GardsStations> stationInfoMap = (HashMap<String, GardsStations>) redisUtil.get("stationInfoMap");
|
|
|
|
|
List<String> stationIds = new LinkedList<>();
|
|
|
|
|
for (Map.Entry<String, GardsStations> stationInfo:stationInfoMap.entrySet()) {
|
|
|
|
|
GardsStations infoValue = stationInfo.getValue();
|
|
|
|
|
if (infoValue.getStatus() != null && infoValue.getStatus().equalsIgnoreCase(DetectorStatus.ON.getValue())) {
|
|
|
|
|
stationIds.add(infoValue.getStationId().toString());
|
|
|
|
|
}
|
|
|
|
|
}
|
2023-10-28 09:54:33 +08:00
|
|
|
|
//从redis中获取探测器信息
|
|
|
|
|
Map<Integer, String> detectorInfoMap = (Map<Integer, String>)redisUtil.get("detectorsMap");
|
|
|
|
|
//声明存储所有台站id对应的数据信息的集合
|
2023-10-28 14:06:45 +08:00
|
|
|
|
Map<String, StationData> stationDataMap = Objects.nonNull(redisUtil.get("stationDataMap"))?(Map<String, StationData>) redisUtil.get("stationDataMap"):new HashMap<>();
|
2023-10-28 09:54:33 +08:00
|
|
|
|
//遍历台站id
|
|
|
|
|
if (CollectionUtils.isNotEmpty(stationIds)) {
|
|
|
|
|
//获取当前日期时间 作为结束查询时间
|
|
|
|
|
LocalDateTime endDate = LocalDateTime.now();
|
|
|
|
|
//根据缓存日期 得到开始查询时间
|
|
|
|
|
LocalDateTime startDate = endDate.minusDays(Integer.valueOf(cacheTime));
|
|
|
|
|
String startDateTime = startDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
|
|
|
|
|
//根据台站id,开始时间查询出台站下的气象数据
|
|
|
|
|
List<GardsMetData> metDataList = stationMetDataMapper.findMetDataList(stationIds, startDateTime);
|
|
|
|
|
//根据台站id查询出当前台站下处于运行状态的数据
|
|
|
|
|
Map<String, List<GardsDetectorsSystem>> stationDetectors = cacheTimeService.findStationDetectors(stationIds);
|
|
|
|
|
//遍历台站id 获取台站下的探测器数据
|
|
|
|
|
if (CollectionUtils.isNotEmpty(stationDetectors)) {
|
2023-10-28 14:06:45 +08:00
|
|
|
|
for (String stationId : stationIds) {
|
2023-10-28 09:54:33 +08:00
|
|
|
|
Map<String, List<Map<String, DetectorData>>> stationMap = new HashMap<>();
|
|
|
|
|
//获取台站下对应的探测器数据
|
2023-10-28 14:06:45 +08:00
|
|
|
|
List<GardsDetectorsSystem> detectors = stationDetectors.get(stationId);
|
2023-10-28 09:54:33 +08:00
|
|
|
|
if (CollectionUtils.isNotEmpty(detectors)) {
|
|
|
|
|
StationData stationData = new StationData();
|
|
|
|
|
//stream流获取探测器id
|
|
|
|
|
List<Integer> detectorIds = detectors.stream().map(GardsDetectorsSystem::getDetectorId).collect(Collectors.toList());
|
2024-01-03 19:30:22 +08:00
|
|
|
|
//探测器加上NA 无效探测器id
|
|
|
|
|
detectorIds.add(1);
|
2023-10-28 09:54:33 +08:00
|
|
|
|
//根据探测器id 开始时间查询样品基础数据
|
|
|
|
|
List<GardsSampleData> sampleDataList = stationSampleDataMapper.findSampleDataList(detectorIds, startDateTime);
|
|
|
|
|
//根据台站id,探测器id,开始时间
|
|
|
|
|
List<GardsSohData> sohDataList = stationSohDataMapper.findSohDataList(stationId, detectorIds, startDateTime);
|
2024-01-03 19:30:22 +08:00
|
|
|
|
//移除掉最后一个探测器
|
|
|
|
|
detectorIds.remove(detectorIds.size()-1);
|
2023-10-28 09:54:33 +08:00
|
|
|
|
//用于接收当前台站下所有探测器及探测器所有的样品数据,气体数据,状态数据集合
|
|
|
|
|
List<Map<String, DetectorData>> detectorDataList = new LinkedList<>();
|
|
|
|
|
for (Integer detectorId : detectorIds) {
|
|
|
|
|
Map<String, DetectorData> detectorMap = new HashMap<>();
|
|
|
|
|
DetectorData detectorData = new DetectorData();
|
|
|
|
|
detectorData.setDetectorId(detectorId);
|
|
|
|
|
//声明数据实体实体类 根据参数存储 样品基础数据对应的数据 气体数据 状态数据
|
|
|
|
|
List<DataInfoVo> dataInfoList = new LinkedList<>();
|
|
|
|
|
if (CollectionUtils.isNotEmpty(sampleDataList)) {
|
|
|
|
|
//根据探测器id过滤出对应的样品数据 并进行遍历封装进dataInfo
|
|
|
|
|
List<GardsSampleData> dataListSample = sampleDataList.stream().filter(item -> item.getDetectorId().equals(detectorId)).collect(Collectors.toList());
|
|
|
|
|
if (CollectionUtils.isNotEmpty(dataListSample)) {
|
|
|
|
|
for (GardsSampleData sampleData : dataListSample) {
|
|
|
|
|
DataInfoVo dataInfo = new DataInfoVo();
|
|
|
|
|
//根据样品数据类型判断 数据类型 根据不同的样品数据状态
|
|
|
|
|
String dataType = sampleData.getDataType();
|
|
|
|
|
String spectralQualifie = sampleData.getSpectralQualifie();
|
|
|
|
|
if (StrUtil.equals(dataType, "S")) {
|
|
|
|
|
dataInfo.setType("PHD");
|
|
|
|
|
if (StrUtil.equals(spectralQualifie, "PREL")) {
|
|
|
|
|
dataInfo.setStatus("SPREL");
|
|
|
|
|
} else if (StrUtil.equals(spectralQualifie, "FULL")) {
|
|
|
|
|
dataInfo.setStatus("SFULL");
|
|
|
|
|
}
|
|
|
|
|
} else if (StrUtil.equals(dataType, "Q")) {
|
|
|
|
|
dataInfo.setType("QC");
|
|
|
|
|
dataInfo.setStatus("QC");
|
|
|
|
|
} else if (StrUtil.equals(dataType, "G")) {
|
|
|
|
|
dataInfo.setType("PHD");
|
|
|
|
|
if (StrUtil.equals(spectralQualifie, "PREL")) {
|
|
|
|
|
dataInfo.setStatus("GPREL");
|
|
|
|
|
} else if (StrUtil.equals(spectralQualifie, "FULL")) {
|
|
|
|
|
dataInfo.setStatus("GFULL");
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
//处理开始时间
|
|
|
|
|
Date acquisitionStart = sampleData.getAcquisitionStart();
|
|
|
|
|
dataInfo.setBeginTime(Double.valueOf(acquisitionStart.getTime() / 1000));
|
|
|
|
|
//处理结束时间
|
|
|
|
|
Date acquisitionStop = sampleData.getAcquisitionStop();
|
|
|
|
|
dataInfo.setEndTime(Double.valueOf(acquisitionStop.getTime() / 1000));
|
|
|
|
|
//时间间隔
|
2023-11-28 15:19:56 +08:00
|
|
|
|
Double span = Double.valueOf((acquisitionStop.getTime() - acquisitionStart.getTime()) / 1000);
|
2023-10-28 09:54:33 +08:00
|
|
|
|
dataInfo.setSpanTime(span);
|
|
|
|
|
dataInfoList.add(dataInfo);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (CollectionUtils.isNotEmpty(sohDataList)) {
|
2024-01-03 19:30:22 +08:00
|
|
|
|
//根据台站id 开始时间查询状态数据
|
|
|
|
|
for (GardsSohData sohData : sohDataList) {
|
|
|
|
|
DataInfoVo dataInfo = new DataInfoVo();
|
|
|
|
|
dataInfo.setType("SOH");
|
|
|
|
|
dataInfo.setStatus("SOH");
|
|
|
|
|
Date startTime = sohData.getStartTime();
|
|
|
|
|
dataInfo.setBeginTime(Double.valueOf(startTime.getTime() / 1000));
|
|
|
|
|
dataInfo.setSpanTime(Double.valueOf(sohData.getTime()));
|
|
|
|
|
dataInfo.setEndTime(Double.valueOf((startTime.getTime() / 1000) + sohData.getTime()));
|
|
|
|
|
dataInfoList.add(dataInfo);
|
2023-10-28 09:54:33 +08:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (CollectionUtils.isNotEmpty(metDataList)) {
|
|
|
|
|
List<GardsMetData> dataListMet = metDataList.stream().filter(item -> item.getStationId().equals(stationId)).collect(Collectors.toList());
|
|
|
|
|
if (CollectionUtils.isNotEmpty(dataListMet)) {
|
|
|
|
|
for (GardsMetData metData : dataListMet) {
|
|
|
|
|
DataInfoVo dataInfo = new DataInfoVo();
|
|
|
|
|
dataInfo.setType("MET");
|
|
|
|
|
dataInfo.setStatus("MET");
|
|
|
|
|
Date startTime = metData.getStartTime();
|
|
|
|
|
dataInfo.setBeginTime(Double.valueOf(startTime.getTime() / 1000));
|
|
|
|
|
Date endTime = metData.getEndTime();
|
|
|
|
|
dataInfo.setEndTime(Double.valueOf(endTime.getTime() / 1000));
|
2023-11-28 15:19:56 +08:00
|
|
|
|
Double span = Double.valueOf( (endTime.getTime() - startTime.getTime()) / 1000);
|
2023-10-28 09:54:33 +08:00
|
|
|
|
dataInfo.setSpanTime(span);
|
|
|
|
|
dataInfoList.add(dataInfo);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
detectorData.setDataList(dataInfoList);
|
|
|
|
|
if (CollectionUtils.isNotEmpty(detectorInfoMap)) {
|
|
|
|
|
if (StringUtils.isNotBlank(detectorInfoMap.get(detectorId.toString()))) {
|
|
|
|
|
detectorData.setDetectorCode(detectorInfoMap.get(detectorId.toString()));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
detectorMap.put(String.valueOf(detectorId), detectorData);
|
|
|
|
|
detectorDataList.add(detectorMap);
|
|
|
|
|
}
|
|
|
|
|
|
2023-10-28 14:06:45 +08:00
|
|
|
|
stationMap.put(stationId, detectorDataList);
|
|
|
|
|
stationData.setStationId(stationId);
|
2023-10-28 09:54:33 +08:00
|
|
|
|
if (CollectionUtils.isNotEmpty(stationInfoMap)) {
|
2024-01-23 15:43:23 +08:00
|
|
|
|
if (Objects.nonNull(stationInfoMap.get(stationId))) {
|
|
|
|
|
GardsStations stations = stationInfoMap.get(stationId);
|
|
|
|
|
stationData.setStationCode(stations.getStationCode());
|
2023-10-28 09:54:33 +08:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
stationData.setDetectors(stationMap);
|
2023-10-28 14:06:45 +08:00
|
|
|
|
stationDataMap.put(stationData.getStationId(), stationData);
|
2023-10-28 09:54:33 +08:00
|
|
|
|
}
|
2023-10-28 14:06:45 +08:00
|
|
|
|
redisUtil.set("stationDataMap", stationDataMap);
|
2023-10-28 09:54:33 +08:00
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
long end = System.currentTimeMillis();
|
|
|
|
|
long sleepTime = 3600000 - (end-start);
|
|
|
|
|
//如果sleepTime > 0 需要睡眠到指定时间,否则继续下次获取邮件
|
|
|
|
|
if(sleepTime > 0){
|
|
|
|
|
try {
|
|
|
|
|
//如果本次
|
|
|
|
|
TimeUnit.MILLISECONDS.sleep(sleepTime);
|
|
|
|
|
} catch (InterruptedException e) {
|
|
|
|
|
e.printStackTrace();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
}
|