
flgo.benchmark.base

AbstractTaskCalculator

Abstract Task Calculator

Source code in flgo\benchmark\base.py (lines 50-77)
class AbstractTaskCalculator(metaclass=ABCMeta):
    r"""
    Abstract Task Calculator
    """
    @abstractmethod
    def to_device(self, *args, **kwargs):
        """Put the data into the gpu device"""
        pass

    @abstractmethod
    def get_dataloader(self, *args, **kwargs):
        """Return a data loader that splits the input data into batches"""
        pass

    @abstractmethod
    def test(self, model, data, *args, **kwargs):
        """Evaluate the model on the data"""
        pass

    @abstractmethod
    def compute_loss(self, model, data, *args, **kwargs):
        """Compute the loss of the model on the data to complete the forward process"""
        pass

    @abstractmethod
    def get_optimizer(self, model, *args, **kwargs):
        """Return the optimizer on the parameters of the model"""
        pass

compute_loss(model, data, *args, **kwargs) abstractmethod

Compute the loss of the model on the data to complete the forward process

Source code in flgo\benchmark\base.py (lines 69-72)
@abstractmethod
def compute_loss(self, model, data, *args, **kwargs):
    """Compute the loss of the model on the data to complete the forward process"""
    pass

get_dataloader(*args, **kwargs) abstractmethod

Return a data loader that splits the input data into batches

Source code in flgo\benchmark\base.py (lines 59-62)
@abstractmethod
def get_dataloader(self, *args, **kwargs):
    """Return a data loader that splits the input data into batches"""
    pass

get_optimizer(model, *args, **kwargs) abstractmethod

Return the optimizer on the parameters of the model

Source code in flgo\benchmark\base.py (lines 74-77)
@abstractmethod
def get_optimizer(self, model, *args, **kwargs):
    """Return the optimizer on the parameters of the model"""
    pass

test(model, data, *args, **kwargs) abstractmethod

Evaluate the model on the data

Source code in flgo\benchmark\base.py (lines 64-67)
@abstractmethod
def test(self, model, data, *args, **kwargs):
    """Evaluate the model on the data"""
    pass

to_device(*args, **kwargs) abstractmethod

Put the data into the gpu device

Source code in flgo\benchmark\base.py (lines 54-57)
@abstractmethod
def to_device(self, *args, **kwargs):
    """Put the data into the gpu device"""
    pass

AbstractTaskGenerator

Abstract Task Generator

Source code in flgo\benchmark\base.py (lines 14-34)
class AbstractTaskGenerator(metaclass=ABCMeta):
    r"""
    Abstract Task Generator
    """
    @abstractmethod
    def load_data(self, *args, **kwarg):
        """Load the original data into memory that can be partitioned"""
        pass

    @abstractmethod
    def partition(self, *args, **kwarg):
        """Partition the loaded data into subsets of data owned by clients
        and the test data owned by the server
        """
        pass

    @abstractmethod
    def generate(self, *args, **kwarg):
        """Load and partition the data, and then generate the necessary
        information about the federated task (e.g. path, partition way, ...)"""
        pass

generate(*args, **kwarg) abstractmethod

Load and partition the data, and then generate the necessary information about the federated task (e.g. path, partition way, ...)

Source code in flgo\benchmark\base.py (lines 30-34)
@abstractmethod
def generate(self, *args, **kwarg):
    """Load and partition the data, and then generate the necessary
    information about the federated task (e.g. path, partition way, ...)"""
    pass

load_data(*args, **kwarg) abstractmethod

Load the original data into memory that can be partitioned

Source code in flgo\benchmark\base.py (lines 18-21)
@abstractmethod
def load_data(self, *args, **kwarg):
    """Load the original data into memory that can be partitioned"""
    pass

partition(*args, **kwarg) abstractmethod

Partition the loaded data into subsets of data owned by clients and the test data owned by the server

Source code in flgo\benchmark\base.py (lines 23-28)
@abstractmethod
def partition(self, *args, **kwarg):
    """Partition the loaded data into subsets of data owned by clients
    and the test data owned by the server
    """
    pass

AbstractTaskPipe

Abstract Task Pipe

Source code in flgo\benchmark\base.py (lines 36-48)
class AbstractTaskPipe(metaclass=ABCMeta):
    r"""
    Abstract Task Pipe
    """
    @abstractmethod
    def save_task(self, *args, **kwargs):
        """Save a federated task created by TaskGenerator as a static file on the disk"""
        pass

    @abstractmethod
    def load_task(self, *args, **kwargs):
        """Load a federated task from disk"""
        pass

load_task(*args, **kwargs) abstractmethod

Load a federated task from disk

Source code in flgo\benchmark\base.py (lines 45-48)
@abstractmethod
def load_task(self, *args, **kwargs):
    """Load a federated task from disk"""
    pass

save_task(*args, **kwargs) abstractmethod

Save a federated task created by TaskGenerator as a static file on the disk

Source code in flgo\benchmark\base.py (lines 40-43)
@abstractmethod
def save_task(self, *args, **kwargs):
    """Save a federated task created by TaskGenerator as a static file on the disk"""
    pass

BasicTaskCalculator

Bases: AbstractTaskCalculator

Support task-specific computation when optimizing models, such as moving data to the device, computing the loss, evaluating models, and creating the data loader

Source code in flgo\benchmark\base.py (lines 347-404)
class BasicTaskCalculator(AbstractTaskCalculator):
    r"""
    Support task-specific computation when optimizing models, such
    as moving data to the device, computing the loss, evaluating models,
    and creating the data loader
    """

    def __init__(self, device, optimizer_name='sgd'):
        r"""
        Args:
            device (torch.device): device
            optimizer_name (str): the name of the optimizer
        """
        self.device = device
        self.optimizer_name = optimizer_name
        self.criterion = None
        self.DataLoader = None
        self.collect_fn = None

    def to_device(self, data, *args, **kwargs):
        raise NotImplementedError

    def get_dataloader(self, dataset, batch_size=64, *args, **kwargs):
        raise NotImplementedError

    def test(self, model, data, *args, **kwargs):
        raise NotImplementedError

    def compute_loss(self, model, data, *args, **kwargs):
        raise NotImplementedError

    def get_optimizer(self, model=None, lr=0.1, weight_decay=0, momentum=0):
        r"""
        Create optimizer of the model parameters

        Args:
            model (torch.nn.Module): model
            lr (float): learning rate
            weight_decay (float): the weight_decay coefficient
            momentum (float): the momentum coefficient

        Returns:
            the optimizer
        """
        OPTIM = getattr(importlib.import_module('torch.optim'), self.optimizer_name)
        filter_fn = filter(lambda p: p.requires_grad, model.parameters())
        if self.optimizer_name.lower() == 'sgd':
            return OPTIM(filter_fn, lr=lr, momentum=momentum, weight_decay=weight_decay)
        elif self.optimizer_name.lower() in ['adam', 'rmsprop', 'adagrad']:
            return OPTIM(filter_fn, lr=lr, weight_decay=weight_decay)
        else:
            raise RuntimeError("Invalid Optimizer.")

    def set_criterion(self, criterion:Callable)->None:
        self.criterion = criterion

    def set_collect_fn(self, collect_fn:Callable)->None:
        self.collect_fn = collect_fn
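
These defaults are placeholders that concrete calculators override. Below is a minimal, hypothetical sketch (not part of flgo) of a calculator for a plain (x, y) classification task, only to illustrate how the interface fits together; the exact return schema used by flgo's built-in calculators may differ.

import torch
from torch.utils.data import DataLoader
from flgo.benchmark.base import BasicTaskCalculator

class ToyClassificationCalculator(BasicTaskCalculator):
    """Illustrative calculator for datasets whose items are (feature, label) pairs."""
    def __init__(self, device, optimizer_name='SGD'):
        super().__init__(device, optimizer_name)
        self.criterion = torch.nn.CrossEntropyLoss()
        self.DataLoader = DataLoader

    def to_device(self, data, *args, **kwargs):
        # move one (features, labels) batch onto self.device
        return data[0].to(self.device), data[1].to(self.device)

    def get_dataloader(self, dataset, batch_size=64, shuffle=True, *args, **kwargs):
        return self.DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)

    def compute_loss(self, model, data, *args, **kwargs):
        x, y = self.to_device(data)
        return {'loss': self.criterion(model(x), y)}

    @torch.no_grad()
    def test(self, model, dataset, batch_size=64, *args, **kwargs):
        model.eval()
        total_loss, correct = 0.0, 0
        for batch in self.get_dataloader(dataset, batch_size=batch_size, shuffle=False):
            x, y = self.to_device(batch)
            output = model(x)
            total_loss += self.criterion(output, y).item() * len(y)
            correct += (output.argmax(dim=1) == y).sum().item()
        return {'loss': total_loss / len(dataset), 'accuracy': correct / len(dataset)}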

__init__(device, optimizer_name='sgd')

Parameters:

    device (torch.device): device. Required.
    optimizer_name (str): the name of the optimizer. Default: 'sgd'.
Source code in flgo\benchmark\base.py (lines 354-364)
def __init__(self, device, optimizer_name='sgd'):
    r"""
    Args:
        device (torch.device): device
        optimizer_name (str): the name of the optimizer
    """
    self.device = device
    self.optimizer_name = optimizer_name
    self.criterion = None
    self.DataLoader = None
    self.collect_fn = None

get_optimizer(model=None, lr=0.1, weight_decay=0, momentum=0)

Create optimizer of the model parameters

Parameters:

    model (torch.nn.Module): model. Default: None.
    lr (float): learning rate. Default: 0.1.
    weight_decay (float): the weight_decay coefficient. Default: 0.
    momentum (float): the momentum coefficient. Default: 0.

Returns:

    the optimizer

Source code in flgo\benchmark\base.py (lines 378-398)
def get_optimizer(self, model=None, lr=0.1, weight_decay=0, momentum=0):
    r"""
    Create optimizer of the model parameters

    Args:
        model (torch.nn.Module): model
        lr (float): learning rate
        weight_decay (float): the weight_decay coefficient
        momentum (float): the momentum coefficient

    Returns:
        the optimizer
    """
    OPTIM = getattr(importlib.import_module('torch.optim'), self.optimizer_name)
    filter_fn = filter(lambda p: p.requires_grad, model.parameters())
    if self.optimizer_name.lower() == 'sgd':
        return OPTIM(filter_fn, lr=lr, momentum=momentum, weight_decay=weight_decay)
    elif self.optimizer_name.lower() in ['adam', 'rmsprop', 'adagrad']:
        return OPTIM(filter_fn, lr=lr, weight_decay=weight_decay)
    else:
        raise RuntimeError("Invalid Optimizer.")

BasicTaskGenerator

Bases: AbstractTaskGenerator

Load the original dataset and partition it into local data

Source code in flgo\benchmark\base.py (lines 79-146)
class BasicTaskGenerator(AbstractTaskGenerator):
    r"""
        Load the original dataset and partition the
        original dataset into local data
    """
    def __init__(self, benchmark:str, rawdata_path:str):
        """
        Args:
            benchmark (str): the name of the federated task
            rawdata_path (str): the directory of the original dataset
        """
        # basic attribution
        self.benchmark = benchmark
        self.rawdata_path = rawdata_path
        # optional attribution
        self.partitioner = None
        self.train_data = None
        self.test_data = None
        self.val_data = None
        self.task_name = None
        self.para = {}
        self.additional_option = {}
        self.train_additional_option = {}
        self.test_additional_option = {}

    def generate(self, *args, **kwarg):
        """The whole process to generate federated task. """
        # load data
        self.load_data()
        # partition
        self.partition()
        # generate task name
        self.task_name = self.get_task_name()
        return

    def load_data(self, *args, **kwargs):
        """Download and load dataset into memory."""
        return

    def partition(self, *args, **kwargs):
        """Partition the data into different local_movielens_recommendation datasets"""
        return

    def register_partitioner(self, partitioner=None):
        """Register the partitioner as self's data partitioner"""
        self.partitioner = partitioner

    def init_para(self, para_list=None):
        pnames = list(self.para.keys())
        if para_list is not None:
            for i, pv in enumerate(para_list):
                pname = pnames[i]
                try:
                    self.para[pname] = type(self.para[pname])(pv)
                except:
                    self.para[pname] = pv
        for pname, pv in self.para.items():
            self.__setattr__(pname, pv)
        return

    def get_task_name(self):
        r"""
        Create the default name of the task
        """
        if not hasattr(self.partitioner, 'num_parties') and hasattr(self.partitioner, 'num_clients'):
            self.partitioner.num_parties = self.partitioner.num_clients
        else: self.partitioner.num_parties = 'unknown'
        return '_'.join(['B-' + self.benchmark, 'P-' + str(self.partitioner), 'N-' + str(self.partitioner.num_parties)])
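
A concrete benchmark usually subclasses BasicTaskGenerator and overrides load_data and partition. The sketch below is hypothetical: the synthetic dataset and the IID index split are stand-ins for a real dataset and a real Partitioner.

import torch
from flgo.benchmark.base import BasicTaskGenerator

class SyntheticTaskGenerator(BasicTaskGenerator):
    """Toy generator: fabricates a small classification dataset and splits it IID."""
    def __init__(self, rawdata_path='./RAW_DATA/SYNTHETIC', num_clients=5):
        super().__init__(benchmark='synthetic_classification', rawdata_path=rawdata_path)
        self.num_clients = num_clients

    def load_data(self, *args, **kwargs):
        # fabricate 1000 samples instead of downloading a real dataset
        x, y = torch.randn(1000, 10), torch.randint(0, 2, (1000,))
        self.train_data = torch.utils.data.TensorDataset(x, y)

    def partition(self, *args, **kwargs):
        # IID split: deal shuffled sample indices out to the clients
        idxs = torch.randperm(len(self.train_data)).tolist()
        self.local_datas = [idxs[i::self.num_clients] for i in range(self.num_clients)]

gen = SyntheticTaskGenerator()
gen.load_data()
gen.partition()
print(len(gen.local_datas), [len(d) for d in gen.local_datas])   # 5 [200, 200, 200, 200, 200]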

__init__(benchmark, rawdata_path)

Parameters:

    benchmark (str): the name of the federated task. Required.
    rawdata_path (str): the directory of the original dataset. Required.
Source code in flgo\benchmark\base.py (lines 84-102)
def __init__(self, benchmark:str, rawdata_path:str):
    """
    Args:
        benchmark (str): the name of the federated task
        rawdata_path (str): the directory of the original dataset
    """
    # basic attribution
    self.benchmark = benchmark
    self.rawdata_path = rawdata_path
    # optional attribution
    self.partitioner = None
    self.train_data = None
    self.test_data = None
    self.val_data = None
    self.task_name = None
    self.para = {}
    self.additional_option = {}
    self.train_additional_option = {}
    self.test_additional_option = {}

generate(*args, **kwarg)

The whole process to generate federated task.

Source code in flgo\benchmark\base.py (lines 104-112)
def generate(self, *args, **kwarg):
    """The whole process to generate federated task. """
    # load data
    self.load_data()
    # partition
    self.partition()
    # generate task name
    self.task_name = self.get_task_name()
    return

get_task_name()

Create the default name of the task

Source code in flgo\benchmark\base.py (lines 139-146)
def get_task_name(self):
    r"""
    Create the default name of the task
    """
    if not hasattr(self.partitioner, 'num_parties') and hasattr(self.partitioner, 'num_clients'):
        self.partitioner.num_parties = self.partitioner.num_clients
    else: self.partitioner.num_parties = 'unknown'
    return '_'.join(['B-' + self.benchmark, 'P-' + str(self.partitioner), 'N-' + str(self.partitioner.num_parties)])

load_data(*args, **kwargs)

Download and load dataset into memory.

Source code in flgo\benchmark\base.py (lines 114-116)
def load_data(self, *args, **kwargs):
    """Download and load dataset into memory."""
    return

partition(*args, **kwargs)

Partition the data into different local datasets

Source code in flgo\benchmark\base.py (lines 118-120)
def partition(self, *args, **kwargs):
    """Partition the data into different local_movielens_recommendation datasets"""
    return

register_partitioner(partitioner=None)

Register the partitioner as self's data partitioner

Source code in flgo\benchmark\base.py (lines 122-124)
def register_partitioner(self, partitioner=None):
    """Register the partitioner as self's data partitioner"""
    self.partitioner = partitioner

BasicTaskPipe

Bases: AbstractTaskPipe

Store the partition information of TaskGenerator into the disk when generating federated tasks.

Load the original dataset and the partition information to create the federated scenario when optimizing models

Source code in flgo\benchmark\base.py (lines 148-345)
class BasicTaskPipe(AbstractTaskPipe):
    r"""
    Store the partition information of TaskGenerator into the disk
    when generating federated tasks.

    Load the original dataset and the partition information to
    create the federated scenario when optimizing models
    """
    TaskDataset = None

    def __init__(self, task_path):
        r"""
        Args:
            task_path (str): the path of the federated task
        """
        self.task_path = task_path
        if os.path.exists(os.path.join(self.task_path, 'data.json')):
            with open(os.path.join(self.task_path, 'data.json'), 'r') as inf:
                self.feddata = json.load(inf)

    def save_task(self, generator):
        """Construct `feddata` and store it into the disk for recovering
        the partitioned datasets again from it"""
        raise NotImplementedError

    def load_data(self, running_time_option) -> dict:
        """Load the data and process it to the format that can be distributed
        to different objects"""
        raise NotImplementedError

    def generate_objects(self, running_time_option, algorithm, scene='horizontal') -> list:
        r"""
        Generate the virtual objects (i.e. coordinators and participants)
        in the FL system

        Args:
            running_time_option (dict): the option (i.e. configuration)
            algorithm (module|class): algorithm
            scene (str): one of 'horizontal', 'vertical', 'hierarchical', or 'decentralized'
        """
        if scene=='horizontal':
            # init clients
            Client = algorithm.Client
            clients = [Client(running_time_option) for _ in range(len(self.feddata['client_names']))]
            for cid, c in enumerate(clients):
                c.id = cid
                c.name = self.feddata['client_names'][cid]
            # init server
            server = algorithm.Server(running_time_option)
            server.name = 'server'
            server.id = -1
            # bind clients and server
            server.register_clients(clients)
            for c in clients: c.register_server(server)
            # return objects as list
            objects = [server]
            objects.extend(clients)
        elif scene=='vertical':
            PassiveParty = algorithm.PassiveParty
            ActiveParty = algorithm.ActiveParty
            objects = []
            for pid, pname in enumerate(self.feddata['party_names']):
                is_active = self.feddata[pname]['data']['with_label']
                obj = ActiveParty(running_time_option) if is_active else PassiveParty(running_time_option)
                obj.id = pid
                obj.name = pname
                objects.append(obj)
            for party in objects:
                party.register_objects(objects)
        elif scene=='hierarchical':
            server = algorithm.Server(running_time_option)
            server.id = -1
            server.name = 'server'
            edge_servers = [algorithm.EdgeServer(running_time_option) for _ in range(self.feddata['num_edge_servers'])]
            for sid in range(len(edge_servers)):
                edge_servers[sid].id = sid
                edge_servers[sid].name = 'edge_server'+str(sid)
                edge_servers[sid].clients = []
            server.register_clients(edge_servers)
            clients = [algorithm.Client(running_time_option) for _ in range(len(self.feddata['client_names']))]
            edge_server_clients = [[] for _ in edge_servers]
            for cid, c in enumerate(clients):
                c.id = cid+len(edge_servers)
                c.name = self.feddata['client_names'][cid]
                edge_server_clients[self.feddata['client_group'][c.name]].append(c)
            for edge_server, client_set in zip(edge_servers,edge_server_clients):
                edge_server.register_clients(client_set)
            objects = [server]
            objects.extend(edge_servers)
            objects.extend(clients)
        elif scene=='decentralized':
            # init clients
            Client = algorithm.Client
            clients = [Client(running_time_option) for _ in range(len(self.feddata['client_names']))]
            for cid, c in enumerate(clients):
                c.id = cid
                c.name = self.feddata['client_names'][cid]
            # init topology of clients
            topology = self.feddata['topology']
            for c in clients:
                c.topology = topology
            adjacent = self.feddata['adjacent']
            for cid,c in enumerate(clients):
                c.clients = [clients[k] for k,nid in enumerate(adjacent[cid]) if nid>0]
            # init protocol
            protocol = algorithm.Protocol(running_time_option)
            protocol.name = 'protocol'
            # bind clients and server
            protocol.clients = clients
            # return objects as list
            objects = [protocol]
            objects.extend(clients)
        return objects

    def save_info(self, generator):
        r"""
        Save the basic information of the generated task into the disk
        """
        info = {'benchmark': '.'.join(generator.__module__.split('.')[:-1])}
        info['scene'] = generator.scene if hasattr(generator, 'scene') else 'unknown'
        info['num_clients'] = generator.num_clients if hasattr(generator, 'num_clients') else (generator.num_parties if hasattr(self, 'num_parties') else 'unknown')
        with open(os.path.join(self.task_path, 'info'), 'w') as outf:
            json.dump(info, outf)

    def load_task(self, running_time_option, *args, **kwargs):
        r"""
        Load the generated task from disk and create objects in the federated
        scenario.
        """
        task_data = self.load_data(running_time_option)
        objects = self.generate_objects(running_time_option)
        self.distribute(task_data, objects)
        return objects

    def distribute(self, task_data: dict, objects: list):
        r"""
        Distribute the loaded local datasets to different objects in
        the federated scenario
        """
        for ob in objects:
            if ob.name in task_data.keys():
                ob_data = task_data[ob.name]
                for data_name, data in ob_data.items():
                    ob.set_data(data, data_name)

    def split_dataset(self, dataset, p=0.0):
        r"""
        Split the dataset into two parts.

        Args:
            dataset (torch.utils.data.Dataset): the dataset to be split
            p (float): the ratio of the splitting

        Returns:
            The two split parts
        """
        if p == 0: return dataset, None
        s1 = int(len(dataset) * p)
        s2 = len(dataset) - s1
        if s1==0:
            return dataset, None
        elif s2==0:
            return None, dataset
        else:
            return torch.utils.data.random_split(dataset, [s2, s1])

    def task_exists(self):
        r"""
        Check whether the task already exists.

        Returns:
            True if the task already exists
        """
        return os.path.exists(self.task_path)

    def remove_task(self):
        r"""Remove this task"""
        if self.task_exists():
            shutil.rmtree(self.task_path)
        return

    def create_task_architecture(self):
        """Create the directories of the task."""
        if not self.task_exists():
            os.mkdir(self.task_path)
            os.mkdir(os.path.join(self.task_path, 'record'))
            os.mkdir(os.path.join(self.task_path, 'log'))
        else:
            raise FileExistsError("federated task {} already exists!".format(self.task_path))

    def gen_client_names(self, num_clients):
        r"""
        Generate the names of clients

        Returns:
            a list of strings
        """
        return [('Client{:0>' + str(len(str(num_clients))) + 'd}').format(i) for i in range(num_clients)]
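
The housekeeping methods can be exercised directly; a brief sketch of the task-directory lifecycle (the path below is arbitrary):

import os
from flgo.benchmark.base import BasicTaskPipe

pipe = BasicTaskPipe('./my_federated_task')
if not pipe.task_exists():
    pipe.create_task_architecture()          # creates the task dir with 'record' and 'log' subdirs
print(sorted(os.listdir(pipe.task_path)))    # ['log', 'record']
pipe.remove_task()                           # removes the whole task directory again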

__init__(task_path)

Parameters:

    task_path (str): the path of the federated task. Required.
Source code in flgo\benchmark\base.py (lines 158-166)
def __init__(self, task_path):
    r"""
    Args:
        task_path (str): the path of the federated task
    """
    self.task_path = task_path
    if os.path.exists(os.path.join(self.task_path, 'data.json')):
        with open(os.path.join(self.task_path, 'data.json'), 'r') as inf:
            self.feddata = json.load(inf)

create_task_architecture()

Create the directories of the task.

Source code in flgo\benchmark\base.py (lines 329-336)
def create_task_architecture(self):
    """Create the directories of the task."""
    if not self.task_exists():
        os.mkdir(self.task_path)
        os.mkdir(os.path.join(self.task_path, 'record'))
        os.mkdir(os.path.join(self.task_path, 'log'))
    else:
        raise FileExistsError("federated task {} already exists!".format(self.task_path))

distribute(task_data, objects)

Distribute the loaded local datasets to different objects in the federated scenario

Source code in flgo\benchmark\base.py (lines 282-291)
def distribute(self, task_data: dict, objects: list):
    r"""
    Distribute the loaded local datasets to different objects in
    the federated scenario
    """
    for ob in objects:
        if ob.name in task_data.keys():
            ob_data = task_data[ob.name]
            for data_name, data in ob_data.items():
                ob.set_data(data, data_name)

gen_client_names(num_clients)

Generate the names of clients

Returns:

    a list of strings

Source code in flgo\benchmark\base.py (lines 338-345)
def gen_client_names(self, num_clients):
    r"""
    Generate the names of clients

    Returns:
        a list of strings
    """
    return [('Client{:0>' + str(len(str(num_clients))) + 'd}').format(i) for i in range(num_clients)]
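
The zero-padding width follows the number of digits in num_clients, so the generated names sort lexicographically; for example:

from flgo.benchmark.base import BasicTaskPipe

pipe = BasicTaskPipe('./tmp_task')        # the directory does not need to exist yet
print(pipe.gen_client_names(3))           # ['Client0', 'Client1', 'Client2']
print(pipe.gen_client_names(12)[:3])      # ['Client00', 'Client01', 'Client02']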

generate_objects(running_time_option, algorithm, scene='horizontal')

Generate the virtual objects (i.e. coordinators and participants) in the FL system

Parameters:

    running_time_option (dict): the option (i.e. configuration). Required.
    algorithm (module|class): algorithm. Required.
    scene (str): one of 'horizontal', 'vertical', 'hierarchical', or 'decentralized'. Default: 'horizontal'.
Source code in flgo\benchmark\base.py (lines 178-260)
def generate_objects(self, running_time_option, algorithm, scene='horizontal') -> list:
    r"""
    Generate the virtual objects (i.e. coordinators and participants)
    in the FL system

    Args:
        running_time_option (dict): the option (i.e. configuration)
        algorithm (module|class): algorithm
        scene (str): one of 'horizontal', 'vertical', 'hierarchical', or 'decentralized'
    """
    if scene=='horizontal':
        # init clients
        Client = algorithm.Client
        clients = [Client(running_time_option) for _ in range(len(self.feddata['client_names']))]
        for cid, c in enumerate(clients):
            c.id = cid
            c.name = self.feddata['client_names'][cid]
        # init server
        server = algorithm.Server(running_time_option)
        server.name = 'server'
        server.id = -1
        # bind clients and server
        server.register_clients(clients)
        for c in clients: c.register_server(server)
        # return objects as list
        objects = [server]
        objects.extend(clients)
    elif scene=='vertical':
        PassiveParty = algorithm.PassiveParty
        ActiveParty = algorithm.ActiveParty
        objects = []
        for pid, pname in enumerate(self.feddata['party_names']):
            is_active = self.feddata[pname]['data']['with_label']
            obj = ActiveParty(running_time_option) if is_active else PassiveParty(running_time_option)
            obj.id = pid
            obj.name = pname
            objects.append(obj)
        for party in objects:
            party.register_objects(objects)
    elif scene=='hierarchical':
        server = algorithm.Server(running_time_option)
        server.id = -1
        server.name = 'server'
        edge_servers = [algorithm.EdgeServer(running_time_option) for _ in range(self.feddata['num_edge_servers'])]
        for sid in range(len(edge_servers)):
            edge_servers[sid].id = sid
            edge_servers[sid].name = 'edge_server'+str(sid)
            edge_servers[sid].clients = []
        server.register_clients(edge_servers)
        clients = [algorithm.Client(running_time_option) for _ in range(len(self.feddata['client_names']))]
        edge_server_clients = [[] for _ in edge_servers]
        for cid, c in enumerate(clients):
            c.id = cid+len(edge_servers)
            c.name = self.feddata['client_names'][cid]
            edge_server_clients[self.feddata['client_group'][c.name]].append(c)
        for edge_server, client_set in zip(edge_servers,edge_server_clients):
            edge_server.register_clients(client_set)
        objects = [server]
        objects.extend(edge_servers)
        objects.extend(clients)
    elif scene=='decentralized':
        # init clients
        Client = algorithm.Client
        clients = [Client(running_time_option) for _ in range(len(self.feddata['client_names']))]
        for cid, c in enumerate(clients):
            c.id = cid
            c.name = self.feddata['client_names'][cid]
        # init topology of clients
        topology = self.feddata['topology']
        for c in clients:
            c.topology = topology
        adjacent = self.feddata['adjacent']
        for cid,c in enumerate(clients):
            c.clients = [clients[k] for k,nid in enumerate(adjacent[cid]) if nid>0]
        # init protocol
        protocol = algorithm.Protocol(running_time_option)
        protocol.name = 'protocol'
        # bind clients and server
        protocol.clients = clients
        # return objects as list
        objects = [protocol]
        objects.extend(clients)
    return objects

load_data(running_time_option)

Load the data and process it to the format that can be distributed to different objects

Source code in flgo\benchmark\base.py (lines 173-176)
def load_data(self, running_time_option) -> dict:
    """Load the data and process it to the format that can be distributed
    to different objects"""
    raise NotImplementedError

load_task(running_time_option, *args, **kwargs)

Load the generated task from disk and create objects in the federated scenario.

Source code in flgo\benchmark\base.py (lines 272-280)
def load_task(self, running_time_option, *args, **kwargs):
    r"""
    Load the generated task from disk and create objects in the federated
    scenario.
    """
    task_data = self.load_data(running_time_option)
    objects = self.generate_objects(running_time_option)
    self.distribute(task_data, objects)
    return objects

remove_task()

Remove this task

Source code in flgo\benchmark\base.py (lines 323-327)
def remove_task(self):
    r"""Remove this task"""
    if self.task_exists():
        shutil.rmtree(self.task_path)
    return

save_info(generator)

Save the basic information of the generated task into the disk

Source code in flgo\benchmark\base.py (lines 262-270)
def save_info(self, generator):
    r"""
    Save the basic information of the generated task into the disk
    """
    info = {'benchmark': '.'.join(generator.__module__.split('.')[:-1])}
    info['scene'] = generator.scene if hasattr(generator, 'scene') else 'unknown'
    info['num_clients'] = generator.num_clients if hasattr(generator, 'num_clients') else (generator.num_parties if hasattr(self, 'num_parties') else 'unknown')
    with open(os.path.join(self.task_path, 'info'), 'w') as outf:
        json.dump(info, outf)

save_task(generator)

Construct feddata and store it into the disk for recovering the partitioned datasets again from it

Source code in flgo\benchmark\base.py (lines 168-171)
def save_task(self, generator):
    """Construct `feddata` and store it into the disk for recovering
    the partitioned datasets again from it"""
    raise NotImplementedError

split_dataset(dataset, p=0.0)

Split the dataset into two parts.

Parameters:

    dataset (torch.utils.data.Dataset): the dataset to be split. Required.
    p (float): the ratio of the split. Default: 0.0.

Returns:

    The two split parts

Source code in flgo\benchmark\base.py (lines 293-312)
def split_dataset(self, dataset, p=0.0):
    r"""
    Split the dataset into two parts.

    Args:
        dataset (torch.utils.data.Dataset): the dataset to be split
        p (float): the ratio of the splitting

    Returns:
        The two split parts
    """
    if p == 0: return dataset, None
    s1 = int(len(dataset) * p)
    s2 = len(dataset) - s1
    if s1==0:
        return dataset, None
    elif s2==0:
        return None, dataset
    else:
        return torch.utils.data.random_split(dataset, [s2, s1])
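
Note that p is the fraction assigned to the second returned part: with p=0.25 on 100 samples the call returns subsets of size 75 and 25, and p=0 returns the whole dataset plus None. A small check:

import torch
from flgo.benchmark.base import BasicTaskPipe

dataset = torch.utils.data.TensorDataset(torch.randn(100, 4), torch.randint(0, 2, (100,)))
pipe = BasicTaskPipe('./tmp_task')
part_a, part_b = pipe.split_dataset(dataset, p=0.25)
print(len(part_a), len(part_b))    # 75 25
whole, rest = pipe.split_dataset(dataset, p=0.0)
print(len(whole), rest)            # 100 None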

task_exists()

Check whether the task already exists.

Returns:

    True if the task already exists

Source code in flgo\benchmark\base.py (lines 314-321)
def task_exists(self):
    r"""
    Check whether the task already exists.

    Returns:
        True if the task already exists
    """
    return os.path.exists(self.task_path)

FromDatasetGenerator

Bases: BasicTaskGenerator

This generator will:
1. Directly create train_data and test_data from the input;
2. Convert the train_data into the scheme that can be partitioned by Partitioner if necessary.

Source code in flgo\benchmark\base.py (lines 483-505)
class FromDatasetGenerator(BasicTaskGenerator):
    r"""
    This generator will do:
    1. Directly create train_data and test_data from input;
    2. Convert the train_data into the scheme that can be partitioned by Partitioner if necessary.
    """
    def __init__(self, benchmark, train_data, val_data=None, test_data=None):
        super(FromDatasetGenerator, self).__init__(benchmark=benchmark, rawdata_path='')
        self.train_data = train_data
        self.val_data = val_data
        self.test_data = test_data

    def generate(self, *args, **kwarg):
        self.partition()

    def partition(self, *args, **kwargs):
        self.train_data = self.prepare_data_for_partition()
        self.local_datas = self.partitioner(self.train_data)
        self.num_clients = len(self.local_datas)

    def prepare_data_for_partition(self):
        """Transform the attribution self.train_data into the format that can be received by partitioner"""
        return self.train_data

prepare_data_for_partition()

Transform the attribution self.train_data into the format that can be received by partitioner

Source code in flgo\benchmark\base.py (lines 503-505)
def prepare_data_for_partition(self):
    """Transform the attribution self.train_data into the format that can be received by partitioner"""
    return self.train_data
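
A hypothetical end-to-end usage of FromDatasetGenerator: flgo ships real partitioner classes elsewhere in the package, but a plain callable also works for partition() and keeps this sketch self-contained.

import torch
from flgo.benchmark.base import FromDatasetGenerator

# a toy training set of 200 (x, y) samples
train_data = torch.utils.data.TensorDataset(torch.randn(200, 8), torch.randint(0, 2, (200,)))

def iid_partitioner(dataset, num_clients=4):
    """Stand-in partitioner: deal shuffled sample indices to num_clients clients."""
    idxs = torch.randperm(len(dataset)).tolist()
    return [idxs[i::num_clients] for i in range(num_clients)]

gen = FromDatasetGenerator(benchmark='my_dataset', train_data=train_data)
gen.register_partitioner(iid_partitioner)
gen.generate()                                   # partitions train_data via the registered partitioner
print(gen.num_clients, [len(d) for d in gen.local_datas])   # 4 [50, 50, 50, 50]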

XYHorizontalTaskPipe

Bases: BasicTaskPipe

This pipe is for supervised learning where each sample contains a feature $x_i$ and a label $y_i$ that can be indexed by $i$. To use this pipe, set the attribute test_data of the generator to a dict like {'x': [...], 'y': [...]}, and set the attribute local_datas to a list of such dicts representing the local data owned by the clients: [{'x': [...], 'y': [...]}, ...]
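
Concretely, a generator that feeds this pipe would expose attributes shaped like the following (the values are illustrative):

# shapes expected by XYHorizontalTaskPipe (illustrative values)
test_data = {
    'x': [[0.1, 0.2], [0.3, 0.4]],    # server-side test features
    'y': [0, 1],                      # server-side test labels
}
local_datas = [                       # one dict per client
    {'x': [[0.5, 0.6]], 'y': [1]},    # client 0's data
    {'x': [[0.7, 0.8]], 'y': [0]},    # client 1's data
]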

Source code in flgo\benchmark\base.py (lines 406-448)
class XYHorizontalTaskPipe(BasicTaskPipe):
    """
    This pipe is for supervised learning where each sample contains a feature $x_i$ and a label $y_i$
     that can be indexed by $i$.
    To use this pipe, it's necessary to set the attribute `test_data` of the generator to be a dict like:
        {'x': [...], 'y':[...]}
    and the attribute `local_datas` to be a list of the above dict that means the local data owned by clients:
        [{'x':[...], 'y':[...]}, ..., ]
    """
    TaskDataset = torch.utils.data.TensorDataset

    def save_task(self, generator):
        client_names = self.gen_client_names(len(generator.local_datas))
        feddata = {'client_names': client_names, 'server': {'data': generator.test_data}}
        for cid in range(len(client_names)): feddata[client_names[cid]] = {'data': generator.local_datas[cid]}
        with open(os.path.join(self.task_path, 'data.json'), 'w') as outf:
            json.dump(feddata, outf)

    def load_data(self, running_time_option) -> dict:
        test_data = self.feddata['server']['data']
        test_data = self.TaskDataset(torch.tensor(test_data['x']), torch.tensor(test_data['y']))
        local_datas = [self.TaskDataset(torch.tensor(self.feddata[cname]['data']['x']),
                                        torch.tensor(self.feddata[cname]['data']['y'])) for cname in
                       self.feddata['client_names']]
        server_data_test, server_data_val = self.split_dataset(test_data, running_time_option['test_holdout'])
        task_data = {'server': {'test': server_data_test, 'val': server_data_val}}
        for key in self.feddata['server'].keys():
            if key == 'data':
                continue
            task_data['server'][key] = self.feddata['server'][key]
        for cid, cname in enumerate(self.feddata['client_names']):
            cdata = local_datas[cid]
            cdata_train, cdata_val = self.split_dataset(cdata, running_time_option['train_holdout'])
            if running_time_option['local_test'] and cdata_val is not None:
                cdata_val, cdata_test = self.split_dataset(cdata_val, 0.5)
            else:
                cdata_test = None
            task_data[cname] = {'train': cdata_train, 'val': cdata_val, 'test': cdata_test}
            for key in self.feddata[cname]:
                if key == 'data':
                    continue
                task_data[cname][key] = self.feddata[cname][key]
        return task_data