Source code for disropt.algorithms.gradient_tracking

import numpy as np
from typing import Union, Callable
from ..agents import Agent
from .algorithm import Algorithm
from .consensus import PushSumConsensus


[docs]class GradientTracking(Algorithm): """Gradient Tracking Algorithm [...]_ From the perspective of agent :math:`i` the algorithm works as follows. For :math:`k=0,1,\\dots` .. math:: x_i^{k+1} & = \\sum_{j=1}^N w_{ij} x_j^k - \\alpha d_i^k \\\\ d_i^{k+1} & = \\sum_{j=1}^N w_{ij} d_j^k - [ \\nabla f_i (x_i^{k+1}) - \\nabla f_i (x_i^k)] where :math:`x_i\\in\\mathbb{R}^n` and :math:`d_i\\in\\mathbb{R}^n`. The weight matrix :math:`W=[w_{ij}]` must be doubly-stochastic. Extensions to other class of weight matrices :math:`W` are not currently supported. Args: agent (Agent): agent to execute the algorithm initial_condition (numpy.ndarray): initial condition for :math:`x_i` enable_log (bool): True for enabling log Attributes: agent (Agent): agent to execute the algorithm x0 (numpy.ndarray): initial condition x (numpy.ndarray): current value of the local solution d (numpy.ndarray): current value of the local tracker shape (tuple): shape of the variable x_neigh (dict): dictionary containing the local solution of the (in-)neighbors d_neigh (dict): dictionary containing the local tracker of the (in-)neighbors enable_log (bool): True for enabling log """ # TODO choose ref def __init__(self, agent: Agent, initial_condition: np.ndarray, enable_log: bool = False): super(GradientTracking, self).__init__(agent, enable_log) self.x0 = initial_condition self.x = initial_condition # Initialize tracker at x0 self.d = self.agent.problem.objective_function.subgradient(initial_condition) self.shape = self.x.shape self.x_neigh = {} self.d_neigh = {} def _update_local_solution(self, x: np.ndarray, **kwargs): """update the local solution Args: x: new value Raises: TypeError: Input must be a numpy.ndarray ValueError: Incompatible shapes """ if not isinstance(x, np.ndarray): raise TypeError("Input must be a numpy.ndarray") if x.shape != self. x0.shape: raise ValueError("Incompatible shapes") self.x = x def _update_local_tracker(self, d: np.ndarray, **kwargs): """update the local tracker Args: d: new value Raises: TypeError: Input must be a numpy.ndarray ValueError: Incompatible shapes """ if not isinstance(d, np.ndarray): raise TypeError("Input must be a numpy.ndarray") if d.shape != self. x0.shape: raise ValueError("Incompatible shapes") self.d = d
[docs] def iterate_run(self, stepsize: float, **kwargs): """Run a single iterate of the gradient tracking algorithm """ data_x = self.agent.neighbors_exchange(self.x) for neigh in data_x: self.x_neigh[neigh] = data_x[neigh] data_d = self.agent.neighbors_exchange(self.d) for neigh in data_d: self.d_neigh[neigh] = data_d[neigh] x_kp = self.agent.in_weights[self.agent.id] * self.x d_kp = self.agent.in_weights[self.agent.id] * self.d for j in self.agent.in_neighbors: x_kp += self.agent.in_weights[j] * self.x_neigh[j] d_kp += self.agent.in_weights[j] * self.d_neigh[j] x_kp += - stepsize * self.d d_kp += self.agent.problem.objective_function.subgradient( x_kp) - self.agent.problem.objective_function.subgradient(self.x) self._update_local_solution(x_kp, **kwargs) self._update_local_tracker(d_kp, **kwargs)
[docs] def run(self, iterations: int = 1000, stepsize: Union[float, Callable] = 0.1, verbose: bool=False) -> np.ndarray: """Run the gradient tracking algorithm for a given number of iterations Args: iterations: Number of iterations. Defaults to 1000. stepsize: If a float is given as input, the stepsize is constant. Default is 0.01. verbose: If True print some information during the evolution of the algorithm. Defaults to False. Raises: TypeError: The number of iterations must be an int TypeError: The stepsize must be a float Returns: return the sequence of estimates if enable_log=True. """ if not isinstance(iterations, int): raise TypeError("The number of iterations must be an int") if not (isinstance(stepsize, float) or callable(stepsize)): raise TypeError("The stepsize must be a float or a function") if self.enable_log: dims = [iterations] for dim in self.x.shape: dims.append(dim) self.sequence = np.zeros(dims) for k in range(iterations): if not isinstance(stepsize, float): step = stepsize(k) else: step = stepsize self.iterate_run(stepsize=step) if self.enable_log: self.sequence[k] = self.x if verbose: if self.agent.id == 0: print('Iteration {}'.format(k), end="\r") if self.enable_log: return self.sequence
[docs] def get_result(self): """Return the actual value of x Returns: numpy.ndarray: value of x """ return self.x
[docs]class DirectedGradientTracking(PushSumConsensus): """Gradient Tracking Algorithm [XiKh18]_ From the perspective of agent :math:`i` the algorithm works as follows. For :math:`k=0,1,\\dots` .. math:: x_i^{k+1} & = \\sum_{j=1}^N a_{ij} x_j^k - \\alpha y_i^k \\\\ y_i^{k+1} & = \\sum_{j=1}^N b_{ij} (y_j^k - [ \\nabla f_j (x_j^{k+1}) - \\nabla f_j (x_j^k)]) where :math:`x_i\\in\\mathbb{R}^n` and :math:`y_i\\in\\mathbb{R}^n`. The weight matrix :math:`A=[a_{ij}]` must be row-stochastic, while :math:`B=[b_{ij}]` must be column-stochastic. The underlying graph can be directed (and unbalanced). """ def __init__(self, agent: Agent, initial_condition: np.ndarray, enable_log: bool = False): super(DirectedGradientTracking, self).__init__(agent, initial_condition, enable_log) # initialize tracker self.y = self.agent.problem.objective_function.subgradient(initial_condition)
[docs] def iterate_run(self, **kwargs): """Run a single iterate of the algorithm """ stepsize = kwargs.get('stepsize', 0.1) # in data = self.agent.neighbors_exchange(self.x) for neigh in data: self.x_neigh[neigh] = data[neigh] x_avg = self.agent.in_weights[self.agent.id] * self.x for i in self.agent.in_neighbors: x_avg += self.agent.in_weights[i] * self.x_neigh[i] subg_old = self.agent.problem.objective_function.subgradient(self.x) self._update_x_average(x_avg - stepsize * self.y, **kwargs) subg_new = self.agent.problem.objective_function.subgradient(self.x) # y average send_data = {} gradient_diff = subg_new - subg_old for j in self.agent.out_neighbors: send_data[j] = self.agent.out_weights[j] * (self.y + gradient_diff) data = self.agent.neighbors_exchange(send_data, dict_neigh=True) for neigh in data: self.y_neigh[neigh] = data[neigh] y_avg = self.agent.out_weights[self.agent.id] * (self.y + gradient_diff) for i in self.agent.in_neighbors: y_avg += self.y_neigh[i] self._update_y_average(y_avg, **kwargs)
[docs] def get_result(self): """Return the actual value of x Returns: numpy.ndarray: value of x """ return self.x