Source code for monee.solver.pyo

import pyomo.environ as pyo

from monee.model import (
    Const,
    GenericModel,
    Intermediate,
    IntermediateEq,
    Network,
    Var,
)
from monee.problem.core import OptimizationProblem

from .core import (
    SolverInterface,
    SolverResult,
    as_iter,
    filter_intermediate_eqs,
    find_ignored_nodes,
    ignore_branch,
    ignore_child,
    ignore_compound,
    ignore_node,
)

DEFAULT_SOLVER_OPTIONS = {}


[docs] class PyomoSolver(SolverInterface): """ """ def __init__(self): pass # --------- Injection / Withdrawal ---------
[docs] @staticmethod def inject_pyomo_vars_attr( pm: pyo.ConcreteModel, target: GenericModel, prefix: str ): """ Replace Var/Const fields on `target` with Pyomo Var / numeric constants. """ for key, value in target.__dict__.items(): if type(value) is Var: # Create a unique Pyomo Var on the ConcreteModel v = pyo.Var( domain=pyo.Integers if value.integer else pyo.Reals, bounds=(value.min, value.max), initialize=value.value, ) setattr(pm, f"{prefix}__{key}", v) setattr(target, key, v) elif type(value) is Const: # Pyomo can use plain floats; Param is optional. setattr(target, key, float(value.value))
[docs] @staticmethod def inject_nans(target: GenericModel): """ If a component is ignored, replace its Vars/Consts with NaN placeholders (keeps downstream code from crashing when it tries to read attributes). """ for key, value in target.__dict__.items(): if isinstance(value, Const): setattr(target, key, Const(float("nan"))) if isinstance(value, Var | Const): setattr(target, key, Var(float("nan"), max=value.max, min=value.min))
[docs] @staticmethod def inject_pyomo_vars(pm, nodes, branches, compounds, network, ignored_nodes): for branch in branches: if ignore_branch(branch, network, ignored_nodes): branch.ignored = True PyomoSolver.inject_nans(branch.model) continue PyomoSolver.inject_pyomo_vars_attr( pm, branch.model, prefix=f"branch_{branch.id}" ) for node in nodes: if ignore_node(node, network, ignored_nodes): node.ignored = True for child in network.childs_by_ids(node.child_ids): child.ignored = True PyomoSolver.inject_nans(child.model) PyomoSolver.inject_nans(node.model) continue PyomoSolver.inject_pyomo_vars_attr(pm, node.model, prefix=f"node_{node.id}") for child in network.childs_by_ids(node.child_ids): if ignore_child(child, ignored_nodes): child.ignored = True PyomoSolver.inject_nans(child.model) continue PyomoSolver.inject_pyomo_vars_attr( pm, child.model, prefix=f"child_{child.id}" ) for compound in compounds: if ignore_compound(compound, ignored_nodes): compound.ignored = True PyomoSolver.inject_nans(compound.model) continue PyomoSolver.inject_pyomo_vars_attr( pm, compound.model, prefix=f"compound_{compound.id}" )
[docs] @staticmethod def withdraw_pyomo_vars_attr(target: GenericModel): """ Convert Pyomo Var values back into your Var objects. Constants stay constants. """ for key, value in target.__dict__.items(): if isinstance(value, pyo.Var): lb, ub = value.bounds if value.bounds is not None else (None, None) val = pyo.value(value) setattr(target, key, Var(value=val, min=lb, max=ub)) elif isinstance(value, pyo.Expression): setattr(target, key, Intermediate(value=pyo.value(value)))
[docs] @staticmethod def withdraw_pyomo_vars(nodes, branches, compounds, network): for branch in branches: PyomoSolver.withdraw_pyomo_vars_attr(branch.model) for node in nodes: PyomoSolver.withdraw_pyomo_vars_attr(node.model) for child in network.childs_by_ids(node.child_ids): PyomoSolver.withdraw_pyomo_vars_attr(child.model) for compound in compounds: PyomoSolver.withdraw_pyomo_vars_attr(compound.model)
# --------- Constraint helpers --------- @staticmethod def _add_equation(pm, expr): """ GEKKO m.Equation(expr) becomes pm.cons.add(expr). `expr` must be a Pyomo relational expression (==, <=, >=). """ pm.cons.add(expr) @staticmethod def _add_equations(pm, exprs): for e in exprs: pm.cons.add(e) @staticmethod def _process_intermediate_eqs(pm, model_obj, equations): """ GEKKO Intermediate: two cases in your original code: - If attr is not Intermediate: enforce equality constraint - If attr is Intermediate: create an Intermediate and assign it In Pyomo, use Expression for intermediates. """ for intermediate_eq in [eq for eq in equations if type(eq) is IntermediateEq]: attr_val = getattr(model_obj, intermediate_eq.attr) # If the target attribute is not "Intermediate" wrapper, force equality: if type(attr_val) is not Intermediate: PyomoSolver._add_equation(pm, attr_val == intermediate_eq.eq) else: # Create a Pyomo Expression and attach it e = pyo.Expression(expr=intermediate_eq.eq) # Put on pm for uniqueness + easy value extraction name = f"expr__{id(model_obj)}__{intermediate_eq.attr}" setattr(pm, name, e) setattr(model_obj, intermediate_eq.attr, e) # --------- Core solve ---------
[docs] def solve( self, input_network: Network, optimization_problem: OptimizationProblem = None, draw_debug: bool = False, exclude_unconnected_nodes: bool = False, solver_name: str = "gurobi", ): pm = pyo.ConcreteModel() pm.cons = pyo.ConstraintList() pm.obj_exprs = [] network = input_network.copy() if optimization_problem is not None: optimization_problem._apply(network) ignored_nodes = set() if optimization_problem is None or exclude_unconnected_nodes: ignored_nodes = find_ignored_nodes(network) nodes = network.nodes for node in nodes: if ignore_node(node, network, ignored_nodes): continue for child in network.childs_by_ids(node.child_ids): if child.active: child.model.overwrite(node.model) branches = network.branches compounds = network.compounds # inject vars self.inject_pyomo_vars(pm, nodes, branches, compounds, network, ignored_nodes) # init branches self.init_branches(branches) # build constraints & objectives self.process_equations_nodes_childs(pm, network, nodes, ignored_nodes) self.process_equations_branches(pm, network, branches, ignored_nodes) self.process_equations_compounds(pm, network, compounds, ignored_nodes) # OXF components if optimization_problem is not None: self.process_oxf_components(pm, network, optimization_problem) else: self.process_internal_oxf_components(pm, network) # single objective: sum of collected objective expressions pm.obj = pyo.Objective(expr=sum(pm.obj_exprs), sense=pyo.minimize) # solve solver = pyo.SolverFactory(solver_name) for k, v in DEFAULT_SOLVER_OPTIONS.items(): solver.options[k] = v solver.solve(pm, tee=True) # pull values back into your objects self.withdraw_pyomo_vars(nodes, branches, compounds, network) # objective value obj_val = pyo.value(pm.obj) return SolverResult(network, network.as_result_dataframe_dict(), obj_val)
# --------- Your original processing rewritten to Pyomo ---------
[docs] def process_internal_oxf_components(self, pm, network): for constraint in network.constraints: self._add_equation(pm, constraint(network)) obj = None for objective in network.objectives: obj = objective(network) if obj is None else (obj + objective(network)) if obj is not None: pm.obj_exprs.append(obj)
[docs] def process_oxf_components( self, pm, network, optimization_problem: OptimizationProblem ): if optimization_problem.constraints is not None and ( not optimization_problem.constraints.empty ): self._add_equations(pm, optimization_problem.constraints.all(network)) obj = None for objective in optimization_problem.objectives.all(network): obj = objective if obj is None else (obj + objective) if obj is not None: pm.obj_exprs.append(obj)
[docs] def process_equations_compounds(self, pm, network, compounds, ignored_nodes): for compound in compounds: if ignore_compound(compound, ignored_nodes): continue equations = compound.equations(network) if equations is not None: equations = as_iter(equations) self._process_intermediate_eqs(pm, compound, equations) self._add_equations(pm, filter_intermediate_eqs(equations))
[docs] def process_equations_nodes_childs( self, pm, network: Network, nodes, ignored_nodes ): # Pyomo math operators sin_impl = pyo.sin cos_impl = pyo.cos abs_impl = abs sqrt_impl = pyo.sqrt log_impl = pyo.log # IMPORTANT: # GEKKO's if2/if3/max2/sign2/sign3 do NOT map 1:1 to Pyomo. # You must implement these yourself (typically with binaries + big-M or Piecewise), # or change your model.equations() to avoid them for Pyomo runs. def if_impl(*args, **kwargs): raise NotImplementedError( "Replace GEKKO if2/if3 with a Pyomo Piecewise / big-M formulation." ) def max_impl(*args, **kwargs): raise NotImplementedError( "Replace GEKKO max2 with Pyomo max_ (or explicit constraints)." ) def sign_impl(*args, **kwargs): raise NotImplementedError( "Replace GEKKO sign2/sign3 with a Pyomo formulation (often binary)." ) for node in nodes: if ignore_node(node, network, ignored_nodes): continue node_childs = network.childs_by_ids(node.child_ids) grid = node.grid from_branches = [ network.branch_by_id(bid).model for bid in node.from_branch_ids if not ignore_branch(network.branch_by_id(bid), network, ignored_nodes) ] to_branches = [ network.branch_by_id(bid).model for bid in node.to_branch_ids if not ignore_branch(network.branch_by_id(bid), network, ignored_nodes) ] connected_childs = [ child.model for child in node_childs if not ignore_child(child, ignored_nodes) ] equations = as_iter( node.equations( grid, from_branches, to_branches, connected_childs, sin_impl=sin_impl, cos_impl=cos_impl, if_impl=if_impl, abs_impl=abs_impl, max_impl=max_impl, sign_impl=sign_impl, sqrt_impl=sqrt_impl, log_impl=log_impl, ) ) for expr in node.minimize( grid, from_branches, to_branches, connected_childs, sqrt_impl=sqrt_impl ): pm.obj_exprs.append(expr) node_eqs = [eq for eq in equations if type(eq) is not bool or not eq] self._process_intermediate_eqs(pm, node.model, node_eqs) self._add_equations(pm, filter_intermediate_eqs(node_eqs)) for child in node_childs: if ignore_child(child, ignored_nodes): continue for expr in child.minimize(grid, node, sqrt_impl=sqrt_impl): pm.obj_exprs.append(expr) child_eqs = as_iter(child.equations(grid, node)) self._process_intermediate_eqs(pm, child.model, child_eqs) self._add_equations(pm, filter_intermediate_eqs(child_eqs))
[docs] def init_branches(self, branches): for branch in branches: branch.model.init(branch.grid)
[docs] def process_equations_branches(self, pm, network, branches, ignored_nodes): sin_impl = pyo.sin cos_impl = pyo.cos abs_impl = abs sqrt_impl = pyo.sqrt log_impl = pyo.log def if_impl(*args, **kwargs): raise NotImplementedError( "Replace GEKKO if2/if3 with a Pyomo Piecewise / big-M formulation." ) def max_impl(*args, **kwargs): raise NotImplementedError( "Replace GEKKO max2 with Pyomo max_ (or explicit constraints)." ) def sign_impl(*args, **kwargs): raise NotImplementedError( "Replace GEKKO sign2/sign3 with a Pyomo formulation (often binary)." ) for branch in branches: if ignore_branch(branch, network, ignored_nodes): continue grid = branch.grid branch_eqs = as_iter( branch.equations( grid, network.node_by_id(branch.from_node_id).model, network.node_by_id(branch.to_node_id).model, sin_impl=sin_impl, cos_impl=cos_impl, if_impl=if_impl, abs_impl=abs_impl, max_impl=max_impl, sign_impl=sign_impl, log_impl=log_impl, sqrt_impl=sqrt_impl, ) ) for expr in branch.minimize( grid, network.node_by_id(branch.from_node_id).model, network.node_by_id(branch.to_node_id).model, sqrt_impl=sqrt_impl, ): pm.obj_exprs.append(expr) self._process_intermediate_eqs(pm, branch.model, branch_eqs) self._add_equations(pm, filter_intermediate_eqs(branch_eqs))