Source code for disropt.algorithms.primal_decomp

import numpy as np
from typing import Union, Callable
from threading import Event
from ..agents.agent import Agent
from ..problems.problem import Problem
from ..problems.constraint_coupled_problem import ConstraintCoupledProblem
from .algorithm import Algorithm
from ..functions import Variable
from ..functions import ExtendedFunction
from ..constraints import ExtendedConstraint


class PrimalDecomposition(Algorithm):
    """Distributed primal decomposition.

    From the perspective of agent :math:`i` the algorithm works as follows.

    Initialization: :math:`y_i^0` such that :math:`\sum_{i=1}^N y_i^0 = 0`

    For :math:`k=0,1,\\dots`

    * Compute :math:`((x_i^k, \\rho_i^k), \mu_i^k)` as a primal-dual optimal solution of

      .. math::
          :nowrap:

          \\begin{split}
          \min_{x_i, \\rho_i} \hspace{1.1cm} & \: f_i(x_i) + M \\rho_i \\\\
          \mathrm{subj. to} \: \mu_i: \: & \: g_i(x_i) \le y_i^k + \\rho_i \\boldsymbol{1} \\\\
          & \: x_i \in X_i, \\rho_i \ge 0
          \\end{split}

    * Gather :math:`\mu_j^k` from :math:`j \in \mathcal{N}_i` and update

      .. math::
          y_i^{k+1} = y_i^{k} + \\alpha^k \sum_{j \in \mathcal{N}_i} (\mu_i^k - \mu_j^k)

    where :math:`x_i\\in\\mathbb{R}^{n_i}`, :math:`\mu_i,y_i\\in\\mathbb{R}^S`, :math:`X_i\\subseteq\mathbb{R}^{n_i}`
    for all :math:`i` and :math:`\\alpha^k` is a positive stepsize.

    The algorithm has been presented in ????.
    """
    # TODO choose ref

    def __init__(self, agent: Agent, initial_condition: np.ndarray, enable_log: bool = False):
        super(PrimalDecomposition, self).__init__(agent, enable_log)

        if not isinstance(agent.problem, ConstraintCoupledProblem):
            raise TypeError("The agent must be equipped with a ConstraintCoupledProblem")

        if sum(1 for i in agent.problem.objective_function.input_shape if i > 1) > 1:
            raise ValueError("Currently only mono-dimensional objective functions are supported")

        if sum(1 for i in initial_condition.shape if i > 1) > 1:
            raise ValueError("Currently only mono-dimensional outputs for coupling functions are supported")

        # shape of local variable and coupling constraints
        self.x_shape = agent.problem.objective_function.input_shape
        self.S = initial_condition.size  # TODO extend to non mono-dimensional case

        # initialize allocation, primal solution and cost
        self.y0 = np.copy(initial_condition)
        self.y = np.copy(initial_condition)
        self.y_avg = np.copy(initial_condition)
        self.x = None
        self.J = None

        # extended versions of objective function, coupling function and constraints (+ 1 variable for rho)
        self.objective_function = ExtendedFunction(agent.problem.objective_function)
        self.coupling_function = ExtendedFunction(agent.problem.coupling_function)
        self.local_constraints = ExtendedConstraint(agent.problem.constraints)

    def run(self, iterations: int = 1000, stepsize: Union[float, Callable] = 0.1, M: float = 1000.0,
            verbose: bool = False, callback_iter: Callable = None, compute_runavg: bool = False,
            runavg_start_iter: int = 0, event: Event = None, **kwargs) -> np.ndarray:
        """Run the algorithm for a given number of iterations

        Args:
            iterations: Number of iterations. Defaults to 1000.
            stepsize: If a float is given as input, the stepsize is constant. If a function is given,
                it must take an iteration k as input and output the corresponding stepsize. Defaults to 0.1.
            M: Value of the parameter :math:`M`. Defaults to 1000.
            verbose: If True, print some information during the evolution of the algorithm. Defaults to False.
            callback_iter: callback function to be called at the end of each iteration. Must take an iteration k as input. Defaults to None.
            compute_runavg: whether or not to also compute the running average of the allocation. Defaults to False.
            runavg_start_iter: iteration at which to start computing the running average (applies only if compute_runavg = True). Defaults to 0.
            event: optional event that, when set, stops the algorithm at the end of the current iteration. Defaults to None.

        Raises:
            TypeError: The number of iterations must be an int
            TypeError: The stepsize must be a float or a callable
            TypeError: The parameter M must be a float
            TypeError: The callback function must be a Callable
            ValueError: The parameter runavg_start_iter must not be negative

        Returns:
            return a tuple (x, y, J) with the sequences of primal solutions, allocation estimates and costs if enable_log=True.
            If compute_runavg=True, then return (x, y, y_avg, J). If enable_log=False, nothing is returned.
        """
        if not isinstance(iterations, int):
            raise TypeError("The number of iterations must be an int")
        if not (isinstance(stepsize, float) or callable(stepsize)):
            raise TypeError("The stepsize must be a float or a function")
        if not isinstance(M, float):
            raise TypeError("The parameter M must be a float")
        if callback_iter is not None and not callable(callback_iter):
            raise TypeError("The callback function must be a Callable")
        if runavg_start_iter < 0:
            raise ValueError("The parameter runavg_start_iter must not be negative")

        if self.enable_log:
            # initialize sequence of costs
            x_dims = [iterations]
            self.J_sequence = np.zeros(x_dims)

            # initialize sequence of x
            for dim in self.x_shape:
                x_dims.append(dim)
            self.x_sequence = np.zeros(x_dims)

            # initialize sequence of y
            y_dims = [iterations]
            for dim in self.y.shape:
                y_dims.append(dim)
            self.y_sequence = np.zeros(y_dims)

            # initialize sequence of averaged y
            if compute_runavg:
                self.y_avg_sequence = np.zeros(y_dims)

        # initialize cumulative sum of stepsize if needed
        if compute_runavg:
            self.stepsize_sum = 0

        last_iter = np.copy(iterations)
        for k in range(iterations):
            # store current allocation
            if self.enable_log:
                self.y_sequence[k] = self.y
                if compute_runavg:
                    self.y_avg_sequence[k] = self.y_avg

            # compute stepsize
            if not isinstance(stepsize, float):
                step = stepsize(k)
            else:
                step = stepsize

            # determine whether or not running average must be updated
            update_runavg = compute_runavg and k >= runavg_start_iter

            # perform iteration
            self.iterate_run(stepsize=step, M=M, update_runavg=update_runavg, event=event, **kwargs)

            # store primal solution and cost
            if self.enable_log:
                self.x_sequence[k] = self.x
                self.J_sequence[k] = self.J

            # check if we must stop for external event
            if event is not None and event.is_set():
                last_iter = k
                break

            # perform external callback (if any)
            if callback_iter is not None:
                callback_iter(k)

            if verbose:
                if self.agent.id == 0:
                    print('Iteration {}'.format(k), end="\r")

        if self.enable_log and compute_runavg:
            return (self.x_sequence.take(np.arange(0, last_iter), axis=0),
                    self.y_sequence.take(np.arange(0, last_iter), axis=0),
                    self.y_avg_sequence.take(np.arange(0, last_iter), axis=0),
                    self.J_sequence[0:last_iter])
        elif self.enable_log:
            return (self.x_sequence.take(np.arange(0, last_iter), axis=0),
                    self.y_sequence.take(np.arange(0, last_iter), axis=0),
                    self.J_sequence[0:last_iter])

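    # Illustrative note (not part of the original module): the `stepsize` argument of
    # run() may be a callable of the iteration counter, e.g. a diminishing rule such as
    #
    #     def diminishing_stepsize(k: int) -> float:
    #         return 0.1 / (k + 1)
    #
    # passed as algorithm.run(iterations=1000, stepsize=diminishing_stepsize),
    # whereas a plain float keeps the stepsize constant over all iterations.
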
    def _update_local_solution(self, x: np.ndarray, mu: np.ndarray, mu_neigh: list, stepsize: float, update_runavg, **kwargs):
        """Update the local solution

        Args:
            x: current primal solution
            mu: current dual solution
            mu_neigh: dual solutions of neighbors
            stepsize: step-size for update
            update_runavg: whether or not to update the running average of the allocation
        """
        # allocation update: y_i^{k+1} = y_i^k + alpha^k * sum_j (mu_i^k - mu_j^k)
        y_new = self.y

        for mu_j in mu_neigh:
            y_new += stepsize * (mu - mu_j)

        self.x = x
        self.J = self.agent.problem.objective_function.eval(x).item()
        self.y = y_new

        if update_runavg:
            self.stepsize_sum += stepsize
            self.y_avg += stepsize * (self.y - self.y_avg) / self.stepsize_sum

    def iterate_run(self, stepsize: float, M: float, update_runavg, event: Event, **kwargs):
        """Run a single iterate of the algorithm"""
        # TODO extend this function to non mono-dimensional case

        # build local problem (the extended variable z stacks the local variable x_i and the scalar rho_i)
        z = Variable(self.x_shape[0] + 1)
        A = np.hstack((np.zeros((self.S, self.x_shape[0])), np.ones((self.S, 1)))).transpose()
        A_rho = np.hstack((np.zeros((1, self.x_shape[0])), [[1]])).transpose()
        rho = A_rho @ z
        alloc_constr = self.coupling_function <= self.y + A @ z
        rho_constr = rho >= 0
        constraints = [alloc_constr, rho_constr]
        constraints.extend(self.local_constraints)
        obj_function = self.objective_function + M * rho
        pb = Problem(obj_function, constraints)

        # solve problem and save data
        out = pb.solve(return_only_solution=False)

        if out['status'] != 'solved':
            raise ValueError("The local problem could not be solved")

        x = out['solution'][:-1]
        # rho = out['solution'][-1]
        mu = out['dual_variables'][0]

        # exchange dual variables with neighbors
        data = self.agent.neighbors_exchange(mu, event=event)
        mu_neigh = [data[idx] for idx in data]

        if event is None or not event.is_set():
            self._update_local_solution(x, mu, mu_neigh, stepsize, update_runavg, **kwargs)

    def get_result(self, return_runavg: bool = False):
        """Return the current value of primal solution, allocation and cost

        Args:
            return_runavg: whether or not to also return the running average of the allocation. Defaults to False.

        Returns:
            tuple (primal, allocation, cost) of numpy.ndarray: value of primal solution, allocation and cost (if return_runavg = False)
            tuple (primal, allocation, allocation_avg, cost) if return_runavg = True
        """
        if return_runavg:
            return (self.x, self.y, self.y_avg, self.J)
        else:
            return (self.x, self.y, self.J)
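

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the original module). It assumes the
# agent/problem construction pattern used in the disropt examples: an Agent
# built with in_neighbors/out_neighbors, a QuadraticForm objective,
# agent.set_problem(), and the objective_function / constraints /
# coupling_function keywords of ConstraintCoupledProblem. All problem data
# below are hypothetical, and one MPI process per agent is assumed (e.g.
# launched with mpirun).
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from mpi4py import MPI
    from disropt.functions import QuadraticForm

    comm = MPI.COMM_WORLD
    N = comm.Get_size()   # number of agents (one per MPI process)
    i = comm.Get_rank()   # index of this agent

    n_i, S = 2, 3         # size of the local variable x_i and of the coupling constraint

    # hypothetical local data: quadratic cost, box constraints and a linear
    # coupling function g_i(x_i) = A_i x_i
    rng = np.random.default_rng(i)
    b_i = rng.standard_normal((n_i, 1))
    A_i = rng.standard_normal((S, n_i))

    x_i = Variable(n_i)
    objective = QuadraticForm(x_i - b_i, np.eye(n_i))                        # ||x_i - b_i||^2
    local_constr = [x_i >= -5 * np.ones((n_i, 1)), x_i <= 5 * np.ones((n_i, 1))]
    coupling = A_i.transpose() @ x_i   # disropt affine forms read M @ x as M^T x (see iterate_run above)

    # undirected ring communication graph (assumes N >= 3)
    agent = Agent(in_neighbors=[(i - 1) % N, (i + 1) % N],
                  out_neighbors=[(i - 1) % N, (i + 1) % N])
    agent.set_problem(ConstraintCoupledProblem(objective_function=objective,
                                               constraints=local_constr,
                                               coupling_function=coupling))

    # zero initial allocations trivially satisfy sum_i y_i^0 = 0
    algorithm = PrimalDecomposition(agent=agent, initial_condition=np.zeros((S, 1)), enable_log=True)

    # run with a constant stepsize and retrieve the logged sequences and the final iterate
    x_seq, y_seq, J_seq = algorithm.run(iterations=200, stepsize=0.05)
    x_final, y_final, J_final = algorithm.get_result()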