diff --git a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/EmailParsingActuator.java b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/EmailParsingActuator.java index 06c37e71..e3f72868 100644 --- a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/EmailParsingActuator.java +++ b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/EmailParsingActuator.java @@ -1,9 +1,12 @@ package org.jeecg.modules; +import cn.hutool.core.date.DatePattern; +import cn.hutool.core.date.DateUtil; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.ArrayUtils; +import org.jeecg.common.constant.StringConstant; import org.jeecg.common.email.EmailLogManager; import org.jeecg.common.email.EmailServiceManager; import org.jeecg.common.properties.TaskProperties; @@ -14,6 +17,7 @@ import org.jeecg.modules.spectrum.SpectrumLogManager; import org.jeecg.modules.spectrum.SpectrumParsingActuator; import org.jeecg.modules.spectrum.SpectrumServiceQuotes; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; + import javax.mail.Message; import javax.mail.MessagingException; import javax.mail.internet.MimeMessage; @@ -24,7 +28,7 @@ import java.util.concurrent.*; * 邮件解析执行器 */ @Slf4j -public class EmailParsingActuator extends Thread{ +public class EmailParsingActuator extends Thread { private TaskProperties taskProperties; @Getter @@ -33,15 +37,18 @@ public class EmailParsingActuator extends Thread{ private SpectrumServiceQuotes spectrumServiceQuotes; private EmailCounter emailCounter; private Date systemStartupTime; - @Setter @Getter + @Setter + @Getter private boolean isStop; - @Setter @Getter + @Setter + @Getter private boolean threadSleep; - @Setter @Getter + @Setter + @Getter private Date stopTime; - public void init(EmailProperties emailProperties,SpectrumServiceQuotes spectrumServiceQuotes, - EmailCounter emailCounter,Date systemStartupTime){ + public void init(EmailProperties emailProperties, SpectrumServiceQuotes spectrumServiceQuotes, + EmailCounter emailCounter, Date systemStartupTime) { this.emailProperties = emailProperties; this.spectrumServiceQuotes = spectrumServiceQuotes; this.taskProperties = spectrumServiceQuotes.getTaskProperties(); @@ -50,11 +57,11 @@ public class EmailParsingActuator extends Thread{ //获取机器可用核心数 int systemCores = spectrumServiceQuotes.getMaximumPoolSizeProperties().getAuto(); - int maximumPoolSize = taskProperties.getReceiveNum() > systemCores?taskProperties.getReceiveNum():systemCores; + int maximumPoolSize = taskProperties.getReceiveNum() > systemCores ? taskProperties.getReceiveNum() : systemCores; //初始化线程池 ThreadFactory threadFactory = new CustomizableThreadFactory("mail-parsing-"); - poolExecutor = new ThreadPoolExecutor(taskProperties.getReceiveNum(),maximumPoolSize,5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),threadFactory); + poolExecutor = new ThreadPoolExecutor(taskProperties.getReceiveNum(), maximumPoolSize, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), threadFactory); } public void updateEmail(EmailProperties emailProperties) { @@ -63,10 +70,10 @@ public class EmailParsingActuator extends Thread{ @Override public void run() { - for(;;){ + for (; ; ) { String nowDate = DateUtils.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss"); if (threadSleep) { - log.info(nowDate + " " +this.emailProperties.getName()+" EmailParsingActuator is sleep!"); + log.info(nowDate + " " + this.emailProperties.getName() + " EmailParsingActuator is sleep!"); try { Thread.sleep(1000L); } catch (InterruptedException e) { @@ -75,43 +82,50 @@ public class EmailParsingActuator extends Thread{ continue; } if (isStop) { - log.info(nowDate + " " +this.emailProperties.getName()+" EmailParsingActuator is Stop!"); + log.info(nowDate + " " + this.emailProperties.getName() + " EmailParsingActuator is Stop!"); closeResource(); return; } long start = System.currentTimeMillis(); final EmailServiceManager emailServiceManager = EmailServiceManager.getInstance(); - emailServiceManager.init(this.emailProperties,this.taskProperties.getReceiveNum(),this.taskProperties.getTemporaryStoragePath(), + emailServiceManager.init(this.emailProperties, this.taskProperties.getReceiveNum(), this.taskProperties.getTemporaryStoragePath(), this.systemStartupTime, spectrumServiceQuotes.getSpectrumPathProperties(), spectrumServiceQuotes.getTaskProperties(), spectrumServiceQuotes.getRedisUtil()); List messageIds = new ArrayList<>(); try { Message[] messages = emailServiceManager.receiveMail(); log.info("EmailParsingActuator本次{}获取邮件数量为:{}", Thread.currentThread().getName(), ArrayUtils.isEmpty(messages) ? 0 : messages.length); - if(ArrayUtils.isNotEmpty(messages)){ + if (ArrayUtils.isNotEmpty(messages)) { //检验获取的邮件是否在之前删除失败列表中,若在直接调用邮件API删除,并且此次数组里元素也删除 for(int i=messages.length-1;i>=0;i--){ + String messageId = null; if (null == messages[i].getHeader("Message-ID")) { - messages = ArrayUtils.remove(messages, i); - continue; + // 有些邮箱拿不到 message-ID,换成主题+接收时间 + String subject = messages[i].getSubject().replace(" ", StringConstant.UNDER_LINE); + Date date = messages[i].getReceivedDate() == null ? messages[i].getSentDate() : messages[i].getReceivedDate(); + String receivedStr = DateUtil.format(date, DatePattern.NORM_DATETIME_MINUTE_PATTERN); + messageId = subject + StringConstant.UNDER_LINE + receivedStr; + // messages = ArrayUtils.remove(messages, i); + // continue; + }else { + messageId = ((MimeMessage) messages[i]).getMessageID(); } if (!messages[i].isExpunged()){ - String messageId = ((MimeMessage) messages[i]).getMessageID(); final boolean exist = emailServiceManager.check(messages[i],messageId); messageIds.add(messageId); - if(exist){ - messages = ArrayUtils.remove(messages,i); + if (exist) { + messages = ArrayUtils.remove(messages, i); } } } - log.info("EmailParsingActuator本次真实执行邮件数量为:{}",messages.length); - if(messages.length > 0){ + log.info("EmailParsingActuator本次真实执行邮件数量为:{}", messages.length); + if (messages.length > 0) { //本批次邮件号 final Integer batchesCounter = spectrumServiceQuotes.getBatchesCounter().getCurrValue(); CountDownLatch taskLatch = new CountDownLatch(messages.length); - for(Message message : messages){ + for (Message message : messages) { SpectrumParsingActuator spectrumParsingActuator = new SpectrumParsingActuator(); - spectrumParsingActuator.init(message,emailProperties,emailServiceManager, - taskLatch,spectrumServiceQuotes,emailCounter,batchesCounter); + spectrumParsingActuator.init(message, emailProperties, emailServiceManager, + taskLatch, spectrumServiceQuotes, emailCounter, batchesCounter); poolExecutor.execute(spectrumParsingActuator); } taskLatch.await(); @@ -134,9 +148,9 @@ public class EmailParsingActuator extends Thread{ emailServiceManager.close(messageIds); } long end = System.currentTimeMillis(); - long sleepTime = taskProperties.getMailThreadExecCycle() - (end-start); + long sleepTime = taskProperties.getMailThreadExecCycle() - (end - start); //如果sleepTime > 0 需要睡眠到指定时间,否则继续下次获取邮件 - if(sleepTime > 0){ + if (sleepTime > 0) { try { //如果本次 TimeUnit.MILLISECONDS.sleep(sleepTime); @@ -152,8 +166,8 @@ public class EmailParsingActuator extends Thread{ /** * 立即关闭线程池 */ - protected void closeResource(){ - if(Objects.nonNull(poolExecutor)) { + protected void closeResource() { + if (Objects.nonNull(poolExecutor)) { poolExecutor.shutdownNow(); } }