Source code for disropt.algorithms.subgradient

import numpy as np
import time
import warnings
from copy import deepcopy
from typing import Union, Callable, Tuple
from ..agents import Agent
from ..constraints.projection_sets import AbstractSet
from .consensus import Consensus, BlockConsensus
from disropt.functions.submodular_func import SubmodularFn


class SubgradientMethod(Consensus):
    """Distributed projected (sub)gradient method.

    From the perspective of agent :math:`i` the algorithm works as follows. For :math:`k=0,1,\\dots`

    .. math::

        y_i^{k} &= \\sum_{j=1}^N w_{ij} x_j^k \\\\
        x_i^{k+1} &= \\Pi_{X}[y_i^k - \\alpha^k \\tilde{\\nabla} f_i(y_i^k)]

    where :math:`x_i,y_i\\in\\mathbb{R}^n`, :math:`X\\subseteq\\mathbb{R}^n`, :math:`\\alpha^k` is a positive stepsize,
    :math:`w_{ij}` denotes the weight assigned by agent :math:`i` to agent :math:`j`, :math:`\\Pi_X[\\cdot]` denotes
    the projection operator onto the set :math:`X` and :math:`\\tilde{\\nabla} f_i(y_i^k)\\in\\partial f_i(y_i^k)` is a
    (sub)gradient of :math:`f_i` computed at :math:`y_i^k`. The weight matrix :math:`W=[w_{ij}]_{i,j=1}^N` should be
    doubly stochastic.

    The algorithm, as written above, was originally presented in [NeOz09]_. Many other variants and extensions have
    been proposed, allowing for stochastic objective functions, time-varying graphs and local stepsize sequences.
    All these variants can be implemented through the :class:`SubgradientMethod` class.
    """

    def __init__(self, agent: Agent, initial_condition: np.ndarray, enable_log: bool = False):
        if not isinstance(agent, Agent):
            raise TypeError("Agent must be an Agent")
        super(SubgradientMethod, self).__init__(agent, initial_condition, enable_log)

    def _update_local_solution(self, x: np.ndarray, stepsize: float = 0.1, projection: bool = False):
        y = x

        # perform a subgradient step
        self.x = y - stepsize * self.agent.problem.objective_function.subgradient(y)

        # project onto the local constraint set, if required
        if projection:
            self.x = self.agent.problem.project_on_constraint_set(self.x)
    def run(self, iterations: int = 1000, stepsize: Union[float, Callable] = 0.001, verbose: bool = False) -> np.ndarray:
        """Run the algorithm for a given number of iterations.

        Args:
            iterations: Number of iterations. Defaults to 1000.
            stepsize: If a float is given as input, the stepsize is constant. If a function is given, it must take an
                iteration k as input and output the corresponding stepsize. Defaults to 0.001.
            verbose: If True, print some information during the evolution of the algorithm. Defaults to False.

        Raises:
            TypeError: The number of iterations must be an int
            TypeError: The stepsize must be a float or a callable
            ValueError: Only sets (children of AbstractSet) with explicit projections are currently supported
            ValueError: Only one constraint at a time is currently supported

        Returns:
            the sequence of estimates if enable_log=True.
        """
        if not isinstance(iterations, int):
            raise TypeError("The number of iterations must be an int")
        if not (isinstance(stepsize, float) or callable(stepsize)):
            raise TypeError("The stepsize must be a float or a function")

        # activate the projection step only if local constraints are present
        actv_projection = len(self.agent.problem.constraints) != 0

        if self.enable_log:
            # preallocate the sequence of estimates
            dims = [iterations]
            for dim in self.x.shape:
                dims.append(dim)
            self.sequence = np.zeros(dims)

        for k in range(iterations):
            # evaluate the stepsize for this iteration
            if not isinstance(stepsize, float):
                step = stepsize(k)
            else:
                step = stepsize

            self.iterate_run(stepsize=step, projection=actv_projection)

            if self.enable_log:
                self.sequence[k] = self.x

            if verbose and self.agent.id == 0:
                print('Iteration {}'.format(k), end="\r")

        if self.enable_log:
            return self.sequence
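A minimal usage sketch (not part of the module): each agent runs in its own MPI process, builds its local problem and then instantiates :class:`SubgradientMethod`. The helpers ``binomial_random_graph``, ``metropolis_hastings``, ``QuadraticForm``, ``Variable`` and ``Problem`` are assumed from the rest of the disropt package; adapt the names to your installed version::

    import numpy as np
    from mpi4py import MPI
    from disropt.agents import Agent
    from disropt.algorithms import SubgradientMethod
    from disropt.functions import QuadraticForm, Variable
    from disropt.problems import Problem
    from disropt.utils.graph_constructor import binomial_random_graph, metropolis_hastings

    # the MPI rank identifies the local agent
    comm = MPI.COMM_WORLD
    nproc = comm.Get_size()
    local_rank = comm.Get_rank()

    # common communication graph (same seed on all processes) with
    # doubly stochastic Metropolis-Hastings weights
    Adj = binomial_random_graph(nproc, p=0.3, seed=1)
    W = metropolis_hastings(Adj)

    agent = Agent(in_neighbors=np.nonzero(Adj[local_rank, :])[0].tolist(),
                  out_neighbors=np.nonzero(Adj[:, local_rank])[0].tolist(),
                  in_weights=W[local_rank, :].tolist())

    # local objective: f_i(x) = (x - b_i)' P_i (x - b_i)
    n = 2
    x = Variable(n)
    P = np.random.rand(n, n)
    P = P.T @ P
    b = np.random.rand(n, 1)
    agent.set_problem(Problem(QuadraticForm(x - b, P)))

    algorithm = SubgradientMethod(agent=agent,
                                  initial_condition=np.random.rand(n, 1),
                                  enable_log=True)

    # diminishing stepsize alpha^k = 1 / (k+1)^0.6
    sequence = algorithm.run(iterations=1000, stepsize=lambda k: 1 / (k + 1) ** 0.6)

The script would be launched as, e.g., ``mpiexec -np 8 python example.py``; with ``enable_log=True`` the call returns the whole sequence of local estimates.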
class BlockSubgradientMethod(BlockConsensus):
    """Distributed block subgradient method. This is a special instance of the Block Proximal Method in [FaNo19]_.

    At each iteration, an agent either updates its local estimate or not, according to a certain probability
    (awakening_probability). From the perspective of agent :math:`i`, the algorithm works as follows. At iteration
    :math:`k`, if the agent is awake, it selects a random block :math:`\\ell_i^k` of its local solution and updates

    .. math::

        \\begin{align}
            y_i^k &= \\sum_{j\\in\\mathcal{N}_i} w_{ij} x_{j\\mid i}^k \\\\
            x_{i,\\ell}^{k+1} &= \\begin{cases}
                \\Pi_{X_\\ell}\\left[y_i^k - \\alpha_i^k [\\tilde{\\nabla} f_i(y_i^k)]_{\\ell}\\right] & \\text{if } \\ell = \\ell_i^k \\\\
                x_{i,\\ell}^{k} & \\text{otherwise}
            \\end{cases}
        \\end{align}

    then it broadcasts :math:`x_{i,\\ell_i^k}^{k+1}` to its out-neighbors. Otherwise (if the agent is not awake),
    :math:`x_{i}^{k+1}=x_i^k`.

    Here :math:`\\mathcal{N}_i` is the current set of in-neighbors, :math:`x_{j\\mid i},j\\in\\mathcal{N}_i` is the local
    copy of :math:`x_j` available at node :math:`i` and :math:`x_{i,\\ell}` denotes the :math:`\\ell`-th block of
    :math:`x_i`. The weight matrix :math:`W=[w_{ij}]_{i,j=1}^N` should be doubly stochastic.

    Notice that with a single block and awakening_probability=1, :class:`BlockSubgradientMethod` reduces to
    :class:`SubgradientMethod`.
    """

    def __init__(self, agent: Agent, initial_condition: np.ndarray, **kwargs):
        if not isinstance(agent, Agent):
            raise TypeError("Agent must be an Agent")
        super(BlockSubgradientMethod, self).__init__(agent, initial_condition, **kwargs)

    def _update_local_solution(self, x: np.ndarray, selected_block: Tuple, stepsize: float = 0.1, projection: bool = False):
        block = list(selected_block)
        y = x

        # perform a subgradient step on the selected block only, using the
        # block-wise subgradient if the objective function provides one
        block_method = getattr(self.agent.problem.objective_function, "blocksubgradient", None)
        if callable(block_method):
            self.x[block] = y[block] - stepsize * block_method(y, block)[block]
        else:
            self.x[block] = y[block] - stepsize * self.agent.problem.objective_function.subgradient(y)[block]

        # for submodular objective functions, clip the updated block to [0, 1]
        if isinstance(self.agent.problem.objective_function, SubmodularFn):
            for i in np.arange(len(block)):
                if self.x[block[i]] > 1:
                    self.x[block[i]] = 1
                if self.x[block[i]] < 0:
                    self.x[block[i]] = 0

        if projection:
            # TODO: single block projection with multiple sets
            old_x = deepcopy(self.x)
            self.x = self.agent.problem.project_on_constraint_set(self.x)
            for idx, value in enumerate(self.x):
                if idx not in block and old_x[idx] != value:
                    warnings.warn("Constraints are not separable on blocks. Convergence is not guaranteed.")
                    break
    def run(self, iterations: int = 1000, stepsize: Union[float, Callable] = 0.1, verbose: bool = False) -> np.ndarray:
        """Run the algorithm for a given number of iterations.

        Args:
            iterations: Number of iterations. Defaults to 1000.
            stepsize: If a float is given as input, the stepsize is constant. If a function is given, it must take an
                iteration k as input and output the corresponding stepsize. Defaults to 0.1.
            verbose: If True, print some information during the evolution of the algorithm. Defaults to False.

        Raises:
            TypeError: The number of iterations must be an int
            TypeError: The stepsize must be a float or a callable
            ValueError: Only sets (children of AbstractSet) with explicit projections are currently supported
            ValueError: Only one constraint at a time is currently supported

        Returns:
            the sequence of estimates if enable_log=True.
        """
        if not isinstance(iterations, int):
            raise TypeError("The number of iterations must be an int")
        if not (isinstance(stepsize, float) or callable(stepsize)):
            raise TypeError("The stepsize must be a float or a function")

        # activate the projection step only if local constraints are present
        actv_projection = len(self.agent.problem.constraints) != 0

        if self.enable_log:
            # preallocate the sequence of estimates
            dims = [iterations]
            for dim in self.x.shape:
                dims.append(dim)
            self.sequence = np.zeros(dims)

        for k in range(iterations):
            # evaluate the stepsize for this iteration
            if not isinstance(stepsize, float):
                step = stepsize(k)
            else:
                step = stepsize

            self.iterate_run(stepsize=step, projection=actv_projection)

            if self.enable_log:
                self.sequence[k] = self.x

            if verbose and self.agent.id == 0:
                print('Iteration {}'.format(k), end="\r")

        if self.enable_log:
            return self.sequence
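A corresponding sketch for the block version, reusing the ``agent`` built in the previous example. The keyword arguments ``enable_log``, ``blocks_list``, ``probabilities`` and ``awakening_probability`` are forwarded to the :class:`BlockConsensus` parent class; their names are assumptions based on that class and may need adapting to your installed version::

    import numpy as np
    from disropt.algorithms import BlockSubgradientMethod

    n = 4
    algorithm = BlockSubgradientMethod(
        agent=agent,                           # agent set up as in the previous sketch
        initial_condition=np.random.rand(n, 1),
        enable_log=True,
        blocks_list=[[0, 1], [2, 3]],          # two blocks of two entries each
        probabilities=[0.5, 0.5],              # block selection probabilities
        awakening_probability=0.8)             # agent updates w.p. 0.8 per iteration

    sequence = algorithm.run(iterations=2000, stepsize=lambda k: 1 / (k + 1))

With a single block and ``awakening_probability=1`` this reproduces :class:`SubgradientMethod`, as noted in the class docstring.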