From 97f4aae3ab84b584d83314b8cb7422276899111b Mon Sep 17 00:00:00 2001 From: nieziyan Date: Tue, 4 Jun 2024 09:23:21 +0800 Subject: [PATCH] =?UTF-8?q?fix=EF=BC=9A=E8=87=AA=E5=8A=A8=E5=A4=84?= =?UTF-8?q?=E7=90=86debug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/email/EmailServiceManager.java | 116 ++++++++++++++++-- .../org/jeecg/modules/AutoProcessManager.java | 23 ++-- .../jeecg/modules/EmailParsingActuator.java | 16 ++- 3 files changed, 135 insertions(+), 20 deletions(-) diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/email/EmailServiceManager.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/email/EmailServiceManager.java index 257b0b26..49f02b82 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/common/email/EmailServiceManager.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/email/EmailServiceManager.java @@ -1,16 +1,13 @@ package org.jeecg.common.email; -import cn.hutool.core.io.FileUtil; import cn.hutool.core.util.ArrayUtil; import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.RandomUtil; import cn.hutool.core.util.StrUtil; import com.google.common.collect.Lists; import com.sun.mail.imap.IMAPStore; import com.sun.mail.smtp.SMTPAddressFailedException; -import io.swagger.models.auth.In; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.io.Charsets; -import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.jeecg.common.api.dto.message.MessageDTO; import org.jeecg.common.constant.RedisConstant; @@ -21,7 +18,6 @@ import org.jeecg.common.exception.DownloadEmailException; import org.jeecg.common.properties.SpectrumPathProperties; import org.jeecg.common.properties.TaskProperties; import org.jeecg.common.util.DateUtils; -import org.jeecg.common.util.Md5Util; import org.jeecg.common.util.RedisUtil; import org.jeecg.modules.base.entity.postgre.SysEmail; import org.jetbrains.annotations.NotNull; @@ -36,6 +32,9 @@ import java.io.*; import java.net.InetSocketAddress; import java.net.Socket; import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; /** @@ -70,6 +69,8 @@ public class EmailServiceManager { private Object downloadEmlLocal = new Object(); + private final ReentrantLock lock = new ReentrantLock(); + @NotNull public static EmailServiceManager getInstance(){ return new EmailServiceManager(); @@ -155,6 +156,8 @@ public class EmailServiceManager { store = (IMAPStore) session.getStore(); //连接 store.connect(email.getUsername(),email.getPassword()); + if (RandomUtil.randomInt(1, 5) == 3) + throw new MessagingException(); // 解决163普通邮箱无法建立连接问题 store.id(IAM); //获取收件箱 @@ -609,6 +612,105 @@ public class EmailServiceManager { } } + /*public File downloadEmailToEmlDir(@NotNull Message message, Integer emailCounter, Integer batchesCounter) throws MessagingException, IOException { + AtomicReference outputStream = new AtomicReference<>(); + CompletableFuture future = CompletableFuture.supplyAsync(() -> { + try { + lock.lock(); // 获取锁 + // 执行需要获取锁才能进行的操作 + // return executeWithLock(message, emailCounter, batchesCounter); + File file = executeWithLock(message, emailCounter, batchesCounter); + outputStream.set(new FileOutputStream(file)); + a(outputStream,message); + return null; + } catch (MessagingException e) { + throw new RuntimeException(e); + } catch (FileNotFoundException e) { + throw new RuntimeException(e); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + // 如果成功拿到锁 释放锁 + lock.unlock(); + } + }); + + try { + return future.get(5, TimeUnit.SECONDS); + // return future.get(5, TimeUnit.SECONDS);// 等待任务执行,超过5秒则抛出TimeoutException + } catch (InterruptedException | ExecutionException | TimeoutException e) { + future.cancel(true); // 取消任务执行 + if (ObjectUtil.isNotNull(outputStream) && ObjectUtil.isNotNull(outputStream.get())) + outputStream.get().close(); + log.error("下载 eml 执行超时", e); + throw new RuntimeException("下载 eml 执行超时"); + } + }*/ + public File executeWithLock(Message message,Integer emailCounter,Integer batchesCounter) throws MessagingException { + + String subject = ""; + File emlFile = null; + String status = EmailLogManager.STATUS_SUCCESS; + Date receivedDate = null; + try { + // 获取锁 设置超时 + //获取发件人 + final String address = ((InternetAddress) message.getFrom()[0]).getAddress(); + final String from = address.substring(0,address.indexOf(StringConstant.AT)); + //获取主题 + subject = MimeUtility.decodeText(message.getSubject()); + if(subject.contains(StringConstant.SLASH)){ + subject = StringUtils.replace(subject,StringConstant.SLASH,""); + } + if(subject.contains(StringConstant.COLON)){ + subject = StringUtils.replace(subject,StringConstant.COLON,""); + } + receivedDate = message.getReceivedDate(); + StringBuilder fileName = new StringBuilder(); + fileName.append(from); + fileName.append(StringConstant.UNDER_LINE); + fileName.append(subject); + fileName.append(StringConstant.UNDER_LINE); + fileName.append(DateUtils.formatDate(new Date(),"YYMMdd")); + fileName.append(StringConstant.UNDER_LINE); + fileName.append(DateUtils.formatDate(new Date(),"HHmmssSSS")); + fileName.append(StringConstant.UNDER_LINE); + fileName.append("receive"); + fileName.append(StringConstant.UNDER_LINE); + fileName.append(DateUtils.formatDate(receivedDate,"YYMMdd")); + fileName.append(StringConstant.UNDER_LINE); + fileName.append(DateUtils.formatDate(receivedDate,"HHmmssSSS")); + fileName.append(StringConstant.UNDER_LINE); + fileName.append(emailCounter); + fileName.append(SAVE_EML_SUFFIX); + final String rootPath = spectrumPathProperties.getRootPath(); + final String emlPath = spectrumPathProperties.getEmlPath(); + emlFile = new File(rootPath+emlPath+File.separator+fileName); + // Thread.sleep(6000l); + // try(FileOutputStream outputStream = new FileOutputStream(emlFile)) { + // message.writeTo(outputStream); + // } catch (IOException e) { + // throw new RuntimeException(e); + // } + } catch (MessagingException | IOException e) { + // 下载邮件失败 抛出自定义邮件下载异常 + status = EmailLogManager.STATUS_ERROR; + String errorMsg = StrUtil.format("The email download failed, the subject of the email is {}, the reason is {}.", subject, e.getMessage()); + log.error(errorMsg); + throw new DownloadEmailException(errorMsg); + } catch (Exception e) { + log.error("",e); + } finally { + EmailLogEvent event = new EmailLogEvent(batchesCounter,Thread.currentThread().getId(),EmailLogManager.GS_TYPE_GET,status,EmailLogManager.GETIDEML,subject,DateUtils.formatDate(receivedDate,"yyyy-MM-dd HH:mm:ss:SSS"), + (Objects.isNull(emlFile)?" ":emlFile.getAbsolutePath())); + EmailLogManager.getInstance().offer(Thread.currentThread().getId(),event); + } + return emlFile; + } + + public void a(AtomicReference outputStream, Message message) throws MessagingException, IOException { + message.writeTo(outputStream.get()); + } /** * 删除邮件 * @param message @@ -621,7 +723,7 @@ public class EmailServiceManager { try { subject = MimeUtility.decodeText(message.getSubject()); receivedDate = message.getReceivedDate(); -// message.setFlag(Flags.Flag.DELETED,true); + // message.setFlag(Flags.Flag.DELETED,true); // log.info("EmailServiceManager: Remove Email:{},receiveTime:{}",message.getSubject(), DateUtils.formatDate(message.getReceivedDate(),"yyyy-MM-dd HH:mm:ss")); } catch (MessagingException | UnsupportedEncodingException e) { status = EmailLogManager.STATUS_ERROR; @@ -672,7 +774,7 @@ public class EmailServiceManager { if(numberKey >= taskProperties.getForceDeletedNumber()){ exist = true; log.info("Check: Remove Email:{},receiveTime:{}",message.getSubject(), DateUtils.formatDate(message.getReceivedDate(),"yyyy-MM-dd HH:mm:ss")); -// message.setFlag(Flags.Flag.DELETED,true); + // message.setFlag(Flags.Flag.DELETED,true); redisUtil.del(key); } return exist; diff --git a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/AutoProcessManager.java b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/AutoProcessManager.java index d62ad50e..19bf67d0 100644 --- a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/AutoProcessManager.java +++ b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/AutoProcessManager.java @@ -1,5 +1,6 @@ package org.jeecg.modules; +import cn.hutool.core.util.RandomUtil; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.jeecg.common.constant.RedisConstant; @@ -15,7 +16,6 @@ import org.jeecg.modules.spectrum.SpectrumServiceQuotes; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; -import java.io.*; import java.util.*; import java.util.concurrent.TimeUnit; @@ -47,6 +47,8 @@ public class AutoProcessManager{ */ private Map emailExecThreadMap = new HashMap<>(); + private boolean flag = true; + /** * 启动自动处理 */ @@ -145,9 +147,14 @@ public class AutoProcessManager{ if(!email.isDelFlag()){ final EmailServiceManager emailServiceManager = EmailServiceManager.getInstance(); emailServiceManager.init(email); - boolean testFlag = emailServiceManager.testConnectEmailServer(); + /*boolean testFlag = emailServiceManager.testConnectEmailServer(); if(!testFlag){ emails.add(email); + }*/ + int i = RandomUtil.randomInt(1, 5); + flag = i == 3; + if(flag){ + emails.add(email); } } }); @@ -158,7 +165,7 @@ public class AutoProcessManager{ //如果这时邮箱线程里已有执行的线程则设置停止标记 if(emailExecThreadMap.containsKey(email.getId())){ EmailParsingActuator actuator = emailExecThreadMap.get(email.getId()); - actuator.setStop(true); + actuator.setThreadSleep(true); actuator.setStopTime(new Date()); } log.info("{}邮箱测试连接失败,emailMap删除此邮箱数据,emailExecThreadMap设置线程停止标记",email.getUsername()); @@ -231,11 +238,11 @@ public class AutoProcessManager{ }else{ //如果不包含邮箱id 并且 邮箱处于启用状态 将邮箱对象存入到map中 并将新邮箱标识设置为true if(databaseEmail.getEnabled().equals(SysMailEnableType.ENABLE.getMailEnableType())){ - final boolean testFlag = testConnectEmailServer(databaseEmail); - if(testFlag){ + // final boolean testFlag = testConnectEmailServer(databaseEmail); + if(flag){ if (emailExecThreadMap.containsKey(databaseEmail.getId())) { EmailParsingActuator actuator = emailExecThreadMap.get(databaseEmail.getId()); - actuator.setStop(false); + actuator.setThreadSleep(false); log.info("{}邮箱重新加入监测队列",databaseEmail.getUsername()); } else { databaseEmail.setNewEmailFlag(true); @@ -290,7 +297,7 @@ public class AutoProcessManager{ } //遍历邮箱执行线程,如果邮箱执行线程stop属性已被设置为true则关闭资源停止线程 - /*final Iterator> iterator = emailExecThreadMap.entrySet().iterator(); + final Iterator> iterator = emailExecThreadMap.entrySet().iterator(); emailExecThreadMap.forEach((emailId,emailExecThread)->{ try{ if(emailExecThread.getState() != State.TERMINATED && emailExecThread.isStop()){ @@ -315,7 +322,7 @@ public class AutoProcessManager{ } } } - });*/ + }); } long end = System.currentTimeMillis(); long sleepTime = taskProperties.getDeletedMailThreadExecCycle() - (end-start); diff --git a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/EmailParsingActuator.java b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/EmailParsingActuator.java index 975626fa..c53a70d8 100644 --- a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/EmailParsingActuator.java +++ b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/EmailParsingActuator.java @@ -36,6 +36,8 @@ public class EmailParsingActuator extends Thread{ @Setter @Getter private boolean isStop; @Setter @Getter + private boolean threadSleep; + @Setter @Getter private Date stopTime; public void init(EmailProperties emailProperties,SpectrumServiceQuotes spectrumServiceQuotes, @@ -62,17 +64,21 @@ public class EmailParsingActuator extends Thread{ @Override public void run() { for(;;){ - if (isStop) { - String nowDate = DateUtils.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss"); - log.info(nowDate + " " +this.emailProperties.getName()+" EmailParsingActuator is Stop!"); + String nowDate = DateUtils.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss"); + if (threadSleep) { + log.info(nowDate + " " +this.emailProperties.getName()+" EmailParsingActuator is sleep!"); try { Thread.sleep(1000L); } catch (InterruptedException e) { log.error("Thread sleep error"); } -// closeResource(); continue; } + if (isStop) { + log.info(nowDate + " " +this.emailProperties.getName()+" EmailParsingActuator is Stop!"); + closeResource(); + return; + } long start = System.currentTimeMillis(); final EmailServiceManager emailServiceManager = EmailServiceManager.getInstance(); emailServiceManager.init(this.emailProperties,this.taskProperties.getReceiveNum(),this.taskProperties.getTemporaryStoragePath(), @@ -115,7 +121,7 @@ public class EmailParsingActuator extends Thread{ } } catch (MessagingException e) { System.out.println("捕获MessagingException!!!!!!!!"); -// closeResource(); + // closeResource(); throw new RuntimeException(e); } catch (Exception e) { // closeResource();