Source code for pybmodes.campbell._sweep

# Copyright 2024-2026 Jae Hoon Seo
# Marine Structural Mechanics and Integrity Lab (SMI Lab), Inha University
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Rotor-speed sweep drivers and the public :func:`campbell_sweep` entry point.

Two solver paths:

- :func:`_solve_blade_sweep` — re-solves the blade FEM at each
  rotor speed in the input grid, tracks each mode across consecutive
  steps via :func:`_hungarian_assignment` on the per-step
  :func:`_mac_matrix`, and records the per-step MAC confidence.
- :func:`_solve_tower_once` — solves the tower model once at
  ``rot_rpm = 0`` and broadcasts the result across the sweep
  (tower modes are rotor-speed independent in an Earth-fixed frame).

The public :func:`campbell_sweep` is a thin orchestrator that
validates inputs, resolves the input(s) via :func:`_load_models`, runs
each side that's available, and packs the result into a
:class:`~pybmodes.campbell.result.CampbellResult`.
"""
from __future__ import annotations

import pathlib
from typing import TYPE_CHECKING

import numpy as np

from pybmodes.fem.normalize import NodeModeShape
from pybmodes.fem.platform_modes import _N_RIGID
from pybmodes.io.bmi import PlatformSupport
from pybmodes.models._pipeline import run_fem

from ._classify import (
    _label_blade_modes,
    _label_tower_modes_with_overrides,
    _participation,
)
from ._mac import _hungarian_assignment, _mac_matrix
from ._models import _load_models, _Model
from .result import CampbellResult

if TYPE_CHECKING:
    from pybmodes.models.blade import RotatingBlade
    from pybmodes.models.tower import Tower


def _solve_blade_sweep(
    blade: _Model,
    omega_rpm: np.ndarray,
    n_modes: int,
    track_by_mac: bool,
) -> tuple[np.ndarray, np.ndarray, list[str], np.ndarray]:
    """Run the rotor-speed sweep on the blade model.

    Returns ``(frequencies, participation, labels, mac_to_previous)``
    with shapes ``(n_steps, n_modes)``, ``(n_steps, n_modes, 3)``, a
    list of ``n_modes`` labels, and ``(n_steps, n_modes)`` per-step
    MAC values vs the immediately-preceding step (row 0 is NaN).
    Restores the original ``bbmi.rot_rpm`` after the sweep so the
    caller's BMI object is unmutated.
    """
    bbmi, bsp = blade
    original_rpm = float(getattr(bbmi, "rot_rpm", 0.0))
    n_steps = omega_rpm.size
    freqs = np.zeros((n_steps, n_modes))
    parts = np.zeros((n_steps, n_modes, 3))
    mac_to_prev = np.full((n_steps, n_modes), np.nan, dtype=float)
    slot_shapes: list[NodeModeShape] | None = None

    try:
        for step, rpm in enumerate(omega_rpm):
            bbmi.rot_rpm = float(rpm)
            modal = run_fem(bbmi, n_modes=n_modes, sp=bsp)
            # Defensive: the symmetric ``eigh`` solver always returns
            # exactly ``n_modes`` rows, but the rare general-eig
            # fallback (floating platforms with non-symmetric K,
            # ``solve_modes`` → ``scipy.linalg.eig``) can drop NaN
            # rows from a degenerate eigenproblem. Detect and fail
            # loudly rather than silently NaN-padding downstream.
            if len(modal.frequencies) < n_modes:
                raise RuntimeError(
                    f"campbell_sweep: at rot_rpm = {rpm:.3f}, the FEM "
                    f"solver returned only {len(modal.frequencies)} of "
                    f"the requested {n_modes} modes — typically a sign "
                    f"of a near-degenerate eigenproblem (rotating "
                    f"floating platforms with a non-symmetric "
                    f"PlatformSupport block at certain rotor speeds). "
                    f"Reduce ``n_blade_modes`` or use a finer RPM grid "
                    f"that avoids the degeneracy."
                )
            shapes = list(modal.shapes[:n_modes])
            f_step = np.asarray(modal.frequencies[:n_modes], dtype=float)
            p_step = np.array([_participation(s) for s in shapes])

            if step == 0 or not track_by_mac or slot_shapes is None:
                order = np.arange(n_modes, dtype=int)
                mac_row = np.full(n_modes, np.nan, dtype=float)
            else:
                mac = _mac_matrix(shapes, slot_shapes)
                order = _hungarian_assignment(mac)
                free = [s for s in range(n_modes) if s not in order]
                for k in range(n_modes):
                    if order[k] < 0 and free:
                        order[k] = free.pop(0)
                # MAC confidence of the chosen pairing per output slot.
                mac_row = np.empty(n_modes, dtype=float)
                for k in range(n_modes):
                    slot = int(order[k])
                    mac_row[slot] = float(mac[k, slot]) if slot >= 0 else np.nan

            for k in range(n_modes):
                slot = int(order[k])
                freqs[step, slot] = f_step[k]
                parts[step, slot, :] = p_step[k]
            mac_to_prev[step, :] = mac_row

            new_slot_shapes: list[NodeModeShape | None] = [None] * n_modes
            for k in range(n_modes):
                new_slot_shapes[int(order[k])] = shapes[k]
            slot_shapes = [s for s in new_slot_shapes if s is not None]
    finally:
        bbmi.rot_rpm = original_rpm

    labels = _label_blade_modes(parts[0])
    return freqs, parts, labels, mac_to_prev


def _solve_tower_once(
    tower: _Model,
    n_modes: int,
    n_steps: int,
) -> tuple[np.ndarray, np.ndarray, list[str]]:
    """Solve the tower once at ``rot_rpm = 0`` and broadcast across the sweep.

    Tower modes are rotor-speed-independent (the tower lives in an
    Earth-fixed frame), so a single eigensolve is enough; we tile the
    result across ``n_steps`` rows for shape compatibility with the
    blade-sweep output.
    """
    tbmi, tsp = tower

    # A floating tower carries six rigid-body modes (surge through yaw)
    # ahead of its first flexible bending mode. ``classify_platform_modes``
    # assigns those DOF names by a Hungarian matching over the leading
    # ``_N_RIGID`` modes, so its result is only stable when the solve
    # actually contains that whole block. Requesting fewer modes than the
    # rigid block (the default ``n_tower_modes = 4`` is below 6) truncated
    # the matching and could leave surge/sway unnamed, after which the
    # fallback mislabelled them as flexible "1st tower FA/SS". That
    # diverged from a direct ``Tower(...).run()`` whose larger solve named
    # them (the asymmetric-FOWT bug). So for a floating tower we always
    # solve the full rigid block, classify over it, then slice the result
    # back to the requested ``n_modes``. The labels then match the direct
    # solve regardless of ``n_tower_modes``.
    floating = tbmi.hub_conn == 2 and isinstance(tbmi.support, PlatformSupport)
    n_solve = max(n_modes, _N_RIGID) if floating else n_modes

    # Save and restore the caller's ``rot_rpm`` — tower modes are
    # rotor-speed-independent so we force ``0.0`` for the solve, but
    # we mustn't leave the caller's BMI mutated on the way out
    # (mirrors the try / finally pattern in ``_solve_blade_sweep``;
    # Pre-1.0 review caught this).
    original_rpm = tbmi.rot_rpm
    try:
        tbmi.rot_rpm = 0.0
        modal = run_fem(tbmi, n_modes=n_solve, sp=tsp)
    finally:
        tbmi.rot_rpm = original_rpm
    # Defensive: mirror the blade-sweep "too few modes" guard. The
    # symmetric ``eigh`` path always returns exactly ``n_solve`` rows,
    # but the rare general-eig fallback (floating ``PlatformSupport``
    # with non-symmetric ``hydro_K`` / ``mooring_K``) can drop NaN rows
    # from a degenerate eigenproblem and return fewer. Without this
    # guard the downstream ``np.broadcast_to`` call would raise a
    # cryptic shape error; we want the same friendly diagnostic the
    # blade path emits.
    if len(modal.frequencies) < n_solve:
        raise RuntimeError(
            f"campbell_sweep: tower solve returned only "
            f"{len(modal.frequencies)} of the requested {n_solve} "
            f"modes — typically a sign of a near-degenerate "
            f"eigenproblem (floating tower with a non-symmetric "
            f"PlatformSupport block). Reduce ``n_tower_modes``."
        )
    tshapes = list(modal.shapes[:n_solve])
    tfreqs_full = np.asarray(modal.frequencies[:n_solve], dtype=float)
    tparts_full = np.array([_participation(s) for s in tshapes])

    # Prefer the FEM's own platform-mode classification for a floating
    # tower (``ModalResult.mode_labels`` — populated only for
    # ``hub_conn == 2``); fall back to participation argmax for
    # cantilever / monopile towers and for flexible bending modes. The
    # classification is done over the full ``n_solve`` block so the
    # rigid-body labels are stable, then everything is sliced back to
    # the requested ``n_modes``.
    labels_full = _label_tower_modes_with_overrides(
        tparts_full, modal.mode_labels
    )
    tfreqs = tfreqs_full[:n_modes]
    tparts = tparts_full[:n_modes]
    labels = labels_full[:n_modes]

    freqs = np.broadcast_to(tfreqs, (n_steps, n_modes)).copy()
    parts = np.broadcast_to(tparts, (n_steps, n_modes, 3)).copy()
    return freqs, parts, labels


[docs] def campbell_sweep( input_path: str | pathlib.Path | RotatingBlade | Tower, omega_rpm: np.ndarray, n_blade_modes: int = 4, n_tower_modes: int = 4, *, tower_input: str | pathlib.Path | Tower | None = None, track_by_mac: bool = True, ) -> CampbellResult: """Build a Campbell-diagram dataset for the given turbine. Parameters ---------- input_path : Either a path **or an already-loaded model** (issue #51): - an OpenFAST ElastoDyn main ``.dat`` file — the function loads the blade *and* the tower from the deck and runs both; - a blade ``.bmi`` (``beam_type = 1``) — blade-only sweep unless ``tower_input`` is also supplied; - a tower ``.bmi`` (``beam_type = 2``) — tower-only result (frequencies are constant across ``omega_rpm``; the result is mostly useful for overlay against the per-rev family); - an already-constructed :class:`~pybmodes.models.RotatingBlade` or :class:`~pybmodes.models.Tower` (from *any* constructor — ``__init__``, ``from_elastodyn``, ``from_windio``, ``from_windio_floating``, …). The model is used **verbatim, with no disk re-read**, so a single load point feeds both ``.run()`` and the sweep, and a ``from_windio`` / ``from_elastodyn`` model (whose section properties no path can re-read) can finally be swept. Routed to blade/tower by its ``beam_type`` so either may be passed here. omega_rpm : 1-D array of rotor speeds in rpm. ``Ω = 0`` is fine and produces the parked-rotor frequencies. n_blade_modes : Number of blade modes to extract per speed and report in ``frequencies[:, :n_blade_modes]``. Default 4 covers 1st/2nd flap and 1st/2nd edge — the modes that actually drive resonance design. Pushing this much higher just adds high-order flap modes that no realistic per-rev family crosses inside the operating envelope; raise it deliberately when you need them. n_tower_modes : Number of tower modes (default 4 — 1st/2nd FA + 1st/2nd SS). Drop to 2 to overlay only the 1st FA + 1st SS pair, or push higher for offshore decks where 3rd-mode crossings matter. Ignored when no tower model is available. tower_input : Optional explicit tower — a tower ``.bmi`` path **or a loaded :class:`~pybmodes.models.Tower`** (keyword-only). Useful when ``input_path`` is a blade-only deck or a loaded ``RotatingBlade``. Overrides the deck-supplied tower if ``input_path`` was an ElastoDyn ``.dat``. So the single-load-point form is ``campbell_sweep(blade, omega, tower_input=tower)``. track_by_mac : Whether to use MAC across consecutive rotor speeds to keep each blade output column corresponding to the same physical mode. ``False`` returns the eigensolver's native order (useful for debugging mode re-ordering issues). Tower modes don't change with rotor speed and are unaffected by this flag. Returns ------- :class:`CampbellResult`. """ # ``_load_models`` accepts a path *or* a loaded RotatingBlade / # Tower for either argument (issue #51) — do not coerce to Path # here (that would break a model object). blade, tower = _load_models(input_path, tower_input) omega_rpm = np.asarray(omega_rpm, dtype=float).ravel() if omega_rpm.size == 0: raise ValueError("omega_rpm must contain at least one rotor speed") if not np.all(np.isfinite(omega_rpm)): raise ValueError( "omega_rpm must be finite; found NaN or inf in " f"{omega_rpm.tolist()!r}" ) if np.any(omega_rpm < 0.0): raise ValueError( "omega_rpm must be non-negative (rotor speeds in rpm); found " f"min = {float(omega_rpm.min())!r}" ) if omega_rpm.size >= 2 and np.any(np.diff(omega_rpm) < 0.0): raise ValueError( "omega_rpm must be sorted ascending so MAC tracking can pair " "consecutive steps; got " f"{omega_rpm.tolist()!r}" ) if not isinstance(n_blade_modes, int) or n_blade_modes < 0: raise ValueError( f"n_blade_modes must be a non-negative integer; got {n_blade_modes!r}" ) if not isinstance(n_tower_modes, int) or n_tower_modes < 0: raise ValueError( f"n_tower_modes must be a non-negative integer; got {n_tower_modes!r}" ) # Silently zero-out mode counts for components that aren't present — # easier on the caller than raising for the common "no tower" case. if blade is None: n_blade_modes = 0 if tower is None: n_tower_modes = 0 if n_blade_modes + n_tower_modes < 1: raise ValueError( "no modes to compute: input had neither a blade nor a tower " "component, or both n_blade_modes and n_tower_modes were 0" ) n_steps = omega_rpm.size blade_freqs = blade_parts = None blade_labels: list[str] = [] blade_mac: np.ndarray | None = None if blade is not None and n_blade_modes > 0: blade_freqs, blade_parts, blade_labels, blade_mac = _solve_blade_sweep( blade, omega_rpm, n_blade_modes, track_by_mac, ) tower_freqs = tower_parts = None tower_labels: list[str] = [] if tower is not None and n_tower_modes > 0: tower_freqs, tower_parts, tower_labels = _solve_tower_once( tower, n_tower_modes, n_steps, ) parts_pieces = [a for a in (blade_parts, tower_parts) if a is not None] freqs_pieces = [a for a in (blade_freqs, tower_freqs) if a is not None] frequencies = np.concatenate(freqs_pieces, axis=1) participation = np.concatenate(parts_pieces, axis=1) labels = blade_labels + tower_labels # Build the per-step MAC table: blade columns get the tracked # MACs from the sweep; tower columns are NaN (no rotor-speed # dependence, so a MAC confidence is not meaningful). mac_pieces: list[np.ndarray] = [] if blade_mac is not None: mac_pieces.append(blade_mac) if tower_freqs is not None: mac_pieces.append(np.full((n_steps, n_tower_modes), np.nan)) if mac_pieces: mac_to_previous = np.concatenate(mac_pieces, axis=1) else: mac_to_previous = np.empty((n_steps, 0)) return CampbellResult( omega_rpm=omega_rpm, frequencies=frequencies, labels=labels, participation=participation, n_blade_modes=n_blade_modes, n_tower_modes=n_tower_modes, mac_to_previous=mac_to_previous, )