添加自动处理处理样品等级,计算阈值相关服务

This commit is contained in:
duwenyuan 2025-09-28 11:12:42 +08:00
parent b5ba8e29d3
commit c5fbf4360a
6 changed files with 382 additions and 1 deletions

View File

@ -0,0 +1,20 @@
package org.jeecg.modules.mapper;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.jeecg.modules.base.entity.SampleIdentifiedNuclides;
import org.jeecg.modules.base.entity.rnauto.GardsSampleGrading;
import java.util.List;
/***
* 样品的等级Mapper
*/
@Mapper
public interface SampleGardsThresholdMapper extends BaseMapper<GardsSampleGrading> {
List<GardsSampleGrading> selectByStationId(@Param("stationId") Long stationId);
int updateGradingStatus(@Param("sampleId") Long sampleId,
@Param("status") Integer gradingStatus);
}

View File

@ -0,0 +1,150 @@
package org.jeecg.modules.redisStream;
import org.jeecg.common.constant.RedisConstant;
import org.jeecg.common.exception.StreamErrorHandler;
import org.jeecg.common.properties.MaximumPoolSizeProperties;
import org.jeecg.common.properties.ParameterProperties;
import org.jeecg.common.util.RadionuclideUtil;
import org.jeecg.common.util.RedisStreamUtil;
import org.jeecg.modules.base.dto.Info;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
import org.xml.sax.SAXException;
import javax.annotation.Resource;
import javax.xml.parsers.ParserConfigurationException;
import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@Configuration
public class AutoRedisStreamConfig {
// 空闲线程的存活时间(s)
private final Integer keepAliveTime = 5;
// 每次轮询取几条消息
private final Integer maxMsg = 10;
private final String streamKey = RedisConstant.AUTO_CAT;
private final String streamGroup = RedisConstant.GROUP_AutoCat;
private final String streamConsumer = RedisConstant.AUTO_CONSUMER;
@Resource
private RedisConnectionFactory redisConnectionFactory;
@Resource
private RedisStreamUtil redisStreamUtil;
@Autowired
private ParameterProperties parameterProperties;
@Autowired
private GradingConsumer gradingConsumer;
@Autowired
private MaximumPoolSizeProperties maximumPoolSizeProperties;
@Bean(initMethod = "start", destroyMethod = "stop")
public StreamMessageListenerContainer<String, ObjectRecord<String, Info>> autoStream() {
//初始化 ctbt核素xml
try {
RadionuclideUtil.initialize(parameterProperties.getFilePath()+ File.separator + "ctbt_radionuclides.xml");
} catch (ParserConfigurationException | IOException | SAXException e) {
}
/* 创建Stream和消费组 */
redisStreamUtil.creatGroup(streamKey, streamGroup);
// 原子整数,多线程环境下对整数的原子性操作
AtomicInteger index = new AtomicInteger(1);
// 返回当前系统可用的处理器数量
Integer processors = maximumPoolSizeProperties.getStation();
/*
corePoolSize线程池中核心线程的数量,即线程池中保持的最小线程数
maximumPoolSize线程池中允许的最大线程数
keepAliveTime非核心线程的空闲时间超过该值,就会被回收
unitkeepAliveTime 参数的时间单位
workQueue用于保存等待执行的任务的阻塞队列
threadFactory用于创建新线程的工厂
**/
ThreadPoolExecutor executor = new ThreadPoolExecutor(processors,
processors,
keepAliveTime,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(),
r -> {
Thread thread = new Thread(r);
thread.setName("Stream-Thread-"+streamKey + index.getAndIncrement());
thread.setDaemon(true);
return thread;
});
/* 设置消息监听容器 */
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Info>> options =
StreamMessageListenerContainer.StreamMessageListenerContainerOptions
.builder()
// 每次轮询取几条消息
.batchSize(maxMsg)
// Stream执行消息轮询的执行器
.executor(executor)
// Stream中没有消息时,阻塞多长时间(轮询等待时间)
// 设置为0表示消费者将一直等待新消息到达
// 不能大于spring.redis.timeout
.pollTimeout(Duration.ZERO)
//.serializer()
//.objectMapper(new ObjectHashMapper())
.targetType(Info.class)
// 异常处理器
.errorHandler(new StreamErrorHandler())
.build();
/* 创建消息监听容器 */
StreamMessageListenerContainer<String, ObjectRecord<String, Info>> streamMessageListenerContainer =
StreamMessageListenerContainer.create(redisConnectionFactory, options);
// 独立消费
/*streamMessageListenerContainer.receive(StreamOffset.fromStart(streamKey),
new ConsumeListener("独立消费", null, null));*/
// 非独立消费
/*
register:用于注册一个消息监听器,该监听器将在每次有新的消息到达Redis Stream时被调用
这种方式适用于长期运行的消息消费者它会持续监听Redis Stream并处理新到达的消息
receive:用于主动从Redis Stream中获取消息并进行处理,你可以指定要获取的消息数量和超时时间
这种方式适用于需要主动控制消息获取的场景,例如批量处理消息或定时任务
**/
/* 1.需要手动确认消费消息 */
// 1.1 使用 register 方式
StreamMessageListenerContainer.ConsumerStreamReadRequest<String> readRequest = StreamMessageListenerContainer
.StreamReadRequest
.builder(StreamOffset.create(streamKey, ReadOffset.lastConsumed()))
.consumer(Consumer.from(streamGroup, streamConsumer))
// 手动确认消费了消息 默认为自动确认消息
.autoAcknowledge(false)
// 如果消费者发生了异常 不禁止消费者消费 默认为禁止
.cancelOnError(throwable -> false)
.build();
//GradingConsumer analysis = new GradingConsumer(autoGroup, autoConsumer);
// 设置消费者的 group consumer 名称
gradingConsumer.setGroupName(streamGroup);
gradingConsumer.setConsumerName(streamConsumer);
streamMessageListenerContainer.register(readRequest, gradingConsumer);
// 1.2 使用 receive 方式
/*autoConsumer analysis = new autoConsumer(autoGroup, autoConsumer);
streamMessageListenerContainer.receive(Consumer.from(autoGroup, autoConsumer),
StreamOffset.create(autoKey, ReadOffset.lastConsumed()), analysis);*/
/* 2.自动确认消费消息 */
// 2.1 使用 receive 方式
/*autoConsumer analysis = new autoConsumer(autoGroup,autoConsumer);
streamMessageListenerContainer.receiveAutoAck(Consumer.from(autoGroup, autoConsumer),
StreamOffset.create(autoKey, ReadOffset.lastConsumed()), analysis);*/
return streamMessageListenerContainer;
}
}

View File

@ -0,0 +1,120 @@
package org.jeecg.modules.redisStream;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.config.mqtoken.UserTokenContext;
import org.jeecg.common.properties.ParameterProperties;
import org.jeecg.common.util.RedisStreamUtil;
import org.jeecg.common.util.RedisUtil;
import org.jeecg.common.util.SpringContextUtils;
import org.jeecg.modules.base.dto.Info;
import org.jeecg.modules.base.entity.rnauto.GardsThresholdResult;
import org.jeecg.modules.base.service.ISampleGradingService;
import org.jeecg.modules.base.service.impl.RnAutoThresholdServiceImpl;
import org.jeecg.modules.service.GardsAnalysesService;
import org.jeecg.modules.service.IGardsThresholdService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.stereotype.Component;
import java.util.List;
import static org.jeecg.common.util.TokenUtils.getTempToken;
@Data
@Slf4j
@Component
@NoArgsConstructor
public class GradingConsumer implements StreamListener<String, ObjectRecord<String, Info>> {
/** RNAUTO数据类型 */
protected static final String DATA_TYPE_RNAUTO = "RNAUTO";
private String groupName;
private String consumerName;
private RedisStreamUtil redisStreamUtil;
private RedisUtil redisUtil;
@Autowired
private IGardsThresholdService gardsThresholdService;
/// 样品等级服务
@Autowired
private ISampleGradingService sampleGradingService;
@Autowired
private RnAutoThresholdServiceImpl rnAutoService;
public GradingConsumer(String groupName, String consumerName) {
this.groupName = groupName;
this.consumerName = consumerName;
}
/***
* 样品分析完成更新样品等级如果样品是全谱更新台站核素阈值
* @param message
*/
@Override
public void onMessage(ObjectRecord<String, Info> message) {
try {
String streamKey = message.getStream();
init();
/*
* 新消息在未进行ACK之前,状态也为pending,
* 直接消费所有异常未确认的消息和新消息
*/
List<ObjectRecord<String, Info>> pendings = redisStreamUtil
.read(streamKey, groupName, consumerName);
for (ObjectRecord<String, Info> record : pendings) {
RecordId recordId = record.getId();
Info info = record.getValue();
if (info.getStationId()!=null )
{
// 消费消息
consume(info);
}
log.info("样品等级分级消费了一条消息: {}", info);
// 消费完成后,手动确认消费消息[消息消费成功]
// 否则就是消费抛出异常,进入pending_ids[消息消费失败]
redisStreamUtil.ack(streamKey, groupName, recordId.getValue());
// 手动删除已消费消息
redisStreamUtil.del(streamKey, recordId.getValue());
}
} catch (Exception e) {
log.error("样品等级分级消费异常: ", e);
} finally {
destroy();
}
}
/***
* 计算样品等级及台站阈值
* @param info
*/
private void consume(Info info) {
//如果是全谱更新台站阈值
if ("FULL".equals(info.getFullOrPrel())) {
//更新台站阈值
rnAutoService.calculateSingleStationThreshold(info.getStationId());
}
}
private void init() {
// start 生成临时Token到线程中
UserTokenContext.setToken(getTempToken());
redisStreamUtil = SpringContextUtils.getBean(RedisStreamUtil.class);
redisUtil = SpringContextUtils.getBean(RedisUtil.class);
}
private void destroy() {
// end 删除临时Token
UserTokenContext.remove();
}
}

View File

@ -24,7 +24,7 @@ public interface GardsAnalysesService extends IService<GardsAnalyses> {
* @param reportPath
* @return
*/
public GardsAnalyses create(Integer sampleId, GardsSampleData detSampleData, GardsSampleData gasSampleData, Date beginDate, Date endDate, String logPath, String reportPath);
public GardsAnalyses create(Integer sampleId, GardsSampleData detSampleData, GardsSampleData gasSampleData, Date beginDate, Date endDate, String logPath, String reportPath,Integer category);
Integer getIdAnalysis(Integer sampleId);

View File

@ -0,0 +1,26 @@
package org.jeecg.modules.service;
import com.baomidou.mybatisplus.extension.service.IService;
import org.jeecg.modules.base.entity.rnauto.GardsSampleGrading;
import org.jeecg.modules.base.entity.rnauto.GardsThresholdResult;
import java.util.List;
public interface IGardsThresholdService extends IService<GardsSampleGrading> {
GardsSampleGrading getGardsSampleGradingById(String sampleId);
void saveOrUpdateSampleThreshold(GardsSampleGrading sampleThreshold);
/***
*
* @param sampleId
* @param level
*/
void updateSampleGrading(String sampleId, int level);
//List<GardsThresholdResult> findThresholdResults(String stationId,Integer idAnalysis);
List<GardsThresholdResult> findThresholdResults(String stationId);
}

View File

@ -0,0 +1,65 @@
package org.jeecg.modules.service.impl;
import com.baomidou.dynamic.datasource.annotation.DS;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.jeecg.modules.base.entity.rnauto.GardsThresholdResult;
import org.jeecg.modules.base.entity.rnauto.GardsSampleGrading;
import org.jeecg.modules.base.mapper.ThresholdRnAutoResultMapper;
import org.jeecg.modules.mapper.SampleGardsThresholdMapper;
import org.jeecg.modules.service.IGardsThresholdService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
@DS("ora")
public class GardsThresholdServiceImpl extends ServiceImpl<SampleGardsThresholdMapper, GardsSampleGrading> implements IGardsThresholdService {
@Autowired
private ThresholdRnAutoResultMapper thresholdResultMapper;
@Override
public List<GardsThresholdResult> findThresholdResults(String stationId) {
// 获取未分级样品 ,Integer idAnalysis
LambdaQueryWrapper<GardsThresholdResult> queryWrapper = new LambdaQueryWrapper<GardsThresholdResult>();
queryWrapper.eq(GardsThresholdResult::getStationId, stationId);
//queryWrapper.eq(GardsThresholdResult::getIdAnalysis, idAnalysis);
return thresholdResultMapper.selectList(queryWrapper);
}
@Override
public GardsSampleGrading getGardsSampleGradingById(String sampleId) {
return this.baseMapper.selectById(sampleId);
}
@Override
public void saveOrUpdateSampleThreshold(GardsSampleGrading sampleThreshold) {
if (sampleThreshold.getSampleId() == null) {
this.baseMapper.insert(sampleThreshold);
} else {
this.baseMapper.updateById(sampleThreshold);
}
}
/***
* 更新样品级别
* @param sampleId 样品ID
* @param level 级别
*/
@Override
public void updateSampleGrading(String sampleId, int level) {
GardsSampleGrading sample = getGardsSampleGradingById(sampleId);
if (sample != null) {
sample.setGrading(String.valueOf(level));
sample.setGradingStatus(1); // 标记为已处理
saveOrUpdateSampleThreshold(sample);
}
}
}