Source code for monee.network.mes

import random

import networkx as nx
from geopy import distance

import monee.express as mx
import monee.model as mm
import monee.model.phys.core.hydraulics as hyd
from monee.model.grid import DEFAULT_GAS_HHV_MJ_PER_KG

REF_PA = 1000000
REF_TEMP = 356
DEFAULT_LENGTH = 100

# Gas HHV [MJ/kg] for sizing gas demands from power magnitudes. Sourced from the
# lgas grid definition so the sizing uses the SAME heating value as the coupling
# physics (see monee.model.grid.DEFAULT_GAS_HHV_MJ_PER_KG).
GAS_HHV_MJ_PER_KG = DEFAULT_GAS_HHV_MJ_PER_KG


def _node_power_load_mw(power_net: mm.Network, node):
    p_mw = 0.0
    for child in power_net.childs_by_ids(node.child_ids):
        if isinstance(child.model, mm.PowerLoad):
            p_mw += float(getattr(child.model, "p_mw", 0.0) or 0.0)
    return p_mw


def _node_power_gen_mw(power_net: mm.Network, node):
    """|generation| at *node* from PowerGenerator + GridFormingGenerator.
    Excludes ExtPowerGrid (its p_mw is a free slack Var pre-solve)."""
    p_mw = 0.0
    for child in power_net.childs_by_ids(node.child_ids):
        model = child.model
        if isinstance(model, mm.PowerGenerator):
            p_mw += abs(float(getattr(model, "p_mw", 0.0) or 0.0))
        elif isinstance(model, mm.GridFormingGenerator):
            p_mw += abs(float(model.p_mw.max or 0.0))
    return p_mw


[docs] def get_length( net: mm.Network, branch, node1_id, node2_id, default_length=DEFAULT_LENGTH ): if hasattr(branch.model, "length_m"): return branch.model.length_m node1 = net.node_by_id(node1_id) node2 = net.node_by_id(node2_id) if node1.position is None or node2.position is None: return default_length return distance.distance(node1.position, node2.position).m
[docs] def create_heat_net_for_power( power_net, target_net, heat_deployment_rate, mass_flow_rate_kgs=0.075, default_diameter_m=0.12, length_scale=1, default_length=DEFAULT_LENGTH, power_scale=1, design_flow_headroom=1.5, seed=None, ): rng = random.Random(seed) if seed is not None else random heat_grid = mm.create_water_grid("water", t_ref_k=REF_TEMP, pressure_ref_pa=REF_PA) target_net.set_default_grid("water", heat_grid) power_net_as_st = mm.to_spanning_tree(power_net) bus_index_to_junction_index = {} bus_index_to_end_junction_index = {} node_demand_kgs = {} for node in power_net_as_st.nodes: junc_id = mx.create_junction(target_net, position=node.position, grid=heat_grid) sink_mass_flow = mass_flow_rate_kgs + rng.random() * mass_flow_rate_kgs / 10 node_demand_kgs[node.id] = sink_mass_flow mx.create_sink( target_net, junc_id, mass_flow_kgs=sink_mass_flow, ) bus_index_to_junction_index[node.id] = junc_id bus_index_to_end_junction_index[node.id] = junc_id deployment_c_value = rng.random() if deployment_c_value < heat_deployment_rate: bus_index_to_end_junction_index[node.id] = mx.create_junction( target_net, position=node.position, grid=heat_grid ) # Passive: an in-line device on the distribution run - the tree # flow is already fixed by the downstream sinks, so a design-flow # prescribing HeatExchanger would over-determine the hydraulics. mx.create_passive_heat_exchanger( target_net, from_node_id=bus_index_to_junction_index[node.id], to_node_id=bus_index_to_end_junction_index[node.id], q_mw=(-1 if rng.random() > 0.8 else 1) * -0.1 * rng.random() * power_scale, diameter_m=default_diameter_m, ) end_sink_mass_flow = ( mass_flow_rate_kgs + rng.random() * mass_flow_rate_kgs / 10 ) node_demand_kgs[node.id] += end_sink_mass_flow mx.create_sink( target_net, bus_index_to_end_junction_index[node.id], mass_flow_kgs=end_sink_mass_flow, ) # Capacity-based trunk sizing (same design as the auto_diameter mode of # create_heat_supply_return_net_for_power): every tree pipe must carry the # cumulative downstream sink demand, so size its diameter for that flow at # the grid's design velocity_mps - floored at default_diameter_m so leaf pipes # are never thinned. Without this, a flat default diameter (or the grid # max_mass_flow_kgs) caps the slack trunk below total demand and the net is infeasible # by construction. root = power_net_as_st.first_node() parent = {root: None} undirected = nx.Graph(power_net_as_st.graph) for p, c in nx.bfs_edges(undirected, source=root): parent[c] = p subtree_kgs = dict(node_demand_kgs) for c in reversed(list(nx.topological_sort(nx.bfs_tree(undirected, root)))): if parent.get(c) is not None: subtree_kgs[parent[c]] += subtree_kgs[c] def _trunk_diameter(downstream_kgs): d = hyd.calc_min_diameter_for_mass_flow( downstream_kgs * design_flow_headroom, heat_grid.fluid_density_kg_per_m3, heat_grid.v_max_mps, ) return max(d, default_diameter_m) for branch in power_net_as_st.branches: # The endpoint whose tree-parent is the other endpoint sits downstream. if parent.get(branch.to_node_id) == branch.from_node_id: downstream = subtree_kgs[branch.to_node_id] else: downstream = subtree_kgs[branch.from_node_id] from_node_id = bus_index_to_end_junction_index[branch.from_node_id] to_node_id = bus_index_to_junction_index[branch.to_node_id] mx.create_water_pipe( target_net, from_node_id=from_node_id, to_node_id=to_node_id, diameter_m=_trunk_diameter(downstream), length_m=get_length( target_net, branch, from_node_id, to_node_id, default_length=default_length, ) * length_scale, temperature_ext_k=296.15, roughness_m=0.001, grid=heat_grid, ) mx.create_ext_hydr_grid( target_net, node_id=bus_index_to_junction_index[root], t_k=REF_TEMP, name="Grid Connection Heat", ) # max_mass_flow_kgs is the grid-level per-branch cap: it must at least admit the slack # trunk's design flow or the sized diameters are moot. Leaf pipes stay # tightly capped through their velocity_mps-based per-pipe bound. heat_grid.max_mass_flow_kgs = max( heat_grid.max_mass_flow_kgs, design_flow_headroom * subtree_kgs[root] ) return (bus_index_to_junction_index, bus_index_to_end_junction_index)
[docs] def create_gas_net_for_power( power_net, target_net: mm.Network, gas_deployment_rate, scaling=1, source_scaling=1, default_diameter_m=0.3, length_scale=1, default_length=100, seed=None, gas_type="lgas", ): rng = random.Random(seed) if seed is not None else random gas_grid = mm.create_gas_grid( "gas", type=gas_type, t_ref_k=REF_TEMP, pressure_ref_pa=REF_PA ) target_net.set_default_grid("gas", gas_grid) power_net_as_st = mm.to_spanning_tree(power_net) bus_index_to_junction_index = {} for node in power_net_as_st.nodes: junc_id = mx.create_junction(target_net, position=node.position, grid=gas_grid) bus_index_to_junction_index[node.id] = junc_id for branch in power_net_as_st.branches: from_node_id = bus_index_to_junction_index[branch.from_node_id] to_node_id = bus_index_to_junction_index[branch.to_node_id] mx.create_gas_pipe( target_net, from_node_id=from_node_id, to_node_id=to_node_id, diameter_m=default_diameter_m * scaling, length_m=get_length( target_net, branch, from_node_id, to_node_id, default_length=default_length, ) * length_scale, grid=gas_grid, ) for node in power_net_as_st.nodes: deployment_c_value = rng.random() if deployment_c_value <= gas_deployment_rate: mx.create_sink( target_net, bus_index_to_junction_index[node.id], mass_flow_kgs=round(0.1 + 0.5 * rng.random() * scaling, 2), ) mx.create_source( target_net, node_id=bus_index_to_junction_index[power_net_as_st.first_node()], mass_flow_kgs=10 * scaling * source_scaling, ) mx.create_ext_hydr_grid( target_net, node_id=bus_index_to_junction_index[power_net_as_st.first_node()], t_k=REF_TEMP, name="Grid Connection Gas", ) return bus_index_to_junction_index
[docs] def create_p2h_in_combined_generated_network( new_mes_net: mm.Network, net_power, bus_to_heat_junc, end_bus_to_heat_junc, p2h_density, seed=None, ): rng = random.Random(seed) if seed is not None else random for power_node in net_power.nodes: heat_junc = bus_to_heat_junc[power_node.id] heat_junc_two = end_bus_to_heat_junc[power_node.id] if ( rng.random() <= p2h_density and heat_junc != heat_junc_two and new_mes_net.has_branch_between(heat_junc, heat_junc_two) ): new_mes_net.remove_branch_between(heat_junc, heat_junc_two) mx.create_p2h( new_mes_net, power_node_id=power_node.id, heat_node_id=heat_junc_two, heat_return_node_id=heat_junc, heat_energy_mw=0.015, diameter_m=0.003, efficiency=0.4 * 0.5 * 0.5, )
[docs] def create_chp_in_combined_generated_network( new_mes_net: mm.Network, net_power, bus_to_heat_junc, end_bus_to_heat_junc, bus_to_gas_junc, chp_density, seed=None, ): rng = random.Random(seed) if seed is not None else random for power_node in net_power.nodes: heat_junc = bus_to_heat_junc[power_node.id] heat_junc_two = end_bus_to_heat_junc[power_node.id] gas_junc = bus_to_gas_junc[power_node.id] efficiency = 0.8 + rng.random() / 10 if ( rng.random() <= chp_density and heat_junc != heat_junc_two and new_mes_net.has_branch_between(heat_junc, heat_junc_two) ): new_mes_net.remove_branch_between(heat_junc, heat_junc_two) mx.create_chp( new_mes_net, power_node_id=power_node.id, heat_node_id=heat_junc_two, heat_return_node_id=heat_junc, gas_node_id=gas_junc, mass_flow_setpoint_kgs=0.015 * rng.random(), diameter_m=0.035, efficiency_power=efficiency / 2, efficiency_heat=efficiency / 2, )
[docs] def create_p2g_in_combined_generated_network( new_mes_net, net_power, bus_to_gas_junc, p2g_density, seed=None ): rng = random.Random(seed) if seed is not None else random for power_node in net_power.nodes: gas_junc = bus_to_gas_junc[power_node.id] if rng.random() <= p2g_density: mx.create_p2g( new_mes_net, from_node_id=power_node.id, to_node_id=gas_junc, efficiency=0.7, mass_flow_setpoint_kgs=0.045 * rng.random(), )
[docs] def generate_mes_based_on_power_net( net_power: mm.Network, heat_deployment_rate, gas_deployment_rate, chp_density=0.1, p2g_density=0.02, p2h_density=0.1, seed=None, ): # Derive a distinct sub-seed per builder so a single `seed` makes the whole # generation deterministic without coupling the per-builder RNG streams. if seed is not None: ss = random.Random(seed) heat_seed, gas_seed, p2h_seed, chp_seed, p2g_seed = ( ss.random() for _ in range(5) ) else: heat_seed = gas_seed = p2h_seed = chp_seed = p2g_seed = None new_mes_net = net_power.copy() bus_to_heat_junc, end_bus_to_heat_junc = create_heat_net_for_power( net_power, new_mes_net, heat_deployment_rate, seed=heat_seed ) bus_to_gas_junc = create_gas_net_for_power( net_power, new_mes_net, gas_deployment_rate, seed=gas_seed ) create_p2h_in_combined_generated_network( new_mes_net, net_power, bus_to_heat_junc, end_bus_to_heat_junc, p2h_density, seed=p2h_seed, ) create_chp_in_combined_generated_network( new_mes_net, net_power, bus_to_heat_junc, end_bus_to_heat_junc, bus_to_gas_junc, chp_density, seed=chp_seed, ) create_p2g_in_combined_generated_network( new_mes_net, net_power, bus_to_gas_junc, p2g_density, seed=p2g_seed ) return new_mes_net
[docs] def create_monee_benchmark_net(): random.seed(9002) pn = mm.Network(el_model=mm.PowerGrid(name="power", sn_mva=100)) node_0 = pn.node( mm.Bus(base_kv=120), mm.EL, child_ids=[pn.child(mm.PowerGenerator(p_mw=10, q_mvar=0, regulation=0.5))], ) node_1 = pn.node( mm.Bus(base_kv=120), mm.EL, child_ids=[pn.child(mm.ExtPowerGrid(p_mw=10, q_mvar=0, vm_pu=1, va_radians=0))], ) node_2 = pn.node( mm.Bus(base_kv=120), mm.EL, child_ids=[pn.child(mm.PowerLoad(p_mw=10, q_mvar=0))], ) node_3 = pn.node( mm.Bus(base_kv=120), mm.EL, child_ids=[pn.child(mm.PowerLoad(p_mw=20, q_mvar=0))], ) node_4 = pn.node( mm.Bus(base_kv=120), mm.EL, child_ids=[pn.child(mm.PowerLoad(p_mw=20, q_mvar=0))], ) node_5 = pn.node( mm.Bus(base_kv=120), mm.EL, child_ids=[pn.child(mm.PowerGenerator(p_mw=30, q_mvar=0, regulation=0.5))], ) node_6 = pn.node( mm.Bus(base_kv=120), mm.EL, child_ids=[pn.child(mm.GridFormingGenerator(p_mw_max=50, q_mvar_max=50))], ) max_i_ka = 319 pn.branch( mm.PowerLine( length_m=100, r_ohm_per_m=0.00007, x_ohm_per_m=0.00007, max_i_ka=max_i_ka, parallel=1, ), node_0, node_1, ) pn.branch( mm.PowerLine( length_m=100, r_ohm_per_m=0.00007, x_ohm_per_m=0.00007, max_i_ka=max_i_ka, parallel=1, ), node_1, node_2, ) pn.branch( mm.PowerLine( length_m=100, r_ohm_per_m=0.00007, x_ohm_per_m=0.00007, max_i_ka=max_i_ka, parallel=1, ), node_1, node_5, ) pn.branch( mm.PowerLine( length_m=100, r_ohm_per_m=0.00007, x_ohm_per_m=0.00007, max_i_ka=max_i_ka, parallel=1, ), node_2, node_3, ) pn.branch( mm.PowerLine( length_m=100, r_ohm_per_m=0.00007, x_ohm_per_m=0.00007, max_i_ka=max_i_ka, parallel=1, ), node_3, node_4, ) pn.branch( mm.PowerLine( length_m=100, r_ohm_per_m=0.00007, x_ohm_per_m=0.00007, max_i_ka=max_i_ka, parallel=1, ), node_3, node_6, ) new_mes = pn.copy() # gas bus_to_gas_junc = create_gas_net_for_power(pn, new_mes, 1, scaling=1) # # # heat bus_index_to_junction_index, _ = create_heat_net_for_power( pn, new_mes, 1, mass_flow_rate_kgs=1, default_diameter_m=0.16 ) new_water_junc = mx.create_water_junction(new_mes) mx.create_sink( new_mes, new_water_junc, mass_flow_kgs=0.05, ) new_water_junc_2 = mx.create_water_junction(new_mes) mx.create_sink( new_mes, new_water_junc_2, mass_flow_kgs=0.05, ) mx.create_passive_heat_exchanger( new_mes, from_node_id=new_water_junc, to_node_id=new_water_junc_2, q_mw=0.03, diameter_m=0.16, ) new_water_junc_3 = mx.create_water_junction(new_mes) mx.create_sink( new_mes, new_water_junc_3, mass_flow_kgs=0.06, ) mx.create_passive_heat_exchanger( new_mes, from_node_id=new_water_junc_2, to_node_id=new_water_junc_3, q_mw=0.03, diameter_m=0.16, ) mx.create_p2g( new_mes, from_node_id=node_4, to_node_id=bus_to_gas_junc[node_4], efficiency=0.7, mass_flow_setpoint_kgs=1, regulation=0, ) mx.create_chp( new_mes, power_node_id=node_1, heat_node_id=bus_index_to_junction_index[node_0], heat_return_node_id=new_water_junc, gas_node_id=bus_to_gas_junc[node_3], mass_flow_setpoint_kgs=0.005, diameter_m=0.1, efficiency_power=0.5, efficiency_heat=0.5, regulation=1, ) mx.create_g2p( new_mes, from_node_id=bus_to_gas_junc[node_1], to_node_id=node_1, efficiency=0.9, p_mw_setpoint=2, regulation=0, ) mx.create_g2p( new_mes, from_node_id=bus_to_gas_junc[node_6], to_node_id=node_6, efficiency=0.9, p_mw_setpoint=1, regulation=0, ) new_mes.branch( mm.PowerLine( length_m=100, r_ohm_per_m=0.00007, x_ohm_per_m=0.00007, parallel=1, backup=True, on_off=0, max_i_ka=max_i_ka, ), node_4, node_0, ) new_mes.branch( mm.PowerLine( length_m=100, r_ohm_per_m=0.00007, x_ohm_per_m=0.00007, parallel=1, backup=True, on_off=0, max_i_ka=max_i_ka, ), node_5, node_2, ) return new_mes
[docs] def create_mv_multi_cigre(): import pandapower.networks as pn from monee.io.from_pandapower import from_pandapower_net random.seed(9004) pnet = pn.create_cigre_network_mv(with_der="pv_wind") monee_net = from_pandapower_net(pnet) new_mes = monee_net.copy() create_gas_net_for_power( monee_net, new_mes, 1, source_scaling=1, default_diameter_m=0.64, length_scale=0.001, default_length=100000, # This reference grid was tuned for the pre-2026 methane-like gas; keep # it on that gas so the bench net stays feasible under realistic lgas. gas_type="methane", ) create_heat_net_for_power( monee_net, new_mes, 0.5, mass_flow_rate_kgs=25, default_diameter_m=0.68, power_scale=100, length_scale=0.001, default_length=100000, ) mx.create_power_generator(new_mes, 5, 2, 1) mx.create_power_generator(new_mes, 6, 3, 1) mx.create_p2g( new_mes, from_node_id=4, to_node_id=21, efficiency=0.7, mass_flow_setpoint_kgs=0.5, regulation=0.1, ) mx.create_chp( new_mes, power_node_id=2, heat_node_id=43, heat_return_node_id=44, gas_node_id=25, mass_flow_setpoint_kgs=0.5, diameter_m=0.3, efficiency_power=0.58, efficiency_heat=0.4, regulation=1, remove_existing_branch=True, ) new_mes.branch( mm.PowerLine( length_m=100, r_ohm_per_m=0.007, x_ohm_per_m=0.007, parallel=1, backup=True, on_off=0, max_i_ka=319, ), 5, 2, ) return new_mes
def _add_gas_mesh_pipes( target_net, bus_index_to_junction_index, gas_grid, extra_mesh_pipes, mesh_diameter_factor, mesh_length_factor, default_diameter_m, default_length, length_scale, mesh_seed, ): # Add resilience tie pipes: pick non-adjacent junction pairs uniformly # at random and connect them with a smaller-diameter / longer pipe. # The point is to give every junction more than one path to the # slack so single pipe failures don't isolate large subtrees - the # main reason additive CPs underperform on a tree layout. rng = random.Random(mesh_seed) if mesh_seed is not None else random junctions = list(bus_index_to_junction_index.values()) existing_pairs = set() for branch in target_net.branches: if isinstance(branch.model, mm.GasPipe): a, b = branch.from_node_id, branch.to_node_id existing_pairs.add(frozenset((a, b))) candidates = [ (j1, j2) for i, j1 in enumerate(junctions) for j2 in junctions[i + 1 :] if frozenset((j1, j2)) not in existing_pairs ] rng.shuffle(candidates) for j1, j2 in candidates[:extra_mesh_pipes]: mx.create_gas_pipe( target_net, from_node_id=j1, to_node_id=j2, diameter_m=default_diameter_m * mesh_diameter_factor, length_m=default_length * mesh_length_factor * length_scale, grid=gas_grid, )
[docs] def create_gas_tree_net_for_power( power_net: mm.Network, target_net: mm.Network, gas_load_share=3.0, gas_gen_share=1.0, default_diameter_m=0.3, length_scale=1, default_length=DEFAULT_LENGTH, min_load_kgs=1.8e-4, min_source_kgs=1.8e-4, slack_node_id=None, extra_mesh_pipes=0, mesh_diameter_factor=0.5, mesh_length_factor=2.0, mesh_seed=None, ): """Build a gas grid on the spanning tree of ``power_net``. Loads/generators are sized from electrical magnitudes via ``gas_load_share`` / ``gas_gen_share`` and the HHV; ``extra_mesh_pipes`` add tie-lines so a single failure doesn't isolate a subtree. ``min_load_kgs`` / ``min_source_kgs`` floor per-bus values at ~10 kW thermal. Returns ``{power_node_id → gas_junction_id}``. """ gas_grid = mm.create_gas_grid( "gas", type="lgas", t_ref_k=REF_TEMP, pressure_ref_pa=REF_PA ) target_net.set_default_grid("gas", gas_grid) # HHV [MJ/kg] of this grid's gas fluid (sizing matches the gas physics). gas_hhv_mj = gas_grid.higher_heating_value_kwh_per_kg * 3.6 power_net_as_st = mm.to_spanning_tree(power_net) bus_index_to_junction_index = {} for node in power_net_as_st.nodes: bus_index_to_junction_index[node.id] = mx.create_junction( target_net, position=node.position, grid=gas_grid ) for branch in power_net_as_st.branches: from_id = bus_index_to_junction_index[branch.from_node_id] to_id = bus_index_to_junction_index[branch.to_node_id] mx.create_gas_pipe( target_net, from_node_id=from_id, to_node_id=to_id, diameter_m=default_diameter_m, length_m=get_length( target_net, branch, from_id, to_id, default_length=default_length, ) * length_scale, grid=gas_grid, ) for node in power_net_as_st.nodes: p_load_mw = _node_power_load_mw(power_net, node) if p_load_mw > 0 and gas_load_share > 0: mass_flow_kgs = max(min_load_kgs, p_load_mw * gas_load_share / gas_hhv_mj) mx.create_sink( target_net, bus_index_to_junction_index[node.id], mass_flow_kgs=round(mass_flow_kgs, 4), ) p_gen_mw = _node_power_gen_mw(power_net, node) if p_gen_mw > 0 and gas_gen_share > 0: mass_flow_kgs = max(min_source_kgs, p_gen_mw * gas_gen_share / gas_hhv_mj) mx.create_source( target_net, node_id=bus_index_to_junction_index[node.id], mass_flow_kgs=round(mass_flow_kgs, 4), ) if extra_mesh_pipes > 0: _add_gas_mesh_pipes( target_net, bus_index_to_junction_index, gas_grid, extra_mesh_pipes=extra_mesh_pipes, mesh_diameter_factor=mesh_diameter_factor, mesh_length_factor=mesh_length_factor, default_diameter_m=default_diameter_m, default_length=default_length, length_scale=length_scale, mesh_seed=mesh_seed, ) slack_node = ( slack_node_id if slack_node_id is not None else power_net_as_st.first_node() ) mx.create_ext_hydr_grid( target_net, node_id=bus_index_to_junction_index[slack_node], t_k=REF_TEMP, name="Grid Connection Gas", ) return bus_index_to_junction_index
[docs] def create_heat_supply_return_net_for_power( power_net: mm.Network, target_net: mm.Network, heat_load_share=1.0, heat_gen_share=0.5, default_diameter_m=0.12, return_diameter_m=None, length_scale=1, default_length=DEFAULT_LENGTH, return_length_m=None, min_load_mw=0.005, min_gen_mw=0.005, slack_node_id=None, node_based_heat_loads=False, balance_gen_to_load=True, heat_plant_mode="closing_pipe", return_t_k=REF_TEMP - 30, return_pressure_pu=0.95, return_pin_temperature=False, node_heat_gen_share=1.0, supply_slack_t_k=REF_TEMP, auto_diameter=False, auto_diameter_v_mps=None, auto_diameter_headroom=1.5, auto_min_diameter_m=None, ): """Build a supply/return DHS grid on the spanning tree of ``power_net``. Supply junctions mirror the buses; one shared return junction collects all consumer returns and feeds the generators. HX-Load and HX-Gen branches bridge supply and return. ``heat_plant_mode``: * ``"closing_pipe"`` (default) - single supply slack + return-to-supply closing pipe. Robust under the LinearHeatExchanger formulation but the slack t-pin collapses some supply/return ΔT. * ``"two_port"`` - second slack at the return junction. Cleaner physics but needs McCormick-DHS (``node_based_heat_loads=True``). * ``"screening"`` - closing pipe plus an oversized return Sink; faster but mass flow is no longer physical. Capacities scale from electrical magnitudes via ``heat_load_share`` / ``heat_gen_share``; ``balance_gen_to_load`` rescales generators to match. In node-based mode ``node_heat_gen_share`` distributes :class:`HeatGenerator` children at every PowerGenerator bus (set to 0.0 for slack-only). ``auto_diameter`` (default off) sizes each supply pipe - and the return / closing pipe - to the mass flow it must carry instead of the flat ``default_diameter_m``. The required flow is the cumulative downstream consumer demand (the same per-pipe ``subtree`` total used for the McCormick-DHS envelopes), so trunk pipes near the slack come out wider than leaf pipes. Each diameter is the smallest that keeps the design velocity_mps at ``auto_diameter_v_mps`` (defaults to the grid's ``v_max_mps``) after a ``auto_diameter_headroom`` margin on the flow, floored at ``auto_min_diameter_m`` (defaults to ``default_diameter_m`` so leaf pipes are never thinned below the flat default). Without it, a large radial DHS (e.g. simbench MV-urban: ~256 kg/s through 0.12 m pipes) is physically infeasible - the trunk flow exceeds the velocity_mps cap and the Darcy pressure drop blows up. Returns ``({power_node_id → supply_junction_id}, return_junction_id)``. """ heat_grid = mm.create_water_grid("water", t_ref_k=REF_TEMP, pressure_ref_pa=REF_PA) if node_based_heat_loads: # Tighten McCormick-DHS envelopes heat_grid.t_pu_min_env = 0.75 heat_grid.t_pu_max_env = 1.15 heat_grid.v_max_mps = 2.0 target_net.set_default_grid("water", heat_grid) auto_v = ( auto_diameter_v_mps if auto_diameter_v_mps is not None else heat_grid.v_max_mps ) auto_floor_m = ( auto_min_diameter_m if auto_min_diameter_m is not None else default_diameter_m ) def _auto_diameter(mass_flow_kgs): """Capacity-sized diameter [m] for ``mass_flow_kgs``, floored.""" d = hyd.calc_min_diameter_for_mass_flow( mass_flow_kgs * auto_diameter_headroom, heat_grid.fluid_density_kg_per_m3, auto_v, ) return max(d, auto_floor_m) power_net_as_st = mm.to_spanning_tree(power_net) # Orient supply pipes outward from the slack via BFS so every supply # junction has an incoming pipe (McCormick-DHS pins direction). slack_root = ( slack_node_id if slack_node_id is not None else power_net_as_st.first_node() ) undirected = nx.Graph(power_net_as_st.graph) branch_lookup = {} for branch in power_net_as_st.branches: branch_lookup[(branch.from_node_id, branch.to_node_id)] = branch branch_lookup[(branch.to_node_id, branch.from_node_id)] = branch bfs_edges = list(nx.bfs_edges(undirected, source=slack_root)) if node_based_heat_loads: consumer_buses = { node.id for node in power_net_as_st.nodes if _node_power_load_mw(power_net, node) > 0 and heat_load_share > 0 } parent = {slack_root: None} for p, c in bfs_edges: parent[c] = p keep_nodes = {slack_root} for cb in consumer_buses: n = cb while n is not None and n not in keep_nodes: keep_nodes.add(n) n = parent.get(n) else: keep_nodes = {n.id for n in power_net_as_st.nodes} bus_index_to_supply_junction = {} for node in power_net_as_st.nodes: if node.id not in keep_nodes: continue bus_index_to_supply_junction[node.id] = mx.create_junction( target_net, position=node.position, grid=heat_grid ) return_junction = mx.create_junction( target_net, position=None, grid=heat_grid, name="dhs_return" ) # Cache the (from, to) supply-tree edges and the resulting WaterPipe ids # so we can decorate each pipe with a tighter ``m_U_design`` later in the # function once consumer demands are known. supply_pipe_for_edge = {} for p, c in bfs_edges: if p not in keep_nodes or c not in keep_nodes: continue branch = branch_lookup[(p, c)] from_id = bus_index_to_supply_junction[p] to_id = bus_index_to_supply_junction[c] pipe_id = mx.create_water_pipe( target_net, from_node_id=from_id, to_node_id=to_id, diameter_m=default_diameter_m, length_m=get_length( target_net, branch, from_id, to_id, default_length=default_length, ) * length_scale, temperature_ext_k=296.15, roughness_m=0.001, grid=heat_grid, ) supply_pipe_for_edge[(p, c)] = pipe_id load_specs = [] # list of (supply_id, heat_mw) gen_specs = [] # list of (supply_id, heat_mw_raw) for node in power_net_as_st.nodes: if node.id not in bus_index_to_supply_junction: continue supply_id = bus_index_to_supply_junction[node.id] p_load_mw = _node_power_load_mw(power_net, node) if p_load_mw > 0 and heat_load_share > 0: load_specs.append( (supply_id, max(min_load_mw, p_load_mw * heat_load_share)) ) if not node_based_heat_loads: p_gen_mw = _node_power_gen_mw(power_net, node) if p_gen_mw > 0 and heat_gen_share > 0: gen_specs.append( (supply_id, max(min_gen_mw, p_gen_mw * heat_gen_share)) ) total_load_q = sum(q for _, q in load_specs) total_gen_q_raw = sum(q for _, q in gen_specs) if balance_gen_to_load and total_gen_q_raw > 0 and total_load_q > total_gen_q_raw: gen_scale = total_load_q / total_gen_q_raw else: gen_scale = 1.0 # Create the HX-Loads (or node-based HeatLoad children). for supply_id, heat_mw in load_specs: if node_based_heat_loads: mx.create_heat_load(target_net, supply_id, q_mw=heat_mw) # Design flow same as the LinearHX would compute internally: # m = q / (c · ΔT) with c = 4180 J/(kg·K), ΔT = 30 K default. m_design = heat_mw * 1e6 / (4180.0 * 30.0) mx.create_water_sink( target_net, supply_id, mass_flow_kgs=round(m_design, 6) ) else: mx.create_heat_exchanger( target_net, from_node_id=supply_id, to_node_id=return_junction, q_mw=heat_mw, ) if not node_based_heat_loads: for supply_id, heat_mw_raw in gen_specs: heat_mw = heat_mw_raw * gen_scale mx.create_heat_exchanger( target_net, from_node_id=return_junction, to_node_id=supply_id, q_mw=-heat_mw, ) bus_demand_kgs: dict[int, float] = {} subtree: dict = {} # The per-pipe cumulative downstream demand feeds both the McCormick-DHS # envelopes (node-based mode) and capacity-based pipe sizing (auto_diameter), # so compute it whenever either is requested. if node_based_heat_loads or auto_diameter: for supply_id, heat_mw in load_specs: m_design = heat_mw * 1e6 / (4180.0 * 30.0) bus_demand_kgs[supply_id] = bus_demand_kgs.get(supply_id, 0.0) + m_design children_of: dict = {} for p, c in bfs_edges: if p in keep_nodes and c in keep_nodes: children_of.setdefault(p, []).append(c) stack = [(slack_root, False)] while stack: bus, visited = stack.pop() if visited: supply_id = bus_index_to_supply_junction[bus] total = bus_demand_kgs.get(supply_id, 0.0) for child_bus in children_of.get(bus, []): total += subtree[child_bus] subtree[bus] = total else: stack.append((bus, True)) for child_bus in children_of.get(bus, []): stack.append((child_bus, False)) if node_based_heat_loads and node_heat_gen_share > 0: for node in power_net_as_st.nodes: if node.id not in bus_index_to_supply_junction: continue p_gen_mw = _node_power_gen_mw(power_net, node) if p_gen_mw <= 0: continue heat_mw = max(min_gen_mw, p_gen_mw * node_heat_gen_share) m_downstream = subtree.get(node.id, 0.0) q_cap_mw = 4180.0 * m_downstream * 30.0 / 1e6 if q_cap_mw > 0: heat_mw = min(heat_mw, q_cap_mw) elif q_cap_mw == 0: # No downstream consumers - no heat can be transported. continue mx.create_heat_generator( target_net, node_id=bus_index_to_supply_junction[node.id], q_mw=round(heat_mw, 6), ) slack_supply_junction = bus_index_to_supply_junction[slack_root] if heat_plant_mode not in ("two_port", "closing_pipe", "screening"): raise ValueError( f"Unknown heat_plant_mode {heat_plant_mode!r}; expected one of " f"'two_port', 'closing_pipe', 'screening'." ) # Hot port (supply slack): pins T = supply_slack_t_k, p = 1.0 at the # slack junction. Always present. mx.create_ext_hydr_grid( target_net, node_id=slack_supply_junction, t_k=supply_slack_t_k, grid_key=mm.WATER_KEY, name="Grid Connection Heat Supply", ) if heat_plant_mode == "two_port": mx.create_ext_hydr_grid( target_net, node_id=return_junction, t_k=return_t_k, pressure_pu=return_pressure_pu, pin_temperature=return_pin_temperature, grid_key=mm.WATER_KEY, name="Grid Connection Heat Return", ) else: if return_diameter_m is not None: closing_diameter_m = return_diameter_m elif auto_diameter: # The closing pipe returns the whole network's flow to the slack. closing_diameter_m = _auto_diameter(subtree.get(slack_root, 0.0)) else: closing_diameter_m = default_diameter_m * 1.5 mx.create_water_pipe( target_net, from_node_id=return_junction, to_node_id=slack_supply_junction, diameter_m=closing_diameter_m, length_m=return_length_m if return_length_m is not None else default_length * length_scale, temperature_ext_k=296.15, roughness_m=0.001, grid=heat_grid, ) if not node_based_heat_loads and heat_plant_mode == "screening" and load_specs: sink_capacity = total_load_q * 1e6 / (4180.0 * 30.0) * 5.0 mx.create_water_sink( target_net, node_id=return_junction, mass_flow_kgs=round(sink_capacity, 6), name="Grid Connection Heat Return Sink", ) if node_based_heat_loads: SAFETY = 5.0 for (p, c), pipe_id in supply_pipe_for_edge.items(): m_design = subtree.get(c, 0.0) if m_design > 0: pipe = target_net.branch_by_id(pipe_id) pipe.model.m_U_design = m_design * SAFETY pipe.model.mass_flow_nominal_kgs = m_design if auto_diameter: # Widen each supply pipe to carry its cumulative downstream demand at # the design velocity_mps; trunk pipes near the slack end up widest. for (p, c), pipe_id in supply_pipe_for_edge.items(): pipe = target_net.branch_by_id(pipe_id) pipe.model.diameter_m = _auto_diameter(subtree.get(c, 0.0)) return bus_index_to_supply_junction, return_junction
def _drain_proportionally(items, get_mag, set_mag, remove, total) -> float: """Drain ``total`` from *items* by scaling every item with one common factor ``(pool − absorbed) / pool``. Proportional draining preserves the spatial distribution of the remaining capacity — which generators carry the replaced output is then independent of child insertion order, so CP-replacement variants differ from their no-CP baseline only by the CP routing, not by an arbitrary set of deleted generators. Items scaled to ~0 are removed entirely. Returns the unabsorbed remainder (> 0 iff ``total`` exceeds the pool). """ remaining = float(total) if remaining <= 0: return 0.0 items = [i for i in items if get_mag(i) > 0] pool = sum(get_mag(i) for i in items) if pool <= 0: return remaining absorb = min(pool, remaining) factor = (pool - absorb) / pool for item in items: new_mag = get_mag(item) * factor if new_mag <= 1e-12: remove(item) else: set_mag(item, new_mag) return remaining - absorb def _drain_power_gen_capacity(net: mm.Network, total_mw: float) -> float: """Drain ``total_mw`` proportionally from PowerGenerator children (skips ExtPowerGrid - slack). Returns the unabsorbed remainder.""" return _drain_proportionally( [c for c in net.childs if isinstance(c.model, mm.PowerGenerator)], get_mag=lambda c: abs(float(c.model.p_mw)), set_mag=lambda c, v: setattr(c.model, "p_mw", -v), remove=lambda c: net.remove_child(c.id), total=total_mw, ) def _drain_gas_source_capacity(net: mm.Network, total_kgs: float) -> float: """Drain ``total_kgs`` proportionally from gas-side Sources only. Returns the unabsorbed remainder.""" return _drain_proportionally( [ c for c in net.childs if isinstance(c.model, mm.Source) and isinstance(c.grid, mm.GasGrid) ], get_mag=lambda c: abs(float(c.model.mass_flow_kgs)), set_mag=lambda c, v: setattr(c.model, "mass_flow_kgs", -v), remove=lambda c: net.remove_child(c.id), total=total_kgs, ) def _drain_heat_gen_capacity(net: mm.Network, total_mw: float) -> float: """Drain ``total_mw`` proportionally from HeatGenerator children, then (only if their pool is exhausted) proportionally from HX-Gen branches. Returns the unabsorbed remainder; no slack fallback by design.""" remaining = _drain_proportionally( [c for c in net.childs if isinstance(c.model, mm.HeatGenerator)], get_mag=lambda c: abs(float(c.model.q_mw_heat)), set_mag=lambda c, v: setattr(c.model, "q_mw_heat", -v), remove=lambda c: net.remove_child(c.id), total=total_mw, ) if remaining <= 1e-12: return remaining return _drain_proportionally( [ b for b in net.branches if isinstance(b.model, mm.HeatExchanger) and float(getattr(b.model, "q_mw_set", 0.0) or 0.0) > 0 ], get_mag=lambda b: float(getattr(b.model, "q_mw_set", 0.0) or 0.0), set_mag=lambda b, v: setattr(b.model, "q_mw_set", v), remove=lambda b: net.remove_branch_between( b.from_node_id, b.to_node_id, b.id[2] ), total=remaining, )
[docs] def create_coupling_points_for_mes( mes_net: mm.Network, bus_to_gas_junc, bus_to_heat_supply_junc, heat_return_junc, density=0.2, centralized=False, central_node_id=None, couplings=("chp", "p2g", "p2h"), chp_efficiency_power=0.4, chp_efficiency_heat=0.45, chp_diameter_m=0.05, chp_p_share=0.5, p2g_efficiency=0.7, p2g_p_share=1.0, p2h_efficiency=0.95, p2h_p_share=1.0, p2h_diameter_m=0.01, cp_size_multiplier=1.0, regulation=1.0, use_hg_variants=False, seed=None, replace_primary_generation=False, ): """Add CHP / P2G / P2H coupling points to an MES network. ``density`` ∈ [0,1] is per-node Bernoulli in decentralised mode and ``ceil(density·N)`` units on one hub in centralised mode. Capacities scale from each bus's local p_ref via the per-type ``*_p_share`` and a global ``cp_size_multiplier``. ``replace_primary_generation=True`` (default False is additive) drains the matching primary fleet to keep total rated production per carrier invariant. In node-based heat mode, heat-side replacement drains :class:`HeatGenerator` children (no slack fallback). Returns ``list[{"type", "node", "id"}]``. """ if not 0 <= regulation <= 1: raise ValueError("regulation must be in [0, 1]") if not 0 <= density <= 1: raise ValueError("density must be in [0, 1]") if cp_size_multiplier <= 0: raise ValueError("cp_size_multiplier must be > 0") for share_name, share in ( ("chp_p_share", chp_p_share), ("p2g_p_share", p2g_p_share), ("p2h_p_share", p2h_p_share), ): if share < 0: raise ValueError(f"{share_name} must be >= 0") rng = random.Random(seed) if seed is not None else random coupling_set = {c.lower() for c in couplings} valid = {"chp", "p2g", "p2h"} unknown = coupling_set - valid if unknown: raise ValueError(f"unknown coupling types: {sorted(unknown)}") candidate_node_ids = [ nid for nid in bus_to_gas_junc.keys() if nid in bus_to_heat_supply_junc ] if not candidate_node_ids: raise ValueError("no nodes are coupled to both gas and heat supply") if centralized: hub = ( central_node_id if central_node_id is not None else min(candidate_node_ids) ) if hub not in candidate_node_ids: raise ValueError( f"central_node_id={hub} is not in the gas/heat coupled set" ) n_units = int(round(density * len(candidate_node_ids))) target_nodes = [hub] * n_units else: target_nodes = [nid for nid in candidate_node_ids if rng.random() < density] created = [] # HHV [MJ/kg] of the gas fluid actually registered on the net, so coupling- # point sizing uses the same heating value as the gas physics regardless of # which gas type the grid was built with. gas_hhv_mj = ( mes_net.node_by_id( next(iter(bus_to_gas_junc.values())) ).grid.higher_heating_value_kwh_per_kg * 3.6 ) cp_power_out_mw = 0.0 cp_gas_out_kgs = 0.0 cp_heat_out_mw = 0.0 for power_node_id in target_nodes: gas_junc = bus_to_gas_junc[power_node_id] heat_supply_junc = bus_to_heat_supply_junc[power_node_id] node = mes_net.node_by_id(power_node_id) p_ref_mw = _node_power_gen_mw(mes_net, node) or _node_power_load_mw( mes_net, node ) if p_ref_mw <= 0: continue unit_type = rng.choice(sorted(coupling_set)) if unit_type == "chp": chp_p_target_mw = chp_p_share * cp_size_multiplier * p_ref_mw mass_flow_kgs = round( chp_p_target_mw / max(chp_efficiency_power, 1e-3) / gas_hhv_mj, 6, ) cp_power_out_mw += mass_flow_kgs * gas_hhv_mj * chp_efficiency_power cp_heat_out_mw += mass_flow_kgs * gas_hhv_mj * chp_efficiency_heat if use_hg_variants: uid = mx.create_chp_hg( mes_net, power_node_id=power_node_id, heat_node_id=heat_supply_junc, gas_node_id=gas_junc, mass_flow_setpoint_kgs=mass_flow_kgs, efficiency_power=chp_efficiency_power, efficiency_heat=chp_efficiency_heat, regulation=regulation, ) else: uid = mx.create_chp( mes_net, power_node_id=power_node_id, heat_node_id=heat_supply_junc, heat_return_node_id=heat_return_junc, gas_node_id=gas_junc, mass_flow_setpoint_kgs=mass_flow_kgs, diameter_m=chp_diameter_m, efficiency_power=chp_efficiency_power, efficiency_heat=chp_efficiency_heat, regulation=regulation, ) created.append({"type": "chp", "node": power_node_id, "id": uid}) elif unit_type == "p2g": p2g_p_in_mw = p2g_p_share * cp_size_multiplier * p_ref_mw mass_flow_kgs = round(p2g_efficiency * p2g_p_in_mw / gas_hhv_mj, 6) cp_gas_out_kgs += mass_flow_kgs bid = mx.create_p2g( mes_net, from_node_id=power_node_id, to_node_id=gas_junc, efficiency=p2g_efficiency, mass_flow_setpoint_kgs=mass_flow_kgs, regulation=regulation, ) created.append({"type": "p2g", "node": power_node_id, "id": bid}) elif unit_type == "p2h": p2h_p_in_mw = p2h_p_share * cp_size_multiplier * p_ref_mw heat_mw = round(p2h_p_in_mw * p2h_efficiency, 6) cp_heat_out_mw += heat_mw if use_hg_variants: bid = mx.create_p2h_hg( mes_net, power_node_id=power_node_id, heat_node_id=heat_supply_junc, heat_energy_mw=heat_mw, efficiency=p2h_efficiency, ) created.append({"type": "p2h", "node": power_node_id, "id": bid}) else: uid = mx.create_p2h( mes_net, power_node_id=power_node_id, heat_node_id=heat_supply_junc, heat_return_node_id=heat_return_junc, heat_energy_mw=heat_mw, diameter_m=p2h_diameter_m, efficiency=p2h_efficiency, regulation=regulation, ) created.append({"type": "p2h", "node": power_node_id, "id": uid}) if replace_primary_generation: # Drain primary generation so total rated capacity per carrier stays invariant. unabsorbed_p = _drain_power_gen_capacity(mes_net, cp_power_out_mw) unabsorbed_g = _drain_gas_source_capacity(mes_net, cp_gas_out_kgs) unabsorbed_h = _drain_heat_gen_capacity(mes_net, cp_heat_out_mw) for label, asked, left in ( ("PowerGenerator", cp_power_out_mw, unabsorbed_p), ("gas Source", cp_gas_out_kgs, unabsorbed_g), ("heat (HeatGenerator + HX-Gen)", cp_heat_out_mw, unabsorbed_h), ): if left > 1e-9 and asked > 0: # Hard error by design: an unabsorbed remainder means the CP # output exceeds the primary pool, so total rated capacity is # NOT invariant any more and every density-sweep comparison # built on that invariance is silently biased. raise ValueError( f"replace_primary_generation: {label} pool absorbed " f"{asked - left:g} of {asked:g} requested; {left:g} " f"unabsorbed. CP output exceeds the primary pool — " f"capacity invariance would break. Reduce CP density / " f"size (or raise the primary generation share)." ) return created
[docs] def generate_supply_return_mes_based_on_power_net( net_power: mm.Network, *, coupling_density=0.2, centralized=False, central_node_id=None, couplings=("chp", "p2g", "p2h"), heat_kwargs=None, gas_kwargs=None, coupling_kwargs=None, ): """Wrap :func:`create_gas_tree_net_for_power` + :func:`create_heat_supply_return_net_for_power` + :func:`create_coupling_points_for_mes` into one call. ``gas_kwargs`` / ``heat_kwargs`` / ``coupling_kwargs`` forward to those builders.""" new_mes_net = net_power.copy() bus_to_gas_junc = create_gas_tree_net_for_power( net_power, new_mes_net, **(gas_kwargs or {}) ) bus_to_heat_supply, heat_return = create_heat_supply_return_net_for_power( net_power, new_mes_net, **(heat_kwargs or {}) ) create_coupling_points_for_mes( new_mes_net, bus_to_gas_junc=bus_to_gas_junc, bus_to_heat_supply_junc=bus_to_heat_supply, heat_return_junc=heat_return, density=coupling_density, centralized=centralized, central_node_id=central_node_id, couplings=couplings, **(coupling_kwargs or {}), ) return new_mes_net