AnalysisSystemForRadionuclide/jeecg-module-station-operation/src/main/java/org/jeecg/DataReceivingStatusManager.java

249 lines
16 KiB
Java
Raw Normal View History

package org.jeecg;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.CacheName;
import org.jeecg.common.config.mqtoken.UserTokenContext;
import org.jeecg.common.properties.MaximumPoolSizeProperties;
import org.jeecg.common.util.DateUtils;
import org.jeecg.common.util.RedisUtil;
import org.jeecg.modules.base.entity.configuration.GardsStations;
import org.jeecg.modules.base.entity.original.GardsMetData;
import org.jeecg.modules.base.entity.original.GardsSampleData;
import org.jeecg.modules.base.entity.original.GardsSohData;
import org.jeecg.modules.base.enums.DetectorStatus;
import org.jeecg.modules.entity.StationReceivingConfigStation;
import org.jeecg.modules.entity.data.*;
import org.jeecg.modules.mapper.StationMetDataMapper;
import org.jeecg.modules.mapper.StationReceivingConfigMapper;
import org.jeecg.modules.mapper.StationSampleDataMapper;
import org.jeecg.modules.mapper.StationSohDataMapper;
import org.jeecg.modules.service.ICacheTimeService;
import org.jeecg.modules.system.entity.GardsDetectorsSystem;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.jeecg.common.util.TokenUtils.getTempToken;
@Slf4j
@Component
@RequiredArgsConstructor
public class DataReceivingStatusManager {
private final RedisUtil redisUtil;
private final StationMetDataMapper stationMetDataMapper;
private final StationSampleDataMapper stationSampleDataMapper;
private final StationSohDataMapper stationSohDataMapper;
private final MaximumPoolSizeProperties maximumPoolSizeProperties;
public void start() {
ReceivingStatusThreadManager receivingStatusManager = new ReceivingStatusThreadManager();
receivingStatusManager.init();
receivingStatusManager.start();
}
/**
* 台站接收状态线程管理器
*/
private class ReceivingStatusThreadManager extends Thread{
private ThreadPoolExecutor poolExecutor;
public void init(){
//获取机器可用核心数
int systemCores = maximumPoolSizeProperties.getStation();//Runtime.getRuntime().availableProcessors();
//初始化线程池
ThreadFactory threadFactory = new CustomizableThreadFactory("data-receiving-status-");
poolExecutor = new ThreadPoolExecutor(systemCores-1,systemCores,5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),threadFactory);
}
@Override
public void run() {
for(;;){
long start = System.currentTimeMillis();
//读取缓存
String cacheTime = (String) redisUtil.get("maxCacheTime");
//从redis中获取台站信息
Map<String, GardsStations> stationInfoMap = (HashMap<String, GardsStations>) redisUtil.get("stationInfoMap");
List<String> stationIds = new LinkedList<>();
for (Map.Entry<String, GardsStations> stationInfo:stationInfoMap.entrySet()) {
GardsStations infoValue = stationInfo.getValue();
if (infoValue.getStatus() != null && infoValue.getStatus().equalsIgnoreCase(DetectorStatus.ON.getValue())) {
stationIds.add(infoValue.getStationId().toString());
}
}
//从redis中获取探测器信息
Map<Integer, String> detectorInfoMap = (Map<Integer, String>)redisUtil.get("detectorsMap");
Map<Integer, List<GardsDetectorsSystem>> stationDetectorMap = (Map<Integer, List<GardsDetectorsSystem>>) redisUtil.get("stationDetectorMap");
//声明存储所有台站id对应的数据信息的集合
Map<String, StationData> stationDataMap = Objects.nonNull(redisUtil.get("stationDataMap"))?(Map<String, StationData>) redisUtil.get("stationDataMap"):new HashMap<>();
//遍历台站id
if (CollectionUtils.isNotEmpty(stationIds)) {
//获取当前日期时间 作为结束查询时间
LocalDateTime endDate = LocalDateTime.now();
//根据缓存日期 得到开始查询时间
LocalDateTime startDate = endDate.minusDays(Integer.valueOf(cacheTime));
String startDateTime = startDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
//根据台站id开始时间查询出台站下的气象数据
List<GardsMetData> metDataList = stationMetDataMapper.findMetDataList(stationIds, startDateTime);
//根据台站id查询出当前台站下处于运行状态的数据
// Map<String, List<GardsDetectorsSystem>> stationDetectors = cacheTimeService.findStationDetectors(stationIds);
//遍历台站id 获取台站下的探测器数据
if (CollectionUtils.isNotEmpty(stationDetectorMap)) {
for (String stationId : stationIds) {
Map<String, List<Map<String, DetectorData>>> stationMap = new HashMap<>();
//获取台站下对应的探测器数据
List<GardsDetectorsSystem> detectors = stationDetectorMap.get(stationId);
if (CollectionUtils.isNotEmpty(detectors)) {
StationData stationData = new StationData();
//stream流获取探测器id
List<Integer> detectorIds = detectors.stream().map(GardsDetectorsSystem::getDetectorId).collect(Collectors.toList());
//根据探测器id 开始时间查询样品基础数据
List<GardsSampleData> sampleDataList = stationSampleDataMapper.findSampleDataList(detectorIds, startDateTime);
//探测器加上NA 无效探测器id
detectorIds.add(1);
//根据台站id探测器id开始时间
List<GardsSohData> sohDataList = stationSohDataMapper.findSohDataList(stationId, detectorIds, startDateTime);
//移除掉最后一个探测器
detectorIds.remove(detectorIds.size()-1);
//用于接收当前台站下所有探测器及探测器所有的样品数据,气体数据,状态数据集合
List<Map<String, DetectorData>> detectorDataList = new LinkedList<>();
for (Integer detectorId : detectorIds) {
Map<String, DetectorData> detectorMap = new HashMap<>();
DetectorData detectorData = new DetectorData();
detectorData.setDetectorId(detectorId);
//声明数据实体实体类 根据参数存储 样品基础数据对应的数据 气体数据 状态数据
List<DataInfoVo> dataInfoList = new LinkedList<>();
if (CollectionUtils.isNotEmpty(sampleDataList)) {
//根据探测器id过滤出对应的样品数据 并进行遍历封装进dataInfo
List<GardsSampleData> dataListSample = sampleDataList.stream().filter(item -> item.getDetectorId().equals(detectorId)).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(dataListSample)) {
for (GardsSampleData sampleData : dataListSample) {
DataInfoVo dataInfo = new DataInfoVo();
//根据样品数据类型判断 数据类型 根据不同的样品数据状态
String dataType = sampleData.getDataType();
String spectralQualifie = sampleData.getSpectralQualifie();
if (StrUtil.equals(dataType, "S")) {
dataInfo.setType("PHD");
if (StrUtil.equals(spectralQualifie, "PREL")) {
dataInfo.setStatus("SPREL");
} else if (StrUtil.equals(spectralQualifie, "FULL")) {
dataInfo.setStatus("SFULL");
}
} else if (StrUtil.equals(dataType, "Q")) {
dataInfo.setType("QC");
dataInfo.setStatus("QC");
} else if (StrUtil.equals(dataType, "G")) {
dataInfo.setType("PHD");
if (StrUtil.equals(spectralQualifie, "PREL")) {
dataInfo.setStatus("GPREL");
} else if (StrUtil.equals(spectralQualifie, "FULL")) {
dataInfo.setStatus("GFULL");
}
} else {
continue;
}
//处理开始时间
Date acquisitionStart = sampleData.getAcquisitionStart();
dataInfo.setBeginTime(Double.valueOf(acquisitionStart.getTime() / 1000));
//处理结束时间
Date acquisitionStop = sampleData.getAcquisitionStop();
dataInfo.setEndTime(Double.valueOf(acquisitionStop.getTime() / 1000));
//时间间隔
Double span = Double.valueOf((acquisitionStop.getTime() - acquisitionStart.getTime()) / 1000);
dataInfo.setSpanTime(span);
dataInfoList.add(dataInfo);
}
}
}
if (CollectionUtils.isNotEmpty(sohDataList)) {
//根据台站id 开始时间查询状态数据
for (GardsSohData sohData : sohDataList) {
DataInfoVo dataInfo = new DataInfoVo();
dataInfo.setType("SOH");
dataInfo.setStatus("SOH");
Date startTime = sohData.getStartTime();
dataInfo.setBeginTime(Double.valueOf(startTime.getTime() / 1000));
dataInfo.setSpanTime(Double.valueOf(sohData.getTime()));
dataInfo.setEndTime(Double.valueOf((startTime.getTime() / 1000) + sohData.getTime()));
dataInfoList.add(dataInfo);
}
}
if (CollectionUtils.isNotEmpty(metDataList)) {
List<GardsMetData> dataListMet = metDataList.stream().filter(item -> item.getStationId().equals(stationId)).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(dataListMet)) {
for (GardsMetData metData : dataListMet) {
DataInfoVo dataInfo = new DataInfoVo();
dataInfo.setType("MET");
dataInfo.setStatus("MET");
Date startTime = metData.getStartTime();
dataInfo.setBeginTime(Double.valueOf(startTime.getTime() / 1000));
Date endTime = metData.getEndTime();
dataInfo.setEndTime(Double.valueOf(endTime.getTime() / 1000));
Double span = Double.valueOf( (endTime.getTime() - startTime.getTime()) / 1000);
dataInfo.setSpanTime(span);
dataInfoList.add(dataInfo);
}
}
}
detectorData.setDataList(dataInfoList);
if (CollectionUtils.isNotEmpty(detectorInfoMap)) {
if (StringUtils.isNotBlank(detectorInfoMap.get(detectorId.toString()))) {
detectorData.setDetectorCode(detectorInfoMap.get(detectorId.toString()));
}
}
detectorMap.put(String.valueOf(detectorId), detectorData);
detectorDataList.add(detectorMap);
}
stationMap.put(stationId, detectorDataList);
stationData.setStationId(stationId);
if (CollectionUtils.isNotEmpty(stationInfoMap)) {
if (Objects.nonNull(stationInfoMap.get(stationId))) {
GardsStations stations = stationInfoMap.get(stationId);
stationData.setStationCode(stations.getStationCode());
}
}
stationData.setDetectors(stationMap);
stationDataMap.put(stationData.getStationId(), stationData);
}
redisUtil.set("stationDataMap", stationDataMap);
}
}
}
long end = System.currentTimeMillis();
long sleepTime = 3600000 - (end-start);
//如果sleepTime > 0 需要睡眠到指定时间,否则继续下次获取邮件
if(sleepTime > 0){
try {
//如果本次
TimeUnit.MILLISECONDS.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}