上传测试代码到新的分支onlyDownload,测试下载邮件问题
This commit is contained in:
parent
b0529f0f3d
commit
df16ed12a8
|
@ -66,6 +66,8 @@ public class EmailServiceManager {
|
|||
|
||||
private RedisUtil redisUtil;
|
||||
|
||||
private Object receive;
|
||||
|
||||
private Object downloadEmlLocal = new Object();
|
||||
|
||||
@NotNull
|
||||
|
@ -87,7 +89,7 @@ public class EmailServiceManager {
|
|||
*/
|
||||
public void init(SysEmail email, Integer receiveNum, String temporaryStoragePath,
|
||||
Date systemStartupTime, SpectrumPathProperties pathProperties,TaskProperties taskProperties,
|
||||
RedisUtil redisUtil){
|
||||
RedisUtil redisUtil, Object receive){
|
||||
this.email = email;
|
||||
this.receiveNum = receiveNum;
|
||||
this.temporaryStoragePath = temporaryStoragePath;
|
||||
|
@ -95,6 +97,7 @@ public class EmailServiceManager {
|
|||
this.spectrumPathProperties = pathProperties;
|
||||
this.taskProperties = taskProperties;
|
||||
this.redisUtil = redisUtil;
|
||||
this.receive = receive;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -125,77 +128,79 @@ public class EmailServiceManager {
|
|||
* 接收邮件
|
||||
*/
|
||||
public Message[] receiveMail() {
|
||||
String status = EmailLogManager.STATUS_SUCCESS;
|
||||
try{
|
||||
//配置邮件服务属性
|
||||
Properties properties = new Properties();
|
||||
properties.put("mail.store.protocol", "imap");
|
||||
properties.put("mail.imap.host", email.getEmailServerAddress());
|
||||
properties.put("mail.imap.port",email.getPort());
|
||||
if (email.getIsQiye() == 1) {
|
||||
properties.put("mail.imap.ssl.enable", "true");
|
||||
} else {
|
||||
properties.put("mail.imap.ssl.enable", "false");
|
||||
}
|
||||
|
||||
HashMap IAM = new HashMap();
|
||||
//带上IMAP ID信息,由key和value组成,例如name,version,vendor,support-email等。
|
||||
IAM.put("name","myname");
|
||||
IAM.put("version","1.0.0");
|
||||
IAM.put("vendor","myclient");
|
||||
IAM.put("support-email","testmail@test.com");
|
||||
|
||||
//获取邮件回话
|
||||
final Session session = Session.getDefaultInstance(properties);
|
||||
|
||||
|
||||
//获取smtp协议的存储对象
|
||||
store = (IMAPStore) session.getStore();
|
||||
//连接
|
||||
store.connect(email.getUsername(),email.getPassword());
|
||||
// 解决163普通邮箱无法建立连接问题
|
||||
store.id(IAM);
|
||||
//获取收件箱
|
||||
folder = store.getFolder("INBOX");//INBOX
|
||||
folder.open(Folder.READ_WRITE);
|
||||
//如果邮箱邮件数量 > 0
|
||||
final int messageCount = folder.getMessageCount();
|
||||
if(messageCount > 0){
|
||||
Message[] messages = null;
|
||||
if(Objects.isNull(this.systemStartupTime)){
|
||||
int finalNum = messageCount > this.receiveNum?this.receiveNum:messageCount;
|
||||
//邮箱邮件下标是从1开始的
|
||||
return folder.getMessages(1,finalNum);
|
||||
synchronized (receive) {
|
||||
String status = EmailLogManager.STATUS_SUCCESS;
|
||||
try{
|
||||
//配置邮件服务属性
|
||||
Properties properties = new Properties();
|
||||
properties.put("mail.store.protocol", "imap");
|
||||
properties.put("mail.imap.host", email.getEmailServerAddress());
|
||||
properties.put("mail.imap.port",email.getPort());
|
||||
if (email.getIsQiye() == 1) {
|
||||
properties.put("mail.imap.ssl.enable", "true");
|
||||
} else {
|
||||
properties.put("mail.imap.ssl.enable", "false");
|
||||
}
|
||||
SearchTerm searchTerm = new ReceivedDateTerm(ComparisonTerm.GE,this.systemStartupTime);
|
||||
messages = folder.search(searchTerm);
|
||||
Arrays.sort(messages, (o1, o2) -> {
|
||||
try {
|
||||
return o1.getReceivedDate().compareTo(o2.getReceivedDate());
|
||||
} catch (MessagingException e) {
|
||||
e.printStackTrace();
|
||||
|
||||
HashMap IAM = new HashMap();
|
||||
//带上IMAP ID信息,由key和value组成,例如name,version,vendor,support-email等。
|
||||
IAM.put("name","myname");
|
||||
IAM.put("version","1.0.0");
|
||||
IAM.put("vendor","myclient");
|
||||
IAM.put("support-email","testmail@test.com");
|
||||
|
||||
//获取邮件回话
|
||||
final Session session = Session.getDefaultInstance(properties);
|
||||
|
||||
|
||||
//获取smtp协议的存储对象
|
||||
store = (IMAPStore) session.getStore();
|
||||
//连接
|
||||
store.connect(email.getUsername(),email.getPassword());
|
||||
// 解决163普通邮箱无法建立连接问题
|
||||
store.id(IAM);
|
||||
//获取收件箱
|
||||
folder = store.getFolder("INBOX");//INBOX
|
||||
folder.open(Folder.READ_WRITE);
|
||||
//如果邮箱邮件数量 > 0
|
||||
final int messageCount = folder.getMessageCount();
|
||||
if(messageCount > 0){
|
||||
Message[] messages = null;
|
||||
if(Objects.isNull(this.systemStartupTime)){
|
||||
int finalNum = messageCount > this.receiveNum?this.receiveNum:messageCount;
|
||||
//邮箱邮件下标是从1开始的
|
||||
return folder.getMessages(1,finalNum);
|
||||
}
|
||||
SearchTerm searchTerm = new ReceivedDateTerm(ComparisonTerm.GE,this.systemStartupTime);
|
||||
messages = folder.search(searchTerm);
|
||||
Arrays.sort(messages, (o1, o2) -> {
|
||||
try {
|
||||
return o1.getReceivedDate().compareTo(o2.getReceivedDate());
|
||||
} catch (MessagingException e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
return 0;
|
||||
});
|
||||
if(this.receiveNum >= messages.length){
|
||||
return messages;
|
||||
}else{
|
||||
return Arrays.copyOfRange(messages,0,this.receiveNum-1);
|
||||
}
|
||||
return 0;
|
||||
});
|
||||
if(this.receiveNum >= messages.length){
|
||||
return messages;
|
||||
}else{
|
||||
return Arrays.copyOfRange(messages,0,this.receiveNum-1);
|
||||
}
|
||||
}catch (MessagingException e){
|
||||
status = EmailLogManager.STATUS_ERROR;
|
||||
log.error("Email connection is abnormal, account is {}, service is {},the reason is {}.",email.getName(),email.getEmailServerAddress(),e.getMessage());
|
||||
e.printStackTrace();
|
||||
return null;
|
||||
}finally {
|
||||
EmailLogEvent connectEvent = new EmailLogEvent(EmailLogManager.GS_TYPE_GET,email,status,EmailLogManager.CONNECT);
|
||||
EmailLogManager.getInstance().setConnectLogEvent(connectEvent);
|
||||
//GetAllId C++原业务是把远程邮箱邮件同步到C++,本次java编写没有这一步,所以和Connect绑定,若Connect成功则GetAllId成功
|
||||
EmailLogEvent getAllEvent = new EmailLogEvent(EmailLogManager.GS_TYPE_GET,status,EmailLogManager.GETALLID);
|
||||
EmailLogManager.getInstance().setGetAllIdLogEvent(getAllEvent);
|
||||
}
|
||||
}catch (MessagingException e){
|
||||
status = EmailLogManager.STATUS_ERROR;
|
||||
log.error("Email connection is abnormal, account is {}, service is {},the reason is {}.",email.getName(),email.getEmailServerAddress(),e.getMessage());
|
||||
e.printStackTrace();
|
||||
return null;
|
||||
}finally {
|
||||
EmailLogEvent connectEvent = new EmailLogEvent(EmailLogManager.GS_TYPE_GET,email,status,EmailLogManager.CONNECT);
|
||||
EmailLogManager.getInstance().setConnectLogEvent(connectEvent);
|
||||
//GetAllId C++原业务是把远程邮箱邮件同步到C++,本次java编写没有这一步,所以和Connect绑定,若Connect成功则GetAllId成功
|
||||
EmailLogEvent getAllEvent = new EmailLogEvent(EmailLogManager.GS_TYPE_GET,status,EmailLogManager.GETALLID);
|
||||
EmailLogManager.getInstance().setGetAllIdLogEvent(getAllEvent);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -532,7 +537,7 @@ public class EmailServiceManager {
|
|||
File emlFile = null;
|
||||
String status = EmailLogManager.STATUS_SUCCESS;
|
||||
Date receivedDate = null;
|
||||
FileOutputStream outputStream = null;
|
||||
BufferedOutputStream outputStream = null;
|
||||
try {
|
||||
//获取发件人
|
||||
final String address = ((InternetAddress) message.getFrom()[0]).getAddress();
|
||||
|
@ -566,9 +571,10 @@ public class EmailServiceManager {
|
|||
final String rootPath = spectrumPathProperties.getRootPath();
|
||||
final String emlPath = spectrumPathProperties.getEmlPath();
|
||||
emlFile = new File(rootPath+emlPath+File.separator+fileName);
|
||||
outputStream = new FileOutputStream(emlFile);
|
||||
outputStream = new BufferedOutputStream(new FileOutputStream(emlFile));
|
||||
message.writeTo(outputStream);
|
||||
|
||||
// 刷新缓冲区内容存入内存
|
||||
outputStream.flush();
|
||||
// int bufferSize = 1024 * 1024; // 1M
|
||||
// InputStream inputStream = message.getInputStream();
|
||||
// BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream, bufferSize);
|
||||
|
|
|
@ -37,6 +37,10 @@ public class AutoProcessManager{
|
|||
* 邮件Map数据锁
|
||||
*/
|
||||
private final Object lock = new Object();
|
||||
/**
|
||||
* 邮箱接收邮件锁
|
||||
*/
|
||||
private final Object receive = new Object();
|
||||
/**
|
||||
* 以邮件Id为key,邮件信息为value
|
||||
*/
|
||||
|
@ -99,7 +103,7 @@ public class AutoProcessManager{
|
|||
boolean testFlag = emailServiceManager.testConnectEmailServer();
|
||||
if(testFlag){
|
||||
EmailParsingActuator emailParsingActuator = new EmailParsingActuator();
|
||||
emailParsingActuator.init(next,spectrumServiceQuotes,emailCounter,systemStartupTime);
|
||||
emailParsingActuator.init(next,spectrumServiceQuotes,emailCounter,systemStartupTime, receive);
|
||||
emailParsingActuator.setName(next.getUsername()+"-email-monitor");
|
||||
emailParsingActuator.start();
|
||||
//把邮件监测执行线程加入管理队列
|
||||
|
|
|
@ -36,18 +36,20 @@ public class EmailParsingActuator extends Thread{
|
|||
private SpectrumServiceQuotes spectrumServiceQuotes;
|
||||
private EmailCounter emailCounter;
|
||||
private Date systemStartupTime;
|
||||
private Object receive;
|
||||
@Setter @Getter
|
||||
private boolean isStop;
|
||||
@Setter @Getter
|
||||
private Date stopTime;
|
||||
|
||||
public void init(EmailProperties emailProperties,SpectrumServiceQuotes spectrumServiceQuotes,
|
||||
EmailCounter emailCounter,Date systemStartupTime){
|
||||
EmailCounter emailCounter,Date systemStartupTime, Object receive){
|
||||
this.emailProperties = emailProperties;
|
||||
this.spectrumServiceQuotes = spectrumServiceQuotes;
|
||||
this.taskProperties = spectrumServiceQuotes.getTaskProperties();
|
||||
this.emailCounter = emailCounter;
|
||||
this.systemStartupTime = systemStartupTime;
|
||||
this.receive = receive;
|
||||
|
||||
//获取机器可用核心数
|
||||
int systemCores = spectrumServiceQuotes.getMaximumPoolSizeProperties().getAuto();
|
||||
|
@ -74,7 +76,7 @@ public class EmailParsingActuator extends Thread{
|
|||
long start = System.currentTimeMillis();
|
||||
final EmailServiceManager emailServiceManager = EmailServiceManager.getInstance();
|
||||
emailServiceManager.init(this.emailProperties,this.taskProperties.getReceiveNum(),this.taskProperties.getTemporaryStoragePath(),
|
||||
this.systemStartupTime, spectrumServiceQuotes.getSpectrumPathProperties(), spectrumServiceQuotes.getTaskProperties(), spectrumServiceQuotes.getRedisUtil());
|
||||
this.systemStartupTime, spectrumServiceQuotes.getSpectrumPathProperties(), spectrumServiceQuotes.getTaskProperties(), spectrumServiceQuotes.getRedisUtil(), receive);
|
||||
List<String> messageIds = new ArrayList<>();
|
||||
try {
|
||||
Message[] messages = emailServiceManager.receiveMail();
|
||||
|
@ -84,11 +86,11 @@ public class EmailParsingActuator extends Thread{
|
|||
for(int i=messages.length-1;i>=0;i--){
|
||||
if (!messages[i].isExpunged()){
|
||||
String messageId = ((MimeMessage) messages[i]).getMessageID();
|
||||
final boolean exist = emailServiceManager.check(messages[i],messageId);
|
||||
// final boolean exist = emailServiceManager.check(messages[i],messageId);
|
||||
messageIds.add(messageId);
|
||||
if(exist){
|
||||
messages = ArrayUtils.remove(messages,i);
|
||||
}
|
||||
// if(exist){
|
||||
// messages = ArrayUtils.remove(messages,i);
|
||||
// }
|
||||
}
|
||||
}
|
||||
log.info("EmailParsingActuator本次真实执行邮件数量为:{}",messages.length);
|
||||
|
|
|
@ -110,47 +110,47 @@ public class SpectrumParsingActuator implements Runnable{
|
|||
//保存邮件日志到PG数据库
|
||||
this.spectrumServiceQuotes.getMailLogService().create(message,emailProperties);
|
||||
|
||||
//获取邮件内容
|
||||
StringBuilder mailContent = new StringBuilder();
|
||||
if(Objects.nonNull(emlFile) && emlFile.length() > 0){
|
||||
mailContent.append(FileUtil.readUtf8String(emlFile));
|
||||
}
|
||||
|
||||
//判断是否是IMS2.0协议文件
|
||||
// 如果邮件内容校验成功 将文件保存到eml目录 并删除邮件对象
|
||||
if(checkMailContent(mailContent,subject)){
|
||||
AbstractSpectrumHandler spectrumHandler = new SamplephdSpectrum();
|
||||
spectrumHandler.init(mailContent.toString(),emlFile.getName(),spectrumServiceQuotes,new StringBuilder(),SpectrumSource.FORM_EMAIL_SERVICE.getSourceType(),batchesCounter);
|
||||
final boolean matchResult = spectrumHandler.saveEmailToLocal();
|
||||
if(matchResult){
|
||||
try {
|
||||
//开始解析
|
||||
spectrumHandler.handler();
|
||||
spectrumServiceQuotes.getRedisUtil().del(key);
|
||||
} catch (Exception e) {
|
||||
//如果是gamma谱的分析异常
|
||||
if (e instanceof AnalySpectrumException) {
|
||||
// 如果邮件内容校验失败(邮件内容不完整) 将错误邮件从eml移动到emlError
|
||||
if (Objects.nonNull(emlFile) && emlFile.exists()){
|
||||
moveEmail(emlFile, key);
|
||||
}
|
||||
//删除邮件
|
||||
emailServiceManager.removeMail(message,batchesCounter);
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
}else{
|
||||
log.warn("This email {} parsing failed and is not listed in the Met, Alert, SOH, Sample, Detbkphd, QC, Gasbkphd spectra.",subject);
|
||||
}
|
||||
emailServiceManager.removeMail(message,batchesCounter);
|
||||
}else {
|
||||
// 如果邮件内容校验失败(邮件内容不完整) 将错误邮件从eml移动到emlError
|
||||
if (Objects.nonNull(emlFile) && emlFile.exists()){
|
||||
moveEmail(emlFile, key);
|
||||
throw new DownloadEmailException("邮件移走后手动抛出DownloadEmailException");
|
||||
}
|
||||
}
|
||||
// //获取邮件内容
|
||||
// StringBuilder mailContent = new StringBuilder();
|
||||
// if(Objects.nonNull(emlFile) && emlFile.length() > 0){
|
||||
// mailContent.append(FileUtil.readUtf8String(emlFile));
|
||||
// }
|
||||
//
|
||||
// //判断是否是IMS2.0协议文件
|
||||
// // 如果邮件内容校验成功 将文件保存到eml目录 并删除邮件对象
|
||||
// if(checkMailContent(mailContent,subject)){
|
||||
// AbstractSpectrumHandler spectrumHandler = new SamplephdSpectrum();
|
||||
// spectrumHandler.init(mailContent.toString(),emlFile.getName(),spectrumServiceQuotes,new StringBuilder(),SpectrumSource.FORM_EMAIL_SERVICE.getSourceType(),batchesCounter);
|
||||
// final boolean matchResult = spectrumHandler.saveEmailToLocal();
|
||||
// if(matchResult){
|
||||
// try {
|
||||
// //开始解析
|
||||
// spectrumHandler.handler();
|
||||
// spectrumServiceQuotes.getRedisUtil().del(key);
|
||||
// } catch (Exception e) {
|
||||
// //如果是gamma谱的分析异常
|
||||
// if (e instanceof AnalySpectrumException) {
|
||||
// // 如果邮件内容校验失败(邮件内容不完整) 将错误邮件从eml移动到emlError
|
||||
// if (Objects.nonNull(emlFile) && emlFile.exists()){
|
||||
// moveEmail(emlFile, key);
|
||||
// }
|
||||
// //删除邮件
|
||||
// emailServiceManager.removeMail(message,batchesCounter);
|
||||
// } else {
|
||||
// throw e;
|
||||
// }
|
||||
// }
|
||||
// }else{
|
||||
// log.warn("This email {} parsing failed and is not listed in the Met, Alert, SOH, Sample, Detbkphd, QC, Gasbkphd spectra.",subject);
|
||||
// }
|
||||
// emailServiceManager.removeMail(message,batchesCounter);
|
||||
// }else {
|
||||
// // 如果邮件内容校验失败(邮件内容不完整) 将错误邮件从eml移动到emlError
|
||||
// if (Objects.nonNull(emlFile) && emlFile.exists()){
|
||||
// moveEmail(emlFile, key);
|
||||
// throw new DownloadEmailException("邮件移走后手动抛出DownloadEmailException");
|
||||
// }
|
||||
// }
|
||||
} catch (Exception e) {
|
||||
// 如果不是下载导致的失败 并且 下载成功,则删除下载的邮件对象
|
||||
if(!(e instanceof DownloadEmailException) && downloadFlag){
|
||||
|
|
|
@ -93,8 +93,8 @@ public class JeecgAutoProcessApplication extends SpringBootServletInitializer im
|
|||
//校验存储目录是否存在,不存在则创建,存在无操作
|
||||
checkStorageDirectory();
|
||||
autoProcessManager.start(systemStartupTime);
|
||||
undealHandleManager.start();
|
||||
fileSourceHandleManager.start();
|
||||
// undealHandleManager.start();
|
||||
// fileSourceHandleManager.start();
|
||||
// 删除过期的文件
|
||||
delFileManager.start();
|
||||
//统计报告执行线程
|
||||
|
|
Loading…
Reference in New Issue
Block a user