# Copyright 2024-2026 Jae Hoon Seo
# Marine Structural Mechanics and Integrity Lab (SMI Lab), Inha University
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Minimal parser and adapter for OpenFAST SubDyn ``.dat`` substructure files.
Scope is intentionally narrow — only what's needed to drive a clamped-base
monopile model from the existing reference decks shipped with the OpenFAST
regression-test corpus. Specifically:
* JOINTS — XYZ positions and joint type.
* BASE REACTION JOINTS — read the single clamped-base joint id and its DOF
flags; SSI files are not handled (the 5MW OC3 deck has none).
* INTERFACE JOINTS — read the transition-piece joint id.
* MEMBERS — beam connectivity (joint pairs and property-set ids).
* CIRCULAR BEAM CROSS-SECTION PROPERTIES — ``E``, ``G``, ``MatDens``,
outer diameter, wall thickness.
Sections we *don't* parse: rectangular / arbitrary cross-sections, cables,
rigid links, spring elements, cosine matrices, concentrated masses, output
settings. These are zero in the bundled OC3 monopile and SubDyn raises an
explicit error if asked to read tables we don't know about, so silently
skipping them is fine; we just consume their rows until the next section
divider. Extending the parser to cover one of those sections is a localised
change — drop another ``_consume_table`` call into :func:`_parse` and add a
typed list field on :class:`SubDynFile`.
The companion :func:`to_pybmodes_pile_tower` adapter takes the parsed
SubDyn together with the ElastoDyn main + tower files and synthesises a
single combined-cantilever ``BMIFile`` + ``SectionProperties`` for
pyBmodes' tower modal solver — a "rigid base + flexible pile + flexible
tower" model with no soil flexibility (matching the OC3 reference design,
which clamps the pile rigidly at the seabed).
"""
from __future__ import annotations
import math
import pathlib
import re
from dataclasses import dataclass, field
import numpy as np
# ---------------------------------------------------------------------------
# Dataclasses
# ---------------------------------------------------------------------------
[docs]
@dataclass
class SubDynJoint:
"""One node in the substructure graph (SS-coordinate system, metres)."""
joint_id: int
x: float
y: float
z: float
joint_type: int = 1 # 1 = cantilever, 2/3/4 = universal/revolute/spherical
[docs]
@dataclass
class SubDynMember:
"""One beam member connecting two joints."""
member_id: int
joint_a: int
joint_b: int
prop_set_a: int
prop_set_b: int
member_type: str = "1c" # 1c = circular beam, 1r = rectangular, 2 = cable, …
[docs]
@dataclass
class SubDynCircProp:
"""Circular beam cross-section property set."""
prop_set_id: int
E: float # Young's modulus, Pa
G: float # Shear modulus, Pa
rho: float # Density, kg/m³
D: float # Outer diameter, m
t: float # Wall thickness, m
@property
def area(self) -> float:
"""Cross-sectional area of the thin-walled tube, m²."""
d_inner = self.D - 2.0 * self.t
return math.pi / 4.0 * (self.D ** 2 - d_inner ** 2)
@property
def mass_per_length(self) -> float:
"""Distributed mass density, kg/m."""
return self.rho * self.area
@property
def I(self) -> float: # noqa: E743 — standard structural symbol for area moment of inertia
"""Second moment of area, m⁴ (same about either bending axis)."""
d_inner = self.D - 2.0 * self.t
return math.pi / 64.0 * (self.D ** 4 - d_inner ** 4)
@property
def EI(self) -> float:
"""Bending stiffness about either transverse axis, N·m²."""
return self.E * self.I
@property
def J(self) -> float:
"""Polar moment of area, m⁴."""
return 2.0 * self.I
@property
def GJ(self) -> float:
"""Torsional stiffness, N·m²."""
return self.G * self.J
@property
def EA(self) -> float:
"""Axial stiffness, N."""
return self.E * self.area
[docs]
@dataclass
class SubDynFile:
"""All parameters parsed from a SubDyn ``.dat`` file."""
header: str = ""
title: str = ""
source_file: pathlib.Path | None = None
joints: list[SubDynJoint] = field(default_factory=list)
members: list[SubDynMember] = field(default_factory=list)
circ_props: list[SubDynCircProp] = field(default_factory=list)
reaction_joint_id: int = 0
interface_joint_id: int = 0
# ---------------------------------------------------------------------------
# Parser
# ---------------------------------------------------------------------------
_RE_SECTION = re.compile(r"^\s*-{3,}\s*(.+?)\s*-{3,}\s*$")
[docs]
def read_subdyn(path: str | pathlib.Path) -> SubDynFile:
"""Parse a SubDyn ``.dat`` file. See module docstring for the supported
subset of sections."""
path = pathlib.Path(path)
text = path.read_text(encoding="latin-1")
return _parse(text.splitlines(), source_file=path)
def _parse_float(tok: str) -> float:
return float(tok.strip().replace("d", "e").replace("D", "E"))
def _parse_int(tok: str) -> int:
return int(tok.strip())
def _next_data_line(lines: list[str], i: int) -> tuple[int, str]:
"""Skip blank lines starting at ``i``, return ``(idx, stripped_line)``."""
while i < len(lines) and not lines[i].strip():
i += 1
if i >= len(lines):
raise EOFError("unexpected end of SubDyn file")
return i, lines[i].rstrip()
def _consume_table(
lines: list[str], i: int, n_rows: int, header_rows: int = 2,
) -> tuple[int, list[list[str]]]:
"""Skip ``header_rows`` non-blank lines, then read ``n_rows`` data rows."""
consumed = 0
while consumed < header_rows:
i, _ = _next_data_line(lines, i)
i += 1
consumed += 1
rows: list[list[str]] = []
for _ in range(n_rows):
i, line = _next_data_line(lines, i)
rows.append(line.split())
i += 1
return i, rows
def _read_count(lines: list[str], i: int) -> tuple[int, int]:
"""Read a ``<count> Label - comment`` line and return ``(idx_after, count)``."""
i, line = _next_data_line(lines, i)
return i + 1, _parse_int(line.split()[0])
def _parse(lines: list[str], source_file: pathlib.Path | None = None) -> SubDynFile:
obj = SubDynFile(source_file=source_file)
if lines:
obj.header = lines[0].rstrip()
title_idx = 1
while title_idx < len(lines) and not lines[title_idx].strip():
title_idx += 1
if title_idx < len(lines):
obj.title = lines[title_idx].rstrip()
# Pre-scan for the section dividers we care about; map section-name
# keywords to the line index of the divider.
section_index: dict[str, int] = {}
for i, line in enumerate(lines):
m = _RE_SECTION.match(line)
if not m:
continue
name = m.group(1).strip().upper()
# Only record the first occurrence — subsequent dividers with the same
# word in their title won't shadow the canonical one.
if name not in section_index:
section_index[name] = i
def find_section(prefix: str) -> int:
"""Return the line index of the first divider whose name **starts with**
``prefix`` (case-insensitive). Prefix matching is necessary because the
SubDyn divider lines often append free-form descriptive text after the
canonical section name, and substring matching fires false positives
(e.g. the *Structure Joints* divider's prose contains the word
"members")."""
target = prefix.upper()
for name, idx in section_index.items():
if name.startswith(target):
return idx
raise ValueError(
f"SubDyn parser: no section header starting with {prefix!r} "
f"found in {source_file}; got headers: {sorted(section_index)}"
)
# ----- JOINTS --------------------------------------------------------
i = find_section("STRUCTURE JOINTS") + 1
i, n_joints = _read_count(lines, i)
i, rows = _consume_table(lines, i, n_joints, header_rows=2)
for r in rows:
obj.joints.append(SubDynJoint(
joint_id=_parse_int(r[0]),
x=_parse_float(r[1]),
y=_parse_float(r[2]),
z=_parse_float(r[3]),
joint_type=_parse_int(r[4]) if len(r) >= 5 else 1,
))
# ----- BASE REACTION JOINTS -----------------------------------------
i = find_section("BASE REACTION") + 1
i, _n_react = _read_count(lines, i)
# Header rows then 1 data row (we only support the single-reaction case).
i, rows = _consume_table(lines, i, 1, header_rows=2)
obj.reaction_joint_id = _parse_int(rows[0][0])
# ----- INTERFACE JOINTS ---------------------------------------------
i = find_section("INTERFACE JOINTS") + 1
i, _n_interf = _read_count(lines, i)
i, rows = _consume_table(lines, i, 1, header_rows=2)
obj.interface_joint_id = _parse_int(rows[0][0])
# ----- MEMBERS -------------------------------------------------------
i = find_section("MEMBERS") + 1
i, n_mem = _read_count(lines, i)
i, rows = _consume_table(lines, i, n_mem, header_rows=2)
for row_idx, r in enumerate(rows):
# A member row needs at least 5 columns
# (MemberID MJointID1 MJointID2 MPropSetID1 MPropSetID2);
# the 6th column (MType) is optional and falls back to "1c"
# (circular beam). A short member row must not raise a
# bare IndexError.
if len(r) < 5:
raise ValueError(
f"SubDyn: malformed MEMBERS row {row_idx + 1} in "
f"{source_file}: expected >= 5 columns "
f"(MemberID MJointID1 MJointID2 MPropSetID1 "
f"MPropSetID2), got {len(r)}: {' '.join(r)!r}"
)
obj.members.append(SubDynMember(
member_id=_parse_int(r[0]),
joint_a=_parse_int(r[1]),
joint_b=_parse_int(r[2]),
prop_set_a=_parse_int(r[3]),
prop_set_b=_parse_int(r[4]),
member_type=r[5] if len(r) >= 6 else "1c",
))
# ----- CIRCULAR BEAM CROSS-SECTION PROPERTIES -----------------------
i = find_section("CIRCULAR BEAM") + 1
i, n_cyl = _read_count(lines, i)
i, rows = _consume_table(lines, i, n_cyl, header_rows=2)
for r in rows:
obj.circ_props.append(SubDynCircProp(
prop_set_id=_parse_int(r[0]),
E=_parse_float(r[1]),
G=_parse_float(r[2]),
rho=_parse_float(r[3]),
D=_parse_float(r[4]),
t=_parse_float(r[5]),
))
return obj
# ---------------------------------------------------------------------------
# Adapter — combine SubDyn pile + ElastoDyn tower into one cantilever
# ---------------------------------------------------------------------------
def _lookup_joint(subdyn: SubDynFile, joint_id: int, role: str) -> SubDynJoint:
"""Find the joint with the given ID, or raise a clear ``ValueError``.
Used by :func:`to_pybmodes_pile_tower` to resolve the reaction
and interface joint IDs against the joint list. A bare
``next(j for j in ... if j.joint_id == ...)`` raises an
uninformative ``StopIteration`` on missing IDs; this helper names
the missing ID, the role (``reaction`` / ``interface``), the
source file, and the known joint IDs so a misreferenced SubDyn
deck produces an actionable error. Pass-2 review.
"""
for j in subdyn.joints:
if j.joint_id == joint_id:
return j
known = sorted(j.joint_id for j in subdyn.joints)
raise ValueError(
f"SubDyn: no joint with id={joint_id} (declared as the "
f"{role} joint) in {subdyn.source_file}; known joint IDs = "
f"{known}. Check the SubDyn ``BASE REACTION`` / "
f"``INTERFACE JOINTS`` block against the ``STRUCTURE JOINTS`` "
f"table."
)
def _circ_prop_for(subdyn: SubDynFile, prop_set_id: int) -> SubDynCircProp:
for p in subdyn.circ_props:
if p.prop_set_id == prop_set_id:
return p
raise ValueError(
f"SubDyn: no circular cross-section property set with id={prop_set_id}; "
f"the OC3 monopile path here only supports circular sections referenced "
f"by member endpoints (rectangular / arbitrary / cable / rigid sets are "
f"not implemented)."
)
def _pile_axial_stations(subdyn: SubDynFile) -> tuple[np.ndarray, list[SubDynCircProp]]:
"""Sort joints by ``z`` (ascending), return ``(z_coords, prop_per_segment)``.
Returns ``n`` z-coordinates and ``n - 1`` cross-section properties (one
per inter-joint segment / member). Assumes joints are connected
sequentially by member ordering — the OC3 monopile satisfies this; a
multi-leg jacket would require a graph-based traversal that this simple
adapter does not perform.
"""
if len(subdyn.joints) < 2 or len(subdyn.members) < 1:
raise ValueError("SubDyn: need at least 2 joints and 1 member for the pile model")
# Joints in ascending-z order.
joints_sorted = sorted(subdyn.joints, key=lambda j: j.z)
z_coords = np.array([j.z for j in joints_sorted], dtype=float)
# For each adjacent pair (k, k+1), find the member spanning that pair
# and use its prop_set (averaged across its endpoints if they differ).
seg_props: list[SubDynCircProp] = []
for k in range(len(joints_sorted) - 1):
ja_id = joints_sorted[k].joint_id
jb_id = joints_sorted[k + 1].joint_id
member = next(
(m for m in subdyn.members
if {m.joint_a, m.joint_b} == {ja_id, jb_id}),
None,
)
if member is None:
raise ValueError(
f"SubDyn: no member connects adjacent joints "
f"{ja_id} (z={joints_sorted[k].z}) and "
f"{jb_id} (z={joints_sorted[k + 1].z}); the OC3-style "
f"sequential-pile assumption is violated."
)
# OC3 has uniform members (prop_set_a == prop_set_b); take the
# average if a future deck has tapered members.
if member.prop_set_a == member.prop_set_b:
seg_props.append(_circ_prop_for(subdyn, member.prop_set_a))
else:
pa = _circ_prop_for(subdyn, member.prop_set_a)
pb = _circ_prop_for(subdyn, member.prop_set_b)
seg_props.append(SubDynCircProp(
prop_set_id=-1,
E=0.5 * (pa.E + pb.E),
G=0.5 * (pa.G + pb.G),
rho=0.5 * (pa.rho + pb.rho),
D=0.5 * (pa.D + pb.D),
t=0.5 * (pa.t + pb.t),
))
return z_coords, seg_props
[docs]
def to_pybmodes_pile_tower(
main, # ElastoDynMain (forward-ref)
tower, # ElastoDynTower
subdyn: SubDynFile,
blade=None, # ElastoDynBlade — optional, used for RNA mass
):
"""Build a combined pile + tower BMI + SectionProperties for OC3-style
rigid-base monopiles.
Layout (axial coordinate ``z`` increases upward)::
z = z_seabed --- rigid clamped base (SubDyn reaction joint)
... pile section, properties from SubDyn members
z = z_TP --- transition piece (SubDyn interface joint
/ ElastoDyn ``TowerBsHt``)
... tower section, properties from ElastoDyn tower
z = z_top --- tower top, lumped RNA tip mass
The returned ``BMIFile`` describes the full beam from ``z_seabed`` to
``z_top`` as a single cantilever; the ``SectionProperties`` array
splices the pile and tower property tables at the transition piece
with two stations very close together so the FE interpolant captures
the cross-section discontinuity correctly.
No soil flexibility, no hydrodynamic added mass — the OC3 design
fixes the pile rigidly at the seabed and the user-selected scope here
excludes hydro coupling. See the case-study script in
``cases/nrel5mw_monopile/run.py`` for context.
"""
from pybmodes.io.bmi import BMIFile, ScalingFactors
from pybmodes.io.elastodyn_reader import (
_stack_tower_section_props,
_tower_top_assembly_mass,
)
from pybmodes.io.sec_props import SectionProperties
# --- Pile geometry from SubDyn ------------------------------------
z_pile, seg_props = _pile_axial_stations(subdyn)
# SubDyn reaction joint sits at z_seabed; interface at z_TP.
# Bare ``next(...)`` raises StopIteration on missing IDs, which
# propagates as a cryptic traceback — convert to a clear
# ValueError that names the missing ID and the source file
# (matches the adjacent ``_pile_axial_stations`` /
# ``_circ_prop_for`` error style). Pass-2 review.
reaction_joint = _lookup_joint(
subdyn, subdyn.reaction_joint_id, "reaction"
)
interface_joint = _lookup_joint(
subdyn, subdyn.interface_joint_id, "interface"
)
z_seabed = float(reaction_joint.z)
z_tp = float(interface_joint.z)
if z_tp <= z_seabed:
raise ValueError(
f"SubDyn: interface joint z={z_tp} must be above reaction joint "
f"z={z_seabed}"
)
# --- Tower geometry from ElastoDyn --------------------------------
# ElastoDyn TowerHt is the tower top relative to mean sea level (offshore).
# TowerBsHt is the tower base; for OC3 it sits at the transition piece
# which is 10 m above MSL. The combined cantilever length is therefore
# (TowerHt - z_seabed), measured upward from the seabed.
z_tower_top = float(main.tower_ht)
tower_base_z = float(main.tower_bs_ht)
# Sanity: the SubDyn TP and the ElastoDyn TowerBsHt should agree.
if abs(tower_base_z - z_tp) > 1.0:
# Soft check: warn-but-proceed equivalent done by raising. The OC3
# deck has TowerBsHt = 10 m and SubDyn interface at z = +10 m, so
# this never trips for the supported case.
raise ValueError(
f"ElastoDyn TowerBsHt ({tower_base_z} m) and SubDyn interface "
f"joint z ({z_tp} m) differ by more than 1 m; this adapter "
f"assumes they describe the same physical transition piece."
)
combined_length = z_tower_top - z_seabed
pile_length = z_tp - z_seabed
pile_frac = pile_length / combined_length
# --- Section properties: pile segments + tower stations -----------
# Build span_loc in [0, 1] from z_seabed to z_tower_top.
sp_tower = _stack_tower_section_props(tower)
tower_z_norm = sp_tower.span_loc # 0 at TP, 1 at tower top in tower frame
tower_combined_frac = pile_frac + tower_z_norm * (1.0 - pile_frac)
# Pile stations: one at each SubDyn joint. Properties are piecewise
# uniform per segment, so duplicate the segment property at the upper
# node of each segment to give the FE interpolant a step at the
# segment join. For OC3 the pile is fully uniform so this collapses
# to a single (D, t, ρ) value across all pile stations.
n_pile = len(z_pile)
pile_combined_frac = (z_pile - z_seabed) / combined_length
# Per-pile-station properties: take the property of the segment
# *above* each station (so the bottom station inherits the bottom
# member's section, and each station thereafter the segment leaving
# it upward); the topmost pile station inherits from the segment
# below (it's collocated with the TP).
pile_props_per_station: list[SubDynCircProp] = []
for k in range(n_pile - 1):
pile_props_per_station.append(seg_props[k])
pile_props_per_station.append(seg_props[-1])
pile_mass_den = np.array([p.mass_per_length for p in pile_props_per_station])
pile_EI = np.array([p.EI for p in pile_props_per_station])
pile_GJ = np.array([p.GJ for p in pile_props_per_station])
pile_EA = np.array([p.EA for p in pile_props_per_station])
# SectionProperties uses ``np.interp`` over span_loc to evaluate
# properties at any spanwise position. To get a sharp discontinuity at
# the transition piece (where the pile cross-section meets the tower
# cross-section), the section-property table includes two stations
# almost-but-not-exactly at the TP: one carrying the pile properties
# at ``pile_frac - eps`` and one carrying the tower properties at
# exactly ``pile_frac``. The tiny normalised gap ``eps`` is small
# enough that interpolation at any sane element midpoint resolves to
# one side or the other unambiguously.
eps = 1.0e-9
pile_combined_frac[-1] = max(pile_combined_frac[-1] - eps, pile_combined_frac[-2] + eps / 2)
span_loc = np.concatenate([pile_combined_frac, tower_combined_frac])
mass_den = np.concatenate([pile_mass_den, sp_tower.mass_den])
flp_stff = np.concatenate([pile_EI, sp_tower.flp_stff])
edge_stff = np.concatenate([pile_EI, sp_tower.edge_stff])
tor_stff = np.concatenate([pile_GJ, sp_tower.tor_stff])
axial_stff = np.concatenate([pile_EA, sp_tower.axial_stff])
# Rotary-inertia floor — same handling as the land-based path.
from pybmodes.io.elastodyn_reader import _rotary_inertia_floor
flp_iner = _rotary_inertia_floor(mass_den, 3.0)
edge_iner = _rotary_inertia_floor(mass_den, 3.0)
zeros = np.zeros_like(span_loc)
sp = SectionProperties(
title="OC3 monopile pile + ElastoDyn tower (combined cantilever)",
n_secs=int(span_loc.size),
span_loc=span_loc,
str_tw=zeros.copy(),
tw_iner=zeros.copy(),
mass_den=mass_den,
flp_iner=flp_iner,
edge_iner=edge_iner,
flp_stff=flp_stff,
edge_stff=edge_stff,
tor_stff=tor_stff,
axial_stff=axial_stff,
cg_offst=zeros.copy(),
sc_offst=zeros.copy(),
tc_offst=zeros.copy(),
source_file=subdyn.source_file,
)
# --- BMI with combined-length cantilever --------------------------
# FE node distribution is decoupled from the section-property table:
# the section table has an ε-gap at the TP for the interpolation
# discontinuity, but the FE mesh places one clean node *at* the TP
# (no degenerate ε-length element). All FE elements straddle either
# purely-pile or purely-tower cross-sections; element-midpoint
# interpolation of the section table gives the right side every time.
pile_nodes = np.linspace(0.0, pile_frac, n_pile)
n_tower_elt = max(tower_combined_frac.size - 1, 1)
tower_nodes = np.linspace(pile_frac, 1.0, n_tower_elt + 1)
el_loc = np.concatenate([pile_nodes, tower_nodes[1:]])
n_elements = el_loc.size - 1
# RNA tip mass at the tower top, same lumping as the land-based case.
tip = _tower_top_assembly_mass(main, blade)
bmi = BMIFile(
title=f"OC3 monopile (mudline z={z_seabed} m, TP z={z_tp} m, top z={z_tower_top} m)",
echo=False,
beam_type=2,
rot_rpm=0.0, rpm_mult=1.0,
radius=combined_length, hub_rad=0.0,
precone=0.0, bl_thp=0.0,
hub_conn=1, n_modes_print=20,
tab_delim=True, mid_node_tw=False,
tip_mass=tip,
id_mat=1, sec_props_file="",
scaling=ScalingFactors(),
n_elements=n_elements,
el_loc=el_loc,
tow_support=0,
support=None,
source_file=None,
)
return bmi, sp
__all__ = [
"SubDynCircProp",
"SubDynFile",
"SubDynJoint",
"SubDynMember",
"read_subdyn",
"to_pybmodes_pile_tower",
]