Merge remote-tracking branch 'origin/noFtp' into noFtp
This commit is contained in:
commit
18b495d1d0
|
@ -13,16 +13,10 @@ public interface RedisConstant {
|
||||||
*/
|
*/
|
||||||
String PREFIX_SILENCE = "SilenceCycle:";
|
String PREFIX_SILENCE = "SilenceCycle:";
|
||||||
|
|
||||||
String STREAM_ALARM = "Stream:Alarm";
|
|
||||||
|
|
||||||
String STREAM_ANALYSIS = "Stream:Analysis";
|
String STREAM_ANALYSIS = "Stream:Analysis";
|
||||||
|
|
||||||
String GROUP_ALARM = "Group_Alarm";
|
|
||||||
|
|
||||||
String GROUP_ANALYSIS = "Group_Analysis";
|
String GROUP_ANALYSIS = "Group_Analysis";
|
||||||
|
|
||||||
String CONSUMER_ALARM = "Consumer_Alarm";
|
|
||||||
|
|
||||||
String CONSUMER_ANALYSIS = "Consumer_Analysis";
|
String CONSUMER_ANALYSIS = "Consumer_Analysis";
|
||||||
|
|
||||||
String NUCLIDE_LINES_LIB = "Nuclide_Lines_Lib:";
|
String NUCLIDE_LINES_LIB = "Nuclide_Lines_Lib:";
|
||||||
|
|
|
@ -7,6 +7,7 @@ import cn.hutool.core.util.StrUtil;
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
import com.sun.mail.imap.IMAPStore;
|
import com.sun.mail.imap.IMAPStore;
|
||||||
import com.sun.mail.smtp.SMTPAddressFailedException;
|
import com.sun.mail.smtp.SMTPAddressFailedException;
|
||||||
|
import io.swagger.models.auth.In;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.jeecg.common.api.dto.message.MessageDTO;
|
import org.jeecg.common.api.dto.message.MessageDTO;
|
||||||
|
@ -16,6 +17,7 @@ import org.jeecg.common.constant.SymbolConstant;
|
||||||
import org.jeecg.common.email.emuns.MailContentType;
|
import org.jeecg.common.email.emuns.MailContentType;
|
||||||
import org.jeecg.common.exception.DownloadEmailException;
|
import org.jeecg.common.exception.DownloadEmailException;
|
||||||
import org.jeecg.common.properties.SpectrumPathProperties;
|
import org.jeecg.common.properties.SpectrumPathProperties;
|
||||||
|
import org.jeecg.common.properties.TaskProperties;
|
||||||
import org.jeecg.common.util.DateUtils;
|
import org.jeecg.common.util.DateUtils;
|
||||||
import org.jeecg.common.util.Md5Util;
|
import org.jeecg.common.util.Md5Util;
|
||||||
import org.jeecg.common.util.RedisUtil;
|
import org.jeecg.common.util.RedisUtil;
|
||||||
|
@ -47,6 +49,8 @@ public class EmailServiceManager {
|
||||||
|
|
||||||
private SysEmail email;
|
private SysEmail email;
|
||||||
private SpectrumPathProperties spectrumPathProperties;
|
private SpectrumPathProperties spectrumPathProperties;
|
||||||
|
|
||||||
|
private TaskProperties taskProperties;
|
||||||
/**
|
/**
|
||||||
* 系统启动时间
|
* 系统启动时间
|
||||||
*/
|
*/
|
||||||
|
@ -62,6 +66,8 @@ public class EmailServiceManager {
|
||||||
|
|
||||||
private RedisUtil redisUtil;
|
private RedisUtil redisUtil;
|
||||||
|
|
||||||
|
private Object downloadEmlLocal = new Object();
|
||||||
|
|
||||||
@NotNull
|
@NotNull
|
||||||
public static EmailServiceManager getInstance(){
|
public static EmailServiceManager getInstance(){
|
||||||
return new EmailServiceManager();
|
return new EmailServiceManager();
|
||||||
|
@ -80,13 +86,14 @@ public class EmailServiceManager {
|
||||||
* @param email 邮件属性
|
* @param email 邮件属性
|
||||||
*/
|
*/
|
||||||
public void init(SysEmail email, Integer receiveNum, String temporaryStoragePath,
|
public void init(SysEmail email, Integer receiveNum, String temporaryStoragePath,
|
||||||
Date systemStartupTime, SpectrumPathProperties pathProperties,
|
Date systemStartupTime, SpectrumPathProperties pathProperties,TaskProperties taskProperties,
|
||||||
RedisUtil redisUtil){
|
RedisUtil redisUtil){
|
||||||
this.email = email;
|
this.email = email;
|
||||||
this.receiveNum = receiveNum;
|
this.receiveNum = receiveNum;
|
||||||
this.temporaryStoragePath = temporaryStoragePath;
|
this.temporaryStoragePath = temporaryStoragePath;
|
||||||
this.systemStartupTime = systemStartupTime;
|
this.systemStartupTime = systemStartupTime;
|
||||||
this.spectrumPathProperties = pathProperties;
|
this.spectrumPathProperties = pathProperties;
|
||||||
|
this.taskProperties = taskProperties;
|
||||||
this.redisUtil = redisUtil;
|
this.redisUtil = redisUtil;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -328,8 +335,9 @@ public class EmailServiceManager {
|
||||||
props.put("mail.smtp.host", email.getEmailServerAddress());
|
props.put("mail.smtp.host", email.getEmailServerAddress());
|
||||||
props.put("mail.smtp.port", email.getPort());
|
props.put("mail.smtp.port", email.getPort());
|
||||||
props.put("mail.smtp.auth", "true");
|
props.put("mail.smtp.auth", "true");
|
||||||
/*props.put("mail.smtp.socketFactory.port", email.getPort());
|
props.put("mail.smtp.starttls.enable", "true");
|
||||||
props.put("mail.smtp.socketFactory.class", "javax.net.ssl.SSLSocketFactory");*/
|
props.put("mail.smtp.socketFactory.port", email.getPort());
|
||||||
|
props.put("mail.smtp.socketFactory.class", "javax.net.ssl.SSLSocketFactory");
|
||||||
|
|
||||||
Session session = Session.getInstance(props);
|
Session session = Session.getInstance(props);
|
||||||
|
|
||||||
|
@ -519,44 +527,47 @@ public class EmailServiceManager {
|
||||||
* 当计数大于10000后从0开始,服务重启后也从0开始
|
* 当计数大于10000后从0开始,服务重启后也从0开始
|
||||||
*/
|
*/
|
||||||
public File downloadEmailToEmlDir(@NotNull Message message,Integer emailCounter,Integer batchesCounter) throws MessagingException {
|
public File downloadEmailToEmlDir(@NotNull Message message,Integer emailCounter,Integer batchesCounter) throws MessagingException {
|
||||||
String subject = "";
|
synchronized (downloadEmlLocal) {
|
||||||
File emlFile = null;
|
String subject = "";
|
||||||
String status = EmailLogManager.STATUS_SUCCESS;
|
File emlFile = null;
|
||||||
Date receivedDate = null;
|
String status = EmailLogManager.STATUS_SUCCESS;
|
||||||
try {
|
Date receivedDate = null;
|
||||||
//获取发件人
|
FileOutputStream outputStream = null;
|
||||||
final String address = ((InternetAddress) message.getFrom()[0]).getAddress();
|
try {
|
||||||
final String from = address.substring(0,address.indexOf(StringConstant.AT));
|
//获取发件人
|
||||||
//获取主题
|
final String address = ((InternetAddress) message.getFrom()[0]).getAddress();
|
||||||
subject = MimeUtility.decodeText(message.getSubject());
|
final String from = address.substring(0,address.indexOf(StringConstant.AT));
|
||||||
if(subject.indexOf(StringConstant.SLASH) != -1){
|
//获取主题
|
||||||
subject = StringUtils.replace(subject,StringConstant.SLASH,"");
|
subject = MimeUtility.decodeText(message.getSubject());
|
||||||
}
|
if(subject.indexOf(StringConstant.SLASH) != -1){
|
||||||
if(subject.indexOf(StringConstant.COLON) != -1){
|
subject = StringUtils.replace(subject,StringConstant.SLASH,"");
|
||||||
subject = StringUtils.replace(subject,StringConstant.COLON,"");
|
}
|
||||||
}
|
if(subject.indexOf(StringConstant.COLON) != -1){
|
||||||
receivedDate = message.getReceivedDate();
|
subject = StringUtils.replace(subject,StringConstant.COLON,"");
|
||||||
StringBuilder fileName = new StringBuilder();
|
}
|
||||||
fileName.append(from);
|
receivedDate = message.getReceivedDate();
|
||||||
fileName.append(StringConstant.UNDER_LINE);
|
StringBuilder fileName = new StringBuilder();
|
||||||
fileName.append(subject);
|
fileName.append(from);
|
||||||
fileName.append(StringConstant.UNDER_LINE);
|
fileName.append(StringConstant.UNDER_LINE);
|
||||||
fileName.append(DateUtils.formatDate(new Date(),"YYMMdd"));
|
fileName.append(subject);
|
||||||
fileName.append(StringConstant.UNDER_LINE);
|
fileName.append(StringConstant.UNDER_LINE);
|
||||||
fileName.append(DateUtils.formatDate(new Date(),"HHmmssSSS"));
|
fileName.append(DateUtils.formatDate(new Date(),"YYMMdd"));
|
||||||
fileName.append(StringConstant.UNDER_LINE);
|
fileName.append(StringConstant.UNDER_LINE);
|
||||||
fileName.append("receive");
|
fileName.append(DateUtils.formatDate(new Date(),"HHmmssSSS"));
|
||||||
fileName.append(StringConstant.UNDER_LINE);
|
fileName.append(StringConstant.UNDER_LINE);
|
||||||
fileName.append(DateUtils.formatDate(receivedDate,"YYMMdd"));
|
fileName.append("receive");
|
||||||
fileName.append(StringConstant.UNDER_LINE);
|
fileName.append(StringConstant.UNDER_LINE);
|
||||||
fileName.append(DateUtils.formatDate(receivedDate,"HHmmssSSS"));
|
fileName.append(DateUtils.formatDate(receivedDate,"YYMMdd"));
|
||||||
fileName.append(StringConstant.UNDER_LINE);
|
fileName.append(StringConstant.UNDER_LINE);
|
||||||
fileName.append(emailCounter);
|
fileName.append(DateUtils.formatDate(receivedDate,"HHmmssSSS"));
|
||||||
fileName.append(SAVE_EML_SUFFIX);
|
fileName.append(StringConstant.UNDER_LINE);
|
||||||
final String rootPath = spectrumPathProperties.getRootPath();
|
fileName.append(emailCounter);
|
||||||
final String emlPath = spectrumPathProperties.getEmlPath();
|
fileName.append(SAVE_EML_SUFFIX);
|
||||||
emlFile = new File(rootPath+emlPath+File.separator+fileName);
|
final String rootPath = spectrumPathProperties.getRootPath();
|
||||||
message.writeTo(new FileOutputStream(emlFile));
|
final String emlPath = spectrumPathProperties.getEmlPath();
|
||||||
|
emlFile = new File(rootPath+emlPath+File.separator+fileName);
|
||||||
|
outputStream = new FileOutputStream(emlFile);
|
||||||
|
message.writeTo(outputStream);
|
||||||
|
|
||||||
// int bufferSize = 1024 * 1024; // 1M
|
// int bufferSize = 1024 * 1024; // 1M
|
||||||
// InputStream inputStream = message.getInputStream();
|
// InputStream inputStream = message.getInputStream();
|
||||||
|
@ -574,20 +585,28 @@ public class EmailServiceManager {
|
||||||
// // 关闭流
|
// // 关闭流
|
||||||
// bufferedInputStream.close();
|
// bufferedInputStream.close();
|
||||||
// bufferedOutputStream.close();
|
// bufferedOutputStream.close();
|
||||||
} catch (MessagingException | IOException e) {
|
} catch (MessagingException | IOException e) {
|
||||||
// 下载邮件失败 抛出自定义邮件下载异常
|
// 下载邮件失败 抛出自定义邮件下载异常
|
||||||
status = EmailLogManager.STATUS_ERROR;
|
status = EmailLogManager.STATUS_ERROR;
|
||||||
String errorMsg = StrUtil.format("The email download failed, the subject of the email is {}, the reason is {}.", subject, e.getMessage());
|
String errorMsg = StrUtil.format("The email download failed, the subject of the email is {}, the reason is {}.", subject, e.getMessage());
|
||||||
log.error(errorMsg);
|
log.error(errorMsg);
|
||||||
throw new DownloadEmailException(errorMsg);
|
throw new DownloadEmailException(errorMsg);
|
||||||
}catch (Exception e) {
|
}catch (Exception e) {
|
||||||
log.error("",e);
|
log.error("",e);
|
||||||
}finally {
|
}finally {
|
||||||
EmailLogEvent event = new EmailLogEvent(batchesCounter,Thread.currentThread().getId(),EmailLogManager.GS_TYPE_GET,status,EmailLogManager.GETIDEML,subject,DateUtils.formatDate(receivedDate,"yyyy-MM-dd HH:mm:ss:SSS"),
|
EmailLogEvent event = new EmailLogEvent(batchesCounter,Thread.currentThread().getId(),EmailLogManager.GS_TYPE_GET,status,EmailLogManager.GETIDEML,subject,DateUtils.formatDate(receivedDate,"yyyy-MM-dd HH:mm:ss:SSS"),
|
||||||
(Objects.isNull(emlFile)?" ":emlFile.getAbsolutePath()));
|
(Objects.isNull(emlFile)?" ":emlFile.getAbsolutePath()));
|
||||||
EmailLogManager.getInstance().offer(Thread.currentThread().getId(),event);
|
EmailLogManager.getInstance().offer(Thread.currentThread().getId(),event);
|
||||||
|
try {
|
||||||
|
if (Objects.nonNull(outputStream)) {
|
||||||
|
outputStream.close();
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return emlFile;
|
||||||
}
|
}
|
||||||
return emlFile;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -629,10 +648,10 @@ public class EmailServiceManager {
|
||||||
if(null != store){
|
if(null != store){
|
||||||
store.close();
|
store.close();
|
||||||
}
|
}
|
||||||
for(String messageId : messageIds){
|
// for(String messageId : messageIds){
|
||||||
String key = RedisConstant.EMAIL_MSG_ID+StringConstant.COLON+messageId;
|
// String key = RedisConstant.EMAIL_MSG_ID+StringConstant.COLON+messageId;
|
||||||
redisUtil.del(key);
|
// redisUtil.del(key);
|
||||||
}
|
// }
|
||||||
} catch (MessagingException e) {
|
} catch (MessagingException e) {
|
||||||
log.error("Email closure failed, email address is: {}, reason is: {}",email.getUsername(),e.getMessage());
|
log.error("Email closure failed, email address is: {}, reason is: {}",email.getUsername(),e.getMessage());
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
|
@ -648,10 +667,13 @@ public class EmailServiceManager {
|
||||||
boolean exist = false;
|
boolean exist = false;
|
||||||
try {
|
try {
|
||||||
String key = RedisConstant.EMAIL_MSG_ID+StringConstant.COLON+messageId;
|
String key = RedisConstant.EMAIL_MSG_ID+StringConstant.COLON+messageId;
|
||||||
exist = redisUtil.hasKey(key);
|
int numberKey = redisUtil.get(key) != null? (int) redisUtil.get(key):0;
|
||||||
if(exist){
|
// exist = redisUtil.hasKey(key);
|
||||||
|
if(numberKey >= taskProperties.getForceDeletedNumber()){
|
||||||
|
exist = true;
|
||||||
log.info("Check: Remove Email:{},receiveTime:{}",message.getSubject(), DateUtils.formatDate(message.getReceivedDate(),"yyyy-MM-dd HH:mm:ss"));
|
log.info("Check: Remove Email:{},receiveTime:{}",message.getSubject(), DateUtils.formatDate(message.getReceivedDate(),"yyyy-MM-dd HH:mm:ss"));
|
||||||
message.setFlag(Flags.Flag.DELETED,true);
|
message.setFlag(Flags.Flag.DELETED,true);
|
||||||
|
redisUtil.del(key);
|
||||||
}
|
}
|
||||||
return exist;
|
return exist;
|
||||||
} catch (MessagingException e) {
|
} catch (MessagingException e) {
|
||||||
|
|
|
@ -41,6 +41,11 @@ public class TaskProperties implements Serializable {
|
||||||
*/
|
*/
|
||||||
private Integer mailThreadExecCycle;
|
private Integer mailThreadExecCycle;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 线程获取失败邮件次数
|
||||||
|
*/
|
||||||
|
private Integer forceDeletedNumber;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 监测需删除的邮件线程执行周期(毫秒)
|
* 监测需删除的邮件线程执行周期(毫秒)
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -1,13 +1,20 @@
|
||||||
package org.jeecg.common.util;
|
package org.jeecg.common.util;
|
||||||
|
|
||||||
|
import cn.hutool.core.collection.CollUtil;
|
||||||
import cn.hutool.core.map.MapUtil;
|
import cn.hutool.core.map.MapUtil;
|
||||||
import cn.hutool.core.util.ObjectUtil;
|
import cn.hutool.core.util.ObjectUtil;
|
||||||
|
import cn.hutool.core.util.ReUtil;
|
||||||
import cn.hutool.core.util.StrUtil;
|
import cn.hutool.core.util.StrUtil;
|
||||||
import org.jeecg.common.api.dto.message.MessageDTO;
|
import org.jeecg.common.api.dto.message.MessageDTO;
|
||||||
import org.jeecg.common.util.dynamic.db.FreemarkerParseFactory;
|
import org.jeecg.common.util.dynamic.db.FreemarkerParseFactory;
|
||||||
import org.jeecg.modules.base.entity.postgre.SysMessageTemplate;
|
import org.jeecg.modules.base.entity.postgre.SysMessageTemplate;
|
||||||
import org.jeecg.modules.base.service.ISysMessageTemplateService;
|
import org.jeecg.modules.base.service.ISysMessageTemplateService;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* 模板工具类
|
* 模板工具类
|
||||||
|
@ -37,6 +44,30 @@ public class TemplateUtil {
|
||||||
return messageDTO;
|
return messageDTO;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static MessageDTO parse1(String code, Map<String, Object> data){
|
||||||
|
MessageDTO messageDTO = new MessageDTO();
|
||||||
|
SysMessageTemplate template = templateService.getOne(code);
|
||||||
|
// 如果没有消息模板
|
||||||
|
if(ObjectUtil.isNull(template))
|
||||||
|
return messageDTO;
|
||||||
|
String templateName = template.getTemplateName();
|
||||||
|
String templateContent = template.getTemplateContent();
|
||||||
|
messageDTO.setTitle(templateName);
|
||||||
|
if (MapUtil.isEmpty(data))
|
||||||
|
return messageDTO;
|
||||||
|
Set<String> keys = data.keySet();
|
||||||
|
String pattern = "\\<([^<>]*{}[^<>]*)\\>";
|
||||||
|
List<String> contents = new ArrayList<>();
|
||||||
|
for (String key : keys) {
|
||||||
|
contents.add(ReUtil.getGroup1(StrUtil.format(pattern, key), templateContent));
|
||||||
|
}
|
||||||
|
templateContent = CollUtil.join(contents, "#");
|
||||||
|
String content = FreemarkerParseFactory
|
||||||
|
.parseTemplateContent(templateContent, data, true);
|
||||||
|
messageDTO.setContent(content);
|
||||||
|
return messageDTO;
|
||||||
|
}
|
||||||
|
|
||||||
public static MessageDTO parse(String title, String code, Map<String, Object> data) {
|
public static MessageDTO parse(String title, String code, Map<String, Object> data) {
|
||||||
MessageDTO messageDTO = new MessageDTO();
|
MessageDTO messageDTO = new MessageDTO();
|
||||||
SysMessageTemplate template = templateService.getOne(code);
|
SysMessageTemplate template = templateService.getOne(code);
|
||||||
|
@ -53,4 +84,28 @@ public class TemplateUtil {
|
||||||
messageDTO.setContent(content);
|
messageDTO.setContent(content);
|
||||||
return messageDTO;
|
return messageDTO;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static MessageDTO parse1(String title, String code, Map<String, Object> data) {
|
||||||
|
MessageDTO messageDTO = new MessageDTO();
|
||||||
|
SysMessageTemplate template = templateService.getOne(code);
|
||||||
|
// 如果没有消息模板
|
||||||
|
if(ObjectUtil.isNull(template))
|
||||||
|
return messageDTO;
|
||||||
|
String templateName = template.getTemplateName();
|
||||||
|
String templateContent = template.getTemplateContent();
|
||||||
|
messageDTO.setTitle(StrUtil.isBlank(title) ? templateName : title);
|
||||||
|
if (MapUtil.isEmpty(data))
|
||||||
|
return messageDTO;
|
||||||
|
Set<String> keys = data.keySet();
|
||||||
|
String pattern = "\\<([^<>]*{}[^<>]*)\\>";
|
||||||
|
List<String> contents = new ArrayList<>();
|
||||||
|
for (String key : keys) {
|
||||||
|
contents.add(ReUtil.getGroup1(StrUtil.format(pattern, key), templateContent));
|
||||||
|
}
|
||||||
|
templateContent = CollUtil.join(contents, "#");
|
||||||
|
String content = FreemarkerParseFactory
|
||||||
|
.parseTemplateContent(templateContent, data, true);
|
||||||
|
messageDTO.setContent(content);
|
||||||
|
return messageDTO;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,8 +6,8 @@ import java.util.Comparator;
|
||||||
|
|
||||||
public class FileComparator implements Comparator<FileDto> {
|
public class FileComparator implements Comparator<FileDto> {
|
||||||
|
|
||||||
private final String field;
|
private String field;
|
||||||
private final String order;
|
private String order;
|
||||||
|
|
||||||
public FileComparator(String field, String order) {
|
public FileComparator(String field, String order) {
|
||||||
this.field = field;
|
this.field = field;
|
||||||
|
|
|
@ -3,8 +3,12 @@ package org.jeecg.modules.base.dto;
|
||||||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
|
||||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
|
import org.jeecg.common.util.NumUtil;
|
||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
import java.math.BigDecimal;
|
import java.math.BigDecimal;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
@JsonIgnoreProperties(ignoreUnknown = true)
|
@JsonIgnoreProperties(ignoreUnknown = true)
|
||||||
|
@ -17,4 +21,8 @@ public class NuclideInfo implements Serializable {
|
||||||
private String datasource;
|
private String datasource;
|
||||||
|
|
||||||
private String value;
|
private String value;
|
||||||
|
|
||||||
|
public void keepSix(){
|
||||||
|
this.value = NumUtil.keepStr(this.value, 6);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,7 +9,7 @@ import lombok.Getter;
|
||||||
public enum Condition {
|
public enum Condition {
|
||||||
FIRST_FOUND("1"), ABOVE_AVERAGE("2"), MEANWHILE("3");
|
FIRST_FOUND("1"), ABOVE_AVERAGE("2"), MEANWHILE("3");
|
||||||
|
|
||||||
private String value;
|
private final String value;
|
||||||
|
|
||||||
public static Condition valueOf1(String value){
|
public static Condition valueOf1(String value){
|
||||||
for (Condition condition : Condition.values()) {
|
for (Condition condition : Condition.values()) {
|
||||||
|
|
|
@ -1,10 +1,14 @@
|
||||||
package org.jeecg.modules.base.enums;
|
package org.jeecg.modules.base.enums;
|
||||||
|
|
||||||
|
|
||||||
|
import cn.hutool.core.util.StrUtil;
|
||||||
|
import lombok.Getter;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @author nieziyan
|
* @author nieziyan
|
||||||
* 数据源类型
|
* 数据源类型
|
||||||
*/
|
*/
|
||||||
|
@Getter
|
||||||
public enum DSType {
|
public enum DSType {
|
||||||
/* ARMD自动处理库 */
|
/* ARMD自动处理库 */
|
||||||
ARMDARR("1"),
|
ARMDARR("1"),
|
||||||
|
@ -15,13 +19,17 @@ public enum DSType {
|
||||||
/* IDC人工交互库 */
|
/* IDC人工交互库 */
|
||||||
IDCRRR("4");
|
IDCRRR("4");
|
||||||
|
|
||||||
DSType(java.lang.String type) {
|
DSType(String type) {
|
||||||
this.type = type;
|
this.type = type;
|
||||||
}
|
}
|
||||||
|
|
||||||
private String type;
|
private final String type;
|
||||||
|
|
||||||
public String getType() {
|
public static String typeOf(String type){
|
||||||
return type;
|
for (DSType dsType : DSType.values()) {
|
||||||
|
if (StrUtil.equals(type, dsType.getType()))
|
||||||
|
return dsType.name();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,8 +5,10 @@ import cn.hutool.core.collection.CollUtil;
|
||||||
import cn.hutool.core.collection.CollectionUtil;
|
import cn.hutool.core.collection.CollectionUtil;
|
||||||
import cn.hutool.core.collection.ListUtil;
|
import cn.hutool.core.collection.ListUtil;
|
||||||
import cn.hutool.core.map.MapUtil;
|
import cn.hutool.core.map.MapUtil;
|
||||||
|
import cn.hutool.core.text.StrBuilder;
|
||||||
import cn.hutool.core.util.NumberUtil;
|
import cn.hutool.core.util.NumberUtil;
|
||||||
import cn.hutool.core.util.ObjectUtil;
|
import cn.hutool.core.util.ObjectUtil;
|
||||||
|
import cn.hutool.core.util.ReUtil;
|
||||||
import cn.hutool.core.util.StrUtil;
|
import cn.hutool.core.util.StrUtil;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.NoArgsConstructor;
|
import lombok.NoArgsConstructor;
|
||||||
|
@ -16,14 +18,15 @@ import org.jeecg.common.config.mqtoken.UserTokenContext;
|
||||||
import org.jeecg.common.constant.CommonConstant;
|
import org.jeecg.common.constant.CommonConstant;
|
||||||
import org.jeecg.common.constant.SymbolConstant;
|
import org.jeecg.common.constant.SymbolConstant;
|
||||||
import org.jeecg.common.constant.enums.SampleType;
|
import org.jeecg.common.constant.enums.SampleType;
|
||||||
import org.jeecg.common.util.RedisStreamUtil;
|
import org.jeecg.common.util.*;
|
||||||
import org.jeecg.common.util.SpringContextUtils;
|
import org.jeecg.common.util.dynamic.db.FreemarkerParseFactory;
|
||||||
import org.jeecg.modules.base.dto.NuclideInfo;
|
import org.jeecg.modules.base.dto.NuclideInfo;
|
||||||
import org.jeecg.modules.base.dto.Info;
|
import org.jeecg.modules.base.dto.Info;
|
||||||
import org.jeecg.modules.base.entity.postgre.AlarmAnalysisLog;
|
import org.jeecg.modules.base.entity.postgre.AlarmAnalysisLog;
|
||||||
import org.jeecg.modules.base.entity.postgre.AlarmAnalysisNuclideAvg;
|
import org.jeecg.modules.base.entity.postgre.AlarmAnalysisNuclideAvg;
|
||||||
import org.jeecg.modules.base.entity.postgre.AlarmAnalysisRule;
|
import org.jeecg.modules.base.entity.postgre.AlarmAnalysisRule;
|
||||||
import org.jeecg.modules.base.enums.Condition;
|
import org.jeecg.modules.base.enums.Condition;
|
||||||
|
import org.jeecg.modules.base.enums.DSType;
|
||||||
import org.jeecg.modules.feignclient.SystemClient;
|
import org.jeecg.modules.feignclient.SystemClient;
|
||||||
import org.jeecg.modules.service.AnalysisResultService;
|
import org.jeecg.modules.service.AnalysisResultService;
|
||||||
import org.jeecg.modules.service.IAlarmAnalysisLogService;
|
import org.jeecg.modules.service.IAlarmAnalysisLogService;
|
||||||
|
@ -36,9 +39,14 @@ import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import static org.jeecg.common.constant.enums.MessageTypeEnum.*;
|
import static org.jeecg.common.constant.enums.MessageTypeEnum.*;
|
||||||
import static org.jeecg.common.util.TokenUtils.getTempToken;
|
import static org.jeecg.common.util.TokenUtils.getTempToken;
|
||||||
|
import static org.jeecg.modules.base.enums.Template.ANALYSIS_NUCLIDE;
|
||||||
|
import static org.jeecg.modules.base.enums.Template.MONITOR_EMAIL;
|
||||||
|
|
||||||
import java.math.BigDecimal;
|
import java.math.BigDecimal;
|
||||||
|
import java.time.LocalDate;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
|
import java.util.regex.Matcher;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@Data
|
@Data
|
||||||
|
@ -71,7 +79,7 @@ public class AnalysisConsumer implements StreamListener<String, ObjectRecord<Str
|
||||||
try {
|
try {
|
||||||
String streamKey = message.getStream();
|
String streamKey = message.getStream();
|
||||||
init();
|
init();
|
||||||
/**
|
/*
|
||||||
* 新消息在未进行ACK之前,状态也为pending,
|
* 新消息在未进行ACK之前,状态也为pending,
|
||||||
* 直接消费所有异常未确认的消息和新消息
|
* 直接消费所有异常未确认的消息和新消息
|
||||||
*/
|
*/
|
||||||
|
@ -86,10 +94,10 @@ public class AnalysisConsumer implements StreamListener<String, ObjectRecord<Str
|
||||||
// 消费完成后,手动确认消费消息[消息消费成功]
|
// 消费完成后,手动确认消费消息[消息消费成功]
|
||||||
// 否则就是消费抛出异常,进入pending_ids[消息消费失败]
|
// 否则就是消费抛出异常,进入pending_ids[消息消费失败]
|
||||||
redisStreamUtil.ack(streamKey, groupName, recordId.getValue());
|
redisStreamUtil.ack(streamKey, groupName, recordId.getValue());
|
||||||
// TODO del 取消手动删除已消费消息
|
// 手动删除已消费消息
|
||||||
// redisStreamUtil.del(streamKey, recordId.getValue());
|
redisStreamUtil.del(streamKey, recordId.getValue());
|
||||||
}
|
}
|
||||||
}catch (RuntimeException e){
|
}catch (Exception e){
|
||||||
log.error("AnalysisConsumer消费异常: {}", e.getMessage());
|
log.error("AnalysisConsumer消费异常: {}", e.getMessage());
|
||||||
}finally {
|
}finally {
|
||||||
destroy();
|
destroy();
|
||||||
|
@ -109,7 +117,8 @@ public class AnalysisConsumer implements StreamListener<String, ObjectRecord<Str
|
||||||
for (AlarmAnalysisRule rule : rules) {
|
for (AlarmAnalysisRule rule : rules) {
|
||||||
// 当前规则是否有报警条件
|
// 当前规则是否有报警条件
|
||||||
String conditionStr = rule.getConditions();
|
String conditionStr = rule.getConditions();
|
||||||
if (StrUtil.isBlank(conditionStr)) continue;
|
if (StrUtil.isBlank(conditionStr))
|
||||||
|
continue;
|
||||||
// 是否在当前规则关注的台站列表内
|
// 是否在当前规则关注的台站列表内
|
||||||
String stations = rule.getStations();
|
String stations = rule.getStations();
|
||||||
if (!StrUtil.contains(stations, stationId))
|
if (!StrUtil.contains(stations, stationId))
|
||||||
|
@ -137,74 +146,71 @@ public class AnalysisConsumer implements StreamListener<String, ObjectRecord<Str
|
||||||
info.setRuleId(rule.getId());
|
info.setRuleId(rule.getId());
|
||||||
info.setGroupId(rule.getContactGroup());
|
info.setGroupId(rule.getContactGroup());
|
||||||
info.setConditions(rule.getConditions());
|
info.setConditions(rule.getConditions());
|
||||||
judge(info,nuclidesCross);
|
judge(info, nuclidesCross);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void judge(Info info, Map<String,String> nuclidesCross){
|
private void judge(Info info, Map<String,String> nuclidesCross){
|
||||||
Set<String> nuclideNames = nuclidesCross.keySet();
|
Set<String> nuclideNames = nuclidesCross.keySet();
|
||||||
StringBuilder alarmInfo = new StringBuilder();
|
|
||||||
List<String> firstDetected;
|
|
||||||
List<NuclideInfo> moreThanAvg = new ArrayList<>();
|
|
||||||
String conditionStr = info.getConditions();
|
String conditionStr = info.getConditions();
|
||||||
String betaOrGamma = info.getBetaOrGamma();
|
String betaOrGamma = info.getBetaOrGamma();
|
||||||
String datasource = info.getDatasource();
|
String datasource = info.getDatasource();
|
||||||
|
String stationId = info.getStationId();
|
||||||
|
// 获取谱文件采样日期 如果为null 则默认为LocalDate.now()
|
||||||
|
LocalDate collDate = ObjectUtil.isNull(info.getCollectionDate()) ? LocalDate.now() :
|
||||||
|
info.getCollectionDate().toLocalDate();
|
||||||
|
|
||||||
List<String> conditions = ListUtil.toList(conditionStr.split(COMMA));
|
List<String> conditions = ListUtil.toList(conditionStr.split(COMMA));
|
||||||
|
List<String> firstDetected = new ArrayList<>(); // 首次发现
|
||||||
|
List<NuclideInfo> moreThanAvg = new ArrayList<>(); // 超浓度均值
|
||||||
|
List<String> meanwhile = new ArrayList<>(); // 同时出现两种及以上核素
|
||||||
for (String con : conditions) {
|
for (String con : conditions) {
|
||||||
Condition condition = Condition.valueOf1(con);
|
Condition condition = Condition.valueOf1(con);
|
||||||
if (ObjectUtil.isNotNull(condition)){
|
if (ObjectUtil.isNull(condition)) continue;
|
||||||
switch (condition){
|
switch (condition){
|
||||||
case FIRST_FOUND: // 首次发现该元素
|
case FIRST_FOUND: // 首次发现该元素
|
||||||
firstDetected = firstDetected(betaOrGamma,datasource,nuclideNames);
|
firstDetected = firstDetected(betaOrGamma, datasource, nuclideNames);
|
||||||
if (CollUtil.isNotEmpty(firstDetected)){
|
break;
|
||||||
String message = "First discovery of nuclides: [" + StrUtil.join(COMMA,firstDetected) + "]";
|
case ABOVE_AVERAGE: // 元素浓度高于均值
|
||||||
alarmInfo.append(message);
|
moreThanAvg = moreThanAvg(datasource, stationId, collDate, nuclidesCross);
|
||||||
}
|
break;
|
||||||
break;
|
case MEANWHILE: // 同时出现两种及以上核素
|
||||||
case ABOVE_AVERAGE: // 元素浓度高于均值
|
if (CollUtil.isNotEmpty(nuclideNames) && nuclideNames.size() >= 2)
|
||||||
moreThanAvg = moreThanAvg(datasource,nuclidesCross);
|
meanwhile.addAll(nuclideNames);
|
||||||
if (CollUtil.isNotEmpty(moreThanAvg)){
|
break;
|
||||||
for (NuclideInfo nuclideInfo : moreThanAvg) {
|
default:
|
||||||
String nuclide = nuclideInfo.getNuclide();
|
break;
|
||||||
String threshold = nuclideInfo.getThreshold();
|
|
||||||
String message = "Nuclide " + nuclide + "is above average: " + threshold;
|
|
||||||
alarmInfo.append(COMMA).append(message);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case MEANWHILE: // 同时出现两种及以上核素
|
|
||||||
if (nuclideNames.size() >= 2){
|
|
||||||
String message = "Simultaneously detecting nuclides: [" + StrUtil.join(COMMA,nuclideNames) + "]";
|
|
||||||
alarmInfo.append(COMMA).append(message);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (StrUtil.isNotBlank(alarmInfo.toString())){
|
// 构建预警信息
|
||||||
// 保存报警日志
|
DataTool dataTool = DataTool.getInstance();
|
||||||
AlarmAnalysisLog logInfo = new AlarmAnalysisLog();
|
if (CollUtil.isNotEmpty(firstDetected))
|
||||||
BeanUtil.copyProperties(info,logInfo);
|
dataTool.put("firstDetected", CollUtil.join(firstDetected, StrUtil.COMMA + StrUtil.SPACE));
|
||||||
SampleType sampleType = SampleType.typeOf(betaOrGamma);
|
if (CollUtil.isNotEmpty(moreThanAvg)){
|
||||||
if (ObjectUtil.isNotNull(sampleType))
|
String above = moreThanAvg.stream()
|
||||||
logInfo.setSampleType(sampleType.getValue());
|
.map(item -> item.getNuclide() + "(" + item.getValue() + ")" + " > " + item.getThreshold())
|
||||||
if (alarmInfo.toString().startsWith(COMMA))
|
.collect(Collectors.joining(StrUtil.COMMA + StrUtil.SPACE));
|
||||||
alarmInfo = new StringBuilder(StrUtil.sub(alarmInfo.toString(), 1, alarmInfo.length()));
|
dataTool.put("moreThanAvg", above);
|
||||||
logInfo.setAlarmInfo(alarmInfo.toString());
|
|
||||||
if (CollUtil.isNotEmpty(moreThanAvg))
|
|
||||||
logInfo.setNuclideInfoList(moreThanAvg);
|
|
||||||
logService.saveLog(logInfo);
|
|
||||||
// 发送报警信息
|
|
||||||
String groupId = info.getGroupId();
|
|
||||||
MessageDTO messageDTO = new MessageDTO();
|
|
||||||
messageDTO.setTitle("Nuclied Warn Info").setContent(alarmInfo.toString());
|
|
||||||
if (StrUtil.isNotBlank(groupId)) {
|
|
||||||
systemClient.sendMessage(messageDTO, groupId, ALL.getValue());
|
|
||||||
systemClient.pushMessageToSingle(messageDTO, groupId);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
if (CollUtil.isNotEmpty(meanwhile))
|
||||||
|
dataTool.put("meanwhile", CollUtil.join(meanwhile, StrUtil.COMMA + StrUtil.SPACE));
|
||||||
|
// 如果报警数据为空 则不需要发送报警信息和生成报警日志
|
||||||
|
if (MapUtil.isEmpty(dataTool.get())) return;
|
||||||
|
MessageDTO messageDTO = TemplateUtil.parse1(ANALYSIS_NUCLIDE.getCode(), dataTool.get());
|
||||||
|
// 保存报警日志
|
||||||
|
AlarmAnalysisLog logInfo = new AlarmAnalysisLog();
|
||||||
|
BeanUtil.copyProperties(info, logInfo);
|
||||||
|
SampleType sampleType = SampleType.typeOf(betaOrGamma);
|
||||||
|
if (ObjectUtil.isNotNull(sampleType))
|
||||||
|
logInfo.setSampleType(sampleType.getValue());
|
||||||
|
logInfo.setAlarmInfo(messageDTO.getContent());
|
||||||
|
if (CollUtil.isNotEmpty(moreThanAvg))
|
||||||
|
logInfo.setNuclideInfoList(moreThanAvg);
|
||||||
|
logService.saveLog(logInfo);
|
||||||
|
// 发送报警信息
|
||||||
|
String groupId = info.getGroupId();
|
||||||
|
systemClient.sendMessage(messageDTO, groupId, ALL.getValue());
|
||||||
|
systemClient.pushMessageToSingle(messageDTO, groupId);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -222,12 +228,12 @@ public class AnalysisConsumer implements StreamListener<String, ObjectRecord<Str
|
||||||
/**
|
/**
|
||||||
* 核素值大于历史浓度均值
|
* 核素值大于历史浓度均值
|
||||||
*/
|
*/
|
||||||
private List<NuclideInfo> moreThanAvg(String dataSourceType,
|
private List<NuclideInfo> moreThanAvg(String dataSourceType, String stationId,
|
||||||
Map<String,String> nuclidesCross){
|
LocalDate collDate, Map<String,String> nuclidesCross){
|
||||||
List<NuclideInfo> nuclideInfos = new ArrayList<>();
|
List<NuclideInfo> nuclideInfos = new ArrayList<>();
|
||||||
Set<String> nuclideNames = nuclidesCross.keySet();
|
Set<String> nuclideNames = nuclidesCross.keySet();
|
||||||
Map<String, String> nuclideAvgs = nuclideAvgService
|
Map<String, String> nuclideAvgs = nuclideAvgService
|
||||||
.list(nuclideNames, dataSourceType).stream()
|
.list(nuclideNames, dataSourceType, stationId, collDate).stream()
|
||||||
.collect(Collectors.toMap(AlarmAnalysisNuclideAvg::getNuclide,
|
.collect(Collectors.toMap(AlarmAnalysisNuclideAvg::getNuclide,
|
||||||
AlarmAnalysisNuclideAvg::getVal));
|
AlarmAnalysisNuclideAvg::getVal));
|
||||||
for (Map.Entry<String, String> nuclide : nuclidesCross.entrySet()) {
|
for (Map.Entry<String, String> nuclide : nuclidesCross.entrySet()) {
|
||||||
|
@ -246,41 +252,27 @@ public class AnalysisConsumer implements StreamListener<String, ObjectRecord<Str
|
||||||
NuclideInfo nuclideInfo = new NuclideInfo();
|
NuclideInfo nuclideInfo = new NuclideInfo();
|
||||||
nuclideInfo.setNuclide(nuclideName);
|
nuclideInfo.setNuclide(nuclideName);
|
||||||
nuclideInfo.setThreshold(avg.toString());
|
nuclideInfo.setThreshold(avg.toString());
|
||||||
nuclideInfo.setDatasource(type(dataSourceType));
|
nuclideInfo.setDatasource(DSType.typeOf(dataSourceType));
|
||||||
nuclideInfo.setValue(conc.toString());
|
// 对浓度值保留五位小数
|
||||||
|
nuclideInfo.setValue(NumUtil.keepStr(concValue, 5));
|
||||||
nuclideInfos.add(nuclideInfo);
|
nuclideInfos.add(nuclideInfo);
|
||||||
}
|
}
|
||||||
return nuclideInfos;
|
return nuclideInfos;
|
||||||
}
|
}
|
||||||
|
|
||||||
private String type(String dataSourceType){
|
|
||||||
switch (dataSourceType){
|
|
||||||
case CommonConstant.ARMDARR:
|
|
||||||
return "ARMDARR";
|
|
||||||
case CommonConstant.ARMDRRR:
|
|
||||||
return "ARMDRRR";
|
|
||||||
case CommonConstant.IDCARR:
|
|
||||||
return "IDCARR";
|
|
||||||
case CommonConstant.IDCRRR:
|
|
||||||
return "IDCRRR";
|
|
||||||
default:
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void init() {
|
private void init() {
|
||||||
// start:生成临时Token到线程中
|
// start 生成临时Token到线程中
|
||||||
UserTokenContext.setToken(getTempToken());
|
UserTokenContext.setToken(getTempToken());
|
||||||
systemClient = SpringContextUtils.getBean(SystemClient.class);
|
systemClient = SpringContextUtils.getBean(SystemClient.class);
|
||||||
redisStreamUtil = SpringContextUtils.getBean(RedisStreamUtil.class);
|
redisStreamUtil = SpringContextUtils.getBean(RedisStreamUtil.class);
|
||||||
logService = SpringContextUtils.getBean(IAlarmAnalysisLogService.class);
|
logService = SpringContextUtils.getBean(IAlarmAnalysisLogService.class);
|
||||||
analysisResultService = SpringContextUtils.getBean(AnalysisResultService.class);
|
|
||||||
ruleService = SpringContextUtils.getBean(IAlarmAnalysisRuleService.class);
|
ruleService = SpringContextUtils.getBean(IAlarmAnalysisRuleService.class);
|
||||||
|
analysisResultService = SpringContextUtils.getBean(AnalysisResultService.class);
|
||||||
nuclideAvgService = SpringContextUtils.getBean(IAlarmAnalysisNuclideAvgService.class);
|
nuclideAvgService = SpringContextUtils.getBean(IAlarmAnalysisNuclideAvgService.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void destroy(){
|
private void destroy(){
|
||||||
// end:删除临时Token
|
// end 删除临时Token
|
||||||
UserTokenContext.remove();
|
UserTokenContext.remove();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,10 +34,6 @@ public class RedisStreamConfig {
|
||||||
// 每次轮询取几条消息
|
// 每次轮询取几条消息
|
||||||
private final Integer maxMsg = 10;
|
private final Integer maxMsg = 10;
|
||||||
|
|
||||||
private final String alarmKey = RedisConstant.STREAM_ALARM;
|
|
||||||
private final String alarmGroup = RedisConstant.GROUP_ALARM;
|
|
||||||
private final String alarmConsumer = RedisConstant.CONSUMER_ALARM;
|
|
||||||
|
|
||||||
private final String analysisKey = RedisConstant.STREAM_ANALYSIS;
|
private final String analysisKey = RedisConstant.STREAM_ANALYSIS;
|
||||||
private final String analysisGroup = RedisConstant.GROUP_ANALYSIS;
|
private final String analysisGroup = RedisConstant.GROUP_ANALYSIS;
|
||||||
private final String analysisConsumer = RedisConstant.CONSUMER_ANALYSIS;
|
private final String analysisConsumer = RedisConstant.CONSUMER_ANALYSIS;
|
||||||
|
@ -103,7 +99,7 @@ public class RedisStreamConfig {
|
||||||
// 独立消费
|
// 独立消费
|
||||||
/*streamMessageListenerContainer.receive(StreamOffset.fromStart(streamKey),
|
/*streamMessageListenerContainer.receive(StreamOffset.fromStart(streamKey),
|
||||||
new ConsumeListener("独立消费", null, null));*/
|
new ConsumeListener("独立消费", null, null));*/
|
||||||
|
// 非独立消费
|
||||||
/*
|
/*
|
||||||
register:用于注册一个消息监听器,该监听器将在每次有新的消息到达Redis Stream时被调用
|
register:用于注册一个消息监听器,该监听器将在每次有新的消息到达Redis Stream时被调用
|
||||||
这种方式适用于长期运行的消息消费者,它会持续监听Redis Stream并处理新到达的消息
|
这种方式适用于长期运行的消息消费者,它会持续监听Redis Stream并处理新到达的消息
|
||||||
|
@ -111,24 +107,28 @@ public class RedisStreamConfig {
|
||||||
receive:用于主动从Redis Stream中获取消息并进行处理,你可以指定要获取的消息数量和超时时间
|
receive:用于主动从Redis Stream中获取消息并进行处理,你可以指定要获取的消息数量和超时时间
|
||||||
这种方式适用于需要主动控制消息获取的场景,例如批量处理消息或定时任务
|
这种方式适用于需要主动控制消息获取的场景,例如批量处理消息或定时任务
|
||||||
**/
|
**/
|
||||||
// 注册消费组A中的消费者A1,手动ACK
|
/* 1.需要手动确认消费消息 */
|
||||||
/*ConsumerStreamReadRequest<String> readA1 = StreamMessageListenerContainer
|
// 1.1 使用 register 方式
|
||||||
|
StreamMessageListenerContainer.ConsumerStreamReadRequest<String> readRequest = StreamMessageListenerContainer
|
||||||
.StreamReadRequest
|
.StreamReadRequest
|
||||||
.builder(StreamOffset.create(warnKey, ReadOffset.lastConsumed()))
|
.builder(StreamOffset.create(analysisKey, ReadOffset.lastConsumed()))
|
||||||
.consumer(Consumer.from(groupWarnA, consumerWarnA1))
|
.consumer(Consumer.from(analysisGroup, analysisConsumer))
|
||||||
|
// 手动确认消费了消息 默认为自动确认消息
|
||||||
.autoAcknowledge(false)
|
.autoAcknowledge(false)
|
||||||
// 如果消费者发生了异常,是否禁止消费者消费
|
// 如果消费者发生了异常 不禁止消费者消费 默认为禁止
|
||||||
.cancelOnError(throwable -> false)
|
.cancelOnError(throwable -> false)
|
||||||
.build();
|
.build();
|
||||||
ConsumeA1 consumeA1 = new ConsumeA1(groupWarnA, consumerWarnA1);
|
AnalysisConsumer analysis = new AnalysisConsumer(analysisGroup, analysisConsumer);
|
||||||
streamMessageListenerContainer.register(readA1, consumeA1);*/
|
streamMessageListenerContainer.register(readRequest, analysis);
|
||||||
AnalysisConsumer analysis = new AnalysisConsumer(analysisGroup,analysisConsumer);
|
// 1.2 使用 receive 方式
|
||||||
|
/*AnalysisConsumer analysis = new AnalysisConsumer(analysisGroup, analysisConsumer);
|
||||||
streamMessageListenerContainer.receive(Consumer.from(analysisGroup, analysisConsumer),
|
streamMessageListenerContainer.receive(Consumer.from(analysisGroup, analysisConsumer),
|
||||||
StreamOffset.create(analysisKey, ReadOffset.lastConsumed()), analysis);
|
StreamOffset.create(analysisKey, ReadOffset.lastConsumed()), analysis);*/
|
||||||
// 创建消费组A中的消费者A2,自动ACK
|
/* 2.自动确认消费消息 */
|
||||||
/* ConsumeA2 consumeA2 = new ConsumeA2(consumerWarnA2);
|
// 2.1 使用 receive 方式
|
||||||
streamMessageListenerContainer.receiveAutoAck(Consumer.from(groupWarnA, consumerWarnA2),
|
/*AnalysisConsumer analysis = new AnalysisConsumer(analysisGroup,analysisConsumer);
|
||||||
StreamOffset.create(warnKey, ReadOffset.lastConsumed()), consumeA2);*/
|
streamMessageListenerContainer.receiveAutoAck(Consumer.from(analysisGroup, analysisConsumer),
|
||||||
|
StreamOffset.create(analysisKey, ReadOffset.lastConsumed()), analysis);*/
|
||||||
return streamMessageListenerContainer;
|
return streamMessageListenerContainer;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,12 +6,14 @@ import org.jeecg.modules.base.entity.postgre.AlarmAnalysisNuclideAvg;
|
||||||
import com.baomidou.mybatisplus.extension.service.IService;
|
import com.baomidou.mybatisplus.extension.service.IService;
|
||||||
import org.jeecg.modules.base.bizVo.NuclideAvgVo;
|
import org.jeecg.modules.base.bizVo.NuclideAvgVo;
|
||||||
|
|
||||||
|
import java.time.LocalDate;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
public interface IAlarmAnalysisNuclideAvgService extends IService<AlarmAnalysisNuclideAvg> {
|
public interface IAlarmAnalysisNuclideAvgService extends IService<AlarmAnalysisNuclideAvg> {
|
||||||
|
|
||||||
List<AlarmAnalysisNuclideAvg> list(Set<String> nuclideNames,String dataSourceType);
|
List<AlarmAnalysisNuclideAvg> list(Set<String> nuclideNames, String dataSourceType,
|
||||||
|
String stationId, LocalDate collDate);
|
||||||
|
|
||||||
Page<NuclideAvgDto> findPage(NuclideAvgVo nuclideAvgVo);
|
Page<NuclideAvgDto> findPage(NuclideAvgVo nuclideAvgVo);
|
||||||
}
|
}
|
||||||
|
|
|
@ -75,6 +75,7 @@ public class AlarmAnalysisLogServiceImpl extends ServiceImpl<AlarmAnalysisLogMap
|
||||||
try {
|
try {
|
||||||
ObjectMapper mapper = new ObjectMapper();
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
List<NuclideInfo> nuclideInfos = mapper.readValue(nuclideInfo, new TypeReference<List<NuclideInfo>>() {});
|
List<NuclideInfo> nuclideInfos = mapper.readValue(nuclideInfo, new TypeReference<List<NuclideInfo>>() {});
|
||||||
|
nuclideInfos.forEach(NuclideInfo::keepSix);
|
||||||
logDto.setNuclideList(nuclideInfos);
|
logDto.setNuclideList(nuclideInfos);
|
||||||
} catch (JsonProcessingException e) {
|
} catch (JsonProcessingException e) {
|
||||||
log.error("NuclideInfo解析异常: {}", e.getMessage());
|
log.error("NuclideInfo解析异常: {}", e.getMessage());
|
||||||
|
|
|
@ -15,6 +15,7 @@ import org.springframework.stereotype.Service;
|
||||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||||
|
|
||||||
import java.time.LocalDate;
|
import java.time.LocalDate;
|
||||||
|
import java.time.LocalDateTime;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
@ -27,13 +28,15 @@ public class AlarmAnalysisNuclideAvgServiceImpl extends ServiceImpl<AlarmAnalysi
|
||||||
private SystemClient systemClient;
|
private SystemClient systemClient;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<AlarmAnalysisNuclideAvg> list(Set<String> nuclideNames,String dataSourceType) {
|
public List<AlarmAnalysisNuclideAvg> list(Set<String> nuclideNames, String dataSourceType,
|
||||||
LocalDate dayAgo = LocalDate.now().minusDays(1);
|
String stationId, LocalDate collDate) {
|
||||||
|
LocalDate dayAgo = collDate.minusDays(1);
|
||||||
LambdaQueryWrapper<AlarmAnalysisNuclideAvg> wrapper = new LambdaQueryWrapper<>();
|
LambdaQueryWrapper<AlarmAnalysisNuclideAvg> wrapper = new LambdaQueryWrapper<>();
|
||||||
wrapper.eq(AlarmAnalysisNuclideAvg::getDataSourceType,dataSourceType);
|
wrapper.eq(AlarmAnalysisNuclideAvg::getStationId, stationId);
|
||||||
wrapper.eq(AlarmAnalysisNuclideAvg::getCaclDate,dayAgo);
|
wrapper.eq(AlarmAnalysisNuclideAvg::getDataSourceType, dataSourceType);
|
||||||
|
wrapper.eq(AlarmAnalysisNuclideAvg::getCaclDate, dayAgo);
|
||||||
wrapper.in(AlarmAnalysisNuclideAvg::getNuclide,nuclideNames);
|
wrapper.in(AlarmAnalysisNuclideAvg::getNuclide,nuclideNames);
|
||||||
return list(wrapper);
|
return this.list(wrapper);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -74,23 +74,23 @@ public class EmailParsingActuator extends Thread{
|
||||||
long start = System.currentTimeMillis();
|
long start = System.currentTimeMillis();
|
||||||
final EmailServiceManager emailServiceManager = EmailServiceManager.getInstance();
|
final EmailServiceManager emailServiceManager = EmailServiceManager.getInstance();
|
||||||
emailServiceManager.init(this.emailProperties,this.taskProperties.getReceiveNum(),this.taskProperties.getTemporaryStoragePath(),
|
emailServiceManager.init(this.emailProperties,this.taskProperties.getReceiveNum(),this.taskProperties.getTemporaryStoragePath(),
|
||||||
this.systemStartupTime, spectrumServiceQuotes.getSpectrumPathProperties(),spectrumServiceQuotes.getRedisUtil());
|
this.systemStartupTime, spectrumServiceQuotes.getSpectrumPathProperties(), spectrumServiceQuotes.getTaskProperties(), spectrumServiceQuotes.getRedisUtil());
|
||||||
List<String> messageIds = new ArrayList<>();
|
List<String> messageIds = new ArrayList<>();
|
||||||
try {
|
try {
|
||||||
Message[] messages = emailServiceManager.receiveMail();
|
Message[] messages = emailServiceManager.receiveMail();
|
||||||
if(ArrayUtils.isNotEmpty(messages)){
|
if(ArrayUtils.isNotEmpty(messages)){
|
||||||
log.info("EmailParsingActuator本次获取邮件数量为:{}",messages.length);
|
log.info("EmailParsingActuator本次获取邮件数量为:{}",messages.length);
|
||||||
//检验获取的邮件是否在之前删除失败列表中,若在直接调用邮件API删除,并且此次数组里元素也删除
|
//检验获取的邮件是否在之前删除失败列表中,若在直接调用邮件API删除,并且此次数组里元素也删除
|
||||||
// for(int i=messages.length-1;i>=0;i--){
|
for(int i=messages.length-1;i>=0;i--){
|
||||||
// if (!messages[i].isExpunged()){
|
if (!messages[i].isExpunged()){
|
||||||
// String messageId = ((MimeMessage) messages[i]).getMessageID();
|
String messageId = ((MimeMessage) messages[i]).getMessageID();
|
||||||
// final boolean exist = emailServiceManager.check(messages[i],messageId);
|
final boolean exist = emailServiceManager.check(messages[i],messageId);
|
||||||
// messageIds.add(messageId);
|
messageIds.add(messageId);
|
||||||
// if(exist){
|
if(exist){
|
||||||
// messages = ArrayUtils.remove(messages,i);
|
messages = ArrayUtils.remove(messages,i);
|
||||||
// }
|
}
|
||||||
// }
|
}
|
||||||
// }
|
}
|
||||||
log.info("EmailParsingActuator本次真实执行邮件数量为:{}",messages.length);
|
log.info("EmailParsingActuator本次真实执行邮件数量为:{}",messages.length);
|
||||||
if(messages.length > 0){
|
if(messages.length > 0){
|
||||||
//本批次邮件号
|
//本批次邮件号
|
||||||
|
@ -107,6 +107,8 @@ public class EmailParsingActuator extends Thread{
|
||||||
}
|
}
|
||||||
}catch (InterruptedException e) {
|
}catch (InterruptedException e) {
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
|
} catch (MessagingException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
} finally {
|
} finally {
|
||||||
//清除本批次邮件日志缓存
|
//清除本批次邮件日志缓存
|
||||||
EmailLogManager.getInstance().clear();
|
EmailLogManager.getInstance().clear();
|
||||||
|
|
|
@ -96,7 +96,10 @@ public class SpectrumParsingActuator implements Runnable{
|
||||||
receiveDate = DateUtils.formatDate(message.getReceivedDate(),"yyyy-MM-dd HH:mm:ss");
|
receiveDate = DateUtils.formatDate(message.getReceivedDate(),"yyyy-MM-dd HH:mm:ss");
|
||||||
String emlName = subject+ StringConstant.UNDER_LINE+ receiveDate;
|
String emlName = subject+ StringConstant.UNDER_LINE+ receiveDate;
|
||||||
String key = RedisConstant.EMAIL_MSG_ID+StringConstant.COLON+messageId;
|
String key = RedisConstant.EMAIL_MSG_ID+StringConstant.COLON+messageId;
|
||||||
spectrumServiceQuotes.getRedisUtil().set(key,emlName,expiryTime);
|
// spectrumServiceQuotes.getRedisUtil().set(key,emlName,expiryTime);
|
||||||
|
//判断当前key的下载次数是否超过限制次数
|
||||||
|
spectrumServiceQuotes.getRedisUtil().incr(key, 1L);
|
||||||
|
spectrumServiceQuotes.getRedisUtil().expire(key, expiryTime);
|
||||||
//线程开始初始化时,初始本线程负责的能谱日志事件
|
//线程开始初始化时,初始本线程负责的能谱日志事件
|
||||||
SpectrumLogManager.mailSpectrumLogManager.offer(Thread.currentThread().getId(),null);
|
SpectrumLogManager.mailSpectrumLogManager.offer(Thread.currentThread().getId(),null);
|
||||||
|
|
||||||
|
@ -123,6 +126,7 @@ public class SpectrumParsingActuator implements Runnable{
|
||||||
try {
|
try {
|
||||||
//开始解析
|
//开始解析
|
||||||
spectrumHandler.handler();
|
spectrumHandler.handler();
|
||||||
|
spectrumServiceQuotes.getRedisUtil().del(key);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
//如果是gamma谱的分析异常
|
//如果是gamma谱的分析异常
|
||||||
if (e instanceof AnalySpectrumException) {
|
if (e instanceof AnalySpectrumException) {
|
||||||
|
@ -136,7 +140,6 @@ public class SpectrumParsingActuator implements Runnable{
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}else{
|
}else{
|
||||||
log.warn("This email {} parsing failed and is not listed in the Met, Alert, SOH, Sample, Detbkphd, QC, Gasbkphd spectra.",subject);
|
log.warn("This email {} parsing failed and is not listed in the Met, Alert, SOH, Sample, Detbkphd, QC, Gasbkphd spectra.",subject);
|
||||||
}
|
}
|
||||||
|
@ -204,7 +207,7 @@ public class SpectrumParsingActuator implements Runnable{
|
||||||
final String finalPath = rootPath+emlErrorPath;
|
final String finalPath = rootPath+emlErrorPath;
|
||||||
FileOperation.moveFile(emlFile,finalPath,true);
|
FileOperation.moveFile(emlFile,finalPath,true);
|
||||||
// 删除 key,防止下次线程执行删除邮件
|
// 删除 key,防止下次线程执行删除邮件
|
||||||
spectrumServiceQuotes.getRedisUtil().del(key);
|
// spectrumServiceQuotes.getRedisUtil().del(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
package org.jeecg.modules.feignclient;
|
package org.jeecg.modules.feignclient;
|
||||||
|
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.jeecg.common.api.vo.Result;
|
import org.jeecg.common.api.vo.Result;
|
||||||
import org.jeecg.common.constant.RedisConstant;
|
import org.jeecg.common.constant.RedisConstant;
|
||||||
import org.jeecg.common.system.util.JwtUtil;
|
import org.jeecg.common.system.util.JwtUtil;
|
||||||
|
@ -9,6 +10,7 @@ import org.jeecg.modules.base.dto.LoginResult;
|
||||||
import org.jeecg.modules.base.dto.LoginVo;
|
import org.jeecg.modules.base.dto.LoginVo;
|
||||||
import org.springframework.core.env.Environment;
|
import org.springframework.core.env.Environment;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
public class ManageUtil {
|
public class ManageUtil {
|
||||||
|
|
||||||
private static RedisUtil redisUtil;
|
private static RedisUtil redisUtil;
|
||||||
|
@ -31,20 +33,28 @@ public class ManageUtil {
|
||||||
* 登录运管系统 获取Token
|
* 登录运管系统 获取Token
|
||||||
* */
|
* */
|
||||||
public static String getToken(){
|
public static String getToken(){
|
||||||
if (redisUtil.hasKey(RedisConstant.MANAGE_TOKEN))
|
String token = null;
|
||||||
return (String) redisUtil.get(RedisConstant.MANAGE_TOKEN);
|
try {
|
||||||
|
if (redisUtil.hasKey(RedisConstant.MANAGE_TOKEN))
|
||||||
LoginVo loginVo = new LoginVo(username, password);
|
return (String) redisUtil.get(RedisConstant.MANAGE_TOKEN);
|
||||||
Result<LoginResult> loginRes = monitorSystem.login(loginVo);
|
LoginVo loginVo = new LoginVo(username, password);
|
||||||
String token = loginRes.getResult().getToken();
|
Result<LoginResult> loginRes = monitorSystem.login(loginVo);
|
||||||
redisUtil.set(RedisConstant.MANAGE_TOKEN, token, JwtUtil.EXPIRE_TIME / 1000 - 10);
|
token = loginRes.getResult().getToken();
|
||||||
return token;
|
redisUtil.set(RedisConstant.MANAGE_TOKEN, token, JwtUtil.EXPIRE_TIME / 1000 - 10);
|
||||||
|
return token;
|
||||||
|
}catch (RuntimeException e){
|
||||||
|
return token;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void refreshToken(){
|
public static void refreshToken(){
|
||||||
LoginVo loginVo = new LoginVo(username, password);
|
try {
|
||||||
Result<LoginResult> loginRes = monitorSystem.login(loginVo);
|
LoginVo loginVo = new LoginVo(username, password);
|
||||||
String token = loginRes.getResult().getToken();
|
Result<LoginResult> loginRes = monitorSystem.login(loginVo);
|
||||||
redisUtil.set(RedisConstant.MANAGE_TOKEN, token, JwtUtil.EXPIRE_TIME / 1000 - 10);
|
String token = loginRes.getResult().getToken();
|
||||||
|
redisUtil.set(RedisConstant.MANAGE_TOKEN, token, JwtUtil.EXPIRE_TIME / 1000 - 10);
|
||||||
|
}catch (RuntimeException e){
|
||||||
|
log.error("运管系统登录异常, Token刷新失败: {}", e.getMessage());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,6 +4,7 @@ import cn.hutool.core.collection.ListUtil;
|
||||||
import cn.hutool.core.map.MapUtil;
|
import cn.hutool.core.map.MapUtil;
|
||||||
import cn.hutool.core.util.ObjectUtil;
|
import cn.hutool.core.util.ObjectUtil;
|
||||||
import cn.hutool.core.util.StrUtil;
|
import cn.hutool.core.util.StrUtil;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.jeecg.common.api.dto.message.MessageDTO;
|
import org.jeecg.common.api.dto.message.MessageDTO;
|
||||||
import org.jeecg.common.constant.SymbolConstant;
|
import org.jeecg.common.constant.SymbolConstant;
|
||||||
import org.jeecg.common.constant.enums.MessageTypeEnum;
|
import org.jeecg.common.constant.enums.MessageTypeEnum;
|
||||||
|
@ -27,6 +28,7 @@ import static org.jeecg.common.constant.enums.MessageTypeEnum.*;
|
||||||
* @author nieziyan
|
* @author nieziyan
|
||||||
* @date 2023-06-21
|
* @date 2023-06-21
|
||||||
*/
|
*/
|
||||||
|
@Slf4j
|
||||||
@Component
|
@Component
|
||||||
public class SendMessage {
|
public class SendMessage {
|
||||||
|
|
||||||
|
@ -50,6 +52,7 @@ public class SendMessage {
|
||||||
*/
|
*/
|
||||||
public void send(MessageDTO messageDTO, String groupId, String notific){
|
public void send(MessageDTO messageDTO, String groupId, String notific){
|
||||||
Map<String, String> contact = getContact(groupId);
|
Map<String, String> contact = getContact(groupId);
|
||||||
|
if (MapUtil.isEmpty(contact)) return;
|
||||||
if (StrUtil.isBlank(notific)) return;
|
if (StrUtil.isBlank(notific)) return;
|
||||||
List<String> ways = ListUtil.toList(StrUtil.split(notific, COMMA));
|
List<String> ways = ListUtil.toList(StrUtil.split(notific, COMMA));
|
||||||
if (ways.contains(ALL.getValue()))
|
if (ways.contains(ALL.getValue()))
|
||||||
|
@ -96,26 +99,33 @@ public class SendMessage {
|
||||||
* @param groupId
|
* @param groupId
|
||||||
*/
|
*/
|
||||||
private Map<String, String> getContact(String groupId){
|
private Map<String, String> getContact(String groupId){
|
||||||
List<String> userIds = abnormalAlarmClient.userIds(groupId).getResult();
|
|
||||||
// 查询用户信息
|
|
||||||
List<SysUser> sysUsers = sysUserService.listByIds(userIds);
|
|
||||||
// 用户名
|
|
||||||
String usernameList = sysUsers.stream().map(SysUser::getUsername).filter(StrUtil::isNotBlank)
|
|
||||||
.collect(Collectors.joining(COMMA));
|
|
||||||
// 邮箱
|
|
||||||
String emailList = sysUsers.stream().map(SysUser::getEmail).filter(StrUtil::isNotBlank)
|
|
||||||
.collect(Collectors.joining(COMMA));
|
|
||||||
// 用户名-邮箱Map
|
|
||||||
Map<String, String> userEmail = sysUsers.stream()
|
|
||||||
.collect(Collectors.toMap(SysUser::getUsername, SysUser::getEmail));
|
|
||||||
// 手机号码
|
|
||||||
String phoneList = sysUsers.stream().map(SysUser::getPhone).filter(StrUtil::isNotBlank)
|
|
||||||
.collect(Collectors.joining(COMMA));
|
|
||||||
Map<String,String> result = new HashMap<>();
|
Map<String,String> result = new HashMap<>();
|
||||||
result.put(SYSTEM, usernameList);
|
try {
|
||||||
result.put(EMAIL, emailList);
|
List<String> userIds = abnormalAlarmClient.userIds(groupId).getResult();
|
||||||
result.put(SMS, phoneList);
|
// 查询用户信息
|
||||||
result.putAll(userEmail);
|
List<SysUser> sysUsers = sysUserService.listByIds(userIds);
|
||||||
return result;
|
// 用户名
|
||||||
|
String usernameList = sysUsers.stream().map(SysUser::getUsername).filter(StrUtil::isNotBlank)
|
||||||
|
.collect(Collectors.joining(COMMA));
|
||||||
|
// 邮箱
|
||||||
|
String emailList = sysUsers.stream().map(SysUser::getEmail).filter(StrUtil::isNotBlank)
|
||||||
|
.collect(Collectors.joining(COMMA));
|
||||||
|
// 用户名-邮箱Map
|
||||||
|
Map<String, String> userEmail = sysUsers.stream()
|
||||||
|
.filter(item -> StrUtil.isNotBlank(item.getEmail()))
|
||||||
|
.collect(Collectors.toMap(SysUser::getUsername, SysUser::getEmail));
|
||||||
|
// 手机号码
|
||||||
|
String phoneList = sysUsers.stream().map(SysUser::getPhone).filter(StrUtil::isNotBlank)
|
||||||
|
.collect(Collectors.joining(COMMA));
|
||||||
|
|
||||||
|
result.put(SYSTEM, usernameList);
|
||||||
|
result.put(EMAIL, emailList);
|
||||||
|
result.put(SMS, phoneList);
|
||||||
|
result.putAll(userEmail);
|
||||||
|
return result;
|
||||||
|
}catch (Exception e) {
|
||||||
|
log.error("获取收件人联系信息异常: {}", e.getMessage());
|
||||||
|
return result;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,6 +6,7 @@ import cn.hutool.core.util.ObjectUtil;
|
||||||
import cn.hutool.core.util.StrUtil;
|
import cn.hutool.core.util.StrUtil;
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import feign.FeignException;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.jeecg.common.api.dto.message.MessageDTO;
|
import org.jeecg.common.api.dto.message.MessageDTO;
|
||||||
|
@ -102,6 +103,10 @@ public class DatabaseJob extends Monitor implements Job{
|
||||||
}*/
|
}*/
|
||||||
// 向运管查询监控项数据
|
// 向运管查询监控项数据
|
||||||
String token = ManageUtil.getToken();
|
String token = ManageUtil.getToken();
|
||||||
|
if(StrUtil.isBlank(token)){
|
||||||
|
log.error("运管系统登录异常, Token获取失败");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
Result<ItemHistory> result = getMonitorSystem().itemBack(itemId, itemType, start, end, token);
|
Result<ItemHistory> result = getMonitorSystem().itemBack(itemId, itemType, start, end, token);
|
||||||
ItemHistory itemHistory = result.getResult();
|
ItemHistory itemHistory = result.getResult();
|
||||||
if (ObjectUtil.isNull(itemHistory)){
|
if (ObjectUtil.isNull(itemHistory)){
|
||||||
|
@ -141,9 +146,12 @@ public class DatabaseJob extends Monitor implements Job{
|
||||||
getSendMessage().send(messageDTO, groupId, notific);
|
getSendMessage().send(messageDTO, groupId, notific);
|
||||||
getPushAppUtil().pushToSingle(messageDTO, groupId);
|
getPushAppUtil().pushToSingle(messageDTO, groupId);
|
||||||
}
|
}
|
||||||
|
} catch (FeignException.Unauthorized e){
|
||||||
|
ManageUtil.refreshToken();
|
||||||
|
log.warn("向运管系统查询ItemHistory信息异常: Token失效,已刷新Token");
|
||||||
} catch (JsonProcessingException e) {
|
} catch (JsonProcessingException e) {
|
||||||
log.error("Database预警规则: {}解析失败,失败原因: {}", operator, e.getMessage());
|
log.error("Database预警规则: {}解析失败,失败原因: {}", operator, e.getMessage());
|
||||||
}catch (Exception e){
|
} catch (Exception e){
|
||||||
log.error("Database监控异常: {}", e.getMessage());
|
log.error("Database监控异常: {}", e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -85,6 +85,10 @@ public class ServerJob extends Monitor implements Job {
|
||||||
|
|
||||||
// 向运管查询监控项数据
|
// 向运管查询监控项数据
|
||||||
String token = ManageUtil.getToken();
|
String token = ManageUtil.getToken();
|
||||||
|
if(StrUtil.isBlank(token)){
|
||||||
|
log.error("运管系统登录异常, Token获取失败");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
Result<ItemHistory> result = getMonitorSystem().itemBack(itemId, itemType, start, end, token);
|
Result<ItemHistory> result = getMonitorSystem().itemBack(itemId, itemType, start, end, token);
|
||||||
ItemHistory itemHistory = result.getResult();
|
ItemHistory itemHistory = result.getResult();
|
||||||
if (ObjectUtil.isNull(itemHistory)){
|
if (ObjectUtil.isNull(itemHistory)){
|
||||||
|
@ -124,12 +128,12 @@ public class ServerJob extends Monitor implements Job {
|
||||||
getSendMessage().send(messageDTO, groupId, notific);
|
getSendMessage().send(messageDTO, groupId, notific);
|
||||||
getPushAppUtil().pushToSingle(messageDTO, groupId);
|
getPushAppUtil().pushToSingle(messageDTO, groupId);
|
||||||
}
|
}
|
||||||
}catch (FeignException.Unauthorized e){
|
} catch (FeignException.Unauthorized e){
|
||||||
ManageUtil.refreshToken();
|
ManageUtil.refreshToken();
|
||||||
log.warn("向运管系统查询ItemHistory信息异常: Token失效,已刷新Token");
|
log.warn("向运管系统查询ItemHistory信息异常: Token失效,已刷新Token");
|
||||||
} catch (JsonProcessingException e) {
|
} catch (JsonProcessingException e) {
|
||||||
log.error("Server预警规则: {}解析失败,失败原因: {}", operator, e.getMessage());
|
log.error("Server预警规则: {}解析失败,失败原因: {}", operator, e.getMessage());
|
||||||
}catch (Exception e){
|
} catch (Exception e){
|
||||||
log.error("Server监控异常: {}", e.getMessage());
|
log.error("Server监控异常: {}", e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,6 +6,7 @@ import cn.hutool.core.util.ObjectUtil;
|
||||||
import cn.hutool.core.util.StrUtil;
|
import cn.hutool.core.util.StrUtil;
|
||||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
|
import feign.FeignException;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.jeecg.common.api.dto.message.MessageDTO;
|
import org.jeecg.common.api.dto.message.MessageDTO;
|
||||||
|
@ -86,6 +87,10 @@ public class DatabaseJob extends Monitor {
|
||||||
|
|
||||||
// 向运管查询监控项数据
|
// 向运管查询监控项数据
|
||||||
String token = ManageUtil.getToken();
|
String token = ManageUtil.getToken();
|
||||||
|
if(StrUtil.isBlank(token)){
|
||||||
|
log.error("运管系统登录异常, Token获取失败");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
Result<ItemHistory> result = getMonitorSystem().itemBack(itemId, itemType, start, end, token);
|
Result<ItemHistory> result = getMonitorSystem().itemBack(itemId, itemType, start, end, token);
|
||||||
ItemHistory itemHistory = result.getResult();
|
ItemHistory itemHistory = result.getResult();
|
||||||
if (ObjectUtil.isNull(itemHistory)){
|
if (ObjectUtil.isNull(itemHistory)){
|
||||||
|
@ -125,11 +130,13 @@ public class DatabaseJob extends Monitor {
|
||||||
getSendMessage().send(messageDTO, groupId, notific);
|
getSendMessage().send(messageDTO, groupId, notific);
|
||||||
getPushAppUtil().pushToSingle(messageDTO, groupId);
|
getPushAppUtil().pushToSingle(messageDTO, groupId);
|
||||||
}
|
}
|
||||||
|
}catch (FeignException.Unauthorized e){
|
||||||
|
ManageUtil.refreshToken();
|
||||||
|
log.warn("向运管系统查询ItemHistory信息异常: Token失效,已刷新Token");
|
||||||
} catch (JsonProcessingException e) {
|
} catch (JsonProcessingException e) {
|
||||||
log.error("Database预警规则: {}解析失败,失败原因: {}", operator, e.getMessage());
|
log.error("Database预警规则: {}解析失败,失败原因: {}", operator, e.getMessage());
|
||||||
}catch (Exception e){
|
}catch (Exception e){
|
||||||
log.error("Database监控异常: {}", e.getMessage());
|
log.error("Database监控异常: {}", e.getMessage());
|
||||||
e.printStackTrace();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
destroy();
|
destroy();
|
||||||
|
|
|
@ -111,7 +111,6 @@ public class EmailJob extends Monitor{
|
||||||
log.error("Email预警规则: {}解析失败,失败原因: {}", operator, e.getMessage());
|
log.error("Email预警规则: {}解析失败,失败原因: {}", operator, e.getMessage());
|
||||||
}catch (Exception e){
|
}catch (Exception e){
|
||||||
log.error("Email监控异常: {}", e.getMessage());
|
log.error("Email监控异常: {}", e.getMessage());
|
||||||
e.printStackTrace();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
destroy();
|
destroy();
|
||||||
|
|
|
@ -84,6 +84,10 @@ public class ServerJob extends Monitor{
|
||||||
|
|
||||||
// 向运管查询监控项数据
|
// 向运管查询监控项数据
|
||||||
String token = ManageUtil.getToken();
|
String token = ManageUtil.getToken();
|
||||||
|
if(StrUtil.isBlank(token)){
|
||||||
|
log.error("运管系统登录异常, Token获取失败");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
Result<ItemHistory> result = getMonitorSystem().itemBack(itemId, itemType, start, end, token);
|
Result<ItemHistory> result = getMonitorSystem().itemBack(itemId, itemType, start, end, token);
|
||||||
ItemHistory itemHistory = result.getResult();
|
ItemHistory itemHistory = result.getResult();
|
||||||
if (ObjectUtil.isNull(itemHistory)){
|
if (ObjectUtil.isNull(itemHistory)){
|
||||||
|
@ -123,14 +127,13 @@ public class ServerJob extends Monitor{
|
||||||
getSendMessage().send(messageDTO, groupId, notific);
|
getSendMessage().send(messageDTO, groupId, notific);
|
||||||
getPushAppUtil().pushToSingle(messageDTO, groupId);
|
getPushAppUtil().pushToSingle(messageDTO, groupId);
|
||||||
}
|
}
|
||||||
}catch (FeignException.Unauthorized e){
|
} catch (FeignException.Unauthorized e){
|
||||||
ManageUtil.refreshToken();
|
ManageUtil.refreshToken();
|
||||||
log.warn("向运管系统查询ItemHistory信息异常: Token失效,已刷新Token");
|
log.warn("向运管系统查询ItemHistory信息异常: Token失效,已刷新Token");
|
||||||
} catch (JsonProcessingException e) {
|
} catch (JsonProcessingException e) {
|
||||||
log.error("Server预警规则: {}解析失败,失败原因: {}", operator, e.getMessage());
|
log.error("Server预警规则: {}解析失败,失败原因: {}", operator, e.getMessage());
|
||||||
}catch (Exception e){
|
} catch (Exception e){
|
||||||
log.error("Server监控异常: {}", e.getMessage());
|
log.error("Server监控异常: {}", e.getMessage());
|
||||||
e.printStackTrace();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
destroy();
|
destroy();
|
||||||
|
|
|
@ -44,7 +44,7 @@ public class TableSpaceJob extends Monitor {
|
||||||
/**
|
/**
|
||||||
* 解析Oracle 表空间预警规则
|
* 解析Oracle 表空间预警规则
|
||||||
**/
|
**/
|
||||||
@Scheduled(cron = "${task.period-space:0 0 */1 * * ?}")
|
@Scheduled(cron = "${task.period-space:0 0/1 * * * ?}")
|
||||||
public void execute(){
|
public void execute(){
|
||||||
init();
|
init();
|
||||||
|
|
||||||
|
@ -124,7 +124,6 @@ public class TableSpaceJob extends Monitor {
|
||||||
log.error("Database-TableSpace预警规则: {}解析失败,失败原因: {}", operator, e.getMessage());
|
log.error("Database-TableSpace预警规则: {}解析失败,失败原因: {}", operator, e.getMessage());
|
||||||
}catch (Exception e){
|
}catch (Exception e){
|
||||||
log.error("Database-TableSpace监控异常: {}", e.getMessage());
|
log.error("Database-TableSpace监控异常: {}", e.getMessage());
|
||||||
e.printStackTrace();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
destroy();
|
destroy();
|
||||||
|
|
Loading…
Reference in New Issue
Block a user