fix:Analysis log

This commit is contained in:
nieziyan 2023-12-08 14:08:14 +08:00
parent ad6fdb06ed
commit fc1f6b20f5
13 changed files with 22 additions and 519 deletions

View File

@ -189,19 +189,6 @@ public class RedisStreamUtil {
return stringRedisTemplate.opsForStream().add(record).getValue();
}
/**
* 向消息队列中添加Warn信息
*
* @param info
*/
public String pushAlarm(Info info){
String warnKey = RedisConstant.STREAM_ALARM;
ObjectRecord<String, Info> record = StreamRecords.newRecord()
.in(warnKey).ofObject(info);
// 向Redis Stream中推送消息
return putRecord(record);
}
public String pushAnalysis(Info info){
// 创建Stream和消费组 消费者注册后可以消费当前队列中的消息
creatGroup(analysisKey, analysisGroup);

View File

@ -15,15 +15,6 @@ import java.util.Map;
@NoArgsConstructor
@Accessors(chain = true)
public class Info implements Serializable{
// 资源类型
private SourceType sourceType;
// 监控项
private Integer itemId;
// 当前值
private double value;
// 台站id
private String stationId;

View File

@ -1,38 +0,0 @@
package org.jeecg.modules;
import com.netease.qiye.qiyeopenplatform.sdk.QiyeOpenPlatSDK;
import org.jeecg.modules.base.entity.postgre.SysDatabase;
import org.jeecg.modules.databaseStatus.ConnFetcher;
import org.jeecg.modules.qiyeEmail.base.InstanceSDK;
import org.jeecg.modules.qiyeEmail.base.RParam;
import org.jeecg.modules.qiyeEmail.service.MailBox;
import org.jeecg.modules.service.ISysDatabaseService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class DatabaseStatusFetcher {
@Autowired
private ISysDatabaseService databaseService;
public void start(){
List<SysDatabase> databases = databaseService.list();
// 采集数据库连接数
Thread connFetcher = new Thread(new ConnFetcher(databases));
connFetcher.setName("Thread-connFetcher");
connFetcher.start();
// ...
}
public static void main(String[] args) {
QiyeOpenPlatSDK platSDK = InstanceSDK.getInstance();
RParam rParam = new RParam();
rParam.setAccountName("cnndc.rn.ng");
rParam.setDomain("ndc.org.cn");
Integer unreadMsg = MailBox.unreadMsg(platSDK, rParam);
System.out.println(unreadMsg);
}
}

View File

@ -6,8 +6,8 @@ import io.swagger.annotations.ApiOperation;
import org.jeecg.common.api.vo.Result;
import org.jeecg.common.constant.Prompt;
import org.jeecg.common.system.base.controller.JeecgController;
import org.jeecg.modules.base.entity.postgre.AlarmAnalysisLog;
import org.jeecg.modules.base.bizVo.AnalysisLogVo;
import org.jeecg.modules.base.entity.postgre.AlarmAnalysisLog;
import org.jeecg.modules.service.IAlarmAnalysisLogService;
import org.springframework.web.bind.annotation.*;

View File

@ -1,110 +0,0 @@
package org.jeecg.modules.databaseStatus;
import cn.hutool.core.collection.ArrayIter;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.collection.ListUtil;
import cn.hutool.core.util.ObjectUtil;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.constant.DBSQL;
import org.jeecg.common.constant.enums.DbType;
import org.jeecg.common.util.JDBCUtil;
import org.jeecg.common.util.SpringContextUtils;
import org.jeecg.modules.base.dto.DatabaseStatusConn;
import org.jeecg.modules.base.entity.postgre.SysDatabase;
import org.jeecg.modules.service.IDatabaseStatusConnService;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import static org.jeecg.common.constant.enums.DbType.typeOf;
/*
* 收集数据源的连接数信息 入库
* */
@Slf4j
public class ConnFetcher implements Runnable{
private List<SysDatabase> databases;
private IDatabaseStatusConnService connService;
public ConnFetcher(List<SysDatabase> databases) {
this.databases = databases;
connService = SpringContextUtils.getBean(IDatabaseStatusConnService.class);
}
@Override
public void run() {
if (CollUtil.isEmpty(databases)) return;
List<DatabaseStatusConn> statusList = new ArrayList<>();
while (true){
Iterator<SysDatabase> iterator = CollUtil
.addAll(new ArrayList<>() ,databases).iterator();
LocalDateTime now = LocalDateTime.now();
long start = System.currentTimeMillis();
while (iterator.hasNext()) {
try {
SysDatabase database= iterator.next();
String databaseId = database.getId();
String dbType = database.getDbType();
String dbUrl = database.getDbUrl();
String dbDriver = database.getDbDriver();
String dbUsername = database.getDbUsername();
String dbPassword = database.getDbPassword();
long getConn1 = System.currentTimeMillis();
JdbcTemplate template = JDBCUtil.template(dbUrl, dbDriver, dbUsername, dbPassword);
long getConn2 = System.currentTimeMillis();
System.out.println("耗时: "+ (getConn2 - getConn1) + ", 连接[" + dbUrl + "]");
if (ObjectUtil.isNull(template)) {
iterator.remove();
continue;
}
Integer conn = null;
// 根据数据库类型选择执行哪种数据库的状态查询SQL
DbType dbTypeE = typeOf(dbType);
if (ObjectUtil.isNotNull(dbTypeE)){
switch (dbTypeE){
case POSTGRESQL:
conn = template.queryForObject(DBSQL.DBSTATUS_CONN_PG, Integer.class);
break;
case ORACLE:
conn = template.queryForObject(DBSQL.DBSTATUS_CONN_OR, Integer.class);
break;
case MYSQL55:
// ...
break;
case MYSQL57:
// ...
break;
default:
break;
}
}
// 如果conn == null 说明数据源类型不确定
if (ObjectUtil.isNull(conn)) {
iterator.remove();
continue;
}
DatabaseStatusConn databaseStatus = new DatabaseStatusConn();
databaseStatus.setConnNum(conn);
databaseStatus.setDatabaseId(databaseId);
databaseStatus.setCollectTime(now);
// 数据达到一定数量,进行批量保存
statusList.add(databaseStatus);
if (statusList.size() == 100){
connService.saveBatch(statusList);
statusList = new ArrayList<>();
}
}catch (Exception e){
log.error("ConnFetcher采集数据库连接数异常: {}", e.getMessage());
}
}
long end = System.currentTimeMillis();
System.out.println("执行耗时: " + (end - start) + "ms");
}
}
}

View File

@ -1,107 +0,0 @@
package org.jeecg.modules.databaseStatus;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.ObjectUtil;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.constant.DBSQL;
import org.jeecg.common.constant.enums.DbType;
import org.jeecg.common.util.JDBCUtil;
import org.jeecg.common.util.SpringContextUtils;
import org.jeecg.modules.base.dto.DatabaseStatusConn;
import org.jeecg.modules.base.entity.postgre.SysDatabase;
import org.jeecg.modules.service.IDatabaseStatusConnService;
import org.springframework.jdbc.core.JdbcTemplate;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import static org.jeecg.common.constant.enums.DbType.typeOf;
/*
* 收集数据源的连接数信息 入库
* */
@Slf4j
public class ConnFetcher1 implements Runnable{
private List<SysDatabase> databases;
private IDatabaseStatusConnService connService;
public ConnFetcher1(List<SysDatabase> databases) {
this.databases = databases;
connService = SpringContextUtils.getBean(IDatabaseStatusConnService.class);
}
@Override
public void run() {
if (CollUtil.isEmpty(databases)) return;
List<DatabaseStatusConn> statusList = new ArrayList<>();
while (true){
Iterator<SysDatabase> iterator = CollUtil
.addAll(new ArrayList<>() ,databases).iterator();
LocalDateTime now = LocalDateTime.now();
long start = System.currentTimeMillis();
while (iterator.hasNext()) {
try {
SysDatabase database= iterator.next();
String databaseId = database.getId();
String dbType = database.getDbType();
String dbUrl = database.getDbUrl();
String dbDriver = database.getDbDriver();
String dbUsername = database.getDbUsername();
String dbPassword = database.getDbPassword();
long getConn1 = System.currentTimeMillis();
JdbcTemplate template = JDBCUtil.template(dbUrl, dbDriver, dbUsername, dbPassword);
long getConn2 = System.currentTimeMillis();
System.out.println("耗时: "+ (getConn2 - getConn1) + ", 连接[" + dbUrl + "]");
if (ObjectUtil.isNull(template)) {
iterator.remove();
continue;
}
Integer conn = null;
// 根据数据库类型选择执行哪种数据库的状态查询SQL
DbType dbTypeE = typeOf(dbType);
if (ObjectUtil.isNotNull(dbTypeE)){
switch (dbTypeE){
case POSTGRESQL:
conn = template.queryForObject(DBSQL.DBSTATUS_CONN_PG, Integer.class);
break;
case ORACLE:
conn = template.queryForObject(DBSQL.DBSTATUS_CONN_OR, Integer.class);
break;
case MYSQL55:
// ...
break;
case MYSQL57:
// ...
break;
default:
break;
}
}
// 如果conn == null 说明数据源类型不确定
if (ObjectUtil.isNull(conn)) {
iterator.remove();
continue;
}
DatabaseStatusConn databaseStatus = new DatabaseStatusConn();
databaseStatus.setConnNum(conn);
databaseStatus.setDatabaseId(databaseId);
databaseStatus.setCollectTime(now);
// 数据达到一定数量,进行批量保存
statusList.add(databaseStatus);
if (statusList.size() == 100){
connService.saveBatch(statusList);
statusList = new ArrayList<>();
}
}catch (Exception e){
log.error("ConnFetcher采集数据库连接数异常: {}", e.getMessage());
}
}
long end = System.currentTimeMillis();
System.out.println("执行耗时: " + (end - start) + "ms");
}
}
}

View File

@ -1,104 +0,0 @@
package org.jeecg.modules.databaseStatus;
import cn.hutool.core.util.ObjectUtil;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.constant.DBSQL;
import org.jeecg.common.constant.enums.DbType;
import org.jeecg.common.util.JDBCUtil;
import org.jeecg.common.util.SpringContextUtils;
import org.jeecg.modules.base.dto.DatabaseStatusConn;
import org.jeecg.modules.base.entity.postgre.SysDatabase;
import org.jeecg.modules.service.IDatabaseStatusConnService;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.RowMapper;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.jeecg.common.constant.enums.DbType.typeOf;
/*
* 收集数据源的状态信息 入库
* */
@Slf4j
public class StatusFetcher implements Runnable{
private final long sleepTime = 30 * 60 * 1000;
private final SysDatabase database;
private IDatabaseStatusConnService statusService;
public StatusFetcher(SysDatabase database) {
this.database = database;
statusService = SpringContextUtils.getBean(IDatabaseStatusConnService.class);
}
@Override
public void run() {
if (ObjectUtil.isNull(database)) return;
String databaseId = database.getId();
String dbType = database.getDbType();
String dbUrl = database.getDbUrl();
String dbDriver = database.getDbDriver();
String dbUsername = database.getDbUsername();
String dbPassword = database.getDbPassword();
List<DatabaseStatusConn> statusList = new ArrayList<>();
while (true){
try {
long start = System.currentTimeMillis();
JdbcTemplate template = JDBCUtil.template(dbUrl, dbDriver, dbUsername, dbPassword);
long end = System.currentTimeMillis();
/*
如果template == null 表示数据源连接失败,睡眠30min后重试
如果template != null 则表示连接成功,同时记录数据源连接响应时间
*/
if (ObjectUtil.isNull(template)){
TimeUnit.MILLISECONDS.sleep(sleepTime);
log.warn("数据源");
continue;
}
log.info("11111111111111111111111111111111111111111111111");
DatabaseStatusConn databaseStatus = null;
// 根据数据库类型选择执行哪种数据库的状态查询SQL
DbType dbTypeE = typeOf(dbType);
if (ObjectUtil.isNotNull(dbTypeE)){
switch (dbTypeE){
case POSTGRESQL:
// databaseStatus = template.queryForObject(DBSQL.DBSTATUS_PG, mapper);
break;
case ORACLE:
// databaseStatus = template.queryForObject(DBSQL.DBSTATUS_OR, mapper);
break;
case MYSQL55:
// ...
break;
case MYSQL57:
// ...
break;
default:
break;
}
}
// 如果databaseStatus == null 说明数据源类型不确定
if (ObjectUtil.isNull(databaseStatus)) return;
databaseStatus.setDatabaseId(databaseId);
// 设置数据的响应时间
BigDecimal decimal = new BigDecimal((end - start));
double respTime = decimal.divide(new BigDecimal(1000), 2, RoundingMode.HALF_UP).doubleValue();
//databaseStatus.setRespTime(respTime);
// 数据达到一定数量,进行批量保存
statusList.add(databaseStatus);
if (statusList.size() == 300){
statusService.saveBatch(statusList);
}
}catch (Exception e){
}
}
}
}

View File

@ -1,114 +0,0 @@
package org.jeecg.modules.redisStream;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.constant.RedisConstant;
import org.jeecg.common.constant.SymbolConstant;
import org.jeecg.common.util.RedisStreamUtil;
import org.jeecg.common.util.SpringContextUtils;
import org.jeecg.modules.base.dto.Info;
import org.jeecg.modules.base.entity.Rule;
import org.jeecg.modules.base.entity.postgre.AlarmRule;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;
import java.util.List;
import static org.jeecg.modules.base.enums.Op.*;
@Data
@Slf4j
@Component
@NoArgsConstructor
public class AlarmConsumer implements StreamListener<String, ObjectRecord<String, Info>> {
private String groupName;
private String consumerName;
private RedisStreamUtil redisStreamUtil;
public AlarmConsumer(String groupName, String consumerName) {
this.groupName = groupName;
this.consumerName = consumerName;
}
@Override
public void onMessage(ObjectRecord<String, Info> message) {
/* 避免消费抛出异常后,取消此消费者的消费资格 */
try {
String streamKey = message.getStream();
init();
/**
* 新消息在未进行ACK之前,状态也为pending,
* 直接消费所有异常未确认的消息和新消息
*/
List<ObjectRecord<String, Info>> pendings = redisStreamUtil
.read(streamKey, groupName, consumerName);
for (ObjectRecord<String, Info> record : pendings) {
RecordId recordId = record.getId();
Info info = record.getValue();
// 消费消息
consume(info);
// 消费完成后,手动确认消费消息[消息消费成功]
// 否则就是消费抛出异常,进入pending_ids[消息消费失败]
redisStreamUtil.ack(streamKey, groupName, recordId.getValue());
// 手动删除已消费消息
redisStreamUtil.del(streamKey, recordId.getValue());
}
}catch (JsonProcessingException e) {
log.error("Alarm Rule Resolution Failed!");
e.printStackTrace();
}catch (RuntimeException e){
e.printStackTrace();
}
}
/**
* 消费方法,根据校验情况发送预警信息
*
* @param info
*/
private void consume(Info info) throws JsonProcessingException {
String sourceType = info.getSourceType() == null
? "" : info.getSourceType().getType();
int itemId = info.getItemId();
String underline = SymbolConstant.UNDERLINE;
String prefix = RedisConstant.PREFIX_RULE;
String key = prefix + sourceType + underline + itemId;
Boolean hasKey = redisStreamUtil.hasKey(key);
if (!hasKey) return;
List<AlarmRule> alarmRules = (List<AlarmRule>) redisStreamUtil.get(key);
Double current = info.getValue();
for (AlarmRule alarmRule : alarmRules) {
String operator = alarmRule.getOperator();
if (StrUtil.isBlank(operator))continue;
ObjectMapper mapper = new ObjectMapper();
Rule rule = mapper.readValue(operator, Rule.class);
boolean needWarn = parse(rule);
// 发送预警信息
if (needWarn){
// sendMessage.send(rule);
}
}
}
/**
* 规则解析
*
* @return 是否需要发送预警信息
*/
private boolean parse(Rule rule){
return true;
}
private void init(){
redisStreamUtil = SpringContextUtils.getBean(RedisStreamUtil.class);
}
}

View File

@ -77,14 +77,15 @@ public class AnalysisConsumer implements StreamListener<String, ObjectRecord<Str
Info info = record.getValue();
// 消费消息
consume(info);
log.info("AnalysisConsumer消费了一条消息: {}", info);
// 消费完成后,手动确认消费消息[消息消费成功]
// 否则就是消费抛出异常,进入pending_ids[消息消费失败]
redisStreamUtil.ack(streamKey, groupName, recordId.getValue());
// 手动删除已消费消息
redisStreamUtil.del(streamKey, recordId.getValue());
// TODO 手动删除已消费消息
// redisStreamUtil.del(streamKey, recordId.getValue());
}
}catch (RuntimeException e){
e.printStackTrace();
log.error("AnalysisConsumer消费异常: {}", e.getMessage());
}finally {
destroy();
}
@ -195,8 +196,7 @@ public class AnalysisConsumer implements StreamListener<String, ObjectRecord<Str
List<String> existNames = analysisResultService
.nuclideNames(betaOrGamma, dataSourceType, nuclideNames);
// 两个集合元素相减
List<String> differ = CollUtil.subtractToList(nuclideNames, existNames);
return differ;
return CollUtil.subtractToList(nuclideNames, existNames);
}
/**

View File

@ -49,7 +49,6 @@ public class RedisStreamConfig {
@Bean(initMethod = "start", destroyMethod = "stop")
public StreamMessageListenerContainer<String, ObjectRecord<String, Info>> alarmStream() {
/* 创建Stream和消费组 */
redisStreamUtil.creatGroup(alarmKey, alarmGroup);
redisStreamUtil.creatGroup(analysisKey, analysisGroup);
// 原子整数,多线程环境下对整数的原子性操作
AtomicInteger index = new AtomicInteger(1);
@ -118,9 +117,6 @@ public class RedisStreamConfig {
.build();
ConsumeA1 consumeA1 = new ConsumeA1(groupWarnA, consumerWarnA1);
streamMessageListenerContainer.register(readA1, consumeA1);*/
AlarmConsumer alarm = new AlarmConsumer(alarmGroup, alarmConsumer);
streamMessageListenerContainer.receive(Consumer.from(alarmGroup, alarmConsumer),
StreamOffset.create(alarmKey, ReadOffset.lastConsumed()), alarm);
AnalysisConsumer analysis = new AnalysisConsumer(analysisGroup,analysisConsumer);
streamMessageListenerContainer.receive(Consumer.from(analysisGroup, analysisConsumer),
StreamOffset.create(analysisKey, ReadOffset.lastConsumed()), analysis);

View File

@ -168,6 +168,8 @@ public class SysEmailServiceImpl extends ServiceImpl<SysEmailMapper, SysEmail> i
boolean success = updateById(sysEmail);
if (success) {
saveOrUpdateStatus(sysEmail);
// 更新邮箱发件服务器到Redis
// updateSender();
return Result.OK(Prompt.UPDATE_SUCC);
}
return Result.error(Prompt.UPDATE_ERR);
@ -213,13 +215,7 @@ public class SysEmailServiceImpl extends ServiceImpl<SysEmailMapper, SysEmail> i
if (redisUtil.hasKey(key))
return Result.OK(redisUtil.get(key));
LambdaQueryWrapper<SysEmail> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(SysEmail::getEmilType, SysMailType.SEND_EMAIL.getEmailType());
wrapper.eq(SysEmail::getEnabled, ENABLED.getValue());
List<SysEmail> emails = this.list(wrapper);
SysEmail sysEmail = emails.stream().findFirst().get();
if (ObjectUtil.isNotNull(sysEmail))
redisUtil.set(key, sysEmail);
SysEmail sysEmail = updateSender();
return Result.OK(sysEmail);
}
@ -296,4 +292,16 @@ public class SysEmailServiceImpl extends ServiceImpl<SysEmailMapper, SysEmail> i
boolean contains = CollUtil.contains(domains, domain);
return contains ? Qiye.IS.getValue() : Qiye.NOT.getValue();
}
private SysEmail updateSender(){
String key = RedisConstant.EMAIL_SENDER;
LambdaQueryWrapper<SysEmail> wrapper = new LambdaQueryWrapper<>();
wrapper.eq(SysEmail::getEmilType, SysMailType.SEND_EMAIL.getEmailType());
wrapper.eq(SysEmail::getEnabled, ENABLED.getValue());
List<SysEmail> emails = this.list(wrapper);
SysEmail sysEmail = emails.stream().findFirst().get();
if (ObjectUtil.isNotNull(sysEmail))
redisUtil.set(key, sysEmail);
return sysEmail;
}
}

View File

@ -50,7 +50,7 @@ public class SendMessage {
*/
public void send(String message,String groupId,String notific){
// 封装MessageDTO消息体
String title = "系统预警消息";
String title = "Nuclide System Warning Message";
MessageDTO messageDTO = new MessageDTO(title,message);
Map<String, String> contact = getContact(groupId);

View File

@ -3,7 +3,6 @@ package org.jeecg;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.util.oConvertUtils;
import org.jeecg.modules.DatabaseStatusFetcher;
import org.jeecg.modules.DatabaseStatusManager;
import org.jeecg.modules.EmailStatusManager;
import org.jeecg.modules.ServerStatusManager;
@ -33,8 +32,6 @@ public class JeecgAbnormalAlarmApplication extends SpringBootServletInitializer
private final DatabaseStatusManager databaseStatusManager;
private final DatabaseStatusFetcher databaseStatusFetcher;
@Override
protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
return application.sources(JeecgAbnormalAlarmApplication.class);
@ -62,8 +59,5 @@ public class JeecgAbnormalAlarmApplication extends SpringBootServletInitializer
emailStatusManager.start();
// 启动监测服务器连接状态的线程
serverStatusManager.start();
// 启动采集数据库状态信息线程组
// databaseStatusFetcher.start();
}
}