Construction of UAV path conflict simulation environment: from paper practice to code implementation

Systematically review the construction method of UAV multi-aircraft conflict scenario simulation environment, covering mainstream simulation platform comparison, dynamics modeling, state space design, conflict definition, reward function construction and benchmark testing scenarios, with complete examples of Gym/Gazebo/AirSim multi-framework

UAV path conflict simulation environment construction: from paper practice to code implementation

In the first two articles, we sorted out UAV conflict resolution algorithm panorama and MARL+GAT end-to-end solution respectively. This article will answer a lower-level question: **How ​​to build a realistic and reproducible UAV conflict scenario in a simulation environment? ** This directly determines the credibility of the algorithm evaluation and the comparability of the experiment.


1. Why is the construction of simulation environment so critical?

The simulation environment is a bridge connecting algorithm theory and flight measurement. A well-designed simulation environment needs to meet three dimensions:

DimensionsMeaningFAQ
AuthenticityThe gap between simulation and realityOversimplification of dynamics and lack of sensor models
ReproducibilityConsistent results under the same seedUncontrolled random initialization, accumulation of floating point errors
ScalabilitySupport large-scale clustersSingle-machine simulation cannot be parallelized and communication topology is missing

This article focuses on the construction of the simulation environment under the MARL training scenario, taking into account both authenticity and scalability - after all, it is unrealistic to train real flight control on 50+ drones.


2. Comparison of mainstream simulation platforms

2.1 Platform Overview| Platform | Underlying engine | Physical authenticity | Multi-machine support | MARL compatibility | Paper citations |

|------|----------|-----------|----------|------------|-----------| | AirSim (Microsoft) | Unreal Engine | ⭐⭐⭐⭐⭐ | ✅ | ⚠️ Requires Wrapping | High | | FlightGoggles (MIT) | Unity | ⭐⭐⭐⭐ | ✅ | ⚠️ Requires Wrapping | High | | RotorS (ETH Zurich) | Gazebo/ROS | ⭐⭐⭐⭐ | ✅ | ✅ PyMARL Compatible | Medium | | Morse (LAAS-CNRS) | Blender Game Engine | ⭐⭐⭐ | ✅ | ✅ Native | Medium | | Webots | Self-developed | ⭐⭐⭐⭐ | ✅ | ✅ | Low | | Custom 2D/3D | N/A | ⭐⭐ | ✅✅✅ | ✅✅✅ | - |

Core conclusion:

2.2 AirSim: Scaling Experience from Zero to One Hundred

The Microsoft AI for Earth team trained swarm coordination of up to 100 fixed-wing UAV on AirSim:

Shah, S., et al. (2018). AirSim: High-Fidelity Visual and Physical Simulation for Autonomous Vehicles. Field and Service Robotics.Key advantages of AirSim:

import airsim
import numpy as np

class AirSimUAVEnv:
    """
    AirSim 多 UAV 冲突环境封装
    """
    def __init__(self, n_agents=8, drone_names=None):
        self.client = airsim.MultirotorClient()
        self.client.confirmConnection()
        self.client.enableApiControl(True)
        
        if drone_names is None:
            self.drone_names = [f"Drone_{i}" for i in range(n_agents)]
        else:
            self.drone_names = drone_names
        
        self.n_agents = len(self.drone_names)
        
        # 为每架 UAV 解锁并设置为被动模式(由 RL 控制)
        for name in self.drone_names:
            self.client.enableApiControl(True, name)
            self.client.armDisarm(True, name)
    
    def reset(self):
        """随机初始化各 UAV 位置和目标"""
        self.goals = {}
        for name in self.drone_names:
            # 随机起点
            start_pos = np.random.uniform(-50, 50, size=3)
            start_pos[2] = np.random.uniform(10, 30)  # 高度 10-30m
            
            # 随机终点
            goal_pos = np.random.uniform(-50, 50, size=3)
            goal_pos[2] = np.random.uniform(10, 30)
            
            self.client.simSetVehiclePose(
                airsim.Vector3R(*start_pos),
                airsim.ToQuaternionr(0, 0, 0),
                vehicle_name=name
            )
            
            self.goals[name] = goal_pos
        
        return self._get_obs()
    
    def _get_obs(self):
        """获取各 UAV 的本地观测"""
        obs = []
        for name in self.drone_names:
            state = self.client.getMultirotorState(name)
            pos = state.kinematics_estimated.position
            vel = state.kinematics_estimated.linear_velocity
            
            obs.append(np.array([pos.x_val, pos.y_val, pos.z_val,
                                vel.x_val, vel.y_val, vel.z_val]))
        return np.array(obs)
    
    def step(self, actions):
        """执行动作:vx, vy, vz 速度指令"""
        for i, name in enumerate(self.drone_names):
            self.client.moveByVelocityAsync(
                actions[i, 0], actions[i, 1], actions[i, 2],
                duration=0.1, vehicle_name=name
            )
        
        # 等待执行完成(AirSim 为异步 API)
        self.client.simPause(True)
       airsim.multirotorclient()
        self.client.simContinueForTime(0.1)
        self.client.simPause(False)
        
        obs = self._get_obs()
        reward, done = self._compute_reward()
        
        return obs, reward, done, {}
    
    def _compute_reward(self):
        """奖励函数"""
        reward = np.zeros(self.n_agents)
        done = np.zeros(self.n_agents, dtype=bool)
        
        for i, name in enumerate(self.drone_names):
            state = self.client.getMultirotorState(name)
            pos = np.array([state.kinematics_estimated.position.x_val,
                           state.kinematics_estimated.position.y_val,
                           state.kinematics_estimated.position.z_val])
            
            dist_to_goal = np.linalg.norm(pos - self.goals[name])
            reward[i] += 1.0 / (dist_to_goal + 1.0)
            
            # 冲突惩罚
            for j, other_name in enumerate(self.drone_names):
                if i >= j:
                    continue
                other_state = self.client.getMultirotorState(other_name)
                other_pos = np.array([other_state.kinematics_estimated.position.x_val,
                                      other_state.kinematics_estimated.position.y_val,
                                      other_state.kinematics_estimated.position.z_val])
                
                dist = np.linalg.norm(pos - other_pos)
                if dist < 5.0:  # 5m 安全距离
                    reward[i] -= 100
                    done[i] = True
        
        return reward, done.any()

2.3 Gazebo / RotorS: the standard choice in the ROS ecosystem

ETH Zurich’s RotorS is the most complete multi-UAV simulation package in the ROS/Gazebo ecosystem:

Foehn, P., et al. (2020). ETHZ Flying Machine Arena Dataset. ETH Zurich.

RotorS supports:

# 安装 RotorS(Ubuntu 20.04 + ROS Noetic)
sudo apt-get install ros-noetic-rotors-description ros-noetic-rotors-gazebo
source /opt/ros/noetic/setup.bash

# 启动 8 架 UAV 集群场景
roslaunch rotors_gazebo mav_tecs_controller.launch \
    mav_name:=hummingbird \
    world_name:=forest \
    n_uavs:=8

2.4 Custom Gym environment: the mainstream choice for academic RL research

The vast majority of MARL UAV papers (including the original implementations of MADDPG/QMIX/MAPPO) use a completely custom Gym-style environment for three reasons:

  1. Training speed: No graphics rendering, Python direct loop, tens of thousands of interactions can be completed per second
  2. Controllable reward function: Quickly iterate the design without changing the C++ code
  3. Reproducibility: With a fixed random seed, the results are completely reproducible.
import gym
from gym import spaces
import numpy as np

class UAVConflictGym(gym.Env):
    """
    标准 Gym 风格 UAV 冲突环境
    兼容 Stable-Baselines3 / rllib / PyMARL
    """
    metadata = {'render_modes': ['human', 'rgb_array']}
    
    def __init__(
        self,
        n_agents=8,
        area_size=200.0,     # 仿真区域边长 (m)
        safe_radius=5.0,     # 安全隔离距离 (m)
        max_speed=20.0,      # 最大速度 (m/s)
        dt=0.1,              # 仿真步长 (s)
        max_steps=500,       # 最大 episode 步数
        seed=None
    ):
        super().__init__()
        
        self.n_agents = n_agents
        self.area_size = area_size
        self.safe_radius = safe_radius
        self.max_speed = max_speed
        self.dt = dt
        self.max_steps = max_steps
        
        # 观测空间:(位置3 + 速度3 + 目标3 + 自身id编码 + 邻居信息)
        self.obs_dim = 3 + 3 + 3 + n_agents + 3 * (n_agents - 1)
        self.action_dim = 3  # Δvx, Δvy, Δvz
        
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf,
            shape=(self.obs_dim,), dtype=np.float32
        )
        self.action_space = spaces.Box(
            low=-1, high=1,
            shape=(self.action_dim,), dtype=np.float32
        )
        
        # 多智能体环境的 each-agent 空间
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf,
            shape=(self.n_agents, self.obs_dim), dtype=np.float32
        )
        
        if seed is not None:
            self.seed(seed)
    
    def seed(self, seed):
        np.random.seed(seed)
    
    def reset(self):
        """初始化 UAV 状态"""
        # 位置:均匀分布在区域内
        self.positions = np.random.uniform(
            -self.area_size / 2, self.area_size / 2,
            size=(self.n_agents, 3)
        ).astype(np.float32)
        
        # 速度:随机方向,5-15 m/s
        speeds = np.random.uniform(5, 15, size=(self.n_agents,))
        directions = np.random.randn(self.n_agents, 3)
        directions = directions / np.linalg.norm(directions, axis=1, keepdims=True)
        self.velocities = speeds[:, None] * directions
        
        # 目标点
        self.goals = np.random.uniform(
            -self.area_size / 2, self.area_size / 2,
            size=(self.n_agents, 3)
        ).astype(np.float32)
        
        # 确保起点和终点不重叠(避免初始冲突过于密集)
        for i in range(self.n_agents):
            for j in range(i + 1, self.n_agents):
                dist = np.linalg.norm(self.positions[i] - self.positions[j])
                if dist < self.safe_radius * 2:
                    # 重新生成
                    self.positions[j] = np.random.uniform(
                        -self.area_size / 2, self.area_size / 2, size=3
                    ).astype(np.float32)
        
        self.step_count = 0
        self.prev_distances = self._compute_distances()
        
        return self._get_obs()
    
    def step(self, actions):
        """
        actions: (n_agents, 3) 速度增量
        """
        actions = np.clip(actions, -1, 1) * self.max_speed * self.dt
        
        # 更新速度
        new_velocities = self.velocities + actions
        speed = np.linalg.norm(new_velocities, axis=1, keepdims=True)
        new_velocities = np.clip(new_velocities, -self.max_speed, self.max_speed)
        new_velocities = new_velocities * np.clip(speed / self.max_speed, 0, 1)
        new_velocities = new_velocities / (np.linalg.norm(new_velocities, axis=1, keepdims=True) + 1e-8) * np.clip(speed, 0, self.max_speed)
        self.velocities = new_velocities
        
        # 更新位置
        self.positions += self.velocities * self.dt
        
        # 边界处理:软边界反射
        for i in range(self.n_agents):
            for dim in range(3):
                half = self.area_size / 2
                if abs(self.positions[i, dim]) > half:
                    self.positions[i, dim] = np.clip(
                        self.positions[i, dim], -half, half
                    )
                    self.velocities[i, dim] *= -0.8  # 反射 + 能量损失
        
        # 奖励计算
        reward, info = self._compute_reward()
        
        # 冲突检测
        collision = self._check_collision()
        done = collision or self.step_count >= self.max_steps
        
        self.step_count += 1
        
        return self._get_obs(), reward, np.full(self.n_agents, done), info
    
    def _compute_distances(self):
        """计算所有 UAV 对之间的距离矩阵"""
        dist_matrix = np.zeros((self.n_agents, self.n_agents))
        for i in range(self.n_agents):
            for j in range(i + 1, self.n_agents):
                d = np.linalg.norm(self.positions[i] - self.positions[j])
                dist_matrix[i, j] = dist_matrix[j, i] = d
        return dist_matrix
    
    def _compute_reward(self):
        """分项奖励函数"""
        reward = np.zeros(self.n_agents)
        info = {'collision': False}
        
        # 1. 任务进度奖励(到目标距离减少)
        dist_to_goal = np.linalg.norm(self.positions - self.goals, axis=1)
        prev_dist = self.prev_distances.copy()
        np.fill_diagonal(prev_dist, np.inf)
        
        # 进度:到目标距离减少得正奖励
        for i in range(self.n_agents):
            progress = self.prev_distances[i, i] - dist_to_goal[i]
            reward[i] += 10.0 * progress  # 缩放系数
            # 到达目标
            if dist_to_goal[i] < 2.0:
                reward[i] += 50.0
        
        # 2. 安全奖励(维持安全距离)
        collision_any = False
        for i in range(self.n_agents):
            for j in range(i + 1, self.n_agents):
                d = np.linalg.norm(self.positions[i] - self.positions[j])
                
                if d < self.safe_radius:
                    # 碰撞!
                    reward[i] -= 200
                    reward[j] -= 200
                    collision_any = True
                elif d < self.safe_radius * 2:
                    # 接近告警
                    close_factor = (self.safe_radius * 2 - d) / self.safe_radius
                    reward[i] -= 5.0 * close_factor
                    reward[j] -= 5.0 * close_factor
        
        info['collision'] = collision_any
        
        # 3. 能量惩罚(避免无谓机动)
        action_penalty = -0.01 * np.sum(np.square(self.velocities - self.prev_velocities))
        reward += action_penalty
        
        # 4. 速度维持奖励(偏好稳定飞行)
        for i in range(self.n_agents):
            speed = np.linalg.norm(self.velocities[i])
            if self.max_speed * 0.3 < speed < self.max_speed * 0.8:
                reward[i] += 0.5
        
        self.prev_distances = self._compute_distances()
        
        return reward, info
    
    def _check_collision(self):
        """检测是否有碰撞"""
        for i in range(self.n_agents):
            for j in range(i + 1, self.n_agents):
                if np.linalg.norm(self.positions[i] - self.positions[j]) < self.safe_radius:
                    return True
        return False
    
    def _get_obs(self):
        """构建各 UAV 的观测向量"""
        obs = []
        for i in range(self.n_agents):
            # 自身状态:位置 + 速度
            self_state = np.concatenate([self.positions[i], self.velocities[i]])
            
            # 目标信息:方向 + 距离
            direction_to_goal = self.goals[i] - self.positions[i]
            dist_to_goal = np.linalg.norm(direction_to_goal)
            direction_to_goal_norm = direction_to_goal / (dist_to_goal + 1e-6)
            goal_info = np.concatenate([direction_to_goal_norm, np.array([dist_to_goal])])
            
            # 自身 ID 编码(one-hot)
            id_encoding = np.zeros(self.n_agents)
            id_encoding[i] = 1.0
            
            # 邻居信息(相对位置 + 相对速度)
            neighbor_info = []
            for j in range(self.n_agents):
                if i != j:
                    rel_pos = self.positions[j] - self.positions[i]
                    rel_vel = self.velocities[j] - self.velocities[i]
                    neighbor_info.extend(rel_pos.tolist() + rel_vel.tolist())
            
            neighbor_info = np.array(neighbor_info, dtype=np.float32)
            
            # 拼接
            full_obs = np.concatenate([
                self_state,       # 6
                goal_info,        # 4
                id_encoding,      # n_agents
                neighbor_info      # (n_agents-1)*6
            ])
            
            obs.append(full_obs)
        
        return np.array(obs, dtype=np.float32)
    
    def render(self, mode='human'):
        """简单 2D 可视化"""
        import matplotlib.pyplot as plt
        
        plt.clf()
        plt.xlim(-self.area_size/2, self.area_size/2)
        plt.ylim(-self.area_size/2, self.area_size/2)
        
        # 绘制 UAV 和目标
        for i in range(self.n_agents):
            # 位置
            plt.scatter(self.positions[i, 0], self.positions[i, 1],
                       color='blue', s=100, zorder=5)
            # 速度向量
            plt.arrow(self.positions[i, 0], self.positions[i, 1],
                     self.velocities[i, 0], self.velocities[i, 1],
                     head_width=2, color='blue', alpha=0.5)
            # 目标点
            plt.scatter(self.goals[i, 0], self.goals[i, 1],
                       color='green', s=50, marker='x')
            # 连线
            plt.plot([self.positions[i, 0], self.goals[i, 0]],
                    [self.positions[i, 1], self.goals[i, 1]],
                    'g--', alpha=0.2)
        
        plt.pause(0.01)
        plt.ioff()

3. Core elements of environmental design

3.1 Dynamic model: from particle model to six degrees of freedom

Level 1 — Point Mass The most simplified version, which only considers position and velocity updates, is used for algorithm verification and fast training:

\mathbf{a}{new} = \text{clip}(\mathbf{a}{old} + \mathbf{u}, \mathbf{v}{min}, \mathbf{v}{max})

\mathbf{v}_i[k+1] = \mathbf{u}_i[k]

\mathbf{v}_i[k+1] = \mathbf{v}_i[k] + \mathbf{a}_i[k] \Delta t

\mathbf{p}_i[k+1] = \mathbf{p}_i[k] + \mathbf{v}_i[k] \Delta t + \frac{1}{2}\mathbf{a}_i[k]\Delta t^2

|\mathbf{a}i[k]| \leq a{max}, \quad |\mathbf{v}i[k]| \leq v{max}

\dot{\boldsymbol{\eta}} = \mathbf{R}(\boldsymbol{\Theta})\boldsymbol{\omega}

m\ddot{\mathbf{p}} = -mg\mathbf{e}_3 + \mathbf{R}(\boldsymbol{\Theta})\mathbf{T}_i\mathbf{e}_3 - \mathbf{D}(\dot{\mathbf{p}})

You can't use 'macro parameter character #' in math mode Where $\boldsymbol{\eta}$ is the Euler angle, $\mathbf{R}$ is the rotation matrix, $\mathbf{T}_i$ is the $i$-th axis thrust.| Model | Calculation amount | Authenticity | Applicable stage | |------|--------|--------|---------| | Mass point | ⭐ | ⭐ | Algorithm quick verification | | Single integrator | ⭐⭐ | ⭐⭐ | RL training (mainstream choice) | | Two-way integrator | ⭐⭐ | ⭐⭐⭐ | Close to real flight | | 6-DOF | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | Sim-to-Real Pre-Verification | ### 3.2 State space design A good state space should contain all the information required for decision-making and have controllable dimensions: ```python def build_observation(agent_id, all_positions, all_velocities, goals, adj_matrix): """ 推荐的状态空间组成(维度可控) """ i = agent_id n = len(all_positions) # ✅ 必须包含: # 1. 自身状态 self_state = np.concatenate([all_positions[i], all_velocities[i]]) # 6维 # 2. 目标信息 direction = goals[i] - all_positions[i] dist = np.linalg.norm(direction) + 1e-6 goal_info = np.concatenate([direction / dist, [dist]]) # 4维 # 3. K近邻信息(节省维度,不用全连接) neighbor_k = 3 # 只看最近3架 rel_states = [] for j in range(n): if i != j: rel_states.append(( np.linalg.norm(all_positions[i] - all_positions[j]), j )) rel_states.sort(key=lambda x: x[0]) neighbor_info = [] for dist_j, j in rel_states[:neighbor_k]: rel_pos = all_positions[j] - all_positions[i] rel_vel = all_velocities[j] - all_velocities[i] neighbor_info.extend([dist_j] + list(rel_pos) + list(rel_vel)) # 7维/邻居 # padding while len(neighbor_info) < neighbor_k * 7: neighbor_info.extend([0.0] * 7) return np.concatenate([self_state, goal_info, neighbor_info]) ``` ### 3.3 Action space design | Action Types | Dimensions | Examples | Advantages | Disadvantages | |---------|------|------|------|------| | **Direct speed setting** | 2-3 | $[v_x, v_y]$ | Simple, RL friendly | Post-processing required to ensure $\|\mathbf{v}\| \leq v_{max}$ | | **Speed Increment** | 2-3 | $[\Delta v_x, \Delta v_y]$ | Incremental control is smoother | May exceed speed constraints | | **Heading angle + speed** | 2 | $[\psi, v]$ | In line with intuition | Polar coordinate transformation introduces singularity | | **Target position** | 2-3 | $[x_{target}, y_{target}]$ | Closest to human instructions | Requires path tracking backend | | **Thrust vector** | 4 | $[T_1,T_2,T_3,T_4]$ | The lowest level control | High dimension, RL is difficult to learn | **Mainstream choice (MARL paper)**: Velocity increment $(\Delta v_x, \Delta v_y, \Delta v_z)$, with $\tanh$ activation + clip limiting. ### 3.4 Conflict Definition and Measurement **Deterministic conflict (most commonly used)**:

c_{ij} = \begin{cases} 1 & \text{if } | \mathbf{p}_i - \mathbf{p}j | < d{safe} \ 0 & \text{otherwise} \end{cases}

P_{conflict} = P\left( | \mathbf{p}i - \mathbf{p}j | < d{safe} \right) = \int{| \mathbf{d} | < d_{safe}} f_{\Delta \mathbf{p}}(\mathbf{d}) , d\mathbf{d}

TTC = \min_{t > 0} { t \mid | \mathbf{p}_i(t) - \mathbf{p}j(t) | = d{safe} }

You can't use 'macro parameter character #' in math mode ### 3.5 Reward function design principles The reward function is the soul of RL environment design and directly determines the learned strategic behavior. #### Principle 1: Sub-item rewards > Single reward ```python def compute_reward(self, positions, velocities, goals, prev_distances): """ 分项奖励设计(推荐) """ r_total = np.zeros(self.n_agents) # 1. 任务奖励(正向引导) dist_to_goal = np.linalg.norm(positions - goals, axis=1) progress = prev_distances_to_goal - dist_to_goal r_task = 5.0 * progress # 2. 安全奖励(负向惩罚) r_safe = np.zeros(self.n_agents) for i in range(self.n_agents): for j in range(i+1, self.n_agents): d = np.linalg.norm(positions[i] - positions[j]) if d < self.safe_radius: r_safe[i] -= 100 r_safe[j] -= 100 elif d < self.safe_radius * 3: # 告警区 penalty = (self.safe_radius * 3 - d) / (self.safe_radius * 2) r_safe[i] -= 10 * penalty r_safe[j] -= 10 * penalty # 3. 效率奖励(偏好直线/短路径) r_efficiency = np.zeros(self.n_agents) for i in range(self.n_agents): ideal_dist = np.linalg.norm(goals[i] - self.init_positions[i]) actual_dist = dist_to_goal[i] r_efficiency[i] = 1.0 - (actual_dist / (ideal_dist + 1e-6)) # 4. 能量惩罚(避免无谓机动) r_energy = -0.01 * np.sum(np.square(velocities), axis=1) r_total = r_task + r_safe + r_efficiency + r_energy return r_total ``` #### Principle 2: Sparse reward + shaping Purely sparse rewards (-100 only for collisions and +100 for reaching the goal) are almost impossible to train in high-dimensional state spaces. **Dense bonus + sparse bonus** is best practice: - **Base Layer**: Dense distance progress rewards (with feedback at every step) - **Sparse Layer**: Collision Penalty + Reach Reward (Key Event) #### Principle 3: Fairness Constraints Avoid the "hero UAV" phenomenon - certain UAVs always give way and other UAVs don't learn at all:

\hat{r}i = r_i - \lambda \cdot \text{Var}({r_j}{j \in \mathcal{N}_i})

You can't use 'macro parameter character #' in math mode Or at the end of each episode, the "number of concessions" of each UAV can be counted, and the UAV with more concessions will receive additional compensation rewards. --- ## 4. Benchmark Scenarios ### 4.1 Classic scenes in the paper**Scenario 1 — Crossing** Two teams of UAVs traversing the same airspace from opposite directions: ``` 队1: (0, -100) → (0, +100) [N 架,纵向] 队2: (-100, 0) → (+100, 0) [M 架,横向] ``` Classic conflict scenarios, testing conflict identification and timing decision-making. **Scenario 2 — Circular Pursuit** Multiple UAVs fly clockwise along the same circular track, with the following aircraft catching up with the leading one:

x_i = R\cos(\theta_i), \quad y_i = R\sin(\theta_i)

\theta_i = \theta_0 + \omega t + i \cdot \Delta\theta

\mathbf{p}_i(0) = R \cdot [\cos(\phi_i), \sin(\phi_i)], \quad \phi_i = \frac{2\pi i}{N}

Missing open brace for subscript The most severe scenario, testing large-scale cluster digestion capabilities. **Scenario 4 — Dynamic Obstacle Avoidance** Stationary or moving obstacles are scattered throughout the flight area: ```python class DynamicObstacleScenario: def __init__(self, n_obstacles=10, n_uavs=8): self.obstacles = [] for _ in range(n_obstacles): self.obstacles.append({ 'position': np.random.uniform(-100, 100, size=3), 'velocity': np.random.uniform(-5, 5, size=3), # 移动障碍 'radius': np.random.uniform(3, 10) # 半径 }) def update(self, dt): for obs in self.obstacles: obs['position'] += obs['velocity'] * dt # 边界反弹 for dim in range(3): if abs(obs['position'][dim]) > 100: obs['velocity'][dim] *= -1 ``` ### 4.2 Difficulty Rating | Difficulty | Number of UAVs | Conflict density | Dynamic obstacles | Uncertainty | |------|---------|---------|---------|---------| | **Easy** | 2–4 | Low (predictable) | None | None | | **Medium** | 5–12 | Medium (requires coordination) | Optional | GPS Noise | | **Hard** | 20–50 | High (must be distributed) | Yes | GPS+Wind Disturbance | | **Extreme** | 100+ | Extremely high | Complex | Total uncertainty | ### 4.3 Evaluation indicator system Instead of just using "conflict rate" as an indicator, a complete evaluation system must be established: ```python def evaluate_policy(env, policy, n_episodes=100): """ 多维度评估指标 """ metrics = { 'collision_rate': [], # 碰撞率 'success_rate': [], # 任务完成率(所有UAV到达目标) 'avg_episode_length': [], # 平均 episode 长度 'avg_dist_to_goal': [], # 最终到目标平均距离 'avg_jerk': [], # 平均加加速度(平滑度) 'communications': [], # 通信次数(分布式场景) 'fairness_index': [], # Jain公平性指数 } for ep in range(n_episodes): obs = env.reset() done = False step_count = 0 total_reward = 0 while not done and step_count < env.max_steps: actions = policy.select_action(obs) obs, rewards, done, _ = env.step(actions) total_reward += rewards.mean() step_count += 1 # 计算指标 metrics['collision_rate'].append(1 if env._check_collision() else 0) metrics['success_rate'].append(1 if all( np.linalg.norm(env.positions[i] - env.goals[i]) < 2.0 for i in range(env.n_agents) ) else 0) metrics['avg_episode_length'].append(step_count) # Jain公平性指数(衡量各UAV任务分配公平程度) rewards_per_uav = [env.get_cumulative_reward(i) for i in range(env.n_agents)] n = len(rewards_per_uav) if sum(rewards_per_uav) > 0: fairness = sum(rewards_per_uav)**2 / (n * sum(r**2 for r in rewards_per_uav)) else: fairness = 1.0 metrics['fairness_index'].append(fairness) # 汇总 summary = {k: np.mean(v) for k, v in metrics.items()} return summary ``` --- ## 5. Environment registration under the PyMARL framework**PyMARL** (University of Southampton) is a standardized framework for MARL research. Register a custom environment: ```python # pysocialwatcher/utils/registry.py REGISTRY = {} from pysocialwatcher.agents.rnn_agent import RNNAgent from pysocialwatcher.runners import runners # 注册自定义环境 REGISTRY['uav_conflict'] = { 'env_class': UAVConflictGym, 'state_dim': 37, # obs_dim 'n_actions': 3, # action_dim 'n_agents': 8, # UAV 数量 'episode_limit': 500, # 最大步数 } # 运行示例 # python3 -m pysocialwatcher.main config=baselines/maddpg/uav_conflict.yaml ``` ```yaml # baselines/maddpg/uav_conflict.yaml env: name: uav_conflict n_agents: 8 area_size: 200 safe_radius: 5.0 max_speed: 20.0 seed: 42 algorithm: name: MADDPG actor_lr: 1.0e-3 critic_lr: 1.0e-3 gamma: 0.95 tau: 0.01 hidden_dim: 64 training: n_episodes: 100000 batch_size: 256 replay_buffer: 100000 target_update_freq: 200 ``` --- ## 6. Real physics engine access: from simulation to actual measurement ### 6.1 PX4 SITL access PX4 provides a complete software-in-the-loop simulation (SITL) that can directly replace the dynamics model of the Gym environment: ```bash # 启动 PX4 SITL + ROS cd /path/to/PX4-Autopilot make px4_sitl_default gazebo # ROS 主题订阅 rostopic echo /drone_0/mavros/local_position/pose # Python 控制接口 from pymavlink import mavutil def set_velocity(master, vx, vy, vz): master.mav.set_position_target_local_ned_send( 0, 0, 0, # time_boot_ms, target_system, target_component mavutil.mavlink.MAV_FRAME_BODY_NED, 0x0C00, # type_mask: 仅速度 0, 0, 0, # x, y, z 位置(被忽略) vx, vy, vz, # 速度 m/s 0, 0, 0, # 加速度 0, 0 # 偏航, yaw_rate ) ``` ### 6.2 Common sources of Sim-to-Real gaps Even if the dynamic model is completely consistent, there is still a significant gap between simulation and real migration: | Source of gap | Simulation | Reality | Impact | |---------|------|------|------| | **Motor response delay** | Instantaneous | 20–50ms | Trajectory error during high-speed maneuvering | | **GPS Noise** | Ideal | 1–5m Error | Position Estimate Offset | | **Wind Disturbance** | Optional Modeling | Continuous Random | Lateral Drift | | **Physical parameters** | Nominal value | Changes with temperature/electricity | Thrust coefficient changes | | **Communication delay** | 0ms | 5–100ms | Collaborative decision-making failure | **Domain Randomization** is the mainstream method to solve the Sim-to-Real gap: ```python class DomainRandomizedEnv(UAVConflictGym): """域随机化:训练时随机化物理参数""" def reset(self): # 随机化物理参数 self.mass = np.random.uniform(0.8, 1.2) # 质量 ±20% self.thrust_coeff = np.random.uniform(0.9, 1.1) # 推力系数 self.drag_coeff = np.random.uniform(0.01, 0.05) # 阻力系数 self.gps_noise = np.random.uniform(0.5, 3.0) # GPS噪声 m self.wind = np.random.uniform(-3, 3, size=3) # 风速 m/s return super().reset() ``` --- ## 7. Summary: Environment Build Checklist A qualified UAV conflict simulation environment should meet: - ✅ **Dynamics realism**: use at least a two-way integrator, taking into account velocity/acceleration constraints - ✅ **Well defined conflict**: safe distance $d_{safe}$ has a physical basis (usually 5–10m) - ✅ **Reward function can be explained**: sub-reward, sparse bonus, avoid reward hacking - ✅ **Controlled Randomness**: Results under a fixed seed can be fully reproduced - ✅ **Complete evaluation indicators**: collision rate + task completion rate + fairness + smoothness - ✅ **Baseline scenarios can be reproduced**: Crossing routes / catch-up / convergence scenarios are clearly described - ✅ **Interface Standardization**: Gym PettingZoo compatible or PyMARL registration formatThe code examples in this article cover the entire technology stack from the Gym custom environment to the AirSim/Gazebo platform, and can be freely combined according to research goals. --- **References:**1. Shah, S., et al. (2018). *AirSim: High-Fidelity Visual and Physical Simulation for Autonomous Vehicles.* Field and Service Robotics (FSR), Springer. 2. Zhou, M., et al. (2019). *A Survey on Path Planning for UAVs in Cluttered Environments.* IEEE Transactions on Intelligent Transportation Systems (T-ITS). 3. Everett, M., et al. (2021). *Collision avoidance in dense traffic with deep reinforcement learning.* IEEE International Conference on Robotics and Automation (ICRA). 4. Alonso-Mora, J., et al. (2018). *Optimization-based collision avoidance for multi-vehicle systems.* IEEE Transactions on Robotics (TRO). 5. van den Berg, J., Lin, M., & Manocha, D. (2008). *Reciprocal velocity obstacles for real-time multi-agent navigation.* IEEE International Conference on Robotics and Automation (ICRA). 6. Richards, A., & How, J. P. (2002). *Aircraft trajectory planning with collision avoidance using mixed integer linear programming.* AIAA Guidance, Navigation, and Control Conference (GNC). 7. Fan, T., et al. (2020). *Distributed Multi-Robot Collision Avoidance via Deep Reinforcement Learning for Navigation in Complex Scenarios.* The International Journal of Robotics Research (IJRR). 8. Yan, C., et al. (2025). *Multi-Agent Reinforcement Learning With Spatial-Temporal Attention for Flocking With Collision Avoidance of a Scalable Fixed-Wing UAV Fleet.* IEEE Transactions on Intelligent Transportation Systems (T-ITS). 9. Yu, L., et al. (2025). *Hybrid Transformer Based Multi-Agent Reinforcement Learning for Multiple Unpiloted Aerial Vehicle Coordination in Air Corridors.* IEEE Transactions on Mobile Computing (TMC). 10. Huo, D., et al. (2023). *Collision-Free Model Predictive Trajectory Tracking Control for UAVs in Obstacle Environment.* IEEE Transactions on Aerospace and Electronic Systems (TAES). 11. Jiang, C., et al. (2024). *Distributed Sampling-Based Model Predictive Control via Belief Propagation for Multi-Robot Formation Navigation.* IEEE Robotics and Automation Letters (RA-L).