diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/RedisConstant.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/RedisConstant.java index 3f00a7b9..8a1fa80f 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/RedisConstant.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/constant/RedisConstant.java @@ -40,4 +40,11 @@ public interface RedisConstant { String QIYE_EMAIL_TOKEN = "Token:QiyeEmail"; // 网易企业邮箱Token String PREFIX_TEMPLATE = "Template:"; // 消息模板 + + /** + * 删除失败邮件KEY + */ + String EMAIL_MSG_ID = "email_msg_id"; + + String UNDEAL_FILE = "Undeal:"; } diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/email/EmailServiceManager.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/email/EmailServiceManager.java index 58a6fdfc..d701893e 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/common/email/EmailServiceManager.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/email/EmailServiceManager.java @@ -8,14 +8,16 @@ import com.sun.mail.smtp.SMTPAddressFailedException; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.jeecg.common.api.dto.message.MessageDTO; +import org.jeecg.common.constant.RedisConstant; import org.jeecg.common.constant.StringConstant; import org.jeecg.common.constant.SymbolConstant; import org.jeecg.common.email.emuns.MailContentType; import org.jeecg.common.properties.SpectrumPathProperties; import org.jeecg.common.util.DateUtils; +import org.jeecg.common.util.Md5Util; +import org.jeecg.common.util.RedisUtil; import org.jeecg.modules.base.entity.postgre.SysEmail; import org.jetbrains.annotations.NotNull; - import javax.mail.*; import javax.mail.internet.InternetAddress; import javax.mail.internet.MimeMessage; @@ -55,6 +57,8 @@ public class EmailServiceManager { /** 收件箱 */ private Folder folder = null; + private RedisUtil redisUtil; + @NotNull public static EmailServiceManager getInstance(){ return new EmailServiceManager(); @@ -72,12 +76,15 @@ public class EmailServiceManager { * 初始化邮件服务管理器 * @param email 邮件属性 */ - public void init(SysEmail email,Integer receiveNum,String temporaryStoragePath,Date systemStartupTime, SpectrumPathProperties pathProperties){ + public void init(SysEmail email, Integer receiveNum, String temporaryStoragePath, + Date systemStartupTime, SpectrumPathProperties pathProperties, + RedisUtil redisUtil){ this.email = email; this.receiveNum = receiveNum; this.temporaryStoragePath = temporaryStoragePath; this.systemStartupTime = systemStartupTime; this.spectrumPathProperties = pathProperties; + this.redisUtil = redisUtil; } /** @@ -121,13 +128,23 @@ public class EmailServiceManager { properties.put("mail.imap.ssl.enable", "false"); } + HashMap IAM = new HashMap(); + //带上IMAP ID信息,由key和value组成,例如name,version,vendor,support-email等。 + IAM.put("name","myname"); + IAM.put("version","1.0.0"); + IAM.put("vendor","myclient"); + IAM.put("support-email","testmail@test.com"); + //获取邮件回话 final Session session = Session.getDefaultInstance(properties); + //获取smtp协议的存储对象 store = (IMAPStore) session.getStore(); //连接 store.connect(email.getUsername(),email.getPassword()); + // 解决163普通邮箱无法建立连接问题 + store.id(IAM); //获取收件箱 folder = store.getFolder("INBOX");//INBOX folder.open(Folder.READ_WRITE); @@ -572,16 +589,41 @@ public class EmailServiceManager { /** * 关闭邮件服务连接资源 */ - public void close(){ + public void close(List messageIds){ try { if(null != folder){ - folder.close(true); + folder.expunge(); + folder.close(); } if(null != store){ store.close(); } + for(String messageId : messageIds){ + String key = RedisConstant.EMAIL_MSG_ID+StringConstant.COLON+messageId; + redisUtil.del(key); + } } catch (MessagingException e) { + log.error("Email closure failed, email address is: {}, reason is: {}",email.getUsername(),e.getMessage()); e.printStackTrace(); } } + + /** + * 校验邮件 + * 若此次获取的邮件是上次删除失败的邮件直接删除 + * @param message + */ + public boolean check(Message message,String messageId){ + boolean exist = false; + try { + String key = RedisConstant.EMAIL_MSG_ID+StringConstant.COLON+messageId; + exist = redisUtil.hasKey(key); + if(exist){ + message.setFlag(Flags.Flag.DELETED,true); + } + return exist; + } catch (MessagingException e) { + return false; + } + } } diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/properties/DetectorIdFormat.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/properties/DetectorIdFormat.java index bcca18cf..51f0b436 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/common/properties/DetectorIdFormat.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/properties/DetectorIdFormat.java @@ -2,7 +2,6 @@ package org.jeecg.common.properties; import cn.hutool.core.util.NumberUtil; import cn.hutool.core.util.StrUtil; -import io.swagger.models.auth.In; import lombok.Data; import org.apache.commons.lang3.StringUtils; import org.jeecg.common.constant.StringConstant; @@ -50,7 +49,10 @@ public class DetectorIdFormat { return detectorId; } - public Integer detectorCodeToId(String detectorCode){ + /** + * 探测器Code解析出探测器Id + */ + public Integer codeToId(String detectorCode){ if (!StrUtil.contains(detectorCode, StrUtil.UNDERLINE)) return null; String[] split = StrUtil.split(detectorCode, StrUtil.UNDERLINE); @@ -61,9 +63,9 @@ public class DetectorIdFormat { for (Map.Entry entry : suffixMap.entrySet()) { String key = entry.getKey(); String value = entry.getValue(); - if (!StrUtil.contains(prefix, key)) - continue; + if (!StrUtil.contains(prefix, key)) continue; prefix = StrUtil.replace(prefix, key, value); + break; } if (!NumberUtil.isNumber(prefix)) return null; @@ -100,4 +102,25 @@ public class DetectorIdFormat { return stationId; } + + /** + * 探测器Code解析出台站Id + */ + public Integer codeToStationId(String detectorCode){ + if (!StrUtil.contains(detectorCode, StrUtil.UNDERLINE)) + return null; + String[] split = StrUtil.split(detectorCode, StrUtil.UNDERLINE); + String stationCode = split[0]; + stationCode = StrUtil.sub(stationCode, 2, stationCode.length()); + for (Map.Entry entry : suffixMap.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + if (!StrUtil.contains(stationCode, key)) continue; + stationCode = StrUtil.replace(stationCode, key, value); + break; + } + if (!NumberUtil.isNumber(stationCode)) + return null; + return Integer.valueOf(stationCode); + } } diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/properties/MaximumPoolSizeProperties.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/properties/MaximumPoolSizeProperties.java index 81b10e7b..a2883eaa 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/common/properties/MaximumPoolSizeProperties.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/properties/MaximumPoolSizeProperties.java @@ -11,4 +11,6 @@ public class MaximumPoolSizeProperties { private Integer station; + private Integer auto; + } diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/configuration/GardsDetectors.java b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/configuration/GardsDetectors.java index 1f57ae83..32d317ed 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/configuration/GardsDetectors.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/configuration/GardsDetectors.java @@ -5,9 +5,14 @@ import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; import com.fasterxml.jackson.annotation.JsonFormat; import lombok.Data; +import org.jeecg.config.valid.InsertGroup; +import org.jeecg.config.valid.UpdateGroup; import org.springframework.format.annotation.DateTimeFormat; +import javax.validation.constraints.NotBlank; +import javax.validation.constraints.NotNull; import java.io.Serializable; +import java.time.LocalDateTime; import java.util.Date; @Data diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/enums/DetectorsStatus.java b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/enums/DetectorsStatus.java deleted file mode 100644 index e5b9c45f..00000000 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/enums/DetectorsStatus.java +++ /dev/null @@ -1,26 +0,0 @@ -package org.jeecg.modules.base.enums; - -/** - * 探测器运行状态 - */ -public enum DetectorsStatus { - - /** - * 未运行 - */ - UNOPERATING("Unoperating"), - /** - * 在运行 - */ - OPERATING("Operating"); - - private String status; - - DetectorsStatus(String type) { - this.status = type; - } - - public String getStatus(){ - return this.status; - } -} diff --git a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/AutoProcessManager.java b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/AutoProcessManager.java index 9d9eaeac..4472ca79 100644 --- a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/AutoProcessManager.java +++ b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/AutoProcessManager.java @@ -2,6 +2,8 @@ package org.jeecg.modules; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.jeecg.common.constant.RedisConstant; +import org.jeecg.common.constant.StringConstant; import org.jeecg.common.email.EmailServiceManager; import org.jeecg.common.properties.TaskProperties; import org.jeecg.common.util.RedisUtil; @@ -57,7 +59,9 @@ public class AutoProcessManager{ final MailServerMonitor monitorThread = new MailServerMonitor(); monitorThread.setName("mail-server-monitor"); monitorThread.start(); - + //邮箱执行线程前给redis中添加一个default key,及default value,便于查看及排查 + String defaultKey = RedisConstant.EMAIL_MSG_ID+ StringConstant.COLON+"default"; + redisUtil.set(defaultKey,"default"); //邮件执行线程管理 final MailExecManager autoProcessThread = new MailExecManager(); autoProcessThread.setName("mail-exec-thread-manage"); diff --git a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/EmailParsingActuator.java b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/EmailParsingActuator.java index 2d2c0ca5..00351894 100644 --- a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/EmailParsingActuator.java +++ b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/EmailParsingActuator.java @@ -6,14 +6,17 @@ import org.jeecg.common.email.EmailLogManager; import org.jeecg.common.email.EmailServiceManager; import org.jeecg.common.properties.TaskProperties; import org.jeecg.modules.email.EmailProperties; -import org.jeecg.modules.eneity.event.SpectrumLog; import org.jeecg.modules.spectrum.EmailCounter; import org.jeecg.modules.spectrum.SpectrumLogManager; import org.jeecg.modules.spectrum.SpectrumParsingActuator; import org.jeecg.modules.spectrum.SpectrumServiceQuotes; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; import javax.mail.Message; +import javax.mail.MessagingException; +import javax.mail.internet.MimeMessage; +import java.util.ArrayList; import java.util.Date; +import java.util.List; import java.util.concurrent.*; /** @@ -39,7 +42,7 @@ public class EmailParsingActuator extends Thread{ this.systemStartupTime = systemStartupTime; //获取机器可用核心数 - int systemCores = Runtime.getRuntime().availableProcessors(); + int systemCores = spectrumServiceQuotes.getMaximumPoolSizeProperties().getAuto(); int maximumPoolSize = taskProperties.getReceiveNum() > systemCores?taskProperties.getReceiveNum():systemCores; //初始化线程池 @@ -60,10 +63,23 @@ public class EmailParsingActuator extends Thread{ } long start = System.currentTimeMillis(); final EmailServiceManager emailServiceManager = EmailServiceManager.getInstance(); - emailServiceManager.init(this.emailProperties,this.taskProperties.getReceiveNum(),this.taskProperties.getTemporaryStoragePath(),this.systemStartupTime, spectrumServiceQuotes.getSpectrumPathProperties()); + emailServiceManager.init(this.emailProperties,this.taskProperties.getReceiveNum(),this.taskProperties.getTemporaryStoragePath(), + this.systemStartupTime, spectrumServiceQuotes.getSpectrumPathProperties(),spectrumServiceQuotes.getRedisUtil()); + List messageIds = new ArrayList<>(); try { - final Message[] messages = emailServiceManager.receiveMail(); + Message[] messages = emailServiceManager.receiveMail(); if(ArrayUtils.isNotEmpty(messages)){ + //检验获取的邮件是否在之前删除失败列表中,若在直接调用邮件API删除,并且此次数组里元素也删除 + for(int i=messages.length-1;i>=0;i--){ + if (!messages[i].isExpunged()){ + String messageId = ((MimeMessage) messages[i]).getMessageID(); + final boolean exist = emailServiceManager.check(messages[i],messageId); + messageIds.add(messageId); + if(exist){ + messages = ArrayUtils.remove(messages,i); + } + } + } CountDownLatch taskLatch = new CountDownLatch(messages.length); for(Message message : messages){ SpectrumParsingActuator spectrumParsingActuator = new SpectrumParsingActuator(); @@ -73,9 +89,9 @@ public class EmailParsingActuator extends Thread{ } taskLatch.await(); } - }catch (InterruptedException e) { + }catch (InterruptedException | MessagingException e) { e.printStackTrace(); - }finally { + } finally { //清除本批次邮件日志缓存 EmailLogManager.getInstance().clear(); //保存本批次所有能谱日志 @@ -83,7 +99,7 @@ public class EmailParsingActuator extends Thread{ //清除本批次能谱日志缓存 SpectrumLogManager.mailSpectrumLogManager.clear(); //关闭资源 - emailServiceManager.close(); + emailServiceManager.close(messageIds); } long end = System.currentTimeMillis(); long sleepTime = taskProperties.getMailThreadExecCycle() - (end-start); @@ -98,6 +114,4 @@ public class EmailParsingActuator extends Thread{ } } } - - } diff --git a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/ErrorLogManager.java b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/ErrorLogManager.java index 6f6d03bc..5e830a23 100644 --- a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/ErrorLogManager.java +++ b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/ErrorLogManager.java @@ -51,7 +51,9 @@ public class ErrorLogManager { //台站找不到,格式化报错信息 if(event.getErrorType().equals(ErrorType.STATION_ERROR)){ errorContent = String.format(ErrorType.STATION_ERROR.getContent(),event.getFormatArgs()); - }else{ + } else if (event.getErrorType().equals(ErrorType.INSERT_ERROR)) { + errorContent = String.format(ErrorType.INSERT_ERROR.getContent(), event.getFormatArgs()); + } else{ errorContent = event.getErrorType().getContent(); } //header、acquisition、ariSamplerFlow错误使用mesg_id生成文件名称 diff --git a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/FileSourceHandleManager.java b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/FileSourceHandleManager.java index 0127b1d7..0eccdbc1 100644 --- a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/FileSourceHandleManager.java +++ b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/FileSourceHandleManager.java @@ -52,7 +52,7 @@ public class FileSourceHandleManager{ public void init(){ //获取机器可用核心数 - int systemCores = Runtime.getRuntime().availableProcessors(); + int systemCores = spectrumServiceQuotes.getMaximumPoolSizeProperties().getAuto(); int maximumPoolSize = taskProperties.getFilesourceDirReceiveNum() > systemCores?taskProperties.getFilesourceDirReceiveNum():systemCores; //初始化线程池 diff --git a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/UndealHandleManager.java b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/UndealHandleManager.java index 63129990..12d16596 100644 --- a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/UndealHandleManager.java +++ b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/UndealHandleManager.java @@ -2,6 +2,7 @@ package org.jeecg.modules; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.jeecg.common.constant.RedisConstant; import org.jeecg.common.properties.TaskProperties; import org.jeecg.modules.enums.SpectrumSource; import org.jeecg.modules.service.BlockConstant; @@ -52,7 +53,7 @@ public class UndealHandleManager{ public void init(){ //获取机器可用核心数 - int systemCores = Runtime.getRuntime().availableProcessors(); + int systemCores = spectrumServiceQuotes.getMaximumPoolSizeProperties().getAuto(); int maximumPoolSize = taskProperties.getUndealDirReceiveNum() > systemCores?taskProperties.getUndealDirReceiveNum():systemCores; //初始化线程池 @@ -128,9 +129,9 @@ public class UndealHandleManager{ long createMillis = currentMillis; //判断redis是否包含文件名称 key if (spectrumServiceQuotes.getRedisStreamUtil().hasKey(spectrumFile.getName())) { - createMillis = (long) spectrumServiceQuotes.getRedisStreamUtil().get(spectrumFile.getName()); + createMillis = (long) spectrumServiceQuotes.getRedisStreamUtil().get(RedisConstant.UNDEAL_FILE + spectrumFile.getName()); } else { - spectrumServiceQuotes.getRedisStreamUtil().set(spectrumFile.getName(), currentMillis); + spectrumServiceQuotes.getRedisStreamUtil().set(RedisConstant.UNDEAL_FILE + spectrumFile.getName(), currentMillis); } try { //解析文件 @@ -148,7 +149,7 @@ public class UndealHandleManager{ this.taskLatch.countDown(); //满足undeal文件处理周期时长会删除源文件 if ((currentMillis - createMillis) >= taskProperties.getUndealFileTimeOut()) { - spectrumServiceQuotes.getRedisStreamUtil().del(spectrumFile.getName()); + spectrumServiceQuotes.getRedisStreamUtil().del(RedisConstant.UNDEAL_FILE + spectrumFile.getName()); spectrumFile.delete(); } } diff --git a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/enums/ErrorType.java b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/enums/ErrorType.java index 9c002fe4..bc385a57 100644 --- a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/enums/ErrorType.java +++ b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/enums/ErrorType.java @@ -7,7 +7,8 @@ public enum ErrorType { STATION_ERROR("station_code:%s=0"), FILE_REPEAT("file repeat"), GAS_OR_DET_ERROR("gas or det file is no exist or is error"), - AIR_SAMPLER_FLOW_ERROR("this is no ariSamplerFlow data"); + AIR_SAMPLER_FLOW_ERROR("this is no ariSamplerFlow data"), + INSERT_ERROR("The sample_id:%s has been reported missing in the database"); private String content; diff --git a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/exception/AnalyseException.java b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/exception/AnalyseException.java new file mode 100644 index 00000000..bdc00859 --- /dev/null +++ b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/exception/AnalyseException.java @@ -0,0 +1,19 @@ +package org.jeecg.modules.exception; + +import lombok.Getter; + +public class AnalyseException extends Exception{ + + @Getter + private boolean isDuplicateKeyException = false; + + public AnalyseException(String message) { + super(message); + } + + public AnalyseException(String message, boolean isDuplicateKeyException) { + super(message); + this.isDuplicateKeyException = isDuplicateKeyException; + } + +} diff --git a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/exception/BAnalyseException.java b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/exception/BAnalyseException.java index d0125608..c9270fae 100644 --- a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/exception/BAnalyseException.java +++ b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/exception/BAnalyseException.java @@ -3,7 +3,7 @@ package org.jeecg.modules.exception; /** * B谱分析异常 */ -public class BAnalyseException extends Exception{ +public class BAnalyseException extends AnalyseException{ /** * Constructs a new exception with the specified detail message. The @@ -16,4 +16,9 @@ public class BAnalyseException extends Exception{ public BAnalyseException(String message) { super(message); } + + public BAnalyseException(String message, boolean isDuplicateKeyException) { + super(message,isDuplicateKeyException); + } + } diff --git a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/exception/GAnalyseException.java b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/exception/GAnalyseException.java index 406dc605..2f69fdfa 100644 --- a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/exception/GAnalyseException.java +++ b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/exception/GAnalyseException.java @@ -3,7 +3,7 @@ package org.jeecg.modules.exception; /** * B谱分析异常 */ -public class GAnalyseException extends Exception{ +public class GAnalyseException extends AnalyseException{ /** * Constructs a new exception with the specified detail message. The @@ -16,4 +16,9 @@ public class GAnalyseException extends Exception{ public GAnalyseException(String message) { super(message); } + + public GAnalyseException(String message, boolean isDuplicateKeyException) { + super(message,isDuplicateKeyException); + } + } diff --git a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/service/impl/GardsDetectorsServiceImpl.java b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/service/impl/GardsDetectorsServiceImpl.java index 9a70a65f..d30086eb 100644 --- a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/service/impl/GardsDetectorsServiceImpl.java +++ b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/service/impl/GardsDetectorsServiceImpl.java @@ -1,32 +1,74 @@ package org.jeecg.modules.service.impl; +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.collection.ListUtil; +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; import com.baomidou.dynamic.datasource.annotation.DS; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import lombok.RequiredArgsConstructor; +import org.jeecg.common.api.vo.Result; +import org.jeecg.common.constant.Prompt; import org.jeecg.common.properties.DetectorIdFormat; import org.jeecg.modules.base.entity.configuration.GardsDetectors; -import org.jeecg.modules.base.enums.DetectorsStatus; +import org.jeecg.modules.base.enums.DetectorStatus; import org.jeecg.modules.mapper.GardsDetectorsMapper; import org.jeecg.modules.service.GardsDetectorsService; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -import java.util.Date; -import java.util.Objects; + +import java.util.*; +import java.util.stream.Collectors; @DS("ora") @Service @RequiredArgsConstructor public class GardsDetectorsServiceImpl extends ServiceImpl implements GardsDetectorsService { - private final DetectorIdFormat format; + private final DetectorIdFormat idFormat; /** - * 校验探测器是否存在,不存在则创建 - * @param detectorCode + * 校验探测器是否存在,不存在则创建 */ @Transactional @Override + public GardsDetectors check(String detectorCode) { + LambdaQueryWrapper wrapper = new LambdaQueryWrapper<>(); + wrapper.eq(GardsDetectors::getDetectorCode, detectorCode); + Optional optional = this.list(wrapper).stream().findFirst(); + if (optional.isPresent()) + return optional.get(); + Integer detectorId = idFormat.codeToId(detectorCode); + Integer stationId = idFormat.codeToStationId(detectorCode); + if (ObjectUtil.isNull(detectorId) || ObjectUtil.isNull(stationId)) + throw new RuntimeException("Invalid Detector Code: " + detectorCode); + GardsDetectors detector = new GardsDetectors(); + detector.setDetectorId(detectorId); + detector.setStationId(stationId); + detector.setDetectorCode(detectorCode); + detector.setStatus(DetectorStatus.ON.getValue()); + + // 查询相同台站下所有工作的探测器 按照探测器Id升序排序 + wrapper.clear(); + wrapper.eq(GardsDetectors::getStationId, stationId); + List detectors = this.list(wrapper); + detectors = detectors.stream() + .filter(item -> StrUtil.equals(StrUtil.trim(item.getStatus()), DetectorStatus.ON.getValue())) + .sorted(Comparator.comparingInt(GardsDetectors::getDetectorId)) + .collect(Collectors.toList()); + // 如果相同台站下没有工作探测器 + if (CollUtil.isEmpty(detectors)) + return this.save(detector) ? detector : null; + // 如果相同台站下有工作探测器 将Id最小的探测器状态置为 Unoperating + GardsDetectors detectorMin = detectors.get(0); + detectorMin.setStatus(DetectorStatus.OFF.getValue()); + detectors = ListUtil.toList(detectorMin, detector); + return this.saveOrUpdateBatch(detectors) ? detector : null; + } + + /*@Transactional + @Override public GardsDetectors check(String detectorCode) { LambdaQueryWrapper detectorsQuery = new LambdaQueryWrapper<>(); detectorsQuery.select(GardsDetectors::getDetectorId); @@ -42,5 +84,5 @@ public class GardsDetectorsServiceImpl extends ServiceImpl{ - this.saveLog(k); - }); + synchronized (execLogMap){ + if(!execLogMap.isEmpty()){ + execLogMap.forEach((k,v)->{ + this.saveLog(k); + }); + } } } } diff --git a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/spectrum/SpectrumParsingActuator.java b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/spectrum/SpectrumParsingActuator.java index e382fd6f..4af40e0a 100644 --- a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/spectrum/SpectrumParsingActuator.java +++ b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/spectrum/SpectrumParsingActuator.java @@ -3,12 +3,16 @@ package org.jeecg.modules.spectrum; import cn.hutool.core.io.FileUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.jeecg.common.constant.RedisConstant; +import org.jeecg.common.constant.StringConstant; import org.jeecg.common.email.EmailLogEvent; import org.jeecg.common.email.EmailLogManager; import org.jeecg.common.email.EmailServiceManager; +import org.jeecg.common.util.DateUtils; import org.jeecg.modules.email.EmailProperties; import org.jeecg.modules.enums.SpectrumSource; import javax.mail.Message; +import javax.mail.internet.MimeMessage; import java.io.File; import java.util.Objects; import java.util.concurrent.CountDownLatch; @@ -25,7 +29,6 @@ public class SpectrumParsingActuator implements Runnable{ private final static String MSG_TYPE = "MSG_TYPE DATA"; private final static String EMAIL_STOP = "STOP"; - /** * 邮件对象 */ @@ -50,6 +53,10 @@ public class SpectrumParsingActuator implements Runnable{ * 邮件计数器 */ private EmailCounter emailCounter; + /** + * 一天秒数 + */ + private final int expiryTime = 86400; public void init(Message message, EmailProperties emailProperties,EmailServiceManager emailServiceManager, CountDownLatch taskLatch, SpectrumServiceQuotes spectrumServiceQuotes, @@ -66,11 +73,17 @@ public class SpectrumParsingActuator implements Runnable{ public void run() { String subject = null; try { - //线程开始初始化时,初始本线程负责的能谱日志事件 - SpectrumLogManager.mailSpectrumLogManager.offer(Thread.currentThread().getId(),null); //获取邮件主题 subject = emailServiceManager.getMailSubject(message); + //解析之前先把邮件唯一信息存储到redis + String messageId = ((MimeMessage) message).getMessageID(); + String emlName = subject+ StringConstant.UNDER_LINE+ DateUtils.formatDate(message.getReceivedDate(),"yyyy-MM-dd HH:mm:ss"); + String key = RedisConstant.EMAIL_MSG_ID+StringConstant.COLON+messageId; + spectrumServiceQuotes.getRedisUtil().set(key,emlName,expiryTime); + //线程开始初始化时,初始本线程负责的能谱日志事件 + SpectrumLogManager.mailSpectrumLogManager.offer(Thread.currentThread().getId(),null); + //所有邮件都需以.eml格式存储到eml文件夹中 final File emlFile = emailServiceManager.downloadEmailToEmlDir(message, emailCounter.getCurrValue()); //保存邮件日志到PG数据库 diff --git a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/spectrum/SpectrumServiceQuotes.java b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/spectrum/SpectrumServiceQuotes.java index ea4dc666..9158d08c 100644 --- a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/spectrum/SpectrumServiceQuotes.java +++ b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/spectrum/SpectrumServiceQuotes.java @@ -5,6 +5,7 @@ import lombok.RequiredArgsConstructor; import org.jeecg.common.properties.*; import org.jeecg.common.util.NameStandUtil; import org.jeecg.common.util.RedisStreamUtil; +import org.jeecg.common.util.RedisUtil; import org.jeecg.modules.datasource.OraDataSourceProperties; import org.jeecg.modules.service.*; import org.springframework.context.ApplicationContext; @@ -81,6 +82,9 @@ public class SpectrumServiceQuotes { private final ApplicationContext applicationContext; + private final RedisUtil redisUtil; + + private final MaximumPoolSizeProperties maximumPoolSizeProperties; /** * 原始库插入数据锁 */ diff --git a/jeecg-module-demo/src/main/java/org/jeecg/modules/demo/test/controller/JeecgDemoController.java b/jeecg-module-demo/src/main/java/org/jeecg/modules/demo/test/controller/JeecgDemoController.java index 70d53d8d..d2d08cc4 100644 --- a/jeecg-module-demo/src/main/java/org/jeecg/modules/demo/test/controller/JeecgDemoController.java +++ b/jeecg-module-demo/src/main/java/org/jeecg/modules/demo/test/controller/JeecgDemoController.java @@ -5,6 +5,8 @@ import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; +import com.baomidou.mybatisplus.core.toolkit.StringPool; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; @@ -13,6 +15,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; import org.apache.commons.net.ftp.FTP; import org.apache.commons.net.ftp.FTPClient; +import org.apache.commons.net.ftp.FTPFile; import org.jeecg.common.api.vo.Result; import org.jeecg.common.aspect.annotation.AutoLog; import org.jeecg.common.aspect.annotation.PermissionData; @@ -52,15 +55,13 @@ public class JeecgDemoController extends JeecgController files = new LinkedList<>(); + readFiles(ftpClient, ftpClient.printWorkingDirectory()+ StringPool.SLASH + "savefile", files); + System.out.println("文件数量:"+files.size()); } catch (IOException e) { throw new RuntimeException(e); } finally { @@ -80,18 +80,32 @@ public class JeecgDemoController extends JeecgController files) { + String oldPath = parentFilePath; + try { + ftpClient.changeWorkingDirectory(parentFilePath); + FTPFile[] directories = ftpClient.listFiles(parentFilePath); + if (Objects.nonNull(directories)) { + for (FTPFile ftpFile:directories) { + if (ftpFile.isDirectory()) { + parentFilePath = oldPath + StringPool.SLASH + ftpFile.getName(); + readFiles(ftpClient, parentFilePath, files); + } else if (ftpFile.isFile()) { + files.add(ftpFile.getName()); + } + } + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + @Autowired diff --git a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/system/controller/GardsDetectorsController.java b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/system/controller/GardsDetectorsController.java index f986567f..e24cae7e 100644 --- a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/system/controller/GardsDetectorsController.java +++ b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/system/controller/GardsDetectorsController.java @@ -70,5 +70,4 @@ public class GardsDetectorsController { public Map> findStationDetectors(@RequestBody List stationIds){ return gardsDetectorsService.findStationDetectors(stationIds); } - } diff --git a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/system/mapper/GardsDetectorsMapper.java b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/system/mapper/GardsDetectorsMapper.java index 6b430814..ac93aa94 100644 --- a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/system/mapper/GardsDetectorsMapper.java +++ b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/system/mapper/GardsDetectorsMapper.java @@ -22,6 +22,4 @@ public interface GardsDetectorsMapper extends BaseMapper { * @return */ List findType(); - - List list(Integer stationId, String status); } diff --git a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/system/mapper/xml/GardsDetectorsMapper.xml b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/system/mapper/xml/GardsDetectorsMapper.xml index 80c8407a..201fc2cf 100644 --- a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/system/mapper/xml/GardsDetectorsMapper.xml +++ b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/system/mapper/xml/GardsDetectorsMapper.xml @@ -38,15 +38,4 @@ - - \ No newline at end of file diff --git a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/system/service/IGardsDetectorsService.java b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/system/service/IGardsDetectorsService.java index e2e30219..05a0bf87 100644 --- a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/system/service/IGardsDetectorsService.java +++ b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/system/service/IGardsDetectorsService.java @@ -36,14 +36,14 @@ public interface IGardsDetectorsService extends IService { * @param gardsDetectors * @return */ - Result create(GardsDetectorsSystem gardsDetectors); + Result create(GardsDetectorsSystem gardsDetectors); /** * 修改监测器信息 * @param gardsDetectors * @return */ - Result update(GardsDetectorsSystem gardsDetectors); + Result update(GardsDetectorsSystem gardsDetectors); /** * 删除监测器信息 @@ -64,5 +64,4 @@ public interface IGardsDetectorsService extends IService { * @return */ Map> findStationDetectors(List stationIds); - } diff --git a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/system/service/impl/GardsDetectorsServiceImpl.java b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/system/service/impl/GardsDetectorsServiceImpl.java index 4decda5b..ce834aed 100644 --- a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/system/service/impl/GardsDetectorsServiceImpl.java +++ b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/system/service/impl/GardsDetectorsServiceImpl.java @@ -11,7 +11,6 @@ import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; import com.baomidou.mybatisplus.core.toolkit.StringUtils; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; -import com.google.common.collect.Lists; import org.jeecg.common.api.QueryRequest; import org.jeecg.common.api.vo.Result; import org.jeecg.common.constant.Prompt; @@ -19,7 +18,6 @@ import org.jeecg.common.properties.DetectorIdFormat; import org.jeecg.common.util.RedisUtil; import org.jeecg.modules.base.entity.configuration.GardsDetectors; import org.jeecg.modules.base.enums.DetectorStatus; -import org.jeecg.modules.base.enums.DetectorsStatus; import org.jeecg.modules.system.entity.GardsDetectorsSystem; import org.jeecg.modules.system.mapper.GardsDetectorsMapper; import org.jeecg.modules.system.service.IGardsDetectorsService; @@ -90,7 +88,7 @@ public class GardsDetectorsServiceImpl extends ServiceImpl create(GardsDetectorsSystem detector) { String detectorCode = detector.getDetectorCode(); - Integer detectorId = idFormat.detectorCodeToId(detectorCode); + Integer detectorId = idFormat.codeToId(detectorCode); if (ObjectUtil.isNull(detectorId)) return Result.error("Detector Code Is Invalid"); if (ObjectUtil.isNotNull(getById(detectorId))) @@ -152,7 +150,7 @@ public class GardsDetectorsServiceImpl extends ServiceImpl gardsDetectors = this.baseMapper.selectList(new LambdaQueryWrapper<>()); Map detectorsMap = gardsDetectors.stream().collect(Collectors.toMap(GardsDetectorsSystem::getDetectorId, GardsDetectorsSystem::getDetectorCode)); - List detectorsUsedList = gardsDetectors.stream().filter(item -> item.getStatus()!=null && DetectorsStatus.OPERATING.getStatus().equals(item.getStatus().trim())).map(GardsDetectorsSystem::getStationId).collect(Collectors.toList()); + List detectorsUsedList = gardsDetectors.stream().filter(item -> item.getStatus()!=null && "Operating".equals(item.getStatus().trim())).map(GardsDetectorsSystem::getStationId).collect(Collectors.toList()); redisUtil.set("detectorsMap",detectorsMap); redisUtil.set("detectorsUsedList", detectorsUsedList); } @@ -165,7 +163,7 @@ public class GardsDetectorsServiceImpl extends ServiceImpl detectorsList = this.baseMapper.selectList(queryWrapper); for (String stationId:stationIds) { - List detectors = detectorsList.stream().filter(item -> item.getStationId()!=null && item.getStationId().equals(Integer.valueOf(stationId)) && item.getStatus()!=null && item.getStatus().trim().equals(DetectorsStatus.OPERATING.getStatus())).collect(Collectors.toList()); + List detectors = detectorsList.stream().filter(item -> item.getStationId()!=null && item.getStationId().equals(Integer.valueOf(stationId)) && item.getStatus()!=null && item.getStatus().trim().equals("Operating")).collect(Collectors.toList()); map.put(stationId, detectors); } } diff --git a/jeecg-server-cloud/armd-auto-process-start/src/main/java/org/jeecg/JeecgAutoProcessApplication.java b/jeecg-server-cloud/armd-auto-process-start/src/main/java/org/jeecg/JeecgAutoProcessApplication.java index ab22ec69..671aa707 100644 --- a/jeecg-server-cloud/armd-auto-process-start/src/main/java/org/jeecg/JeecgAutoProcessApplication.java +++ b/jeecg-server-cloud/armd-auto-process-start/src/main/java/org/jeecg/JeecgAutoProcessApplication.java @@ -80,12 +80,12 @@ public class JeecgAutoProcessApplication extends SpringBootServletInitializer im //校验临时存储目录是否存在,不存在则创建 checkTempStorageDirectory(); //初始化邮箱邮件声明周期日志 - EmailLogManager.init(spectrumPathProperties); +// EmailLogManager.init(spectrumPathProperties); //初始化能谱解析错误日志管理器 ErrorLogManager.init(spectrumPathProperties); //校验存储目录是否存在,不存在则创建,存在无操作 checkStorageDirectory(); - autoProcessManager.start(systemStartupTime); +// autoProcessManager.start(systemStartupTime); undealHandleManager.start(); fileSourceHandleManager.start(); // 删除过期的文件