UAV path conflict simulation environment construction: from paper practice to code implementation
In the first two articles, we sorted out UAV conflict resolution algorithm panorama and MARL+GAT end-to-end solution respectively. This article will answer a lower-level question: **How to build a realistic and reproducible UAV conflict scenario in a simulation environment? ** This directly determines the credibility of the algorithm evaluation and the comparability of the experiment.
1. Why is the construction of simulation environment so critical?
The simulation environment is a bridge connecting algorithm theory and flight measurement. A well-designed simulation environment needs to meet three dimensions:
| Dimensions | Meaning | FAQ |
|---|---|---|
| Authenticity | The gap between simulation and reality | Oversimplification of dynamics and lack of sensor models |
| Reproducibility | Consistent results under the same seed | Uncontrolled random initialization, accumulation of floating point errors |
| Scalability | Support large-scale clusters | Single-machine simulation cannot be parallelized and communication topology is missing |
This article focuses on the construction of the simulation environment under the MARL training scenario, taking into account both authenticity and scalability - after all, it is unrealistic to train real flight control on 50+ drones.
2. Comparison of mainstream simulation platforms
2.1 Platform Overview| Platform | Underlying engine | Physical authenticity | Multi-machine support | MARL compatibility | Paper citations |
|------|----------|-----------|----------|------------|-----------| | AirSim (Microsoft) | Unreal Engine | ⭐⭐⭐⭐⭐ | ✅ | ⚠️ Requires Wrapping | High | | FlightGoggles (MIT) | Unity | ⭐⭐⭐⭐ | ✅ | ⚠️ Requires Wrapping | High | | RotorS (ETH Zurich) | Gazebo/ROS | ⭐⭐⭐⭐ | ✅ | ✅ PyMARL Compatible | Medium | | Morse (LAAS-CNRS) | Blender Game Engine | ⭐⭐⭐ | ✅ | ✅ Native | Medium | | Webots | Self-developed | ⭐⭐⭐⭐ | ✅ | ✅ | Low | | Custom 2D/3D | N/A | ⭐⭐ | ✅✅✅ | ✅✅✅ | - |
Core conclusion:
- Academic Benchmarks: RotorS + ROS/Gazebo is the mainstream choice (produced by ETH Zurich, compatible with PX4)
- End-to-end RL research: Most papers use Custom Gym environment (fully customized, easy to change the reward function)
- Sim-to-Real Target: AirSim/FlightGoggles (High-Fidelity Vision + Sensor Noise)
2.2 AirSim: Scaling Experience from Zero to One Hundred
The Microsoft AI for Earth team trained swarm coordination of up to 100 fixed-wing UAV on AirSim:
Shah, S., et al. (2018). AirSim: High-Fidelity Visual and Physical Simulation for Autonomous Vehicles. Field and Service Robotics.Key advantages of AirSim:
- Sensor Simulation: full simulation of camera, LiDAR, GPS, IMU, noise can be injected
- Weather System: The impact of wind speed, clouds, and light on the sensor
- API Unification: Support C++ / Python / ROS interface
import airsim
import numpy as np
class AirSimUAVEnv:
"""
AirSim 多 UAV 冲突环境封装
"""
def __init__(self, n_agents=8, drone_names=None):
self.client = airsim.MultirotorClient()
self.client.confirmConnection()
self.client.enableApiControl(True)
if drone_names is None:
self.drone_names = [f"Drone_{i}" for i in range(n_agents)]
else:
self.drone_names = drone_names
self.n_agents = len(self.drone_names)
# 为每架 UAV 解锁并设置为被动模式(由 RL 控制)
for name in self.drone_names:
self.client.enableApiControl(True, name)
self.client.armDisarm(True, name)
def reset(self):
"""随机初始化各 UAV 位置和目标"""
self.goals = {}
for name in self.drone_names:
# 随机起点
start_pos = np.random.uniform(-50, 50, size=3)
start_pos[2] = np.random.uniform(10, 30) # 高度 10-30m
# 随机终点
goal_pos = np.random.uniform(-50, 50, size=3)
goal_pos[2] = np.random.uniform(10, 30)
self.client.simSetVehiclePose(
airsim.Vector3R(*start_pos),
airsim.ToQuaternionr(0, 0, 0),
vehicle_name=name
)
self.goals[name] = goal_pos
return self._get_obs()
def _get_obs(self):
"""获取各 UAV 的本地观测"""
obs = []
for name in self.drone_names:
state = self.client.getMultirotorState(name)
pos = state.kinematics_estimated.position
vel = state.kinematics_estimated.linear_velocity
obs.append(np.array([pos.x_val, pos.y_val, pos.z_val,
vel.x_val, vel.y_val, vel.z_val]))
return np.array(obs)
def step(self, actions):
"""执行动作:vx, vy, vz 速度指令"""
for i, name in enumerate(self.drone_names):
self.client.moveByVelocityAsync(
actions[i, 0], actions[i, 1], actions[i, 2],
duration=0.1, vehicle_name=name
)
# 等待执行完成(AirSim 为异步 API)
self.client.simPause(True)
airsim.multirotorclient()
self.client.simContinueForTime(0.1)
self.client.simPause(False)
obs = self._get_obs()
reward, done = self._compute_reward()
return obs, reward, done, {}
def _compute_reward(self):
"""奖励函数"""
reward = np.zeros(self.n_agents)
done = np.zeros(self.n_agents, dtype=bool)
for i, name in enumerate(self.drone_names):
state = self.client.getMultirotorState(name)
pos = np.array([state.kinematics_estimated.position.x_val,
state.kinematics_estimated.position.y_val,
state.kinematics_estimated.position.z_val])
dist_to_goal = np.linalg.norm(pos - self.goals[name])
reward[i] += 1.0 / (dist_to_goal + 1.0)
# 冲突惩罚
for j, other_name in enumerate(self.drone_names):
if i >= j:
continue
other_state = self.client.getMultirotorState(other_name)
other_pos = np.array([other_state.kinematics_estimated.position.x_val,
other_state.kinematics_estimated.position.y_val,
other_state.kinematics_estimated.position.z_val])
dist = np.linalg.norm(pos - other_pos)
if dist < 5.0: # 5m 安全距离
reward[i] -= 100
done[i] = True
return reward, done.any()
2.3 Gazebo / RotorS: the standard choice in the ROS ecosystem
ETH Zurich’s RotorS is the most complete multi-UAV simulation package in the ROS/Gazebo ecosystem:
Foehn, P., et al. (2020). ETHZ Flying Machine Arena Dataset. ETH Zurich.
RotorS supports:
- mav_msgs interface (spin + pitch + roll + thrust → motor PWM)
- PX4 SITL/HITL: complete flight control firmware in the loop
- RViz Visualization: Observe cluster status in real time
- Gazebo Physics Engine: ODE / Bullet / Simbody optional
# 安装 RotorS(Ubuntu 20.04 + ROS Noetic)
sudo apt-get install ros-noetic-rotors-description ros-noetic-rotors-gazebo
source /opt/ros/noetic/setup.bash
# 启动 8 架 UAV 集群场景
roslaunch rotors_gazebo mav_tecs_controller.launch \
mav_name:=hummingbird \
world_name:=forest \
n_uavs:=8
2.4 Custom Gym environment: the mainstream choice for academic RL research
The vast majority of MARL UAV papers (including the original implementations of MADDPG/QMIX/MAPPO) use a completely custom Gym-style environment for three reasons:
- Training speed: No graphics rendering, Python direct loop, tens of thousands of interactions can be completed per second
- Controllable reward function: Quickly iterate the design without changing the C++ code
- Reproducibility: With a fixed random seed, the results are completely reproducible.
import gym
from gym import spaces
import numpy as np
class UAVConflictGym(gym.Env):
"""
标准 Gym 风格 UAV 冲突环境
兼容 Stable-Baselines3 / rllib / PyMARL
"""
metadata = {'render_modes': ['human', 'rgb_array']}
def __init__(
self,
n_agents=8,
area_size=200.0, # 仿真区域边长 (m)
safe_radius=5.0, # 安全隔离距离 (m)
max_speed=20.0, # 最大速度 (m/s)
dt=0.1, # 仿真步长 (s)
max_steps=500, # 最大 episode 步数
seed=None
):
super().__init__()
self.n_agents = n_agents
self.area_size = area_size
self.safe_radius = safe_radius
self.max_speed = max_speed
self.dt = dt
self.max_steps = max_steps
# 观测空间:(位置3 + 速度3 + 目标3 + 自身id编码 + 邻居信息)
self.obs_dim = 3 + 3 + 3 + n_agents + 3 * (n_agents - 1)
self.action_dim = 3 # Δvx, Δvy, Δvz
self.observation_space = spaces.Box(
low=-np.inf, high=np.inf,
shape=(self.obs_dim,), dtype=np.float32
)
self.action_space = spaces.Box(
low=-1, high=1,
shape=(self.action_dim,), dtype=np.float32
)
# 多智能体环境的 each-agent 空间
self.observation_space = spaces.Box(
low=-np.inf, high=np.inf,
shape=(self.n_agents, self.obs_dim), dtype=np.float32
)
if seed is not None:
self.seed(seed)
def seed(self, seed):
np.random.seed(seed)
def reset(self):
"""初始化 UAV 状态"""
# 位置:均匀分布在区域内
self.positions = np.random.uniform(
-self.area_size / 2, self.area_size / 2,
size=(self.n_agents, 3)
).astype(np.float32)
# 速度:随机方向,5-15 m/s
speeds = np.random.uniform(5, 15, size=(self.n_agents,))
directions = np.random.randn(self.n_agents, 3)
directions = directions / np.linalg.norm(directions, axis=1, keepdims=True)
self.velocities = speeds[:, None] * directions
# 目标点
self.goals = np.random.uniform(
-self.area_size / 2, self.area_size / 2,
size=(self.n_agents, 3)
).astype(np.float32)
# 确保起点和终点不重叠(避免初始冲突过于密集)
for i in range(self.n_agents):
for j in range(i + 1, self.n_agents):
dist = np.linalg.norm(self.positions[i] - self.positions[j])
if dist < self.safe_radius * 2:
# 重新生成
self.positions[j] = np.random.uniform(
-self.area_size / 2, self.area_size / 2, size=3
).astype(np.float32)
self.step_count = 0
self.prev_distances = self._compute_distances()
return self._get_obs()
def step(self, actions):
"""
actions: (n_agents, 3) 速度增量
"""
actions = np.clip(actions, -1, 1) * self.max_speed * self.dt
# 更新速度
new_velocities = self.velocities + actions
speed = np.linalg.norm(new_velocities, axis=1, keepdims=True)
new_velocities = np.clip(new_velocities, -self.max_speed, self.max_speed)
new_velocities = new_velocities * np.clip(speed / self.max_speed, 0, 1)
new_velocities = new_velocities / (np.linalg.norm(new_velocities, axis=1, keepdims=True) + 1e-8) * np.clip(speed, 0, self.max_speed)
self.velocities = new_velocities
# 更新位置
self.positions += self.velocities * self.dt
# 边界处理:软边界反射
for i in range(self.n_agents):
for dim in range(3):
half = self.area_size / 2
if abs(self.positions[i, dim]) > half:
self.positions[i, dim] = np.clip(
self.positions[i, dim], -half, half
)
self.velocities[i, dim] *= -0.8 # 反射 + 能量损失
# 奖励计算
reward, info = self._compute_reward()
# 冲突检测
collision = self._check_collision()
done = collision or self.step_count >= self.max_steps
self.step_count += 1
return self._get_obs(), reward, np.full(self.n_agents, done), info
def _compute_distances(self):
"""计算所有 UAV 对之间的距离矩阵"""
dist_matrix = np.zeros((self.n_agents, self.n_agents))
for i in range(self.n_agents):
for j in range(i + 1, self.n_agents):
d = np.linalg.norm(self.positions[i] - self.positions[j])
dist_matrix[i, j] = dist_matrix[j, i] = d
return dist_matrix
def _compute_reward(self):
"""分项奖励函数"""
reward = np.zeros(self.n_agents)
info = {'collision': False}
# 1. 任务进度奖励(到目标距离减少)
dist_to_goal = np.linalg.norm(self.positions - self.goals, axis=1)
prev_dist = self.prev_distances.copy()
np.fill_diagonal(prev_dist, np.inf)
# 进度:到目标距离减少得正奖励
for i in range(self.n_agents):
progress = self.prev_distances[i, i] - dist_to_goal[i]
reward[i] += 10.0 * progress # 缩放系数
# 到达目标
if dist_to_goal[i] < 2.0:
reward[i] += 50.0
# 2. 安全奖励(维持安全距离)
collision_any = False
for i in range(self.n_agents):
for j in range(i + 1, self.n_agents):
d = np.linalg.norm(self.positions[i] - self.positions[j])
if d < self.safe_radius:
# 碰撞!
reward[i] -= 200
reward[j] -= 200
collision_any = True
elif d < self.safe_radius * 2:
# 接近告警
close_factor = (self.safe_radius * 2 - d) / self.safe_radius
reward[i] -= 5.0 * close_factor
reward[j] -= 5.0 * close_factor
info['collision'] = collision_any
# 3. 能量惩罚(避免无谓机动)
action_penalty = -0.01 * np.sum(np.square(self.velocities - self.prev_velocities))
reward += action_penalty
# 4. 速度维持奖励(偏好稳定飞行)
for i in range(self.n_agents):
speed = np.linalg.norm(self.velocities[i])
if self.max_speed * 0.3 < speed < self.max_speed * 0.8:
reward[i] += 0.5
self.prev_distances = self._compute_distances()
return reward, info
def _check_collision(self):
"""检测是否有碰撞"""
for i in range(self.n_agents):
for j in range(i + 1, self.n_agents):
if np.linalg.norm(self.positions[i] - self.positions[j]) < self.safe_radius:
return True
return False
def _get_obs(self):
"""构建各 UAV 的观测向量"""
obs = []
for i in range(self.n_agents):
# 自身状态:位置 + 速度
self_state = np.concatenate([self.positions[i], self.velocities[i]])
# 目标信息:方向 + 距离
direction_to_goal = self.goals[i] - self.positions[i]
dist_to_goal = np.linalg.norm(direction_to_goal)
direction_to_goal_norm = direction_to_goal / (dist_to_goal + 1e-6)
goal_info = np.concatenate([direction_to_goal_norm, np.array([dist_to_goal])])
# 自身 ID 编码(one-hot)
id_encoding = np.zeros(self.n_agents)
id_encoding[i] = 1.0
# 邻居信息(相对位置 + 相对速度)
neighbor_info = []
for j in range(self.n_agents):
if i != j:
rel_pos = self.positions[j] - self.positions[i]
rel_vel = self.velocities[j] - self.velocities[i]
neighbor_info.extend(rel_pos.tolist() + rel_vel.tolist())
neighbor_info = np.array(neighbor_info, dtype=np.float32)
# 拼接
full_obs = np.concatenate([
self_state, # 6
goal_info, # 4
id_encoding, # n_agents
neighbor_info # (n_agents-1)*6
])
obs.append(full_obs)
return np.array(obs, dtype=np.float32)
def render(self, mode='human'):
"""简单 2D 可视化"""
import matplotlib.pyplot as plt
plt.clf()
plt.xlim(-self.area_size/2, self.area_size/2)
plt.ylim(-self.area_size/2, self.area_size/2)
# 绘制 UAV 和目标
for i in range(self.n_agents):
# 位置
plt.scatter(self.positions[i, 0], self.positions[i, 1],
color='blue', s=100, zorder=5)
# 速度向量
plt.arrow(self.positions[i, 0], self.positions[i, 1],
self.velocities[i, 0], self.velocities[i, 1],
head_width=2, color='blue', alpha=0.5)
# 目标点
plt.scatter(self.goals[i, 0], self.goals[i, 1],
color='green', s=50, marker='x')
# 连线
plt.plot([self.positions[i, 0], self.goals[i, 0]],
[self.positions[i, 1], self.goals[i, 1]],
'g--', alpha=0.2)
plt.pause(0.01)
plt.ioff()
3. Core elements of environmental design
3.1 Dynamic model: from particle model to six degrees of freedom
Level 1 — Point Mass The most simplified version, which only considers position and velocity updates, is used for algorithm verification and fast training:
\mathbf{a}{new} = \text{clip}(\mathbf{a}{old} + \mathbf{u}, \mathbf{v}{min}, \mathbf{v}{max})
\mathbf{v}_i[k+1] = \mathbf{u}_i[k]
\mathbf{v}_i[k+1] = \mathbf{v}_i[k] + \mathbf{a}_i[k] \Delta t
\mathbf{p}_i[k+1] = \mathbf{p}_i[k] + \mathbf{v}_i[k] \Delta t + \frac{1}{2}\mathbf{a}_i[k]\Delta t^2
|\mathbf{a}i[k]| \leq a{max}, \quad |\mathbf{v}i[k]| \leq v{max}
\dot{\boldsymbol{\eta}} = \mathbf{R}(\boldsymbol{\Theta})\boldsymbol{\omega}
m\ddot{\mathbf{p}} = -mg\mathbf{e}_3 + \mathbf{R}(\boldsymbol{\Theta})\mathbf{T}_i\mathbf{e}_3 - \mathbf{D}(\dot{\mathbf{p}})
c_{ij} = \begin{cases} 1 & \text{if } | \mathbf{p}_i - \mathbf{p}j | < d{safe} \ 0 & \text{otherwise} \end{cases}
P_{conflict} = P\left( | \mathbf{p}i - \mathbf{p}j | < d{safe} \right) = \int{| \mathbf{d} | < d_{safe}} f_{\Delta \mathbf{p}}(\mathbf{d}) , d\mathbf{d}
TTC = \min_{t > 0} { t \mid | \mathbf{p}_i(t) - \mathbf{p}j(t) | = d{safe} }
\hat{r}i = r_i - \lambda \cdot \text{Var}({r_j}{j \in \mathcal{N}_i})
x_i = R\cos(\theta_i), \quad y_i = R\sin(\theta_i)
\theta_i = \theta_0 + \omega t + i \cdot \Delta\theta
\mathbf{p}_i(0) = R \cdot [\cos(\phi_i), \sin(\phi_i)], \quad \phi_i = \frac{2\pi i}{N}