Source code for monee.solver.gekko

import logging

import networkx as nx
from gekko import GEKKO
from gekko.gk_operators import GK_Intermediate, GK_Operators
from gekko.gk_variable import GKVariable

from monee.model import (
    Const,
    GenericModel,
    Intermediate,
    Network,
    Var,
)
from monee.model.extension.islanding.core import NetworkIslandingConfig
from monee.model.formulation.registry import attach_formulations
from monee.problem.core import OptimizationProblem

from .core import (
    OperatorEquationAssembly,
    SolverInterface,
    SolverResult,
    StepState,
    apply_post_process_all,
    compute_bound_violations,
    find_ignored_nodes,
    generate_real_topology,
    ignore_node,
    inject_vars,
    mark_he_flow_prescription,
    mark_heat_balance_slacks,
    mark_ignored_components,
    persist_solution,
    pin_floating_hydraulic_gauges,
    remove_cps,
    withdraw_vars,
)
from .infeasibility.apm import (
    GekkoSolveError,
    diagnose_gekko_infeasibility,
    sanitize_apm_name,
)

# APOPT (SOLVER=1) MINLP options. IPOPT rejects the minlp_* keys, so they are
# applied only for the APOPT path (see _solver_options).
DEFAULT_SOLVER_OPTIONS = [
    "minlp_maximum_iterations 1000",
    "minlp_max_iter_with_int_sol 500",
    "minlp_as_nlp 0",
    "nlp_maximum_iterations 1000",
    "minlp_branch_method 3",
    "minlp_gap_tol 1.0e-3",
    "minlp_integer_tol 1.0e-4",
    "minlp_integer_max 2.0e5",
    "minlp_integer_leaves 150",
    "minlp_print_level 1",
    "objective_convergence_tolerance 1.0e-4",
    "constraint_convergence_tolerance 1.0e-4",
]

# IPOPT (SOLVER=3) is a pure-NLP solver, the smooth gas/heat formulations target
# it. Only IPOPT-valid option keys here.
IPOPT_SOLVER_OPTIONS = [
    "max_iter 3000",
    "tol 1.0e-6",
    "constr_viol_tol 1.0e-6",
]


def _solver_options(solver: int):
    """APOPT keeps its MINLP options; IPOPT gets NLP-only options it accepts."""
    return IPOPT_SOLVER_OPTIONS if solver == GEKKO_IPOPT else DEFAULT_SOLVER_OPTIONS


GEKKO_IPOPT = 3


[docs] class GekkoCubicSplineImpl: def __init__(self, m): self.m = m
[docs] def piecewise_eq(self, y, x, xs, ys, _name=None): xs = list(xs) ys = list(ys) return self.m.cspline(x, y, xs, ys)
[docs] class GEKKOSolver(OperatorEquationAssembly, SolverInterface): def __init__(self, solver=1): self.solver: int = solver self._simulation: bool = False
[docs] @staticmethod def inject_gekko_vars_attr(gekko: GEKKO, target: GenericModel, id, name_map=None): i = 0 for key, value in target.__dict__.items(): if isinstance(value, Var): name = f"{id}.{value.name}" if value.name is not None else f"{id}.{i}" if name_map is not None: # APMonitor sanitises names irreversibly; remember the # original for the infeasibility diagnostics. name_map[sanitize_apm_name(name)] = name setattr( target, key, gekko.Var( value.value, lb=value.min, ub=value.max, integer=value.integer, name=name, ), ) i += 1 if type(value) is Const: setattr(target, key, gekko.Const(value.value))
[docs] @staticmethod def withdraw_gekko_vars_attr(target: GenericModel): for key, value in target.__dict__.items(): if type(value) is GKVariable: setattr( target, key, Var( value=value.VALUE.value[0], min=value.LOWER, max=value.UPPER, name=value.NAME.split("_")[-1], ), ) if type(value) is GK_Operators: setattr(target, key, Const(value.VALUE.value)) if type(value) is GK_Intermediate: setattr(target, key, Intermediate(value=value.VALUE.value[0]))
def _add_equations(self, m, eqs): m.Equations(eqs) @staticmethod def _solve_with_fallback(m, use_sim): if use_sim: try: m.solve(disp=False) return 1 except Exception as exc: logging.warning( "Simulation (IMODE=1) solve failed; falling back to " "IMODE=3 - the fast square steady-state path was NOT used. " "The model is likely not square (check for phantom degrees " "of freedom / non-simulation formulations). Solver said: %s", exc, ) m.options.IMODE = 3 m.solve(disp=False) return m.options.IMODE
[docs] def solve( # NOSONAR self, input_network: Network, optimization_problem: OptimizationProblem = None, draw_debug=False, exclude_unconnected_nodes=False, step_state: StepState = None, simulation=False, formulation=None, ): self._simulation = simulation m = GEKKO(remote=False) m.options.SOLVER = self.solver m.options.WEB = 0 m.options.IMODE = 1 if simulation else 3 m.solver_options = _solver_options(self.solver) network = input_network.copy() for ext in network.extensions: ext.prepare(network) # Attach the effective formulations and declare their vars on the # solve-time copy (simulation=True additionally squares the model: # phantom DOFs pinned, |m| warm-started, vm_pu_squared demoted). attach_formulations(network, formulation, simulation=simulation) islanding_config = next( (e for e in network.extensions if isinstance(e, NetworkIslandingConfig)), None, ) # Compute ignored_nodes BEFORE _apply so controllable filters checking # component.ignored correctly exclude disconnected components. ignored_nodes = set() if optimization_problem is None or exclude_unconnected_nodes: ignored_nodes = find_ignored_nodes(network, islanding_config) if ignored_nodes: mark_ignored_components(network, ignored_nodes) if optimization_problem is not None: optimization_problem._apply(network) nodes = network.nodes for node in nodes: if ignore_node(node, network, ignored_nodes): continue for child in network.childs_by_ids(node.child_ids): if child.active: child.model.overwrite(node.model, node.grid) branches = network.branches compounds = network.compounds # Pin the pressure gauge of any hydraulic island without a grid-forming # source (e.g. an HE-fed return loop) - removes a rank-deficient free DOF # IPOPT would otherwise have to regularise. pin_floating_hydraulic_gauges(network, ignored_nodes) # Recognise each heat island's grid-forming node as the heat slack and # drop its (dependent) nodal heat balance - removes the heat carrier's # redundant constraint and is required for a square IMODE=1 solve. mark_heat_balance_slacks(network, ignored_nodes) # Decide per compound-internal SubHE whether the design flow prescribes # the through-flow (supply/return islands with free-mass-flow slacks) # or yields to a network-determined flow (e.g. a fixed sink downstream). mark_he_flow_prescription(network, ignored_nodes) apm_name_map: dict[str, str] = {} inject_vars( lambda model, comp, cat: GEKKOSolver.inject_gekko_vars_attr( m, model, comp.nid if cat == "branch" else comp.tid, name_map=apm_name_map, ), nodes, branches, compounds, network, ignored_nodes, ) if step_state is not None: for ext in network.extensions: ext.activate_timeseries(network, ignored_nodes, step_state=step_state) self.mark_temporal_components(network, ignored_nodes) objs_exprs = [] self.init_branches(branches) self.process_equations_nodes_childs(m, network, nodes, ignored_nodes) self.process_equations_branches(m, network, branches, ignored_nodes, objs_exprs) self.process_equations_compounds(m, network, compounds, ignored_nodes) if optimization_problem is not None: self.process_oxf_components(m, network, optimization_problem) else: self.process_internal_oxf_components(m, network) if step_state is not None: self.process_inter_step_equations( m, network, nodes, branches, compounds, ignored_nodes, step_state, optimization_problem=optimization_problem, ) for ext in network.extensions: m.Equations( ext.inter_step_equations(network, ignored_nodes, step_state) ) m.Equations( ext.inter_temporal_equations(network, ignored_nodes, step_state) ) for ext in network.extensions: m.Equations(ext.equations(network, ignored_nodes)) if objs_exprs: for expr in objs_exprs: m.Obj(expr) # IMODE=1 (square simulation) only applies to a plain flow: no objective # of any kind (else IMODE=1 silently ignores it). use_sim = ( simulation and optimization_problem is None and not objs_exprs and not network.objectives ) m.options.IMODE = 1 if use_sim else 3 try: imode_used = self._solve_with_fallback(m, use_sim) except Exception as exc: # APMonitor's exception is just "@error: Solution Not Found"; # build a proper report from the run-directory artifacts instead. report = diagnose_gekko_infeasibility( m, name_map=apm_name_map, solver_message=str(exc) ) if report is not None: logging.error( "Solver not converged. Diagnostic report:\n%s", report.summary(max_items=25), ) else: logging.error("Solver not converged.") if draw_debug: import matplotlib.pyplot as plt remove_cps(network) nx.draw_networkx( generate_real_topology(network._network_internal), node_size=5, font_size=2, width=0.4, ) plt.savefig("debug-network.pdf") # Best-effort warm-start handoff from the partial iterate. try: withdraw_vars( GEKKOSolver.withdraw_gekko_vars_attr, nodes, branches, compounds, network, ) persist_solution(network, input_network) except Exception: pass if report is not None: raise GekkoSolveError( "GEKKO solve failed.\n\nDiagnostic report:\n" + report.summary(max_items=25), report=report, ) from exc raise withdraw_vars( GEKKOSolver.withdraw_gekko_vars_attr, nodes, branches, compounds, network ) apply_post_process_all(nodes, branches, compounds, network) persist_solution(network, input_network) violations = compute_bound_violations(nodes, branches, compounds, network) solver_result = SolverResult( network, network.as_result_dataframe_dict(), m.options.OBJFCNVAL, m.options.APPSTATUS == 1, violations, mode_used="simulation" if imode_used == 1 else "optimization", ) return solver_result
def _pwl_impl(self, m): # spline outperforms GEKKO's native pwl return GekkoCubicSplineImpl(m)