"""Minimal load shedding optimisation for multi-energy grids.
Minimises total unserved energy across electrical, gas, and thermal carriers
(single- or multi-period).
"""
import logging
import math
from monee.model.branch import (
GenericPowerBranch,
HeatExchanger,
HeatExchangerGenerator,
HeatExchangerLoad,
PassiveHeatExchanger,
PassiveHeatExchangerGenerator,
PassiveHeatExchangerLoad,
)
from monee.model.child import (
ExtHydrGrid,
ExtPowerGrid,
HeatGenerator,
HeatLoad,
PowerGenerator,
PowerLoad,
Sink,
Source,
)
from monee.model.grid import (
DEFAULT_GAS_HHV_KWH_PER_KG,
KGPS_KWHPERKG_TO_MW,
GasGrid,
WaterGrid,
)
from monee.model.multi import (
CHPControlNode,
CHPHGControlNode,
GasToHeatControlNode,
GasToHeatHG,
GasToPower,
PowerToGas,
PowerToHeatControlNode,
PowerToHeatHG,
)
from monee.model.node import Bus, Junction
from monee.problem.core import (
REGULATION_ATTR,
Constraints,
Objectives,
OptimizationProblem,
nan_to_zero,
)
from monee.problem.utils import cp_input_rated_mw, line_loading_limit
WEIGHT_DEMAND = 1e3
WEIGHT_GENERATOR = 0.1
# 10× weaker than the demand-shed cost so ties prefer self-sufficiency.
EXT_SLACK_WEIGHT_RATIO = 0.1
# Fallback HHV (kWh/kg) for Sink/Source on a grid without higher_heating_value_kwh_per_kg.
_HHV_DEFAULT = DEFAULT_GAS_HHV_KWH_PER_KG
# Types that participate in the load-shedding objective.
_DEMAND_TYPES = (
PowerLoad,
HeatLoad,
HeatExchangerLoad,
PassiveHeatExchangerLoad,
Sink,
)
_GENERATOR_TYPES = (
PowerGenerator,
HeatGenerator,
HeatExchangerGenerator,
PassiveHeatExchangerGenerator,
Source,
)
# HE types for the objective; SubHE is excluded via exact type() checks.
_HE_OBJECTIVE_TYPES = (
HeatExchangerLoad,
HeatExchangerGenerator,
PassiveHeatExchanger,
PassiveHeatExchangerLoad,
PassiveHeatExchangerGenerator,
)
# Coupling-point models (control nodes + branch HG variants). When the
# load-shedding problem is constructed with ``include_coupling_points=True``
# every CP regulation cut is penalised at the demand weight, scaled by the
# CP's nameplate input MW.
_COUPLING_POINT_TYPES = (
CHPControlNode,
CHPHGControlNode,
GasToHeatControlNode,
PowerToHeatControlNode,
GasToPower,
PowerToGas,
PowerToHeatHG,
GasToHeatHG,
)
def _gas_mw_factor(grid):
"""Return ``KGPS_KWHPERKG_TO_MW · HHV`` (kg/s → MW); falls back to :data:`_HHV_DEFAULT`."""
hhv = getattr(grid, "higher_heating_value_kwh_per_kg", _HHV_DEFAULT)
return KGPS_KWHPERKG_TO_MW * hhv
_SQRT_3 = math.sqrt(3.0)
def _aux_objective_upper_bound( # NOSONAR
network,
*,
vm_pu_fallback: float = 1.1,
max_line_loading: float | None = None,
) -> float:
"""Upper bound on ``|Σ aux_obj|`` over the formulation-level minimize hooks.
Contributors:
* MISOCP electricity: ``current_pu_squared·br_r_pu`` capped by ``min(SOC_bound,
line_loading_bound)``;
* Linear HE: ``|q_mw_set|``;
* NL gas/water epigraph (ε=1e-5): ``1e-5·2·M²`` per branch;
* McCormick-DHS t_pu pull (ε=1e-6): ``1e-6·2`` per junction.
Triangle inequality absorbs signs.
"""
total = 0.0
for component in network.all_components():
m = component.model
# br_r_pu/br_x_pu on PowerLine/Trafo are computed in equations() - at
# _apply time they are still 0, so call calc_r_x to read them now.
br_r_pu = getattr(m, "br_r_pu", None)
br_x_pu = getattr(m, "br_x_pu", None)
from_model = to_model = None
if hasattr(component, "from_node_id"):
try:
from_model = network.node_by_id(component.from_node_id).model
to_model = network.node_by_id(component.to_node_id).model
if hasattr(m, "calc_r_x"):
br_r_pu, br_x_pu = m.calc_r_x(component.grid, from_model, to_model)
except Exception:
from_model = to_model = None
if isinstance(br_r_pu, (int, float)) and isinstance(br_x_pu, (int, float)):
denom = br_r_pu * br_r_pu + br_x_pu * br_x_pu
if denom > 0 and br_r_pu > 0:
vm_max = getattr(component.grid, "vm_pu_max", vm_pu_fallback)
w_max = vm_max * vm_max
current_pu_max = 4.0 * w_max / denom # SOC physics bound
# Tighten via line-loading constraint:
# current_pu_squared ≤ max_loading² · (max_i_ka / I_base)²
max_i_ka = getattr(m, "max_i_ka", None)
grid = component.grid
if (
max_line_loading is not None
and isinstance(max_i_ka, (int, float))
and from_model is not None
and to_model is not None
and getattr(grid, "sn_mva", None)
and getattr(from_model, "base_kv", None)
and getattr(to_model, "base_kv", None)
):
tap = getattr(m, "tap", 1.0) or 1.0
i_base_from = grid.sn_mva / (_SQRT_3 * from_model.base_kv) / tap
i_base_to = grid.sn_mva / (_SQRT_3 * to_model.base_kv)
i_base_min = min(i_base_from, i_base_to)
loading_bound = max_line_loading**2 * (max_i_ka / i_base_min) ** 2
current_pu_max = min(current_pu_max, loading_bound)
total += current_pu_max * br_r_pu
# Linear HX: |q_mw_delivered| ≤ |q_mw_set·regulation| ⇒ bound |q_mw_set|.
q_mw_set = getattr(m, "q_mw_set", None)
if isinstance(q_mw_set, (int, float)):
total += abs(q_mw_set)
# NL gas/water epigraph ε·(m_pos²+m_neg²) (ε≈1e-5; use 1e-4 for safety).
m_pos_sq = getattr(m, "mass_flow_pos_kgs_squared", None)
m_neg_sq = getattr(m, "mass_flow_neg_kgs_squared", None)
if m_pos_sq is not None and m_neg_sq is not None:
max_pos = getattr(m_pos_sq, "max", None)
max_neg = getattr(m_neg_sq, "max", None)
ub_pos = max_pos if max_pos is not None else 100.0**2
ub_neg = max_neg if max_neg is not None else 100.0**2
total += 1e-4 * (ub_pos + ub_neg)
# McCormick-DHS t_pu pull: ε·(1-t_pu), t_pu∈[0,2].
t_pu = getattr(m, "t_pu", None)
if t_pu is not None:
total += 1e-6 * 2.0
return total
def _make_auto_priority_floor_hook( # NOSONAR
weights: dict,
*,
alpha: float,
debug: bool,
max_line_loading: float | None,
weight_for_load=None,
):
"""Return a ``_controllable_appliables`` hook that retunes *weights*.
Sets ``weights['demand'] = max(weights['demand'], α·A_max)`` and scales
``weights['generator']`` by the same factor so the user-set ratio is kept.
When ``weight_for_load`` is supplied, per-load weights returned by the
callback may be smaller than ``weights['demand']``. To preserve the
auto-floor contract (every load's shed cost dominates aux), the hook
samples the minimum effective weight across all demand-class models
and, if it is below the floor, scales every weight up by
``floor / min_w``. Generator weights and the per-load callback's
returns are both lifted by the same scale via the ``weights["scale"]``
multiplier consulted in ``weight_fn`` - the user-set load-priority
*ratios* are preserved.
"""
_log = logging.getLogger(__name__)
def _hook(network):
a_max = _aux_objective_upper_bound(network, max_line_loading=max_line_loading)
floor = alpha * a_max
if weight_for_load is not None:
# Sample the minimum effective per-load weight. We consider
# every model that ``weight_fn`` would route through
# ``weight_for_load`` (demand + HX-objective types).
min_w: float | None = None
for model in network.all_models():
if isinstance(model, _DEMAND_TYPES + _HE_OBJECTIVE_TYPES):
try:
w = weight_for_load(model)
except Exception:
w = None
if w is None:
# Model falls back to ``weights['demand']``.
w = weights["demand"]
if min_w is None or float(w) < min_w:
min_w = float(w)
if min_w is not None and min_w < floor:
scale = floor / min_w if min_w > 0 else 1.0
# ``weights['scale']`` multiplies BOTH the callback return
# AND the default ``weights['demand']`` - see ``weight_fn``.
weights["scale"] = weights.get("scale", 1.0) * scale
weights["generator"] = weights["generator"] * scale
if debug:
_log.warning(
"Auto priority floor (per-load): A_max=%.3g, α=%.3g, "
"min_w=%.3g → scale ×%.3g (generator weight scaled too)",
a_max,
alpha,
min_w,
scale,
)
elif debug:
_log.warning(
"Auto priority floor (per-load): A_max=%.3g, α·A_max=%.3g "
"already covered by per-load min_w=%.3g - no change.",
a_max,
floor,
min_w or 0.0,
)
return
# Legacy path: no per-load callback, scale the single demand weight.
old_demand = weights["demand"]
if floor > old_demand:
scale = floor / old_demand if old_demand > 0 else 1.0
weights["demand"] = floor
weights["generator"] = weights["generator"] * scale
if debug:
_log.warning(
"Auto priority floor: A_max=%.3g, α=%.3g → "
"demand_weight %.3g → %.3g (generator scaled ×%.3g)",
a_max,
alpha,
old_demand,
weights["demand"],
scale,
)
elif debug:
_log.warning(
"Auto priority floor: A_max=%.3g, α·A_max=%.3g already "
"covered by user-supplied demand_weight=%.3g - no change.",
a_max,
floor,
old_demand,
)
return _hook
def _shedding_mw(model, gas_mw_factor=None, cp_rated_mw=None): # NOSONAR
"""Unserved-energy expression for *model* in MW-equivalent.
For LinearHX branches the under-delivery gap ``|q_mw_set - q_mw_delivered|``
captures both regulation cuts and physical shortfall from a narrow ΔT,
which a pure ``(1-reg)`` proxy would miss. External grids return 0.
CP types (when ``include_coupling_points=True``) are penalised by
``cp_rated_mw · (1 - regulation)``.
"""
reg = getattr(model, "regulation", 1)
if isinstance(model, _COUPLING_POINT_TYPES):
rated = cp_rated_mw if cp_rated_mw is not None else 0
return rated * (1 - reg)
if isinstance(model, (PowerLoad, PowerGenerator)):
p = nan_to_zero(model.p_mw)
# Sign-known by construction: PowerLoad.p_mw ≥ 0, PowerGenerator ≤ 0.
if isinstance(model, PowerLoad):
return p * (1 - reg)
return (-p) * (1 - reg)
if isinstance(model, (HeatLoad, HeatGenerator)):
q = nan_to_zero(model.q_mw_heat)
if isinstance(model, HeatLoad):
return q * (1 - reg)
return (-q) * (1 - reg)
if isinstance(model, _HE_OBJECTIVE_TYPES) or type(model) is HeatExchanger:
q_mw_set = getattr(model, "q_mw_set", 0)
q_mw_delivered = getattr(model, "q_mw_delivered", None)
if q_mw_delivered is not None and isinstance(q_mw_set, (int, float)):
if q_mw_set > 0:
return q_mw_set - q_mw_delivered
if q_mw_set < 0:
return q_mw_delivered - q_mw_set
if isinstance(q_mw_set, (int, float)):
return abs(q_mw_set) * (1 - reg)
return q_mw_set * (1 - reg)
if isinstance(model, (Sink, Source)):
mf = nan_to_zero(model.mass_flow_kgs)
factor = (
gas_mw_factor
if gas_mw_factor is not None
else KGPS_KWHPERKG_TO_MW * _HHV_DEFAULT
)
if isinstance(model, Sink):
return mf * factor * (1 - reg)
return (-mf) * factor * (1 - reg)
return 0
def _calc_objective(model_to_data):
"""Sum weighted unserved energy.
``model_to_data: {model → (weight, gas_factor|None, cp_rated_mw|None)}``.
``cp_rated_mw`` is only populated for coupling-point models when
``include_coupling_points=True``; ``gas_factor`` only for Sink/Source.
"""
return sum(
_shedding_mw(model, gas_mw_factor=g, cp_rated_mw=cp) * weight
for model, (weight, g, cp) in model_to_data.items()
)
[docs]
def create_min_load_shedding_problem( # NOSONAR
*,
demand_weight=WEIGHT_DEMAND,
generator_weight=WEIGHT_GENERATOR,
weight_for_load=None,
bounds_vm=(0.9, 1.1),
bounds_pressure=(0.9, 1.1),
bounds_t=(0.9, 1.1),
max_line_loading=1.5,
bounds_ext_el=(-3, 3),
bounds_ext_gas=(-10, 10),
bounds_ext_heat=(-10, 10),
regulation_ramp_limit=None,
include_storages=False,
include_ext_grids=True,
include_coupling_points=False,
check_vm=True,
check_pressure=True,
check_t=True,
check_lp=True,
lex_objectives=False,
auto_priority_floor=True,
priority_safety_factor=10.0,
debug=False,
):
"""Create a minimal load-shedding optimisation problem for multi-energy grids.
Each demand/generator/coupling gets a regulation Var ∈ [0,1]; the objective
penalises ``(1-regulation)`` weighted per category. Gas Sink/Source shed is
energy-converted via the enclosing grid's HHV. External grids contribute
only via ``ext_grid_*_bounds`` constraints (and an optional quadratic slack
nudge to zero exchange when ``include_ext_grids=True``).
``lex_objectives`` (Pyomo only) solves in two phases: shed first, then
formulation-tightening aux terms with the phase-1 optimum pinned.
``auto_priority_floor`` (default True) scales ``demand_weight`` to
``α · A_max`` so the shed term dominates aux terms regardless of network size.
``include_coupling_points`` (default False) extends the demand-side of the
objective to coupling-point components (CHP / CHPHG / G2H / P2H control
nodes and the HG branch variants P2G / G2P / P2H_HG / G2H_HG). Each CP is
penalised at ``demand_weight · cp_input_rated_mw · (1 - regulation)`` -
i.e. treated like a load on its input carrier (gas or power).
"""
problem = OptimizationProblem(debug=debug, lex_objectives=lex_objectives)
# Mutable so the auto-priority-floor hook can retune at _apply time.
# ``scale`` is a multiplicative factor applied on top of both the
# default ``demand`` weight AND any per-load weight returned by
# ``weight_for_load`` - the auto-floor hook uses it to lift every
# demand-side weight uniformly when the minimum effective per-load
# weight would otherwise fall below ``α·A_max``. Default 1.0 makes
# it a no-op for legacy (no callback) usage.
_weights = {
"demand": float(demand_weight),
"generator": float(generator_weight),
"scale": 1.0,
}
problem.controllable_demands(REGULATION_ATTR)
problem.controllable_generators(REGULATION_ATTR)
problem.controllable_cps(REGULATION_ATTR)
problem.controllable_backup_lines()
if include_ext_grids:
problem.controllable_ext()
if include_storages:
problem.controllable_storages()
if auto_priority_floor:
problem._controllable_appliables.append(
_make_auto_priority_floor_hook(
_weights,
alpha=priority_safety_factor,
debug=debug,
max_line_loading=max_line_loading if check_lp else None,
weight_for_load=weight_for_load,
)
)
if check_vm:
problem.bounds(bounds_vm, lambda m, _: type(m) is Bus, ["vm_pu"])
if check_pressure:
problem.bounds(
bounds_pressure, lambda m, _: type(m) is Junction, ["pressure_pu"]
)
if check_t:
problem.bounds(
bounds_t,
lambda m, g: type(m) is Junction and type(g) is WaterGrid,
["t_pu"],
)
# Lookups go through _weights so the auto-priority-floor can retune.
# When ``weight_for_load`` is set and applicable, the per-load
# weight it returns takes precedence - multiplied by the auto-
# floor scale so per-load weights and the legacy demand weight
# share the same aux-dominance lift.
def weight_fn(model):
scale = _weights.get("scale", 1.0)
if weight_for_load is not None and isinstance(
model, _DEMAND_TYPES + _HE_OBJECTIVE_TYPES
):
try:
w = weight_for_load(model)
except Exception:
w = None
if w is not None:
return float(w) * scale
if isinstance(model, _DEMAND_TYPES):
return _weights["demand"] * scale
# CPs are penalised at the demand weight when include_coupling_points
# is on - the input draw is treated as a load on its input carrier.
if isinstance(model, _COUPLING_POINT_TYPES):
return _weights["demand"] * scale
# Bare HeatExchanger / PassiveHeatExchanger: route by sign of q_mw_set.
if isinstance(model, (HeatExchanger, PassiveHeatExchanger)):
q = getattr(model, "q_mw_set", 0)
if isinstance(q, (int, float)):
return (_weights["demand"] if q > 0 else _weights["generator"]) * scale
return _weights["demand"] * scale
if isinstance(model, _GENERATOR_TYPES):
return _weights["generator"]
return 1
objective_types = _DEMAND_TYPES + _GENERATOR_TYPES + _HE_OBJECTIVE_TYPES
if include_coupling_points:
objective_types = objective_types + _COUPLING_POINT_TYPES
def _is_objective_model(m):
# type() is needed for HeatExchanger to exclude SubHE (compound-internal).
return isinstance(m, objective_types) or type(m) is HeatExchanger
def _is_gas_grid(g):
# Water-grid Sinks/Sources are heating-loop mass flow, not gas demand;
# applying HHV would yield phantom MW-scale shed penalty.
if g is None:
return False
grids = g if isinstance(g, list) else [g]
return any(
gg is not None and hasattr(gg, "higher_heating_value_kwh_per_kg")
for gg in grids
)
# Populated by _objective_models, read by _data_attacher.
# model_to_grid - Sink/Source HHV lookup (gas grid only).
# model_to_cp_mw - CP nameplate input MW (when include_coupling_points).
model_to_grid: dict = {}
model_to_cp_mw: dict = {}
def _objective_models(network):
"""Like Objectives.select but filters water-grid Sink/Source (heating
loop ≠ gas demand) and (opt-in) folds in coupling-point components."""
model_to_grid.clear()
model_to_cp_mw.clear()
out = []
# Standard child/branch loads + generators + HX. Inactive/ignored
# components are excluded (mirroring Constraints.select and the CP
# loop below): their Vars are never registered with the backend, so
# e.g. an ignored HeatExchangerLoad's q_mw_delivered would otherwise
# leak a raw monee Var into the objective expression.
for component in network.all_components():
if not component.active or component.ignored:
continue
model = component.model
grid = getattr(component, "grid", None)
if not _is_objective_model(model):
continue
if isinstance(model, (Sink, Source)) and not _is_gas_grid(grid):
continue
# CP control nodes also surface via all_models_with_grid, but we
# need ``component`` (not just ``model``) to compute the rated MW
# via cp_input_rated_mw - handled in the dedicated loop below.
if isinstance(model, _COUPLING_POINT_TYPES):
continue
out.append(model)
model_to_grid[model] = grid
# Opt-in CP coverage: control nodes (in network.nodes) + branch CPs.
if include_coupling_points:
for component in list(network.nodes) + list(network.branches):
if not isinstance(component.model, _COUPLING_POINT_TYPES):
continue
if not component.active or component.ignored:
continue
cp = cp_input_rated_mw(component)
if cp is None:
continue
model = component.model
out.append(model)
model_to_grid[model] = getattr(component, "grid", None)
model_to_cp_mw[model] = cp[1]
return out
def _data_attacher(model):
weight = weight_fn(model)
gas_factor = None
cp_rated = None
if isinstance(model, (Sink, Source)):
gas_factor = _gas_mw_factor(model_to_grid.get(model))
if isinstance(model, _COUPLING_POINT_TYPES):
cp_rated = model_to_cp_mw.get(model)
return (weight, gas_factor, cp_rated)
objectives = Objectives()
objectives.with_models(_objective_models).data(_data_attacher).calculate(
_calc_objective
)
# Quadratic ext-grid slack toward zero exchange, weighted at
# demand_weight · EXT_SLACK_WEIGHT_RATIO. Off when include_ext_grids=False.
if include_ext_grids:
def _ext_slack_models(network):
out = []
for model in network.all_models():
if isinstance(model, (ExtPowerGrid, ExtHydrGrid)):
out.append(model)
return out
def _ext_slack_data(_model):
return _weights["demand"] * EXT_SLACK_WEIGHT_RATIO
def _ext_slack_calc(model_to_data):
total = 0
for model, weight in model_to_data.items():
if isinstance(model, ExtPowerGrid):
total = total + model.p_mw * model.p_mw * weight
elif isinstance(model, ExtHydrGrid):
total = total + model.mass_flow_kgs * model.mass_flow_kgs * weight
return total
objectives.with_models(_ext_slack_models).data(_ext_slack_data).calculate(
_ext_slack_calc
)
problem.objectives = objectives
constraints = Constraints()
if check_lp:
constraints.select_types(GenericPowerBranch).equation(
lambda m: line_loading_limit(m, "from", max_line_loading)
).equation(lambda m: line_loading_limit(m, "to", max_line_loading))
if include_ext_grids:
constraints.select_types(ExtPowerGrid).equation(
lambda m: m.p_mw >= bounds_ext_el[0]
).equation(lambda m: m.p_mw <= bounds_ext_el[1])
constraints.select(
lambda c: type(c.grid) is GasGrid and type(c.model) is ExtHydrGrid
).equation(lambda m: m.mass_flow_kgs >= bounds_ext_gas[0]).equation(
lambda m: m.mass_flow_kgs <= bounds_ext_gas[1]
)
constraints.select(
lambda c: type(c.grid) is WaterGrid and type(c.model) is ExtHydrGrid
).equation(lambda m: m.mass_flow_kgs >= bounds_ext_heat[0]).equation(
lambda m: m.mass_flow_kgs <= bounds_ext_heat[1]
)
if regulation_ramp_limit is not None:
constraints.regulation_ramp(regulation_ramp_limit)
problem.constraints = constraints
return problem