fix:解决自动处理网络不好下载会中断问题

This commit is contained in:
xiaoguangbin 2024-11-25 11:18:45 +08:00
parent e5bcf3bae0
commit a762d4abab
4 changed files with 92 additions and 72 deletions

View File

@ -111,7 +111,7 @@ public class EmailServiceManager {
log.info("{}邮件服务连接测试成功",email.getName()); log.info("{}邮件服务连接测试成功",email.getName());
flag = true; flag = true;
} catch (IOException e) { } catch (IOException e) {
log.error("{}邮件服务连接测试失败,请检查邮件服务属性配置是否正确或邮件服务未开启,原因{}",email.getName(),e.getMessage()); log.error("{}邮件服务连接测试失败,请检查邮件服务属性配置是否正确或邮件服务未开启,原因: ",email.getName(),e);
}finally { }finally {
try { try {
if(null != socket){ if(null != socket){
@ -127,7 +127,7 @@ public class EmailServiceManager {
/** /**
* 接收邮件 * 接收邮件
*/ */
public Message[] receiveMail() throws MessagingException { public Message[] receiveMail() throws Exception {
String status = EmailLogManager.STATUS_SUCCESS; String status = EmailLogManager.STATUS_SUCCESS;
try{ try{
//配置邮件服务属性 //配置邮件服务属性
@ -135,6 +135,8 @@ public class EmailServiceManager {
properties.put("mail.store.protocol", "imap"); properties.put("mail.store.protocol", "imap");
properties.put("mail.imap.host", email.getEmailServerAddress()); properties.put("mail.imap.host", email.getEmailServerAddress());
properties.put("mail.imap.port",email.getPort()); properties.put("mail.imap.port",email.getPort());
properties.put("mail.imap.connectiontimeout", "3000");
properties.put("mail.imap.timeout", "3000");
if (email.getIsQiye() == 1) { if (email.getIsQiye() == 1) {
properties.put("mail.imap.ssl.enable", "true"); properties.put("mail.imap.ssl.enable", "true");
} else { } else {
@ -177,6 +179,7 @@ public class EmailServiceManager {
return o1.getReceivedDate().compareTo(o2.getReceivedDate()); return o1.getReceivedDate().compareTo(o2.getReceivedDate());
} catch (MessagingException e) { } catch (MessagingException e) {
e.printStackTrace(); e.printStackTrace();
log.error(e.getMessage(), e);
} }
return 0; return 0;
}); });
@ -186,7 +189,7 @@ public class EmailServiceManager {
return Arrays.copyOfRange(messages,0,this.receiveNum-1); return Arrays.copyOfRange(messages,0,this.receiveNum-1);
} }
} }
} catch (MessagingException e){ } catch (Exception e){
status = EmailLogManager.STATUS_ERROR; status = EmailLogManager.STATUS_ERROR;
log.error("Email connection is abnormal, account is {}, service is {},the reason is {}.",email.getName(),email.getEmailServerAddress(),e.getMessage()); log.error("Email connection is abnormal, account is {}, service is {},the reason is {}.",email.getName(),email.getEmailServerAddress(),e.getMessage());
throw e; throw e;
@ -590,6 +593,7 @@ public class EmailServiceManager {
throw new DownloadEmailException(errorMsg); throw new DownloadEmailException(errorMsg);
}catch (Exception e) { }catch (Exception e) {
log.error("",e); log.error("",e);
throw new RuntimeException(e);
}finally { }finally {
EmailLogEvent event = new EmailLogEvent(batchesCounter,Thread.currentThread().getId(),EmailLogManager.GS_TYPE_GET,status,EmailLogManager.GETIDEML,subject,DateUtils.formatDate(receivedDate,"yyyy-MM-dd HH:mm:ss:SSS"), EmailLogEvent event = new EmailLogEvent(batchesCounter,Thread.currentThread().getId(),EmailLogManager.GS_TYPE_GET,status,EmailLogManager.GETIDEML,subject,DateUtils.formatDate(receivedDate,"yyyy-MM-dd HH:mm:ss:SSS"),
(Objects.isNull(emlFile)?" ":emlFile.getAbsolutePath())); (Objects.isNull(emlFile)?" ":emlFile.getAbsolutePath()));
@ -725,9 +729,11 @@ public class EmailServiceManager {
// log.info("EmailServiceManager: Remove Email:{},receiveTime:{}",message.getSubject(), DateUtils.formatDate(message.getReceivedDate(),"yyyy-MM-dd HH:mm:ss")); // log.info("EmailServiceManager: Remove Email:{},receiveTime:{}",message.getSubject(), DateUtils.formatDate(message.getReceivedDate(),"yyyy-MM-dd HH:mm:ss"));
} catch (MessagingException | UnsupportedEncodingException e) { } catch (MessagingException | UnsupportedEncodingException e) {
status = EmailLogManager.STATUS_ERROR; status = EmailLogManager.STATUS_ERROR;
log.error("Email deletion failed, the subject of the email is :{}, the reason is :{}.",subject,e.getMessage()); log.error("Email deletion failed, the subject of the email is :{}, the reason is :",subject, e);
e.printStackTrace(); e.printStackTrace();
}finally { } catch (Exception e) {
log.error(e.getMessage(), e);
} finally {
EmailLogEvent removeEvent = new EmailLogEvent(batchesCounter,Thread.currentThread().getId(),EmailLogManager.GS_TYPE_GET,status,EmailLogManager.DELETEID,subject,DateUtils.formatDate(receivedDate,"yyyy-MM-dd HH:mm:ss:SSS")); EmailLogEvent removeEvent = new EmailLogEvent(batchesCounter,Thread.currentThread().getId(),EmailLogManager.GS_TYPE_GET,status,EmailLogManager.DELETEID,subject,DateUtils.formatDate(receivedDate,"yyyy-MM-dd HH:mm:ss:SSS"));
EmailLogManager.getInstance().offer(Thread.currentThread().getId(),removeEvent); EmailLogManager.getInstance().offer(Thread.currentThread().getId(),removeEvent);
//这里删除和彻底删除一起写入日志java和C++处理有差异java是在连接关闭时彻底删除的 //这里删除和彻底删除一起写入日志java和C++处理有差异java是在连接关闭时彻底删除的
@ -748,13 +754,13 @@ public class EmailServiceManager {
if(null != store){ if(null != store){
store.close(); store.close();
} }
log.info("EmailServiceManage资源关闭完成."); log.info("{}: EmailServiceManage资源关闭完成.", Thread.currentThread().getName());
// for(String messageId : messageIds){ // for(String messageId : messageIds){
// String key = RedisConstant.EMAIL_MSG_ID+StringConstant.COLON+messageId; // String key = RedisConstant.EMAIL_MSG_ID+StringConstant.COLON+messageId;
// redisUtil.del(key); // redisUtil.del(key);
// } // }
} catch (MessagingException e) { } catch (MessagingException e) {
log.error("Email closure failed, email address is: {}, reason is: {}",email.getUsername(),e.getMessage()); log.error("Email closure failed, email address is: {}, reason is: {}",email.getUsername(),e);
e.printStackTrace(); e.printStackTrace();
} }
} }

View File

@ -17,6 +17,7 @@ import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils; import org.springframework.util.CollectionUtils;
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
@ -40,12 +41,12 @@ public class AutoProcessManager{
/** /**
* 以邮件Id为key邮件信息为value * 以邮件Id为key邮件信息为value
*/ */
private Map<String,EmailProperties> emailMap = new HashMap<>(); private Map<String,EmailProperties> emailMap = new ConcurrentHashMap<>();
/** /**
* 以邮件id为key以邮件执行线程为value * 以邮件id为key以邮件执行线程为value
*/ */
private Map<String,EmailParsingActuator> emailExecThreadMap = new HashMap<>(); private Map<String,EmailParsingActuator> emailExecThreadMap = new ConcurrentHashMap<>();
/** /**
* 启动自动处理 * 启动自动处理
@ -82,34 +83,39 @@ public class AutoProcessManager{
public void run() { public void run() {
for(;;){ for(;;){
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
if(!CollectionUtils.isEmpty(emailMap)){ try {
synchronized (lock){ if(!CollectionUtils.isEmpty(emailMap)){
Iterator<EmailProperties> iterator = emailMap.values().iterator(); synchronized (lock){
while(iterator.hasNext()){ Iterator<EmailProperties> iterator = emailMap.values().iterator();
EmailProperties next = iterator.next(); while(iterator.hasNext()){
if (next.isResetFlag()) { EmailProperties next = iterator.next();
EmailParsingActuator actuator = emailExecThreadMap.get(next.getId()); if (next.isResetFlag()) {
actuator.updateEmail(next); EmailParsingActuator actuator = emailExecThreadMap.get(next.getId());
next.setResetFlag(false); actuator.updateEmail(next);
} next.setResetFlag(false);
if(next.isNewEmailFlag()){ }
// 网络正常之后才允许创建新的实例 if(next.isNewEmailFlag()){
final EmailServiceManager emailServiceManager = EmailServiceManager.getInstance(); // 网络正常之后才允许创建新的实例
emailServiceManager.init(next); final EmailServiceManager emailServiceManager = EmailServiceManager.getInstance();
boolean testFlag = emailServiceManager.testConnectEmailServer(); emailServiceManager.init(next);
if(testFlag){ boolean testFlag = emailServiceManager.testConnectEmailServer();
EmailParsingActuator emailParsingActuator = new EmailParsingActuator(); if(testFlag){
emailParsingActuator.init(next,spectrumServiceQuotes,emailCounter,systemStartupTime); EmailParsingActuator emailParsingActuator = new EmailParsingActuator();
emailParsingActuator.setName(next.getUsername()+"-email-monitor"); emailParsingActuator.init(next,spectrumServiceQuotes,emailCounter,systemStartupTime);
emailParsingActuator.start(); emailParsingActuator.setName(next.getUsername()+"-email-monitor");
//把邮件监测执行线程加入管理队列 emailParsingActuator.start();
emailExecThreadMap.put(next.getId(),emailParsingActuator); //把邮件监测执行线程加入管理队列
//新邮件监测监测线程已启动则修改新邮件标记为false emailExecThreadMap.put(next.getId(),emailParsingActuator);
next.setNewEmailFlag(false); //新邮件监测监测线程已启动则修改新邮件标记为false
next.setNewEmailFlag(false);
}
} }
} }
} }
} }
} catch (Exception e) {
e.printStackTrace();
log.error(e.getMessage(), e);
} }
long end = System.currentTimeMillis(); long end = System.currentTimeMillis();
long sleepTime = taskProperties.getMonitoringMailDataCycle() - (end-start); long sleepTime = taskProperties.getMonitoringMailDataCycle() - (end-start);
@ -282,46 +288,52 @@ public class AutoProcessManager{
public void run() { public void run() {
for(;;){ for(;;){
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
if(!CollectionUtils.isEmpty(emailExecThreadMap)){ try {
//遍历邮箱执行线程如果状态为已停止则删除 if(!CollectionUtils.isEmpty(emailExecThreadMap)){
final Iterator<Map.Entry<String, EmailParsingActuator>> checkStopThreads = emailExecThreadMap.entrySet().iterator(); //遍历邮箱执行线程如果状态为已停止则删除
while (checkStopThreads.hasNext()){ final Iterator<Map.Entry<String, EmailParsingActuator>> checkStopThreads = emailExecThreadMap.entrySet().iterator();
final Map.Entry<String, EmailParsingActuator> next = checkStopThreads.next(); while (checkStopThreads.hasNext()){
if(next.getValue().getState() == State.TERMINATED){ final Map.Entry<String, EmailParsingActuator> next = checkStopThreads.next();
log.info("{}邮箱执行线程已停止emailExecThreadMap删除此邮箱数据",next.getValue().getEmailProperties().getUsername()); if(next.getValue().getState() == State.TERMINATED){
checkStopThreads.remove(); log.info("{}邮箱执行线程已停止emailExecThreadMap删除此邮箱数据",next.getValue().getEmailProperties().getUsername());
emailMap.remove(next.getKey()); checkStopThreads.remove();
emailMap.remove(next.getKey());
}
} }
}
//遍历邮箱执行线程如果邮箱执行线程stop属性已被设置为true则关闭资源停止线程 //遍历邮箱执行线程如果邮箱执行线程stop属性已被设置为true则关闭资源停止线程
final Iterator<Map.Entry<String, EmailParsingActuator>> iterator = emailExecThreadMap.entrySet().iterator(); final Iterator<Map.Entry<String, EmailParsingActuator>> iterator = emailExecThreadMap.entrySet().iterator();
emailExecThreadMap.forEach((emailId,emailExecThread)->{ emailExecThreadMap.forEach((emailId,emailExecThread)->{
try{ try{
if(emailExecThread.getState() != State.TERMINATED && emailExecThread.isStop()){ log.info("当前线程状态:{}", emailExecThread.getState());
final long nowTime = System.currentTimeMillis(); if(emailExecThread.getState() != State.TERMINATED && emailExecThread.isStop()){
final long setStoptime = emailExecThread.getStopTime().getTime(); final long nowTime = System.currentTimeMillis();
final long val = nowTime - setStoptime; final long setStoptime = emailExecThread.getStopTime().getTime();
if(val >= taskProperties.getForceDeletedTime()){ final long val = nowTime - setStoptime;
log.info("关闭{}邮箱线程内部线程池资源",emailExecThread.getEmailProperties().getUsername()); if(val >= taskProperties.getForceDeletedTime()){
emailExecThread.closeResource(); log.info("关闭{}邮箱线程内部线程池资源",emailExecThread.getEmailProperties().getUsername());
emailExecThread.closeResource();
}
}
}catch (Exception e){
e.printStackTrace();
log.error(e.getMessage(), e);
}finally {
if(emailExecThread.getState() != State.TERMINATED && emailExecThread.isStop()){
final long nowTime = System.currentTimeMillis();
final long setStoptime = emailExecThread.getStopTime().getTime();
final long val = nowTime - setStoptime;
if(val >= taskProperties.getForceDeletedTime()){
log.info("强制停止{}邮箱线程",emailExecThread.getEmailProperties().getUsername());
emailExecThread.stop();
}
} }
} }
}catch (Exception e){ });
e.printStackTrace(); }
log.error(e.getMessage(), e); } catch (Exception e) {
}finally { e.printStackTrace();
if(emailExecThread.getState() != State.TERMINATED && emailExecThread.isStop()){ log.error(e.getMessage(), e);
final long nowTime = System.currentTimeMillis();
final long setStoptime = emailExecThread.getStopTime().getTime();
final long val = nowTime - setStoptime;
if(val >= taskProperties.getForceDeletedTime()){
log.info("强制停止{}邮箱线程",emailExecThread.getEmailProperties().getUsername());
emailExecThread.stop();
}
}
}
});
} }
long end = System.currentTimeMillis(); long end = System.currentTimeMillis();
long sleepTime = taskProperties.getDeletedMailThreadExecCycle() - (end-start); long sleepTime = taskProperties.getDeletedMailThreadExecCycle() - (end-start);

View File

@ -119,7 +119,7 @@ public class EmailParsingActuator extends Thread{
} }
} }
} catch (Exception e) { } catch (Exception e) {
log.error("EmailParsingActuator has exception: {}", e.getMessage()); log.error("EmailParsingActuator has exception: ", e);
log.info("Mail-Parsing线程池资源关闭..."); log.info("Mail-Parsing线程池资源关闭...");
closeResource(); closeResource();
throw new RuntimeException(e); throw new RuntimeException(e);

View File

@ -152,13 +152,15 @@ public class SpectrumParsingActuator implements Runnable{
} }
} }
} catch (Exception e) { } catch (Exception e) {
//输出异常信息
log.error("邮件处理异常{},邮件主题:{}", e, subject);
// todo 需要解决其他异常会进入if 删除邮件
// 如果不是下载导致的失败 并且 下载成功则删除下载的邮件对象 // 如果不是下载导致的失败 并且 下载成功则删除下载的邮件对象
if(!(e instanceof DownloadEmailException) && downloadFlag){ if(!(e instanceof DownloadEmailException) && downloadFlag){
log.error("Catch Remove Email"+ subject + StringPool.UNDERSCORE + receiveDate + StringPool.UNDERSCORE); log.error("Catch Remove Email"+ subject + StringPool.UNDERSCORE + receiveDate + StringPool.UNDERSCORE);
emailServiceManager.removeMail(message,batchesCounter); emailServiceManager.removeMail(message,batchesCounter);
} }
//输出异常信息
log.error("邮件处理异常{},邮件主题:{}", e, subject);
}finally { }finally {
try { try {
EmailLogEvent expungeEvent = new EmailLogEvent(this.batchesCounter,Thread.currentThread().getId(),EmailLogManager.GS_TYPE_GET,EmailLogManager.DONE); EmailLogEvent expungeEvent = new EmailLogEvent(this.batchesCounter,Thread.currentThread().getId(),EmailLogManager.GS_TYPE_GET,EmailLogManager.DONE);