fix:增加对异常的拦截,修改map为ConcurrentHashMap

This commit is contained in:
xiaoguangbin 2024-06-27 09:58:22 +08:00
parent f6b963145f
commit 1423b2f09d

View File

@ -16,6 +16,7 @@ import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
@ -39,12 +40,12 @@ public class AutoProcessManager{
/**
* 以邮件Id为key邮件信息为value
*/
private Map<String,EmailProperties> emailMap = new HashMap<>();
private Map<String,EmailProperties> emailMap = new ConcurrentHashMap<>();
/**
* 以邮件id为key以邮件执行线程为value
*/
private Map<String,EmailParsingActuator> emailExecThreadMap = new HashMap<>();
private Map<String,EmailParsingActuator> emailExecThreadMap = new ConcurrentHashMap<>();
/**
* 启动自动处理
@ -81,34 +82,40 @@ public class AutoProcessManager{
public void run() {
for(;;){
long start = System.currentTimeMillis();
if(!CollectionUtils.isEmpty(emailMap)){
synchronized (lock){
Iterator<EmailProperties> iterator = emailMap.values().iterator();
while(iterator.hasNext()){
EmailProperties next = iterator.next();
if (next.isResetFlag()) {
EmailParsingActuator actuator = emailExecThreadMap.get(next.getId());
actuator.updateEmail(next);
next.setResetFlag(false);
}
if(next.isNewEmailFlag()){
// 网络正常之后才允许创建新的实例
final EmailServiceManager emailServiceManager = EmailServiceManager.getInstance();
emailServiceManager.init(next);
boolean testFlag = emailServiceManager.testConnectEmailServer();
if(testFlag){
EmailParsingActuator emailParsingActuator = new EmailParsingActuator();
emailParsingActuator.init(next,spectrumServiceQuotes,emailCounter,systemStartupTime);
emailParsingActuator.setName(next.getUsername()+"-email-monitor");
emailParsingActuator.start();
//把邮件监测执行线程加入管理队列
emailExecThreadMap.put(next.getId(),emailParsingActuator);
//新邮件监测监测线程已启动则修改新邮件标记为false
next.setNewEmailFlag(false);
try {
if(!CollectionUtils.isEmpty(emailMap)){
synchronized (lock){
Iterator<EmailProperties> iterator = emailMap.values().iterator();
while(iterator.hasNext()){
EmailProperties next = iterator.next();
// ConcurrentModificationException
if (next.isResetFlag()) {
EmailParsingActuator actuator = emailExecThreadMap.get(next.getId());
actuator.updateEmail(next);
next.setResetFlag(false);
}
if(next.isNewEmailFlag()){
// 网络正常之后才允许创建新的实例
final EmailServiceManager emailServiceManager = EmailServiceManager.getInstance();
emailServiceManager.init(next);
boolean testFlag = emailServiceManager.testConnectEmailServer();
if(testFlag){
EmailParsingActuator emailParsingActuator = new EmailParsingActuator();
emailParsingActuator.init(next,spectrumServiceQuotes,emailCounter,systemStartupTime);
emailParsingActuator.setName(next.getUsername()+"-email-monitor");
emailParsingActuator.start();
//把邮件监测执行线程加入管理队列
emailExecThreadMap.put(next.getId(),emailParsingActuator);
//新邮件监测监测线程已启动则修改新邮件标记为false
next.setNewEmailFlag(false);
}
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
log.error(e.getMessage(), e);
}
long end = System.currentTimeMillis();
long sleepTime = taskProperties.getMonitoringMailDataCycle() - (end-start);
@ -177,6 +184,7 @@ public class AutoProcessManager{
if(sleepTime > 0){
try {
TimeUnit.MILLISECONDS.sleep(sleepTime);
// throw new RuntimeException("运行时异常");
} catch (InterruptedException e) {
e.printStackTrace();
log.error(e.getMessage(), e);
@ -281,46 +289,51 @@ public class AutoProcessManager{
public void run() {
for(;;){
long start = System.currentTimeMillis();
if(!CollectionUtils.isEmpty(emailExecThreadMap)){
//遍历邮箱执行线程如果状态为已停止则删除
final Iterator<Map.Entry<String, EmailParsingActuator>> checkStopThreads = emailExecThreadMap.entrySet().iterator();
while (checkStopThreads.hasNext()){
final Map.Entry<String, EmailParsingActuator> next = checkStopThreads.next();
if(next.getValue().getState() == State.TERMINATED){
log.info("{}邮箱执行线程已停止emailExecThreadMap删除此邮箱数据",next.getValue().getEmailProperties().getUsername());
checkStopThreads.remove();
emailMap.remove(next.getKey());
try {
if(!CollectionUtils.isEmpty(emailExecThreadMap)){
//遍历邮箱执行线程如果状态为已停止则删除
final Iterator<Map.Entry<String, EmailParsingActuator>> checkStopThreads = emailExecThreadMap.entrySet().iterator();
while (checkStopThreads.hasNext()){
final Map.Entry<String, EmailParsingActuator> next = checkStopThreads.next();
if(next.getValue().getState() == State.TERMINATED){
log.info("{}邮箱执行线程已停止emailExecThreadMap删除此邮箱数据",next.getValue().getEmailProperties().getUsername());
checkStopThreads.remove();
emailMap.remove(next.getKey());
}
}
}
//遍历邮箱执行线程如果邮箱执行线程stop属性已被设置为true则关闭资源停止线程
final Iterator<Map.Entry<String, EmailParsingActuator>> iterator = emailExecThreadMap.entrySet().iterator();
emailExecThreadMap.forEach((emailId,emailExecThread)->{
try{
if(emailExecThread.getState() != State.TERMINATED && emailExecThread.isStop()){
final long nowTime = System.currentTimeMillis();
final long setStoptime = emailExecThread.getStopTime().getTime();
final long val = nowTime - setStoptime;
if(val >= taskProperties.getForceDeletedTime()){
log.info("关闭{}邮箱线程内部线程池资源",emailExecThread.getEmailProperties().getUsername());
emailExecThread.closeResource();
//遍历邮箱执行线程如果邮箱执行线程stop属性已被设置为true则关闭资源停止线程
final Iterator<Map.Entry<String, EmailParsingActuator>> iterator = emailExecThreadMap.entrySet().iterator();
emailExecThreadMap.forEach((emailId,emailExecThread)->{
try{
if(emailExecThread.getState() != State.TERMINATED && emailExecThread.isStop()){
final long nowTime = System.currentTimeMillis();
final long setStoptime = emailExecThread.getStopTime().getTime();
final long val = nowTime - setStoptime;
if(val >= taskProperties.getForceDeletedTime()){
log.info("关闭{}邮箱线程内部线程池资源",emailExecThread.getEmailProperties().getUsername());
emailExecThread.closeResource();
}
}
}catch (Exception e){
e.printStackTrace();
log.error(e.getMessage(), e);
}finally {
if(emailExecThread.getState() != State.TERMINATED && emailExecThread.isStop()){
final long nowTime = System.currentTimeMillis();
final long setStoptime = emailExecThread.getStopTime().getTime();
final long val = nowTime - setStoptime;
if(val >= taskProperties.getForceDeletedTime()){
log.info("强制停止{}邮箱线程",emailExecThread.getEmailProperties().getUsername());
emailExecThread.stop();
}
}
}
}catch (Exception e){
e.printStackTrace();
log.error(e.getMessage(), e);
}finally {
if(emailExecThread.getState() != State.TERMINATED && emailExecThread.isStop()){
final long nowTime = System.currentTimeMillis();
final long setStoptime = emailExecThread.getStopTime().getTime();
final long val = nowTime - setStoptime;
if(val >= taskProperties.getForceDeletedTime()){
log.info("强制停止{}邮箱线程",emailExecThread.getEmailProperties().getUsername());
emailExecThread.stop();
}
}
}
});
});
}
} catch (Exception e) {
e.printStackTrace();
log.error(e.getMessage(), e);
}
long end = System.currentTimeMillis();
long sleepTime = taskProperties.getDeletedMailThreadExecCycle() - (end-start);