fix:修改消息队列

This commit is contained in:
nieziyan 2023-10-07 19:25:42 +08:00
parent cc7775edaa
commit 5a661771e3
4 changed files with 141 additions and 105 deletions

View File

@ -1,5 +1,7 @@
package org.jeecg.common.util;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
@ -14,11 +16,10 @@ import org.springframework.data.redis.core.*;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.stereotype.Component;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@Component
public class RedisStreamUtil {
@ -29,6 +30,10 @@ public class RedisStreamUtil {
@Autowired
private StringRedisTemplate stringRedisTemplate;
private final String analysisKey = RedisConstant.STREAM_ANALYSIS;
private final String analysisGroup = RedisConstant.GROUP_ANALYSIS;
/**
* 是否有Key(判断是否有Stream)
*
@ -122,6 +127,7 @@ public class RedisStreamUtil {
return stringRedisTemplate.opsForStream().read(Info.class,
Consumer.from(groupName, consumerName),
StreamOffset.fromStart(streamKey));
// StreamOffset.create(streamKey, ReadOffset.lastConsumed()));
}
/**
@ -192,7 +198,9 @@ public class RedisStreamUtil {
}
public String pushAnalysis(Info info){
String analysisKey = RedisConstant.STREAM_ANALYSIS;
// 创建Stream和消费组 消费者注册后可以消费当前队列中的消息
creatGroup(analysisKey, analysisGroup);
ObjectRecord<String, Info> record = StreamRecords.newRecord()
.in(analysisKey).ofObject(info);
// 向Redis Stream中推送消息
@ -244,4 +252,60 @@ public class RedisStreamUtil {
return null;
});
}
/**
* 创建Stream单个消费组
*/
public void creatGroup(String streamKey,String groupName){
if (StrUtil.isBlank(streamKey) || StrUtil.isBlank(groupName))return;
if (hasKey(streamKey)) {
StreamInfo.XInfoGroups groups = getGroups(streamKey);
if (groups.isEmpty()) {
createGroup(streamKey, groupName);
}else {
// 判断该组是否已经创建
List<String> created = groups.stream()
.map(StreamInfo.XInfoGroup::groupName)
.collect(Collectors.toList());
if (!created.contains(groupName))
createGroup(streamKey, groupName);
}
} else {
createGroup(streamKey, groupName);
}
}
/**
* 创建Stream多个消费组
*/
public void creatGroup(String streamKey, List<String> groupNames){
if (StrUtil.isBlank(streamKey) || CollUtil.isEmpty(groupNames))return;
if (hasKey(streamKey)) {
StreamInfo.XInfoGroups groups = getGroups(streamKey);
if (groups.isEmpty()) {
groupNames.forEach(groupName -> {
createGroup(streamKey, groupName);
});
}else {
// 如果组名已经存在,从待创建列表中移除
List<String> created = groups.stream()
.map(StreamInfo.XInfoGroup::groupName)
.collect(Collectors.toList());
Iterator<String> iterator = groupNames.iterator();
while (iterator.hasNext()){
String groupName = iterator.next();
if (created.contains(groupName))
iterator.remove();
}
// 对不存在的组进行创建
groupNames.forEach(groupName -> {
createGroup(streamKey, groupName);
});
}
} else {
groupNames.forEach(groupName -> {
createGroup(streamKey, groupName);
});
}
}
}

View File

@ -49,8 +49,8 @@ public class RedisStreamConfig {
@Bean(initMethod = "start", destroyMethod = "stop")
public StreamMessageListenerContainer<String, ObjectRecord<String, Info>> alarmStream() {
/* 创建Stream和消费组 */
creatGroup(alarmKey, alarmGroup);
creatGroup(analysisKey, analysisGroup);
redisStreamUtil.creatGroup(alarmKey, alarmGroup);
redisStreamUtil.creatGroup(analysisKey, analysisGroup);
// 原子整数,多线程环境下对整数的原子性操作
AtomicInteger index = new AtomicInteger(1);
// 返回当前系统可用的处理器数量
@ -130,58 +130,4 @@ public class RedisStreamConfig {
StreamOffset.create(warnKey, ReadOffset.lastConsumed()), consumeA2);*/
return streamMessageListenerContainer;
}
/**
* 创建Stream单个消费组
*/
private void creatGroup(String streamKey,String groupName){
if (StrUtil.isBlank(streamKey) || StrUtil.isBlank(groupName))return;
if (redisStreamUtil.hasKey(streamKey)) {
StreamInfo.XInfoGroups groups = redisStreamUtil.getGroups(streamKey);
if (groups.isEmpty()) {
redisStreamUtil.createGroup(streamKey, groupName);
}else {
// 判断该组是否已经创建
List<String> created = groups.stream()
.map(StreamInfo.XInfoGroup::groupName)
.collect(Collectors.toList());
if (!created.contains(groupName))redisStreamUtil.createGroup(streamKey, groupName);
}
} else {
redisStreamUtil.createGroup(streamKey, groupName);
}
}
/**
* 创建Stream多个消费组
*/
private void creatGroup(String streamKey, List<String> groupNames){
if (StrUtil.isBlank(streamKey) || CollUtil.isEmpty(groupNames))return;
if (redisStreamUtil.hasKey(streamKey)) {
StreamInfo.XInfoGroups groups = redisStreamUtil.getGroups(streamKey);
if (groups.isEmpty()) {
groupNames.forEach(groupName -> {
redisStreamUtil.createGroup(streamKey, groupName);
});
}else {
// 如果组名已经存在,从待创建列表中移除
List<String> created = groups.stream()
.map(StreamInfo.XInfoGroup::groupName)
.collect(Collectors.toList());
Iterator<String> iterator = groupNames.iterator();
while (iterator.hasNext()){
String groupName = iterator.next();
if (created.contains(groupName))iterator.remove();
}
// 对不存在的组进行创建
groupNames.forEach(groupName -> {
redisStreamUtil.createGroup(streamKey, groupName);
});
}
} else {
groupNames.forEach(groupName -> {
redisStreamUtil.createGroup(streamKey, groupName);
});
}
}
}

View File

@ -0,0 +1,70 @@
package org.jeecg.modules.quartz.controller;
import cn.hutool.core.map.MapUtil;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.apache.shiro.SecurityUtils;
import org.jeecg.common.api.vo.Result;
import org.jeecg.common.constant.CommonConstant;
import org.jeecg.common.constant.Prompt;
import org.jeecg.common.constant.SymbolConstant;
import org.jeecg.common.system.query.QueryGenerator;
import org.jeecg.common.system.vo.LoginUser;
import org.jeecg.common.util.ImportExcelUtil;
import org.jeecg.common.util.RedisStreamUtil;
import org.jeecg.modules.base.dto.Info;
import org.jeecg.modules.quartz.entity.QuartzJob;
import org.jeecg.modules.quartz.service.IQuartzJobService;
import org.jeecgframework.poi.excel.ExcelImportUtil;
import org.jeecgframework.poi.excel.def.NormalExcelConstants;
import org.jeecgframework.poi.excel.entity.ExportParams;
import org.jeecgframework.poi.excel.entity.ImportParams;
import org.jeecgframework.poi.excel.view.JeecgEntityExcelView;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.multipart.MultipartHttpServletRequest;
import org.springframework.web.servlet.ModelAndView;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@RestController
@RequestMapping("sys/testana")
@Slf4j
@Api(tags = "定时任务接口")
public class TestController {
@Autowired
private RedisStreamUtil redisStreamUtil;
@GetMapping("test")
public void test(){
Info info = new Info();
info.setStationId("205");
info.setSampleId("425496");
info.setBetaOrGamma("Gamma");
info.setFullOrPrel("FULL");
info.setDatasource("1");
info.setSampleName("CAX05_001-20230624_0220_Q_FULL_299.3.PHD");
info.setCollectionDate(LocalDateTime.now());
Map<String, String> nuclides = MapUtil.newHashMap();
nuclides.put("Be7","1000000");
nuclides.put("sss","1000000");
nuclides.put("Tl208","10");
info.setNuclides(nuclides);
redisStreamUtil.pushAnalysis(info);
}
}

View File

@ -1,44 +0,0 @@
package org.jeecg.modules.quartz.job;
import cn.hutool.core.map.MapUtil;
import org.jeecg.common.util.RedisStreamUtil;
import org.jeecg.common.util.SpringContextUtils;
import org.jeecg.modules.base.dto.Info;
import org.quartz.*;
import java.time.LocalDateTime;
import java.util.Map;
/**
* 此处的同步是指:当定时任务的执行时间大于任务的时间
* 间隔时会等待第一个任务执行完成才会走第二个任务
*/
@DisallowConcurrentExecution
@PersistJobDataAfterExecution
public class Test implements Job {
private RedisStreamUtil redisStreamUtil;
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
init();
/*Info info = new Info();
info.setStationId("205");
info.setSampleId("425496");
info.setBetaOrGamma("Gamma");
info.setFullOrPrel("FULL");
info.setDatasource("1");
info.setSampleName("CAX05_001-20230624_0220_Q_FULL_299.3.PHD");
info.setCollectionDate(LocalDateTime.now());
Map<String, String> nuclides = MapUtil.newHashMap();
nuclides.put("Be7","1000000");
nuclides.put("sss","1000000");
nuclides.put("Tl208","10");
info.setNuclides(nuclides);
redisStreamUtil.pushAnalysis(info);*/
}
private void init(){
redisStreamUtil = SpringContextUtils.getBean(RedisStreamUtil.class);
}
}