Consensus algorithms

Classical consensus

The classical consensus algorithm is implemented through the Consensus class.

From the perspective of agent \(i\) the classical consensus algorithm works as follows. For \(k=0,1,\dots\)

\[x_i^{k+1} = \sum_{j=1}^N w_{ij} x_j^k\]

where \(x_i\in\mathbb{R}^n\) and \(w_{ij}\) is the weight assigned by agent \(i\) to agent \(j\). Usually, average consensus (i.e., the convergence of the local estimate sequences to the initial average) is guaranteed only if the weights \(w_{ij}\) form a doubly-stochastic matrix. Otherwise, agreement is still reached but at some other point.

In order to simulate a consensus algorithm over a static undirected graph (with a doubly-stochastic weight matrix), create a file containing the following code and call it launcher.py

launcher.py
import numpy as np
from mpi4py import MPI
from disropt.agents import Agent
from disropt.algorithms import Consensus
from disropt.utils.graph_constructor import binomial_random_graph, metropolis_hastings

# get MPI info
comm = MPI.COMM_WORLD
nproc = comm.Get_size()
local_rank = comm.Get_rank()

# Generate a common graph (everyone use the same seed)
Adj = binomial_random_graph(nproc, p=0.3, seed=1)
W = metropolis_hastings(Adj)

# reset local seed
np.random.seed()

# create local agent
agent = Agent(in_neighbors=np.nonzero(Adj[local_rank, :])[0].tolist(),
              out_neighbors=np.nonzero(Adj[:, local_rank])[0].tolist(),
              in_weights=W[local_rank, :].tolist())

# instantiate the consensus algorithm
n = 4  # decision variable dimension (n, 1)
x0 = np.random.rand(n, 1)
algorithm = Consensus(agent=agent,
                      initial_condition=x0,
                      enable_log=True)  # enable storing of the generated sequences

# run the algorithm
sequence = algorithm.run(iterations=100)

# print solution
print("Agent {}: {}".format(agent.id, algorithm.get_result()))

# save data
np.save("agents.npy", nproc)
np.save("agent_{}_sequence.npy".format(agent.id), sequence)

And then execute it with the desired number of agents:

mpirun -np 12 --oversubscribe python launcher.py

where the flag --oversubscribe is necessary only if the requested number of agents (12 in this case) is higher than the available number of cores (or computing units).

Plot the generated sequences

In order to plot the local sequences generated by the algorithm, we create the file results.py.

results.py
import numpy as np
import matplotlib.pyplot as plt

# Load the number of agents
N = np.load("agents.npy")

# Load the locally generated sequences
sequence = {}
for i in range(N):
    filename = "agent_{}_sequence.npy".format(i)
    sequence[i] = np.load(filename)

# Plot the evolution of the local estimates
# generate N colors
colors = {}
for i in range(N):
    colors[i] = np.random.rand(3, 1).flatten()
# generate figure
plt.figure()
for i in range(N):
    dims = sequence[i].shape
    iterations = dims[0]
    for j in range(dims[1]):
        if j == 0: # to avoid generating multiple labels
            plt.plot(np.arange(iterations), sequence[i][:, j, 0], color=colors[i], label='agent {}'.format(i+1))
        else:
            plt.plot(np.arange(iterations), sequence[i][:, j, 0], color=colors[i])
plt.legend(loc='upper right')
plt.title("Evolution of the local estimates")
plt.xlabel(r"iterations ($k$)")
plt.ylabel(r"$x_i^k$")
plt.savefig('results_fig.png')
plt.show()

We execute results.py through:

python results.py
../../_images/results_fig.png

Time-varying graphs

Convergence of consensus algorithms is still achieved over time-varying (possibly directed) graphs, provided that they are jointly strongly connected.

In this case, one can use the set_neighbors and set_weights methods of the Agent class in order to modify the communication network when necessary. This can be done in various ways, for example by overriding the run method of the algorithm of by calling it multiple times over different graphs. For example, suppose that the graph changes every 100 iterations and that you want to perform 1000 iterations, then:

for g in range(10):
    # generate a new common graph (everyone use the same seed)
    Adj = construct_graph(nproc, p=0.3, seed=g)
    W = metropolis_hastings(Adj)

    # set new neighbors and weights
    agent.set_neighbors(in_neighbors=np.nonzero(Adj[local_rank, :])[0].tolist(),
                        out_neighbors=np.nonzero(Adj[:, local_rank])[0].tolist())
    agent.set_weights(weights=W[local_rank, :].tolist())

    algorithm.run(iterations=100)

Block-wise consensus

Consensus can be also performed block-wise with respect to the decision variable by using the BlockConsensus class.

From the perspective of agent \(i\) the algorithm works as follows. At iteration \(k\), if the agent is awake, it selects a random block \(\ell_i^k\) of its local solution and updates

\[\begin{split}x_{i,\ell}^{k+1} = \begin{cases} \sum_{j\in\mathcal{N}_i} w_{ij} x_{j\mid i,\ell}^k & \text{if} \ell = \ell_i^k \\ x_{i,\ell}^{k} & \text{otherwise} \end{cases}\end{split}\]

where \(\mathcal{N}_i\) is the current set of in-neighbors and \(x_{j\mid i},j\in\mathcal{N}_i\) is the local copy of \(x_j\) available at node \(i\) and \(x_{i,\ell}\) denotes the \(\ell\)-th block of \(x_i\). Otherwise \(x_{i}^{k+1}=x_i^k\).

Moreover, at each iteration, each agent can update its local estimate or not at each iteration according to a certain probability (awakening_probability), thus modeling some asyncrhony.

The algorithm can be istantiated by providing a list of blocks of the decision variable and the probabilities of drawing each block:

algorithm = BlockConsensus(agent=agent,
                           initial_condition=x0,
                           enable_log=True,
                           blocks_list=[(0, 1), (2, 3)],
                           probabilities=[0.3, 0.7],
                           awakening_probability=0.5)

Asynchronous consensus

Asynchronous consensus can be seen as a sort of synchronous consensus over time-varying graphs with delays (which may model non negligible computation times and unreliable links) and it is implemented in the AsynchronousConsensus class.

When running this algorithm, you can control the computation and sleep times of each agent and the communication channels failure probabilities. Moreover, when running asynchronous algorithms, you have to set the total duration of the execution (and not the number of iterations). We provide the following example.

import numpy as np
from mpi4py import MPI
from disropt.agents import Agent
from disropt.algorithms import AsynchronousConsensus
from disropt.utils.graph_constructor import binomial_random_graph, metropolis_hastings


# get MPI info
comm = MPI.COMM_WORLD
nproc = comm.Get_size()
local_rank = comm.Get_rank()

# Generate a common graph (everyone use the same seed)
Adj = binomial_random_graph(nproc, p=0.3, seed=1)
W = metropolis_hastings(Adj)

# reset local seed
np.random.seed()

# create local agent
agent = Agent(
    in_neighbors=np.nonzero(Adj[local_rank, :])[0].tolist(),
    out_neighbors=np.nonzero(Adj[:, local_rank])[0].tolist(),
    in_weights=W[local_rank, :].tolist())

# instantiate the asynchronous consensus algorithm
x0 = np.random.rand(2, 1)
algorithm = AsynchronousConsensus(agent=agent,
                                  initial_condition=x0,
                                  enable_log=True,
                                  force_sleep=True,
                                  maximum_sleep=0.1,
                                  sleep_type="random",
                                  force_computation_time=True,
                                  maximum_computation_time=0.1,
                                  computation_time_type="random",
                                  force_unreliable_links=True,
                                  link_failure_probability=0.1)

# run the algorithm
timestamp_sequence_awake, timestamp_sequence_sleep, sequence = algorithm.run(running_time=4)

# print solution
print("Agent {}: {}".format(agent.id, algorithm.get_result()))

# save data
np.save("agents.npy", nproc)
np.save("agent_{}_sequence.npy".format(agent.id), sequence)
np.save("agent_{}_timestamp_sequence_awake.npy".format(agent.id), timestamp_sequence_awake)
np.save("agent_{}_timestamp_sequence_sleep.npy".format(agent.id), timestamp_sequence_sleep)

Plot the generated sequences

import numpy as np
import matplotlib.pyplot as plt


# number of agents
N = np.load("agents.npy")

# retrieve local sequences
sequence = {}
timestamp_sequence_awake = {}
timestamp_sequence_sleep = {}
colors = {}
t_init = None
for i in range(N):
    colors[i] = np.random.rand(3, 1).flatten()
    filename = "agent_{}_sequence.npy".format(i)
    sequence[i] = np.load(filename, allow_pickle=True)

    filename = "agent_{}_timestamp_sequence_awake.npy".format(i)
    timestamp_sequence_awake[i] = np.load(filename, allow_pickle=True)

    filename = "agent_{}_timestamp_sequence_sleep.npy".format(i)
    timestamp_sequence_sleep[i] = np.load(filename, allow_pickle=True)

    if t_init is not None:
        m = min(timestamp_sequence_awake[i])
        t_init = min(t_init, m)
    else:
        t_init = min(timestamp_sequence_awake[i])

for i in range(N):
    timestamp_sequence_awake[i] = timestamp_sequence_awake[i] - t_init
    timestamp_sequence_sleep[i] = timestamp_sequence_sleep[i] - t_init

# plot
plt.figure()
colors = {}
for i in range(N):
    colors[i] = np.random.rand(3, 1).flatten()

    dims = sequence[i].shape
    for j in range(dims[1]):
        if j == 0:
            plt.plot(timestamp_sequence_sleep[i], sequence[i][:, j, 0],
                     color=colors[i],
                     label="Agent {}: awakenings={}".format(i+1, dims[0]))
        else:
            plt.plot(timestamp_sequence_sleep[i], sequence[i][:, j, 0],
                     color=colors[i])
plt.xlabel("time (s)")
plt.ylabel("x_i")
plt.title("Local estimates sequences")
plt.legend()

plt.savefig('sequences.png')

S = {}
for i in range(N):
    aw = np.array(timestamp_sequence_awake[i])
    aw = np.vstack([aw, np.zeros(aw.shape)])
    sl = np.array(timestamp_sequence_sleep[i])
    sl = np.vstack([sl, np.ones(sl.shape)])
    aux = np.hstack([aw, sl]).transpose()
    signal = aux[aux[:, 0].argsort()]
    inverse_signal = np.zeros(signal.shape)
    inverse_signal += signal
    inverse_signal[:, 1] = abs(inverse_signal[:, 1] - 1)
    ww = np.empty([signal.shape[0]*2, 2])
    ww[::2] = signal
    ww[1::2] = inverse_signal
    S[i] = ww

fig, axes = plt.subplots(int(N),1, sharex=True)
for i in range(N):
    axes[i].plot(S[i][:, 0], S[i][:, 1], color=colors[i])
    axes[i].set_ylabel("{}".format(i))
        
plt.xlabel("time (s)")

plt.savefig('sleep_awake.png')

plt.show()
../../_images/sequences.png ../../_images/sleep_awake.png