diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/email/EmailServiceManager.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/email/EmailServiceManager.java index 95822640..7d7e8131 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/common/email/EmailServiceManager.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/email/EmailServiceManager.java @@ -97,7 +97,7 @@ public class EmailServiceManager { Socket socket = new Socket(); boolean flag = false; try { - socket.connect(new InetSocketAddress(email.getEmailServerAddress(),email.getPort()),5000); + socket.connect(new InetSocketAddress(email.getEmailServerAddress(),email.getPort()),3000); log.info("{}邮件服务连接测试成功",email.getName()); flag = true; } catch (IOException e) { @@ -556,24 +556,24 @@ public class EmailServiceManager { final String rootPath = spectrumPathProperties.getRootPath(); final String emlPath = spectrumPathProperties.getEmlPath(); emlFile = new File(rootPath+emlPath+File.separator+fileName); -// message.writeTo(new FileOutputStream(emlFile)); + message.writeTo(new FileOutputStream(emlFile)); - int bufferSize = 1024 * 1024; // 1M - InputStream inputStream = message.getInputStream(); - BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream, bufferSize); - // 或者使用 BufferedOutputStream - OutputStream outputStream = new FileOutputStream(emlFile); - BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(outputStream, bufferSize); - // 从邮件的输入流读取内容,并写入到本地文件 - byte[] buffer = new byte[bufferSize]; - int bytesRead; - while ((bytesRead = bufferedInputStream.read(buffer)) != -1) { - bufferedOutputStream.write(buffer, 0, bytesRead); - } - - // 关闭流 - bufferedInputStream.close(); - bufferedOutputStream.close(); +// int bufferSize = 1024 * 1024; // 1M +// InputStream inputStream = message.getInputStream(); +// BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream, bufferSize); +// // 或者使用 BufferedOutputStream +// OutputStream outputStream = new FileOutputStream(emlFile); +// BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(outputStream, bufferSize); +// // 从邮件的输入流读取内容,并写入到本地文件 +// byte[] buffer = new byte[bufferSize]; +// int bytesRead; +// while ((bytesRead = bufferedInputStream.read(buffer)) != -1) { +// bufferedOutputStream.write(buffer, 0, bytesRead); +// } +// +// // 关闭流 +// bufferedInputStream.close(); +// bufferedOutputStream.close(); } catch (MessagingException | IOException e) { // 下载邮件失败 抛出自定义邮件下载异常 status = EmailLogManager.STATUS_ERROR; @@ -601,6 +601,7 @@ public class EmailServiceManager { Date receivedDate = null; try { subject = MimeUtility.decodeText(message.getSubject()); + receivedDate = message.getReceivedDate(); message.setFlag(Flags.Flag.DELETED,true); log.info("EmailServiceManager: Remove Email:{},receiveTime:{}",message.getSubject(), DateUtils.formatDate(message.getReceivedDate(),"yyyy-MM-dd HH:mm:ss")); } catch (MessagingException | UnsupportedEncodingException e) { diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/properties/TaskProperties.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/properties/TaskProperties.java index a7594460..34f9e294 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/common/properties/TaskProperties.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/properties/TaskProperties.java @@ -41,6 +41,16 @@ public class TaskProperties implements Serializable { */ private Integer mailThreadExecCycle; + /** + * 监测需删除的邮件线程执行周期(毫秒) + */ + private Integer deletedMailThreadExecCycle; + + /** + * 强制删除邮件线程时间(毫秒) + */ + private Integer forceDeletedTime; + /** * undeal目录文件获取周期 */ diff --git a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/AutoProcessManager.java b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/AutoProcessManager.java index c237653e..3a0880dd 100644 --- a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/AutoProcessManager.java +++ b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/AutoProcessManager.java @@ -15,6 +15,7 @@ import org.jeecg.modules.spectrum.SpectrumServiceQuotes; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; +import java.io.*; import java.util.*; import java.util.concurrent.TimeUnit; @@ -66,6 +67,10 @@ public class AutoProcessManager{ final MailExecManager autoProcessThread = new MailExecManager(); autoProcessThread.setName("mail-exec-thread-manage"); autoProcessThread.start(); + //删除邮件执行线程管理 + final DeletedMailMonitor detedMailMonitor = new DeletedMailMonitor(); + detedMailMonitor.setName("deleted-mail-monitor"); + detedMailMonitor.start(); } /** @@ -78,52 +83,30 @@ public class AutoProcessManager{ for(;;){ long start = System.currentTimeMillis(); if(!CollectionUtils.isEmpty(emailMap)){ - Iterator iterator = emailMap.values().iterator(); - while(iterator.hasNext()){ - EmailProperties next = iterator.next(); - if(next.isDelFlag()){ - if(emailExecThreadMap.containsKey(next.getId())){ + synchronized (lock){ + Iterator iterator = emailMap.values().iterator(); + while(iterator.hasNext()){ + EmailProperties next = iterator.next(); + if (next.isResetFlag()) { EmailParsingActuator actuator = emailExecThreadMap.get(next.getId()); - actuator.setStop(true); - // 二十秒内 如果现在没有停止将强制停止 - for(int i=1;i<=20;i++){ - try { - TimeUnit.SECONDS.sleep(1); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - if(i==20 && actuator.isAlive()){ - //关闭线程内部资源,例如:线程池 - actuator.closeResource(); - actuator.stop(); - } - if(!actuator.isAlive()){ - break; - } - } - emailExecThreadMap.remove(next.getId()); + actuator.updateEmail(next); + next.setResetFlag(false); } - iterator.remove(); - } - if (next.isResetFlag()) { - EmailParsingActuator actuator = emailExecThreadMap.get(next.getId()); - actuator.updateEmail(next); - next.setResetFlag(false); - } - if(next.isNewEmailFlag()){ - // 网络正常之后才允许创建新的实例 - final EmailServiceManager emailServiceManager = EmailServiceManager.getInstance(); - emailServiceManager.init(next); - boolean testFlag = emailServiceManager.testConnectEmailServer(); - if(testFlag){ - EmailParsingActuator emailParsingActuator = new EmailParsingActuator(); - emailParsingActuator.init(next,spectrumServiceQuotes,emailCounter,systemStartupTime); - emailParsingActuator.setName(next.getUsername()+"-email-monitor"); - emailParsingActuator.start(); - //把邮件监测执行线程加入管理队列 - emailExecThreadMap.put(next.getId(),emailParsingActuator); - //新邮件监测监测线程已启动则修改新邮件标记为false - next.setNewEmailFlag(false); + if(next.isNewEmailFlag()){ + // 网络正常之后才允许创建新的实例 + final EmailServiceManager emailServiceManager = EmailServiceManager.getInstance(); + emailServiceManager.init(next); + boolean testFlag = emailServiceManager.testConnectEmailServer(); + if(testFlag){ + EmailParsingActuator emailParsingActuator = new EmailParsingActuator(); + emailParsingActuator.init(next,spectrumServiceQuotes,emailCounter,systemStartupTime); + emailParsingActuator.setName(next.getUsername()+"-email-monitor"); + emailParsingActuator.start(); + //把邮件监测执行线程加入管理队列 + emailExecThreadMap.put(next.getId(),emailParsingActuator); + //新邮件监测监测线程已启动则修改新邮件标记为false + next.setNewEmailFlag(false); + } } } } @@ -155,21 +138,33 @@ public class AutoProcessManager{ long start = System.currentTimeMillis(); try{ if(!CollectionUtils.isEmpty(emailMap)){ - emailMap.values().forEach(email->{ - if(!email.isDelFlag()){ - final EmailServiceManager emailServiceManager = EmailServiceManager.getInstance(); - emailServiceManager.init(email); - boolean testFlag = emailServiceManager.testConnectEmailServer(); - // redisUtil.hset(EmailConstant.EMAIL_STATUS_PREFIX,email.getId(),testFlag); - if(testFlag && !emailExecThreadMap.containsKey(email.getId())){ - email.setNewEmailFlag(true); - } - if(!testFlag){ - //如果邮件服务通信测试失败则添加删除标记 - email.setDelFlag(true); + synchronized (lock){ + //记录连接测试失败的邮箱 + List emails = new ArrayList<>(); + emailMap.values().forEach(email->{ + if(!email.isDelFlag()){ + final EmailServiceManager emailServiceManager = EmailServiceManager.getInstance(); + emailServiceManager.init(email); + boolean testFlag = emailServiceManager.testConnectEmailServer(); + if(!testFlag){ + emails.add(email); + } } + }); + //如果emailIds不为空,说明此次有测试连接失败的邮箱,直接删除 + if(!CollectionUtils.isEmpty(emails)){ + emails.forEach(email->{ + emailMap.remove(email.getId()); + //如果这时邮箱线程里已有执行的线程则设置停止标记 + if(emailExecThreadMap.containsKey(email.getId())){ + EmailParsingActuator actuator = emailExecThreadMap.get(email.getId()); + actuator.setStop(true); + actuator.setStopTime(new Date()); + } + log.info("{}邮箱测试连接失败,emailMap删除此邮箱数据,emailExecThreadMap设置线程停止标记",email.getUsername()); + }); } - }); + } } //捕获异常不处理保障线程异常不退出 }catch (Exception e){ @@ -204,41 +199,48 @@ public class AutoProcessManager{ try{ final List receiveMails = mailService.findReceiveMails(); if(!CollectionUtils.isEmpty(receiveMails)){ - //如果库里已有数据原来已开启使用并且监测Map中已存在,现在关闭使用则添加删除标记 - //如果本次查询数据监测Map中不存在,并且已开启使用的则加入监测Map - for(EmailProperties email : receiveMails){ - //判断map里是否包含邮箱id - final boolean flag = emailMap.containsKey(email.getId()); - //如果包含邮箱id 并且 邮箱处于未启用的状态 将邮箱的删除标识设置为true - if(flag && email.getEnabled().equals(SysMailEnableType.NOT_ENABLE.getMailEnableType())){ - EmailProperties sourceEmail = emailMap.get(email.getId()); - sourceEmail.setDelFlag(true); - } - //如果包含邮箱id 并且 邮箱处于启用状态 将邮箱数据进行更新 - if (flag && email.getEnabled().equals(SysMailEnableType.ENABLE.getMailEnableType())) { - EmailProperties properties = emailMap.get(email.getId()); - //判断邮箱的用户名,密码是否与原邮箱用户名,密码一致 - //如果不一致则进行更新 - if ( !properties.getUsername().equals(email.getUsername()) || !properties.getPassword().equals(email.getPassword()) ) { - email.setResetFlag(true); - putSysEmailMap(email); - log.info("{}邮箱加入监测队列",email.getEmailServerAddress()); + final Iterator iterator = receiveMails.iterator(); + synchronized (lock){ + while(iterator.hasNext()){ + EmailProperties databaseEmail = iterator.next(); + //判断map里存储的邮箱数据是否包含当前数据库邮箱记录的id + if(emailMap.containsKey(databaseEmail.getId())){ + final EmailProperties mapEmail = emailMap.get(databaseEmail.getId()); + //如果数据库里邮箱数据已关闭,则删除 + if(databaseEmail.getEnabled().equals(SysMailEnableType.NOT_ENABLE.getMailEnableType())){ + emailMap.remove(mapEmail.getId()); + //如果这时邮箱线程里已有执行的线程则设置停止标记 + if(emailExecThreadMap.containsKey(databaseEmail.getId())){ + EmailParsingActuator actuator = emailExecThreadMap.get(databaseEmail.getId()); + actuator.setStop(true); + actuator.setStopTime(new Date()); + } + log.info("{}邮箱已关闭服务,emailMap删除此邮箱数据,emailExecThreadMap设置线程停止标记",mapEmail.getUsername()); + }else{ + //判断邮箱的用户名,密码是否与原邮箱用户名,密码一致,如果不一致则进行更新 + if (!mapEmail.getUsername().equals(databaseEmail.getUsername()) || !mapEmail.getPassword().equals(databaseEmail.getPassword()) + || !mapEmail.getEmailServerAddress().equals(databaseEmail.getEmailServerAddress())) { + final boolean testFlag = testConnectEmailServer(databaseEmail); + if(testFlag){ + databaseEmail.setResetFlag(true); + emailMap.put(databaseEmail.getId(),databaseEmail); + log.info("{}邮箱数据已更新,覆盖原邮箱数据,设置重置标记",mapEmail.getUsername()); + } + } + } + }else{ + //如果不包含邮箱id 并且 邮箱处于启用状态 将邮箱对象存入到map中 并将新邮箱标识设置为true + if(databaseEmail.getEnabled().equals(SysMailEnableType.ENABLE.getMailEnableType())){ + final boolean testFlag = testConnectEmailServer(databaseEmail); + if(testFlag){ + databaseEmail.setNewEmailFlag(true); + emailMap.put(databaseEmail.getId(),databaseEmail); + log.info("{}邮箱加入监测队列,设置新增标记",databaseEmail.getUsername()); + } + } } } - //如果不包含邮箱id 并且 邮箱处于启用状态 将邮箱对象存入到map中 并将新邮箱标识设置为true - if(!flag && email.getEnabled().equals(SysMailEnableType.ENABLE.getMailEnableType())){ - email.setNewEmailFlag(true); - putSysEmailMap(email); - log.info("{}邮箱加入监测队列",email.getEmailServerAddress()); - } } - //如果监测Map中存在的邮箱数据,在本次查询数据中不存在说明库里已删除,则添加删除标记 - emailMap.forEach((emailId,sourceEmail)->{ - final long result = receiveMails.stream().filter(email -> emailId.equals(email.getId())).count(); - if (result <= 0){ - sourceEmail.setDelFlag(true); - } - }); } //捕获异常不处理保障线程异常不退出 }catch (Exception e){ @@ -259,38 +261,78 @@ public class AutoProcessManager{ } /** - * 新增邮箱数据 - * @param email + * 监测已设置删除标记的邮箱线程 + * isStop值为true的邮箱线程 + * 20秒还未停止则进行强制停止 */ - private void putSysEmailMap(EmailProperties email){ - synchronized (this.lock){ - emailMap.put(email.getId(),email); - } - } + private class DeletedMailMonitor extends Thread{ - /** - * 删除邮箱数据 - * @param emailId - */ - private void removeSysEmailMap(String emailId){ - synchronized (this.lock){ - if (emailMap.containsKey(emailId)){ - emailMap.remove(emailId); - } - } - } + @Override + public void run() { + for(;;){ + long start = System.currentTimeMillis(); + if(!CollectionUtils.isEmpty(emailExecThreadMap)){ + //遍历邮箱执行线程,如果状态为已停止则删除 + final Iterator> checkStopThreads = emailExecThreadMap.entrySet().iterator(); + while (checkStopThreads.hasNext()){ + final Map.Entry next = checkStopThreads.next(); + if(next.getValue().getState() == State.TERMINATED){ + log.info("{}邮箱执行线程已停止,emailExecThreadMap删除此邮箱数据",next.getValue().getEmailProperties().getUsername()); + checkStopThreads.remove(); + } + } - /** - * 删除邮箱数据 - * @param sysEmailIds - */ - private void removeSysEmailMap(List sysEmailIds){ - synchronized (this.lock){ - for(String sysEmailId : sysEmailIds){ - if (emailMap.containsKey(sysEmailId)){ - emailMap.remove(sysEmailId); + //遍历邮箱执行线程,如果邮箱执行线程stop属性已被设置为true则关闭资源停止线程 + final Iterator> iterator = emailExecThreadMap.entrySet().iterator(); + emailExecThreadMap.forEach((emailId,emailExecThread)->{ + try{ + if(emailExecThread.getState() != State.TERMINATED && emailExecThread.isStop()){ + final long nowTime = System.currentTimeMillis(); + final long setStoptime = emailExecThread.getStopTime().getTime(); + final long val = nowTime - setStoptime; + if(val >= taskProperties.getForceDeletedTime()){ + log.info("关闭{}邮箱线程内部线程池资源",emailExecThread.getEmailProperties().getUsername()); + emailExecThread.closeResource(); + } + } + }catch (Exception e){ + e.printStackTrace(); + }finally { + if(emailExecThread.getState() != State.TERMINATED && emailExecThread.isStop()){ + final long nowTime = System.currentTimeMillis(); + final long setStoptime = emailExecThread.getStopTime().getTime(); + final long val = nowTime - setStoptime; + if(val >= taskProperties.getForceDeletedTime()){ + log.info("强制停止{}邮箱线程",emailExecThread.getEmailProperties().getUsername()); + emailExecThread.stop(); + } + } + } + }); + } + long end = System.currentTimeMillis(); + long sleepTime = taskProperties.getDeletedMailThreadExecCycle() - (end-start); + //如果sleepTime > 0 需要睡眠到指定时间,否则继续下次监测 + if(sleepTime > 0){ + try { + TimeUnit.MILLISECONDS.sleep(sleepTime); + } catch (InterruptedException e) { + e.printStackTrace(); + } } } } } + + /** + * 测试邮箱连通性 + * @param email + * @return + */ + private boolean testConnectEmailServer(EmailProperties email){ + final EmailServiceManager emailServiceManager = EmailServiceManager.getInstance(); + emailServiceManager.init(email); + boolean testFlag = emailServiceManager.testConnectEmailServer(); + return testFlag; + } } diff --git a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/EmailParsingActuator.java b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/EmailParsingActuator.java index 90c1feef..8abbeb06 100644 --- a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/EmailParsingActuator.java +++ b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/EmailParsingActuator.java @@ -1,5 +1,6 @@ package org.jeecg.modules; +import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.ArrayUtils; @@ -29,13 +30,16 @@ import java.util.concurrent.*; public class EmailParsingActuator extends Thread{ private TaskProperties taskProperties; + @Getter private EmailProperties emailProperties; private ThreadPoolExecutor poolExecutor; private SpectrumServiceQuotes spectrumServiceQuotes; private EmailCounter emailCounter; private Date systemStartupTime; - @Setter + @Setter @Getter private boolean isStop; + @Setter @Getter + private Date stopTime; public void init(EmailProperties emailProperties,SpectrumServiceQuotes spectrumServiceQuotes, EmailCounter emailCounter,Date systemStartupTime){ @@ -64,6 +68,7 @@ public class EmailParsingActuator extends Thread{ if (isStop) { String nowDate = DateUtils.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss"); log.info(nowDate + " " +this.emailProperties.getName()+" EmailParsingActuator is Stop!"); + closeResource(); return; } long start = System.currentTimeMillis(); diff --git a/jeecg-server-cloud/armd-auto-process-start/src/main/java/org/jeecg/JeecgAutoProcessApplication.java b/jeecg-server-cloud/armd-auto-process-start/src/main/java/org/jeecg/JeecgAutoProcessApplication.java index e8389cec..c2ca5b2f 100644 --- a/jeecg-server-cloud/armd-auto-process-start/src/main/java/org/jeecg/JeecgAutoProcessApplication.java +++ b/jeecg-server-cloud/armd-auto-process-start/src/main/java/org/jeecg/JeecgAutoProcessApplication.java @@ -70,11 +70,11 @@ public class JeecgAutoProcessApplication extends SpringBootServletInitializer im public void run(String... args) throws Exception { //调用dll //Windows加载dll工具库 -// System.loadLibrary("ReadPHDFile"); -// System.loadLibrary("GammaAnaly"); + System.loadLibrary("ReadPHDFile"); + System.loadLibrary("GammaAnaly"); //Linux版本加载dll工具库 - System.load("/usr/local/jdk/lib/libReadPHDFile.so"); - System.load("/usr/local/jdk/lib/libGammaAnalyALG.so"); +// System.load("/usr/local/jdk/lib/libReadPHDFile.so"); +// System.load("/usr/local/jdk/lib/libGammaAnalyALG.so"); nuclLibService.getNuclideMap(); //根据配置文件配置邮件获取策略定义时间条件,默认EmailReceivePolicy.HISTORY_ORDER_RECEIVE.getPolicy() Date systemStartupTime = null;