From c5fbf4360a9e63c85454ccc4c3c4052dfaf9a939 Mon Sep 17 00:00:00 2001 From: duwenyuan <15600000461@163.com> Date: Sun, 28 Sep 2025 11:12:42 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E8=87=AA=E5=8A=A8=E5=A4=84?= =?UTF-8?q?=E7=90=86=E5=A4=84=E7=90=86=E6=A0=B7=E5=93=81=E7=AD=89=E7=BA=A7?= =?UTF-8?q?=EF=BC=8C=E8=AE=A1=E7=AE=97=E9=98=88=E5=80=BC=E7=9B=B8=E5=85=B3?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mapper/SampleGardsThresholdMapper.java | 20 +++ .../redisStream/AutoRedisStreamConfig.java | 150 ++++++++++++++++++ .../modules/redisStream/GradingConsumer.java | 120 ++++++++++++++ .../modules/service/GardsAnalysesService.java | 2 +- .../service/IGardsThresholdService.java | 26 +++ .../impl/GardsThresholdServiceImpl.java | 65 ++++++++ 6 files changed, 382 insertions(+), 1 deletion(-) create mode 100644 jeecg-module-auto-process/src/main/java/org/jeecg/modules/mapper/SampleGardsThresholdMapper.java create mode 100644 jeecg-module-auto-process/src/main/java/org/jeecg/modules/redisStream/AutoRedisStreamConfig.java create mode 100644 jeecg-module-auto-process/src/main/java/org/jeecg/modules/redisStream/GradingConsumer.java create mode 100644 jeecg-module-auto-process/src/main/java/org/jeecg/modules/service/IGardsThresholdService.java create mode 100644 jeecg-module-auto-process/src/main/java/org/jeecg/modules/service/impl/GardsThresholdServiceImpl.java diff --git a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/mapper/SampleGardsThresholdMapper.java b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/mapper/SampleGardsThresholdMapper.java new file mode 100644 index 00000000..5985ed1b --- /dev/null +++ b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/mapper/SampleGardsThresholdMapper.java @@ -0,0 +1,20 @@ +package org.jeecg.modules.mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; +import org.jeecg.modules.base.entity.SampleIdentifiedNuclides; +import org.jeecg.modules.base.entity.rnauto.GardsSampleGrading; + +import java.util.List; + +/*** + * 样品的等级Mapper + */ +@Mapper +public interface SampleGardsThresholdMapper extends BaseMapper { + List selectByStationId(@Param("stationId") Long stationId); + + int updateGradingStatus(@Param("sampleId") Long sampleId, + @Param("status") Integer gradingStatus); +} diff --git a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/redisStream/AutoRedisStreamConfig.java b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/redisStream/AutoRedisStreamConfig.java new file mode 100644 index 00000000..a4a6f7a3 --- /dev/null +++ b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/redisStream/AutoRedisStreamConfig.java @@ -0,0 +1,150 @@ +package org.jeecg.modules.redisStream; + +import org.jeecg.common.constant.RedisConstant; +import org.jeecg.common.exception.StreamErrorHandler; +import org.jeecg.common.properties.MaximumPoolSizeProperties; +import org.jeecg.common.properties.ParameterProperties; +import org.jeecg.common.util.RadionuclideUtil; +import org.jeecg.common.util.RedisStreamUtil; +import org.jeecg.modules.base.dto.Info; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.connection.stream.Consumer; +import org.springframework.data.redis.connection.stream.ObjectRecord; +import org.springframework.data.redis.connection.stream.ReadOffset; +import org.springframework.data.redis.connection.stream.StreamOffset; +import org.springframework.data.redis.stream.StreamMessageListenerContainer; +import org.xml.sax.SAXException; + +import javax.annotation.Resource; +import javax.xml.parsers.ParserConfigurationException; +import java.io.File; +import java.io.IOException; +import java.time.Duration; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +@Configuration +public class AutoRedisStreamConfig { + // 空闲线程的存活时间(s) + private final Integer keepAliveTime = 5; + + // 每次轮询取几条消息 + private final Integer maxMsg = 10; + + private final String streamKey = RedisConstant.AUTO_CAT; + private final String streamGroup = RedisConstant.GROUP_AutoCat; + private final String streamConsumer = RedisConstant.AUTO_CONSUMER; + + @Resource + private RedisConnectionFactory redisConnectionFactory; + + @Resource + private RedisStreamUtil redisStreamUtil; + + @Autowired + private ParameterProperties parameterProperties; + @Autowired + private GradingConsumer gradingConsumer; + + @Autowired + private MaximumPoolSizeProperties maximumPoolSizeProperties; + + @Bean(initMethod = "start", destroyMethod = "stop") + public StreamMessageListenerContainer> autoStream() { + //初始化 ctbt核素xml + try { + RadionuclideUtil.initialize(parameterProperties.getFilePath()+ File.separator + "ctbt_radionuclides.xml"); + } catch (ParserConfigurationException | IOException | SAXException e) { + + } + /* 创建Stream和消费组 */ + redisStreamUtil.creatGroup(streamKey, streamGroup); + // 原子整数,多线程环境下对整数的原子性操作 + AtomicInteger index = new AtomicInteger(1); + // 返回当前系统可用的处理器数量 + Integer processors = maximumPoolSizeProperties.getStation(); + /* + corePoolSize:线程池中核心线程的数量,即线程池中保持的最小线程数 + maximumPoolSize:线程池中允许的最大线程数 + keepAliveTime:非核心线程的空闲时间超过该值,就会被回收 + unit:keepAliveTime 参数的时间单位 + workQueue:用于保存等待执行的任务的阻塞队列 + threadFactory:用于创建新线程的工厂 + **/ + ThreadPoolExecutor executor = new ThreadPoolExecutor(processors, + processors, + keepAliveTime, + TimeUnit.SECONDS, + new LinkedBlockingDeque<>(), + r -> { + Thread thread = new Thread(r); + thread.setName("Stream-Thread-"+streamKey + index.getAndIncrement()); + thread.setDaemon(true); + return thread; + }); + /* 设置消息监听容器 */ + StreamMessageListenerContainer.StreamMessageListenerContainerOptions> options = + StreamMessageListenerContainer.StreamMessageListenerContainerOptions + .builder() + // 每次轮询取几条消息 + .batchSize(maxMsg) + // Stream执行消息轮询的执行器 + .executor(executor) + // Stream中没有消息时,阻塞多长时间(轮询等待时间) + // 设置为0表示消费者将一直等待新消息到达 + // 不能大于spring.redis.timeout + .pollTimeout(Duration.ZERO) + //.serializer() + //.objectMapper(new ObjectHashMapper()) + .targetType(Info.class) + // 异常处理器 + .errorHandler(new StreamErrorHandler()) + .build(); + /* 创建消息监听容器 */ + StreamMessageListenerContainer> streamMessageListenerContainer = + StreamMessageListenerContainer.create(redisConnectionFactory, options); + + // 独立消费 + /*streamMessageListenerContainer.receive(StreamOffset.fromStart(streamKey), + new ConsumeListener("独立消费", null, null));*/ + // 非独立消费 + /* + register:用于注册一个消息监听器,该监听器将在每次有新的消息到达Redis Stream时被调用 + 这种方式适用于长期运行的消息消费者,它会持续监听Redis Stream并处理新到达的消息 + + receive:用于主动从Redis Stream中获取消息并进行处理,你可以指定要获取的消息数量和超时时间 + 这种方式适用于需要主动控制消息获取的场景,例如批量处理消息或定时任务 + **/ + /* 1.需要手动确认消费消息 */ + // 1.1 使用 register 方式 + StreamMessageListenerContainer.ConsumerStreamReadRequest readRequest = StreamMessageListenerContainer + .StreamReadRequest + .builder(StreamOffset.create(streamKey, ReadOffset.lastConsumed())) + .consumer(Consumer.from(streamGroup, streamConsumer)) + // 手动确认消费了消息 默认为自动确认消息 + .autoAcknowledge(false) + // 如果消费者发生了异常 不禁止消费者消费 默认为禁止 + .cancelOnError(throwable -> false) + .build(); + //GradingConsumer analysis = new GradingConsumer(autoGroup, autoConsumer); + // 设置消费者的 group 和 consumer 名称 + gradingConsumer.setGroupName(streamGroup); + gradingConsumer.setConsumerName(streamConsumer); + streamMessageListenerContainer.register(readRequest, gradingConsumer); + // 1.2 使用 receive 方式 + /*autoConsumer analysis = new autoConsumer(autoGroup, autoConsumer); + streamMessageListenerContainer.receive(Consumer.from(autoGroup, autoConsumer), + StreamOffset.create(autoKey, ReadOffset.lastConsumed()), analysis);*/ + /* 2.自动确认消费消息 */ + // 2.1 使用 receive 方式 + /*autoConsumer analysis = new autoConsumer(autoGroup,autoConsumer); + streamMessageListenerContainer.receiveAutoAck(Consumer.from(autoGroup, autoConsumer), + StreamOffset.create(autoKey, ReadOffset.lastConsumed()), analysis);*/ + return streamMessageListenerContainer; + } +} diff --git a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/redisStream/GradingConsumer.java b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/redisStream/GradingConsumer.java new file mode 100644 index 00000000..ffe60ae9 --- /dev/null +++ b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/redisStream/GradingConsumer.java @@ -0,0 +1,120 @@ +package org.jeecg.modules.redisStream; + +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.jeecg.common.config.mqtoken.UserTokenContext; +import org.jeecg.common.properties.ParameterProperties; +import org.jeecg.common.util.RedisStreamUtil; +import org.jeecg.common.util.RedisUtil; +import org.jeecg.common.util.SpringContextUtils; +import org.jeecg.modules.base.dto.Info; +import org.jeecg.modules.base.entity.rnauto.GardsThresholdResult; +import org.jeecg.modules.base.service.ISampleGradingService; +import org.jeecg.modules.base.service.impl.RnAutoThresholdServiceImpl; +import org.jeecg.modules.service.GardsAnalysesService; +import org.jeecg.modules.service.IGardsThresholdService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.connection.stream.ObjectRecord; +import org.springframework.data.redis.connection.stream.RecordId; +import org.springframework.data.redis.stream.StreamListener; +import org.springframework.stereotype.Component; + +import java.util.List; + +import static org.jeecg.common.util.TokenUtils.getTempToken; + +@Data +@Slf4j +@Component +@NoArgsConstructor +public class GradingConsumer implements StreamListener> { + + /** RNAUTO数据类型 */ + protected static final String DATA_TYPE_RNAUTO = "RNAUTO"; + + private String groupName; + + private String consumerName; + + private RedisStreamUtil redisStreamUtil; + private RedisUtil redisUtil; + @Autowired + private IGardsThresholdService gardsThresholdService; + /// 样品等级服务 + @Autowired + private ISampleGradingService sampleGradingService; + + @Autowired + private RnAutoThresholdServiceImpl rnAutoService; + + public GradingConsumer(String groupName, String consumerName) { + this.groupName = groupName; + this.consumerName = consumerName; + } + + /*** + * 样品分析完成,更新样品等级、如果样品是全谱更新台站核素阈值 + * @param message + */ + @Override + public void onMessage(ObjectRecord message) { + try { + + String streamKey = message.getStream(); + init(); + + /* + * 新消息在未进行ACK之前,状态也为pending, + * 直接消费所有异常未确认的消息和新消息 + */ + List> pendings = redisStreamUtil + .read(streamKey, groupName, consumerName); + for (ObjectRecord record : pendings) { + RecordId recordId = record.getId(); + Info info = record.getValue(); + if (info.getStationId()!=null ) + { + // 消费消息 + consume(info); + } + log.info("样品等级分级消费了一条消息: {}", info); + // 消费完成后,手动确认消费消息[消息消费成功] + // 否则就是消费抛出异常,进入pending_ids[消息消费失败] + redisStreamUtil.ack(streamKey, groupName, recordId.getValue()); + // 手动删除已消费消息 + redisStreamUtil.del(streamKey, recordId.getValue()); + } + } catch (Exception e) { + log.error("样品等级分级消费异常: ", e); + } finally { + destroy(); + } + } + + /*** + * 计算样品等级及台站阈值 + * @param info + */ + private void consume(Info info) { + //如果是全谱,更新台站阈值 + if ("FULL".equals(info.getFullOrPrel())) { + //更新台站阈值 + rnAutoService.calculateSingleStationThreshold(info.getStationId()); + } + } + + + private void init() { + // start 生成临时Token到线程中 + UserTokenContext.setToken(getTempToken()); + redisStreamUtil = SpringContextUtils.getBean(RedisStreamUtil.class); + + redisUtil = SpringContextUtils.getBean(RedisUtil.class); + } + + private void destroy() { + // end 删除临时Token + UserTokenContext.remove(); + } +} diff --git a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/service/GardsAnalysesService.java b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/service/GardsAnalysesService.java index d70ad140..4c9d7101 100644 --- a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/service/GardsAnalysesService.java +++ b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/service/GardsAnalysesService.java @@ -24,7 +24,7 @@ public interface GardsAnalysesService extends IService { * @param reportPath * @return */ - public GardsAnalyses create(Integer sampleId, GardsSampleData detSampleData, GardsSampleData gasSampleData, Date beginDate, Date endDate, String logPath, String reportPath); + public GardsAnalyses create(Integer sampleId, GardsSampleData detSampleData, GardsSampleData gasSampleData, Date beginDate, Date endDate, String logPath, String reportPath,Integer category); Integer getIdAnalysis(Integer sampleId); diff --git a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/service/IGardsThresholdService.java b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/service/IGardsThresholdService.java new file mode 100644 index 00000000..d3d2d427 --- /dev/null +++ b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/service/IGardsThresholdService.java @@ -0,0 +1,26 @@ +package org.jeecg.modules.service; + +import com.baomidou.mybatisplus.extension.service.IService; +import org.jeecg.modules.base.entity.rnauto.GardsSampleGrading; +import org.jeecg.modules.base.entity.rnauto.GardsThresholdResult; + +import java.util.List; + +public interface IGardsThresholdService extends IService { + + + GardsSampleGrading getGardsSampleGradingById(String sampleId); + + void saveOrUpdateSampleThreshold(GardsSampleGrading sampleThreshold); + + /*** + * + * @param sampleId + * @param level + */ + void updateSampleGrading(String sampleId, int level); + + //List findThresholdResults(String stationId,Integer idAnalysis); + List findThresholdResults(String stationId); + +} diff --git a/jeecg-module-auto-process/src/main/java/org/jeecg/modules/service/impl/GardsThresholdServiceImpl.java b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/service/impl/GardsThresholdServiceImpl.java new file mode 100644 index 00000000..2bf12b59 --- /dev/null +++ b/jeecg-module-auto-process/src/main/java/org/jeecg/modules/service/impl/GardsThresholdServiceImpl.java @@ -0,0 +1,65 @@ +package org.jeecg.modules.service.impl; + +import com.baomidou.dynamic.datasource.annotation.DS; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import org.jeecg.modules.base.entity.rnauto.GardsThresholdResult; +import org.jeecg.modules.base.entity.rnauto.GardsSampleGrading; +import org.jeecg.modules.base.mapper.ThresholdRnAutoResultMapper; +import org.jeecg.modules.mapper.SampleGardsThresholdMapper; +import org.jeecg.modules.service.IGardsThresholdService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.List; + +@Service +@DS("ora") +public class GardsThresholdServiceImpl extends ServiceImpl implements IGardsThresholdService { + + @Autowired + private ThresholdRnAutoResultMapper thresholdResultMapper; + @Override + public List findThresholdResults(String stationId) { + // 获取未分级样品 ,Integer idAnalysis + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper(); + queryWrapper.eq(GardsThresholdResult::getStationId, stationId); + //queryWrapper.eq(GardsThresholdResult::getIdAnalysis, idAnalysis); + + return thresholdResultMapper.selectList(queryWrapper); + } + + @Override + public GardsSampleGrading getGardsSampleGradingById(String sampleId) { + return this.baseMapper.selectById(sampleId); + } + + @Override + public void saveOrUpdateSampleThreshold(GardsSampleGrading sampleThreshold) { + if (sampleThreshold.getSampleId() == null) { + this.baseMapper.insert(sampleThreshold); + } else { + this.baseMapper.updateById(sampleThreshold); + } + } + + /*** + * 更新样品级别 + * @param sampleId 样品ID + * @param level 级别 + */ + @Override + public void updateSampleGrading(String sampleId, int level) { + GardsSampleGrading sample = getGardsSampleGradingById(sampleId); + if (sample != null) { + sample.setGrading(String.valueOf(level)); + sample.setGradingStatus(1); // 标记为已处理 + saveOrUpdateSampleThreshold(sample); + } + } + + + + + +}