Source code for disropt.algorithms.misc

import numpy as np
import time
from typing import Any 
from ..agents import Agent
from .algorithm import Algorithm


class LogicAnd(Algorithm):
    """Logic-And algorithm.

    It can be used for checking in a distributed way if a certain condition
    (corresponding to flag=True in the algorithm) is satisfied by all the
    agents in the network. Details can be found in [FaGa19a]_

    The agent keeps a matrix ``S`` with ``graph_diameter`` rows and one column
    per in-neighbor plus a last column for the agent itself. ``S_ind`` maps
    each in-neighbor to the index of its column.

    Args:
        agent (Agent): Agent
        graph_diameter (int): diameter of the graph representing the network
        flag (bool, optional): local flag value. Defaults to False.
        enable_log (bool, optional): True for enabling log. Defaults to False.
    """

    def __init__(self, agent: Agent, graph_diameter: int, flag: bool = False,
                 enable_log: bool = False, **kwargs):
        super().__init__(agent, enable_log, **kwargs)

        self.flag = flag
        self.graph_diameter = graph_diameter

        # one column for each in-neighbor, the last column is the local one
        self.S = np.zeros([graph_diameter, len(self.agent.in_neighbors) + 1])

        # column index associated with each in-neighbor
        self.S_ind = {
            neighbor: column
            for column, neighbor in enumerate(self.agent.in_neighbors)
        }

    def change_flag(self, new_flag: bool):
        """Change the local flag

        Args:
            new_flag: new flag. Both ``bool`` and ``numpy.bool_`` (the type
                returned by numpy comparisons and reductions) are accepted.

        Raises:
            TypeError: new flag must be a bool
        """
        if not isinstance(new_flag, (bool, np.bool_)):
            raise TypeError("new flag must be a bool")

        # always store a plain Python bool
        self.flag = bool(new_flag)

    def matrix_update(self):
        """Update the matrix S

        The first entry of the local (last) column is set to the local flag,
        then each following entry of that column is set to the product of the
        whole previous row.
        """
        self.S[0, -1] = int(self.flag)

        for l in range(self.graph_diameter - 1):
            self.S[l + 1, -1] = np.prod(self.S[l, :])

    def force_matrix_update(self):
        """Force the matrix S to have all ones in the last row
        """
        self.S[-1] = 1

    def update_column(self, neighbor: Any, column: np.ndarray):
        """Update a column of the matrix corresponding to a neighbor

        Args:
            neighbor: neighbor
            column: column value

        Raises:
            TypeError: second argument must be a numpy.ndarray with shape (graph_diameter, )
            ValueError: second argument must be a numpy.ndarray with shape (graph_diameter, )
            KeyError: neighbor has no associated column in ``S_ind``
        """
        if not isinstance(column, np.ndarray):
            raise TypeError(
                "second argument must be a numpy.ndarray with shape (graph_diameter, )")
        if column.shape != (self.graph_diameter, ):
            raise ValueError(
                "second argument must be a numpy.ndarray with shape (graph_diameter, )")

        index = self.S_ind[neighbor]
        self.S[:, index] = column

    def check_stop(self):
        """Check the last row of S

        Returns:
            bool: True if the product of the last row is nonzero, i.e. the
            last row contains only ones. Meaning that all have the flag True
        """
        return bool(np.prod(self.S[-1, :]))

    def iterate_run(self):
        """Run an iterate

        The local column is exchanged with the neighbors, the received
        columns are stored in S and then the local column is recomputed.
        """
        data = self.agent.neighbors_exchange(self.S[:, -1])

        for neigh in data:
            self.update_column(neigh, data[neigh])

        self.matrix_update()

    def run(self, maximum_iterations: int = 100, verbose: bool = False):
        """Run the algorithm

        Args:
            maximum_iterations: Maximum number of iterations. Defaults to 100.
            verbose: If True print some information during the evolution of the algorithm. Defaults to False.

        Raises:
            TypeError: maximum iterations must be an int
        """
        if not isinstance(maximum_iterations, int):
            raise TypeError("maximum iterations must be an int")

        for k in range(maximum_iterations):
            self.iterate_run()

            if self.check_stop():
                # k is the zero-based index, so k + 1 iterations were performed
                print("logic-and completed in {} iterations".format(k + 1))
                break

            if verbose and self.agent.id == 0:
                print('Iteration {}'.format(k), end="\r")

    def matrix_reset(self):
        """Reset the local flag to False and the matrix S to all zeros
        """
        self.flag = False
        self.S = np.zeros(self.S.shape)
class AsynchronousLogicAnd(LogicAnd):
    """Asynchronous Logic-And algorithm.

    It can be used for checking in a distributed way if a certain condition
    (corresponding to flag=True in the algorithm) is satisfied by all the
    agents in the network. Details can be found in [FaGa19a]_

    Args:
        agent (Agent): Agent
        graph_diameter (int): diameter of the graph representing the network
        flag (bool, optional): local flag value. Defaults to False.
        enable_log (bool, optional): True for enabling log. Defaults to False.
    """

    def iterate_run(self):
        """Run an iterate

        The columns received asynchronously from the neighbors are stored in
        S, the local column is recomputed and then sent to the neighbors.
        """
        data = self.agent.neighbors_receive_asynchronous()

        for neigh in data:
            self.update_column(neigh, data[neigh])

        self.matrix_update()

        self.agent.neighbors_send(self.S[:, -1])

    def run(self, maximum_running_time: float = 1):
        """Run the algorithm

        Args:
            maximum_running_time: Maximum running time (in seconds, as measured
                by ``time.time``). Both int and float values are accepted.
                Defaults to 1.

        Raises:
            TypeError: maximum running time must be a float
        """
        # The default value (1) is an int, so integers must pass the check:
        # a float-only check makes a plain ``run()`` call fail.
        # bool is a subclass of int and is still rejected.
        if isinstance(maximum_running_time, bool) or \
                not isinstance(maximum_running_time, (int, float)):
            raise TypeError("maximum running time must be a float")

        start_time = time.time()
        end_time = start_time + maximum_running_time

        while time.time() <= end_time:
            self.iterate_run()

            if self.check_stop():
                print("logic-and completed in {} s".format(time.time() - start_time))
                break
class MaxConsensus(Algorithm):
    """Max-Consensus algorithm.

    It computes the entry-wise maximum of a numpy array by using only
    neighboring communication.

    Args:
        agent (Agent): Agent
        x0 (np.ndarray): local initial condition
        graph_diameter (int, optional): diameter of the graph representing the network
        enable_log (bool, optional): True to enable log. Defaults to False.
    """

    def __init__(self, agent: Agent, x0: np.ndarray, graph_diameter: int = None,
                 enable_log: bool = False, **kwargs):
        super(MaxConsensus, self).__init__(agent, enable_log, **kwargs)

        self.x0 = x0
        self.x = x0
        self.graph_diameter = graph_diameter

        # the stopping criterion is available only when the diameter is known
        self.stop_iterations = (
            None if graph_diameter is None else 2 * graph_diameter + 1
        )

    def iterate_run(self):
        """Run an iterate

        The local value is exchanged with the neighbors and replaced by the
        entry-wise maximum of itself and every received value.
        """
        received = self.agent.neighbors_exchange(self.x)

        for sender in received:
            self.x = np.maximum(self.x, received[sender])

    def run(self, iterations: int = 100, verbose: bool = False):
        """Run the algorithm

        Args:
            iterations: Maximum number of iterations. Defaults to 100.
            verbose: If True print some information during the evolution of the algorithm. Defaults to False.

        Raises:
            TypeError: iterations must be a positive integer

        Returns:
            numpy.ndarray: the sequence of local values, one entry per
            performed iteration, if enable_log is True. Nothing otherwise.
        """
        if not (isinstance(iterations, int) and iterations > 0):
            raise TypeError("iterations must be a positive integer")

        if self.enable_log:
            # one slot for each iteration
            self.sequence_x = np.zeros([iterations, *self.x.shape])

        # number of consecutive iterations without changes in the local value
        unchanged_steps = 0
        # number of iterations actually performed
        performed = iterations

        for k in range(iterations):
            # iterate_run rebinds self.x, so this name keeps the old value
            previous = self.x

            self.iterate_run()

            if self.enable_log:
                self.sequence_x[k] = self.x

            if verbose and self.agent.id == 0:
                print('Iteration {}'.format(k), end="\r")

            if np.linalg.norm(previous - self.x) < 1e-6:
                unchanged_steps += 1
            else:
                unchanged_steps = 0

            # convergence detected: send the local value a last time and stop
            if self.stop_iterations is not None and unchanged_steps > self.stop_iterations:
                self.agent.neighbors_send(self.x)
                performed = k + 1
                break

        if self.enable_log:
            # discard the slots of the iterations that were not performed
            return self.sequence_x.take(np.arange(0, performed), axis=0)

    def get_result(self):
        """Return the current local value

        Returns:
            the current value of x
        """
        return self.x