import numpy as np
import time
import warnings
import random
from typing import List, Tuple
from ..agents import Agent
from .algorithm import Algorithm
[docs]class Consensus(Algorithm):
"""Consensus Algorithm [OlSa07]_
From the perspective of agent :math:`i` the algorithm works as follows. For :math:`k=0,1,\\dots`
.. math::
x_i^{k+1} = \sum_{j=1}^N w_{ij} x_j^k
where :math:`x_i\\in\\mathbb{R}^n`. The weight matrix :math:`W=[w_{ij}]` should be doubly-stochastic in order to have convergence to the average of the local initial conditions. If :math:`W` is row-stochastic convergence is still attained but at a different point. Other type of matrices can be used, but convergence is not guaranteed.
Also time-varying graphs can be adopted.
Args:
agent (Agent): agent to execute the algorithm
initial_condition (numpy.ndarray): initial condition
enable_log (bool): True for enabling log
Attributes:
agent (Agent): agent to execute the algorithm
x0 (numpy.ndarray): initial condition
x (numpy.ndarray): current value of the local solution
shape (tuple): shape of the variable
x_neigh (dict): dictionary containing the local solution of the (in-)neighbors
enable_log (bool): True for enabling log
"""
def __init__(self, agent: Agent, initial_condition: np.ndarray, enable_log: bool=False):
super(Consensus, self).__init__(agent, enable_log)
self.x0 = initial_condition
self.x = initial_condition
self.shape = self.x.shape
self.x_neigh = {}
def _update_local_solution(self, x: np.ndarray, **kwargs):
"""update the local solution
Args:
x: new value
Raises:
TypeError: Input must be a numpy.ndarray
ValueError: Incompatible shapes
"""
if not isinstance(x, np.ndarray):
raise TypeError("Input must be a numpy.ndarray")
if x.shape != self. x0.shape:
raise ValueError("Incompatible shapes")
self.x = x
[docs] def iterate_run(self, **kwargs):
"""Run a single iterate of the algorithm
"""
data = self.agent.neighbors_exchange(self.x)
for neigh in data:
self.x_neigh[neigh] = data[neigh]
x_avg = self.agent.in_weights[self.agent.id] * self.x
for i in self.agent.in_neighbors:
x_avg += self.agent.in_weights[i] * self.x_neigh[i]
self._update_local_solution(x_avg, **kwargs)
[docs] def run(self, iterations: int=100, verbose: bool=False, **kwargs):
"""Run the algorithm for a given number of iterations
Args:
iterations: Number of iterations. Defaults to 100.
verbose: If True print some information during the evolution of the algorithm. Defaults to False.
"""
if not isinstance(iterations, int):
raise TypeError("iterations must be an int")
if self.enable_log:
dims = [iterations]
for dim in self.x.shape:
dims.append(dim)
self.sequence = np.zeros(dims)
for k in range(iterations):
self.iterate_run(**kwargs)
if self.enable_log:
self.sequence[k] = self.x
if verbose:
if self.agent.id == 0:
print('Iteration {}'.format(k), end="\r")
if self.enable_log:
return self.sequence
[docs] def get_result(self):
"""Return the actual value of x
Returns:
numpy.ndarray: value of x
"""
return self.x
[docs]class AsynchronousConsensus(Algorithm):
""" Asynchronous Consensus Algorithm
From the perspective of agent :math:`i` the algorithm works as follows. When agent :math:`i` gets awake it updates its local solution as
.. math::
x_i \\gets \sum_{j\\in\\mathcal{N}_i} w_{ij} x_{j\\mid i}
where :math:`\\mathcal{N}_i` is the current set of in-neighbors and :math:`x_{j\\mid i},j\\in\\mathcal{N}_i` is the local copy of :math:`x_j` available at node :math:`i` (which can be outdated, due to asynchrony, computation time and link failures).
Args:
agent: agent to execute the algorithm
initial_condition: initial condition
enable_log: True for enabling log. Defaults to False.
force_sleep: True if one wanst to force sleep after the computation phase. Defaults to False.
maximum_sleep: Maximum allowed sleep. Defaults to 0.01.
sleep_type: Type of sleep time("constant", "random"). Defaults to "random".
force_computation_time: True if one want sto force length computation phase. Defaults to False.
maximum_computation_time: Maximum allowed computation time. Defaults to 0.01.
computation_time_type: Type of computation time ("constant", "random"). Defaults to "random".
force_unreliable_links: True if one wants to force unreliable links. Defaults to False.
link_failure_probability: Probability of incoming links failure. Defaults to 0.
Attributes:
agent (Agent): agent to execute the algorithm
x0 (numpy.ndarray): initial condition
x (numpy.ndarray): current value of the local solution
shape (tuple): shape of the variable
x_neigh (dict): dictionary containing the local solution of the (in-)neighbors
enable_log (bool): True for enabling log
timestamp_sequence_awake ( list ): list of timestamps at which node get awake
timestamp_sequence_sleep ( list ): list of timestamps at which node go to sleep
force_sleep: True if one wanst to force sleep after the computation phase. Defaults to False.
maximum_sleep: Maximum allowed sleep. Defaults to 0.01.
sleep_type: Type of sleep time("constant", "random"). Defaults to "random".
force_computation_time: True if one want sto force length computation phase. Defaults to False.
maximum_computation_time: Maximum allowed computation time. Defaults to 0.01.
computation_time_type: Type of computation time ("constant", "random"). Defaults to "random".
force_unreliable_links: True if one wants to force unreliable links. Defaults to False.
link_failure_probability: Probability of incoming links failure. Defaults to 0.
"""
def __init__(self,
agent: Agent,
initial_condition: np.ndarray,
enable_log: bool = False,
force_sleep: bool = False,
maximum_sleep: float = 0.01,
sleep_type: str = "random",
force_computation_time: bool = False,
maximum_computation_time: float = 0.01,
computation_time_type: str = "random",
force_unreliable_links: bool = False,
link_failure_probability: float = 0):
super(AsynchronousConsensus, self).__init__(agent, enable_log)
self.x0 = initial_condition
self.x = initial_condition
self.shape = self.x.shape
self.x_neigh = {}
if force_sleep:
if sleep_type not in ("constant", "random"):
raise ValueError("sleep_type can be constant or random")
if force_computation_time:
if computation_time_type not in ("constant", "random"):
raise ValueError("computation_time_type can be constant or random")
self.enable_log = enable_log
self.force_sleep = force_sleep
self.maximum_sleep = maximum_sleep
self.sleep_type = sleep_type
self.force_computation_time = force_computation_time
self.maximum_computation_time = maximum_computation_time
self.computation_time_type = computation_time_type
self.force_unreliable_links = force_unreliable_links
self.link_failure_probability = link_failure_probability
self.timestamp_sequence_awake = None
self.timestamp_sequence_sleep = None
def _update_local_solution(self, x: np.ndarray, **kwargs):
"""update the local solution
Args:
x: new value
Raises:
TypeError: Input must be a numpy.ndarray
ValueError: Incompatible shapes
"""
if not isinstance(x, np.ndarray):
raise TypeError("Input must be a numpy.ndarray")
if x.shape != self. x0.shape:
raise ValueError("Incompatible shapes")
self.x = x
[docs] def iterate_run(self, **kwargs):
"""Run a single iterate
"""
data = self.agent.neighbors_receive_asynchronous()
if self.enable_log:
self.timestamp_sequence_awake.append(time.time())
for neigh in data:
self.x_neigh[neigh] = data[neigh]
if self.force_computation_time:
start_wait_time = time.time()
if not self.force_unreliable_links:
x_avg = self.agent.in_weights[self.agent.id] * self.x
for i in self.agent.in_neighbors:
x_avg += self.agent.in_weights[i] * self.x_neigh[i]
else:
warnings.warn("when forcing link failures, all neighbors are given the same weight")
neighbors = []
for neigh in self.agent.in_neighbors:
rnd = random.uniform(0, 1)
if rnd > self.link_failure_probability:
neighbors.append(neigh)
weight = 1.0/(len(neighbors) + 1)
x_avg = weight * self.x
for i in neighbors:
x_avg += weight * self.x_neigh[i]
self._update_local_solution(x_avg, **kwargs)
# save sequence
if self.enable_log:
self.sequence = np.vstack([self.sequence, self.x.reshape(self.dims)])
# force computation time if requested
if self.force_computation_time:
if self.computation_time_type == "random":
wait_time = random.uniform(0, self.maximum_computation_time)
elif self.computation_time_type == "constant":
wait_time = self.maximum_computation_time
remaining_wait_time = wait_time - (time.time() - start_wait_time)
if remaining_wait_time < 0:
warnings.warn("requested computation time cannot be guaranteed")
else:
time.sleep(remaining_wait_time)
self.agent.neighbors_send(self.x)
if self.enable_log:
self.timestamp_sequence_sleep.append(time.time())
[docs] def run(self, running_time: float = 5.0):
"""Run the asynchronous consensus algorithm for a certain amount of time
Args:
running_time: Total run time. Defaults to 5.0.
Returns:
tuple: timestamp_sequence_awake, timestamp_sequence_sleep, sequence
"""
if not isinstance(running_time, (int, float)):
raise TypeError("Running time must be a float")
if self.enable_log:
dims = [1]
for dim in self.x.shape:
dims.append(dim)
self.dims = dims
self.sequence = np.zeros(dims)
self.sequence[0] = self.x
self.timestamp_sequence_awake = [time.time()]
self.timestamp_sequence_sleep = [time.time()]
# Exchange all data at the beginning
data = self.agent.neighbors_exchange(self.x)
for neigh in data:
self.x_neigh[neigh] = data[neigh]
# Then go asyncrhronous
start_time = time.time()
end_time = start_time + running_time
while time.time() < end_time:
self.iterate_run()
# force a sleep time if requested
if self.force_sleep:
start_sleep_time = time.time()
if self.sleep_type == "random":
sleep_time = random.uniform(0, self.maximum_sleep)
elif self.sleep_type == "constant":
sleep_time = self.maximum_sleep
remaining_sleep_time = sleep_time - \
(time.time() - start_sleep_time)
if remaining_sleep_time < 0:
warnings.warn("requested delay cannot be guaranteed")
else:
time.sleep(remaining_sleep_time)
if self.enable_log:
return self.timestamp_sequence_awake, \
self.timestamp_sequence_sleep, \
self.sequence
[docs] def get_result(self):
"""Return the actual value of x
Returns:
numpy.ndarray: value of x
"""
return self.x
[docs]class BlockConsensus(Algorithm):
"""Block-wise consensus [FaNo19]_
At each iteration, the agent can update its local estimate or not at each iteration according to a certain probability (awakening_probability).
From the perspective of agent :math:`i` the algorithm works as follows. At iteration :math:`k` if the agent is awake, it selects a random block :math:`\\ell_i^k` of its local solution and updates
.. math::
x_{i,\\ell}^{k+1} = \\begin{cases}
\\sum_{j\\in\\mathcal{N}_i} w_{ij} x_{j\\mid i,\\ell}^k & \\text{if} \\ell = \\ell_i^k \\\\
x_{i,\\ell}^{k} & \\text{otherwise}
\\end{cases}
where :math:`\\mathcal{N}_i` is the current set of in-neighbors and :math:`x_{j\\mid i},j\\in\\mathcal{N}_i` is the local copy of :math:`x_j` available at node :math:`i` and :math:`x_{i,\\ell}` denotes the :math:`\\ell`-th block of :math:`x_i`. Otherwise :math:`x_{i}^{k+1}=x_i^k`.
Args:
agent: agent to execute the algorithm
initial_condition: initial condition
enable_log: True for enabling log
blocks_list: the list of blocks (list of tuples)
probabilities: list of probabilities of drawing each block
awakening_probability: probability of getting awake at each iteration
"""
def __init__(self,
agent: Agent,
initial_condition: np.ndarray,
enable_log: bool = False,
blocks_list: List[Tuple] = None,
probabilities: List[float] = None,
awakening_probability: float = 1.0):
super(BlockConsensus, self).__init__(agent, enable_log)
if (not isinstance(initial_condition, np.ndarray)) or \
(len(initial_condition.shape) != 2) or \
(initial_condition.shape[1] != 1):
raise ValueError("Initial condition must be a numpy.ndarray with shape (Any, 1)")
self.x0 = initial_condition
self.x = initial_condition
self.shape = self.x.shape
self.x_neigh = {}
if blocks_list is not None:
if not isinstance(blocks_list, list):
raise ValueError("blocks_list argument, if provided, must be a list of tuples")
items = 0
for item in blocks_list:
if not isinstance(item, tuple):
raise ValueError("blocks_list argument, if provided, must be a list of tuples")
items += len(item)
self.blocks_list = blocks_list
if items != self.shape[0]:
warnings.warn("Not all elements have been included in blocks_list")
else:
self.blocks_list = list(range(self.shape[0]))
self.blocks_number = len(self.blocks_list)
if probabilities is not None:
if isinstance(probabilities, list):
for i in probabilities:
if not isinstance(i, float):
raise ValueError("probabilities argument, if provided, must be a list of float")
else:
raise ValueError("probabilities argument, if provided, must be a list of float")
if len(probabilities) != len(self.blocks_list):
raise ValueError("blocks_list and probabilities arguments have different lengths")
if np.sum(probabilities) != 1.0:
raise ValueError("probabilities must sum to 1")
self.probabilities = probabilities
else:
self.probabilities = (np.ones(self.shape).flatten()/self.shape[0]).tolist()
if isinstance(awakening_probability, float) and (0 <= awakening_probability <= 1):
self.awakening_probability = awakening_probability
else:
raise ValueError("awakening_probability must be a float in [0,1]")
def _update_local_solution(self, x: np.ndarray, **kwargs):
"""update the local solution
Args:
x: new value
Raises:
TypeError: Input must be a numpy.ndarray
ValueError: Incompatible shapes
"""
if not isinstance(x, np.ndarray):
raise TypeError("Input must be a numpy.ndarray")
if x.shape != self. x0.shape:
raise ValueError("Incompatible shapes")
self.x = x
[docs] def iterate_run(self, **kwargs):
"""Run a single iterate of the algorithm
"""
awake = random.uniform(0, 1)
if awake <= self.awakening_probability:
selected_index = np.random.choice(np.arange(self.blocks_number), p=self.probabilities)
selected_block = self.blocks_list[selected_index]
if isinstance(selected_block, int):
selected_block = (selected_block, )
packet_send = {'block': selected_block, 'data': self.x[list(selected_block)]}
data = self.agent.neighbors_exchange(packet_send)
for neigh in data:
received_block = data[neigh]['block']
received_data = data[neigh]['data']
try:
self.x_neigh[neigh][list(received_block)] = received_data
except KeyError:
self.x_neigh[neigh] = np.zeros(self.shape)
self.x_neigh[neigh][list(received_block)] = received_data
x_avg = self.agent.in_weights[self.agent.id] * self.x
for i in self.agent.in_neighbors:
x_avg += self.agent.in_weights[i] * self.x_neigh[i]
self._update_local_solution(x_avg, selected_block=selected_block, **kwargs)
[docs] def run(self, iterations: int=100, verbose: bool=False):
"""Run the algorithm for a given number of iterations
Args:
iterations: Number of iterations. Defaults to 100.
verbose: If True print some information during the evolution of the algorithm. Defaults to False.
"""
if not isinstance(iterations, int):
raise TypeError("iterations must be an int")
if self.enable_log:
dims = [iterations]
for dim in self.x.shape:
dims.append(dim)
self.sequence = np.zeros(dims)
for k in range(iterations):
self.iterate_run()
if self.enable_log:
self.sequence[k] = self.x
if verbose:
if self.agent.id == 0:
print('Iteration {}'.format(k), end="\r")
if self.enable_log:
return self.sequence
[docs] def get_result(self):
"""Return the actual value of x
Returns:
numpy.ndarray: value of x
"""
return self.x
[docs]class PushSumConsensus(Algorithm):
"""Push-Sum Consensus Algorithm
From the perspective of agent :math:`i` the algorithm works as follows. For :math:`k=0,1,\\dots`
.. math::
x_i^{k+1} &= \\sum_{j=1}^N w_{ij} x_j^k
y_i^{k+1} &= \\sum_{j=1}^N w_{ij} y_j^k
z_i^{k+1} &= \\frac{x_i^{k+1}}{y_i^{k+1}}
where :math:`x_i\\in\\mathbb{R}^n`. The weight matrix :math:`W=[w_{ij}]` should be column-stochastic in order to let :math:`z_i^k` converge to the average of the local initial conditions.
Also time-varying graphs can be adopted.
Args:
agent (Agent): agent to execute the algorithm
initial_condition (numpy.ndarray): initial condition
enable_log (bool): True for enabling log
Attributes:
agent (Agent): agent to execute the algorithm
z0 (numpy.ndarray): initial condition
z (numpy.ndarray): current value of the local solution
shape (tuple): shape of the variable
x_neigh (dict): dictionary containing the x values of the (in-)neighbors
y_neigh (dict): dictionary containing the y values of the (in-)neighbors
enable_log (bool): True for enabling log
"""
def __init__(self, agent: Agent, initial_condition: np.ndarray, enable_log: bool=False):
super(PushSumConsensus, self).__init__(agent, enable_log)
self.z0 = initial_condition
self.x = initial_condition
self.z = initial_condition
self.y = np.ones(initial_condition.shape)
self.shape = self.z0.shape
self.x_neigh = {}
self.y_neigh = {}
def _update_x_average(self, x: np.ndarray, **kwargs):
if not isinstance(x, np.ndarray):
raise TypeError("Input must be a numpy.ndarray")
if x.shape != self.x.shape:
raise ValueError("Incompatible shapes")
self.x = x
def _update_y_average(self, y: np.ndarray, **kwargs):
if not isinstance(y, np.ndarray):
raise TypeError("Input must be a numpy.ndarray")
if y.shape != self.y.shape:
raise ValueError("Incompatible shapes")
self.y = y
def _update_local_solution(self, z: np.ndarray, **kwargs):
"""update the local solution
Args:
x: new value
Raises:
TypeError: Input must be a numpy.ndarray
ValueError: Incompatible shapes
"""
if not isinstance(z, np.ndarray):
raise TypeError("Input must be a numpy.ndarray")
if z.shape != self.z.shape:
raise ValueError("Incompatible shapes")
self.z = z
[docs] def iterate_run(self, **kwargs):
"""Run a single iterate of the algorithm
"""
# # in
# data = self.agent.neighbors_exchange(self.x)
# for neigh in data:
# self.x_neigh[neigh] = data[neigh]
# x_avg = self.agent.in_weights[self.agent.id] * self.x
# for i in self.agent.in_neighbors:
# x_avg += self.agent.in_weights[i] * self.x_neigh[i]
# x average
send_data = {}
for j in self.agent.out_neighbors:
send_data[j] = self.agent.out_weights[j] * self.x
data = self.agent.neighbors_exchange(send_data, dict_neigh=True)
for neigh in data:
self.x_neigh[neigh] = data[neigh]
x_avg = self.agent.out_weights[self.agent.id] * self.x
for i in self.agent.in_neighbors:
x_avg += self.x_neigh[i]
self._update_x_average(x_avg, **kwargs)
# y average
send_data = {}
for j in self.agent.out_neighbors:
send_data[j] = self.agent.out_weights[j] * self.y
data = self.agent.neighbors_exchange(send_data, dict_neigh=True)
for neigh in data:
self.y_neigh[neigh] = data[neigh]
y_avg = self.agent.out_weights[self.agent.id] * self.y
for i in self.agent.in_neighbors:
y_avg += self.y_neigh[i]
self._update_y_average(y_avg, **kwargs)
# aggregate
z = self.x/self.y
self._update_local_solution(z)
[docs] def run(self, iterations: int=100, verbose: bool=False, **kwargs):
"""Run the algorithm for a given number of iterations
Args:
iterations: Number of iterations. Defaults to 100.
verbose: If True print some information during the evolution of the algorithm. Defaults to False.
"""
if not isinstance(iterations, int):
raise TypeError("iterations must be an int")
if self.enable_log:
dims = [iterations]
for dim in self.x.shape:
dims.append(dim)
self.sequence = np.zeros(dims)
for k in range(iterations):
self.iterate_run(**kwargs)
if self.enable_log:
self.sequence[k] = self.x
if verbose:
if self.agent.id == 0:
print('Iteration {}'.format(k), end="\r")
if self.enable_log:
return self.sequence
[docs] def get_result(self):
"""Return the actual value of x
Returns:
numpy.ndarray: value of x
"""
return self.z