Files
jsbsim-service/service/scripts/sweep_f22_cobra.py
2026-04-28 17:53:29 +08:00

261 lines
8.4 KiB
Python

import argparse
import contextlib
import io
import itertools
import json
import math
import os
import sys
import jsbsim_runner
DEFAULT_AIRCRAFT_ID = "f22cobra"
DEFAULT_ALTITUDE_M = 3000
DEFAULT_SAMPLE_RATE_HZ = 20
DEFAULT_INTEGRATION_RATE_HZ = 120
DEFAULT_DURATION_SEC = 5
def main():
args = parse_args()
candidates = build_grid(args.mode)
results = []
print(
f"Running {len(candidates)} F-22 cobra candidates "
f"({args.mode}, duration={args.duration_sec}s)...",
file=sys.stderr,
)
for index, parameters in enumerate(candidates, start=1):
if index == 1 or index % 10 == 0 or index == len(candidates):
print(f"[{index}/{len(candidates)}]", file=sys.stderr)
result = run_candidate(parameters, args)
if result:
results.append(result)
ranked = sorted(results, key=lambda item: item["score"])
top = ranked[: args.top]
print_summary(top)
if args.json:
print(json.dumps(top, indent=2))
def parse_args():
parser = argparse.ArgumentParser(
description="Sweep F-22 JSBSim cobra maneuver parameters."
)
parser.add_argument(
"--mode",
choices=["quick", "wide"],
default="quick",
help="quick is small enough for iteration; wide explores more combinations.",
)
parser.add_argument("--top", type=int, default=10)
parser.add_argument("--duration-sec", type=float, default=DEFAULT_DURATION_SEC)
parser.add_argument("--json", action="store_true")
return parser.parse_args()
def build_grid(mode):
if mode == "wide":
entry_speeds = [180, 200, 220, 240, 260]
pull_durations = [0.45, 0.6, 0.75, 0.9, 1.05]
unload_durations = [0.35, 0.5, 0.7]
recovery_elevators = [0.2, 0.35, 0.5, 0.65]
recovery_tvc_cmds = [0.0, 0.35, 0.7, 1.0]
throttle_cmds = [0.85, 1.0]
else:
entry_speeds = [200, 220, 240]
pull_durations = [0.45, 0.6, 0.75, 0.9]
unload_durations = [0.35, 0.5, 0.7]
recovery_elevators = [0.25, 0.4, 0.55]
recovery_tvc_cmds = [0.0, 0.5, 1.0]
throttle_cmds = [1.0]
grid = []
for (
speed_mps,
pull_duration_sec,
unload_duration_sec,
recovery_elevator_cmd,
recovery_tvc_cmd,
throttle_cmd,
) in itertools.product(
entry_speeds,
pull_durations,
unload_durations,
recovery_elevators,
recovery_tvc_cmds,
throttle_cmds,
):
grid.append(
{
"entrySpeedMps": speed_mps,
"pullDurationSec": pull_duration_sec,
"unloadDurationSec": unload_duration_sec,
"recoveryDurationSec": 2.0,
"maxElevatorCmd": -1.0,
"recoveryElevatorCmd": recovery_elevator_cmd,
"pullTvcCmd": 0.0,
"recoveryTvcCmd": recovery_tvc_cmd,
"throttleCmd": throttle_cmd,
}
)
return grid
def run_candidate(parameters, args):
payload = {
"operation": "simulate",
"aircraftId": DEFAULT_AIRCRAFT_ID,
"sampleRateHz": DEFAULT_SAMPLE_RATE_HZ,
"integrationRateHz": DEFAULT_INTEGRATION_RATE_HZ,
"trim": False,
"durationSec": args.duration_sec,
"initialState": {
"position": [0, DEFAULT_ALTITUDE_M, 0],
"velocityMps": parameters["entrySpeedMps"],
"headingDeg": 0,
},
"maneuver": {
"type": "cobra",
"parameters": {
"flightPathAngleDeg": 0,
"pullDurationSec": parameters["pullDurationSec"],
"unloadDurationSec": parameters["unloadDurationSec"],
"recoveryDurationSec": parameters["recoveryDurationSec"],
"maxElevatorCmd": parameters["maxElevatorCmd"],
"recoveryElevatorCmd": parameters["recoveryElevatorCmd"],
"pullTvcCmd": parameters["pullTvcCmd"],
"recoveryTvcCmd": parameters["recoveryTvcCmd"],
"throttleCmd": parameters["throttleCmd"],
},
},
}
try:
with contextlib.redirect_stdout(io.StringIO()):
simulation = jsbsim_runner.simulate(payload)
except Exception as error:
return {
"score": 1_000_000,
"failed": True,
"error": str(error),
"parameters": parameters,
}
samples = simulation["samples"]
metrics = compute_metrics(samples, parameters)
return {
"score": round(score(metrics), 6),
"failed": False,
"parameters": parameters,
"metrics": metrics,
}
def compute_metrics(samples, parameters):
first = samples[0]
last = samples[-1]
max_alpha_sample = max(samples, key=lambda sample: sample.get("alphaDeg", -999))
max_pitch_sample = max(samples, key=lambda sample: sample.get("pitchDeg", -999))
min_speed_sample = min(samples, key=lambda sample: sample.get("velocityMps", 999999))
heading_delta = angle_delta_deg(first.get("headingDeg", 0), last.get("headingDeg", 0))
altitude_delta = last["position"][1] - first["position"][1]
forward_delta = last["position"][2] - first["position"][2]
lateral_delta = last["position"][0] - first["position"][0]
return {
"maxAlphaDeg": round_float(max_alpha_sample.get("alphaDeg", 0)),
"maxAlphaTimeSec": round_float(max_alpha_sample.get("t", 0)),
"maxPitchDeg": round_float(max_pitch_sample.get("pitchDeg", 0)),
"maxPitchTimeSec": round_float(max_pitch_sample.get("t", 0)),
"endAlphaDeg": round_float(last.get("alphaDeg", 0)),
"endPitchDeg": round_float(last.get("pitchDeg", 0)),
"minSpeedMps": round_float(min_speed_sample.get("velocityMps", 0)),
"endSpeedMps": round_float(last.get("velocityMps", 0)),
"altitudeDeltaM": round_float(altitude_delta),
"forwardDeltaM": round_float(forward_delta),
"lateralDeltaM": round_float(lateral_delta),
"headingDeltaDeg": round_float(heading_delta),
"entrySpeedMps": parameters["entrySpeedMps"],
"maxTvcDeg": round_float(
max(
abs(sample.get("controlSurfaces", {}).get("tvcDeg", 0))
for sample in samples
)
),
"maxThrustNorm": round_float(
max(sample.get("fcs", {}).get("thrustNorm", 0) for sample in samples)
),
}
def score(metrics):
max_alpha = metrics["maxAlphaDeg"]
max_pitch = metrics["maxPitchDeg"]
end_alpha = abs(metrics["endAlphaDeg"])
end_pitch = abs(metrics["endPitchDeg"])
altitude_delta = metrics["altitudeDeltaM"]
lateral_delta = abs(metrics["lateralDeltaM"])
speed_loss = metrics["entrySpeedMps"] - metrics["minSpeedMps"]
return (
abs(max_alpha - 105) * 2.0
+ abs(max_pitch - 95) * 1.2
+ max(0, 90 - max_alpha) * 8.0
+ max(0, max_alpha - 125) * 10.0
+ max(0, end_alpha - 25) * 1.5
+ max(0, end_pitch - 35) * 0.8
+ max(0, 35 - speed_loss) * 0.8
+ lateral_delta * 0.15
+ max(0, -altitude_delta) * 0.1
)
def print_summary(results):
print("")
print("rank score speed pull unload recElev recTvc maxA tA maxP endA endP minV dAlt dFwd dHead")
for index, result in enumerate(results, start=1):
params = result["parameters"]
metrics = result.get("metrics", {})
if result.get("failed"):
print(f"{index:>4} FAILED {result.get('error')}")
continue
print(
f"{index:>4} "
f"{result['score']:>6.1f} "
f"{params['entrySpeedMps']:>5.0f} "
f"{params['pullDurationSec']:>4.2f} "
f"{params['unloadDurationSec']:>5.2f} "
f"{params['recoveryElevatorCmd']:>7.2f} "
f"{params['recoveryTvcCmd']:>6.2f} "
f"{metrics['maxAlphaDeg']:>5.1f} "
f"{metrics['maxAlphaTimeSec']:>3.1f} "
f"{metrics['maxPitchDeg']:>5.1f} "
f"{metrics['endAlphaDeg']:>5.1f} "
f"{metrics['endPitchDeg']:>5.1f} "
f"{metrics['minSpeedMps']:>5.1f} "
f"{metrics['altitudeDeltaM']:>5.0f} "
f"{metrics['forwardDeltaM']:>5.0f} "
f"{metrics['headingDeltaDeg']:>5.1f}"
)
def angle_delta_deg(start, end):
return ((end - start + 540) % 360) - 180
def round_float(value):
return round(float(value), 6) if math.isfinite(float(value)) else value
if __name__ == "__main__":
main()