package org.jeecg; import cn.hutool.core.util.StrUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; import com.baomidou.mybatisplus.core.toolkit.StringUtils; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.jeecg.common.CacheName; import org.jeecg.common.util.RedisUtil; import org.jeecg.modules.base.entity.configuration.GardsStations; import org.jeecg.modules.base.entity.original.GardsMetData; import org.jeecg.modules.base.entity.original.GardsSampleData; import org.jeecg.modules.base.entity.original.GardsSohData; import org.jeecg.modules.base.entity.postgre.SysUserFocusStation; import org.jeecg.modules.entity.SysUserFocusStationStation; import org.jeecg.modules.entity.data.*; import org.jeecg.modules.mapper.StationMetDataMapper; import org.jeecg.modules.mapper.StationSampleDataMapper; import org.jeecg.modules.mapper.StationSohDataMapper; import org.jeecg.modules.service.ICacheTimeService; import org.jeecg.modules.system.entity.GardsDetectorsSystem; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import org.springframework.stereotype.Component; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.*; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @Slf4j @Component @RequiredArgsConstructor public class DataReceivingStatusManager { private final RedisUtil redisUtil; private final ICacheTimeService cacheTimeService; private final StationMetDataMapper stationMetDataMapper; private final StationSampleDataMapper stationSampleDataMapper; private final StationSohDataMapper stationSohDataMapper; public void start() { ReceivingStatusThreadManager receivingStatusManager = new ReceivingStatusThreadManager(); receivingStatusManager.init(); receivingStatusManager.start(); } /** * 台站接收状态线程管理器 */ private class ReceivingStatusThreadManager extends Thread{ private ThreadPoolExecutor poolExecutor; public void init(){ //获取机器可用核心数 int systemCores = Runtime.getRuntime().availableProcessors(); //初始化线程池 ThreadFactory threadFactory = new CustomizableThreadFactory("undeal-file-parsing-"); poolExecutor = new ThreadPoolExecutor(systemCores-1,systemCores,5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),threadFactory); } @Override public void run() { for(;;){ long start = System.currentTimeMillis(); //获取四项缓存数据的对应内容 List> cacheList = cacheTimeService.findCacheTime(); //缓存时间 String cacheTime = ""; for (int i=0; i< cacheList.size(); i++) { if ( StringUtils.isNotBlank(cacheList.get(i).get(CacheName.cacheTime)) ) { cacheTime = cacheList.get(i).get(CacheName.cacheTime); break; } } //从redis中获取台站信息 Map stationInfoMap = (Map)redisUtil.get("stationMap"); List stationIds = stationInfoMap.keySet().stream().collect(Collectors.toList()); //从redis中获取探测器信息 Map detectorInfoMap = (Map)redisUtil.get("detectorsMap"); //声明存储所有台站id对应的数据信息的集合 List stationDataList = Objects.nonNull(redisUtil.get("stationDataList"))? (List) redisUtil.get("stationDataList") : new LinkedList<>(); //遍历台站id if (CollectionUtils.isNotEmpty(stationIds)) { //获取当前日期时间 作为结束查询时间 LocalDateTime endDate = LocalDateTime.now(); //根据缓存日期 得到开始查询时间 LocalDateTime startDate = endDate.minusDays(Integer.valueOf(cacheTime)); String startDateTime = startDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); //根据台站id,开始时间查询出台站下的气象数据 List metDataList = stationMetDataMapper.findMetDataList(stationIds, startDateTime); //根据台站id查询出当前台站下处于运行状态的数据 Map> stationDetectors = cacheTimeService.findStationDetectors(stationIds); //遍历台站id 获取台站下的探测器数据 if (CollectionUtils.isNotEmpty(stationDetectors)) { for (Integer stationId : stationIds) { Map>> stationMap = new HashMap<>(); //获取台站下对应的探测器数据 List detectors = stationDetectors.get(String.valueOf(stationId)); if (CollectionUtils.isNotEmpty(detectors)) { StationData stationData = new StationData(); //stream流获取探测器id List detectorIds = detectors.stream().map(GardsDetectorsSystem::getDetectorId).collect(Collectors.toList()); //根据探测器id 开始时间查询样品基础数据 List sampleDataList = stationSampleDataMapper.findSampleDataList(detectorIds, startDateTime); //根据台站id,探测器id,开始时间 List sohDataList = stationSohDataMapper.findSohDataList(stationId, detectorIds, startDateTime); //用于接收当前台站下所有探测器及探测器所有的样品数据,气体数据,状态数据集合 List> detectorDataList = new LinkedList<>(); for (Integer detectorId : detectorIds) { Map detectorMap = new HashMap<>(); DetectorData detectorData = new DetectorData(); detectorData.setDetectorId(detectorId); //声明数据实体实体类 根据参数存储 样品基础数据对应的数据 气体数据 状态数据 List dataInfoList = new LinkedList<>(); if (CollectionUtils.isNotEmpty(sampleDataList)) { //根据探测器id过滤出对应的样品数据 并进行遍历封装进dataInfo List dataListSample = sampleDataList.stream().filter(item -> item.getDetectorId().equals(detectorId)).collect(Collectors.toList()); if (CollectionUtils.isNotEmpty(dataListSample)) { for (GardsSampleData sampleData : dataListSample) { DataInfoVo dataInfo = new DataInfoVo(); //根据样品数据类型判断 数据类型 根据不同的样品数据状态 String dataType = sampleData.getDataType(); String spectralQualifie = sampleData.getSpectralQualifie(); if (StrUtil.equals(dataType, "S")) { dataInfo.setType("PHD"); if (StrUtil.equals(spectralQualifie, "PREL")) { dataInfo.setStatus("SPREL"); } else if (StrUtil.equals(spectralQualifie, "FULL")) { dataInfo.setStatus("SFULL"); } } else if (StrUtil.equals(dataType, "Q")) { dataInfo.setType("QC"); dataInfo.setStatus("QC"); } else if (StrUtil.equals(dataType, "G")) { dataInfo.setType("PHD"); if (StrUtil.equals(spectralQualifie, "PREL")) { dataInfo.setStatus("GPREL"); } else if (StrUtil.equals(spectralQualifie, "FULL")) { dataInfo.setStatus("GFULL"); } } else { continue; } //处理开始时间 Date acquisitionStart = sampleData.getAcquisitionStart(); dataInfo.setBeginTime(Double.valueOf(acquisitionStart.getTime() / 1000)); //处理结束时间 Date acquisitionStop = sampleData.getAcquisitionStop(); dataInfo.setEndTime(Double.valueOf(acquisitionStop.getTime() / 1000)); //时间间隔 Double span = Double.valueOf(acquisitionStop.getTime() / 1000) - Double.valueOf(acquisitionStart.getTime() / 1000); dataInfo.setSpanTime(span); dataInfoList.add(dataInfo); } } } if (CollectionUtils.isNotEmpty(sohDataList)) { List dataListSoh = sohDataList.stream().filter(item -> item.getDetectorId().equals(detectorId)).collect(Collectors.toList()); //根据探测器id 台站id 开始时间查询状态数据 if (CollectionUtils.isNotEmpty(dataListSoh)) { for (GardsSohData sohData : dataListSoh) { DataInfoVo dataInfo = new DataInfoVo(); dataInfo.setType("SOH"); dataInfo.setStatus("SOH"); Date startTime = sohData.getStartTime(); dataInfo.setBeginTime(Double.valueOf(startTime.getTime() / 1000)); dataInfo.setSpanTime(Double.valueOf(sohData.getTime())); dataInfoList.add(dataInfo); } } } if (CollectionUtils.isNotEmpty(metDataList)) { List dataListMet = metDataList.stream().filter(item -> item.getStationId().equals(stationId)).collect(Collectors.toList()); if (CollectionUtils.isNotEmpty(dataListMet)) { for (GardsMetData metData : dataListMet) { DataInfoVo dataInfo = new DataInfoVo(); dataInfo.setType("MET"); dataInfo.setStatus("MET"); Date startTime = metData.getStartTime(); dataInfo.setBeginTime(Double.valueOf(startTime.getTime() / 1000)); Date endTime = metData.getEndTime(); dataInfo.setEndTime(Double.valueOf(endTime.getTime() / 1000)); Double span = Double.valueOf(startTime.getTime() / 1000) - Double.valueOf(endTime.getTime() / 1000); dataInfo.setSpanTime(span); dataInfoList.add(dataInfo); } } } detectorData.setDataList(dataInfoList); if (CollectionUtils.isNotEmpty(detectorInfoMap)) { if (StringUtils.isNotBlank(detectorInfoMap.get(detectorId.toString()))) { detectorData.setDetectorCode(detectorInfoMap.get(detectorId.toString())); } } detectorMap.put(String.valueOf(detectorId), detectorData); detectorDataList.add(detectorMap); } stationMap.put(String.valueOf(stationId), detectorDataList); stationData.setStationId(String.valueOf(stationId)); if (CollectionUtils.isNotEmpty(stationInfoMap)) { if (StringUtils.isNotBlank(stationInfoMap.get(stationId))) { stationData.setStationCode(stationInfoMap.get(stationId)); } } stationData.setDetectors(stationMap); stationDataList.add(stationData); redisUtil.set("stationDataList", stationDataList); } } } } long end = System.currentTimeMillis(); long sleepTime = 3600000 - (end-start); //如果sleepTime > 0 需要睡眠到指定时间,否则继续下次获取邮件 if(sleepTime > 0){ try { //如果本次 TimeUnit.MILLISECONDS.sleep(sleepTime); } catch (InterruptedException e) { e.printStackTrace(); } } } } } }