Source code for pybmodes.io._elastodyn.parser

# Copyright 2024-2026 Jae Hoon Seo
# Marine Structural Mechanics and Integrity Lab (SMI Lab), Inha University
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Line-driven parsers for the three ElastoDyn ``.dat`` file flavours.

Each entry point reads a path with ``latin-1`` (the encoding OpenFAST
sources use), splits into lines, and delegates to a per-flavour
parser. The parsers walk the file in line order, dispatching to
``_assign_*_field`` for typed scalars and ``_consume_*_distributed``
for the numeric tables.

Field-set discrepancies vs. the user-specified spec are documented in
``pybmodes.io.elastodyn_reader``'s public docstring.
"""

from __future__ import annotations

import pathlib

import numpy as np

from pybmodes.io._elastodyn.lex import (
    _canon_label,
    _is_section_divider,
    _parse_float,
    _split_label_index,
    _split_value_label,
    _strip_quotes,
)
from pybmodes.io._elastodyn.types import (
    ElastoDynBlade,
    ElastoDynMain,
    ElastoDynTower,
)

# ---------------------------------------------------------------------------
# Main file parser
# ---------------------------------------------------------------------------


def read_elastodyn_main(path: str | pathlib.Path) -> ElastoDynMain:
    """Read a top-level ElastoDyn ``.dat`` file and parse it.

    The file is decoded as ``latin-1``, split into lines and handed to
    ``_parse_main``; the ``pathlib.Path`` form of *path* is recorded as
    the ``source_file`` of the returned object.
    """
    source = pathlib.Path(path)
    contents = source.read_text(encoding="latin-1")
    return _parse_main(contents.splitlines(), source_file=source)


def _parse_main(
    lines: list[str], source_file: pathlib.Path | None = None
) -> ElastoDynMain:
    """Build an ``ElastoDynMain`` from the lines of a main input file.

    Line 0 is stored as the header and the first non-blank line after it
    as the title. Every following line is classified, in this order, as
    blank (skipped), a section divider (recorded), the opener of an
    ``OutList`` block (collected verbatim up to and including its
    ``END`` line), or a ``value  label`` scalar. Scalars are always
    stored raw in ``obj.scalars`` and additionally dispatched to
    ``_assign_main_field``.

    Raises:
        ValueError: when a label listed in ``_KNOWN_MAIN_CANON`` carries
            a value that ``_assign_main_field`` cannot convert.
    """
    obj = ElastoDynMain(header="", title="", source_file=source_file)
    n_lines = len(lines)

    # Header is the file-marker line; the title is the next non-empty line.
    if lines:
        obj.header = lines[0].rstrip()
    title_idx = next(
        (k for k in range(1, n_lines) if lines[k].strip()), n_lines
    )
    if title_idx < n_lines:
        obj.title = lines[title_idx].rstrip()
        cursor = title_idx + 1
    else:
        cursor = 1

    in_nodal_outlist = False
    while cursor < n_lines:
        raw = lines[cursor]
        # Step past the current line now; only the OutList branch below
        # moves the cursor any further.
        cursor += 1
        text = raw.strip()
        if not text:
            continue

        if _is_section_divider(raw):
            obj.section_dividers.append(raw.rstrip())
            # The nodal-OutList opener is itself a "==" divider, so seeing
            # one switches where later OutList lines are collected.
            if "[optional section]" in raw.lower() or text.startswith("======"):
                in_nodal_outlist = True
            continue

        # An OutList block starts on a line whose first token is "OutList".
        if text.split()[0] == "OutList":
            sink = obj.nodal_out_list if in_nodal_outlist else obj.out_list
            sink.append(raw.rstrip())
            # Keep every line up to and including END (or end-of-file).
            while cursor < n_lines:
                entry = lines[cursor]
                cursor += 1
                sink.append(entry.rstrip())
                if entry.strip().upper().startswith("END"):
                    break
            continue

        parsed = _split_value_label(raw)
        if parsed is None:
            continue
        value_str, label = parsed
        canon = _canon_label(label)
        obj.scalars[label] = value_str

        # Known labels fail loudly on malformed values; anything else is
        # tolerated and simply remains available through ``obj.scalars``.
        try:
            _assign_main_field(obj, label, canon, value_str)
        except (ValueError, IndexError) as err:
            if _canon_root(canon) not in _KNOWN_MAIN_CANON:
                continue
            raise ValueError(
                f"Malformed value for known ElastoDyn main field "
                f"{label!r} ({canon}): {value_str!r}. "
                f"Original error: {err}"
            ) from err

    return obj


# Canonical labels whose malformed values must abort parsing, per file flavour.
_KNOWN_MAIN_CANON = frozenset(
    {
        "NumBl",
        "TipRad",
        "HubRad",
        "PreCone",
        "HubCM",
        "OverHang",
        "ShftTilt",
        "Twr2Shft",
        "TowerHt",
        "TowerBsHt",
        "NacCMxn",
        "NacCMyn",
        "NacCMzn",
        "RotSpeed",
        "TipMass",
        "HubMass",
        "HubIner",
        "GenIner",
        "NacMass",
        "NacYIner",
        "YawBrMass",
        "BldFile",
        "TwrFile",
    }
)

_KNOWN_TOWER_CANON = frozenset(
    {
        "NTwInpSt",
        "TwrFADmp",
        "TwrSSDmp",
        "FAStTunr",
        "SSStTunr",
        "AdjTwMa",
        "AdjFASt",
        "AdjSSSt",
        "TwFAM1Sh",
        "TwFAM2Sh",
        "TwSSM1Sh",
        "TwSSM2Sh",
    }
)

_KNOWN_BLADE_CANON = frozenset(
    {
        "NBlInpSt",
        "BldFlDmp",
        "BldEdDmp",
        "FlStTunr",
        "AdjBlMs",
        "AdjFlSt",
        "AdjEdSt",
        "BldFl1Sh",
        "BldFl2Sh",
        "BldEdgSh",
    }
)


def _canon_root(canon: str) -> str:
    """Return *canon* with any run of trailing ASCII digits removed.

    Used for set-membership checks so that an indexed form such as
    ``"PreCone1"`` still matches its root token ``"PreCone"``.
    """
    end = len(canon)
    while end and canon[end - 1] in "0123456789":
        end -= 1
    return canon[:end]


# Main-file labels stored as a single float -> attribute on ``ElastoDynMain``.
_MAIN_FLOAT_ATTRS = {
    "TipRad": "tip_rad",
    "HubRad": "hub_rad",
    "HubCM": "hub_cm",
    "OverHang": "overhang",
    "ShftTilt": "shft_tilt",
    "Twr2Shft": "twr2shft",
    "TowerHt": "tower_ht",
    "TowerBsHt": "tower_bs_ht",
    "NacCMxn": "nac_cm_xn",
    "NacCMyn": "nac_cm_yn",
    "NacCMzn": "nac_cm_zn",
    "RotSpeed": "rot_speed_rpm",
    "HubMass": "hub_mass",
    "HubIner": "hub_iner",
    "GenIner": "gen_iner",
    "NacMass": "nac_mass",
    "NacYIner": "nac_y_iner",
    "YawBrMass": "yaw_br_mass",
}

# Main-file labels stored per index (three slots) -> sequence attribute.
_MAIN_INDEXED_FLOAT_ATTRS = {
    "PreCone": "pre_cone",
    "TipMass": "tip_mass",
}


def _assign_main_field(
    obj: ElastoDynMain, label: str, canon: str, value_str: str
) -> None:
    """Store *value_str* on the typed ``ElastoDynMain`` field named by *label*.

    The incoming *canon* is not used: ``(canon, idx)`` is re-derived from
    *label* with ``_split_label_index`` so that the bare-digit
    ``BldFile1`` form is handled. Labels that match no typed field are
    ignored. Conversion errors from ``int`` / ``_parse_float`` propagate
    to the caller.
    """
    canon, idx = _split_label_index(label)

    float_attr = _MAIN_FLOAT_ATTRS.get(canon)
    if float_attr is not None:
        setattr(obj, float_attr, _parse_float(value_str))
        return

    indexed_attr = _MAIN_INDEXED_FLOAT_ATTRS.get(canon)
    if indexed_attr is not None:
        # Only indexed labels with an index in 0..2 are stored.
        if idx is not None and 0 <= idx < 3:
            getattr(obj, indexed_attr)[idx] = _parse_float(value_str)
        return

    if canon == "NumBl":
        obj.num_bl = int(value_str)
    elif canon == "BldFile":
        # A BldFile label without an index falls back to slot 0.
        slot = 0 if idx is None else idx
        if 0 <= slot < 3:
            obj.bld_file[slot] = _normalise_subfile_path(value_str)
    elif canon == "TwrFile":
        obj.twr_file = _normalise_subfile_path(value_str)


def _normalise_subfile_path(value_str: str) -> str:
    """Return *value_str* unquoted, with backslashes turned into ``/``.

    Decks authored on Windows often write the ``TwrFile`` / ``BldFile``
    paths with backslash separators (``"Tower\\Mytower.dat"``). On POSIX
    ``pathlib.Path`` treats a backslash as an ordinary character, so
    without this rewrite ``parent / sub_file`` would point at a file
    literally named ``Tower\\Mytower.dat`` rather than
    ``Tower/Mytower.dat``.
    """
    unquoted = _strip_quotes(value_str)
    return "/".join(unquoted.split("\\"))


# ---------------------------------------------------------------------------
# Tower file parser
# ---------------------------------------------------------------------------


def read_elastodyn_tower(path: str | pathlib.Path) -> ElastoDynTower:
    """Read an ElastoDyn tower ``.dat`` file and parse it.

    The file is decoded as ``latin-1``, split into lines and handed to
    ``_parse_tower``; the ``pathlib.Path`` form of *path* is recorded as
    the ``source_file`` of the returned object.
    """
    source = pathlib.Path(path)
    contents = source.read_text(encoding="latin-1")
    return _parse_tower(contents.splitlines(), source_file=source)


def _parse_tower(
    lines: list[str], source_file: pathlib.Path | None = None
) -> ElastoDynTower:
    """Build an ``ElastoDynTower`` from the lines of a tower input file.

    Line 0 is stored as the header and the first non-blank line after it
    as the title. Each later line is either blank (skipped), a section
    divider (recorded; a divider containing ``DISTRIBUTED`` also hands
    control to ``_consume_tower_distributed``), or a ``value  label``
    scalar dispatched to ``_assign_tower_field``.

    Raises:
        ValueError: when a label listed in ``_KNOWN_TOWER_CANON`` carries
            a malformed value, or when the distributed table fails its
            row-count check.
    """
    obj = ElastoDynTower(header="", title="", source_file=source_file)
    n_lines = len(lines)

    if lines:
        obj.header = lines[0].rstrip()
    title_idx = next(
        (k for k in range(1, n_lines) if lines[k].strip()), n_lines
    )
    if title_idx < n_lines:
        obj.title = lines[title_idx].rstrip()
        cursor = title_idx + 1
    else:
        cursor = 1

    while cursor < n_lines:
        raw = lines[cursor]
        cursor += 1  # already points at the line after ``raw``
        text = raw.strip()
        if not text:
            continue

        if _is_section_divider(raw):
            obj.section_dividers.append(raw.rstrip())
            if "DISTRIBUTED" in text.upper():
                # The table reader returns the index at which to resume.
                cursor = _consume_tower_distributed(obj, lines, cursor)
            continue

        parsed = _split_value_label(raw)
        if parsed is None:
            continue
        value_str, label = parsed
        canon = _canon_label(label)
        try:
            _assign_tower_field(obj, label, canon, value_str)
        except (ValueError, IndexError) as err:
            # Errors on labels outside the known set are swallowed.
            if _canon_root(canon) not in _KNOWN_TOWER_CANON:
                continue
            raise ValueError(
                f"Malformed value for known ElastoDyn tower field "
                f"{label!r} ({canon}): {value_str!r}. "
                f"Original error: {err}"
            ) from err

    return obj


# Column name in the distributed-table header -> ``ElastoDynTower`` attribute.
_TOWER_DISTR_COL_MAP = {
    "HtFract": "ht_fract",
    "TMassDen": "t_mass_den",
    "TwFAStif": "tw_fa_stif",
    "TwSSStif": "tw_ss_stif",
    "TwFAIner": "tw_fa_iner",
    "TwSSIner": "tw_ss_iner",
    "TwFAcgOf": "tw_fa_cg_of",
    "TwSScgOf": "tw_ss_cg_of",
}


def _consume_tower_distributed(
    obj: ElastoDynTower, lines: list[str], i: int
) -> int:
    """Read the tower distributed-properties table starting at ``lines[i]``.

    The first two non-blank lines are kept as ``obj.distr_header_lines``
    (column names, then units). Data rows follow until a section
    divider, a row with fewer tokens than there are columns, or a row
    with a token ``_parse_float`` rejects. Columns named in
    ``_TOWER_DISTR_COL_MAP`` are stored on *obj* as float arrays; other
    columns are dropped.

    Returns:
        Index of the first line that was not consumed.

    Raises:
        ValueError: if ``obj.n_tw_inp_st`` is positive and differs from
            the number of rows parsed.
    """
    n_lines = len(lines)

    # Two header rows (column names, units) precede the data.
    header_lines: list[str] = []
    while i < n_lines and len(header_lines) < 2:
        candidate = lines[i]
        i += 1
        if candidate.strip():
            header_lines.append(candidate.rstrip())
    obj.distr_header_lines = header_lines

    col_names = header_lines[0].split() if header_lines else []
    n_cols = len(col_names)
    attr_names = [_TOWER_DISTR_COL_MAP.get(name) for name in col_names]

    # Gather numeric rows; the first line that cannot be a data row ends
    # the table and is left for the caller to handle.
    rows: list[list[float]] = []
    while i < n_lines:
        raw = lines[i]
        tokens = raw.split()
        if not tokens:
            i += 1
            continue
        if _is_section_divider(raw) or len(tokens) < n_cols:
            break
        try:
            rows.append([_parse_float(tok) for tok in tokens[:n_cols]])
        except ValueError:
            break
        i += 1

    if rows:
        table = np.asarray(rows, dtype=float)
        for column, attr in zip(table.T, attr_names):
            if attr is not None:
                setattr(obj, attr, column.copy())

    # The loop above stops quietly on a short or non-numeric row, so a
    # malformed table would otherwise be truncated without notice. A
    # declared count of 0 (the default when ``NTwInpSt`` is absent)
    # disables the check.
    expected = obj.n_tw_inp_st
    if expected > 0 and len(rows) != expected:
        raise ValueError(
            f"ElastoDyn tower distributed-properties table truncated: "
            f"NTwInpSt = {expected} but {len(rows)} numeric "
            f"row(s) parsed before the next section divider / non-"
            f"numeric line. Check the upstream deck for a short row, "
            f"a non-numeric token in a data row, or a missing section "
            f"divider between the table and the mode-shape block."
        )
    return i


# Tower labels stored as a single float -> attribute name.
_TOWER_FLOAT_ATTRS = {
    "AdjTwMa": "adj_tw_ma",
    "AdjFASt": "adj_fa_st",
    "AdjSSSt": "adj_ss_st",
}

# Tower labels stored at ``attr[idx]`` for idx in 0..1.
_TOWER_PAIR_ATTRS = {
    "TwrFADmp": "twr_fa_dmp",
    "TwrSSDmp": "twr_ss_dmp",
    "FAStTunr": "fa_st_tunr",
    "SSStTunr": "ss_st_tunr",
}

# Tower mode-shape labels stored at ``attr[idx - 1]`` for idx in 1..5.
_TOWER_MODE_SHAPE_ATTRS = {
    "TwFAM1Sh": "tw_fa_m1_sh",
    "TwFAM2Sh": "tw_fa_m2_sh",
    "TwSSM1Sh": "tw_ss_m1_sh",
    "TwSSM2Sh": "tw_ss_m2_sh",
}


def _assign_tower_field(
    obj: ElastoDynTower, label: str, canon: str, value_str: str
) -> None:
    """Store *value_str* on the typed ``ElastoDynTower`` field named by *label*.

    The incoming *canon* is not used: ``(canon, idx)`` is re-derived from
    *label* with ``_split_label_index``. Labels that match no typed
    field, and indexed labels whose index is missing or out of range,
    are ignored. Conversion errors propagate to the caller.
    """
    canon, idx = _split_label_index(label)

    if canon == "NTwInpSt":
        obj.n_tw_inp_st = int(value_str)
        return

    float_attr = _TOWER_FLOAT_ATTRS.get(canon)
    if float_attr is not None:
        setattr(obj, float_attr, _parse_float(value_str))
        return

    if idx is None:
        return

    pair_attr = _TOWER_PAIR_ATTRS.get(canon)
    if pair_attr is not None:
        if 0 <= idx < 2:
            getattr(obj, pair_attr)[idx] = _parse_float(value_str)
        return

    shape_attr = _TOWER_MODE_SHAPE_ATTRS.get(canon)
    if shape_attr is not None and 2 <= idx + 1 <= 6:
        # idx + 1 spans 2..6 and lands in slots 0..4 of the array.
        getattr(obj, shape_attr)[idx - 1] = _parse_float(value_str)


# ---------------------------------------------------------------------------
# Blade file parser
# ---------------------------------------------------------------------------


def read_elastodyn_blade(path: str | pathlib.Path) -> ElastoDynBlade:
    """Read an ElastoDyn blade ``.dat`` file and parse it.

    The file is decoded as ``latin-1``, split into lines and handed to
    ``_parse_blade``; the ``pathlib.Path`` form of *path* is recorded as
    the ``source_file`` of the returned object.
    """
    source = pathlib.Path(path)
    contents = source.read_text(encoding="latin-1")
    return _parse_blade(contents.splitlines(), source_file=source)


# Column name in the distributed-table header -> ``ElastoDynBlade`` attribute.
_BLADE_DISTR_COL_MAP = {
    "BlFract": "bl_fract",
    "PitchAxis": "pitch_axis",
    "StrcTwst": "strc_twst",
    "BMassDen": "b_mass_den",
    "FlpStff": "flp_stff",
    "EdgStff": "edg_stff",
}


def _parse_blade(
    lines: list[str], source_file: pathlib.Path | None = None
) -> ElastoDynBlade:
    """Build an ``ElastoDynBlade`` from the lines of a blade input file.

    Line 0 is stored as the header and the first non-blank line after it
    as the title. Each later line is either blank (skipped), a section
    divider (recorded; a divider containing ``DISTRIBUTED`` also hands
    control to ``_consume_blade_distributed``), or a ``value  label``
    scalar dispatched to ``_assign_blade_field``.

    Raises:
        ValueError: when a label listed in ``_KNOWN_BLADE_CANON`` carries
            a malformed value, or when the distributed table fails its
            row-count check.
    """
    obj = ElastoDynBlade(header="", title="", source_file=source_file)
    n_lines = len(lines)

    if lines:
        obj.header = lines[0].rstrip()
    title_idx = next(
        (k for k in range(1, n_lines) if lines[k].strip()), n_lines
    )
    if title_idx < n_lines:
        obj.title = lines[title_idx].rstrip()
        cursor = title_idx + 1
    else:
        cursor = 1

    while cursor < n_lines:
        raw = lines[cursor]
        cursor += 1  # already points at the line after ``raw``
        text = raw.strip()
        if not text:
            continue

        if _is_section_divider(raw):
            obj.section_dividers.append(raw.rstrip())
            if "DISTRIBUTED" in text.upper():
                # The table reader returns the index at which to resume.
                cursor = _consume_blade_distributed(obj, lines, cursor)
            continue

        parsed = _split_value_label(raw)
        if parsed is None:
            continue
        value_str, label = parsed
        canon = _canon_label(label)
        try:
            _assign_blade_field(obj, label, canon, value_str)
        except (ValueError, IndexError) as err:
            # Errors on labels outside the known set are swallowed.
            if _canon_root(canon) not in _KNOWN_BLADE_CANON:
                continue
            raise ValueError(
                f"Malformed value for known ElastoDyn blade field "
                f"{label!r} ({canon}): {value_str!r}. "
                f"Original error: {err}"
            ) from err

    return obj


def _consume_blade_distributed(
    obj: ElastoDynBlade, lines: list[str], i: int
) -> int:
    """Read the blade distributed-properties table starting at ``lines[i]``.

    The first two non-blank lines are kept as ``obj.distr_header_lines``;
    the first of them supplies the column names. Data rows follow until
    a section divider, a row with fewer tokens than there are columns,
    or a row with a token ``_parse_float`` rejects. Columns named in
    ``_BLADE_DISTR_COL_MAP`` are stored on *obj* as float arrays; other
    columns are dropped.

    Returns:
        Index of the first line that was not consumed.

    Raises:
        ValueError: if ``obj.n_bl_inp_st`` is positive and differs from
            the number of rows parsed.
    """
    n_lines = len(lines)

    header_lines: list[str] = []
    while i < n_lines and len(header_lines) < 2:
        candidate = lines[i]
        i += 1
        if candidate.strip():
            header_lines.append(candidate.rstrip())
    obj.distr_header_lines = header_lines

    col_names = header_lines[0].split() if header_lines else []
    n_cols = len(col_names)
    attr_names = [_BLADE_DISTR_COL_MAP.get(name) for name in col_names]

    # Gather numeric rows; the first line that cannot be a data row ends
    # the table and is left for the caller to handle.
    rows: list[list[float]] = []
    while i < n_lines:
        raw = lines[i]
        tokens = raw.split()
        if not tokens:
            i += 1
            continue
        if _is_section_divider(raw) or len(tokens) < n_cols:
            break
        try:
            rows.append([_parse_float(tok) for tok in tokens[:n_cols]])
        except ValueError:
            break
        i += 1

    if rows:
        table = np.asarray(rows, dtype=float)
        for column, attr in zip(table.T, attr_names):
            if attr is not None:
                setattr(obj, attr, column.copy())

    # The loop above stops quietly on a short or non-numeric row, so the
    # parsed row count is checked against the declared ``NBlInpSt``. A
    # declared count of 0 disables the check.
    expected = obj.n_bl_inp_st
    if expected > 0 and len(rows) != expected:
        raise ValueError(
            f"ElastoDyn blade distributed-properties table truncated: "
            f"NBlInpSt = {expected} but {len(rows)} numeric "
            f"row(s) parsed before the next section divider / non-"
            f"numeric line. Check the upstream deck for a short row, "
            f"a non-numeric token in a data row, or a missing section "
            f"divider between the table and the mode-shape block."
        )
    return i


# Blade labels stored as a single float -> attribute name.
_BLADE_FLOAT_ATTRS = {
    "AdjBlMs": "adj_bl_ms",
    "AdjFlSt": "adj_fl_st",
    "AdjEdSt": "adj_ed_st",
}

# Blade labels stored at ``attr[idx]`` -> (attribute, number of slots).
_BLADE_INDEXED_ATTRS = {
    "BldFlDmp": ("bld_fl_dmp", 2),
    "BldEdDmp": ("bld_ed_dmp", 1),
    "FlStTunr": ("fl_st_tunr", 2),
}

# Blade mode-shape labels stored at ``attr[idx - 1]`` for idx in 1..5.
_BLADE_MODE_SHAPE_ATTRS = {
    "BldFl1Sh": "bld_fl1_sh",
    "BldFl2Sh": "bld_fl2_sh",
    "BldEdgSh": "bld_edg_sh",
}


def _assign_blade_field(
    obj: ElastoDynBlade, label: str, canon: str, value_str: str
) -> None:
    """Store *value_str* on the typed ``ElastoDynBlade`` field named by *label*.

    The incoming *canon* is not used: ``(canon, idx)`` is re-derived from
    *label* with ``_split_label_index``. Labels that match no typed
    field, and indexed labels whose index is missing or out of range,
    are ignored. Conversion errors propagate to the caller.
    """
    canon, idx = _split_label_index(label)

    if canon == "NBlInpSt":
        obj.n_bl_inp_st = int(value_str)
        return

    float_attr = _BLADE_FLOAT_ATTRS.get(canon)
    if float_attr is not None:
        setattr(obj, float_attr, _parse_float(value_str))
        return

    if idx is None:
        return

    indexed = _BLADE_INDEXED_ATTRS.get(canon)
    if indexed is not None:
        attr, n_slots = indexed
        if 0 <= idx < n_slots:
            getattr(obj, attr)[idx] = _parse_float(value_str)
        return

    shape_attr = _BLADE_MODE_SHAPE_ATTRS.get(canon)
    if shape_attr is not None and 2 <= idx + 1 <= 6:
        # idx + 1 spans 2..6 and lands in slots 0..4 of the array.
        getattr(obj, shape_attr)[idx - 1] = _parse_float(value_str)