1 Intermittent Client Availability
Classical FL ideally assumes full client availability, where all the clients are always available during the FL training process, which is impractical in real-world situations. To enable more studies on FL with intermittent client availability, we provide APIs for customizing different types of intermittent client availability.
2 API for Availability
The availability of clients is described by the rule of shift of the availability state, which contains two variables for each client in Simulator
:
- prob_available (float): range(0,1), the probability of becoming available from an unavailable state
- prob_unavailable (float): range(0,1), the probability of becoming unavailable from an available state
To set the next moment's availability, one should override the method update_client_availability(self) of a newly created Simulator.
import flgo.simulator.base
class MySimulator(flgo.simulator.base.BasicSimulator):
    """Template simulator showing where to customize client availability."""

    def update_client_availability(self):
        """
        Set the availability-transition probabilities of all clients for the next moment.

        Define the variables prob_available and prob_unavailable for all the clients by the APIs
            self.set_variable(self.all_clients, 'prob_available', prob_values: List[float]) and
            self.set_variable(self.all_clients, 'prob_unavailable', prob_values: List[float]).
        If the next state of a client is deterministic, directly set the prob value to be 1 or 0.
        To fix the availability state of clients until the next aggregation round comes, set the
        attribute self.round_fixed_availability=True, whose value is False by default.
        """
        # The method name must match the base-class hook exactly; a misspelled
        # name would define a new, never-called method instead of overriding it.
        return
Now we show how to customize client availability distribution by a simple example.
3 Example
import flgo.algorithm.fedavg as fedavg
import flgo.experiment.analyzer
import flgo.experiment.logger as fel
import flgo.simulator.base
import flgo.benchmark.cifar10_classification as cifar
import flgo.benchmark.partition as fbp
import os
import flgo.simulator.base
import random
# 1. Define two simulators for comparison
# 1.1 The probabilities of state shift are fixed as 0.1, so clients' states are hard to change
class MySimulator(flgo.simulator.base.BasicSimulator):
    """Simulator whose per-client state-shift probabilities are 0.1 after the initial moment."""

    def update_client_availability(self):
        """
        Assign 'prob_available' and 'prob_unavailable' to every client.

        At clock time 0 each client gets prob_available=1 and a prob_unavailable
        that is 0 or 1 with equal chance; at any later time both variables are 0.1.
        """
        if self.gv.clock.current_time != 0:
            # Regular update: both transition probabilities are the constant 0.1.
            recover_probs = [0.1 for _client in self.clients]
            drop_probs = [0.1 for _client in self.clients]
            self.set_variable(self.all_clients, 'prob_available', recover_probs)
            self.set_variable(self.all_clients, 'prob_unavailable', drop_probs)
        else:
            # Initial moment: deterministic 0/1 values.
            # NOTE(review): presumably this makes roughly half of the clients start
            # unavailable -- confirm against BasicSimulator's state-update logic.
            self.set_variable(
                self.all_clients, 'prob_available', [1 for _client in self.clients]
            )
            initial_drop = [int(random.random() >= 0.5) for _client in self.clients]
            self.set_variable(self.all_clients, 'prob_unavailable', initial_drop)
# 1.2 The probabilities of state shift are fixed as 0.9, so clients' states are easy to change
class MySimulator2(flgo.simulator.base.BasicSimulator):
    """Simulator whose per-client state-shift probabilities are 0.9 after the initial moment."""

    def update_client_availability(self):
        """
        Assign 'prob_available' and 'prob_unavailable' to every client.

        At clock time 0 each client gets prob_available=1 and a prob_unavailable
        that is 0 or 1 with equal chance; at any later time both variables are 0.9.
        """
        if self.gv.clock.current_time != 0:
            # Regular update: both transition probabilities are the constant 0.9.
            recover_probs = [0.9 for _client in self.clients]
            drop_probs = [0.9 for _client in self.clients]
            self.set_variable(self.all_clients, 'prob_available', recover_probs)
            self.set_variable(self.all_clients, 'prob_unavailable', drop_probs)
        else:
            # Initial moment: deterministic 0/1 values.
            # NOTE(review): presumably this makes roughly half of the clients start
            # unavailable -- confirm against BasicSimulator's state-update logic.
            self.set_variable(
                self.all_clients, 'prob_available', [1 for _client in self.clients]
            )
            initial_drop = [int(random.random() >= 0.5) for _client in self.clients]
            self.set_variable(self.all_clients, 'prob_unavailable', initial_drop)
# 2. Generate the federated task (IID-partitioned CIFAR-10) used by both runs
task = './IID_cifar10'
gen_config = dict(
    benchmark=cifar,
    partitioner=fbp.IIDPartitioner,
)
# Only generate the task when its directory is not already on disk.
task_already_generated = os.path.exists(task)
if not task_already_generated:
    flgo.gen_task(gen_config, task_path=task)
# 3. Customize Logger to record the availability of clients at each moment
class MyLogger(fel.BasicLogger):
    """Logger that records which clients are available each time it is invoked."""

    def log_once(self, *args, **kwargs):
        """
        Append the coordinator's currently available clients to
        self.output['available_clients'] and print the newest entry.

        Nothing is recorded while the clock time is still 0.
        """
        if self.gv.clock.current_time != 0:
            availability_history = self.output['available_clients']
            availability_history.append(self.coordinator.available_clients)
            print(availability_history[-1])
if __name__ == '__main__':
    # 4. Run FedAvg once with each of the two simulators.
    # A fresh option dict is written out per call so the two runs share no state.
    runner1 = flgo.init(
        task,
        fedavg,
        {'gpu': [0,], 'log_file': True, 'num_steps': 1, 'num_rounds': 100},
        Logger=MyLogger,
        Simulator=MySimulator,
    )
    runner1.run()
    runner2 = flgo.init(
        task,
        fedavg,
        {'gpu': [0,], 'log_file': True, 'num_steps': 1, 'num_rounds': 100},
        Logger=MyLogger,
        Simulator=MySimulator2,
    )
    runner2.run()

    # 5. Visualize the availability distribution recorded by MyLogger
    selector = flgo.experiment.analyzer.Selector({'task': task, 'header': ['fedavg',], })

    def visualize_availability(rec_data, title=''):
        """
        Scatter-plot which clients were available at each logged moment.

        Args:
            rec_data: mapping whose 'available_clients' entry is a sequence with
                one collection of client IDs per logged moment.
            title (str): title shown above the plot.
        """
        import matplotlib.pyplot as plt

        avl_clients = rec_data['available_clients']
        all_points_x = []
        all_points_y = []
        # Rounds are numbered from 1 on the x-axis; one point per available client.
        for round_number, available in enumerate(avl_clients, start=1):
            all_points_x.extend([round_number for _ in available])
            all_points_y.extend(available)
        plt.scatter(all_points_x, all_points_y, s=10)
        plt.title(title)
        plt.xlabel('communication round')
        plt.ylabel('client ID')
        plt.show()

    def simulator_label(record_name):
        """
        Return the 12 characters that follow the first '_SIM' in record_name.

        NOTE(review): assumes the record name embeds the simulator name right
        after '_SIM' -- confirm against the logger's file-naming scheme. When
        '_SIM' is absent, str.find returns -1 and an arbitrary slice results.
        """
        start = record_name.find('_SIM') + 4
        return record_name[start:start + 12]

    rec0 = selector.records[task][0]
    visualize_availability(rec0.data, simulator_label(rec0.name))
    rec1 = selector.records[task][1]
    visualize_availability(rec1.data, simulator_label(rec1.name))