onlyDownload修改邮箱处理程序频繁增加线程问题

onlyDownload修改文件下载部分长期堵塞问题
This commit is contained in:
qiaoqinzheng 2024-04-26 10:31:59 +08:00
parent 7da8eeb921
commit f62158a721
4 changed files with 148 additions and 152 deletions

View File

@ -9,6 +9,8 @@ import com.sun.mail.imap.IMAPStore;
import com.sun.mail.smtp.SMTPAddressFailedException; import com.sun.mail.smtp.SMTPAddressFailedException;
import io.swagger.models.auth.In; import io.swagger.models.auth.In;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.Charsets;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.jeecg.common.api.dto.message.MessageDTO; import org.jeecg.common.api.dto.message.MessageDTO;
import org.jeecg.common.constant.RedisConstant; import org.jeecg.common.constant.RedisConstant;
@ -66,8 +68,6 @@ public class EmailServiceManager {
private RedisUtil redisUtil; private RedisUtil redisUtil;
private Object receive;
private Object downloadEmlLocal = new Object(); private Object downloadEmlLocal = new Object();
@NotNull @NotNull
@ -89,7 +89,7 @@ public class EmailServiceManager {
*/ */
public void init(SysEmail email, Integer receiveNum, String temporaryStoragePath, public void init(SysEmail email, Integer receiveNum, String temporaryStoragePath,
Date systemStartupTime, SpectrumPathProperties pathProperties,TaskProperties taskProperties, Date systemStartupTime, SpectrumPathProperties pathProperties,TaskProperties taskProperties,
RedisUtil redisUtil, Object receive){ RedisUtil redisUtil){
this.email = email; this.email = email;
this.receiveNum = receiveNum; this.receiveNum = receiveNum;
this.temporaryStoragePath = temporaryStoragePath; this.temporaryStoragePath = temporaryStoragePath;
@ -97,7 +97,6 @@ public class EmailServiceManager {
this.spectrumPathProperties = pathProperties; this.spectrumPathProperties = pathProperties;
this.taskProperties = taskProperties; this.taskProperties = taskProperties;
this.redisUtil = redisUtil; this.redisUtil = redisUtil;
this.receive = receive;
} }
/** /**
@ -128,7 +127,6 @@ public class EmailServiceManager {
* 接收邮件 * 接收邮件
*/ */
public Message[] receiveMail() { public Message[] receiveMail() {
synchronized (receive) {
String status = EmailLogManager.STATUS_SUCCESS; String status = EmailLogManager.STATUS_SUCCESS;
try{ try{
//配置邮件服务属性 //配置邮件服务属性
@ -201,7 +199,6 @@ public class EmailServiceManager {
} }
return null; return null;
} }
}
/* /*
* 测试收件邮箱账号是否可以正常使用 * 测试收件邮箱账号是否可以正常使用
@ -537,6 +534,7 @@ public class EmailServiceManager {
File emlFile = null; File emlFile = null;
String status = EmailLogManager.STATUS_SUCCESS; String status = EmailLogManager.STATUS_SUCCESS;
Date receivedDate = null; Date receivedDate = null;
InputStream inputStream = null;
BufferedOutputStream outputStream = null; BufferedOutputStream outputStream = null;
try { try {
//获取发件人 //获取发件人
@ -571,26 +569,19 @@ public class EmailServiceManager {
final String rootPath = spectrumPathProperties.getRootPath(); final String rootPath = spectrumPathProperties.getRootPath();
final String emlPath = spectrumPathProperties.getEmlPath(); final String emlPath = spectrumPathProperties.getEmlPath();
emlFile = new File(rootPath+emlPath+File.separator+fileName); emlFile = new File(rootPath+emlPath+File.separator+fileName);
outputStream = new BufferedOutputStream(new FileOutputStream(emlFile)); // outputStream = new FileOutputStream(emlFile);
message.writeTo(outputStream); // message.writeTo(outputStream);
// 刷新缓冲区内容存入内存
outputStream.flush(); int bufferSize = 1024 * 1024; // 1M
// int bufferSize = 1024 * 1024; // 1M inputStream = message.getInputStream();
// InputStream inputStream = message.getInputStream(); outputStream = new BufferedOutputStream(new FileOutputStream(emlFile), bufferSize);
// BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream, bufferSize); // 从邮件的输入流读取内容并写入到本地文件
// // 或者使用 BufferedOutputStream byte[] buffer = new byte[bufferSize];
// OutputStream outputStream = new FileOutputStream(emlFile); int bytesRead;
// BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(outputStream, bufferSize); while ((bytesRead = inputStream.read(buffer)) != -1) {
// // 从邮件的输入流读取内容并写入到本地文件 outputStream.write(buffer, 0, bytesRead);
// byte[] buffer = new byte[bufferSize]; }
// int bytesRead;
// while ((bytesRead = bufferedInputStream.read(buffer)) != -1) {
// bufferedOutputStream.write(buffer, 0, bytesRead);
// }
//
// // 关闭流
// bufferedInputStream.close();
// bufferedOutputStream.close();
} catch (MessagingException | IOException e) { } catch (MessagingException | IOException e) {
// 下载邮件失败 抛出自定义邮件下载异常 // 下载邮件失败 抛出自定义邮件下载异常
status = EmailLogManager.STATUS_ERROR; status = EmailLogManager.STATUS_ERROR;
@ -604,7 +595,11 @@ public class EmailServiceManager {
(Objects.isNull(emlFile)?" ":emlFile.getAbsolutePath())); (Objects.isNull(emlFile)?" ":emlFile.getAbsolutePath()));
EmailLogManager.getInstance().offer(Thread.currentThread().getId(),event); EmailLogManager.getInstance().offer(Thread.currentThread().getId(),event);
try { try {
if (Objects.nonNull(inputStream)) {
inputStream.close();
}
if (Objects.nonNull(outputStream)) { if (Objects.nonNull(outputStream)) {
outputStream.flush();
outputStream.close(); outputStream.close();
} }
} catch (IOException e) { } catch (IOException e) {

View File

@ -37,10 +37,6 @@ public class AutoProcessManager{
* 邮件Map数据锁 * 邮件Map数据锁
*/ */
private final Object lock = new Object(); private final Object lock = new Object();
/**
* 邮箱接收邮件锁
*/
private final Object receive = new Object();
/** /**
* 以邮件Id为key邮件信息为value * 以邮件Id为key邮件信息为value
*/ */
@ -103,7 +99,7 @@ public class AutoProcessManager{
boolean testFlag = emailServiceManager.testConnectEmailServer(); boolean testFlag = emailServiceManager.testConnectEmailServer();
if(testFlag){ if(testFlag){
EmailParsingActuator emailParsingActuator = new EmailParsingActuator(); EmailParsingActuator emailParsingActuator = new EmailParsingActuator();
emailParsingActuator.init(next,spectrumServiceQuotes,emailCounter,systemStartupTime, receive); emailParsingActuator.init(next,spectrumServiceQuotes,emailCounter,systemStartupTime);
emailParsingActuator.setName(next.getUsername()+"-email-monitor"); emailParsingActuator.setName(next.getUsername()+"-email-monitor");
emailParsingActuator.start(); emailParsingActuator.start();
//把邮件监测执行线程加入管理队列 //把邮件监测执行线程加入管理队列
@ -237,10 +233,16 @@ public class AutoProcessManager{
if(databaseEmail.getEnabled().equals(SysMailEnableType.ENABLE.getMailEnableType())){ if(databaseEmail.getEnabled().equals(SysMailEnableType.ENABLE.getMailEnableType())){
final boolean testFlag = testConnectEmailServer(databaseEmail); final boolean testFlag = testConnectEmailServer(databaseEmail);
if(testFlag){ if(testFlag){
if (emailExecThreadMap.containsKey(databaseEmail.getId())) {
EmailParsingActuator actuator = emailExecThreadMap.get(databaseEmail.getId());
actuator.setStop(false);
log.info("{}邮箱重新加入监测队列",databaseEmail.getUsername());
} else {
databaseEmail.setNewEmailFlag(true); databaseEmail.setNewEmailFlag(true);
emailMap.put(databaseEmail.getId(),databaseEmail);
log.info("{}邮箱加入监测队列,设置新增标记",databaseEmail.getUsername()); log.info("{}邮箱加入监测队列,设置新增标记",databaseEmail.getUsername());
} }
emailMap.put(databaseEmail.getId(),databaseEmail);
}
} }
} }
} }

View File

@ -36,20 +36,18 @@ public class EmailParsingActuator extends Thread{
private SpectrumServiceQuotes spectrumServiceQuotes; private SpectrumServiceQuotes spectrumServiceQuotes;
private EmailCounter emailCounter; private EmailCounter emailCounter;
private Date systemStartupTime; private Date systemStartupTime;
private Object receive;
@Setter @Getter @Setter @Getter
private boolean isStop; private boolean isStop;
@Setter @Getter @Setter @Getter
private Date stopTime; private Date stopTime;
public void init(EmailProperties emailProperties,SpectrumServiceQuotes spectrumServiceQuotes, public void init(EmailProperties emailProperties,SpectrumServiceQuotes spectrumServiceQuotes,
EmailCounter emailCounter,Date systemStartupTime, Object receive){ EmailCounter emailCounter,Date systemStartupTime){
this.emailProperties = emailProperties; this.emailProperties = emailProperties;
this.spectrumServiceQuotes = spectrumServiceQuotes; this.spectrumServiceQuotes = spectrumServiceQuotes;
this.taskProperties = spectrumServiceQuotes.getTaskProperties(); this.taskProperties = spectrumServiceQuotes.getTaskProperties();
this.emailCounter = emailCounter; this.emailCounter = emailCounter;
this.systemStartupTime = systemStartupTime; this.systemStartupTime = systemStartupTime;
this.receive = receive;
//获取机器可用核心数 //获取机器可用核心数
int systemCores = spectrumServiceQuotes.getMaximumPoolSizeProperties().getAuto(); int systemCores = spectrumServiceQuotes.getMaximumPoolSizeProperties().getAuto();
@ -76,21 +74,21 @@ public class EmailParsingActuator extends Thread{
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
final EmailServiceManager emailServiceManager = EmailServiceManager.getInstance(); final EmailServiceManager emailServiceManager = EmailServiceManager.getInstance();
emailServiceManager.init(this.emailProperties,this.taskProperties.getReceiveNum(),this.taskProperties.getTemporaryStoragePath(), emailServiceManager.init(this.emailProperties,this.taskProperties.getReceiveNum(),this.taskProperties.getTemporaryStoragePath(),
this.systemStartupTime, spectrumServiceQuotes.getSpectrumPathProperties(), spectrumServiceQuotes.getTaskProperties(), spectrumServiceQuotes.getRedisUtil(), receive); this.systemStartupTime, spectrumServiceQuotes.getSpectrumPathProperties(), spectrumServiceQuotes.getTaskProperties(), spectrumServiceQuotes.getRedisUtil());
List<String> messageIds = new ArrayList<>(); List<String> messageIds = new ArrayList<>();
try { try {
Message[] messages = emailServiceManager.receiveMail(); Message[] messages = emailServiceManager.receiveMail();
if(ArrayUtils.isNotEmpty(messages)){ if(ArrayUtils.isNotEmpty(messages)){
log.info("EmailParsingActuator本次获取邮件数量为:{}",messages.length); log.info("EmailParsingActuator本次{}获取邮件数量为:{}", Thread.currentThread().getName(), messages.length);
//检验获取的邮件是否在之前删除失败列表中若在直接调用邮件API删除并且此次数组里元素也删除 //检验获取的邮件是否在之前删除失败列表中若在直接调用邮件API删除并且此次数组里元素也删除
for(int i=messages.length-1;i>=0;i--){ for(int i=messages.length-1;i>=0;i--){
if (!messages[i].isExpunged()){ if (!messages[i].isExpunged()){
String messageId = ((MimeMessage) messages[i]).getMessageID(); String messageId = ((MimeMessage) messages[i]).getMessageID();
final boolean exist = emailServiceManager.check(messages[i],messageId); // final boolean exist = emailServiceManager.check(messages[i],messageId);
messageIds.add(messageId); messageIds.add(messageId);
if(exist){ // if(exist){
messages = ArrayUtils.remove(messages,i); // messages = ArrayUtils.remove(messages,i);
} // }
} }
} }
log.info("EmailParsingActuator本次真实执行邮件数量为{}",messages.length); log.info("EmailParsingActuator本次真实执行邮件数量为{}",messages.length);
@ -107,11 +105,12 @@ public class EmailParsingActuator extends Thread{
taskLatch.await(); taskLatch.await();
} }
} }
}catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} catch (MessagingException e) { } catch (MessagingException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} finally { } finally {
log.info(Thread.currentThread().getName() +"执行完毕!!!!");
//清除本批次邮件日志缓存 //清除本批次邮件日志缓存
EmailLogManager.getInstance().clear(); EmailLogManager.getInstance().clear();
//保存本批次所有能谱日志 //保存本批次所有能谱日志

View File

@ -110,47 +110,47 @@ public class SpectrumParsingActuator implements Runnable{
//保存邮件日志到PG数据库 //保存邮件日志到PG数据库
this.spectrumServiceQuotes.getMailLogService().create(message,emailProperties); this.spectrumServiceQuotes.getMailLogService().create(message,emailProperties);
//获取邮件内容 // //获取邮件内容
StringBuilder mailContent = new StringBuilder(); // StringBuilder mailContent = new StringBuilder();
if(Objects.nonNull(emlFile) && emlFile.length() > 0){ // if(Objects.nonNull(emlFile) && emlFile.length() > 0){
mailContent.append(FileUtil.readUtf8String(emlFile)); // mailContent.append(FileUtil.readUtf8String(emlFile));
} // }
//
//判断是否是IMS2.0协议文件 // //判断是否是IMS2.0协议文件
// 如果邮件内容校验成功 将文件保存到eml目录 并删除邮件对象 // // 如果邮件内容校验成功 将文件保存到eml目录 并删除邮件对象
if(checkMailContent(mailContent,subject)){ // if(checkMailContent(mailContent,subject)){
AbstractSpectrumHandler spectrumHandler = new SamplephdSpectrum(); // AbstractSpectrumHandler spectrumHandler = new SamplephdSpectrum();
spectrumHandler.init(mailContent.toString(),emlFile.getName(),spectrumServiceQuotes,new StringBuilder(),SpectrumSource.FORM_EMAIL_SERVICE.getSourceType(),batchesCounter); // spectrumHandler.init(mailContent.toString(),emlFile.getName(),spectrumServiceQuotes,new StringBuilder(),SpectrumSource.FORM_EMAIL_SERVICE.getSourceType(),batchesCounter);
final boolean matchResult = spectrumHandler.saveEmailToLocal(); // final boolean matchResult = spectrumHandler.saveEmailToLocal();
if(matchResult){ // if(matchResult){
try { // try {
//开始解析 // //开始解析
spectrumHandler.handler(); // spectrumHandler.handler();
spectrumServiceQuotes.getRedisUtil().del(key); // spectrumServiceQuotes.getRedisUtil().del(key);
} catch (Exception e) { // } catch (Exception e) {
//如果是gamma谱的分析异常 // //如果是gamma谱的分析异常
if (e instanceof AnalySpectrumException) { // if (e instanceof AnalySpectrumException) {
// 如果邮件内容校验失败(邮件内容不完整) 将错误邮件从eml移动到emlError // // 如果邮件内容校验失败(邮件内容不完整) 将错误邮件从eml移动到emlError
if (Objects.nonNull(emlFile) && emlFile.exists()){ // if (Objects.nonNull(emlFile) && emlFile.exists()){
moveEmail(emlFile, key); // moveEmail(emlFile, key);
} // }
//删除邮件 // //删除邮件
emailServiceManager.removeMail(message,batchesCounter); // emailServiceManager.removeMail(message,batchesCounter);
} else { // } else {
throw e; // throw e;
} // }
} // }
}else{ // }else{
log.warn("This email {} parsing failed and is not listed in the Met, Alert, SOH, Sample, Detbkphd, QC, Gasbkphd spectra.",subject); // log.warn("This email {} parsing failed and is not listed in the Met, Alert, SOH, Sample, Detbkphd, QC, Gasbkphd spectra.",subject);
} // }
emailServiceManager.removeMail(message,batchesCounter); // emailServiceManager.removeMail(message,batchesCounter);
}else { // }else {
// 如果邮件内容校验失败(邮件内容不完整) 将错误邮件从eml移动到emlError // // 如果邮件内容校验失败(邮件内容不完整) 将错误邮件从eml移动到emlError
if (Objects.nonNull(emlFile) && emlFile.exists()){ // if (Objects.nonNull(emlFile) && emlFile.exists()){
moveEmail(emlFile, key); // moveEmail(emlFile, key);
throw new DownloadEmailException("邮件移走后手动抛出DownloadEmailException"); // throw new DownloadEmailException("邮件移走后手动抛出DownloadEmailException");
} // }
} // }
} catch (Exception e) { } catch (Exception e) {
// 如果不是下载导致的失败 并且 下载成功则删除下载的邮件对象 // 如果不是下载导致的失败 并且 下载成功则删除下载的邮件对象
if(!(e instanceof DownloadEmailException) && downloadFlag){ if(!(e instanceof DownloadEmailException) && downloadFlag){