from typing import Union, Any
from threading import Event
from ..communicators import Communicator, MPICommunicator
from ..problems import Problem
[docs]class Agent():
"""The Agent object represents an agent in a network with communication capabilities
Args:
in_neighbors (list): list of agents from which communication is received
out_neighbors (list): list of agents to which information is send
communicator (Communicator, optional): a Communicator object used to perform communications (if none is provided, it is automatically set to MPICommunicator). Defaults to None.
in_weights (list or dict, optional): list or dict containing weights to assign to information coming from each in-neighbor. If a list is provided, it must have lenght equal to the number of agents in the network. If a dict is provided, it must have a key for each in-neighbor and, associated to it, the correspondig weight. Defaults to None, implies equal in_weights to in-neighbors.
out_weights (list or dict, optional): list or dict containing weights to assign to out-neighbor. If a list is provided, it must have lenght equal to the number of agents in the network. If a dict is provided, it must have a key for each out-neighbor and, associated to it, the correspondig weight. Defaults to None, implies equal in_weights to out-neighbors.
auto_local (bool, optional): If False the (in-)weight for the local agent must be provided. Otherwise it is set automatically, provided that the in_weights have sum in [0,1]. Defaults to True.
Attributes:
id (int): id of the Agent
in_neighbors (list): list of in-neighbors
out_neighbors (list): list of out-neighbors
in_weights (dict): a dict containing weights to assign to information coming from each in-neighbor.
out_weights (dict): a dict containing weights to assign to out-neighbors.
communicator (Communicator): Communicator object used to perform communications.
problem (Problem): Local optimization problem.
"""
def __init__(self, in_neighbors: list = None, out_neighbors: list = None, communicator: Communicator = None,
in_weights: Union[list, dict] = None, out_weights: Union[list, dict] = None, auto_local: bool = True):
if communicator is not None:
if isinstance(communicator, Communicator):
self.communicator = communicator
else:
raise ValueError(
"the communicator must be and instance of the Communicator class.")
else:
self.communicator = MPICommunicator()
self.id = self.communicator.rank
if in_neighbors is None:
in_neighbors = list(range(self.communicator.size)).pop(self.id)
if out_neighbors is None:
out_neighbors = in_neighbors
# Set neighbors
self.in_neighbors = None
self.out_neighbors = None
self.set_neighbors(in_neighbors, out_neighbors)
self.in_weights = {}
self.set_weights(in_weights, out_weights, auto_local)
self.problem = None
[docs] def set_neighbors(self, in_neighbors: list, out_neighbors: list):
"""Set in and out neighbors
Args:
in_neighbors: list of agents from which communication is received
out_neighbors: list of agents to which information is send
"""
if isinstance(in_neighbors, list) and isinstance(out_neighbors, list):
self.in_neighbors = in_neighbors
self.out_neighbors = out_neighbors
# Remove self-loops if any
if self.id in self.in_neighbors:
self.in_neighbors.pop(self.id)
if self.id in self.out_neighbors:
self.out_neighbors.pop(self.id)
[docs] def set_weights(self, in_weights: Union[list, dict] = None, out_weights: Union[list, dict] = None, auto_local: bool = True):
"""Set in_weights to assign to in-neighbors and the one for agent itself.
Args:
in_weights: list or dict contatining in_weights to assign to information coming from each in-neighbor. If a list is provided, it must have lenght equal to the number of agents in the network. If a dict is provided, it must have a key for each in-neighbor and, associated to it, the correspondig weight. Defaults to None, implies equal in_weights to in-neighbors.
out_weights: list or dict contatining in_weights to assign to out-neighbors. If a list is provided, it must have lenght equal to the number of agents in the network. If a dict is provided, it must have a key for each out-neighbor and, associated to it, the correspondig weight. Defaults to None, implies equal in_weights to out neighbors.
auto_local: If False the weight for the local agent must be provided. Otherwise it is set automatically, provided that the in_weights have sum in [0,1]. Defaults to True.
Raises:
ValueError: If a dict is provided as argument, it must contain a key for each in-neighbor.
ValueError: Input must be list or dict
ValueError: If auto_local is not False, the provided in_weights must have sum in [0,1]
"""
self.in_weights = {}
self.out_weights = {}
if in_weights is not None:
# if a list is provided with length equal to the number of agents in the network
# it is converted in a dict
if isinstance(in_weights, list) and len(in_weights) == self.communicator.size:
for neighbor in self.in_neighbors:
self.in_weights[neighbor] = in_weights[neighbor]
# if a dict is provided it is checked if all in neighbors have a weight
else:
if isinstance(in_weights, dict):
for neighbor in self.in_neighbors:
if neighbor in in_weights:
self.in_weights[neighbor] = in_weights[neighbor]
else:
raise ValueError(
"A weight for each in-neighbor must be provided.")
else:
raise ValueError("The in_weights argument must be a list or a dict. \
If a list is provided, it must have lenght equal to the number of agents in the network. \
If a dict is provided, it must have a key for each in-neighbor and, associated to it, the correspondig weight.")
# Assign self weight (if sum of provided in_weights in [0,1])
if not auto_local:
self.in_weights[self.id] = in_weights[self.id]
else:
if 0 <= sum(self.in_weights.values()) <= 1:
self.in_weights[self.id] = 1 - sum(self.in_weights.values())
else:
raise ValueError(
"If auto_local is set to True, the provided in_weights must have sum in [0,1]")
else:
for neighbor in self.in_neighbors:
self.in_weights[neighbor] = 1/len(self.in_neighbors)
if out_weights is not None:
# if a list is provided with length equal to the number of agents in the network
# it is converted in a dict
if isinstance(out_weights, list) and len(out_weights) == self.communicator.size:
for neighbor in self.out_neighbors:
self.out_weights[neighbor] = out_weights[neighbor]
# if a dict is provided it is checked if all in neighbors have a weight
else:
if isinstance(out_weights, dict):
for neighbor in self.out_neighbors:
if neighbor in out_weights:
self.out_weights[neighbor] = out_weights[neighbor]
else:
raise ValueError(
"A weight for each in-neighbor must be provided.")
else:
raise ValueError("The in_weights argument must be a list or a dict. \
If a list is provided, it must have lenght equal to the number of agents in the network. \
If a dict is provided, it must have a key for each in-neighbor and, associated to it, the correspondig weight.")
# Assign self out_weight (if sum of provided in_weights in [0,1])
if not auto_local:
self.out_weights[self.id] = out_weights[self.id]
else:
if 0 <= sum(self.out_weights.values()) <= 1:
self.out_weights[self.id] = 1 - sum(self.out_weights.values())
else:
raise ValueError(
"If auto_local is set to True, the provided out_weights must have sum in [0,1]")
else:
for neighbor in self.out_neighbors:
self.out_weights[neighbor] = 1/len(self.out_neighbors)
[docs] def neighbors_exchange(self, obj: Any, dict_neigh=False, event: Event=None):
"""Exchange data with neighbors (synchronously). Send obj to the out-neighbors and receive received_obj from in-neighbors
Args:
obj: object to send
dict_neigh: True if obj contains a dictionary with different objects for each neighbor. Defaults to False.
Returns:
dict: a dictionary containing an object for each in-neighbor
"""
received_obj = self.communicator.neighbors_exchange(
obj, self.in_neighbors, self.out_neighbors, dict_neigh, event)
return received_obj
[docs] def neighbors_send(self, obj: Any):
"""Send data to out-neighbors
Args:
obj: object to send
"""
self.communicator.neighbors_send(obj, self.out_neighbors)
[docs] def neighbors_receive_asynchronous(self):
"""Receive data from in-neighbors (if any have been sent)
Returns:
dict: a dictionary containing an object for each in-neighbor that has sent one
"""
received_obj = self.communicator.neighbors_receive_asynchronous(
self.out_neighbors)
return received_obj
[docs] def set_problem(self, problem: Problem):
"""set the local optimization problem
Args:
problem (Problem): Problem object
Raises:
TypeError: Input must be a Problem object
"""
if not isinstance(problem, Problem):
raise TypeError("Input must be a Problem object")
self.problem = problem