Compare commits
4 Commits
mdc
...
sleepDownl
Author | SHA1 | Date | |
---|---|---|---|
|
1423b2f09d | ||
|
f6b963145f | ||
|
64a98bd4ea | ||
|
f1410028fa |
|
@ -1,8 +1,6 @@
|
|||
package org.jeecg.common.email;
|
||||
|
||||
import cn.hutool.core.util.ArrayUtil;
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.hutool.core.util.RandomUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.sun.mail.imap.IMAPStore;
|
||||
|
@ -32,7 +30,6 @@ import java.io.*;
|
|||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.stream.Collectors;
|
||||
|
@ -127,7 +124,7 @@ public class EmailServiceManager {
|
|||
/**
|
||||
* 接收邮件
|
||||
*/
|
||||
public Message[] receiveMail() throws MessagingException {
|
||||
public Message[] receiveMail() throws Exception {
|
||||
String status = EmailLogManager.STATUS_SUCCESS;
|
||||
try{
|
||||
//配置邮件服务属性
|
||||
|
@ -135,6 +132,8 @@ public class EmailServiceManager {
|
|||
properties.put("mail.store.protocol", "imap");
|
||||
properties.put("mail.imap.host", email.getEmailServerAddress());
|
||||
properties.put("mail.imap.port",email.getPort());
|
||||
properties.put("mail.imap.connectiontimeout", "3000"); // 设置连接超时时间为3秒
|
||||
properties.put("mail.imap.timeout", "3000"); // 设置读取超时时间为3秒
|
||||
if (email.getIsQiye() == 1) {
|
||||
properties.put("mail.imap.ssl.enable", "true");
|
||||
} else {
|
||||
|
@ -177,6 +176,7 @@ public class EmailServiceManager {
|
|||
return o1.getReceivedDate().compareTo(o2.getReceivedDate());
|
||||
} catch (MessagingException e) {
|
||||
e.printStackTrace();
|
||||
log.error(e.getMessage(), e);
|
||||
}
|
||||
return 0;
|
||||
});
|
||||
|
@ -186,7 +186,7 @@ public class EmailServiceManager {
|
|||
return Arrays.copyOfRange(messages,0,this.receiveNum-1);
|
||||
}
|
||||
}
|
||||
} catch (MessagingException e){
|
||||
} catch (Exception e){
|
||||
status = EmailLogManager.STATUS_ERROR;
|
||||
log.error("Email connection is abnormal, account is {}, service is {},the reason is {}.",email.getName(),email.getEmailServerAddress(),e.getMessage());
|
||||
throw e;
|
||||
|
@ -727,6 +727,7 @@ public class EmailServiceManager {
|
|||
status = EmailLogManager.STATUS_ERROR;
|
||||
log.error("Email deletion failed, the subject of the email is :{}, the reason is :{}.",subject,e.getMessage());
|
||||
e.printStackTrace();
|
||||
log.error(e.getMessage(), e);
|
||||
}finally {
|
||||
EmailLogEvent removeEvent = new EmailLogEvent(batchesCounter,Thread.currentThread().getId(),EmailLogManager.GS_TYPE_GET,status,EmailLogManager.DELETEID,subject,DateUtils.formatDate(receivedDate,"yyyy-MM-dd HH:mm:ss:SSS"));
|
||||
EmailLogManager.getInstance().offer(Thread.currentThread().getId(),removeEvent);
|
||||
|
@ -748,7 +749,7 @@ public class EmailServiceManager {
|
|||
if(null != store){
|
||||
store.close();
|
||||
}
|
||||
log.info("EmailServiceManage资源关闭完成.");
|
||||
log.info(Thread.currentThread().getName() + ",EmailServiceManage资源关闭完成.");
|
||||
// for(String messageId : messageIds){
|
||||
// String key = RedisConstant.EMAIL_MSG_ID+StringConstant.COLON+messageId;
|
||||
// redisUtil.del(key);
|
||||
|
@ -756,6 +757,7 @@ public class EmailServiceManager {
|
|||
} catch (MessagingException e) {
|
||||
log.error("Email closure failed, email address is: {}, reason is: {}",email.getUsername(),e.getMessage());
|
||||
e.printStackTrace();
|
||||
log.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
package org.jeecg.modules;
|
||||
|
||||
import cn.hutool.core.util.RandomUtil;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.jeecg.common.constant.RedisConstant;
|
||||
|
@ -17,6 +16,7 @@ import org.springframework.stereotype.Component;
|
|||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
|
@ -40,12 +40,12 @@ public class AutoProcessManager{
|
|||
/**
|
||||
* 以邮件Id为key,邮件信息为value
|
||||
*/
|
||||
private Map<String,EmailProperties> emailMap = new HashMap<>();
|
||||
private Map<String,EmailProperties> emailMap = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 以邮件id为key,以邮件执行线程为value
|
||||
*/
|
||||
private Map<String,EmailParsingActuator> emailExecThreadMap = new HashMap<>();
|
||||
private Map<String,EmailParsingActuator> emailExecThreadMap = new ConcurrentHashMap<>();
|
||||
|
||||
/**
|
||||
* 启动自动处理
|
||||
|
@ -82,34 +82,40 @@ public class AutoProcessManager{
|
|||
public void run() {
|
||||
for(;;){
|
||||
long start = System.currentTimeMillis();
|
||||
if(!CollectionUtils.isEmpty(emailMap)){
|
||||
synchronized (lock){
|
||||
Iterator<EmailProperties> iterator = emailMap.values().iterator();
|
||||
while(iterator.hasNext()){
|
||||
EmailProperties next = iterator.next();
|
||||
if (next.isResetFlag()) {
|
||||
EmailParsingActuator actuator = emailExecThreadMap.get(next.getId());
|
||||
actuator.updateEmail(next);
|
||||
next.setResetFlag(false);
|
||||
}
|
||||
if(next.isNewEmailFlag()){
|
||||
// 网络正常之后才允许创建新的实例
|
||||
final EmailServiceManager emailServiceManager = EmailServiceManager.getInstance();
|
||||
emailServiceManager.init(next);
|
||||
boolean testFlag = emailServiceManager.testConnectEmailServer();
|
||||
if(testFlag){
|
||||
EmailParsingActuator emailParsingActuator = new EmailParsingActuator();
|
||||
emailParsingActuator.init(next,spectrumServiceQuotes,emailCounter,systemStartupTime);
|
||||
emailParsingActuator.setName(next.getUsername()+"-email-monitor");
|
||||
emailParsingActuator.start();
|
||||
//把邮件监测执行线程加入管理队列
|
||||
emailExecThreadMap.put(next.getId(),emailParsingActuator);
|
||||
//新邮件监测监测线程已启动则修改新邮件标记为false
|
||||
next.setNewEmailFlag(false);
|
||||
try {
|
||||
if(!CollectionUtils.isEmpty(emailMap)){
|
||||
synchronized (lock){
|
||||
Iterator<EmailProperties> iterator = emailMap.values().iterator();
|
||||
while(iterator.hasNext()){
|
||||
EmailProperties next = iterator.next();
|
||||
// ConcurrentModificationException
|
||||
if (next.isResetFlag()) {
|
||||
EmailParsingActuator actuator = emailExecThreadMap.get(next.getId());
|
||||
actuator.updateEmail(next);
|
||||
next.setResetFlag(false);
|
||||
}
|
||||
if(next.isNewEmailFlag()){
|
||||
// 网络正常之后才允许创建新的实例
|
||||
final EmailServiceManager emailServiceManager = EmailServiceManager.getInstance();
|
||||
emailServiceManager.init(next);
|
||||
boolean testFlag = emailServiceManager.testConnectEmailServer();
|
||||
if(testFlag){
|
||||
EmailParsingActuator emailParsingActuator = new EmailParsingActuator();
|
||||
emailParsingActuator.init(next,spectrumServiceQuotes,emailCounter,systemStartupTime);
|
||||
emailParsingActuator.setName(next.getUsername()+"-email-monitor");
|
||||
emailParsingActuator.start();
|
||||
//把邮件监测执行线程加入管理队列
|
||||
emailExecThreadMap.put(next.getId(),emailParsingActuator);
|
||||
//新邮件监测监测线程已启动则修改新邮件标记为false
|
||||
next.setNewEmailFlag(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
log.error(e.getMessage(), e);
|
||||
}
|
||||
long end = System.currentTimeMillis();
|
||||
long sleepTime = taskProperties.getMonitoringMailDataCycle() - (end-start);
|
||||
|
@ -120,6 +126,7 @@ public class AutoProcessManager{
|
|||
TimeUnit.MILLISECONDS.sleep(sleepTime);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
log.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -169,6 +176,7 @@ public class AutoProcessManager{
|
|||
//捕获异常不处理保障线程异常不退出
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
log.error(e.getMessage(), e);
|
||||
}
|
||||
long end = System.currentTimeMillis();
|
||||
long sleepTime = taskProperties.getMonitoringMailCommStatusCycle() - (end-start);
|
||||
|
@ -176,8 +184,10 @@ public class AutoProcessManager{
|
|||
if(sleepTime > 0){
|
||||
try {
|
||||
TimeUnit.MILLISECONDS.sleep(sleepTime);
|
||||
// throw new RuntimeException("运行时异常");
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
log.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -251,6 +261,7 @@ public class AutoProcessManager{
|
|||
//捕获异常不处理保障线程异常不退出
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
log.error(e.getMessage(), e);
|
||||
}
|
||||
long end = System.currentTimeMillis();
|
||||
long sleepTime = taskProperties.getMonitoringMailDataCycle() - (end-start);
|
||||
|
@ -260,6 +271,7 @@ public class AutoProcessManager{
|
|||
TimeUnit.MILLISECONDS.sleep(sleepTime);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
log.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -277,45 +289,51 @@ public class AutoProcessManager{
|
|||
public void run() {
|
||||
for(;;){
|
||||
long start = System.currentTimeMillis();
|
||||
if(!CollectionUtils.isEmpty(emailExecThreadMap)){
|
||||
//遍历邮箱执行线程,如果状态为已停止则删除
|
||||
final Iterator<Map.Entry<String, EmailParsingActuator>> checkStopThreads = emailExecThreadMap.entrySet().iterator();
|
||||
while (checkStopThreads.hasNext()){
|
||||
final Map.Entry<String, EmailParsingActuator> next = checkStopThreads.next();
|
||||
if(next.getValue().getState() == State.TERMINATED){
|
||||
log.info("{}邮箱执行线程已停止,emailExecThreadMap删除此邮箱数据",next.getValue().getEmailProperties().getUsername());
|
||||
checkStopThreads.remove();
|
||||
emailMap.remove(next.getKey());
|
||||
try {
|
||||
if(!CollectionUtils.isEmpty(emailExecThreadMap)){
|
||||
//遍历邮箱执行线程,如果状态为已停止则删除
|
||||
final Iterator<Map.Entry<String, EmailParsingActuator>> checkStopThreads = emailExecThreadMap.entrySet().iterator();
|
||||
while (checkStopThreads.hasNext()){
|
||||
final Map.Entry<String, EmailParsingActuator> next = checkStopThreads.next();
|
||||
if(next.getValue().getState() == State.TERMINATED){
|
||||
log.info("{}邮箱执行线程已停止,emailExecThreadMap删除此邮箱数据",next.getValue().getEmailProperties().getUsername());
|
||||
checkStopThreads.remove();
|
||||
emailMap.remove(next.getKey());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//遍历邮箱执行线程,如果邮箱执行线程stop属性已被设置为true则关闭资源停止线程
|
||||
final Iterator<Map.Entry<String, EmailParsingActuator>> iterator = emailExecThreadMap.entrySet().iterator();
|
||||
emailExecThreadMap.forEach((emailId,emailExecThread)->{
|
||||
try{
|
||||
if(emailExecThread.getState() != State.TERMINATED && emailExecThread.isStop()){
|
||||
final long nowTime = System.currentTimeMillis();
|
||||
final long setStoptime = emailExecThread.getStopTime().getTime();
|
||||
final long val = nowTime - setStoptime;
|
||||
if(val >= taskProperties.getForceDeletedTime()){
|
||||
log.info("关闭{}邮箱线程内部线程池资源",emailExecThread.getEmailProperties().getUsername());
|
||||
emailExecThread.closeResource();
|
||||
//遍历邮箱执行线程,如果邮箱执行线程stop属性已被设置为true则关闭资源停止线程
|
||||
final Iterator<Map.Entry<String, EmailParsingActuator>> iterator = emailExecThreadMap.entrySet().iterator();
|
||||
emailExecThreadMap.forEach((emailId,emailExecThread)->{
|
||||
try{
|
||||
if(emailExecThread.getState() != State.TERMINATED && emailExecThread.isStop()){
|
||||
final long nowTime = System.currentTimeMillis();
|
||||
final long setStoptime = emailExecThread.getStopTime().getTime();
|
||||
final long val = nowTime - setStoptime;
|
||||
if(val >= taskProperties.getForceDeletedTime()){
|
||||
log.info("关闭{}邮箱线程内部线程池资源",emailExecThread.getEmailProperties().getUsername());
|
||||
emailExecThread.closeResource();
|
||||
}
|
||||
}
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
log.error(e.getMessage(), e);
|
||||
}finally {
|
||||
if(emailExecThread.getState() != State.TERMINATED && emailExecThread.isStop()){
|
||||
final long nowTime = System.currentTimeMillis();
|
||||
final long setStoptime = emailExecThread.getStopTime().getTime();
|
||||
final long val = nowTime - setStoptime;
|
||||
if(val >= taskProperties.getForceDeletedTime()){
|
||||
log.info("强制停止{}邮箱线程",emailExecThread.getEmailProperties().getUsername());
|
||||
emailExecThread.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
}finally {
|
||||
if(emailExecThread.getState() != State.TERMINATED && emailExecThread.isStop()){
|
||||
final long nowTime = System.currentTimeMillis();
|
||||
final long setStoptime = emailExecThread.getStopTime().getTime();
|
||||
final long val = nowTime - setStoptime;
|
||||
if(val >= taskProperties.getForceDeletedTime()){
|
||||
log.info("强制停止{}邮箱线程",emailExecThread.getEmailProperties().getUsername());
|
||||
emailExecThread.stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
log.error(e.getMessage(), e);
|
||||
}
|
||||
long end = System.currentTimeMillis();
|
||||
long sleepTime = taskProperties.getDeletedMailThreadExecCycle() - (end-start);
|
||||
|
@ -325,6 +343,7 @@ public class AutoProcessManager{
|
|||
TimeUnit.MILLISECONDS.sleep(sleepTime);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
log.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -86,8 +86,8 @@ public class EmailParsingActuator extends Thread{
|
|||
List<String> messageIds = new ArrayList<>();
|
||||
try {
|
||||
Message[] messages = emailServiceManager.receiveMail();
|
||||
log.info("EmailParsingActuator本次{}获取邮件数量为:{}", Thread.currentThread().getName(), ArrayUtils.isEmpty(messages) ? 0 : messages.length);
|
||||
if(ArrayUtils.isNotEmpty(messages)){
|
||||
log.info("EmailParsingActuator本次{}获取邮件数量为:{}", Thread.currentThread().getName(), messages.length);
|
||||
//检验获取的邮件是否在之前删除失败列表中,若在直接调用邮件API删除,并且此次数组里元素也删除
|
||||
for(int i=messages.length-1;i>=0;i--){
|
||||
if (null == messages[i].getHeader("Message-ID")) {
|
||||
|
@ -122,6 +122,7 @@ public class EmailParsingActuator extends Thread{
|
|||
log.error("EmailParsingActuator has exception: {}", e.getMessage());
|
||||
log.info("Mail-Parsing线程池资源关闭...");
|
||||
closeResource();
|
||||
log.error(e.getMessage(), e);
|
||||
throw new RuntimeException(e);
|
||||
} finally {
|
||||
//清除本批次邮件日志缓存
|
||||
|
@ -142,6 +143,8 @@ public class EmailParsingActuator extends Thread{
|
|||
TimeUnit.MILLISECONDS.sleep(sleepTime);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
log.error(e.getMessage(), e);
|
||||
throw new RuntimeException();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -316,7 +316,7 @@ public abstract class AbstractSpectrumHandler extends AbstractChain {
|
|||
ex.printStackTrace();
|
||||
}
|
||||
} else if(SpectrumSource.FROM_FILE_SOURCE.getSourceType().equals(spectrumSource) && (e instanceof FileRepeatException)){
|
||||
this.spectrumFile.delete(); // TODO 删除原始谱文件
|
||||
//this.spectrumFile.delete(); // TODO 删除原始谱文件
|
||||
} else if (SpectrumSource.FORM_FILE_UNDEL.getSourceType().equals(spectrumSource) && !(e instanceof FileRepeatException)) {
|
||||
try {
|
||||
if (isDateFormatErr) {
|
||||
|
|
Loading…
Reference in New Issue
Block a user