Source code for monee.model.core

import copy
from abc import ABC, abstractmethod

EL_KEY = "electricity"
GAS_KEY = "gas"
WATER_KEY = "water"
WATER = WATER_KEY
EL = EL_KEY
GAS = GAS_KEY

component_list = []

_IMMUTABLE_PRIMITIVES = frozenset({int, float, bool, str, bytes, complex, type(None)})


[docs] def model(cls): """Register a model class in the global ``component_list`` registry.""" component_list.append(cls) return cls
[docs] def upper(var_or_const): """Return the upper bound of a ``Var`` (or its value if unbounded); pass through otherwise.""" if isinstance(var_or_const, Var): if var_or_const.max is None: return var_or_const.value return var_or_const.max return var_or_const
[docs] def lower(var_or_const): """Return the lower bound of a ``Var`` (or its value if unbounded); pass through otherwise.""" if isinstance(var_or_const, Var): if var_or_const.min is None: return var_or_const.value return var_or_const.min return var_or_const
[docs] def value(var_or_const): """Extract ``.value`` from a ``Var``/``Const``/``Intermediate``/``PostProcess``; pass through plain scalars.""" if isinstance(var_or_const, Const | Var | Intermediate | PostProcess): return var_or_const.value return var_or_const
[docs] def set_initial_value(var, value) -> None: """Seed a variable's initial guess / warm start, backend-agnostically. A model attribute may be a monee :class:`Var` *or*, once a backend has run ``inject_vars`` (e.g. in ``activate_timeseries``), the backend's own variable object. monee ``Var`` / GEKKO ``GKVariable`` / Pyomo ``Var`` all expose a settable ``value``; gurobipy ``Var`` instead carries its warm start in ``Start`` (and has no ``value``, so ``var.value = ...`` raises). Duck-type on the available attribute so callers in the model layer never need to know which solver backend is active. A backend symbol that seeds its warm start elsewhere (e.g. the CasADi ``CasSym``, which captures ``x0`` at ``inject_vars`` time and exposes neither attribute) is deliberately a silent no-op. A plain scalar/string target is never a valid warm-start sink, so reject it loudly to catch the wrong-target programming error the no-op would otherwise mask. """ if hasattr(var, "value"): var.value = value elif hasattr(var, "Start"): # gurobipy.Var var.Start = value elif isinstance(var, (int, float, complex, str, bool)): raise TypeError( f"set_initial_value got a non-variable target {var!r}; expected a " "backend variable with a settable 'value' or 'Start'" )
[docs] class Var: """ A decision variable (or mutable parameter) in the optimisation model. Sign convention for child components: positive = consumption (load), negative = generation. ``PowerGenerator(p_mw=5)`` stores ``p_mw=-5``. Comparison operators compare against the *bound*, not the value, and return ``False`` if the relevant bound is ``None``. """ def __init__(self, value, max=None, min=None, integer=False, name=None) -> None: if not isinstance(value, float | int): raise ValueError( "The initial guess for the variable need to a numeric value!" ) self.value = value self.max = max self.min = min self.integer = integer self.name = name def __neg__(self): actual_max = None if self.max is None else -self.max actual_min = None if self.min is None else -self.min return type(self)( value=-self.value, max=actual_min, min=actual_max, integer=self.integer, name=self.name, ) def __mul__(self, other): if isinstance(other, (int, float)) and other < 0: new_max = None if self.min is None else self.min * other new_min = None if self.max is None else self.max * other else: new_max = None if self.max is None else self.max * other new_min = None if self.min is None else self.min * other return type(self)( value=self.value * other, max=new_max, min=new_min, integer=self.integer, name=self.name, ) def __lt__(self, other): if isinstance(other, float | int) and self.max is not None: return self.max < other return False def __le__(self, other): if isinstance(other, float | int) and self.max is not None: return self.max <= other return False def __gt__(self, other): if isinstance(other, float | int) and self.min is not None: return self.min > other return False def __ge__(self, other): if isinstance(other, float | int) and self.min is not None: return self.min >= other return False def __repr__(self): parts = [repr(self.value)] if self.min is not None or self.max is not None: parts.append(f"min={self.min!r}, max={self.max!r}") if self.integer: parts.append("integer=True") if self.name is not None: parts.append(f"name={self.name!r}") return f"{type(self).__name__}({', '.join(parts)})" def __str__(self): return f"{self.value} ({self.min}, {self.max}), is int: {self.integer}" def __deepcopy__(self, memo): new = type(self).__new__(type(self)) memo[id(self)] = new new.value = self.value new.max = self.max new.min = self.min new.integer = self.integer new.name = self.name return new
tracked = Var
[docs] class Const: """ A fixed (non-optimised) constant that participates in the model attribute protocol. Used in ``overwrite()`` to pin a node variable to a fixed setpoint instead of turning it into a free decision variable. """ def __init__(self, value) -> None: self.value = value def __repr__(self): return f"Const({self.value!r})" def __deepcopy__(self, memo): new = Const.__new__(Const) memo[id(self)] = new new.value = self.value return new
[docs] class Intermediate: """ A computed (derived) quantity, not a decision variable. The solver evaluates the matching :class:`IntermediateEq` and writes the result back to ``.value`` after each solve. """ def __init__(self, value=0): self.value = value def __repr__(self): return f"Intermediate({self.value!r})" def __deepcopy__(self, memo): new = Intermediate.__new__(Intermediate) memo[id(self)] = new new.value = self.value return new
[docs] class IntermediateEq: """ Declares how to compute an :class:`Intermediate` attribute from other variables. Return an ``IntermediateEq`` from a model's ``equations()`` to register a derived quantity; the solver evaluates ``eq`` and stores the result on the attribute named by ``attr``. """ def __init__(self, attr, eq): self.attr = attr self.eq = eq
[docs] class PostProcess: """A report-only quantity computed *after* the solve, outside the solver, from the solved model values via ``fn(values)``, where ``values`` is a namespace of the model's solved fields (read as ``values.vm_pu``). Unlike :class:`Intermediate`, it is never injected as a solver variable nor referenced by any equation - it carries no degrees of freedom and cannot affect convergence or squareness. The clean home for derived reports (e.g. ``vm_pu_squared = vm_pu^2``): physics stays in the solver, reporting outside. """ def __init__(self, fn, value=0): self.fn = fn # callable(values_namespace) -> number self.value = value def __repr__(self): return f"PostProcess({self.value!r})" def __deepcopy__(self, memo): new = PostProcess.__new__(PostProcess) memo[id(self)] = new new.fn = self.fn # a function is atomic - share by reference new.value = self.value return new
[docs] class GenericModel(ABC): """ Base class for all component models (nodes, branches, children, compounds). Public attributes (no leading ``_``) form the solver-visible state. Use :class:`Var` for decision variables, :class:`Const` for pinned setpoints, plain scalars for parameters. """ def __init__(self, **kwargs) -> None: super().__init__() self._ext_data = kwargs @property def vars(self): return {k: v for k, v in self.__dict__.items() if k[0] != "_"} @property def values(self): return {k: value(v) for k, v in self.__dict__.items() if k[0] != "_"}
[docs] def is_cp(self): return False
def __deepcopy__(self, memo): new = self.__class__.__new__(self.__class__) memo[id(self)] = new for k, v in self.__dict__.items(): if v is None or v.__class__ in _IMMUTABLE_PRIMITIVES: new.__dict__[k] = v else: new.__dict__[k] = copy.deepcopy(v, memo) return new
[docs] class NodeModel(GenericModel): """Abstract base class for node models (buses, junctions). Subclasses define nodal equations."""
[docs] @abstractmethod def equations(self, grid, in_branch_models, out_branch_models, childs, **kwargs): """Return nodal equations (e.g. flow conservation, voltage balance)."""
[docs] def minimize(self, _grid, _in_branch_models, _out_branch_models, _childs, **kwargs): """Optional objective contribution (e.g. slack penalties). Default: none.""" return []
[docs] class BranchModel(GenericModel): """Abstract base class for network branches (lines, pipes). Subclasses define branch equations."""
[docs] @abstractmethod def equations(self, grid, from_node_model, to_node_model, **kwargs): pass
[docs] def minimize(self, _grid, _from_node_model, _to_node_model, **kwargs): return []
[docs] def loss_percent(self): return 0
[docs] def is_cp(self): return False
[docs] def init(self, grid): """Optional pre-solve initialization hook for the branch. Default: no-op."""
[docs] class MultiGridBranchModel(BranchModel): """Branch that couples multiple grid domains (e.g. CHP, P2H). Always a control point."""
[docs] @abstractmethod def equations(self, grids, from_node_model, to_node_model, **kwargs): pass
[docs] def is_cp(self): return True
[docs] def init(self, grids): """Optional pre-solve initialization hook spanning grids. Default: no-op."""
[docs] class MultiGridNodeModel(NodeModel): """Node that participates in multiple grid domains. Always a control point."""
[docs] def is_cp(self): return True
[docs] class CompoundModel(GenericModel): """ Composite component spanning multiple nodes/carriers (e.g. P2H unit). Subclasses implement :meth:`create` to add sub-components to the network, and may override :meth:`equations` for coupling constraints. """
[docs] @abstractmethod def create(self, network): """Add sub-components (nodes, branches, children) to *network*."""
[docs] def equations(self, _network, **kwargs): return []
[docs] def minimize(self, _network, **kwargs): return []
[docs] class MultiGridCompoundModel(CompoundModel):
[docs] def is_cp(self): return False
[docs] class ChildModel(GenericModel): """ Leaf component attached to a single node (loads, generators, ext-grids). Load convention: positive = consumption, negative = generation. Generator subclasses negate the user-supplied magnitude so the public API takes positive numbers. ``regulation`` in ``[0.0, 1.0]`` scales the setpoint; used by multi-energy couplers (P2H, CHP) for partial dispatch. """ def __init__(self, regulation: float = 1.0, **kwargs): super().__init__(**kwargs) self.regulation = regulation
[docs] def overwrite(self, node_model, grid): """Pin node variables to fixed setpoints (grid-forming children). Default: no-op."""
[docs] @abstractmethod def equations(self, grid, node_model, **kwargs): pass
[docs] def minimize(self, _grid, _node_model, **kwargs): return []
[docs] class Component(ABC): def __init__( self, id, model, formulation=None, constraints=None, grid=None, name=None, active=True, independent=True, ) -> None: self.model = model self.id = id self.constraints = [] if constraints is None else constraints self.name = name self.active = active self.grid = grid self.independent = independent self.ignored = False # Declarative only - which equations to use. Variable declaration # (ensure_var) is deferred to the solver's attach_formulations() pass, # which runs on the solve-time network copy. self.formulation = formulation # Set by the Network builders when an explicit formulation= was passed; # pinned formulations survive a solver-level formulation override. self.formulation_pinned = False @property def tid(self): return f"{self.__class__.__name__}-{self.id}".lower() @property def nid(self): return f"{self.model.__class__.__name__}-{self.id}".lower() def __deepcopy__(self, memo): new = self.__class__.__new__(self.__class__) memo[id(self)] = new for k, v in self.__dict__.items(): if v is None or v.__class__ in _IMMUTABLE_PRIMITIVES: new.__dict__[k] = v else: new.__dict__[k] = copy.deepcopy(v, memo) return new
[docs] class Child(Component): def __init__( self, child_id, model, formulation=None, constraints=None, grid=None, name=None, active=True, independent=True, ) -> None: super().__init__( child_id, model, formulation, constraints, grid, name, active, independent ) self.node_id = None
[docs] def equations(self, grid, node_model, **kwargs): model_eqs = self.model.equations(grid, node_model, **kwargs) form_eqs = [] if self.formulation is not None: form_eqs = self.formulation.equations( self.model, grid, node_model, **kwargs ) return ( form_eqs + model_eqs + [c(self.model, grid, node_model, **kwargs) for c in self.constraints] )
[docs] def minimize(self, grid, node_model, **kwargs): model_eqs = self.model.minimize(grid, node_model, **kwargs) form_eqs = [] if self.formulation is not None: form_eqs = self.formulation.minimize(self.model, grid, node_model, **kwargs) return form_eqs + model_eqs
[docs] class Compound(Component): def __init__( self, compound_id, model: CompoundModel, connected_to, subcomponents, formulation=None, constraints=None, grid=None, name=None, active=True, ) -> None: super().__init__( compound_id, model, formulation, constraints, grid, name, active, True ) self.connected_to = connected_to self.subcomponents = subcomponents
[docs] def equations(self, network, **kwargs): model_eqs = self.model.equations(network, **kwargs) form_eqs = [] if self.formulation is not None: form_eqs = self.formulation.equations(self.model, network, **kwargs) return ( form_eqs + model_eqs + [c(self.model, network, **kwargs) for c in self.constraints] )
[docs] def minimize(self, network, **kwargs): model_eqs = self.model.minimize(network, **kwargs) form_eqs = [] if self.formulation is not None: form_eqs = self.formulation.minimize(self.model, network, **kwargs) return form_eqs + model_eqs
[docs] def component_of_type(self, comp_type): return [ component for component in self.subcomponents if type(component) is comp_type ]
[docs] class Node(Component): """Network node tracking attached child components and incoming/outgoing branches.""" def __init__( self, node_id, model, child_ids=None, formulation=None, constraints=None, grid=None, name=None, position=None, active=True, independent=True, ) -> None: super().__init__( node_id, model, formulation, constraints, grid, name, active, independent ) self.child_ids = [] if child_ids is None else child_ids self.constraints = [] if constraints is None else constraints self.from_branch_ids = [] self.to_branch_ids = [] self.position = position
[docs] def equations(self, grid, in_branch_models, out_branch_models, childs, **kwargs): model_eqs = self.model.equations( grid, in_branch_models, out_branch_models, childs, **kwargs ) form_eqs = [] if self.formulation is not None: form_eqs = self.formulation.equations( self.model, grid, in_branch_models, out_branch_models, childs, **kwargs ) return ( form_eqs + model_eqs + [ c( self.model, grid, in_branch_models, out_branch_models, childs, **kwargs, ) for c in self.constraints ] )
[docs] def minimize(self, grid, in_branch_models, out_branch_models, childs, **kwargs): model_eqs = self.model.minimize( grid, in_branch_models, out_branch_models, childs, **kwargs ) form_eqs = [] if self.formulation is not None: form_eqs = self.formulation.minimize( self.model, grid, in_branch_models, out_branch_models, childs, **kwargs ) return form_eqs + model_eqs
[docs] def add_from_branch_id(self, branch_id): self.from_branch_ids.append(branch_id)
[docs] def add_to_branch_id(self, branch_id): self.to_branch_ids.append(branch_id)
def _remove_branch(self, branch_id): if branch_id in self.to_branch_ids: self.to_branch_ids.remove(branch_id) elif branch_id in self.from_branch_ids: self.from_branch_ids.remove(branch_id)
[docs] def remove_branch(self, branch_id): """Remove ``branch_id`` and its reversed counterpart from both branch lists.""" switched = (branch_id[1], branch_id[0], branch_id[2]) self._remove_branch(branch_id) self._remove_branch(switched)
[docs] class Branch(Component): def __init__( self, model, from_node_id, to_node_id, formulation=None, constraints=None, grid=None, name=None, active=True, independent=True, ) -> None: super().__init__( None, model, formulation, constraints, grid, name, active, independent ) self.from_node_id = from_node_id self.to_node_id = to_node_id
[docs] def equations(self, grid, from_node_model, to_node_model, **kwargs): model_eqs = self.model.equations(grid, from_node_model, to_node_model, **kwargs) form_eqs = [] if self.formulation is not None: form_eqs = self.formulation.equations( self.model, grid, from_node_model, to_node_model, **kwargs ) return ( form_eqs + model_eqs + [ c(self.model, grid, from_node_model, to_node_model, **kwargs) for c in self.constraints ] )
[docs] def minimize(self, grid, from_node_model, to_node_model, **kwargs): model_eqs = self.model.minimize(grid, from_node_model, to_node_model, **kwargs) form_eqs = [] if self.formulation is not None: form_eqs = self.formulation.minimize( self.model, grid, from_node_model, to_node_model, **kwargs ) return form_eqs + model_eqs
@property def tid(self): if self.id[0] > self.id[1]: return f"branch-{self.id[0]}-{self.id[1]}" else: return f"branch-{self.id[1]}-{self.id[0]}"