修改获取邮件Message-ID的方式,获取不到Message-ID时换成主题+接收时间或者时发送时间

This commit is contained in:
duwenyuan 2025-06-30 10:28:54 +08:00
parent 9546f610b9
commit 25bf3542b5

View File

@ -1,9 +1,12 @@
package org.jeecg.modules;
import cn.hutool.core.date.DatePattern;
import cn.hutool.core.date.DateUtil;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.jeecg.common.constant.StringConstant;
import org.jeecg.common.email.EmailLogManager;
import org.jeecg.common.email.EmailServiceManager;
import org.jeecg.common.properties.TaskProperties;
@ -14,6 +17,7 @@ import org.jeecg.modules.spectrum.SpectrumLogManager;
import org.jeecg.modules.spectrum.SpectrumParsingActuator;
import org.jeecg.modules.spectrum.SpectrumServiceQuotes;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import javax.mail.Message;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
@ -24,7 +28,7 @@ import java.util.concurrent.*;
* 邮件解析执行器
*/
@Slf4j
public class EmailParsingActuator extends Thread{
public class EmailParsingActuator extends Thread {
private TaskProperties taskProperties;
@Getter
@ -33,15 +37,18 @@ public class EmailParsingActuator extends Thread{
private SpectrumServiceQuotes spectrumServiceQuotes;
private EmailCounter emailCounter;
private Date systemStartupTime;
@Setter @Getter
@Setter
@Getter
private boolean isStop;
@Setter @Getter
@Setter
@Getter
private boolean threadSleep;
@Setter @Getter
@Setter
@Getter
private Date stopTime;
public void init(EmailProperties emailProperties,SpectrumServiceQuotes spectrumServiceQuotes,
EmailCounter emailCounter,Date systemStartupTime){
public void init(EmailProperties emailProperties, SpectrumServiceQuotes spectrumServiceQuotes,
EmailCounter emailCounter, Date systemStartupTime) {
this.emailProperties = emailProperties;
this.spectrumServiceQuotes = spectrumServiceQuotes;
this.taskProperties = spectrumServiceQuotes.getTaskProperties();
@ -50,11 +57,11 @@ public class EmailParsingActuator extends Thread{
//获取机器可用核心数
int systemCores = spectrumServiceQuotes.getMaximumPoolSizeProperties().getAuto();
int maximumPoolSize = taskProperties.getReceiveNum() > systemCores?taskProperties.getReceiveNum():systemCores;
int maximumPoolSize = taskProperties.getReceiveNum() > systemCores ? taskProperties.getReceiveNum() : systemCores;
//初始化线程池
ThreadFactory threadFactory = new CustomizableThreadFactory("mail-parsing-");
poolExecutor = new ThreadPoolExecutor(taskProperties.getReceiveNum(),maximumPoolSize,5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),threadFactory);
poolExecutor = new ThreadPoolExecutor(taskProperties.getReceiveNum(), maximumPoolSize, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), threadFactory);
}
public void updateEmail(EmailProperties emailProperties) {
@ -63,10 +70,10 @@ public class EmailParsingActuator extends Thread{
@Override
public void run() {
for(;;){
for (; ; ) {
String nowDate = DateUtils.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss");
if (threadSleep) {
log.info(nowDate + " " +this.emailProperties.getName()+" EmailParsingActuator is sleep!");
log.info(nowDate + " " + this.emailProperties.getName() + " EmailParsingActuator is sleep!");
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
@ -75,43 +82,50 @@ public class EmailParsingActuator extends Thread{
continue;
}
if (isStop) {
log.info(nowDate + " " +this.emailProperties.getName()+" EmailParsingActuator is Stop!");
log.info(nowDate + " " + this.emailProperties.getName() + " EmailParsingActuator is Stop!");
closeResource();
return;
}
long start = System.currentTimeMillis();
final EmailServiceManager emailServiceManager = EmailServiceManager.getInstance();
emailServiceManager.init(this.emailProperties,this.taskProperties.getReceiveNum(),this.taskProperties.getTemporaryStoragePath(),
emailServiceManager.init(this.emailProperties, this.taskProperties.getReceiveNum(), this.taskProperties.getTemporaryStoragePath(),
this.systemStartupTime, spectrumServiceQuotes.getSpectrumPathProperties(), spectrumServiceQuotes.getTaskProperties(), spectrumServiceQuotes.getRedisUtil());
List<String> messageIds = new ArrayList<>();
try {
Message[] messages = emailServiceManager.receiveMail();
log.info("EmailParsingActuator本次{}获取邮件数量为:{}", Thread.currentThread().getName(), ArrayUtils.isEmpty(messages) ? 0 : messages.length);
if(ArrayUtils.isNotEmpty(messages)){
if (ArrayUtils.isNotEmpty(messages)) {
//检验获取的邮件是否在之前删除失败列表中若在直接调用邮件API删除并且此次数组里元素也删除
for(int i=messages.length-1;i>=0;i--){
String messageId = null;
if (null == messages[i].getHeader("Message-ID")) {
messages = ArrayUtils.remove(messages, i);
continue;
// 有些邮箱拿不到 message-ID换成主题+接收时间
String subject = messages[i].getSubject().replace(" ", StringConstant.UNDER_LINE);
Date date = messages[i].getReceivedDate() == null ? messages[i].getSentDate() : messages[i].getReceivedDate();
String receivedStr = DateUtil.format(date, DatePattern.NORM_DATETIME_MINUTE_PATTERN);
messageId = subject + StringConstant.UNDER_LINE + receivedStr;
// messages = ArrayUtils.remove(messages, i);
// continue;
}else {
messageId = ((MimeMessage) messages[i]).getMessageID();
}
if (!messages[i].isExpunged()){
String messageId = ((MimeMessage) messages[i]).getMessageID();
final boolean exist = emailServiceManager.check(messages[i],messageId);
messageIds.add(messageId);
if(exist){
messages = ArrayUtils.remove(messages,i);
if (exist) {
messages = ArrayUtils.remove(messages, i);
}
}
}
log.info("EmailParsingActuator本次真实执行邮件数量为{}",messages.length);
if(messages.length > 0){
log.info("EmailParsingActuator本次真实执行邮件数量为{}", messages.length);
if (messages.length > 0) {
//本批次邮件号
final Integer batchesCounter = spectrumServiceQuotes.getBatchesCounter().getCurrValue();
CountDownLatch taskLatch = new CountDownLatch(messages.length);
for(Message message : messages){
for (Message message : messages) {
SpectrumParsingActuator spectrumParsingActuator = new SpectrumParsingActuator();
spectrumParsingActuator.init(message,emailProperties,emailServiceManager,
taskLatch,spectrumServiceQuotes,emailCounter,batchesCounter);
spectrumParsingActuator.init(message, emailProperties, emailServiceManager,
taskLatch, spectrumServiceQuotes, emailCounter, batchesCounter);
poolExecutor.execute(spectrumParsingActuator);
}
taskLatch.await();
@ -134,9 +148,9 @@ public class EmailParsingActuator extends Thread{
emailServiceManager.close(messageIds);
}
long end = System.currentTimeMillis();
long sleepTime = taskProperties.getMailThreadExecCycle() - (end-start);
long sleepTime = taskProperties.getMailThreadExecCycle() - (end - start);
//如果sleepTime > 0 需要睡眠到指定时间否则继续下次获取邮件
if(sleepTime > 0){
if (sleepTime > 0) {
try {
//如果本次
TimeUnit.MILLISECONDS.sleep(sleepTime);
@ -152,8 +166,8 @@ public class EmailParsingActuator extends Thread{
/**
* 立即关闭线程池
*/
protected void closeResource(){
if(Objects.nonNull(poolExecutor)) {
protected void closeResource() {
if (Objects.nonNull(poolExecutor)) {
poolExecutor.shutdownNow();
}
}