任务相关

This commit is contained in:
李玉东 2025-09-18 13:51:58 +08:00
parent dc6f239a8d
commit 1fc63d734b
4 changed files with 106 additions and 77 deletions

View File

@ -5,6 +5,8 @@ import com.hivekion.room.bean.Room;
import com.hivekion.room.func.TaskAction; import com.hivekion.room.func.TaskAction;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
/** /**
* [类的简要说明] * [类的简要说明]
@ -60,4 +62,10 @@ public class RoomManager {
} }
return 0; return 0;
} }
public static void addFuture(ScheduledExecutorService future,String roomId){
Room room = roomsMap.get(roomId);
if (room != null) {
room.addTaskReference(future);
}
}
} }

View File

@ -1,18 +1,14 @@
package com.hivekion.room.bean; package com.hivekion.room.bean;
import com.hivekion.Global; import com.hivekion.room.RoomManager;
import com.hivekion.room.func.TaskAction; import com.hivekion.room.func.TaskAction;
import com.hivekion.scenario.bean.ScenarioWsParam;
import com.hivekion.scenario.entity.ScenarioTask; import com.hivekion.scenario.entity.ScenarioTask;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient;
/** /**
@ -28,7 +24,6 @@ import org.springframework.web.reactive.function.client.WebClient;
public abstract class AbtParentTask implements TaskAction { public abstract class AbtParentTask implements TaskAction {
//任务数据 //任务数据
protected final ScenarioTask scenarioTask; protected final ScenarioTask scenarioTask;
//房间ID //房间ID
@ -52,10 +47,10 @@ public abstract class AbtParentTask implements TaskAction {
} }
public void addScheduledExecutorServiceRefenceToRoom(
ScheduledExecutorService scheduledExecutorService) {
RoomManager.addFuture(scheduledExecutorService, this.roomId);
}
@Override @Override
public void doSomeThing() { public void doSomeThing() {
@ -71,9 +66,10 @@ public abstract class AbtParentTask implements TaskAction {
public String getType() { public String getType() {
return scenarioTask.getTaskType(); return scenarioTask.getTaskType();
} }
//获取房间的持续时间
public long getDuringTime() {
return RoomManager.getRoomDuringTime(this.roomId);
}
} }

View File

@ -13,7 +13,7 @@ import java.util.Map;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.core.env.Environment; import org.springframework.core.env.Environment;
@ -29,70 +29,87 @@ import org.springframework.core.env.Environment;
@Slf4j @Slf4j
public class MoveRootTask extends AbtParentTask implements TaskAction { public class MoveRootTask extends AbtParentTask implements TaskAction {
protected final ScheduledExecutorService schedule = Executors.newScheduledThreadPool(
1); private final double SPEED = 170;//速度
protected ScheduledFuture<?> scheduledFuture; private double accumulatedDistance = 0;//累计距离
private final double SPEED = 170; private final Map<Double, String> distanceInfoMap = new TreeMap<Double, String>();//距离和坐标点对应关系
private double accumulatedDistance = 0; private Double beforeLng = null;//上一次经度
private Double beforeLat = null; //上一次纬度
public MoveRootTask(ScenarioTask scenarioTask, String roomId) { public MoveRootTask(ScenarioTask scenarioTask, String roomId) {
super(scenarioTask, roomId); super(scenarioTask, roomId);
} }
private final Map<Double, String> distanceInfoMap = new TreeMap<Double, String>();
@Override @Override
public void doSomeThing() { public void doSomeThing() {
log.info("move task running"); log.info("move task running");
//累计距离 initPath(); //初始化路径
updatePath(); //更新路径
String url = SpringUtil.getBean(Environment.class).getProperty("path.planning.url");
String params = url + "?"
+ "profile=car"
+ "&point=" + scenarioTask.getFromLat() + ","
+ scenarioTask.getFromLng()
+ "&point=" + scenarioTask.getToLat() + ","
+ scenarioTask.getToLng()
+ "points_encoded=false"
+ "&algorithm=alternative_route&alternative_route.max_paths=3";
String result = webClient.get().uri(params)
.retrieve()
.bodyToMono(String.class)
.block();
JSONObject pointJson = JSON.parseObject(result);
//获取路径点
if (pointJson != null) {
JSONObject pointsObj = pointJson.getJSONArray("paths").getJSONObject(0)
.getJSONObject("points");
//推送路径任务
Global.sendCmdInfoQueue.add(
ResponseCmdInfo.create("path_init", roomId, scenarioTask.getScenarioId(), pointsObj));
JSONArray coordinates = pointsObj.getJSONArray("coordinates");
Double beforeLng = null;
Double beforeLat = null;
for (int i = 0; i < coordinates.size(); i++) {
JSONArray coordinate = coordinates.getJSONArray(i);
Double lng = coordinate.getDouble(0);
Double lat = coordinate.getDouble(1);
if (beforeLng == null && beforeLat == null) {
distanceInfoMap.put((double) 0, lng + "," + lat);
} else {
double distance = MultiPointGeoPosition.haversine(beforeLat, beforeLng, lng, lat);
distanceInfoMap.put(distance, lng + "," + lat);
}
beforeLng = lng;
beforeLat = lat;
}
}
} }
/**
* 初始化路径
*/
private void initPath() {
try {
beforeLng = Double.parseDouble(scenarioTask.getFromLng());
beforeLat = Double.parseDouble(scenarioTask.getFromLat());
//累计距离
String url = SpringUtil.getBean(Environment.class).getProperty("path.planning.url");
String params = url + "?"
+ "profile=car"
+ "&point=" + scenarioTask.getFromLat() + ","
+ scenarioTask.getFromLng()
+ "&point=" + scenarioTask.getToLat() + ","
+ scenarioTask.getToLng()
+ "&points_encoded=false"
+ "&algorithm=alternative_route&alternative_route.max_paths=3";
log.info("params::{}", params);
String result = webClient.get().uri(params)
.retrieve()
.bodyToMono(String.class)
.block();
log.info("result:{}", result);
JSONObject pointJson = JSON.parseObject(result);
//获取路径点
if (pointJson != null) {
JSONObject pointsObj = pointJson.getJSONArray("paths").getJSONObject(0)
.getJSONObject("points");
//推送路径任务
Global.sendCmdInfoQueue.add(
ResponseCmdInfo.create("path_init", roomId, scenarioTask.getScenarioId(), pointsObj));
JSONArray coordinates = pointsObj.getJSONArray("coordinates");
for (int i = 0; i < coordinates.size(); i++) {
JSONArray coordinate = coordinates.getJSONArray(i);
Double lng = coordinate.getDouble(0);
Double lat = coordinate.getDouble(1);
double distance = MultiPointGeoPosition.haversine(beforeLat, beforeLng, lng, lat);
distanceInfoMap.put(distance, lng + "," + lat);
beforeLng = lng;
beforeLat = lat;
}
}
} catch (Exception e) {
log.error("error::", e);
}
}
private void updatePath() {
ScheduledExecutorService schedule = Executors.newScheduledThreadPool(
1);
schedule.scheduleWithFixedDelay(() -> {
}, 0, 1, TimeUnit.SECONDS);
//房间统一管理定时器房间关闭后定时器销毁
addScheduledExecutorServiceRefenceToRoom(schedule);
}
} }

View File

@ -56,6 +56,8 @@ public class Room implements AutoCloseable {
private NavigableMap<Long, Map<String, TaskAction>> actionMap = new ConcurrentSkipListMap<>(); private NavigableMap<Long, Map<String, TaskAction>> actionMap = new ConcurrentSkipListMap<>();
//日期格式化 //日期格式化
private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); private DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
//房间中关联的任务管理器
private Map<String,ScheduledExecutorService> futures = new ConcurrentHashMap<>();
//线程池 //线程池
private final ExecutorService actionExecutor = private final ExecutorService actionExecutor =
new ThreadPoolExecutor( new ThreadPoolExecutor(
@ -79,7 +81,6 @@ public class Room implements AutoCloseable {
private int mag = 1; private int mag = 1;
/** /**
* 启动 * 启动
* *
@ -112,6 +113,7 @@ public class Room implements AutoCloseable {
public long getDuringTime() { public long getDuringTime() {
return duringTime.get(); return duringTime.get();
} }
public long getTotalTime() { public long getTotalTime() {
return totalTime.get(); return totalTime.get();
} }
@ -120,8 +122,9 @@ public class Room implements AutoCloseable {
private void startTask() { private void startTask() {
if (future == null || future.isCancelled()) { if (future == null || future.isCancelled()) {
future = scheduler.scheduleAtFixedRate(() -> { future = scheduler.scheduleAtFixedRate(() -> {
ScenarioWsParam magValue = Global.roomParamMap.get(this.scenario.getId() + "_" + this.roomId); ScenarioWsParam magValue = Global.roomParamMap.get(
if(magValue!=null){ this.scenario.getId() + "_" + this.roomId);
if (magValue != null) {
this.mag = magValue.getMag(); this.mag = magValue.getMag();
} }
@ -130,11 +133,11 @@ public class Room implements AutoCloseable {
sendRemainTime((totalTime.get() - curTime)); sendRemainTime((totalTime.get() - curTime));
NavigableMap<Long, Map<String, TaskAction>> actions = actionMap.headMap(curTime, true); NavigableMap<Long, Map<String, TaskAction>> actions = actionMap.headMap(curTime, true);
if (!actions.isEmpty() ) { if (!actions.isEmpty()) {
actions.forEach((key, action) -> { actions.forEach((key, action) -> {
action.forEach((taskAction, task) -> { action.forEach((taskAction, task) -> {
actionExecutor.submit(task::doSomeThing); actionExecutor.submit(task::doSomeThing);
}); });
}); });
actions.clear(); actions.clear();
@ -170,14 +173,19 @@ public class Room implements AutoCloseable {
private void sendRemainTime(long remainTime) { private void sendRemainTime(long remainTime) {
Map<String, Object> timeMap = new HashMap<>(); Map<String, Object> timeMap = new HashMap<>();
timeMap.put("update_time_str",utils.formatSeconds(remainTime)); timeMap.put("update_time_str", utils.formatSeconds(remainTime));
timeMap.put("remain_time",remainTime); timeMap.put("remain_time", remainTime);
timeMap.put("during_time",duringTime.get()); timeMap.put("during_time", duringTime.get());
timeMap.put("current_time",df.format(this.scenario.getStartTime().plusSeconds(duringTime.get()))); timeMap.put("current_time",
df.format(this.scenario.getStartTime().plusSeconds(duringTime.get())));
Global.sendCmdInfoQueue.add( Global.sendCmdInfoQueue.add(
ResponseCmdInfo.create("update_time", this.roomId, this.scenario.getId(), ResponseCmdInfo.create("update_time", this.roomId, this.scenario.getId(),
timeMap)); timeMap));
} }
public void addTaskReference(ScheduledExecutorService scheduledExecutorService) {
futures.put(IdUtils.simpleUUID(), scheduledExecutorService);
}
} }