Merge remote-tracking branch 'refs/remotes/origin/sleepDownload' into noFtp-seflStation
# Conflicts: # jeecg-boot-base-core/src/main/java/org/jeecg/common/email/EmailServiceManager.java # jeecg-module-auto-process/src/main/java/org/jeecg/modules/AutoProcessManager.java # jeecg-module-auto-process/src/main/java/org/jeecg/modules/EmailParsingActuator.java
This commit is contained in:
commit
0a560452bf
|
@ -1,16 +1,11 @@
|
|||
package org.jeecg.common.email;
|
||||
|
||||
import cn.hutool.core.io.FileUtil;
|
||||
import cn.hutool.core.util.ArrayUtil;
|
||||
import cn.hutool.core.util.ObjectUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.sun.mail.imap.IMAPStore;
|
||||
import com.sun.mail.smtp.SMTPAddressFailedException;
|
||||
import io.swagger.models.auth.In;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.io.Charsets;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.jeecg.common.api.dto.message.MessageDTO;
|
||||
import org.jeecg.common.constant.RedisConstant;
|
||||
|
@ -21,7 +16,6 @@ import org.jeecg.common.exception.DownloadEmailException;
|
|||
import org.jeecg.common.properties.SpectrumPathProperties;
|
||||
import org.jeecg.common.properties.TaskProperties;
|
||||
import org.jeecg.common.util.DateUtils;
|
||||
import org.jeecg.common.util.Md5Util;
|
||||
import org.jeecg.common.util.RedisUtil;
|
||||
import org.jeecg.modules.base.entity.postgre.SysEmail;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
@ -36,6 +30,8 @@ import java.io.*;
|
|||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
|
@ -70,6 +66,8 @@ public class EmailServiceManager {
|
|||
|
||||
private Object downloadEmlLocal = new Object();
|
||||
|
||||
private final ReentrantLock lock = new ReentrantLock();
|
||||
|
||||
@NotNull
|
||||
public static EmailServiceManager getInstance(){
|
||||
return new EmailServiceManager();
|
||||
|
@ -126,7 +124,7 @@ public class EmailServiceManager {
|
|||
/**
|
||||
* 接收邮件
|
||||
*/
|
||||
public Message[] receiveMail() throws MessagingException {
|
||||
public Message[] receiveMail() throws Exception {
|
||||
String status = EmailLogManager.STATUS_SUCCESS;
|
||||
try{
|
||||
//配置邮件服务属性
|
||||
|
@ -134,6 +132,8 @@ public class EmailServiceManager {
|
|||
properties.put("mail.store.protocol", "imap");
|
||||
properties.put("mail.imap.host", email.getEmailServerAddress());
|
||||
properties.put("mail.imap.port",email.getPort());
|
||||
properties.put("mail.imap.connectiontimeout", "3000"); // 设置连接超时时间为3秒
|
||||
properties.put("mail.imap.timeout", "3000"); // 设置读取超时时间为3秒
|
||||
if (email.getIsQiye() == 1) {
|
||||
properties.put("mail.imap.ssl.enable", "true");
|
||||
} else {
|
||||
|
@ -176,6 +176,7 @@ public class EmailServiceManager {
|
|||
return o1.getReceivedDate().compareTo(o2.getReceivedDate());
|
||||
} catch (MessagingException e) {
|
||||
e.printStackTrace();
|
||||
log.error(e.getMessage(), e);
|
||||
}
|
||||
return 0;
|
||||
});
|
||||
|
@ -185,7 +186,7 @@ public class EmailServiceManager {
|
|||
return Arrays.copyOfRange(messages,0,this.receiveNum-1);
|
||||
}
|
||||
}
|
||||
} catch (MessagingException e){
|
||||
} catch (Exception e){
|
||||
status = EmailLogManager.STATUS_ERROR;
|
||||
log.error("Email connection is abnormal, account is {}, service is {},the reason is {}.",email.getName(),email.getEmailServerAddress(),e.getMessage());
|
||||
throw e;
|
||||
|
@ -609,6 +610,105 @@ public class EmailServiceManager {
|
|||
}
|
||||
}
|
||||
|
||||
/*public File downloadEmailToEmlDir(@NotNull Message message, Integer emailCounter, Integer batchesCounter) throws MessagingException, IOException {
|
||||
AtomicReference<FileOutputStream> outputStream = new AtomicReference<>();
|
||||
CompletableFuture<File> future = CompletableFuture.supplyAsync(() -> {
|
||||
try {
|
||||
lock.lock(); // 获取锁
|
||||
// 执行需要获取锁才能进行的操作
|
||||
// return executeWithLock(message, emailCounter, batchesCounter);
|
||||
File file = executeWithLock(message, emailCounter, batchesCounter);
|
||||
outputStream.set(new FileOutputStream(file));
|
||||
a(outputStream,message);
|
||||
return null;
|
||||
} catch (MessagingException e) {
|
||||
throw new RuntimeException(e);
|
||||
} catch (FileNotFoundException e) {
|
||||
throw new RuntimeException(e);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
} finally {
|
||||
// 如果成功拿到锁 释放锁
|
||||
lock.unlock();
|
||||
}
|
||||
});
|
||||
|
||||
try {
|
||||
return future.get(5, TimeUnit.SECONDS);
|
||||
// return future.get(5, TimeUnit.SECONDS);// 等待任务执行,超过5秒则抛出TimeoutException
|
||||
} catch (InterruptedException | ExecutionException | TimeoutException e) {
|
||||
future.cancel(true); // 取消任务执行
|
||||
if (ObjectUtil.isNotNull(outputStream) && ObjectUtil.isNotNull(outputStream.get()))
|
||||
outputStream.get().close();
|
||||
log.error("下载 eml 执行超时", e);
|
||||
throw new RuntimeException("下载 eml 执行超时");
|
||||
}
|
||||
}*/
|
||||
public File executeWithLock(Message message,Integer emailCounter,Integer batchesCounter) throws MessagingException {
|
||||
|
||||
String subject = "";
|
||||
File emlFile = null;
|
||||
String status = EmailLogManager.STATUS_SUCCESS;
|
||||
Date receivedDate = null;
|
||||
try {
|
||||
// 获取锁 设置超时
|
||||
//获取发件人
|
||||
final String address = ((InternetAddress) message.getFrom()[0]).getAddress();
|
||||
final String from = address.substring(0,address.indexOf(StringConstant.AT));
|
||||
//获取主题
|
||||
subject = MimeUtility.decodeText(message.getSubject());
|
||||
if(subject.contains(StringConstant.SLASH)){
|
||||
subject = StringUtils.replace(subject,StringConstant.SLASH,"");
|
||||
}
|
||||
if(subject.contains(StringConstant.COLON)){
|
||||
subject = StringUtils.replace(subject,StringConstant.COLON,"");
|
||||
}
|
||||
receivedDate = message.getReceivedDate();
|
||||
StringBuilder fileName = new StringBuilder();
|
||||
fileName.append(from);
|
||||
fileName.append(StringConstant.UNDER_LINE);
|
||||
fileName.append(subject);
|
||||
fileName.append(StringConstant.UNDER_LINE);
|
||||
fileName.append(DateUtils.formatDate(new Date(),"YYMMdd"));
|
||||
fileName.append(StringConstant.UNDER_LINE);
|
||||
fileName.append(DateUtils.formatDate(new Date(),"HHmmssSSS"));
|
||||
fileName.append(StringConstant.UNDER_LINE);
|
||||
fileName.append("receive");
|
||||
fileName.append(StringConstant.UNDER_LINE);
|
||||
fileName.append(DateUtils.formatDate(receivedDate,"YYMMdd"));
|
||||
fileName.append(StringConstant.UNDER_LINE);
|
||||
fileName.append(DateUtils.formatDate(receivedDate,"HHmmssSSS"));
|
||||
fileName.append(StringConstant.UNDER_LINE);
|
||||
fileName.append(emailCounter);
|
||||
fileName.append(SAVE_EML_SUFFIX);
|
||||
final String rootPath = spectrumPathProperties.getRootPath();
|
||||
final String emlPath = spectrumPathProperties.getEmlPath();
|
||||
emlFile = new File(rootPath+emlPath+File.separator+fileName);
|
||||
// Thread.sleep(6000l);
|
||||
// try(FileOutputStream outputStream = new FileOutputStream(emlFile)) {
|
||||
// message.writeTo(outputStream);
|
||||
// } catch (IOException e) {
|
||||
// throw new RuntimeException(e);
|
||||
// }
|
||||
} catch (MessagingException | IOException e) {
|
||||
// 下载邮件失败 抛出自定义邮件下载异常
|
||||
status = EmailLogManager.STATUS_ERROR;
|
||||
String errorMsg = StrUtil.format("The email download failed, the subject of the email is {}, the reason is {}.", subject, e.getMessage());
|
||||
log.error(errorMsg);
|
||||
throw new DownloadEmailException(errorMsg);
|
||||
} catch (Exception e) {
|
||||
log.error("",e);
|
||||
} finally {
|
||||
EmailLogEvent event = new EmailLogEvent(batchesCounter,Thread.currentThread().getId(),EmailLogManager.GS_TYPE_GET,status,EmailLogManager.GETIDEML,subject,DateUtils.formatDate(receivedDate,"yyyy-MM-dd HH:mm:ss:SSS"),
|
||||
(Objects.isNull(emlFile)?" ":emlFile.getAbsolutePath()));
|
||||
EmailLogManager.getInstance().offer(Thread.currentThread().getId(),event);
|
||||
}
|
||||
return emlFile;
|
||||
}
|
||||
|
||||
public void a(AtomicReference<FileOutputStream> outputStream, Message message) throws MessagingException, IOException {
|
||||
message.writeTo(outputStream.get());
|
||||
}
|
||||
/**
|
||||
* 删除邮件
|
||||
* @param message
|
||||
|
@ -622,11 +722,12 @@ public class EmailServiceManager {
|
|||
subject = MimeUtility.decodeText(message.getSubject());
|
||||
receivedDate = message.getReceivedDate();
|
||||
message.setFlag(Flags.Flag.DELETED,true);
|
||||
// log.info("EmailServiceManager: Remove Email:{},receiveTime:{}",message.getSubject(), DateUtils.formatDate(message.getReceivedDate(),"yyyy-MM-dd HH:mm:ss"));
|
||||
// log.info("EmailServiceManager: Remove Email:{},receiveTime:{}",message.getSubject(), DateUtils.formatDate(message.getReceivedDate(),"yyyy-MM-dd HH:mm:ss"));
|
||||
} catch (MessagingException | UnsupportedEncodingException e) {
|
||||
status = EmailLogManager.STATUS_ERROR;
|
||||
log.error("Email deletion failed, the subject of the email is :{}, the reason is :{}.",subject,e.getMessage());
|
||||
e.printStackTrace();
|
||||
log.error(e.getMessage(), e);
|
||||
}finally {
|
||||
EmailLogEvent removeEvent = new EmailLogEvent(batchesCounter,Thread.currentThread().getId(),EmailLogManager.GS_TYPE_GET,status,EmailLogManager.DELETEID,subject,DateUtils.formatDate(receivedDate,"yyyy-MM-dd HH:mm:ss:SSS"));
|
||||
EmailLogManager.getInstance().offer(Thread.currentThread().getId(),removeEvent);
|
||||
|
@ -648,6 +749,7 @@ public class EmailServiceManager {
|
|||
if(null != store){
|
||||
store.close();
|
||||
}
|
||||
log.info(Thread.currentThread().getName() + ",EmailServiceManage资源关闭完成.");
|
||||
// for(String messageId : messageIds){
|
||||
// String key = RedisConstant.EMAIL_MSG_ID+StringConstant.COLON+messageId;
|
||||
// redisUtil.del(key);
|
||||
|
@ -655,6 +757,7 @@ public class EmailServiceManager {
|
|||
} catch (MessagingException e) {
|
||||
log.error("Email closure failed, email address is: {}, reason is: {}",email.getUsername(),e.getMessage());
|
||||
e.printStackTrace();
|
||||
log.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -668,7 +771,7 @@ public class EmailServiceManager {
|
|||
try {
|
||||
String key = RedisConstant.EMAIL_MSG_ID+StringConstant.COLON+messageId;
|
||||
int numberKey = redisUtil.get(key) != null? (int) redisUtil.get(key):0;
|
||||
// exist = redisUtil.hasKey(key);
|
||||
// exist = redisUtil.hasKey(key);
|
||||
if(numberKey >= taskProperties.getForceDeletedNumber()){
|
||||
exist = true;
|
||||
log.info("Check: Remove Email:{},receiveTime:{}",message.getSubject(), DateUtils.formatDate(message.getReceivedDate(),"yyyy-MM-dd HH:mm:ss"));
|
||||
|
|
|
@ -15,7 +15,6 @@ import org.jeecg.modules.spectrum.SpectrumServiceQuotes;
|
|||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
import java.io.*;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
@ -120,6 +119,7 @@ public class AutoProcessManager{
|
|||
TimeUnit.MILLISECONDS.sleep(sleepTime);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
log.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -158,7 +158,7 @@ public class AutoProcessManager{
|
|||
//如果这时邮箱线程里已有执行的线程则设置停止标记
|
||||
if(emailExecThreadMap.containsKey(email.getId())){
|
||||
EmailParsingActuator actuator = emailExecThreadMap.get(email.getId());
|
||||
actuator.setStop(true);
|
||||
actuator.setThreadSleep(true);
|
||||
actuator.setStopTime(new Date());
|
||||
}
|
||||
log.info("{}邮箱测试连接失败,emailMap删除此邮箱数据,emailExecThreadMap设置线程停止标记",email.getUsername());
|
||||
|
@ -169,6 +169,7 @@ public class AutoProcessManager{
|
|||
//捕获异常不处理保障线程异常不退出
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
log.error(e.getMessage(), e);
|
||||
}
|
||||
long end = System.currentTimeMillis();
|
||||
long sleepTime = taskProperties.getMonitoringMailCommStatusCycle() - (end-start);
|
||||
|
@ -178,6 +179,7 @@ public class AutoProcessManager{
|
|||
TimeUnit.MILLISECONDS.sleep(sleepTime);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
log.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -235,7 +237,7 @@ public class AutoProcessManager{
|
|||
if(testFlag){
|
||||
if (emailExecThreadMap.containsKey(databaseEmail.getId())) {
|
||||
EmailParsingActuator actuator = emailExecThreadMap.get(databaseEmail.getId());
|
||||
actuator.setStop(false);
|
||||
actuator.setThreadSleep(false);
|
||||
log.info("{}邮箱重新加入监测队列",databaseEmail.getUsername());
|
||||
} else {
|
||||
databaseEmail.setNewEmailFlag(true);
|
||||
|
@ -251,6 +253,7 @@ public class AutoProcessManager{
|
|||
//捕获异常不处理保障线程异常不退出
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
log.error(e.getMessage(), e);
|
||||
}
|
||||
long end = System.currentTimeMillis();
|
||||
long sleepTime = taskProperties.getMonitoringMailDataCycle() - (end-start);
|
||||
|
@ -260,6 +263,7 @@ public class AutoProcessManager{
|
|||
TimeUnit.MILLISECONDS.sleep(sleepTime);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
log.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -304,6 +308,7 @@ public class AutoProcessManager{
|
|||
}
|
||||
}catch (Exception e){
|
||||
e.printStackTrace();
|
||||
log.error(e.getMessage(), e);
|
||||
}finally {
|
||||
if(emailExecThread.getState() != State.TERMINATED && emailExecThread.isStop()){
|
||||
final long nowTime = System.currentTimeMillis();
|
||||
|
@ -325,6 +330,7 @@ public class AutoProcessManager{
|
|||
TimeUnit.MILLISECONDS.sleep(sleepTime);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
log.error(e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,6 +36,8 @@ public class EmailParsingActuator extends Thread{
|
|||
@Setter @Getter
|
||||
private boolean isStop;
|
||||
@Setter @Getter
|
||||
private boolean threadSleep;
|
||||
@Setter @Getter
|
||||
private Date stopTime;
|
||||
|
||||
public void init(EmailProperties emailProperties,SpectrumServiceQuotes spectrumServiceQuotes,
|
||||
|
@ -62,8 +64,17 @@ public class EmailParsingActuator extends Thread{
|
|||
@Override
|
||||
public void run() {
|
||||
for(;;){
|
||||
String nowDate = DateUtils.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss");
|
||||
if (threadSleep) {
|
||||
log.info(nowDate + " " +this.emailProperties.getName()+" EmailParsingActuator is sleep!");
|
||||
try {
|
||||
Thread.sleep(1000L);
|
||||
} catch (InterruptedException e) {
|
||||
log.error("Thread sleep error");
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if (isStop) {
|
||||
String nowDate = DateUtils.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss");
|
||||
log.info(nowDate + " " +this.emailProperties.getName()+" EmailParsingActuator is Stop!");
|
||||
closeResource();
|
||||
return;
|
||||
|
@ -75,18 +86,16 @@ public class EmailParsingActuator extends Thread{
|
|||
List<String> messageIds = new ArrayList<>();
|
||||
try {
|
||||
Message[] messages = emailServiceManager.receiveMail();
|
||||
log.info("EmailParsingActuator本次{}获取邮件数量为:{}", Thread.currentThread().getName(), ArrayUtils.isEmpty(messages) ? 0 : messages.length);
|
||||
if(ArrayUtils.isNotEmpty(messages)){
|
||||
log.info("EmailParsingActuator本次{}获取邮件数量为:{}", Thread.currentThread().getName(), messages.length);
|
||||
//检验获取的邮件是否在之前删除失败列表中,若在直接调用邮件API删除,并且此次数组里元素也删除
|
||||
for(int i=messages.length-1;i>=0;i--){
|
||||
if (null == messages[i].getHeader("Message-ID")) {
|
||||
System.out.println("Message ID是空值信息!!!!!!!");
|
||||
messages = ArrayUtils.remove(messages, i);
|
||||
continue;
|
||||
}
|
||||
if (!messages[i].isExpunged()){
|
||||
String messageId = ((MimeMessage) messages[i]).getMessageID();
|
||||
System.out.println("正常获取到的Message ID是:"+messageId);
|
||||
final boolean exist = emailServiceManager.check(messages[i],messageId);
|
||||
messageIds.add(messageId);
|
||||
if(exist){
|
||||
|
@ -106,15 +115,15 @@ public class EmailParsingActuator extends Thread{
|
|||
poolExecutor.execute(spectrumParsingActuator);
|
||||
}
|
||||
taskLatch.await();
|
||||
log.info("EmailParsingActuator本次{}封邮件处理完成", messages.length);
|
||||
}
|
||||
}
|
||||
} catch (MessagingException e) {
|
||||
System.out.println("捕获MessagingException!!!!!!!!");
|
||||
closeResource();
|
||||
throw new RuntimeException(e);
|
||||
} catch (Exception e) {
|
||||
log.error("EmailParsingActuator has exception: {}", e.getMessage());
|
||||
log.info("Mail-Parsing线程池资源关闭...");
|
||||
closeResource();
|
||||
log.error(""+e);
|
||||
log.error(e.getMessage(), e);
|
||||
throw new RuntimeException(e);
|
||||
} finally {
|
||||
//清除本批次邮件日志缓存
|
||||
EmailLogManager.getInstance().clear();
|
||||
|
@ -134,6 +143,8 @@ public class EmailParsingActuator extends Thread{
|
|||
TimeUnit.MILLISECONDS.sleep(sleepTime);
|
||||
} catch (InterruptedException e) {
|
||||
e.printStackTrace();
|
||||
log.error(e.getMessage(), e);
|
||||
throw new RuntimeException();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -84,7 +84,8 @@ public class GardsSampleDataServiceImpl extends ServiceImpl<GardsSampleDataMappe
|
|||
LambdaQueryWrapper<GardsSampleData> queryWrapper = new LambdaQueryWrapper<>();
|
||||
queryWrapper.eq(GardsSampleData::getInputFileName,inputFileName);
|
||||
final GardsSampleData sampleData = this.getOne(queryWrapper);
|
||||
if(Objects.nonNull(sampleData) && !SampleStatus.COMPLETE.getValue().equals(sampleData.getStatus())){
|
||||
if(Objects.nonNull(sampleData) && !SampleStatus.COMPLETE.getValue().equals(sampleData.getStatus())
|
||||
&& !SampleStatus.INTERACTIVE.getValue().equals(sampleData.getStatus())){
|
||||
this.baseMapper.updateStatus(status,inputFileName);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -316,7 +316,7 @@ public abstract class AbstractSpectrumHandler extends AbstractChain {
|
|||
ex.printStackTrace();
|
||||
}
|
||||
} else if(SpectrumSource.FROM_FILE_SOURCE.getSourceType().equals(spectrumSource) && (e instanceof FileRepeatException)){
|
||||
this.spectrumFile.delete();
|
||||
//this.spectrumFile.delete(); // TODO 删除原始谱文件
|
||||
} else if (SpectrumSource.FORM_FILE_UNDEL.getSourceType().equals(spectrumSource) && !(e instanceof FileRepeatException)) {
|
||||
try {
|
||||
if (isDateFormatErr) {
|
||||
|
|
Loading…
Reference in New Issue
Block a user