Source code for disropt.algorithms.asymm

import numpy as np
from copy import deepcopy
from scipy import stats
import time
import warnings
import random
from typing import Optional, List, Tuple, Any
from ..agents import Agent
from .algorithm import Algorithm
from .misc import AsynchronousLogicAnd, LogicAnd
from ..utils.utilities import is_pos_def
from ..functions import Variable, SquaredNorm, Square, Max, Norm, Abs

class ASYMM(AsynchronousLogicAnd):
    """Asynchronous Distributed Method of Multipliers [FaGa19b]_

    See [FaGa19b]_ for the details.

    .. warning::
        This algorithm is currently under development
    """

    def __init__(self, agent: Agent, graph_diameter: int, initial_condition: np.ndarray,
                 enable_log: bool = False, **kwargs):
        super(ASYMM, self).__init__(agent=agent, graph_diameter=graph_diameter,
                                    enable_log=enable_log, **kwargs)
        self.x0 = initial_condition
        self.x_old = initial_condition
        self.x = initial_condition
        self.shape = self.x.shape
        self.x_neigh = {}

        # multipliers
        self.nu_from = {}
        self.nu_to = {}
        self.rho_from = {}
        self.rho_to = {}
        self.gr_cn = {}
        self.multiplier = {}
        self.penalty = {}
        self.constr_val = {}

        # Algorithm parameters
        # max penalty
        self.max_penalty = 1e5
        # constant stepsize
        self.alpha = 0.1
        # initial tolerance
        self.e = 0.1
        # penalty threshold
        self.gamma = 0.25
        # penalty growth parameter
        self.beta = 2

        # auxiliary variables
        self.M_done = False

        # local lagrangian
        self.local_lagrangian = None

    def __initialize_multipliers(self):
        # edge multipliers and penalties associated with each in-neighbor
        for neigh in self.agent.in_neighbors:
            self.nu_from[neigh] = 0.000001*np.random.rand(*self.shape)
            self.rho_from[neigh] = 0.1
            self.nu_to[neigh] = 0.000001*np.random.rand(*self.shape)
            self.rho_to[neigh] = 0.1
            self.gr_cn[neigh] = np.linalg.norm(self.x - self.x_neigh[neigh], ord=2)

        # multipliers and penalties associated with the local constraints
        for idx, constraint in enumerate(self.agent.problem.constraints):
            self.multiplier[idx] = 0.000001*np.random.rand(*constraint.output_shape)
            self.penalty[idx] = 0.1
            if constraint.is_equality:
                self.constr_val[idx] = np.linalg.norm(constraint.function.eval(self.x), ord=2)
            elif constraint.is_inequality:
                self.constr_val[idx] = np.linalg.norm(
                    np.max([-self.multiplier[idx]/self.penalty[idx], constraint.function.eval(self.x)], axis=0))

    def __update_local_lagrangian(self):
        x = Variable(self.shape[0])

        # objective function
        fn = 0
        fn += self.agent.problem.objective_function

        # neighboring (consensus) constraints
        for j in self.agent.in_neighbors:
            fn += (self.nu_to[j] - self.nu_from[j]) @ x + \
                (self.rho_to[j] + self.rho_from[j]) / 2 * (x - self.x_neigh[j]) @ (x - self.x_neigh[j])
            # fn += (self.rho_to[j]+self.rho_from[j]) / 2 * Abs((x - self.x_neigh[j])@(x - self.x_neigh[j]) - 1e-7)

        # local equality and inequality constraints
        for idx, constraint in enumerate(self.agent.problem.constraints):
            if constraint.is_equality:
                fn += self.multiplier[idx] @ constraint.function + \
                    0.5 * self.penalty[idx] * SquaredNorm(constraint.function)
            elif constraint.is_inequality:
                shape = constraint.output_shape
                fn += 1/(2*self.penalty[idx]) * np.ones(shape) @ (
                    Square(Max(0, self.multiplier[idx] + self.penalty[idx]*constraint.function)) -
                    np.power(self.multiplier[idx], 2))

        self.local_lagrangian = fn

    def __update_stepsize(self):
        # TODO stepsize estimate
        self.alpha /= 1.4

    def __perform_descent(self, stepsize_type='newton'):
        if stepsize_type == 'newton':
            gradient = self.local_lagrangian.subgradient(self.x)
            hessian = self.local_lagrangian.hessian(self.x)
            if is_pos_def(hessian):
                try:
                    # damped Newton step
                    self.x -= 0.5 * np.linalg.inv(hessian) @ gradient
                except np.linalg.LinAlgError:
                    # fall back to a scaled gradient step if the Hessian cannot be inverted
                    self.x -= 0.5 * 1/np.linalg.norm(hessian) * gradient
            else:
                self.x -= min(self.alpha, self.e/10) * gradient
        else:
            self.x -= self.alpha * self.local_lagrangian.subgradient(self.x)
    def primal_update_step(self):
        self.__update_local_lagrangian()
        self.__perform_descent()
        # signal local convergence once the gradient is within the current tolerance
        gradient = self.local_lagrangian.subgradient(self.x)
        if np.linalg.norm(gradient, ord=2) <= self.e:
            self.change_flag(new_flag=True)
        self.matrix_update()
    def dual_update_step(self):
        # multipliers and penalties of the consensus constraints
        for neigh in self.agent.out_neighbors:
            self.nu_to[neigh] += float(self.rho_to[neigh]) * (self.x - self.x_neigh[neigh])
            if np.linalg.norm(self.x - self.x_neigh[neigh]) >= self.gamma * self.gr_cn[neigh]:
                self.rho_to[neigh] = min(self.beta * self.rho_to[neigh], self.max_penalty)
            # norm of the constraint value
            self.gr_cn[neigh] = np.linalg.norm(self.x - self.x_neigh[neigh])

        # multipliers and penalties of the local constraints
        for idx, constr in enumerate(self.agent.problem.constraints):
            if constr.is_equality:
                self.multiplier[idx] += self.penalty[idx] * constr.function.eval(self.x)
                if np.linalg.norm(constr.function.eval(self.x), ord=2) > self.gamma * self.constr_val[idx]:
                    self.penalty[idx] = min(self.beta * self.penalty[idx], self.max_penalty)
                self.constr_val[idx] = np.linalg.norm(constr.function.eval(self.x), ord=2)
            elif constr.is_inequality:
                self.multiplier[idx] = np.max(
                    [np.zeros(constr.output_shape),
                     self.multiplier[idx] + self.penalty[idx] * constr.function.eval(self.x)], axis=0)
                if np.linalg.norm(
                        np.max([-self.multiplier[idx] / self.penalty[idx], constr.function.eval(self.x)],
                               axis=0)) > self.gamma * self.constr_val[idx]:
                    self.penalty[idx] = min(self.beta * self.penalty[idx], self.max_penalty)
                self.constr_val[idx] = np.linalg.norm(
                    np.max([-self.multiplier[idx]/self.penalty[idx], constr.function.eval(self.x)], axis=0))

        self.M_done = True
    def reset_step(self):
        """ Reset the matrix S and update e
        """
        self.M_done = False
        self.matrix_reset()
        self.e = self.e/1.5
        self.__update_stepsize()
    def run(self, running_time: float = 10.0):
        if not isinstance(running_time, float):
            raise TypeError("Running time must be a float")

        if self.enable_log:
            dims = [1]
            for dim in self.x.shape:
                dims.append(dim)
            self.dims = dims
            self.sequence = np.zeros(dims)
            self.sequence[0] += self.x
            self.timestamp_sequence_awake = [time.time()]
            self.timestamp_sequence_sleep = [time.time()]

        # Exchange all data at the beginning
        data = self.agent.neighbors_exchange(self.x)
        for neigh in data:
            self.x_neigh[neigh] = data[neigh]

        self.__initialize_multipliers()
        self.__update_local_lagrangian()
        self.__update_stepsize()

        received_nu = 0
        end_time = time.time() + running_time

        # main loop: run until the time budget is exhausted
        while time.time() < end_time:
            if self.enable_log:
                self.timestamp_sequence_awake.append(time.time())

            # process all messages received while asleep
            data = self.agent.neighbors_receive_asynchronous()
            for neigh in data:
                msg = data[neigh]
                if msg["type"] == "primal":
                    self.x_neigh[neigh] = msg['x']
                    self.update_column(neigh, msg['S'])
                elif msg["type"] == "dual":
                    self.force_matrix_update()
                    self.nu_from[neigh] = msg['nu']
                    self.rho_from[neigh] = msg['rho']
                    received_nu += 1
                del msg

            # once the dual step is done and all dual messages have arrived, reset
            if self.M_done and received_nu == len(self.agent.in_neighbors):
                received_nu = 0
                self.reset_step()
                self.__update_local_lagrangian()

            # primal descent and broadcast of the local estimate
            if not self.check_stop() and not self.M_done:
                self.primal_update_step()
                msg = {'type': "primal", 'x': self.x, 'S': self.S[:, -1]}
                self.agent.neighbors_send(msg)

            # dual update and broadcast of the multipliers
            if self.check_stop() and not self.M_done:
                self.dual_update_step()
                for neigh in self.agent.out_neighbors:
                    msg = {'type': "dual", 'nu': self.nu_to[neigh], 'rho': self.rho_to[neigh]}
                    self.agent.communicator.neighbors_send(msg, [neigh])

            # save sequence
            if self.enable_log:
                self.sequence = np.vstack([self.sequence, self.x.reshape(self.dims)])

            if self.enable_log:
                self.timestamp_sequence_sleep.append(time.time())

        if self.enable_log:
            return self.timestamp_sequence_awake, \
                self.timestamp_sequence_sleep, \
                self.sequence
    def get_result(self):
        """Return the value of the solution
        """
        return self.x
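
For reference, the following is a minimal, self-contained NumPy sketch (not part of disropt) of the centralized method-of-multipliers iteration that primal_update_step and dual_update_step distribute: an augmented-Lagrangian primal descent followed by a first-order multiplier update with the same gamma/beta penalty-growth test. The toy problem, all names and all numerical values below are illustrative assumptions.

# Illustrative sketch: method of multipliers for  min ||x - c||^2  s.t.  a^T x = b.
import numpy as np

def toy_method_of_multipliers(a, b, c, outer_iters=50,
                              gamma=0.25, beta=2.0, max_penalty=1e5):
    x = np.zeros_like(c)
    mu, rho = 0.0, 0.1                     # multiplier and penalty (cf. self.multiplier, self.penalty)
    constr_val = abs(float(a @ x) - b)     # norm of the constraint value (cf. self.constr_val)

    for _ in range(outer_iters):
        # primal step: gradient descent on the augmented Lagrangian
        # L(x) = ||x - c||^2 + mu*(a^T x - b) + rho/2*(a^T x - b)^2
        step = 1.0 / (2.0 + rho * float(a @ a))   # inverse Lipschitz constant of the gradient
        for _ in range(200):
            g = float(a @ x) - b
            grad = 2.0 * (x - c) + (mu + rho * g) * a
            x = x - step * grad

        # dual step: first-order multiplier update (cf. dual_update_step)
        g = float(a @ x) - b
        mu = mu + rho * g
        # grow the penalty if the constraint violation did not shrink enough
        if abs(g) > gamma * constr_val:
            rho = min(beta * rho, max_penalty)
        constr_val = abs(g)

    return x, mu, rho

# Example usage (hypothetical data): minimize ||x - (2, 0)||^2 subject to x1 + x2 = 1,
# whose solution is (1.5, -0.5).
# x_opt, mu, rho = toy_method_of_multipliers(np.array([1.0, 1.0]), 1.0, np.array([2.0, 0.0]))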