Consensus algorithms¶
Classical consensus¶
The classical consensus algorithm is implemented through the Consensus
class.
From the perspective of agent \(i\) the classical consensus algorithm works as follows. For \(k=0,1,\dots\)
where \(x_i\in\mathbb{R}^n\) and \(w_{ij}\) is the weight assigned by agent \(i\) to agent \(j\). Usually, average consensus (i.e., the convergence of the local estimate sequences to the initial average) is guaranteed only if the weights \(w_{ij}\) form a doubly-stochastic matrix. Otherwise, agreement is still reached but at some other point.
In order to simulate a consensus algorithm over a static undirected graph (with a doubly-stochastic weight matrix), create a file containing the following code and call it launcher.py
import numpy as np
from mpi4py import MPI
from disropt.agents import Agent
from disropt.algorithms import Consensus
from disropt.utils.graph_constructor import binomial_random_graph, metropolis_hastings
# get MPI info
comm = MPI.COMM_WORLD
nproc = comm.Get_size()
local_rank = comm.Get_rank()
# Generate a common graph (everyone use the same seed)
Adj = binomial_random_graph(nproc, p=0.3, seed=1)
W = metropolis_hastings(Adj)
# reset local seed
np.random.seed()
# create local agent
agent = Agent(in_neighbors=np.nonzero(Adj[local_rank, :])[0].tolist(),
out_neighbors=np.nonzero(Adj[:, local_rank])[0].tolist(),
in_weights=W[local_rank, :].tolist())
# instantiate the consensus algorithm
n = 4 # decision variable dimension (n, 1)
x0 = np.random.rand(n, 1)
algorithm = Consensus(agent=agent,
initial_condition=x0,
enable_log=True) # enable storing of the generated sequences
# run the algorithm
sequence = algorithm.run(iterations=100)
# print solution
print("Agent {}: {}".format(agent.id, algorithm.get_result()))
# save data
np.save("agents.npy", nproc)
np.save("agent_{}_sequence.npy".format(agent.id), sequence)
And then execute it with the desired number of agents:
mpirun -np 12 --oversubscribe python launcher.py
where the flag --oversubscribe
is necessary only if the requested number of agents (12 in this case) is higher than the available number of cores (or computing units).
Plot the generated sequences
In order to plot the local sequences generated by the algorithm, we create the file results.py.
import numpy as np
import matplotlib.pyplot as plt
# Load the number of agents
N = np.load("agents.npy")
# Load the locally generated sequences
sequence = {}
for i in range(N):
filename = "agent_{}_sequence.npy".format(i)
sequence[i] = np.load(filename)
# Plot the evolution of the local estimates
# generate N colors
colors = {}
for i in range(N):
colors[i] = np.random.rand(3, 1).flatten()
# generate figure
plt.figure()
for i in range(N):
dims = sequence[i].shape
iterations = dims[0]
for j in range(dims[1]):
if j == 0: # to avoid generating multiple labels
plt.plot(np.arange(iterations), sequence[i][:, j, 0], color=colors[i], label='agent {}'.format(i+1))
else:
plt.plot(np.arange(iterations), sequence[i][:, j, 0], color=colors[i])
plt.legend(loc='upper right')
plt.title("Evolution of the local estimates")
plt.xlabel(r"iterations ($k$)")
plt.ylabel(r"$x_i^k$")
plt.savefig('results_fig.png')
plt.show()
We execute results.py through:
python results.py
Time-varying graphs¶
Convergence of consensus algorithms is still achieved over time-varying (possibly directed) graphs, provided that they are jointly strongly connected.
In this case, one can use the set_neighbors
and set_weights
methods of the Agent
class in order to modify the communication network when necessary. This can be done in various ways, for example by overriding the run
method of the algorithm of by calling it multiple times over different graphs. For example, suppose that the graph changes every 100 iterations and that you want to perform 1000 iterations, then:
for g in range(10):
# generate a new common graph (everyone use the same seed)
Adj = construct_graph(nproc, p=0.3, seed=g)
W = metropolis_hastings(Adj)
# set new neighbors and weights
agent.set_neighbors(in_neighbors=np.nonzero(Adj[local_rank, :])[0].tolist(),
out_neighbors=np.nonzero(Adj[:, local_rank])[0].tolist())
agent.set_weights(weights=W[local_rank, :].tolist())
algorithm.run(iterations=100)
Block-wise consensus¶
Consensus can be also performed block-wise with respect to the decision variable by using the BlockConsensus
class.
From the perspective of agent \(i\) the algorithm works as follows. At iteration \(k\), if the agent is awake, it selects a random block \(\ell_i^k\) of its local solution and updates
where \(\mathcal{N}_i\) is the current set of in-neighbors and \(x_{j\mid i},j\in\mathcal{N}_i\) is the local copy of \(x_j\) available at node \(i\) and \(x_{i,\ell}\) denotes the \(\ell\)-th block of \(x_i\). Otherwise \(x_{i}^{k+1}=x_i^k\).
Moreover, at each iteration, each agent can update its local estimate or not at each iteration according to a certain probability (awakening_probability), thus modeling some asyncrhony.
The algorithm can be istantiated by providing a list of blocks of the decision variable and the probabilities of drawing each block:
algorithm = BlockConsensus(agent=agent,
initial_condition=x0,
enable_log=True,
blocks_list=[(0, 1), (2, 3)],
probabilities=[0.3, 0.7],
awakening_probability=0.5)
Asynchronous consensus¶
Asynchronous consensus can be seen as a sort of synchronous consensus over time-varying graphs with delays (which may model non negligible computation times and unreliable links) and it is implemented in the AsynchronousConsensus
class.
When running this algorithm, you can control the computation and sleep times of each agent and the communication channels failure probabilities. Moreover, when running asynchronous algorithms, you have to set the total duration of the execution (and not the number of iterations). We provide the following example.
import numpy as np
from mpi4py import MPI
from disropt.agents import Agent
from disropt.algorithms import AsynchronousConsensus
from disropt.utils.graph_constructor import binomial_random_graph, metropolis_hastings
# get MPI info
comm = MPI.COMM_WORLD
nproc = comm.Get_size()
local_rank = comm.Get_rank()
# Generate a common graph (everyone use the same seed)
Adj = binomial_random_graph(nproc, p=0.3, seed=1)
W = metropolis_hastings(Adj)
# reset local seed
np.random.seed()
# create local agent
agent = Agent(
in_neighbors=np.nonzero(Adj[local_rank, :])[0].tolist(),
out_neighbors=np.nonzero(Adj[:, local_rank])[0].tolist(),
in_weights=W[local_rank, :].tolist())
# instantiate the asynchronous consensus algorithm
x0 = np.random.rand(2, 1)
algorithm = AsynchronousConsensus(agent=agent,
initial_condition=x0,
enable_log=True,
force_sleep=True,
maximum_sleep=0.1,
sleep_type="random",
force_computation_time=True,
maximum_computation_time=0.1,
computation_time_type="random",
force_unreliable_links=True,
link_failure_probability=0.1)
# run the algorithm
timestamp_sequence_awake, timestamp_sequence_sleep, sequence = algorithm.run(running_time=4)
# print solution
print("Agent {}: {}".format(agent.id, algorithm.get_result()))
# save data
np.save("agents.npy", nproc)
np.save("agent_{}_sequence.npy".format(agent.id), sequence)
np.save("agent_{}_timestamp_sequence_awake.npy".format(agent.id), timestamp_sequence_awake)
np.save("agent_{}_timestamp_sequence_sleep.npy".format(agent.id), timestamp_sequence_sleep)
Plot the generated sequences
import numpy as np
import matplotlib.pyplot as plt
# number of agents
N = np.load("agents.npy")
# retrieve local sequences
sequence = {}
timestamp_sequence_awake = {}
timestamp_sequence_sleep = {}
colors = {}
t_init = None
for i in range(N):
colors[i] = np.random.rand(3, 1).flatten()
filename = "agent_{}_sequence.npy".format(i)
sequence[i] = np.load(filename, allow_pickle=True)
filename = "agent_{}_timestamp_sequence_awake.npy".format(i)
timestamp_sequence_awake[i] = np.load(filename, allow_pickle=True)
filename = "agent_{}_timestamp_sequence_sleep.npy".format(i)
timestamp_sequence_sleep[i] = np.load(filename, allow_pickle=True)
if t_init is not None:
m = min(timestamp_sequence_awake[i])
t_init = min(t_init, m)
else:
t_init = min(timestamp_sequence_awake[i])
for i in range(N):
timestamp_sequence_awake[i] = timestamp_sequence_awake[i] - t_init
timestamp_sequence_sleep[i] = timestamp_sequence_sleep[i] - t_init
# plot
plt.figure()
colors = {}
for i in range(N):
colors[i] = np.random.rand(3, 1).flatten()
dims = sequence[i].shape
for j in range(dims[1]):
if j == 0:
plt.plot(timestamp_sequence_sleep[i], sequence[i][:, j, 0],
color=colors[i],
label="Agent {}: awakenings={}".format(i+1, dims[0]))
else:
plt.plot(timestamp_sequence_sleep[i], sequence[i][:, j, 0],
color=colors[i])
plt.xlabel("time (s)")
plt.ylabel("x_i")
plt.title("Local estimates sequences")
plt.legend()
plt.savefig('sequences.png')
S = {}
for i in range(N):
aw = np.array(timestamp_sequence_awake[i])
aw = np.vstack([aw, np.zeros(aw.shape)])
sl = np.array(timestamp_sequence_sleep[i])
sl = np.vstack([sl, np.ones(sl.shape)])
aux = np.hstack([aw, sl]).transpose()
signal = aux[aux[:, 0].argsort()]
inverse_signal = np.zeros(signal.shape)
inverse_signal += signal
inverse_signal[:, 1] = abs(inverse_signal[:, 1] - 1)
ww = np.empty([signal.shape[0]*2, 2])
ww[::2] = signal
ww[1::2] = inverse_signal
S[i] = ww
fig, axes = plt.subplots(int(N),1, sharex=True)
for i in range(N):
axes[i].plot(S[i][:, 0], S[i][:, 1], color=colors[i])
axes[i].set_ylabel("{}".format(i))
plt.xlabel("time (s)")
plt.savefig('sleep_awake.png')
plt.show()