Aufbau einer UAV-Pfadkonflikt-Simulationsumgebung: von der Papierpraxis bis zur Code-Implementierung
In den ersten beiden Artikeln haben wir Panorama des UAV-Konfliktlösungsalgorithmus bzw. MARL+GAT-End-to-End-Lösung herausgefunden. In diesem Artikel wird eine untergeordnete Frage beantwortet: **Wie erstellt man ein realistisches und reproduzierbares UAV-Konfliktszenario in einer Simulationsumgebung? ** Dies bestimmt direkt die Glaubwürdigkeit der Algorithmusbewertung und die Vergleichbarkeit des Experiments.
1. Warum ist der Aufbau einer Simulationsumgebung so wichtig?
Die Simulationsumgebung ist eine Brücke zwischen Algorithmentheorie und Flugmessung. Eine gut gestaltete Simulationsumgebung muss drei Dimensionen erfüllen:
| Abmessungen | Bedeutung | FAQ |
|---|---|---|
| Authentizität | Die Kluft zwischen Simulation und Realität | Zu starke Vereinfachung der Dynamik und fehlende Sensormodelle |
| Reproduzierbarkeit | Konsistente Ergebnisse unter demselben Startwert | Unkontrollierte zufällige Initialisierung, Anhäufung von Gleitkommafehlern |
| Skalierbarkeit | Unterstützung großer Cluster | Die Einzelmaschinensimulation kann nicht parallelisiert werden und die Kommunikationstopologie fehlt |
Dieser Artikel konzentriert sich auf den Aufbau der Simulationsumgebung im Rahmen des MARL-Trainingsszenarios und berücksichtigt dabei sowohl Authentizität als auch Skalierbarkeit – schließlich ist es unrealistisch, echte Flugsteuerung auf mehr als 50 Drohnen zu trainieren.
2. Vergleich gängiger Simulationsplattformen
2.1 Plattformübersicht| Plattform | Zugrunde liegender Motor | Physische Authentizität | Unterstützung mehrerer Maschinen | MARL-Kompatibilität | Papierzitate |
|------|----------|-----------|----------|------------|-----------| | AirSim (Microsoft) | Unreal Engine | ⭐⭐⭐⭐⭐ | ✅ | ⚠️ Erfordert Verpackung | Hoch | | FlightGoggles (MIT) | Einheit | ⭐⭐⭐⭐ | ✅ | ⚠️ Erfordert Verpackung | Hoch | | RotorS (ETH Zürich) | Pavillon/ROS | ⭐⭐⭐⭐ | ✅ | ✅ PyMARL-kompatibel | Mittel | | Morse (LAAS-CNRS) | Blender Game Engine | ⭐⭐⭐ | ✅ | ✅ Einheimisch | Mittel | | Webbots | Selbstentwickelt | ⭐⭐⭐⭐ | ✅ | ✅ | Niedrig | | Benutzerdefiniert 2D/3D | N/A | ⭐⭐ | ✅✅✅ | ✅✅✅ | - |
Kernschlussfolgerung:
- Akademische Benchmarks: RotorS + ROS/Gazebo ist die Mainstream-Wahl (hergestellt von der ETH Zürich, kompatibel mit PX4)
- Umfassende RL-Forschung: Die meisten Artikel verwenden Benutzerdefinierte Fitnessstudio-Umgebung (vollständig angepasst, die Belohnungsfunktion lässt sich leicht ändern)
- Sim-to-Real Target: AirSim/FlightGoggles (High-Fidelity Vision + Sensorrauschen)
2.2 AirSim: Skalierung der Erfahrung von Null auf Hundert
Das Microsoft AI for Earth-Team trainierte die Schwarmkoordination von bis zu 100 Starrflügel-UAV auf AirSim:
Shah, S., et al. (2018). AirSim: Hochpräzise visuelle und physikalische Simulation für autonome Fahrzeuge. Feld- und Servicerobotik.Hauptvorteile von AirSim:
- Sensorsimulation: vollständige Simulation von Kamera, LiDAR, GPS, IMU, Rauschen kann eingespeist werden
- Wettersystem: Der Einfluss von Windgeschwindigkeit, Wolken und Licht auf den Sensor
- API-Vereinheitlichung: Unterstützt C++ / Python / ROS-Schnittstelle
import airsim
import numpy as np
class AirSimUAVEnv:
"""
AirSim 多 UAV 冲突环境封装
"""
def __init__(self, n_agents=8, drone_names=None):
self.client = airsim.MultirotorClient()
self.client.confirmConnection()
self.client.enableApiControl(True)
if drone_names is None:
self.drone_names = [f"Drone_{i}" for i in range(n_agents)]
else:
self.drone_names = drone_names
self.n_agents = len(self.drone_names)
# 为每架 UAV 解锁并设置为被动模式(由 RL 控制)
for name in self.drone_names:
self.client.enableApiControl(True, name)
self.client.armDisarm(True, name)
def reset(self):
"""随机初始化各 UAV 位置和目标"""
self.goals = {}
for name in self.drone_names:
# 随机起点
start_pos = np.random.uniform(-50, 50, size=3)
start_pos[2] = np.random.uniform(10, 30) # 高度 10-30m
# 随机终点
goal_pos = np.random.uniform(-50, 50, size=3)
goal_pos[2] = np.random.uniform(10, 30)
self.client.simSetVehiclePose(
airsim.Vector3R(*start_pos),
airsim.ToQuaternionr(0, 0, 0),
vehicle_name=name
)
self.goals[name] = goal_pos
return self._get_obs()
def _get_obs(self):
"""获取各 UAV 的本地观测"""
obs = []
for name in self.drone_names:
state = self.client.getMultirotorState(name)
pos = state.kinematics_estimated.position
vel = state.kinematics_estimated.linear_velocity
obs.append(np.array([pos.x_val, pos.y_val, pos.z_val,
vel.x_val, vel.y_val, vel.z_val]))
return np.array(obs)
def step(self, actions):
"""执行动作:vx, vy, vz 速度指令"""
for i, name in enumerate(self.drone_names):
self.client.moveByVelocityAsync(
actions[i, 0], actions[i, 1], actions[i, 2],
duration=0.1, vehicle_name=name
)
# 等待执行完成(AirSim 为异步 API)
self.client.simPause(True)
airsim.multirotorclient()
self.client.simContinueForTime(0.1)
self.client.simPause(False)
obs = self._get_obs()
reward, done = self._compute_reward()
return obs, reward, done, {}
def _compute_reward(self):
"""奖励函数"""
reward = np.zeros(self.n_agents)
done = np.zeros(self.n_agents, dtype=bool)
for i, name in enumerate(self.drone_names):
state = self.client.getMultirotorState(name)
pos = np.array([state.kinematics_estimated.position.x_val,
state.kinematics_estimated.position.y_val,
state.kinematics_estimated.position.z_val])
dist_to_goal = np.linalg.norm(pos - self.goals[name])
reward[i] += 1.0 / (dist_to_goal + 1.0)
# 冲突惩罚
for j, other_name in enumerate(self.drone_names):
if i >= j:
continue
other_state = self.client.getMultirotorState(other_name)
other_pos = np.array([other_state.kinematics_estimated.position.x_val,
other_state.kinematics_estimated.position.y_val,
other_state.kinematics_estimated.position.z_val])
dist = np.linalg.norm(pos - other_pos)
if dist < 5.0: # 5m 安全距离
reward[i] -= 100
done[i] = True
return reward, done.any()
2.3 Gazebo / RotorS: die Standardwahl im ROS-Ökosystem
RotorS der ETH Zürich ist das umfassendste Multi-UAV-Simulationspaket im ROS/Gazebo-Ökosystem:
Foehn, P., et al. (2020). ETHZ Flying Machine Arena Datensatz. ETH Zürich.
RotorS unterstützt:
- mav_msgs-Schnittstelle (Spin + Pitch + Roll + Schub → Motor-PWM)
- PX4 SITL/HITL: komplette Flugsteuerungs-Firmware in der Schleife
- RViz-Visualisierung: Beobachten Sie den Clusterstatus in Echtzeit
- Gazebo Physics Engine: ODE / Bullet / Simbody optional
# 安装 RotorS(Ubuntu 20.04 + ROS Noetic)
sudo apt-get install ros-noetic-rotors-description ros-noetic-rotors-gazebo
source /opt/ros/noetic/setup.bash
# 启动 8 架 UAV 集群场景
roslaunch rotors_gazebo mav_tecs_controller.launch \
mav_name:=hummingbird \
world_name:=forest \
n_uavs:=8
2.4 Benutzerdefinierte Fitnessstudio-Umgebung: die gängige Wahl für die akademische RL-Forschung
Die überwiegende Mehrheit der MARL-UAV-Papiere (einschließlich der ursprünglichen Implementierungen von MADDPG/QMIX/MAPPO) verwendet aus drei Gründen eine vollständig benutzerdefinierte Gym-ähnliche Umgebung:
- Trainingsgeschwindigkeit: Kein Grafik-Rendering, Python-Direktschleife, Zehntausende Interaktionen können pro Sekunde abgeschlossen werden
- Steuerbare Belohnungsfunktion: Iterieren Sie das Design schnell, ohne den C++-Code zu ändern
- Reproduzierbarkeit: Mit einem festen Zufallsstartwert sind die Ergebnisse vollständig reproduzierbar.
import gym
from gym import spaces
import numpy as np
class UAVConflictGym(gym.Env):
"""
标准 Gym 风格 UAV 冲突环境
兼容 Stable-Baselines3 / rllib / PyMARL
"""
metadata = {'render_modes': ['human', 'rgb_array']}
def __init__(
self,
n_agents=8,
area_size=200.0, # 仿真区域边长 (m)
safe_radius=5.0, # 安全隔离距离 (m)
max_speed=20.0, # 最大速度 (m/s)
dt=0.1, # 仿真步长 (s)
max_steps=500, # 最大 episode 步数
seed=None
):
super().__init__()
self.n_agents = n_agents
self.area_size = area_size
self.safe_radius = safe_radius
self.max_speed = max_speed
self.dt = dt
self.max_steps = max_steps
# 观测空间:(位置3 + 速度3 + 目标3 + 自身id编码 + 邻居信息)
self.obs_dim = 3 + 3 + 3 + n_agents + 3 * (n_agents - 1)
self.action_dim = 3 # Δvx, Δvy, Δvz
self.observation_space = spaces.Box(
low=-np.inf, high=np.inf,
shape=(self.obs_dim,), dtype=np.float32
)
self.action_space = spaces.Box(
low=-1, high=1,
shape=(self.action_dim,), dtype=np.float32
)
# 多智能体环境的 each-agent 空间
self.observation_space = spaces.Box(
low=-np.inf, high=np.inf,
shape=(self.n_agents, self.obs_dim), dtype=np.float32
)
if seed is not None:
self.seed(seed)
def seed(self, seed):
np.random.seed(seed)
def reset(self):
"""初始化 UAV 状态"""
# 位置:均匀分布在区域内
self.positions = np.random.uniform(
-self.area_size / 2, self.area_size / 2,
size=(self.n_agents, 3)
).astype(np.float32)
# 速度:随机方向,5-15 m/s
speeds = np.random.uniform(5, 15, size=(self.n_agents,))
directions = np.random.randn(self.n_agents, 3)
directions = directions / np.linalg.norm(directions, axis=1, keepdims=True)
self.velocities = speeds[:, None] * directions
# 目标点
self.goals = np.random.uniform(
-self.area_size / 2, self.area_size / 2,
size=(self.n_agents, 3)
).astype(np.float32)
# 确保起点和终点不重叠(避免初始冲突过于密集)
for i in range(self.n_agents):
for j in range(i + 1, self.n_agents):
dist = np.linalg.norm(self.positions[i] - self.positions[j])
if dist < self.safe_radius * 2:
# 重新生成
self.positions[j] = np.random.uniform(
-self.area_size / 2, self.area_size / 2, size=3
).astype(np.float32)
self.step_count = 0
self.prev_distances = self._compute_distances()
return self._get_obs()
def step(self, actions):
"""
actions: (n_agents, 3) 速度增量
"""
actions = np.clip(actions, -1, 1) * self.max_speed * self.dt
# 更新速度
new_velocities = self.velocities + actions
speed = np.linalg.norm(new_velocities, axis=1, keepdims=True)
new_velocities = np.clip(new_velocities, -self.max_speed, self.max_speed)
new_velocities = new_velocities * np.clip(speed / self.max_speed, 0, 1)
new_velocities = new_velocities / (np.linalg.norm(new_velocities, axis=1, keepdims=True) + 1e-8) * np.clip(speed, 0, self.max_speed)
self.velocities = new_velocities
# 更新位置
self.positions += self.velocities * self.dt
# 边界处理:软边界反射
for i in range(self.n_agents):
for dim in range(3):
half = self.area_size / 2
if abs(self.positions[i, dim]) > half:
self.positions[i, dim] = np.clip(
self.positions[i, dim], -half, half
)
self.velocities[i, dim] *= -0.8 # 反射 + 能量损失
# 奖励计算
reward, info = self._compute_reward()
# 冲突检测
collision = self._check_collision()
done = collision or self.step_count >= self.max_steps
self.step_count += 1
return self._get_obs(), reward, np.full(self.n_agents, done), info
def _compute_distances(self):
"""计算所有 UAV 对之间的距离矩阵"""
dist_matrix = np.zeros((self.n_agents, self.n_agents))
for i in range(self.n_agents):
for j in range(i + 1, self.n_agents):
d = np.linalg.norm(self.positions[i] - self.positions[j])
dist_matrix[i, j] = dist_matrix[j, i] = d
return dist_matrix
def _compute_reward(self):
"""分项奖励函数"""
reward = np.zeros(self.n_agents)
info = {'collision': False}
# 1. 任务进度奖励(到目标距离减少)
dist_to_goal = np.linalg.norm(self.positions - self.goals, axis=1)
prev_dist = self.prev_distances.copy()
np.fill_diagonal(prev_dist, np.inf)
# 进度:到目标距离减少得正奖励
for i in range(self.n_agents):
progress = self.prev_distances[i, i] - dist_to_goal[i]
reward[i] += 10.0 * progress # 缩放系数
# 到达目标
if dist_to_goal[i] < 2.0:
reward[i] += 50.0
# 2. 安全奖励(维持安全距离)
collision_any = False
for i in range(self.n_agents):
for j in range(i + 1, self.n_agents):
d = np.linalg.norm(self.positions[i] - self.positions[j])
if d < self.safe_radius:
# 碰撞!
reward[i] -= 200
reward[j] -= 200
collision_any = True
elif d < self.safe_radius * 2:
# 接近告警
close_factor = (self.safe_radius * 2 - d) / self.safe_radius
reward[i] -= 5.0 * close_factor
reward[j] -= 5.0 * close_factor
info['collision'] = collision_any
# 3. 能量惩罚(避免无谓机动)
action_penalty = -0.01 * np.sum(np.square(self.velocities - self.prev_velocities))
reward += action_penalty
# 4. 速度维持奖励(偏好稳定飞行)
for i in range(self.n_agents):
speed = np.linalg.norm(self.velocities[i])
if self.max_speed * 0.3 < speed < self.max_speed * 0.8:
reward[i] += 0.5
self.prev_distances = self._compute_distances()
return reward, info
def _check_collision(self):
"""检测是否有碰撞"""
for i in range(self.n_agents):
for j in range(i + 1, self.n_agents):
if np.linalg.norm(self.positions[i] - self.positions[j]) < self.safe_radius:
return True
return False
def _get_obs(self):
"""构建各 UAV 的观测向量"""
obs = []
for i in range(self.n_agents):
# 自身状态:位置 + 速度
self_state = np.concatenate([self.positions[i], self.velocities[i]])
# 目标信息:方向 + 距离
direction_to_goal = self.goals[i] - self.positions[i]
dist_to_goal = np.linalg.norm(direction_to_goal)
direction_to_goal_norm = direction_to_goal / (dist_to_goal + 1e-6)
goal_info = np.concatenate([direction_to_goal_norm, np.array([dist_to_goal])])
# 自身 ID 编码(one-hot)
id_encoding = np.zeros(self.n_agents)
id_encoding[i] = 1.0
# 邻居信息(相对位置 + 相对速度)
neighbor_info = []
for j in range(self.n_agents):
if i != j:
rel_pos = self.positions[j] - self.positions[i]
rel_vel = self.velocities[j] - self.velocities[i]
neighbor_info.extend(rel_pos.tolist() + rel_vel.tolist())
neighbor_info = np.array(neighbor_info, dtype=np.float32)
# 拼接
full_obs = np.concatenate([
self_state, # 6
goal_info, # 4
id_encoding, # n_agents
neighbor_info # (n_agents-1)*6
])
obs.append(full_obs)
return np.array(obs, dtype=np.float32)
def render(self, mode='human'):
"""简单 2D 可视化"""
import matplotlib.pyplot as plt
plt.clf()
plt.xlim(-self.area_size/2, self.area_size/2)
plt.ylim(-self.area_size/2, self.area_size/2)
# 绘制 UAV 和目标
for i in range(self.n_agents):
# 位置
plt.scatter(self.positions[i, 0], self.positions[i, 1],
color='blue', s=100, zorder=5)
# 速度向量
plt.arrow(self.positions[i, 0], self.positions[i, 1],
self.velocities[i, 0], self.velocities[i, 1],
head_width=2, color='blue', alpha=0.5)
# 目标点
plt.scatter(self.goals[i, 0], self.goals[i, 1],
color='green', s=50, marker='x')
# 连线
plt.plot([self.positions[i, 0], self.goals[i, 0]],
[self.positions[i, 1], self.goals[i, 1]],
'g--', alpha=0.2)
plt.pause(0.01)
plt.ioff()
3. Kernelemente des Umweltdesigns
3.1 Dynamisches Modell: vom Teilchenmodell zu sechs Freiheitsgraden
Stufe 1 – Punktmasse Die einfachste Version, die nur Positions- und Geschwindigkeitsaktualisierungen berücksichtigt, wird zur Algorithmusüberprüfung und zum schnellen Training verwendet:
\mathbf{a}{new} = \text{clip}(\mathbf{a}{old} + \mathbf{u}, \mathbf{v}{min}, \mathbf{v}{max})
\mathbf{v}_i[k+1] = \mathbf{u}_i[k]
\mathbf{v}_i[k+1] = \mathbf{v}_i[k] + \mathbf{a}_i[k] \Updelta t
\mathbf{p}_i[k+1] = \mathbf{p}_i[k] + \mathbf{v}_i[k] \Updelta t + \frac{1}{2}\mathbf{a}_i[k]\Updelta t^2
|\mathbf{a}i[k]| \leq a{max}, \quad |\mathbf{v}i[k]| \leq v{max}
\dot{\boldsymbol{\eta}} = \mathbf{R}(\boldsymbol{\Theta})\boldsymbol{\omega}
m\ddot{\mathbf{p}} = -mg\mathbf{e}_3 + \mathbf{R}(\boldsymbol{\Theta})\mathbf{T}_i\mathbf{e}_3 - \mathbf{D}(\dot{\mathbf{p}})
c_{ij} = \begin{cases} 1 & \text{if } | \mathbf{p}_i - \mathbf{p}j | < d{safe} \ 0 & \text{andernfalls} \end{Fälle}
P_{Konflikt} = P\left( | \mathbf{p}i - \mathbf{p}j | < d{safe} \right) = \int{| \mathbf{d} | < d_{safe}} f_{\Delta \mathbf{p}}(\mathbf{d}) , d\mathbf{d}
TTC = \min_{t > 0} { t \mid | \mathbf{p}_i(t) - \mathbf{p}j(t) | = d{sicher} }
\hat{r}i = r_i - \lambda \cdot \text{Var}({r_j}{j \in \mathcal{N}_i})
x_i = R\cos(\theta_i), \quad y_i = R\sin(\theta_i)
\theta_i = \theta_0 + \omega t + i \cdot \Delta\theta
\mathbf{p}_i(0) = R \cdot [\cos(\phi_i), \sin(\phi_i)], \quad \phi_i = \frac{2\pi i}{N}