Compare commits

...

5 Commits

Author SHA1 Message Date
nieziyan
897f469ea9 fix:增加谱文件处理状态判断 2024-06-06 15:42:06 +08:00
nieziyan
3d5080c1d8 fix:AutoProcess多线程问题 2024-06-05 09:42:20 +08:00
nieziyan
97f4aae3ab fix:自动处理debug 2024-06-04 09:23:21 +08:00
orgin
11681f12db 连接超时之后不中断线程 2024-05-31 09:15:57 +08:00
orgin
9151233f11 连接超时之后不中断线程 2024-05-30 16:48:46 +08:00
5 changed files with 130 additions and 20 deletions

View File

@ -1,16 +1,13 @@
package org.jeecg.common.email;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.ArrayUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.RandomUtil;
import cn.hutool.core.util.StrUtil;
import com.google.common.collect.Lists;
import com.sun.mail.imap.IMAPStore;
import com.sun.mail.smtp.SMTPAddressFailedException;
import io.swagger.models.auth.In;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.Charsets;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.jeecg.common.api.dto.message.MessageDTO;
import org.jeecg.common.constant.RedisConstant;
@ -21,7 +18,6 @@ import org.jeecg.common.exception.DownloadEmailException;
import org.jeecg.common.properties.SpectrumPathProperties;
import org.jeecg.common.properties.TaskProperties;
import org.jeecg.common.util.DateUtils;
import org.jeecg.common.util.Md5Util;
import org.jeecg.common.util.RedisUtil;
import org.jeecg.modules.base.entity.postgre.SysEmail;
import org.jetbrains.annotations.NotNull;
@ -36,6 +32,9 @@ import java.io.*;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
/**
@ -70,6 +69,8 @@ public class EmailServiceManager {
private Object downloadEmlLocal = new Object();
private final ReentrantLock lock = new ReentrantLock();
@NotNull
public static EmailServiceManager getInstance(){
return new EmailServiceManager();
@ -609,6 +610,105 @@ public class EmailServiceManager {
}
}
/*public File downloadEmailToEmlDir(@NotNull Message message, Integer emailCounter, Integer batchesCounter) throws MessagingException, IOException {
AtomicReference<FileOutputStream> outputStream = new AtomicReference<>();
CompletableFuture<File> future = CompletableFuture.supplyAsync(() -> {
try {
lock.lock(); // 获取锁
// 执行需要获取锁才能进行的操作
// return executeWithLock(message, emailCounter, batchesCounter);
File file = executeWithLock(message, emailCounter, batchesCounter);
outputStream.set(new FileOutputStream(file));
a(outputStream,message);
return null;
} catch (MessagingException e) {
throw new RuntimeException(e);
} catch (FileNotFoundException e) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
// 如果成功拿到锁 释放锁
lock.unlock();
}
});
try {
return future.get(5, TimeUnit.SECONDS);
// return future.get(5, TimeUnit.SECONDS);// 等待任务执行超过5秒则抛出TimeoutException
} catch (InterruptedException | ExecutionException | TimeoutException e) {
future.cancel(true); // 取消任务执行
if (ObjectUtil.isNotNull(outputStream) && ObjectUtil.isNotNull(outputStream.get()))
outputStream.get().close();
log.error("下载 eml 执行超时", e);
throw new RuntimeException("下载 eml 执行超时");
}
}*/
public File executeWithLock(Message message,Integer emailCounter,Integer batchesCounter) throws MessagingException {
String subject = "";
File emlFile = null;
String status = EmailLogManager.STATUS_SUCCESS;
Date receivedDate = null;
try {
// 获取锁 设置超时
//获取发件人
final String address = ((InternetAddress) message.getFrom()[0]).getAddress();
final String from = address.substring(0,address.indexOf(StringConstant.AT));
//获取主题
subject = MimeUtility.decodeText(message.getSubject());
if(subject.contains(StringConstant.SLASH)){
subject = StringUtils.replace(subject,StringConstant.SLASH,"");
}
if(subject.contains(StringConstant.COLON)){
subject = StringUtils.replace(subject,StringConstant.COLON,"");
}
receivedDate = message.getReceivedDate();
StringBuilder fileName = new StringBuilder();
fileName.append(from);
fileName.append(StringConstant.UNDER_LINE);
fileName.append(subject);
fileName.append(StringConstant.UNDER_LINE);
fileName.append(DateUtils.formatDate(new Date(),"YYMMdd"));
fileName.append(StringConstant.UNDER_LINE);
fileName.append(DateUtils.formatDate(new Date(),"HHmmssSSS"));
fileName.append(StringConstant.UNDER_LINE);
fileName.append("receive");
fileName.append(StringConstant.UNDER_LINE);
fileName.append(DateUtils.formatDate(receivedDate,"YYMMdd"));
fileName.append(StringConstant.UNDER_LINE);
fileName.append(DateUtils.formatDate(receivedDate,"HHmmssSSS"));
fileName.append(StringConstant.UNDER_LINE);
fileName.append(emailCounter);
fileName.append(SAVE_EML_SUFFIX);
final String rootPath = spectrumPathProperties.getRootPath();
final String emlPath = spectrumPathProperties.getEmlPath();
emlFile = new File(rootPath+emlPath+File.separator+fileName);
// Thread.sleep(6000l);
// try(FileOutputStream outputStream = new FileOutputStream(emlFile)) {
// message.writeTo(outputStream);
// } catch (IOException e) {
// throw new RuntimeException(e);
// }
} catch (MessagingException | IOException e) {
// 下载邮件失败 抛出自定义邮件下载异常
status = EmailLogManager.STATUS_ERROR;
String errorMsg = StrUtil.format("The email download failed, the subject of the email is {}, the reason is {}.", subject, e.getMessage());
log.error(errorMsg);
throw new DownloadEmailException(errorMsg);
} catch (Exception e) {
log.error("",e);
} finally {
EmailLogEvent event = new EmailLogEvent(batchesCounter,Thread.currentThread().getId(),EmailLogManager.GS_TYPE_GET,status,EmailLogManager.GETIDEML,subject,DateUtils.formatDate(receivedDate,"yyyy-MM-dd HH:mm:ss:SSS"),
(Objects.isNull(emlFile)?" ":emlFile.getAbsolutePath()));
EmailLogManager.getInstance().offer(Thread.currentThread().getId(),event);
}
return emlFile;
}
public void a(AtomicReference<FileOutputStream> outputStream, Message message) throws MessagingException, IOException {
message.writeTo(outputStream.get());
}
/**
* 删除邮件
* @param message
@ -622,7 +722,7 @@ public class EmailServiceManager {
subject = MimeUtility.decodeText(message.getSubject());
receivedDate = message.getReceivedDate();
message.setFlag(Flags.Flag.DELETED,true);
// log.info("EmailServiceManager: Remove Email:{},receiveTime:{}",message.getSubject(), DateUtils.formatDate(message.getReceivedDate(),"yyyy-MM-dd HH:mm:ss"));
// log.info("EmailServiceManager: Remove Email:{},receiveTime:{}",message.getSubject(), DateUtils.formatDate(message.getReceivedDate(),"yyyy-MM-dd HH:mm:ss"));
} catch (MessagingException | UnsupportedEncodingException e) {
status = EmailLogManager.STATUS_ERROR;
log.error("Email deletion failed, the subject of the email is :{}, the reason is :{}.",subject,e.getMessage());
@ -648,6 +748,7 @@ public class EmailServiceManager {
if(null != store){
store.close();
}
log.info("EmailServiceManage资源关闭完成.");
// for(String messageId : messageIds){
// String key = RedisConstant.EMAIL_MSG_ID+StringConstant.COLON+messageId;
// redisUtil.del(key);
@ -668,7 +769,7 @@ public class EmailServiceManager {
try {
String key = RedisConstant.EMAIL_MSG_ID+StringConstant.COLON+messageId;
int numberKey = redisUtil.get(key) != null? (int) redisUtil.get(key):0;
// exist = redisUtil.hasKey(key);
// exist = redisUtil.hasKey(key);
if(numberKey >= taskProperties.getForceDeletedNumber()){
exist = true;
log.info("Check: Remove Email:{},receiveTime:{}",message.getSubject(), DateUtils.formatDate(message.getReceivedDate(),"yyyy-MM-dd HH:mm:ss"));

View File

@ -1,5 +1,6 @@
package org.jeecg.modules;
import cn.hutool.core.util.RandomUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.constant.RedisConstant;
@ -15,7 +16,6 @@ import org.jeecg.modules.spectrum.SpectrumServiceQuotes;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import java.io.*;
import java.util.*;
import java.util.concurrent.TimeUnit;
@ -158,7 +158,7 @@ public class AutoProcessManager{
//如果这时邮箱线程里已有执行的线程则设置停止标记
if(emailExecThreadMap.containsKey(email.getId())){
EmailParsingActuator actuator = emailExecThreadMap.get(email.getId());
actuator.setStop(true);
actuator.setThreadSleep(true);
actuator.setStopTime(new Date());
}
log.info("{}邮箱测试连接失败emailMap删除此邮箱数据emailExecThreadMap设置线程停止标记",email.getUsername());
@ -235,7 +235,7 @@ public class AutoProcessManager{
if(testFlag){
if (emailExecThreadMap.containsKey(databaseEmail.getId())) {
EmailParsingActuator actuator = emailExecThreadMap.get(databaseEmail.getId());
actuator.setStop(false);
actuator.setThreadSleep(false);
log.info("{}邮箱重新加入监测队列",databaseEmail.getUsername());
} else {
databaseEmail.setNewEmailFlag(true);

View File

@ -36,6 +36,8 @@ public class EmailParsingActuator extends Thread{
@Setter @Getter
private boolean isStop;
@Setter @Getter
private boolean threadSleep;
@Setter @Getter
private Date stopTime;
public void init(EmailProperties emailProperties,SpectrumServiceQuotes spectrumServiceQuotes,
@ -62,8 +64,17 @@ public class EmailParsingActuator extends Thread{
@Override
public void run() {
for(;;){
String nowDate = DateUtils.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss");
if (threadSleep) {
log.info(nowDate + " " +this.emailProperties.getName()+" EmailParsingActuator is sleep!");
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
log.error("Thread sleep error");
}
continue;
}
if (isStop) {
String nowDate = DateUtils.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss");
log.info(nowDate + " " +this.emailProperties.getName()+" EmailParsingActuator is Stop!");
closeResource();
return;
@ -80,13 +91,11 @@ public class EmailParsingActuator extends Thread{
//检验获取的邮件是否在之前删除失败列表中若在直接调用邮件API删除并且此次数组里元素也删除
for(int i=messages.length-1;i>=0;i--){
if (null == messages[i].getHeader("Message-ID")) {
System.out.println("Message ID是空值信息");
messages = ArrayUtils.remove(messages, i);
continue;
}
if (!messages[i].isExpunged()){
String messageId = ((MimeMessage) messages[i]).getMessageID();
System.out.println("正常获取到的Message ID是"+messageId);
final boolean exist = emailServiceManager.check(messages[i],messageId);
messageIds.add(messageId);
if(exist){
@ -106,15 +115,14 @@ public class EmailParsingActuator extends Thread{
poolExecutor.execute(spectrumParsingActuator);
}
taskLatch.await();
log.info("EmailParsingActuator本次{}封邮件处理完成", messages.length);
}
}
} catch (MessagingException e) {
System.out.println("捕获MessagingException");
} catch (Exception e) {
log.error("EmailParsingActuator has exception: {}", e.getMessage());
log.info("Mail-Parsing线程池资源关闭...");
closeResource();
throw new RuntimeException(e);
} catch (Exception e) {
closeResource();
log.error(""+e);
} finally {
//清除本批次邮件日志缓存
EmailLogManager.getInstance().clear();

View File

@ -84,7 +84,8 @@ public class GardsSampleDataServiceImpl extends ServiceImpl<GardsSampleDataMappe
LambdaQueryWrapper<GardsSampleData> queryWrapper = new LambdaQueryWrapper<>();
queryWrapper.eq(GardsSampleData::getInputFileName,inputFileName);
final GardsSampleData sampleData = this.getOne(queryWrapper);
if(Objects.nonNull(sampleData) && !SampleStatus.COMPLETE.getValue().equals(sampleData.getStatus())){
if(Objects.nonNull(sampleData) && !SampleStatus.COMPLETE.getValue().equals(sampleData.getStatus())
&& !SampleStatus.INTERACTIVE.getValue().equals(sampleData.getStatus())){
this.baseMapper.updateStatus(status,inputFileName);
}
}

View File

@ -316,7 +316,7 @@ public abstract class AbstractSpectrumHandler extends AbstractChain {
ex.printStackTrace();
}
} else if(SpectrumSource.FROM_FILE_SOURCE.getSourceType().equals(spectrumSource) && (e instanceof FileRepeatException)){
this.spectrumFile.delete();
this.spectrumFile.delete(); // TODO 删除原始谱文件
} else if (SpectrumSource.FORM_FILE_UNDEL.getSourceType().equals(spectrumSource) && !(e instanceof FileRepeatException)) {
try {
if (isDateFormatErr) {