UAV パス競合シミュレーション環境構築:紙面での実践からコード実装まで
最初の 2 つの記事では、UAV 競合解決アルゴリズムのパノラマ と MARL+GAT エンドツーエンド ソリューション をそれぞれ整理しました。この記事では、次のような低レベルの質問に答えます。**シミュレーション環境で現実的で再現可能な UAV 衝突シナリオを構築するにはどうすればよいですか? ** これは、アルゴリズム評価の信頼性と実験の比較可能性を直接決定します。
1. シミュレーション環境の構築がなぜそれほど重要なのでしょうか?
シミュレーション環境は、アルゴリズム理論と飛行測定をつなぐ架け橋です。適切に設計されたシミュレーション環境は、次の 3 つの側面を満たす必要があります。
| 寸法 | 意味 | よくある質問 |
|---|---|---|
| 信頼性 | シミュレーションと現実のギャップ | ダイナミクスの過度の単純化とセンサー モデルの欠如 |
| 再現性 | 同じシードの下で一貫した結果 | 制御されていないランダムな初期化、浮動小数点エラーの蓄積 |
| スケーラビリティ | 大規模クラスターのサポート | 単一マシンのシミュレーションは並列化できず、通信トポロジがありません。 |
この記事では、信頼性と拡張性の両方を考慮した MARL トレーニング シナリオ に基づくシミュレーション環境の構築に焦点を当てています。結局のところ、50 機以上のドローンで実際の飛行制御をトレーニングするのは非現実的です。
2. 主流のシミュレーション プラットフォームの比較
2.1 プラットフォームの概要|プラットフォーム |基礎となるエンジン |物理的な信頼性 |マルチマシンのサポート | MARL の互換性 |論文引用 |
|------|----------|-----------|----------|------------|-----------| | AirSim (マイクロソフト) |アンリアル エンジン | ⭐⭐⭐⭐⭐ | ✅ | ⚠️ラッピングが必要です |高 | | フライトゴーグル (MIT) |団結 | ⭐⭐⭐⭐ | ✅ | ⚠️ラッピングが必要です |高 | | RotorS (チューリッヒ工科大学) |ガゼボ/ROS | ⭐⭐⭐⭐ | ✅ | ✅ PyMARL 互換 |中 | | モールス (LAAS-CNRS) |ブレンダー ゲーム エンジン | ⭐⭐⭐ | ✅ | ✅ ネイティブ |中 | | ウェブボット |自社開発 | ⭐⭐⭐⭐ | ✅ | ✅ |低い | | カスタム 2D/3D |該当なし | ⭐⭐ | ✅✅✅ | ✅✅✅ | - |
核となる結論:
- アカデミック ベンチマーク: RotorS + ROS/Gazebo が主流の選択肢です (ETH Zurich によって作成され、PX4 と互換性があります)
- エンドツーエンドの RL 研究: ほとんどの論文では カスタム ジム環境 (完全にカスタマイズされており、報酬関数を簡単に変更できます) が使用されています。
- Sim-to-Real ターゲット: AirSim/FlightGoggles (高忠実度ビジョン + センサー ノイズ)
2.2 AirSim: ゼロから 100 までの拡張体験
Microsoft AI for Earth チームは、AirSim 上で最大 100 台の固定翼 UAV の群れ調整をトレーニングしました。
シャー、S.、他。 (2018年)。 AirSim: 自動運転車向けの高忠実度の視覚的および物理的シミュレーション。 フィールドおよびサービス ロボティクス。AirSim の主な利点:
- センサー シミュレーション: カメラ、LiDAR、GPS、IMU の完全なシミュレーション、ノイズを注入可能
- 気象システム: 風速、雲、光がセンサーに与える影響
- API の統合: C++ / Python / ROS インターフェイスをサポート
import airsim
import numpy as np
class AirSimUAVEnv:
"""
AirSim 多 UAV 冲突环境封装
"""
def __init__(self, n_agents=8, drone_names=None):
self.client = airsim.MultirotorClient()
self.client.confirmConnection()
self.client.enableApiControl(True)
if drone_names is None:
self.drone_names = [f"Drone_{i}" for i in range(n_agents)]
else:
self.drone_names = drone_names
self.n_agents = len(self.drone_names)
# 为每架 UAV 解锁并设置为被动模式(由 RL 控制)
for name in self.drone_names:
self.client.enableApiControl(True, name)
self.client.armDisarm(True, name)
def reset(self):
"""随机初始化各 UAV 位置和目标"""
self.goals = {}
for name in self.drone_names:
# 随机起点
start_pos = np.random.uniform(-50, 50, size=3)
start_pos[2] = np.random.uniform(10, 30) # 高度 10-30m
# 随机终点
goal_pos = np.random.uniform(-50, 50, size=3)
goal_pos[2] = np.random.uniform(10, 30)
self.client.simSetVehiclePose(
airsim.Vector3R(*start_pos),
airsim.ToQuaternionr(0, 0, 0),
vehicle_name=name
)
self.goals[name] = goal_pos
return self._get_obs()
def _get_obs(self):
"""获取各 UAV 的本地观测"""
obs = []
for name in self.drone_names:
state = self.client.getMultirotorState(name)
pos = state.kinematics_estimated.position
vel = state.kinematics_estimated.linear_velocity
obs.append(np.array([pos.x_val, pos.y_val, pos.z_val,
vel.x_val, vel.y_val, vel.z_val]))
return np.array(obs)
def step(self, actions):
"""执行动作:vx, vy, vz 速度指令"""
for i, name in enumerate(self.drone_names):
self.client.moveByVelocityAsync(
actions[i, 0], actions[i, 1], actions[i, 2],
duration=0.1, vehicle_name=name
)
# 等待执行完成(AirSim 为异步 API)
self.client.simPause(True)
airsim.multirotorclient()
self.client.simContinueForTime(0.1)
self.client.simPause(False)
obs = self._get_obs()
reward, done = self._compute_reward()
return obs, reward, done, {}
def _compute_reward(self):
"""奖励函数"""
reward = np.zeros(self.n_agents)
done = np.zeros(self.n_agents, dtype=bool)
for i, name in enumerate(self.drone_names):
state = self.client.getMultirotorState(name)
pos = np.array([state.kinematics_estimated.position.x_val,
state.kinematics_estimated.position.y_val,
state.kinematics_estimated.position.z_val])
dist_to_goal = np.linalg.norm(pos - self.goals[name])
reward[i] += 1.0 / (dist_to_goal + 1.0)
# 冲突惩罚
for j, other_name in enumerate(self.drone_names):
if i >= j:
continue
other_state = self.client.getMultirotorState(other_name)
other_pos = np.array([other_state.kinematics_estimated.position.x_val,
other_state.kinematics_estimated.position.y_val,
other_state.kinematics_estimated.position.z_val])
dist = np.linalg.norm(pos - other_pos)
if dist < 5.0: # 5m 安全距离
reward[i] -= 100
done[i] = True
return reward, done.any()
2.3 Gazebo / RotorS: ROS エコシステムの標準的な選択肢
チューリッヒ工科大学の RotorS は、ROS/Gazebo エコシステムの中で最も完全なマルチ UAV シミュレーション パッケージです。
フェーン、P.、他。 (2020年)。 ETHZ フライング マシン アリーナ データセット。 ETH チューリッヒ。
RotorS は以下をサポートします。
- mav_msgs インターフェース (スピン + ピッチ + ロール + スラスト → モーター PWM)
- PX4 SITL/HITL: ループ内の完全な飛行制御ファームウェア
- RViz 視覚化: クラスターのステータスをリアルタイムで観察
- Gazebo 物理エンジン: ODE / Bullet / Simbody オプション
# 安装 RotorS(Ubuntu 20.04 + ROS Noetic)
sudo apt-get install ros-noetic-rotors-description ros-noetic-rotors-gazebo
source /opt/ros/noetic/setup.bash
# 启动 8 架 UAV 集群场景
roslaunch rotors_gazebo mav_tecs_controller.launch \
mav_name:=hummingbird \
world_name:=forest \
n_uavs:=8
2.4 カスタムジム環境: 学術的な RL 研究の主流の選択肢
MARL UAV 論文の大部分 (MADDPG/QMIX/MAPPO のオリジナルの実装を含む) は、次の 3 つの理由から完全にカスタムの Gym スタイル環境を使用しています。
- トレーニング速度: グラフィックス レンダリングなし、Python 直接ループ、1 秒あたり数万のインタラクションを完了可能
- 制御可能な報酬関数: C++ コードを変更せずに設計を迅速に反復します。
- 再現性: 固定ランダムシードを使用すると、結果は完全に再現可能です。
import gym
from gym import spaces
import numpy as np
class UAVConflictGym(gym.Env):
"""
标准 Gym 风格 UAV 冲突环境
兼容 Stable-Baselines3 / rllib / PyMARL
"""
metadata = {'render_modes': ['human', 'rgb_array']}
def __init__(
self,
n_agents=8,
area_size=200.0, # 仿真区域边长 (m)
safe_radius=5.0, # 安全隔离距离 (m)
max_speed=20.0, # 最大速度 (m/s)
dt=0.1, # 仿真步长 (s)
max_steps=500, # 最大 episode 步数
seed=None
):
super().__init__()
self.n_agents = n_agents
self.area_size = area_size
self.safe_radius = safe_radius
self.max_speed = max_speed
self.dt = dt
self.max_steps = max_steps
# 观测空间:(位置3 + 速度3 + 目标3 + 自身id编码 + 邻居信息)
self.obs_dim = 3 + 3 + 3 + n_agents + 3 * (n_agents - 1)
self.action_dim = 3 # Δvx, Δvy, Δvz
self.observation_space = spaces.Box(
low=-np.inf, high=np.inf,
shape=(self.obs_dim,), dtype=np.float32
)
self.action_space = spaces.Box(
low=-1, high=1,
shape=(self.action_dim,), dtype=np.float32
)
# 多智能体环境的 each-agent 空间
self.observation_space = spaces.Box(
low=-np.inf, high=np.inf,
shape=(self.n_agents, self.obs_dim), dtype=np.float32
)
if seed is not None:
self.seed(seed)
def seed(self, seed):
np.random.seed(seed)
def reset(self):
"""初始化 UAV 状态"""
# 位置:均匀分布在区域内
self.positions = np.random.uniform(
-self.area_size / 2, self.area_size / 2,
size=(self.n_agents, 3)
).astype(np.float32)
# 速度:随机方向,5-15 m/s
speeds = np.random.uniform(5, 15, size=(self.n_agents,))
directions = np.random.randn(self.n_agents, 3)
directions = directions / np.linalg.norm(directions, axis=1, keepdims=True)
self.velocities = speeds[:, None] * directions
# 目标点
self.goals = np.random.uniform(
-self.area_size / 2, self.area_size / 2,
size=(self.n_agents, 3)
).astype(np.float32)
# 确保起点和终点不重叠(避免初始冲突过于密集)
for i in range(self.n_agents):
for j in range(i + 1, self.n_agents):
dist = np.linalg.norm(self.positions[i] - self.positions[j])
if dist < self.safe_radius * 2:
# 重新生成
self.positions[j] = np.random.uniform(
-self.area_size / 2, self.area_size / 2, size=3
).astype(np.float32)
self.step_count = 0
self.prev_distances = self._compute_distances()
return self._get_obs()
def step(self, actions):
"""
actions: (n_agents, 3) 速度增量
"""
actions = np.clip(actions, -1, 1) * self.max_speed * self.dt
# 更新速度
new_velocities = self.velocities + actions
speed = np.linalg.norm(new_velocities, axis=1, keepdims=True)
new_velocities = np.clip(new_velocities, -self.max_speed, self.max_speed)
new_velocities = new_velocities * np.clip(speed / self.max_speed, 0, 1)
new_velocities = new_velocities / (np.linalg.norm(new_velocities, axis=1, keepdims=True) + 1e-8) * np.clip(speed, 0, self.max_speed)
self.velocities = new_velocities
# 更新位置
self.positions += self.velocities * self.dt
# 边界处理:软边界反射
for i in range(self.n_agents):
for dim in range(3):
half = self.area_size / 2
if abs(self.positions[i, dim]) > half:
self.positions[i, dim] = np.clip(
self.positions[i, dim], -half, half
)
self.velocities[i, dim] *= -0.8 # 反射 + 能量损失
# 奖励计算
reward, info = self._compute_reward()
# 冲突检测
collision = self._check_collision()
done = collision or self.step_count >= self.max_steps
self.step_count += 1
return self._get_obs(), reward, np.full(self.n_agents, done), info
def _compute_distances(self):
"""计算所有 UAV 对之间的距离矩阵"""
dist_matrix = np.zeros((self.n_agents, self.n_agents))
for i in range(self.n_agents):
for j in range(i + 1, self.n_agents):
d = np.linalg.norm(self.positions[i] - self.positions[j])
dist_matrix[i, j] = dist_matrix[j, i] = d
return dist_matrix
def _compute_reward(self):
"""分项奖励函数"""
reward = np.zeros(self.n_agents)
info = {'collision': False}
# 1. 任务进度奖励(到目标距离减少)
dist_to_goal = np.linalg.norm(self.positions - self.goals, axis=1)
prev_dist = self.prev_distances.copy()
np.fill_diagonal(prev_dist, np.inf)
# 进度:到目标距离减少得正奖励
for i in range(self.n_agents):
progress = self.prev_distances[i, i] - dist_to_goal[i]
reward[i] += 10.0 * progress # 缩放系数
# 到达目标
if dist_to_goal[i] < 2.0:
reward[i] += 50.0
# 2. 安全奖励(维持安全距离)
collision_any = False
for i in range(self.n_agents):
for j in range(i + 1, self.n_agents):
d = np.linalg.norm(self.positions[i] - self.positions[j])
if d < self.safe_radius:
# 碰撞!
reward[i] -= 200
reward[j] -= 200
collision_any = True
elif d < self.safe_radius * 2:
# 接近告警
close_factor = (self.safe_radius * 2 - d) / self.safe_radius
reward[i] -= 5.0 * close_factor
reward[j] -= 5.0 * close_factor
info['collision'] = collision_any
# 3. 能量惩罚(避免无谓机动)
action_penalty = -0.01 * np.sum(np.square(self.velocities - self.prev_velocities))
reward += action_penalty
# 4. 速度维持奖励(偏好稳定飞行)
for i in range(self.n_agents):
speed = np.linalg.norm(self.velocities[i])
if self.max_speed * 0.3 < speed < self.max_speed * 0.8:
reward[i] += 0.5
self.prev_distances = self._compute_distances()
return reward, info
def _check_collision(self):
"""检测是否有碰撞"""
for i in range(self.n_agents):
for j in range(i + 1, self.n_agents):
if np.linalg.norm(self.positions[i] - self.positions[j]) < self.safe_radius:
return True
return False
def _get_obs(self):
"""构建各 UAV 的观测向量"""
obs = []
for i in range(self.n_agents):
# 自身状态:位置 + 速度
self_state = np.concatenate([self.positions[i], self.velocities[i]])
# 目标信息:方向 + 距离
direction_to_goal = self.goals[i] - self.positions[i]
dist_to_goal = np.linalg.norm(direction_to_goal)
direction_to_goal_norm = direction_to_goal / (dist_to_goal + 1e-6)
goal_info = np.concatenate([direction_to_goal_norm, np.array([dist_to_goal])])
# 自身 ID 编码(one-hot)
id_encoding = np.zeros(self.n_agents)
id_encoding[i] = 1.0
# 邻居信息(相对位置 + 相对速度)
neighbor_info = []
for j in range(self.n_agents):
if i != j:
rel_pos = self.positions[j] - self.positions[i]
rel_vel = self.velocities[j] - self.velocities[i]
neighbor_info.extend(rel_pos.tolist() + rel_vel.tolist())
neighbor_info = np.array(neighbor_info, dtype=np.float32)
# 拼接
full_obs = np.concatenate([
self_state, # 6
goal_info, # 4
id_encoding, # n_agents
neighbor_info # (n_agents-1)*6
])
obs.append(full_obs)
return np.array(obs, dtype=np.float32)
def render(self, mode='human'):
"""简单 2D 可视化"""
import matplotlib.pyplot as plt
plt.clf()
plt.xlim(-self.area_size/2, self.area_size/2)
plt.ylim(-self.area_size/2, self.area_size/2)
# 绘制 UAV 和目标
for i in range(self.n_agents):
# 位置
plt.scatter(self.positions[i, 0], self.positions[i, 1],
color='blue', s=100, zorder=5)
# 速度向量
plt.arrow(self.positions[i, 0], self.positions[i, 1],
self.velocities[i, 0], self.velocities[i, 1],
head_width=2, color='blue', alpha=0.5)
# 目标点
plt.scatter(self.goals[i, 0], self.goals[i, 1],
color='green', s=50, marker='x')
# 连线
plt.plot([self.positions[i, 0], self.goals[i, 0]],
[self.positions[i, 1], self.goals[i, 1]],
'g--', alpha=0.2)
plt.pause(0.01)
plt.ioff()
3. 環境設計の核となる要素
3.1 動的モデル: 粒子モデルから 6 自由度まで
レベル 1 — 点質量 位置と速度の更新のみを考慮する最も単純化されたバージョンは、アルゴリズムの検証と高速トレーニングに使用されます。
\mathbf{a}{new} = \text{clip}(\mathbf{a}{old} + \mathbf{u}, \mathbf{v}{min}, \mathbf{v}{max})
\mathbf{v}_i[k+1] = \mathbf{u}_i[k]
\mathbf{v}_i[k+1] = \mathbf{v}_i[k] + \mathbf{a}_i[k] \Delta t
\mathbf{p}_i[k+1] = \mathbf{p}_i[k] + \mathbf{v}_i[k] \Delta t + \frac{1}{2}\mathbf{a}_i[k]\Delta t^2
|\mathbf{a}i[k]| \leq a{max}, \quad |\mathbf{v}i[k]| \leq v{最大}
\dot{\boldsymbol{\eta}} = \mathbf{R}(\boldsymbol{\Theta})\boldsymbol{\omega}
m\ddot{\mathbf{p}} = -mg\mathbf{e}_3 + \mathbf{R}(\boldsymbol{\Theta})\mathbf{T}_i\mathbf{e}_3 - \mathbf{D}(\dot{\mathbf{p}})
c_{ij} = \begin{件} 1 & \text{if } | \mathbf{p}_i - \mathbf{p}j | < d{安全} \ 0 & \text{そうでない場合} \end{件}
P_{競合} = P\left( | \mathbf{p}i - \mathbf{p}j | < d{safe} \right) = \int{| \mathbf{d} | < d_{safe}} f_{\Delta \mathbf{p}}(\mathbf{d}) , d\mathbf{d}
TTC = \min_{t > 0} { t \mid | \mathbf{p}_i(t) - \mathbf{p}j(t) | = d{安全} }
\hat{r}i = r_i - \lambda \cdot \text{Var}({r_j}{j \in \mathcal{N}_i})
x_i = R\cos(\theta_i)、\quad y_i = R\sin(\theta_i)
\theta_i = \theta_0 + \omega t + i \cdot \Delta\theta
\mathbf{p}_i(0) = R \cdot [\cos(\phi_i), \sin(\phi_i)], \quad \phi_i = \frac{2\pi i}{N}