From 600c5235ec9da6d2e27b3523483ad517d73b4f19 Mon Sep 17 00:00:00 2001 From: qiaoqinzheng Date: Tue, 30 Apr 2024 11:51:54 +0800 Subject: [PATCH] =?UTF-8?q?=E8=87=AA=E5=BB=BA=E5=8F=B0=E7=AB=99=E5=88=86?= =?UTF-8?q?=E6=94=AF=E5=90=8C=E6=AD=A5=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/email/EmailServiceManager.java | 195 +++++++++--------- .../common/properties/TaskProperties.java | 5 + .../jeecg/modules/base/enums/Condition.java | 2 +- .../org/jeecg/modules/AutoProcessManager.java | 11 +- .../jeecg/modules/EmailParsingActuator.java | 28 +-- .../spectrum/SpectrumParsingActuator.java | 9 +- 6 files changed, 128 insertions(+), 122 deletions(-) diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/email/EmailServiceManager.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/email/EmailServiceManager.java index b5548832..8a3e9e25 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/common/email/EmailServiceManager.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/email/EmailServiceManager.java @@ -20,6 +20,7 @@ import org.jeecg.common.constant.SymbolConstant; import org.jeecg.common.email.emuns.MailContentType; import org.jeecg.common.exception.DownloadEmailException; import org.jeecg.common.properties.SpectrumPathProperties; +import org.jeecg.common.properties.TaskProperties; import org.jeecg.common.util.DateUtils; import org.jeecg.common.util.Md5Util; import org.jeecg.common.util.RedisUtil; @@ -54,6 +55,8 @@ public class EmailServiceManager { private SysEmail email; private SpectrumPathProperties spectrumPathProperties; + + private TaskProperties taskProperties; /** * 系统启动时间 */ @@ -69,6 +72,8 @@ public class EmailServiceManager { private RedisUtil redisUtil; + private Object downloadEmlLocal = new Object(); + @NotNull public static EmailServiceManager getInstance(){ return new EmailServiceManager(); @@ -87,13 +92,14 @@ public class EmailServiceManager { * @param email 邮件属性 */ public void init(SysEmail email, Integer receiveNum, String temporaryStoragePath, - Date systemStartupTime, SpectrumPathProperties pathProperties, + Date systemStartupTime, SpectrumPathProperties pathProperties,TaskProperties taskProperties, RedisUtil redisUtil){ this.email = email; this.receiveNum = receiveNum; this.temporaryStoragePath = temporaryStoragePath; this.systemStartupTime = systemStartupTime; this.spectrumPathProperties = pathProperties; + this.taskProperties = taskProperties; this.redisUtil = redisUtil; } @@ -528,105 +534,85 @@ public class EmailServiceManager { * 当计数大于10000后从0开始,服务重启后也从0开始 */ public File downloadEmailToEmlDir(@NotNull Message message,Integer emailCounter,Integer batchesCounter) throws MessagingException { - String subject = ""; - File emlFile = null; - String status = EmailLogManager.STATUS_SUCCESS; - Date receivedDate = null; - try { - //获取发件人 - final String address = ((InternetAddress) message.getFrom()[0]).getAddress(); - final String from = address.substring(0,address.indexOf(StringConstant.AT)); - //获取主题 - subject = MimeUtility.decodeText(message.getSubject()); - if(subject.contains(StringConstant.SLASH)){ - subject = StringUtils.replace(subject,StringConstant.SLASH,""); - } - if(subject.contains(StringConstant.COLON)){ - subject = StringUtils.replace(subject,StringConstant.COLON,""); - } - receivedDate = message.getReceivedDate(); - StringBuilder fileName = new StringBuilder(); - fileName.append(from); - fileName.append(StringConstant.UNDER_LINE); - fileName.append(subject); - fileName.append(StringConstant.UNDER_LINE); - fileName.append(DateUtils.formatDate(new Date(),"YYMMdd")); - fileName.append(StringConstant.UNDER_LINE); - fileName.append(DateUtils.formatDate(new Date(),"HHmmssSSS")); - fileName.append(StringConstant.UNDER_LINE); - fileName.append("receive"); - fileName.append(StringConstant.UNDER_LINE); - fileName.append(DateUtils.formatDate(receivedDate,"YYMMdd")); - fileName.append(StringConstant.UNDER_LINE); - fileName.append(DateUtils.formatDate(receivedDate,"HHmmssSSS")); - fileName.append(StringConstant.UNDER_LINE); - fileName.append(emailCounter); - fileName.append(SAVE_EML_SUFFIX); - final String rootPath = spectrumPathProperties.getRootPath(); - final String emlPath = spectrumPathProperties.getEmlPath(); - emlFile = new File(rootPath + emlPath + File.separator + fileName); - /* 如果邮件内容经过Base64编码 需要解码后再生成.eml文件 否则直接生成.eml文件 */ - final String BASE64_FLAG = "Base64"; - String content = (String) message.getContent(); - // 将正文内容分割为两部分: Base64 和 MD5|压缩前大小|压缩后大小|正文Base64编码 - List contents = StrUtil.split(content, '|', 2); - if (StrUtil.equals(contents.get(0), BASE64_FLAG)){ - // 先将未解码的内容保存为.eml文件 - FileUtil.writeFromStream(message.getInputStream(), emlFile); - // content: MD5|压缩前大小|压缩后大小|正文Base64编码 - content = contents.get(1); - // 将content分割为: MD5 和 压缩前大小|压缩后大小|正文Base64编码 - contents = StrUtil.split(content, '|', 2); - String md5 = contents.get(0); - content = contents.get(1); - if (StrUtil.isBlank(content)) return emlFile; - content = StrUtil.cleanBlank(content); - String md5Verified = MD5.create().digestHex(content); - // 如果md5验证失败 则不进行解码 - if (!StrUtil.equals(md5, md5Verified)) return emlFile; - contents = StrUtil.split(content, '|'); - String base64 = contents.get(contents.size() - 1); - // 解码Base64字符串 并用解码后的内容覆盖未解码内容 - byte[] data = Base64.decode(base64); - try (OutputStream out = Files.newOutputStream(emlFile.toPath()); - InflaterOutputStream inflaterOutputStream = new InflaterOutputStream(out)) { - inflaterOutputStream.write(data); - } catch (Exception e) { - log.error("Create the Base64 decoded file[{}] error: {}", emlFile.getAbsolutePath(), e.getMessage()); + synchronized (downloadEmlLocal) { + String subject = ""; + File emlFile = null; + String status = EmailLogManager.STATUS_SUCCESS; + Date receivedDate = null; + InputStream inputStream = null; + BufferedOutputStream outputStream = null; + try { + //获取发件人 + final String address = ((InternetAddress) message.getFrom()[0]).getAddress(); + final String from = address.substring(0,address.indexOf(StringConstant.AT)); + //获取主题 + subject = MimeUtility.decodeText(message.getSubject()); + if(subject.indexOf(StringConstant.SLASH) != -1){ + subject = StringUtils.replace(subject,StringConstant.SLASH,""); + } + if(subject.indexOf(StringConstant.COLON) != -1){ + subject = StringUtils.replace(subject,StringConstant.COLON,""); + } + receivedDate = message.getReceivedDate(); + StringBuilder fileName = new StringBuilder(); + fileName.append(from); + fileName.append(StringConstant.UNDER_LINE); + fileName.append(subject); + fileName.append(StringConstant.UNDER_LINE); + fileName.append(DateUtils.formatDate(new Date(),"YYMMdd")); + fileName.append(StringConstant.UNDER_LINE); + fileName.append(DateUtils.formatDate(new Date(),"HHmmssSSS")); + fileName.append(StringConstant.UNDER_LINE); + fileName.append("receive"); + fileName.append(StringConstant.UNDER_LINE); + fileName.append(DateUtils.formatDate(receivedDate,"YYMMdd")); + fileName.append(StringConstant.UNDER_LINE); + fileName.append(DateUtils.formatDate(receivedDate,"HHmmssSSS")); + fileName.append(StringConstant.UNDER_LINE); + fileName.append(emailCounter); + fileName.append(SAVE_EML_SUFFIX); + final String rootPath = spectrumPathProperties.getRootPath(); + final String emlPath = spectrumPathProperties.getEmlPath(); + emlFile = new File(rootPath+emlPath+File.separator+fileName); +// outputStream = new FileOutputStream(emlFile); +// message.writeTo(outputStream); + + int bufferSize = 1024 * 1024; // 1M + inputStream = message.getInputStream(); + outputStream = new BufferedOutputStream(new FileOutputStream(emlFile), bufferSize); + // 从邮件的输入流读取内容,并写入到本地文件 + byte[] buffer = new byte[bufferSize]; + int bytesRead; + while ((bytesRead = inputStream.read(buffer)) != -1) { + outputStream.write(buffer, 0, bytesRead); + } + + } catch (MessagingException | IOException e) { + // 下载邮件失败 抛出自定义邮件下载异常 + status = EmailLogManager.STATUS_ERROR; + String errorMsg = StrUtil.format("The email download failed, the subject of the email is {}, the reason is {}.", subject, e.getMessage()); + log.error(errorMsg); + throw new DownloadEmailException(errorMsg); + }catch (Exception e) { + log.error("",e); + }finally { + EmailLogEvent event = new EmailLogEvent(batchesCounter,Thread.currentThread().getId(),EmailLogManager.GS_TYPE_GET,status,EmailLogManager.GETIDEML,subject,DateUtils.formatDate(receivedDate,"yyyy-MM-dd HH:mm:ss:SSS"), + (Objects.isNull(emlFile)?" ":emlFile.getAbsolutePath())); + EmailLogManager.getInstance().offer(Thread.currentThread().getId(),event); + try { + if (Objects.nonNull(inputStream)) { + inputStream.close(); + } + if (Objects.nonNull(outputStream)) { + outputStream.flush(); + outputStream.close(); + } + } catch (IOException e) { + throw new RuntimeException(e); } - } else { // 直接生成.eml文件 - message.writeTo(Files.newOutputStream(emlFile.toPath())); } -// int bufferSize = 1024 * 1024; // 1M -// InputStream inputStream = message.getInputStream(); -// BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream, bufferSize); -// // 或者使用 BufferedOutputStream -// OutputStream outputStream = new FileOutputStream(emlFile); -// BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(outputStream, bufferSize); -// // 从邮件的输入流读取内容,并写入到本地文件 -// byte[] buffer = new byte[bufferSize]; -// int bytesRead; -// while ((bytesRead = bufferedInputStream.read(buffer)) != -1) { -// bufferedOutputStream.write(buffer, 0, bytesRead); -// } -// -// // 关闭流 -// bufferedInputStream.close(); -// bufferedOutputStream.close(); - } catch (MessagingException | IOException e) { - // 下载邮件失败 抛出自定义邮件下载异常 - status = EmailLogManager.STATUS_ERROR; - String errorMsg = StrUtil.format("The email download failed, the subject of the email is {}, the reason is {}.", subject, e.getMessage()); - log.error(errorMsg); - throw new DownloadEmailException(errorMsg); - }catch (Exception e) { - log.error("The email download failed: {}", e.getMessage()); - }finally { - EmailLogEvent event = new EmailLogEvent(batchesCounter,Thread.currentThread().getId(),EmailLogManager.GS_TYPE_GET,status,EmailLogManager.GETIDEML,subject,DateUtils.formatDate(receivedDate,"yyyy-MM-dd HH:mm:ss:SSS"), - (Objects.isNull(emlFile)?" ":emlFile.getAbsolutePath())); - EmailLogManager.getInstance().offer(Thread.currentThread().getId(),event); + return emlFile; } - return emlFile; } /** @@ -668,10 +654,10 @@ public class EmailServiceManager { if(null != store){ store.close(); } - for(String messageId : messageIds){ - String key = RedisConstant.EMAIL_MSG_ID+StringConstant.COLON+messageId; - redisUtil.del(key); - } +// for(String messageId : messageIds){ +// String key = RedisConstant.EMAIL_MSG_ID+StringConstant.COLON+messageId; +// redisUtil.del(key); +// } } catch (MessagingException e) { log.error("Email closure failed, email address is: {}, reason is: {}",email.getUsername(),e.getMessage()); e.printStackTrace(); @@ -687,10 +673,13 @@ public class EmailServiceManager { boolean exist = false; try { String key = RedisConstant.EMAIL_MSG_ID+StringConstant.COLON+messageId; - exist = redisUtil.hasKey(key); - if(exist){ + int numberKey = redisUtil.get(key) != null? (int) redisUtil.get(key):0; +// exist = redisUtil.hasKey(key); + if(numberKey >= taskProperties.getForceDeletedNumber()){ + exist = true; log.info("Check: Remove Email:{},receiveTime:{}",message.getSubject(), DateUtils.formatDate(message.getReceivedDate(),"yyyy-MM-dd HH:mm:ss")); message.setFlag(Flags.Flag.DELETED,true); + redisUtil.del(key); } return exist; } catch (MessagingException e) { diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/properties/TaskProperties.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/properties/TaskProperties.java index 34f9e294..3a9122b7 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/common/properties/TaskProperties.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/properties/TaskProperties.java @@ -41,6 +41,11 @@ public class TaskProperties implements Serializable { */ private Integer mailThreadExecCycle; + /** + * 线程获取失败邮件次数 + */ + private Integer forceDeletedNumber; + /** * 监测需删除的邮件线程执行周期(毫秒) */ diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/enums/Condition.java b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/enums/Condition.java index 7ce3278a..587c8923 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/enums/Condition.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/enums/Condition.java @@ -9,7 +9,7 @@ import lombok.Getter; public enum Condition { FIRST_FOUND("1"), ABOVE_AVERAGE("2"), MEANWHILE("3"); - private String value; + private final String value; public static Condition valueOf1(String value){ for (Condition condition : Condition.values()) { diff --git a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/AutoProcessManager.java b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/AutoProcessManager.java index 3a0880dd..9460d44d 100644 --- a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/AutoProcessManager.java +++ b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/AutoProcessManager.java @@ -233,9 +233,15 @@ public class AutoProcessManager{ if(databaseEmail.getEnabled().equals(SysMailEnableType.ENABLE.getMailEnableType())){ final boolean testFlag = testConnectEmailServer(databaseEmail); if(testFlag){ - databaseEmail.setNewEmailFlag(true); + if (emailExecThreadMap.containsKey(databaseEmail.getId())) { + EmailParsingActuator actuator = emailExecThreadMap.get(databaseEmail.getId()); + actuator.setStop(false); + log.info("{}邮箱重新加入监测队列",databaseEmail.getUsername()); + } else { + databaseEmail.setNewEmailFlag(true); + log.info("{}邮箱加入监测队列,设置新增标记",databaseEmail.getUsername()); + } emailMap.put(databaseEmail.getId(),databaseEmail); - log.info("{}邮箱加入监测队列,设置新增标记",databaseEmail.getUsername()); } } } @@ -279,6 +285,7 @@ public class AutoProcessManager{ if(next.getValue().getState() == State.TERMINATED){ log.info("{}邮箱执行线程已停止,emailExecThreadMap删除此邮箱数据",next.getValue().getEmailProperties().getUsername()); checkStopThreads.remove(); + emailMap.remove(next.getKey()); } } diff --git a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/EmailParsingActuator.java b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/EmailParsingActuator.java index 8abbeb06..67983395 100644 --- a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/EmailParsingActuator.java +++ b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/EmailParsingActuator.java @@ -74,23 +74,23 @@ public class EmailParsingActuator extends Thread{ long start = System.currentTimeMillis(); final EmailServiceManager emailServiceManager = EmailServiceManager.getInstance(); emailServiceManager.init(this.emailProperties,this.taskProperties.getReceiveNum(),this.taskProperties.getTemporaryStoragePath(), - this.systemStartupTime, spectrumServiceQuotes.getSpectrumPathProperties(),spectrumServiceQuotes.getRedisUtil()); + this.systemStartupTime, spectrumServiceQuotes.getSpectrumPathProperties(), spectrumServiceQuotes.getTaskProperties(), spectrumServiceQuotes.getRedisUtil()); List messageIds = new ArrayList<>(); try { Message[] messages = emailServiceManager.receiveMail(); if(ArrayUtils.isNotEmpty(messages)){ - log.info("EmailParsingActuator本次获取邮件数量为:{}",messages.length); + log.info("EmailParsingActuator本次{}获取邮件数量为:{}", Thread.currentThread().getName(), messages.length); //检验获取的邮件是否在之前删除失败列表中,若在直接调用邮件API删除,并且此次数组里元素也删除 -// for(int i=messages.length-1;i>=0;i--){ -// if (!messages[i].isExpunged()){ -// String messageId = ((MimeMessage) messages[i]).getMessageID(); -// final boolean exist = emailServiceManager.check(messages[i],messageId); -// messageIds.add(messageId); -// if(exist){ -// messages = ArrayUtils.remove(messages,i); -// } -// } -// } + for(int i=messages.length-1;i>=0;i--){ + if (!messages[i].isExpunged()){ + String messageId = ((MimeMessage) messages[i]).getMessageID(); + final boolean exist = emailServiceManager.check(messages[i],messageId); + messageIds.add(messageId); + if(exist){ + messages = ArrayUtils.remove(messages,i); + } + } + } log.info("EmailParsingActuator本次真实执行邮件数量为:{}",messages.length); if(messages.length > 0){ //本批次邮件号 @@ -105,8 +105,10 @@ public class EmailParsingActuator extends Thread{ taskLatch.await(); } } - }catch (InterruptedException e) { + } catch (InterruptedException e) { e.printStackTrace(); + } catch (MessagingException e) { + throw new RuntimeException(e); } finally { //清除本批次邮件日志缓存 EmailLogManager.getInstance().clear(); diff --git a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/spectrum/SpectrumParsingActuator.java b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/spectrum/SpectrumParsingActuator.java index 53f17107..a88f07ad 100644 --- a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/spectrum/SpectrumParsingActuator.java +++ b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/spectrum/SpectrumParsingActuator.java @@ -96,7 +96,10 @@ public class SpectrumParsingActuator implements Runnable{ receiveDate = DateUtils.formatDate(message.getReceivedDate(),"yyyy-MM-dd HH:mm:ss"); String emlName = subject+ StringConstant.UNDER_LINE+ receiveDate; String key = RedisConstant.EMAIL_MSG_ID+StringConstant.COLON+messageId; - spectrumServiceQuotes.getRedisUtil().set(key,emlName,expiryTime); +// spectrumServiceQuotes.getRedisUtil().set(key,emlName,expiryTime); + //判断当前key的下载次数是否超过限制次数 + spectrumServiceQuotes.getRedisUtil().incr(key, 1L); + spectrumServiceQuotes.getRedisUtil().expire(key, expiryTime); //线程开始初始化时,初始本线程负责的能谱日志事件 SpectrumLogManager.mailSpectrumLogManager.offer(Thread.currentThread().getId(),null); @@ -123,6 +126,7 @@ public class SpectrumParsingActuator implements Runnable{ try { //开始解析 spectrumHandler.handler(); + spectrumServiceQuotes.getRedisUtil().del(key); } catch (Exception e) { //如果是gamma谱的分析异常 if (e instanceof AnalySpectrumException) { @@ -136,7 +140,6 @@ public class SpectrumParsingActuator implements Runnable{ throw e; } } - }else{ log.warn("This email {} parsing failed and is not listed in the Met, Alert, SOH, Sample, Detbkphd, QC, Gasbkphd spectra.",subject); } @@ -204,7 +207,7 @@ public class SpectrumParsingActuator implements Runnable{ final String finalPath = rootPath+emlErrorPath; FileOperation.moveFile(emlFile,finalPath,true); // 删除 key,防止下次线程执行删除邮件 - spectrumServiceQuotes.getRedisUtil().del(key); +// spectrumServiceQuotes.getRedisUtil().del(key); } }