feat:系统预警

This commit is contained in:
nieziyan 2023-07-20 10:26:55 +08:00
parent dcb82ce493
commit 82783edbcc
20 changed files with 309 additions and 162 deletions

View File

@ -513,7 +513,7 @@ public interface CommonConstant {
/**
* 预警规则Key前缀
*/
String PREFIX_RULE = "Warn:Rule:";
String PREFIX_RULE = "Rule:";
// 启用
Integer ENABLED = 1;

View File

@ -21,8 +21,8 @@ public enum MessageTypeEnum {
DD("dingtalk", "钉钉消息"),
/** 企业微信 */
QYWX("wechat_enterprise", "企业微信"),
/* 自定义邮箱配置 */
YX("email-self", "自定义邮件消息"),
/* 邮箱消息 */
YX("email-self", "邮箱消息"),
/* 手机短信 */
SMS("sms", "手机短信");

View File

@ -3,27 +3,19 @@ package org.jeecg.common.util;
import cn.hutool.core.collection.ListUtil;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.google.common.collect.Multimap;
import org.jeecg.common.constant.CommonConstant;
import org.jeecg.modules.base.dto.WarnDto;
import org.jeecg.modules.base.dto.RuleDto;
import org.jeecg.modules.base.entity.AlarmRule;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.connection.stream.*;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StreamOperations;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.*;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.stereotype.Component;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -102,8 +94,8 @@ public class RedisStreamUtil{
* @param groupName
* @param consumerName
*/
public List<ObjectRecord<String,WarnDto>> read(String streamKey,String groupName,String consumerName){
return stringRedisTemplate.opsForStream().read(WarnDto.class,
public List<ObjectRecord<String, RuleDto>> read(String streamKey, String groupName, String consumerName){
return stringRedisTemplate.opsForStream().read(RuleDto.class,
Consumer.from(groupName, consumerName),
StreamOffset.fromStart(streamKey));
}
@ -154,7 +146,7 @@ public class RedisStreamUtil{
*
* @param record
*/
private String putRecord(Record<String, WarnDto> record){
private String putRecord(Record<String, RuleDto> record){
/**
* 1.使用stringRedisTemplate
* 2.序列化方式使用string
@ -165,12 +157,12 @@ public class RedisStreamUtil{
/**
* 向消息队列中添加Warn信息
*
* @param warnDto
* @param ruleDto
*/
public String pushWarn(WarnDto warnDto){
public String pushWarn(RuleDto ruleDto){
String warnKey = CommonConstant.STREAM_KEY_WARN;
ObjectRecord<String, WarnDto> record = StreamRecords.newRecord()
.in(warnKey).ofObject(warnDto);
ObjectRecord<String, RuleDto> record = StreamRecords.newRecord()
.in(warnKey).ofObject(ruleDto);
// 向Redis Stream中推送消息
return putRecord(record);
}
@ -193,4 +185,17 @@ public class RedisStreamUtil{
return null;
});
}
public Set<String> keys(String pattern) {
ScanOptions options = ScanOptions.scanOptions().match(pattern).build();
return redisTemplate.execute((RedisCallback<Set<String>>) connection -> {
Set<String> keys = new HashSet<>();
try (Cursor<byte[]> cursor = connection.scan(options)) {
while (cursor.hasNext()) {
keys.add(new String(cursor.next()));
}
}
return keys;
});
}
}

View File

@ -0,0 +1,25 @@
package org.jeecg.modules.base.dto;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
import java.io.Serializable;
import java.util.List;
@Data
public class ItemDto implements Serializable {
private Double min;
private Double max;
private Double avg;
private Double now;
private String name;
private String units;
private List<Object> list;
}

View File

@ -1,5 +1,6 @@
package org.jeecg.modules.base.dto;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.experimental.Accessors;
import org.jeecg.modules.base.enums.Item;
@ -14,17 +15,14 @@ import java.io.Serializable;
* @date 2023-06-29
*/
@Data
@AllArgsConstructor
@Accessors(chain = true)
public class WarnDto implements Serializable{
// 资源id
private String sourceId;
public class RuleDto implements Serializable{
// 资源类型
private SourceType sourceType;
// 监控项
private Item item;
private Integer itemId;
// 当前值
private double value;

View File

@ -73,8 +73,8 @@ public class AlarmRule implements Serializable {
/**
* 监控项
*/
@TableField(value = "item")
private String item;
@TableField(value = "item_id")
private Integer itemId;
/**
* 创建时间

View File

@ -1,13 +1,26 @@
package org.jeecg.modules.base.entity;
import com.fasterxml.jackson.annotation.JsonIgnore;
import lombok.Data;
@Data
public class Rule {
private String name;
private String name; // 监控项名称
private String operator;
private String operator; // 比较符
private double threshold;
private Double threshold; // 阈值
@JsonIgnore
private String ruleName; // 规则名称
@JsonIgnore
private Double current; // 当前值
@JsonIgnore
private String groupId; // 联系人组id
@JsonIgnore
private String notific; // 通知方式
}

View File

@ -1,7 +1,7 @@
package org.jeecg.modules.base.enums;
public enum Notific {
SYSTEM("1"),EMAIL("2"),SMS("3");
SYSTEM("1"),EMAIL("2"),PHONE("3");
private String way;

View File

@ -4,11 +4,7 @@ import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.jeecg.common.api.vo.Result;
import org.jeecg.common.constant.EmailConstant;
import org.jeecg.common.util.RedisStreamUtil;
import org.jeecg.modules.base.dto.WarnDto;
import org.jeecg.common.util.RedisUtil;
import org.jeecg.modules.base.enums.Item;
import org.jeecg.modules.base.enums.SourceType;
import org.jeecg.modules.service.ISysEmailLogService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;

View File

@ -3,7 +3,6 @@ package org.jeecg.modules.service;
import com.baomidou.mybatisplus.extension.service.IService;
import org.jeecg.common.api.QueryRequest;
import org.jeecg.common.api.vo.Result;
import org.jeecg.modules.base.dto.WarnDto;
import org.jeecg.modules.base.entity.AlarmRule;
public interface IAlarmRuleService extends IService<AlarmRule> {

View File

@ -1,6 +1,5 @@
package org.jeecg.modules.service.impl;
import cn.hutool.core.collection.ListUtil;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.IdWorker;
@ -9,18 +8,15 @@ import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multiset;
import org.jeecg.common.api.QueryRequest;
import org.jeecg.common.api.vo.Result;
import org.jeecg.common.constant.CommonConstant;
import org.jeecg.common.constant.SymbolConstant;
import org.jeecg.common.system.util.JwtUtil;
import org.jeecg.common.util.RedisStreamUtil;
import org.jeecg.common.util.RedisUtil;
import org.jeecg.common.util.SpringContextUtils;
import org.jeecg.modules.base.dto.WarnDto;
import org.jeecg.modules.base.entity.AlarmRule;
import org.jeecg.modules.base.enums.Op;
import org.jeecg.modules.base.enums.SourceType;
import org.jeecg.modules.mapper.AlarmRuleMapper;
import org.jeecg.modules.service.IAlarmRuleService;
import org.springframework.beans.factory.annotation.Autowired;
@ -28,11 +24,7 @@ import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.servlet.http.HttpServletRequest;
import java.time.LocalDate;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
@Service("alarmRuleService")
public class AlarmRuleServiceImpl extends ServiceImpl<AlarmRuleMapper, AlarmRule> implements IAlarmRuleService {
@ -141,6 +133,8 @@ public class AlarmRuleServiceImpl extends ServiceImpl<AlarmRuleMapper, AlarmRule
**/
@PostConstruct
public void rule2Redis(){
String underline = SymbolConstant.UNDERLINE;
String colon = SymbolConstant.COLON;
String prefix = CommonConstant.PREFIX_RULE;
LambdaQueryWrapper<AlarmRule> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(AlarmRule::getEnabled,1);
@ -149,14 +143,11 @@ public class AlarmRuleServiceImpl extends ServiceImpl<AlarmRuleMapper, AlarmRule
for (AlarmRule alarmRule : alarmRules) {
String sourceType = alarmRule.getSourceType();
String sourceId = alarmRule.getSourceId();
String item = alarmRule.getItem();
String key = prefix + sourceType + "_" + sourceId + "_" + item;
Integer itemId = alarmRule.getItemId();
String key = prefix + sourceType + colon;
key += sourceId + underline + itemId;
ruleMap.put(key,alarmRule);
}
redisStreamUtil.putRules(ruleMap);
}
public static void main(String[] args) {
}
}

View File

@ -1,6 +1,7 @@
package org.jeecg.modules.feignclient;
import org.jeecg.common.api.vo.Result;
import org.jeecg.modules.base.dto.ItemDto;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.*;
@ -18,40 +19,40 @@ public interface MonitorClient {
@RequestParam String status,
@RequestParam String type);
@GetMapping("queryHostDetails")
Result<?> monDetail(@RequestParam String hostId,
@RequestParam String pageName,
@RequestParam Integer pageNo,
@RequestParam Integer pageSize,
@RequestParam String start,
@RequestParam String end);
@GetMapping("queryItemHistory")
Result<ItemDto> itemHistory(@RequestParam String itemId,
@RequestParam Integer itemType,
@RequestParam String start,
@RequestParam String end);
@GetMapping("queryItemHistoryData")
Result<?> itemHistoryData(@RequestParam String itemId,
@RequestParam Integer itemType,
@RequestParam String start,
@RequestParam String end);
@GetMapping("log")
Result<?> monLog(@RequestParam String code,
@RequestParam String deviceType,
@RequestParam String end,
@RequestParam Integer pageNo,
@RequestParam Integer pageSize,
@RequestParam String start,
@RequestParam String end,
@RequestParam String status);
@GetMapping("log/{hostId}")
Result<?> monOneLog(@PathVariable("hostId") String hostId,
@RequestParam String end,
@RequestParam Integer pageNo,
@RequestParam Integer pageSize,
@RequestParam String start,
@RequestParam String status);
@GetMapping("queryHostDetails")
Result<?> monDetail(@RequestParam String hostId,
@RequestParam String pageName,
@RequestParam String end,
@RequestParam Integer pageNo,
@RequestParam Integer pageSize,
@RequestParam String start);
@GetMapping("queryItemHistory")
Result<?> itemHistory(@RequestParam String end,
@RequestParam String itemId,
@RequestParam Integer itemType,
@RequestParam String start);
@GetMapping("queryItemHistoryData")
Result<?> itemHistoryData(@RequestParam String end,
@RequestParam String itemId,
@RequestParam Integer itemType,
@RequestParam String start);
@RequestParam String status);
}

View File

@ -2,7 +2,6 @@ package org.jeecg.modules.message;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import org.jeecg.common.api.dto.message.MessageDTO;
import org.jeecg.common.config.mqtoken.UserTokenContext;
import org.jeecg.common.constant.enums.MessageTypeEnum;
@ -20,7 +19,9 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.jeecg.common.constant.enums.MessageTypeEnum.*;
import static org.jeecg.common.util.TokenUtils.getTempToken;
import static org.jeecg.modules.base.enums.Notific.*;
/**
* 消息推送 (站内|邮箱|短信)
@ -31,9 +32,9 @@ import static org.jeecg.common.util.TokenUtils.getTempToken;
@Component
public class SendMessage {
private final String SYSTEM = "Username";
private final String EMAIL = "Email";
private final String SMS = "Phone";
private final String System = "Username";
private final String Email = "Email";
private final String Sms = "Phone";
@Autowired
private ISysBaseAPI sysBaseAPI;
@ -47,40 +48,46 @@ public class SendMessage {
/**
* 根据联系人组id向用户推送消息
*
* @param groupId
*/
public void send(String groupId, String notific, Rule rule){
public void send(Rule rule){
// start:生成临时Token到线程中
UserTokenContext.setToken(getTempToken());
// 封装MessageDTO消息体
MessageDTO messageDTO = new MessageDTO();
messageDTO.setTitle("系统预警消息");
messageDTO.setContent("一封测试邮件");
String ruleName = rule.getRuleName();
double current = rule.getCurrent();
String operator = rule.getOperator();
double threshold = rule.getThreshold();
String context = "您设定的预警规则:"+ruleName+",预警阈值为:["+operator+threshold+"]当前值为:"+current;
messageDTO.setContent(context);
String groupId = rule.getGroupId();
String notific = rule.getNotific();
Map<String, String> contact = getContact(groupId);
if (StrUtil.isBlank(notific))return;
String[] ways = notific.split(",");
for (String way : ways) {
if(way.equals(Notific.SYSTEM.getWay())){// 1.推送系统消息
String toSys = contact.get(SYSTEM);
if(way.equals(SYSTEM.getWay())){// 1.推送系统消息
String toSys = contact.get(System);
if (StrUtil.isNotBlank(toSys)){
messageDTO.setToUser(toSys);
messageDTO.setType(MessageTypeEnum.XT.getType());
messageDTO.setType(XT.getType());
sysBaseAPI.sendTemplateMessage(messageDTO);
}
} else if (way.equals(Notific.EMAIL.getWay())) {// 2.推送邮箱
String toEmail = contact.get(EMAIL);
} else if (way.equals(EMAIL.getWay())) {// 2.推送邮箱
String toEmail = contact.get(Email);
if (StrUtil.isNotBlank(toEmail)){
messageDTO.setToUser(toEmail);
messageDTO.setType(MessageTypeEnum.YX.getType());
messageDTO.setType(YX.getType());
sysBaseAPI.sendTemplateMessage(messageDTO);
}
} else if (way.equals(Notific.SMS.getWay())) {// 3.推送短信
String toSms = contact.get(SMS);
} else if (way.equals(PHONE.getWay())) {// 3.推送短信
String toSms = contact.get(Sms);
if (StrUtil.isNotBlank(toSms)){
messageDTO.setToUser(toSms);
messageDTO.setType(MessageTypeEnum.SMS.getType());
messageDTO.setType(SMS.getType());
sysBaseAPI.sendTemplateMessage(messageDTO);
}
}
@ -114,9 +121,9 @@ public class SendMessage {
.map(SysUser::getPhone)
.collect(Collectors.joining(","));
Map<String,String> result = new HashMap<>();
result.put(SYSTEM,usernameList);
result.put(EMAIL,emailList);
result.put(SMS,phoneList);
result.put(System,usernameList);
result.put(Email,emailList);
result.put(Sms,phoneList);
return result;
}
}

View File

@ -1,20 +1,38 @@
package org.jeecg.modules.quartz.job;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.api.vo.Result;
import org.jeecg.common.constant.CommonConstant;
import org.jeecg.common.constant.enums.MessageTypeEnum;
import org.jeecg.common.util.RedisStreamUtil;
import org.jeecg.common.util.SpringContextHolder;
import org.jeecg.common.util.SpringContextUtils;
import org.jeecg.modules.base.dto.WarnDto;
import org.jeecg.modules.base.dto.ItemDto;
import org.jeecg.modules.base.dto.RuleDto;
import org.jeecg.modules.base.entity.AlarmRule;
import org.jeecg.modules.base.entity.Rule;
import org.jeecg.modules.base.enums.Item;
import org.jeecg.modules.base.enums.SourceType;
import org.jeecgframework.core.util.ApplicationContextUtil;
import org.jeecg.modules.feignclient.MonitorClient;
import org.jeecg.modules.message.SendMessage;
import org.quartz.*;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.Map;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.FormatStyle;
import java.util.List;
import java.util.Set;
import static org.jeecg.common.constant.enums.MessageTypeEnum.QYWX;
import static org.jeecg.modules.base.enums.Op.*;
import static org.jeecg.modules.base.enums.SourceType.EMAIL;
import static org.jeecg.modules.base.enums.SourceType.SERVER;
@Data
@Slf4j
@ -33,13 +51,86 @@ public class SysInfoJob implements Job {
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
RedisStreamUtil redisStreamUtil = SpringContextUtils.getBean(RedisStreamUtil.class);
/* 获取服务器信息并推送 */
log.info("监控的主机地址:{}",parameter);
WarnDto warnDto = new WarnDto();
warnDto.setSourceId("2").setSourceType(SourceType.EMAIL)
.setItem(Item.CONN).setValue(0d);
MonitorClient monitorClient = SpringContextUtils.getBean(MonitorClient.class);
SendMessage sendMessage = SpringContextUtils.getBean(SendMessage.class);
// 查询所有报警规则,根据报警规则查询监控项数据
String pattern = CommonConstant.PREFIX_RULE;
Set<String> keys = redisStreamUtil.keys(pattern);
if (CollUtil.isEmpty(keys)) return;
String id = redisStreamUtil.pushWarn(warnDto);
log.info("新增RecordId:{}",id);
// 时间间隔为每分钟
LocalDateTime now = LocalDateTime.now()
.withSecond(0)
.withNano(0);
LocalDateTime beforeMin = now.minusMinutes(1);
DateTimeFormatter formatter = DateTimeFormatter
.ofPattern("yyyy-MM-dd HH:mm:ss");
String start = beforeMin.format(formatter);
String end = now.format(formatter);
log.info("开始时间:"+start+",结束时间:"+end);
for (String key : keys) {
Double current = null;
List<AlarmRule> alarmRules = (List<AlarmRule>) redisStreamUtil.get(key);
for (int i = 0; i < alarmRules.size(); i++) {
AlarmRule alarmRule = alarmRules.get(i);
// 如果报警规则为空,跳过本次循环
String operator = alarmRule.getOperator();
if (StrUtil.isBlank(operator))continue;
// 向运管查询监控项数据
if (i == 0){
String itemId = alarmRule.getItemId().toString();
Result<ItemDto> result = monitorClient.itemHistory(itemId, 0, start, end);
current = result.getResult().getNow();
}
// 解析预警规则,判断是否需要报警
try {
ObjectMapper mapper = new ObjectMapper();
Rule rule = mapper.readValue(operator, Rule.class);
rule.setCurrent(current);
boolean needWarn = parse(rule);
// 发送预警信息
if (needWarn){
// 规则名称
rule.setRuleName(alarmRule.getName());
// 报警信息发送方式
rule.setNotific(alarmRule.getNotification());
// 联系人组id
rule.setGroupId(alarmRule.getContactId());
sendMessage.send(rule);
}
} catch (JsonProcessingException e) {
log.error("预警规则{}解析失败!",operator);
e.printStackTrace();
}
}
}
}
private boolean parse(Rule rule){
Double current = rule.getCurrent();
String op = rule.getOperator();
Double threshold = rule.getThreshold();
boolean cNull = ObjectUtil.isNull(current);
boolean tNull = ObjectUtil.isNull(threshold);
if (cNull || tNull)return false;
double currentV = current;
double thresholdV = threshold;
if (EQ.getOp().equals(op)){
return currentV == thresholdV;
} else if (GT.getOp().equals(op)) {
return currentV > thresholdV;
} else if (GE.getOp().equals(op)) {
return currentV >= thresholdV;
} else if (LT.getOp().equals(op)) {
return currentV < thresholdV;
} else if (LE.getOp().equals(op)) {
return currentV <= thresholdV;
}else {
return false;
}
}
}

View File

@ -1,31 +1,26 @@
package org.jeecg.modules.redisStream;
import cn.hutool.core.util.NumberUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.constant.CommonConstant;
import org.jeecg.common.constant.SymbolConstant;
import org.jeecg.common.util.RedisStreamUtil;
import org.jeecg.common.util.SpringContextUtils;
import org.jeecg.modules.base.dto.WarnDto;
import org.jeecg.modules.base.dto.RuleDto;
import org.jeecg.modules.base.entity.AlarmRule;
import org.jeecg.modules.base.entity.Rule;
import org.jeecg.modules.base.enums.Item;
import org.jeecg.modules.base.enums.Op;
import org.jeecg.modules.base.enums.SourceType;
import org.jeecg.modules.message.SendMessage;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.PendingMessage;
import org.springframework.data.redis.connection.stream.PendingMessages;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Iterator;
import java.util.List;
import static org.jeecg.modules.base.enums.Op.*;
@ -34,7 +29,7 @@ import static org.jeecg.modules.base.enums.Op.*;
@Slf4j
@Component
@NoArgsConstructor
public class ConsumeA1 implements StreamListener<String, ObjectRecord<String, WarnDto>> {
public class ConsumeA1 implements StreamListener<String, ObjectRecord<String, RuleDto>> {
private String groupName;
@ -50,7 +45,7 @@ public class ConsumeA1 implements StreamListener<String, ObjectRecord<String, Wa
}
@Override
public void onMessage(ObjectRecord<String,WarnDto> message) {
public void onMessage(ObjectRecord<String, RuleDto> message) {
/* 避免消费抛出异常后,取消此消费者的消费资格 */
try {
String streamKey = message.getStream();
@ -59,12 +54,12 @@ public class ConsumeA1 implements StreamListener<String, ObjectRecord<String, Wa
* 新消息在未进行ACK之前,状态也为pending,
* 直接消费所有异常未确认的消息和新消息
*/
List<ObjectRecord<String, WarnDto>> pendingList = redisStreamUtil.read(streamKey, groupName, consumerName);
for (ObjectRecord<String, WarnDto> record : pendingList) {
List<ObjectRecord<String, RuleDto>> pendingList = redisStreamUtil.read(streamKey, groupName, consumerName);
for (ObjectRecord<String, RuleDto> record : pendingList) {
RecordId recordId = record.getId();
WarnDto warnDto = record.getValue();
RuleDto ruleDto = record.getValue();
// 消费消息
consume(warnDto);
consume(ruleDto);
// 消费完成后,手动确认消费消息[消息消费成功]
// 否则就是消费抛出异常,进入pending_ids[消息消费失败]
redisStreamUtil.ack(streamKey, groupName, recordId.getValue());
@ -83,32 +78,36 @@ public class ConsumeA1 implements StreamListener<String, ObjectRecord<String, Wa
/**
* 消费方法,根据校验情况发送预警信息
*
* @param warnDto
* @param ruleDto
*/
private void consume(WarnDto warnDto) throws JsonProcessingException {
String sourceType = warnDto.getSourceType() == null
? "" : warnDto.getSourceType().getType();
String sourceId = warnDto.getSourceId();
String item = warnDto.getItem() == null
? "" : warnDto.getItem().getValue();
private void consume(RuleDto ruleDto) throws JsonProcessingException {
String sourceType = ruleDto.getSourceType() == null
? "" : ruleDto.getSourceType().getType();
int itemId = ruleDto.getItemId();
String underline = SymbolConstant.UNDERLINE;
String prefix = CommonConstant.PREFIX_RULE;
String key = prefix + sourceType + "_" + sourceId + "_" + item;
String key = prefix + sourceType + underline + itemId;
Boolean hasKey = redisStreamUtil.hasKey(key);
if (!hasKey) return;
List<AlarmRule> alarmRules = (List<AlarmRule>) redisStreamUtil.get(key);
double value = warnDto.getValue();
Double current = ruleDto.getValue();
for (AlarmRule alarmRule : alarmRules) {
String operator = alarmRule.getOperator();
if (StrUtil.isBlank(operator))continue;
ObjectMapper mapper = new ObjectMapper();
Rule rule = mapper.readValue(operator, Rule.class);
boolean needWarn = parse(rule, value);
// 报警信息发送方式
String notific = alarmRule.getNotification();
// 联系人组id
String contactId = alarmRule.getContactId();
rule.setCurrent(current);
boolean needWarn = parse(rule);
// 发送预警信息
if (needWarn)sendMessage.send(contactId,notific,rule);
if (needWarn){
// 规则名称
rule.setRuleName(alarmRule.getName());
// 报警信息发送方式
rule.setNotific(alarmRule.getNotification());
// 联系人组id
rule.setGroupId(alarmRule.getContactId());
sendMessage.send(rule);
}
}
}
@ -117,19 +116,27 @@ public class ConsumeA1 implements StreamListener<String, ObjectRecord<String, Wa
*
* @return 是否需要发送预警信息
*/
private boolean parse(Rule rule,double value){
private boolean parse(Rule rule){
Double current = rule.getCurrent();
String op = rule.getOperator();
double threshold = rule.getThreshold();
Double threshold = rule.getThreshold();
boolean cNull = ObjectUtil.isNull(current);
boolean tNull = ObjectUtil.isNull(threshold);
if (cNull || tNull)return false;
double currentV = current;
double thresholdV = threshold;
if (EQ.getOp().equals(op)){
return value == threshold;
return currentV == thresholdV;
} else if (GT.getOp().equals(op)) {
return value > threshold;
return currentV > thresholdV;
} else if (GE.getOp().equals(op)) {
return value >= threshold;
return currentV >= thresholdV;
} else if (LT.getOp().equals(op)) {
return value < threshold;
return currentV < thresholdV;
} else if (LE.getOp().equals(op)) {
return value <= threshold;
return currentV <= thresholdV;
}else {
return false;
}

View File

@ -5,7 +5,7 @@ import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.constant.CommonConstant;
import org.jeecg.common.util.RedisUtil;
import org.jeecg.modules.base.dto.WarnDto;
import org.jeecg.modules.base.dto.RuleDto;
import org.jeecg.modules.base.enums.SourceType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.ObjectRecord;
@ -16,21 +16,20 @@ import org.springframework.stereotype.Component;
@Slf4j
@Component
@NoArgsConstructor
public class ConsumeA2 implements StreamListener<String, ObjectRecord<String, WarnDto>> {
public class ConsumeA2 implements StreamListener<String, ObjectRecord<String, RuleDto>> {
private String consumerName;
@Autowired
private RedisUtil redisUtil;
@Override
public void onMessage(ObjectRecord<String,WarnDto> message) {
public void onMessage(ObjectRecord<String, RuleDto> message) {
String streamKey = message.getStream();
RecordId recordId = message.getId();
WarnDto msg = message.getValue();
RuleDto msg = message.getValue();
String prefix = CommonConstant.PREFIX_RULE;
SourceType sourceType = msg.getSourceType();
String sourceId = msg.getSourceId();
String key = prefix + sourceType + "_" + sourceId;
String key = prefix + sourceType + "_";
log.info("[自动ACK][name:"+consumerName+"][streamKey:{}][id:{}][message:{}]", streamKey, recordId, msg);
// redisStreamUtil.del(streamKey, id.getValue());

View File

@ -5,7 +5,7 @@ import cn.hutool.core.util.StrUtil;
import org.jeecg.common.constant.CommonConstant;
import org.jeecg.common.exception.StreamErrorHandler;
import org.jeecg.common.util.RedisStreamUtil;
import org.jeecg.modules.base.dto.WarnDto;
import org.jeecg.modules.base.dto.RuleDto;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
@ -48,7 +48,7 @@ public class RedisStreamConfig {
private RedisStreamUtil redisStreamUtil;
@Bean(initMethod = "start", destroyMethod = "stop")
public StreamMessageListenerContainer<String, ObjectRecord<String, WarnDto>> streamMessageListenerContainer() {
public StreamMessageListenerContainer<String, ObjectRecord<String, RuleDto>> streamMessageListenerContainer() {
/* 创建Stream和消费组A */
creatGroup(warnKey, groupWarnA);
// 原子整数,多线程环境下对整数的原子性操作
@ -75,7 +75,7 @@ public class RedisStreamConfig {
return thread;
});
/* 设置消息监听容器 */
StreamMessageListenerContainerOptions<String, ObjectRecord<String,WarnDto>> options =
StreamMessageListenerContainerOptions<String, ObjectRecord<String, RuleDto>> options =
StreamMessageListenerContainerOptions
.builder()
// 每次轮询取几条消息
@ -88,12 +88,12 @@ public class RedisStreamConfig {
.pollTimeout(Duration.ZERO)
//.serializer()
//.objectMapper(new ObjectHashMapper())
.targetType(WarnDto.class)
.targetType(RuleDto.class)
// 异常处理器
.errorHandler(new StreamErrorHandler())
.build();
/* 创建消息监听容器 */
StreamMessageListenerContainer<String, ObjectRecord<String,WarnDto>> streamMessageListenerContainer =
StreamMessageListenerContainer<String, ObjectRecord<String, RuleDto>> streamMessageListenerContainer =
StreamMessageListenerContainer.create(redisConnectionFactory, options);
// 独立消费

View File

@ -25,6 +25,22 @@
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<layout>ZIP</layout>
<includes>
<include>
<groupId>nothing</groupId>
<artifactId>nothing</artifactId>
</include>
</includes>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

View File

@ -15,10 +15,4 @@ spring:
config:
import:
- optional:nacos:jeecg.yaml
- optional:nacos:jeecg-@profile.name@.yaml
# 调用监控服务配置信息
monitor:
username: wmhr
password: Wmhr.123
url: http://218.249.158.97:7008/mobile/monitor
- optional:nacos:jeecg-@profile.name@.yaml

View File

@ -15,4 +15,9 @@ spring:
config:
import:
- optional:nacos:jeecg.yaml
- optional:nacos:jeecg-@profile.name@.yaml
- optional:nacos:jeecg-@profile.name@.yaml
# 调用监控服务配置信息
monitor:
username: wmhr
password: Wmhr.123
url: http://218.249.158.97:7008/mobile/monitor