Source code for monee.problem.core

import functools
import logging
import math
from collections.abc import Callable
from dataclasses import dataclass

from monee.model import (
    CHPControlNode,
    CHPHGControlNode,
    ExtHydrGrid,
    ExtPowerGrid,
    GasGrid,
    GasToHeatControlNode,
    GasToHeatHG,
    GasToPower,
    GenericModel,
    HeatExchanger,
    HeatExchangerGenerator,
    HeatExchangerLoad,
    HeatGenerator,
    HeatLoad,
    Network,
    PassiveHeatExchanger,
    PassiveHeatExchangerGenerator,
    PassiveHeatExchangerLoad,
    PowerGenerator,
    PowerLoad,
    PowerToGas,
    PowerToHeatControlNode,
    PowerToHeatHG,
    Sink,
    Source,
    Var,
)
from monee.model.storage import ElectricStorage, GasStorage, ThermalStorage

logger = logging.getLogger(__name__)


def nan_to_zero(v):
    """Return *v*, replacing NaN values (in Var/Const wrappers too) with 0."""
    if isinstance(v, (int, float)):
        return 0 if math.isnan(v) else v
    if isinstance(v, Var):
        if isinstance(v.value, (int, float)) and math.isnan(v.value):
            return 0
        return v
    if hasattr(v, "value"):
        val = v.value
        if isinstance(val, (int, float)) and math.isnan(val):
            return 0
    return v


class Objective:
    def __init__(self, selected_models_link) -> None:
        self._selected_models_link = selected_models_link
        self._data_attacher = None
        self._calculator = lambda _: 0
        self._period_filter = None

    def data(self, data_attacher):
        self._data_attacher = data_attacher
        return self

    def calculate(self, calculator):
        self._calculator = calculator
        return self

    def when_period(self, period_filter):
        """Only activate for periods where *period_filter* is truthy. Accepts a
        callable ``(t: int) -> bool`` or a collection of indices."""
        if callable(period_filter):
            self._period_filter = period_filter
        else:
            allowed = set(period_filter)
            self._period_filter = lambda t: t in allowed
        return self

    def _eval(self, network, period_index=None):
        if self._period_filter is not None and period_index is not None:
            if not self._period_filter(period_index):
                return [0]
        model_objectives = []
        if self._data_attacher is not None:
            model_to_data = {}
            for model in self._selected_models_link(network):
                model_to_data[model] = self._data_attacher(model)
            model_objectives.append(self._calculator(model_to_data))
        else:
            model_objectives.append(
                self._calculator(self._selected_models_link(network))
            )
        return model_objectives


[docs] class Objectives: def __init__(self) -> None: self._objectives = []
[docs] def select(self, model_selection_function) -> Objective: objective = Objective( lambda network: [ model for model in network.all_models() if model_selection_function(model) ] ) self._objectives.append(objective) return objective
[docs] def with_models(self, models_link) -> Objective: objective = Objective(models_link) self._objectives.append(objective) return objective
[docs] def all(self, network, period_index=None): if self._objectives: return functools.reduce( lambda a, b: a + b, [ objective._eval(network, period_index) for objective in self._objectives ], ) return []
class Constraint: def __init__( self, selected_models_link, selected_models_with_ids_link=None ) -> None: self._selected_models_link = selected_models_link self._selected_models_with_ids_link = selected_models_with_ids_link self._data_attacher = None self._equations = [] self._comp_equations = [] self._temporal_equations = [] self._period_filter = None def data(self, data_attacher): self._data_attacher = data_attacher return self def equation(self, equation_lambda): self._equations.append(equation_lambda) return self def comp_equation(self, equation_lambda): self._comp_equations.append(equation_lambda) return self def temporal_equation(self, equation_lambda): """Add a cross-period constraint. Lambda signature ``(model, component_id, temporal_state) -> eq | list[eq]``. Silently skipped in single-period solves.""" self._temporal_equations.append(equation_lambda) return self def when_period(self, period_filter): """Only activate for periods where *period_filter* is truthy. Accepts a callable ``(t: int) -> bool`` or a collection of indices.""" if callable(period_filter): self._period_filter = period_filter else: allowed = set(period_filter) self._period_filter = lambda t: t in allowed return self def _eval(self, network, period_index=None): if self._period_filter is not None and period_index is not None: if not self._period_filter(period_index): return [] model_equations = [] selected_models = self._selected_models_link(network) for equation in self._equations: if self._data_attacher is not None: model_to_data = {} for model in selected_models: model_to_data[model] = self._data_attacher(model) for item in model_to_data.items(): model_equations.append(equation(item)) else: for model in selected_models: model_equations.append(equation(model)) for comp_equation in self._comp_equations: if self._data_attacher is not None: model_to_data = {} for model in selected_models: model_to_data[model] = self._data_attacher(model) model_equations.append(comp_equation(model_to_data)) else: model_equations.append(comp_equation(selected_models)) return model_equations @property def has_temporal(self): return len(self._temporal_equations) > 0 def _eval_temporal(self, network, temporal_state, period_index=None): if not self._temporal_equations: return [] if self._period_filter is not None and period_index is not None: if not self._period_filter(period_index): return [] if self._selected_models_with_ids_link is None: return [] model_equations = [] for model, comp_id in self._selected_models_with_ids_link(network): for eq_fn in self._temporal_equations: result = eq_fn(model, comp_id, temporal_state) if hasattr(result, "__iter__") and not isinstance(result, str): model_equations.extend(result) else: model_equations.append(result) return model_equations
[docs] class Constraints: def __init__(self) -> None: self._constraints = []
[docs] def select(self, component_selection_function) -> Constraint: def _filter(component): return ( component_selection_function(component) and component.active and (not component.ignored) ) constraint = Constraint( lambda network: [ component.model for component in network.all_components() if _filter(component) ], selected_models_with_ids_link=lambda network: [ (component.model, component.id) for component in network.all_components() if _filter(component) ], ) self._constraints.append(constraint) return constraint
[docs] def select_types(self, model_cls_tuple) -> Constraint: return self.select( lambda component: isinstance(component.model, model_cls_tuple) )
[docs] def select_grids(self, grid_cls_tuple) -> Constraint: return self.select(lambda component: isinstance(component.grid, grid_cls_tuple))
[docs] def with_models(self, models) -> Constraint: constraint = Constraint(models) self._constraints.append(constraint) return constraint
[docs] def all(self, network, period_index=None): if self._constraints: return functools.reduce( lambda a, b: a + b, [ constraint._eval(network, period_index) for constraint in self._constraints ], ) return []
[docs] def all_temporal(self, network, temporal_state, period_index=None): results = [] for constraint in self._constraints: results.extend( constraint._eval_temporal(network, temporal_state, period_index) ) return results
@property def has_temporal(self): return any(c.has_temporal for c in self._constraints)
[docs] def regulation_ramp(self, limit): """Limit per-period change of ``regulation`` to ``limit`` (skips models without a regulation Var).""" def _regulation_ramp(model, cid, ts): if not hasattr(model, "regulation") or isinstance( model.regulation, (int, float) ): return [] prev_reg = ts.get(cid, "regulation") if prev_reg is None: return [] return [ model.regulation - prev_reg <= limit, prev_reg - model.regulation <= limit, ] self.select( lambda comp: ( hasattr(comp.model, "regulation") and comp.active and not comp.ignored ) ).temporal_equation(_regulation_ramp) return self
@property def empty(self): return len(self._constraints) == 0
[docs] @dataclass class AttributeParameter: min: Callable[[str, float], float] max: Callable[[str, float], float] val: Callable[[str, float], float] integer: bool = False
# Regulation attribute parameter (0–1) used across load-shedding formulations. REGULATION_ATTR = [ ( "regulation", AttributeParameter( min=lambda attr, val: 0, max=lambda attr, val: 1, val=lambda attr, val: 1, ), ) ]
[docs] class OptimizationProblem: """ Declares which components are free variables plus optional objectives/constraints. Workflow: create → call ``controllable_*`` helpers → set ``objectives`` and ``constraints`` → pass to :func:`run_multi_period` / :func:`run_mpc`. """ def __init__(self, debug=False, lex_objectives: bool = False) -> None: """ Args: debug: verbose logging during variable promotion. lex_objectives: Pyomo-only two-phase solve: first user objectives, then formulation-tightening terms (``branch/node/child.minimize``) with the phase-1 optimum pinned. Removes weight tuning. GEKKO falls back to single-objective sum. """ self._controllable_appliables: list = [] self._controllable_to_attr: dict[GenericModel, str] = {} self._bounds_for_controllables: list = [] self._objectives: Objectives | None = None self._constraints: Constraints | None = None self._debug = debug self._lex_objectives = lex_objectives @property def lex_objectives(self) -> bool: return self._lex_objectives def _apply(self, network: Network): self._controllable_to_attr.clear() for appliable in self._controllable_appliables: appliable(network) for model, attributes in self._controllable_to_attr.items(): for attribute_param in attributes: attribute = attribute_param param = None if type(attribute_param) is tuple: attribute = attribute_param[0] param: AttributeParameter = attribute_param[1] if hasattr(model, attribute): val = getattr(model, attribute) if type(val) is not Var: if param is None: if val == 0.0: logger.warning( "Attribute '%s' on %s has value 0.0 and no " "explicit bounds - inferred bounds [0, 0] " "will lock this variable. Use an " "AttributeParameter or prob.bounds() to " "set meaningful bounds.", attribute, type(model).__name__, ) variable = Var( val, max=0 if val <= 0 else val, min=0 if val > 0 else val, name=attribute, ) else: variable = Var( param.val(attribute, val), param.max(attribute, val), param.min(attribute, val), param.integer, name=attribute, ) setattr(model, attribute, variable) if self._debug: logger.warning("From the model %s", model) logger.warning( "The attribute %s has been replaced", attribute ) for ( min_value, max_value, component_condition, attributes, ) in self._bounds_for_controllables: component_list = network.all_components() for component in component_list: if ( component_condition(component.model, component.grid) and component.independent ): if self._debug: logger.info("From the model %s", component.model) logger.info("The attributes %s are bounded", attributes) for attribute in attributes: var = getattr(component.model, attribute) var.max = max_value var.min = min_value
[docs] def add_to_controllable( self, model, attributes: list[str | tuple[str, AttributeParameter]] ): if model not in self._controllable_to_attr: self._controllable_to_attr[model] = [] self._controllable_to_attr[model] += attributes
[docs] def bounds(self, minmax, component_condition=lambda _m, _g: True, attributes=None): """Override min/max for ``Var`` attributes on matching components. ``component_condition`` is ``(model, grid) -> bool``.""" self._bounds_for_controllables.append( (minmax[0], minmax[1], component_condition, attributes) )
[docs] def controllable( self, attributes: list[str | tuple[str, AttributeParameter]], component_condition=lambda _: True, ): """Promote attributes on matching components to free Vars. Low-level primitive; prefer the typed ``controllable_*`` helpers. Each attribute entry is a name or ``(name, AttributeParameter)`` tuple.""" def apply_controllable(network: Network): component_list = network.all_components() for component in component_list: if component_condition(component): self.add_to_controllable(component.model, attributes) self._controllable_appliables.append(apply_controllable) return self
[docs] def controllable_all(self, attributes): self.controllable(component_condition=lambda _: True, attributes=attributes) return self
[docs] def controllable_demands( self, attributes: list[str | tuple[str, AttributeParameter]] ): """Make PowerLoad/HeatLoad/HeatExchangerLoad/PassiveHeatExchangerLoad, gas Sinks, and consuming HeatExchanger/PassiveHeatExchanger controllable. Note: an attribute currently at 0.0 with no AttributeParameter is locked to [0,0]. Use prob.bounds() or pass an AttributeParameter for real bounds. """ self.controllable( component_condition=lambda component: ( ( isinstance( component.model, HeatExchangerLoad | PassiveHeatExchangerLoad | PowerLoad | HeatLoad, ) or ( type(component.model) is Sink and type(component.grid) is GasGrid ) or ( isinstance( component.model, HeatExchanger | PassiveHeatExchanger ) # q_mw is always a Var on these models; the consuming # setpoint lives in q_mw_set (= -q_mw), so a positive # consuming q_mw maps to q_mw_set < 0. and isinstance(component.model.q_mw_set, (int, float)) and (component.model.q_mw_set < 0) ) ) and component.active and (not component.ignored) ), attributes=attributes, ) return self
[docs] def controllable_generators(self, attributes): """Make PowerGenerator/HeatGenerator/HeatExchangerGenerator/ PassiveHeatExchangerGenerator/Source controllable. See :meth:`controllable_demands` for the 0.0-locks-to-[0,0] caveat.""" self.controllable( component_condition=lambda component: ( isinstance( component.model, HeatExchangerGenerator | PassiveHeatExchangerGenerator | PowerGenerator | HeatGenerator | Source, ) and component.active and (not component.ignored) ), attributes=attributes, ) return self
[docs] def controllable_ext(self): """Declare ExtPowerGrid / ExtHydrGrid connections controllable. Purely declarative: these models already expose their exchange (``p_mw`` / ``mass_flow_kgs``) as free Vars from their own ``__init__``, so no attribute is (re)bound here (``attributes=[]``). The call documents intent and registers the components with the controllable set; it does not itself make the ext-grid exchange free. """ self.controllable( component_condition=lambda component: ( isinstance(component.model, ExtPowerGrid | ExtHydrGrid) and component.active and (not component.ignored) ), attributes=[], ) return self
[docs] def controllable_cps(self, attributes): """Make all coupling-point components (CHP / P2H / G2H / P2G / G2P / their HG variants) controllable on *attributes*.""" self.controllable( component_condition=lambda component: ( isinstance( component.model, CHPControlNode | CHPHGControlNode | PowerToHeatControlNode | GasToHeatControlNode | PowerToGas | GasToPower | PowerToHeatHG | GasToHeatHG, ) and component.active and (not component.ignored) ), attributes=attributes, ) return self
[docs] def controllable_storages(self): """Promote dispatch on all storage components via their ``make_controllable``.""" def _apply_storages(network: Network): for component in network.all_components(): if ( isinstance( component.model, (ElectricStorage, GasStorage, ThermalStorage) ) and component.active and not component.ignored ): component.model.make_controllable() self._controllable_appliables.append(_apply_storages) return self
[docs] def controllable_backup_lines(self): """Add a binary ``on_off`` ∈ {0,1} on every branch with ``backup=True``.""" self.controllable( component_condition=lambda component: ( "backup" in component.model.vars and component.model.backup ), attributes=[ ( "on_off", AttributeParameter( min=lambda attr, val: 0, max=lambda attr, val: 1, val=lambda attr, val: 1, integer=True, ), ) ], ) return self
@property def objectives(self): return self._objectives @property def constraints(self): return self._constraints @constraints.setter def constraints(self, constraints): self._constraints = constraints @objectives.setter def objectives(self, objectives): self._objectives = objectives @property def controllables_link(self): return lambda _: self._controllable_to_attr.keys()