Source code for monee.io.native

"""Native JSON (de)serialization for monee :class:`~monee.model.network.Network`.

The native format is a plain ``dict`` (JSON-friendly) with this shape::

    {
        "version": 2,
        "grids":    {name: {"model_type": str, "values": {...}}},
        "nodes":    [{"id", "grid_id", "child_ids", "position",
                      "active", "name", "values", "model_type"}],
        "childs":   [{"id", "active", "name", "values", "model_type"}],
        "branches": [{"id", "from_node", "to_node", "grid_id", "grid_ids",
                      "active", "name", "values", "model_type"}],
        "compounds":[{"id", "connected_to", "values", "model_type"}],
    }

Each model attribute in a ``values`` block is encoded with full fidelity:
plain scalars pass through unchanged, while :class:`~monee.model.core.Var`,
:class:`~monee.model.core.Const` and :class:`~monee.model.core.Intermediate`
are tagged with a reserved ``__type__`` key so they round-trip exactly
(``Var`` keeps its bounds, ``integer`` flag and ``name``).

The reader stays backward compatible with the legacy format produced by older
versions and by :mod:`monee.io.matpower` (untagged ``{"value", "max", "min"}``
Var dicts).

Both the public, solver-visible attributes of a model *and* its JSON-encodable
private attributes (e.g. a storage's ``_p_max`` / ``_lossy``) are serialized, so
models whose behaviour is configured through private state round-trip faithfully.
Private attributes holding object references (grids, sub-models) are skipped on
purpose - they are rebuilt by the compound ``create()`` / branch ``init()`` hooks.

Known limitations (not represented in the native format): user-supplied
``network.constraint`` / ``network.objective`` callables, network-level
extensions registered via ``add_extension`` (linepack, LTC, islanding), and
non-default formulations applied with ``apply_formulation`` - the latter are
re-derived from the default formulation rules on load.
"""

import inspect
import json
import numbers

from monee.model import Network
from monee.model.core import (
    Compound,
    Const,
    Intermediate,
    PostProcess,
    Var,
    component_list,
)

#: Bumped whenever the on-disk structure changes in a non-additive way.
FORMAT_VERSION = 2

#: Reserved key used to tag the concrete type of an encoded value object.
_TYPE_KEY = "__type__"


[docs] class PersistenceException(Exception): pass
# --------------------------------------------------------------------------- # # Value (de)serialization # # --------------------------------------------------------------------------- # def _encodable(value): """Return ``True`` if *value* can be losslessly written to the native format.""" if value is None or isinstance(value, (bool, str)): return True if isinstance(value, (Var, Const, Intermediate, PostProcess)): return True if isinstance(value, (list, tuple)): return all(_encodable(item) for item in value) if isinstance(value, dict): return all(isinstance(k, str) and _encodable(v) for k, v in value.items()) # numbers.Number also covers numpy integer/float scalars. return isinstance(value, numbers.Number) def _encode_value(value): """Encode a single model attribute value to a JSON-serializable form.""" if isinstance(value, Var): return { _TYPE_KEY: "Var", "value": value.value, "max": value.max, "min": value.min, "integer": value.integer, "name": value.name, } if isinstance(value, Const): return {_TYPE_KEY: "Const", "value": value.value} if isinstance(value, Intermediate): return {_TYPE_KEY: "Intermediate", "value": value.value} if isinstance(value, PostProcess): # Only the computed value is portable; the lambda is re-attached on the # next solve (its model's equations()/__init__ recreate it). return {_TYPE_KEY: "PostProcess", "value": value.value} if isinstance(value, dict): return {k: _encode_value(v) for k, v in value.items()} if isinstance(value, (list, tuple)): return [_encode_value(v) for v in value] return value def _decode_dict_value(value): tag = value.get(_TYPE_KEY) if tag == "Var": return Var( value["value"], max=value.get("max"), min=value.get("min"), integer=value.get("integer", False), name=value.get("name"), ) if tag == "Const": return Const(value["value"]) if tag == "Intermediate": return Intermediate(value["value"]) if tag == "PostProcess": # Carry the stored value via a constant lambda; the model's next # solve re-attaches the real computation. stored = value["value"] return PostProcess(lambda _vals, _v=stored: _v, value=stored) if tag is not None: raise PersistenceException(f"Unknown encoded value type: {tag!r}") # --- legacy (untagged) handling ------------------------------------ # # Older files and matpower import emit bare Var dicts without a tag. if "value" in value and ("max" in value or "min" in value): return Var( value["value"], max=value.get("max"), min=value.get("min"), integer=value.get("integer", False), name=value.get("name"), ) if set(value.keys()) == {"value"}: # Legacy Intermediate/Const both serialized as {"value": x}; restore # the numeric value as an Intermediate rather than dropping it. return Intermediate(value["value"]) return {k: _decode_value(v) for k, v in value.items()} def _decode_value(value): """Inverse of :func:`_encode_value`, tolerant of the legacy format.""" if isinstance(value, dict): return _decode_dict_value(value) if isinstance(value, list): return [_decode_value(v) for v in value] return value def _encode_values(model_dict): return {k: _encode_value(v) for k, v in model_dict.items()} def _decode_values(values_dict): return {k: _decode_value(v) for k, v in values_dict.items()} def _json_default(obj): """Fallback encoder for stray objects (e.g. numpy scalars) in ``values``.""" if isinstance(obj, (Var, Const, Intermediate, PostProcess)): return _encode_value(obj) if hasattr(obj, "item"): # numpy scalar return obj.item() return vars(obj) # --------------------------------------------------------------------------- # # Deserialization (dict -> Network) # # --------------------------------------------------------------------------- #
[docs] def init_model(model_type, preprocessed_dict): model_type_dict = { component_cls.__name__: component_cls for component_cls in component_list } if model_type not in model_type_dict: raise PersistenceException( f"The type {model_type} is not known! Maybe you forgot to decorate " f"your model class with @model?" ) model_cls = model_type_dict[model_type] # Required params get 0; optional params keep their default. Real attr # values arrive via setattr, bypassing constructor transforms so the # serialized state round-trips exactly. init_kwargs = {} for argname, param in list( inspect.signature(model_cls.__init__).parameters.items() )[1:]: if param.kind in ( inspect.Parameter.VAR_POSITIONAL, inspect.Parameter.VAR_KEYWORD, ): # *args / **kwargs accept nothing by default. continue init_kwargs[argname] = ( 0 if param.default is inspect.Parameter.empty else param.default ) model = model_cls(**init_kwargs) for key, value in preprocessed_dict.items(): setattr(model, key, value) return model
# Kept for backward compatibility (older callers/imports).
[docs] def preprocess_dict(model_dict): return _decode_values(model_dict)
[docs] def native_dict_to_network(dict_struct) -> Network: network = Network(None) # Build grid objects without mutating the caller's dict. grid_by_name = {} for name, grid_entry in dict_struct["grids"].items(): grid_by_name[name] = init_model( grid_entry["model_type"], _decode_values(grid_entry["values"]) ) for child_dict in dict_struct["childs"]: model = init_model( child_dict["model_type"], _decode_values(child_dict["values"]) ) child_id = network.child( model, overwrite_id=child_dict["id"], name=child_dict.get("name"), ) if not child_dict.get("active", True): network.deactivate_by_id(type(network.child_by_id(child_id)), child_id) for node_dict in dict_struct["nodes"]: model = init_model(node_dict["model_type"], _decode_values(node_dict["values"])) position = node_dict.get("position") node_id = network.node( model, child_ids=list(node_dict["child_ids"]), grid=grid_by_name[node_dict["grid_id"]], overwrite_id=node_dict["id"], name=node_dict.get("name"), position=tuple(position) if isinstance(position, list) else position, ) node = network.node_by_id(node_id) node.active = node_dict.get("active", True) for compound_dict in dict_struct.get("compounds", []): model = init_model( compound_dict["model_type"], _decode_values(compound_dict["values"]) ) compound_id = network.compound( model, overwrite_id=compound_dict.get("id"), **compound_dict["connected_to"], ) compound = network.compound_by_id(compound_id) compound.name = compound_dict.get("name") if not compound_dict.get("active", True): network.deactivate_by_id(Compound, compound_id) for branch_dict in dict_struct["branches"]: model = init_model( branch_dict["model_type"], _decode_values(branch_dict["values"]) ) grid = _resolve_branch_grid(branch_dict, grid_by_name) branch_id = network.branch( model, from_node_id=branch_dict["from_node"], to_node_id=branch_dict["to_node"], grid=grid, name=branch_dict.get("name"), ) branch = network.branch_by_id(branch_id) branch.active = branch_dict.get("active", True) return network
def _resolve_branch_grid(branch_dict, grid_by_name): """Return the grid argument for re-creating a branch. Single-grid branches get their concrete grid. Multi-grid (coupling) branches return ``None`` so :meth:`Network.branch` rebuilds the grid-type mapping from the endpoint node grids. """ grid_ids = branch_dict.get("grid_ids") if grid_ids is None: grid_id = branch_dict.get("grid_id") grid_ids = [grid_id] if grid_id is not None else [] distinct = [grid_by_name[g] for g in dict.fromkeys(grid_ids)] if len(distinct) == 1: return distinct[0] return None
[docs] def load_to_network(file) -> Network: with open(file, encoding="utf-8") as read_fp: dict_struct = json.load(read_fp) return native_dict_to_network(dict_struct)
# --------------------------------------------------------------------------- # # Serialization (Network -> dict) # # --------------------------------------------------------------------------- #
[docs] def iter_concrete_grids(grid): """Yield the concrete grid object(s) backing a component. Single-grid components carry their grid directly. Multi-grid branches (coupling points that span e.g. a power and a gas grid) carry a ``dict`` mapping grid type -> grid (and sometimes a ``list`` of grids). Yield every distinct concrete grid that exposes a ``name``, in a stable order. """ if isinstance(grid, dict): seen = set() for value in grid.values(): candidates = value if isinstance(value, (list, tuple)) else [value] for candidate in candidates: if ( getattr(candidate, "name", None) is not None and id(candidate) not in seen ): seen.add(id(candidate)) yield candidate elif getattr(grid, "name", None) is not None: yield grid
[docs] def fetch_grid_to_dict(grid_dict, grid_from_model): for grid in iter_concrete_grids(grid_from_model): if grid.name not in grid_dict: grid_dict[grid.name] = grid elif grid_dict[grid.name] is not grid: raise PersistenceException( f"You must not define multiple grids with the same name: {grid.name}" )
[docs] def model_to_dict(model): """Encode a model's full state. All public (solver-visible) attributes are serialized; private attributes are serialized only when JSON-encodable, since non-encodable ones hold object references that are rebuilt by ``create()`` / ``init()`` on load. A public attribute that cannot be encoded is a genuine gap and raises. """ result = {} for key, value in model.__dict__.items(): if _encodable(value): result[key] = _encode_value(value) elif not key.startswith("_"): raise PersistenceException( f"Cannot serialize public attribute {key!r} of " f"{type(model).__name__}: unsupported type {type(value).__name__}." ) return result
[docs] def child_to_dict(child): return { "id": child.id, "active": child.active, "name": child.name, "values": model_to_dict(child.model), "model_type": type(child.model).__name__, }
[docs] def compound_to_dict(compound): return { "id": compound.id, "active": compound.active, "name": compound.name, "values": model_to_dict(compound.model), "model_type": type(compound.model).__name__, "connected_to": compound.connected_to, }
[docs] def branch_to_dict(branch, grids): fetch_grid_to_dict(grids, branch.grid) grid_ids = [grid.name for grid in iter_concrete_grids(branch.grid)] return { "id": branch.id, "from_node": branch.id[0], "to_node": branch.id[1], # grid_id keeps a single representative for backward compatibility; # grid_ids preserves the full (possibly multi-grid) mapping. "grid_id": grid_ids[0] if grid_ids else None, "grid_ids": grid_ids, "active": branch.active, "name": branch.name, "values": model_to_dict(branch.model), "model_type": type(branch.model).__name__, }
[docs] def node_to_dict(node, grids): fetch_grid_to_dict(grids, node.grid) return { "id": node.id, "grid_id": node.grid.name, "child_ids": node.child_ids, "position": node.position, "active": node.active, "name": node.name, "values": model_to_dict(node.model), "model_type": type(node.model).__name__, }
[docs] def network_to_native_dict(network: Network) -> dict: """Serialize *network* to the native, JSON-friendly ``dict`` form.""" grids = {} node_dict_list = [ node_to_dict(node, grids) for node in network.nodes if not network.is_blacklisted(node) ] branch_dict_list = [ branch_to_dict(branch, grids) for branch in network.branches if not network.is_blacklisted(branch) ] child_dict_list = [ child_to_dict(child) for child in network.childs if not network.is_blacklisted(child) ] compound_dict_list = [ compound_to_dict(compound) for compound in network.compounds if not network.is_blacklisted(compound) ] return { "version": FORMAT_VERSION, "grids": { k: {"values": _encode_values(vars(v)), "model_type": type(v).__name__} for k, v in grids.items() }, "nodes": node_dict_list, "childs": child_dict_list, "branches": branch_dict_list, "compounds": compound_dict_list, }
[docs] def write_omef_network(file, network: Network): """Serialize *network* to *file* as native JSON.""" to_serialize = network_to_native_dict(network) with open(file, "w", encoding="utf-8") as write_fp: json.dump(to_serialize, write_fp, indent=3, default=_json_default)