"""Pyomo infeasibility diagnostics. Translates Pyomo var names like
``child_3__p_mw`` back to monee component descriptions and wraps the
Pyomo analysis utilities."""
import contextlib
import io
import logging
import re
from dataclasses import dataclass, field
import pyomo.environ as pyo
_log = logging.getLogger(__name__)
# Pyomo var name: {cat}_{id}__{attr} or {cat}_{id}_t{period}__{attr}.
_VAR_NAME_RE = re.compile(
r"^(?P<cat>\w+?)_(?P<id>\d+)(?:_t(?P<t>\d+))?__(?P<attr>\w+)$"
)
def _parse_var_name(name: str) -> dict | None:
"""Return ``{cat, id, t|None, attr}`` or None if *name* doesn't match."""
m = _VAR_NAME_RE.match(name)
if m is None:
return None
return {
"cat": m.group("cat"),
"id": int(m.group("id")),
"t": int(m.group("t")) if m.group("t") else None,
"attr": m.group("attr"),
}
def _var_display_name(name: str) -> str:
"""Pyomo name → ``cat[id].attr`` (with ``(t=...)`` for multi-period)."""
parsed = _parse_var_name(name)
if parsed is None:
return name
label = f"{parsed['cat']}[{parsed['id']}].{parsed['attr']}"
if parsed["t"] is not None:
label += f" (t={parsed['t']})"
return label
[docs]
@dataclass
class ConstraintResidual:
"""A constraint that is violated or close to violation."""
index: int | str
body_value: float
lower: float | None
upper: float | None
residual: float
expression: str | None = None
[docs]
def collect_constraint_residuals( # NOSONAR
pm: pyo.ConcreteModel, tol: float = 1e-4
) -> list[ConstraintResidual]:
"""List constraints whose residual exceeds *tol*, sorted by magnitude."""
violated = []
for idx in pm.cons:
con = pm.cons[idx]
if con.body is None:
continue
try:
body_val = pyo.value(con.body, exception=False)
except Exception:
continue
if body_val is None:
continue
lb = pyo.value(con.lower) if con.lower is not None else None
ub = pyo.value(con.upper) if con.upper is not None else None
residual = 0.0
if lb is not None and body_val < lb - tol:
residual = lb - body_val
if ub is not None and body_val > ub + tol:
residual = max(residual, body_val - ub)
if lb is not None and ub is not None and lb == ub:
residual = abs(body_val - lb)
if residual > tol:
try:
expr_str = str(con.expr)
except Exception:
expr_str = None
violated.append(
ConstraintResidual(
index=idx,
body_value=body_val,
lower=lb,
upper=ub,
residual=residual,
expression=expr_str,
)
)
violated.sort(key=lambda r: r.residual, reverse=True)
return violated
[docs]
@dataclass
class BoundViolation:
"""A variable violating or sitting on its bounds."""
name: str
display_name: str
value: float
lower: float | None
upper: float | None
violation: float
[docs]
def collect_variable_bound_violations(
pm: pyo.ConcreteModel, tol: float = 1e-4
) -> list[BoundViolation]:
"""Return variables whose current value violates their bounds."""
violations = []
for var in pm.component_objects(pyo.Var, active=True):
name = var.name
try:
val = pyo.value(var, exception=False)
except Exception:
continue
if val is None:
continue
lb = var.lb
ub = var.ub
viol = 0.0
if lb is not None and val < lb - tol:
viol = lb - val
if ub is not None and val > ub + tol:
viol = max(viol, val - ub)
if viol > tol:
violations.append(
BoundViolation(
name=name,
display_name=_var_display_name(name),
value=val,
lower=lb,
upper=ub,
violation=viol,
)
)
violations.sort(key=lambda v: v.violation, reverse=True)
return violations
[docs]
def collect_variables_at_bounds(pm: pyo.ConcreteModel, tol: float = 1e-4) -> list[dict]:
"""List variables whose value sits within *tol* of a bound (active bounds)."""
at_bounds = []
for var in pm.component_objects(pyo.Var, active=True):
name = var.name
try:
val = pyo.value(var, exception=False)
except Exception:
continue
if val is None:
continue
lb = var.lb
ub = var.ub
at_lb = lb is not None and abs(val - lb) < tol
at_ub = ub is not None and abs(val - ub) < tol
if at_lb or at_ub:
at_bounds.append(
{
"name": name,
"display_name": _var_display_name(name),
"value": val,
"lower": lb,
"upper": ub,
"at_lower": at_lb,
"at_upper": at_ub,
}
)
return at_bounds
# Per-solver option key for a wall-clock cap on each internal solve in
# ``compute_infeasibility_explanation`` (it iterates several solves).
_MIS_TIME_LIMIT_OPTION = {
"gurobi": "TimeLimit",
"cplex": "timelimit",
"scip": "limits/time",
"appsi_highs": "time_limit",
"cbc": "seconds",
}
# Cap on every internal MIS solve. The MIS routine elastically relaxes
# the LP (≈3× model size) and runs a fresh MILP for each candidate; on
# pathological cases (large slack envelope + 100%-gap-from-zero) those
# never converge. 10 s is enough to return a useful MIS on small cases
# and to time out gracefully on hard ones.
_MIS_TIME_LIMIT_S: float = 10.0
[docs]
def compute_mis(pm: pyo.ConcreteModel, solver_name: str = "scip") -> list[str]:
"""Names/indices of constraints/bounds forming a Minimal Infeasible Subsystem.
Needs SCIP/Gurobi/CPLEX. Each internal solve is capped at
``_MIS_TIME_LIMIT_S`` so a hard MIS computation can't stall the caller."""
from pyomo.contrib.iis import compute_infeasibility_explanation
log_capture = io.StringIO()
handler = logging.StreamHandler(log_capture)
handler.setLevel(logging.INFO)
iis_logger = logging.getLogger("pyomo.contrib.iis")
original_level = iis_logger.level
iis_logger.setLevel(logging.INFO)
iis_logger.addHandler(handler)
solver = pyo.SolverFactory(solver_name)
time_limit_key = _MIS_TIME_LIMIT_OPTION.get(solver_name)
if time_limit_key is not None:
solver.options[time_limit_key] = _MIS_TIME_LIMIT_S
try:
with contextlib.redirect_stderr(io.StringIO()):
compute_infeasibility_explanation(pm, solver=solver)
except Exception as e:
_log.warning("MIS computation failed: %s", e)
return []
finally:
iis_logger.removeHandler(handler)
iis_logger.setLevel(original_level)
output = log_capture.getvalue()
mis_constraints = []
in_mis_section = False
for line in output.splitlines():
if "Constraints / bounds in MIS:" in line:
in_mis_section = True
continue
if "Constraints / bounds in guards" in line:
in_mis_section = False
continue
if in_mis_section and line.strip().startswith("constraint:"):
mis_constraints.append(line.strip())
if in_mis_section and line.strip().startswith("variable bound:"):
mis_constraints.append(line.strip())
return mis_constraints
[docs]
@dataclass
class InfeasibilityReport:
"""Structured infeasibility report."""
constraint_residuals: list[ConstraintResidual] = field(default_factory=list)
bound_violations: list[BoundViolation] = field(default_factory=list)
variables_at_bounds: list[dict] = field(default_factory=list)
mis_constraints: list[str] = field(default_factory=list)
[docs]
def summary(self, max_items: int = 10) -> str: # NOSONAR
lines = []
if self.constraint_residuals:
lines.append(
f"=== Violated constraints ({len(self.constraint_residuals)} total) ==="
)
for r in self.constraint_residuals[:max_items]:
bound_str = ""
if r.lower is not None and r.upper is not None and r.lower == r.upper:
bound_str = f"== {r.lower}"
elif r.lower is not None and r.upper is not None:
bound_str = f"in [{r.lower}, {r.upper}]"
elif r.lower is not None:
bound_str = f">= {r.lower}"
elif r.upper is not None:
bound_str = f"<= {r.upper}"
lines.append(
f" cons[{r.index}]: body={r.body_value:.6g} {bound_str} "
f"(residual={r.residual:.4g})"
)
if len(self.constraint_residuals) > max_items:
lines.append(
f" ... and {len(self.constraint_residuals) - max_items} more"
)
else:
lines.append("=== No violated constraints ===")
lines.append("")
if self.bound_violations:
lines.append(
f"=== Variable bound violations ({len(self.bound_violations)} total) ==="
)
for v in self.bound_violations[:max_items]:
lines.append(
f" {v.display_name}: value={v.value:.6g} "
f"bounds=[{v.lower}, {v.upper}] "
f"(violation={v.violation:.4g})"
)
if len(self.bound_violations) > max_items:
lines.append(f" ... and {len(self.bound_violations) - max_items} more")
else:
lines.append("=== No variable bound violations ===")
if self.variables_at_bounds:
lines.append("")
n_at_lb = sum(1 for v in self.variables_at_bounds if v["at_lower"])
n_at_ub = sum(1 for v in self.variables_at_bounds if v["at_upper"])
lines.append(
f"=== Variables at bounds: {n_at_lb} at lower, {n_at_ub} at upper ==="
)
for v in self.variables_at_bounds[:max_items]:
if v["at_lower"] and not v["at_upper"]:
which = "lower"
elif v["at_upper"] and not v["at_lower"]:
which = "upper"
else:
which = "both"
lines.append(
f" {v['display_name']}: value={v['value']:.6g} "
f"bounds=[{v['lower']}, {v['upper']}] (at {which})"
)
if len(self.variables_at_bounds) > max_items:
lines.append(
f" ... and {len(self.variables_at_bounds) - max_items} more"
)
if self.mis_constraints:
lines.append("")
lines.append(
f"=== Minimal Infeasible Subsystem ({len(self.mis_constraints)} elements) ==="
)
for c in self.mis_constraints:
lines.append(f" {c}")
return "\n".join(lines)
def __str__(self):
return self.summary()
def __repr__(self):
return (
f"InfeasibilityReport("
f"constraints={len(self.constraint_residuals)}, "
f"bound_violations={len(self.bound_violations)}, "
f"at_bounds={len(self.variables_at_bounds)}, "
f"mis={len(self.mis_constraints)})"
)
[docs]
def diagnose_infeasibility(
pm: pyo.ConcreteModel,
solver_name: str = "scip",
tol: float = 1e-4,
compute_mis_flag: bool = True,
) -> InfeasibilityReport:
"""Build an :class:`InfeasibilityReport`. ``compute_mis_flag`` triggers the
Minimal Infeasible Subsystem analysis (slow)."""
report = InfeasibilityReport(
constraint_residuals=collect_constraint_residuals(pm, tol=tol),
bound_violations=collect_variable_bound_violations(pm, tol=tol),
variables_at_bounds=collect_variables_at_bounds(pm, tol=tol),
)
if compute_mis_flag:
try:
report.mis_constraints = compute_mis(pm, solver_name=solver_name)
except Exception as e:
_log.warning("MIS computation failed: %s", e)
return report