Source code for monee.model.network

from __future__ import annotations

import copy
from typing import TYPE_CHECKING

import networkx as nx
import pandas

from .core import (
    EL_KEY,
    GAS_KEY,
    WATER_KEY,
    Branch,
    Child,
    Component,
    Compound,
    CompoundModel,
    Const,
    GenericModel,
    Intermediate,
    Node,
    PostProcess,
    Var,
)
from .grid import create_gas_grid, create_power_grid, create_water_grid

if TYPE_CHECKING:
    from .extension.core import NetworkAspect

    # Imported only for typing: at runtime ``apply_formulation`` resolves the
    # spec lazily via ``formulation.registry.resolve_formulation`` (a local
    # import), so the model package no longer eagerly triggers the
    # formulation -> branch/node import chain at load time.
    from .formulation import (  # noqa: F401
        Formulation,
        NetworkFormulation,
    )


[docs] class Network: def __init__( self, active_grid=None, el_model=None, water_model=None, gas_model=None, ) -> None: self._default_grid_models = { EL_KEY: el_model or create_power_grid("power"), WATER_KEY: water_model or create_water_grid("water"), GAS_KEY: gas_model or create_gas_grid("gas"), } self._network_internal = nx.MultiGraph() self._child_dict = {} self._compound_dict = {} self._constraints = [] self._objectives = [] self._extensions: list[NetworkAspect] = [] self.__blacklist = [] self.__collected_components = [] self.__force_blacklist = False self.__collect_components = False self.__current_grid = active_grid # Declarative network-level formulation choice. No default is seeded # here: components without an explicit choice fall back to # DEFAULT_SIMULATION_FORMULATION when the solver attaches formulations # (see monee.model.formulation.registry.attach_formulations). self.__default_formulation: dict[tuple[type, type], Formulation] = {}
[docs] def apply_formulation(self, network_formulation): """Record *network_formulation* as the network-level default. Accepts the same spec as the solver's ``formulation`` argument (:func:`~monee.model.formulation.registry.resolve_formulation`): a registry key string (``"smooth_nlp"``), a :class:`NetworkFormulation`, or a sequence of either (merged left to right). Side-effect free: only the network's formulation map is updated - components and their models are untouched. The choice materialises when a solver runs ``attach_formulations`` on its solve-time copy. A ``formulation`` argument passed to the solver overrides this choice; per-component formulations passed to the builder methods override both. Repeated calls merge: later registrations win per type key. """ from .formulation.registry import resolve_formulation network_formulation = resolve_formulation(network_formulation) if network_formulation is None: return for type_or_tuple, formulation in network_formulation.items(): tc, tg = None, None if isinstance(type_or_tuple, tuple): tc, tg = type_or_tuple else: tc = type_or_tuple self.__default_formulation[(tc, tg)] = formulation
[docs] def lookup_formulation(self, model, grid) -> Formulation | None: """The network-level formulation for *model* (and *grid*) accumulated from ``apply_formulation`` calls, or None. Last matching registration wins, mirroring :meth:`NetworkFormulation.lookup`.""" found = None for (tc, tg), formulation in self.__default_formulation.items(): if isinstance(model, tc) and (tg is None or type(grid) is tg): found = formulation return found
[docs] def set_default_grid(self, key, grid): self._default_grid_models[key] = grid
[docs] def activate_grid(self, grid): self.__current_grid = grid
@property def grids(self): return list({node.grid for node in self.nodes}) @property def graph(self): return self._network_internal def _set_active(self, cls, id, active): if cls == Node: self.node_by_id(id).active = active elif cls == Branch: branch = self.branch_by_id(id) if "active" in branch.model.vars: branch.model.active = active else: branch.active = active elif cls == Compound: compound: Compound = self.compound_by_id(id) # Propagate to subcomponents and the compound's own ``active`` so # ignore_*/inject_vars see a fully deactivated compound; model # set_active alone wouldn't deactivate subcomponent children. for component in compound.subcomponents: self._set_active(type(component), component.id, active) compound.active = active if hasattr(compound.model, "set_active"): compound.model.set_active(active) elif cls == Child: self.child_by_id(id).active = active
[docs] def deactivate_by_id(self, cls, id): self._set_active(cls, id, False)
[docs] def activate_by_id(self, cls, id): self._set_active(cls, id, True)
[docs] def activate(self, component): self.activate_by_id(type(component), component.id)
[docs] def deactivate(self, component): self.deactivate_by_id(type(component), component.id)
[docs] def all_models(self): return [model_container.model for model_container in self.all_components()]
[docs] def all_components(self): return self.childs + self.compounds + self.branches + self.nodes
[docs] def all_models_with_grid(self): model_container_list = self.childs + self.compounds + self.branches + self.nodes return [ ( model_container.model, model_container.grid if hasattr(model_container, "grid") else None, ) for model_container in model_container_list ]
@property def constraints(self): return self._constraints @property def objectives(self): return self._objectives @property def extensions(self) -> list[NetworkAspect]: """Solver-agnostic network-level extensions.""" return self._extensions
[docs] def add_extension(self, ext: NetworkAspect) -> None: """Register a NetworkAspect extension on this network.""" self._extensions.append(ext)
@property def compounds(self) -> list[Compound]: return list(self._compound_dict.values()) @property def childs(self) -> list[Child]: return list(self._child_dict.values()) @property def cps(self) -> list[GenericModel]: return [comp for comp in self.all_components() if comp.model.is_cp()]
[docs] def has_child(self, child_id): return child_id in self._child_dict
[docs] def remove_child(self, child_id): # Also drop the parent node's reference; otherwise childs_by_ids # raises KeyError walking node.child_ids. child = self._child_dict.pop(child_id) node_id = getattr(child, "node_id", None) if node_id is not None and node_id in self._network_internal.nodes: try: node = self.node_by_id(node_id) except KeyError: node = None if node is not None and child_id in node.child_ids: node.child_ids.remove(child_id)
[docs] def compound_of_node(self, node_id): for compound in self.compounds: for subcomponent in compound.subcomponents: if isinstance(subcomponent, Node) and subcomponent.id == node_id: return compound return None
[docs] def remove_node(self, node_id): # nx.remove_node drops all incident edges from the graph but leaves # the surviving neighbours' from_branch_ids/to_branch_ids pointing at # those now-vanished edges. Detach them first so later # branches_connected_to / components_connected_to on a neighbour does # not call branch_by_id on a missing edge. incident = [ (u, v, key) for u, v, key in self._network_internal.edges(node_id, keys=True) ] for u, v, key in incident: self.remove_branch_between(u, v, key=key) self._network_internal.remove_node(node_id)
[docs] def remove_branch(self, branch_id): branch: Branch = self.branch_by_id(branch_id) self.remove_branch_between( branch.from_node_id, branch.to_node_id, key=branch_id[2] )
[docs] def remove_compound(self, compound_id): compound: Compound = self.compound_by_id(compound_id) del self._compound_dict[compound_id] for subcomponent in compound.subcomponents: if isinstance(subcomponent, Node): self.remove_node(subcomponent.id) if isinstance(subcomponent, Branch): if self.has_branch(subcomponent.id): self.remove_branch(subcomponent.id)
[docs] def remove_branch_between(self, node_one, node_two, key=0): self._network_internal.remove_edge(node_one, node_two, key) self.node_by_id(node_one).remove_branch((node_one, node_two, key)) self.node_by_id(node_two).remove_branch((node_one, node_two, key))
[docs] def move_branch(self, branch_id, new_from_id, new_to_id): branch: Branch = self.branch_by_id(branch_id) self.remove_branch_between(branch_id[0], branch_id[1], key=branch_id[2]) return self.branch( branch.model, new_from_id, new_to_id, constraints=branch.constraints, grid=branch.grid, name=branch.name, )
[docs] def child_by_id(self, child_id): return self._child_dict[child_id]
[docs] def childs_by_type(self, cls): return [child for child in self.childs if type(child.model) is cls]
[docs] def compound_by_id(self, compound_id): return self._compound_dict[compound_id]
[docs] def compounds_by_type(self, cls): return [compound for compound in self.compounds if type(compound.model) is cls]
[docs] def nodes_by_type(self, cls): return [node for node in self.nodes if type(node.model) is cls]
[docs] def childs_by_ids(self, child_ids) -> list[Child]: return [self.child_by_id(child_id) for child_id in child_ids]
[docs] def has_any_child_of_type(self, branch, cls) -> bool: childs = self.get_childs_by_type(branch, cls) return len(childs) > 0
[docs] def get_childs_by_type(self, branch, cls) -> list[Child]: return [ child for child in self.childs_by_ids(branch.child_ids) if isinstance(child.model, cls) ]
[docs] def branches_by_ids(self, branch_ids) -> list[Branch]: return [self.branch_by_id(branch_id) for branch_id in branch_ids]
[docs] def is_blacklisted(self, obj): return obj in self.__blacklist
[docs] def has_node(self, node_id): return node_id in self._network_internal.nodes
[docs] def has_branch(self, branch_id): return branch_id in self._network_internal.edges
[docs] def get_branch_between(self, node_id_one, node_id_two, bid=0): return self._network_internal.get_edge_data(node_id_one, node_id_two)[bid][ "internal_branch" ]
[docs] def has_branch_between(self, node_id_one, node_id_two): return self._network_internal.has_edge(node_id_one, node_id_two)
[docs] def compounds_connected_to(self, node_id) -> list[Component]: return [ compound for compound in self.compounds if node_id in compound.connected_to.values() ]
[docs] def compound_of(self, subcomponent_component_id) -> Component | None: compounds = [ compound for compound in self.compounds if subcomponent_component_id in [sc.id for sc in compound.subcomponents] ] if len(compounds) == 0: return None return compounds[0]
[docs] def components_connected_to(self, node_id) -> list[Component]: node = self.node_by_id(node_id) return ( self.childs_by_ids(node.child_ids) + self.compounds_connected_to(node_id) + self.branches_by_ids(node.to_branch_ids) + self.branches_by_ids(node.from_branch_ids) )
[docs] def branches_connected_to(self, node_id) -> list[Branch]: node = self.node_by_id(node_id) return self.branches_by_ids(node.to_branch_ids) + self.branches_by_ids( node.from_branch_ids )
@property def nodes(self) -> list[Node]: return [ self._network_internal.nodes[node]["internal_node"] for node in self._network_internal.nodes ] @property def branches(self) -> list[Branch]: return [ self._network_internal.edges[edge]["internal_branch"] for edge in self._network_internal.edges ]
[docs] def node_by_id(self, node_id) -> Node: if node_id not in self._network_internal.nodes: raise ValueError( f"The node id '{node_id}' is not valid. The valid ids are {self._network_internal.nodes.keys()}" ) return self._network_internal.nodes[node_id]["internal_node"]
[docs] def branch_by_id(self, branch_id): if branch_id not in self._network_internal.edges: raise ValueError(f"The branch id '{branch_id}' is not valid.") return self._network_internal.edges[branch_id]["internal_branch"]
[docs] def branches_by_type(self, cls): return [branch for branch in self.branches if isinstance(branch.model, cls)]
def __insert_to_blacklist_if_forced(self, obj): if self.__force_blacklist: self.__blacklist.append(obj) def __insert_to_container_if_collect_toggled(self, obj): if self.__collect_components: self.__collected_components.append(obj)
[docs] def node_by_id_or_create(self, node_id, auto_node_creator, auto_grid_key): if not self.has_node(node_id): return self.node_by_id( self.node(auto_node_creator(), grid=auto_grid_key, overwrite_id=node_id) ) return self.node_by_id(node_id)
[docs] def child( self, model, attach_to_node_id=None, formulation=None, constraints=None, overwrite_id=None, name=None, auto_node_creator=None, auto_grid_key=None, ): next_child_id = ( 0 if len(self._child_dict) == 0 else max(self._child_dict.keys()) + 1 ) child_id = overwrite_id if overwrite_id is not None else next_child_id child = Child( child_id, model, formulation=formulation, constraints=constraints, name=name, independent=not self.__collect_components, ) child.formulation_pinned = formulation is not None self.__insert_to_blacklist_if_forced(child) self.__insert_to_container_if_collect_toggled(child) self._child_dict[child_id] = child if attach_to_node_id is not None: child.node_id = attach_to_node_id attaching_node = self.node_by_id_or_create( attach_to_node_id, auto_node_creator, auto_grid_key ) attaching_node.child_ids.append(child_id) child.grid = attaching_node.grid child.node_id = attaching_node.id return child_id
[docs] def child_to( self, model, node_id, formulation=None, constraints=None, overwrite_id=None, name=None, auto_node_creator=None, auto_grid_key=None, ): return self.child( model, formulation=formulation, attach_to_node_id=node_id, constraints=constraints, overwrite_id=overwrite_id, name=name, auto_node_creator=auto_node_creator, auto_grid_key=auto_grid_key, )
[docs] def first_node(self): return min(self._network_internal)
def _or_default(self, grid_or_name): if isinstance(grid_or_name, str): return self._default_grid_models[grid_or_name] if grid_or_name is None: if self.__current_grid is None: raise ValueError( "No active grid and no grid was provided. Please provide a grid by using the argument grid= or use activate_grid(grid) to activate a grid for the whole Network object." ) if isinstance(self.__current_grid, str): return self._default_grid_models[self.__current_grid] return self.__current_grid return grid_or_name
[docs] def node( self, model, grid=None, formulation=None, child_ids=None, constraints=None, overwrite_id=None, name=None, position=None, ): node_id = ( 0 if len(self._network_internal) == 0 else max(self._network_internal) + 1 ) if overwrite_id is not None: node_id = overwrite_id grid = self._or_default(grid) # Apply the grid's voltage floor to the bus voltage variable so the # 1/vm term in the AC current equations stays well-conditioned for NLP # solvers (IPOPT). No-op for nodes without ``vm_pu`` (e.g. junctions) and # for a user-customised lower bound (only the default 0 is raised). vm_pu = getattr(model, "vm_pu", None) if ( isinstance(vm_pu, Var) and vm_pu.min in (0, None) and hasattr(grid, "vm_pu_min") ): vm_pu.min = grid.vm_pu_min node = Node( node_id, model, child_ids, formulation=formulation, constraints=constraints, grid=grid, name=name, position=position, independent=not self.__collect_components, ) node.formulation_pinned = formulation is not None if child_ids is not None: for child_id in child_ids: child = self.child_by_id(child_id) child.grid = node.grid child.node_id = node_id self.__insert_to_blacklist_if_forced(node) self.__insert_to_container_if_collect_toggled(node) self._network_internal.add_node(node_id, internal_node=node) return node_id
[docs] def branch( self, model, from_node_id, to_node_id, formulation=None, constraints=None, grid=None, name=None, auto_node_creator=None, auto_grid_key=None, **kwargs, ): from_node = self.node_by_id_or_create( from_node_id, auto_node_creator=auto_node_creator, auto_grid_key=auto_grid_key, ) to_node = self.node_by_id_or_create( to_node_id, auto_node_creator=auto_node_creator, auto_grid_key=auto_grid_key ) if grid is not None: grid = self._or_default(grid) branch = Branch( model, from_node_id, to_node_id, formulation=formulation, constraints=constraints, grid=grid or ( from_node.grid if from_node.grid == to_node.grid else { type(from_node.grid): from_node.grid, type(to_node.grid): to_node.grid, } ), name=name, independent=not self.__collect_components, **kwargs, ) branch.formulation_pinned = formulation is not None self.__insert_to_blacklist_if_forced(branch) self.__insert_to_container_if_collect_toggled(branch) branch_id = ( from_node_id, to_node_id, self._network_internal.add_edge( from_node_id, to_node_id, internal_branch=branch ), ) branch.id = branch_id to_node.add_to_branch_id(branch_id) from_node.add_from_branch_id(branch_id) return branch_id
[docs] def compound( self, model: CompoundModel, formulation=None, constraints=None, overwrite_id=None, **connected_node_ids, ): next_compound_id = ( 0 if len(self._compound_dict) == 0 else max(self._compound_dict.keys()) + 1 ) compound_id = overwrite_id if overwrite_id is not None else next_compound_id self.__force_blacklist = True self.__collect_components = True try: model.create( self, **{ k.replace("_id", "") if k.endswith("_id") else k: self.node_by_id(v) for k, v in connected_node_ids.items() }, ) finally: self.__collect_components = False self.__force_blacklist = False compound = Compound( compound_id=compound_id, formulation=formulation, model=model, constraints=constraints, connected_to=connected_node_ids, subcomponents=self.__collected_components, ) compound.formulation_pinned = formulation is not None self._compound_dict[compound_id] = compound self.__collected_components = [] return compound_id
[docs] def constraint(self, constraint_equation): self._constraints.append(constraint_equation)
[docs] def objective(self, objective_function): self._objectives.append(objective_function)
@staticmethod def _model_dict_to_input(container): model_dict = container.model.__dict__ input_dict = { "active": container.active, "id": container.id, "independent": container.independent, "ignored": container.ignored, } for k, v in model_dict.items(): input_value = v if isinstance(v, Var): input_value = "$VAR" if isinstance(v, Intermediate): input_value = "$INT" if isinstance(v, Const): input_value = v.value input_dict[k] = input_value return input_dict
[docs] def as_dataframe_dict(self): input_dict_list_dict = {} model_containers = self.nodes + self.childs + self.branches for container in model_containers: model_type_name = type(container.model).__name__ if model_type_name not in input_dict_list_dict: input_dict_list_dict[model_type_name] = [] input_dict = Network._model_dict_to_input(container) if isinstance(container, Child): input_dict["node_id"] = container.node_id input_dict_list_dict[model_type_name].append(input_dict) dataframe_dict = {} for result_type, dict_list in input_dict_list_dict.items(): dataframe_dict[result_type] = pandas.DataFrame(dict_list) return dataframe_dict
@staticmethod def _model_dict_to_results(container): model_dict = container.model.vars result_dict = { "active": container.active, "id": container.id, "independent": container.independent, "ignored": container.ignored, } for k, v in model_dict.items(): result_value = v if isinstance(v, Var | Const | Intermediate | PostProcess): result_value = v.value result_dict[k] = result_value return result_dict
[docs] def as_result_dataframe_dict(self): result_dict_list_dict = {} model_containers = self.nodes + self.childs + self.branches for container in model_containers: model_type_name = type(container.model).__name__ if model_type_name not in result_dict_list_dict: result_dict_list_dict[model_type_name] = [] result_dict = Network._model_dict_to_results(container) if isinstance(container, Child): result_dict["node_id"] = container.node_id result_dict_list_dict[model_type_name].append(result_dict) dataframe_dict = {} for result_type, dict_list in result_dict_list_dict.items(): dataframe_dict[result_type] = pandas.DataFrame(dict_list) return dataframe_dict
[docs] def as_result_dataframe_dict_str(self): dataframes = self.as_result_dataframe_dict() result_str = "" for cls_str, dataframe in dataframes.items(): result_str += cls_str result_str += "\n" result_str += dataframe.to_string() result_str += "\n" result_str += "\n" return result_str
[docs] def as_dataframe_dict_str(self): dataframes = self.as_dataframe_dict() result_str = "" for cls_str, dataframe in dataframes.items(): result_str += cls_str result_str += "\n" result_str += dataframe.to_string() result_str += "\n" result_str += "\n" return result_str
def __repr__(self): return self.as_dataframe_dict_str() def __str__(self): return self.as_dataframe_dict_str()
[docs] def statistics(self): type_to_number = {} model_containers = self.nodes + self.childs + self.branches + self.compounds for container in model_containers: if not container.independent: continue model_type = type(container.model) if model_type in type_to_number: type_to_number[model_type] += 1 else: type_to_number[model_type] = 1 return type_to_number
[docs] def copy(self): return copy.deepcopy(self)
def __deepcopy__(self, memo): new = Network.__new__(Network) memo[id(self)] = new new._default_grid_models = copy.deepcopy(self._default_grid_models, memo) new._child_dict = { k: copy.deepcopy(v, memo) for k, v in self._child_dict.items() } new._compound_dict = { k: copy.deepcopy(v, memo) for k, v in self._compound_dict.items() } # Constraints/objectives are stateless lambdas - share by reference. new._constraints = list(self._constraints) new._objectives = list(self._objectives) new._extensions = copy.deepcopy(self._extensions, memo) # Compound-construction transients - deepcopy preserves consistency # if the copy ever lands mid-build. new._Network__blacklist = copy.deepcopy(self._Network__blacklist, memo) new._Network__collected_components = copy.deepcopy( self._Network__collected_components, memo ) new._Network__force_blacklist = self._Network__force_blacklist new._Network__collect_components = self._Network__collect_components new._Network__current_grid = copy.deepcopy(self._Network__current_grid, memo) # Default formulations are module-level singletons - share by reference. new._Network__default_formulation = dict(self._Network__default_formulation) # Manual MultiGraph rebuild - networkx generic deepcopy is much slower. g = nx.MultiGraph() new._network_internal = g for node_id, data in self._network_internal.nodes(data=True): new_data = {k: copy.deepcopy(v, memo) for k, v in data.items()} g.add_node(node_id, **new_data) for u, v, key, data in self._network_internal.edges(keys=True, data=True): new_data = {dk: copy.deepcopy(dv, memo) for dk, dv in data.items()} g.add_edge(u, v, key=key, **new_data) return new
[docs] def clear_childs(self): self._child_dict = {} for node in self.nodes: node.child_ids = []
def _clean_up_compound(network: Network, compound): node_components = compound.component_of_type(Node) fully_intact = True for component in node_components: if not network.has_node(component.id): fully_intact = False child_components = compound.component_of_type(Child) for component in child_components: if not network.has_child(component.id): fully_intact = False branch_components = compound.component_of_type(Branch) for component in branch_components: if not network.has_branch(component.id): fully_intact = False compound_components = compound.component_of_type(Compound) for component in compound_components: compound_alive = _clean_up_compound(network, component) if not compound_alive: fully_intact = False network.remove_compound(compound.id) return fully_intact
[docs] def to_spanning_tree(network: Network): return transform_network(network, nx.minimum_spanning_tree)
[docs] def transform_network(network: Network, graph_transform): network = network.copy() network._network_internal = graph_transform(network.graph) # The transform (e.g. minimum_spanning_tree) drops edges, leaving the # surviving nodes' from_branch_ids/to_branch_ids referencing branches that # no longer exist. Rebuild those lists from the reduced edge set so # branches_connected_to / components_connected_to stay consistent. for node in network.nodes: node.from_branch_ids = [] node.to_branch_ids = [] for from_id, to_id, key in network._network_internal.edges(keys=True): branch_id = (from_id, to_id, key) network.node_by_id(from_id).add_from_branch_id(branch_id) network.node_by_id(to_id).add_to_branch_id(branch_id) for child in network.childs: referenced = False for node in network.nodes: if child.id in node.child_ids: referenced = True if not referenced: network.remove_child(child.id) for compound in network.compounds: _clean_up_compound(network, compound) return network
def _add_tuple(a, b): return [a[i] + b[i] for i in range(len(a))] def _div_tuple(a, div): return tuple(a[i] / div for i in range(len(a)))
[docs] def calc_coordinates(network: Network, component: Component): if type(component) is Node: return component.position elif type(component) is Branch: node_start = network.node_by_id(component.from_node_id) node_end = network.node_by_id(component.to_node_id) return tuple( (node_start.position[i] + node_end.position[i]) / 2 for i in range(len(node_start.position)) ) elif type(component) is Child: return network.node_by_id(component.node_id).position elif type(component) is Compound: position = (0, 0) for connected_node_id in component.connected_to.values(): node = network.node_by_id(connected_node_id) position = _add_tuple(position, node.position) return _div_tuple(position, len(component.connected_to)) raise ValueError(f"This should not happen! The component {component} is unknown.")