Source code for disropt.utils.graph_constructor

import numpy as np
from mpi4py import MPI
from typing import Union

[docs]def ring_graph(N: int, link_type: str='undirected') -> np.ndarray: """construct a ring graph Args: N (int): number of agents link_type (str, optional): 'directed' or 'undirected'. Defaults to 'undirected'. Returns: numpy.ndarray: adjacency matrix """ Adj = np.roll(np.eye(N), -1, 0) if link_type == 'undirected': Adj = np.logical_or(Adj, Adj.transpose()) return Adj.astype(int)
def path_graph(N: int) -> np.ndarray: """construct an undirected path graph Args: N (int): number of agents Returns: numpy.ndarray: adjacency matrix """ Adj = np.eye(N, k=1) Adj = np.logical_or(Adj, Adj.transpose()) return Adj.astype(int)
[docs]def binomial_random_graph(N: int, p: float=None, seed: int=None, link_type: str='undirected') -> np.ndarray: """construct a random binomial graph Args: N (int): number of agents p (float, optional): link probability. Defaults to None (=1). seed (int, optional): [description]. Defaults to None (=1). link_type (str, optional): 'directed' or 'undirected'. Defaults to 'undirected'. Returns: numpy.ndarray: adjacency matrix """ if p is None: p = 1 if seed is None: seed = 1 np.random.seed(seed) while True: Adj = np.random.binomial(1, p, (N, N)) # each entry is 1 with prob "p" if link_type == 'undirected': Adj = np.logical_or(Adj, Adj.transpose()) # Undirected I_NN = np.eye(N) # Adj = np.logical_or(Adj, I_NN).astype(int) # add self - loops Adj = np.logical_and(Adj, np.logical_not(I_NN)).astype(int) # remove self - loops testAdj = np.linalg.matrix_power((I_NN + Adj), N) # check if G is connected check = 0 for i in range(N): check += int(bool(len(np.nonzero(Adj[:, i])[0]))) + int(bool(len(np.nonzero(Adj[i])[0]))) if not np.any(np.any(np.logical_not(testAdj))) and check == 2*N: break return Adj
[docs]def binomial_random_graph_sequence(under_adj: np.ndarray, n_graphs: int, p: Union[float, np.ndarray]=0.1, period: int=None, seed: int=None, link_type: str='undirected'): """Construct a sequence of random binomial graphs starting from a given underlying graph Args: under_adj (2D numpy.ndarray): Adjacency matrix of underlying graph. n_graphs (int): number of graphs in the returned sequence. p (float or 2D numpy.ndarray, optional): link probability. Defaults to None (=0.1). period (int, optional): T-connectivity period of the returned sequence (obtained artificially with cycles). Defaults to None (no T-connectivity). seed (int, optional): [description]. Defaults to None (=1). link_type (str, optional): 'directed' or 'undirected'. Defaults to 'undirected'. Returns: tuple(numpy.ndarray): sequence of adjacency matrices """ if p is None: p = 0.1 if seed is None: seed = 1 np.random.seed(seed) N = under_adj.shape[0] adj_mask = under_adj > 0 adj_seq = [] for t in range(n_graphs): # random matrix from underlying graph adj_tt = np.random.binomial(adj_mask, p) if link_type == 'undirected': adj_tt = np.logical_or(adj_tt, adj_tt.transpose()) # Undirected # add edges artificially to obtain T-connectivity if period is not None: # TODO implement period != N i = t % N j = (t+1) % N adj_tt[i,j] = 1 adj_tt[j,i] = 1 adj_seq.append(adj_tt.astype(int)) return tuple(adj_seq)
[docs]def metropolis_hastings(Adj: np.ndarray, link_type: str='undirected') -> np.ndarray: """Construct a weight matrix using the Metropolis-Hastings method Args: Adj (numpy.ndarray): Adjacency matrix Returns: numpy.ndarray: weighted adjacency matrix """ # weighted adjacency matrix (Metropolis-Hastings) N = np.shape(Adj)[0] degree = np.sum(Adj, axis=0) W = np.zeros([N, N]) for i in range(N): N_i = np.nonzero(Adj[i, :])[0] # Fixed Neighbors for j in N_i: W[i, j] = 1/(1+np.max([degree[i], degree[j]])) W[i, i] = 1 - np.sum(W[i, :]) return W
[docs]def row_stochastic_matrix(Adj: np.ndarray, weights_type: str='uniform') -> np.ndarray: """Construct a row-stochastic weighted adjacency matrix Args: Adj (numpy.ndarray): Adjacency matrix Returns: numpy.ndarray: weighted adjacency matrix """ # row stochastic adjacency matrix N = np.shape(Adj)[0] W = np.zeros([N, N]) if weights_type == 'uniform': for i in range(N): N_i = len(np.nonzero(Adj[i, :])[0]) W[i, :] = Adj[i, :]/(N_i + 1) elif weights_type == 'random': for i in range(N): N_i = np.nonzero(Adj[i, :])[0].tolist() for j in N_i: W[i, j] = np.random.rand() self_weight = np.random.rand() W[i, :] = W[i, :]/(self_weight + sum(W[i, :])) else: raise ValueError('Unknown in_weights type') return W
[docs]def column_stochastic_matrix(Adj: np.ndarray, weights_type: str='uniform') -> np.ndarray: """Construct a column-stochastic weighted adjacency matrix Args: Adj (numpy.ndarray): Adjacency matrix Returns: numpy.ndarray: weighted adjacency matrix """ # column stochastic adjacency matrix W = row_stochastic_matrix(Adj.transpose(), weights_type).transpose() return W
[docs]class MPIgraph: """Create a graph on the network Args: graph_type (str, optional): type of graph ('complete', 'random_binomial'). Defaults to None (complete). in_weight_matrix_type (str, optional): type of matrix describing in-neighbors weights ('metropolis', 'row_stochastic', 'column_stochastic'). Defaults to None (metropolis). out_weight_matrix_type (str, optional): type of matrix describing out-neighbors weights ('metropolis', 'row_stochastic', 'column_stochastic'). Defaults to None (metropolis). """ def __init__(self, graph_type: str=None, in_weight_matrix_type: str=None, out_weight_matrix_type: str=None, **kwargs): if graph_type is None: graph_type = 'complete' if in_weight_matrix_type is None: in_weight_matrix_type = 'metropolis' if out_weight_matrix_type is None: out_weight_matrix_type = 'metropolis' self.number_of_agents = MPI.COMM_WORLD.Get_size() self.adjacency_matrix = None self.graph_type = None self.in_weights = None self.out_weights = None self.__set_adjacency_matrix(graph_type, **kwargs) self.__set_in_weights(in_weight_matrix_type) self.__set_out_weights(out_weight_matrix_type) def __set_adjacency_matrix(self, graph_type, **kwargs): if graph_type == 'complete': self.adjacency_matrix = np.ones([self.number_of_agents, self.number_of_agents]) - np.eye(self.number_of_agents) self.graph_type = 'undirected' elif graph_type == 'random_binomial': p = kwargs.get('p', None) seed = kwargs.get('seed', None) links_type = kwargs.get('links_type', 'undirected') self.graph_type = links_type self.adjacency_matrix = binomial_random_graph(self.number_of_agents, p, seed, links_type) else: raise ValueError("Unknown graph type") def __set_in_weights(self, weight_matrix_type): if weight_matrix_type == 'metropolis': self.in_weights = metropolis_hastings(self.adjacency_matrix) elif weight_matrix_type == 'row_stochastic': self.in_weights = row_stochastic_matrix(self.adjacency_matrix) elif weight_matrix_type == 'column_stochastic': self.in_weights = column_stochastic_matrix(self.adjacency_matrix) else: raise ValueError("Unknown in_weights matrix type") def __set_out_weights(self, weight_matrix_type): if weight_matrix_type == 'metropolis': self.out_weights = metropolis_hastings(self.adjacency_matrix) elif weight_matrix_type == 'row_stochastic': self.out_weights = row_stochastic_matrix(self.adjacency_matrix) elif weight_matrix_type == 'column_stochastic': self.out_weights = column_stochastic_matrix(self.adjacency_matrix) else: raise ValueError("Unknown in_weights matrix type")
[docs] def get_local_info(self): """return the local info available at the agent Returns: tuple: local_rank, in_neighbors, out_neighbors, in_weights, out_weights, """ local_rank = MPI.COMM_WORLD.Get_rank() in_neighbors = np.nonzero(self.adjacency_matrix[local_rank, :])[0].tolist() out_neighbors = np.nonzero(self.adjacency_matrix[:, local_rank])[0].tolist() in_weights = self.in_weights[local_rank, :].tolist() out_weights = self.out_weights[:, local_rank].tolist() return local_rank, in_neighbors, out_neighbors, in_weights, out_weights