自建台站分支同步代码

This commit is contained in:
qiaoqinzheng 2024-04-30 11:51:54 +08:00
parent 3a1567f853
commit 600c5235ec
6 changed files with 128 additions and 122 deletions

View File

@ -20,6 +20,7 @@ import org.jeecg.common.constant.SymbolConstant;
import org.jeecg.common.email.emuns.MailContentType; import org.jeecg.common.email.emuns.MailContentType;
import org.jeecg.common.exception.DownloadEmailException; import org.jeecg.common.exception.DownloadEmailException;
import org.jeecg.common.properties.SpectrumPathProperties; import org.jeecg.common.properties.SpectrumPathProperties;
import org.jeecg.common.properties.TaskProperties;
import org.jeecg.common.util.DateUtils; import org.jeecg.common.util.DateUtils;
import org.jeecg.common.util.Md5Util; import org.jeecg.common.util.Md5Util;
import org.jeecg.common.util.RedisUtil; import org.jeecg.common.util.RedisUtil;
@ -54,6 +55,8 @@ public class EmailServiceManager {
private SysEmail email; private SysEmail email;
private SpectrumPathProperties spectrumPathProperties; private SpectrumPathProperties spectrumPathProperties;
private TaskProperties taskProperties;
/** /**
* 系统启动时间 * 系统启动时间
*/ */
@ -69,6 +72,8 @@ public class EmailServiceManager {
private RedisUtil redisUtil; private RedisUtil redisUtil;
private Object downloadEmlLocal = new Object();
@NotNull @NotNull
public static EmailServiceManager getInstance(){ public static EmailServiceManager getInstance(){
return new EmailServiceManager(); return new EmailServiceManager();
@ -87,13 +92,14 @@ public class EmailServiceManager {
* @param email 邮件属性 * @param email 邮件属性
*/ */
public void init(SysEmail email, Integer receiveNum, String temporaryStoragePath, public void init(SysEmail email, Integer receiveNum, String temporaryStoragePath,
Date systemStartupTime, SpectrumPathProperties pathProperties, Date systemStartupTime, SpectrumPathProperties pathProperties,TaskProperties taskProperties,
RedisUtil redisUtil){ RedisUtil redisUtil){
this.email = email; this.email = email;
this.receiveNum = receiveNum; this.receiveNum = receiveNum;
this.temporaryStoragePath = temporaryStoragePath; this.temporaryStoragePath = temporaryStoragePath;
this.systemStartupTime = systemStartupTime; this.systemStartupTime = systemStartupTime;
this.spectrumPathProperties = pathProperties; this.spectrumPathProperties = pathProperties;
this.taskProperties = taskProperties;
this.redisUtil = redisUtil; this.redisUtil = redisUtil;
} }
@ -528,105 +534,85 @@ public class EmailServiceManager {
* 当计数大于10000后从0开始服务重启后也从0开始 * 当计数大于10000后从0开始服务重启后也从0开始
*/ */
public File downloadEmailToEmlDir(@NotNull Message message,Integer emailCounter,Integer batchesCounter) throws MessagingException { public File downloadEmailToEmlDir(@NotNull Message message,Integer emailCounter,Integer batchesCounter) throws MessagingException {
String subject = ""; synchronized (downloadEmlLocal) {
File emlFile = null; String subject = "";
String status = EmailLogManager.STATUS_SUCCESS; File emlFile = null;
Date receivedDate = null; String status = EmailLogManager.STATUS_SUCCESS;
try { Date receivedDate = null;
//获取发件人 InputStream inputStream = null;
final String address = ((InternetAddress) message.getFrom()[0]).getAddress(); BufferedOutputStream outputStream = null;
final String from = address.substring(0,address.indexOf(StringConstant.AT)); try {
//获取主题 //获取发件人
subject = MimeUtility.decodeText(message.getSubject()); final String address = ((InternetAddress) message.getFrom()[0]).getAddress();
if(subject.contains(StringConstant.SLASH)){ final String from = address.substring(0,address.indexOf(StringConstant.AT));
subject = StringUtils.replace(subject,StringConstant.SLASH,""); //获取主题
} subject = MimeUtility.decodeText(message.getSubject());
if(subject.contains(StringConstant.COLON)){ if(subject.indexOf(StringConstant.SLASH) != -1){
subject = StringUtils.replace(subject,StringConstant.COLON,""); subject = StringUtils.replace(subject,StringConstant.SLASH,"");
} }
receivedDate = message.getReceivedDate(); if(subject.indexOf(StringConstant.COLON) != -1){
StringBuilder fileName = new StringBuilder(); subject = StringUtils.replace(subject,StringConstant.COLON,"");
fileName.append(from); }
fileName.append(StringConstant.UNDER_LINE); receivedDate = message.getReceivedDate();
fileName.append(subject); StringBuilder fileName = new StringBuilder();
fileName.append(StringConstant.UNDER_LINE); fileName.append(from);
fileName.append(DateUtils.formatDate(new Date(),"YYMMdd")); fileName.append(StringConstant.UNDER_LINE);
fileName.append(StringConstant.UNDER_LINE); fileName.append(subject);
fileName.append(DateUtils.formatDate(new Date(),"HHmmssSSS")); fileName.append(StringConstant.UNDER_LINE);
fileName.append(StringConstant.UNDER_LINE); fileName.append(DateUtils.formatDate(new Date(),"YYMMdd"));
fileName.append("receive"); fileName.append(StringConstant.UNDER_LINE);
fileName.append(StringConstant.UNDER_LINE); fileName.append(DateUtils.formatDate(new Date(),"HHmmssSSS"));
fileName.append(DateUtils.formatDate(receivedDate,"YYMMdd")); fileName.append(StringConstant.UNDER_LINE);
fileName.append(StringConstant.UNDER_LINE); fileName.append("receive");
fileName.append(DateUtils.formatDate(receivedDate,"HHmmssSSS")); fileName.append(StringConstant.UNDER_LINE);
fileName.append(StringConstant.UNDER_LINE); fileName.append(DateUtils.formatDate(receivedDate,"YYMMdd"));
fileName.append(emailCounter); fileName.append(StringConstant.UNDER_LINE);
fileName.append(SAVE_EML_SUFFIX); fileName.append(DateUtils.formatDate(receivedDate,"HHmmssSSS"));
final String rootPath = spectrumPathProperties.getRootPath(); fileName.append(StringConstant.UNDER_LINE);
final String emlPath = spectrumPathProperties.getEmlPath(); fileName.append(emailCounter);
emlFile = new File(rootPath + emlPath + File.separator + fileName); fileName.append(SAVE_EML_SUFFIX);
/* 如果邮件内容经过Base64编码 需要解码后再生成.eml文件 否则直接生成.eml文件 */ final String rootPath = spectrumPathProperties.getRootPath();
final String BASE64_FLAG = "Base64"; final String emlPath = spectrumPathProperties.getEmlPath();
String content = (String) message.getContent(); emlFile = new File(rootPath+emlPath+File.separator+fileName);
// 将正文内容分割为两部分: Base64 MD5|压缩前大小|压缩后大小|正文Base64编码 // outputStream = new FileOutputStream(emlFile);
List<String> contents = StrUtil.split(content, '|', 2); // message.writeTo(outputStream);
if (StrUtil.equals(contents.get(0), BASE64_FLAG)){
// 先将未解码的内容保存为.eml文件 int bufferSize = 1024 * 1024; // 1M
FileUtil.writeFromStream(message.getInputStream(), emlFile); inputStream = message.getInputStream();
// content: MD5|压缩前大小|压缩后大小|正文Base64编码 outputStream = new BufferedOutputStream(new FileOutputStream(emlFile), bufferSize);
content = contents.get(1); // 从邮件的输入流读取内容并写入到本地文件
// 将content分割为: MD5 压缩前大小|压缩后大小|正文Base64编码 byte[] buffer = new byte[bufferSize];
contents = StrUtil.split(content, '|', 2); int bytesRead;
String md5 = contents.get(0); while ((bytesRead = inputStream.read(buffer)) != -1) {
content = contents.get(1); outputStream.write(buffer, 0, bytesRead);
if (StrUtil.isBlank(content)) return emlFile; }
content = StrUtil.cleanBlank(content);
String md5Verified = MD5.create().digestHex(content); } catch (MessagingException | IOException e) {
// 如果md5验证失败 则不进行解码 // 下载邮件失败 抛出自定义邮件下载异常
if (!StrUtil.equals(md5, md5Verified)) return emlFile; status = EmailLogManager.STATUS_ERROR;
contents = StrUtil.split(content, '|'); String errorMsg = StrUtil.format("The email download failed, the subject of the email is {}, the reason is {}.", subject, e.getMessage());
String base64 = contents.get(contents.size() - 1); log.error(errorMsg);
// 解码Base64字符串 并用解码后的内容覆盖未解码内容 throw new DownloadEmailException(errorMsg);
byte[] data = Base64.decode(base64); }catch (Exception e) {
try (OutputStream out = Files.newOutputStream(emlFile.toPath()); log.error("",e);
InflaterOutputStream inflaterOutputStream = new InflaterOutputStream(out)) { }finally {
inflaterOutputStream.write(data); EmailLogEvent event = new EmailLogEvent(batchesCounter,Thread.currentThread().getId(),EmailLogManager.GS_TYPE_GET,status,EmailLogManager.GETIDEML,subject,DateUtils.formatDate(receivedDate,"yyyy-MM-dd HH:mm:ss:SSS"),
} catch (Exception e) { (Objects.isNull(emlFile)?" ":emlFile.getAbsolutePath()));
log.error("Create the Base64 decoded file[{}] error: {}", emlFile.getAbsolutePath(), e.getMessage()); EmailLogManager.getInstance().offer(Thread.currentThread().getId(),event);
try {
if (Objects.nonNull(inputStream)) {
inputStream.close();
}
if (Objects.nonNull(outputStream)) {
outputStream.flush();
outputStream.close();
}
} catch (IOException e) {
throw new RuntimeException(e);
} }
} else { // 直接生成.eml文件
message.writeTo(Files.newOutputStream(emlFile.toPath()));
} }
// int bufferSize = 1024 * 1024; // 1M return emlFile;
// InputStream inputStream = message.getInputStream();
// BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream, bufferSize);
// // 或者使用 BufferedOutputStream
// OutputStream outputStream = new FileOutputStream(emlFile);
// BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(outputStream, bufferSize);
// // 从邮件的输入流读取内容并写入到本地文件
// byte[] buffer = new byte[bufferSize];
// int bytesRead;
// while ((bytesRead = bufferedInputStream.read(buffer)) != -1) {
// bufferedOutputStream.write(buffer, 0, bytesRead);
// }
//
// // 关闭流
// bufferedInputStream.close();
// bufferedOutputStream.close();
} catch (MessagingException | IOException e) {
// 下载邮件失败 抛出自定义邮件下载异常
status = EmailLogManager.STATUS_ERROR;
String errorMsg = StrUtil.format("The email download failed, the subject of the email is {}, the reason is {}.", subject, e.getMessage());
log.error(errorMsg);
throw new DownloadEmailException(errorMsg);
}catch (Exception e) {
log.error("The email download failed: {}", e.getMessage());
}finally {
EmailLogEvent event = new EmailLogEvent(batchesCounter,Thread.currentThread().getId(),EmailLogManager.GS_TYPE_GET,status,EmailLogManager.GETIDEML,subject,DateUtils.formatDate(receivedDate,"yyyy-MM-dd HH:mm:ss:SSS"),
(Objects.isNull(emlFile)?" ":emlFile.getAbsolutePath()));
EmailLogManager.getInstance().offer(Thread.currentThread().getId(),event);
} }
return emlFile;
} }
/** /**
@ -668,10 +654,10 @@ public class EmailServiceManager {
if(null != store){ if(null != store){
store.close(); store.close();
} }
for(String messageId : messageIds){ // for(String messageId : messageIds){
String key = RedisConstant.EMAIL_MSG_ID+StringConstant.COLON+messageId; // String key = RedisConstant.EMAIL_MSG_ID+StringConstant.COLON+messageId;
redisUtil.del(key); // redisUtil.del(key);
} // }
} catch (MessagingException e) { } catch (MessagingException e) {
log.error("Email closure failed, email address is: {}, reason is: {}",email.getUsername(),e.getMessage()); log.error("Email closure failed, email address is: {}, reason is: {}",email.getUsername(),e.getMessage());
e.printStackTrace(); e.printStackTrace();
@ -687,10 +673,13 @@ public class EmailServiceManager {
boolean exist = false; boolean exist = false;
try { try {
String key = RedisConstant.EMAIL_MSG_ID+StringConstant.COLON+messageId; String key = RedisConstant.EMAIL_MSG_ID+StringConstant.COLON+messageId;
exist = redisUtil.hasKey(key); int numberKey = redisUtil.get(key) != null? (int) redisUtil.get(key):0;
if(exist){ // exist = redisUtil.hasKey(key);
if(numberKey >= taskProperties.getForceDeletedNumber()){
exist = true;
log.info("Check: Remove Email:{},receiveTime:{}",message.getSubject(), DateUtils.formatDate(message.getReceivedDate(),"yyyy-MM-dd HH:mm:ss")); log.info("Check: Remove Email:{},receiveTime:{}",message.getSubject(), DateUtils.formatDate(message.getReceivedDate(),"yyyy-MM-dd HH:mm:ss"));
message.setFlag(Flags.Flag.DELETED,true); message.setFlag(Flags.Flag.DELETED,true);
redisUtil.del(key);
} }
return exist; return exist;
} catch (MessagingException e) { } catch (MessagingException e) {

View File

@ -41,6 +41,11 @@ public class TaskProperties implements Serializable {
*/ */
private Integer mailThreadExecCycle; private Integer mailThreadExecCycle;
/**
* 线程获取失败邮件次数
*/
private Integer forceDeletedNumber;
/** /**
* 监测需删除的邮件线程执行周期毫秒 * 监测需删除的邮件线程执行周期毫秒
*/ */

View File

@ -9,7 +9,7 @@ import lombok.Getter;
public enum Condition { public enum Condition {
FIRST_FOUND("1"), ABOVE_AVERAGE("2"), MEANWHILE("3"); FIRST_FOUND("1"), ABOVE_AVERAGE("2"), MEANWHILE("3");
private String value; private final String value;
public static Condition valueOf1(String value){ public static Condition valueOf1(String value){
for (Condition condition : Condition.values()) { for (Condition condition : Condition.values()) {

View File

@ -233,9 +233,15 @@ public class AutoProcessManager{
if(databaseEmail.getEnabled().equals(SysMailEnableType.ENABLE.getMailEnableType())){ if(databaseEmail.getEnabled().equals(SysMailEnableType.ENABLE.getMailEnableType())){
final boolean testFlag = testConnectEmailServer(databaseEmail); final boolean testFlag = testConnectEmailServer(databaseEmail);
if(testFlag){ if(testFlag){
databaseEmail.setNewEmailFlag(true); if (emailExecThreadMap.containsKey(databaseEmail.getId())) {
EmailParsingActuator actuator = emailExecThreadMap.get(databaseEmail.getId());
actuator.setStop(false);
log.info("{}邮箱重新加入监测队列",databaseEmail.getUsername());
} else {
databaseEmail.setNewEmailFlag(true);
log.info("{}邮箱加入监测队列,设置新增标记",databaseEmail.getUsername());
}
emailMap.put(databaseEmail.getId(),databaseEmail); emailMap.put(databaseEmail.getId(),databaseEmail);
log.info("{}邮箱加入监测队列,设置新增标记",databaseEmail.getUsername());
} }
} }
} }
@ -279,6 +285,7 @@ public class AutoProcessManager{
if(next.getValue().getState() == State.TERMINATED){ if(next.getValue().getState() == State.TERMINATED){
log.info("{}邮箱执行线程已停止emailExecThreadMap删除此邮箱数据",next.getValue().getEmailProperties().getUsername()); log.info("{}邮箱执行线程已停止emailExecThreadMap删除此邮箱数据",next.getValue().getEmailProperties().getUsername());
checkStopThreads.remove(); checkStopThreads.remove();
emailMap.remove(next.getKey());
} }
} }

View File

@ -74,23 +74,23 @@ public class EmailParsingActuator extends Thread{
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
final EmailServiceManager emailServiceManager = EmailServiceManager.getInstance(); final EmailServiceManager emailServiceManager = EmailServiceManager.getInstance();
emailServiceManager.init(this.emailProperties,this.taskProperties.getReceiveNum(),this.taskProperties.getTemporaryStoragePath(), emailServiceManager.init(this.emailProperties,this.taskProperties.getReceiveNum(),this.taskProperties.getTemporaryStoragePath(),
this.systemStartupTime, spectrumServiceQuotes.getSpectrumPathProperties(),spectrumServiceQuotes.getRedisUtil()); this.systemStartupTime, spectrumServiceQuotes.getSpectrumPathProperties(), spectrumServiceQuotes.getTaskProperties(), spectrumServiceQuotes.getRedisUtil());
List<String> messageIds = new ArrayList<>(); List<String> messageIds = new ArrayList<>();
try { try {
Message[] messages = emailServiceManager.receiveMail(); Message[] messages = emailServiceManager.receiveMail();
if(ArrayUtils.isNotEmpty(messages)){ if(ArrayUtils.isNotEmpty(messages)){
log.info("EmailParsingActuator本次获取邮件数量为:{}",messages.length); log.info("EmailParsingActuator本次{}获取邮件数量为:{}", Thread.currentThread().getName(), messages.length);
//检验获取的邮件是否在之前删除失败列表中若在直接调用邮件API删除并且此次数组里元素也删除 //检验获取的邮件是否在之前删除失败列表中若在直接调用邮件API删除并且此次数组里元素也删除
// for(int i=messages.length-1;i>=0;i--){ for(int i=messages.length-1;i>=0;i--){
// if (!messages[i].isExpunged()){ if (!messages[i].isExpunged()){
// String messageId = ((MimeMessage) messages[i]).getMessageID(); String messageId = ((MimeMessage) messages[i]).getMessageID();
// final boolean exist = emailServiceManager.check(messages[i],messageId); final boolean exist = emailServiceManager.check(messages[i],messageId);
// messageIds.add(messageId); messageIds.add(messageId);
// if(exist){ if(exist){
// messages = ArrayUtils.remove(messages,i); messages = ArrayUtils.remove(messages,i);
// } }
// } }
// } }
log.info("EmailParsingActuator本次真实执行邮件数量为{}",messages.length); log.info("EmailParsingActuator本次真实执行邮件数量为{}",messages.length);
if(messages.length > 0){ if(messages.length > 0){
//本批次邮件号 //本批次邮件号
@ -105,8 +105,10 @@ public class EmailParsingActuator extends Thread{
taskLatch.await(); taskLatch.await();
} }
} }
}catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} catch (MessagingException e) {
throw new RuntimeException(e);
} finally { } finally {
//清除本批次邮件日志缓存 //清除本批次邮件日志缓存
EmailLogManager.getInstance().clear(); EmailLogManager.getInstance().clear();

View File

@ -96,7 +96,10 @@ public class SpectrumParsingActuator implements Runnable{
receiveDate = DateUtils.formatDate(message.getReceivedDate(),"yyyy-MM-dd HH:mm:ss"); receiveDate = DateUtils.formatDate(message.getReceivedDate(),"yyyy-MM-dd HH:mm:ss");
String emlName = subject+ StringConstant.UNDER_LINE+ receiveDate; String emlName = subject+ StringConstant.UNDER_LINE+ receiveDate;
String key = RedisConstant.EMAIL_MSG_ID+StringConstant.COLON+messageId; String key = RedisConstant.EMAIL_MSG_ID+StringConstant.COLON+messageId;
spectrumServiceQuotes.getRedisUtil().set(key,emlName,expiryTime); // spectrumServiceQuotes.getRedisUtil().set(key,emlName,expiryTime);
//判断当前key的下载次数是否超过限制次数
spectrumServiceQuotes.getRedisUtil().incr(key, 1L);
spectrumServiceQuotes.getRedisUtil().expire(key, expiryTime);
//线程开始初始化时初始本线程负责的能谱日志事件 //线程开始初始化时初始本线程负责的能谱日志事件
SpectrumLogManager.mailSpectrumLogManager.offer(Thread.currentThread().getId(),null); SpectrumLogManager.mailSpectrumLogManager.offer(Thread.currentThread().getId(),null);
@ -123,6 +126,7 @@ public class SpectrumParsingActuator implements Runnable{
try { try {
//开始解析 //开始解析
spectrumHandler.handler(); spectrumHandler.handler();
spectrumServiceQuotes.getRedisUtil().del(key);
} catch (Exception e) { } catch (Exception e) {
//如果是gamma谱的分析异常 //如果是gamma谱的分析异常
if (e instanceof AnalySpectrumException) { if (e instanceof AnalySpectrumException) {
@ -136,7 +140,6 @@ public class SpectrumParsingActuator implements Runnable{
throw e; throw e;
} }
} }
}else{ }else{
log.warn("This email {} parsing failed and is not listed in the Met, Alert, SOH, Sample, Detbkphd, QC, Gasbkphd spectra.",subject); log.warn("This email {} parsing failed and is not listed in the Met, Alert, SOH, Sample, Detbkphd, QC, Gasbkphd spectra.",subject);
} }
@ -204,7 +207,7 @@ public class SpectrumParsingActuator implements Runnable{
final String finalPath = rootPath+emlErrorPath; final String finalPath = rootPath+emlErrorPath;
FileOperation.moveFile(emlFile,finalPath,true); FileOperation.moveFile(emlFile,finalPath,true);
// 删除 key防止下次线程执行删除邮件 // 删除 key防止下次线程执行删除邮件
spectrumServiceQuotes.getRedisUtil().del(key); // spectrumServiceQuotes.getRedisUtil().del(key);
} }
} }