Skip to content

1 Intermittent Client Availability

Figure_availability

Classical FL ideally assumes full client availability, where all the clients are always available during the FL training process, which is impractical in real-world situations. To enable more studies on FL with intermittent client availability, we provide APIs for customizing different types of intermittent client availability.

2 API for Availability

The availability of clients is described by the rule of shift of the availability state, which contains two variables for each client in Simulator:

  • prob_available (float): in the range [0, 1], the probability of becoming available from an unavailable state
  • prob_unavailable (float): in the range [0, 1], the probability of becoming unavailable from an available state

To set the next moment's availability, one should override the method update_client_availability(self) of a newly created Simulator.

import flgo.simulator.base
class MySimulator(flgo.simulator.base.BasicSimulator):
    def update_client_availability(self):
        """
        Set the state-shift probabilities that decide each client's availability at the next moment.

        Define the variables prob_available and prob_unavailable for all the clients by the APIs
        self.set_variable(self.all_clients, 'prob_available', prob_values: List[float]) and
        self.set_variable(self.all_clients, 'prob_unavailable', prob_values: List[float]).
        If the next state of a client is deterministic, directly set the prob value to be 1 or 0.
        To fix the availability state of clients until the next aggregation round comes, set attribute
        self.round_fixed_availability=True, whose value is False as default.
        """
        return

Now we show how to customize client availability distribution by a simple example.

3 Example

import flgo.algorithm.fedavg as fedavg
import flgo.experiment.analyzer
import flgo.experiment.logger as fel
import flgo.simulator.base
import flgo.benchmark.cifar10_classification as cifar
import flgo.benchmark.partition as fbp
import os
import flgo.simulator.base
import random
# 1. Define two simulators for comparison
# 1.1 The probabilities of state shift are fixed as 0.1, where clients' states are hard to change
class MySimulator(flgo.simulator.base.BasicSimulator):
    """Simulator that assigns every client the same state-shift probability of 0.1."""

    def update_client_availability(self):
        """
        Write 'prob_available' and 'prob_unavailable' for all clients.

        When the clock reads 0, 'prob_available' is 1 for every client and
        'prob_unavailable' is drawn per client as 0 or 1 with equal chance.
        At any other time both variables are 0.1 for every client.
        """
        targets = self.all_clients
        if self.gv.clock.current_time != 0:
            to_available = [0.1 for _ in self.clients]
            to_unavailable = [0.1 for _ in self.clients]
            self.set_variable(targets, 'prob_available', to_available)
            self.set_variable(targets, 'prob_unavailable', to_unavailable)
            return
        # Initial moment: one random draw per client decides its 0/1 value
        self.set_variable(targets, 'prob_available', [1 for _ in self.clients])
        coin_flips = [int(random.random() >= 0.5) for _ in self.clients]
        self.set_variable(targets, 'prob_unavailable', coin_flips)

# 1.2 The probabilities of state shift are fixed as 0.9, where clients' states are easy to change
class MySimulator2(flgo.simulator.base.BasicSimulator):
    """Simulator that assigns every client the same state-shift probability of 0.9."""

    def update_client_availability(self):
        """
        Write 'prob_available' and 'prob_unavailable' for all clients.

        At clock time 0 the values are 1 for 'prob_available' and a per-client
        random 0 or 1 for 'prob_unavailable'; afterwards both are 0.9.
        """
        if self.gv.clock.current_time == 0:
            ones = [1 for _ in self.clients]
            self.set_variable(self.all_clients, 'prob_available', ones)
            # One draw per client; values of 0.5 and above map to 1, the rest to 0
            initial_draws = [int(random.random() >= 0.5) for _ in self.clients]
            self.set_variable(self.all_clients, 'prob_unavailable', initial_draws)
        else:
            shift_probs = {
                'prob_available': [0.9 for _ in self.clients],
                'prob_unavailable': [0.9 for _ in self.clients],
            }
            for var_name, values in shift_probs.items():
                self.set_variable(self.all_clients, var_name, values)

# 2. Generate federated task and test
task = './IID_cifar10'
gen_config = {'benchmark': cifar, 'partitioner': fbp.IIDPartitioner}
# Generate the task only when nothing exists at the task path yet
if not os.path.exists(task):
    flgo.gen_task(gen_config, task_path=task)

# 3. Customize Logger to record the availability of clients at each moment
class MyLogger(fel.BasicLogger):
    """Logger that records the coordinator's available clients on each logging call."""

    def log_once(self, *args, **kwargs):
        """Append the current available clients to the output and print them; does nothing at clock time 0."""
        if self.gv.clock.current_time != 0:
            history = self.output['available_clients']
            history.append(self.coordinator.available_clients)
            # Echo the entry that was just recorded
            print(history[-1])

if __name__ == '__main__':
    # 4. Respectively Run FedAvg with the two simulators
    # The option dict is rebuilt for each run so the two runs never share one object
    for simulator_cls in (MySimulator, MySimulator2):
        option = {'gpu': [0,], 'log_file': True, 'num_steps': 1, 'num_rounds': 100}
        runner = flgo.init(task, fedavg, option, Logger=MyLogger, Simulator=simulator_cls)
        runner.run()

    # 5. visualize the availability distribution
    selector = flgo.experiment.analyzer.Selector({'task': task, 'header': ['fedavg',], })

    def visualize_availability(rec_data, title=''):
        """
        Scatter-plot the available clients of every logged entry.

        Args:
            rec_data: mapping whose 'available_clients' entry is a sequence of
                per-entry collections of client IDs
            title (str): title shown above the plot
        """
        import matplotlib.pyplot as plt
        all_points_x = []
        all_points_y = []
        # Entries are numbered from 1 on the x axis; each available client adds one point
        for round_idx, client_ids in enumerate(rec_data['available_clients'], start=1):
            all_points_x.extend([round_idx for _ in client_ids])
            all_points_y.extend(client_ids)
        plt.scatter(all_points_x, all_points_y, s=10)
        plt.title(title)
        plt.xlabel('communication round')
        plt.ylabel('client ID')
        plt.show()

    def simulator_title(record_name):
        """Return the 12 characters following '_SIM' in record_name, or the whole name if '_SIM' is absent."""
        marker = '_SIM'
        start = record_name.find(marker)
        if start < 0:
            # str.find returns -1 when the marker is missing; slicing from it would give a meaningless title
            return record_name
        start += len(marker)
        return record_name[start:start + 12]

    # Plot the first two records of the task (presumably one per simulator run -- verify the record order)
    for rec_idx in (0, 1):
        rec = selector.records[task][rec_idx]
        visualize_availability(rec.data, simulator_title(rec.name))

fig_simulator1 fig_simulator2