feat:Database Monitor

This commit is contained in:
nieziyan 2023-11-21 18:41:12 +08:00
parent 8e1b03cf7d
commit 6f6f25e763
13 changed files with 361 additions and 341 deletions

View File

@ -1,6 +1,8 @@
package org.jeecg.common.util;
import cn.hutool.core.util.ObjectUtil;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.modules.base.entity.postgre.SysDatabase;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DriverManagerDataSource;
@ -10,6 +12,7 @@ import java.io.IOException;
import java.sql.*;
import java.util.Properties;
@Slf4j
public class JDBCUtil {
public static JdbcTemplate template(String url, String driver, String user, String pass){
@ -21,69 +24,26 @@ public class JDBCUtil {
return new JdbcTemplate(dataSource);
}
public static JdbcTemplate templateMy(String ip, String port, String db, String user, String pass){
return new JdbcTemplate(dataSourceMy(ip, port, db, user, pass));
}
public static JdbcTemplate templateMy(String ip, String db, String user, String pass){
return templateMy(ip, "3306", db, user, pass);
}
public static JdbcTemplate templateOr(String ip, String port, String db, String user, String pass){
return new JdbcTemplate(dataSourceOr(ip, port, db, user, pass));
}
public static JdbcTemplate templateOr(String ip, String db, String user, String pass){
return templateOr(ip, "1521", db, user, pass);
}
public static JdbcTemplate templatePg(String ip, String port, String db, String user, String pass){
return new JdbcTemplate(dataSourcePg(ip, port, db, user, pass));
}
public static JdbcTemplate templatePg(String ip, String db, String user, String pass){
return templatePg(ip, "5432", db, user, pass);
}
private static DataSource dataSourceMy(String ip, String port, String db, String user, String pass) {
public static boolean testConnection(String url, String driver, String user, String pass){
DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setUrl(MySQLUrl(ip, port, db));
dataSource.setUrl(url);
dataSource.setDriverClassName(driver);
dataSource.setUsername(user);
dataSource.setPassword(pass);
return dataSource;
}
private static DataSource dataSourceOr(String ip, String port, String db, String user, String pass) {
DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setUrl(OracleUrl(ip, port, db));
dataSource.setUsername(user);
dataSource.setPassword(pass);
return dataSource;
}
private static DataSource dataSourcePg(String ip, String port, String db, String user, String pass) {
DriverManagerDataSource dataSource = new DriverManagerDataSource();
dataSource.setUrl(PostgreUrl(ip, port, db));
dataSource.setUsername(user);
dataSource.setPassword(pass);
return dataSource;
}
private static String MySQLUrl(String ip, String port, String db){
return "jdbc:mysql://" + ip + ":" + port + "/" + db +
"?characterEncoding=UTF-8&useUnicode=true&" +
"useSSL=false&tinyInt1isBit=false&" +
"allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai";
}
private static String OracleUrl(String ip, String port, String db){
return "jdbc:oracle:thin:@" + ip + ":" + port + ":" + db;
}
private static String PostgreUrl(String ip, String port, String db){
return "jdbc:postgresql://" + ip + ":" + port + "/" + db +
"?stringtype=unspecified";
// try-with-resources 无须显式关闭Connection资源
try (Connection connection = dataSource.getConnection()) {
return true;
} catch (SQLException e) {
log.error("JDBCUtil.testConnection(): 数据源["+ url +"]连接异常,原因: {}",e.getMessage());
return false;
}
}
public static boolean testConnection(SysDatabase database){
String dbUrl = database.getDbUrl();
String dbDriver = database.getDbDriver();
String dbUsername = database.getDbUsername();
String dbPassword = database.getDbPassword();
return testConnection(dbUrl, dbDriver, dbUsername, dbPassword);
}
}

View File

@ -7,6 +7,9 @@ import cn.hutool.core.util.StrUtil;
import java.math.BigDecimal;
import java.math.RoundingMode;
import static org.jeecg.modules.base.enums.Op.*;
import static org.jeecg.modules.base.enums.Op.LE;
public class NumUtil {
/*
@ -78,4 +81,48 @@ public class NumUtil {
return String.valueOf(decimal.setScale(scale, RoundingMode.HALF_UP)
.doubleValue());
}
public static boolean compare(Number current, Number threshold, String op){
boolean cNull = ObjectUtil.isNull(current);
boolean tNull = ObjectUtil.isNull(threshold);
if (cNull || tNull) return false;
if (current instanceof Double && threshold instanceof Double){
double currentV = (double) current;
double thresholdV = (double) threshold;
if (EQ.getOp().equals(op)){
return currentV == thresholdV;
} else if (GT.getOp().equals(op)) {
return currentV > thresholdV;
} else if (GE.getOp().equals(op)) {
return currentV >= thresholdV;
} else if (LT.getOp().equals(op)) {
return currentV < thresholdV;
} else if (LE.getOp().equals(op)) {
return currentV <= thresholdV;
}else {
return false;
}
} else if (current instanceof Integer && threshold instanceof Double) {
int currentV = (int) current;
int thresholdV = (int) threshold;
if (EQ.getOp().equals(op)){
return currentV == thresholdV;
} else if (GT.getOp().equals(op)) {
return currentV > thresholdV;
} else if (GE.getOp().equals(op)) {
return currentV >= thresholdV;
} else if (LT.getOp().equals(op)) {
return currentV < thresholdV;
} else if (LE.getOp().equals(op)) {
return currentV <= thresholdV;
}else {
return false;
}
}else {
return false;
}
}
}

View File

@ -16,6 +16,8 @@ public class ItemDto implements Serializable{
private String units;
private String valueType;
public ItemDto(AlarmItem alarmItem) {
this.itemId = alarmItem.getId();
this.name = alarmItem.getName();
@ -26,5 +28,6 @@ public class ItemDto implements Serializable{
this.itemId = alarmItemDe.getId();
this.name = alarmItemDe.getName();
this.units = alarmItemDe.getUnits();
this.valueType = alarmItemDe.getValueType();
}
}

View File

@ -12,10 +12,7 @@ public class Rule implements Serializable {
private String operator; // 比较符
private Double threshold; // 阈值
private Number threshold; // 阈值
private String units; // 单位
@JsonIgnore
private Double current; // 当前值
}

View File

@ -1,13 +1,22 @@
package org.jeecg.modules.base.entity.postgre;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import com.fasterxml.jackson.annotation.JsonFormat;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import org.jeecg.common.system.base.entity.JeecgEntity;
import org.jeecgframework.poi.excel.annotation.Excel;
import org.springframework.format.annotation.DateTimeFormat;
@Data
@TableName("alarm_item_de")
public class AlarmItemDe extends JeecgEntity {
public class AlarmItemDe {
@TableId(type = IdType.AUTO)
private String id;
@TableField("name")
private String name;
@ -20,4 +29,31 @@ public class AlarmItemDe extends JeecgEntity {
@TableField("source_type")
private String sourceType;
@TableField("value_type")
private String valueType;
/**
* 创建人
*/
private java.lang.String createBy;
/**
* 创建时间
*/
@JsonFormat(timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private java.util.Date createTime;
/**
* 更新人
*/
private java.lang.String updateBy;
/**
* 更新时间
*/
@JsonFormat(timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
@DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss")
private java.util.Date updateTime;
}

View File

@ -0,0 +1,23 @@
package org.jeecg.modules.base.enums;
import cn.hutool.core.util.StrUtil;
import lombok.AllArgsConstructor;
import lombok.Getter;
@Getter
@AllArgsConstructor
public enum Item {
DATABASE_CONN("2", "数据源连接情况");
private String value;
private String name;
public static Item of(String value){
for (Item item : Item.values()) {
if (StrUtil.equals(item.getValue(), value))
return item;
}
return null;
}
}

View File

@ -82,4 +82,15 @@ public class SysDatabaseController {
public Result<?> spaceInfo(@RequestParam String sourceId) {
return Result.OK(sysDatabaseService.spaceInfo(sourceId));
}
@GetMapping("getById")
@ApiOperation(value = "根据id查询SysDatabase",notes = "根据id查询SysDatabase")
public SysDatabase getById(@RequestParam String sourceId) {
return sysDatabaseService.getById(sourceId);
}
@PutMapping("updateStatus")
public void updateStatus(@RequestBody SysDatabase sysDatabase){
sysDatabaseService.updateById(sysDatabase);
}
}

View File

@ -92,7 +92,6 @@ public class AlarmConsumer implements StreamListener<String, ObjectRecord<String
if (StrUtil.isBlank(operator))continue;
ObjectMapper mapper = new ObjectMapper();
Rule rule = mapper.readValue(operator, Rule.class);
rule.setCurrent(current);
boolean needWarn = parse(rule);
// 发送预警信息
if (needWarn){
@ -107,31 +106,8 @@ public class AlarmConsumer implements StreamListener<String, ObjectRecord<String
* @return 是否需要发送预警信息
*/
private boolean parse(Rule rule){
Double current = rule.getCurrent();
String op = rule.getOperator();
Double threshold = rule.getThreshold();
boolean cNull = ObjectUtil.isNull(current);
boolean tNull = ObjectUtil.isNull(threshold);
if (cNull || tNull)return false;
double currentV = current;
double thresholdV = threshold;
if (EQ.getOp().equals(op)){
return currentV == thresholdV;
} else if (GT.getOp().equals(op)) {
return currentV > thresholdV;
} else if (GE.getOp().equals(op)) {
return currentV >= thresholdV;
} else if (LT.getOp().equals(op)) {
return currentV < thresholdV;
} else if (LE.getOp().equals(op)) {
return currentV <= thresholdV;
}else {
return false;
return true;
}
}
private void init(){
redisStreamUtil = SpringContextUtils.getBean(RedisStreamUtil.class);
}

View File

@ -2,13 +2,11 @@ package org.jeecg.modules.feignclient;
import org.jeecg.common.api.vo.Result;
import org.jeecg.modules.base.entity.postgre.AlarmLog;
import org.jeecg.modules.base.entity.postgre.SysDatabase;
import org.jeecg.modules.base.entity.postgre.SysEmail;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.*;
import java.util.List;
@ -26,7 +24,7 @@ public interface AbnormalAlarmClient {
/* AlarmLogController下相关接口 */
@PostMapping("/alarmLog/create")
Result create(@RequestBody AlarmLog alarmLog);
Result<?> create(@RequestBody AlarmLog alarmLog);
/* CalculateConcController下相关接口 */
@GetMapping("/calculateConc/caclAndSave")
@ -39,4 +37,11 @@ public interface AbnormalAlarmClient {
/* AlarmItemController下相关接口 */
@GetMapping("/alarmItem/syncServerItem")
boolean syncServerItem();
/* SysDatabaseController下相关接口 */
@GetMapping("/sysDatabase/getById")
SysDatabase getDatabase(@RequestParam String sourceId);
@PutMapping("/sysDatabase/updateStatus")
void updateDatabase(@RequestBody SysDatabase database);
}

View File

@ -0,0 +1,44 @@
package org.jeecg.modules.quartz.entity;
import cn.hutool.core.util.ObjectUtil;
import lombok.Getter;
import org.jeecg.common.config.mqtoken.UserTokenContext;
import org.jeecg.common.util.RedisStreamUtil;
import org.jeecg.common.util.SpringContextUtils;
import org.jeecg.modules.feignclient.AbnormalAlarmClient;
import org.jeecg.modules.feignclient.MonitorSystem;
import org.jeecg.modules.message.SendMessage;
import static org.jeecg.common.util.TokenUtils.getTempToken;
@Getter
public abstract class Monitor {
private SendMessage sendMessage;
private MonitorSystem monitorSystem;
private RedisStreamUtil redisStreamUtil;
private AbnormalAlarmClient alarmClient;
/*
* 规则首次触发报警后,设置该规则的沉默周期(如果有)
*/
protected void ruleSilence(String silenceKey ,Long silenceCycle) {
if (ObjectUtil.isNotNull(silenceCycle))
redisStreamUtil.set(silenceKey, silenceCycle, silenceCycle);
}
protected void init(){
// start:生成临时Token到线程中
UserTokenContext.setToken(getTempToken());
sendMessage = SpringContextUtils.getBean(SendMessage.class);
redisStreamUtil = SpringContextUtils.getBean(RedisStreamUtil.class);
alarmClient = SpringContextUtils.getBean(AbnormalAlarmClient.class);
}
protected void destroy() {
// end:删除临时Token
UserTokenContext.remove();
}
}

View File

@ -0,0 +1,151 @@
package org.jeecg.modules.quartz.job;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.NumberUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.api.vo.Result;
import org.jeecg.common.config.mqtoken.UserTokenContext;
import org.jeecg.common.constant.DateConstant;
import org.jeecg.common.constant.RedisConstant;
import org.jeecg.common.util.JDBCUtil;
import org.jeecg.common.util.NumUtil;
import org.jeecg.common.util.RedisStreamUtil;
import org.jeecg.common.util.SpringContextUtils;
import org.jeecg.modules.base.entity.Rule;
import org.jeecg.modules.base.entity.monitor.ItemHistory;
import org.jeecg.modules.base.entity.postgre.AlarmLog;
import org.jeecg.modules.base.entity.postgre.AlarmRule;
import org.jeecg.modules.base.entity.postgre.SysDatabase;
import org.jeecg.modules.base.enums.Item;
import org.jeecg.modules.feignclient.AbnormalAlarmClient;
import org.jeecg.modules.feignclient.MonitorSystem;
import org.jeecg.modules.message.SendMessage;
import org.jeecg.modules.quartz.entity.Monitor;
import org.quartz.*;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Set;
import static org.jeecg.common.util.TokenUtils.getTempToken;
import static org.jeecg.modules.base.enums.Op.*;
import static org.jeecg.modules.base.enums.SourceType.DATABASE;
@Data
@Slf4j
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public class DatabaseJob extends Monitor implements Job{
/**
* 解析Database预警规则
**/
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
init();
// 查询所有Database的报警规则,根据报警规则查询监控项数据
String pattern = RedisConstant.PREFIX_RULE + DATABASE.getType();
Set<String> keys = getRedisStreamUtil().keys(pattern);
if (CollUtil.isEmpty(keys)) return;
String prefixSilence = RedisConstant.PREFIX_SILENCE;
String operator = null;
for (String ruleKey : keys) {
try {
AlarmRule alarmRule = (AlarmRule) getRedisStreamUtil().get(ruleKey);
// 如果报警规则为空,或者在沉默周期内,跳过当前规则
operator = alarmRule.getOperator();
String ruleId = alarmRule.getId();
String itemId = alarmRule.getItemId();
String silenceKey = prefixSilence + ruleId;
boolean hasKey = getRedisStreamUtil().hasKey(silenceKey);
boolean blank1 = StrUtil.isBlank(operator);
boolean blank2 = StrUtil.isBlank(itemId);
if (blank1 || blank2 || hasKey) continue;
// 根据sourceId查询Database信息
String sourceId = alarmRule.getSourceId();
SysDatabase database = getAlarmClient().getDatabase(sourceId);
if (ObjectUtil.isNull(database)) continue;
// 根据监控项id选择要查询的监控项信息
Item item = Item.of(itemId);
if (ObjectUtil.isNull(item)) continue;
Number current = null;
switch (item){
case DATABASE_CONN: // 监控项-2: 测试数据源是否可以连接成功
current = isConnection(database);
break;
// 追加的监控项...
default:
break;
}
// 解析预警规则,判断是否需要报警
ObjectMapper mapper = new ObjectMapper();
Rule rule = mapper.readValue(operator, Rule.class);
String op = rule.getOperator();
Number threshold = rule.getThreshold();
boolean needWarn = NumUtil.compare(current, threshold, op);
if (needWarn){
// 记录报警日志
AlarmLog alarmLog = new AlarmLog();
alarmLog.setRuleId(ruleId);
alarmLog.setOperator(operator);
alarmLog.setAlarmValue(StrUtil.toString(current));
String ruleName = alarmRule.getName();
String message = "您设定的预警规则:"+ruleName+"," +
"预警信息为:"+ operator + ",当前值为:" + current;
alarmLog.setAlarmInfo(message);
getAlarmClient().create(alarmLog);
// 规则触发报警后,设置该规则的沉默周期(如果有)
// 沉默周期失效之前,该规则不会再次被触发
Long silenceCycle = alarmRule.getSilenceCycle();
ruleSilence(silenceKey, silenceCycle);
// 发送报警信息
String groupId = alarmRule.getContactId();
String notific = alarmRule.getNotification();
getSendMessage().send(message, groupId, notific);
}
} catch (JsonProcessingException e) {
log.error("Database预警规则:{}解析失败,失败原因:{}!", operator, e.getMessage());
}catch (RuntimeException e){
log.error("Database监控异常:{}",e.getMessage());
}
}
destroy();
}
/*
* 监控项-2: 测试数据源是否可以连接成功 (0:失败 1:成功)
* */
private Integer isConnection(SysDatabase database){
int res = 0;
try {
boolean conn = JDBCUtil.testConnection(database);
res = conn ? 1 : res;
// 如果数据源连接状态发生变化则进行更新
Integer status = database.getStatus();
if (ObjectUtil.isNotNull(status)){
if (res != status){
database.setStatus(res);
getAlarmClient().updateDatabase(database);
}
}else {
database.setStatus(res);
getAlarmClient().updateDatabase(database);
}
return res;
}catch (Exception e){
log.error("Database监控-更新数据源状态异常:{}", e.getMessage());
return res;
}
}
}

View File

@ -11,6 +11,7 @@ import org.jeecg.common.api.vo.Result;
import org.jeecg.common.config.mqtoken.UserTokenContext;
import org.jeecg.common.constant.DateConstant;
import org.jeecg.common.constant.RedisConstant;
import org.jeecg.common.util.NumUtil;
import org.jeecg.common.util.RedisStreamUtil;
import org.jeecg.common.util.SpringContextUtils;
import org.jeecg.modules.base.entity.Rule;
@ -20,6 +21,7 @@ import org.jeecg.modules.base.entity.postgre.AlarmRule;
import org.jeecg.modules.feignclient.AbnormalAlarmClient;
import org.jeecg.modules.feignclient.MonitorSystem;
import org.jeecg.modules.message.SendMessage;
import org.jeecg.modules.quartz.entity.Monitor;
import org.quartz.*;
import static org.jeecg.common.util.TokenUtils.getTempToken;
import java.time.LocalDateTime;
@ -33,15 +35,7 @@ import static org.jeecg.modules.base.enums.SourceType.SERVER;
@Slf4j
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public class ServerJob implements Job {
private String parameter;
private SendMessage sendMessage;
private MonitorSystem monitorSystem;
private RedisStreamUtil redisStreamUtil;
private AbnormalAlarmClient alarmClient;
public class ServerJob extends Monitor implements Job {
/**
* 根据host定时查询服务器信息
* 并向消息队列中推送信息
@ -54,7 +48,7 @@ public class ServerJob implements Job {
// 查询所有Server的报警规则,根据报警规则查询监控项数据
String pattern = RedisConstant.PREFIX_RULE + SERVER.getType();
Set<String> keys = redisStreamUtil.keys(pattern);
Set<String> keys = getRedisStreamUtil().keys(pattern);
if (CollUtil.isEmpty(keys)) return;
// 时间间隔为每分钟
@ -71,27 +65,28 @@ public class ServerJob implements Job {
String operator = null;
for (String ruleKey : keys) {
try {
AlarmRule alarmRule = (AlarmRule) redisStreamUtil.get(ruleKey);
AlarmRule alarmRule = (AlarmRule) getRedisStreamUtil().get(ruleKey);
// 如果报警规则为空,或者在沉默周期内,跳过当前规则
operator = alarmRule.getOperator();
String ruleId = alarmRule.getId();
String itemId = alarmRule.getItemId();
String silenceKey = prefixSilence + ruleId;
boolean hasKey = redisStreamUtil.hasKey(silenceKey);
boolean hasKey = getRedisStreamUtil().hasKey(silenceKey);
boolean blank1 = StrUtil.isBlank(operator);
boolean blank2 = StrUtil.isBlank(itemId);
if (blank1 || blank2 || hasKey) continue;
// 向运管查询监控项数据
Result<ItemHistory> result = monitorSystem.itemBack(itemId, 0, start, end);
Result<ItemHistory> result = getMonitorSystem().itemBack(itemId, 0, start, end);
Double current = result.getResult().getNow();
// 解析预警规则,判断是否需要报警
ObjectMapper mapper = new ObjectMapper();
Rule rule = mapper.readValue(operator, Rule.class);
rule.setCurrent(current);
boolean needWarn = parse(rule);
String op = rule.getOperator();
Number threshold = rule.getThreshold();
boolean needWarn = NumUtil.compare(current, threshold, op);
if (needWarn){
// 记录报警日志
AlarmLog alarmLog = new AlarmLog();
@ -99,11 +94,10 @@ public class ServerJob implements Job {
alarmLog.setOperator(operator);
alarmLog.setAlarmValue(StrUtil.toString(current));
String ruleName = alarmRule.getName();
Double threshold = rule.getThreshold();
String message = "您设定的预警规则:"+ruleName+"," +
"预警信息为:"+ operator + ",当前值为:" + current;
alarmLog.setAlarmInfo(message);
alarmClient.create(alarmLog);
getAlarmClient().create(alarmLog);
// 规则触发报警后,设置该规则的沉默周期(如果有)
// 沉默周期失效之前,该规则不会再次被触发
@ -113,63 +107,14 @@ public class ServerJob implements Job {
// 发送报警信息
String groupId = alarmRule.getContactId();
String notific = alarmRule.getNotification();
sendMessage.send(message, groupId, notific);
getSendMessage().send(message, groupId, notific);
}
} catch (JsonProcessingException e) {
log.error("预警规则{}解析失败!", operator);
e.printStackTrace();
log.error("Server预警规则:{}解析失败,失败原因:{}!", operator, e.getMessage());
}catch (RuntimeException e){
e.printStackTrace();
log.error("Server监控异常:{}",e.getMessage());
}
}
destroy();
}
private boolean parse(Rule rule){
Double current = rule.getCurrent();
String op = rule.getOperator();
Double threshold = rule.getThreshold();
boolean cNull = ObjectUtil.isNull(current);
boolean tNull = ObjectUtil.isNull(threshold);
if (cNull || tNull) return false;
double currentV = current;
double thresholdV = threshold;
if (EQ.getOp().equals(op)){
return currentV == thresholdV;
} else if (GT.getOp().equals(op)) {
return currentV > thresholdV;
} else if (GE.getOp().equals(op)) {
return currentV >= thresholdV;
} else if (LT.getOp().equals(op)) {
return currentV < thresholdV;
} else if (LE.getOp().equals(op)) {
return currentV <= thresholdV;
}else {
return false;
}
}
/*
* 规则首次触发报警后,设置该规则的沉默周期(如果有)
*/
private void ruleSilence(String silenceKey ,Long silenceCycle) {
if (ObjectUtil.isNotNull(silenceCycle))
redisStreamUtil.set(silenceKey, silenceCycle, silenceCycle);
}
private void init(){
// start:生成临时Token到线程中
UserTokenContext.setToken(getTempToken());
sendMessage = SpringContextUtils.getBean(SendMessage.class);
monitorSystem = SpringContextUtils.getBean(MonitorSystem.class);
redisStreamUtil = SpringContextUtils.getBean(RedisStreamUtil.class);
alarmClient = SpringContextUtils.getBean(AbnormalAlarmClient.class);
}
private void destroy(){
// end:删除临时Token
UserTokenContext.remove();
}
}

View File

@ -1,178 +0,0 @@
package org.jeecg.modules.test;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.api.vo.Result;
import org.jeecg.common.config.mqtoken.UserTokenContext;
import org.jeecg.common.constant.DateConstant;
import org.jeecg.common.constant.RedisConstant;
import org.jeecg.common.util.RedisStreamUtil;
import org.jeecg.common.util.SpringContextUtils;
import org.jeecg.modules.base.entity.Rule;
import org.jeecg.modules.base.entity.monitor.ItemHistory;
import org.jeecg.modules.base.entity.postgre.AlarmLog;
import org.jeecg.modules.base.entity.postgre.AlarmRule;
import org.jeecg.modules.feignclient.AbnormalAlarmClient;
import org.jeecg.modules.feignclient.MonitorSystem;
import org.jeecg.modules.message.SendMessage;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Set;
import static org.jeecg.common.util.TokenUtils.getTempToken;
import static org.jeecg.modules.base.enums.Op.*;
@Slf4j
@RestController
@RequestMapping("/sys/job")
public class JobController {
private SendMessage sendMessage;
private MonitorSystem monitorSystem;
private RedisStreamUtil redisStreamUtil;
private AbnormalAlarmClient alarmClient;
/**
* 根据host定时查询服务器信息
* 并向消息队列中推送信息
*
*/
@GetMapping("start")
public void startJob(){
init();
// 查询所有报警规则,根据报警规则查询监控项数据
String pattern = RedisConstant.PREFIX_RULE;
Set<String> keys = redisStreamUtil.keys(pattern);
if (CollUtil.isEmpty(keys)) {
return;
}
// 时间间隔为每分钟
LocalDateTime now = LocalDateTime.now()
.withSecond(0)
.withNano(0);
LocalDateTime beforeMin = now.minusMinutes(1);
DateTimeFormatter formatter = DateTimeFormatter
.ofPattern(DateConstant.DATE_TIME);
String start = beforeMin.format(formatter);
String end = now.format(formatter);
String prefixSilence = RedisConstant.PREFIX_SILENCE;
String operator = null;
for (String ruleKey : keys) {
try {
if (StrUtil.equals(RedisConstant.ANALYSIS_RULE, ruleKey))
continue;
AlarmRule alarmRule = (AlarmRule) redisStreamUtil.get(ruleKey);
// 如果报警规则为空,或者在沉默周期内,跳过当前规则
operator = alarmRule.getOperator();
String ruleId = alarmRule.getId();
String itemId = alarmRule.getItemId();
String silenceKey = prefixSilence + ruleId;
boolean hasKey = redisStreamUtil.hasKey(silenceKey);
boolean blank1 = StrUtil.isBlank(operator);
boolean blank2 = StrUtil.isBlank(itemId);
if (blank1 || blank2 || hasKey) continue;
// 向运管查询监控项数据
Result<ItemHistory> result = monitorSystem.itemBack(itemId, 0, start, end);
Double current = result.getResult().getNow();
log.error("当前CPU使用率为:{}", current);
// 解析预警规则,判断是否需要报警
ObjectMapper mapper = new ObjectMapper();
Rule rule = mapper.readValue(operator, Rule.class);
rule.setCurrent(current);
boolean needWarn = parse(rule);
if (needWarn){
// 记录报警日志
AlarmLog alarmLog = new AlarmLog();
alarmLog.setRuleId(ruleId);
alarmLog.setAlarmValue(StrUtil.toString(current));
String ruleName = alarmRule.getName();
Double threshold = rule.getThreshold();
String message = "您设定的预警规则:"+ruleName+"," +
"预警信息为:"+ operator + ",当前值为:" + current;
alarmLog.setAlarmInfo(message);
alarmClient.create(alarmLog);
// 规则触发报警后,设置该规则的沉默周期(如果有)
// 沉默周期失效之前,该规则不会再次被触发
Long silenceCycle = alarmRule.getSilenceCycle();
ruleSilence(silenceKey, silenceCycle);
// 发送报警信息
String groupId = alarmRule.getContactId();
String notific = alarmRule.getNotification();
sendMessage.send(message,groupId,notific);
}
} catch (JsonProcessingException e) {
log.error("预警规则{}解析失败!", operator);
e.printStackTrace();
}catch (RuntimeException e){
e.printStackTrace();
}
}
destroy();
}
private boolean parse(Rule rule){
Double current = rule.getCurrent();
String op = rule.getOperator();
Double threshold = rule.getThreshold();
boolean cNull = ObjectUtil.isNull(current);
boolean tNull = ObjectUtil.isNull(threshold);
if (cNull || tNull) return false;
double currentV = current;
double thresholdV = threshold;
if (EQ.getOp().equals(op)){
return currentV == thresholdV;
} else if (GT.getOp().equals(op)) {
return currentV > thresholdV;
} else if (GE.getOp().equals(op)) {
return currentV >= thresholdV;
} else if (LT.getOp().equals(op)) {
return currentV < thresholdV;
} else if (LE.getOp().equals(op)) {
return currentV <= thresholdV;
}else {
return false;
}
}
/*
* 规则首次触发报警后,设置该规则的沉默周期(如果有)
*/
private void ruleSilence(String silenceKey ,Long silenceCycle) {
if (ObjectUtil.isNotNull(silenceCycle))
redisStreamUtil.set(silenceKey, silenceCycle, silenceCycle);
}
private void init(){
// start:生成临时Token到线程中
UserTokenContext.setToken(getTempToken());
sendMessage = SpringContextUtils.getBean(SendMessage.class);
monitorSystem = SpringContextUtils.getBean(MonitorSystem.class);
redisStreamUtil = SpringContextUtils.getBean(RedisStreamUtil.class);
alarmClient = SpringContextUtils.getBean(AbnormalAlarmClient.class);
}
private void destroy(){
// end:删除临时Token
UserTokenContext.remove();
}
}