Compare commits

..

No commits in common. "72dbf2ba80d0de1f5d5322534a7b422161d09129" and "85987985a11f2b58eb0f6b056f3f7916de76f752" have entirely different histories.

23 changed files with 203 additions and 319 deletions

View File

@ -13,10 +13,16 @@ public interface RedisConstant {
*/ */
String PREFIX_SILENCE = "SilenceCycle:"; String PREFIX_SILENCE = "SilenceCycle:";
String STREAM_ALARM = "Stream:Alarm";
String STREAM_ANALYSIS = "Stream:Analysis"; String STREAM_ANALYSIS = "Stream:Analysis";
String GROUP_ALARM = "Group_Alarm";
String GROUP_ANALYSIS = "Group_Analysis"; String GROUP_ANALYSIS = "Group_Analysis";
String CONSUMER_ALARM = "Consumer_Alarm";
String CONSUMER_ANALYSIS = "Consumer_Analysis"; String CONSUMER_ANALYSIS = "Consumer_Analysis";
String NUCLIDE_LINES_LIB = "Nuclide_Lines_Lib:"; String NUCLIDE_LINES_LIB = "Nuclide_Lines_Lib:";

View File

@ -9,8 +9,6 @@ import com.sun.mail.imap.IMAPStore;
import com.sun.mail.smtp.SMTPAddressFailedException; import com.sun.mail.smtp.SMTPAddressFailedException;
import io.swagger.models.auth.In; import io.swagger.models.auth.In;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.Charsets;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.jeecg.common.api.dto.message.MessageDTO; import org.jeecg.common.api.dto.message.MessageDTO;
import org.jeecg.common.constant.RedisConstant; import org.jeecg.common.constant.RedisConstant;
@ -126,7 +124,7 @@ public class EmailServiceManager {
/** /**
* 接收邮件 * 接收邮件
*/ */
public Message[] receiveMail() throws MessagingException { public Message[] receiveMail() {
String status = EmailLogManager.STATUS_SUCCESS; String status = EmailLogManager.STATUS_SUCCESS;
try{ try{
//配置邮件服务属性 //配置邮件服务属性
@ -185,11 +183,12 @@ public class EmailServiceManager {
return Arrays.copyOfRange(messages,0,this.receiveNum-1); return Arrays.copyOfRange(messages,0,this.receiveNum-1);
} }
} }
} catch (MessagingException e){ }catch (MessagingException e){
status = EmailLogManager.STATUS_ERROR; status = EmailLogManager.STATUS_ERROR;
log.error("Email connection is abnormal, account is {}, service is {},the reason is {}.",email.getName(),email.getEmailServerAddress(),e.getMessage()); log.error("Email connection is abnormal, account is {}, service is {},the reason is {}.",email.getName(),email.getEmailServerAddress(),e.getMessage());
throw e; e.printStackTrace();
} finally { return null;
}finally {
EmailLogEvent connectEvent = new EmailLogEvent(EmailLogManager.GS_TYPE_GET,email,status,EmailLogManager.CONNECT); EmailLogEvent connectEvent = new EmailLogEvent(EmailLogManager.GS_TYPE_GET,email,status,EmailLogManager.CONNECT);
EmailLogManager.getInstance().setConnectLogEvent(connectEvent); EmailLogManager.getInstance().setConnectLogEvent(connectEvent);
//GetAllId C++原业务是把远程邮箱邮件同步到C++本次java编写没有这一步所以和Connect绑定若Connect成功则GetAllId成功 //GetAllId C++原业务是把远程邮箱邮件同步到C++本次java编写没有这一步所以和Connect绑定若Connect成功则GetAllId成功
@ -533,8 +532,7 @@ public class EmailServiceManager {
File emlFile = null; File emlFile = null;
String status = EmailLogManager.STATUS_SUCCESS; String status = EmailLogManager.STATUS_SUCCESS;
Date receivedDate = null; Date receivedDate = null;
InputStream inputStream = null; FileOutputStream outputStream = null;
BufferedOutputStream outputStream = null;
try { try {
//获取发件人 //获取发件人
final String address = ((InternetAddress) message.getFrom()[0]).getAddress(); final String address = ((InternetAddress) message.getFrom()[0]).getAddress();
@ -568,19 +566,25 @@ public class EmailServiceManager {
final String rootPath = spectrumPathProperties.getRootPath(); final String rootPath = spectrumPathProperties.getRootPath();
final String emlPath = spectrumPathProperties.getEmlPath(); final String emlPath = spectrumPathProperties.getEmlPath();
emlFile = new File(rootPath+emlPath+File.separator+fileName); emlFile = new File(rootPath+emlPath+File.separator+fileName);
// outputStream = new FileOutputStream(emlFile); outputStream = new FileOutputStream(emlFile);
// message.writeTo(outputStream); message.writeTo(outputStream);
int bufferSize = 1024 * 1024; // 1M
inputStream = message.getInputStream();
outputStream = new BufferedOutputStream(new FileOutputStream(emlFile), bufferSize);
// 从邮件的输入流读取内容并写入到本地文件
byte[] buffer = new byte[bufferSize];
int bytesRead;
while ((bytesRead = inputStream.read(buffer)) != -1) {
outputStream.write(buffer, 0, bytesRead);
}
// int bufferSize = 1024 * 1024; // 1M
// InputStream inputStream = message.getInputStream();
// BufferedInputStream bufferedInputStream = new BufferedInputStream(inputStream, bufferSize);
// // 或者使用 BufferedOutputStream
// OutputStream outputStream = new FileOutputStream(emlFile);
// BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(outputStream, bufferSize);
// // 从邮件的输入流读取内容并写入到本地文件
// byte[] buffer = new byte[bufferSize];
// int bytesRead;
// while ((bytesRead = bufferedInputStream.read(buffer)) != -1) {
// bufferedOutputStream.write(buffer, 0, bytesRead);
// }
//
// // 关闭流
// bufferedInputStream.close();
// bufferedOutputStream.close();
} catch (MessagingException | IOException e) { } catch (MessagingException | IOException e) {
// 下载邮件失败 抛出自定义邮件下载异常 // 下载邮件失败 抛出自定义邮件下载异常
status = EmailLogManager.STATUS_ERROR; status = EmailLogManager.STATUS_ERROR;
@ -594,11 +598,7 @@ public class EmailServiceManager {
(Objects.isNull(emlFile)?" ":emlFile.getAbsolutePath())); (Objects.isNull(emlFile)?" ":emlFile.getAbsolutePath()));
EmailLogManager.getInstance().offer(Thread.currentThread().getId(),event); EmailLogManager.getInstance().offer(Thread.currentThread().getId(),event);
try { try {
if (Objects.nonNull(inputStream)) {
inputStream.close();
}
if (Objects.nonNull(outputStream)) { if (Objects.nonNull(outputStream)) {
outputStream.flush();
outputStream.close(); outputStream.close();
} }
} catch (IOException e) { } catch (IOException e) {
@ -669,7 +669,7 @@ public class EmailServiceManager {
String key = RedisConstant.EMAIL_MSG_ID+StringConstant.COLON+messageId; String key = RedisConstant.EMAIL_MSG_ID+StringConstant.COLON+messageId;
int numberKey = redisUtil.get(key) != null? (int) redisUtil.get(key):0; int numberKey = redisUtil.get(key) != null? (int) redisUtil.get(key):0;
// exist = redisUtil.hasKey(key); // exist = redisUtil.hasKey(key);
if(numberKey >= taskProperties.getForceDeletedNumber()){ if(numberKey > taskProperties.getForceDeletedNumber()){
exist = true; exist = true;
log.info("Check: Remove Email:{},receiveTime:{}",message.getSubject(), DateUtils.formatDate(message.getReceivedDate(),"yyyy-MM-dd HH:mm:ss")); log.info("Check: Remove Email:{},receiveTime:{}",message.getSubject(), DateUtils.formatDate(message.getReceivedDate(),"yyyy-MM-dd HH:mm:ss"));
message.setFlag(Flags.Flag.DELETED,true); message.setFlag(Flags.Flag.DELETED,true);

View File

@ -1,20 +1,13 @@
package org.jeecg.common.util; package org.jeecg.common.util;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.map.MapUtil; import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.ReUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import org.jeecg.common.api.dto.message.MessageDTO; import org.jeecg.common.api.dto.message.MessageDTO;
import org.jeecg.common.util.dynamic.db.FreemarkerParseFactory; import org.jeecg.common.util.dynamic.db.FreemarkerParseFactory;
import org.jeecg.modules.base.entity.postgre.SysMessageTemplate; import org.jeecg.modules.base.entity.postgre.SysMessageTemplate;
import org.jeecg.modules.base.service.ISysMessageTemplateService; import org.jeecg.modules.base.service.ISysMessageTemplateService;
import java.util.ArrayList;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/* /*
* 模板工具类 * 模板工具类
@ -44,30 +37,6 @@ public class TemplateUtil {
return messageDTO; return messageDTO;
} }
public static MessageDTO parse1(String code, Map<String, Object> data){
MessageDTO messageDTO = new MessageDTO();
SysMessageTemplate template = templateService.getOne(code);
// 如果没有消息模板
if(ObjectUtil.isNull(template))
return messageDTO;
String templateName = template.getTemplateName();
String templateContent = template.getTemplateContent();
messageDTO.setTitle(templateName);
if (MapUtil.isEmpty(data))
return messageDTO;
Set<String> keys = data.keySet();
String pattern = "\\<([^<>]*{}[^<>]*)\\>";
List<String> contents = new ArrayList<>();
for (String key : keys) {
contents.add(ReUtil.getGroup1(StrUtil.format(pattern, key), templateContent));
}
templateContent = CollUtil.join(contents, "#");
String content = FreemarkerParseFactory
.parseTemplateContent(templateContent, data, true);
messageDTO.setContent(content);
return messageDTO;
}
public static MessageDTO parse(String title, String code, Map<String, Object> data) { public static MessageDTO parse(String title, String code, Map<String, Object> data) {
MessageDTO messageDTO = new MessageDTO(); MessageDTO messageDTO = new MessageDTO();
SysMessageTemplate template = templateService.getOne(code); SysMessageTemplate template = templateService.getOne(code);
@ -84,28 +53,4 @@ public class TemplateUtil {
messageDTO.setContent(content); messageDTO.setContent(content);
return messageDTO; return messageDTO;
} }
public static MessageDTO parse1(String title, String code, Map<String, Object> data) {
MessageDTO messageDTO = new MessageDTO();
SysMessageTemplate template = templateService.getOne(code);
// 如果没有消息模板
if(ObjectUtil.isNull(template))
return messageDTO;
String templateName = template.getTemplateName();
String templateContent = template.getTemplateContent();
messageDTO.setTitle(StrUtil.isBlank(title) ? templateName : title);
if (MapUtil.isEmpty(data))
return messageDTO;
Set<String> keys = data.keySet();
String pattern = "\\<([^<>]*{}[^<>]*)\\>";
List<String> contents = new ArrayList<>();
for (String key : keys) {
contents.add(ReUtil.getGroup1(StrUtil.format(pattern, key), templateContent));
}
templateContent = CollUtil.join(contents, "#");
String content = FreemarkerParseFactory
.parseTemplateContent(templateContent, data, true);
messageDTO.setContent(content);
return messageDTO;
}
} }

View File

@ -6,8 +6,8 @@ import java.util.Comparator;
public class FileComparator implements Comparator<FileDto> { public class FileComparator implements Comparator<FileDto> {
private String field; private final String field;
private String order; private final String order;
public FileComparator(String field, String order) { public FileComparator(String field, String order) {
this.field = field; this.field = field;

View File

@ -3,12 +3,8 @@ package org.jeecg.modules.base.dto;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data; import lombok.Data;
import org.jeecg.common.util.NumUtil;
import java.io.Serializable; import java.io.Serializable;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
@Data @Data
@JsonIgnoreProperties(ignoreUnknown = true) @JsonIgnoreProperties(ignoreUnknown = true)
@ -21,8 +17,4 @@ public class NuclideInfo implements Serializable {
private String datasource; private String datasource;
private String value; private String value;
public void keepSix(){
this.value = NumUtil.keepStr(this.value, 6);
}
} }

View File

@ -9,7 +9,7 @@ import lombok.Getter;
public enum Condition { public enum Condition {
FIRST_FOUND("1"), ABOVE_AVERAGE("2"), MEANWHILE("3"); FIRST_FOUND("1"), ABOVE_AVERAGE("2"), MEANWHILE("3");
private final String value; private String value;
public static Condition valueOf1(String value){ public static Condition valueOf1(String value){
for (Condition condition : Condition.values()) { for (Condition condition : Condition.values()) {

View File

@ -1,14 +1,10 @@
package org.jeecg.modules.base.enums; package org.jeecg.modules.base.enums;
import cn.hutool.core.util.StrUtil;
import lombok.Getter;
/** /**
* @author nieziyan * @author nieziyan
* 数据源类型 * 数据源类型
*/ */
@Getter
public enum DSType { public enum DSType {
/* ARMD自动处理库 */ /* ARMD自动处理库 */
ARMDARR("1"), ARMDARR("1"),
@ -19,17 +15,13 @@ public enum DSType {
/* IDC人工交互库 */ /* IDC人工交互库 */
IDCRRR("4"); IDCRRR("4");
DSType(String type) { DSType(java.lang.String type) {
this.type = type; this.type = type;
} }
private final String type; private String type;
public static String typeOf(String type){ public String getType() {
for (DSType dsType : DSType.values()) { return type;
if (StrUtil.equals(type, dsType.getType()))
return dsType.name();
}
return null;
} }
} }

View File

@ -5,10 +5,8 @@ import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.collection.ListUtil; import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.map.MapUtil; import cn.hutool.core.map.MapUtil;
import cn.hutool.core.text.StrBuilder;
import cn.hutool.core.util.NumberUtil; import cn.hutool.core.util.NumberUtil;
import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.ReUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
@ -18,15 +16,14 @@ import org.jeecg.common.config.mqtoken.UserTokenContext;
import org.jeecg.common.constant.CommonConstant; import org.jeecg.common.constant.CommonConstant;
import org.jeecg.common.constant.SymbolConstant; import org.jeecg.common.constant.SymbolConstant;
import org.jeecg.common.constant.enums.SampleType; import org.jeecg.common.constant.enums.SampleType;
import org.jeecg.common.util.*; import org.jeecg.common.util.RedisStreamUtil;
import org.jeecg.common.util.dynamic.db.FreemarkerParseFactory; import org.jeecg.common.util.SpringContextUtils;
import org.jeecg.modules.base.dto.NuclideInfo; import org.jeecg.modules.base.dto.NuclideInfo;
import org.jeecg.modules.base.dto.Info; import org.jeecg.modules.base.dto.Info;
import org.jeecg.modules.base.entity.postgre.AlarmAnalysisLog; import org.jeecg.modules.base.entity.postgre.AlarmAnalysisLog;
import org.jeecg.modules.base.entity.postgre.AlarmAnalysisNuclideAvg; import org.jeecg.modules.base.entity.postgre.AlarmAnalysisNuclideAvg;
import org.jeecg.modules.base.entity.postgre.AlarmAnalysisRule; import org.jeecg.modules.base.entity.postgre.AlarmAnalysisRule;
import org.jeecg.modules.base.enums.Condition; import org.jeecg.modules.base.enums.Condition;
import org.jeecg.modules.base.enums.DSType;
import org.jeecg.modules.feignclient.SystemClient; import org.jeecg.modules.feignclient.SystemClient;
import org.jeecg.modules.service.AnalysisResultService; import org.jeecg.modules.service.AnalysisResultService;
import org.jeecg.modules.service.IAlarmAnalysisLogService; import org.jeecg.modules.service.IAlarmAnalysisLogService;
@ -39,14 +36,9 @@ import org.springframework.stereotype.Component;
import static org.jeecg.common.constant.enums.MessageTypeEnum.*; import static org.jeecg.common.constant.enums.MessageTypeEnum.*;
import static org.jeecg.common.util.TokenUtils.getTempToken; import static org.jeecg.common.util.TokenUtils.getTempToken;
import static org.jeecg.modules.base.enums.Template.ANALYSIS_NUCLIDE;
import static org.jeecg.modules.base.enums.Template.MONITOR_EMAIL;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.time.LocalDate;
import java.util.*; import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@Data @Data
@ -79,7 +71,7 @@ public class AnalysisConsumer implements StreamListener<String, ObjectRecord<Str
try { try {
String streamKey = message.getStream(); String streamKey = message.getStream();
init(); init();
/* /**
* 新消息在未进行ACK之前,状态也为pending, * 新消息在未进行ACK之前,状态也为pending,
* 直接消费所有异常未确认的消息和新消息 * 直接消费所有异常未确认的消息和新消息
*/ */
@ -94,10 +86,10 @@ public class AnalysisConsumer implements StreamListener<String, ObjectRecord<Str
// 消费完成后,手动确认消费消息[消息消费成功] // 消费完成后,手动确认消费消息[消息消费成功]
// 否则就是消费抛出异常,进入pending_ids[消息消费失败] // 否则就是消费抛出异常,进入pending_ids[消息消费失败]
redisStreamUtil.ack(streamKey, groupName, recordId.getValue()); redisStreamUtil.ack(streamKey, groupName, recordId.getValue());
// 手动删除已消费消息 // TODO del 取消手动删除已消费消息
redisStreamUtil.del(streamKey, recordId.getValue()); // redisStreamUtil.del(streamKey, recordId.getValue());
} }
}catch (Exception e){ }catch (RuntimeException e){
log.error("AnalysisConsumer消费异常: {}", e.getMessage()); log.error("AnalysisConsumer消费异常: {}", e.getMessage());
}finally { }finally {
destroy(); destroy();
@ -117,8 +109,7 @@ public class AnalysisConsumer implements StreamListener<String, ObjectRecord<Str
for (AlarmAnalysisRule rule : rules) { for (AlarmAnalysisRule rule : rules) {
// 当前规则是否有报警条件 // 当前规则是否有报警条件
String conditionStr = rule.getConditions(); String conditionStr = rule.getConditions();
if (StrUtil.isBlank(conditionStr)) if (StrUtil.isBlank(conditionStr)) continue;
continue;
// 是否在当前规则关注的台站列表内 // 是否在当前规则关注的台站列表内
String stations = rule.getStations(); String stations = rule.getStations();
if (!StrUtil.contains(stations, stationId)) if (!StrUtil.contains(stations, stationId))
@ -146,71 +137,74 @@ public class AnalysisConsumer implements StreamListener<String, ObjectRecord<Str
info.setRuleId(rule.getId()); info.setRuleId(rule.getId());
info.setGroupId(rule.getContactGroup()); info.setGroupId(rule.getContactGroup());
info.setConditions(rule.getConditions()); info.setConditions(rule.getConditions());
judge(info, nuclidesCross); judge(info,nuclidesCross);
} }
} }
private void judge(Info info, Map<String,String> nuclidesCross){ private void judge(Info info, Map<String,String> nuclidesCross){
Set<String> nuclideNames = nuclidesCross.keySet(); Set<String> nuclideNames = nuclidesCross.keySet();
StringBuilder alarmInfo = new StringBuilder();
List<String> firstDetected;
List<NuclideInfo> moreThanAvg = new ArrayList<>();
String conditionStr = info.getConditions(); String conditionStr = info.getConditions();
String betaOrGamma = info.getBetaOrGamma(); String betaOrGamma = info.getBetaOrGamma();
String datasource = info.getDatasource(); String datasource = info.getDatasource();
String stationId = info.getStationId();
// 获取谱文件采样日期 如果为null 则默认为LocalDate.now()
LocalDate collDate = ObjectUtil.isNull(info.getCollectionDate()) ? LocalDate.now() :
info.getCollectionDate().toLocalDate();
List<String> conditions = ListUtil.toList(conditionStr.split(COMMA)); List<String> conditions = ListUtil.toList(conditionStr.split(COMMA));
List<String> firstDetected = new ArrayList<>(); // 首次发现
List<NuclideInfo> moreThanAvg = new ArrayList<>(); // 超浓度均值
List<String> meanwhile = new ArrayList<>(); // 同时出现两种及以上核素
for (String con : conditions) { for (String con : conditions) {
Condition condition = Condition.valueOf1(con); Condition condition = Condition.valueOf1(con);
if (ObjectUtil.isNull(condition)) continue; if (ObjectUtil.isNotNull(condition)){
switch (condition){ switch (condition){
case FIRST_FOUND: // 首次发现该元素 case FIRST_FOUND: // 首次发现该元素
firstDetected = firstDetected(betaOrGamma, datasource, nuclideNames); firstDetected = firstDetected(betaOrGamma,datasource,nuclideNames);
break; if (CollUtil.isNotEmpty(firstDetected)){
case ABOVE_AVERAGE: // 元素浓度高于均值 String message = "First discovery of nuclides: [" + StrUtil.join(COMMA,firstDetected) + "]";
moreThanAvg = moreThanAvg(datasource, stationId, collDate, nuclidesCross); alarmInfo.append(message);
break; }
case MEANWHILE: // 同时出现两种及以上核素 break;
if (CollUtil.isNotEmpty(nuclideNames) && nuclideNames.size() >= 2) case ABOVE_AVERAGE: // 元素浓度高于均值
meanwhile.addAll(nuclideNames); moreThanAvg = moreThanAvg(datasource,nuclidesCross);
break; if (CollUtil.isNotEmpty(moreThanAvg)){
default: for (NuclideInfo nuclideInfo : moreThanAvg) {
break; String nuclide = nuclideInfo.getNuclide();
String threshold = nuclideInfo.getThreshold();
String message = "Nuclide " + nuclide + "is above average: " + threshold;
alarmInfo.append(COMMA).append(message);
}
}
break;
case MEANWHILE: // 同时出现两种及以上核素
if (nuclideNames.size() >= 2){
String message = "Simultaneously detecting nuclides: [" + StrUtil.join(COMMA,nuclideNames) + "]";
alarmInfo.append(COMMA).append(message);
}
break;
default:
break;
}
} }
} }
// 构建预警信息 if (StrUtil.isNotBlank(alarmInfo.toString())){
DataTool dataTool = DataTool.getInstance(); // 保存报警日志
if (CollUtil.isNotEmpty(firstDetected)) AlarmAnalysisLog logInfo = new AlarmAnalysisLog();
dataTool.put("firstDetected", CollUtil.join(firstDetected, StrUtil.COMMA + StrUtil.SPACE)); BeanUtil.copyProperties(info,logInfo);
if (CollUtil.isNotEmpty(moreThanAvg)){ SampleType sampleType = SampleType.typeOf(betaOrGamma);
String above = moreThanAvg.stream() if (ObjectUtil.isNotNull(sampleType))
.map(item -> item.getNuclide() + "(" + item.getValue() + ")" + " > " + item.getThreshold()) logInfo.setSampleType(sampleType.getValue());
.collect(Collectors.joining(StrUtil.COMMA + StrUtil.SPACE)); if (alarmInfo.toString().startsWith(COMMA))
dataTool.put("moreThanAvg", above); alarmInfo = new StringBuilder(StrUtil.sub(alarmInfo.toString(), 1, alarmInfo.length()));
logInfo.setAlarmInfo(alarmInfo.toString());
if (CollUtil.isNotEmpty(moreThanAvg))
logInfo.setNuclideInfoList(moreThanAvg);
logService.saveLog(logInfo);
// 发送报警信息
String groupId = info.getGroupId();
MessageDTO messageDTO = new MessageDTO();
messageDTO.setTitle("Nuclied Warn Info").setContent(alarmInfo.toString());
if (StrUtil.isNotBlank(groupId)) {
systemClient.sendMessage(messageDTO, groupId, ALL.getValue());
systemClient.pushMessageToSingle(messageDTO, groupId);
}
} }
if (CollUtil.isNotEmpty(meanwhile))
dataTool.put("meanwhile", CollUtil.join(meanwhile, StrUtil.COMMA + StrUtil.SPACE));
// 如果报警数据为空 则不需要发送报警信息和生成报警日志
if (MapUtil.isEmpty(dataTool.get())) return;
MessageDTO messageDTO = TemplateUtil.parse1(ANALYSIS_NUCLIDE.getCode(), dataTool.get());
// 保存报警日志
AlarmAnalysisLog logInfo = new AlarmAnalysisLog();
BeanUtil.copyProperties(info, logInfo);
SampleType sampleType = SampleType.typeOf(betaOrGamma);
if (ObjectUtil.isNotNull(sampleType))
logInfo.setSampleType(sampleType.getValue());
logInfo.setAlarmInfo(messageDTO.getContent());
if (CollUtil.isNotEmpty(moreThanAvg))
logInfo.setNuclideInfoList(moreThanAvg);
logService.saveLog(logInfo);
// 发送报警信息
String groupId = info.getGroupId();
systemClient.sendMessage(messageDTO, groupId, ALL.getValue());
systemClient.pushMessageToSingle(messageDTO, groupId);
} }
/** /**
@ -228,12 +222,12 @@ public class AnalysisConsumer implements StreamListener<String, ObjectRecord<Str
/** /**
* 核素值大于历史浓度均值 * 核素值大于历史浓度均值
*/ */
private List<NuclideInfo> moreThanAvg(String dataSourceType, String stationId, private List<NuclideInfo> moreThanAvg(String dataSourceType,
LocalDate collDate, Map<String,String> nuclidesCross){ Map<String,String> nuclidesCross){
List<NuclideInfo> nuclideInfos = new ArrayList<>(); List<NuclideInfo> nuclideInfos = new ArrayList<>();
Set<String> nuclideNames = nuclidesCross.keySet(); Set<String> nuclideNames = nuclidesCross.keySet();
Map<String, String> nuclideAvgs = nuclideAvgService Map<String, String> nuclideAvgs = nuclideAvgService
.list(nuclideNames, dataSourceType, stationId, collDate).stream() .list(nuclideNames, dataSourceType).stream()
.collect(Collectors.toMap(AlarmAnalysisNuclideAvg::getNuclide, .collect(Collectors.toMap(AlarmAnalysisNuclideAvg::getNuclide,
AlarmAnalysisNuclideAvg::getVal)); AlarmAnalysisNuclideAvg::getVal));
for (Map.Entry<String, String> nuclide : nuclidesCross.entrySet()) { for (Map.Entry<String, String> nuclide : nuclidesCross.entrySet()) {
@ -252,27 +246,41 @@ public class AnalysisConsumer implements StreamListener<String, ObjectRecord<Str
NuclideInfo nuclideInfo = new NuclideInfo(); NuclideInfo nuclideInfo = new NuclideInfo();
nuclideInfo.setNuclide(nuclideName); nuclideInfo.setNuclide(nuclideName);
nuclideInfo.setThreshold(avg.toString()); nuclideInfo.setThreshold(avg.toString());
nuclideInfo.setDatasource(DSType.typeOf(dataSourceType)); nuclideInfo.setDatasource(type(dataSourceType));
// 对浓度值保留五位小数 nuclideInfo.setValue(conc.toString());
nuclideInfo.setValue(NumUtil.keepStr(concValue, 5));
nuclideInfos.add(nuclideInfo); nuclideInfos.add(nuclideInfo);
} }
return nuclideInfos; return nuclideInfos;
} }
private String type(String dataSourceType){
switch (dataSourceType){
case CommonConstant.ARMDARR:
return "ARMDARR";
case CommonConstant.ARMDRRR:
return "ARMDRRR";
case CommonConstant.IDCARR:
return "IDCARR";
case CommonConstant.IDCRRR:
return "IDCRRR";
default:
return null;
}
}
private void init() { private void init() {
// start 生成临时Token到线程中 // start:生成临时Token到线程中
UserTokenContext.setToken(getTempToken()); UserTokenContext.setToken(getTempToken());
systemClient = SpringContextUtils.getBean(SystemClient.class); systemClient = SpringContextUtils.getBean(SystemClient.class);
redisStreamUtil = SpringContextUtils.getBean(RedisStreamUtil.class); redisStreamUtil = SpringContextUtils.getBean(RedisStreamUtil.class);
logService = SpringContextUtils.getBean(IAlarmAnalysisLogService.class); logService = SpringContextUtils.getBean(IAlarmAnalysisLogService.class);
ruleService = SpringContextUtils.getBean(IAlarmAnalysisRuleService.class);
analysisResultService = SpringContextUtils.getBean(AnalysisResultService.class); analysisResultService = SpringContextUtils.getBean(AnalysisResultService.class);
ruleService = SpringContextUtils.getBean(IAlarmAnalysisRuleService.class);
nuclideAvgService = SpringContextUtils.getBean(IAlarmAnalysisNuclideAvgService.class); nuclideAvgService = SpringContextUtils.getBean(IAlarmAnalysisNuclideAvgService.class);
} }
private void destroy(){ private void destroy(){
// end 删除临时Token // end:删除临时Token
UserTokenContext.remove(); UserTokenContext.remove();
} }
} }

View File

@ -34,6 +34,10 @@ public class RedisStreamConfig {
// 每次轮询取几条消息 // 每次轮询取几条消息
private final Integer maxMsg = 10; private final Integer maxMsg = 10;
private final String alarmKey = RedisConstant.STREAM_ALARM;
private final String alarmGroup = RedisConstant.GROUP_ALARM;
private final String alarmConsumer = RedisConstant.CONSUMER_ALARM;
private final String analysisKey = RedisConstant.STREAM_ANALYSIS; private final String analysisKey = RedisConstant.STREAM_ANALYSIS;
private final String analysisGroup = RedisConstant.GROUP_ANALYSIS; private final String analysisGroup = RedisConstant.GROUP_ANALYSIS;
private final String analysisConsumer = RedisConstant.CONSUMER_ANALYSIS; private final String analysisConsumer = RedisConstant.CONSUMER_ANALYSIS;
@ -99,7 +103,7 @@ public class RedisStreamConfig {
// 独立消费 // 独立消费
/*streamMessageListenerContainer.receive(StreamOffset.fromStart(streamKey), /*streamMessageListenerContainer.receive(StreamOffset.fromStart(streamKey),
new ConsumeListener("独立消费", null, null));*/ new ConsumeListener("独立消费", null, null));*/
// 非独立消费
/* /*
register:用于注册一个消息监听器,该监听器将在每次有新的消息到达Redis Stream时被调用 register:用于注册一个消息监听器,该监听器将在每次有新的消息到达Redis Stream时被调用
这种方式适用于长期运行的消息消费者它会持续监听Redis Stream并处理新到达的消息 这种方式适用于长期运行的消息消费者它会持续监听Redis Stream并处理新到达的消息
@ -107,28 +111,24 @@ public class RedisStreamConfig {
receive:用于主动从Redis Stream中获取消息并进行处理,你可以指定要获取的消息数量和超时时间 receive:用于主动从Redis Stream中获取消息并进行处理,你可以指定要获取的消息数量和超时时间
这种方式适用于需要主动控制消息获取的场景,例如批量处理消息或定时任务 这种方式适用于需要主动控制消息获取的场景,例如批量处理消息或定时任务
**/ **/
/* 1.需要手动确认消费消息 */ // 注册消费组A中的消费者A1,手动ACK
// 1.1 使用 register 方式 /*ConsumerStreamReadRequest<String> readA1 = StreamMessageListenerContainer
StreamMessageListenerContainer.ConsumerStreamReadRequest<String> readRequest = StreamMessageListenerContainer
.StreamReadRequest .StreamReadRequest
.builder(StreamOffset.create(analysisKey, ReadOffset.lastConsumed())) .builder(StreamOffset.create(warnKey, ReadOffset.lastConsumed()))
.consumer(Consumer.from(analysisGroup, analysisConsumer)) .consumer(Consumer.from(groupWarnA, consumerWarnA1))
// 手动确认消费了消息 默认为自动确认消息
.autoAcknowledge(false) .autoAcknowledge(false)
// 如果消费者发生了异常 不禁止消费者消费 默认为禁止 // 如果消费者发生了异常,是否禁止消费者消费
.cancelOnError(throwable -> false) .cancelOnError(throwable -> false)
.build(); .build();
AnalysisConsumer analysis = new AnalysisConsumer(analysisGroup, analysisConsumer); ConsumeA1 consumeA1 = new ConsumeA1(groupWarnA, consumerWarnA1);
streamMessageListenerContainer.register(readRequest, analysis); streamMessageListenerContainer.register(readA1, consumeA1);*/
// 1.2 使用 receive 方式 AnalysisConsumer analysis = new AnalysisConsumer(analysisGroup,analysisConsumer);
/*AnalysisConsumer analysis = new AnalysisConsumer(analysisGroup, analysisConsumer);
streamMessageListenerContainer.receive(Consumer.from(analysisGroup, analysisConsumer), streamMessageListenerContainer.receive(Consumer.from(analysisGroup, analysisConsumer),
StreamOffset.create(analysisKey, ReadOffset.lastConsumed()), analysis);*/ StreamOffset.create(analysisKey, ReadOffset.lastConsumed()), analysis);
/* 2.自动确认消费消息 */ // 创建消费组A中的消费者A2,自动ACK
// 2.1 使用 receive 方式 /* ConsumeA2 consumeA2 = new ConsumeA2(consumerWarnA2);
/*AnalysisConsumer analysis = new AnalysisConsumer(analysisGroup,analysisConsumer); streamMessageListenerContainer.receiveAutoAck(Consumer.from(groupWarnA, consumerWarnA2),
streamMessageListenerContainer.receiveAutoAck(Consumer.from(analysisGroup, analysisConsumer), StreamOffset.create(warnKey, ReadOffset.lastConsumed()), consumeA2);*/
StreamOffset.create(analysisKey, ReadOffset.lastConsumed()), analysis);*/
return streamMessageListenerContainer; return streamMessageListenerContainer;
} }
} }

View File

@ -6,14 +6,12 @@ import org.jeecg.modules.base.entity.postgre.AlarmAnalysisNuclideAvg;
import com.baomidou.mybatisplus.extension.service.IService; import com.baomidou.mybatisplus.extension.service.IService;
import org.jeecg.modules.base.bizVo.NuclideAvgVo; import org.jeecg.modules.base.bizVo.NuclideAvgVo;
import java.time.LocalDate;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
public interface IAlarmAnalysisNuclideAvgService extends IService<AlarmAnalysisNuclideAvg> { public interface IAlarmAnalysisNuclideAvgService extends IService<AlarmAnalysisNuclideAvg> {
List<AlarmAnalysisNuclideAvg> list(Set<String> nuclideNames, String dataSourceType, List<AlarmAnalysisNuclideAvg> list(Set<String> nuclideNames,String dataSourceType);
String stationId, LocalDate collDate);
Page<NuclideAvgDto> findPage(NuclideAvgVo nuclideAvgVo); Page<NuclideAvgDto> findPage(NuclideAvgVo nuclideAvgVo);
} }

View File

@ -75,7 +75,6 @@ public class AlarmAnalysisLogServiceImpl extends ServiceImpl<AlarmAnalysisLogMap
try { try {
ObjectMapper mapper = new ObjectMapper(); ObjectMapper mapper = new ObjectMapper();
List<NuclideInfo> nuclideInfos = mapper.readValue(nuclideInfo, new TypeReference<List<NuclideInfo>>() {}); List<NuclideInfo> nuclideInfos = mapper.readValue(nuclideInfo, new TypeReference<List<NuclideInfo>>() {});
nuclideInfos.forEach(NuclideInfo::keepSix);
logDto.setNuclideList(nuclideInfos); logDto.setNuclideList(nuclideInfos);
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
log.error("NuclideInfo解析异常: {}", e.getMessage()); log.error("NuclideInfo解析异常: {}", e.getMessage());

View File

@ -15,7 +15,6 @@ import org.springframework.stereotype.Service;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -28,15 +27,13 @@ public class AlarmAnalysisNuclideAvgServiceImpl extends ServiceImpl<AlarmAnalysi
private SystemClient systemClient; private SystemClient systemClient;
@Override @Override
public List<AlarmAnalysisNuclideAvg> list(Set<String> nuclideNames, String dataSourceType, public List<AlarmAnalysisNuclideAvg> list(Set<String> nuclideNames,String dataSourceType) {
String stationId, LocalDate collDate) { LocalDate dayAgo = LocalDate.now().minusDays(1);
LocalDate dayAgo = collDate.minusDays(1);
LambdaQueryWrapper<AlarmAnalysisNuclideAvg> wrapper = new LambdaQueryWrapper<>(); LambdaQueryWrapper<AlarmAnalysisNuclideAvg> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(AlarmAnalysisNuclideAvg::getStationId, stationId); wrapper.eq(AlarmAnalysisNuclideAvg::getDataSourceType,dataSourceType);
wrapper.eq(AlarmAnalysisNuclideAvg::getDataSourceType, dataSourceType); wrapper.eq(AlarmAnalysisNuclideAvg::getCaclDate,dayAgo);
wrapper.eq(AlarmAnalysisNuclideAvg::getCaclDate, dayAgo);
wrapper.in(AlarmAnalysisNuclideAvg::getNuclide,nuclideNames); wrapper.in(AlarmAnalysisNuclideAvg::getNuclide,nuclideNames);
return this.list(wrapper); return list(wrapper);
} }
@Override @Override

View File

@ -233,15 +233,9 @@ public class AutoProcessManager{
if(databaseEmail.getEnabled().equals(SysMailEnableType.ENABLE.getMailEnableType())){ if(databaseEmail.getEnabled().equals(SysMailEnableType.ENABLE.getMailEnableType())){
final boolean testFlag = testConnectEmailServer(databaseEmail); final boolean testFlag = testConnectEmailServer(databaseEmail);
if(testFlag){ if(testFlag){
if (emailExecThreadMap.containsKey(databaseEmail.getId())) { databaseEmail.setNewEmailFlag(true);
EmailParsingActuator actuator = emailExecThreadMap.get(databaseEmail.getId());
actuator.setStop(false);
log.info("{}邮箱重新加入监测队列",databaseEmail.getUsername());
} else {
databaseEmail.setNewEmailFlag(true);
log.info("{}邮箱加入监测队列,设置新增标记",databaseEmail.getUsername());
}
emailMap.put(databaseEmail.getId(),databaseEmail); emailMap.put(databaseEmail.getId(),databaseEmail);
log.info("{}邮箱加入监测队列,设置新增标记",databaseEmail.getUsername());
} }
} }
} }
@ -285,7 +279,6 @@ public class AutoProcessManager{
if(next.getValue().getState() == State.TERMINATED){ if(next.getValue().getState() == State.TERMINATED){
log.info("{}邮箱执行线程已停止emailExecThreadMap删除此邮箱数据",next.getValue().getEmailProperties().getUsername()); log.info("{}邮箱执行线程已停止emailExecThreadMap删除此邮箱数据",next.getValue().getEmailProperties().getUsername());
checkStopThreads.remove(); checkStopThreads.remove();
emailMap.remove(next.getKey());
} }
} }

View File

@ -17,7 +17,10 @@ import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import javax.mail.Message; import javax.mail.Message;
import javax.mail.MessagingException; import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage; import javax.mail.internet.MimeMessage;
import java.util.*; import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.*; import java.util.concurrent.*;
/** /**
@ -76,17 +79,11 @@ public class EmailParsingActuator extends Thread{
try { try {
Message[] messages = emailServiceManager.receiveMail(); Message[] messages = emailServiceManager.receiveMail();
if(ArrayUtils.isNotEmpty(messages)){ if(ArrayUtils.isNotEmpty(messages)){
log.info("EmailParsingActuator本次{}获取邮件数量为:{}", Thread.currentThread().getName(), messages.length); log.info("EmailParsingActuator本次获取邮件数量为:{}",messages.length);
//检验获取的邮件是否在之前删除失败列表中若在直接调用邮件API删除并且此次数组里元素也删除 //检验获取的邮件是否在之前删除失败列表中若在直接调用邮件API删除并且此次数组里元素也删除
for(int i=messages.length-1;i>=0;i--){ for(int i=messages.length-1;i>=0;i--){
if (null == messages[i].getHeader("Message-ID")) {
System.out.println("Message ID是空值信息");
messages = ArrayUtils.remove(messages, i);
continue;
}
if (!messages[i].isExpunged()){ if (!messages[i].isExpunged()){
String messageId = ((MimeMessage) messages[i]).getMessageID(); String messageId = ((MimeMessage) messages[i]).getMessageID();
System.out.println("正常获取到的Message ID是"+messageId);
final boolean exist = emailServiceManager.check(messages[i],messageId); final boolean exist = emailServiceManager.check(messages[i],messageId);
messageIds.add(messageId); messageIds.add(messageId);
if(exist){ if(exist){
@ -108,13 +105,10 @@ public class EmailParsingActuator extends Thread{
taskLatch.await(); taskLatch.await();
} }
} }
}catch (InterruptedException e) {
e.printStackTrace();
} catch (MessagingException e) { } catch (MessagingException e) {
System.out.println("捕获MessagingException");
closeResource();
throw new RuntimeException(e); throw new RuntimeException(e);
} catch (Exception e) {
closeResource();
log.error(""+e);
} finally { } finally {
//清除本批次邮件日志缓存 //清除本批次邮件日志缓存
EmailLogManager.getInstance().clear(); EmailLogManager.getInstance().clear();

View File

@ -97,6 +97,9 @@ public class SpectrumParsingActuator implements Runnable{
String emlName = subject+ StringConstant.UNDER_LINE+ receiveDate; String emlName = subject+ StringConstant.UNDER_LINE+ receiveDate;
String key = RedisConstant.EMAIL_MSG_ID+StringConstant.COLON+messageId; String key = RedisConstant.EMAIL_MSG_ID+StringConstant.COLON+messageId;
// spectrumServiceQuotes.getRedisUtil().set(key,emlName,expiryTime); // spectrumServiceQuotes.getRedisUtil().set(key,emlName,expiryTime);
//判断当前key的下载次数是否超过限制次数
spectrumServiceQuotes.getRedisUtil().incr(key, 1L);
spectrumServiceQuotes.getRedisUtil().expire(key, expiryTime);
//线程开始初始化时初始本线程负责的能谱日志事件 //线程开始初始化时初始本线程负责的能谱日志事件
SpectrumLogManager.mailSpectrumLogManager.offer(Thread.currentThread().getId(),null); SpectrumLogManager.mailSpectrumLogManager.offer(Thread.currentThread().getId(),null);
@ -123,7 +126,6 @@ public class SpectrumParsingActuator implements Runnable{
try { try {
//开始解析 //开始解析
spectrumHandler.handler(); spectrumHandler.handler();
spectrumServiceQuotes.getRedisUtil().del(key);
} catch (Exception e) { } catch (Exception e) {
//如果是gamma谱的分析异常 //如果是gamma谱的分析异常
if (e instanceof AnalySpectrumException) { if (e instanceof AnalySpectrumException) {
@ -137,14 +139,12 @@ public class SpectrumParsingActuator implements Runnable{
throw e; throw e;
} }
} }
}else{ }else{
log.warn("This email {} parsing failed and is not listed in the Met, Alert, SOH, Sample, Detbkphd, QC, Gasbkphd spectra.",subject); log.warn("This email {} parsing failed and is not listed in the Met, Alert, SOH, Sample, Detbkphd, QC, Gasbkphd spectra.",subject);
} }
emailServiceManager.removeMail(message,batchesCounter); emailServiceManager.removeMail(message,batchesCounter);
} else { }else {
//判断当前key的下载次数是否超过限制次数
spectrumServiceQuotes.getRedisUtil().incr(key, 1L);
spectrumServiceQuotes.getRedisUtil().expire(key, expiryTime);
// 如果邮件内容校验失败(邮件内容不完整) 将错误邮件从eml移动到emlError // 如果邮件内容校验失败(邮件内容不完整) 将错误邮件从eml移动到emlError
if (Objects.nonNull(emlFile) && emlFile.exists()){ if (Objects.nonNull(emlFile) && emlFile.exists()){
moveEmail(emlFile, key); moveEmail(emlFile, key);
@ -207,7 +207,7 @@ public class SpectrumParsingActuator implements Runnable{
final String finalPath = rootPath+emlErrorPath; final String finalPath = rootPath+emlErrorPath;
FileOperation.moveFile(emlFile,finalPath,true); FileOperation.moveFile(emlFile,finalPath,true);
// 删除 key防止下次线程执行删除邮件 // 删除 key防止下次线程执行删除邮件
// spectrumServiceQuotes.getRedisUtil().del(key); spectrumServiceQuotes.getRedisUtil().del(key);
} }
} }

View File

@ -1,6 +1,5 @@
package org.jeecg.modules.feignclient; package org.jeecg.modules.feignclient;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.api.vo.Result; import org.jeecg.common.api.vo.Result;
import org.jeecg.common.constant.RedisConstant; import org.jeecg.common.constant.RedisConstant;
import org.jeecg.common.system.util.JwtUtil; import org.jeecg.common.system.util.JwtUtil;
@ -10,7 +9,6 @@ import org.jeecg.modules.base.dto.LoginResult;
import org.jeecg.modules.base.dto.LoginVo; import org.jeecg.modules.base.dto.LoginVo;
import org.springframework.core.env.Environment; import org.springframework.core.env.Environment;
@Slf4j
public class ManageUtil { public class ManageUtil {
private static RedisUtil redisUtil; private static RedisUtil redisUtil;
@ -33,28 +31,20 @@ public class ManageUtil {
* 登录运管系统 获取Token * 登录运管系统 获取Token
* */ * */
public static String getToken(){ public static String getToken(){
String token = null; if (redisUtil.hasKey(RedisConstant.MANAGE_TOKEN))
try { return (String) redisUtil.get(RedisConstant.MANAGE_TOKEN);
if (redisUtil.hasKey(RedisConstant.MANAGE_TOKEN))
return (String) redisUtil.get(RedisConstant.MANAGE_TOKEN); LoginVo loginVo = new LoginVo(username, password);
LoginVo loginVo = new LoginVo(username, password); Result<LoginResult> loginRes = monitorSystem.login(loginVo);
Result<LoginResult> loginRes = monitorSystem.login(loginVo); String token = loginRes.getResult().getToken();
token = loginRes.getResult().getToken(); redisUtil.set(RedisConstant.MANAGE_TOKEN, token, JwtUtil.EXPIRE_TIME / 1000 - 10);
redisUtil.set(RedisConstant.MANAGE_TOKEN, token, JwtUtil.EXPIRE_TIME / 1000 - 10); return token;
return token;
}catch (RuntimeException e){
return token;
}
} }
public static void refreshToken(){ public static void refreshToken(){
try { LoginVo loginVo = new LoginVo(username, password);
LoginVo loginVo = new LoginVo(username, password); Result<LoginResult> loginRes = monitorSystem.login(loginVo);
Result<LoginResult> loginRes = monitorSystem.login(loginVo); String token = loginRes.getResult().getToken();
String token = loginRes.getResult().getToken(); redisUtil.set(RedisConstant.MANAGE_TOKEN, token, JwtUtil.EXPIRE_TIME / 1000 - 10);
redisUtil.set(RedisConstant.MANAGE_TOKEN, token, JwtUtil.EXPIRE_TIME / 1000 - 10);
}catch (RuntimeException e){
log.error("运管系统登录异常, Token刷新失败: {}", e.getMessage());
}
} }
} }

View File

@ -4,7 +4,6 @@ import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.map.MapUtil; import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ObjectUtil; import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.api.dto.message.MessageDTO; import org.jeecg.common.api.dto.message.MessageDTO;
import org.jeecg.common.constant.SymbolConstant; import org.jeecg.common.constant.SymbolConstant;
import org.jeecg.common.constant.enums.MessageTypeEnum; import org.jeecg.common.constant.enums.MessageTypeEnum;
@ -28,7 +27,6 @@ import static org.jeecg.common.constant.enums.MessageTypeEnum.*;
* @author nieziyan * @author nieziyan
* @date 2023-06-21 * @date 2023-06-21
*/ */
@Slf4j
@Component @Component
public class SendMessage { public class SendMessage {
@ -52,7 +50,6 @@ public class SendMessage {
*/ */
public void send(MessageDTO messageDTO, String groupId, String notific){ public void send(MessageDTO messageDTO, String groupId, String notific){
Map<String, String> contact = getContact(groupId); Map<String, String> contact = getContact(groupId);
if (MapUtil.isEmpty(contact)) return;
if (StrUtil.isBlank(notific)) return; if (StrUtil.isBlank(notific)) return;
List<String> ways = ListUtil.toList(StrUtil.split(notific, COMMA)); List<String> ways = ListUtil.toList(StrUtil.split(notific, COMMA));
if (ways.contains(ALL.getValue())) if (ways.contains(ALL.getValue()))
@ -99,33 +96,26 @@ public class SendMessage {
* @param groupId * @param groupId
*/ */
private Map<String, String> getContact(String groupId){ private Map<String, String> getContact(String groupId){
List<String> userIds = abnormalAlarmClient.userIds(groupId).getResult();
// 查询用户信息
List<SysUser> sysUsers = sysUserService.listByIds(userIds);
// 用户名
String usernameList = sysUsers.stream().map(SysUser::getUsername).filter(StrUtil::isNotBlank)
.collect(Collectors.joining(COMMA));
// 邮箱
String emailList = sysUsers.stream().map(SysUser::getEmail).filter(StrUtil::isNotBlank)
.collect(Collectors.joining(COMMA));
// 用户名-邮箱Map
Map<String, String> userEmail = sysUsers.stream()
.collect(Collectors.toMap(SysUser::getUsername, SysUser::getEmail));
// 手机号码
String phoneList = sysUsers.stream().map(SysUser::getPhone).filter(StrUtil::isNotBlank)
.collect(Collectors.joining(COMMA));
Map<String,String> result = new HashMap<>(); Map<String,String> result = new HashMap<>();
try { result.put(SYSTEM, usernameList);
List<String> userIds = abnormalAlarmClient.userIds(groupId).getResult(); result.put(EMAIL, emailList);
// 查询用户信息 result.put(SMS, phoneList);
List<SysUser> sysUsers = sysUserService.listByIds(userIds); result.putAll(userEmail);
// 用户名 return result;
String usernameList = sysUsers.stream().map(SysUser::getUsername).filter(StrUtil::isNotBlank)
.collect(Collectors.joining(COMMA));
// 邮箱
String emailList = sysUsers.stream().map(SysUser::getEmail).filter(StrUtil::isNotBlank)
.collect(Collectors.joining(COMMA));
// 用户名-邮箱Map
Map<String, String> userEmail = sysUsers.stream()
.filter(item -> StrUtil.isNotBlank(item.getEmail()))
.collect(Collectors.toMap(SysUser::getUsername, SysUser::getEmail));
// 手机号码
String phoneList = sysUsers.stream().map(SysUser::getPhone).filter(StrUtil::isNotBlank)
.collect(Collectors.joining(COMMA));
result.put(SYSTEM, usernameList);
result.put(EMAIL, emailList);
result.put(SMS, phoneList);
result.putAll(userEmail);
return result;
}catch (Exception e) {
log.error("获取收件人联系信息异常: {}", e.getMessage());
return result;
}
} }
} }

View File

@ -6,7 +6,6 @@ import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import feign.FeignException;
import lombok.Data; import lombok.Data;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.api.dto.message.MessageDTO; import org.jeecg.common.api.dto.message.MessageDTO;
@ -103,10 +102,6 @@ public class DatabaseJob extends Monitor implements Job{
}*/ }*/
// 向运管查询监控项数据 // 向运管查询监控项数据
String token = ManageUtil.getToken(); String token = ManageUtil.getToken();
if(StrUtil.isBlank(token)){
log.error("运管系统登录异常, Token获取失败");
continue;
}
Result<ItemHistory> result = getMonitorSystem().itemBack(itemId, itemType, start, end, token); Result<ItemHistory> result = getMonitorSystem().itemBack(itemId, itemType, start, end, token);
ItemHistory itemHistory = result.getResult(); ItemHistory itemHistory = result.getResult();
if (ObjectUtil.isNull(itemHistory)){ if (ObjectUtil.isNull(itemHistory)){
@ -146,12 +141,9 @@ public class DatabaseJob extends Monitor implements Job{
getSendMessage().send(messageDTO, groupId, notific); getSendMessage().send(messageDTO, groupId, notific);
getPushAppUtil().pushToSingle(messageDTO, groupId); getPushAppUtil().pushToSingle(messageDTO, groupId);
} }
} catch (FeignException.Unauthorized e){
ManageUtil.refreshToken();
log.warn("向运管系统查询ItemHistory信息异常: Token失效,已刷新Token");
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
log.error("Database预警规则: {}解析失败,失败原因: {}", operator, e.getMessage()); log.error("Database预警规则: {}解析失败,失败原因: {}", operator, e.getMessage());
} catch (Exception e){ }catch (Exception e){
log.error("Database监控异常: {}", e.getMessage()); log.error("Database监控异常: {}", e.getMessage());
} }
} }

View File

@ -85,10 +85,6 @@ public class ServerJob extends Monitor implements Job {
// 向运管查询监控项数据 // 向运管查询监控项数据
String token = ManageUtil.getToken(); String token = ManageUtil.getToken();
if(StrUtil.isBlank(token)){
log.error("运管系统登录异常, Token获取失败");
continue;
}
Result<ItemHistory> result = getMonitorSystem().itemBack(itemId, itemType, start, end, token); Result<ItemHistory> result = getMonitorSystem().itemBack(itemId, itemType, start, end, token);
ItemHistory itemHistory = result.getResult(); ItemHistory itemHistory = result.getResult();
if (ObjectUtil.isNull(itemHistory)){ if (ObjectUtil.isNull(itemHistory)){
@ -128,12 +124,12 @@ public class ServerJob extends Monitor implements Job {
getSendMessage().send(messageDTO, groupId, notific); getSendMessage().send(messageDTO, groupId, notific);
getPushAppUtil().pushToSingle(messageDTO, groupId); getPushAppUtil().pushToSingle(messageDTO, groupId);
} }
} catch (FeignException.Unauthorized e){ }catch (FeignException.Unauthorized e){
ManageUtil.refreshToken(); ManageUtil.refreshToken();
log.warn("向运管系统查询ItemHistory信息异常: Token失效,已刷新Token"); log.warn("向运管系统查询ItemHistory信息异常: Token失效,已刷新Token");
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
log.error("Server预警规则: {}解析失败,失败原因: {}", operator, e.getMessage()); log.error("Server预警规则: {}解析失败,失败原因: {}", operator, e.getMessage());
} catch (Exception e){ }catch (Exception e){
log.error("Server监控异常: {}", e.getMessage()); log.error("Server监控异常: {}", e.getMessage());
} }
} }

View File

@ -6,7 +6,6 @@ import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
import feign.FeignException;
import lombok.Data; import lombok.Data;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.api.dto.message.MessageDTO; import org.jeecg.common.api.dto.message.MessageDTO;
@ -87,10 +86,6 @@ public class DatabaseJob extends Monitor {
// 向运管查询监控项数据 // 向运管查询监控项数据
String token = ManageUtil.getToken(); String token = ManageUtil.getToken();
if(StrUtil.isBlank(token)){
log.error("运管系统登录异常, Token获取失败");
continue;
}
Result<ItemHistory> result = getMonitorSystem().itemBack(itemId, itemType, start, end, token); Result<ItemHistory> result = getMonitorSystem().itemBack(itemId, itemType, start, end, token);
ItemHistory itemHistory = result.getResult(); ItemHistory itemHistory = result.getResult();
if (ObjectUtil.isNull(itemHistory)){ if (ObjectUtil.isNull(itemHistory)){
@ -130,13 +125,11 @@ public class DatabaseJob extends Monitor {
getSendMessage().send(messageDTO, groupId, notific); getSendMessage().send(messageDTO, groupId, notific);
getPushAppUtil().pushToSingle(messageDTO, groupId); getPushAppUtil().pushToSingle(messageDTO, groupId);
} }
}catch (FeignException.Unauthorized e){
ManageUtil.refreshToken();
log.warn("向运管系统查询ItemHistory信息异常: Token失效,已刷新Token");
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
log.error("Database预警规则: {}解析失败,失败原因: {}", operator, e.getMessage()); log.error("Database预警规则: {}解析失败,失败原因: {}", operator, e.getMessage());
}catch (Exception e){ }catch (Exception e){
log.error("Database监控异常: {}", e.getMessage()); log.error("Database监控异常: {}", e.getMessage());
e.printStackTrace();
} }
} }
destroy(); destroy();

View File

@ -111,6 +111,7 @@ public class EmailJob extends Monitor{
log.error("Email预警规则: {}解析失败,失败原因: {}", operator, e.getMessage()); log.error("Email预警规则: {}解析失败,失败原因: {}", operator, e.getMessage());
}catch (Exception e){ }catch (Exception e){
log.error("Email监控异常: {}", e.getMessage()); log.error("Email监控异常: {}", e.getMessage());
e.printStackTrace();
} }
} }
destroy(); destroy();

View File

@ -84,10 +84,6 @@ public class ServerJob extends Monitor{
// 向运管查询监控项数据 // 向运管查询监控项数据
String token = ManageUtil.getToken(); String token = ManageUtil.getToken();
if(StrUtil.isBlank(token)){
log.error("运管系统登录异常, Token获取失败");
continue;
}
Result<ItemHistory> result = getMonitorSystem().itemBack(itemId, itemType, start, end, token); Result<ItemHistory> result = getMonitorSystem().itemBack(itemId, itemType, start, end, token);
ItemHistory itemHistory = result.getResult(); ItemHistory itemHistory = result.getResult();
if (ObjectUtil.isNull(itemHistory)){ if (ObjectUtil.isNull(itemHistory)){
@ -127,13 +123,14 @@ public class ServerJob extends Monitor{
getSendMessage().send(messageDTO, groupId, notific); getSendMessage().send(messageDTO, groupId, notific);
getPushAppUtil().pushToSingle(messageDTO, groupId); getPushAppUtil().pushToSingle(messageDTO, groupId);
} }
} catch (FeignException.Unauthorized e){ }catch (FeignException.Unauthorized e){
ManageUtil.refreshToken(); ManageUtil.refreshToken();
log.warn("向运管系统查询ItemHistory信息异常: Token失效,已刷新Token"); log.warn("向运管系统查询ItemHistory信息异常: Token失效,已刷新Token");
} catch (JsonProcessingException e) { } catch (JsonProcessingException e) {
log.error("Server预警规则: {}解析失败,失败原因: {}", operator, e.getMessage()); log.error("Server预警规则: {}解析失败,失败原因: {}", operator, e.getMessage());
} catch (Exception e){ }catch (Exception e){
log.error("Server监控异常: {}", e.getMessage()); log.error("Server监控异常: {}", e.getMessage());
e.printStackTrace();
} }
} }
destroy(); destroy();

View File

@ -44,7 +44,7 @@ public class TableSpaceJob extends Monitor {
/** /**
* 解析Oracle 表空间预警规则 * 解析Oracle 表空间预警规则
**/ **/
@Scheduled(cron = "${task.period-space:0 0/1 * * * ?}") @Scheduled(cron = "${task.period-space:0 0 */1 * * ?}")
public void execute(){ public void execute(){
init(); init();
@ -124,6 +124,7 @@ public class TableSpaceJob extends Monitor {
log.error("Database-TableSpace预警规则: {}解析失败,失败原因: {}", operator, e.getMessage()); log.error("Database-TableSpace预警规则: {}解析失败,失败原因: {}", operator, e.getMessage());
}catch (Exception e){ }catch (Exception e){
log.error("Database-TableSpace监控异常: {}", e.getMessage()); log.error("Database-TableSpace监控异常: {}", e.getMessage());
e.printStackTrace();
} }
} }
destroy(); destroy();