Source code for disropt.algorithms.dual_decomp

import numpy as np
from typing import Union, Callable
from copy import deepcopy
from ..agents.agent import Agent
from ..problems.problem import Problem
from .algorithm import Algorithm
from ..functions import Variable


[docs]class DualDecomposition(Algorithm): """Distributed dual decomposition. From the perspective of agent :math:`i` the algorithm works as follows. Initialization: :math:`\lambda_{ij}^0` for all :math:`j \in \mathcal{N}_i` For :math:`k=0,1,\\dots` * Gather :math:`\lambda_{ji}^k` from neighbors :math:`j \in \mathcal{N}_i` * Compute :math:`x_i^k` as an optimal solution of .. math:: \min_{x_i \in X_i} \: f_i(x_i) + x_i^\\top \sum_{j \in \mathcal{N}_i} (\lambda_{ij}^k - \lambda_{ji}^k) * Gather :math:`x_j^k` from neighbors :math:`j \in \mathcal{N}_i` * Update for all :math:`j \in \mathcal{N}_i` .. math:: \lambda_{ij}^{k+1} = \lambda_{ij}^{k} + \\alpha^k (x_i^k - x_j^k) where :math:`x_i\\in\\mathbb{R}^{n}`, :math:`\lambda_{ij}\\in\\mathbb{R}^n` for all :math:`j \in \mathcal{N}_i`, :math:`X_i\\subseteq\mathbb{R}^{n}` for all :math:`i` and :math:`\\alpha^k` is a positive stepsize. The algorithm has been presented in ????. """ # TODO choose ref def __init__(self, agent: Agent, initial_condition: dict, enable_log: bool = False): super(DualDecomposition, self).__init__(agent, enable_log) if not isinstance(agent.problem, Problem): raise TypeError("The agent must be equipped with a Problem") if sum(1 for i in agent.problem.objective_function.input_shape if i > 1) > 1: raise ValueError("Currently only mono-dimensional objective functions are supported") if not all([isinstance(x, np.ndarray) for _, x in initial_condition.items()]): raise TypeError("The initial condition dictionary can only contain numpy vectors") if sorted(agent.in_neighbors) != sorted([x for x in initial_condition]): raise TypeError("The initial condition dictionary must contain exactly one vector per neighbor") # shape of local variable self.x_shape = agent.problem.objective_function.input_shape # initialize dual variables and primal solution self.lambd0 = deepcopy(initial_condition) self.lambd = deepcopy(initial_condition) self.x = None
[docs] def run(self, iterations: int = 1000, stepsize: Union[float, Callable] = 0.1, verbose: bool=False, **kwargs) -> np.ndarray: """Run the algorithm for a given number of iterations Args: iterations: Number of iterations. Defaults to 1000. stepsize: If a float is given as input, the stepsize is constant. If a function is given, it must take an iteration k as input and output the corresponding stepsize.. Defaults to 0.1. verbose: If True print some information during the evolution of the algorithm. Defaults to False. Raises: TypeError: The number of iterations must be an int TypeError: The stepsize must be a float or a callable Returns: return a tuple (x, lambda) with the sequence of primal solutions and dual variables if enable_log=True. """ if not isinstance(iterations, int): raise TypeError("The number of iterations must be an int") if not (isinstance(stepsize, float) or callable(stepsize)): raise TypeError("The stepsize must be a float or a function") if self.enable_log: # initialize sequence of x x_dims = [iterations] for dim in self.x_shape: x_dims.append(dim) self.x_sequence = np.zeros(x_dims) # initialize sequence of lambda self.lambda_sequence = {} for j in self.agent.in_neighbors: self.lambda_sequence[j] = np.zeros(x_dims) for k in range(iterations): # store current lambda if self.enable_log: for j in self.agent.in_neighbors: self.lambda_sequence[j][k] = self.lambd[j] # perform an iteration if not isinstance(stepsize, float): step = stepsize(k) else: step = stepsize self.iterate_run(stepsize=step, **kwargs) # store primal solution if self.enable_log: self.x_sequence[k] = self.x if verbose: if self.agent.id == 0: print('Iteration {}'.format(k), end="\r") if self.enable_log: return (self.x_sequence, self.lambda_sequence)
def _update_local_solution(self, x: np.ndarray, x_neigh: dict, stepsize: float, **kwargs): """Update the local solution Args: x: current solution x_neigh: solutions of neighbors (dictionary) stepsize: step-size for update """ for j, x_j in x_neigh.items(): self.lambd[j] += stepsize * (x - x_j) self.x = x
[docs] def iterate_run(self, stepsize: float, **kwargs): """Run a single iterate of the algorithm """ # TODO extend to non mono-dimensional variables # exchange dual variables with neighbors lambda_neigh = self.agent.neighbors_exchange(self.lambd, dict_neigh=True) deltalambda = np.zeros(self.x_shape) for j in self.agent.in_neighbors: deltalambda += self.lambd[j] - lambda_neigh[j] # build local problem x = Variable(self.x_shape[0]) obj_function = self.agent.problem.objective_function + deltalambda @ x pb = Problem(obj_function, self.agent.problem.constraints) # solve problem and save data x = pb.solve() # exchange primal variables with neighbors x_neigh = self.agent.neighbors_exchange(x) self._update_local_solution(x, x_neigh, stepsize, **kwargs)
[docs] def get_result(self): """Return the current value primal solution and dual variable Returns: tuple (primal, dual): value of primal solution (np.ndarray), dual variables (dictionary of np.ndarray) """ return (self.x, self.lambd)