同步mdc

This commit is contained in:
qiaoqinzheng 2024-01-23 15:03:21 +08:00
parent 630bc4ee4f
commit 5498e0e433
36 changed files with 376 additions and 126 deletions

View File

@ -40,4 +40,11 @@ public interface RedisConstant {
String QIYE_EMAIL_TOKEN = "Token:QiyeEmail"; // 网易企业邮箱Token
String PREFIX_TEMPLATE = "Template:"; // 消息模板
/**
* 删除失败邮件KEY
*/
String EMAIL_MSG_ID = "email_msg_id";
String UNDEAL_FILE = "Undeal:";
}

View File

@ -8,14 +8,16 @@ import com.sun.mail.smtp.SMTPAddressFailedException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.jeecg.common.api.dto.message.MessageDTO;
import org.jeecg.common.constant.RedisConstant;
import org.jeecg.common.constant.StringConstant;
import org.jeecg.common.constant.SymbolConstant;
import org.jeecg.common.email.emuns.MailContentType;
import org.jeecg.common.properties.SpectrumPathProperties;
import org.jeecg.common.util.DateUtils;
import org.jeecg.common.util.Md5Util;
import org.jeecg.common.util.RedisUtil;
import org.jeecg.modules.base.entity.postgre.SysEmail;
import org.jetbrains.annotations.NotNull;
import javax.mail.*;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
@ -55,6 +57,8 @@ public class EmailServiceManager {
/** 收件箱 */
private Folder folder = null;
private RedisUtil redisUtil;
@NotNull
public static EmailServiceManager getInstance(){
return new EmailServiceManager();
@ -72,12 +76,15 @@ public class EmailServiceManager {
* 初始化邮件服务管理器
* @param email 邮件属性
*/
public void init(SysEmail email,Integer receiveNum,String temporaryStoragePath,Date systemStartupTime, SpectrumPathProperties pathProperties){
public void init(SysEmail email, Integer receiveNum, String temporaryStoragePath,
Date systemStartupTime, SpectrumPathProperties pathProperties,
RedisUtil redisUtil){
this.email = email;
this.receiveNum = receiveNum;
this.temporaryStoragePath = temporaryStoragePath;
this.systemStartupTime = systemStartupTime;
this.spectrumPathProperties = pathProperties;
this.redisUtil = redisUtil;
}
/**
@ -121,13 +128,23 @@ public class EmailServiceManager {
properties.put("mail.imap.ssl.enable", "false");
}
HashMap IAM = new HashMap();
//带上IMAP ID信息由key和value组成例如nameversionvendorsupport-email等
IAM.put("name","myname");
IAM.put("version","1.0.0");
IAM.put("vendor","myclient");
IAM.put("support-email","testmail@test.com");
//获取邮件回话
final Session session = Session.getDefaultInstance(properties);
//获取smtp协议的存储对象
store = (IMAPStore) session.getStore();
//连接
store.connect(email.getUsername(),email.getPassword());
// 解决163普通邮箱无法建立连接问题
store.id(IAM);
//获取收件箱
folder = store.getFolder("INBOX");//INBOX
folder.open(Folder.READ_WRITE);
@ -572,16 +589,41 @@ public class EmailServiceManager {
/**
* 关闭邮件服务连接资源
*/
public void close(){
public void close(List<String> messageIds){
try {
if(null != folder){
folder.close(true);
folder.expunge();
folder.close();
}
if(null != store){
store.close();
}
for(String messageId : messageIds){
String key = RedisConstant.EMAIL_MSG_ID+StringConstant.COLON+messageId;
redisUtil.del(key);
}
} catch (MessagingException e) {
log.error("Email closure failed, email address is: {}, reason is: {}",email.getUsername(),e.getMessage());
e.printStackTrace();
}
}
/**
* 校验邮件
* 若此次获取的邮件是上次删除失败的邮件直接删除
* @param message
*/
public boolean check(Message message,String messageId){
boolean exist = false;
try {
String key = RedisConstant.EMAIL_MSG_ID+StringConstant.COLON+messageId;
exist = redisUtil.hasKey(key);
if(exist){
message.setFlag(Flags.Flag.DELETED,true);
}
return exist;
} catch (MessagingException e) {
return false;
}
}
}

View File

@ -2,7 +2,6 @@ package org.jeecg.common.properties;
import cn.hutool.core.util.NumberUtil;
import cn.hutool.core.util.StrUtil;
import io.swagger.models.auth.In;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import org.jeecg.common.constant.StringConstant;
@ -50,7 +49,10 @@ public class DetectorIdFormat {
return detectorId;
}
public Integer detectorCodeToId(String detectorCode){
/**
* 探测器Code解析出探测器Id
*/
public Integer codeToId(String detectorCode){
if (!StrUtil.contains(detectorCode, StrUtil.UNDERLINE))
return null;
String[] split = StrUtil.split(detectorCode, StrUtil.UNDERLINE);
@ -61,9 +63,9 @@ public class DetectorIdFormat {
for (Map.Entry<String, String> entry : suffixMap.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
if (!StrUtil.contains(prefix, key))
continue;
if (!StrUtil.contains(prefix, key)) continue;
prefix = StrUtil.replace(prefix, key, value);
break;
}
if (!NumberUtil.isNumber(prefix))
return null;
@ -100,4 +102,25 @@ public class DetectorIdFormat {
return stationId;
}
/**
* 探测器Code解析出台站Id
*/
public Integer codeToStationId(String detectorCode){
if (!StrUtil.contains(detectorCode, StrUtil.UNDERLINE))
return null;
String[] split = StrUtil.split(detectorCode, StrUtil.UNDERLINE);
String stationCode = split[0];
stationCode = StrUtil.sub(stationCode, 2, stationCode.length());
for (Map.Entry<String, String> entry : suffixMap.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
if (!StrUtil.contains(stationCode, key)) continue;
stationCode = StrUtil.replace(stationCode, key, value);
break;
}
if (!NumberUtil.isNumber(stationCode))
return null;
return Integer.valueOf(stationCode);
}
}

View File

@ -11,4 +11,6 @@ public class MaximumPoolSizeProperties {
private Integer station;
private Integer auto;
}

View File

@ -5,9 +5,14 @@ import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Data;
import org.jeecg.config.valid.InsertGroup;
import org.jeecg.config.valid.UpdateGroup;
import org.springframework.format.annotation.DateTimeFormat;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.Date;
@Data

View File

@ -1,26 +0,0 @@
package org.jeecg.modules.base.enums;
/**
* 探测器运行状态
*/
public enum DetectorsStatus {
/**
* 未运行
*/
UNOPERATING("Unoperating"),
/**
* 在运行
*/
OPERATING("Operating");
private String status;
DetectorsStatus(String type) {
this.status = type;
}
public String getStatus(){
return this.status;
}
}

View File

@ -2,6 +2,8 @@ package org.jeecg.modules;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.constant.RedisConstant;
import org.jeecg.common.constant.StringConstant;
import org.jeecg.common.email.EmailServiceManager;
import org.jeecg.common.properties.TaskProperties;
import org.jeecg.common.util.RedisUtil;
@ -57,7 +59,9 @@ public class AutoProcessManager{
final MailServerMonitor monitorThread = new MailServerMonitor();
monitorThread.setName("mail-server-monitor");
monitorThread.start();
//邮箱执行线程前给redis中添加一个default key及default value便于查看及排查
String defaultKey = RedisConstant.EMAIL_MSG_ID+ StringConstant.COLON+"default";
redisUtil.set(defaultKey,"default");
//邮件执行线程管理
final MailExecManager autoProcessThread = new MailExecManager();
autoProcessThread.setName("mail-exec-thread-manage");

View File

@ -6,14 +6,17 @@ import org.jeecg.common.email.EmailLogManager;
import org.jeecg.common.email.EmailServiceManager;
import org.jeecg.common.properties.TaskProperties;
import org.jeecg.modules.email.EmailProperties;
import org.jeecg.modules.eneity.event.SpectrumLog;
import org.jeecg.modules.spectrum.EmailCounter;
import org.jeecg.modules.spectrum.SpectrumLogManager;
import org.jeecg.modules.spectrum.SpectrumParsingActuator;
import org.jeecg.modules.spectrum.SpectrumServiceQuotes;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;
/**
@ -39,7 +42,7 @@ public class EmailParsingActuator extends Thread{
this.systemStartupTime = systemStartupTime;
//获取机器可用核心数
int systemCores = Runtime.getRuntime().availableProcessors();
int systemCores = spectrumServiceQuotes.getMaximumPoolSizeProperties().getAuto();
int maximumPoolSize = taskProperties.getReceiveNum() > systemCores?taskProperties.getReceiveNum():systemCores;
//初始化线程池
@ -60,10 +63,23 @@ public class EmailParsingActuator extends Thread{
}
long start = System.currentTimeMillis();
final EmailServiceManager emailServiceManager = EmailServiceManager.getInstance();
emailServiceManager.init(this.emailProperties,this.taskProperties.getReceiveNum(),this.taskProperties.getTemporaryStoragePath(),this.systemStartupTime, spectrumServiceQuotes.getSpectrumPathProperties());
emailServiceManager.init(this.emailProperties,this.taskProperties.getReceiveNum(),this.taskProperties.getTemporaryStoragePath(),
this.systemStartupTime, spectrumServiceQuotes.getSpectrumPathProperties(),spectrumServiceQuotes.getRedisUtil());
List<String> messageIds = new ArrayList<>();
try {
final Message[] messages = emailServiceManager.receiveMail();
Message[] messages = emailServiceManager.receiveMail();
if(ArrayUtils.isNotEmpty(messages)){
//检验获取的邮件是否在之前删除失败列表中若在直接调用邮件API删除并且此次数组里元素也删除
for(int i=messages.length-1;i>=0;i--){
if (!messages[i].isExpunged()){
String messageId = ((MimeMessage) messages[i]).getMessageID();
final boolean exist = emailServiceManager.check(messages[i],messageId);
messageIds.add(messageId);
if(exist){
messages = ArrayUtils.remove(messages,i);
}
}
}
CountDownLatch taskLatch = new CountDownLatch(messages.length);
for(Message message : messages){
SpectrumParsingActuator spectrumParsingActuator = new SpectrumParsingActuator();
@ -73,9 +89,9 @@ public class EmailParsingActuator extends Thread{
}
taskLatch.await();
}
}catch (InterruptedException e) {
}catch (InterruptedException | MessagingException e) {
e.printStackTrace();
}finally {
} finally {
//清除本批次邮件日志缓存
EmailLogManager.getInstance().clear();
//保存本批次所有能谱日志
@ -83,7 +99,7 @@ public class EmailParsingActuator extends Thread{
//清除本批次能谱日志缓存
SpectrumLogManager.mailSpectrumLogManager.clear();
//关闭资源
emailServiceManager.close();
emailServiceManager.close(messageIds);
}
long end = System.currentTimeMillis();
long sleepTime = taskProperties.getMailThreadExecCycle() - (end-start);
@ -98,6 +114,4 @@ public class EmailParsingActuator extends Thread{
}
}
}
}

View File

@ -51,7 +51,9 @@ public class ErrorLogManager {
//台站找不到格式化报错信息
if(event.getErrorType().equals(ErrorType.STATION_ERROR)){
errorContent = String.format(ErrorType.STATION_ERROR.getContent(),event.getFormatArgs());
}else{
} else if (event.getErrorType().equals(ErrorType.INSERT_ERROR)) {
errorContent = String.format(ErrorType.INSERT_ERROR.getContent(), event.getFormatArgs());
} else{
errorContent = event.getErrorType().getContent();
}
//headeracquisitionariSamplerFlow错误使用mesg_id生成文件名称

View File

@ -52,7 +52,7 @@ public class FileSourceHandleManager{
public void init(){
//获取机器可用核心数
int systemCores = Runtime.getRuntime().availableProcessors();
int systemCores = spectrumServiceQuotes.getMaximumPoolSizeProperties().getAuto();
int maximumPoolSize = taskProperties.getFilesourceDirReceiveNum() > systemCores?taskProperties.getFilesourceDirReceiveNum():systemCores;
//初始化线程池

View File

@ -2,6 +2,7 @@ package org.jeecg.modules;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.constant.RedisConstant;
import org.jeecg.common.properties.TaskProperties;
import org.jeecg.modules.enums.SpectrumSource;
import org.jeecg.modules.service.BlockConstant;
@ -52,7 +53,7 @@ public class UndealHandleManager{
public void init(){
//获取机器可用核心数
int systemCores = Runtime.getRuntime().availableProcessors();
int systemCores = spectrumServiceQuotes.getMaximumPoolSizeProperties().getAuto();
int maximumPoolSize = taskProperties.getUndealDirReceiveNum() > systemCores?taskProperties.getUndealDirReceiveNum():systemCores;
//初始化线程池
@ -128,9 +129,9 @@ public class UndealHandleManager{
long createMillis = currentMillis;
//判断redis是否包含文件名称 key
if (spectrumServiceQuotes.getRedisStreamUtil().hasKey(spectrumFile.getName())) {
createMillis = (long) spectrumServiceQuotes.getRedisStreamUtil().get(spectrumFile.getName());
createMillis = (long) spectrumServiceQuotes.getRedisStreamUtil().get(RedisConstant.UNDEAL_FILE + spectrumFile.getName());
} else {
spectrumServiceQuotes.getRedisStreamUtil().set(spectrumFile.getName(), currentMillis);
spectrumServiceQuotes.getRedisStreamUtil().set(RedisConstant.UNDEAL_FILE + spectrumFile.getName(), currentMillis);
}
try {
//解析文件
@ -148,7 +149,7 @@ public class UndealHandleManager{
this.taskLatch.countDown();
//满足undeal文件处理周期时长会删除源文件
if ((currentMillis - createMillis) >= taskProperties.getUndealFileTimeOut()) {
spectrumServiceQuotes.getRedisStreamUtil().del(spectrumFile.getName());
spectrumServiceQuotes.getRedisStreamUtil().del(RedisConstant.UNDEAL_FILE + spectrumFile.getName());
spectrumFile.delete();
}
}

View File

@ -7,7 +7,8 @@ public enum ErrorType {
STATION_ERROR("station_code:%s=0"),
FILE_REPEAT("file repeat"),
GAS_OR_DET_ERROR("gas or det file is no exist or is error"),
AIR_SAMPLER_FLOW_ERROR("this is no ariSamplerFlow data");
AIR_SAMPLER_FLOW_ERROR("this is no ariSamplerFlow data"),
INSERT_ERROR("The sample_id%s has been reported missing in the database");
private String content;

View File

@ -0,0 +1,19 @@
package org.jeecg.modules.exception;
import lombok.Getter;
public class AnalyseException extends Exception{
@Getter
private boolean isDuplicateKeyException = false;
public AnalyseException(String message) {
super(message);
}
public AnalyseException(String message, boolean isDuplicateKeyException) {
super(message);
this.isDuplicateKeyException = isDuplicateKeyException;
}
}

View File

@ -3,7 +3,7 @@ package org.jeecg.modules.exception;
/**
* B谱分析异常
*/
public class BAnalyseException extends Exception{
public class BAnalyseException extends AnalyseException{
/**
* Constructs a new exception with the specified detail message. The
@ -16,4 +16,9 @@ public class BAnalyseException extends Exception{
public BAnalyseException(String message) {
super(message);
}
public BAnalyseException(String message, boolean isDuplicateKeyException) {
super(message,isDuplicateKeyException);
}
}

View File

@ -3,7 +3,7 @@ package org.jeecg.modules.exception;
/**
* B谱分析异常
*/
public class GAnalyseException extends Exception{
public class GAnalyseException extends AnalyseException{
/**
* Constructs a new exception with the specified detail message. The
@ -16,4 +16,9 @@ public class GAnalyseException extends Exception{
public GAnalyseException(String message) {
super(message);
}
public GAnalyseException(String message, boolean isDuplicateKeyException) {
super(message,isDuplicateKeyException);
}
}

View File

@ -1,32 +1,74 @@
package org.jeecg.modules.service.impl;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.RequiredArgsConstructor;
import org.jeecg.common.api.vo.Result;
import org.jeecg.common.constant.Prompt;
import org.jeecg.common.properties.DetectorIdFormat;
import org.jeecg.modules.base.entity.configuration.GardsDetectors;
import org.jeecg.modules.base.enums.DetectorsStatus;
import org.jeecg.modules.base.enums.DetectorStatus;
import org.jeecg.modules.mapper.GardsDetectorsMapper;
import org.jeecg.modules.service.GardsDetectorsService;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.util.Date;
import java.util.Objects;
import java.util.*;
import java.util.stream.Collectors;
@DS("ora")
@Service
@RequiredArgsConstructor
public class GardsDetectorsServiceImpl extends ServiceImpl<GardsDetectorsMapper, GardsDetectors> implements GardsDetectorsService {
private final DetectorIdFormat format;
private final DetectorIdFormat idFormat;
/**
* 校验探测器是否存在不存在则创建
* @param detectorCode
* 校验探测器是否存在不存在则创建
*/
@Transactional
@Override
public GardsDetectors check(String detectorCode) {
LambdaQueryWrapper<GardsDetectors> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(GardsDetectors::getDetectorCode, detectorCode);
Optional<GardsDetectors> optional = this.list(wrapper).stream().findFirst();
if (optional.isPresent())
return optional.get();
Integer detectorId = idFormat.codeToId(detectorCode);
Integer stationId = idFormat.codeToStationId(detectorCode);
if (ObjectUtil.isNull(detectorId) || ObjectUtil.isNull(stationId))
throw new RuntimeException("Invalid Detector Code: " + detectorCode);
GardsDetectors detector = new GardsDetectors();
detector.setDetectorId(detectorId);
detector.setStationId(stationId);
detector.setDetectorCode(detectorCode);
detector.setStatus(DetectorStatus.ON.getValue());
// 查询相同台站下所有工作的探测器 按照探测器Id升序排序
wrapper.clear();
wrapper.eq(GardsDetectors::getStationId, stationId);
List<GardsDetectors> detectors = this.list(wrapper);
detectors = detectors.stream()
.filter(item -> StrUtil.equals(StrUtil.trim(item.getStatus()), DetectorStatus.ON.getValue()))
.sorted(Comparator.comparingInt(GardsDetectors::getDetectorId))
.collect(Collectors.toList());
// 如果相同台站下没有工作探测器
if (CollUtil.isEmpty(detectors))
return this.save(detector) ? detector : null;
// 如果相同台站下有工作探测器 将Id最小的探测器状态置为 Unoperating
GardsDetectors detectorMin = detectors.get(0);
detectorMin.setStatus(DetectorStatus.OFF.getValue());
detectors = ListUtil.toList(detectorMin, detector);
return this.saveOrUpdateBatch(detectors) ? detector : null;
}
/*@Transactional
@Override
public GardsDetectors check(String detectorCode) {
LambdaQueryWrapper<GardsDetectors> detectorsQuery = new LambdaQueryWrapper<>();
detectorsQuery.select(GardsDetectors::getDetectorId);
@ -42,5 +84,5 @@ public class GardsDetectorsServiceImpl extends ServiceImpl<GardsDetectorsMapper,
return detector;
}
return query;
}
}*/
}

View File

@ -16,6 +16,7 @@ import org.jeecg.modules.eneity.event.SpectrumErrorEvent;
import org.jeecg.modules.eneity.event.SpectrumLog;
import org.jeecg.modules.enums.ErrorType;
import org.jeecg.modules.enums.SpectrumSource;
import org.jeecg.modules.exception.AnalyseException;
import org.jeecg.modules.exception.FileRepeatException;
import org.jeecg.modules.exception.HeaderBlockException;
import org.jeecg.modules.file.FileOperation;
@ -272,9 +273,37 @@ public abstract class AbstractSpectrumHandler extends AbstractChain {
final String rootPath = spectrumServiceQuotes.getSpectrumPathProperties().getRootPath();
final String undealPath = spectrumServiceQuotes.getSpectrumPathProperties().getUndealPath();
final String finalPath = rootPath+File.separator+undealPath;
FileOperation.moveFile(spectrumFile,finalPath,true);
//判断文件是否在savefile下已经保存过
final String savefileName = spectrumServiceQuotes.getSpectrumPathProperties().getSaveFilePath().substring(1);
//若文件已经在savefile了进行复制
if(spectrumFile.getAbsolutePath().contains(savefileName)){
boolean copyFlag = false;
if (e instanceof AnalyseException) {
AnalyseException exception = (AnalyseException) e;
if (!exception.isDuplicateKeyException()) {
copyFlag = true;
}
}else{
copyFlag = true;
}
if(copyFlag){
FileOperation.copyFile(spectrumFile,finalPath,true);
}
}else {
//若文件不在savefile 则判断异常是否属于违反唯一约束性
if (e instanceof AnalyseException) {
AnalyseException exception = (AnalyseException) e;
if (exception.isDuplicateKeyException()) {
this.spectrumFile.delete();
}
} else {
//如果文件在savefile中没有 并且 不属于违反唯一约束性的异常 将文件移动到undeal
FileOperation.moveFile(spectrumFile,finalPath,true);
}
}
}
} catch (IOException ex) {
log.error("An error occurred during the process of processing the failed parsing file. The file is: {}, and the reason is: {}",this.spectrumFile.getAbsolutePath(),e.getMessage());
ex.printStackTrace();
}
}else if(SpectrumSource.FROM_FILE_SOURCE.getSourceType().equals(spectrumSource) && (e instanceof FileRepeatException)){

View File

@ -1,5 +1,6 @@
package org.jeecg.modules.spectrum;
import cn.hutool.core.util.ObjectUtil;
import org.apache.commons.lang3.StringUtils;
import org.jeecg.common.constant.StringConstant;
import org.jeecg.common.properties.SpectrumPathProperties;
@ -81,13 +82,18 @@ public class AlertSpectrum extends AbstractSpectrumHandler{
this.parseingEmail();
//修改能谱文件名称
this.updateSpectrumFileName();
//结构体数据入库
this.handlerOriginalData();
//保存PHD文件到savefile
super.saveFileToSavefile();
//结构体数据入库
this.handlerOriginalData();
//若本次文件来自于undel目录解析成功则删除
deleteIfFromUndelFile();
}catch (Exception e){
// 如果解析流程异常 入库开始/结束时间赋初始值
if (ObjectUtil.isNull(this.startIntoDatabaseTime))
this.startIntoDatabaseTime = new Date();
if (ObjectUtil.isNull(this.endIntoDatabaseTime))
this.endIntoDatabaseTime = new Date();
//异常返回文件名称用于报错日志
super.returnFileName.append(super.spectrumFile.getName());

View File

@ -41,10 +41,10 @@ public class DetbkphdSpectrum extends AbstractS_D_Q_G_SpectrumHandler {
super.readFileLabel();
//修改能谱文件名称
super.updateSpectrumFileName();
//结构体数据入库
super.handlerOriginalData();
//保存PHD文件到savefile
super.saveFileToSavefile();
//结构体数据入库
super.handlerOriginalData();
//修改状态为解析完成
super.status = SampleStatus.COMPLETE.getValue();
super.updateStatus();

View File

@ -43,10 +43,10 @@ public class GasbkphdSpectrum extends AbstractS_D_Q_G_SpectrumHandler {
super.readFileLabel();
//修改能谱文件名称
super.updateSpectrumFileName();
//结构体数据入库
super.handlerOriginalData();
//保存PHD文件到savefile
super.saveFileToSavefile();
//结构体数据入库
super.handlerOriginalData();
//修改状态为解析完成
super.status = SampleStatus.COMPLETE.getValue();
super.updateStatus();

View File

@ -1,5 +1,6 @@
package org.jeecg.modules.spectrum;
import cn.hutool.core.util.ObjectUtil;
import org.apache.commons.lang3.StringUtils;
import org.jeecg.common.constant.StringConstant;
import org.jeecg.common.properties.SpectrumPathProperties;
@ -80,15 +81,21 @@ public class HealthStatusSpectrum extends AbstractSpectrumHandler{
this.parseingEmail();
//修改能谱文件名称
this.updateSpectrumFileName();
//结构体数据入库
this.handlerOriginalData();
//保存PHD文件到savefile
super.saveFileToSavefile();
//结构体数据入库
this.handlerOriginalData();
//把流程日志保存到日志目录
this.saveLogToLogDir();
//若本次文件来自于undel目录解析成功则删除
deleteIfFromUndelFile();
}catch (Exception e){
// 如果解析流程异常 入库开始/结束时间赋初始值
if (ObjectUtil.isNull(this.startIntoDatabaseTime))
this.startIntoDatabaseTime = new Date();
if (ObjectUtil.isNull(this.endIntoDatabaseTime))
this.endIntoDatabaseTime = new Date();
//异常返回文件名称用于报错日志
super.returnFileName.append(super.spectrumFile.getName());
//处理解析失败的文件

View File

@ -1,5 +1,6 @@
package org.jeecg.modules.spectrum;
import cn.hutool.core.util.ObjectUtil;
import org.apache.commons.lang3.StringUtils;
import org.jeecg.common.constant.StringConstant;
import org.jeecg.common.properties.SpectrumPathProperties;
@ -71,15 +72,21 @@ public class MetSpectrum extends AbstractSpectrumHandler{
this.parseingEmail();
//修改能谱文件名称
this.updateSpectrumFileName();
//结构体数据入库
this.handlerOriginalData();
//保存PHD文件到savefile
super.saveFileToSavefile();
//结构体数据入库
this.handlerOriginalData();
//把流程日志保存到日志目录
this.saveLogToLogDir();
//若本次文件来自于undel目录解析成功则删除
deleteIfFromUndelFile();
}catch (Exception e){
// 如果解析流程异常 入库开始/结束时间赋初始值
if (ObjectUtil.isNull(this.startIntoDatabaseTime))
this.startIntoDatabaseTime = new Date();
if (ObjectUtil.isNull(this.endIntoDatabaseTime))
this.endIntoDatabaseTime = new Date();
//异常返回文件名称用于报错日志
super.returnFileName.append(super.spectrumFile.getName());
throw e;

View File

@ -42,10 +42,10 @@ public class QcphdSpectrum extends AbstractS_D_Q_G_SpectrumHandler {
super.readFileLabel();
//修改能谱文件名称
super.updateSpectrumFileName();
//结构体数据入库
super.handlerOriginalData();
//保存PHD文件到savefile
super.saveFileToSavefile();
//结构体数据入库
super.handlerOriginalData();
//修改状态为解析完成
super.status = SampleStatus.COMPLETE.getValue();
super.updateStatus();

View File

@ -23,6 +23,7 @@ import org.jeecg.modules.native_jni.EnergySpectrumHandler;
import org.jeecg.modules.native_jni.struct.BgAnalyseResult;
import org.jeecg.modules.native_jni.struct.EnergySpectrumStruct;
import org.jeecg.modules.service.BlockConstant;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.transaction.TransactionStatus;
import java.io.*;
import java.time.Instant;
@ -170,7 +171,11 @@ public class Sample_B_Analysis implements BlockConstant {
pushToRedis();
}catch (Exception e){
analyseFail = true;
throw new BAnalyseException(e.getMessage());
if (e instanceof DuplicateKeyException) {
throw new BAnalyseException(e.getMessage(), true);
} else {
throw new BAnalyseException(e.getMessage());
}
}finally {
this.endAnalysisTime = new Date();
//如果分析成功并且analyses对象不为空

View File

@ -3,7 +3,6 @@ package org.jeecg.modules.spectrum;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.NumberUtil;
import cn.hutool.core.util.StrUtil;
@ -20,6 +19,7 @@ import org.jeecg.common.constant.enums.SpectrumSystemType;
import org.jeecg.common.properties.ParameterProperties;
import org.jeecg.common.properties.SpectrumPathProperties;
import org.jeecg.common.util.*;
import org.jeecg.modules.ErrorLogManager;
import org.jeecg.modules.base.bizVo.AttributeItemVo;
import org.jeecg.modules.base.dto.*;
import org.jeecg.modules.base.entity.original.GardsSampleData;
@ -28,11 +28,14 @@ import org.jeecg.modules.base.enums.DSType;
import org.jeecg.modules.base.enums.MiddleDataType;
import org.jeecg.modules.base.enums.SpectrumType;
import org.jeecg.modules.config.datasource.DataSourceSwitcher;
import org.jeecg.modules.eneity.event.SpectrumErrorEvent;
import org.jeecg.modules.entity.vo.*;
import org.jeecg.modules.enums.ErrorType;
import org.jeecg.modules.exception.GAnalyseException;
import org.jeecg.modules.file.FileOperation;
import org.jeecg.modules.native_jni.struct.EnergySpectrumStruct;
import org.jeecgframework.core.util.ApplicationContextUtil;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.transaction.TransactionStatus;
import org.w3c.dom.*;
import org.xml.sax.SAXException;
@ -43,13 +46,13 @@ import javax.xml.parsers.ParserConfigurationException;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.sql.SQLIntegrityConstraintViolationException;
import java.text.ParseException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.*;
import static org.jeecg.modules.service.BlockConstant.XE_131m;
@Data
@Slf4j
@ -124,7 +127,7 @@ public class Sample_G_Analysis {
this.dataType = energySpectrumStruct.data_type;
this.sampleInputFilename = sampleData.getInputFileName();
this.sampleFilename = StringUtils.substring(sampleData.getInputFileName(),
sampleData.getInputFileName().lastIndexOf((StringConstant.SLASH)+1));
sampleData.getInputFileName().lastIndexOf((StringConstant.SLASH))+1);
}
public void analysis() throws GAnalyseException{
@ -194,7 +197,11 @@ public class Sample_G_Analysis {
}catch (Exception e){
e.printStackTrace();
log.error("Sample_G_Analysis", e);
throw new GAnalyseException("Sample Analyse Error at "+DateUtils.formatDate(new Date(),"yyyy-MM-dd HH:mm:ss"));
if (e instanceof DuplicateKeyException) {
throw new GAnalyseException("Sample Analyse Error at "+DateUtils.formatDate(new Date(),"yyyy-MM-dd HH:mm:ss"), true);
} else {
throw new GAnalyseException("Sample Analyse Error at "+DateUtils.formatDate(new Date(),"yyyy-MM-dd HH:mm:ss"));
}
}
log.info("Gamma自动处理分析--End");
}

View File

@ -1,8 +1,18 @@
package org.jeecg.modules.spectrum;
import org.jeecg.modules.ErrorLogManager;
import org.jeecg.modules.base.enums.DataType;
import org.jeecg.modules.base.enums.SampleStatus;
import org.jeecg.modules.base.enums.SystemType;
import org.jeecg.modules.eneity.event.SpectrumErrorEvent;
import org.jeecg.modules.enums.ErrorType;
import org.jeecg.modules.enums.SpectrumSource;
import org.jeecg.modules.exception.AnalyseException;
import org.jeecg.modules.exception.GAnalyseException;
import org.springframework.dao.DuplicateKeyException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.util.Date;
import java.util.Objects;
/**
@ -42,12 +52,20 @@ public class SamplephdSpectrum extends AbstractS_D_Q_G_SpectrumHandler {
super.readFileLabel();
//修改能谱文件名称
super.updateSpectrumFileName();
//保存PHD文件到savefile
//如果文件来自于邮箱或filesource则在存储原始库之前需移动到savefile便于和input_file_name字段对应
if(!SpectrumSource.FORM_FILE_UNDEL.getSourceType().equals(super.spectrumSource)){
super.saveFileToSavefile();
}
//结构体数据入库
super.handlerOriginalData();
//进行BG(P)谱分析
this.autoAnalysis();
//保存PHD文件到savefile
super.saveFileToSavefile();
//如果文件来自于undel必须到分析结束后才能移动到savefile否则可能导致本次处理再次失败然后就移动到savefile了
if(SpectrumSource.FORM_FILE_UNDEL.getSourceType().equals(super.spectrumSource)){
super.saveFileToSavefile();
}
//修改状态为解析完成
super.status = SampleStatus.COMPLETE.getValue();
super.updateStatus();
@ -56,10 +74,19 @@ public class SamplephdSpectrum extends AbstractS_D_Q_G_SpectrumHandler {
}catch (Exception e){
//异常返回文件名称用于报错日志
super.returnFileName.append(super.spectrumFile.getName());
//修改状态为解析失败
super.status = SampleStatus.FAIL.getValue();
super.updateStatus();
if (e instanceof AnalyseException) {
AnalyseException exception = (AnalyseException) e;
if (exception.isDuplicateKeyException()) {
ErrorLogManager.getInstance().write(new SpectrumErrorEvent(new Date(), ErrorType.INSERT_ERROR, super.spectrumFile.getName(), String.valueOf(this.sampleData.getSampleId())));
}else{
super.updateStatus();
}
} else {
super.updateStatus();
}
//处理解析失败的文件
super.handleParseingFailFile(e);
throw e;

View File

@ -65,10 +65,12 @@ public class SpectrumLogManager {
* 保存所有日志
*/
public void saveAllLog(){
if(!execLogMap.isEmpty()){
execLogMap.forEach((k,v)->{
this.saveLog(k);
});
synchronized (execLogMap){
if(!execLogMap.isEmpty()){
execLogMap.forEach((k,v)->{
this.saveLog(k);
});
}
}
}
}

View File

@ -3,12 +3,16 @@ package org.jeecg.modules.spectrum;
import cn.hutool.core.io.FileUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.jeecg.common.constant.RedisConstant;
import org.jeecg.common.constant.StringConstant;
import org.jeecg.common.email.EmailLogEvent;
import org.jeecg.common.email.EmailLogManager;
import org.jeecg.common.email.EmailServiceManager;
import org.jeecg.common.util.DateUtils;
import org.jeecg.modules.email.EmailProperties;
import org.jeecg.modules.enums.SpectrumSource;
import javax.mail.Message;
import javax.mail.internet.MimeMessage;
import java.io.File;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
@ -25,7 +29,6 @@ public class SpectrumParsingActuator implements Runnable{
private final static String MSG_TYPE = "MSG_TYPE DATA";
private final static String EMAIL_STOP = "STOP";
/**
* 邮件对象
*/
@ -50,6 +53,10 @@ public class SpectrumParsingActuator implements Runnable{
* 邮件计数器
*/
private EmailCounter emailCounter;
/**
* 一天秒数
*/
private final int expiryTime = 86400;
public void init(Message message, EmailProperties emailProperties,EmailServiceManager emailServiceManager,
CountDownLatch taskLatch, SpectrumServiceQuotes spectrumServiceQuotes,
@ -66,11 +73,17 @@ public class SpectrumParsingActuator implements Runnable{
public void run() {
String subject = null;
try {
//线程开始初始化时初始本线程负责的能谱日志事件
SpectrumLogManager.mailSpectrumLogManager.offer(Thread.currentThread().getId(),null);
//获取邮件主题
subject = emailServiceManager.getMailSubject(message);
//解析之前先把邮件唯一信息存储到redis
String messageId = ((MimeMessage) message).getMessageID();
String emlName = subject+ StringConstant.UNDER_LINE+ DateUtils.formatDate(message.getReceivedDate(),"yyyy-MM-dd HH:mm:ss");
String key = RedisConstant.EMAIL_MSG_ID+StringConstant.COLON+messageId;
spectrumServiceQuotes.getRedisUtil().set(key,emlName,expiryTime);
//线程开始初始化时初始本线程负责的能谱日志事件
SpectrumLogManager.mailSpectrumLogManager.offer(Thread.currentThread().getId(),null);
//所有邮件都需以.eml格式存储到eml文件夹中
final File emlFile = emailServiceManager.downloadEmailToEmlDir(message, emailCounter.getCurrValue());
//保存邮件日志到PG数据库

View File

@ -5,6 +5,7 @@ import lombok.RequiredArgsConstructor;
import org.jeecg.common.properties.*;
import org.jeecg.common.util.NameStandUtil;
import org.jeecg.common.util.RedisStreamUtil;
import org.jeecg.common.util.RedisUtil;
import org.jeecg.modules.datasource.OraDataSourceProperties;
import org.jeecg.modules.service.*;
import org.springframework.context.ApplicationContext;
@ -81,6 +82,9 @@ public class SpectrumServiceQuotes {
private final ApplicationContext applicationContext;
private final RedisUtil redisUtil;
private final MaximumPoolSizeProperties maximumPoolSizeProperties;
/**
* 原始库插入数据锁
*/

View File

@ -5,6 +5,8 @@ import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.baomidou.mybatisplus.core.toolkit.StringPool;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
@ -13,6 +15,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.jeecg.common.api.vo.Result;
import org.jeecg.common.aspect.annotation.AutoLog;
import org.jeecg.common.aspect.annotation.PermissionData;
@ -52,15 +55,13 @@ public class JeecgDemoController extends JeecgController<JeecgDemo, IJeecgDemoSe
//声明FTP客户端
FTPClient ftpClient = new FTPClient();
//连接
ftpClient.connect("172.21.70.87", 21);
ftpClient.connect("127.0.0.1", 21);
//登录
ftpClient.login("rmsops", "cnndc66367220");
ftpClient.login("rmsops", "cnndc010");
//判断ftp是否连接成功
if (Objects.isNull(ftpClient)){
throw new RuntimeException("ftp connection failed!");
}
InputStream iStream= null;
File file = null;
try {
//被动模式
ftpClient.enterLocalPassiveMode();
@ -69,10 +70,9 @@ public class JeecgDemoController extends JeecgController<JeecgDemo, IJeecgDemoSe
//
ftpClient.setControlEncoding("UTF-8");
ftpClient.setFileTransferMode(FTPClient.STREAM_TRANSFER_MODE);
System.out.println(ftpClient.printWorkingDirectory());
//读取ftp文件的输入流
iStream=ftpClient.retrieveFileStream("/savefile/Spectrum/Xenon/Sauna/Samplephd/2023/10/SEX63_007-20231026_0452_S_FULL_22495.8.PHD");
System.out.println(Objects.isNull(iStream));
List<String> files = new LinkedList<>();
readFiles(ftpClient, ftpClient.printWorkingDirectory()+ StringPool.SLASH + "savefile", files);
System.out.println("文件数量:"+files.size());
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
@ -80,18 +80,32 @@ public class JeecgDemoController extends JeecgController<JeecgDemo, IJeecgDemoSe
if (Objects.nonNull(ftpClient)){
ftpClient.disconnect();
}
if (Objects.nonNull(iStream)){
iStream.close();
}
if (Objects.nonNull(file)) {
file.delete();
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
private static void readFiles(FTPClient ftpClient, String parentFilePath, List<String> files) {
String oldPath = parentFilePath;
try {
ftpClient.changeWorkingDirectory(parentFilePath);
FTPFile[] directories = ftpClient.listFiles(parentFilePath);
if (Objects.nonNull(directories)) {
for (FTPFile ftpFile:directories) {
if (ftpFile.isDirectory()) {
parentFilePath = oldPath + StringPool.SLASH + ftpFile.getName();
readFiles(ftpClient, parentFilePath, files);
} else if (ftpFile.isFile()) {
files.add(ftpFile.getName());
}
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Autowired

View File

@ -70,5 +70,4 @@ public class GardsDetectorsController {
public Map<String, List<GardsDetectorsSystem>> findStationDetectors(@RequestBody List<String> stationIds){
return gardsDetectorsService.findStationDetectors(stationIds);
}
}

View File

@ -22,6 +22,4 @@ public interface GardsDetectorsMapper extends BaseMapper<GardsDetectorsSystem> {
* @return
*/
List<String> findType();
List<GardsDetectorsSystem> list(Integer stationId, String status);
}

View File

@ -38,15 +38,4 @@
<select id="findType" resultType="java.lang.String">
SELECT DISTINCT TYPE FROM CONFIGURATION.GARDS_DETECTORS
</select>
<select id="list" resultType="org.jeecg.modules.system.entity.GardsDetectorsSystem">
SELECT
*
FROM
CONFIGURATION.GARDS_DETECTORS det
<where>
det.STATION_ID = #{stationId} AND TRIM(det.STATUS) = #{status}
</where>
ORDER BY det.DETECTOR_ID ASC
</select>
</mapper>

View File

@ -36,14 +36,14 @@ public interface IGardsDetectorsService extends IService<GardsDetectorsSystem> {
* @param gardsDetectors
* @return
*/
Result create(GardsDetectorsSystem gardsDetectors);
Result<?> create(GardsDetectorsSystem gardsDetectors);
/**
* 修改监测器信息
* @param gardsDetectors
* @return
*/
Result update(GardsDetectorsSystem gardsDetectors);
Result<?> update(GardsDetectorsSystem gardsDetectors);
/**
* 删除监测器信息
@ -64,5 +64,4 @@ public interface IGardsDetectorsService extends IService<GardsDetectorsSystem> {
* @return
*/
Map<String, List<GardsDetectorsSystem>> findStationDetectors(List<String> stationIds);
}

View File

@ -11,7 +11,6 @@ import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.collect.Lists;
import org.jeecg.common.api.QueryRequest;
import org.jeecg.common.api.vo.Result;
import org.jeecg.common.constant.Prompt;
@ -19,7 +18,6 @@ import org.jeecg.common.properties.DetectorIdFormat;
import org.jeecg.common.util.RedisUtil;
import org.jeecg.modules.base.entity.configuration.GardsDetectors;
import org.jeecg.modules.base.enums.DetectorStatus;
import org.jeecg.modules.base.enums.DetectorsStatus;
import org.jeecg.modules.system.entity.GardsDetectorsSystem;
import org.jeecg.modules.system.mapper.GardsDetectorsMapper;
import org.jeecg.modules.system.service.IGardsDetectorsService;
@ -90,7 +88,7 @@ public class GardsDetectorsServiceImpl extends ServiceImpl<GardsDetectorsMapper,
@Transactional
public Result<?> create(GardsDetectorsSystem detector) {
String detectorCode = detector.getDetectorCode();
Integer detectorId = idFormat.detectorCodeToId(detectorCode);
Integer detectorId = idFormat.codeToId(detectorCode);
if (ObjectUtil.isNull(detectorId))
return Result.error("Detector Code Is Invalid");
if (ObjectUtil.isNotNull(getById(detectorId)))
@ -152,7 +150,7 @@ public class GardsDetectorsServiceImpl extends ServiceImpl<GardsDetectorsMapper,
redisUtil.del("detectorsMap", "detectorsUsedList");
List<GardsDetectorsSystem> gardsDetectors = this.baseMapper.selectList(new LambdaQueryWrapper<>());
Map<Integer, String> detectorsMap = gardsDetectors.stream().collect(Collectors.toMap(GardsDetectorsSystem::getDetectorId, GardsDetectorsSystem::getDetectorCode));
List<Integer> detectorsUsedList = gardsDetectors.stream().filter(item -> item.getStatus()!=null && DetectorsStatus.OPERATING.getStatus().equals(item.getStatus().trim())).map(GardsDetectorsSystem::getStationId).collect(Collectors.toList());
List<Integer> detectorsUsedList = gardsDetectors.stream().filter(item -> item.getStatus()!=null && "Operating".equals(item.getStatus().trim())).map(GardsDetectorsSystem::getStationId).collect(Collectors.toList());
redisUtil.set("detectorsMap",detectorsMap);
redisUtil.set("detectorsUsedList", detectorsUsedList);
}
@ -165,7 +163,7 @@ public class GardsDetectorsServiceImpl extends ServiceImpl<GardsDetectorsMapper,
queryWrapper.in(GardsDetectorsSystem::getStationId, stationIds);
List<GardsDetectorsSystem> detectorsList = this.baseMapper.selectList(queryWrapper);
for (String stationId:stationIds) {
List<GardsDetectorsSystem> detectors = detectorsList.stream().filter(item -> item.getStationId()!=null && item.getStationId().equals(Integer.valueOf(stationId)) && item.getStatus()!=null && item.getStatus().trim().equals(DetectorsStatus.OPERATING.getStatus())).collect(Collectors.toList());
List<GardsDetectorsSystem> detectors = detectorsList.stream().filter(item -> item.getStationId()!=null && item.getStationId().equals(Integer.valueOf(stationId)) && item.getStatus()!=null && item.getStatus().trim().equals("Operating")).collect(Collectors.toList());
map.put(stationId, detectors);
}
}

View File

@ -80,12 +80,12 @@ public class JeecgAutoProcessApplication extends SpringBootServletInitializer im
//校验临时存储目录是否存在不存在则创建
checkTempStorageDirectory();
//初始化邮箱邮件声明周期日志
EmailLogManager.init(spectrumPathProperties);
// EmailLogManager.init(spectrumPathProperties);
//初始化能谱解析错误日志管理器
ErrorLogManager.init(spectrumPathProperties);
//校验存储目录是否存在不存在则创建存在无操作
checkStorageDirectory();
autoProcessManager.start(systemStartupTime);
// autoProcessManager.start(systemStartupTime);
undealHandleManager.start();
fileSourceHandleManager.start();
// 删除过期的文件