Skip to content

In this section, we first introduce the general paradigm of horizontal FL and then discuss the corresponding implementation in FLGo.

2.1.1 Classical Paradigm

Figure_fedprocess

In a classical horizontal FL scene, there is a center server that coordinates clients to collaboratively train a global model iteratively. In each iteration, the server first samples a subset from all the clients. Then, the server broadcasts the global model the selected clients. After receiving the global model, the clients locally train it with local data. Finally, the clients send back the updated models to the server and the server aggregates the models into the new global model. The whole process is as shown in the figure above. Existing methods usually improve one or more of the five steps to realize various purposes like fairness and robustness.

Figure1

The cooresponding implementation of the FL process is shown in Figure 2. We use iterate function to model the behaviors of the server and reply function to model the behaviors of clients when being selected.

2.1.2 Details of Steps

Server's Behavior: Server.iterate

The training process starts with run method of the server, which starts iterations (i.e. communication rounds) by using a loop. In each iteration of the loop, the server will call iterate to carry out each step. A standard implementation of iterate (i.e. flgo.algorithm.fedbase.iterate) is as below:

    def iterate(self):
        """
        The standard iteration of each federated communication round that contains three
        necessary procedure in FL: client selection, communication and model aggregation.

        Returns:
            False if the global model is not updated in this iteration
        """
        # sample clients: Uniform sampling as default
        self.selected_clients = self.sample()
        # training
        models = self.communicate(self.selected_clients)['model']
        # aggregate: pk = ni/sum(ni) as default
        self.model = self.aggregate(models)
        return len(models) > 0

① Server.sample

During each iteration, the server first sample clients by calling self.sample(), which returns a list of clients' IDs. We implement three sampling strategies in our preset sampling method as below. full sampling means selecting all the clients. uniform sampling means selecting clients uniformly without replacement. md sampling means selecting clients with replacement by probabilities w.r.t. the ratio of data sizes. Improvement on sampling strategies can be adapted here by overwriting sample.

    def sample(self):
        r"""
        Sample the clients. There are three types of sampling manners:
        full sample, uniform sample without replacement, and MDSample
        with replacement. Particularly, if 'available' is in self.sample_option,
        the server will only sample from currently available clients.

        Returns:
            a list of the ids of the selected clients

        Example:
        ```python
            >>> selected_clients=self.sample()
            >>> selected_clients
            >>> # The selected_clients is a list of clients' ids
        ```
        """
        all_clients = self.available_clients if 'available' in self.sample_option else [cid for cid in
                                                                                        range(self.num_clients)]
        # full sampling with unlimited communication resources of the server
        if 'full' in self.sample_option:
            return all_clients
        # sample clients
        elif 'uniform' in self.sample_option:
            # original sample proposed by fedavg
            selected_clients = list(
                np.random.choice(all_clients, min(self.clients_per_round, len(all_clients)), replace=False)) if len(
                all_clients) > 0 else []
        elif 'md' in self.sample_option:
            # the default setting that is introduced by FedProx, where the clients are sampled with the probability in proportion to their local_movielens_recommendation data sizes
            local_data_vols = [self.clients[cid].datavol for cid in all_clients]
            total_data_vol = sum(local_data_vols)
            p = np.array(local_data_vols) / total_data_vol
            selected_clients = list(np.random.choice(all_clients, self.clients_per_round, replace=True, p=p)) if len(
                all_clients) > 0 else []
        return selected_clients

② Communication- Broadcast: Server.pack & Client.unpack

The communication process is realized by the method communicate(client_ids: list[int], mtype: str, asynchronous: bool), which contains a full ask&reply process between the server and the clients. The second step only refers to the broadcast-communication, which only describes what the server transmitting to the clients. Therefore, we use two method, Server.pack(client_id) and Client.unpack() to model the broadcast-communication process

class Server:
    def pack(self, client_id, mtype=0, *args, **kwargs):
        """
        Pack the necessary information for the client's local training.
        Any operations of compression or encryption should be done here.
        :param
            client_id: the id of the client to communicate with
        :return
            a dict that only contains the global model as default.
        """
        return {
            "model" : copy.deepcopy(self.model),
        }

class Client:
        def unpack(self, received_pkg):
        """
        Unpack the package received from the server
        :param
            received_pkg: a dict contains the global model as default
        :return:
            the unpacked information that can be rewritten
        """
        # unpack the received package
        return received_pkg['model']

The transmitted package should be a dict in python. The server will send a copy of the global model, and the client will unpack the package to obtain the global model as default. Any changes on the content of the down-streaming packages should be implemented here.

Clients' Behavior: Client.reply

After clients receiving the global models, the method Client.reply will automatically be triggered to model the clients' behaviors. The implementation of reply is as follows:

    def reply(self, svr_pkg):
        r"""
        Reply a package to the server. The whole local_movielens_recommendation procedure should be defined here.
        The standard form consists of three procedure: unpacking the
        server_package to obtain the global model, training the global model,
        and finally packing the updated model into client_package.

        Args:
            svr_pkg (dict): the package received from the server

        Returns:
            client_pkg (dict): the package to be send to the server
        """
        model = self.unpack(svr_pkg)
        self.train(model)
        cpkg = self.pack(model)
        return cpkg

③ Local Training: Client.train

The local training is made by the method Client.train, which receives a global model as the input and trains it with local data. Any modification on local training procedures should be implemented here. The default implementation is as follow:

    def train(self, model):
        r"""
        Standard local_movielens_recommendation training procedure. Train the transmitted model with
        local_movielens_recommendation training dataset.

        Args:
            model (FModule): the global model
        """
        model.train()
        optimizer = self.calculator.get_optimizer(model, lr=self.learning_rate, weight_decay=self.weight_decay,
                                                  momentum=self.momentum)
        for iter in range(self.num_steps):
            # get a batch of data
            batch_data = self.get_batch_data()
            model.zero_grad()
            # calculate the loss of the model on batched dataset through task-specified calculator
            loss = self.calculator.compute_loss(model, batch_data)['loss']
            loss.backward()
            if self.clip_grad>0:torch.nn.utils.clip_grad_norm_(parameters=model.parameters(), max_norm=self.clip_grad)
            optimizer.step()
        return

Particularly, we let the task-spefific calculation be transparent to the optimization algorithms. Therefore, one algorithm (e.g. FedAvg) can be adapted to different types of tasks without any changes. calculator is responsible for all the task-specific calculations.

④ Communication - Upload: Client.pack & Server.unpack

The communication of uploading models from clients is modeled by Client.pack(*args, **kwargs) and Server.unpack(packages_list), which is similar to the step ②. Different from ②, the server as the receiver needs to simultaneously handle a list of packages from different clients. We let Server.unpack return the values in the uploaded packages as a dict that shares the same keys with each client's pakcage. Modification on the content of upload-communication should be implemented in Client.pack that returns a dict as a package each time.

class Server:
    def unpack(self, packages_received_from_clients):
        """
        Unpack the information from the received packages. Return models and losses as default.
        :param
            packages_received_from_clients:
        :return:
            res: collections.defaultdict that contains several lists of the clients' reply
        """
        if len(packages_received_from_clients)==0: return collections.defaultdict(list)
        res = {pname:[] for pname in packages_received_from_clients[0]}
        for cpkg in packages_received_from_clients:
            for pname, pval in cpkg.items():
                res[pname].append(pval)
        return res

class Client:
    def pack(self, model, *args, **kwargs):
        """
        Packing the package to be send to the server. The operations of compression
        of encryption of the package should be done here.
        :param
            model: the locally trained model
        :return
            package: a dict that contains the necessary information for the server
        """
        return {
            "model" : model,
            }

⑤ Model Aggregation: Server.aggregate()

The server finally aggregates the received models into a new global model by the method Server.aggregate(models: list). There are four preset aggregation modes in our implementation. And using the normalized ratios of local data sizes (i.e. FedAvg) is set the default aggregatino option.

def aggregate(self, models: list, *args, **kwargs):
        """
        Aggregate the locally improved models.
        :param
            models: a list of local models
        :return
            the averaged result
        pk = nk/n where n=self.data_vol
        K = |S_t|
        N = |S|
        -------------------------------------------------------------------------------------------------------------------------
         weighted_scale                 |uniform (default)          |weighted_com (original fedavg)   |other
        ==========================================================================================================================
        N/K * Σpk * model_k             |1/K * Σmodel_k             |(1-Σpk) * w_old + Σpk * model_k  |Σ(pk/Σpk) * model_k
        """
        if len(models) == 0: return self.model
        local_data_vols = [c.datavol for c in self.clients]
        total_data_vol = sum(local_data_vols)
        if self.aggregation_option == 'weighted_scale':
            p = [1.0 * local_data_vols[cid] /total_data_vol for cid in self.received_clients]
            K = len(models)
            N = self.num_clients
            return fmodule._model_sum([model_k * pk for model_k, pk in zip(models, p)]) * N / K
        elif self.aggregation_option == 'uniform':
            return fmodule._model_average(models)
        elif self.aggregation_option == 'weighted_com':
            p = [1.0 * local_data_vols[cid] / total_data_vol for cid in self.received_clients]
            w = fmodule._model_sum([model_k * pk for model_k, pk in zip(models, p)])
            return (1.0-sum(p))*self.model + w
        else:
            p = [1.0 * local_data_vols[cid] / total_data_vol for cid in self.received_clients]
            sump = sum(p)
            p = [pk/sump for pk in p]
            return fmodule._model_sum([model_k * pk for model_k, pk in zip(models, p)])

We will show how to modify each steps to realize different algorithms by the following sections.