"""Source code for disropt.algorithms.dual_subgradient."""

import numpy as np
from typing import Union, Callable
from ..agents.agent import Agent
from .consensus import Consensus
from ..problems.problem import Problem


class DualSubgradientMethod(Consensus):
    """Distributed dual subgradient method.

    From the perspective of agent :math:`i` the algorithm works as follows. For :math:`k=0,1,\\dots`

    .. math::

        y_i^{k}         &= \sum_{j=1}^N w_{ij} \lambda_j^k

        x_i^{k+1}       &\in \\text{arg} \min_{x_i \in X_i} f_i(x_i) + g_i(x_i)^\\top y_i^k

        \lambda_i^{k+1} &= \Pi_{\lambda \ge 0}[y_i^k + \\alpha^k g_i(x_i^{k+1})]

        \hat{x}_i^{k+1} &= \hat{x}_i^{k} + \\frac{\\alpha^k}{\sum_{r=0}^{k} \\alpha^r} (x_i^{k+1} - \hat{x}_i^{k})

    where :math:`x_i, \hat{x}_i\\in\\mathbb{R}^{n_i}`, :math:`\lambda_i,y_i\\in\\mathbb{R}^S`,
    :math:`X_i\\subseteq\mathbb{R}^{n_i}` for all :math:`i`, :math:`\\alpha^k` is a positive stepsize
    and :math:`\\Pi_{\lambda \ge 0}[]` denotes the projection operator over the nonnegative orthant.

    The algorithm has been presented in [FaMa17]_.

    .. warning::
        this algorithm is still under development
    """

    def __init__(self, agent: Agent, initial_condition: np.ndarray, initial_runavg: np.ndarray = None,
                 enable_log: bool = False):
        super(DualSubgradientMethod, self).__init__(agent, initial_condition, enable_log)

        # initialize running average of primal solutions (defaults to zero if not provided)
        if initial_runavg is None:
            self.x_hat = np.zeros(self.agent.problem.objective_function.input_shape)
        else:
            self.x_hat = initial_runavg

        # initialize cumulative sum of stepsizes (denominator of the running average)
        self.stepsize_sum = 0

        # number of coupling constraints, i.e. size of the dual variable
        self.S = initial_condition.size

    def _update_local_solution(self, x: np.ndarray, stepsize: float = 0.1):
        """Perform one primal minimization and one projected dual subgradient step,
        then refresh the running average of the primal solutions.

        Args:
            x: averaged dual variable :math:`y_i^k` (consensus of neighbors' multipliers).
            stepsize: dual stepsize :math:`\\alpha^k`. Defaults to 0.1.
        """
        y = x

        # define lagrangian: f_i(x_i) + y^top g_i(x_i)
        lagrangian = self.agent.problem.objective_function + (y @ self.agent.problem.coupling_function)

        # create local problem
        pb = Problem(lagrangian, self.agent.problem.constraints)

        # solve local problem
        x_lagr = pb.solve()

        # perform a subgradient step on the dual problem
        lambda_t = y + stepsize * self.agent.problem.coupling_function.eval(x_lagr)

        # project onto non-negative orthant
        lambda_t = np.maximum(lambda_t, np.zeros((self.S, 1)))

        self.x = lambda_t

        # update running average of primal solutions with weight alpha^k / sum_r alpha^r
        self.stepsize_sum += stepsize
        self.x_hat = self.x_hat + stepsize/self.stepsize_sum * (x_lagr - self.x_hat)

    def run(self, iterations: int = 1000, stepsize: Union[float, Callable] = 0.1, verbose: bool = False,
            callback_iter: Callable = None) -> np.ndarray:
        """Run the algorithm for a given number of iterations

        Args:
            iterations: Number of iterations. Defaults to 1000.
            stepsize: If a float is given as input, the stepsize is constant.
                If a function is given, it must take an iteration k as input and output
                the corresponding stepsize. Defaults to 0.1.
            verbose: If True print some information during the evolution of the algorithm. Defaults to False.
            callback_iter: callback function to be called at the end of each iteration.
                Must take an iteration k as input. Defaults to None.

        Raises:
            TypeError: The number of iterations must be an int
            TypeError: The stepsize must be a float or a callable

        Returns:
            return a tuple (lambda, x_hat) with the sequence of dual and primal estimates if enable_log=True.
        """
        if not isinstance(iterations, int):
            raise TypeError("The number of iterations must be an int")
        if not (isinstance(stepsize, float) or callable(stepsize)):
            raise TypeError("The stepsize must be a float or a function")
        if callback_iter is not None and not callable(callback_iter):
            raise TypeError("The callback function must be a Callable")

        if self.enable_log:
            # initialize sequence of lambda (one slot per iteration, same shape as self.x)
            lambda_dims = [iterations]
            for dim in self.x.shape:
                lambda_dims.append(dim)
            self.lambda_sequence = np.zeros(lambda_dims)

            # initialize sequence of x hat (one slot per iteration, same shape as self.x_hat)
            xhat_dims = [iterations]
            for dim in self.x_hat.shape:
                xhat_dims.append(dim)
            self.xhat_sequence = np.zeros(xhat_dims)

        for k in range(iterations):
            # evaluate current stepsize: constant float or iteration-dependent callable
            if not isinstance(stepsize, float):
                step = stepsize(k)
            else:
                step = stepsize

            self.iterate_run(stepsize=step)

            if callback_iter is not None:
                callback_iter(k)

            if self.enable_log:
                self.lambda_sequence[k] = self.x
                self.xhat_sequence[k] = self.x_hat

            if verbose:
                if self.agent.id == 0:
                    print('Iteration {}'.format(k), end="\r")

        if self.enable_log:
            return (self.lambda_sequence, self.xhat_sequence)

    def get_result(self):
        """Return the actual value of dual and primal averaged variable

        Returns:
            tuple (dual, primal) of numpy.ndarray: value of dual variable, value of primal running average
        """
        return (self.x, self.x_hat)