From 6f6f25e7633a1033df968cfad53a8ea843110caf Mon Sep 17 00:00:00 2001 From: nieziyan Date: Tue, 21 Nov 2023 18:41:12 +0800 Subject: [PATCH 1/2] =?UTF-8?q?feat=EF=BC=9ADatabase=20Monitor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/org/jeecg/common/util/JDBCUtil.java | 78 ++------ .../java/org/jeecg/common/util/NumUtil.java | 47 +++++ .../org/jeecg/modules/base/dto/ItemDto.java | 3 + .../org/jeecg/modules/base/entity/Rule.java | 5 +- .../base/entity/postgre/AlarmItemDe.java | 38 +++- .../org/jeecg/modules/base/enums/Item.java | 23 +++ .../controller/SysDatabaseController.java | 11 ++ .../modules/redisStream/AlarmConsumer.java | 26 +-- .../feignclient/AbnormalAlarmClient.java | 15 +- .../jeecg/modules/quartz/entity/Monitor.java | 44 +++++ .../jeecg/modules/quartz/job/DatabaseJob.java | 151 +++++++++++++++ .../jeecg/modules/quartz/job/ServerJob.java | 83 ++------ .../org/jeecg/modules/test/JobController.java | 178 ------------------ 13 files changed, 361 insertions(+), 341 deletions(-) create mode 100644 jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/enums/Item.java create mode 100644 jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/quartz/entity/Monitor.java create mode 100644 jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/quartz/job/DatabaseJob.java delete mode 100644 jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/test/JobController.java diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/util/JDBCUtil.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/util/JDBCUtil.java index e9cd4b24..d5717e09 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/common/util/JDBCUtil.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/util/JDBCUtil.java @@ -1,6 +1,8 @@ package org.jeecg.common.util; import cn.hutool.core.util.ObjectUtil; +import lombok.extern.slf4j.Slf4j; +import org.jeecg.modules.base.entity.postgre.SysDatabase; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.jdbc.datasource.DriverManagerDataSource; @@ -10,6 +12,7 @@ import java.io.IOException; import java.sql.*; import java.util.Properties; +@Slf4j public class JDBCUtil { public static JdbcTemplate template(String url, String driver, String user, String pass){ @@ -21,69 +24,26 @@ public class JDBCUtil { return new JdbcTemplate(dataSource); } - public static JdbcTemplate templateMy(String ip, String port, String db, String user, String pass){ - return new JdbcTemplate(dataSourceMy(ip, port, db, user, pass)); - } - - public static JdbcTemplate templateMy(String ip, String db, String user, String pass){ - return templateMy(ip, "3306", db, user, pass); - } - - public static JdbcTemplate templateOr(String ip, String port, String db, String user, String pass){ - return new JdbcTemplate(dataSourceOr(ip, port, db, user, pass)); - } - - public static JdbcTemplate templateOr(String ip, String db, String user, String pass){ - return templateOr(ip, "1521", db, user, pass); - } - - public static JdbcTemplate templatePg(String ip, String port, String db, String user, String pass){ - return new JdbcTemplate(dataSourcePg(ip, port, db, user, pass)); - } - - public static JdbcTemplate templatePg(String ip, String db, String user, String pass){ - return templatePg(ip, "5432", db, user, pass); - } - - private static DataSource dataSourceMy(String ip, String port, String db, String user, String pass) { + public static boolean testConnection(String url, String driver, String user, String pass){ DriverManagerDataSource dataSource = new DriverManagerDataSource(); - dataSource.setUrl(MySQLUrl(ip, port, db)); + dataSource.setUrl(url); + dataSource.setDriverClassName(driver); dataSource.setUsername(user); dataSource.setPassword(pass); - return dataSource; + // try-with-resources 无须显式关闭Connection资源 + try (Connection connection = dataSource.getConnection()) { + return true; + } catch (SQLException e) { + log.error("JDBCUtil.testConnection(): 数据源["+ url +"]连接异常,原因: {}",e.getMessage()); + return false; + } } - private static DataSource dataSourceOr(String ip, String port, String db, String user, String pass) { - DriverManagerDataSource dataSource = new DriverManagerDataSource(); - dataSource.setUrl(OracleUrl(ip, port, db)); - dataSource.setUsername(user); - dataSource.setPassword(pass); - return dataSource; - } - - private static DataSource dataSourcePg(String ip, String port, String db, String user, String pass) { - DriverManagerDataSource dataSource = new DriverManagerDataSource(); - dataSource.setUrl(PostgreUrl(ip, port, db)); - dataSource.setUsername(user); - dataSource.setPassword(pass); - return dataSource; - } - - - - private static String MySQLUrl(String ip, String port, String db){ - return "jdbc:mysql://" + ip + ":" + port + "/" + db + - "?characterEncoding=UTF-8&useUnicode=true&" + - "useSSL=false&tinyInt1isBit=false&" + - "allowPublicKeyRetrieval=true&serverTimezone=Asia/Shanghai"; - } - - private static String OracleUrl(String ip, String port, String db){ - return "jdbc:oracle:thin:@" + ip + ":" + port + ":" + db; - } - - private static String PostgreUrl(String ip, String port, String db){ - return "jdbc:postgresql://" + ip + ":" + port + "/" + db + - "?stringtype=unspecified"; + public static boolean testConnection(SysDatabase database){ + String dbUrl = database.getDbUrl(); + String dbDriver = database.getDbDriver(); + String dbUsername = database.getDbUsername(); + String dbPassword = database.getDbPassword(); + return testConnection(dbUrl, dbDriver, dbUsername, dbPassword); } } diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/util/NumUtil.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/util/NumUtil.java index 0c5140cc..f2499df8 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/common/util/NumUtil.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/util/NumUtil.java @@ -7,6 +7,9 @@ import cn.hutool.core.util.StrUtil; import java.math.BigDecimal; import java.math.RoundingMode; +import static org.jeecg.modules.base.enums.Op.*; +import static org.jeecg.modules.base.enums.Op.LE; + public class NumUtil { /* @@ -78,4 +81,48 @@ public class NumUtil { return String.valueOf(decimal.setScale(scale, RoundingMode.HALF_UP) .doubleValue()); } + + public static boolean compare(Number current, Number threshold, String op){ + boolean cNull = ObjectUtil.isNull(current); + boolean tNull = ObjectUtil.isNull(threshold); + if (cNull || tNull) return false; + + if (current instanceof Double && threshold instanceof Double){ + double currentV = (double) current; + double thresholdV = (double) threshold; + + if (EQ.getOp().equals(op)){ + return currentV == thresholdV; + } else if (GT.getOp().equals(op)) { + return currentV > thresholdV; + } else if (GE.getOp().equals(op)) { + return currentV >= thresholdV; + } else if (LT.getOp().equals(op)) { + return currentV < thresholdV; + } else if (LE.getOp().equals(op)) { + return currentV <= thresholdV; + }else { + return false; + } + } else if (current instanceof Integer && threshold instanceof Double) { + int currentV = (int) current; + int thresholdV = (int) threshold; + + if (EQ.getOp().equals(op)){ + return currentV == thresholdV; + } else if (GT.getOp().equals(op)) { + return currentV > thresholdV; + } else if (GE.getOp().equals(op)) { + return currentV >= thresholdV; + } else if (LT.getOp().equals(op)) { + return currentV < thresholdV; + } else if (LE.getOp().equals(op)) { + return currentV <= thresholdV; + }else { + return false; + } + }else { + return false; + } + } } diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/dto/ItemDto.java b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/dto/ItemDto.java index 1e24e515..22d78753 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/dto/ItemDto.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/dto/ItemDto.java @@ -16,6 +16,8 @@ public class ItemDto implements Serializable{ private String units; + private String valueType; + public ItemDto(AlarmItem alarmItem) { this.itemId = alarmItem.getId(); this.name = alarmItem.getName(); @@ -26,5 +28,6 @@ public class ItemDto implements Serializable{ this.itemId = alarmItemDe.getId(); this.name = alarmItemDe.getName(); this.units = alarmItemDe.getUnits(); + this.valueType = alarmItemDe.getValueType(); } } diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/Rule.java b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/Rule.java index 030acd24..334f9c84 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/Rule.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/Rule.java @@ -12,10 +12,7 @@ public class Rule implements Serializable { private String operator; // 比较符 - private Double threshold; // 阈值 + private Number threshold; // 阈值 private String units; // 单位 - - @JsonIgnore - private Double current; // 当前值 } diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/postgre/AlarmItemDe.java b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/postgre/AlarmItemDe.java index 82b7a312..acc7720d 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/postgre/AlarmItemDe.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/postgre/AlarmItemDe.java @@ -1,13 +1,22 @@ package org.jeecg.modules.base.entity.postgre; +import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; +import com.baomidou.mybatisplus.annotation.TableId; import com.baomidou.mybatisplus.annotation.TableName; +import com.fasterxml.jackson.annotation.JsonFormat; +import io.swagger.annotations.ApiModelProperty; import lombok.Data; import org.jeecg.common.system.base.entity.JeecgEntity; +import org.jeecgframework.poi.excel.annotation.Excel; +import org.springframework.format.annotation.DateTimeFormat; @Data @TableName("alarm_item_de") -public class AlarmItemDe extends JeecgEntity { +public class AlarmItemDe { + + @TableId(type = IdType.AUTO) + private String id; @TableField("name") private String name; @@ -20,4 +29,31 @@ public class AlarmItemDe extends JeecgEntity { @TableField("source_type") private String sourceType; + + @TableField("value_type") + private String valueType; + + /** + * 创建人 + */ + private java.lang.String createBy; + + /** + * 创建时间 + */ + @JsonFormat(timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss") + @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private java.util.Date createTime; + + /** + * 更新人 + */ + private java.lang.String updateBy; + + /** + * 更新时间 + */ + @JsonFormat(timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss") + @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") + private java.util.Date updateTime; } diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/enums/Item.java b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/enums/Item.java new file mode 100644 index 00000000..79f98993 --- /dev/null +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/enums/Item.java @@ -0,0 +1,23 @@ +package org.jeecg.modules.base.enums; + +import cn.hutool.core.util.StrUtil; +import lombok.AllArgsConstructor; +import lombok.Getter; + +@Getter +@AllArgsConstructor +public enum Item { + DATABASE_CONN("2", "数据源连接情况"); + + private String value; + + private String name; + + public static Item of(String value){ + for (Item item : Item.values()) { + if (StrUtil.equals(item.getValue(), value)) + return item; + } + return null; + } +} diff --git a/jeecg-module-abnormal-alarm/src/main/java/org/jeecg/modules/controller/SysDatabaseController.java b/jeecg-module-abnormal-alarm/src/main/java/org/jeecg/modules/controller/SysDatabaseController.java index ce3c5bec..839ef822 100644 --- a/jeecg-module-abnormal-alarm/src/main/java/org/jeecg/modules/controller/SysDatabaseController.java +++ b/jeecg-module-abnormal-alarm/src/main/java/org/jeecg/modules/controller/SysDatabaseController.java @@ -82,4 +82,15 @@ public class SysDatabaseController { public Result spaceInfo(@RequestParam String sourceId) { return Result.OK(sysDatabaseService.spaceInfo(sourceId)); } + + @GetMapping("getById") + @ApiOperation(value = "根据id查询SysDatabase",notes = "根据id查询SysDatabase") + public SysDatabase getById(@RequestParam String sourceId) { + return sysDatabaseService.getById(sourceId); + } + + @PutMapping("updateStatus") + public void updateStatus(@RequestBody SysDatabase sysDatabase){ + sysDatabaseService.updateById(sysDatabase); + } } diff --git a/jeecg-module-abnormal-alarm/src/main/java/org/jeecg/modules/redisStream/AlarmConsumer.java b/jeecg-module-abnormal-alarm/src/main/java/org/jeecg/modules/redisStream/AlarmConsumer.java index feb70f78..1eaf0340 100644 --- a/jeecg-module-abnormal-alarm/src/main/java/org/jeecg/modules/redisStream/AlarmConsumer.java +++ b/jeecg-module-abnormal-alarm/src/main/java/org/jeecg/modules/redisStream/AlarmConsumer.java @@ -92,7 +92,6 @@ public class AlarmConsumer implements StreamListener thresholdV; - } else if (GE.getOp().equals(op)) { - return currentV >= thresholdV; - } else if (LT.getOp().equals(op)) { - return currentV < thresholdV; - } else if (LE.getOp().equals(op)) { - return currentV <= thresholdV; - }else { - return false; - } + return true; } - private void init(){ redisStreamUtil = SpringContextUtils.getBean(RedisStreamUtil.class); } diff --git a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/feignclient/AbnormalAlarmClient.java b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/feignclient/AbnormalAlarmClient.java index b0ecf721..71f6611e 100644 --- a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/feignclient/AbnormalAlarmClient.java +++ b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/feignclient/AbnormalAlarmClient.java @@ -2,13 +2,11 @@ package org.jeecg.modules.feignclient; import org.jeecg.common.api.vo.Result; import org.jeecg.modules.base.entity.postgre.AlarmLog; +import org.jeecg.modules.base.entity.postgre.SysDatabase; import org.jeecg.modules.base.entity.postgre.SysEmail; import org.springframework.cloud.openfeign.FeignClient; import org.springframework.stereotype.Component; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.*; import java.util.List; @@ -26,7 +24,7 @@ public interface AbnormalAlarmClient { /* AlarmLogController下相关接口 */ @PostMapping("/alarmLog/create") - Result create(@RequestBody AlarmLog alarmLog); + Result create(@RequestBody AlarmLog alarmLog); /* CalculateConcController下相关接口 */ @GetMapping("/calculateConc/caclAndSave") @@ -39,4 +37,11 @@ public interface AbnormalAlarmClient { /* AlarmItemController下相关接口 */ @GetMapping("/alarmItem/syncServerItem") boolean syncServerItem(); + + /* SysDatabaseController下相关接口 */ + @GetMapping("/sysDatabase/getById") + SysDatabase getDatabase(@RequestParam String sourceId); + + @PutMapping("/sysDatabase/updateStatus") + void updateDatabase(@RequestBody SysDatabase database); } diff --git a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/quartz/entity/Monitor.java b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/quartz/entity/Monitor.java new file mode 100644 index 00000000..40544a8e --- /dev/null +++ b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/quartz/entity/Monitor.java @@ -0,0 +1,44 @@ +package org.jeecg.modules.quartz.entity; + +import cn.hutool.core.util.ObjectUtil; +import lombok.Getter; +import org.jeecg.common.config.mqtoken.UserTokenContext; +import org.jeecg.common.util.RedisStreamUtil; +import org.jeecg.common.util.SpringContextUtils; +import org.jeecg.modules.feignclient.AbnormalAlarmClient; +import org.jeecg.modules.feignclient.MonitorSystem; +import org.jeecg.modules.message.SendMessage; + +import static org.jeecg.common.util.TokenUtils.getTempToken; +@Getter +public abstract class Monitor { + + private SendMessage sendMessage; + + private MonitorSystem monitorSystem; + + private RedisStreamUtil redisStreamUtil; + + private AbnormalAlarmClient alarmClient; + + /* + * 规则首次触发报警后,设置该规则的沉默周期(如果有) + */ + protected void ruleSilence(String silenceKey ,Long silenceCycle) { + if (ObjectUtil.isNotNull(silenceCycle)) + redisStreamUtil.set(silenceKey, silenceCycle, silenceCycle); + } + + protected void init(){ + // start:生成临时Token到线程中 + UserTokenContext.setToken(getTempToken()); + sendMessage = SpringContextUtils.getBean(SendMessage.class); + redisStreamUtil = SpringContextUtils.getBean(RedisStreamUtil.class); + alarmClient = SpringContextUtils.getBean(AbnormalAlarmClient.class); + } + + protected void destroy() { + // end:删除临时Token + UserTokenContext.remove(); + } +} diff --git a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/quartz/job/DatabaseJob.java b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/quartz/job/DatabaseJob.java new file mode 100644 index 00000000..1c8ba2b6 --- /dev/null +++ b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/quartz/job/DatabaseJob.java @@ -0,0 +1,151 @@ +package org.jeecg.modules.quartz.job; + +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.NumberUtil; +import cn.hutool.core.util.ObjectUtil; +import cn.hutool.core.util.StrUtil; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.jeecg.common.api.vo.Result; +import org.jeecg.common.config.mqtoken.UserTokenContext; +import org.jeecg.common.constant.DateConstant; +import org.jeecg.common.constant.RedisConstant; +import org.jeecg.common.util.JDBCUtil; +import org.jeecg.common.util.NumUtil; +import org.jeecg.common.util.RedisStreamUtil; +import org.jeecg.common.util.SpringContextUtils; +import org.jeecg.modules.base.entity.Rule; +import org.jeecg.modules.base.entity.monitor.ItemHistory; +import org.jeecg.modules.base.entity.postgre.AlarmLog; +import org.jeecg.modules.base.entity.postgre.AlarmRule; +import org.jeecg.modules.base.entity.postgre.SysDatabase; +import org.jeecg.modules.base.enums.Item; +import org.jeecg.modules.feignclient.AbnormalAlarmClient; +import org.jeecg.modules.feignclient.MonitorSystem; +import org.jeecg.modules.message.SendMessage; +import org.jeecg.modules.quartz.entity.Monitor; +import org.quartz.*; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.Set; + +import static org.jeecg.common.util.TokenUtils.getTempToken; +import static org.jeecg.modules.base.enums.Op.*; +import static org.jeecg.modules.base.enums.SourceType.DATABASE; + +@Data +@Slf4j +@PersistJobDataAfterExecution +@DisallowConcurrentExecution +public class DatabaseJob extends Monitor implements Job{ + /** + * 解析Database预警规则 + **/ + @Override + public void execute(JobExecutionContext context) throws JobExecutionException { + init(); + + // 查询所有Database的报警规则,根据报警规则查询监控项数据 + String pattern = RedisConstant.PREFIX_RULE + DATABASE.getType(); + Set keys = getRedisStreamUtil().keys(pattern); + if (CollUtil.isEmpty(keys)) return; + + String prefixSilence = RedisConstant.PREFIX_SILENCE; + String operator = null; + for (String ruleKey : keys) { + try { + AlarmRule alarmRule = (AlarmRule) getRedisStreamUtil().get(ruleKey); + // 如果报警规则为空,或者在沉默周期内,跳过当前规则 + operator = alarmRule.getOperator(); + String ruleId = alarmRule.getId(); + String itemId = alarmRule.getItemId(); + String silenceKey = prefixSilence + ruleId; + boolean hasKey = getRedisStreamUtil().hasKey(silenceKey); + boolean blank1 = StrUtil.isBlank(operator); + boolean blank2 = StrUtil.isBlank(itemId); + + if (blank1 || blank2 || hasKey) continue; + + // 根据sourceId查询Database信息 + String sourceId = alarmRule.getSourceId(); + SysDatabase database = getAlarmClient().getDatabase(sourceId); + if (ObjectUtil.isNull(database)) continue; + + // 根据监控项id选择要查询的监控项信息 + Item item = Item.of(itemId); + if (ObjectUtil.isNull(item)) continue; + Number current = null; + switch (item){ + case DATABASE_CONN: // 监控项-2: 测试数据源是否可以连接成功 + current = isConnection(database); + break; + // 追加的监控项... + default: + break; + } + // 解析预警规则,判断是否需要报警 + ObjectMapper mapper = new ObjectMapper(); + Rule rule = mapper.readValue(operator, Rule.class); + String op = rule.getOperator(); + Number threshold = rule.getThreshold(); + boolean needWarn = NumUtil.compare(current, threshold, op); + if (needWarn){ + // 记录报警日志 + AlarmLog alarmLog = new AlarmLog(); + alarmLog.setRuleId(ruleId); + alarmLog.setOperator(operator); + alarmLog.setAlarmValue(StrUtil.toString(current)); + String ruleName = alarmRule.getName(); + String message = "您设定的预警规则:"+ruleName+"," + + "预警信息为:"+ operator + ",当前值为:" + current; + alarmLog.setAlarmInfo(message); + getAlarmClient().create(alarmLog); + + // 规则触发报警后,设置该规则的沉默周期(如果有) + // 沉默周期失效之前,该规则不会再次被触发 + Long silenceCycle = alarmRule.getSilenceCycle(); + ruleSilence(silenceKey, silenceCycle); + + // 发送报警信息 + String groupId = alarmRule.getContactId(); + String notific = alarmRule.getNotification(); + getSendMessage().send(message, groupId, notific); + } + } catch (JsonProcessingException e) { + log.error("Database预警规则:{}解析失败,失败原因:{}!", operator, e.getMessage()); + }catch (RuntimeException e){ + log.error("Database监控异常:{}",e.getMessage()); + } + } + destroy(); + } + + /* + * 监控项-2: 测试数据源是否可以连接成功 (0:失败 1:成功) + * */ + private Integer isConnection(SysDatabase database){ + int res = 0; + try { + boolean conn = JDBCUtil.testConnection(database); + res = conn ? 1 : res; + // 如果数据源连接状态发生变化则进行更新 + Integer status = database.getStatus(); + if (ObjectUtil.isNotNull(status)){ + if (res != status){ + database.setStatus(res); + getAlarmClient().updateDatabase(database); + } + }else { + database.setStatus(res); + getAlarmClient().updateDatabase(database); + } + return res; + }catch (Exception e){ + log.error("Database监控-更新数据源状态异常:{}", e.getMessage()); + return res; + } + } +} diff --git a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/quartz/job/ServerJob.java b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/quartz/job/ServerJob.java index 3bbd3c8a..01b3dc3a 100644 --- a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/quartz/job/ServerJob.java +++ b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/quartz/job/ServerJob.java @@ -11,6 +11,7 @@ import org.jeecg.common.api.vo.Result; import org.jeecg.common.config.mqtoken.UserTokenContext; import org.jeecg.common.constant.DateConstant; import org.jeecg.common.constant.RedisConstant; +import org.jeecg.common.util.NumUtil; import org.jeecg.common.util.RedisStreamUtil; import org.jeecg.common.util.SpringContextUtils; import org.jeecg.modules.base.entity.Rule; @@ -20,6 +21,7 @@ import org.jeecg.modules.base.entity.postgre.AlarmRule; import org.jeecg.modules.feignclient.AbnormalAlarmClient; import org.jeecg.modules.feignclient.MonitorSystem; import org.jeecg.modules.message.SendMessage; +import org.jeecg.modules.quartz.entity.Monitor; import org.quartz.*; import static org.jeecg.common.util.TokenUtils.getTempToken; import java.time.LocalDateTime; @@ -33,15 +35,7 @@ import static org.jeecg.modules.base.enums.SourceType.SERVER; @Slf4j @PersistJobDataAfterExecution @DisallowConcurrentExecution -public class ServerJob implements Job { - - private String parameter; - - private SendMessage sendMessage; - private MonitorSystem monitorSystem; - private RedisStreamUtil redisStreamUtil; - private AbnormalAlarmClient alarmClient; - +public class ServerJob extends Monitor implements Job { /** * 根据host定时查询服务器信息 * 并向消息队列中推送信息 @@ -54,7 +48,7 @@ public class ServerJob implements Job { // 查询所有Server的报警规则,根据报警规则查询监控项数据 String pattern = RedisConstant.PREFIX_RULE + SERVER.getType(); - Set keys = redisStreamUtil.keys(pattern); + Set keys = getRedisStreamUtil().keys(pattern); if (CollUtil.isEmpty(keys)) return; // 时间间隔为每分钟 @@ -71,27 +65,28 @@ public class ServerJob implements Job { String operator = null; for (String ruleKey : keys) { try { - AlarmRule alarmRule = (AlarmRule) redisStreamUtil.get(ruleKey); + AlarmRule alarmRule = (AlarmRule) getRedisStreamUtil().get(ruleKey); // 如果报警规则为空,或者在沉默周期内,跳过当前规则 operator = alarmRule.getOperator(); String ruleId = alarmRule.getId(); String itemId = alarmRule.getItemId(); String silenceKey = prefixSilence + ruleId; - boolean hasKey = redisStreamUtil.hasKey(silenceKey); + boolean hasKey = getRedisStreamUtil().hasKey(silenceKey); boolean blank1 = StrUtil.isBlank(operator); boolean blank2 = StrUtil.isBlank(itemId); if (blank1 || blank2 || hasKey) continue; // 向运管查询监控项数据 - Result result = monitorSystem.itemBack(itemId, 0, start, end); + Result result = getMonitorSystem().itemBack(itemId, 0, start, end); Double current = result.getResult().getNow(); // 解析预警规则,判断是否需要报警 ObjectMapper mapper = new ObjectMapper(); Rule rule = mapper.readValue(operator, Rule.class); - rule.setCurrent(current); - boolean needWarn = parse(rule); + String op = rule.getOperator(); + Number threshold = rule.getThreshold(); + boolean needWarn = NumUtil.compare(current, threshold, op); if (needWarn){ // 记录报警日志 AlarmLog alarmLog = new AlarmLog(); @@ -99,11 +94,10 @@ public class ServerJob implements Job { alarmLog.setOperator(operator); alarmLog.setAlarmValue(StrUtil.toString(current)); String ruleName = alarmRule.getName(); - Double threshold = rule.getThreshold(); String message = "您设定的预警规则:"+ruleName+"," + "预警信息为:"+ operator + ",当前值为:" + current; alarmLog.setAlarmInfo(message); - alarmClient.create(alarmLog); + getAlarmClient().create(alarmLog); // 规则触发报警后,设置该规则的沉默周期(如果有) // 沉默周期失效之前,该规则不会再次被触发 @@ -113,63 +107,14 @@ public class ServerJob implements Job { // 发送报警信息 String groupId = alarmRule.getContactId(); String notific = alarmRule.getNotification(); - sendMessage.send(message, groupId, notific); + getSendMessage().send(message, groupId, notific); } } catch (JsonProcessingException e) { - log.error("预警规则{}解析失败!", operator); - e.printStackTrace(); + log.error("Server预警规则:{}解析失败,失败原因:{}!", operator, e.getMessage()); }catch (RuntimeException e){ - e.printStackTrace(); + log.error("Server监控异常:{}",e.getMessage()); } } destroy(); } - - private boolean parse(Rule rule){ - Double current = rule.getCurrent(); - String op = rule.getOperator(); - Double threshold = rule.getThreshold(); - - boolean cNull = ObjectUtil.isNull(current); - boolean tNull = ObjectUtil.isNull(threshold); - if (cNull || tNull) return false; - - double currentV = current; - double thresholdV = threshold; - if (EQ.getOp().equals(op)){ - return currentV == thresholdV; - } else if (GT.getOp().equals(op)) { - return currentV > thresholdV; - } else if (GE.getOp().equals(op)) { - return currentV >= thresholdV; - } else if (LT.getOp().equals(op)) { - return currentV < thresholdV; - } else if (LE.getOp().equals(op)) { - return currentV <= thresholdV; - }else { - return false; - } - } - - /* - * 规则首次触发报警后,设置该规则的沉默周期(如果有) - */ - private void ruleSilence(String silenceKey ,Long silenceCycle) { - if (ObjectUtil.isNotNull(silenceCycle)) - redisStreamUtil.set(silenceKey, silenceCycle, silenceCycle); - } - - private void init(){ - // start:生成临时Token到线程中 - UserTokenContext.setToken(getTempToken()); - sendMessage = SpringContextUtils.getBean(SendMessage.class); - monitorSystem = SpringContextUtils.getBean(MonitorSystem.class); - redisStreamUtil = SpringContextUtils.getBean(RedisStreamUtil.class); - alarmClient = SpringContextUtils.getBean(AbnormalAlarmClient.class); - } - - private void destroy(){ - // end:删除临时Token - UserTokenContext.remove(); - } } diff --git a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/test/JobController.java b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/test/JobController.java deleted file mode 100644 index 3b1b6ffe..00000000 --- a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/test/JobController.java +++ /dev/null @@ -1,178 +0,0 @@ -package org.jeecg.modules.test; - -import cn.hutool.core.collection.CollUtil; -import cn.hutool.core.util.ObjectUtil; -import cn.hutool.core.util.StrUtil; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import lombok.extern.slf4j.Slf4j; -import org.jeecg.common.api.vo.Result; -import org.jeecg.common.config.mqtoken.UserTokenContext; -import org.jeecg.common.constant.DateConstant; -import org.jeecg.common.constant.RedisConstant; -import org.jeecg.common.util.RedisStreamUtil; -import org.jeecg.common.util.SpringContextUtils; -import org.jeecg.modules.base.entity.Rule; -import org.jeecg.modules.base.entity.monitor.ItemHistory; -import org.jeecg.modules.base.entity.postgre.AlarmLog; -import org.jeecg.modules.base.entity.postgre.AlarmRule; -import org.jeecg.modules.feignclient.AbnormalAlarmClient; -import org.jeecg.modules.feignclient.MonitorSystem; -import org.jeecg.modules.message.SendMessage; -import org.quartz.JobExecutionContext; -import org.quartz.JobExecutionException; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; - -import java.time.LocalDateTime; -import java.time.format.DateTimeFormatter; -import java.util.Set; - -import static org.jeecg.common.util.TokenUtils.getTempToken; -import static org.jeecg.modules.base.enums.Op.*; - -@Slf4j -@RestController -@RequestMapping("/sys/job") -public class JobController { - - private SendMessage sendMessage; - private MonitorSystem monitorSystem; - private RedisStreamUtil redisStreamUtil; - private AbnormalAlarmClient alarmClient; - - /** - * 根据host定时查询服务器信息 - * 并向消息队列中推送信息 - * - */ - @GetMapping("start") - public void startJob(){ - init(); - - // 查询所有报警规则,根据报警规则查询监控项数据 - String pattern = RedisConstant.PREFIX_RULE; - Set keys = redisStreamUtil.keys(pattern); - if (CollUtil.isEmpty(keys)) { - return; - } - - // 时间间隔为每分钟 - LocalDateTime now = LocalDateTime.now() - .withSecond(0) - .withNano(0); - LocalDateTime beforeMin = now.minusMinutes(1); - DateTimeFormatter formatter = DateTimeFormatter - .ofPattern(DateConstant.DATE_TIME); - String start = beforeMin.format(formatter); - String end = now.format(formatter); - - String prefixSilence = RedisConstant.PREFIX_SILENCE; - String operator = null; - for (String ruleKey : keys) { - try { - if (StrUtil.equals(RedisConstant.ANALYSIS_RULE, ruleKey)) - continue; - AlarmRule alarmRule = (AlarmRule) redisStreamUtil.get(ruleKey); - // 如果报警规则为空,或者在沉默周期内,跳过当前规则 - operator = alarmRule.getOperator(); - String ruleId = alarmRule.getId(); - String itemId = alarmRule.getItemId(); - String silenceKey = prefixSilence + ruleId; - boolean hasKey = redisStreamUtil.hasKey(silenceKey); - boolean blank1 = StrUtil.isBlank(operator); - boolean blank2 = StrUtil.isBlank(itemId); - - if (blank1 || blank2 || hasKey) continue; - - // 向运管查询监控项数据 - Result result = monitorSystem.itemBack(itemId, 0, start, end); - Double current = result.getResult().getNow(); - log.error("当前CPU使用率为:{}", current); - - // 解析预警规则,判断是否需要报警 - ObjectMapper mapper = new ObjectMapper(); - Rule rule = mapper.readValue(operator, Rule.class); - rule.setCurrent(current); - boolean needWarn = parse(rule); - if (needWarn){ - // 记录报警日志 - AlarmLog alarmLog = new AlarmLog(); - alarmLog.setRuleId(ruleId); - alarmLog.setAlarmValue(StrUtil.toString(current)); - String ruleName = alarmRule.getName(); - Double threshold = rule.getThreshold(); - String message = "您设定的预警规则:"+ruleName+"," + - "预警信息为:"+ operator + ",当前值为:" + current; - alarmLog.setAlarmInfo(message); - alarmClient.create(alarmLog); - - // 规则触发报警后,设置该规则的沉默周期(如果有) - // 沉默周期失效之前,该规则不会再次被触发 - Long silenceCycle = alarmRule.getSilenceCycle(); - ruleSilence(silenceKey, silenceCycle); - - // 发送报警信息 - String groupId = alarmRule.getContactId(); - String notific = alarmRule.getNotification(); - sendMessage.send(message,groupId,notific); - } - } catch (JsonProcessingException e) { - log.error("预警规则{}解析失败!", operator); - e.printStackTrace(); - }catch (RuntimeException e){ - e.printStackTrace(); - } - } - destroy(); - } - - private boolean parse(Rule rule){ - Double current = rule.getCurrent(); - String op = rule.getOperator(); - Double threshold = rule.getThreshold(); - - boolean cNull = ObjectUtil.isNull(current); - boolean tNull = ObjectUtil.isNull(threshold); - if (cNull || tNull) return false; - - double currentV = current; - double thresholdV = threshold; - if (EQ.getOp().equals(op)){ - return currentV == thresholdV; - } else if (GT.getOp().equals(op)) { - return currentV > thresholdV; - } else if (GE.getOp().equals(op)) { - return currentV >= thresholdV; - } else if (LT.getOp().equals(op)) { - return currentV < thresholdV; - } else if (LE.getOp().equals(op)) { - return currentV <= thresholdV; - }else { - return false; - } - } - - /* - * 规则首次触发报警后,设置该规则的沉默周期(如果有) - */ - private void ruleSilence(String silenceKey ,Long silenceCycle) { - if (ObjectUtil.isNotNull(silenceCycle)) - redisStreamUtil.set(silenceKey, silenceCycle, silenceCycle); - } - - private void init(){ - // start:生成临时Token到线程中 - UserTokenContext.setToken(getTempToken()); - sendMessage = SpringContextUtils.getBean(SendMessage.class); - monitorSystem = SpringContextUtils.getBean(MonitorSystem.class); - redisStreamUtil = SpringContextUtils.getBean(RedisStreamUtil.class); - alarmClient = SpringContextUtils.getBean(AbnormalAlarmClient.class); - } - - private void destroy(){ - // end:删除临时Token - UserTokenContext.remove(); - } -} From 0de89da6f63050b745f6cb57335c2757feb3aafc Mon Sep 17 00:00:00 2001 From: nieziyan Date: Tue, 21 Nov 2023 19:15:29 +0800 Subject: [PATCH 2/2] =?UTF-8?q?fix=EF=BC=9Abug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/main/java/org/jeecg/common/util/NumUtil.java | 10 +++++----- .../main/java/org/jeecg/modules/base/entity/Rule.java | 2 +- .../java/org/jeecg/modules/quartz/job/DatabaseJob.java | 2 +- .../java/org/jeecg/modules/quartz/job/ServerJob.java | 2 +- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/util/NumUtil.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/util/NumUtil.java index f2499df8..9c64c389 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/common/util/NumUtil.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/util/NumUtil.java @@ -82,14 +82,14 @@ public class NumUtil { .doubleValue()); } - public static boolean compare(Number current, Number threshold, String op){ + public static boolean compare(Number current, Double threshold, String op){ boolean cNull = ObjectUtil.isNull(current); boolean tNull = ObjectUtil.isNull(threshold); if (cNull || tNull) return false; - if (current instanceof Double && threshold instanceof Double){ + if (current instanceof Double){ double currentV = (double) current; - double thresholdV = (double) threshold; + double thresholdV = threshold; if (EQ.getOp().equals(op)){ return currentV == thresholdV; @@ -104,9 +104,9 @@ public class NumUtil { }else { return false; } - } else if (current instanceof Integer && threshold instanceof Double) { + } else if (current instanceof Integer) { int currentV = (int) current; - int thresholdV = (int) threshold; + int thresholdV = threshold.intValue(); if (EQ.getOp().equals(op)){ return currentV == thresholdV; diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/Rule.java b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/Rule.java index 334f9c84..0e8a3da3 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/Rule.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/modules/base/entity/Rule.java @@ -12,7 +12,7 @@ public class Rule implements Serializable { private String operator; // 比较符 - private Number threshold; // 阈值 + private Double threshold; // 阈值 private String units; // 单位 } diff --git a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/quartz/job/DatabaseJob.java b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/quartz/job/DatabaseJob.java index 1c8ba2b6..10a9a009 100644 --- a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/quartz/job/DatabaseJob.java +++ b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/quartz/job/DatabaseJob.java @@ -90,7 +90,7 @@ public class DatabaseJob extends Monitor implements Job{ ObjectMapper mapper = new ObjectMapper(); Rule rule = mapper.readValue(operator, Rule.class); String op = rule.getOperator(); - Number threshold = rule.getThreshold(); + Double threshold = rule.getThreshold(); boolean needWarn = NumUtil.compare(current, threshold, op); if (needWarn){ // 记录报警日志 diff --git a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/quartz/job/ServerJob.java b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/quartz/job/ServerJob.java index 01b3dc3a..83b9cb83 100644 --- a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/quartz/job/ServerJob.java +++ b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/quartz/job/ServerJob.java @@ -85,7 +85,7 @@ public class ServerJob extends Monitor implements Job { ObjectMapper mapper = new ObjectMapper(); Rule rule = mapper.readValue(operator, Rule.class); String op = rule.getOperator(); - Number threshold = rule.getThreshold(); + Double threshold = rule.getThreshold(); boolean needWarn = NumUtil.compare(current, threshold, op); if (needWarn){ // 记录报警日志