package org.jeecg; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.jeecg.common.CalculateStationData; import org.jeecg.common.properties.MaximumPoolSizeProperties; import org.jeecg.common.util.RedisUtil; import org.jeecg.modules.base.entity.configuration.GardsStations; import org.jeecg.modules.base.enums.DetectorStatus; import org.jeecg.modules.entity.data.RateParam; import org.jeecg.modules.entity.data.StationInfo; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import java.io.File; import java.util.*; import java.util.concurrent.*; import java.util.stream.Collectors; @Slf4j @Component @RequiredArgsConstructor public class DataProvisionEfficiencyManager { private final RedisUtil redisUtil; private final CalculateStationData calCulateStationData; private final MaximumPoolSizeProperties maximumPoolSizeProperties; /** * 开始 */ public void start(){ ProvisionEfficiencyThreadManager manager = new ProvisionEfficiencyThreadManager(); manager.init(); manager.start(); } /** * 台站状态线程管理器 */ private class ProvisionEfficiencyThreadManager extends Thread{ private ThreadPoolExecutor poolExecutor; public void init(){ //获取机器可用核心数 int systemCores = maximumPoolSizeProperties.getStation();//Runtime.getRuntime().availableProcessors(); //初始化线程池 ThreadFactory threadFactory = new CustomizableThreadFactory("provision-efficiency-"); poolExecutor = new ThreadPoolExecutor(systemCores-1,systemCores,5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),threadFactory); } @Override public void run() { for(;;){ long start = System.currentTimeMillis(); // 获取所有的台站信息 HashMap stationInfoMap = (HashMap) redisUtil.get("stationInfoMap"); List detectorsUsedList = (List) redisUtil.get("detectorsUsedList"); // 获取所有的台站信息 List stations = stationInfoMap.values().stream().filter(item-> item.getLon()!=null && item.getLat()!=null && item.getStatus() != null && item.getStatus().equalsIgnoreCase(DetectorStatus.ON.getValue())).sorted(Comparator.comparing(GardsStations::getStationId)).collect(Collectors.toList()); List stationInfos = new ArrayList<>(); for (GardsStations gardsStations : stations) { StationInfo stationInfo = new StationInfo(); stationInfo.setId(gardsStations.getStationId().toString()); stationInfo.setStationCode(gardsStations.getStationCode()); stationInfo.setCountryCode(gardsStations.getCountryCode()); stationInfo.setLon(gardsStations.getLon().toString()); stationInfo.setLat(gardsStations.getLat().toString()); stationInfo.setType(gardsStations.getType()); stationInfo.setDescription(gardsStations.getDescription()); stationInfo.setStatus(gardsStations.getStatus()); boolean contains = detectorsUsedList.contains(gardsStations.getStationId()); if (contains) { stationInfo.setUsed("YES"); } else { stationInfo.setUsed("NO"); } stationInfos.add(stationInfo); } RateParam mRateParam = calCulateStationData.initParameter(); calCulateStationData.mutiThreadGetStationInfo(stationInfos,mRateParam); long end = System.currentTimeMillis(); long sleepTime = 3600000 - (end-start); //如果sleepTime > 0 需要睡眠到指定时间,否则继续下次获取邮件 if(sleepTime > 0){ try { //如果本次 TimeUnit.MILLISECONDS.sleep(sleepTime); } catch (InterruptedException e) { e.printStackTrace(); } } } } } }