diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/email/EmailLogManager.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/email/EmailLogManager.java index 668bf016..f947be89 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/common/email/EmailLogManager.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/email/EmailLogManager.java @@ -1,12 +1,11 @@ package org.jeecg.common.email; import cn.hutool.core.io.FileUtil; -import com.baomidou.mybatisplus.core.toolkit.StringPool; import lombok.Setter; import org.jeecg.common.constant.StringConstant; import org.jeecg.common.properties.SpectrumPathProperties; import org.jeecg.common.util.DateUtils; - +import org.springframework.util.CollectionUtils; import java.io.File; import java.time.LocalDateTime; import java.util.*; @@ -44,11 +43,6 @@ public class EmailLogManager { @Setter private EmailLogEvent getAllIdLogEvent = null; - /** - * 完成解析邮件流程的线程id集合 - */ - private LinkedList completeThreadIds = new LinkedList<>(); - /** * 线程邮件日志队列 */ @@ -64,9 +58,6 @@ public class EmailLogManager { public EmailLogManager(SpectrumPathProperties spectrumPathProperties) { this.spectrumPathProperties = spectrumPathProperties; - EmailLogExecThread logExecThread = new EmailLogExecThread(); - logExecThread.setName("email-get-exec-thread"); - logExecThread.start(); } /** @@ -82,13 +73,9 @@ public class EmailLogManager { * @param event */ public void offer(Long threadId,EmailLogEvent event){ - synchronized (completeThreadIds){ + synchronized (queue){ if(queue.containsKey(threadId)){ queue.get(threadId).offer(event); - if(EmailLogManager.DONE.equals(event.getLogProcess())){ - completeThreadIds.offer(threadId); - completeThreadIds.notify(); - } }else{ LinkedList logEventList = new LinkedList<>(); logEventList.offer(event); @@ -97,61 +84,39 @@ public class EmailLogManager { } } + /** + * 清空队列日志 + */ + public void clear(){ + synchronized (queue){ + this.setConnectLogEvent(null); + this.setGetAllIdLogEvent(null); + this.queue.clear(); + } + } + /** * 获取日志事件 * @return * @throws InterruptedException */ - public LinkedList take(Long threadId) throws InterruptedException { - synchronized (completeThreadIds){ - final LinkedList logEventList = queue.remove(threadId); - return logEventList; - } - } - - /** - * 获取解析邮件完成的线程id - * @return - * @throws InterruptedException - */ - public Long getCompleteThreadId() throws InterruptedException { - synchronized (completeThreadIds){ - if(completeThreadIds.isEmpty()){ - completeThreadIds.wait(); - return null; - } - final Long threadId = completeThreadIds.removeFirst(); - return threadId; - } - } - - /** - * 邮件日志执行线程 - */ - private class EmailLogExecThread extends Thread{ - - @Override - public void run() { - for(;;){ - try { - final Long threadId = EmailLogManager.getInstance().getCompleteThreadId(); - if(Objects.nonNull(threadId)){ - final LinkedList logEventList = EmailLogManager.getInstance().take(threadId); - if(Objects.nonNull(getAllIdLogEvent)){ - logEventList.addFirst(getAllIdLogEvent); - } - if(Objects.nonNull(connectLogEvent)){ - logEventList.addFirst(connectLogEvent); - } - List logContentList = new ArrayList<>(); - logEventList.forEach(logEvent->{ - final String logContent = getLogContent(logEvent); - logContentList.add(logContent); - }); - writeLog(GS_TYPE_GET,logContentList); + public void writeLog(Long threadId){ + synchronized (queue){ + if(queue.containsKey(threadId)){ + final LinkedList logEventList = queue.remove(threadId); + if(!CollectionUtils.isEmpty(logEventList)){ + if(Objects.nonNull(getAllIdLogEvent)){ + logEventList.addFirst(getAllIdLogEvent); } - } catch (InterruptedException e) { - e.printStackTrace(); + if(Objects.nonNull(connectLogEvent)){ + logEventList.addFirst(connectLogEvent); + } + List logContentList = new ArrayList<>(); + logEventList.forEach(logEvent->{ + final String logContent = this.getLogContent(logEvent); + logContentList.add(logContent); + }); + this.write(GS_TYPE_GET,logContentList); } } } @@ -339,7 +304,7 @@ public class EmailLogManager { /** * 把日志写入文件 */ - private void writeLog(String gsType, List logContentList){ + private void write(String gsType, List logContentList){ LocalDateTime now = LocalDateTime.now(); StringBuilder logFilePath = new StringBuilder(); logFilePath.append(spectrumPathProperties.getRootPath()); diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/email/EmailServiceManager.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/email/EmailServiceManager.java index 6c0a7a86..61b5cc1c 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/common/email/EmailServiceManager.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/email/EmailServiceManager.java @@ -223,6 +223,7 @@ public class EmailServiceManager { // props.put("mail.imap.starttls.enable", "true"); Session session = Session.getInstance(props, new Authenticator() { + @Override protected PasswordAuthentication getPasswordAuthentication() { return new PasswordAuthentication(username, password); } @@ -286,6 +287,7 @@ public class EmailServiceManager { // props.put("mail.smtp.starttls.enable", "true"); Session session = Session.getInstance(props, new Authenticator() { + @Override protected PasswordAuthentication getPasswordAuthentication() { return new PasswordAuthentication(username, password); } diff --git a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/EmailParsingActuator.java b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/EmailParsingActuator.java index 741d3f01..484606d3 100644 --- a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/EmailParsingActuator.java +++ b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/EmailParsingActuator.java @@ -63,9 +63,8 @@ public class EmailParsingActuator extends Thread{ }catch (InterruptedException e) { e.printStackTrace(); }finally { - //每批次连接关闭后清空邮箱全局日志 - EmailLogManager.getInstance().setConnectLogEvent(null); - EmailLogManager.getInstance().setGetAllIdLogEvent(null); + //清除本批次日志缓存 + EmailLogManager.getInstance().clear(); //关闭资源 emailServiceManager.close(); } diff --git a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/spectrum/ParsingProcessLog.java b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/spectrum/ParsingProcessLog.java index 425c56e8..cac80365 100644 --- a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/spectrum/ParsingProcessLog.java +++ b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/spectrum/ParsingProcessLog.java @@ -135,7 +135,12 @@ public class ParsingProcessLog extends AbstractAutoLogOrReport{ storageLog.append(System.lineSeparator()).append(System.lineSeparator()); storageLog.append(rowFormat(INSTANCE_STATUS,spectrumHandler.status)); storageLog.append(System.lineSeparator()).append(System.lineSeparator()); - storageLog.append(titleFormat(WRITE_INTO_SUCCESS,19,StringConstant.DASH,endIntoDatabaseTime,StringConstant.DASH)); + //判断是否有文件重复 或 gas,det文件不存在的情况 或 分析数据存储失败 + if (fileRepeat || fileNotExist || analysisDataStoreFlag) { + storageLog.append(titleFormat(WRITE_INTO_ERROR,19,StringConstant.DASH,endIntoDatabaseTime,StringConstant.DASH)); + } else { + storageLog.append(titleFormat(WRITE_INTO_SUCCESS,19,StringConstant.DASH,endIntoDatabaseTime,StringConstant.DASH)); + } storageLog.append(System.lineSeparator()).append(System.lineSeparator()).append(System.lineSeparator()); } } @@ -171,8 +176,8 @@ public class ParsingProcessLog extends AbstractAutoLogOrReport{ this.NCC_analysis(); } - if(analysisDataStoreFlag == true){ - storeFlag = "error"; + if(fileRepeat || fileNotExist || analysisDataStoreFlag){ + storeFlag = "Error"; } analysisLog.append(rowFormat(storeResult,storeFlag)); analysisLog.append(System.lineSeparator()).append(System.lineSeparator()); diff --git a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/spectrum/Sample_G_Analysis.java b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/spectrum/Sample_G_Analysis.java index 02aea1b5..c491aaf7 100644 --- a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/spectrum/Sample_G_Analysis.java +++ b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/spectrum/Sample_G_Analysis.java @@ -23,7 +23,9 @@ import org.jeecg.modules.base.bizVo.AttributeItemVo; import org.jeecg.modules.base.dto.*; import org.jeecg.modules.base.entity.original.GardsSampleData; import org.jeecg.modules.base.entity.rnauto.*; +import org.jeecg.modules.base.enums.DSType; import org.jeecg.modules.base.enums.MiddleDataType; +import org.jeecg.modules.base.enums.SpectrumType; import org.jeecg.modules.config.datasource.DataSourceSwitcher; import org.jeecg.modules.entity.vo.*; import org.jeecg.modules.exception.GAnalyseException; @@ -34,8 +36,14 @@ import org.springframework.transaction.TransactionStatus; import java.io.File; import java.io.IOException; import java.lang.reflect.Field; +import java.text.ParseException; +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneId; import java.util.*; +import static org.jeecg.modules.service.BlockConstant.XE_131m; + @Data @Slf4j public class Sample_G_Analysis { @@ -169,6 +177,9 @@ public class Sample_G_Analysis { File.separator + reportName; // 保存文件 FileOperation.saveOrAppendFile(savePath, reportContent, false); + //发送数据到redis + middleData.setSample_id(String.valueOf(sampleId)); + pushToRedis(middleData); }catch (Exception e){ e.printStackTrace(); log.error("Sample_G_Analysis", e); @@ -185,6 +196,32 @@ public class Sample_G_Analysis { log.info("Gamma自动处理分析--End"); } + /** + * 分析成功数据发送到Redis + */ + private void pushToRedis(GStoreMiddleProcessData middleData){ + try { + Info info = new Info(); + info.setStationId(this.sampleData.getStationId().toString()); + info.setSampleId(middleData.sample_id); + info.setSampleName(middleData.analyses_save_filePath.substring(middleData.analyses_save_filePath.lastIndexOf(StringPool.SLASH)+1)); + final Instant instant = DateUtils.parseDate(middleData.sample_collection_start).toInstant(); + final LocalDateTime collectTime = instant.atZone(ZoneId.systemDefault()).toLocalDateTime(); + info.setCollectionDate(collectTime); + info.setDatasource(DSType.ARMDARR.getType()); + info.setFullOrPrel(this.sampleData.getSpectralQualifie()); + info.setBetaOrGamma(SpectrumType.GAMMA.getType()); + Map nuclides = Maps.newHashMap(); + for (int i=0; i qcItems){ //如果数据已经存储,不在重复存储 final Integer idAnalysis = serviceQuotes.getAnalysesService().getIdAnalysis(this.sampleData.getSampleId()); @@ -226,6 +263,7 @@ public class Sample_G_Analysis { DataSourceSwitcher.clearDataSource(); } } + /** * 生成日志文件 * @param logFilePath diff --git a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/spectrum/SpectrumParsingActuator.java b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/spectrum/SpectrumParsingActuator.java index 3cb25246..d36894e3 100644 --- a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/spectrum/SpectrumParsingActuator.java +++ b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/spectrum/SpectrumParsingActuator.java @@ -87,13 +87,19 @@ public class SpectrumParsingActuator implements Runnable{ log.warn("This email {} parsing failed and is not listed in the Met, Alert, SOH, Sample, Detbkphd, QC, Gasbkphd spectra.",subject); } } + emailServiceManager.removeMail(message); } catch (Exception e) { + emailServiceManager.removeMail(message); e.printStackTrace(); }finally { - emailServiceManager.removeMail(message); - EmailLogEvent expungeEvent = new EmailLogEvent(EmailLogManager.GS_TYPE_GET,EmailLogManager.DONE); - EmailLogManager.getInstance().offer(Thread.currentThread().getId(),expungeEvent); - this.taskLatch.countDown(); + try { + EmailLogEvent expungeEvent = new EmailLogEvent(EmailLogManager.GS_TYPE_GET,EmailLogManager.DONE); + EmailLogManager.getInstance().offer(Thread.currentThread().getId(),expungeEvent); + + EmailLogManager.getInstance().writeLog(Thread.currentThread().getId()); + }finally { + this.taskLatch.countDown(); + } } } diff --git a/jeecg-module-spectrum-analysis/src/main/java/org/jeecg/modules/entity/vo/BetaDataFile.java b/jeecg-module-spectrum-analysis/src/main/java/org/jeecg/modules/entity/vo/BetaDataFile.java index 6f0b6ab1..5a2b3936 100644 --- a/jeecg-module-spectrum-analysis/src/main/java/org/jeecg/modules/entity/vo/BetaDataFile.java +++ b/jeecg-module-spectrum-analysis/src/main/java/org/jeecg/modules/entity/vo/BetaDataFile.java @@ -39,6 +39,8 @@ public class BetaDataFile implements Serializable { private String sampleId; + private String stationId; + private boolean bProcessed; private boolean saveAnalysisResult; @@ -225,6 +227,7 @@ public class BetaDataFile implements Serializable { qcFilePathName = ""; qcFileName = ""; sampleId = ""; + stationId = ""; bProcessed = false; saveAnalysisResult = false; diff --git a/jeecg-module-spectrum-analysis/src/main/java/org/jeecg/modules/service/impl/GammaServiceImpl.java b/jeecg-module-spectrum-analysis/src/main/java/org/jeecg/modules/service/impl/GammaServiceImpl.java index 7cd44626..d07cfc7a 100644 --- a/jeecg-module-spectrum-analysis/src/main/java/org/jeecg/modules/service/impl/GammaServiceImpl.java +++ b/jeecg-module-spectrum-analysis/src/main/java/org/jeecg/modules/service/impl/GammaServiceImpl.java @@ -14,6 +14,7 @@ import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; import com.baomidou.mybatisplus.core.toolkit.StringPool; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.Maps; import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; @@ -31,16 +32,14 @@ import org.jeecg.common.system.vo.LoginUser; import org.jeecg.common.util.*; import org.jeecg.modules.base.abstracts.AbstractLogOrReport; import org.jeecg.modules.base.bizVo.GammaRLR; +import org.jeecg.modules.base.dto.Info; import org.jeecg.modules.base.dto.NuclideActMdaDto; import org.jeecg.modules.base.dto.PeakInfoDto; import org.jeecg.modules.base.entity.configuration.GardsNuclLib; import org.jeecg.modules.base.entity.configuration.GardsNuclLinesLib; import org.jeecg.modules.base.entity.postgre.SysUser; import org.jeecg.modules.base.entity.rnman.GardsAnalySetting; -import org.jeecg.modules.base.enums.CalName; -import org.jeecg.modules.base.enums.CalType; -import org.jeecg.modules.base.enums.ExportTemplate; -import org.jeecg.modules.base.enums.RoleType; +import org.jeecg.modules.base.enums.*; import org.jeecg.modules.entity.vo.*; import org.jeecg.modules.entity.*; import org.jeecg.modules.mapper.SpectrumAnalysisMapper; @@ -68,7 +67,9 @@ import java.sql.Statement; import java.text.DecimalFormat; import java.text.NumberFormat; import java.text.ParseException; +import java.time.Instant; import java.time.LocalDateTime; +import java.time.ZoneId; import java.util.*; import java.util.stream.Collectors; @@ -139,6 +140,8 @@ public class GammaServiceImpl extends AbstractLogOrReport implements IGammaServi private IGardsQcCheckSpectrumService qcCheckSpectrumService; @Autowired private IGardsAnalySettingSpectrumService analySettingSpectrumService; + @Autowired + private RedisStreamUtil redisStreamUtil; @Override public Result initValue(Integer sampleId, String dbName, String analyst, String samfileName, HttpServletRequest request) { @@ -4588,11 +4591,41 @@ public class GammaServiceImpl extends AbstractLogOrReport implements IGammaServi //更新detial Information List detailedInfo = gammaFileUtil.DetailedInfo(phd.getId_sample(), phd); map.put("DetailedInformation", detailedInfo); + //发送数据到redis + middleData.setSample_stationID(String.valueOf(stationId)); + middleData.setSample_id(String.valueOf(phd.getId_sample())); + pushToRedis(middleData, phd); result.setSuccess(true); result.setResult(map); return result; } + /** + * 分析成功数据发送到Redis + */ + private void pushToRedis(GStoreMiddleProcessData middleData, PHDFile phd){ + try { + Info info = new Info(); + info.setStationId(middleData.sample_stationID); + info.setSampleId(middleData.sample_id); + info.setSampleName(middleData.analyses_save_filePath.substring(middleData.analyses_save_filePath.lastIndexOf(StringPool.SLASH)+1)); + final Instant instant = DateUtils.parseDate(middleData.sample_collection_start).toInstant(); + final LocalDateTime collectTime = instant.atZone(ZoneId.systemDefault()).toLocalDateTime(); + info.setCollectionDate(collectTime); + info.setDatasource(DSType.ARMDRRR.getType()); + info.setFullOrPrel(phd.getHeader().getSpectrum_quantity()); + info.setBetaOrGamma(SpectrumType.GAMMA.getType()); + Map nuclides = Maps.newHashMap(); + for (int i=0; i userStation = userTaskUtil.findUserStation(userName); boolean bAnalysisResultWriteAuthority = false; @@ -3865,11 +3871,48 @@ public class SpectrumAnalysisServiceImpl extends AbstractLogOrReport implements betaDataFile.setQcSpectrumData(qcData); map.put("qc", qcData); } + //发送数据到redis + pushToRedis(betaDataFile); result.setSuccess(true); result.setResult(map); return result; } + /** + * 分析成功数据发送到Redis + */ + private void pushToRedis(BetaDataFile betaDataFile){ + try { + Info info = new Info(); + info.setStationId(betaDataFile.getStationId()); + info.setSampleId(betaDataFile.getSampleId()); + info.setSampleName(betaDataFile.getSampleFileName()); + final Instant instant = DateUtils.parseDate(betaDataFile.getSampleStruct().collection_start_date+ StringPool.SPACE+betaDataFile.getSampleStruct().collection_start_time).toInstant(); + final LocalDateTime collectTime = instant.atZone(ZoneId.systemDefault()).toLocalDateTime(); + info.setCollectionDate(collectTime); + info.setDatasource(DSType.ARMDRRR.getType()); + info.setFullOrPrel(betaDataFile.getSampleStruct().spectrum_quantity); + info.setBetaOrGamma(SpectrumType.BETA.getType()); + Map nuclides = Maps.newHashMap(); + for (int i=0; i< betaDataFile.getXeDataList().size(); i++) { + GardsXeResults xeResults = betaDataFile.getXeDataList().get(i); + if (xeResults.getNuclideName().equalsIgnoreCase(XeNuclideName.XE_131m.getType())) { + nuclides.put(XeNuclideName.XE_131m.getType(), String.valueOf(xeResults.getConc())); + } else if (xeResults.getNuclideName().equalsIgnoreCase(XeNuclideName.XE_133.getType())) { + nuclides.put(XeNuclideName.XE_133.getType(), String.valueOf(xeResults.getConc())); + } else if (xeResults.getNuclideName().equalsIgnoreCase(XeNuclideName.XE_133m.getType())) { + nuclides.put(XeNuclideName.XE_133m.getType(), String.valueOf(xeResults.getConc())); + } else if (xeResults.getNuclideName().equalsIgnoreCase(XeNuclideName.XE_135.getType())) { + nuclides.put(XeNuclideName.XE_135.getType(), String.valueOf(xeResults.getConc())); + } + } + info.setNuclides(nuclides); + redisStreamUtil.pushAnalysis(info); + } catch (ParseException e) { + throw new RuntimeException(e); + } + } + /** * 获取需要导出的数据 (saveToHtml|saveToTxt|saveToExcel) */ diff --git a/jeecg-module-station-operation/src/main/java/org/jeecg/modules/service/impl/StationOperationServiceImpl.java b/jeecg-module-station-operation/src/main/java/org/jeecg/modules/service/impl/StationOperationServiceImpl.java index feb2fec6..1bc071de 100644 --- a/jeecg-module-station-operation/src/main/java/org/jeecg/modules/service/impl/StationOperationServiceImpl.java +++ b/jeecg-module-station-operation/src/main/java/org/jeecg/modules/service/impl/StationOperationServiceImpl.java @@ -43,27 +43,12 @@ public class StationOperationServiceImpl extends ServiceImpl stationInfoMap = (Map) redisUtil.get("dataStationInfoList"); - List stationInfoList = stationInfoMap.values().stream().filter(Objects::nonNull).collect(Collectors.toList()); + List stationInfoList = new LinkedList<>(); + if (Objects.nonNull(stationInfoMap)) { + stationInfoList = stationInfoMap.values().stream().filter(Objects::nonNull).collect(Collectors.toList()); + } + // // 获取所有的台站信息 // HashMap stationInfoMap = (HashMap) redisUtil.get("stationInfoMap"); // List detectorsUsedList = (List) redisUtil.get("detectorsUsedList"); diff --git a/jeecg-server-cloud/armd-auto-process-start/src/main/java/org/jeecg/JeecgAutoProcessApplication.java b/jeecg-server-cloud/armd-auto-process-start/src/main/java/org/jeecg/JeecgAutoProcessApplication.java index 54a0ea73..8aa0afa8 100644 --- a/jeecg-server-cloud/armd-auto-process-start/src/main/java/org/jeecg/JeecgAutoProcessApplication.java +++ b/jeecg-server-cloud/armd-auto-process-start/src/main/java/org/jeecg/JeecgAutoProcessApplication.java @@ -80,7 +80,7 @@ public class JeecgAutoProcessApplication extends SpringBootServletInitializer im EmailLogManager.init(spectrumPathProperties); //校验存储目录是否存在,不存在则创建,存在无操作 checkStorageDirectory(); -// autoProcessManager.start(systemStartupTime); + autoProcessManager.start(systemStartupTime); undealHandleManager.start(); fileSourceHandleManager.start(); // 删除过期的文件