添加交互分析样品等级,计算阈值相关服务
This commit is contained in:
parent
c5fbf4360a
commit
3a59339773
|
@ -6,13 +6,16 @@ import org.jeecg.common.properties.*;
|
|||
import org.jeecg.common.util.NameStandUtil;
|
||||
import org.jeecg.common.util.RedisStreamUtil;
|
||||
import org.jeecg.common.util.RedisUtil;
|
||||
import org.jeecg.modules.base.service.ISampleGradingService;
|
||||
import org.jeecg.modules.datasource.OraDataSourceProperties;
|
||||
import org.jeecg.modules.service.*;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.core.io.ResourceLoader;
|
||||
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.transaction.TransactionDefinition;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
|
@ -29,7 +32,7 @@ public class SpectrumServiceQuotes {
|
|||
|
||||
private final ISpectrumBaseBlockService spectrumBaseBlockService;
|
||||
|
||||
private final Map<String,ISpectrumBlockService> spectrumBlockService;
|
||||
private final Map<String, ISpectrumBlockService> spectrumBlockService;
|
||||
|
||||
private final TaskProperties taskProperties;
|
||||
|
||||
|
@ -92,4 +95,11 @@ public class SpectrumServiceQuotes {
|
|||
|
||||
private final BatchesCounter batchesCounter;
|
||||
|
||||
/// 样品等级服务
|
||||
|
||||
private final ISampleGradingService sampleGradingService;
|
||||
|
||||
// 获取阈值信息
|
||||
private final IGardsThresholdService gardsThresholdService;
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,14 @@
|
|||
package org.jeecg.modules.mapper;
|
||||
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||
import org.apache.ibatis.annotations.Mapper;
|
||||
import org.jeecg.modules.base.entity.SampleIdentifiedNuclides;
|
||||
import org.jeecg.modules.base.entity.rnman.GardsSampleGrading;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Mapper
|
||||
public interface RnManGardsThresholdMapper extends BaseMapper<GardsSampleGrading>{
|
||||
|
||||
List<SampleIdentifiedNuclides> findIdentifiedNuclides(Integer sampleId);
|
||||
}
|
|
@ -0,0 +1,15 @@
|
|||
package org.jeecg.modules.mapper;
|
||||
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
|
||||
import org.apache.ibatis.annotations.Mapper;
|
||||
import org.apache.ibatis.annotations.Param;
|
||||
import org.jeecg.modules.base.entity.rnman.GardsThresholdResult;
|
||||
|
||||
import java.util.List;
|
||||
@Mapper
|
||||
public interface RnManThresholdResultMapper extends BaseMapper<GardsThresholdResult> {
|
||||
|
||||
int batchInsert(@Param("list") List<GardsThresholdResult> results);
|
||||
|
||||
int batchInsertRnMan(@Param("list") List<GardsThresholdResult> results);
|
||||
}
|
|
@ -0,0 +1,114 @@
|
|||
package org.jeecg.modules.redisStream;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.jeecg.common.config.mqtoken.UserTokenContext;
|
||||
import org.jeecg.common.util.RedisStreamUtil;
|
||||
import org.jeecg.common.util.RedisUtil;
|
||||
import org.jeecg.common.util.SpringContextUtils;
|
||||
import org.jeecg.modules.base.dto.Info;
|
||||
import org.jeecg.modules.base.entity.rnman.GardsThresholdResult;
|
||||
import org.jeecg.modules.base.service.ISampleGradingService;
|
||||
import org.jeecg.modules.base.service.impl.RnAutoThresholdServiceImpl;
|
||||
import org.jeecg.modules.base.service.impl.RnManThresholdServiceImpl;
|
||||
import org.jeecg.modules.service.impl.RnManGardsThresholdServiceImpl;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.data.redis.connection.stream.ObjectRecord;
|
||||
import org.springframework.data.redis.connection.stream.RecordId;
|
||||
import org.springframework.data.redis.stream.StreamListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static org.jeecg.common.util.TokenUtils.getTempToken;
|
||||
|
||||
@Data
|
||||
@Slf4j
|
||||
@Component
|
||||
@NoArgsConstructor
|
||||
public class RnManGradingConsumer implements StreamListener<String, ObjectRecord<String, Info>> {
|
||||
|
||||
/**
|
||||
* RNMAN
|
||||
*/
|
||||
protected static final String DATA_TYPE_RNMAN = "RNMAN";
|
||||
private String groupName;
|
||||
|
||||
private String consumerName;
|
||||
|
||||
private RedisStreamUtil redisStreamUtil;
|
||||
private RedisUtil redisUtil;
|
||||
|
||||
@Autowired
|
||||
private RnManGardsThresholdServiceImpl gardsThresholdService;
|
||||
/// 样品等级服务
|
||||
@Autowired
|
||||
private ISampleGradingService sampleGradingService;
|
||||
|
||||
@Autowired
|
||||
private RnManThresholdServiceImpl rnManService;
|
||||
|
||||
public RnManGradingConsumer(String groupName, String consumerName) {
|
||||
this.groupName = groupName;
|
||||
this.consumerName = consumerName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(ObjectRecord<String, Info> message) {
|
||||
try {
|
||||
|
||||
String streamKey = message.getStream();
|
||||
init();
|
||||
|
||||
/*
|
||||
* 新消息在未进行ACK之前,状态也为pending,
|
||||
* 直接消费所有异常未确认的消息和新消息
|
||||
*/
|
||||
List<ObjectRecord<String, Info>> pendings = redisStreamUtil
|
||||
.read(streamKey, groupName, consumerName);
|
||||
for (ObjectRecord<String, Info> record : pendings) {
|
||||
RecordId recordId = record.getId();
|
||||
Info info = record.getValue();
|
||||
// 消费消息
|
||||
consume(info);
|
||||
log.info("样品等级分级消费了一条消息: {}", info);
|
||||
// 消费完成后,手动确认消费消息[消息消费成功]
|
||||
// 否则就是消费抛出异常,进入pending_ids[消息消费失败]
|
||||
redisStreamUtil.ack(streamKey, groupName, recordId.getValue());
|
||||
// 手动删除已消费消息
|
||||
redisStreamUtil.del(streamKey, recordId.getValue());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("样品等级分级消费异常: ", e);
|
||||
} finally {
|
||||
destroy();
|
||||
}
|
||||
}
|
||||
|
||||
/***
|
||||
* 计算样品等级及台站阈值
|
||||
* @param info
|
||||
*/
|
||||
private void consume(Info info) {
|
||||
if ("FULL".equals(info.getFullOrPrel())) {
|
||||
//更新台站阈值
|
||||
rnManService.calculateSingleStationThreshold(info.getStationId());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void init() {
|
||||
// start 生成临时Token到线程中
|
||||
UserTokenContext.setToken(getTempToken());
|
||||
redisStreamUtil = SpringContextUtils.getBean(RedisStreamUtil.class);
|
||||
|
||||
redisUtil = SpringContextUtils.getBean(RedisUtil.class);
|
||||
}
|
||||
|
||||
private void destroy() {
|
||||
// end 删除临时Token
|
||||
UserTokenContext.remove();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,153 @@
|
|||
package org.jeecg.modules.redisStream;
|
||||
|
||||
import org.jeecg.common.constant.RedisConstant;
|
||||
import org.jeecg.common.exception.StreamErrorHandler;
|
||||
import org.jeecg.common.properties.MaximumPoolSizeProperties;
|
||||
import org.jeecg.common.properties.ParameterProperties;
|
||||
import org.jeecg.common.util.RadionuclideUtil;
|
||||
import org.jeecg.common.util.RedisStreamUtil;
|
||||
import org.jeecg.modules.base.dto.Info;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.data.redis.connection.RedisConnectionFactory;
|
||||
import org.springframework.data.redis.connection.stream.Consumer;
|
||||
import org.springframework.data.redis.connection.stream.ObjectRecord;
|
||||
import org.springframework.data.redis.connection.stream.ReadOffset;
|
||||
import org.springframework.data.redis.connection.stream.StreamOffset;
|
||||
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
|
||||
import org.xml.sax.SAXException;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
import javax.xml.parsers.ParserConfigurationException;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
@Configuration
|
||||
public class RnManRedisStreamConfig {
|
||||
|
||||
// 空闲线程的存活时间(s)
|
||||
private final Integer keepAliveTime = 5;
|
||||
|
||||
// 每次轮询取几条消息
|
||||
private final Integer maxMsg = 10;
|
||||
|
||||
private final String streamKey = RedisConstant.CATEGORY;
|
||||
private final String streamGroup = RedisConstant.GROUP_Category;
|
||||
private final String streamConsumer = RedisConstant.CATEGORY_CONSUMER;
|
||||
|
||||
@Resource
|
||||
private RedisConnectionFactory redisConnectionFactory;
|
||||
|
||||
@Resource
|
||||
private RedisStreamUtil redisStreamUtil;
|
||||
|
||||
@Autowired
|
||||
private ParameterProperties parameterProperties;
|
||||
@Autowired
|
||||
private RnManGradingConsumer gradingConsumer;
|
||||
|
||||
@Autowired
|
||||
private MaximumPoolSizeProperties maximumPoolSizeProperties;
|
||||
|
||||
@Bean(initMethod = "start", destroyMethod = "stop")
|
||||
public StreamMessageListenerContainer<String, ObjectRecord<String, Info>> autoStream() {
|
||||
//初始化 ctbt核素xml
|
||||
try {
|
||||
RadionuclideUtil.initialize(parameterProperties.getFilePath()+ File.separator + "ctbt_radionuclides.xml");
|
||||
} catch (ParserConfigurationException | IOException | SAXException e) {
|
||||
|
||||
}
|
||||
/* 创建Stream和消费组 */
|
||||
redisStreamUtil.creatGroup(streamKey, streamGroup);
|
||||
// 原子整数,多线程环境下对整数的原子性操作
|
||||
AtomicInteger index = new AtomicInteger(1);
|
||||
// 返回当前系统可用的处理器数量
|
||||
Integer processors = maximumPoolSizeProperties.getStation();
|
||||
/*
|
||||
corePoolSize:线程池中核心线程的数量,即线程池中保持的最小线程数
|
||||
maximumPoolSize:线程池中允许的最大线程数
|
||||
keepAliveTime:非核心线程的空闲时间超过该值,就会被回收
|
||||
unit:keepAliveTime 参数的时间单位
|
||||
workQueue:用于保存等待执行的任务的阻塞队列
|
||||
threadFactory:用于创建新线程的工厂
|
||||
**/
|
||||
ThreadPoolExecutor executor = new ThreadPoolExecutor(processors,
|
||||
processors,
|
||||
keepAliveTime,
|
||||
TimeUnit.SECONDS,
|
||||
new LinkedBlockingDeque<>(),
|
||||
r -> {
|
||||
Thread thread = new Thread(r);
|
||||
thread.setName("Stream-Thread-"+streamKey + index.getAndIncrement());
|
||||
thread.setDaemon(true);
|
||||
return thread;
|
||||
});
|
||||
/* 设置消息监听容器 */
|
||||
StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, Info>> options =
|
||||
StreamMessageListenerContainer.StreamMessageListenerContainerOptions
|
||||
.builder()
|
||||
// 每次轮询取几条消息
|
||||
.batchSize(maxMsg)
|
||||
// Stream执行消息轮询的执行器
|
||||
.executor(executor)
|
||||
// Stream中没有消息时,阻塞多长时间(轮询等待时间)
|
||||
// 设置为0表示消费者将一直等待新消息到达
|
||||
// 不能大于spring.redis.timeout
|
||||
.pollTimeout(Duration.ZERO)
|
||||
//.serializer()
|
||||
//.objectMapper(new ObjectHashMapper())
|
||||
.targetType(Info.class)
|
||||
// 异常处理器
|
||||
.errorHandler(new StreamErrorHandler())
|
||||
.build();
|
||||
/* 创建消息监听容器 */
|
||||
StreamMessageListenerContainer<String, ObjectRecord<String, Info>> streamMessageListenerContainer =
|
||||
StreamMessageListenerContainer.create(redisConnectionFactory, options);
|
||||
|
||||
// 独立消费
|
||||
/*streamMessageListenerContainer.receive(StreamOffset.fromStart(streamKey),
|
||||
new ConsumeListener("独立消费", null, null));*/
|
||||
// 非独立消费
|
||||
/*
|
||||
register:用于注册一个消息监听器,该监听器将在每次有新的消息到达Redis Stream时被调用
|
||||
这种方式适用于长期运行的消息消费者,它会持续监听Redis Stream并处理新到达的消息
|
||||
|
||||
receive:用于主动从Redis Stream中获取消息并进行处理,你可以指定要获取的消息数量和超时时间
|
||||
这种方式适用于需要主动控制消息获取的场景,例如批量处理消息或定时任务
|
||||
**/
|
||||
/* 1.需要手动确认消费消息 */
|
||||
// 1.1 使用 register 方式
|
||||
StreamMessageListenerContainer.ConsumerStreamReadRequest<String> readRequest = StreamMessageListenerContainer
|
||||
.StreamReadRequest
|
||||
.builder(StreamOffset.create(streamKey, ReadOffset.lastConsumed()))
|
||||
.consumer(Consumer.from(streamGroup, streamConsumer))
|
||||
// 手动确认消费了消息 默认为自动确认消息
|
||||
.autoAcknowledge(false)
|
||||
// 如果消费者发生了异常 不禁止消费者消费 默认为禁止
|
||||
.cancelOnError(throwable -> false)
|
||||
.build();
|
||||
//GradingConsumer analysis = new GradingConsumer(autoGroup, autoConsumer);
|
||||
// 设置消费者的 group 和 consumer 名称
|
||||
gradingConsumer.setGroupName(streamGroup);
|
||||
gradingConsumer.setConsumerName(streamConsumer);
|
||||
streamMessageListenerContainer.register(readRequest, gradingConsumer);
|
||||
// 1.2 使用 receive 方式
|
||||
/*autoConsumer analysis = new autoConsumer(autoGroup, autoConsumer);
|
||||
streamMessageListenerContainer.receive(Consumer.from(autoGroup, autoConsumer),
|
||||
StreamOffset.create(autoKey, ReadOffset.lastConsumed()), analysis);*/
|
||||
/* 2.自动确认消费消息 */
|
||||
// 2.1 使用 receive 方式
|
||||
/*autoConsumer analysis = new autoConsumer(autoGroup,autoConsumer);
|
||||
streamMessageListenerContainer.receiveAutoAck(Consumer.from(autoGroup, autoConsumer),
|
||||
StreamOffset.create(autoKey, ReadOffset.lastConsumed()), analysis);*/
|
||||
return streamMessageListenerContainer;
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -0,0 +1,18 @@
|
|||
package org.jeecg.modules.service;
|
||||
|
||||
import com.baomidou.mybatisplus.extension.service.IService;
|
||||
import org.jeecg.modules.base.entity.rnman.GardsSampleGrading;
|
||||
import org.jeecg.modules.base.entity.rnman.GardsThresholdResult;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public interface IRnManGardsThresholdService extends IService<GardsSampleGrading> {
|
||||
|
||||
GardsSampleGrading getGardsSampleGradingById(String sampleId);
|
||||
|
||||
void saveOrUpdateSampleThreshold( GardsSampleGrading sampleThreshold);
|
||||
|
||||
void updateSampleGrading(String sampleId, int level);
|
||||
|
||||
List<GardsThresholdResult> findThresholdResults(String stationId);
|
||||
}
|
|
@ -0,0 +1,64 @@
|
|||
package org.jeecg.modules.service.impl;
|
||||
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
|
||||
import org.jeecg.common.util.RadionuclideUtil;
|
||||
import org.jeecg.modules.base.entity.rnman.GardsSampleGrading;
|
||||
import org.jeecg.modules.base.entity.rnman.GardsThresholdResult;
|
||||
import org.jeecg.modules.mapper.RnManGardsThresholdMapper;
|
||||
import org.jeecg.modules.mapper.RnManThresholdResultMapper;
|
||||
import org.jeecg.modules.service.IRnManGardsThresholdService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Service
|
||||
public class RnManGardsThresholdServiceImpl extends ServiceImpl<RnManGardsThresholdMapper, GardsSampleGrading> implements IRnManGardsThresholdService {
|
||||
|
||||
@Autowired
|
||||
private RnManThresholdResultMapper thresholdResultMapper;
|
||||
|
||||
/***
|
||||
* 查询台站阈值
|
||||
* @param stationId 台站ID
|
||||
* @return List<GardsThresholdResult> GardsThresholdResult
|
||||
*/
|
||||
@Override
|
||||
public List<GardsThresholdResult> findThresholdResults(String stationId) {
|
||||
// 获取未分级样品
|
||||
LambdaQueryWrapper<GardsThresholdResult> queryWrapper = new LambdaQueryWrapper<GardsThresholdResult>();
|
||||
queryWrapper.eq(GardsThresholdResult::getStationId, stationId);
|
||||
|
||||
return thresholdResultMapper.selectList(queryWrapper);
|
||||
}
|
||||
|
||||
@Override
|
||||
public GardsSampleGrading getGardsSampleGradingById(String sampleId) {
|
||||
return this.baseMapper.selectById(sampleId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void saveOrUpdateSampleThreshold(GardsSampleGrading sampleThreshold) {
|
||||
if (sampleThreshold.getSampleId() == null) {
|
||||
this.baseMapper.insert(sampleThreshold);
|
||||
} else {
|
||||
this.baseMapper.updateById(sampleThreshold);
|
||||
}
|
||||
}
|
||||
|
||||
/***
|
||||
* 更新样品级别
|
||||
* @param sampleId 样品ID
|
||||
* @param level 级别
|
||||
*/
|
||||
@Override
|
||||
public void updateSampleGrading(String sampleId, int level) {
|
||||
GardsSampleGrading sample = getGardsSampleGradingById(sampleId);
|
||||
if (sample != null) {
|
||||
sample.setGrading(String.valueOf(level));
|
||||
sample.setGradingStatus(1); // 标记为已处理
|
||||
saveOrUpdateSampleThreshold(sample);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user