import logging
import math
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from types import SimpleNamespace
import networkx as nx
import pandas
from monee.model import (
Child,
Const,
ExtHydrGrid,
ExtPowerGrid,
GenericModel,
Intermediate,
IntermediateEq,
MultiGridBranchModel,
MultiGridCompoundModel,
Network,
Node,
PostProcess,
Var,
WaterPipe,
)
from monee.model.child import GridFormingMixin
from monee.model.grid import GasGrid, PowerGrid, WaterGrid
from monee.model.node import Junction
from monee.problem.core import OptimizationProblem
_log = logging.getLogger(__name__)
# Display helpers (also imported by simulation.timeseries)
#: Internal bookkeeping columns omitted from pretty-printed output.
_META_COLS: frozenset[str] = frozenset({"active", "independent", "ignored"})
def _display_df(df: pandas.DataFrame) -> pandas.DataFrame:
"""Return *df* with internal bookkeeping columns removed."""
return df.drop(columns=[c for c in _META_COLS if c in df.columns])
def _col_summary(series: pandas.Series) -> str | None:
"""One-line numeric summary for a single attribute column.
Returns ``'val'`` for a constant column, ``'[lo, hi]'`` when values vary,
or ``None`` when the series is empty or entirely NaN.
"""
vals = series.dropna()
if vals.empty:
return None
lo, hi = float(vals.min()), float(vals.max())
if abs(hi - lo) < 1e-9 * max(1.0, abs(hi)):
return f"{lo:.4g}"
return f"[{lo:.4g}, {hi:.4g}]"
def _summarize_numeric_cols(num: pandas.DataFrame) -> list[str]:
"""Per-column one-line summaries for the repr table row."""
parts = []
for col in num.columns:
s = _col_summary(num[col])
if s is None:
continue
parts.append(f"{col} ∈ {s}" if "[" in s else f"{col} = {s}")
return parts
_TABLE_CSS = (
"<style>"
".monee-result table{border-collapse:collapse;font-size:.88em;margin-top:4px}"
".monee-result th{background:#e8e8e8;padding:3px 10px;border:1px solid #ccc;"
"text-align:right;font-weight:600}"
".monee-result td{padding:3px 10px;border:1px solid #ddd;text-align:right;"
"white-space:nowrap}"
".monee-result tr:nth-child(even) td{background:#f6f6f6}"
"</style>"
)
# Public, documented aliases for the shared result-rendering helpers above.
# The simulation layer (timeseries / multi_period) renders the same kind of
# result tables, so these are part of the solver's public reporting surface
# rather than backend-private internals. The underscore-prefixed names are
# retained for backwards compatibility.
display_df = _display_df
col_summary = _col_summary
TABLE_CSS = _TABLE_CSS
[docs]
@dataclass
class SolverResult:
"""Outcome of a single solve. ``objective=0.0`` for plain energy flow;
``None`` when not meaningful (e.g. ``MultiPeriodResult.get_period_result``).
Termination metadata
--------------------
``solver_status`` and ``termination_condition`` mirror the Pyomo
``SolverStatus`` / ``TerminationCondition`` strings (or ``None`` if
not populated by the backend). They let downstream consumers
distinguish a converged-optimal solution from a *witness* incumbent
Gurobi returns when it hits a time limit — the witness has
``success=True`` and looks identical otherwise. Callers that care
about convergence (e.g. the MC resilience pipeline, which must drop
aborted samples rather than averaging in a non-converged shed value)
can inspect ``termination_condition`` to detect the time-limit case.
"""
network: Network
dataframes: dict[str, pandas.DataFrame]
objective: float | None
success: bool
violations: dict[str, float] = field(default_factory=dict)
solver_status: str | None = None
termination_condition: str | None = None
#: Which solve mode actually ran: ``"simulation"`` (square IMODE=1
#: steady-state) or ``"optimization"`` (IMODE=3). When a simulation was
#: requested but the model was not square, this reads ``"optimization"`` -
#: the signal that the fast steady-state path silently fell back. ``None``
#: when the backend doesn't distinguish the two (e.g. Pyomo).
mode_used: str | None = None
#: Diagnostic infeasibility report when the solve failed (``success=False``);
#: ``None`` on success. Declaring it as a field gives both backends a single,
#: documented place to surface the report instead of the GEKKO-raises /
#: Pyomo-dynamic-attribute split. (GEKKO additionally raises
#: ``GekkoSolveError`` carrying the same report.)
infeasibility_report: object | None = None
[docs]
def summary(self):
return repr(self)
[docs]
def full(self):
return self.network.as_result_dataframe_dict_str()
[docs]
def get(self, model_type) -> pandas.DataFrame:
"""Result DataFrame for *model_type* (typo-safe vs. dict access)."""
return self.dataframes.get(model_type.__name__, pandas.DataFrame())
def __getitem__(self, component_id) -> pandas.Series:
"""Return the result row matching *component_id*. Raises KeyError if missing."""
for df in self.dataframes.values():
if "id" in df.columns:
try:
mask = df["id"] == component_id
except (ValueError, TypeError):
# Tuple branch IDs vs scalar id column triggers a broadcasting error.
mask = df["id"].apply(lambda x: x == component_id)
if mask.any():
return df[mask].iloc[0]
raise KeyError(component_id)
def __repr__(self) -> str:
SEP = "─" * 68
title = "SolverResult"
if self.objective is not None and abs(self.objective) > 0.0:
title += f" (objective = {self.objective:.6g})"
lines = [title, SEP]
for type_name, df in self.dataframes.items():
n = len(df)
vis = _display_df(df).drop(columns=["id", "node_id"], errors="ignore")
num = vis.select_dtypes(include="number")
parts = _summarize_numeric_cols(num)
row = f" {type_name:<22} {n:>2}"
if parts:
row += " │ " + " · ".join(parts[:4])
lines.append(row)
lines.append(SEP)
if self.violations:
lines.append(" VIOLATIONS:")
for key, mag in self.violations.items():
lines.append(f" {key}: {mag:.4g}")
lines.append(SEP)
return "\n".join(lines)
def __str__(self) -> str:
"""Full per-type table dump (``print(result)``); ``repr`` gives the summary."""
title = "SolverResult"
if self.objective is not None and abs(self.objective) > 0.0:
title += f" (objective = {self.objective:.6g})"
SEP = "─" * 68
lines = [title]
for type_name, df in self.dataframes.items():
vis = _display_df(df)
n = len(vis)
plural = "instance" if n == 1 else "instances"
lines.append("")
lines.append(f" {type_name} ({n} {plural})")
lines.append(" " + SEP)
table = vis.to_string(index=False, float_format=lambda x: f"{x:.4g}")
for line in table.splitlines():
lines.append(" " + line)
return "\n".join(lines)
def _repr_html_(self) -> str:
obj_extra = ""
if self.objective is not None and abs(self.objective) > 0.0:
obj_extra = (
f" <span style='color:#888;font-weight:normal'>"
f"objective = {self.objective:.6g}</span>"
)
sections = []
for type_name, df in self.dataframes.items():
vis = _display_df(df)
n = len(vis)
plural = "instance" if n == 1 else "instances"
tbl = vis.to_html(
index=False,
border=0,
classes=[],
na_rep="-",
float_format=lambda x: f"{x:.5g}",
)
sections.append(
f"<details open style='margin-bottom:6px'>"
f"<summary style='cursor:pointer;font-weight:bold;color:#333;"
f"padding:2px 0'>{type_name} "
f"<span style='color:#999;font-weight:normal'>({n} {plural})</span>"
f"</summary>{tbl}</details>"
)
return (
f"{_TABLE_CSS}"
f"<div class='monee-result'>"
f"<div style='font-weight:bold;font-size:1.05em;padding:4px 0 8px'>"
f"SolverResult{obj_extra}</div>" + "\n".join(sections) + "</div>"
)
[docs]
def plot(
self,
title: str | None = None,
show_children: bool = True,
use_monee_positions: bool = False,
write_to: str | None = None,
):
"""Plotly interactive network graph; delegates to :func:`plot_result`."""
from monee.visualization.result_visualization import plot_result
return plot_result(
self,
title=title,
show_children=show_children,
use_monee_positions=use_monee_positions,
write_to=write_to,
)
[docs]
class SinglePeriodSolverProtocol:
"""Documents the contract a solver backend must expose to act as a delegate
inside :class:`GekkoMultiPeriodSolver` / :class:`PyomoMultiPeriodSolver`.
Not enforced at runtime."""
[docs]
class SolverInterface(ABC):
"""Abstract base class for solver backends (GEKKO, Pyomo, …)."""
[docs]
@abstractmethod
def solve(
self,
input_network: Network,
optimization_problem: OptimizationProblem = None,
draw_debug=False,
exclude_unconnected_nodes=False,
step_state=None,
simulation=False,
formulation=None,
) -> SolverResult:
"""
Solve the energy-flow / optimisation problem for *input_network*.
Args:
input_network: The network to solve.
optimization_problem: Optional optimisation problem with objectives
and constraints. If ``None``, performs a plain energy-flow solve.
draw_debug: If ``True``, emit debug output from the solver.
exclude_unconnected_nodes: Legacy flag; prefer islanding config.
step_state: Inter-step state from the previous timeseries step.
simulation: If ``True``, square the model (pin phantom vars, drop
operational flow limits) and solve it as a steady-state
simulation. GEKKO runs this as IMODE=1 (falling back to IMODE=3
if not square); backends without a simulation mode ignore it.
formulation: Solve-time formulation override - a registry key
string (``"smooth_nlp"``, ``"convex_miqcqp"``, …), a
:class:`~monee.model.formulation.core.NetworkFormulation`, or a
sequence of either (merged left to right). Overrides the
network-level ``apply_formulation`` choice; components without
any choice fall back to ``DEFAULT_SIMULATION_FORMULATION``.
Returns:
A :class:`SolverResult` with updated variable values and result DataFrames.
"""
@abstractmethod
def _add_equations(self, solver_obj, eqs):
"""Register a list of equations/constraints with the backend solver object."""
[docs]
def init_branches(self, branches):
for branch in branches:
branch.model.init(branch.grid)
[docs]
@staticmethod
def mark_temporal_components(network, ignored_nodes: set) -> None:
"""Set ``_temporal_active`` on every model carrying a temporal method,
so static-only constraints can be suppressed when coupling is active."""
_temporal_methods = frozenset(
(
"inter_temporal_equations",
"inter_step_equations",
"inter_period_equations",
)
)
for node in network.nodes:
if node.id in ignored_nodes or node.ignored:
continue
if any(hasattr(node.model, m) for m in _temporal_methods):
node.model._temporal_active = True
for child in network.childs_by_ids(node.child_ids):
if child.ignored:
continue
if any(hasattr(child.model, m) for m in _temporal_methods):
child.model._temporal_active = True
for branch in network.branches:
if branch.ignored:
continue
if any(hasattr(branch.model, m) for m in _temporal_methods):
branch.model._temporal_active = True
for compound in network.compounds:
if compound.ignored:
continue
if any(hasattr(compound.model, m) for m in _temporal_methods):
compound.model._temporal_active = True
def _collect_temporal_eqs(
self,
solver_obj,
network,
nodes,
branches,
compounds,
ignored_nodes,
state,
mode_method,
):
"""Register ``inter_temporal_equations`` and *mode_method* for every
model/formulation that implements them."""
methods = ("inter_temporal_equations", mode_method)
for node in nodes:
if ignore_node(node, network, ignored_nodes):
continue
for method in methods:
if hasattr(node.model, method):
eqs = as_iter(getattr(node.model, method)(state, node.id))
self._add_equations(solver_obj, filter_intermediate_eqs(eqs))
if node.formulation is not None and hasattr(node.formulation, method):
eqs = as_iter(
getattr(node.formulation, method)(node.model, state, node.id)
)
self._add_equations(solver_obj, filter_intermediate_eqs(eqs))
for child in network.childs_by_ids(node.child_ids):
if ignore_child(child, ignored_nodes):
continue
for method in methods:
if hasattr(child.model, method):
eqs = as_iter(getattr(child.model, method)(state, child.id))
self._add_equations(solver_obj, filter_intermediate_eqs(eqs))
if child.formulation is not None and hasattr(
child.formulation, method
):
eqs = as_iter(
getattr(child.formulation, method)(
child.model, state, child.id
)
)
self._add_equations(solver_obj, filter_intermediate_eqs(eqs))
for branch in branches:
if ignore_branch(branch, network, ignored_nodes):
continue
for method in methods:
if hasattr(branch.model, method):
eqs = as_iter(getattr(branch.model, method)(state, branch.id))
self._add_equations(solver_obj, filter_intermediate_eqs(eqs))
if branch.formulation is not None and hasattr(
branch.formulation, method
):
eqs = as_iter(
getattr(branch.formulation, method)(
branch.model, state, branch.id
)
)
self._add_equations(solver_obj, filter_intermediate_eqs(eqs))
for compound in compounds:
if ignore_compound(compound, ignored_nodes):
continue
for method in methods:
if hasattr(compound.model, method):
eqs = as_iter(getattr(compound.model, method)(state, compound.id))
self._add_equations(solver_obj, filter_intermediate_eqs(eqs))
if compound.formulation is not None and hasattr(
compound.formulation, method
):
eqs = as_iter(
getattr(compound.formulation, method)(
compound.model, state, compound.id
)
)
self._add_equations(solver_obj, filter_intermediate_eqs(eqs))
[docs]
def process_inter_step_equations(
self,
solver_obj,
network: Network,
nodes,
branches,
compounds,
ignored_nodes: set,
step_state,
optimization_problem=None,
period_index=None,
):
"""Collect ``inter_step_equations`` + ``inter_temporal_equations`` plus
user temporal constraints for timeseries solves."""
self._collect_temporal_eqs(
solver_obj,
network,
nodes,
branches,
compounds,
ignored_nodes,
step_state,
"inter_step_equations",
)
self._collect_oxf_temporal_eqs(
solver_obj, network, step_state, optimization_problem, period_index
)
[docs]
def process_inter_period_equations(
self,
solver_obj,
network: Network,
nodes,
branches,
compounds,
ignored_nodes: set,
period_state,
optimization_problem=None,
period_index=None,
):
"""Collect ``inter_period_equations`` + ``inter_temporal_equations`` plus
user temporal constraints for multi-period solves."""
self._collect_temporal_eqs(
solver_obj,
network,
nodes,
branches,
compounds,
ignored_nodes,
period_state,
"inter_period_equations",
)
self._collect_oxf_temporal_eqs(
solver_obj, network, period_state, optimization_problem, period_index
)
def _collect_oxf_temporal_eqs(
self, solver_obj, network, temporal_state, optimization_problem, period_index
):
"""Evaluate user-defined temporal constraints from the optimization problem."""
if optimization_problem is None:
return
constraints = optimization_problem.constraints
if constraints is None or not constraints.has_temporal:
return
eqs = constraints.all_temporal(
network, temporal_state, period_index=period_index
)
if eqs:
self._add_equations(solver_obj, filter_intermediate_eqs(eqs))
[docs]
def as_iter(possible_iter):
if possible_iter is None:
raise ValueError("None as result for 'equations' is not allowed!")
return possible_iter if hasattr(possible_iter, "__iter__") else [possible_iter]
[docs]
def filter_bool_eqs(eqs, context=None):
"""Drop Python-``bool`` sentinel equations, identically for both backends.
An ``equations()`` body can collapse a constraint to a Python ``bool`` when
a component is over-deactivated (e.g. an attribute overwritten by a
:class:`Const` so ``a == b`` evaluates eagerly):
* ``True`` → tautology, a no-op; dropped silently.
* ``False`` → structurally infeasible (contradictory) constraint. Feeding it
into model construction either crashes the build (GEKKO) or - if it were
kept - injects an unsatisfiable constraint. We drop it so the
load-shedding objective can resolve the situation, but emit a warning so
the modelling error is surfaced rather than silently hidden.
*context* is an optional label (e.g. ``"node_3"``) included in the warning.
"""
kept = []
for eq in eqs:
if isinstance(eq, bool):
if eq is False:
_log.warning(
"Dropping always-false (structurally infeasible) equation%s; "
"this usually means a node/branch was over-deactivated.",
f" in {context}" if context is not None else "",
)
continue
kept.append(eq)
return kept
def _process_intermediate_eqs(m, model, equations):
"""Replace each ``Intermediate``-typed attribute with the backend's named
intermediate sub-expression (``m.Intermediate``), for backends whose model
object exposes that hook (GEKKO, CasADi)."""
for intermediate_eq in [eq for eq in equations if type(eq) is IntermediateEq]:
attr_intermediate_var = getattr(model, intermediate_eq.attr)
eq = (
intermediate_eq.eq() if callable(intermediate_eq.eq) else intermediate_eq.eq
)
if type(attr_intermediate_var) is Intermediate:
i = m.Intermediate(eq)
setattr(model, intermediate_eq.attr, i)
[docs]
class OperatorEquationAssembly:
"""Equation-assembly passes shared by backends whose model object exposes
operator-overloaded symbols plus GEKKO-style hooks (``m.sin``/``m.cos``/...,
``m.Equation(s)``, ``m.Obj``, ``m.Intermediate``): the GEKKO and CasADi
backends.
monee's component ``equations()`` bodies are backend-agnostic - they build
expressions from the injected variable objects and the math hooks passed in -
so a single set of passes drives both backends. Pyomo and Gurobi construct
constraints differently (their own intermediate handling / objective
accumulation) and provide their own overrides instead of using this mixin.
Hosts must set ``self._simulation`` (the per-solve simulation flag, read by
the branch pass to drop operational flow limits) and may override
:meth:`_pwl_impl`.
"""
def _pwl_impl(self, m):
"""Backend piecewise-linear/spline implementation handed to
``branch.equations(..., pwl_impl=...)``. ``None`` when the backend has no
PWL support (e.g. CasADi); GEKKO overrides it with a cubic-spline impl."""
return None
[docs]
def process_internal_oxf_components(self, m, network):
for constraint in network.constraints:
m.Equations(
filter_bool_eqs([constraint(network)], context="network constraint")
)
obj = None
for objective in network.objectives:
if obj is not None:
obj = obj + objective(network)
else:
obj = objective(network)
if obj is not None:
m.Obj(obj)
[docs]
def process_oxf_components(
self,
m,
network: "Network",
optimization_problem,
period_index=None,
):
if optimization_problem.constraints is not None and (
not optimization_problem.constraints.empty
):
m.Equations(
filter_bool_eqs(
optimization_problem.constraints.all(
network, period_index=period_index
),
context="optimization problem constraint",
)
)
obj = None
for objective in (
optimization_problem.objectives.all(network, period_index=period_index)
if optimization_problem.objectives is not None
else []
):
if obj is not None:
obj = obj + objective
else:
obj = objective
if obj is not None:
m.Obj(obj)
[docs]
def process_equations_compounds(self, m, network, compounds, ignored_nodes):
for compound in compounds:
if ignore_compound(compound, ignored_nodes):
continue
equations = compound.equations(network)
for expr in compound.minimize(network, sqrt_impl=m.sqrt):
m.Obj(expr)
if equations is not None:
compound_eqs = filter_bool_eqs(
as_iter(equations), context=f"compound_{compound.id}"
)
_process_intermediate_eqs(m, compound, compound_eqs)
m.Equations(filter_intermediate_eqs(compound_eqs))
[docs]
def process_equations_nodes_childs(
self, m, network: "Network", nodes, ignored_nodes
):
for node in nodes:
if ignore_node(node, network, ignored_nodes):
continue
node_childs = network.childs_by_ids(node.child_ids)
grid = node.grid
from_branches = [
network.branch_by_id(branch_id).model
for branch_id in node.from_branch_ids
if not ignore_branch(
network.branch_by_id(branch_id), network, ignored_nodes
)
]
to_branches = [
network.branch_by_id(branch_id).model
for branch_id in node.to_branch_ids
if not ignore_branch(
network.branch_by_id(branch_id), network, ignored_nodes
)
]
connected_childs = [
child.model
for child in node_childs
if not ignore_child(child, ignored_nodes)
]
equations = as_iter(
node.equations(
grid,
from_branches,
to_branches,
connected_childs,
sin_impl=m.sin,
cos_impl=m.cos,
if_impl=m.if2,
abs_impl=m.abs3,
max_impl=m.max2,
sign_impl=m.sign2,
sqrt_impl=m.sqrt,
)
)
for expr in node.minimize(
grid, from_branches, to_branches, connected_childs, sqrt_impl=m.sqrt
):
m.Obj(expr)
node_eqs = filter_bool_eqs(equations, context=f"node_{node.id}")
_process_intermediate_eqs(m, node.model, node_eqs)
m.Equations(filter_intermediate_eqs(node_eqs))
for child in node_childs:
if ignore_child(child, ignored_nodes):
continue
child_eqs = filter_bool_eqs(
as_iter(child.equations(grid, node)),
context=f"child_{child.id}",
)
for expr in child.minimize(grid, node, sqrt_impl=m.sqrt):
m.Obj(expr)
_process_intermediate_eqs(m, child.model, child_eqs)
m.Equations(filter_intermediate_eqs(child_eqs))
[docs]
def process_equations_branches(
self, m, network, branches, ignored_nodes, objs_exprs
):
pwl_impl = self._pwl_impl(m)
for branch in branches:
if ignore_branch(branch, network, ignored_nodes):
continue
grid = branch.grid
branch_eqs = as_iter(
branch.equations(
grid,
network.node_by_id(branch.from_node_id).model,
network.node_by_id(branch.to_node_id).model,
sin_impl=m.sin,
cos_impl=m.cos,
if_impl=m.if3,
abs_impl=m.abs3,
max_impl=m.max2,
sign_impl=m.sign3,
log_impl=m.log10,
sqrt_impl=m.sqrt,
exp_impl=m.exp,
pwl_impl=pwl_impl,
simulation=self._simulation,
)
)
for expr in branch.minimize(
grid,
network.node_by_id(branch.from_node_id).model,
network.node_by_id(branch.to_node_id).model,
sqrt_impl=m.sqrt,
):
objs_exprs.append(expr)
branch_eqs = filter_bool_eqs(branch_eqs, context=f"branch_{branch.id}")
_process_intermediate_eqs(m, branch.model, branch_eqs)
m.Equations(filter_intermediate_eqs(branch_eqs))
[docs]
def inject_nans(target: GenericModel):
"""Replace Var/Const fields with NaN placeholders; zero regulation."""
for key, value in target.__dict__.items():
if isinstance(value, Const):
setattr(target, key, Const(float("nan")))
if isinstance(value, Var):
setattr(
target,
key,
Var(float("nan"), max=value.max, min=value.min, name=value.name),
)
if hasattr(target, "regulation") and not isinstance(target.regulation, Var):
target.regulation = 0.0
[docs]
def mark_ignored_components(network, ignored_nodes):
"""Pre-mark ``component.ignored`` so ``optimization_problem._apply`` excludes them."""
for branch in network.branches:
if ignore_branch(branch, network, ignored_nodes):
branch.ignored = True
for node in network.nodes:
if ignore_node(node, network, ignored_nodes):
node.ignored = True
for child in network.childs_by_ids(node.child_ids):
child.ignored = True
_mark_ignored_compounds(network, ignored_nodes)
def _mark_ignored_compounds(network, ignored_nodes):
for compound in network.compounds:
if ignore_compound(compound, ignored_nodes):
compound.ignored = True
# Children attached to *external* nodes (e.g. SubHG at the heat
# node of a broken CHPHG) wouldn't be caught by the node-loop
# above - their host node may still be in the active grid.
# Mark them directly so ignore_child filters them out.
for sc in compound.subcomponents:
if isinstance(sc, Child):
sc.ignored = True
def _carrier_node_ids(network: Network, ignored_nodes, grid_type) -> set:
"""Active, non-ignored node ids belonging to a single hydraulic carrier."""
return {
n.id
for n in network.nodes
if isinstance(n.grid, grid_type) and n.active and n.id not in ignored_nodes
}
def _carrier_islands(network: Network, node_ids: set):
"""Connected components of the active-pipe subgraph restricted to *node_ids*
(pipes only ever join same-carrier nodes, so each component is one island)."""
g = nx.Graph()
g.add_nodes_from(node_ids)
for branch in network.branches:
if (
branch.active
and branch.from_node_id in node_ids
and branch.to_node_id in node_ids
):
g.add_edge(branch.from_node_id, branch.to_node_id)
return nx.connected_components(g)
def _island_grid_forming_node(network: Network, island):
"""Return the id of an island node carrying an active grid-forming child, or
``None`` if the island has no pressure/temperature reference."""
for nid in island:
node = network.node_by_id(nid)
childs = network.childs_by_ids(node.child_ids)
if any(isinstance(c.model, GridFormingMixin) and c.active for c in childs):
return nid
return None
[docs]
def pin_floating_hydraulic_gauges(network: Network, ignored_nodes):
"""Pin one node's pressure per energized gas/water island that has no
grid-forming source.
A grid-forming child (``ExtHydrGrid`` / ``GridFormingSource``) pins an
absolute pressure reference via its ``overwrite``. An island with none
references pressure only through the (relative) pipe pressure-drop equations
- its absolute level is a free gauge degree of freedom, which IPOPT can only
resolve by regularising a rank-deficient block (slow, fragile). Such islands
arise routinely: e.g. a heat-exchanger-fed return loop, where the coupling HE
transfers mass and temperature but imposes no pressure drop.
Pinning any one node's pressure to the carrier nominal removes the
rank-deficiency without changing flows or pressure *differences* (only the
arbitrary absolute level). Temperature is left free: where present it is
referenced through the heat-exchanger coupling, not a nodal pin.
"""
for grid_type in (GasGrid, WaterGrid):
ids = _carrier_node_ids(network, ignored_nodes, grid_type)
for island in _carrier_islands(network, ids):
if _island_grid_forming_node(network, island) is not None:
continue
node = network.node_by_id(min(island))
nominal = getattr(node.grid, "nominal_pressure_pu", 1.0)
model = node.model
if isinstance(getattr(model, "pressure_pu", None), Var):
model.pressure_pu = Const(nominal)
if isinstance(getattr(model, "pressure_squared_pu", None), Var):
model.pressure_squared_pu = Const(nominal**2)
[docs]
def mark_heat_balance_slacks(network: Network, ignored_nodes):
"""Mark one grid-forming reference node per energized heat island so its
(dependent) nodal heat balance is dropped - the heat carrier's slack,
mirroring the free mass-flow / angle slack the other carriers already have.
The nodal heat balances over a connected island are linearly dependent (one
is redundant); the grid-forming node absorbs the imbalance. Islands are
connected components of the active water subgraph; references are
``GridFormingMixin`` children, matching the islanding extension's criterion.
"""
heat_ids = _carrier_node_ids(network, ignored_nodes, WaterGrid)
if not heat_ids:
return
for island in _carrier_islands(network, heat_ids):
ref = _island_grid_forming_node(network, island)
if ref is not None:
network.node_by_id(ref).model._drop_heat_balance = True
def _island_has_free_mass_flow_child(network: Network, island) -> bool:
"""True if any active child on the island leaves its mass flow to the solver
(slack-like: ``ExtHydrGrid`` / ``ConsumeHydrGrid``), i.e. the island can
absorb an arbitrary through-flow."""
for nid in island:
node = network.node_by_id(nid)
for child in network.childs_by_ids(node.child_ids):
if child.active and isinstance(
getattr(child.model, "mass_flow_kgs", None), Var
):
return True
return False
[docs]
def mark_he_flow_prescription(network: Network, ignored_nodes):
"""Decide for each dynamic heat exchanger (compound-internal ``SubHE``)
whether it prescribes its design mass flow or yields to the network.
A SubHE bridges water islands through its multi-grid control node (the
islands stay separate because :func:`_carrier_islands` only spans pure
water nodes). In a supply/return structure every adjacent island has a
slack child with a free mass flow (``ExtHydrGrid`` / ``ConsumeHydrGrid``),
so the through-flow is otherwise undetermined and the design flow must
prescribe it. If any adjacent island lacks such a slack (e.g. a
fixed-mass-flow ``Sink`` fed only through the compound), that island's
balance already fixes the through-flow - pinning it to the design flow
would over-determine the system, so the HE yields and its energy balance
runs on the actual flow instead (``_he_flow_prescribed = False``)."""
dyn_he_branches = [
b
for b in network.branches
if b.active and getattr(b.model, "_calc_mass_flow", False)
]
if not dyn_he_branches:
return
water_ids = _carrier_node_ids(network, ignored_nodes, WaterGrid)
node_to_island: dict = {}
island_free: list = []
for idx, island in enumerate(_carrier_islands(network, water_ids)):
island_free.append(_island_has_free_mass_flow_child(network, island))
for nid in island:
node_to_island[nid] = idx
for branch in dyn_he_branches:
endpoints = [branch.from_node_id, branch.to_node_id]
# Water-side endpoints contribute their island directly; the other
# endpoint is the compound's control node - collect the islands of its
# water neighbours (reached via the compound's transfer branches).
adjacent = {node_to_island[nid] for nid in endpoints if nid in node_to_island}
for cn_id in (nid for nid in endpoints if nid not in node_to_island):
for b in network.branches:
if not b.active:
continue
if b.from_node_id == cn_id and b.to_node_id in node_to_island:
adjacent.add(node_to_island[b.to_node_id])
elif b.to_node_id == cn_id and b.from_node_id in node_to_island:
adjacent.add(node_to_island[b.from_node_id])
branch.model._he_flow_prescribed = (
all(island_free[i] for i in adjacent) if adjacent else True
)
[docs]
def inject_vars(inject_fn, nodes, branches, compounds, network, ignored_nodes):
"""Call ``inject_fn(model, component, category)`` on each active component;
ignored components get :func:`inject_nans` instead.
``category`` ∈ {``branch``, ``node``, ``child``, ``compound``}.
"""
for branch in branches:
if ignore_branch(branch, network, ignored_nodes):
branch.ignored = True
inject_nans(branch.model)
continue
inject_fn(branch.model, branch, "branch")
for node in nodes:
if ignore_node(node, network, ignored_nodes):
node.ignored = True
for child in network.childs_by_ids(node.child_ids):
child.ignored = True
inject_nans(child.model)
inject_nans(node.model)
continue
inject_fn(node.model, node, "node")
_inject_node_childs(inject_fn, node, network, ignored_nodes)
for compound in compounds:
if ignore_compound(compound, ignored_nodes):
compound.ignored = True
inject_nans(compound.model)
continue
inject_fn(compound.model, compound, "compound")
def _inject_node_childs(inject_fn, node, network, ignored_nodes):
for child in network.childs_by_ids(node.child_ids):
if ignore_child(child, ignored_nodes):
child.ignored = True
inject_nans(child.model)
continue
inject_fn(child.model, child, "child")
[docs]
def withdraw_vars(withdraw_fn, nodes, branches, compounds, network):
"""Call ``withdraw_fn(model)`` on each component to materialise solved Vars."""
for branch in branches:
withdraw_fn(branch.model)
for node in nodes:
withdraw_fn(node.model)
for child in network.childs_by_ids(node.child_ids):
withdraw_fn(child.model)
for compound in compounds:
withdraw_fn(compound.model)
[docs]
def apply_post_process(model: GenericModel) -> None:
"""Evaluate the model's :class:`PostProcess` attributes from its solved
values - runs after ``withdraw_vars`` and fully outside the solver. Lambdas
receive a namespace, so they read fields as ``v.vm_pu`` (not ``v["vm_pu"]``)."""
vals = SimpleNamespace(**model.values)
for attr in model.__dict__.values():
if isinstance(attr, PostProcess):
try:
attr.value = attr.fn(vals)
except Exception:
attr.value = float("nan")
[docs]
def apply_post_process_all(nodes, branches, compounds, network) -> None:
"""Run :func:`apply_post_process` over every solved component."""
for branch in branches:
apply_post_process(branch.model)
for node in nodes:
apply_post_process(node.model)
for child in network.childs_by_ids(node.child_ids):
apply_post_process(child.model)
for compound in compounds:
apply_post_process(compound.model)
def _copy_var_values(src, dst) -> None:
"""Copy ``.value`` for Var/Intermediate from *src* to *dst*. Intermediates
must be propagated so e.g. derived ``vm_pu`` isn't silently lost."""
for key, val in src.__dict__.items():
if isinstance(val, (Var, Intermediate)):
dst_attr = dst.__dict__.get(key)
if isinstance(dst_attr, (Var, Intermediate)):
dst_attr.value = val.value
[docs]
def persist_solution(solved_copy: Network, original: Network) -> None:
"""Propagate solved values back so the next ``inject_vars`` warm-starts."""
for src_node, dst_node in zip(solved_copy.nodes, original.nodes):
_copy_var_values(src_node.model, dst_node.model)
for src_child, dst_child in zip(
solved_copy.childs_by_ids(src_node.child_ids),
original.childs_by_ids(dst_node.child_ids),
):
_copy_var_values(src_child.model, dst_child.model)
for src_branch, dst_branch in zip(solved_copy.branches, original.branches):
_copy_var_values(src_branch.model, dst_branch.model)
for src_compound, dst_compound in zip(solved_copy.compounds, original.compounds):
_copy_var_values(src_compound.model, dst_compound.model)
[docs]
def compute_bound_violations(
nodes, branches, compounds, network, tol: float = 1e-6
) -> dict[str, float]:
"""``{"<Type>.<id>.<attr>": magnitude}`` for Var.value violations beyond *tol*."""
violations: dict[str, float] = {}
def _check(model, label: str) -> None:
for key, val in model.__dict__.items():
if not isinstance(val, Var):
continue
v = val.value
if v is None or (isinstance(v, float) and math.isnan(v)):
continue
if val.min is not None and v < val.min - tol:
violations[f"{label}.{key}"] = val.min - v
elif val.max is not None and v > val.max + tol:
violations[f"{label}.{key}"] = v - val.max
for branch in branches:
_check(branch.model, f"{type(branch.model).__name__}.{branch.id}")
for node in nodes:
_check(node.model, f"{type(node.model).__name__}.{node.id}")
for child in network.childs_by_ids(node.child_ids):
_check(child.model, f"{type(child.model).__name__}.{child.id}")
for compound in compounds:
_check(compound.model, f"{type(compound.model).__name__}.{compound.id}")
return violations
[docs]
def ignore_branch(branch, network: Network, ignored_nodes):
return (
(not branch.active)
or ignore_node(network.node_by_id(branch.id[0]), network, ignored_nodes)
or ignore_node(network.node_by_id(branch.id[1]), network, ignored_nodes)
)
[docs]
def ignore_node(node, network: Network, ignored_nodes):
ig = (not node.active) or (node.id in ignored_nodes)
if not node.independent:
compound = network.compound_of_node(node.id)
if compound is not None:
ig = ig or ignore_compound(compound, ignored_nodes)
return ig
[docs]
def ignore_child(child, ignored_nodes):
# ``child.ignored`` is set by ``mark_ignored_components`` when the child
# belongs to an ignored compound - without consulting it here, the child
# would still appear in its host node's balance equations as a free Var
# (e.g. SubHG.heat_mw absorbing arbitrary heat at the heat node).
return (not child.active) or (child.node_id in ignored_nodes) or child.ignored
[docs]
def ignore_compound(compound, ignored_nodes):
ig = not compound.active
external_broken = any(
value in ignored_nodes for value in compound.connected_to.values()
)
# Internal subcomponent turned off (e.g. user deactivates one of a
# CHPHG's internal transfer branches): the ControlNode's power balance -
# degenerate when its from-branches are gone - otherwise collides with
# its el_mw / heat_mw coupling rows and the LP is infeasible.
internal_broken = any(
not getattr(sc, "active", True) for sc in compound.subcomponents
)
if external_broken or internal_broken:
if hasattr(compound.model, "set_active"):
compound.model.set_active(False)
else:
ig = True
elif hasattr(compound.model, "set_active"):
compound.model.set_active(True)
return ig
[docs]
def generate_real_topology(nx_net):
net_copy = nx_net.copy()
# keys=True targets the exact parallel edge - not always key 0.
for u, v, key, data in nx_net.edges(keys=True, data=True):
branch = data["internal_branch"]
if not branch.active or (
hasattr(branch.model, "on_off")
and type(branch.model.on_off) is not Var
and branch.model.on_off == 0
):
net_copy.remove_edge(u, v, key)
return net_copy
[docs]
def remove_cps(network: Network):
relevant_compounds = [
compound
for compound in network.compounds
if isinstance(compound.model, MultiGridCompoundModel)
]
for comp in relevant_compounds:
network.remove_compound(comp.id)
if "heat_return_node_id" in comp.connected_to:
heat_return_node = network.node_by_id(
comp.connected_to["heat_return_node_id"]
)
heat_node = network.node_by_id(comp.connected_to["heat_node_id"])
network.branch(WaterPipe(0, 0), heat_return_node.id, heat_node.id)
for branch in network.branches:
if isinstance(branch.model, MultiGridBranchModel):
network.remove_branch(branch.id)
[docs]
def find_ignored_nodes(network: Network, islanding_config=None):
"""Return node IDs to exclude from the solve.
Default: active topology, only ExtPowerGrid/ExtHydrGrid children are
"leading". With *islanding_config*: full topology (backup lines included)
and any :class:`GridFormingMixin` child counts as leading for an
islanding-enabled carrier.
"""
ignored_nodes = set()
without_cps = network.copy()
remove_cps(without_cps)
if islanding_config is not None:
topology = without_cps._network_internal.copy()
else:
topology = generate_real_topology(without_cps._network_internal)
components = nx.connected_components(topology)
for component in components:
component_leading = False
for node_id in component:
int_node: Node = topology.nodes[node_id]["internal_node"]
for child_id in int_node.child_ids:
child = without_cps.child_by_id(child_id)
if not child.active:
continue
if isinstance(child.model, ExtPowerGrid | ExtHydrGrid):
component_leading = True
break
# With islanding enabled, any GridFormingMixin child also leads.
if islanding_config is not None and isinstance(
child.model, GridFormingMixin
):
node_grid = int_node.grid
carrier_enabled = (
(
isinstance(node_grid, PowerGrid)
and islanding_config.electricity is not None
)
or (
isinstance(node_grid, GasGrid)
and islanding_config.gas is not None
)
or (
isinstance(node_grid, WaterGrid)
and islanding_config.water is not None
)
)
if carrier_enabled:
component_leading = True
break
if component_leading:
break
if not component_leading:
ignored_nodes.update(component)
# Leaf-stub pruning: a node with no active children and degree ≤ 1 in the
# remaining active topology is a dead-end pump-target (infeasible LP).
# Iterate to fixed point. Skipped under islanding (topology there includes
# inactive backup branches, so degree overstates connectivity).
if islanding_config is None:
# Compound port children (e.g. SubHG on a CHPHG heat node) carry no
# demand of their own; they must not keep an otherwise-isolated
# junction alive.
compound_port_child_ids = {
sub.id
for compound in network.compounds
for sub in compound.subcomponents
if isinstance(sub, Child)
}
# Attachment ports of active multi-grid compounds: remove_cps strips
# the compound's transfer branches from the pruning copy, so these
# nodes look like childless dead-ends here - but in the real solve the
# compound re-attaches and carries flow through them. Ports whose grid
# connection is genuinely gone are still caught by the
# connected-component check above.
compound_attachment_node_ids = {
node_id
for compound in network.compounds
if compound.active and isinstance(compound.model, MultiGridCompoundModel)
for node_id in compound.connected_to.values()
}
def _has_real_active_child(int_node):
for cid in int_node.child_ids:
if cid in compound_port_child_ids:
continue
if without_cps.child_by_id(cid).active:
return True
return False
def _has_mass_flow_anchor(int_node):
"""A Junction at degree ≤ 1 needs a mass-flow-contributing child
(Sink / Source / ExtHydrGrid) to anchor mass conservation.
Heat-only children (HeatLoad / HeatGenerator) don't qualify -
with no outgoing pipe their heat_mw term has no enthalpy
stream to balance against and the junction becomes infeasible."""
for cid in int_node.child_ids:
if cid in compound_port_child_ids:
continue
child = without_cps.child_by_id(cid)
if not child.active:
continue
if "mass_flow_kgs" in getattr(child.model, "vars", {}):
return True
return False
while True:
new_stubs = set()
for node_id in topology.nodes:
if node_id in ignored_nodes:
continue
if node_id in compound_attachment_node_ids:
continue
int_node: Node = topology.nodes[node_id]["internal_node"]
active_degree = sum(
1 for nb in topology.neighbors(node_id) if nb not in ignored_nodes
)
if active_degree > 1:
continue
# Classical leaf stub: no real active children at all.
if not _has_real_active_child(int_node):
new_stubs.add(node_id)
continue
# Mass-balance dead-end: Junction at degree ≤ 1 whose only
# children are heat-only (heat_mw) - see _has_mass_flow_anchor.
if isinstance(int_node.model, Junction) and not _has_mass_flow_anchor(
int_node
):
new_stubs.add(node_id)
if not new_stubs:
break
ignored_nodes.update(new_stubs)
return ignored_nodes
# ---------------------------------------------------------------------------
# Inter-step state carriers
# ---------------------------------------------------------------------------
# StepState (sequential timeseries, plain floats) and PeriodState (multi-period,
# live solver vars) live here - in the solver's low-level core - rather than in
# the simulation layer, because both solver backends accept a ``step_state`` and
# must reference these types. Hosting them here keeps the dependency direction
# one-way (simulation -> solver) and avoids a circular import; the historical
# import path ``monee.simulation.step_state`` re-exports them unchanged.
def _collect_id_matches(net, component_id):
candidates = []
for node in net.nodes:
if node.id == component_id:
candidates.append(node.model)
for child in net.childs_by_ids(node.child_ids):
if child.id == component_id:
candidates.append(child.model)
for branch in net.branches:
if branch.id == component_id:
candidates.append(branch.model)
for compound in net.compounds:
if compound.id == component_id:
candidates.append(compound.model)
return candidates
def _find_model(net, component_id, attr=None):
"""Return the model for *component_id*. Disambiguates node/child id collisions
by preferring a model that actually carries *attr*."""
candidates = _collect_id_matches(net, component_id)
if not candidates:
return None
if attr is not None and len(candidates) > 1:
with_attr = [m for m in candidates if hasattr(m, attr)]
if with_attr:
return with_attr[0]
return candidates[0]
def _extract_value(val):
"""Return a plain float from *val* (Var, Intermediate, float, int, or None)."""
if val is None:
return None
if isinstance(val, (Var, Intermediate, PostProcess)):
return val.value
if isinstance(val, (int, float)):
return float(val)
return val
[docs]
class InterStepState(ABC):
"""Abstract base. ``step``: negative = relative to current, non-negative = absolute."""
dt_h: float = 1.0
[docs]
@abstractmethod
def get(self, component_id, attr: str, step: int = -1):
"""Float (StepState) / live var (PeriodState), or None if no data."""
[docs]
def has(self, component_id, attr: str) -> bool:
return self.get(component_id, attr) is not None
[docs]
class StepState(InterStepState):
"""All previously-solved network copies (sequential timeseries). ``get`` returns floats.
``initial_state`` falls back when no prior solve has written the attribute.
``max_steps`` caps how many solved networks are retained (``None`` =
unlimited): each network is a full copy, so an open-ended run (e.g. a
:class:`~monee.simulation.Stepper` paced by a co-simulation framework)
would otherwise grow memory without bound. Absolute ``step`` indices keep
their meaning when old networks are dropped; reading a dropped step falls
back to ``initial_state``."""
def __init__(
self, initial_state: dict | None = None, max_steps: int | None = None
) -> None:
if max_steps is not None and max_steps < 1:
raise ValueError(f"max_steps must be >= 1 or None, got {max_steps}")
self._networks: list = []
self._dropped: int = 0
self._max_steps = max_steps
self.dt_h: float = 1.0
self._initial_state: dict = dict(initial_state) if initial_state else {}
[docs]
def push(self, net) -> None:
self._networks.append(net)
if self._max_steps is not None and len(self._networks) > self._max_steps:
del self._networks[0]
self._dropped += 1
def _network_for_step(self, step: int):
if not self._networks:
return None
if step < 0:
return self._networks[step] if -step <= len(self._networks) else None
# Absolute index: shift by dropped prefix; dropped → fallback.
idx = step - self._dropped
return self._networks[idx] if 0 <= idx < len(self._networks) else None
[docs]
def get(self, component_id, attr: str, step: int = -1):
net = self._network_for_step(step)
if net is not None:
model = _find_model(net, component_id, attr)
if model is not None:
val = _extract_value(getattr(model, attr, None))
if val is not None:
return val
return self._initial_state.get((component_id, attr))
def __len__(self) -> int:
"""Logical number of solved steps (including dropped ones)."""
return self._dropped + len(self._networks)
def __repr__(self) -> str:
n = len(self)
return f"StepState({n} step{'s' if n != 1 else ''})"
[docs]
class PeriodState(InterStepState):
"""All period networks (multi-period). ``get`` returns live solver vars after
injection, so reading from another period becomes an algebraic cross-period
constraint. Both past and future absolute indices are accessible.
.. note::
Absolute (non-negative) ``step`` indices are **window-local** under
:func:`~monee.simulation.multi_period.run_mpc`: ``self._networks`` is only
the current rolling window's networks, so ``step=0`` means the first
period of *this window*, not the global start of the run. This differs
from :class:`StepState`, whose absolute indices stay globally meaningful
via its dropped-prefix bookkeeping. Relative (negative) lookbacks behave
identically in both. Prefer relative indices in ``inter_step_equations``
if you need MPC-window-independent behaviour.
"""
def __init__(
self,
networks: list,
current_t: int,
dt_h: float = 1.0,
initial_state: dict | None = None,
) -> None:
self._networks = networks
self.current_t = current_t
self.dt_h = dt_h
self._initial_state: dict = initial_state or {}
@property
def T(self) -> int:
return len(self._networks)
[docs]
def get(self, component_id, attr: str, step: int = -1):
actual_t = (self.current_t + step) if step < 0 else step
if actual_t < 0:
key = (component_id, attr)
if key in self._initial_state:
return self._initial_state[key]
return None
try:
net = self._networks[actual_t]
except IndexError:
return None
model = _find_model(net, component_id, attr)
if model is None:
return None
return getattr(model, attr, None)
def __repr__(self) -> str:
return f"PeriodState(T={self.T}, current_t={self.current_t}, dt_h={self.dt_h})"