fix:1.重构AutoManager解决停止邮件服务后,主线程及线程池不能真正停止问题2.修改启用关闭邮件,邮件数据检测线程没有正确响应问题3.解决邮件一直下载重复下载问题

This commit is contained in:
panbaolin 2024-02-24 22:50:19 +08:00
parent d5f7729976
commit 5cfa3e62d3
5 changed files with 197 additions and 139 deletions

View File

@ -97,7 +97,7 @@ public class EmailServiceManager {
Socket socket = new Socket();
boolean flag = false;
try {
socket.connect(new InetSocketAddress(email.getEmailServerAddress(),email.getPort()),5000);
socket.connect(new InetSocketAddress(email.getEmailServerAddress(),email.getPort()),3000);
log.info("{}邮件服务连接测试成功",email.getName());
flag = true;
} catch (IOException e) {
@ -556,24 +556,24 @@ public class EmailServiceManager {
final String rootPath = spectrumPathProperties.getRootPath();
final String emlPath = spectrumPathProperties.getEmlPath();
emlFile = new File(rootPath+emlPath+File.separator+fileName);
// message.writeTo(new FileOutputStream(emlFile));
message.writeTo(new FileOutputStream(emlFile));
int bufferSize = 1024 * 1024; // 1M
InputStream inputStream = message.getInputStream();
BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream, bufferSize);
// 或者使用 BufferedOutputStream
OutputStream outputStream = new FileOutputStream(emlFile);
BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(outputStream, bufferSize);
// 从邮件的输入流读取内容并写入到本地文件
byte[] buffer = new byte[bufferSize];
int bytesRead;
while ((bytesRead = bufferedInputStream.read(buffer)) != -1) {
bufferedOutputStream.write(buffer, 0, bytesRead);
}
// 关闭流
bufferedInputStream.close();
bufferedOutputStream.close();
// int bufferSize = 1024 * 1024; // 1M
// InputStream inputStream = message.getInputStream();
// BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream, bufferSize);
// // 或者使用 BufferedOutputStream
// OutputStream outputStream = new FileOutputStream(emlFile);
// BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(outputStream, bufferSize);
// // 从邮件的输入流读取内容并写入到本地文件
// byte[] buffer = new byte[bufferSize];
// int bytesRead;
// while ((bytesRead = bufferedInputStream.read(buffer)) != -1) {
// bufferedOutputStream.write(buffer, 0, bytesRead);
// }
//
// // 关闭流
// bufferedInputStream.close();
// bufferedOutputStream.close();
} catch (MessagingException | IOException e) {
// 下载邮件失败 抛出自定义邮件下载异常
status = EmailLogManager.STATUS_ERROR;
@ -601,6 +601,7 @@ public class EmailServiceManager {
Date receivedDate = null;
try {
subject = MimeUtility.decodeText(message.getSubject());
receivedDate = message.getReceivedDate();
message.setFlag(Flags.Flag.DELETED,true);
log.info("EmailServiceManager: Remove Email:{},receiveTime:{}",message.getSubject(), DateUtils.formatDate(message.getReceivedDate(),"yyyy-MM-dd HH:mm:ss"));
} catch (MessagingException | UnsupportedEncodingException e) {

View File

@ -41,6 +41,16 @@ public class TaskProperties implements Serializable {
*/
private Integer mailThreadExecCycle;
/**
* 监测需删除的邮件线程执行周期毫秒
*/
private Integer deletedMailThreadExecCycle;
/**
* 强制删除邮件线程时间毫秒
*/
private Integer forceDeletedTime;
/**
* undeal目录文件获取周期
*/

View File

@ -15,6 +15,7 @@ import org.jeecg.modules.spectrum.SpectrumServiceQuotes;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.io.*;
import java.util.*;
import java.util.concurrent.TimeUnit;
@ -66,6 +67,10 @@ public class AutoProcessManager{
final MailExecManager autoProcessThread = new MailExecManager();
autoProcessThread.setName("mail-exec-thread-manage");
autoProcessThread.start();
//删除邮件执行线程管理
final DeletedMailMonitor detedMailMonitor = new DeletedMailMonitor();
detedMailMonitor.setName("deleted-mail-monitor");
detedMailMonitor.start();
}
/**
@ -78,52 +83,30 @@ public class AutoProcessManager{
for(;;){
long start = System.currentTimeMillis();
if(!CollectionUtils.isEmpty(emailMap)){
Iterator<EmailProperties> iterator = emailMap.values().iterator();
while(iterator.hasNext()){
EmailProperties next = iterator.next();
if(next.isDelFlag()){
if(emailExecThreadMap.containsKey(next.getId())){
synchronized (lock){
Iterator<EmailProperties> iterator = emailMap.values().iterator();
while(iterator.hasNext()){
EmailProperties next = iterator.next();
if (next.isResetFlag()) {
EmailParsingActuator actuator = emailExecThreadMap.get(next.getId());
actuator.setStop(true);
// 二十秒内 如果现在没有停止将强制停止
for(int i=1;i<=20;i++){
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
if(i==20 && actuator.isAlive()){
//关闭线程内部资源例如线程池
actuator.closeResource();
actuator.stop();
}
if(!actuator.isAlive()){
break;
}
}
emailExecThreadMap.remove(next.getId());
actuator.updateEmail(next);
next.setResetFlag(false);
}
iterator.remove();
}
if (next.isResetFlag()) {
EmailParsingActuator actuator = emailExecThreadMap.get(next.getId());
actuator.updateEmail(next);
next.setResetFlag(false);
}
if(next.isNewEmailFlag()){
// 网络正常之后才允许创建新的实例
final EmailServiceManager emailServiceManager = EmailServiceManager.getInstance();
emailServiceManager.init(next);
boolean testFlag = emailServiceManager.testConnectEmailServer();
if(testFlag){
EmailParsingActuator emailParsingActuator = new EmailParsingActuator();
emailParsingActuator.init(next,spectrumServiceQuotes,emailCounter,systemStartupTime);
emailParsingActuator.setName(next.getUsername()+"-email-monitor");
emailParsingActuator.start();
//把邮件监测执行线程加入管理队列
emailExecThreadMap.put(next.getId(),emailParsingActuator);
//新邮件监测监测线程已启动则修改新邮件标记为false
next.setNewEmailFlag(false);
if(next.isNewEmailFlag()){
// 网络正常之后才允许创建新的实例
final EmailServiceManager emailServiceManager = EmailServiceManager.getInstance();
emailServiceManager.init(next);
boolean testFlag = emailServiceManager.testConnectEmailServer();
if(testFlag){
EmailParsingActuator emailParsingActuator = new EmailParsingActuator();
emailParsingActuator.init(next,spectrumServiceQuotes,emailCounter,systemStartupTime);
emailParsingActuator.setName(next.getUsername()+"-email-monitor");
emailParsingActuator.start();
//把邮件监测执行线程加入管理队列
emailExecThreadMap.put(next.getId(),emailParsingActuator);
//新邮件监测监测线程已启动则修改新邮件标记为false
next.setNewEmailFlag(false);
}
}
}
}
@ -155,21 +138,33 @@ public class AutoProcessManager{
long start = System.currentTimeMillis();
try{
if(!CollectionUtils.isEmpty(emailMap)){
emailMap.values().forEach(email->{
if(!email.isDelFlag()){
final EmailServiceManager emailServiceManager = EmailServiceManager.getInstance();
emailServiceManager.init(email);
boolean testFlag = emailServiceManager.testConnectEmailServer();
// redisUtil.hset(EmailConstant.EMAIL_STATUS_PREFIX,email.getId(),testFlag);
if(testFlag && !emailExecThreadMap.containsKey(email.getId())){
email.setNewEmailFlag(true);
}
if(!testFlag){
//如果邮件服务通信测试失败则添加删除标记
email.setDelFlag(true);
synchronized (lock){
//记录连接测试失败的邮箱
List<EmailProperties> emails = new ArrayList<>();
emailMap.values().forEach(email->{
if(!email.isDelFlag()){
final EmailServiceManager emailServiceManager = EmailServiceManager.getInstance();
emailServiceManager.init(email);
boolean testFlag = emailServiceManager.testConnectEmailServer();
if(!testFlag){
emails.add(email);
}
}
});
//如果emailIds不为空说明此次有测试连接失败的邮箱直接删除
if(!CollectionUtils.isEmpty(emails)){
emails.forEach(email->{
emailMap.remove(email.getId());
//如果这时邮箱线程里已有执行的线程则设置停止标记
if(emailExecThreadMap.containsKey(email.getId())){
EmailParsingActuator actuator = emailExecThreadMap.get(email.getId());
actuator.setStop(true);
actuator.setStopTime(new Date());
}
log.info("{}邮箱测试连接失败emailMap删除此邮箱数据emailExecThreadMap设置线程停止标记",email.getUsername());
});
}
});
}
}
//捕获异常不处理保障线程异常不退出
}catch (Exception e){
@ -204,41 +199,48 @@ public class AutoProcessManager{
try{
final List<EmailProperties> receiveMails = mailService.findReceiveMails();
if(!CollectionUtils.isEmpty(receiveMails)){
//如果库里已有数据原来已开启使用并且监测Map中已存在现在关闭使用则添加删除标记
//如果本次查询数据监测Map中不存在并且已开启使用的则加入监测Map
for(EmailProperties email : receiveMails){
//判断map里是否包含邮箱id
final boolean flag = emailMap.containsKey(email.getId());
//如果包含邮箱id 并且 邮箱处于未启用的状态 将邮箱的删除标识设置为true
if(flag && email.getEnabled().equals(SysMailEnableType.NOT_ENABLE.getMailEnableType())){
EmailProperties sourceEmail = emailMap.get(email.getId());
sourceEmail.setDelFlag(true);
}
//如果包含邮箱id 并且 邮箱处于启用状态 将邮箱数据进行更新
if (flag && email.getEnabled().equals(SysMailEnableType.ENABLE.getMailEnableType())) {
EmailProperties properties = emailMap.get(email.getId());
//判断邮箱的用户名密码是否与原邮箱用户名密码一致
//如果不一致则进行更新
if ( !properties.getUsername().equals(email.getUsername()) || !properties.getPassword().equals(email.getPassword()) ) {
email.setResetFlag(true);
putSysEmailMap(email);
log.info("{}邮箱加入监测队列",email.getEmailServerAddress());
final Iterator<EmailProperties> iterator = receiveMails.iterator();
synchronized (lock){
while(iterator.hasNext()){
EmailProperties databaseEmail = iterator.next();
//判断map里存储的邮箱数据是否包含当前数据库邮箱记录的id
if(emailMap.containsKey(databaseEmail.getId())){
final EmailProperties mapEmail = emailMap.get(databaseEmail.getId());
//如果数据库里邮箱数据已关闭则删除
if(databaseEmail.getEnabled().equals(SysMailEnableType.NOT_ENABLE.getMailEnableType())){
emailMap.remove(mapEmail.getId());
//如果这时邮箱线程里已有执行的线程则设置停止标记
if(emailExecThreadMap.containsKey(databaseEmail.getId())){
EmailParsingActuator actuator = emailExecThreadMap.get(databaseEmail.getId());
actuator.setStop(true);
actuator.setStopTime(new Date());
}
log.info("{}邮箱已关闭服务emailMap删除此邮箱数据emailExecThreadMap设置线程停止标记",mapEmail.getUsername());
}else{
//判断邮箱的用户名密码是否与原邮箱用户名密码一致如果不一致则进行更新
if (!mapEmail.getUsername().equals(databaseEmail.getUsername()) || !mapEmail.getPassword().equals(databaseEmail.getPassword())
|| !mapEmail.getEmailServerAddress().equals(databaseEmail.getEmailServerAddress())) {
final boolean testFlag = testConnectEmailServer(databaseEmail);
if(testFlag){
databaseEmail.setResetFlag(true);
emailMap.put(databaseEmail.getId(),databaseEmail);
log.info("{}邮箱数据已更新,覆盖原邮箱数据,设置重置标记",mapEmail.getUsername());
}
}
}
}else{
//如果不包含邮箱id 并且 邮箱处于启用状态 将邮箱对象存入到map中 并将新邮箱标识设置为true
if(databaseEmail.getEnabled().equals(SysMailEnableType.ENABLE.getMailEnableType())){
final boolean testFlag = testConnectEmailServer(databaseEmail);
if(testFlag){
databaseEmail.setNewEmailFlag(true);
emailMap.put(databaseEmail.getId(),databaseEmail);
log.info("{}邮箱加入监测队列,设置新增标记",databaseEmail.getUsername());
}
}
}
}
//如果不包含邮箱id 并且 邮箱处于启用状态 将邮箱对象存入到map中 并将新邮箱标识设置为true
if(!flag && email.getEnabled().equals(SysMailEnableType.ENABLE.getMailEnableType())){
email.setNewEmailFlag(true);
putSysEmailMap(email);
log.info("{}邮箱加入监测队列",email.getEmailServerAddress());
}
}
//如果监测Map中存在的邮箱数据在本次查询数据中不存在说明库里已删除则添加删除标记
emailMap.forEach((emailId,sourceEmail)->{
final long result = receiveMails.stream().filter(email -> emailId.equals(email.getId())).count();
if (result <= 0){
sourceEmail.setDelFlag(true);
}
});
}
//捕获异常不处理保障线程异常不退出
}catch (Exception e){
@ -259,38 +261,78 @@ public class AutoProcessManager{
}
/**
* 新增邮箱数据
* @param email
* 监测已设置删除标记的邮箱线程
* isStop值为true的邮箱线程
* 20秒还未停止则进行强制停止
*/
private void putSysEmailMap(EmailProperties email){
synchronized (this.lock){
emailMap.put(email.getId(),email);
}
}
private class DeletedMailMonitor extends Thread{
/**
* 删除邮箱数据
* @param emailId
*/
private void removeSysEmailMap(String emailId){
synchronized (this.lock){
if (emailMap.containsKey(emailId)){
emailMap.remove(emailId);
}
}
}
@Override
public void run() {
for(;;){
long start = System.currentTimeMillis();
if(!CollectionUtils.isEmpty(emailExecThreadMap)){
//遍历邮箱执行线程如果状态为已停止则删除
final Iterator<Map.Entry<String, EmailParsingActuator>> checkStopThreads = emailExecThreadMap.entrySet().iterator();
while (checkStopThreads.hasNext()){
final Map.Entry<String, EmailParsingActuator> next = checkStopThreads.next();
if(next.getValue().getState() == State.TERMINATED){
log.info("{}邮箱执行线程已停止emailExecThreadMap删除此邮箱数据",next.getValue().getEmailProperties().getUsername());
checkStopThreads.remove();
}
}
/**
* 删除邮箱数据
* @param sysEmailIds
*/
private void removeSysEmailMap(List<String> sysEmailIds){
synchronized (this.lock){
for(String sysEmailId : sysEmailIds){
if (emailMap.containsKey(sysEmailId)){
emailMap.remove(sysEmailId);
//遍历邮箱执行线程如果邮箱执行线程stop属性已被设置为true则关闭资源停止线程
final Iterator<Map.Entry<String, EmailParsingActuator>> iterator = emailExecThreadMap.entrySet().iterator();
emailExecThreadMap.forEach((emailId,emailExecThread)->{
try{
if(emailExecThread.getState() != State.TERMINATED && emailExecThread.isStop()){
final long nowTime = System.currentTimeMillis();
final long setStoptime = emailExecThread.getStopTime().getTime();
final long val = nowTime - setStoptime;
if(val >= taskProperties.getForceDeletedTime()){
log.info("关闭{}邮箱线程内部线程池资源",emailExecThread.getEmailProperties().getUsername());
emailExecThread.closeResource();
}
}
}catch (Exception e){
e.printStackTrace();
}finally {
if(emailExecThread.getState() != State.TERMINATED && emailExecThread.isStop()){
final long nowTime = System.currentTimeMillis();
final long setStoptime = emailExecThread.getStopTime().getTime();
final long val = nowTime - setStoptime;
if(val >= taskProperties.getForceDeletedTime()){
log.info("强制停止{}邮箱线程",emailExecThread.getEmailProperties().getUsername());
emailExecThread.stop();
}
}
}
});
}
long end = System.currentTimeMillis();
long sleepTime = taskProperties.getDeletedMailThreadExecCycle() - (end-start);
//如果sleepTime > 0 需要睡眠到指定时间否则继续下次监测
if(sleepTime > 0){
try {
TimeUnit.MILLISECONDS.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
/**
* 测试邮箱连通性
* @param email
* @return
*/
private boolean testConnectEmailServer(EmailProperties email){
final EmailServiceManager emailServiceManager = EmailServiceManager.getInstance();
emailServiceManager.init(email);
boolean testFlag = emailServiceManager.testConnectEmailServer();
return testFlag;
}
}

View File

@ -1,5 +1,6 @@
package org.jeecg.modules;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
@ -29,13 +30,16 @@ import java.util.concurrent.*;
public class EmailParsingActuator extends Thread{
private TaskProperties taskProperties;
@Getter
private EmailProperties emailProperties;
private ThreadPoolExecutor poolExecutor;
private SpectrumServiceQuotes spectrumServiceQuotes;
private EmailCounter emailCounter;
private Date systemStartupTime;
@Setter
@Setter @Getter
private boolean isStop;
@Setter @Getter
private Date stopTime;
public void init(EmailProperties emailProperties,SpectrumServiceQuotes spectrumServiceQuotes,
EmailCounter emailCounter,Date systemStartupTime){
@ -64,6 +68,7 @@ public class EmailParsingActuator extends Thread{
if (isStop) {
String nowDate = DateUtils.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss");
log.info(nowDate + " " +this.emailProperties.getName()+" EmailParsingActuator is Stop!");
closeResource();
return;
}
long start = System.currentTimeMillis();

View File

@ -70,11 +70,11 @@ public class JeecgAutoProcessApplication extends SpringBootServletInitializer im
public void run(String... args) throws Exception {
//调用dll
//Windows加载dll工具库
// System.loadLibrary("ReadPHDFile");
// System.loadLibrary("GammaAnaly");
System.loadLibrary("ReadPHDFile");
System.loadLibrary("GammaAnaly");
//Linux版本加载dll工具库
System.load("/usr/local/jdk/lib/libReadPHDFile.so");
System.load("/usr/local/jdk/lib/libGammaAnalyALG.so");
// System.load("/usr/local/jdk/lib/libReadPHDFile.so");
// System.load("/usr/local/jdk/lib/libGammaAnalyALG.so");
nuclLibService.getNuclideMap();
//根据配置文件配置邮件获取策略定义时间条件默认EmailReceivePolicy.HISTORY_ORDER_RECEIVE.getPolicy()
Date systemStartupTime = null;