台站运行管理启动时增加查询所有用户最大缓存时间的方法

台站运行管理查询台站的运行状态接口根据用户设置的缓存时间进行结果切割返回
系统启动类新增加缓存台站id及台站id关联的探测器数组方法
台站运行管理数据状态线程向redis缓存过于频繁导致触发redis序列化错误问题修改
This commit is contained in:
qiaoqinzheng 2024-01-24 14:22:33 +08:00
parent 0d7000058d
commit e166a470be
14 changed files with 137 additions and 328 deletions

View File

@ -15,8 +15,10 @@ import org.jeecg.modules.base.entity.original.GardsMetData;
import org.jeecg.modules.base.entity.original.GardsSampleData;
import org.jeecg.modules.base.entity.original.GardsSohData;
import org.jeecg.modules.base.enums.DetectorStatus;
import org.jeecg.modules.entity.StationReceivingConfigStation;
import org.jeecg.modules.entity.data.*;
import org.jeecg.modules.mapper.StationMetDataMapper;
import org.jeecg.modules.mapper.StationReceivingConfigMapper;
import org.jeecg.modules.mapper.StationSampleDataMapper;
import org.jeecg.modules.mapper.StationSohDataMapper;
import org.jeecg.modules.service.ICacheTimeService;
@ -42,8 +44,6 @@ public class DataReceivingStatusManager {
private final RedisUtil redisUtil;
private final ICacheTimeService cacheTimeService;
private final StationMetDataMapper stationMetDataMapper;
private final StationSampleDataMapper stationSampleDataMapper;
@ -69,7 +69,7 @@ public class DataReceivingStatusManager {
//获取机器可用核心数
int systemCores = maximumPoolSizeProperties.getStation();//Runtime.getRuntime().availableProcessors();
//初始化线程池
ThreadFactory threadFactory = new CustomizableThreadFactory("undeal-file-parsing-");
ThreadFactory threadFactory = new CustomizableThreadFactory("data-receiving-status-");
poolExecutor = new ThreadPoolExecutor(systemCores-1,systemCores,5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),threadFactory);
}
@ -77,18 +77,8 @@ public class DataReceivingStatusManager {
public void run() {
for(;;){
long start = System.currentTimeMillis();
// start:生成临时Token到线程中
UserTokenContext.setToken(getTempToken());
//获取四项缓存数据的对应内容
List<Map<String, String>> cacheList = cacheTimeService.findCacheTime();
//缓存时间
String cacheTime = "";
for (int i=0; i< cacheList.size(); i++) {
if ( StringUtils.isNotBlank(cacheList.get(i).get(CacheName.cacheTime)) ) {
cacheTime = cacheList.get(i).get(CacheName.cacheTime);
break;
}
}
//读取缓存
String cacheTime = (String) redisUtil.get("maxCacheTime");
//从redis中获取台站信息
Map<String, GardsStations> stationInfoMap = (HashMap<String, GardsStations>) redisUtil.get("stationInfoMap");
List<String> stationIds = new LinkedList<>();
@ -100,6 +90,7 @@ public class DataReceivingStatusManager {
}
//从redis中获取探测器信息
Map<Integer, String> detectorInfoMap = (Map<Integer, String>)redisUtil.get("detectorsMap");
Map<Integer, List<GardsDetectorsSystem>> stationDetectorMap = (Map<Integer, List<GardsDetectorsSystem>>) redisUtil.get("stationDetectorMap");
//声明存储所有台站id对应的数据信息的集合
Map<String, StationData> stationDataMap = Objects.nonNull(redisUtil.get("stationDataMap"))?(Map<String, StationData>) redisUtil.get("stationDataMap"):new HashMap<>();
//遍历台站id
@ -112,21 +103,21 @@ public class DataReceivingStatusManager {
//根据台站id开始时间查询出台站下的气象数据
List<GardsMetData> metDataList = stationMetDataMapper.findMetDataList(stationIds, startDateTime);
//根据台站id查询出当前台站下处于运行状态的数据
Map<String, List<GardsDetectorsSystem>> stationDetectors = cacheTimeService.findStationDetectors(stationIds);
// Map<String, List<GardsDetectorsSystem>> stationDetectors = cacheTimeService.findStationDetectors(stationIds);
//遍历台站id 获取台站下的探测器数据
if (CollectionUtils.isNotEmpty(stationDetectors)) {
if (CollectionUtils.isNotEmpty(stationDetectorMap)) {
for (String stationId : stationIds) {
Map<String, List<Map<String, DetectorData>>> stationMap = new HashMap<>();
//获取台站下对应的探测器数据
List<GardsDetectorsSystem> detectors = stationDetectors.get(stationId);
List<GardsDetectorsSystem> detectors = stationDetectorMap.get(stationId);
if (CollectionUtils.isNotEmpty(detectors)) {
StationData stationData = new StationData();
//stream流获取探测器id
List<Integer> detectorIds = detectors.stream().map(GardsDetectorsSystem::getDetectorId).collect(Collectors.toList());
//探测器加上NA 无效探测器id
detectorIds.add(1);
//根据探测器id 开始时间查询样品基础数据
List<GardsSampleData> sampleDataList = stationSampleDataMapper.findSampleDataList(detectorIds, startDateTime);
//探测器加上NA 无效探测器id
detectorIds.add(1);
//根据台站id探测器id开始时间
List<GardsSohData> sohDataList = stationSohDataMapper.findSohDataList(stationId, detectorIds, startDateTime);
//移除掉最后一个探测器

View File

@ -20,11 +20,9 @@ public class CalculateDataRateThread implements Runnable{
private ICalCulStationDataService calCulStationDataService = ApplicationContextUtil.getContext().getBean(ICalCulStationDataService.class);
private RedisUtil redisUtil = ApplicationContextUtil.getContext().getBean(RedisUtil.class);
private GetStationinfoAndDataRate stationinfoAndDataRate;
private final GetStationinfoAndDataRate stationinfoAndDataRate;
private final Map<String, StationInfo> finallySta;
private Map<String, StationInfo> finallySta;
private CountDownLatch countDownLatch;
@ -64,10 +62,8 @@ public class CalculateDataRateThread implements Runnable{
CalculateDataRate calculateDataRate = new CalculateDataRate();
calculateDataRate.setParameter(mRateparam);
//根据台站编码 查询 台站信息
// StationInfo stationInfo = calCulStationDataService.getStationInfo(originalstationsinfo.getStationCode());
StationInfo stationInfo = new StationInfo();
BeanUtils.copyProperties(originalstationsinfo, stationInfo);
// stationInfo.setUsed(calCulStationDataService.getUsed(Integer.valueOf(stationInfo.getId())));
//赋值台站信息
calculateDataRate.setMStationId(stationInfo.getId());
calculateDataRate.setMStationCode(stationInfo.getStationCode());
@ -88,7 +84,7 @@ public class CalculateDataRateThread implements Runnable{
}
//赋值最后的结果
finallySta.put(stationInfo.getStationCode(), stationInfo);
redisUtil.set("dataStationInfoList", finallySta);
// redisUtil.set("dataStationInfoList", finallySta);
}
}

View File

@ -825,18 +825,9 @@ public class CalculateStationData {
CountDownLatch countDownLatch = null;
//初始化线程数量=0
int threadNum = 0;
// //获取当前设备的理想线程数
// int idealnum = Runtime.getRuntime().availableProcessors();
//获取需要处理的台站信息数量
int works = stationInfos.size();
// // 如果需要工作处理的数量 小于 理想线程数
// if(works < idealnum) {
//将需要处理的工作数量赋值给线程数按照需要处理的工作数量处理数据
threadNum = works;
// } else {
// //否则按照理想线程数 处理数据
// threadNum = idealnum;
// }
// 如果线程数不等于0
if (threadNum != 0){
//获取机器可用核心数
@ -852,12 +843,7 @@ public class CalculateStationData {
}
};
poolExecutor = new ThreadPoolExecutor(15,maximumPoolSize,10, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(),threadFactory);
//初始化线程池
// poolExecutor = new ThreadPoolExecutor(16, 32, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
}
// else {
// return finallySta;
// }
// 线程数 等于 需要处理的台站数量
if( threadNum == works ){
countDownLatch = new CountDownLatch(threadNum);
@ -883,67 +869,7 @@ public class CalculateStationData {
poolExecutor.shutdownNow();
}
}
}
// else { // 线程数 不等于 需要处理的台站数量时
// //将台站数组按照线程数分成多组
// //声明一个变量 看当前台站信息需要分为多少组
// int Tworks = 0;
// //如果当前需要处理的台站数量是线程数量的整数倍
// if(works % threadNum == 0){
// //变量值 等于 需要处理的台站数量与线程数的商
// Tworks = works / threadNum; // 300 30 10
// } else {
// // 变量值 等于 需要处理的台站数量与线程数的商 +1
// Tworks = works / threadNum + 1; // 301 30 11
// }
// //遍历需要查询的组数
// for (int i = 0; i < Tworks; i++){
// long start = System.currentTimeMillis();
// //根据每组的台站大小 分割台站数组
// List<StationInfo> infos = new ArrayList<>();
// int startIndex = i * threadNum;
// int endIndex = ((i + 1) * threadNum);
// //判断当前结束下标是否超出台站数量 没有超出说明还在范围内 正常截取数组
// if (endIndex <= works){
// infos = stationInfos.subList(startIndex, endIndex);
// }else {//如果超出台站数量 则从截取开始下标 截取到台站数组长度
// infos = stationInfos.subList(startIndex, works);
// }
// countDownLatch = new CountDownLatch(infos.size());
// //遍历当前组的台站并进行计算
// for (int j = 0; j < infos.size(); j++){
// //获取台站信息
// StationInfo stationInfo = infos.get(j);
// //声明一个实体类
// GetStationinfoAndDataRate stationinfoAndDataRate = new GetStationinfoAndDataRate();
// stationinfoAndDataRate.setMOriginalstationsinfo(stationInfo);
// stationinfoAndDataRate.setMRateparam(mRateParam);
// CalculateDataRateThread calculateDataRateThread = new CalculateDataRateThread(finallySta, stationinfoAndDataRate, countDownLatch);
// //调用线程计算率值
// poolExecutor.execute(calculateDataRateThread);
// }
// if (i == Tworks - 1){
// try {
// countDownLatch.await();
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// } finally {
// //关闭线程池
// if(poolExecutor != null) {
// poolExecutor.shutdownNow();
// }
// }
// }else {
// try {
// countDownLatch.await();
// } catch (InterruptedException e) {
// throw new RuntimeException(e);
// }
// }
// long end = System.currentTimeMillis();
// System.out.println("单次线程执行总时长:"+ (end-start));
// }
// }
// return finallySta;
redisUtil.set("dataStationInfoList", finallySta);
}
}
}

View File

@ -1,7 +1,14 @@
package org.jeecg.modules.mapper;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
import org.jeecg.modules.entity.StationReceivingConfigStation;
@Mapper
@DS("master")
public interface StationReceivingConfigMapper extends BaseMapper<StationReceivingConfigStation> {
String findMaxCacheTime();
}

View File

@ -0,0 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="org.jeecg.modules.mapper.StationReceivingConfigMapper">
<select id="findMaxCacheTime" resultType="java.lang.String">
select
max(cache_time)
FROM
station_receiving_config
</select>
</mapper>

View File

@ -19,9 +19,6 @@ public interface ICacheTimeService {
@RequestMapping("/sys/dictItem/findCacheTime")
List<Map<String, String>> findCacheTime();
@RequestMapping("/gardsDetectors/findStationDetectors")
Map<String, List<GardsDetectorsSystem>> findStationDetectors(@RequestBody List<String> stationIds);
@RequestMapping("/sys/user/findUserByName")
SysUser findUserByName(@RequestParam String userName);

View File

@ -46,7 +46,7 @@ public interface IStationOperationService extends IService<StationOperation> {
/**
* 查询台站监测数据
* @param userId
* @param stationId
* @param oneStationId
* @return
*/
Result getDataReceivingStatus(String userId, String oneStationId);

View File

@ -34,4 +34,9 @@ public interface ISysUserFocusStationService extends IService<SysUserFocusStatio
*/
Result findUserFocusByUserId(String userId);
/**
* 更新各用户的缓存时间
*/
void cacheStationReceivingConfig();
}

View File

@ -23,7 +23,9 @@ import org.jeecg.modules.base.entity.configuration.GardsStations;
import org.jeecg.modules.base.entity.original.GardsMetData;
import org.jeecg.modules.base.entity.original.GardsSampleData;
import org.jeecg.modules.base.entity.original.GardsSohData;
import org.jeecg.modules.base.entity.postgre.StationReceivingConfig;
import org.jeecg.modules.base.entity.postgre.SysUserFocusStation;
import org.jeecg.modules.entity.StationReceivingConfigStation;
import org.jeecg.modules.entity.SysUserFocusStationStation;
import org.jeecg.modules.entity.data.*;
import org.jeecg.modules.mapper.*;
@ -36,6 +38,7 @@ import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.io.*;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.stream.Collectors;
@ -48,6 +51,9 @@ public class StationOperationServiceImpl extends ServiceImpl<StationOperationMap
@Resource
private SysUserFocusStationMapper sysUserFocusStationMapper;
@Resource
private StationReceivingConfigMapper stationReceivingConfigMapper;
@Autowired
private StationTypeUtil stationTypeUtil;
@ -384,24 +390,38 @@ public class StationOperationServiceImpl extends ServiceImpl<StationOperationMap
public Result getDataReceivingStatus(String userId, String oneStationId) {
Result result = new Result();
Map<String, StationData> stationDataMap = (Map<String, StationData>) redisUtil.get("stationDataMap");
// //获取四项缓存数据的对应内容
// List<Map<String, String>> cacheList = cacheTimeService.findCacheTime();
// //缓存时间
// String cacheTime = "";
// for (int i=0; i< cacheList.size(); i++) {
// if ( StringUtils.isNotBlank(cacheList.get(i).get(CacheName.cacheTime)) ) {
// cacheTime = cacheList.get(i).get(CacheName.cacheTime);
// break;
// }
// }
// if (StringUtils.isBlank(cacheTime)) {
// result.error500("The cache time cannot be empty");
// return result;
// }
LambdaQueryWrapper<StationReceivingConfigStation> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(StationReceivingConfig::getUserId, userId);
StationReceivingConfigStation configStation = stationReceivingConfigMapper.selectOne(queryWrapper);
Double cacheTime = configStation.getCacheTime();
//获取当前日期时间 作为结束查询时间
LocalDateTime endDate = LocalDateTime.now();
//根据缓存日期 得到开始查询时间
LocalDateTime startDate = endDate.minusDays(cacheTime.intValue());
//获取到开始时间的秒数
long startMill = startDate.toInstant(ZoneOffset.UTC).toEpochMilli();
//过滤出当前用户关注的台站信息
List<StationData> stationDataList = new LinkedList<>();
if (CollectionUtils.isNotEmpty(stationDataMap)) {
if (StringUtils.isNotBlank(oneStationId)) {
StationData stationData = stationDataMap.get(oneStationId);
if (Objects.nonNull(stationData)) {
//读取探测器的数据集合
Map<String, List<Map<String, DetectorData>>> detectors = stationData.getDetectors();
//遍历探测器的集合
for (Map.Entry<String, List<Map<String, DetectorData>>> detector:detectors.entrySet()) {
//获取探测器对应的数组
List<Map<String, DetectorData>> detectorValue = detector.getValue();
//遍历探测器数组
for (Map<String, DetectorData> detectorDataMap:detectorValue) {
for (String key :detectorDataMap.keySet()) {
DetectorData detectorData = detectorDataMap.get(key);
//通过流过滤出数据的开始时间戳在用户统计的开始时间戳之后的数据
detectorData.getDataList().stream().filter(item -> item.getBeginTime() >= startMill).collect(Collectors.toList());
}
}
}
}
stationDataList.add(stationData);
} else {
//根据用户id查询出当前用户关注的台站信息
@ -410,151 +430,30 @@ public class StationOperationServiceImpl extends ServiceImpl<StationOperationMap
List<SysUserFocusStationStation> userFocusStations = sysUserFocusStationMapper.selectList(userFocusStationQueryWrapper);
List<String> stationIds = userFocusStations.stream().map(SysUserFocusStation::getStationId).collect(Collectors.toList());
for (String stationId:stationIds) {
if (Objects.nonNull(stationDataMap.get(stationId))) {
stationDataList.add(stationDataMap.get(stationId));
StationData stationData = stationDataMap.get(stationId);
if (Objects.nonNull(stationData)) {
//读取探测器的数据集合
Map<String, List<Map<String, DetectorData>>> detectors = stationData.getDetectors();
//遍历探测器的集合
for (Map.Entry<String, List<Map<String, DetectorData>>> detector:detectors.entrySet()) {
//获取探测器对应的数组
List<Map<String, DetectorData>> detectorValue = detector.getValue();
//遍历探测器数组
for (Map<String, DetectorData> detectorDataMap:detectorValue) {
for (String key :detectorDataMap.keySet()) {
DetectorData detectorData = detectorDataMap.get(key);
//通过流过滤出数据的开始时间戳在用户统计的开始时间戳之后的数据
detectorData.getDataList().stream().filter(item -> item.getBeginTime() >= startMill).collect(Collectors.toList());
}
}
}
stationDataList.add(stationData);
}
}
}
}
// //从redis中获取台站信息
// Map<Integer, String> stationInfoMap = (Map<Integer, String>)redisUtil.get("stationMap");
// //从redis中获取探测器信息
// Map<Integer, String> detectorInfoMap = (Map<Integer, String>)redisUtil.get("detectorsMap");
// //遍历台站id
// if (CollectionUtils.isNotEmpty(stationIds)) {
// //获取当前日期时间 作为结束查询时间
// LocalDateTime endDate = LocalDateTime.now();
// //根据缓存日期 得到开始查询时间
// LocalDateTime startDate = endDate.minusDays(Integer.valueOf(cacheTime));
// String startDateTime = startDate.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
// //根据台站id开始时间查询出台站下的气象数据
// List<GardsMetData> metDataList = stationMetDataMapper.findMetDataList(stationIds, startDateTime);
// //根据台站id查询出当前台站下处于运行状态的数据
// Map<String, List<GardsDetectorsSystem>> stationDetectors = cacheTimeService.findStationDetectors(stationIds);
// //声明存储所有台站id对应的数据信息的集合
// List<StationData> stationDataList = new LinkedList<>();
// //遍历台站id 获取台站下的探测器数据
// if (CollectionUtils.isNotEmpty(stationDetectors)) {
// for (String stationId:stationIds) {
// Map<String, List<Map<String, DetectorData>>> stationMap = new HashMap<>();
// //获取台站下对应的探测器数据
// List<GardsDetectorsSystem> detectors = stationDetectors.get(stationId);
// if (CollectionUtils.isNotEmpty(detectors)) {
// StationData stationData = new StationData();
// //stream流获取探测器id
// List<Integer> detectorIds = detectors.stream().map(GardsDetectorsSystem::getDetectorId).collect(Collectors.toList());
// //根据探测器id 开始时间查询样品基础数据
// List<GardsSampleData> sampleDataList = stationSampleDataMapper.findSampleDataList(detectorIds, startDateTime);
// //根据台站id探测器id开始时间
// List<GardsSohData> sohDataList = stationSohDataMapper.findSohDataList(stationId, detectorIds, startDateTime);
// //用于接收当前台站下所有探测器及探测器所有的样品数据气体数据状态数据集合
// List<Map<String, DetectorData>> detectorDataList = new LinkedList<>();
//
// for (Integer detectorId:detectorIds) {
// Map<String, DetectorData> detectorMap = new HashMap<>();
// DetectorData detectorData = new DetectorData();
// detectorData.setDetectorId(detectorId);
// //声明数据实体实体类 根据参数存储 样品基础数据对应的数据 气体数据 状态数据
// List<DataInfoVo> dataInfoList = new LinkedList<>();
// if (CollectionUtils.isNotEmpty(sampleDataList)) {
// //根据探测器id过滤出对应的样品数据 并进行遍历封装进dataInfo
// List<GardsSampleData> dataListSample = sampleDataList.stream().filter(item -> item.getDetectorId().equals(detectorId)).collect(Collectors.toList());
// if (CollectionUtils.isNotEmpty(dataListSample)) {
// for (GardsSampleData sampleData:dataListSample) {
// DataInfoVo dataInfo = new DataInfoVo();
// //根据样品数据类型判断 数据类型 根据不同的样品数据状态
// String dataType = sampleData.getDataType();
// String spectralQualifie = sampleData.getSpectralQualifie();
// if (StrUtil.equals(dataType,"S")){
// dataInfo.setType("PHD");
// if (StrUtil.equals(spectralQualifie,"PREL")){
// dataInfo.setStatus("SPREL");
// } else if (StrUtil.equals(spectralQualifie,"FULL")) {
// dataInfo.setStatus("SFULL");
// }
// } else if (StrUtil.equals(dataType,"Q")){
// dataInfo.setType("QC");
// dataInfo.setStatus("QC");
// } else if (StrUtil.equals(dataType,"G")){
// dataInfo.setType("PHD");
// if (StrUtil.equals(spectralQualifie,"PREL")){
// dataInfo.setStatus("GPREL");
// } else if (StrUtil.equals(spectralQualifie,"FULL")) {
// dataInfo.setStatus("GFULL");
// }
// } else {
// continue;
// }
// //处理开始时间
// Date acquisitionStart = sampleData.getAcquisitionStart();
// dataInfo.setBeginTime(Double.valueOf(acquisitionStart.getTime()/1000));
// //处理结束时间
// Date acquisitionStop = sampleData.getAcquisitionStop();
// dataInfo.setEndTime(Double.valueOf(acquisitionStop.getTime()/1000));
// //时间间隔
// Double span = Double.valueOf(acquisitionStop.getTime()/1000) - Double.valueOf(acquisitionStart.getTime()/1000);
// dataInfo.setSpanTime(span);
// dataInfoList.add(dataInfo);
// }
// }
// }
// if (CollectionUtils.isNotEmpty(sohDataList)) {
// List<GardsSohData> dataListSoh = sohDataList.stream().filter(item -> item.getDetectorId().equals(detectorId)).collect(Collectors.toList());
// //根据探测器id 台站id 开始时间查询状态数据
// if (CollectionUtils.isNotEmpty(dataListSoh)) {
// for (GardsSohData sohData:dataListSoh) {
// DataInfoVo dataInfo = new DataInfoVo();
// dataInfo.setType("SOH");
// dataInfo.setStatus("SOH");
// Date startTime = sohData.getStartTime();
// dataInfo.setBeginTime(Double.valueOf(startTime.getTime()/1000));
// dataInfo.setSpanTime(Double.valueOf(sohData.getTime()));
// dataInfoList.add(dataInfo);
// }
// }
// }
// if (CollectionUtils.isNotEmpty(metDataList)) {
// List<GardsMetData> dataListMet = metDataList.stream().filter(item -> item.getStationId().equals(stationId)).collect(Collectors.toList());
// if (CollectionUtils.isNotEmpty(dataListMet)) {
// for (GardsMetData metData:dataListMet) {
// DataInfoVo dataInfo = new DataInfoVo();
// dataInfo.setType("MET");
// dataInfo.setStatus("MET");
// Date startTime = metData.getStartTime();
// dataInfo.setBeginTime(Double.valueOf(startTime.getTime()/1000));
// Date endTime = metData.getEndTime();
// dataInfo.setEndTime(Double.valueOf(endTime.getTime()/1000));
// Double span = Double.valueOf(startTime.getTime() / 1000) - Double.valueOf(endTime.getTime() / 1000);
// dataInfo.setSpanTime(span);
// dataInfoList.add(dataInfo);
// }
// }
// }
// detectorData.setDataList(dataInfoList);
// if (CollectionUtils.isNotEmpty(detectorInfoMap)) {
// if (StringUtils.isNotBlank(detectorInfoMap.get(detectorId.toString()))) {
// detectorData.setDetectorCode(detectorInfoMap.get(detectorId.toString()));
// }
// }
// detectorMap.put(String.valueOf(detectorId), detectorData);
// detectorDataList.add(detectorMap);
// }
//
// stationMap.put(stationId, detectorDataList);
// stationData.setStationId(stationId);
// if (CollectionUtils.isNotEmpty(stationInfoMap)) {
// if (StringUtils.isNotBlank(stationInfoMap.get(stationId))) {
// stationData.setStationCode(stationInfoMap.get(stationId));
// }
// }
// stationData.setDetectors(stationMap);
// stationDataList.add(stationData);
// }
// }
// }
result.setSuccess(true);
result.setResult(stationDataList);
// }
return result;
}
@ -565,33 +464,6 @@ public class StationOperationServiceImpl extends ServiceImpl<StationOperationMap
if (Objects.nonNull(stationInfoMap)) {
stationInfoList = stationInfoMap.values().stream().filter(Objects::nonNull).collect(Collectors.toList());
}
// // 获取所有的台站信息
// HashMap<String, GardsStations> stationInfoMap = (HashMap<String, GardsStations>) redisUtil.get("stationInfoMap");
// List<Integer> detectorsUsedList = (List<Integer>) redisUtil.get("detectorsUsedList");
// // 获取所有的台站信息
// List<GardsStations> stations = stationInfoMap.values().stream().sorted(Comparator.comparing(GardsStations::getStationId)).collect(Collectors.toList());
// List<StationInfo> stationInfos = new ArrayList<>();
// for (GardsStations gardsStations : stations) {
// StationInfo stationInfo = new StationInfo();
// stationInfo.setId(gardsStations.getStationId().toString());
// stationInfo.setStationCode(gardsStations.getStationCode());
// stationInfo.setCountryCode(gardsStations.getCountryCode());
// stationInfo.setLon(gardsStations.getLon().toString());
// stationInfo.setLat(gardsStations.getLat().toString());
// stationInfo.setType(gardsStations.getType());
// stationInfo.setDescription(gardsStations.getDescription());
// stationInfo.setStatus(gardsStations.getStatus());
// boolean contains = detectorsUsedList.contains(gardsStations.getStationId());
// if (contains) {
// stationInfo.setUsed("YES");
// } else {
// stationInfo.setUsed("NO");
// }
// stationInfos.add(stationInfo);
// }
// RateParam mRateParam = calCulateStationData.initParameter();
// List<StationInfo> stationInfoList = calCulateStationData.mutiThreadGetStationInfo(stationInfos,mRateParam);
return Result.OK(stationInfoList);
}

View File

@ -12,6 +12,7 @@ import org.jeecg.common.system.util.JwtUtil;
import org.jeecg.common.util.RedisUtil;
import org.jeecg.common.util.SpringContextUtils;
import org.jeecg.modules.base.entity.configuration.GardsStations;
import org.jeecg.modules.base.entity.postgre.StationReceivingConfig;
import org.jeecg.modules.base.entity.postgre.SysUser;
import org.jeecg.modules.entity.StationReceivingConfigStation;
import org.jeecg.modules.entity.SysUserFocusStationStation;
@ -27,6 +28,7 @@ import org.springframework.transaction.annotation.Transactional;
import javax.servlet.http.HttpServletRequest;
import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.Collectors;
@Service("sysUserFocusStationService")
public class SysUserFocusStationServiceImpl extends ServiceImpl<SysUserFocusStationMapper, SysUserFocusStationStation> implements ISysUserFocusStationService {
@ -134,6 +136,8 @@ public class SysUserFocusStationServiceImpl extends ServiceImpl<SysUserFocusStat
this.saveBatch(focusStationStationList);
}
}
//重新保存后重新缓存
cacheStationReceivingConfig();
result.success("Save successfully");
return result;
}
@ -200,4 +204,11 @@ public class SysUserFocusStationServiceImpl extends ServiceImpl<SysUserFocusStat
return result;
}
@Override
public void cacheStationReceivingConfig() {
//查询用户名以及用户名关联的缓存设置
String maxCacheTime = stationReceivingConfigMapper.findMaxCacheTime();
redisUtil.set("maxCacheTime", maxCacheTime);
}
}

View File

@ -65,9 +65,4 @@ public class GardsDetectorsController {
Result result = gardsDetectorsService.deleteById(id);
return result;
}
@RequestMapping("findStationDetectors")
public Map<String, List<GardsDetectorsSystem>> findStationDetectors(@RequestBody List<String> stationIds){
return gardsDetectorsService.findStationDetectors(stationIds);
}
}

View File

@ -57,11 +57,4 @@ public interface IGardsDetectorsService extends IService<GardsDetectorsSystem> {
* @return
*/
void findDetectors();
/**
* 根据台站id查询对应的探测器数据信息
* @param stationIds
* @return
*/
Map<String, List<GardsDetectorsSystem>> findStationDetectors(List<String> stationIds);
}

View File

@ -27,6 +27,7 @@ import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
@Service("gardsDetectorsService")
@ -147,26 +148,26 @@ public class GardsDetectorsServiceImpl extends ServiceImpl<GardsDetectorsMapper,
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void findDetectors(){
redisUtil.del("detectorsMap", "detectorsUsedList");
redisUtil.del("detectorsMap", "detectorsUsedList", "stationDetectorMap");
//查询探测器信息
List<GardsDetectorsSystem> gardsDetectors = this.baseMapper.selectList(new LambdaQueryWrapper<>());
//根据台站id获取对应的探测集合
Map<Integer, List<GardsDetectorsSystem>> stationDetectorMap = new HashMap<>();
for (GardsDetectorsSystem detectorsSystem: gardsDetectors) {
if (detectorsSystem.getStationId() != null) {
List<GardsDetectorsSystem> systemList = CollectionUtils.isNotEmpty(stationDetectorMap.get(detectorsSystem.getStationId()))?stationDetectorMap.get(detectorsSystem.getStationId()):new LinkedList<>();
if (detectorsSystem.getStatus() != null && detectorsSystem.getStatus().trim().equals(DetectorStatus.ON.getValue())) {
systemList.add(detectorsSystem);
stationDetectorMap.put(detectorsSystem.getStationId(), systemList);
}
}
}
//生成一个探测器id探测器编码的map进行缓存
Map<Integer, String> detectorsMap = gardsDetectors.stream().collect(Collectors.toMap(GardsDetectorsSystem::getDetectorId, GardsDetectorsSystem::getDetectorCode));
//过滤出使用状态的探测器相关的台站信息
List<Integer> detectorsUsedList = gardsDetectors.stream().filter(item -> item.getStatus()!=null && DetectorStatus.ON.getValue().equals(item.getStatus().trim())).map(GardsDetectorsSystem::getStationId).collect(Collectors.toList());
redisUtil.set("detectorsMap",detectorsMap);
redisUtil.set("detectorsUsedList", detectorsUsedList);
}
@Override
public Map<String, List<GardsDetectorsSystem>> findStationDetectors(List<String> stationIds) {
Map<String, List<GardsDetectorsSystem>> map = new HashMap<>();
if (CollectionUtils.isNotEmpty(stationIds)){
LambdaQueryWrapper<GardsDetectorsSystem> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.in(GardsDetectorsSystem::getStationId, stationIds);
List<GardsDetectorsSystem> detectorsList = this.baseMapper.selectList(queryWrapper);
for (String stationId:stationIds) {
List<GardsDetectorsSystem> detectors = detectorsList.stream().filter(item -> item.getStationId()!=null && item.getStationId().equals(Integer.valueOf(stationId)) && item.getStatus()!=null && item.getStatus().trim().equals(DetectorStatus.ON.getValue())).collect(Collectors.toList());
map.put(stationId, detectors);
}
}
return map;
redisUtil.set("stationDetectorMap", stationDetectorMap);
}
}

View File

@ -3,6 +3,7 @@ package org.jeecg;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.util.oConvertUtils;
import org.jeecg.modules.service.ISysUserFocusStationService;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@ -23,6 +24,7 @@ import java.net.UnknownHostException;
@RequiredArgsConstructor
public class JeecgStationOperationApplication extends SpringBootServletInitializer implements CommandLineRunner {
private final ISysUserFocusStationService sysUserFocusStationService;
private final DataProvisionEfficiencyManager dataProvisionEfficiencyManager;
private final DataReceivingStatusManager dataReceivingStatusManager;
@ -48,6 +50,7 @@ public class JeecgStationOperationApplication extends SpringBootServletInitializ
@Override
public void run(String... args) throws Exception {
sysUserFocusStationService.cacheStationReceivingConfig();
dataProvisionEfficiencyManager.start();
dataReceivingStatusManager.start();
}