update:rename
This commit is contained in:
parent
59cc54cf24
commit
0149c0b823
|
@ -5,7 +5,7 @@ import com.fasterxml.jackson.annotation.PropertyAccessor;
|
|||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.jeecg.common.constant.RedisConstant;
|
||||
import org.jeecg.common.constant.SymbolConstant;
|
||||
import org.jeecg.modules.base.dto.RuleDto;
|
||||
import org.jeecg.modules.base.dto.Info;
|
||||
import org.jeecg.modules.base.entity.postgre.AlarmRule;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.redis.connection.RedisConnection;
|
||||
|
@ -118,8 +118,8 @@ public class RedisStreamUtil {
|
|||
* @param groupName
|
||||
* @param consumerName
|
||||
*/
|
||||
public List<ObjectRecord<String, RuleDto>> read(String streamKey, String groupName, String consumerName){
|
||||
return stringRedisTemplate.opsForStream().read(RuleDto.class,
|
||||
public List<ObjectRecord<String, Info>> read(String streamKey, String groupName, String consumerName){
|
||||
return stringRedisTemplate.opsForStream().read(Info.class,
|
||||
Consumer.from(groupName, consumerName),
|
||||
StreamOffset.fromStart(streamKey));
|
||||
}
|
||||
|
@ -170,7 +170,7 @@ public class RedisStreamUtil {
|
|||
*
|
||||
* @param record
|
||||
*/
|
||||
private String putRecord(Record<String, RuleDto> record){
|
||||
private String putRecord(Record<String, Info> record){
|
||||
/**
|
||||
* 1.使用stringRedisTemplate
|
||||
* 2.序列化方式使用string
|
||||
|
@ -181,20 +181,20 @@ public class RedisStreamUtil {
|
|||
/**
|
||||
* 向消息队列中添加Warn信息
|
||||
*
|
||||
* @param ruleDto
|
||||
* @param info
|
||||
*/
|
||||
public String pushAlarm(RuleDto ruleDto){
|
||||
public String pushAlarm(Info info){
|
||||
String warnKey = RedisConstant.STREAM_ALARM;
|
||||
ObjectRecord<String, RuleDto> record = StreamRecords.newRecord()
|
||||
.in(warnKey).ofObject(ruleDto);
|
||||
ObjectRecord<String, Info> record = StreamRecords.newRecord()
|
||||
.in(warnKey).ofObject(info);
|
||||
// 向Redis Stream中推送消息
|
||||
return putRecord(record);
|
||||
}
|
||||
|
||||
public String pushAnalysis(RuleDto ruleDto){
|
||||
public String pushAnalysis(Info info){
|
||||
String analysisKey = RedisConstant.STREAM_ANALYSIS;
|
||||
ObjectRecord<String, RuleDto> record = StreamRecords.newRecord()
|
||||
.in(analysisKey).ofObject(ruleDto);
|
||||
ObjectRecord<String, Info> record = StreamRecords.newRecord()
|
||||
.in(analysisKey).ofObject(info);
|
||||
// 向Redis Stream中推送消息
|
||||
return putRecord(record);
|
||||
}
|
||||
|
|
|
@ -8,14 +8,13 @@ import org.jeecg.modules.base.enums.SourceType;
|
|||
|
||||
import java.io.Serializable;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.Date;
|
||||
import java.util.Map;
|
||||
|
||||
@Data
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
@Accessors(chain = true)
|
||||
public class RuleDto implements Serializable{
|
||||
public class Info implements Serializable{
|
||||
// 资源类型
|
||||
private SourceType sourceType;
|
||||
|
|
@ -11,7 +11,7 @@ import org.jeecg.common.constant.RedisConstant;
|
|||
import org.jeecg.common.constant.SymbolConstant;
|
||||
import org.jeecg.common.util.RedisStreamUtil;
|
||||
import org.jeecg.common.util.SpringContextUtils;
|
||||
import org.jeecg.modules.base.dto.RuleDto;
|
||||
import org.jeecg.modules.base.dto.Info;
|
||||
import org.jeecg.modules.base.entity.Rule;
|
||||
import org.jeecg.modules.base.entity.postgre.AlarmRule;
|
||||
import org.springframework.data.redis.connection.stream.ObjectRecord;
|
||||
|
@ -27,7 +27,7 @@ import static org.jeecg.modules.base.enums.Op.*;
|
|||
@Slf4j
|
||||
@Component
|
||||
@NoArgsConstructor
|
||||
public class AlarmConsumer implements StreamListener<String, ObjectRecord<String, RuleDto>> {
|
||||
public class AlarmConsumer implements StreamListener<String, ObjectRecord<String, Info>> {
|
||||
|
||||
private String groupName;
|
||||
|
||||
|
@ -41,7 +41,7 @@ public class AlarmConsumer implements StreamListener<String, ObjectRecord<String
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(ObjectRecord<String, RuleDto> message) {
|
||||
public void onMessage(ObjectRecord<String, Info> message) {
|
||||
/* 避免消费抛出异常后,取消此消费者的消费资格 */
|
||||
try {
|
||||
String streamKey = message.getStream();
|
||||
|
@ -50,13 +50,13 @@ public class AlarmConsumer implements StreamListener<String, ObjectRecord<String
|
|||
* 新消息在未进行ACK之前,状态也为pending,
|
||||
* 直接消费所有异常未确认的消息和新消息
|
||||
*/
|
||||
List<ObjectRecord<String, RuleDto>> pendings = redisStreamUtil
|
||||
List<ObjectRecord<String, Info>> pendings = redisStreamUtil
|
||||
.read(streamKey, groupName, consumerName);
|
||||
for (ObjectRecord<String, RuleDto> record : pendings) {
|
||||
for (ObjectRecord<String, Info> record : pendings) {
|
||||
RecordId recordId = record.getId();
|
||||
RuleDto ruleDto = record.getValue();
|
||||
Info info = record.getValue();
|
||||
// 消费消息
|
||||
consume(ruleDto);
|
||||
consume(info);
|
||||
// 消费完成后,手动确认消费消息[消息消费成功]
|
||||
// 否则就是消费抛出异常,进入pending_ids[消息消费失败]
|
||||
redisStreamUtil.ack(streamKey, groupName, recordId.getValue());
|
||||
|
@ -74,19 +74,19 @@ public class AlarmConsumer implements StreamListener<String, ObjectRecord<String
|
|||
/**
|
||||
* 消费方法,根据校验情况发送预警信息
|
||||
*
|
||||
* @param ruleDto
|
||||
* @param info
|
||||
*/
|
||||
private void consume(RuleDto ruleDto) throws JsonProcessingException {
|
||||
String sourceType = ruleDto.getSourceType() == null
|
||||
? "" : ruleDto.getSourceType().getType();
|
||||
int itemId = ruleDto.getItemId();
|
||||
private void consume(Info info) throws JsonProcessingException {
|
||||
String sourceType = info.getSourceType() == null
|
||||
? "" : info.getSourceType().getType();
|
||||
int itemId = info.getItemId();
|
||||
String underline = SymbolConstant.UNDERLINE;
|
||||
String prefix = RedisConstant.PREFIX_RULE;
|
||||
String key = prefix + sourceType + underline + itemId;
|
||||
Boolean hasKey = redisStreamUtil.hasKey(key);
|
||||
if (!hasKey) return;
|
||||
List<AlarmRule> alarmRules = (List<AlarmRule>) redisStreamUtil.get(key);
|
||||
Double current = ruleDto.getValue();
|
||||
Double current = info.getValue();
|
||||
for (AlarmRule alarmRule : alarmRules) {
|
||||
String operator = alarmRule.getOperator();
|
||||
if (StrUtil.isBlank(operator))continue;
|
||||
|
|
|
@ -16,7 +16,7 @@ import org.jeecg.common.constant.SymbolConstant;
|
|||
import org.jeecg.common.util.RedisStreamUtil;
|
||||
import org.jeecg.common.util.SpringContextUtils;
|
||||
import org.jeecg.modules.base.dto.NuclideInfo;
|
||||
import org.jeecg.modules.base.dto.RuleDto;
|
||||
import org.jeecg.modules.base.dto.Info;
|
||||
import org.jeecg.modules.base.entity.postgre.AlarmAnalysisLog;
|
||||
import org.jeecg.modules.base.entity.postgre.AlarmAnalysisNuclideAvg;
|
||||
import org.jeecg.modules.base.entity.postgre.AlarmAnalysisRule;
|
||||
|
@ -40,7 +40,7 @@ import java.util.stream.Collectors;
|
|||
@Slf4j
|
||||
@Component
|
||||
@NoArgsConstructor
|
||||
public class AnalysisConsumer implements StreamListener<String, ObjectRecord<String, RuleDto>> {
|
||||
public class AnalysisConsumer implements StreamListener<String, ObjectRecord<String, Info>> {
|
||||
|
||||
private String groupName;
|
||||
|
||||
|
@ -61,7 +61,7 @@ public class AnalysisConsumer implements StreamListener<String, ObjectRecord<Str
|
|||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(ObjectRecord<String, RuleDto> message) {
|
||||
public void onMessage(ObjectRecord<String, Info> message) {
|
||||
/* 避免消费抛出异常后,取消此消费者的消费资格 */
|
||||
try {
|
||||
String streamKey = message.getStream();
|
||||
|
@ -70,13 +70,13 @@ public class AnalysisConsumer implements StreamListener<String, ObjectRecord<Str
|
|||
* 新消息在未进行ACK之前,状态也为pending,
|
||||
* 直接消费所有异常未确认的消息和新消息
|
||||
*/
|
||||
List<ObjectRecord<String, RuleDto>> pendings = redisStreamUtil
|
||||
List<ObjectRecord<String, Info>> pendings = redisStreamUtil
|
||||
.read(streamKey, groupName, consumerName);
|
||||
for (ObjectRecord<String, RuleDto> record : pendings) {
|
||||
for (ObjectRecord<String, Info> record : pendings) {
|
||||
RecordId recordId = record.getId();
|
||||
RuleDto ruleDto = record.getValue();
|
||||
Info info = record.getValue();
|
||||
// 消费消息
|
||||
consume(ruleDto);
|
||||
consume(info);
|
||||
// 消费完成后,手动确认消费消息[消息消费成功]
|
||||
// 否则就是消费抛出异常,进入pending_ids[消息消费失败]
|
||||
redisStreamUtil.ack(streamKey, groupName, recordId.getValue());
|
||||
|
@ -90,12 +90,12 @@ public class AnalysisConsumer implements StreamListener<String, ObjectRecord<Str
|
|||
}
|
||||
}
|
||||
|
||||
private void consume(RuleDto ruleDto){
|
||||
String stationId = ruleDto.getStationId();
|
||||
String sampleId = ruleDto.getSampleId();
|
||||
String fullOrPrel = ruleDto.getFullOrPrel();
|
||||
String datasource = ruleDto.getDatasource();
|
||||
Map<String, String> nuclides = ruleDto.getNuclides();
|
||||
private void consume(Info info){
|
||||
String stationId = info.getStationId();
|
||||
String sampleId = info.getSampleId();
|
||||
String fullOrPrel = info.getFullOrPrel();
|
||||
String datasource = info.getDatasource();
|
||||
Map<String, String> nuclides = info.getNuclides();
|
||||
if (StrUtil.isBlank(stationId)) return;
|
||||
if (StrUtil.isBlank(sampleId)) return;
|
||||
if (MapUtil.isEmpty(nuclides)) return;
|
||||
|
@ -128,22 +128,22 @@ public class AnalysisConsumer implements StreamListener<String, ObjectRecord<Str
|
|||
.filter(entry -> cross.contains(entry.getKey()))
|
||||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
|
||||
// 开始对交集中的核素进行条件判断
|
||||
ruleDto.setRuleId(rule.getId());
|
||||
ruleDto.setGroupId(rule.getContactGroup());
|
||||
ruleDto.setConditions(rule.getConditions());
|
||||
judge(ruleDto,nuclidesCross);
|
||||
info.setRuleId(rule.getId());
|
||||
info.setGroupId(rule.getContactGroup());
|
||||
info.setConditions(rule.getConditions());
|
||||
judge(info,nuclidesCross);
|
||||
}
|
||||
}
|
||||
|
||||
private void judge(RuleDto ruleDto, Map<String,String> nuclidesCross){
|
||||
private void judge(Info info, Map<String,String> nuclidesCross){
|
||||
String ONE = "1";String TWO = "2";String THREE = "3";
|
||||
Set<String> nuclideNames = nuclidesCross.keySet();
|
||||
String alarmInfo = "";
|
||||
List<String> firstDetected;
|
||||
List<NuclideInfo> moreThanAvg = new ArrayList<>();
|
||||
String conditionStr = ruleDto.getConditions();
|
||||
String betaOrGamma = ruleDto.getBetaOrGamma();
|
||||
String datasource = ruleDto.getDatasource();
|
||||
String conditionStr = info.getConditions();
|
||||
String betaOrGamma = info.getBetaOrGamma();
|
||||
String datasource = info.getDatasource();
|
||||
List<String> conditions = ListUtil.toList(conditionStr.split(comma));
|
||||
for (String con : conditions) {
|
||||
if (ONE.equals(con)){ // 首次发现该元素
|
||||
|
@ -172,7 +172,7 @@ public class AnalysisConsumer implements StreamListener<String, ObjectRecord<Str
|
|||
if (StrUtil.isNotBlank(alarmInfo)){
|
||||
// 保存报警日志
|
||||
AlarmAnalysisLog logInfo = new AlarmAnalysisLog();
|
||||
BeanUtil.copyProperties(ruleDto,logInfo);
|
||||
BeanUtil.copyProperties(info,logInfo);
|
||||
if (alarmInfo.startsWith(comma))
|
||||
alarmInfo = StrUtil.sub(alarmInfo, 1, alarmInfo.length());
|
||||
logInfo.setAlarmInfo(alarmInfo);
|
||||
|
@ -180,7 +180,7 @@ public class AnalysisConsumer implements StreamListener<String, ObjectRecord<Str
|
|||
logInfo.setNuclideInfoList(moreThanAvg);
|
||||
logService.saveLog(logInfo);
|
||||
// 发送报警信息
|
||||
String groupId = ruleDto.getGroupId();
|
||||
String groupId = info.getGroupId();
|
||||
if (StrUtil.isNotBlank(groupId))
|
||||
systemClient.sendMessage(alarmInfo,groupId, ALL.getValue());
|
||||
}
|
||||
|
|
|
@ -5,7 +5,7 @@ import cn.hutool.core.util.StrUtil;
|
|||
import org.jeecg.common.constant.RedisConstant;
|
||||
import org.jeecg.common.exception.StreamErrorHandler;
|
||||
import org.jeecg.common.util.RedisStreamUtil;
|
||||
import org.jeecg.modules.base.dto.RuleDto;
|
||||
import org.jeecg.modules.base.dto.Info;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.data.redis.connection.RedisConnectionFactory;
|
||||
|
@ -47,7 +47,7 @@ public class RedisStreamConfig {
|
|||
private RedisStreamUtil redisStreamUtil;
|
||||
|
||||
@Bean(initMethod = "start", destroyMethod = "stop")
|
||||
public StreamMessageListenerContainer<String, ObjectRecord<String, RuleDto>> alarmStream() {
|
||||
public StreamMessageListenerContainer<String, ObjectRecord<String, Info>> alarmStream() {
|
||||
/* 创建Stream和消费组 */
|
||||
creatGroup(alarmKey, alarmGroup);
|
||||
creatGroup(analysisKey, analysisGroup);
|
||||
|
@ -75,7 +75,7 @@ public class RedisStreamConfig {
|
|||
return thread;
|
||||
});
|
||||
/* 设置消息监听容器 */
|
||||
StreamMessageListenerContainerOptions<String, ObjectRecord<String, RuleDto>> options =
|
||||
StreamMessageListenerContainerOptions<String, ObjectRecord<String, Info>> options =
|
||||
StreamMessageListenerContainerOptions
|
||||
.builder()
|
||||
// 每次轮询取几条消息
|
||||
|
@ -88,12 +88,12 @@ public class RedisStreamConfig {
|
|||
.pollTimeout(Duration.ZERO)
|
||||
//.serializer()
|
||||
//.objectMapper(new ObjectHashMapper())
|
||||
.targetType(RuleDto.class)
|
||||
.targetType(Info.class)
|
||||
// 异常处理器
|
||||
.errorHandler(new StreamErrorHandler())
|
||||
.build();
|
||||
/* 创建消息监听容器 */
|
||||
StreamMessageListenerContainer<String, ObjectRecord<String, RuleDto>> streamMessageListenerContainer =
|
||||
StreamMessageListenerContainer<String, ObjectRecord<String, Info>> streamMessageListenerContainer =
|
||||
StreamMessageListenerContainer.create(redisConnectionFactory, options);
|
||||
|
||||
// 独立消费
|
||||
|
|
|
@ -208,6 +208,7 @@ public class SysServerServiceImpl extends ServiceImpl<SysServerMapper, SysServer
|
|||
SourceDto sourceDto = new SourceDto();
|
||||
sourceDto.setSourceId(sysServer.getId());
|
||||
sourceDto.setSourceName(sysServer.getName());
|
||||
sourceDto.setHostId(sysServer.getHostId());
|
||||
sourceDtos.add(sourceDto);
|
||||
}
|
||||
return sourceDtos;
|
||||
|
|
|
@ -3,7 +3,7 @@ package org.jeecg.modules.quartz.job;
|
|||
import cn.hutool.core.map.MapUtil;
|
||||
import org.jeecg.common.util.RedisStreamUtil;
|
||||
import org.jeecg.common.util.SpringContextUtils;
|
||||
import org.jeecg.modules.base.dto.RuleDto;
|
||||
import org.jeecg.modules.base.dto.Info;
|
||||
import org.quartz.*;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
|
@ -22,20 +22,20 @@ public class Test implements Job {
|
|||
@Override
|
||||
public void execute(JobExecutionContext context) throws JobExecutionException {
|
||||
init();
|
||||
RuleDto ruleDto = new RuleDto();
|
||||
ruleDto.setStationId("205");
|
||||
ruleDto.setSampleId("425496");
|
||||
ruleDto.setBetaOrGamma("Gamma");
|
||||
ruleDto.setFullOrPrel("FULL");
|
||||
ruleDto.setDatasource("1");
|
||||
ruleDto.setSampleName("CAX05_001-20230624_0220_Q_FULL_299.3.PHD");
|
||||
ruleDto.setCollectionDate(LocalDateTime.now());
|
||||
Info info = new Info();
|
||||
info.setStationId("205");
|
||||
info.setSampleId("425496");
|
||||
info.setBetaOrGamma("Gamma");
|
||||
info.setFullOrPrel("FULL");
|
||||
info.setDatasource("1");
|
||||
info.setSampleName("CAX05_001-20230624_0220_Q_FULL_299.3.PHD");
|
||||
info.setCollectionDate(LocalDateTime.now());
|
||||
Map<String, String> nuclides = MapUtil.newHashMap();
|
||||
nuclides.put("Be7","1000000");
|
||||
nuclides.put("sss","1000000");
|
||||
nuclides.put("Tl208","10");
|
||||
ruleDto.setNuclides(nuclides);
|
||||
redisStreamUtil.pushAnalysis(ruleDto);
|
||||
info.setNuclides(nuclides);
|
||||
redisStreamUtil.pushAnalysis(info);
|
||||
}
|
||||
|
||||
private void init(){
|
||||
|
|
Loading…
Reference in New Issue
Block a user