Source code for pterasoftware.movements.movement

"""Contains the Movement class.

**Contains the following classes:**

Movement: A class used to contain an UnsteadyProblem's movement.

**Contains the following functions:**

None
"""

from __future__ import annotations

import copy
import math
from typing import cast

import numpy as np
import scipy.optimize as sp_opt

from .. import _core, _logging, _parameter_validation, geometry
from .. import operating_point as operating_point_mod
from .. import problems
from . import airplane_movement as airplane_movement_mod
from . import operating_point_movement as operating_point_movement_mod

movement_logger = _logging.get_logger("movements.movement")


[docs] class Movement(_core.CoreMovement): """A class used to contain an UnsteadyProblem's movement. **Contains the following methods:** lcm_period: The least common multiple of all motion periods, ensuring all motions complete an integer number of cycles when cycle averaging forces and moments. max_period: The longest period of motion of Movement's sub movement objects, the motion(s) of its sub sub movement object(s), and the motions of its sub sub sub movement objects. min_period: The shortest non zero period of motion of Movement's sub movement objects, the motion(s) of its sub sub movement object(s), and the motions of its sub sub sub movement objects. static: Flags if Movement's sub movement objects, its sub sub movement object(s), and its sub sub sub movement objects all represent no motion. """ __slots__ = ( "_num_cycles", "_num_chords", "_max_wake_chords", "_max_wake_cycles", "_airplanes", "_operating_points", ) def __init__( self, airplane_movements: list[airplane_movement_mod.AirplaneMovement], operating_point_movement: operating_point_movement_mod.OperatingPointMovement, delta_time: float | int | str | None = None, num_cycles: int | None = None, num_chords: int | None = None, num_steps: int | None = None, max_wake_rows: int | None = None, max_wake_chords: int | None = None, max_wake_cycles: int | None = None, ) -> None: """The initialization method. This method checks that all Wings maintain their symmetry type across all time steps. See the WingMovement class documentation for more details on this requirement. See the Wing class documentation for more information on symmetry types. :param airplane_movements: A list of the AirplaneMovements associated with each of the UnsteadyProblem's Airplanes. :param operating_point_movement: An OperatingPointMovement characterizing any changes to the UnsteadyProblem's operating conditions. :param delta_time: The time between each time step. Accepts the following: None (default): Movement analytically estimates the delta_time that produces wake ring vortices with roughly the same chord length as the bound trailing edge ring vortices, accounting for both freestream and geometry motion velocities. This provides good results across all Strouhal numbers. "optimize": Movement first runs the analytical estimation, then uses that result as an initial guess for an iterative optimization that minimizes the area mismatch between wake ring vortices and their parent bound trailing edge ring vortices. This is slower but may produce slightly more accurate results. Positive number (int or float): Use the specified value directly. All values are converted internally to floats. The units are in seconds. :param num_cycles: The number of cycles of the maximum period motion used to calculate a num_steps parameter initialized as None if Movement isn't static. If num_steps is not None or if Movement is static, this must be None. If num_steps is initialized as None and the Movement isn't static, num_cycles must be a positive int. In that case, I recommend setting num_cycles to 3. The default is None. :param num_chords: The number of chord lengths used to calculate a num_steps parameter initialized as None if Movement is static. If num_steps is not None or if Movement isn't static, this must be None. If num_steps is initialized as None and Movement is static, num_chords must be a positive int. In that case, I recommend setting num_chords to 10. For cases with multiple Airplanes, the num_chords will reference the largest reference chord length. The default is None. :param num_steps: The number of time steps of the unsteady simulation. If initialized as None, and Movement isn't static, Movement will calculate a value for num_steps such that the simulation will cover some number of cycles of the maximum period of all the motion described in Movement's sub movement objects, sub sub movement object(s), and sub sub sub movement objects. If num_steps is initialized as None, and Movement is static, Movement will calculate a value for num_steps such that the simulation will result in a wake extending back by some number of reference chord lengths. :param max_wake_rows: The maximum number of chordwise wake ring vortex rows per Wing. Works for both static and non static Movements. Must be a positive int if set. At most one of max_wake_rows, max_wake_chords, and max_wake_cycles can be non None. The default is None (no truncation). :param max_wake_chords: The maximum wake length in chord lengths, converted to max_wake_rows internally. Only valid for static Movements (mirrors num_chords). Must be a positive int if set. The default is None. :param max_wake_cycles: The maximum wake length in motion cycles, converted to max_wake_rows internally. Only valid for non static Movements (mirrors num_cycles). Must be a positive int if set. The default is None. :return: None """ # --- Validate types early --- # CoreMovement.__init__() validates these, but Movement needs to use # the parameters before calling super().__init__() (for delta_time # estimation and period computation). Validate here so that bad types # produce clear TypeErrors rather than AttributeErrors. if not isinstance(airplane_movements, list): raise TypeError("airplane_movements must be a list.") if len(airplane_movements) < 1: raise ValueError("airplane_movements must have at least one element.") for airplane_movement in airplane_movements: if not isinstance( airplane_movement, airplane_movement_mod.AirplaneMovement ): raise TypeError( "Every element in airplane_movements must be an " "AirplaneMovement." ) if not isinstance( operating_point_movement, operating_point_movement_mod.OperatingPointMovement, ): raise TypeError( "operating_point_movement must be an OperatingPointMovement." ) # --- Resolve delta_time --- # Track whether iterative optimization should run after analytical. _should_iteratively_optimize_delta_time: bool = False if isinstance(delta_time, str): if delta_time == "optimize": _should_iteratively_optimize_delta_time = True # Fall through to calculate analytical estimate first. delta_time = None else: raise ValueError('delta_time string must be "optimize".') if delta_time is not None: delta_time = _parameter_validation.number_in_range_return_float( delta_time, "delta_time", min_val=0.0, min_inclusive=False ) else: # Calculate a fast initial delta_time estimate based on freestream # velocity. This is used as a fallback for static movements and as # a starting point for the analytical optimization. delta_times = [] for airplane_movement in airplane_movements: # TODO: Consider making this also average across each Airplane's Wings. c_ref = airplane_movement.base_airplane.c_ref assert c_ref is not None delta_times.append( c_ref / airplane_movement.base_airplane.wings[0].num_chordwise_panels / operating_point_movement.base_operating_point.vCg__E ) fast_estimate = sum(delta_times) / len(delta_times) # Run analytical optimization to get a better delta_time that accounts # for both freestream and geometry motion velocities. delta_time = _analytically_optimize_delta_time( airplane_movements=list(airplane_movements), operating_point_movement=operating_point_movement, initial_delta_time=fast_estimate, ) # Run iterative optimization if requested, using the analytical result # as the initial guess. if _should_iteratively_optimize_delta_time: delta_time = _optimize_delta_time( airplane_movements=list(airplane_movements), operating_point_movement=operating_point_movement, initial_delta_time=delta_time, ) # --- Compute period properties locally --- # These are needed for num_steps and max_wake calculations before # super().__init__() is called. The same values will be lazily cached # by CoreMovement when accessed via properties. _airplane_movement_max_periods = [] for airplane_movement in airplane_movements: _airplane_movement_max_periods.append(airplane_movement.max_period) _max_period = max( max(_airplane_movement_max_periods), operating_point_movement.max_period, ) _static = _max_period == 0 _lcm_period: float = 0.0 if not _static: _all_periods: list[float] = [] for airplane_movement in airplane_movements: _all_periods.extend(airplane_movement.all_periods) _all_periods.append(operating_point_movement.max_period) _lcm_period = _core.lcm_multiple(_all_periods) # --- Resolve num_steps --- if num_steps is None: if _static: if num_cycles is not None: raise ValueError( "If num_steps is None and the Movement is static, num_cycles " "must be left as None." ) else: if num_cycles is None: raise ValueError( "If num_steps is None and the Movement isn't static, " "num_cycles must be set." ) if num_cycles is not None: num_cycles = _parameter_validation.int_in_range_return_int( num_cycles, "num_cycles", min_val=1, min_inclusive=True, ) if num_steps is None: if _static: if num_chords is None: raise ValueError( "If num_steps is None and the Movement is static, num_chords " "must be set." ) else: if num_chords is not None: raise ValueError( "If num_steps is None and the Movement isn't static, " "num_chords must be left as None." ) if num_chords is not None: num_chords = _parameter_validation.int_in_range_return_int( num_chords, "num_chords", min_val=1, min_inclusive=True, ) if num_cycles is not None or num_chords is not None: if num_steps is not None: raise ValueError( "If either num_cycles or num_chords is not None, num_steps must " "be None." ) if num_steps is not None: num_steps = _parameter_validation.int_in_range_return_int( num_steps, "num_steps", min_val=1, min_inclusive=True, ) else: if _static: # Find the value of the largest reference chord length of all the # base Airplanes. c_refs = [] for airplane_movement in airplane_movements: c_ref = airplane_movement.base_airplane.c_ref assert c_ref is not None c_refs.append(c_ref) max_c_ref = max(c_refs) # Set the number of time steps such that the wake extends back by # some number of reference chord lengths. assert num_chords is not None wake_length = num_chords * max_c_ref distance_per_time_step = ( delta_time * operating_point_movement.base_operating_point.vCg__E ) num_steps = math.ceil(wake_length / distance_per_time_step) else: # Set the number of time steps such that the simulation runs for # some number of cycles of all motions. Use the LCM of all periods # to ensure each motion completes an integer number of cycles. assert num_cycles is not None num_steps = math.ceil(num_cycles * _lcm_period / delta_time) # --- Resolve max_wake_rows --- # Validate max_wake_* parameters. At most one can be non None. _num_max_wake_set = sum( x is not None for x in (max_wake_rows, max_wake_chords, max_wake_cycles) ) if _num_max_wake_set > 1: raise ValueError( "At most one of max_wake_rows, max_wake_chords, and max_wake_cycles " "can be non None." ) if max_wake_chords is not None: if not _static: raise ValueError("max_wake_chords is only valid for static Movements.") max_wake_chords = _parameter_validation.int_in_range_return_int( max_wake_chords, "max_wake_chords", min_val=1, min_inclusive=True, ) if max_wake_cycles is not None: if _static: raise ValueError( "max_wake_cycles is only valid for non static Movements." ) max_wake_cycles = _parameter_validation.int_in_range_return_int( max_wake_cycles, "max_wake_cycles", min_val=1, min_inclusive=True, ) # Convert max_wake_chords to max_wake_rows using the same formula as # num_chords to num_steps. if max_wake_chords is not None: c_refs = [] for airplane_movement in airplane_movements: c_ref = airplane_movement.base_airplane.c_ref assert c_ref is not None c_refs.append(c_ref) max_c_ref = max(c_refs) distance_per_time_step = ( delta_time * operating_point_movement.base_operating_point.vCg__E ) max_wake_rows = math.ceil( max_wake_chords * max_c_ref / distance_per_time_step ) # Convert max_wake_cycles to max_wake_rows using the same formula as # num_cycles to num_steps. if max_wake_cycles is not None: max_wake_rows = math.ceil(max_wake_cycles * _lcm_period / delta_time) # --- Initialize CoreMovement --- super().__init__( airplane_movements=airplane_movements, operating_point_movement=operating_point_movement, delta_time=delta_time, num_steps=num_steps, max_wake_rows=max_wake_rows, ) # Pre-populate the lazy caches with values already computed above so # that accessing the inherited properties does not redundantly # recompute them. _max_period, _lcm_period, and _static are set here # because they are always needed during __init__ (via the static # check). _min_period remains lazy. self._max_period = _max_period self._lcm_period = _lcm_period self._static = _static # --- Store Movement only attributes --- self._num_cycles = num_cycles self._num_chords = num_chords self._max_wake_chords = max_wake_chords self._max_wake_cycles = max_wake_cycles # --- Batch generate --- # Generate a list of lists of Airplanes that are the steps through each # AirplaneMovement. The first index identifies the AirplaneMovement, and the # second index identifies the time step. airplanes_temp: list[list[geometry.airplane.Airplane]] = [] for airplane_movement in self.airplane_movements: airplanes_temp.append( airplane_movement.generate_airplanes( num_steps=self._num_steps, delta_time=self._delta_time ) ) # Validate that all Wings maintain their symmetry type across all time steps. for airplane_movement_id, airplane_list in enumerate(airplanes_temp): # Get the base Airplane (first time step). base_airplane = airplane_list[0] # Store the symmetry types of the base Wings. base_wing_symmetry_types = [] for wing in base_airplane.wings: base_wing_symmetry_types.append(wing.symmetry_type) # Validate all subsequent time steps. for step_id, airplane in enumerate(airplane_list): # Check that Wings maintain their symmetry types. for wing_id, wing in enumerate(airplane.wings): base_symmetry_type = base_wing_symmetry_types[wing_id] if wing.symmetry_type != base_symmetry_type: raise ValueError( f"Wing {wing_id} in AirplaneMovement " f"{airplane_movement_id} changed from type " f"{base_symmetry_type} symmetry at time step 0 to type " f"{wing.symmetry_type} symmetry at time step {step_id}. " f"Wings cannot undergo motion that changes their symmetry " f"type. This happens when a symmetric Wing moves such " f"that its symmetry plane is no longer coincident with " f"the wing axes' yz plane or vice versa." ) # Store as tuple of tuples to prevent external mutation. self._airplanes: tuple[tuple[geometry.airplane.Airplane, ...], ...] = tuple( tuple(airplane_list) for airplane_list in airplanes_temp ) # Generate a list of OperatingPoints that are the steps through the # OperatingPointMovement. operating_points_temp = operating_point_movement.generate_operating_points( num_steps=self._num_steps, delta_time=self._delta_time ) # Store as tuple to prevent external mutation. self._operating_points: tuple[operating_point_mod.OperatingPoint, ...] = tuple( operating_points_temp ) # --- Immutable: read only properties --- @property def operating_point_movement( self, ) -> operating_point_movement_mod.OperatingPointMovement: assert isinstance( self._operating_point_movement, operating_point_movement_mod.OperatingPointMovement, ) return self._operating_point_movement @property def airplane_movements( self, ) -> tuple[airplane_movement_mod.AirplaneMovement, ...]: return cast( tuple[airplane_movement_mod.AirplaneMovement, ...], self._airplane_movements, ) @property def num_cycles(self) -> int | None: return self._num_cycles @property def num_chords(self) -> int | None: return self._num_chords @property def max_wake_chords(self) -> int | None: return self._max_wake_chords @property def max_wake_cycles(self) -> int | None: return self._max_wake_cycles @property def airplanes(self) -> tuple[tuple[geometry.airplane.Airplane, ...], ...]: return self._airplanes @property def operating_points(self) -> tuple[operating_point_mod.OperatingPoint, ...]: return self._operating_points
# Oversampling factor for the non static cached path. The high resolution # Movement is built with _NON_STATIC_CACHE_OVERSAMPLE * max_num_steps # intervals (and therefore one more snapshot) so the maximum and half maximum # candidates have integer strides and other candidates stay within roughly # half a high resolution step of the nominal time. _NON_STATIC_CACHE_OVERSAMPLE: int = 2 def _compute_wake_area_mismatch( delta_time: float, airplane_movements: list[airplane_movement_mod.AirplaneMovement], operating_point_movement: operating_point_movement_mod.OperatingPointMovement, num_steps: int | None = None, ) -> float: """Computes the average area mismatch between wake and bound ring vortices. Builds a temporary Movement and UnsteadyProblem so the panel vertex points are populated in the first Airplane's geometry axes (GP1_CgP1), then computes bound trailing edge ring vortex areas and wake first row ring vortex areas analytically from the panel corners. Skips solver instantiation, the per step initialize_step_geometry walk, the bound corner stack population, and the wake corner stack population. The area mismatch metric measures how well the wake ring vortex sizing matches the bound ring vortex sizing. A lower value indicates better matching. The number of time steps checked is picked to capture the full range of differences in areas for the wake and bound ring vortex child parent pairs. For static cases, this is just a single time step. For non static cases, it is enough time steps to cover one full maximum length period of motion. Bound trailing edge ring vortex back vertices are computed exactly as UnsteadyRingVortexLatticeMethodSolver._initialize_panel_vortices_at would: at step 0, panel.B[lr]pp + 0.25 * vInf * dt; at step k > 0, 0.75 * panel.B[lr]pp(k) + 0.25 * panel.B[lr]pp(k - 1) + 0.25 * vInf(k) * dt. For the first wake-row case corresponding to step == 1, the wake ring vortex area reduces algebraically to dt * |cross(Brrvp(k) - Blrvp(k), vInf(k - 1))|. For later steps (step >= 2), the wake first row ring vortex area depends on both current and previous bound trailing edge vertices and is computed using the general quadrilateral-diagonal formula. The advection velocity uses step k - 1 because the solver populates the wake using the previous step's freestream velocity. :param delta_time: The delta_time value to test. It must be a positive float. Its units are in seconds. :param airplane_movements: The AirplaneMovements defining the motion. :param operating_point_movement: The OperatingPointMovement. :param num_steps: The number of time steps to simulate. If None (the default), derived from ceil(max_period / delta_time). Callers that already know the intended integer step count should pass it explicitly to match the semantics of the cached non-static evaluator and avoid floating point rounding in the inferred step count. :return: The average area mismatch. The absolute percent error between the area of shed wake ring vortices and the area of their parent bound ring vortices (at the time step where they were shed). Averaged across all time steps and all pairs of child and parent ring vortices. A lower value indicates better matching. """ airplane_movements_copy = copy.deepcopy(airplane_movements) operating_point_movement_copy = copy.deepcopy(operating_point_movement) if num_steps is None: max_airplane_movement_period = 0.0 for airplane_movement in airplane_movements_copy: max_airplane_movement_period = max( airplane_movement.max_period, max_airplane_movement_period ) max_period = max( max_airplane_movement_period, operating_point_movement_copy.max_period ) # Calculate the number of time steps to traverse the max period (or just a # single time step if there is no movement). num_steps = 1 if max_period > 0.0: num_steps = math.ceil(max_period / delta_time) temp_movement = Movement( airplane_movements=airplane_movements_copy, operating_point_movement=operating_point_movement_copy, delta_time=delta_time, num_steps=num_steps, ) # UnsteadyProblem populates panel.*_GP1_CgP1 attributes on every Airplane snapshot, # which is the frame the solver's bound ring vortex construction reads. temp_problem = problems.UnsteadyProblem(movement=temp_movement) if num_steps < 2: return 0.0 # Cache vInf in GP1_CgP1 frame per step. v_inf_per_step = [ steady_problem.operating_point.vInf_GP1__E for steady_problem in temp_problem.steady_problems ] total_mismatch = 0.0 num_comparisons = 0 for step in range(1, num_steps): current_problem = temp_problem.steady_problems[step] previous_problem = temp_problem.steady_problems[step - 1] v_inf_curr = v_inf_per_step[step] v_inf_prev = v_inf_per_step[step - 1] for airplane_id, current_airplane in enumerate(current_problem.airplanes): previous_airplane = previous_problem.airplanes[airplane_id] for wing_id, current_wing in enumerate(current_airplane.wings): previous_wing = previous_airplane.wings[wing_id] current_panels = current_wing.panels if current_panels is None: continue previous_panels = previous_wing.panels if previous_panels is None: continue num_chordwise_panels = previous_wing.num_chordwise_panels trailing_edge_chordwise_index = num_chordwise_panels - 1 num_spanwise_panels = current_panels.shape[1] # Stack TE panel corners across spanwise positions for vectorized math. # Shape: (num_spanwise_panels, 3) per array. Flpp_curr = np.stack( [ current_panels[trailing_edge_chordwise_index, s].Flpp_GP1_CgP1 for s in range(num_spanwise_panels) ] ) Frpp_curr = np.stack( [ current_panels[trailing_edge_chordwise_index, s].Frpp_GP1_CgP1 for s in range(num_spanwise_panels) ] ) Blpp_curr = np.stack( [ current_panels[trailing_edge_chordwise_index, s].Blpp_GP1_CgP1 for s in range(num_spanwise_panels) ] ) Brpp_curr = np.stack( [ current_panels[trailing_edge_chordwise_index, s].Brpp_GP1_CgP1 for s in range(num_spanwise_panels) ] ) Flpp_prev = np.stack( [ previous_panels[trailing_edge_chordwise_index, s].Flpp_GP1_CgP1 for s in range(num_spanwise_panels) ] ) Frpp_prev = np.stack( [ previous_panels[trailing_edge_chordwise_index, s].Frpp_GP1_CgP1 for s in range(num_spanwise_panels) ] ) Blpp_prev = np.stack( [ previous_panels[trailing_edge_chordwise_index, s].Blpp_GP1_CgP1 for s in range(num_spanwise_panels) ] ) Brpp_prev = np.stack( [ previous_panels[trailing_edge_chordwise_index, s].Brpp_GP1_CgP1 for s in range(num_spanwise_panels) ] ) # Bound RingVortex front vertices at step - 1 (panel quarter chord). Flrvp_prev = 0.75 * Flpp_prev + 0.25 * Blpp_prev Frrvp_prev = 0.75 * Frpp_prev + 0.25 * Brpp_prev # Bound RingVortex back vertices at step - 1 (TE special case). if step - 1 == 0: Blrvp_prev = Blpp_prev + 0.25 * v_inf_prev * delta_time Brrvp_prev = Brpp_prev + 0.25 * v_inf_prev * delta_time else: previous_previous_problem = temp_problem.steady_problems[step - 2] previous_previous_panels = ( previous_previous_problem.airplanes[airplane_id] .wings[wing_id] .panels ) assert previous_previous_panels is not None Blpp_prev_prev = np.stack( [ previous_previous_panels[ trailing_edge_chordwise_index, s ].Blpp_GP1_CgP1 for s in range(num_spanwise_panels) ] ) Brpp_prev_prev = np.stack( [ previous_previous_panels[ trailing_edge_chordwise_index, s ].Brpp_GP1_CgP1 for s in range(num_spanwise_panels) ] ) Blrvp_prev = ( 0.75 * Blpp_prev + 0.25 * Blpp_prev_prev + 0.25 * v_inf_prev * delta_time ) Brrvp_prev = ( 0.75 * Brpp_prev + 0.25 * Brpp_prev_prev + 0.25 * v_inf_prev * delta_time ) # Bound area = 0.5 * |cross(Frrvp - Blrvp, Flrvp - Brrvp)|. bound_diag1 = Frrvp_prev - Blrvp_prev bound_diag2 = Flrvp_prev - Brrvp_prev bound_areas = 0.5 * np.linalg.norm( np.cross(bound_diag1, bound_diag2), axis=1 ) # Bound TE RingVortex back vertices at the current step. These define # the front of the wake first row RingVortex at the current step. Blrvp_curr = ( 0.75 * Blpp_curr + 0.25 * Blpp_prev + 0.25 * v_inf_curr * delta_time ) Brrvp_curr = ( 0.75 * Brpp_curr + 0.25 * Brpp_prev + 0.25 * v_inf_curr * delta_time ) # Wake first row area depends on how the solver's wake populator # constructs the back vertices. At step 1 (first wake population, run # at solver step 0), there is no preexisting wake grid: the populator # sets row 0 to the bound TE back vertices at step 1 and row 1 to # row 0 + vInf(0) * dt. The area collapses to # dt * |cross(Brrvp_curr - Blrvp_curr, vInf_prev)|. At step k >= 2, # the populator copies the previous step's wake grid forward, advects # it by vInf(k - 1) * dt, then prepends a new front row from bound TE # back vertices at step k. The first wake row's back vertices are # therefore Blrvp_prev + vInf_prev * dt and Brrvp_prev + # vInf_prev * dt, where Blrvp_prev and Brrvp_prev are the previous # step's bound TE back vertices already computed above. if step == 1: span_vec = Brrvp_curr - Blrvp_curr wake_areas = delta_time * np.linalg.norm( np.cross(span_vec, v_inf_prev), axis=1 ) else: Flwrvp = Blrvp_curr Frwrvp = Brrvp_curr Blwrvp = Blrvp_prev + v_inf_prev * delta_time Brwrvp = Brrvp_prev + v_inf_prev * delta_time wake_diag1 = Frwrvp - Blwrvp wake_diag2 = Flwrvp - Brwrvp wake_areas = 0.5 * np.linalg.norm( np.cross(wake_diag1, wake_diag2), axis=1 ) epsilon = 1e-12 valid = np.abs(bound_areas) > epsilon if np.any(valid): total_mismatch += float( np.sum( np.abs(wake_areas[valid] - bound_areas[valid]) / bound_areas[valid] ) ) num_comparisons += int(np.sum(valid)) if num_comparisons == 0: return 0.0 return total_mismatch / num_comparisons def _compute_wake_area_mismatches_cached_non_static( airplane_movements: list[airplane_movement_mod.AirplaneMovement], operating_point_movement: operating_point_movement_mod.OperatingPointMovement, lcm_period: float, num_steps_candidates: list[int], ) -> dict[int, float]: """Computes wake area mismatch for many integer num_steps candidates from one shared high resolution Movement. Builds a single Movement with M = _NON_STATIC_CACHE_OVERSAMPLE * max(num_steps_candidates) snapshots covering one LCM period of motion. For each candidate N, the snapshots needed at delta_time = lcm_period / N are looked up via linear interpolation between adjacent high resolution samples, then the same closed form bound and wake area formulas as _compute_wake_area_mismatch are applied. The Movement and UnsteadyProblem cost is paid once per optimizer call rather than once per candidate, which collapses the dominant per evaluation cost into a single up front build. The linear interpolation introduces a per panel position error bounded by O(omega^2 * dt^2 * amplitude) where dt is the high resolution sample spacing in time. For typical flapping kinematics this is small enough that the optimizer's selected num_steps matches the result that would be obtained from a fresh Movement and UnsteadyProblem build per candidate. :param airplane_movements: The AirplaneMovements defining the motion. :param operating_point_movement: The OperatingPointMovement. :param lcm_period: The LCM of all motion periods. It must be a positive float. Its units are in seconds. :param num_steps_candidates: The integer num_steps values to score. Must be non empty and contain only positive ints. :return: A dict mapping each candidate num_steps value to its computed mismatch. """ max_candidate = max(num_steps_candidates) high_res_num_intervals = _NON_STATIC_CACHE_OVERSAMPLE * max_candidate high_res_dt = lcm_period / high_res_num_intervals # The Movement covers high_res_num_intervals + 1 snapshots so that one full # LCM period worth of intervals can be sampled. high_res_num_steps = high_res_num_intervals + 1 airplane_movements_copy = copy.deepcopy(airplane_movements) operating_point_movement_copy = copy.deepcopy(operating_point_movement) temp_movement = Movement( airplane_movements=airplane_movements_copy, operating_point_movement=operating_point_movement_copy, delta_time=high_res_dt, num_steps=high_res_num_steps, ) temp_problem = problems.UnsteadyProblem(movement=temp_movement) first_problem = temp_problem.steady_problems[0] # Per (airplane, wing) cache of TE panel corners. Each stored array has # shape (high_res_num_steps, num_spanwise_panels, 3) so that strided # advanced indexing produces (N + 1, num_spanwise_panels, 3) views per # candidate. cache_per_wing: list[dict] = [] for airplane_id, airplane in enumerate(first_problem.airplanes): for wing_id, wing in enumerate(airplane.wings): if wing.panels is None: continue num_chordwise = wing.num_chordwise_panels te_idx = num_chordwise - 1 num_spanwise = wing.panels.shape[1] Flpp = np.empty((high_res_num_steps, num_spanwise, 3), dtype=float) Frpp = np.empty_like(Flpp) Blpp = np.empty_like(Flpp) Brpp = np.empty_like(Flpp) for step in range(high_res_num_steps): step_panels = ( temp_problem.steady_problems[step] .airplanes[airplane_id] .wings[wing_id] .panels ) assert step_panels is not None for s in range(num_spanwise): panel = step_panels[te_idx, s] Flpp[step, s] = panel.Flpp_GP1_CgP1 Frpp[step, s] = panel.Frpp_GP1_CgP1 Blpp[step, s] = panel.Blpp_GP1_CgP1 Brpp[step, s] = panel.Brpp_GP1_CgP1 cache_per_wing.append( { "Flpp": Flpp, "Frpp": Frpp, "Blpp": Blpp, "Brpp": Brpp, } ) v_inf_high_res = np.array( [ steady_problem.operating_point.vInf_GP1__E for steady_problem in temp_problem.steady_problems ] ) results: dict[int, float] = {} for num_steps in num_steps_candidates: results[num_steps] = _evaluate_cached_wake_area_mismatch( cache_per_wing=cache_per_wing, v_inf_high_res=v_inf_high_res, lcm_period=lcm_period, high_res_num_intervals=high_res_num_intervals, num_steps=num_steps, ) return results def _evaluate_cached_wake_area_mismatch( cache_per_wing: list[dict], v_inf_high_res: np.ndarray, lcm_period: float, high_res_num_intervals: int, num_steps: int, ) -> float: """Evaluates wake area mismatch at one candidate num_steps using a shared cache. Maps each candidate step k in [0, num_steps - 1] to a fractional high resolution index k * high_res_num_intervals / num_steps and linearly interpolates the cached panel corners and freestream between adjacent high resolution samples. Linear interpolation has O(dt^2) error in panel position for smooth motion, which is small enough at the oversampling factors used here that the integer num_steps picked by the optimizer matches the result obtained from a fresh per candidate Movement and UnsteadyProblem build. When the fractional index is an integer (e.g. for candidates that divide high_res_num_intervals exactly) the interpolation reduces to a direct lookup. The resulting per panel positions feed the same closed form bound and wake area formulas as _compute_wake_area_mismatch. :param cache_per_wing: A list of per (airplane, wing) caches produced by _compute_wake_area_mismatches_cached_non_static. Each entry is a dict with keys "Flpp", "Frpp", "Blpp", "Brpp" mapping to (high_res_num_intervals + 1, num_spanwise_panels, 3) ndarrays of trailing edge panel corner positions in the first Airplane's geometry axes (GP1_CgP1). :param v_inf_high_res: A (high_res_num_intervals + 1, 3) ndarray of freestream velocity samples in the first Airplane's geometry axes (GP1_CgP1), one per high resolution snapshot. :param lcm_period: The LCM of all motion periods. It must be a positive float. Its units are in seconds. :param high_res_num_intervals: The number of high resolution intervals covering one LCM period. The cached arrays have high_res_num_intervals + 1 samples along their leading axis. :param num_steps: The candidate integer number of steps per LCM period to score. Must be a positive int. Returns 0.0 immediately for num_steps < 2 since at least one step pair is needed for a comparison. :return: The average area mismatch at this candidate. The absolute percent error between the area of shed wake RingVortices and the area of their parent bound RingVortices (at the time step where they were shed). Averaged across all time steps and all pairs of child and parent RingVortices. A lower value indicates better matching. """ if num_steps < 2: return 0.0 delta_time = lcm_period / num_steps # Linear interpolation mapping from candidate steps to high resolution # samples. The floor index gives the lower bracketing sample; the fractional # part is the interpolation weight on the next sample. Only num_steps # candidate samples are generated since the comparison loop reads indices # [0, num_steps - 1]; the t = lcm_period sample would be dead. fractional_indices = np.arange(num_steps) * high_res_num_intervals / num_steps floor_indices = np.floor(fractional_indices).astype(int) weights_next = fractional_indices - floor_indices next_indices = floor_indices + 1 weights_floor = 1.0 - weights_next # Reshape weights for broadcasting against (num_samples, num_spanwise, 3) # panel arrays. weights_floor_b = weights_floor[:, None, None] weights_next_b = weights_next[:, None, None] # Reshape weights for broadcasting against (num_samples, 3) vInf arrays. weights_floor_v = weights_floor[:, None] weights_next_v = weights_next[:, None] total_mismatch = 0.0 num_comparisons = 0 for cache in cache_per_wing: # Sample and linearly interpolate the cached panel attribute arrays at # the candidate step times. Each result has shape (num_steps, # num_spanwise_panels, 3); v_inf has shape (num_steps, 3). Flpp = ( weights_floor_b * cache["Flpp"][floor_indices] + weights_next_b * cache["Flpp"][next_indices] ) Frpp = ( weights_floor_b * cache["Frpp"][floor_indices] + weights_next_b * cache["Frpp"][next_indices] ) Blpp = ( weights_floor_b * cache["Blpp"][floor_indices] + weights_next_b * cache["Blpp"][next_indices] ) Brpp = ( weights_floor_b * cache["Brpp"][floor_indices] + weights_next_b * cache["Brpp"][next_indices] ) v_inf = ( weights_floor_v * v_inf_high_res[floor_indices] + weights_next_v * v_inf_high_res[next_indices] ) # Comparison axis: index i in [0, num_steps - 2] maps to # step = i + 1, prev_step = i. Slice the panel arrays accordingly so # bound and wake area calculations can run as one vectorized batch # rather than a Python step loop. Flpp_prev_arr = Flpp[: num_steps - 1] Frpp_prev_arr = Frpp[: num_steps - 1] Blpp_prev_arr = Blpp[: num_steps - 1] Brpp_prev_arr = Brpp[: num_steps - 1] Blpp_curr_arr = Blpp[1:num_steps] Brpp_curr_arr = Brpp[1:num_steps] v_inf_prev_arr = v_inf[: num_steps - 1, None, :] v_inf_curr_arr = v_inf[1:num_steps, None, :] # Bound RingVortex front vertices at the previous step (panel quarter # chord, no derivative term). Flrvp_prev = 0.75 * Flpp_prev_arr + 0.25 * Blpp_prev_arr Frrvp_prev = 0.75 * Frpp_prev_arr + 0.25 * Brpp_prev_arr # Bound RingVortex back vertices at the previous step. For comparison # index 0 (prev_step == 0) there is no panel at step - 2; substituting # the step - 1 panel makes the derivative formula collapse to the # solver's no-derivative form 0.75*Blpp + 0.25*Blpp + 0.25*vInf*dt = # Blpp + 0.25*vInf*dt, so we get the correct value for both i == 0 and # i >= 1 from a single vectorized expression. Blpp_prev_prev_arr = np.concatenate( [Blpp_prev_arr[:1], Blpp_prev_arr[:-1]], axis=0 ) Brpp_prev_prev_arr = np.concatenate( [Brpp_prev_arr[:1], Brpp_prev_arr[:-1]], axis=0 ) Blrvp_prev = ( 0.75 * Blpp_prev_arr + 0.25 * Blpp_prev_prev_arr + 0.25 * v_inf_prev_arr * delta_time ) Brrvp_prev = ( 0.75 * Brpp_prev_arr + 0.25 * Brpp_prev_prev_arr + 0.25 * v_inf_prev_arr * delta_time ) # Bound areas across all comparison steps. Shape: (num_steps - 1, # num_spanwise_panels). bound_diag1 = Frrvp_prev - Blrvp_prev bound_diag2 = Flrvp_prev - Brrvp_prev bound_areas = 0.5 * np.linalg.norm(np.cross(bound_diag1, bound_diag2), axis=-1) # Bound RingVortex back vertices at the current step. These are the # front vertices of the wake first row at the current step. Blrvp_curr = ( 0.75 * Blpp_curr_arr + 0.25 * Blpp_prev_arr + 0.25 * v_inf_curr_arr * delta_time ) Brrvp_curr = ( 0.75 * Brpp_curr_arr + 0.25 * Brpp_prev_arr + 0.25 * v_inf_curr_arr * delta_time ) # Wake first row back vertices. At comparison index 0 (step == 1, first # wake population) the solver sets row 1 = row 0 + vInf * dt, so the # wake's back row is the bound TE back at the *current* step advected. # At i >= 1 the wake's back row is the bound TE back at the *previous* # step advected. Substituting Blrvp_curr[0] for index 0 of the previous # step array makes a single vectorized expression cover both cases. Blrvp_prev_for_wake = np.concatenate([Blrvp_curr[:1], Blrvp_prev[1:]], axis=0) Brrvp_prev_for_wake = np.concatenate([Brrvp_curr[:1], Brrvp_prev[1:]], axis=0) Flwrvp = Blrvp_curr Frwrvp = Brrvp_curr Blwrvp = Blrvp_prev_for_wake + v_inf_prev_arr * delta_time Brwrvp = Brrvp_prev_for_wake + v_inf_prev_arr * delta_time wake_diag1 = Frwrvp - Blwrvp wake_diag2 = Flwrvp - Brwrvp wake_areas = 0.5 * np.linalg.norm(np.cross(wake_diag1, wake_diag2), axis=-1) epsilon = 1e-12 valid = np.abs(bound_areas) > epsilon if np.any(valid): total_mismatch += float( np.sum( np.abs(wake_areas[valid] - bound_areas[valid]) / bound_areas[valid] ) ) num_comparisons += int(np.sum(valid)) if num_comparisons == 0: return 0.0 return total_mismatch / num_comparisons def _optimize_delta_time( airplane_movements: list[airplane_movement_mod.AirplaneMovement], operating_point_movement: operating_point_movement_mod.OperatingPointMovement, initial_delta_time: float, mismatch_cutoff: float = 0.01, ) -> float: """Finds an optimal delta_time by minimizing wake area mismatch. Optimizes delta_time to minimize the area mismatch between wake ring vortices and their parent bound trailing edge ring vortices. This produces better results at high Strouhal numbers where motion induced velocity is significant. For non static Movements, the optimization uses a brute force search over integer num_steps_per_lcm_cycle values (from 0.5x to 2x the initial estimate). This ensures the resulting delta_time divides the LCM period evenly. For static Movements, the optimization uses scipy.optimize.minimize_scalar with early termination if the mismatch falls below the specified cutoff value. :param airplane_movements: The AirplaneMovements defining the motion. :param operating_point_movement: The OperatingPointMovement. :param initial_delta_time: The initial estimate, typically the result of _analytically_optimize_delta_time. It must be a positive float. Its units are in seconds. :param mismatch_cutoff: A positive float for the optimization's convergence threshold. Only used for static Movements. When the average area mismatch (which is an absolute percent error) falls below this value, the search terminates early. The default is 0.01. :return: The optimized delta_time value. Its units are in seconds. """ movement_logger.info("Starting delta_time optimization.") # Collect all non zero periods to determine if static. all_periods: list[float] = [] for airplane_movement in airplane_movements: all_periods.extend(airplane_movement.all_periods) op_period = operating_point_movement.max_period if op_period != 0.0: all_periods.append(op_period) non_zero_periods = [p for p in all_periods if p != 0.0] if not non_zero_periods: # Static case: use scipy continuous optimization. return _optimize_delta_time_static( airplane_movements=airplane_movements, operating_point_movement=operating_point_movement, initial_delta_time=initial_delta_time, mismatch_cutoff=mismatch_cutoff, ) else: # Non static case: brute force search over integer num_steps_per_lcm_cycle. lcm_period = _core.lcm_multiple(non_zero_periods) return _optimize_delta_time_non_static( airplane_movements=airplane_movements, operating_point_movement=operating_point_movement, initial_delta_time=initial_delta_time, lcm_period=lcm_period, ) def _optimize_delta_time_static( airplane_movements: list[airplane_movement_mod.AirplaneMovement], operating_point_movement: operating_point_movement_mod.OperatingPointMovement, initial_delta_time: float, mismatch_cutoff: float, ) -> float: """Optimizes delta_time for static Movements using scipy. Uses scipy.optimize.minimize_scalar to find the delta_time that minimizes wake area mismatch. The search terminates early if the mismatch falls below the specified cutoff value. :param airplane_movements: The AirplaneMovements defining the motion. :param operating_point_movement: The OperatingPointMovement. :param initial_delta_time: The initial estimate. It must be a positive float. Its units are in seconds. :param mismatch_cutoff: A positive float for early termination. When the average area mismatch falls below this value, the search terminates early. :return: The optimized delta_time value. Its units are in seconds. """ lower_bound = initial_delta_time / 2.0 upper_bound = initial_delta_time * 2.0 # Check initial estimate first before running optimizer. initial_mismatch = _compute_wake_area_mismatch( initial_delta_time, airplane_movements, operating_point_movement ) dt_str = str(round(initial_delta_time, 6)) mismatch_str = str(round(initial_mismatch, 6)) state_msg = "\tState: delta_time=" + dt_str obj_msg = "\t\tMismatch: " + mismatch_str movement_logger.info(state_msg) movement_logger.info(obj_msg) if initial_mismatch < mismatch_cutoff: movement_logger.info("Acceptable value reached.") movement_logger.info("Optimization complete.") return initial_delta_time best_delta_time = initial_delta_time best_mismatch = initial_mismatch def objective(dt: float) -> float: nonlocal best_delta_time, best_mismatch mismatch = _compute_wake_area_mismatch( dt, airplane_movements, operating_point_movement ) this_dt_str = str(round(dt, 6)) this_mismatch_str = str(round(mismatch, 6)) this_state_msg = "\tState: delta_time=" + this_dt_str this_obj_msg = "\t\tMismatch: " + this_mismatch_str movement_logger.info(this_state_msg) movement_logger.info(this_obj_msg) if mismatch < best_mismatch: best_mismatch = mismatch best_delta_time = dt if mismatch < mismatch_cutoff: raise StopIteration return mismatch try: result = sp_opt.minimize_scalar( objective, bounds=(lower_bound, upper_bound), method="bounded", options={"xatol": 0.001}, ) if not result.success: raise RuntimeError("delta_time optimization failed to converge.") optimized_delta_time = float(result.x) except StopIteration: optimized_delta_time = best_delta_time movement_logger.info("Acceptable value reached.") # Warn if the optimized value is at one of the bounds. bound_tolerance = 1e-6 if abs(optimized_delta_time - lower_bound) < bound_tolerance: movement_logger.warning( "Optimized delta_time is at the lower bound. A better value may exist " "below the search range." ) elif abs(optimized_delta_time - upper_bound) < bound_tolerance: movement_logger.warning( "Optimized delta_time is at the upper bound. A better value may exist " "above the search range." ) movement_logger.info("Optimization complete.") return optimized_delta_time def _optimize_delta_time_non_static( airplane_movements: list[airplane_movement_mod.AirplaneMovement], operating_point_movement: operating_point_movement_mod.OperatingPointMovement, initial_delta_time: float, lcm_period: float, ) -> float: """Optimizes delta_time for non static Movements using brute force search. Searches over integer num_steps_per_lcm_cycle values from 0.5x to 2x the initial estimate. This ensures the resulting delta_time divides the LCM period evenly, which is important for clean cycle boundaries when averaging. :param airplane_movements: The AirplaneMovements defining the motion. :param operating_point_movement: The OperatingPointMovement. :param initial_delta_time: The initial estimate. It must be a positive float. Its units are in seconds. :param lcm_period: The LCM of all motion periods. It must be a positive float. Its units are in seconds. :return: The optimized delta_time value. Its units are in seconds. """ # Convert initial_delta_time to num_steps_per_lcm_cycle. initial_num_steps = lcm_period / initial_delta_time # Search from 0.5x to 2x the initial estimate. min_num_steps = max(1, int(initial_num_steps / 2)) max_num_steps = int(initial_num_steps * 2) + 1 movement_logger.info( "\tSearching num_steps_per_lcm_cycle from " + str(min_num_steps) + " to " + str(max_num_steps) ) candidates = list(range(min_num_steps, max_num_steps + 1)) # Build one high resolution Movement and score every integer candidate against it # by linearly interpolating the cached panel corners. This collapses the per # candidate Movement and UnsteadyProblem cost into a single up front build for # the entire bracket. cached_mismatches = _compute_wake_area_mismatches_cached_non_static( airplane_movements=airplane_movements, operating_point_movement=operating_point_movement, lcm_period=lcm_period, num_steps_candidates=candidates, ) best_num_steps = min_num_steps best_mismatch = float("inf") for num_steps in candidates: mismatch = cached_mismatches[num_steps] delta_time = lcm_period / num_steps movement_logger.info( "\tnum_steps=" + str(num_steps) + ", delta_time=" + str(round(delta_time, 6)) + ", mismatch=" + str(round(mismatch, 6)) ) if mismatch < best_mismatch: best_mismatch = mismatch best_num_steps = num_steps optimized_delta_time = lcm_period / best_num_steps movement_logger.info( "Optimization complete. Best: num_steps_per_lcm_cycle=" + str(best_num_steps) + ", delta_time=" + str(round(optimized_delta_time, 6)) + ", mismatch=" + str(round(best_mismatch, 6)) ) # Warn if the optimized value is at one of the bounds. if best_num_steps == min_num_steps: movement_logger.warning( "Optimized num_steps_per_lcm_cycle is at the lower bound (" + str(min_num_steps) + "). A better value may exist below the search range." ) elif best_num_steps == max_num_steps: movement_logger.warning( "Optimized num_steps_per_lcm_cycle is at the upper bound (" + str(max_num_steps) + "). A better value may exist above the search range." ) return optimized_delta_time def _analytically_optimize_delta_time( airplane_movements: list[airplane_movement_mod.AirplaneMovement], operating_point_movement: operating_point_movement_mod.OperatingPointMovement, initial_delta_time: float, ) -> float: """Analytically estimates the optimal delta_time from wake displacement. Estimates the delta_time that produces wake ring vortices with roughly the same chord length as the bound trailing edge ring vortices, accounting for both freestream and geometry motion velocities. This is faster than _optimize_delta_time but may be slightly less accurate. The algorithm works by: (1) computing a very small preliminary delta_time as the minimum motion period divided by 100 (capped by a maximum of 1000 total time steps for one cycle of the motion's LCM period), (2) creating a temporary Movement and UnsteadyProblem to generate Airplane geometry at each time step (with all Panel coordinates transformed into the first Airplane's geometry axes), (3) measuring the average wake displacement per time step for each Wing's trailing edge Panels (combining freestream velocity and geometry motion, both in the first Airplane's geometry axes), and (4) choosing a delta_time such that each wake ring vortex has approximately the same chord as its parent bound ring vortex (averaged across the one LCM period). :param airplane_movements: The AirplaneMovements defining the motion. :param operating_point_movement: The OperatingPointMovement. :param initial_delta_time: The initial estimate from the chord-based seed. It must be a positive float. Its units are in seconds. Used as a fallback for static Movements or degenerate cases. :return: The analytically optimized delta_time value. Its units are in seconds. """ movement_logger.info("Starting analytical delta_time optimization.") # Collect all non zero periods. all_periods: list[float] = [] for airplane_movement in airplane_movements: all_periods.extend(airplane_movement.all_periods) op_period = operating_point_movement.max_period if op_period != 0.0: all_periods.append(op_period) non_zero_periods = [p for p in all_periods if p != 0.0] # If there is no motion, fall back to the initial estimate. if not non_zero_periods: movement_logger.info( "All motion is static. Returning initial delta_time estimate." ) return initial_delta_time min_period = min(non_zero_periods) lcm_period = _core.lcm_multiple(non_zero_periods) # Step 1: Compute a preliminary delta_time that divides the LCM period into roughly # min_period / 100 sized steps. Cap at 1000 steps to prevent excessive computation # for cases with very large LCM to min period ratios. preliminary_delta_time = min_period / 100.0 preliminary_num_steps = round(lcm_period / preliminary_delta_time) max_preliminary_steps = 1000 if preliminary_num_steps > max_preliminary_steps: movement_logger.warning( "Preliminary num_steps (" + str(preliminary_num_steps) + ") exceeds cap of " + str(max_preliminary_steps) + ". Capping to prevent excessive computation." ) preliminary_num_steps = max_preliminary_steps if preliminary_num_steps < 1: preliminary_num_steps = 1 # Adjust to ensure an integer number of steps fits the LCM period. preliminary_delta_time = lcm_period / preliminary_num_steps # Step 2: Deep copy the movement objects and create a temporary Movement and # UnsteadyProblem. The UnsteadyProblem creates SteadyProblems that transform all # Panel coordinates into the first Airplane's geometry axes (GP1_CgP1), ensuring # consistent coordinate frames for multi airplane formation flight cases. airplane_movements_copy = copy.deepcopy(airplane_movements) operating_point_movement_copy = copy.deepcopy(operating_point_movement) temp_movement = Movement( airplane_movements=airplane_movements_copy, operating_point_movement=operating_point_movement_copy, delta_time=preliminary_delta_time, num_steps=preliminary_num_steps + 1, ) temp_problem = problems.UnsteadyProblem(movement=temp_movement) # Step 3: For each Airplane and Wing, measure the average wake displacement of # trailing edge Panels across all time steps. All coordinates are in the first # Airplane's geometry axes (GP1), relative to the first Airplane's CG (CgP1). wing_num_steps_values: list[float] = [] wing_num_spanwise_panels_values: list[int] = [] num_airplanes = len(temp_problem.steady_problems[0].airplanes) for airplane_id in range(num_airplanes): # Use the first generated Airplane to get Wing properties, since the base # Airplane may have different panel counts (e.g., due to symmetry processing # that occurs during generate_airplanes). first_airplane = temp_problem.steady_problems[0].airplanes[airplane_id] for wing_id, wing in enumerate(first_airplane.wings): num_chordwise = wing.num_chordwise_panels _panels = wing.panels assert _panels is not None num_spanwise = _panels.shape[1] # Compute the mean chordwise width of trailing edge Panels. This is # the target chord length for wake ring vortices. We use the trailing # edge Panel chord directly (rather than standard_mean_chord / # num_chordwise) because non uniform chordwise spacing can cause # trailing edge Panels to have different chords than average. total_te_panel_chord = 0.0 for spanwise_id in range(num_spanwise): te_panel = _panels[num_chordwise - 1, spanwise_id] _leftLeg_GP1 = te_panel.leftLeg_GP1 _rightLeg_GP1 = te_panel.rightLeg_GP1 assert _leftLeg_GP1 is not None assert _rightLeg_GP1 is not None # Average the left and right leg magnitudes for this Panel. panel_chord = ( float(np.linalg.norm(_leftLeg_GP1)) + float(np.linalg.norm(_rightLeg_GP1)) ) / 2.0 total_te_panel_chord += panel_chord mean_te_panel_chord = total_te_panel_chord / num_spanwise # Accumulate displacement distance across all trailing edge Panels # across all consecutive time step pairs. total_distance = 0.0 num_measurements = 0 for spanwise_id in range(num_spanwise): spanwise_distance = 0.0 for step in range(preliminary_num_steps): # Get this and next time step's trailing edge Panel. this_airplane = temp_problem.steady_problems[step].airplanes[ airplane_id ] next_airplane = temp_problem.steady_problems[step + 1].airplanes[ airplane_id ] _this_panels = this_airplane.wings[wing_id].panels _next_panels = next_airplane.wings[wing_id].panels assert _this_panels is not None assert _next_panels is not None this_panel = _this_panels[num_chordwise - 1, spanwise_id] next_panel = _next_panels[num_chordwise - 1, spanwise_id] # Compute the trailing edge center at each time step (in the first # Airplane's geometry axes, relative to the first Airplane's CG). _thisBlpp_GP1_CgP1 = this_panel.Blpp_GP1_CgP1 _thisBrpp_GP1_CgP1 = this_panel.Brpp_GP1_CgP1 _nextBlpp_GP1_CgP1 = next_panel.Blpp_GP1_CgP1 _nextBrpp_GP1_CgP1 = next_panel.Brpp_GP1_CgP1 assert _thisBlpp_GP1_CgP1 is not None assert _thisBrpp_GP1_CgP1 is not None assert _nextBlpp_GP1_CgP1 is not None assert _nextBrpp_GP1_CgP1 is not None thisCenter_GP1_CgP1 = ( _thisBlpp_GP1_CgP1 + _thisBrpp_GP1_CgP1 ) / 2.0 nextCenter_GP1_CgP1 = ( _nextBlpp_GP1_CgP1 + _nextBrpp_GP1_CgP1 ) / 2.0 # Geometry displacement of the trailing edge (in the first # Airplane's geometry axes). geometryDisplacement_GP1 = nextCenter_GP1_CgP1 - thisCenter_GP1_CgP1 # Flow displacement during this time step (in the first Airplane's # geometry axes). vInf_GP1__E = temp_movement.operating_points[step].vInf_GP1__E flowDisplacement_GP1 = vInf_GP1__E * preliminary_delta_time # Wake displacement is the combination of freestream flow and the # opposite of geometry motion (the wake stays where the trailing # edge was, then convects with the flow). Both vectors are in the # first Airplane's geometry axes. wakeDisplacement_GP1 = ( flowDisplacement_GP1 - geometryDisplacement_GP1 ) spanwise_distance += float(np.linalg.norm(wakeDisplacement_GP1)) total_distance += spanwise_distance num_measurements += 1 # Average distance over one LCM period per spanwise Panel. average_distance = total_distance / num_measurements # The desired number of steps per LCM period is such that each wake # ring vortex chord equals the trailing edge bound Panel chord. wing_num_steps_per_lcm = average_distance / mean_te_panel_chord wing_num_steps_values.append(wing_num_steps_per_lcm) wing_num_spanwise_panels_values.append(num_spanwise) # Step 4: Compute the weighted average of num_steps across all Wings. if not wing_num_steps_values: movement_logger.info( "No valid wake displacement data. Returning initial delta_time estimate." ) return initial_delta_time total_weight = sum(wing_num_spanwise_panels_values) weighted_num_steps = ( sum( n * w for n, w in zip(wing_num_steps_values, wing_num_spanwise_panels_values) ) / total_weight ) if weighted_num_steps <= 0.0: movement_logger.info( "Computed num_steps is non positive. Returning initial delta_time estimate." ) return initial_delta_time # Round to an integer number of steps that fits the LCM period. final_num_steps = round(weighted_num_steps) if final_num_steps < 1: final_num_steps = 1 optimized_delta_time = lcm_period / final_num_steps dt_str = str(round(optimized_delta_time, 6)) movement_logger.info( "\tResult: delta_time=" + dt_str + " (" + str(final_num_steps) + " steps per LCM period)" ) # Warn if the result implies fewer than 20 time steps per minimum period of motion. # This indicates the trailing edge Panels are large relative to the motion, so # matching wake ring vortex area to bound ring vortex area requires a time step that # is large compared to the minimum period. This results in poor temporal resolution # of the fastest motion. Increasing the number of chordwise Panels or switching to # cosine chordwise spacing (which concentrates Panels at the leading and trailing # edges) will reduce the trailing edge Panels' chord lengths and allow finer time # stepping. steps_per_min_period = min_period / optimized_delta_time if steps_per_min_period < 20: movement_logger.warning( "Analytical optimization result implies only " + str(round(steps_per_min_period, 1)) + " time steps per minimum period of motion. This may cause poor temporal " + "resolution. Consider increasing the number of chordwise Panels or " + "switching to cosine chordwise spacing to reduce the trailing edge " + "Panels' chord lengths." ) movement_logger.info("Analytical optimization complete.") return optimized_delta_time