Source code for monee.solver.pyo

import logging
import math
import os
import shutil
import tempfile
import types

import pyomo.environ as pyo
from pyomo.opt import SolverStatus, TerminationCondition

from monee.model import (
    Const,
    GenericModel,
    Intermediate,
    IntermediateEq,
    Network,
    Var,
)
from monee.model.extension.islanding.core import NetworkIslandingConfig
from monee.model.formulation.registry import attach_formulations
from monee.problem.core import OptimizationProblem
from monee.solver.infeasibility import diagnose_infeasibility

from .core import (
    SolverInterface,
    SolverResult,
    StepState,
    apply_post_process_all,
    as_iter,
    compute_bound_violations,
    filter_bool_eqs,
    filter_intermediate_eqs,
    find_ignored_nodes,
    ignore_branch,
    ignore_child,
    ignore_compound,
    ignore_node,
    inject_vars,
    mark_he_flow_prescription,
    mark_heat_balance_slacks,
    mark_ignored_components,
    persist_solution,
    pin_floating_hydraulic_gauges,
    withdraw_vars,
)

_log = logging.getLogger(__name__)

DEFAULT_SOLVER_OPTIONS = {}

# Gurobi defaults tuned for McCormick-DHS + MISOCP electrical:
#   ScaleFlag=2 - aggressive geometric scaling for the SOC/bilinear blocks;
#   MIPFocus=2  - close the gap (the LP root bound is already at the optimum);
#   MIPGap=1e-3 - ~kW precision at MW scale (default 1e-4 wastes B&B).
# NumericFocus stays at 0: was net-negative once Reynolds/pressure_pa scaling fixed.
PER_SOLVER_OPTIONS = {
    "gurobi": {
        "ScaleFlag": 2,
        "MIPFocus": 2,
        "MIPGap": 1e-3,
        "TimeLimit": 300,
    },
    # Dual presolve reductions make SCIP spuriously prove infeasibility on the
    # ill-conditioned MISOCP load-shedding models (verified: identical model
    # optimal with these off and with Gurobi, 'infeasible' with them on).
    "scip": {
        "misc/allowstrongdualreds": False,
        "misc/allowweakdualreds": False,
        "limits/time": 300,
    },
}


def _classic_scip_available() -> bool:
    """True iff the standalone ``scip``/``scipampl`` executable is on PATH.

    Checked via ``shutil.which`` rather than ``SolverFactory("scip").available()``
    so we don't trigger Pyomo's noisy "Could not locate the 'scip' executable"
    warning just to decide which backend to use.
    """
    return bool(shutil.which("scip") or shutil.which("scipampl"))


def _pyscipopt_available() -> bool:
    """True iff the ``pyscipopt`` Python bindings (PyPI ``pyscipopt``) import."""
    try:
        import pyscipopt  # noqa: F401
    except ImportError:
        return False
    return True


[docs] class PyscipoptSolver: """Drive SCIP through the ``pyscipopt`` bindings when the classic ``scip`` executable is absent.. """ def __init__(self): self.options: dict = {}
[docs] def warm_start_capable(self) -> bool: return False
[docs] def available(self, exception_flag: bool = False) -> bool: return _pyscipopt_available()
[docs] def solve(self, model, tee: bool = False, **kwargs): from pyscipopt import Model as ScipModel tmpdir = tempfile.mkdtemp(prefix="monee_scip_") nl_path = os.path.join(tmpdir, "model.nl") try: model.write( nl_path, format="nl", io_options={"symbolic_solver_labels": True}, ) sm = ScipModel() if not tee: sm.hideOutput() sm.readProblem(nl_path) for key, value in self.options.items(): try: sm.setParam(key, value) except Exception: _log.debug("pyscipopt: ignoring unknown SCIP option %r", key) sm.optimize() status = sm.getStatus() has_solution = sm.getNSols() > 0 if has_solution: # Strip whitespace from SCIP variable names: on Windows the # pyomo-written .col label file has CRLF line endings, so # SCIP's NL reader keeps a trailing '\r' in every name. scip_vals = {var.name.strip(): sm.getVal(var) for var in sm.getVars()} matched = 0 for var in model.component_data_objects(pyo.Var, active=True): val = scip_vals.get(var.name) if val is not None: var.set_value(val, skip_validation=True) matched += 1 if matched == 0 and scip_vals: _log.warning( "pyscipopt bridge: no SCIP variable names matched the " "pyomo model; solution write-back failed and the " "reported values are the variable initializations." ) return self._build_result(status, has_solution) finally: shutil.rmtree(tmpdir, ignore_errors=True)
@staticmethod def _build_result(status: str, has_solution: bool): """Map a SCIP status string onto the ``(status, termination_condition)`` pair that :func:`_classify_solve_result` inspects.""" if status == "optimal": solver_status, tc = SolverStatus.ok, TerminationCondition.optimal elif status == "infeasible": solver_status, tc = SolverStatus.warning, TerminationCondition.infeasible elif status == "unbounded": solver_status, tc = SolverStatus.warning, TerminationCondition.unbounded elif has_solution: # A limit was hit (time/node/gap) but SCIP has a feasible incumbent; # _classify_solve_result treats this as a usable witness solution. solver_status, tc = SolverStatus.aborted, TerminationCondition.maxTimeLimit else: # Limit hit with no incumbent -> report as infeasible so the caller # surfaces a diagnostic rather than reading unset Vars as a solution. solver_status, tc = SolverStatus.warning, TerminationCondition.infeasible return types.SimpleNamespace( solver=types.SimpleNamespace(status=solver_status, termination_condition=tc) )
[docs] class PyomoPWLImpl: def __init__(self, pm: pyo.ConcreteModel, pw_repn: str = "SOS2"): self.pm = pm self.pw_repn = pw_repn # counter to ensure unique component names if not hasattr(pm, "_pwl_counter"): pm._pwl_counter = 0
[docs] def piecewise_eq(self, y, x, xs, ys, name: str | None = None): pm = self.pm pm._pwl_counter += 1 k = pm._pwl_counter if name is None: name = f"pwl_{k}" xs = list(xs) ys = list(ys) pw = pyo.Piecewise( y, x, pw_pts=xs, f_rule=ys, # list form: y[i] at xs[i] pw_constr_type="EQ", pw_repn=self.pw_repn, # "SOS2" (tight), or "BIGM" warn_domain_coverage=False, ) setattr(pm, name, pw)
def _classify_solve_result(result, pm, solver_name: str, *, phase_label: str): """Map a Pyomo solve outcome to ``(success, report, status_str, tc_str)``. OK → silent; infeasible → error + MIS report; other non-OK (limits with a feasible incumbent) → warning, treated as success. The status/tc strings let downstream consumers identify *which* non-ok outcome a "success" actually came from (e.g. a Gurobi time-limit abort that returned a witness incumbent — the MC pipeline drops those samples rather than averaging them in). """ status = result.solver.status tc = result.solver.termination_condition status_str = str(status) if status is not None else None tc_str = str(tc) if tc is not None else None if status == SolverStatus.ok: return True, None, status_str, tc_str if tc == TerminationCondition.infeasible: report = diagnose_infeasibility( pm, solver_name=solver_name, compute_mis_flag=True, tol=0.001 ) _log.error( "%s infeasible (status=%s, termination=%s). Diagnostic report:\n%s", phase_label, status, tc, report.summary(max_items=50), ) return False, report, status_str, tc_str # Only limit-type terminations come with a feasible incumbent worth using. # Anything else (infeasibleOrUnbounded, unbounded, error, solverFailure...) # has no witness: declaring success there means downstream code reads the # variable *initializations* as a solution. witness_terminations = { TerminationCondition.maxTimeLimit, TerminationCondition.maxIterations, TerminationCondition.maxEvaluations, TerminationCondition.feasible, TerminationCondition.optimal, TerminationCondition.locallyOptimal, TerminationCondition.globallyOptimal, } if tc in witness_terminations: _log.warning( "%s returned non-ok status (status=%s, termination=%s); " "using witness solution.", phase_label, status, tc, ) return True, None, status_str, tc_str _log.error( "%s failed without a usable solution (status=%s, termination=%s).", phase_label, status, tc, ) return False, None, status_str, tc_str
[docs] class PyomoSolver(SolverInterface): """Pyomo-backed solver. ``solver_name`` is overridable per :meth:`solve`.""" def __init__(self, solver_name: str = "scip"): self._solver_name = solver_name # Per-solve simulation flag (set at the top of solve()); read by the # equation-building passes to drop operational flow limits. self._simulation: bool = False
[docs] @staticmethod def inject_pyomo_vars_attr( pm: pyo.ConcreteModel, target: GenericModel, prefix: str ): """Replace Var/Const with Pyomo Var / numeric. Clamps stale ``initialize`` into ``[min, max]`` (suppresses W1002 when bounds tightened since last solve).""" for key, value in target.__dict__.items(): if isinstance(value, Var): init = value.value # NaN survives into Gurobi's warmstart file and crashes the solve. if init is not None and isinstance(init, float) and math.isnan(init): init = None if init is not None: if value.min is not None and init < value.min: init = value.min if value.max is not None and init > value.max: init = value.max v = pyo.Var( domain=pyo.Integers if value.integer else pyo.Reals, bounds=(value.min, value.max), initialize=init, ) setattr(pm, PyomoSolver._sanitize_name(f"{prefix}__{key}"), v) setattr(target, key, v) elif type(value) is Const: setattr(target, key, float(value.value))
# Snap-to-bound tolerance; sub-epsilon noise re-injected as ``initialize`` # otherwise triggers W1002. _BOUND_SNAP_TOL = 1e-9
[docs] @staticmethod def withdraw_pyomo_vars_attr(target: GenericModel): """Pyomo Var → :class:`Var`. Restores ``integer``, snaps bound-noise to bounds, replaces NaN/None with 0 so the next solve's warmstart survives.""" for key, value in target.__dict__.items(): if isinstance(value, pyo.Var): lb, ub = value.bounds if value.bounds is not None else (None, None) is_integer = value.is_integer() val = pyo.value(value, exception=False) if val is None or (isinstance(val, float) and math.isnan(val)): val = 0 if is_integer: val = int(round(val)) else: tol = PyomoSolver._BOUND_SNAP_TOL if lb is not None and lb - tol <= val < lb: val = lb if ub is not None and ub < val <= ub + tol: val = ub setattr(target, key, Var(value=val, min=lb, max=ub, integer=is_integer)) elif isinstance(value, pyo.Expression): expr_val = pyo.value(value, exception=False) if expr_val is None or ( isinstance(expr_val, float) and math.isnan(expr_val) ): expr_val = 0 setattr(target, key, Intermediate(value=expr_val))
# --------- Constraint helpers --------- @staticmethod def _add_equation(pm, expr, name=None): # Trivial bool equations are not valid Pyomo Constraint expressions: # True = tautology (no-op); False = structural infeasibility from an # over-deactivated node/branch. Skip both so the load-shedding # objective can drive the solution instead of crashing construction, # but warn on False so a genuine modelling error is surfaced rather # than silently masked (mirrors the GEKKO backend via filter_bool_eqs). if isinstance(expr, bool): if expr is False: _log.warning( "Dropping always-false (structurally infeasible) equation%s; " "this usually means a node/branch was over-deactivated.", f" ({name})" if name is not None else "", ) return if name is not None: setattr(pm, name, pyo.Constraint(expr=expr)) else: pm.cons.add(expr) @staticmethod def _sanitize_name(name): """Make a string safe for use as a Pyomo component name.""" return ( name.replace("(", "").replace(")", "").replace(", ", "_").replace(" ", "_") ) def _add_equations(self, pm, exprs, name_prefix=None): for i, e in enumerate(exprs): if name_prefix is not None: name = self._sanitize_name(f"{name_prefix}_eq_{i}") else: name = None self._add_equation(pm, e, name=name) @staticmethod def _process_intermediate_eqs(pm, model_obj, equations): """ GEKKO Intermediate: two cases in your original code: - If attr is not Intermediate: enforce equality constraint - If attr is Intermediate: create an Intermediate and assign it In Pyomo, use Expression for intermediates. """ for intermediate_eq in [eq for eq in equations if type(eq) is IntermediateEq]: attr_val = getattr(model_obj, intermediate_eq.attr) eq = ( intermediate_eq.eq() if isinstance(intermediate_eq.eq, types.FunctionType) else intermediate_eq.eq ) if type(attr_val) is Intermediate: e = pyo.Expression(expr=eq) name = f"expr__{id(model_obj)}__{intermediate_eq.attr}" setattr(pm, name, e) setattr(model_obj, intermediate_eq.attr, e) @staticmethod def _make_solver(solver_name: str): """Resolve ``solver_name`` to a solver object. For ``scip``, transparently fall back to the :class:`PyscipoptSolver` bridge when the standalone ``scip`` executable is missing but the ``pyscipopt`` bindings are installed. Any other case defers to :func:`pyo.SolverFactory` (which raises the usual informative error at solve time if the solver is genuinely unavailable).""" if ( solver_name == "scip" and not _classic_scip_available() and _pyscipopt_available() ): _log.info( "Standalone 'scip' executable not found; solving via pyscipopt " "(SCIP Python bindings) over an AMPL .nl round-trip." ) return PyscipoptSolver() return pyo.SolverFactory(solver_name)
[docs] def solve( self, input_network: Network, optimization_problem: OptimizationProblem = None, draw_debug=False, exclude_unconnected_nodes: bool = False, step_state: StepState = None, simulation: bool = False, formulation=None, solver_name: str | None = None, **kwargs, ): # Parameter names/order mirror SolverInterface.solve so that a caller # passing draw_debug= (per the ABC) works on both backends. ``debug=`` # is accepted as a legacy alias; ``solver_name`` is a Pyomo-only extra. debug = draw_debug or kwargs.pop("debug", False) self._simulation = simulation if solver_name is None: solver_name = self._solver_name pm = pyo.ConcreteModel() pm.cons = pyo.ConstraintList() # Two buckets for lex mode (single sum in legacy mode). pm.user_obj_exprs = [] # user objectives pm.aux_obj_exprs = [] # formulation minimize() tightening terms network = input_network.copy() for ext in network.extensions: ext.prepare(network) # Attach the effective formulations and declare their vars on the copy. # In simulation mode this also squares the model (phantom DOFs pinned to # Const, |m| warm-started, vm_pu_squared demoted to a PostProcess). # Pyomo has no IMODE=1; a square system simply solves as the # objective-free feasibility problem, yielding the same unique steady # state GEKKO gets via IMODE=1 - kept consistent across backends so # pyomo+ipopt is an equivalent NLP path. Runs BEFORE # pin_floating_hydraulic_gauges / mark_heat_balance_slacks (which set # pressure Consts a re-declare would overwrite). attach_formulations(network, formulation, simulation=simulation) islanding_config = next( (e for e in network.extensions if isinstance(e, NetworkIslandingConfig)), None, ) # ignored_nodes computed before _apply so controllable filters checking # component.ignored exclude disconnected components. ignored_nodes = set() if optimization_problem is None or exclude_unconnected_nodes: ignored_nodes = find_ignored_nodes(network, islanding_config) if ignored_nodes: mark_ignored_components(network, ignored_nodes) if optimization_problem is not None: optimization_problem._apply(network) nodes = network.nodes for node in nodes: if ignore_node(node, network, ignored_nodes): continue for child in network.childs_by_ids(node.child_ids): if child.active: child.model.overwrite(node.model, node.grid) branches = network.branches compounds = network.compounds # Pin the pressure gauge of any hydraulic island without a grid-forming # source - removes a rank-deficient free DOF. pin_floating_hydraulic_gauges(network, ignored_nodes) # Drop each heat island's dependent nodal balance at its grid-forming # node (heat slack) - result-preserving for the exact balance. Under # mccormick the Junction balance is already trivial and the relaxed # eq. 9a is kept, so this is a no-op there. mark_heat_balance_slacks(network, ignored_nodes) # Decide per compound-internal SubHE whether the design flow prescribes # the through-flow (supply/return islands with free-mass-flow slacks) # or yields to a network-determined flow (e.g. a fixed sink downstream). mark_he_flow_prescription(network, ignored_nodes) inject_vars( lambda model, comp, cat: PyomoSolver.inject_pyomo_vars_attr( pm, model, prefix=f"{cat}_{comp.id}" ), nodes, branches, compounds, network, ignored_nodes, ) if step_state is not None: for ext in network.extensions: ext.activate_timeseries(network, ignored_nodes, step_state=step_state) self.mark_temporal_components(network, ignored_nodes) self.init_branches(branches) self.process_equations_nodes_childs(pm, network, nodes, ignored_nodes) self.process_equations_branches(pm, network, branches, ignored_nodes) self.process_equations_compounds(pm, network, compounds, ignored_nodes) if optimization_problem is not None: self.process_oxf_components(pm, network, optimization_problem) else: self.process_internal_oxf_components(pm, network) if step_state is not None: self.process_inter_step_equations( pm, network, nodes, branches, compounds, ignored_nodes, step_state, optimization_problem=optimization_problem, ) for ext in network.extensions: self._add_equations( pm, ext.inter_step_equations(network, ignored_nodes, step_state) ) self._add_equations( pm, ext.inter_temporal_equations(network, ignored_nodes, step_state) ) for ext in network.extensions: self._add_equations(pm, ext.equations(network, ignored_nodes)) lex_objectives = ( optimization_problem is not None and optimization_problem.lex_objectives and len(pm.user_obj_exprs) > 0 and len(pm.aux_obj_exprs) > 0 ) solver = self._make_solver(solver_name) for k, v in DEFAULT_SOLVER_OPTIONS.items(): solver.options[k] = v for k, v in PER_SOLVER_OPTIONS.get(solver_name, {}).items(): solver.options[k] = v solve_kwargs = {"tee": debug} if getattr(solver, "warm_start_capable", lambda: False)(): solve_kwargs["warmstart"] = True if lex_objectives: result, success, report, status_str, tc_str = self._solve_lexicographic( pm, solver, solver_name, solve_kwargs ) else: pm.obj = pyo.Objective( expr=sum(pm.user_obj_exprs) + sum(pm.aux_obj_exprs), sense=pyo.minimize, ) result = solver.solve(pm, **solve_kwargs) success, report, status_str, tc_str = _classify_solve_result( result, pm, solver_name, phase_label="Pyomo solve", ) withdraw_vars( PyomoSolver.withdraw_pyomo_vars_attr, nodes, branches, compounds, network ) apply_post_process_all(nodes, branches, compounds, network) persist_solution(network, input_network) violations = compute_bound_violations(nodes, branches, compounds, network) # NaN fallback: pyo.value raises on unset Vars (e.g. McCormick aux on a # freshly-activated branch); SolverResult.success is the real outcome. obj_val = pyo.value(pm.obj, exception=False) if obj_val is None: obj_val = float("nan") solver_result = SolverResult( network, network.as_result_dataframe_dict(), obj_val, success, violations, solver_status=status_str, termination_condition=tc_str, infeasibility_report=report if not success else None, ) return solver_result
# Slack added on top of solver MIPGap so the phase-2 cap never excludes # a phase-1 incumbent the solver accepted within tolerance. _LEX_REL_TOL = 1e-6 _LEX_ABS_TOL = 1e-9 @classmethod def _lex_cap_slack(cls, s_star: float, solver_options: dict) -> float: rel_gap = float(solver_options.get("MIPGap", 0.0) or 0.0) rel = max(rel_gap, cls._LEX_REL_TOL) return cls._LEX_ABS_TOL + rel * max(1.0, abs(s_star)) def _solve_lexicographic(self, pm, solver, solver_name, solve_kwargs): """Two-phase lex: minimise ``Σ user`` then ``Σ aux`` under ``Σ user ≤ S* + slack``. Combined ``pm.obj`` attached (deactivated) for backwards-compatible ``pyo.value(pm.obj)``.""" pm.obj_user = pyo.Objective(expr=sum(pm.user_obj_exprs), sense=pyo.minimize) pm.obj_aux = pyo.Objective(expr=sum(pm.aux_obj_exprs), sense=pyo.minimize) pm.obj_aux.deactivate() # Phase 1: user objective only. result = solver.solve(pm, **solve_kwargs) success, report, status_str, tc_str = _classify_solve_result( result, pm, solver_name, phase_label="Lexicographic phase 1", ) if not success: pm.obj = pyo.Objective( expr=sum(pm.user_obj_exprs) + sum(pm.aux_obj_exprs), sense=pyo.minimize, ) pm.obj.deactivate() return result, success, report, status_str, tc_str s_star = pyo.value(pm.obj_user, exception=False) if s_star is None: s_star = 0.0 slack = self._lex_cap_slack(s_star, solver.options) pm.lex_cap = pyo.Constraint(expr=sum(pm.user_obj_exprs) <= s_star + slack) pm.obj_user.deactivate() pm.obj_aux.activate() result = solver.solve(pm, **solve_kwargs) success, report, status_str, tc_str = _classify_solve_result( result, pm, solver_name, phase_label="Lexicographic phase 2", ) pm.obj = pyo.Objective( expr=sum(pm.user_obj_exprs) + sum(pm.aux_obj_exprs), sense=pyo.minimize, ) pm.obj.deactivate() return result, success, report, status_str, tc_str
[docs] def process_internal_oxf_components(self, pm, network): for constraint in network.constraints: self._add_equation(pm, constraint(network)) obj = None for objective in network.objectives: obj = objective(network) if obj is None else (obj + objective(network)) if obj is not None: pm.user_obj_exprs.append(obj)
[docs] def process_oxf_components( self, pm, network, optimization_problem: OptimizationProblem, period_index=None, ): if optimization_problem.constraints is not None and ( not optimization_problem.constraints.empty ): self._add_equations( pm, optimization_problem.constraints.all( network, period_index=period_index ), ) obj = None for objective in ( optimization_problem.objectives.all(network, period_index=period_index) if optimization_problem.objectives is not None else [] ): obj = objective if obj is None else (obj + objective) if obj is not None: pm.user_obj_exprs.append(obj)
[docs] def process_equations_compounds(self, pm, network, compounds, ignored_nodes): for compound in compounds: if ignore_compound(compound, ignored_nodes): continue equations = compound.equations(network) if equations is not None: equations = filter_bool_eqs( as_iter(equations), context=f"compound_{compound.id}" ) self._process_intermediate_eqs(pm, compound, equations) self._add_equations( pm, filter_intermediate_eqs(equations), name_prefix=f"compound_{compound.id}", )
[docs] def process_equations_nodes_childs( self, pm, network: Network, nodes, ignored_nodes ): sin_impl = pyo.sin cos_impl = pyo.cos abs_impl = abs sqrt_impl = pyo.sqrt log_impl = pyo.log for node in nodes: if ignore_node(node, network, ignored_nodes): continue node_childs = network.childs_by_ids(node.child_ids) grid = node.grid from_branches = [ network.branch_by_id(bid).model for bid in node.from_branch_ids if not ignore_branch(network.branch_by_id(bid), network, ignored_nodes) ] to_branches = [ network.branch_by_id(bid).model for bid in node.to_branch_ids if not ignore_branch(network.branch_by_id(bid), network, ignored_nodes) ] connected_childs = [ child.model for child in node_childs if not ignore_child(child, ignored_nodes) ] equations = as_iter( node.equations( grid, from_branches, to_branches, connected_childs, sin_impl=sin_impl, cos_impl=cos_impl, abs_impl=abs_impl, sqrt_impl=sqrt_impl, log_impl=log_impl, ) ) for expr in node.minimize( grid, from_branches, to_branches, connected_childs, sqrt_impl=sqrt_impl ): pm.aux_obj_exprs.append(expr) node_eqs = filter_bool_eqs(equations, context=f"node_{node.id}") self._process_intermediate_eqs(pm, node.model, node_eqs) self._add_equations( pm, filter_intermediate_eqs(node_eqs), name_prefix=f"node_{node.id}" ) for child in node_childs: if ignore_child(child, ignored_nodes): continue for expr in child.minimize(grid, node, sqrt_impl=sqrt_impl): pm.aux_obj_exprs.append(expr) child_eqs = filter_bool_eqs( as_iter(child.equations(grid, node)), context=f"child_{child.id}" ) self._process_intermediate_eqs(pm, child.model, child_eqs) self._add_equations( pm, filter_intermediate_eqs(child_eqs), name_prefix=f"child_{child.id}", )
[docs] def process_equations_branches(self, pm, network, branches, ignored_nodes): sin_impl = pyo.sin cos_impl = pyo.cos abs_impl = abs sqrt_impl = pyo.sqrt log_impl = pyo.log pwl_impl = PyomoPWLImpl(pm, pw_repn="SOS2") def if_impl(*args, **kwargs): raise NotImplementedError( "Replace GEKKO if2/if3 with a Pyomo Piecewise / big-M formulation." ) def max_impl(*args, **kwargs): raise NotImplementedError( "Replace GEKKO max2 with Pyomo max_ (or explicit constraints)." ) def sign_impl(*args, **kwargs): raise NotImplementedError( "Replace GEKKO sign2/sign3 with a Pyomo formulation (often binary)." ) for branch in branches: if ignore_branch(branch, network, ignored_nodes): continue grid = branch.grid branch_eqs = as_iter( branch.equations( grid, network.node_by_id(branch.from_node_id).model, network.node_by_id(branch.to_node_id).model, sin_impl=sin_impl, cos_impl=cos_impl, if_impl=if_impl, abs_impl=abs_impl, max_impl=max_impl, sign_impl=sign_impl, log_impl=log_impl, sqrt_impl=sqrt_impl, pwl_impl=pwl_impl, simulation=self._simulation, ) ) for expr in branch.minimize( grid, network.node_by_id(branch.from_node_id).model, network.node_by_id(branch.to_node_id).model, sqrt_impl=sqrt_impl, ): pm.aux_obj_exprs.append(expr) branch_eqs = filter_bool_eqs(branch_eqs, context=f"branch_{branch.id}") self._process_intermediate_eqs(pm, branch.model, branch_eqs) self._add_equations( pm, filter_intermediate_eqs(branch_eqs), name_prefix=f"branch_{branch.id}", )