Source code for disropt.problems.quadratic_problem

import numpy as np
import scipy.sparse as sp
import warnings
from .problem import Problem
from ..functions import AffineForm, QuadraticForm
from ..functions.affine_form import aggregate_affine_form
from ..constraints import Constraint
from ..utils.utilities import check_symmetric, is_semi_pos_def
from .utilities import check_affine_constraints


[docs]class QuadraticProblem(Problem): """Solve a Quadratic programming problem defined as: .. math:: \\text{minimize } & x^\\top P x + q^\\top x + r \\text{subject to } & G x \\leq h & A x = b Quadratic problems are currently solved by using CVXOPT or OSQP https://osqp.org. Args: objective_function (QuadraticForm): objective function constraints (list, optional): list of constraints. Defaults to None. is_pos_def (bool): True if P is (semi)positive definite. Defaults to True. """ def __new__(cls, *args, **kwargs): instance = object.__new__(cls) return instance def __init__(self, objective_function: QuadraticForm, constraints: list = None, is_pos_def: bool = True, **kwargs): self.objective_function = None self.constraints = [] self.input_shape = None self.output_shape = None self.is_pos_def = is_pos_def self.set_objective_function(objective_function) if not check_affine_constraints(constraints): raise ValueError("Constraints must be affine") self.aggregated_constraints = [] if constraints is not None: self.set_constraints(constraints) # TODO: check init is done twice if instantiated via Problem
[docs] def set_objective_function(self, objective_function: QuadraticForm): """set the objective function Args: objective_function (QuadraticForm): objective function Raises: TypeError: Objective function must be a QuadraticForm """ if not isinstance(objective_function, QuadraticForm): raise TypeError("Objective function must be a QuadraticForm") P, _, _ = objective_function.get_parameters() if not self.is_pos_def: if not is_semi_pos_def(P): warnings.warn("Objective function is not convex.") self.objective_function = objective_function self.input_shape = objective_function.input_shape self.output_shape = objective_function.output_shape
[docs] def set_constraints(self, constraints): """Set the constraints Args: constraints (list): list of constraints Raises: TypeError: a list of affine Constraints must be provided """ if not isinstance(constraints, list): constraints = [constraints] for constraint in constraints: super().add_constraint(constraint) self.constraints_map = {} equality_idx_counter = 0 inequality_idx_counter = 0 equalities = [] inequalities = [] for idx, constraint in enumerate(self.constraints): if constraint.is_affine: if constraint.is_equality: equalities.append(constraint.function) self.constraints_map[idx] = {'type': 'affine_equality', 'indices': list( range(equality_idx_counter, equality_idx_counter+constraint.output_shape[0]))} equality_idx_counter += constraint.output_shape[0] elif constraint.is_inequality: inequalities.append(constraint.function) self.constraints_map[idx] = {'type': 'affine_inequality', 'indices': list( range(inequality_idx_counter, inequality_idx_counter+constraint.output_shape[0]))} inequality_idx_counter += constraint.output_shape[0] else: raise TypeError("a list of affine Constraints must be provided") for key, item in self.constraints_map.items(): if item['type'] == 'affine_inequality': item['indices'] = list(np.array(item['indices']) + equality_idx_counter) if len(equalities) != 0: aggregated_equality = aggregate_affine_form(equalities) self.aggregated_constraints.append(aggregated_equality == 0) if len(inequalities) != 0: aggregated_inequality = aggregate_affine_form(inequalities) self.aggregated_constraints.append(aggregated_inequality <= 0)
# def add_constraint(self, constraint): # """A a new (affine) constraint # Args: # constraint (Constraint): affine constraint # Raises: # TypeError: an affine Constraints must be provided # """ # if not isinstance(constraint, Constraint): # raise TypeError("an affine Constraints must be provided") # if constraint.is_affine: # if constraint.is_equality: # if len(self.constraints) == 0: # self.constraints.append(constraint) # else: # equality_found = False # for const in self.constraints: # if const.is_equality: # equality_found = True # aggregated_equality = aggregate_affine_form([const.function, constraint.function]) # self.constraints.remove(const) # self.constraints.append(aggregated_equality == 0) # if not equality_found: # self.constraints.append(constraint) # elif constraint.is_inequality: # if len(self.constraints) == 0: # self.constraints.append(constraint) # else: # inequality_found = False # for const in self.constraints: # if const.is_inequality: # inequality_found = True # aggregated_inequality = aggregate_affine_form([const.function, constraint.function]) # self.constraints.remove(const) # self.constraints.append(aggregated_inequality <= 0) # if not inequality_found: # self.constraints.append(constraint) # else: # raise TypeError("an affine Constraints must be provided") def __get_osqp_objective_parameters(self): """return the objective parameter fo osqp solver """ P, q, _ = self.objective_function.get_parameters() if isinstance(P, np.ndarray): if not check_symmetric(P): raise ValueError("Matrix P must be symmetric.") # if not is_semi_pos_def(P): # raise ValueError("Nonconvex objective function.") P = sp.csc_matrix(2*P) return P, q.flatten() def __get_osqp_constraints_parameters(self): """return the constraints parameter fo osqp solver """ A = None b = None G = None h = None if len(self.aggregated_constraints) != 0: for const in self.aggregated_constraints: if const.is_equality: A, b = const.get_parameters() if const.is_inequality: G, h = const.get_parameters() if A is not None: A_qp = A l_qp = -b u_qp = -b if G is not None: A_qp = np.hstack([A, G]) l_qp = np.vstack([-b, -np.inf * np.ones((len(h), 1))]) u_qp = np.vstack([-b, -h]) else: A_qp = G l_qp = -np.inf * np.ones(len(h)) u_qp = -h A_osqp = sp.csc_matrix(A_qp.transpose()) l_osqp = l_qp.flatten() u_osqp = u_qp.flatten() return A_osqp, l_osqp, u_osqp else: return None, None, None def __solve_osqp(self, initial_value: np.ndarray = None, return_only_solution: bool = True): """return the problem through osqp """ try: from osqp import OSQP except ImportError: raise ImportError("OSQP is not installed") P, q = self.__get_osqp_objective_parameters() A, l, u = self.__get_osqp_constraints_parameters() osqp_pb = OSQP() osqp_pb.setup(P=P, q=q, A=A, l=l, u=u, verbose=False) osqp_pb.update_settings(eps_abs=1e-7, eps_rel=1e-7, max_iter=10000) if initial_value is not None: osqp_pb.warm_start(x=initial_value) res = osqp_pb.solve() if res.info.status != 'solved': warnings.warn("OSQP exited with status {}".format(res.info.status)) solution = res.x.reshape(self.input_shape) if not return_only_solution: dual_variables = {} for idx, _ in enumerate(self.constraints): dual_variables[idx] = res.y[self.constraints_map[idx]['indices']].reshape(-1, 1) output_dict = { 'solution': solution, 'status': res.info.status, 'dual_variables': dual_variables, } return output_dict else: return solution def __get_cvxopt_objective_parameters(self): """return the objective parameter fo osqp solver """ P, q, _ = self.objective_function.get_parameters() if isinstance(P, np.ndarray): if not is_semi_pos_def(P): raise ValueError("Nonconvex objective function.") return 2*P, q def __get_cvxopt_constraints_parameters(self): """return the constraints parameter fo osqp solver """ A = None b = None G = None h = None if len(self.aggregated_constraints) != 0: for const in self.aggregated_constraints: if const.is_equality: A, b = const.get_parameters() A = A.transpose() b = -b if const.is_inequality: G, h = const.get_parameters() G = G.transpose() h = -h return A, b, G, h def __solve_cvxopt(self, initial_value: np.ndarray = None, return_only_solution: bool = True): """return the problem through osqp """ try: import cvxopt except ImportError: raise ImportError("CVXOPT is required") cvxopt.solvers.options['show_progress'] = False cvxopt.solvers.options['maxiters'] = 10000 P, q = self.__get_cvxopt_objective_parameters() A, b, G, h = self.__get_cvxopt_constraints_parameters() if initial_value is not None: initvals = {'x': initial_value} else: initvals = None if A is not None: A = cvxopt.matrix(A) b = cvxopt.matrix(b) if G is not None: G = cvxopt.matrix(G) h = cvxopt.matrix(h) sol = cvxopt.solvers.qp(P=cvxopt.matrix(P), q=cvxopt.matrix(q), G=G, h=h, A=A, b=b, initvals=initvals) solution = np.array(sol['x']).reshape(self.input_shape) if not return_only_solution: dual_variables = {} for idx, constr in enumerate(self.constraints): if constr.is_inequality: dual_variables[idx] = np.array(sol['z'])[self.constraints_map[idx]['indices']] elif constr.is_equality: dual_variables[idx] = np.array(sol['y'])[self.constraints_map[idx]['indices']] if sol['status'] == 'optimal': status = 'solved' else: status = sol['status'] output_dict = { 'solution': solution, 'status': status, 'dual_variables': dual_variables, } return output_dict else: return solution
[docs] def solve(self, initial_value: np.ndarray = None, solver='osqp', return_only_solution: bool = True): """Solve the problem Args: initial_value (numpy.ndarray), optional): Initial value for warm start. Defaults to None. solver (str, optional): Solver to use ('osqp' or 'cvxopt'). Defaults to 'osqp'. Raises: ValueError: Unsupported solver, only 'osqp' and 'cvxopt' are currently supported Returns: numpy.ndarray: solution """ P, _, _ = self.objective_function.get_parameters()#self.__get_cvxopt_objective_parameters() if solver == 'osqp': if not check_symmetric(P): solver = 'cvxopt' warnings.warn("OSQP solver requires the matrix of the quadratic function to be symmetric. Solving with cvxopt.") if solver == 'osqp': return self.__solve_osqp(initial_value=initial_value, return_only_solution=return_only_solution) elif solver == 'cvxopt': try: return self.__solve_cvxopt(initial_value=initial_value, return_only_solution=return_only_solution) except: return super().solve(return_only_solution=return_only_solution) else: raise ValueError("Unsupported solver")