AnalysisSystemForRadionuclide/jeecg-module-station-operation/src/main/java/org/jeecg/DataReceivingStatusManager.java
qiaoqinzheng f1ca1b2593 台站运行管理SOH数据查询属于台站级别的查询,不再区分具体的探测器
台站运行管理SOH数据结束时间计算方式通过开始时间+时间间隔的形式计算每一个健康状态的独立结束时间解决前端无法画出SOH色块问题
2024-01-03 19:30:22 +08:00

246 lines
15 KiB
Java
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package org.jeecg;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.CacheName;
import org.jeecg.common.config.mqtoken.UserTokenContext;
import org.jeecg.common.util.DateUtils;
import org.jeecg.common.util.RedisUtil;
import org.jeecg.modules.base.entity.original.GardsMetData;
import org.jeecg.modules.base.entity.original.GardsSampleData;
import org.jeecg.modules.base.entity.original.GardsSohData;
import org.jeecg.modules.entity.data.*;
import org.jeecg.modules.mapper.StationMetDataMapper;
import org.jeecg.modules.mapper.StationSampleDataMapper;
import org.jeecg.modules.mapper.StationSohDataMapper;
import org.jeecg.modules.service.ICacheTimeService;
import org.jeecg.modules.system.entity.GardsDetectorsSystem;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.jeecg.common.util.TokenUtils.getTempToken;
@Slf4j
@Component
@RequiredArgsConstructor
public class DataReceivingStatusManager {
private final RedisUtil redisUtil;
private final ICacheTimeService cacheTimeService;
private final StationMetDataMapper stationMetDataMapper;
private final StationSampleDataMapper stationSampleDataMapper;
private final StationSohDataMapper stationSohDataMapper;
public void start() {
ReceivingStatusThreadManager receivingStatusManager = new ReceivingStatusThreadManager();
receivingStatusManager.init();
receivingStatusManager.start();
}
/**
* 台站接收状态线程管理器
*/
private class ReceivingStatusThreadManager extends Thread{
private ThreadPoolExecutor poolExecutor;
public void init(){
//获取机器可用核心数
int systemCores = Runtime.getRuntime().availableProcessors();
//初始化线程池
ThreadFactory threadFactory = new CustomizableThreadFactory("undeal-file-parsing-");
poolExecutor = new ThreadPoolExecutor(systemCores-1,systemCores,5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),threadFactory);
}
@Override
public void run() {
for(;;){
long start = System.currentTimeMillis();
// start:生成临时Token到线程中
UserTokenContext.setToken(getTempToken());
//获取四项缓存数据的对应内容
List<Map<String, String>> cacheList = cacheTimeService.findCacheTime();
//缓存时间
String cacheTime = "";
for (int i=0; i< cacheList.size(); i++) {
if ( StringUtils.isNotBlank(cacheList.get(i).get(CacheName.cacheTime)) ) {
cacheTime = cacheList.get(i).get(CacheName.cacheTime);
break;
}
}
//从redis中获取台站信息
Map<String, String> stationInfoMap = (Map<String, String>)redisUtil.get("stationMap");
List<String> stationIds = stationInfoMap.keySet().stream().collect(Collectors.toList());
//从redis中获取探测器信息
Map<Integer, String> detectorInfoMap = (Map<Integer, String>)redisUtil.get("detectorsMap");
//声明存储所有台站id对应的数据信息的集合
Map<String, StationData> stationDataMap = Objects.nonNull(redisUtil.get("stationDataMap"))?(Map<String, StationData>) redisUtil.get("stationDataMap"):new HashMap<>();
//遍历台站id
if (CollectionUtils.isNotEmpty(stationIds)) {
//获取当前日期时间 作为结束查询时间
LocalDateTime endDate = LocalDateTime.now();
//根据缓存日期 得到开始查询时间
LocalDateTime startDate = endDate.minusDays(Integer.valueOf(cacheTime));
String startDateTime = startDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
//根据台站id开始时间查询出台站下的气象数据
List<GardsMetData> metDataList = stationMetDataMapper.findMetDataList(stationIds, startDateTime);
//根据台站id查询出当前台站下处于运行状态的数据
Map<String, List<GardsDetectorsSystem>> stationDetectors = cacheTimeService.findStationDetectors(stationIds);
//遍历台站id 获取台站下的探测器数据
if (CollectionUtils.isNotEmpty(stationDetectors)) {
for (String stationId : stationIds) {
Map<String, List<Map<String, DetectorData>>> stationMap = new HashMap<>();
//获取台站下对应的探测器数据
List<GardsDetectorsSystem> detectors = stationDetectors.get(stationId);
if (CollectionUtils.isNotEmpty(detectors)) {
StationData stationData = new StationData();
//stream流获取探测器id
List<Integer> detectorIds = detectors.stream().map(GardsDetectorsSystem::getDetectorId).collect(Collectors.toList());
//探测器加上NA 无效探测器id
detectorIds.add(1);
//根据探测器id 开始时间查询样品基础数据
List<GardsSampleData> sampleDataList = stationSampleDataMapper.findSampleDataList(detectorIds, startDateTime);
//根据台站id探测器id开始时间
List<GardsSohData> sohDataList = stationSohDataMapper.findSohDataList(stationId, detectorIds, startDateTime);
//移除掉最后一个探测器
detectorIds.remove(detectorIds.size()-1);
//用于接收当前台站下所有探测器及探测器所有的样品数据,气体数据,状态数据集合
List<Map<String, DetectorData>> detectorDataList = new LinkedList<>();
for (Integer detectorId : detectorIds) {
Map<String, DetectorData> detectorMap = new HashMap<>();
DetectorData detectorData = new DetectorData();
detectorData.setDetectorId(detectorId);
//声明数据实体实体类 根据参数存储 样品基础数据对应的数据 气体数据 状态数据
List<DataInfoVo> dataInfoList = new LinkedList<>();
if (CollectionUtils.isNotEmpty(sampleDataList)) {
//根据探测器id过滤出对应的样品数据 并进行遍历封装进dataInfo
List<GardsSampleData> dataListSample = sampleDataList.stream().filter(item -> item.getDetectorId().equals(detectorId)).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(dataListSample)) {
for (GardsSampleData sampleData : dataListSample) {
DataInfoVo dataInfo = new DataInfoVo();
//根据样品数据类型判断 数据类型 根据不同的样品数据状态
String dataType = sampleData.getDataType();
String spectralQualifie = sampleData.getSpectralQualifie();
if (StrUtil.equals(dataType, "S")) {
dataInfo.setType("PHD");
if (StrUtil.equals(spectralQualifie, "PREL")) {
dataInfo.setStatus("SPREL");
} else if (StrUtil.equals(spectralQualifie, "FULL")) {
dataInfo.setStatus("SFULL");
}
} else if (StrUtil.equals(dataType, "Q")) {
dataInfo.setType("QC");
dataInfo.setStatus("QC");
} else if (StrUtil.equals(dataType, "G")) {
dataInfo.setType("PHD");
if (StrUtil.equals(spectralQualifie, "PREL")) {
dataInfo.setStatus("GPREL");
} else if (StrUtil.equals(spectralQualifie, "FULL")) {
dataInfo.setStatus("GFULL");
}
} else {
continue;
}
//处理开始时间
Date acquisitionStart = sampleData.getAcquisitionStart();
dataInfo.setBeginTime(Double.valueOf(acquisitionStart.getTime() / 1000));
//处理结束时间
Date acquisitionStop = sampleData.getAcquisitionStop();
dataInfo.setEndTime(Double.valueOf(acquisitionStop.getTime() / 1000));
//时间间隔
Double span = Double.valueOf((acquisitionStop.getTime() - acquisitionStart.getTime()) / 1000);
dataInfo.setSpanTime(span);
dataInfoList.add(dataInfo);
}
}
}
if (CollectionUtils.isNotEmpty(sohDataList)) {
//根据台站id 开始时间查询状态数据
for (GardsSohData sohData : sohDataList) {
DataInfoVo dataInfo = new DataInfoVo();
dataInfo.setType("SOH");
dataInfo.setStatus("SOH");
Date startTime = sohData.getStartTime();
dataInfo.setBeginTime(Double.valueOf(startTime.getTime() / 1000));
dataInfo.setSpanTime(Double.valueOf(sohData.getTime()));
dataInfo.setEndTime(Double.valueOf((startTime.getTime() / 1000) + sohData.getTime()));
dataInfoList.add(dataInfo);
}
}
if (CollectionUtils.isNotEmpty(metDataList)) {
List<GardsMetData> dataListMet = metDataList.stream().filter(item -> item.getStationId().equals(stationId)).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(dataListMet)) {
for (GardsMetData metData : dataListMet) {
DataInfoVo dataInfo = new DataInfoVo();
dataInfo.setType("MET");
dataInfo.setStatus("MET");
Date startTime = metData.getStartTime();
dataInfo.setBeginTime(Double.valueOf(startTime.getTime() / 1000));
Date endTime = metData.getEndTime();
dataInfo.setEndTime(Double.valueOf(endTime.getTime() / 1000));
Double span = Double.valueOf( (endTime.getTime() - startTime.getTime()) / 1000);
dataInfo.setSpanTime(span);
dataInfoList.add(dataInfo);
}
}
}
detectorData.setDataList(dataInfoList);
if (CollectionUtils.isNotEmpty(detectorInfoMap)) {
if (StringUtils.isNotBlank(detectorInfoMap.get(detectorId.toString()))) {
detectorData.setDetectorCode(detectorInfoMap.get(detectorId.toString()));
}
}
detectorMap.put(String.valueOf(detectorId), detectorData);
detectorDataList.add(detectorMap);
}
stationMap.put(stationId, detectorDataList);
stationData.setStationId(stationId);
if (CollectionUtils.isNotEmpty(stationInfoMap)) {
if (StringUtils.isNotBlank(stationInfoMap.get(stationId))) {
stationData.setStationCode(stationInfoMap.get(stationId));
}
}
stationData.setDetectors(stationMap);
stationDataMap.put(stationData.getStationId(), stationData);
}
redisUtil.set("stationDataMap", stationDataMap);
}
}
}
long end = System.currentTimeMillis();
long sleepTime = 3600000 - (end-start);
//如果sleepTime > 0 需要睡眠到指定时间,否则继续下次获取邮件
if(sleepTime > 0){
try {
//如果本次
TimeUnit.MILLISECONDS.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}