diff --git a/jeecg-boot-base-core/src/main/java/org/jeecg/common/util/RedisStreamUtil.java b/jeecg-boot-base-core/src/main/java/org/jeecg/common/util/RedisStreamUtil.java index 1441aed3..aba5f03f 100644 --- a/jeecg-boot-base-core/src/main/java/org/jeecg/common/util/RedisStreamUtil.java +++ b/jeecg-boot-base-core/src/main/java/org/jeecg/common/util/RedisStreamUtil.java @@ -1,5 +1,7 @@ package org.jeecg.common.util; +import cn.hutool.core.collection.CollUtil; +import cn.hutool.core.util.StrUtil; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; @@ -14,11 +16,10 @@ import org.springframework.data.redis.core.*; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.stereotype.Component; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.time.Duration; +import java.util.*; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; @Component public class RedisStreamUtil { @@ -29,6 +30,10 @@ public class RedisStreamUtil { @Autowired private StringRedisTemplate stringRedisTemplate; + private final String analysisKey = RedisConstant.STREAM_ANALYSIS; + + private final String analysisGroup = RedisConstant.GROUP_ANALYSIS; + /** * 是否有Key(判断是否有Stream) * @@ -122,6 +127,7 @@ public class RedisStreamUtil { return stringRedisTemplate.opsForStream().read(Info.class, Consumer.from(groupName, consumerName), StreamOffset.fromStart(streamKey)); + // StreamOffset.create(streamKey, ReadOffset.lastConsumed())); } /** @@ -192,7 +198,9 @@ public class RedisStreamUtil { } public String pushAnalysis(Info info){ - String analysisKey = RedisConstant.STREAM_ANALYSIS; + // 创建Stream和消费组 消费者注册后可以消费当前队列中的消息 + creatGroup(analysisKey, analysisGroup); + ObjectRecord record = StreamRecords.newRecord() .in(analysisKey).ofObject(info); // 向Redis Stream中推送消息 @@ -244,4 +252,60 @@ public class RedisStreamUtil { return null; }); } + + /** + * 创建Stream、单个消费组 + */ + public void creatGroup(String streamKey,String groupName){ + if (StrUtil.isBlank(streamKey) || StrUtil.isBlank(groupName))return; + if (hasKey(streamKey)) { + StreamInfo.XInfoGroups groups = getGroups(streamKey); + if (groups.isEmpty()) { + createGroup(streamKey, groupName); + }else { + // 判断该组是否已经创建 + List created = groups.stream() + .map(StreamInfo.XInfoGroup::groupName) + .collect(Collectors.toList()); + if (!created.contains(groupName)) + createGroup(streamKey, groupName); + } + } else { + createGroup(streamKey, groupName); + } + } + + /** + * 创建Stream、多个消费组 + */ + public void creatGroup(String streamKey, List groupNames){ + if (StrUtil.isBlank(streamKey) || CollUtil.isEmpty(groupNames))return; + if (hasKey(streamKey)) { + StreamInfo.XInfoGroups groups = getGroups(streamKey); + if (groups.isEmpty()) { + groupNames.forEach(groupName -> { + createGroup(streamKey, groupName); + }); + }else { + // 如果组名已经存在,从待创建列表中移除 + List created = groups.stream() + .map(StreamInfo.XInfoGroup::groupName) + .collect(Collectors.toList()); + Iterator iterator = groupNames.iterator(); + while (iterator.hasNext()){ + String groupName = iterator.next(); + if (created.contains(groupName)) + iterator.remove(); + } + // 对不存在的组进行创建 + groupNames.forEach(groupName -> { + createGroup(streamKey, groupName); + }); + } + } else { + groupNames.forEach(groupName -> { + createGroup(streamKey, groupName); + }); + } + } } diff --git a/jeecg-module-abnormal-alarm/src/main/java/org/jeecg/modules/redisStream/RedisStreamConfig.java b/jeecg-module-abnormal-alarm/src/main/java/org/jeecg/modules/redisStream/RedisStreamConfig.java index 3af4534e..18efd30e 100644 --- a/jeecg-module-abnormal-alarm/src/main/java/org/jeecg/modules/redisStream/RedisStreamConfig.java +++ b/jeecg-module-abnormal-alarm/src/main/java/org/jeecg/modules/redisStream/RedisStreamConfig.java @@ -49,8 +49,8 @@ public class RedisStreamConfig { @Bean(initMethod = "start", destroyMethod = "stop") public StreamMessageListenerContainer> alarmStream() { /* 创建Stream和消费组 */ - creatGroup(alarmKey, alarmGroup); - creatGroup(analysisKey, analysisGroup); + redisStreamUtil.creatGroup(alarmKey, alarmGroup); + redisStreamUtil.creatGroup(analysisKey, analysisGroup); // 原子整数,多线程环境下对整数的原子性操作 AtomicInteger index = new AtomicInteger(1); // 返回当前系统可用的处理器数量 @@ -130,58 +130,4 @@ public class RedisStreamConfig { StreamOffset.create(warnKey, ReadOffset.lastConsumed()), consumeA2);*/ return streamMessageListenerContainer; } - - /** - * 创建Stream、单个消费组 - */ - private void creatGroup(String streamKey,String groupName){ - if (StrUtil.isBlank(streamKey) || StrUtil.isBlank(groupName))return; - if (redisStreamUtil.hasKey(streamKey)) { - StreamInfo.XInfoGroups groups = redisStreamUtil.getGroups(streamKey); - if (groups.isEmpty()) { - redisStreamUtil.createGroup(streamKey, groupName); - }else { - // 判断该组是否已经创建 - List created = groups.stream() - .map(StreamInfo.XInfoGroup::groupName) - .collect(Collectors.toList()); - if (!created.contains(groupName))redisStreamUtil.createGroup(streamKey, groupName); - } - } else { - redisStreamUtil.createGroup(streamKey, groupName); - } - } - - /** - * 创建Stream、多个消费组 - */ - private void creatGroup(String streamKey, List groupNames){ - if (StrUtil.isBlank(streamKey) || CollUtil.isEmpty(groupNames))return; - if (redisStreamUtil.hasKey(streamKey)) { - StreamInfo.XInfoGroups groups = redisStreamUtil.getGroups(streamKey); - if (groups.isEmpty()) { - groupNames.forEach(groupName -> { - redisStreamUtil.createGroup(streamKey, groupName); - }); - }else { - // 如果组名已经存在,从待创建列表中移除 - List created = groups.stream() - .map(StreamInfo.XInfoGroup::groupName) - .collect(Collectors.toList()); - Iterator iterator = groupNames.iterator(); - while (iterator.hasNext()){ - String groupName = iterator.next(); - if (created.contains(groupName))iterator.remove(); - } - // 对不存在的组进行创建 - groupNames.forEach(groupName -> { - redisStreamUtil.createGroup(streamKey, groupName); - }); - } - } else { - groupNames.forEach(groupName -> { - redisStreamUtil.createGroup(streamKey, groupName); - }); - } - } } diff --git a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/quartz/controller/TestController.java b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/quartz/controller/TestController.java new file mode 100644 index 00000000..c350e0a8 --- /dev/null +++ b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/quartz/controller/TestController.java @@ -0,0 +1,70 @@ +package org.jeecg.modules.quartz.controller; + +import cn.hutool.core.map.MapUtil; +import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; +import com.baomidou.mybatisplus.core.metadata.IPage; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import lombok.extern.slf4j.Slf4j; +import org.apache.shiro.SecurityUtils; +import org.jeecg.common.api.vo.Result; +import org.jeecg.common.constant.CommonConstant; +import org.jeecg.common.constant.Prompt; +import org.jeecg.common.constant.SymbolConstant; +import org.jeecg.common.system.query.QueryGenerator; +import org.jeecg.common.system.vo.LoginUser; +import org.jeecg.common.util.ImportExcelUtil; +import org.jeecg.common.util.RedisStreamUtil; +import org.jeecg.modules.base.dto.Info; +import org.jeecg.modules.quartz.entity.QuartzJob; +import org.jeecg.modules.quartz.service.IQuartzJobService; +import org.jeecgframework.poi.excel.ExcelImportUtil; +import org.jeecgframework.poi.excel.def.NormalExcelConstants; +import org.jeecgframework.poi.excel.entity.ExportParams; +import org.jeecgframework.poi.excel.entity.ImportParams; +import org.jeecgframework.poi.excel.view.JeecgEntityExcelView; +import org.quartz.Scheduler; +import org.quartz.SchedulerException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.*; +import org.springframework.web.multipart.MultipartFile; +import org.springframework.web.multipart.MultipartHttpServletRequest; +import org.springframework.web.servlet.ModelAndView; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +@RestController +@RequestMapping("sys/testana") +@Slf4j +@Api(tags = "定时任务接口") +public class TestController { + + @Autowired + private RedisStreamUtil redisStreamUtil; + + @GetMapping("test") + public void test(){ + Info info = new Info(); + info.setStationId("205"); + info.setSampleId("425496"); + info.setBetaOrGamma("Gamma"); + info.setFullOrPrel("FULL"); + info.setDatasource("1"); + info.setSampleName("CAX05_001-20230624_0220_Q_FULL_299.3.PHD"); + info.setCollectionDate(LocalDateTime.now()); + Map nuclides = MapUtil.newHashMap(); + nuclides.put("Be7","1000000"); + nuclides.put("sss","1000000"); + nuclides.put("Tl208","10"); + info.setNuclides(nuclides); + redisStreamUtil.pushAnalysis(info); + } +} diff --git a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/quartz/job/Test.java b/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/quartz/job/Test.java deleted file mode 100644 index 5abea4b4..00000000 --- a/jeecg-module-system/jeecg-system-biz/src/main/java/org/jeecg/modules/quartz/job/Test.java +++ /dev/null @@ -1,44 +0,0 @@ -package org.jeecg.modules.quartz.job; - -import cn.hutool.core.map.MapUtil; -import org.jeecg.common.util.RedisStreamUtil; -import org.jeecg.common.util.SpringContextUtils; -import org.jeecg.modules.base.dto.Info; -import org.quartz.*; - -import java.time.LocalDateTime; -import java.util.Map; - -/** - * 此处的同步是指:当定时任务的执行时间大于任务的时间 - * 间隔时会等待第一个任务执行完成才会走第二个任务 - */ -@DisallowConcurrentExecution -@PersistJobDataAfterExecution -public class Test implements Job { - - private RedisStreamUtil redisStreamUtil; - - @Override - public void execute(JobExecutionContext context) throws JobExecutionException { - init(); - /*Info info = new Info(); - info.setStationId("205"); - info.setSampleId("425496"); - info.setBetaOrGamma("Gamma"); - info.setFullOrPrel("FULL"); - info.setDatasource("1"); - info.setSampleName("CAX05_001-20230624_0220_Q_FULL_299.3.PHD"); - info.setCollectionDate(LocalDateTime.now()); - Map nuclides = MapUtil.newHashMap(); - nuclides.put("Be7","1000000"); - nuclides.put("sss","1000000"); - nuclides.put("Tl208","10"); - info.setNuclides(nuclides); - redisStreamUtil.pushAnalysis(info);*/ - } - - private void init(){ - redisStreamUtil = SpringContextUtils.getBean(RedisStreamUtil.class); - } -}