package com.hivekion.room.bean;

import cn.hutool.extra.spring.SpringUtil;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONArray;
import com.alibaba.fastjson2.JSONObject;
import com.hivekion.Global;
import com.hivekion.baseData.entity.Scenario;
import com.hivekion.baseData.service.ScenarioService;
import com.hivekion.common.MultiPointGeoPosition;
import com.hivekion.common.entity.ResponseCmdInfo;
import com.hivekion.common.redis.RedisUtil;
import com.hivekion.enums.WsCmdTypeEnum;
import com.hivekion.room.RoomManager;
import com.hivekion.room.func.TaskAction;
import com.hivekion.scenario.entity.ScenarioTask;
import com.hivekion.statistic.bean.ScenarioInfo;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.core.env.Environment;
import org.springframework.web.reactive.function.client.WebClient;

/**
 * Base class for scenario tasks that run inside a room.
 *
 * <p>It keeps the task definition and the room id, can fetch a route from the configured
 * path-planning service ({@link #initPath()}), replays movement along that route on a
 * one-second schedule ({@link #updatePath(double, TaskAction, TaskAction)}), and publishes
 * the resulting commands through {@code Global.sendCmdInfoQueue}.
 *
 * @author LiDongYU
 * @since 2025/7/22
 */
@Slf4j
public abstract class AbtParentTask implements TaskAction {

  /**
   * Key (cumulative distance) in {@link #distanceInfoMap} from which the next path update
   * starts reporting points.
   */
  private final AtomicReference<Double> startPoint = new AtomicReference<>();

  /**
   * Cumulative distance along the route mapped to the coordinate reached at that distance.
   */
  protected final TreeMap<Double, Coordinate> distanceInfoMap = new TreeMap<>();

  /** Task data. */
  protected final ScenarioTask scenarioTask;

  /** Room id. */
  protected final String roomId;

  /** HTTP client used for the path-planning request. */
  protected WebClient webClient = WebClient.create();

  /** While {@code false}, {@code updatePath} ticks return without advancing. */
  protected final AtomicBoolean canMoved = new AtomicBoolean(true);

  /** Last coordinate computed by {@code updatePath}. */
  protected final AtomicReference<Coordinate> coordinateReference = new AtomicReference<>();

  /**
   * Start time of the task relative to the scenario start, in seconds (absolute value).
   */
  private long taskRelativeTime = 0;

  /**
   * Worker pool exposed to subclasses; it is not used by this class itself.
   *
   * <p>NOTE(review): one pool is created per task instance, its threads are non-daemon,
   * and nothing in this class shuts it down - confirm that owners release it.
   */
  protected ThreadPoolExecutor executor = new ThreadPoolExecutor(
      5, // core pool size
      10, // maximum pool size
      60L, // keep-alive time for idle threads
      TimeUnit.SECONDS, // keep-alive unit
      new LinkedBlockingQueue<>(100), // work queue
      new CustomThreadFactory("MyPool"), // thread factory
      new ThreadPoolExecutor.CallerRunsPolicy() // rejection policy
  );

  /** Lazily resolved Redis helper; access it through {@link #redis()}. */
  private RedisUtil redisUtil;

  /**
   * Creates the task and computes its start offset relative to the owning scenario.
   *
   * @param scenarioTask task definition
   * @param roomId id of the room the task runs in
   */
  public AbtParentTask(ScenarioTask scenarioTask, String roomId) {
    this.scenarioTask = scenarioTask;
    this.roomId = roomId;
    Scenario scenario = SpringUtil.getBean(ScenarioService.class)
        .getScenarioById(scenarioTask.getScenarioId());
    taskRelativeTime = Math.abs(
        Duration.between(scenario.getStartTime(), scenarioTask.getStartTime()).getSeconds());
  }

  /**
   * Registers a scheduler with the room through {@code RoomManager.addFuture}.
   *
   * @param scheduledExecutorService scheduler to register for this task's room
   */
  public void addScheduledExecutorServiceRefenceToRoom(
      ScheduledExecutorService scheduledExecutorService) {
    RoomManager.addFuture(scheduledExecutorService, this.roomId);
  }

  /** Default no-op action; subclasses override as needed. */
  @Override
  public void doSomeThing() {
  }

  @Override
  public String getId() {
    return scenarioTask.getId();
  }

  @Override
  public String getType() {
    return scenarioTask.getTaskType();
  }

  /**
   * Returns the room's elapsed duration as reported by {@code RoomManager}.
   *
   * @return the value of {@code RoomManager.getRoomDuringTime} for this room
   */
  public long getDuringTime() {
    return RoomManager.getRoomDuringTime(this.roomId);
  }

  /**
   * Returns whether the room is currently running.
   *
   * @return the value of {@code RoomManager.isRunning} for this room
   */
  public boolean getRoomStatus() {
    return RoomManager.isRunning(roomId);
  }

  /**
   * Runs the given job immediately and then every 10 seconds (fixed delay) on a dedicated
   * single-thread scheduler that is registered with the room.
   *
   * @param bizTaskOnTiming job to run periodically
   */
  public void createBattleTaskOnTimingHandle(BizTaskOnTiming bizTaskOnTiming) {
    ScheduledExecutorService schedule = Executors.newScheduledThreadPool(1);
    schedule.scheduleWithFixedDelay(() -> {
      try {
        bizTaskOnTiming.execTask();
      } catch (Exception e) {
        // An exception escaping a periodic task suppresses every later execution,
        // so log it and keep the schedule alive (same policy as updatePath).
        log.error("error::", e);
      }
    }, 0, 10, TimeUnit.SECONDS);
    // The room manages all timers; they are destroyed when the room closes.
    addScheduledExecutorServiceRefenceToRoom(schedule);
  }

  /**
   * Requests a route from the service configured under {@code path.planning.url}, publishes
   * it as a {@code PATH_INIT} command, stores it in Redis under the {@code init_path} hash
   * field, and fills {@link #distanceInfoMap} with the cumulative distance of every route
   * point.
   *
   * <p>Failures are logged and not propagated.
   */
  protected void initPath() {
    try {
      log.info("init path");
      String url = SpringUtil.getBean(Environment.class).getProperty("path.planning.url");
      String params = url + "?"
          + "profile=car"
          + "&point=" + scenarioTask.getFromLat() + "," + scenarioTask.getFromLng()
          + "&point=" + scenarioTask.getToLat() + "," + scenarioTask.getToLng()
          + "&points_encoded=false"
          + "&algorithm=alternative_route&alternative_route.max_paths=3";
      log.info("params:;{}", params);

      // Fetch the route information.
      String result = webClient.get().uri(params)
          .retrieve()
          .bodyToMono(String.class)
          .block();
      log.info("init path finished ::{}", result);

      JSONObject pointJson = JSON.parseObject(result);
      if (pointJson == null) {
        return;
      }
      JSONArray paths = pointJson.getJSONArray("paths");
      if (paths == null || paths.isEmpty()) {
        log.warn("path planning returned no paths, resourceId={}",
            scenarioTask.getResourceId());
        return;
      }

      // Only the first returned path is used.
      JSONObject pointsObj = paths.getJSONObject(0).getJSONObject("points");
      JSONArray coordinates = pointsObj.getJSONArray("coordinates");

      // Assemble and publish the route.
      Map<String, Object> dataMap = new HashMap<>();
      dataMap.put("resourceId", scenarioTask.getResourceId());
      dataMap.put("points", coordinates);
      Global.sendCmdInfoQueue.add(
          ResponseCmdInfo.create(WsCmdTypeEnum.PATH_INIT.getCode(), roomId,
              scenarioTask.getScenarioId(), dataMap));
      redis().hset(taskRedisKey(), "init_path", JSON.toJSONString(coordinates));

      // Accumulate the distance from the task origin through every route point.
      double beforeLng = Double.parseDouble(scenarioTask.getFromLng());
      double beforeLat = Double.parseDouble(scenarioTask.getFromLat());
      double total = 0;
      for (int i = 0; i < coordinates.size(); i++) {
        // Each entry is read as [lng, lat].
        JSONArray coordinate = coordinates.getJSONArray(i);
        Double lng = coordinate.getDouble(0);
        Double lat = coordinate.getDouble(1);
        total = total + MultiPointGeoPosition.haversine(beforeLat, beforeLng, lat, lng);

        Coordinate coordinateInfo = new Coordinate();
        coordinateInfo.setLat(lat);
        coordinateInfo.setLng(lng);
        // Record which coordinate is reached at this cumulative distance.
        distanceInfoMap.put(total, coordinateInfo);

        beforeLng = lng;
        beforeLat = lat;
      }

      // The first update starts reporting from the first route point.
      if (!distanceInfoMap.isEmpty()) {
        startPoint.set(distanceInfoMap.firstKey());
      }
    } catch (Exception e) {
      log.error("error::", e);
    }
  }

  /**
   * Starts a one-second fixed-delay schedule that advances the task along the route in
   * {@link #distanceInfoMap} and publishes {@code PATH_UPDATE} commands, followed by a single
   * {@code PATH_FINISHED} command once the travelled distance exceeds the last route key.
   *
   * <p>A tick does nothing while the room is not running, the route is empty, or
   * {@link #canMoved} is {@code false}. The scheduler is registered with the room and shuts
   * itself down after the route is finished.
   *
   * @param speed factor multiplied with the accumulated time to obtain the travelled distance
   * @param duringAction optional callback invoked on every active tick; may be {@code null}
   * @param finishedAction optional callback invoked once when the route is finished; may be
   *     {@code null}
   */
  protected void updatePath(double speed, TaskAction duringAction, TaskAction finishedAction) {
    AtomicLong duringTime = new AtomicLong(0);
    ScheduledExecutorService schedule = Executors.newScheduledThreadPool(1);
    schedule.scheduleWithFixedDelay(() -> {
      try {
        if (!this.getRoomStatus()) {
          return;
        }
        if (distanceInfoMap.isEmpty()) {
          return;
        }
        if (!this.canMoved.get()) {
          return;
        }
        log.info("{}-移动中,canRemove::{}", this.scenarioTask.getResourceId(),
            this.canMoved.get());
        if (duringAction != null) {
          duringAction.doSomeThing();
        }

        // Distance travelled so far; the time counter advances by the room's
        // RoomManager.getMag value on every tick (the pre-increment value is used).
        double distance = duringTime.getAndAdd(RoomManager.getMag(roomId)) * speed;

        // First route key at or beyond the travelled distance; fall back to the last one.
        Entry<Double, Coordinate> endPoint = distanceInfoMap.ceilingEntry(distance);
        if (endPoint == null) {
          endPoint = distanceInfoMap.lastEntry();
        }

        // Payload: the points passed since the previous update.
        List<double[]> dataList = new ArrayList<>();
        HashMap<String, Object> dataMap = new HashMap<>();
        dataMap.put("resourceId", scenarioTask.getResourceId());
        dataMap.put("points", dataList);

        int position = Double.compare(distance, endPoint.getKey());
        if (position < 0) {
          // Between two route points: report the passed points plus an interpolated one.
          Double lowerKey = distanceInfoMap.lowerKey(endPoint.getKey());
          if (lowerKey == null) {
            // NOTE(review): here lowerKey equals endPoint's key while distance is smaller,
            // so diff below is negative and both segment ends are the same point -
            // confirm MultiPointGeoPosition.pointAlong handles that.
            lowerKey = endPoint.getKey();
          }
          NavigableMap<Double, Coordinate> subPathMap =
              distanceInfoMap.subMap(startPoint.get(), true, lowerKey, true);
          for (Coordinate passed : subPathMap.values()) {
            dataList.add(new double[]{passed.getLng(), passed.getLat()});
          }

          double diff = distance - lowerKey;
          Coordinate segmentStart = distanceInfoMap.get(lowerKey);
          // pointAlong's result is read as [lat, lng].
          double[] insertPoints = MultiPointGeoPosition.pointAlong(
              segmentStart.getLat(), segmentStart.getLng(),
              endPoint.getValue().getLat(), endPoint.getValue().getLng(), diff);
          dataList.add(new double[]{insertPoints[1], insertPoints[0]});

          Coordinate coordinate = new Coordinate();
          coordinate.setLat(insertPoints[0]);
          coordinate.setLng(insertPoints[1]);
          // The interpolated point becomes part of the route and the next start key.
          distanceInfoMap.put(distance, coordinate);
          startPoint.set(distance);
          coordinateReference.set(coordinate);
          redis().hset(taskRedisKey(), "position", JSON.toJSONString(coordinate));

          Global.sendCmdInfoQueue.add(
              ResponseCmdInfo.create(WsCmdTypeEnum.PATH_UPDATE.getCode(), roomId,
                  scenarioTask.getScenarioId(), dataMap));
        } else if (position == 0) {
          // Exactly on a route point: report everything up to and including it.
          NavigableMap<Double, Coordinate> subPathMap =
              distanceInfoMap.subMap(startPoint.get(), true, endPoint.getKey(), true);
          for (Coordinate passed : subPathMap.values()) {
            dataList.add(new double[]{passed.getLng(), passed.getLat()});
          }
          coordinateReference.set(endPoint.getValue());
          startPoint.set(endPoint.getKey());
          // Keep the stored position in step with coordinateReference, as in the
          // interpolated branch.
          redis().hset(taskRedisKey(), "position", JSON.toJSONString(endPoint.getValue()));

          Global.sendCmdInfoQueue.add(
              ResponseCmdInfo.create(WsCmdTypeEnum.PATH_UPDATE.getCode(), roomId,
                  scenarioTask.getScenarioId(), dataMap));
        } else {
          // Travelled past the last route point: the route is finished.
          if (finishedAction != null) {
            finishedAction.doSomeThing();
          }
          Global.sendCmdInfoQueue.add(
              ResponseCmdInfo.create(WsCmdTypeEnum.PATH_FINISHED.getCode(), roomId,
                  scenarioTask.getScenarioId(), dataMap));
          // Stop this schedule.
          schedule.shutdown();
        }
      } catch (Exception e) {
        // Never let an exception escape, otherwise later ticks would be suppressed.
        log.error("error::", e);
      }
    }, 0, 1, TimeUnit.SECONDS);
    // The room manages all timers; they are destroyed when the room closes.
    addScheduledExecutorServiceRefenceToRoom(schedule);
  }

  /**
   * Publishes the {@code scenarioInfo} value stored in Redis for this task as a
   * {@code scenarioInfo} command. Does nothing when {@code resourceId} is blank.
   *
   * <p>NOTE(review): the Redis key is built from {@code scenarioTask.getResourceId()};
   * the {@code resourceId} argument is only used for the blank check - confirm intended.
   *
   * @param resourceId resource id; blank values make the call a no-op
   */
  protected void pushStatus(String resourceId) {
    if (StringUtils.isBlank(resourceId)) {
      return;
    }
    String jsonStr = (String) redis().hget(taskRedisKey(), "scenarioInfo");
    ResponseCmdInfo<Object> respObj = new ResponseCmdInfo<>();
    respObj.setData(jsonStr);
    respObj.setRoom(roomId);
    respObj.setScenarioId(scenarioTask.getScenarioId());
    respObj.setCmdType("scenarioInfo");
    Global.sendCmdInfoQueue.add(respObj);
  }

  /** Returns the Redis helper, resolving it from the Spring context on first use. */
  private RedisUtil redis() {
    if (redisUtil == null) {
      redisUtil = SpringUtil.getBean(RedisUtil.class);
    }
    return redisUtil;
  }

  /** Builds the Redis hash key of this task: {@code scenarioId-roomId-resourceId}. */
  private String taskRedisKey() {
    return scenarioTask.getScenarioId() + "-" + roomId + "-" + scenarioTask.getResourceId();
  }
}

/** Job executed periodically by {@code AbtParentTask.createBattleTaskOnTimingHandle}. */
@FunctionalInterface
interface BizTaskOnTiming {

  /** Runs one iteration of the job. */
  void execTask();
}

/**
 * Thread factory that creates non-daemon, normal-priority threads named
 * {@code <prefix>-thread-<n>}, with {@code n} starting at 1.
 */
class CustomThreadFactory implements ThreadFactory {

  private final AtomicInteger threadNumber = new AtomicInteger(1);
  private final String namePrefix;

  /**
   * @param namePrefix prefix placed in front of {@code -thread-<n>} in every thread name
   */
  public CustomThreadFactory(String namePrefix) {
    this.namePrefix = namePrefix + "-thread-";
  }

  @Override
  public Thread newThread(Runnable r) {
    Thread thread = new Thread(r, namePrefix + threadNumber.getAndIncrement());
    thread.setDaemon(false); // non-daemon thread
    thread.setPriority(Thread.NORM_PRIORITY);
    return thread;
  }
}