fedn.network.clients package
The FEDn client package is responsible for executing the federated learning tasks, including ML model training and validation. It’s the acting gRPC client for the federated network.
The client first contacts the centralized controller to receive its fedn.network.combiner.Combiner assignment. It then connects to that combiner and
exchanges requests with it to receive and send model updates.
Submodules
fedn.network.clients.client_v2 module
Client module for handling client operations in the FEDn network.
- class fedn.network.clients.client_v2.Client(api_url: str, api_port: int, client_obj: ClientOptions, combiner_host: str | None = None, combiner_port: int | None = None, token: str | None = None, package_checksum: str | None = None, helper_type: str | None = None)[source]
Bases: object
Client for interacting with the FEDn network.
- __init__(api_url: str, api_port: int, client_obj: ClientOptions, combiner_host: str | None = None, combiner_port: int | None = None, token: str | None = None, package_checksum: str | None = None, helper_type: str | None = None) None[source]
Initialize the Client.
- on_train(in_model: BytesIO, client_settings: dict) Tuple[BytesIO | None, dict][source]
Handle the training callback.
- set_helper(response: GrpcConnectionOptions | None = None) None[source]
Set the helper based on the response or default.
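As a rough illustration of the on_train signature above, a training callback could be shaped like the sketch below. It assumes only what the signature shows: a model arriving as a BytesIO and a settings dictionary, with a (BytesIO or None, dict) tuple returned. The JSON payload, the learning_rate setting, and the metadata key are hypothetical and exist only to keep the example self-contained; they are not FEDn's actual model format or settings.

```python
import io
import json
from typing import Optional, Tuple


def train_callback(in_model: io.BytesIO, client_settings: dict) -> Tuple[Optional[io.BytesIO], dict]:
    """Toy callback with the same shape as the documented on_train signature."""
    # Hypothetical model format: a JSON list of floats (not FEDn's real format).
    weights = json.loads(in_model.getvalue().decode("utf-8"))

    # Hypothetical setting name; a real client would run actual training here.
    lr = float(client_settings.get("learning_rate", 0.1))
    updated = [w - lr * w for w in weights]

    # Serialize the updated weights back into an in-memory byte stream.
    out_model = io.BytesIO(json.dumps(updated).encode("utf-8"))
    metadata = {"num_examples": len(updated)}  # illustrative metadata only
    return out_model, metadata
```

A callback of this shape is the kind of object one would expect to pass as train_callback when constructing a FednClient, although the exact metadata that FEDn expects is not specified on this page.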
- class fedn.network.clients.client_v2.ClientOptions(name: str, package: str, preferred_combiner: str | None = None, id: str | None = None)[source]
Bases: object
Options for configuring the client.
fedn.network.clients.connect module
Connector class for assigning clients to the FEDn network via the discovery service (REST-API).
The Connector class is used by the Client class in fedn/network/clients/client.py. Once assigned, the client will retrieve combiner assignment from the discovery service. The discovery service will also add the client to the statestore.
- class fedn.network.clients.connect.ConnectorClient(host: str, port: int, token: str, name: str, remote_package: bool, force_ssl: bool = False, verify: bool = False, combiner: str | None = None, id: str | None = None)[source]
Bases: object
Connector for assigning client to a combiner in the FEDn network.
- Parameters:
host (str) – host of discovery service
port (int) – port of discovery service
token (str) – token for authentication
name (str) – name of client
remote_package (bool) – True if remote package is used, False if local
force_ssl (bool) – True if https is used, False if http
verify (bool) – True if certificate is verified, False if not
combiner (Optional[str]) – name of preferred combiner
id (Optional[str]) – id of client
- __init__(host: str, port: int, token: str, name: str, remote_package: bool, force_ssl: bool = False, verify: bool = False, combiner: str | None = None, id: str | None = None) None[source]
Initialize the ConnectorClient.
fedn.network.clients.fedn_client module
FednClient class for interacting with the FEDn network.
- class fedn.network.clients.fedn_client.ConnectToApiResult(value)[source]
Bases: Enum
Enum for representing the result of connecting to the FEDn API.
- Assigned = 0
- ComputePackageMissing = 1
- IncorrectUrl = 4
- UnAuthorized = 2
- UnMatchedConfig = 3
- UnknownError = 5
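Since connect_to_api returns one of these members as the first element of its result tuple, calling code will typically branch on it. The sketch below uses a local stand-in enum that mirrors the documented member values so that it runs without FEDn installed. The describe_result helper and its messages are hypothetical interpretations of the member names, not part of the library.

```python
from enum import Enum


class ConnectToApiResult(Enum):
    """Local stand-in mirroring the documented member values."""

    Assigned = 0
    ComputePackageMissing = 1
    UnAuthorized = 2
    UnMatchedConfig = 3
    IncorrectUrl = 4
    UnknownError = 5


def describe_result(result: ConnectToApiResult) -> str:
    """Hypothetical helper: map a result to a suggested next step."""
    messages = {
        ConnectToApiResult.Assigned: "Connected: continue with the returned assignment.",
        ConnectToApiResult.ComputePackageMissing: "No compute package available yet.",
        ConnectToApiResult.UnAuthorized: "Check the authentication token.",
        ConnectToApiResult.UnMatchedConfig: "Client configuration does not match.",
        ConnectToApiResult.IncorrectUrl: "Check the API URL.",
    }
    # Anything not listed above is treated as an unknown failure.
    return messages.get(result, "Unknown error.")
```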
- class fedn.network.clients.fedn_client.FednClient(train_callback: callable | None = None, validate_callback: callable | None = None, predict_callback: callable | None = None)[source]
Bases: object
Client for interacting with the FEDn network.
- __init__(train_callback: callable | None = None, validate_callback: callable | None = None, predict_callback: callable | None = None) None[source]
Initialize the FednClient.
- backward_gradients(request)[source]
Split learning backward pass to update the local client models.
- connect_to_api(url: str, token: str, json: dict) Tuple[ConnectToApiResult, Any][source]
Connect to the FEDn API.
- create_backward_completion_message(gradient_id: str, meta: dict, request: TaskRequest)[source]
Create a backward completion message.
- create_prediction_message(prediction: dict, request: TaskRequest) ModelPrediction[source]
Create a prediction message.
- create_update_message(model_id: str, model_update_id: str, meta: dict, request: TaskRequest) ModelUpdate[source]
Create an update message.
- create_validation_message(metrics: dict, request: TaskRequest) ModelValidation[source]
Create a validation message.
- download_compute_package(url: str, token: str, name: str | None = None) bool[source]
Download compute package from controller.
- forward_embeddings(request)[source]
Forward pass for split learning gradient calculation or inference.
- get_model_from_combiner(id: str, client_id: str, timeout: int = 20) BytesIO[source]
Get the model from the combiner.
- init_grpchandler(config: GrpcConnectionOptions, client_name: str, token: str) bool[source]
Initialize the GRPC handler.
- init_remote_compute_package(url: str, token: str, package_checksum: str | None = None) bool[source]
Initialize the remote compute package.
- log_metric(metrics: dict, step: int | None = None, commit: bool = True) bool[source]
Log the metrics to the server.
- Returns:
True if the metrics were logged successfully, False otherwise.
- Return type:
bool
- logging_context(context: LoggingContext)[source]
Set the logging context.
- send_heartbeats(client_name: str, client_id: str, update_frequency: float = 2.0) None[source]
Send heartbeats to the server.
- send_status(msg: str, log_level: EnumTypeWrapper = 1, type: str | None = None, request: ModelUpdate | ModelValidation | TaskRequest | None = None, session_id: str | None = None, sender_name: str | None = None) None[source]
Send the status.
- class fedn.network.clients.fedn_client.GrpcConnectionOptions(host: str, port: int, status: str = '', fqdn: str = '', package: str = '', ip: str = '', helper_type: str = '')[source]
Bases: object
Options for configuring the GRPC connection.
- __init__(host: str, port: int, status: str = '', fqdn: str = '', package: str = '', ip: str = '', helper_type: str = '') None[source]
Initialize GrpcConnectionOptions.
- classmethod from_dict(config: dict) GrpcConnectionOptions[source]
Create a GrpcConnectionOptions instance from a dictionary.
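To make the from_dict pattern concrete, here is a minimal sketch built around a stand-in dataclass whose fields and defaults copy the constructor signature above. It assumes the dictionary keys match the parameter names; the real class may validate or map keys differently. The name GrpcConnectionOptionsSketch is hypothetical.

```python
from dataclasses import dataclass


@dataclass
class GrpcConnectionOptionsSketch:
    """Stand-in with the same fields and defaults as the documented constructor."""

    host: str
    port: int
    status: str = ""
    fqdn: str = ""
    package: str = ""
    ip: str = ""
    helper_type: str = ""

    @classmethod
    def from_dict(cls, config: dict) -> "GrpcConnectionOptionsSketch":
        # Assumption: dictionary keys match the constructor parameter names.
        return cls(
            host=config["host"],
            port=int(config["port"]),  # tolerate a port given as a string
            status=config.get("status", ""),
            fqdn=config.get("fqdn", ""),
            package=config.get("package", ""),
            ip=config.get("ip", ""),
            helper_type=config.get("helper_type", ""),
        )
```

In this sketch the required keys (host, port) raise KeyError when missing, while the optional ones fall back to the empty-string defaults shown in the signature.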
- class fedn.network.clients.fedn_client.LoggingContext(*, step: int = 0, model_id: str | None = None, round_id: str | None = None, session_id: str | None = None, request: TaskRequest | None = None)[source]
Bases: object
Context for keeping track of the session, model and round IDs during a dispatched call from a request.
fedn.network.clients.grpc_handler module
GrpcHandler class for handling GRPC connections and operations.
- class fedn.network.clients.grpc_handler.GrpcAuth(key: str)[source]
Bases: AuthMetadataPlugin
GRPC authentication plugin.
- class fedn.network.clients.grpc_handler.GrpcHandler(host: str, port: int, name: str, token: str, combiner_name: str)[source]
Bases: object
Handler for GRPC connections and operations.
- __init__(host: str, port: int, name: str, token: str, combiner_name: str) None[source]
Initialize the GrpcHandler.
- create_backward_completion_message(sender_name: str, receiver_name: str, receiver_role: EnumTypeWrapper, gradient_id: str, session_id: str, meta: dict)[source]
- create_metric_message(sender_name: str, sender_client_id: str, metrics: dict, step: int, model_id: str, session_id: str, round_id: str) ModelMetric[source]
Create a metric message.
- create_prediction_message(sender_name: str, receiver_name: str, receiver_role: EnumTypeWrapper, model_id: str, prediction_output: str, correlation_id: str, session_id: str) ModelPrediction[source]
Create a prediction message.
- create_update_message(sender_name: str, model_id: str, model_update_id: str, receiver_name: str, receiver_role: EnumTypeWrapper, meta: dict) ModelUpdate[source]
Create an update message.
- create_validation_message(sender_name: str, sender_client_id: str, receiver_name: str, receiver_role: EnumTypeWrapper, model_id: str, metrics: str, correlation_id: str, session_id: str) ModelValidation[source]
Create a validation message.
- get_model_from_combiner(id: str, client_id: str, timeout: int = 20) BytesIO | None[source]
Fetch a model from the assigned combiner.
Downloads the model update object via a gRPC streaming channel.
- heartbeat(client_name: str, client_id: str, memory_utilisation: float | None = None, cpu_utilisation: float | None = None) Response[source]
Send a heartbeat to the combiner.
- Returns:
Response from the combiner.
- Return type:
fedn.Response
- listen_to_task_stream(client_name: str, client_id: str, callback: Callable[[Any], None]) None[source]
Subscribe to the model update request stream.
- send_attributes(attribute: AttributeMessage) bool[source]
Send an attribute message to the combiner.
- send_backward_completion(update: BackwardCompletion)[source]
Send a backward completion message to the combiner.
- send_heartbeats(client_name: str, client_id: str, update_frequency: float = 2.0) None[source]
Send heartbeats to the combiner at regular intervals.
- send_model_prediction(prediction: ModelPrediction) bool[source]
Send a model prediction to the combiner.
- send_model_to_combiner(model: BytesIO, id: str) BytesIO | None[source]
Send a model update to the assigned combiner.
Uploads the model update object via a gRPC streaming channel (Upload).
- Parameters:
model (BytesIO) – The model update object.
id (str) – The id of the model update object.
- Returns:
The model update object.
- Return type:
Optional[BytesIO]
- send_model_validation(validation: ModelValidation) bool[source]
Send a model validation to the combiner.
- send_status(msg: str, log_level: EnumTypeWrapper = 1, type: str | None = None, request: ModelUpdate | ModelValidation | TaskRequest | None = None, session_id: str | None = None, sender_name: str | None = None) None[source]
Send status message.
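The send_heartbeats method above sends heartbeats at regular intervals controlled by update_frequency. The generic pattern can be sketched as below. This is not the handler's actual implementation: the send callable, the stop event, and the max_beats limit are hypothetical names introduced so the loop can run and terminate without a combiner.

```python
import threading
from typing import Callable, Optional


def send_heartbeats_sketch(
    send: Callable[[], None],
    update_frequency: float,
    stop: threading.Event,
    max_beats: Optional[int] = None,
) -> int:
    """Call send() every update_frequency seconds until stop is set.

    max_beats is a hypothetical limit that makes the loop finite for demos.
    Returns the number of heartbeats sent.
    """
    sent = 0
    while not stop.is_set():
        send()
        sent += 1
        if max_beats is not None and sent >= max_beats:
            break
        # Event.wait sleeps for the interval but wakes early if stop is set.
        stop.wait(update_frequency)
    return sent
```

Waiting on the event rather than calling time.sleep lets another thread end the loop promptly by setting stop, which is a common choice for background heartbeat threads.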
fedn.network.clients.package_runtime module
Contains the PackageRuntime class, used to download, validate, and unpack compute packages.
- class fedn.network.clients.package_runtime.PackageRuntime(package_path: str)[source]
Bases: object
PackageRuntime is used to download, validate, and unpack compute packages.
- Parameters:
package_path (str) – Path to compute package.
- download_compute_package(url: str, token: str, name: str | None = None) bool[source]
Download compute package from controller.
- Parameters:
url – URL of the controller.
token – Token for authentication.
name – Name of the package.
- Returns:
True if download was successful, False otherwise.
- Return type:
bool
- get_dispatcher(run_path: str) Dispatcher | None[source]
Dispatch the compute package.
- Parameters:
run_path (str) – Path to dispatch the compute package.
- Returns:
Dispatcher object or None if an error occurred.
- Return type:
Optional[Dispatcher]
- set_checksum(url: str, token: str, name: str | None = None) bool[source]
Get checksum of compute package from controller.
- Parameters:
url – URL of the controller.
token – Token for authentication.
name – Name of the package.
- Returns:
True if checksum was set successfully, False otherwise.
- Return type:
bool
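Validating a downloaded compute package against a checksum generally means hashing the file and comparing the digest with the expected value. The sketch below shows that idea under the assumption that the checksum is a SHA-256 hex digest; this page does not state which algorithm PackageRuntime uses, and both function names are hypothetical.

```python
import hashlib


def sha256_of_file(path: str, chunk_size: int = 65536) -> str:
    """Return the SHA-256 hex digest of a file, read in chunks."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        # Read fixed-size chunks so large packages are not loaded into memory at once.
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def package_matches(path: str, expected_checksum: str) -> bool:
    """Hypothetical check: does the file's digest equal the expected checksum?"""
    return sha256_of_file(path) == expected_checksum
```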
fedn.network.clients.state module
Module for representing and converting client states.
- class fedn.network.clients.state.ClientState(value)[source]
Bases: Enum
Enum for representing the state of a client.
- idle = 1
- predicting = 4
- training = 2
- validating = 3
- fedn.network.clients.state.client_state_to_string(state: ClientState) str[source]
Convert a ClientState to a string representation.
- Parameters:
state (fedn.network.clients.state.ClientState) – the state to convert
- Returns:
string representation of the state
- Return type:
str
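For illustration, the enum and its string conversion can be sketched with a local stand-in that mirrors the documented member values. The exact strings returned by the library function are not given on this page; the sketch assumes the upper-cased member name.

```python
from enum import Enum


class ClientState(Enum):
    """Local stand-in mirroring the documented member values."""

    idle = 1
    training = 2
    validating = 3
    predicting = 4


def client_state_to_string(state: ClientState) -> str:
    # Assumption: the string form is the upper-cased member name.
    return state.name.upper()
```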