import numpy as np
from autograd import grad as ag_grad
from autograd import jacobian as ag_jacobian
from autograd import hessian as ag_hessian
from typing import Union
from .utilities import check_input
import warnings
[docs]class AbstractFunction:
"""AbstractFunction class. This should be the parent of all specific (objective) functions.
Attributes:
input_shape (tuple): shape of the input of the function
output_shape (tuple): shape of the output of the function
differentiable (bool): True if the function is differentiable
affine (bool): True if the function is affine
quadratic (bool): True if the function is quadratic
"""
# For forcing __r methods to be used when a numpy array comes first
__array_priority__ = 1
input_shape = None
output_shape = None
differentiable = False
affine = False
quadratic = False
def __init__(self):
self._setdiff()
def _expression(self):
return self.__class__.__name__
def _to_cvxpy(self):
pass
def _extend_variable(self, n_var, axis, pos):
pass
def __str__(self):
description = {'expression': self._expression(),
'input shape': self.input_shape,
'output shape': self.output_shape,
'affine': self.is_affine,
'quadratic': self.is_quadratic
}
return str(description)
def __neg__(self):
return NegFunction(self)
def __add__(self, other):
if isinstance(other, AbstractFunction):
return SumFunction(self, other)
elif isinstance(other, (int, float, np.ndarray)):
return ConstantSumFunction(other, self)
else:
raise TypeError
def __radd__(self, other: Union[int, float, np.ndarray]):
if isinstance(other, (int, float, np.ndarray)):
return ConstantSumFunction(other, self)
else:
raise TypeError
def __iadd__(self, other):
if isinstance(other, AbstractFunction):
return SumFunction(self, other)
elif isinstance(other, (int, float, np.ndarray)):
return ConstantSumFunction(other, self)
else:
raise TypeError
def __sub__(self, other):
if isinstance(other, AbstractFunction):
return SumFunction(self, -other)
elif isinstance(other, (int, float, np.ndarray)):
return ConstantSumFunction(-other, self)
else:
raise TypeError
def __rsub__(self, other):
if isinstance(other, AbstractFunction):
return SumFunction(-self, other)
elif isinstance(other, (int, float, np.ndarray)):
return ConstantSumFunction(other, -self)
else:
raise TypeError
def __isub__(self, other):
if isinstance(other, AbstractFunction):
return SumFunction(self, -other)
elif isinstance(other, (int, float, np.ndarray)):
return ConstantSumFunction(-other, self)
else:
raise TypeError
def __mul__(self, other):
if isinstance(other, AbstractFunction):
return MulFunction(self, other)
elif isinstance(other, (int, float)):
return ScalarMulFunction(other, self)
else:
raise TypeError
def __rmul__(self, other: Union[int, float]):
if isinstance(other, (int, float)):
return ScalarMulFunction(other, self)
else:
raise NotImplementedError
def __imul__(self, other):
if isinstance(other, AbstractFunction):
return MulFunction(self, other)
elif isinstance(other, (int, float)):
return ScalarMulFunction(other, self)
else:
raise TypeError
def __truediv__(self, other):
if isinstance(other, AbstractFunction):
from .power import Power
return MulFunction(self, Power(other, -1))
elif isinstance(other, (int, float)):
return ScalarMulFunction(1.0/other, self)
else:
raise TypeError
def __rdiv__(self, other: Union[int, float]):
if isinstance(other, (int, float)):
return ScalarMulFunction(1.0/other, self)
else:
raise NotImplementedError
def __idiv__(self, other):
if isinstance(other, AbstractFunction):
from .power import Power
return MulFunction(self, Power(other, -1))
elif isinstance(other, (int, float)):
return ScalarMulFunction(1.0/other, self)
else:
raise TypeError
def __pow__(self, other):
if isinstance(other, (int, float)):
from .power import Power
return Power(self, float(other))
else:
raise TypeError
def __ipow__(self, other):
if isinstance(other, (int, float)):
from .power import Power
return Power(self, float(other))
else:
raise TypeError
def __matmul__(self, other):
if isinstance(other, AbstractFunction):
return MatMulFunction(self, other)
elif isinstance(other, (np.ndarray)):
if other.shape == self.output_shape:
return ConstantMatMulFunction(other, self)
else:
raise NotImplementedError
else:
raise TypeError
def __rmatmul__(self, other: np.ndarray):
if isinstance(other, (np.ndarray)):
return ConstantMatMulFunction(other, self)
else:
raise TypeError
def __imatmul__(self, other: np.ndarray):
return self.__matmul__(other)
def __to_constraint(self, other, sign):
if not isinstance(other, (AbstractFunction, np.ndarray, float, int)):
raise TypeError
if isinstance(other, AbstractFunction):
if (self.input_shape != other.input_shape) or (self.output_shape != other.output_shape):
raise ValueError("Incompatible shapes")
from ..constraints.constraints import Constraint
return Constraint(self - other, sign=sign)
if isinstance(other, np.ndarray):
if other.shape != self.output_shape:
raise ValueError("Incompatible shapes")
from ..constraints.constraints import Constraint
return Constraint(self - other, sign=sign)
if isinstance(other, (int, float)):
if self.output_shape != (1, 1):
const = other * np.ones(self.output_shape)
else:
const = other
from ..constraints.constraints import Constraint
return Constraint(self - const, sign=sign)
def __eq__(self, other):
return self.__to_constraint(other, "==")
def __le__(self, other):
return self.__to_constraint(other, "<=")
def __ge__(self, other):
return self.__to_constraint(other, ">=")
@property
def is_differentiable(self):
return self.differentiable
@property
def is_affine(self):
return self.affine
@property
def is_quadratic(self):
return self.quadratic
[docs] def get_parameters(self):
raise NotImplementedError
def _setdiff(self):
self._subgradient = ag_grad(self.eval)
self._jacobian = ag_jacobian(self.eval)
self._hessian = ag_hessian(self.eval)
[docs] @check_input
def eval(self, x: np.ndarray) -> np.ndarray:
"""Evaluate the function at a point x
Args:
x: input point
"""
pass
[docs] @check_input
def jacobian(self, x: np.ndarray, **kwargs) -> np.ndarray:
"""Evaluate the jacobian of the the function at a point x
Args:
x: input point
"""
warnings.simplefilter("error", category=RuntimeWarning)
warnings.simplefilter("ignore", category=UserWarning) # TODO: check
try:
jac = np.squeeze(self._jacobian(x))
except RuntimeWarning:
try:
jac = self._alternative_jacobian(x)
except NotImplementedError:
raise NotImplementedError("No jacobian can be computed")
if jac.shape == (): # in case of scalar
jac = jac.reshape(1, 1)
if len(jac.shape) == 1:
jac = jac.reshape(1, -1)
# np.nan_to_num(jac, copy=False)
return jac
@check_input
def _alternative_jacobian(self, x: np.ndarray, **kwargs) -> np.ndarray:
"""Evaluate the jacobian of the the function at a point x
Args:
x: input point
"""
raise NotImplementedError
[docs] @check_input
def hessian(self, x: np.ndarray, **kwargs) -> np.ndarray:
"""Evaluate the hessian of the the function at a point x
Args:
x: input point
"""
warnings.simplefilter("error", category=RuntimeWarning)
warnings.simplefilter("ignore", category=UserWarning) # TODO: check
try:
hess = np.squeeze(self._hessian(x))
except RuntimeWarning:
try:
hess = np.squeeze(self._alternative_hessian(x))
except NotImplementedError:
# TODO risolvere caso in cui autograd non riesce: np.true_divide(inf, inf)
raise NotImplementedError("No hessian can be computed")
if hess.shape == (): # in case of scalar
hess = hess.reshape(1, 1)
# np.nan_to_num(hess, copy=False)
return hess
@check_input
def _alternative_hessian(self, x: np.ndarray, **kwargs) -> np.ndarray:
"""Evaluate the hessian of the the function at a point x
Args:
x: input point
"""
raise NotImplementedError
[docs] @check_input
def subgradient(self, x: np.ndarray, **kwargs) -> np.ndarray:
"""Evaluate the subgradient of the function at a point x
Args:
x: input point
Raises:
ValueError: subgradient is defined only for functions with scalar output
"""
warnings.simplefilter("error", category=RuntimeWarning)
warnings.simplefilter("ignore", category=UserWarning) # TODO: check
if not self.output_shape == (1, 1):
raise ValueError("Undefined subgradient")
else:
try:
subg = self._alternative_subgradient(x).reshape(self.input_shape)
except NotImplementedError:
try:
subg = self._subgradient(x).reshape(self.input_shape)
# subg = self._alternative_subgradient(x).reshape(self.input_shape)
except RuntimeWarning:
raise NotImplementedError("No subgradient can be computed")
# subg = self.jacobian(x).reshape(self.input_shape)
# np.nan_to_num(subg, copy=False)
return subg
@check_input
def _alternative_subgradient(self, x: np.ndarray, **kwargs) -> np.ndarray:
"""Evaluate the hessian of the the function at a point x
Args:
x: input point
"""
return self._alternative_jacobian(x).reshape(self.input_shape)
class ConstantSumFunction(AbstractFunction):
"""Add a constant to a AbstractFunction
Args:
constant (numpy.ndarray): constant to add
fn (AbstractFunction): function to add a constant to
Raises:
ValueError: The shape of the constant must be equal to the output shape of the function
"""
def __init__(self, constant: Union[int, float, np.ndarray], fn: AbstractFunction):
self.constant = constant
if isinstance(constant, np.ndarray):
if fn.output_shape != constant.shape:
raise ValueError("Different shapes")
self.fn = fn
self.input_shape = fn.input_shape
self.output_shape = fn.output_shape
self.differentiable = fn.is_differentiable
if fn.is_affine:
self.affine = True
if fn.is_quadratic:
self.quadratic = True
super().__init__()
def _expression(self):
return str(self.constant) + " + " + self.fn._expression()
@check_input
def eval(self, x):
return self.fn.eval(x) + self.constant
def _to_cvxpy(self):
return self.constant + self.fn._to_cvxpy()
def _extend_variable(self, n_var, axis, pos):
return self.fn._extend_variable(n_var, axis, pos) + self.constant
@check_input
def jacobian(self, x):
return self.fn.jacobian(x)
@check_input
def hessian(self, x):
return self.fn.hessian(x)
@check_input
def subgradient(self, x):
return self.fn.subgradient(x)
class SumFunction(AbstractFunction):
"""Sum two AbstractFunction objects
Args:
f1 (AbstractFunction): first function
f2 (AbstractFunction): second function
Raises:
ValueError: The input/output shapes of the two functions must be the same
"""
def __init__(self, f1: AbstractFunction, f2: AbstractFunction):
if f1.input_shape != f2.input_shape or f1.output_shape != f2.output_shape:
raise ValueError("Different input/output shapes")
self.f1 = f1
self.f2 = f2
self.input_shape = f1.input_shape
self.output_shape = f1.output_shape
if f1.is_differentiable and f2.is_differentiable:
self.differentiable = True
if f1.is_affine and f2.is_affine:
self.affine = True
if (f1.is_quadratic and f2.is_quadratic) or (f1.is_quadratic and f2.is_affine) or (f1.is_affine and f2.is_quadratic):
self.quadratic = True
super().__init__()
def _expression(self):
return self.f1._expression() + " + " + self.f2._expression()
def _to_cvxpy(self):
return self.f1._to_cvxpy() + self.f2._to_cvxpy()
def _extend_variable(self, n_var, axis, pos):
return self.f1._extend_variable(n_var, axis, pos) + self.f2._extend_variable(n_var, axis, pos)
@check_input
def eval(self, x):
return self.f1.eval(x) + self.f2.eval(x)
@check_input
def jacobian(self, x):
return self.f1.jacobian(x) + self.f2.jacobian(x)
@check_input
def hessian(self, x):
return self.f1.hessian(x) + self.f2.hessian(x)
@check_input
def subgradient(self, x):
return self.f1.subgradient(x) + self.f2.subgradient(x)
class NegFunction(AbstractFunction):
"""Multiplies a AbstractFunction by -1
Args:
fn (AbstractFunction): input function function
"""
def __init__(self, fn: AbstractFunction):
self.fn = fn
self.input_shape = fn.input_shape
self.output_shape = fn.output_shape
self.differentiable = fn.is_differentiable
if fn.is_affine:
self.affine = True
if fn.is_quadratic:
self.quadratic = True
super().__init__()
def _expression(self):
return " - " + self.fn._expression()
def _to_cvxpy(self):
return -self.fn._to_cvxpy()
def _extend_variable(self, n_var, axis, pos):
return -self.fn._extend_variable(n_var, axis, pos)
@check_input
def eval(self, x):
return -self.fn.eval(x)
@check_input
def jacobian(self, x):
return -self.fn.jacobian(x)
@check_input
def hessian(self, x):
return -self.fn.hessian(x)
@check_input
def subgradient(self, x):
return -self.fn.subgradient(x)
class MulFunction(AbstractFunction):
def __init__(self, f1: AbstractFunction, f2: AbstractFunction):
"""Multiplies two functions (with scalar outputs)
Args:
f1 (AbstractFunction): first function
f2 (AbstractFunction): second function
Raises:
ValueError: input functions must have the same input shapes
ValueError: at least the first input function must have scalar output
NotImplementedError: only differentiable input functions are currently supported
"""
if f1.input_shape != f2.input_shape:
raise ValueError("Different input shapes")
if not(f1.output_shape == (1, 1)):
raise ValueError(
"At least the first input function must have scalar output")
self.f1 = f1
self.f2 = f2
self.input_shape = f1.input_shape
self.output_shape = f2.output_shape
if f1.is_differentiable and f2.is_differentiable:
self.differentiable = True
super().__init__()
def _expression(self):
return self.f1._expression() + " * " + self.f2._expression()
@check_input
def eval(self, x):
return self.f1.eval(x) * self.f2.eval(x)
def _extend_variable(self, n_var, axis, pos):
return self.f1._extend_variable(n_var, axis, pos) * self.f2._extend_variable(n_var, axis, pos)
def _to_cvxpy(self):
return self.f1._to_cvxpy() * self.f2._to_cvxpy()
class ScalarMulFunction(AbstractFunction):
def __init__(self, scalar: Union[int, float], fn: AbstractFunction):
"""Multiply a function by a scalar
Args:
scalar (float or int): scalar
fn (AbstractFunction): input function
Raises:
ValueError: the scalar must be a float (or an int)
"""
if not isinstance(scalar, (int, float)):
raise ValueError(
"Multiplication by scalars requires the scalar to be float or int")
self.scalar = scalar
self.fn = fn
self.input_shape = fn.input_shape
self.output_shape = fn.output_shape
if fn.is_differentiable:
self.differentiable = True
if fn.is_affine:
self.affine = True
if fn.is_quadratic:
self.quadratic = True
super().__init__()
def _expression(self):
return str(self.scalar) + " * " + self.fn._expression()
def _to_cvxpy(self):
return self.scalar * self.fn._to_cvxpy()
@check_input
def eval(self, x):
return self.scalar * self.fn.eval(x)
def _extend_variable(self, n_var, axis, pos):
return self.scalar * self.fn._extend_variable(n_var, axis, pos)
@check_input
def jacobian(self, x):
return self.scalar * self.fn.jacobian(x)
@check_input
def hessian(self, x):
return self.scalar * self.fn.hessian(x)
@check_input
def subgradient(self, x):
return self.scalar * self.fn.subgradient(x)
class MatMulFunction(AbstractFunction):
def __init__(self, f1: AbstractFunction, f2: AbstractFunction):
"""Dot product of two function
:math:`f(x) = \\langle f_1(x), f_2(x)\\rangle=\\f_1(x)^\\top f_2(x)
with :math:`f_1,f_2:\\mathbb{R}^m\\to\\mathbb{R}^n`
Args:
f1 (AbstractFunction): first function (output shape (n, 1))
f2 (AbstractFunction): second function (output shape (n, 1))
Raises:
ValueError: input functions must have the same input shapes
NotImplementedError: only differentiable input functions are currently supported
"""
if not (f1.input_shape == f2.input_shape and f1.output_shape == f2.output_shape):
raise ValueError("Different input/output shapes")
self.f1 = f1
self.f2 = f2
self.input_shape = f1.input_shape
self.output_shape = (1, 1)
if f1.output_shape[1] != 1:
raise ValueError("Unsupported input functions output shape. Must be (n, 1)")
if f1.is_differentiable and f2.is_differentiable:
self.differentiable = True
super().__init__()
def _expression(self):
return self.f1._expression() + " @ " + self.f2._expression()
@check_input
def eval(self, x):
return self.f1.eval(x).transpose() @ self.f2.eval(x)
def _extend_variable(self, n_var, axis, pos):
return MatMulFunction(self.f1._extend_variable(n_var, axis, pos), self.f2._extend_variable(n_var, axis, pos))
def _to_cvxpy(self):
return self.f1._to_cvxpy().transpose() @ self.f2._to_cvxpy()
class ConstantMatMulFunction(AbstractFunction):
def __init__(self, constant: np.ndarray, fn: AbstractFunction):
"""Matrix/Dot product of a matrix/vector and a function
:math:`f(x) = \\langle A, g(x)\\rangle=\\A^\\top g(x)
with :math:`A:\\mathbb{R}^{n\\times m}` and :math:`g:\\mathbb{R}^d\\to\\mathbb{R}^n`
Args:
constant (numpy.ndarray): constant vector (shape (n, m))
fn (AbstractFunction): function (output shape (n, 1))
Raises:
ValueError: input functions must have the same input shapes
NotImplementedError: only differentiable input functions are currently supported
"""
if not isinstance(constant, (np.ndarray)):
raise ValueError(
"Multiplication by constant vector requires the vector to be a numpy.ndarray")
if fn.output_shape[0] != constant.shape[0]:
raise ValueError(
"Incompatible dimensions: function ouput is {} and given matrix is {}".format(
fn.output_shape, constant.shape))
self.constant = constant
self.fn = fn
self.input_shape = fn.input_shape
self.output_shape = (self.constant.shape[1], self.fn.output_shape[1])
if fn.output_shape[1] != 1:
raise ValueError("Unsupported function output shape. Must be (n, 1)")
if fn.is_differentiable:
self.differentiable = True
if fn.is_affine:
self.affine = True
super().__init__()
def _expression(self):
return str(self.constant()) + " @ " + self.fn._expression()
@check_input
def eval(self, x):
return self.constant.transpose() @ self.fn.eval(x)
def _extend_variable(self, n_var, axis, pos):
return ConstantMatMulFunction(self.constant, self.fn._extend_variable(n_var, axis, pos))
def _to_cvxpy(self):
return self.constant.transpose() @ self.fn._to_cvxpy()