同步mdc代码

This commit is contained in:
qiaoqinzheng 2024-04-19 16:53:17 +08:00
parent 01122e6daa
commit b0529f0f3d
14 changed files with 189 additions and 151 deletions

View File

@ -669,7 +669,7 @@ public class EmailServiceManager {
String key = RedisConstant.EMAIL_MSG_ID+StringConstant.COLON+messageId;
int numberKey = redisUtil.get(key) != null? (int) redisUtil.get(key):0;
// exist = redisUtil.hasKey(key);
if(numberKey > taskProperties.getForceDeletedNumber()){
if(numberKey >= taskProperties.getForceDeletedNumber()){
exist = true;
log.info("Check: Remove Email:{},receiveTime:{}",message.getSubject(), DateUtils.formatDate(message.getReceivedDate(),"yyyy-MM-dd HH:mm:ss"));
message.setFlag(Flags.Flag.DELETED,true);

View File

@ -5,8 +5,10 @@ import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.text.StrBuilder;
import cn.hutool.core.util.NumberUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.ReUtil;
import cn.hutool.core.util.StrUtil;
import lombok.Data;
import lombok.NoArgsConstructor;
@ -16,14 +18,15 @@ import org.jeecg.common.config.mqtoken.UserTokenContext;
import org.jeecg.common.constant.CommonConstant;
import org.jeecg.common.constant.SymbolConstant;
import org.jeecg.common.constant.enums.SampleType;
import org.jeecg.common.util.RedisStreamUtil;
import org.jeecg.common.util.SpringContextUtils;
import org.jeecg.common.util.*;
import org.jeecg.common.util.dynamic.db.FreemarkerParseFactory;
import org.jeecg.modules.base.dto.NuclideInfo;
import org.jeecg.modules.base.dto.Info;
import org.jeecg.modules.base.entity.postgre.AlarmAnalysisLog;
import org.jeecg.modules.base.entity.postgre.AlarmAnalysisNuclideAvg;
import org.jeecg.modules.base.entity.postgre.AlarmAnalysisRule;
import org.jeecg.modules.base.enums.Condition;
import org.jeecg.modules.base.enums.DSType;
import org.jeecg.modules.feignclient.SystemClient;
import org.jeecg.modules.service.AnalysisResultService;
import org.jeecg.modules.service.IAlarmAnalysisLogService;
@ -36,9 +39,14 @@ import org.springframework.stereotype.Component;
import static org.jeecg.common.constant.enums.MessageTypeEnum.*;
import static org.jeecg.common.util.TokenUtils.getTempToken;
import static org.jeecg.modules.base.enums.Template.ANALYSIS_NUCLIDE;
import static org.jeecg.modules.base.enums.Template.MONITOR_EMAIL;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@Data
@ -71,7 +79,7 @@ public class AnalysisConsumer implements StreamListener<String, ObjectRecord<Str
try {
String streamKey = message.getStream();
init();
/**
/*
* 新消息在未进行ACK之前,状态也为pending,
* 直接消费所有异常未确认的消息和新消息
*/
@ -86,10 +94,10 @@ public class AnalysisConsumer implements StreamListener<String, ObjectRecord<Str
// 消费完成后,手动确认消费消息[消息消费成功]
// 否则就是消费抛出异常,进入pending_ids[消息消费失败]
redisStreamUtil.ack(streamKey, groupName, recordId.getValue());
// TODO del 取消手动删除已消费消息
// redisStreamUtil.del(streamKey, recordId.getValue());
// 手动删除已消费消息
redisStreamUtil.del(streamKey, recordId.getValue());
}
}catch (RuntimeException e){
}catch (Exception e){
log.error("AnalysisConsumer消费异常: {}", e.getMessage());
}finally {
destroy();
@ -109,7 +117,8 @@ public class AnalysisConsumer implements StreamListener<String, ObjectRecord<Str
for (AlarmAnalysisRule rule : rules) {
// 当前规则是否有报警条件
String conditionStr = rule.getConditions();
if (StrUtil.isBlank(conditionStr)) continue;
if (StrUtil.isBlank(conditionStr))
continue;
// 是否在当前规则关注的台站列表内
String stations = rule.getStations();
if (!StrUtil.contains(stations, stationId))
@ -137,74 +146,71 @@ public class AnalysisConsumer implements StreamListener<String, ObjectRecord<Str
info.setRuleId(rule.getId());
info.setGroupId(rule.getContactGroup());
info.setConditions(rule.getConditions());
judge(info,nuclidesCross);
judge(info, nuclidesCross);
}
}
private void judge(Info info, Map<String,String> nuclidesCross){
Set<String> nuclideNames = nuclidesCross.keySet();
StringBuilder alarmInfo = new StringBuilder();
List<String> firstDetected;
List<NuclideInfo> moreThanAvg = new ArrayList<>();
String conditionStr = info.getConditions();
String betaOrGamma = info.getBetaOrGamma();
String datasource = info.getDatasource();
String stationId = info.getStationId();
// 获取谱文件采样日期 如果为null 则默认为LocalDate.now()
LocalDate collDate = ObjectUtil.isNull(info.getCollectionDate()) ? LocalDate.now() :
info.getCollectionDate().toLocalDate();
List<String> conditions = ListUtil.toList(conditionStr.split(COMMA));
List<String> firstDetected = new ArrayList<>(); // 首次发现
List<NuclideInfo> moreThanAvg = new ArrayList<>(); // 超浓度均值
List<String> meanwhile = new ArrayList<>(); // 同时出现两种及以上核素
for (String con : conditions) {
Condition condition = Condition.valueOf1(con);
if (ObjectUtil.isNotNull(condition)){
switch (condition){
case FIRST_FOUND: // 首次发现该元素
firstDetected = firstDetected(betaOrGamma,datasource,nuclideNames);
if (CollUtil.isNotEmpty(firstDetected)){
String message = "First discovery of nuclides: [" + StrUtil.join(COMMA,firstDetected) + "]";
alarmInfo.append(message);
}
break;
case ABOVE_AVERAGE: // 元素浓度高于均值
moreThanAvg = moreThanAvg(datasource,nuclidesCross);
if (CollUtil.isNotEmpty(moreThanAvg)){
for (NuclideInfo nuclideInfo : moreThanAvg) {
String nuclide = nuclideInfo.getNuclide();
String threshold = nuclideInfo.getThreshold();
String message = "Nuclide " + nuclide + "is above average: " + threshold;
alarmInfo.append(COMMA).append(message);
}
}
break;
case MEANWHILE: // 同时出现两种及以上核素
if (nuclideNames.size() >= 2){
String message = "Simultaneously detecting nuclides: [" + StrUtil.join(COMMA,nuclideNames) + "]";
alarmInfo.append(COMMA).append(message);
}
break;
default:
break;
}
if (ObjectUtil.isNull(condition)) continue;
switch (condition){
case FIRST_FOUND: // 首次发现该元素
firstDetected = firstDetected(betaOrGamma, datasource, nuclideNames);
break;
case ABOVE_AVERAGE: // 元素浓度高于均值
moreThanAvg = moreThanAvg(datasource, stationId, collDate, nuclidesCross);
break;
case MEANWHILE: // 同时出现两种及以上核素
if (CollUtil.isNotEmpty(nuclideNames) && nuclideNames.size() >= 2)
meanwhile.addAll(nuclideNames);
break;
default:
break;
}
}
if (StrUtil.isNotBlank(alarmInfo.toString())){
// 保存报警日志
AlarmAnalysisLog logInfo = new AlarmAnalysisLog();
BeanUtil.copyProperties(info,logInfo);
SampleType sampleType = SampleType.typeOf(betaOrGamma);
if (ObjectUtil.isNotNull(sampleType))
logInfo.setSampleType(sampleType.getValue());
if (alarmInfo.toString().startsWith(COMMA))
alarmInfo = new StringBuilder(StrUtil.sub(alarmInfo.toString(), 1, alarmInfo.length()));
logInfo.setAlarmInfo(alarmInfo.toString());
if (CollUtil.isNotEmpty(moreThanAvg))
logInfo.setNuclideInfoList(moreThanAvg);
logService.saveLog(logInfo);
// 发送报警信息
String groupId = info.getGroupId();
MessageDTO messageDTO = new MessageDTO();
messageDTO.setTitle("Nuclied Warn Info").setContent(alarmInfo.toString());
if (StrUtil.isNotBlank(groupId)) {
systemClient.sendMessage(messageDTO, groupId, ALL.getValue());
systemClient.pushMessageToSingle(messageDTO, groupId);
}
// 构建预警信息
DataTool dataTool = DataTool.getInstance();
if (CollUtil.isNotEmpty(firstDetected))
dataTool.put("firstDetected", CollUtil.join(firstDetected, StrUtil.COMMA + StrUtil.SPACE));
if (CollUtil.isNotEmpty(moreThanAvg)){
String above = moreThanAvg.stream()
.map(item -> item.getNuclide() + "(" + item.getValue() + ")" + " > " + item.getThreshold())
.collect(Collectors.joining(StrUtil.COMMA + StrUtil.SPACE));
dataTool.put("moreThanAvg", above);
}
if (CollUtil.isNotEmpty(meanwhile))
dataTool.put("meanwhile", CollUtil.join(meanwhile, StrUtil.COMMA + StrUtil.SPACE));
// 如果报警数据为空 则不需要发送报警信息和生成报警日志
if (MapUtil.isEmpty(dataTool.get())) return;
MessageDTO messageDTO = TemplateUtil.parse1(ANALYSIS_NUCLIDE.getCode(), dataTool.get());
// 保存报警日志
AlarmAnalysisLog logInfo = new AlarmAnalysisLog();
BeanUtil.copyProperties(info, logInfo);
SampleType sampleType = SampleType.typeOf(betaOrGamma);
if (ObjectUtil.isNotNull(sampleType))
logInfo.setSampleType(sampleType.getValue());
logInfo.setAlarmInfo(messageDTO.getContent());
if (CollUtil.isNotEmpty(moreThanAvg))
logInfo.setNuclideInfoList(moreThanAvg);
logService.saveLog(logInfo);
// 发送报警信息
String groupId = info.getGroupId();
systemClient.sendMessage(messageDTO, groupId, ALL.getValue());
systemClient.pushMessageToSingle(messageDTO, groupId);
}
/**
@ -222,12 +228,12 @@ public class AnalysisConsumer implements StreamListener<String, ObjectRecord<Str
/**
* 核素值大于历史浓度均值
*/
private List<NuclideInfo> moreThanAvg(String dataSourceType,
Map<String,String> nuclidesCross){
private List<NuclideInfo> moreThanAvg(String dataSourceType, String stationId,
LocalDate collDate, Map<String,String> nuclidesCross){
List<NuclideInfo> nuclideInfos = new ArrayList<>();
Set<String> nuclideNames = nuclidesCross.keySet();
Map<String, String> nuclideAvgs = nuclideAvgService
.list(nuclideNames, dataSourceType).stream()
.list(nuclideNames, dataSourceType, stationId, collDate).stream()
.collect(Collectors.toMap(AlarmAnalysisNuclideAvg::getNuclide,
AlarmAnalysisNuclideAvg::getVal));
for (Map.Entry<String, String> nuclide : nuclidesCross.entrySet()) {
@ -246,41 +252,27 @@ public class AnalysisConsumer implements StreamListener<String, ObjectRecord<Str
NuclideInfo nuclideInfo = new NuclideInfo();
nuclideInfo.setNuclide(nuclideName);
nuclideInfo.setThreshold(avg.toString());
nuclideInfo.setDatasource(type(dataSourceType));
nuclideInfo.setValue(conc.toString());
nuclideInfo.setDatasource(DSType.typeOf(dataSourceType));
// 对浓度值保留五位小数
nuclideInfo.setValue(NumUtil.keepStr(concValue, 5));
nuclideInfos.add(nuclideInfo);
}
return nuclideInfos;
}
private String type(String dataSourceType){
switch (dataSourceType){
case CommonConstant.ARMDARR:
return "ARMDARR";
case CommonConstant.ARMDRRR:
return "ARMDRRR";
case CommonConstant.IDCARR:
return "IDCARR";
case CommonConstant.IDCRRR:
return "IDCRRR";
default:
return null;
}
}
private void init() {
// start:生成临时Token到线程中
// start 生成临时Token到线程中
UserTokenContext.setToken(getTempToken());
systemClient = SpringContextUtils.getBean(SystemClient.class);
redisStreamUtil = SpringContextUtils.getBean(RedisStreamUtil.class);
logService = SpringContextUtils.getBean(IAlarmAnalysisLogService.class);
analysisResultService = SpringContextUtils.getBean(AnalysisResultService.class);
ruleService = SpringContextUtils.getBean(IAlarmAnalysisRuleService.class);
analysisResultService = SpringContextUtils.getBean(AnalysisResultService.class);
nuclideAvgService = SpringContextUtils.getBean(IAlarmAnalysisNuclideAvgService.class);
}
private void destroy(){
// end:删除临时Token
// end 删除临时Token
UserTokenContext.remove();
}
}

View File

@ -34,10 +34,6 @@ public class RedisStreamConfig {
// 每次轮询取几条消息
private final Integer maxMsg = 10;
private final String alarmKey = RedisConstant.STREAM_ALARM;
private final String alarmGroup = RedisConstant.GROUP_ALARM;
private final String alarmConsumer = RedisConstant.CONSUMER_ALARM;
private final String analysisKey = RedisConstant.STREAM_ANALYSIS;
private final String analysisGroup = RedisConstant.GROUP_ANALYSIS;
private final String analysisConsumer = RedisConstant.CONSUMER_ANALYSIS;
@ -103,7 +99,7 @@ public class RedisStreamConfig {
// 独立消费
/*streamMessageListenerContainer.receive(StreamOffset.fromStart(streamKey),
new ConsumeListener("独立消费", null, null));*/
// 非独立消费
/*
register:用于注册一个消息监听器,该监听器将在每次有新的消息到达Redis Stream时被调用
这种方式适用于长期运行的消息消费者它会持续监听Redis Stream并处理新到达的消息
@ -111,24 +107,28 @@ public class RedisStreamConfig {
receive:用于主动从Redis Stream中获取消息并进行处理,你可以指定要获取的消息数量和超时时间
这种方式适用于需要主动控制消息获取的场景,例如批量处理消息或定时任务
**/
// 注册消费组A中的消费者A1,手动ACK
/*ConsumerStreamReadRequest<String> readA1 = StreamMessageListenerContainer
/* 1.需要手动确认消费消息 */
// 1.1 使用 register 方式
StreamMessageListenerContainer.ConsumerStreamReadRequest<String> readRequest = StreamMessageListenerContainer
.StreamReadRequest
.builder(StreamOffset.create(warnKey, ReadOffset.lastConsumed()))
.consumer(Consumer.from(groupWarnA, consumerWarnA1))
.builder(StreamOffset.create(analysisKey, ReadOffset.lastConsumed()))
.consumer(Consumer.from(analysisGroup, analysisConsumer))
// 手动确认消费了消息 默认为自动确认消息
.autoAcknowledge(false)
// 如果消费者发生了异常,是否禁止消费者消费
// 如果消费者发生了异常 不禁止消费者消费 默认为禁止
.cancelOnError(throwable -> false)
.build();
ConsumeA1 consumeA1 = new ConsumeA1(groupWarnA, consumerWarnA1);
streamMessageListenerContainer.register(readA1, consumeA1);*/
AnalysisConsumer analysis = new AnalysisConsumer(analysisGroup,analysisConsumer);
AnalysisConsumer analysis = new AnalysisConsumer(analysisGroup, analysisConsumer);
streamMessageListenerContainer.register(readRequest, analysis);
// 1.2 使用 receive 方式
/*AnalysisConsumer analysis = new AnalysisConsumer(analysisGroup, analysisConsumer);
streamMessageListenerContainer.receive(Consumer.from(analysisGroup, analysisConsumer),
StreamOffset.create(analysisKey, ReadOffset.lastConsumed()), analysis);
// 创建消费组A中的消费者A2,自动ACK
/* ConsumeA2 consumeA2 = new ConsumeA2(consumerWarnA2);
streamMessageListenerContainer.receiveAutoAck(Consumer.from(groupWarnA, consumerWarnA2),
StreamOffset.create(warnKey, ReadOffset.lastConsumed()), consumeA2);*/
StreamOffset.create(analysisKey, ReadOffset.lastConsumed()), analysis);*/
/* 2.自动确认消费消息 */
// 2.1 使用 receive 方式
/*AnalysisConsumer analysis = new AnalysisConsumer(analysisGroup,analysisConsumer);
streamMessageListenerContainer.receiveAutoAck(Consumer.from(analysisGroup, analysisConsumer),
StreamOffset.create(analysisKey, ReadOffset.lastConsumed()), analysis);*/
return streamMessageListenerContainer;
}
}

View File

@ -6,12 +6,14 @@ import org.jeecg.modules.base.entity.postgre.AlarmAnalysisNuclideAvg;
import com.baomidou.mybatisplus.extension.service.IService;
import org.jeecg.modules.base.bizVo.NuclideAvgVo;
import java.time.LocalDate;
import java.util.List;
import java.util.Set;
public interface IAlarmAnalysisNuclideAvgService extends IService<AlarmAnalysisNuclideAvg> {
List<AlarmAnalysisNuclideAvg> list(Set<String> nuclideNames,String dataSourceType);
List<AlarmAnalysisNuclideAvg> list(Set<String> nuclideNames, String dataSourceType,
String stationId, LocalDate collDate);
Page<NuclideAvgDto> findPage(NuclideAvgVo nuclideAvgVo);
}

View File

@ -75,6 +75,7 @@ public class AlarmAnalysisLogServiceImpl extends ServiceImpl<AlarmAnalysisLogMap
try {
ObjectMapper mapper = new ObjectMapper();
List<NuclideInfo> nuclideInfos = mapper.readValue(nuclideInfo, new TypeReference<List<NuclideInfo>>() {});
nuclideInfos.forEach(NuclideInfo::keepSix);
logDto.setNuclideList(nuclideInfos);
} catch (JsonProcessingException e) {
log.error("NuclideInfo解析异常: {}", e.getMessage());

View File

@ -15,6 +15,7 @@ import org.springframework.stereotype.Service;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -27,13 +28,15 @@ public class AlarmAnalysisNuclideAvgServiceImpl extends ServiceImpl<AlarmAnalysi
private SystemClient systemClient;
@Override
public List<AlarmAnalysisNuclideAvg> list(Set<String> nuclideNames,String dataSourceType) {
LocalDate dayAgo = LocalDate.now().minusDays(1);
public List<AlarmAnalysisNuclideAvg> list(Set<String> nuclideNames, String dataSourceType,
String stationId, LocalDate collDate) {
LocalDate dayAgo = collDate.minusDays(1);
LambdaQueryWrapper<AlarmAnalysisNuclideAvg> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(AlarmAnalysisNuclideAvg::getDataSourceType,dataSourceType);
wrapper.eq(AlarmAnalysisNuclideAvg::getCaclDate,dayAgo);
wrapper.eq(AlarmAnalysisNuclideAvg::getStationId, stationId);
wrapper.eq(AlarmAnalysisNuclideAvg::getDataSourceType, dataSourceType);
wrapper.eq(AlarmAnalysisNuclideAvg::getCaclDate, dayAgo);
wrapper.in(AlarmAnalysisNuclideAvg::getNuclide,nuclideNames);
return list(wrapper);
return this.list(wrapper);
}
@Override

View File

@ -1,5 +1,6 @@
package org.jeecg.modules.feignclient;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.api.vo.Result;
import org.jeecg.common.constant.RedisConstant;
import org.jeecg.common.system.util.JwtUtil;
@ -9,6 +10,7 @@ import org.jeecg.modules.base.dto.LoginResult;
import org.jeecg.modules.base.dto.LoginVo;
import org.springframework.core.env.Environment;
@Slf4j
public class ManageUtil {
private static RedisUtil redisUtil;
@ -31,20 +33,28 @@ public class ManageUtil {
* 登录运管系统 获取Token
* */
public static String getToken(){
if (redisUtil.hasKey(RedisConstant.MANAGE_TOKEN))
return (String) redisUtil.get(RedisConstant.MANAGE_TOKEN);
LoginVo loginVo = new LoginVo(username, password);
Result<LoginResult> loginRes = monitorSystem.login(loginVo);
String token = loginRes.getResult().getToken();
redisUtil.set(RedisConstant.MANAGE_TOKEN, token, JwtUtil.EXPIRE_TIME / 1000 - 10);
return token;
String token = null;
try {
if (redisUtil.hasKey(RedisConstant.MANAGE_TOKEN))
return (String) redisUtil.get(RedisConstant.MANAGE_TOKEN);
LoginVo loginVo = new LoginVo(username, password);
Result<LoginResult> loginRes = monitorSystem.login(loginVo);
token = loginRes.getResult().getToken();
redisUtil.set(RedisConstant.MANAGE_TOKEN, token, JwtUtil.EXPIRE_TIME / 1000 - 10);
return token;
}catch (RuntimeException e){
return token;
}
}
public static void refreshToken(){
LoginVo loginVo = new LoginVo(username, password);
Result<LoginResult> loginRes = monitorSystem.login(loginVo);
String token = loginRes.getResult().getToken();
redisUtil.set(RedisConstant.MANAGE_TOKEN, token, JwtUtil.EXPIRE_TIME / 1000 - 10);
try {
LoginVo loginVo = new LoginVo(username, password);
Result<LoginResult> loginRes = monitorSystem.login(loginVo);
String token = loginRes.getResult().getToken();
redisUtil.set(RedisConstant.MANAGE_TOKEN, token, JwtUtil.EXPIRE_TIME / 1000 - 10);
}catch (RuntimeException e){
log.error("运管系统登录异常, Token刷新失败: {}", e.getMessage());
}
}
}

View File

@ -4,6 +4,7 @@ import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.api.dto.message.MessageDTO;
import org.jeecg.common.constant.SymbolConstant;
import org.jeecg.common.constant.enums.MessageTypeEnum;
@ -27,6 +28,7 @@ import static org.jeecg.common.constant.enums.MessageTypeEnum.*;
* @author nieziyan
* @date 2023-06-21
*/
@Slf4j
@Component
public class SendMessage {
@ -50,6 +52,7 @@ public class SendMessage {
*/
public void send(MessageDTO messageDTO, String groupId, String notific){
Map<String, String> contact = getContact(groupId);
if (MapUtil.isEmpty(contact)) return;
if (StrUtil.isBlank(notific)) return;
List<String> ways = ListUtil.toList(StrUtil.split(notific, COMMA));
if (ways.contains(ALL.getValue()))
@ -96,26 +99,33 @@ public class SendMessage {
* @param groupId
*/
private Map<String, String> getContact(String groupId){
List<String> userIds = abnormalAlarmClient.userIds(groupId).getResult();
// 查询用户信息
List<SysUser> sysUsers = sysUserService.listByIds(userIds);
// 用户名
String usernameList = sysUsers.stream().map(SysUser::getUsername).filter(StrUtil::isNotBlank)
.collect(Collectors.joining(COMMA));
// 邮箱
String emailList = sysUsers.stream().map(SysUser::getEmail).filter(StrUtil::isNotBlank)
.collect(Collectors.joining(COMMA));
// 用户名-邮箱Map
Map<String, String> userEmail = sysUsers.stream()
.collect(Collectors.toMap(SysUser::getUsername, SysUser::getEmail));
// 手机号码
String phoneList = sysUsers.stream().map(SysUser::getPhone).filter(StrUtil::isNotBlank)
.collect(Collectors.joining(COMMA));
Map<String,String> result = new HashMap<>();
result.put(SYSTEM, usernameList);
result.put(EMAIL, emailList);
result.put(SMS, phoneList);
result.putAll(userEmail);
return result;
try {
List<String> userIds = abnormalAlarmClient.userIds(groupId).getResult();
// 查询用户信息
List<SysUser> sysUsers = sysUserService.listByIds(userIds);
// 用户名
String usernameList = sysUsers.stream().map(SysUser::getUsername).filter(StrUtil::isNotBlank)
.collect(Collectors.joining(COMMA));
// 邮箱
String emailList = sysUsers.stream().map(SysUser::getEmail).filter(StrUtil::isNotBlank)
.collect(Collectors.joining(COMMA));
// 用户名-邮箱Map
Map<String, String> userEmail = sysUsers.stream()
.filter(item -> StrUtil.isNotBlank(item.getEmail()))
.collect(Collectors.toMap(SysUser::getUsername, SysUser::getEmail));
// 手机号码
String phoneList = sysUsers.stream().map(SysUser::getPhone).filter(StrUtil::isNotBlank)
.collect(Collectors.joining(COMMA));
result.put(SYSTEM, usernameList);
result.put(EMAIL, emailList);
result.put(SMS, phoneList);
result.putAll(userEmail);
return result;
}catch (Exception e) {
log.error("获取收件人联系信息异常: {}", e.getMessage());
return result;
}
}
}

View File

@ -6,6 +6,7 @@ import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import feign.FeignException;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.api.dto.message.MessageDTO;
@ -102,6 +103,10 @@ public class DatabaseJob extends Monitor implements Job{
}*/
// 向运管查询监控项数据
String token = ManageUtil.getToken();
if(StrUtil.isBlank(token)){
log.error("运管系统登录异常, Token获取失败");
continue;
}
Result<ItemHistory> result = getMonitorSystem().itemBack(itemId, itemType, start, end, token);
ItemHistory itemHistory = result.getResult();
if (ObjectUtil.isNull(itemHistory)){
@ -141,9 +146,12 @@ public class DatabaseJob extends Monitor implements Job{
getSendMessage().send(messageDTO, groupId, notific);
getPushAppUtil().pushToSingle(messageDTO, groupId);
}
} catch (FeignException.Unauthorized e){
ManageUtil.refreshToken();
log.warn("向运管系统查询ItemHistory信息异常: Token失效,已刷新Token");
} catch (JsonProcessingException e) {
log.error("Database预警规则: {}解析失败,失败原因: {}", operator, e.getMessage());
}catch (Exception e){
} catch (Exception e){
log.error("Database监控异常: {}", e.getMessage());
}
}

View File

@ -85,6 +85,10 @@ public class ServerJob extends Monitor implements Job {
// 向运管查询监控项数据
String token = ManageUtil.getToken();
if(StrUtil.isBlank(token)){
log.error("运管系统登录异常, Token获取失败");
continue;
}
Result<ItemHistory> result = getMonitorSystem().itemBack(itemId, itemType, start, end, token);
ItemHistory itemHistory = result.getResult();
if (ObjectUtil.isNull(itemHistory)){
@ -124,12 +128,12 @@ public class ServerJob extends Monitor implements Job {
getSendMessage().send(messageDTO, groupId, notific);
getPushAppUtil().pushToSingle(messageDTO, groupId);
}
}catch (FeignException.Unauthorized e){
} catch (FeignException.Unauthorized e){
ManageUtil.refreshToken();
log.warn("向运管系统查询ItemHistory信息异常: Token失效,已刷新Token");
} catch (JsonProcessingException e) {
log.error("Server预警规则: {}解析失败,失败原因: {}", operator, e.getMessage());
}catch (Exception e){
} catch (Exception e){
log.error("Server监控异常: {}", e.getMessage());
}
}

View File

@ -6,6 +6,7 @@ import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import feign.FeignException;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.api.dto.message.MessageDTO;
@ -86,6 +87,10 @@ public class DatabaseJob extends Monitor {
// 向运管查询监控项数据
String token = ManageUtil.getToken();
if(StrUtil.isBlank(token)){
log.error("运管系统登录异常, Token获取失败");
continue;
}
Result<ItemHistory> result = getMonitorSystem().itemBack(itemId, itemType, start, end, token);
ItemHistory itemHistory = result.getResult();
if (ObjectUtil.isNull(itemHistory)){
@ -125,11 +130,13 @@ public class DatabaseJob extends Monitor {
getSendMessage().send(messageDTO, groupId, notific);
getPushAppUtil().pushToSingle(messageDTO, groupId);
}
}catch (FeignException.Unauthorized e){
ManageUtil.refreshToken();
log.warn("向运管系统查询ItemHistory信息异常: Token失效,已刷新Token");
} catch (JsonProcessingException e) {
log.error("Database预警规则: {}解析失败,失败原因: {}", operator, e.getMessage());
}catch (Exception e){
log.error("Database监控异常: {}", e.getMessage());
e.printStackTrace();
}
}
destroy();

View File

@ -111,7 +111,6 @@ public class EmailJob extends Monitor{
log.error("Email预警规则: {}解析失败,失败原因: {}", operator, e.getMessage());
}catch (Exception e){
log.error("Email监控异常: {}", e.getMessage());
e.printStackTrace();
}
}
destroy();

View File

@ -84,6 +84,10 @@ public class ServerJob extends Monitor{
// 向运管查询监控项数据
String token = ManageUtil.getToken();
if(StrUtil.isBlank(token)){
log.error("运管系统登录异常, Token获取失败");
continue;
}
Result<ItemHistory> result = getMonitorSystem().itemBack(itemId, itemType, start, end, token);
ItemHistory itemHistory = result.getResult();
if (ObjectUtil.isNull(itemHistory)){
@ -123,14 +127,13 @@ public class ServerJob extends Monitor{
getSendMessage().send(messageDTO, groupId, notific);
getPushAppUtil().pushToSingle(messageDTO, groupId);
}
}catch (FeignException.Unauthorized e){
} catch (FeignException.Unauthorized e){
ManageUtil.refreshToken();
log.warn("向运管系统查询ItemHistory信息异常: Token失效,已刷新Token");
} catch (JsonProcessingException e) {
log.error("Server预警规则: {}解析失败,失败原因: {}", operator, e.getMessage());
}catch (Exception e){
} catch (Exception e){
log.error("Server监控异常: {}", e.getMessage());
e.printStackTrace();
}
}
destroy();

View File

@ -44,7 +44,7 @@ public class TableSpaceJob extends Monitor {
/**
* 解析Oracle 表空间预警规则
**/
@Scheduled(cron = "${task.period-space:0 0 */1 * * ?}")
@Scheduled(cron = "${task.period-space:0 0/1 * * * ?}")
public void execute(){
init();
@ -124,7 +124,6 @@ public class TableSpaceJob extends Monitor {
log.error("Database-TableSpace预警规则: {}解析失败,失败原因: {}", operator, e.getMessage());
}catch (Exception e){
log.error("Database-TableSpace监控异常: {}", e.getMessage());
e.printStackTrace();
}
}
destroy();