@ -8,12 +8,14 @@ import cn.hutool.core.map.MapUtil;
import cn.hutool.core.util.NumberUtil ;
import cn.hutool.core.util.NumberUtil ;
import cn.hutool.core.util.ObjectUtil ;
import cn.hutool.core.util.ObjectUtil ;
import cn.hutool.core.util.StrUtil ;
import cn.hutool.core.util.StrUtil ;
import com.google.common.collect.Maps ;
import lombok.Data ;
import lombok.Data ;
import lombok.NoArgsConstructor ;
import lombok.NoArgsConstructor ;
import lombok.extern.slf4j.Slf4j ;
import lombok.extern.slf4j.Slf4j ;
import org.apache. poi.hwpf.sprm.SprmIterator ;
import org.apache. commons.compress.utils.Lists ;
import org.jeecg.common.api.dto.message.MessageDTO ;
import org.jeecg.common.api.dto.message.MessageDTO ;
import org.jeecg.common.config.mqtoken.UserTokenContext ;
import org.jeecg.common.config.mqtoken.UserTokenContext ;
import org.jeecg.common.constant.RedisConstant ;
import org.jeecg.common.constant.SymbolConstant ;
import org.jeecg.common.constant.SymbolConstant ;
import org.jeecg.common.constant.enums.SampleType ;
import org.jeecg.common.constant.enums.SampleType ;
import org.jeecg.common.util.* ;
import org.jeecg.common.util.* ;
@ -36,7 +38,7 @@ import org.springframework.stereotype.Component;
import static org.jeecg.common.constant.enums.MessageTypeEnum.* ;
import static org.jeecg.common.constant.enums.MessageTypeEnum.* ;
import static org.jeecg.common.util.TokenUtils.getTempToken ;
import static org.jeecg.common.util.TokenUtils.getTempToken ;
import static org.jeecg.modules.base.enums.Template.ANALYSIS_NUCLID E;
import static org.jeecg.modules.base.enums.Template.ANALYSIS_NUCLID E_NAM E;
import java.math.BigDecimal ;
import java.math.BigDecimal ;
import java.time.LocalDate ;
import java.time.LocalDate ;
@ -59,6 +61,7 @@ public class AnalysisConsumer implements StreamListener<String, ObjectRecord<Str
private IAlarmAnalysisRuleService ruleService ;
private IAlarmAnalysisRuleService ruleService ;
private AnalysisResultService analysisResultService ;
private AnalysisResultService analysisResultService ;
private IAlarmAnalysisNuclideAvgService nuclideAvgService ;
private IAlarmAnalysisNuclideAvgService nuclideAvgService ;
private RedisUtil redisUtil ;
private final String COMMA = SymbolConstant . COMMA ;
private final String COMMA = SymbolConstant . COMMA ;
@ -103,10 +106,10 @@ public class AnalysisConsumer implements StreamListener<String, ObjectRecord<Str
String sampleId = info . getSampleId ( ) ;
String sampleId = info . getSampleId ( ) ;
String fullOrPrel = info . getFullOrPrel ( ) ;
String fullOrPrel = info . getFullOrPrel ( ) ;
String datasource = info . getDatasource ( ) ;
String datasource = info . getDatasource ( ) ;
Map < String , String > nuclides = info . getNuclides ( ) ;
Map < String , String > infoNuclideMap = info . getNuclides ( ) ;
if ( StrUtil . isBlank ( stationId ) ) return ;
if ( StrUtil . isBlank ( stationId ) ) return ;
if ( StrUtil . isBlank ( sampleId ) ) return ;
if ( StrUtil . isBlank ( sampleId ) ) return ;
if ( MapUtil . isEmpty ( nuclides ) ) return ;
if ( MapUtil . isEmpty ( infoNuclideMap ) ) return ;
List < AlarmAnalysisRule > rules = ruleService . allAnalysisRule ( ) ;
List < AlarmAnalysisRule > rules = ruleService . allAnalysisRule ( ) ;
for ( AlarmAnalysisRule rule : rules ) {
for ( AlarmAnalysisRule rule : rules ) {
@ -129,17 +132,20 @@ public class AnalysisConsumer implements StreamListener<String, ObjectRecord<Str
/ / 是否有当前规则关注的核素
/ / 是否有当前规则关注的核素
String nuclidesStr = rule . getNuclides ( ) ;
String nuclidesStr = rule . getNuclides ( ) ;
if ( StrUtil . isBlank ( nuclidesStr ) ) continue ;
if ( StrUtil . isBlank ( nuclidesStr ) ) continue ;
Set < String > names = nuclides . keySet ( ) ;
Set < String > names = infoNuclideMap . keySet ( ) ;
List < String > follow = ListUtil . toList ( nuclidesStr . split ( COMMA ) ) ;
List < String > follow = ListUtil . toList ( nuclidesStr . split ( COMMA ) ) ;
/ / 因数据库 Xe核素名称 M大小写不统一 , 先统一大小写再进行比较
/ / 因数据库 Xe核素名称 M大小写不统一 , 先统一大小写再进行比较
Collection < String > followLower = follow . stream ( ) . map ( String : : toLowerCase ) . collect ( Collectors . toList ( ) ) ;
Collection < String > follows = follow . stream ( ) . map ( f - > {
Collection < String > namesLower = names . stream ( ) . map ( String : : toLowerCase ) . collect ( Collectors . toList ( ) ) ;
if ( f . toLowerCase ( ) . contains ( " xe " ) ) {
return f . replace ( " M " , " m " ) ;
}
return f ;
} ) . collect ( Collectors . toList ( ) ) ;
/ / 推送过来的核素集合与所关注核素集合取交集
/ / 推送过来的核素集合与所关注核素集合取交集
Collection < String > cross = CollectionUtil . intersection ( namesLower , followLower ) ;
Collection < String > cross = CollectionUtil . intersection ( names , follows ) ;
Collection < String > crossNew = cross . stream ( ) . map ( f - > f . replace ( " x " , " X " ) ) . collect ( Collectors . toList ( ) ) ;
if ( CollUtil . isEmpty ( cross ) ) continue ;
if ( CollUtil . isEmpty ( cross ) ) continue ;
Map < String , String > nuclidesCross = nuclides . entrySet ( ) . stream ( )
Map < String , String > nuclidesCross = infoNuclideMap . entrySet ( ) . stream ( )
. filter ( entry - > cross New . contains ( entry . getKey ( ) ) )
. filter ( entry - > cross . contains ( entry . getKey ( ) ) )
. collect ( Collectors . toMap ( Map . Entry : : getKey , Map . Entry : : getValue ) ) ;
. collect ( Collectors . toMap ( Map . Entry : : getKey , Map . Entry : : getValue ) ) ;
/ / 开始对交集中的核素进行条件判断
/ / 开始对交集中的核素进行条件判断
info . setRuleId ( rule . getId ( ) ) ;
info . setRuleId ( rule . getId ( ) ) ;
@ -148,7 +154,12 @@ public class AnalysisConsumer implements StreamListener<String, ObjectRecord<Str
if ( null ! = rule . getIdentifyNuclides ( ) ) {
if ( null ! = rule . getIdentifyNuclides ( ) ) {
String [ ] inSplit = rule . getIdentifyNuclides ( ) . split ( " , " ) ;
String [ ] inSplit = rule . getIdentifyNuclides ( ) . split ( " , " ) ;
if ( inSplit . length > = 1 ) {
if ( inSplit . length > = 1 ) {
info . setIdentifyNuclideSet ( Arrays . stream ( inSplit ) . map ( f - > f . replace ( " M " , " m " ) ) . collect ( Collectors . toSet ( ) ) ) ;
info . setIdentifyNuclideSet ( Arrays . stream ( inSplit ) . map ( f - > {
if ( f . toLowerCase ( ) . contains ( " xe " ) ) {
return f . replace ( " M " , " m " ) ;
}
return f ;
} ) . collect ( Collectors . toSet ( ) ) ) ;
}
}
}
}
judge ( info , nuclidesCross ) ;
judge ( info , nuclidesCross ) ;
@ -161,6 +172,8 @@ public class AnalysisConsumer implements StreamListener<String, ObjectRecord<Str
String betaOrGamma = info . getBetaOrGamma ( ) ;
String betaOrGamma = info . getBetaOrGamma ( ) ;
String datasource = info . getDatasource ( ) ;
String datasource = info . getDatasource ( ) ;
String stationId = info . getStationId ( ) ;
String stationId = info . getStationId ( ) ;
String stationCode = " " ;
HashMap < String , Object > stationMap = ( HashMap < String , Object > ) redisUtil . get ( RedisConstant . STATION_CODE_MAP ) ;
String sampleId = info . getSampleId ( ) ;
String sampleId = info . getSampleId ( ) ;
String sampleName = info . getSampleName ( ) ;
String sampleName = info . getSampleName ( ) ;
Set < String > identifyNuclideSet = info . getIdentifyNuclideSet ( ) ;
Set < String > identifyNuclideSet = info . getIdentifyNuclideSet ( ) ;
@ -169,26 +182,27 @@ public class AnalysisConsumer implements StreamListener<String, ObjectRecord<Str
info . getCollectionDate ( ) . toLocalDate ( ) ;
info . getCollectionDate ( ) . toLocalDate ( ) ;
List < String > conditions = ListUtil . toList ( conditionStr . split ( COMMA ) ) ;
List < String > conditions = ListUtil . toList ( conditionStr . split ( COMMA ) ) ;
List < String > firstDetected = new ArrayList < > ( ) ; / / 首次发现
List < NuclideInfo > firstDetected = new ArrayList < > ( ) ; / / 首次发现
List < NuclideInfo > moreThanAvg = new ArrayList < > ( ) ; / / 超浓度均值
List < NuclideInfo > moreThanAvg = new ArrayList < > ( ) ; / / 超浓度均值
List < String > meanWhile = new ArrayList < > ( ) ; / / 同时出现两种及以上核素
List < NuclideInfo > meanWhile = new ArrayList < > ( ) ; / / 同时出现两种及以上核素
List < String > identifyNuclideResult = new ArrayList < > ( ) ;
Map < String , NuclideInfo > nuclideInfoMap = Maps . newHashMap ( ) ;
List < NuclideInfo > identifyNuclideResult = new ArrayList < > ( ) ;
for ( String con : conditions ) {
for ( String con : conditions ) {
Condition condition = Condition . valueOf1 ( con ) ;
Condition condition = Condition . valueOf1 ( con ) ;
if ( ObjectUtil . isNull ( condition ) ) continue ;
if ( ObjectUtil . isNull ( condition ) ) continue ;
switch ( condition ) {
switch ( condition ) {
case FIRST_FOUND : / / 首次发现该元素
case FIRST_FOUND : / / 首次发现该元素
firstDetected = this . firstDetected ( betaOrGamma , datasource , stationId , sampleId , nuclide Name s) ;
firstDetected = this . firstDetected ( betaOrGamma , datasource , stationId , sampleId , nuclide sCros s) ;
break ;
break ;
case ABOVE_AVERAGE : / / 元素浓度高于均值
case ABOVE_AVERAGE : / / 元素浓度高于均值
moreThanAvg = this . moreThanAvg ( datasource , stationId , collDate , nuclidesCross ) ;
moreThanAvg = this . moreThanAvg ( datasource , stationId , collDate , nuclidesCross ) ;
break ;
break ;
case MEANWHILE : / / 同时出现两种及以上核素
case MEANWHILE : / / 同时出现两种及以上核素
meanWhile = this . meanWhile ( betaOrGamma , datasource , sampleId , nuclide Name s) ;
meanWhile = this . meanWhile ( betaOrGamma , datasource , sampleId , nuclide sCros s) ;
if ( meanWhile . size ( ) < 2 ) meanWhile = ListUtil . empty ( ) ;
if ( meanWhile . size ( ) < 2 ) meanWhile = ListUtil . empty ( ) ;
break ;
break ;
case IDENTIFY_NUCLIDES : / / 识别到某个核素
case IDENTIFY_NUCLIDES : / / 识别到某个核素
identifyNuclideResult = this . meanWhile ( betaOrGamma, datasource , sampleId , identifyNuclideSet ) ;
identifyNuclideResult = this . meanWhile ( info. getNuclides ( ) , datasource , identifyNuclideSet ) ;
break ;
break ;
default :
default :
break ;
break ;
@ -196,24 +210,54 @@ public class AnalysisConsumer implements StreamListener<String, ObjectRecord<Str
}
}
/ / 构建预警信息
/ / 构建预警信息
DataTool dataTool = DataTool . getInstance ( ) ;
DataTool dataTool = DataTool . getInstance ( ) ;
if ( CollUtil . isNotEmpty ( firstDetected ) )
if ( CollUtil . isNotEmpty ( firstDetected ) ) {
dataTool . put ( " firstDetected " , CollUtil . join ( firstDetected , StrUtil . COMMA + StrUtil . SPACE ) ) ;
String above = firstDetected . stream ( )
. map ( NuclideInfo : : getNuclide )
. collect ( Collectors . joining ( StrUtil . COMMA + StrUtil . SPACE ) ) ;
dataTool . put ( " firstDetected " , above ) ;
for ( NuclideInfo nuclideInfo : firstDetected ) {
nuclideInfoMap . put ( nuclideInfo . getNuclide ( ) , nuclideInfo ) ;
}
}
if ( CollUtil . isNotEmpty ( meanWhile ) ) {
String above = meanWhile . stream ( )
. map ( NuclideInfo : : getNuclide )
. collect ( Collectors . joining ( StrUtil . COMMA + StrUtil . SPACE ) ) ;
dataTool . put ( " meanwhile " , above ) ;
for ( NuclideInfo nuclideInfo : meanWhile ) {
nuclideInfoMap . put ( nuclideInfo . getNuclide ( ) , nuclideInfo ) ;
}
}
if ( CollUtil . isNotEmpty ( identifyNuclideResult ) ) {
String above = identifyNuclideResult . stream ( )
. map ( NuclideInfo : : getNuclide )
. collect ( Collectors . joining ( StrUtil . COMMA + StrUtil . SPACE ) ) ;
dataTool . put ( " identifyNuclide " , above ) ;
for ( NuclideInfo nuclideInfo : identifyNuclideResult ) {
nuclideInfoMap . put ( nuclideInfo . getNuclide ( ) , nuclideInfo ) ;
}
}
if ( CollUtil . isNotEmpty ( moreThanAvg ) ) {
if ( CollUtil . isNotEmpty ( moreThanAvg ) ) {
String above = moreThanAvg . stream ( )
String above = moreThanAvg . stream ( )
. map ( item - > item . getNuclide ( ) + " ( " + item . getValue ( ) + " ) " + " > " + item . getThreshold ( ) )
. map ( item - > item . getNuclide ( ) + " ( " + item . getValue ( ) + " ) " + " > " + item . getThreshold ( ) )
. collect ( Collectors . joining ( StrUtil . COMMA + StrUtil . SPACE ) ) ;
. collect ( Collectors . joining ( StrUtil . COMMA + StrUtil . SPACE ) ) ;
dataTool . put ( " moreThanAvg " , above ) ;
dataTool . put ( " moreThanAvg " , above ) ;
for ( NuclideInfo nuclideInfo : moreThanAvg ) {
nuclideInfoMap . put ( nuclideInfo . getNuclide ( ) , nuclideInfo ) ;
}
}
}
if ( CollUtil . isNotEmpty ( meanWhile ) )
dataTool . put ( " meanwhile " , CollUtil . join ( meanWhile , StrUtil . COMMA + StrUtil . SPACE ) ) ;
if ( CollUtil . isNotEmpty ( identifyNuclideResult ) )
dataTool . put ( " identifyNuclide " , CollUtil . join ( identifyNuclideResult , StrUtil . COMMA + StrUtil . SPACE ) ) ;
/ / 如果报警数据为空 则不需要发送报警信息和生成报警日志
/ / 如果报警数据为空 则不需要发送报警信息和生成报警日志
if ( MapUtil . isEmpty ( dataTool . get ( ) ) ) return ;
if ( MapUtil . isEmpty ( dataTool . get ( ) ) ) return ;
/ / 产生报警信息的Sample信息
/ / 产生报警信息的Sample信息
if ( CollUtil . isNotEmpty ( stationMap ) )
{
stationCode = stationMap . get ( stationId ) . toString ( ) ;
}
dataTool . put ( " sampleId " , sampleId ) . put ( " sampleName " , sampleName ) ;
dataTool . put ( " sampleId " , sampleId ) . put ( " sampleName " , sampleName ) ;
/ / 构建预警信息实例 准备发送预警信息
DataTool titleData = DataTool . getInstance ( ) ;
MessageDTO messageDTO = TemplateUtil . parse1 ( ANALYSIS_NUCLIDE . getCode ( ) , dataTool . get ( ) ) ;
titleData . put ( " stationCode " , stationCode ) ;
/ / 构建预警信息实例 准备发送预警信息 , 20250327 - - 修改模版
MessageDTO messageDTO = TemplateUtil . parse1 ( ANALYSIS_NUCLIDE_NAME . getCode ( ) , dataTool . get ( ) , titleData . get ( ) ) ;
/ / 保存报警日志
/ / 保存报警日志
AlarmAnalysisLog logInfo = new AlarmAnalysisLog ( ) ;
AlarmAnalysisLog logInfo = new AlarmAnalysisLog ( ) ;
BeanUtil . copyProperties ( info , logInfo ) ;
BeanUtil . copyProperties ( info , logInfo ) ;
@ -221,8 +265,13 @@ public class AnalysisConsumer implements StreamListener<String, ObjectRecord<Str
if ( ObjectUtil . isNotNull ( sampleType ) )
if ( ObjectUtil . isNotNull ( sampleType ) )
logInfo . setSampleType ( sampleType . getValue ( ) ) ;
logInfo . setSampleType ( sampleType . getValue ( ) ) ;
logInfo . setAlarmInfo ( messageDTO . getContent ( ) ) ;
logInfo . setAlarmInfo ( messageDTO . getContent ( ) ) ;
if ( CollUtil . isNotEmpty ( moreThanAvg ) )
logInfo . setNuclideInfoList ( moreThanAvg ) ;
/ / 报警信息中核素列表
if ( MapUtil . isNotEmpty ( nuclideInfoMap ) ) {
List < NuclideInfo > nuclideInfoList = Lists . newArrayList ( ) ;
nuclideInfoMap . forEach ( ( key , value ) - > nuclideInfoList . add ( nuclideInfoMap . get ( key ) ) ) ;
logInfo . setNuclideInfoList ( nuclideInfoList ) ;
}
logService . saveLog ( logInfo ) ;
logService . saveLog ( logInfo ) ;
/ / 发送报警信息
/ / 发送报警信息
String groupId = info . getGroupId ( ) ;
String groupId = info . getGroupId ( ) ;
@ -233,14 +282,32 @@ public class AnalysisConsumer implements StreamListener<String, ObjectRecord<Str
/ * *
/ * *
* 首次发现该核素
* 首次发现该核素
* /
* /
private List < String > firstDetected ( String betaOrGamma , String dataSourceType ,
private List < NuclideInfo > firstDetected ( String betaOrGamma , String dataSourceType ,
String stationId , String sampleId , Set < String > nuclideNames ) {
String stationId , String sampleId , Map < String , String > nuclidesCross ) {
List < NuclideInfo > result = Lists . newArrayList ( ) ;
/ * 查询用户关注的核素是否存在 如果不存在则为首次发现该核素
/ * 查询用户关注的核素是否存在 如果不存在则为首次发现该核素
判断核素是否存在的条件 : 该核素的Conc值是否大于MDC值
判断核素是否存在的条件 : 该核素的Conc值是否大于MDC值
* /
* /
String detectorId = systemClient . getDetectorId ( sampleId ) ;
String detectorId = systemClient . getDetectorId ( sampleId ) ;
return analysisResultService . nuclideFirst ( betaOrGamma , dataSourceType , stationId ,
if ( StrUtil . isBlank ( detectorId ) ) {
throw new RuntimeException ( " detectorId is null or empty " ) ;
}
Set < String > nuclideNames = nuclidesCross . keySet ( ) ;
List < String > list = analysisResultService . nuclideFirst ( betaOrGamma , dataSourceType , stationId ,
detectorId , sampleId , nuclideNames ) ;
detectorId , sampleId , nuclideNames ) ;
if ( CollUtil . isNotEmpty ( list ) ) {
for ( Map . Entry < String , String > f : nuclidesCross . entrySet ( ) ) {
if ( list . contains ( f . getKey ( ) ) ) {
NuclideInfo nuclideInfo = new NuclideInfo ( ) ;
nuclideInfo . setNuclide ( f . getKey ( ) ) ;
nuclideInfo . setDatasource ( DSType . typeOf ( dataSourceType ) ) ;
/ / 对浓度值保留五位小数
nuclideInfo . setValue ( NumUtil . keepStr ( f . getValue ( ) , 5 ) ) ;
result . add ( nuclideInfo ) ;
}
}
}
return result ;
}
}
/ * *
/ * *
@ -279,12 +346,47 @@ public class AnalysisConsumer implements StreamListener<String, ObjectRecord<Str
/ *
/ *
* 是否同时存在两种及以上核素
* 是否同时存在两种及以上核素
* * /
* * /
private List < String > meanWhile ( String betaOrGamma , String dataSourceType ,
private List < NuclideInfo > meanWhile ( String betaOrGamma , String dataSourceType ,
String sampleId , Set < String > nuclideNames ) {
String sampleId , Map < String , String > nuclidesCross ) {
List < NuclideInfo > result = Lists . newArrayList ( ) ;
/ * 查询用户关注的核素中 该谱中是否存在两种及以上核素
/ * 查询用户关注的核素中 该谱中是否存在两种及以上核素
判断核素是否存在的条件 : 该核素的Conc值是否大于MDC值
判断核素是否存在的条件 : 该核素的Conc值是否大于MDC值
* /
* /
return analysisResultService . nuclideExist ( betaOrGamma , dataSourceType , sampleId , nuclideNames ) ;
Set < String > nuclideNames = nuclidesCross . keySet ( ) ;
List < String > list = analysisResultService . nuclideExist ( betaOrGamma , dataSourceType , sampleId , nuclideNames ) ;
if ( CollUtil . isNotEmpty ( list ) ) {
for ( Map . Entry < String , String > f : nuclidesCross . entrySet ( ) ) {
if ( list . contains ( f . getKey ( ) ) ) {
NuclideInfo nuclideInfo = new NuclideInfo ( ) ;
nuclideInfo . setNuclide ( f . getKey ( ) ) ;
nuclideInfo . setDatasource ( DSType . typeOf ( dataSourceType ) ) ;
/ / 对浓度值保留五位小数
nuclideInfo . setValue ( NumUtil . keepStr ( f . getValue ( ) , 5 ) ) ;
result . add ( nuclideInfo ) ;
}
}
}
return result ;
}
/ *
* 是否同时存在两种及以上核素
* * /
private List < NuclideInfo > meanWhile ( Map < String , String > nuclideMap , String dataSourceType , Set < String > nuclideNames ) {
List < NuclideInfo > result = Lists . newArrayList ( ) ;
/ * 查询用户关注的核素中 该谱中是否存在两种及以上核素
判断核素是否存在的条件 : 该核素的Conc值是否大于MDC值
* /
for ( String name : nuclideNames ) {
if ( nuclideMap . containsKey ( name ) ) {
NuclideInfo nuclideInfo = new NuclideInfo ( ) ;
nuclideInfo . setNuclide ( name ) ;
nuclideInfo . setDatasource ( DSType . typeOf ( dataSourceType ) ) ;
/ / 对浓度值保留五位小数
nuclideInfo . setValue ( NumUtil . keepStr ( nuclideMap . get ( name ) , 5 ) ) ;
result . add ( nuclideInfo ) ;
}
}
return result ;
}
}
private void init ( ) {
private void init ( ) {
@ -296,6 +398,7 @@ public class AnalysisConsumer implements StreamListener<String, ObjectRecord<Str
ruleService = SpringContextUtils . getBean ( IAlarmAnalysisRuleService . class ) ;
ruleService = SpringContextUtils . getBean ( IAlarmAnalysisRuleService . class ) ;
analysisResultService = SpringContextUtils . getBean ( AnalysisResultService . class ) ;
analysisResultService = SpringContextUtils . getBean ( AnalysisResultService . class ) ;
nuclideAvgService = SpringContextUtils . getBean ( IAlarmAnalysisNuclideAvgService . class ) ;
nuclideAvgService = SpringContextUtils . getBean ( IAlarmAnalysisNuclideAvgService . class ) ;
redisUtil = SpringContextUtils . getBean ( RedisUtil . class ) ;
}
}
private void destroy ( ) {
private void destroy ( ) {