from abc import ABC, abstractmethod
import numpy as np
import monee.model.phys.nonlinear.hf as ohfmodel
from .core import BranchModel, Intermediate, IntermediateEq, Var, model
from .grid import GasGrid, PowerGrid, WaterGrid
[docs]
@model
class GenericPowerBranch(BranchModel):
def __init__(
self,
tap,
shift,
br_r_pu,
br_x_pu,
g_fr_pu,
b_fr_pu,
g_to_pu,
b_to_pu,
max_i_ka=3.19,
backup=False,
on_off=1,
**kwargs,
) -> None:
super().__init__()
self.tap = tap
self.shift = shift
self.br_r_pu = br_r_pu
self.br_x_pu = br_x_pu
self.g_fr_pu = g_fr_pu
self.b_fr_pu = b_fr_pu
self.g_to_pu = g_to_pu
self.b_to_pu = b_to_pu
self.max_i_ka = max_i_ka
self.backup = backup
self.on_off = on_off
self.p_from_mw = Var(1, name="p_from_mw")
self.q_from_mvar = Var(1, name="q_from_mvar")
self.i_from_ka = Var(1, min=0, name="i_from_ka")
self.loading_from_pu = Var(1, min=0, name="loading_from_pu")
self.p_to_mw = Var(1, name="p_to_mw")
self.q_to_mvar = Var(1, name="q_to_mvar")
self.i_to_ka = Var(1, min=0, name="i_to_ka")
self.loading_to_pu = Var(1, min=0, name="loading_to_pu")
@property
def loading_pu(self):
return max(self.loading_to_pu.value, self.loading_from_pu.value)
[docs]
def loss_percent(self):
return abs((self.p_from_mw.value - self.p_to_mw.value) / self.p_from_mw.value)
[docs]
def equations(self, grid: PowerGrid, from_node_model, to_node_model, **kwargs):
# loading_*_percent \leftrightarrow i_*_ka identity is owned by the branch formulation
# (AC: equality; MISOCP: derived from current_pu_squared post-solve).
return []
[docs]
@model
class PowerBranch(GenericPowerBranch, ABC):
def __init__(self, tap, shift, backup=False, on_off=1, **kwargs) -> None:
super().__init__(
tap, shift, 0, 0, 0, 0, 0, 0, backup=backup, on_off=on_off, **kwargs
)
self.tap = tap
self.shift = shift
self.p_from_mw = Var(1, name="p_from_mw")
self.q_from_mvar = Var(1, name="q_from_mvar")
self.p_to_mw = Var(1, name="p_to_mw")
self.q_to_mvar = Var(1, name="q_to_mvar")
[docs]
@abstractmethod
def calc_r_x(self, grid, from_node_model, to_node_model):
pass
[docs]
def equations(self, grid: PowerGrid, from_node_model, to_node_model, **kwargs):
self.br_r_pu, self.br_x_pu = self.calc_r_x(grid, from_node_model, to_node_model)
return super().equations(grid, from_node_model, to_node_model, **kwargs)
[docs]
@model
class PowerLine(PowerBranch):
def __init__(
self,
length_m,
r_ohm_per_m,
x_ohm_per_m,
parallel,
backup=False,
on_off=1,
**kwargs,
) -> None:
super().__init__(1, 0, backup=backup, on_off=on_off, **kwargs)
self.length_m = length_m
self.r_ohm_per_m = r_ohm_per_m
self.x_ohm_per_m = x_ohm_per_m
self.parallel = parallel
[docs]
def calc_r_x(self, grid: PowerGrid, from_node_model, to_node_model):
base_r = from_node_model.base_kv**2 / grid.sn_mva
br_r_pu = self.r_ohm_per_m * self.length_m / base_r / self.parallel
br_x_pu = self.x_ohm_per_m * self.length_m / base_r / self.parallel
return (br_r_pu, br_x_pu)
[docs]
@model
class Trafo(PowerBranch):
def __init__(
self, vk_percent=12.2, vkr_percent=0.25, sn_trafo_mva=160, shift=0
) -> None:
super().__init__(1, shift)
self.vk_percent = vk_percent
self.vkr_percent = vkr_percent
self.sn_trafo_mva = sn_trafo_mva
self.vn_trafo_lv = 1
[docs]
def calc_r_x(self, grid: PowerGrid, lv_model, hv_model):
tap_lv = np.square(lv_model.base_kv / hv_model.base_kv) * grid.sn_mva
z_sc = self.vk_percent / 100.0 / self.sn_trafo_mva * tap_lv
r_sc = self.vkr_percent / 100.0 / self.sn_trafo_mva * tap_lv
x_sc = np.sign(z_sc) * np.sqrt((z_sc**2 - r_sc**2).astype(float))
return (r_sc, x_sc)
[docs]
def equations(self, grid: PowerGrid, from_node_model, to_node_model, **kwargs):
self.tap = 1
return super().equations(grid, from_node_model, to_node_model, **kwargs)
[docs]
def sign(v):
return 1 if v >= 0 else -1
[docs]
@model
class WaterPipe(BranchModel):
def __init__(
self,
diameter_m,
length_m,
temperature_ext_k=283.15,
roughness_m=4.5e-05,
lambda_insulation_w_per_m_k=0.025,
insulation_thickness_m=0.12,
on_off=1,
friction=None,
unidirectional=False,
) -> None:
super().__init__()
self.diameter_m = diameter_m
self.length_m = length_m
self.temperature_ext_k = temperature_ext_k
self.roughness_m = roughness_m
self.lambda_insulation_w_per_m_k = lambda_insulation_w_per_m_k
self.insulation_thickness_m = insulation_thickness_m
self.on_off = on_off
self.unidirectional = unidirectional
self.mass_flow_kgs = Intermediate(0.1)
self.mass_flow_pos_kgs = Var(0.1, min=0, name="mass_flow_pos_kgs")
self.mass_flow_neg_kgs = Var(0.1, min=0, name="mass_flow_neg_kgs")
self.mass_flow_pos_kgs_squared = Var(0, min=0, name="mass_flow_pos_kgs_squared")
self.mass_flow_neg_kgs_squared = Var(0, min=0, name="mass_flow_neg_kgs_squared")
self.direction = Var(1, integer=True, min=0, max=1, name="direction")
self.velocity_mps = Var(1, min=-50, max=50, name="velocity_mps")
self.q_mw = Var(1e-6, name="q_mw")
# reynolds_scaled is stored as Re/1e6 (see REYNOLDS_SCALE); 1e-3 \approx laminar floor.
self.reynolds_scaled = Var(1e-3, min=0, max=10, name="reynolds_scaled")
self.t_from_pu = Var(1, min=0, max=2, name="t_from_pu")
self.t_to_pu = Var(1, min=0, max=2, name="t_to_pu")
# friction upper bound 7 covers the PWL leftmost breakpoint (Re=10, 64/10).
self.friction = (
Var(0.02, min=0, max=7, name="friction") if friction is None else friction
)
[docs]
def loss_percent(self):
mass_flow_kgs = abs(self.mass_flow_kgs.value)
if mass_flow_kgs == 0:
return 0
# Average fluid temperature in Kelvin from the per-unit endpoint temps
# (WaterGrid.t_ref_k is the per-unit reference; the branch has no grid here).
t_average_k = (
(self.t_from_pu.value + self.t_to_pu.value) / 2 * WaterGrid.t_ref_k
)
return (
abs(self.q_mw.value)
* 1e6
/ (mass_flow_kgs * ohfmodel.SPECIFIC_HEAT_CAP_WATER * t_average_k)
)
[docs]
def equations(self, grid: WaterGrid, from_node_model, to_node_model, **kwargs):
return [
IntermediateEq(
"mass_flow_kgs", self.mass_flow_pos_kgs - self.mass_flow_neg_kgs
)
]
[docs]
@model
class HeatExchanger(BranchModel):
def __init__(
self,
q_mw,
mass_flow_design_kgs=None,
T_delta_design_K=30, # NOSONAR
regulation=1,
) -> None:
super().__init__()
self._calc_mass_flow = False
self._T_delta_design_K = T_delta_design_K
self.on_off = 1
self.regulation = regulation
self.q_mw_set = -q_mw
self.q_mw = Var(0, name="q_mw")
# Numeric design duty for warm-starting. ``q_mw`` is a Var when the
# surrounding network sizes the HE (SubHE in a compound), but its
# initial value is still the design setpoint, so it seeds the flow and
# temperature vars either way.
self._q_design_mw = q_mw.value if isinstance(q_mw, Var) else q_mw
# Design through-flow at the nominal dT. Seeding the flow away from zero
# matters: at m=0 the fixed-q energy balance needs t_out -> inf, which
# rails the temperature vars and stalls IPOPT. Whether that degenerate
# start converges is linear-solver/platform dependent, so a consistent
# warm start is what keeps the smooth NLP solving across builds.
self._mass_flow_seed_kgs = (
abs(self._q_design_mw * 1e6)
/ (ohfmodel.SPECIFIC_HEAT_CAP_WATER * T_delta_design_K)
if self._q_design_mw
else 0.1
)
if mass_flow_design_kgs is None:
if isinstance(q_mw, (int, float)):
mass_flow_design_kgs = self._mass_flow_seed_kgs
else:
mass_flow_design_kgs = Var(0, name="mass_flow_design_kgs")
self._calc_mass_flow = True
self.mass_flow_design_kgs = mass_flow_design_kgs
# Flow runs in the neg direction (the formulations pin mass_flow_pos to
# 0), so the magnitude lives on mass_flow_neg_kgs.
self.mass_flow_kgs = Intermediate(-self._mass_flow_seed_kgs)
self.mass_flow_pos_kgs = Var(0, min=0, name="mass_flow_pos_kgs")
self.mass_flow_neg_kgs = Var(
self._mass_flow_seed_kgs, min=0, name="mass_flow_neg_kgs"
)
self.direction = Var(0, integer=True, min=0, max=1, name="direction")
self.t_from_pu = Var(1, min=0, max=2, name="t_from_pu")
self.t_to_pu = Var(1, min=0, max=2, name="t_to_pu")
[docs]
def equations(self, grid: WaterGrid, from_node_model, to_node_model, **kwargs):
eqs = [
IntermediateEq(
"mass_flow_kgs", self.mass_flow_pos_kgs - self.mass_flow_neg_kgs
),
]
if self._calc_mass_flow:
eqs.append(
self.mass_flow_design_kgs # NOSONAR
== -self.q_mw
* 1e6
/ (ohfmodel.SPECIFIC_HEAT_CAP_WATER * self._T_delta_design_K)
)
else:
eqs.append(self.q_mw == self.q_mw_set * self.regulation)
return eqs
[docs]
@model
class HeatExchangerLoad(HeatExchanger):
def __init__(self, q_mw, mass_flow_design_kgs=None, regulation=1) -> None:
super().__init__(
q_mw, mass_flow_design_kgs=mass_flow_design_kgs, regulation=regulation
)
[docs]
@model
class HeatExchangerGenerator(HeatExchanger):
def __init__(self, q_mw, mass_flow_design_kgs=None, regulation=1) -> None:
super().__init__(
q_mw, mass_flow_design_kgs=mass_flow_design_kgs, regulation=regulation
)
[docs]
@model
class PassiveHeatExchanger(BranchModel):
r"""
Passive heat exchanger injecting/extracting fixed ``q_mw`` into a free-flowing
water branch. Mass flow is determined by surrounding hydraulics; temperature
change follows from q_mw and actual mass flow.
Sign: positive q_mw = load, negative = generator.
Hydraulics: by default the pressure drop is Darcy-Weisbach friction over
``length_m``. Passing ``loss_coefficient`` (zeta) instead switches to the
pandapipes ``heat_exchanger`` model - a zero-length minor loss
:math:`\Delta p = \zeta \cdot \tfrac{\rho}{2} v^2` (``length_m`` / friction
are then ignored; ``loss_coefficient=0`` is a lossless heat injector).
"""
def __init__(
self,
q_mw,
diameter_m,
roughness_m=0.0001,
length_m=2.5,
temperature_ext_k=293,
regulation=1,
friction=None,
loss_coefficient=None,
) -> None:
super().__init__()
self.diameter_m = diameter_m
self.temperature_ext_k = temperature_ext_k
self.roughness_m = roughness_m
self.length_m = length_m
self.loss_coefficient = loss_coefficient
self.limit = 0.1
self.regulation = regulation
self.on_off = 1
self.q_mw_set = -q_mw
self.q_mw = Var(-1e-3, name="q_mw")
self.mass_flow_kgs = Intermediate(0.1)
self.mass_flow_pos_kgs = Var(0, min=0, name="mass_flow_pos_kgs")
self.mass_flow_neg_kgs = Var(0, min=0, name="mass_flow_neg_kgs")
self.mass_flow_pos_kgs_squared = Var(0, min=0, name="mass_flow_pos_kgs_squared")
self.mass_flow_neg_kgs_squared = Var(0, min=0, name="mass_flow_neg_kgs_squared")
self.direction = Var(0, integer=True, min=0, max=1, name="direction")
self.velocity_mps = Var(1, min=-50, max=50, name="velocity_mps")
# reynolds_scaled = Re/1e6 (see REYNOLDS_SCALE); 1e-3 \approx laminar floor.
self.reynolds_scaled = Var(1e-3, min=0, max=10, name="reynolds_scaled")
self.t_from_pu = Var(1, min=0, max=2, name="t_from_pu")
self.t_to_pu = Var(1, min=0, max=2, name="t_to_pu")
# friction upper bound 7 covers the PWL leftmost breakpoint (Re=10).
self.friction = (
Var(0.01, min=0, max=7, name="friction") if friction is None else friction
)
[docs]
def equations(self, grid: WaterGrid, from_node_model, to_node_model, **kwargs):
return [
IntermediateEq(
"mass_flow_kgs", self.mass_flow_pos_kgs - self.mass_flow_neg_kgs
),
self.q_mw == self.q_mw_set * self.regulation,
]
[docs]
@model
class PassiveHeatExchangerLoad(PassiveHeatExchanger):
"""Passive heat exchanger that consumes heat (load, ``q_mw > 0``)."""
def __init__(
self, q_mw, diameter_m, temperature_ext_k=293, loss_coefficient=None
) -> None:
super().__init__(
q_mw,
diameter_m,
temperature_ext_k=temperature_ext_k,
loss_coefficient=loss_coefficient,
)
[docs]
@model
class PassiveHeatExchangerGenerator(PassiveHeatExchanger):
"""Passive heat exchanger that injects heat (generator, ``q_mw < 0``)."""
def __init__(
self, q_mw, diameter_m, temperature_ext_k=293, loss_coefficient=None
) -> None:
super().__init__(
q_mw,
diameter_m,
temperature_ext_k=temperature_ext_k,
loss_coefficient=loss_coefficient,
)
[docs]
@model
class GasPipe(BranchModel):
def __init__(
self,
diameter_m,
length_m,
temperature_ext_k=296.15,
roughness_m=0.0001,
on_off=1,
friction=None,
) -> None:
super().__init__()
self.diameter_m = diameter_m
self.length_m = length_m
self.temperature_ext_k = temperature_ext_k
self.roughness_m = roughness_m
self.on_off = on_off
self.mass_flow_kgs = Intermediate(0.1)
self.mass_flow_pos_kgs = Var(0, min=0, name="mass_flow_pos_kgs")
self.mass_flow_neg_kgs = Var(0, min=0, name="mass_flow_neg_kgs")
self.mass_flow_pos_kgs_squared = Var(0, min=0, name="mass_flow_pos_kgs_squared")
self.mass_flow_neg_kgs_squared = Var(0, min=0, name="mass_flow_neg_kgs_squared")
self.direction = Var(0, integer=True, min=0, max=1)
self.velocity_mps = Var(1, min=-100, max=100, name="velocity_mps")
# reynolds_scaled = Re/1e6 (see REYNOLDS_SCALE); 1e-3 \approx laminar floor.
self.reynolds_scaled = Var(1e-3, min=0, max=10, name="reynolds_scaled")
self.gas_density_kg_per_m3 = Var(
1, min=0, max=100, name="gas_density_kg_per_m3"
)
self.friction = (
Var(0.02, min=0, max=7, name="friction") if friction is None else friction
)
self.q_mw = 0
[docs]
def equations(self, grid: GasGrid, from_node_model, to_node_model, **kwargs):
return [
IntermediateEq(
"mass_flow_kgs", self.mass_flow_pos_kgs - self.mass_flow_neg_kgs
)
]
[docs]
@model
class GasCompressor(BranchModel):
"""
Ideal compressor - fixed pressure ratio, unidirectional (suction → discharge).
Forward flow lives in ``mass_flow_neg_kgs`` to match GasPipe's Weymouth convention.
"""
def __init__(self, compression_ratio=1.5, max_flow_kgs=10.0) -> None:
super().__init__()
self.compression_ratio = compression_ratio
self.max_flow_kgs = max_flow_kgs
self.mass_flow_kgs = Intermediate(0.1)
self.mass_flow_neg_kgs = Var(
0.1, min=0, max=max_flow_kgs, name="mass_flow_neg_kgs"
)
self.on_off = 1
[docs]
def equations(self, grid: GasGrid, from_node_model, to_node_model, **kwargs):
p_sq_from = from_node_model.vars["pressure_squared_pu"]
p_sq_to = to_node_model.vars["pressure_squared_pu"]
return [
IntermediateEq("mass_flow_kgs", -self.mass_flow_neg_kgs),
self.compression_ratio**2 * p_sq_from == p_sq_to,
]