package org.jeecg; import cn.hutool.core.util.StrUtil; import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; import com.baomidou.mybatisplus.core.toolkit.StringUtils; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.jeecg.common.CacheName; import org.jeecg.common.config.mqtoken.UserTokenContext; import org.jeecg.common.properties.MaximumPoolSizeProperties; import org.jeecg.common.util.DateUtils; import org.jeecg.common.util.RedisUtil; import org.jeecg.modules.base.entity.configuration.GardsStations; import org.jeecg.modules.base.entity.original.GardsMetData; import org.jeecg.modules.base.entity.original.GardsSampleData; import org.jeecg.modules.base.entity.original.GardsSohData; import org.jeecg.modules.base.enums.DetectorStatus; import org.jeecg.modules.entity.StationReceivingConfigStation; import org.jeecg.modules.entity.data.*; import org.jeecg.modules.mapper.StationMetDataMapper; import org.jeecg.modules.mapper.StationReceivingConfigMapper; import org.jeecg.modules.mapper.StationSampleDataMapper; import org.jeecg.modules.mapper.StationSohDataMapper; import org.jeecg.modules.service.ICacheTimeService; import org.jeecg.modules.system.entity.GardsDetectorsSystem; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import org.springframework.stereotype.Component; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.*; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.jeecg.common.util.TokenUtils.getTempToken; @Slf4j @Component @RequiredArgsConstructor public class DataReceivingStatusManager { private final RedisUtil redisUtil; private final StationMetDataMapper stationMetDataMapper; private final StationSampleDataMapper stationSampleDataMapper; private final StationSohDataMapper stationSohDataMapper; private final MaximumPoolSizeProperties maximumPoolSizeProperties; public void start() { ReceivingStatusThreadManager receivingStatusManager = new ReceivingStatusThreadManager(); receivingStatusManager.init(); receivingStatusManager.start(); } /** * 台站接收状态线程管理器 */ private class ReceivingStatusThreadManager extends Thread{ private ThreadPoolExecutor poolExecutor; public void init(){ //获取机器可用核心数 int systemCores = maximumPoolSizeProperties.getStation();//Runtime.getRuntime().availableProcessors(); //初始化线程池 ThreadFactory threadFactory = new CustomizableThreadFactory("data-receiving-status-"); poolExecutor = new ThreadPoolExecutor(systemCores-1,systemCores,5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),threadFactory); } @Override public void run() { for(;;){ long start = System.currentTimeMillis(); //读取缓存 String cacheTime = (String) redisUtil.get("maxCacheTime"); //从redis中获取台站信息 Map stationInfoMap = (HashMap) redisUtil.get("stationInfoMap"); // List stationIds = stationInfoMap.keySet().stream().collect(Collectors.toList()); List stationIds = new LinkedList<>(); for (Map.Entry stationInfo:stationInfoMap.entrySet()) { GardsStations infoValue = stationInfo.getValue(); if (infoValue.getStatus() != null && infoValue.getStatus().equalsIgnoreCase(DetectorStatus.ON.getValue())) { stationIds.add(infoValue.getStationId().toString()); } } //从redis中获取探测器信息 Map detectorInfoMap = (Map)redisUtil.get("detectorsMap"); Map> stationDetectorMap = (Map>) redisUtil.get("stationDetectorMap"); //声明存储所有台站id对应的数据信息的集合 Map stationDataMap = Objects.nonNull(redisUtil.get("stationDataMap"))?(Map) redisUtil.get("stationDataMap"):new HashMap<>(); //遍历台站id if (CollectionUtils.isNotEmpty(stationIds)) { //获取当前日期时间 作为结束查询时间 LocalDateTime endDate = LocalDateTime.now(); //根据缓存日期 得到开始查询时间 LocalDateTime startDate = endDate.minusDays(Integer.valueOf(cacheTime)); String startDateTime = startDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); //根据台站id,开始时间查询出台站下的气象数据 List metDataList = stationMetDataMapper.findMetDataList(stationIds, startDateTime); //根据台站id查询出当前台站下处于运行状态的数据 // Map> stationDetectors = cacheTimeService.findStationDetectors(stationIds); //遍历台站id 获取台站下的探测器数据 if (CollectionUtils.isNotEmpty(stationDetectorMap)) { for (String stationId : stationIds) { Map>> stationMap = new HashMap<>(); //获取台站下对应的探测器数据 List detectors = stationDetectorMap.get(stationId); if (CollectionUtils.isNotEmpty(detectors)) { StationData stationData = new StationData(); //stream流获取探测器id List detectorIds = detectors.stream().map(GardsDetectorsSystem::getDetectorId).collect(Collectors.toList()); //根据探测器id 开始时间查询样品基础数据 List sampleDataList = stationSampleDataMapper.findSampleDataList(detectorIds, startDateTime); //探测器加上NA 无效探测器id detectorIds.add(1); //根据台站id,探测器id,开始时间 List sohDataList = stationSohDataMapper.findSohDataList(stationId, detectorIds, startDateTime); //移除掉最后一个探测器 detectorIds.remove(detectorIds.size()-1); //用于接收当前台站下所有探测器及探测器所有的样品数据,气体数据,状态数据集合 List> detectorDataList = new LinkedList<>(); for (Integer detectorId : detectorIds) { Map detectorMap = new HashMap<>(); DetectorData detectorData = new DetectorData(); detectorData.setDetectorId(detectorId); //声明数据实体实体类 根据参数存储 样品基础数据对应的数据 气体数据 状态数据 List dataInfoList = new LinkedList<>(); if (CollectionUtils.isNotEmpty(sampleDataList)) { //根据探测器id过滤出对应的样品数据 并进行遍历封装进dataInfo List dataListSample = sampleDataList.stream().filter(item -> item.getDetectorId().equals(detectorId)).collect(Collectors.toList()); if (CollectionUtils.isNotEmpty(dataListSample)) { for (GardsSampleData sampleData : dataListSample) { DataInfoVo dataInfo = new DataInfoVo(); //根据样品数据类型判断 数据类型 根据不同的样品数据状态 String dataType = sampleData.getDataType(); String spectralQualifie = sampleData.getSpectralQualifie(); if (StrUtil.equals(dataType, "S")) { dataInfo.setType("PHD"); if (StrUtil.equals(spectralQualifie, "PREL")) { dataInfo.setStatus("SPREL"); } else if (StrUtil.equals(spectralQualifie, "FULL")) { dataInfo.setStatus("SFULL"); } } else if (StrUtil.equals(dataType, "Q")) { dataInfo.setType("QC"); dataInfo.setStatus("QC"); } else if (StrUtil.equals(dataType, "G")) { dataInfo.setType("PHD"); if (StrUtil.equals(spectralQualifie, "PREL")) { dataInfo.setStatus("GPREL"); } else if (StrUtil.equals(spectralQualifie, "FULL")) { dataInfo.setStatus("GFULL"); } } else { continue; } if (Objects.nonNull(sampleData.getAcquisitionStart()) && Objects.nonNull(sampleData.getAcquisitionStop())) { //处理开始时间 Date acquisitionStart = sampleData.getAcquisitionStart(); dataInfo.setBeginTime(Double.valueOf(acquisitionStart.getTime() / 1000)); //处理结束时间 Date acquisitionStop = sampleData.getAcquisitionStop(); dataInfo.setEndTime(Double.valueOf(acquisitionStop.getTime() / 1000)); //时间间隔 Double span = Double.valueOf((acquisitionStop.getTime() - acquisitionStart.getTime()) / 1000); dataInfo.setSpanTime(span); dataInfoList.add(dataInfo); } } } } if (CollectionUtils.isNotEmpty(sohDataList)) { //根据台站id 开始时间查询状态数据 for (GardsSohData sohData : sohDataList) { DataInfoVo dataInfo = new DataInfoVo(); dataInfo.setType("SOH"); dataInfo.setStatus("SOH"); Date startTime = sohData.getStartTime(); dataInfo.setBeginTime(Double.valueOf(startTime.getTime() / 1000)); dataInfo.setSpanTime(Double.valueOf(sohData.getTime())); dataInfo.setEndTime(Double.valueOf((startTime.getTime() / 1000) + sohData.getTime())); dataInfoList.add(dataInfo); } } if (CollectionUtils.isNotEmpty(metDataList)) { List dataListMet = metDataList.stream().filter(item -> item.getStationId().equals(stationId)).collect(Collectors.toList()); if (CollectionUtils.isNotEmpty(dataListMet)) { for (GardsMetData metData : dataListMet) { DataInfoVo dataInfo = new DataInfoVo(); dataInfo.setType("MET"); dataInfo.setStatus("MET"); Date startTime = metData.getStartTime(); dataInfo.setBeginTime(Double.valueOf(startTime.getTime() / 1000)); Date endTime = metData.getEndTime(); dataInfo.setEndTime(Double.valueOf(endTime.getTime() / 1000)); Double span = Double.valueOf( (endTime.getTime() - startTime.getTime()) / 1000); dataInfo.setSpanTime(span); dataInfoList.add(dataInfo); } } } detectorData.setDataList(dataInfoList); if (CollectionUtils.isNotEmpty(detectorInfoMap)) { if (StringUtils.isNotBlank(detectorInfoMap.get(detectorId.toString()))) { detectorData.setDetectorCode(detectorInfoMap.get(detectorId.toString())); } } detectorMap.put(String.valueOf(detectorId), detectorData); detectorDataList.add(detectorMap); } stationMap.put(stationId, detectorDataList); stationData.setStationId(stationId); if (CollectionUtils.isNotEmpty(stationInfoMap)) { if (Objects.nonNull(stationInfoMap.get(stationId))) { GardsStations stations = stationInfoMap.get(stationId); stationData.setStationCode(stations.getStationCode()); } } stationData.setDetectors(stationMap); stationDataMap.put(stationData.getStationId(), stationData); } redisUtil.set("stationDataMap", stationDataMap); } } } long end = System.currentTimeMillis(); long sleepTime = 3600000 - (end-start); //如果sleepTime > 0 需要睡眠到指定时间,否则继续下次获取邮件 if(sleepTime > 0){ try { //如果本次 TimeUnit.MILLISECONDS.sleep(sleepTime); } catch (InterruptedException e) { e.printStackTrace(); } } } } } }