fedn.network.clients package

The FEDn client package is responsible for executing the federated learning tasks, including ML model training and validation. It’s the acting gRPC client for the federated network. The client first connacts the centralized controller to receive fedn.network.combiner.Combiner assingment. The client then connects to the combiner and sends requests to the combiner to receive model updates and send model updates.

Submodules

fedn.network.clients.client_v2 module

Client module for handling client operations in the FEDn network.

class fedn.network.clients.client_v2.Client(api_url: str, api_port: int, client_obj: ClientOptions, combiner_host: str | None = None, combiner_port: int | None = None, token: str | None = None, package_checksum: str | None = None, helper_type: str | None = None)[source]

Bases: object

Client for interacting with the FEDn network.

__init__(api_url: str, api_port: int, client_obj: ClientOptions, combiner_host: str | None = None, combiner_port: int | None = None, token: str | None = None, package_checksum: str | None = None, helper_type: str | None = None) None[source]

Initialize the Client.

on_backward(in_gradients, client_id)[source]
on_forward(client_id, is_sl_inference)[source]
on_train(in_model: BytesIO, client_settings: dict) Tuple[BytesIO | None, dict][source]

Handle the training callback.

on_validation(in_model: BytesIO) dict | None[source]

Handle the validation callback.

set_helper(response: GrpcConnectionOptions | None = None) None[source]

Set the helper based on the response or default.

start() None[source]

Start the client.

class fedn.network.clients.client_v2.ClientOptions(name: str, package: str, preferred_combiner: str | None = None, id: str | None = None)[source]

Bases: object

Options for configuring the client.

__init__(name: str, package: str, preferred_combiner: str | None = None, id: str | None = None) None[source]

Initialize ClientOptions with validation.

to_json() Dict[str, str | None][source]

Convert ClientOptions to JSON.

fedn.network.clients.client_v2.get_url(api_url: str, api_port: int) str[source]

Construct the URL for the API.

fedn.network.clients.connect module

Connector class for assigning clients to the FEDn network via the discovery service (REST-API).

The Connector class is used by the Client class in fedn/network/clients/client.py. Once assigned, the client will retrieve combiner assignment from the discovery service. The discovery service will also add the client to the statestore.

class fedn.network.clients.connect.ConnectorClient(host: str, port: int, token: str, name: str, remote_package: bool, force_ssl: bool = False, verify: bool = False, combiner: str | None = None, id: str | None = None)[source]

Bases: object

Connector for assigning client to a combiner in the FEDn network.

Parameters:
  • host (str) – host of discovery service

  • port (int) – port of discovery service

  • token (str) – token for authentication

  • name (str) – name of client

  • remote_package (bool) – True if remote package is used, False if local

  • force_ssl (bool) – True if https is used, False if http

  • verify (bool) – True if certificate is verified, False if not

  • combiner (Optional[str]) – name of preferred combiner

  • id (Optional[str]) – id of client

__init__(host: str, port: int, token: str, name: str, remote_package: bool, force_ssl: bool = False, verify: bool = False, combiner: str | None = None, id: str | None = None) None[source]

Initialize the ConnectorClient.

assign() Tuple[Status, dict | None][source]

Connect client to FEDn network discovery service, ask for combiner assignment.

Returns:

Tuple with assignment status, combiner connection information if successful, else None.

Return type:

tuple(fedn.network.clients.connect.Status, Optional[dict])

refresh_token() int[source]

Refresh client token.

Returns:

Status code of the token refresh request.

Return type:

int

class fedn.network.clients.connect.Status(value)[source]

Bases: Enum

Enum for representing the status of a client assignment.

Assigned = 1
TryAgain = 2
UnAuthorized = 3
UnMatchedConfig = 4
Unassigned = 0

fedn.network.clients.fedn_client module

FednClient class for interacting with the FEDn network.

class fedn.network.clients.fedn_client.ConnectToApiResult(value)[source]

Bases: Enum

Enum for representing the result of connecting to the FEDn API.

Assigned = 0
ComputePackageMissing = 1
IncorrectUrl = 4
UnAuthorized = 2
UnMatchedConfig = 3
UnknownError = 5
class fedn.network.clients.fedn_client.FednClient(train_callback: callable | None = None, validate_callback: callable | None = None, predict_callback: callable | None = None)[source]

Bases: object

Client for interacting with the FEDn network.

__init__(train_callback: callable | None = None, validate_callback: callable | None = None, predict_callback: callable | None = None) None[source]

Initialize the FednClient.

backward_gradients(request)[source]

Split learning backward pass to update the local client models.

connect_to_api(url: str, token: str, json: dict) Tuple[ConnectToApiResult, Any][source]

Connect to the FEDn API.

create_backward_completion_message(gradient_id: str, meta: dict, request: TaskRequest)[source]

Create a backward completion message.

create_prediction_message(prediction: dict, request: TaskRequest) ModelPrediction[source]

Create a prediction message.

create_update_message(model_id: str, model_update_id: str, meta: dict, request: TaskRequest) ModelUpdate[source]

Create an update message.

create_validation_message(metrics: dict, request: TaskRequest) ModelValidation[source]

Create a validation message.

default_telemetry_loop(update_frequency: float = 5.0) None[source]

Send default telemetry data.

download_compute_package(url: str, token: str, name: str | None = None) bool[source]

Download compute package from controller.

forward_embeddings(request)[source]

Forward pass for split learning gradient calculation or inference.

get_model_from_combiner(id: str, client_id: str, timeout: int = 20) BytesIO[source]

Get the model from the combiner.

get_or_set_environment() bool[source]

Get or set the environment.

init_grpchandler(config: GrpcConnectionOptions, client_name: str, token: str) bool[source]

Initialize the GRPC handler.

init_local_compute_package() bool[source]

Initialize the local compute package.

init_remote_compute_package(url: str, token: str, package_checksum: str | None = None) bool[source]

Initialize the remote compute package.

listen_to_task_stream(client_name: str, client_id: str) None[source]

Listen to the task stream.

log_attributes(attributes: dict) bool[source]

Log the attributes to the server.

Parameters:

attributes (dict) – The attributes to log.

Returns:

True if the attributes were logged successfully, False otherwise.

Return type:

bool

log_metric(metrics: dict, step: int | None = None, commit: bool = True) bool[source]

Log the metrics to the server.

Parameters:
  • metrics (dict) – The metrics to log.

  • step (int, optional) – The step number.

  • value. (If provided the context step will be set to this)

  • provided (If not)

  • used. (the step from the context will be)

  • commit (bool, optional) – Whether or not to increment the step. Defaults to True.

Returns:

True if the metrics were logged successfully, False otherwise.

Return type:

bool

log_telemetry(telemetry: dict) bool[source]

Log the telemetry data to the server.

Parameters:

telemetry (dict) – The telemetry data to log.

Returns:

True if the telemetry data was logged successfully, False otherwise.

Return type:

bool

logging_context(context: LoggingContext)[source]

Set the logging context.

predict_global_model(request: TaskRequest) None[source]

Predict using the global model.

run(with_telemetry=True, with_heartbeat=True) None[source]

Run the client.

send_heartbeats(client_name: str, client_id: str, update_frequency: float = 2.0) None[source]

Send heartbeats to the server.

send_model_prediction(prediction: ModelPrediction) bool[source]

Send the model prediction.

send_model_to_combiner(model: BytesIO, id: str) None[source]

Send the model to the combiner.

send_model_update(update: ModelUpdate) bool[source]

Send the model update.

send_model_validation(validation: ModelValidation) bool[source]

Send the model validation.

send_status(msg: str, log_level: <google.protobuf.internal.enum_type_wrapper.EnumTypeWrapper object at 0x797211da0430> = 1, type: str | None = None, request: ~fedn_pb2.ModelUpdate | ~fedn_pb2.ModelValidation | ~fedn_pb2.TaskRequest | None = None, session_id: str | None = None, sender_name: str | None = None) None[source]

Send the status.

set_backward_callback(callback: callable)[source]
set_client_id(client_id: str) None[source]

Set the client ID.

set_compute_package_checksum(url: str, token: str, name: str | None = None) bool[source]

Get checksum of compute package from controller.

set_dispatcher(path: str) bool[source]

Set the dispatcher.

set_forward_callback(callback: callable)[source]
set_name(name: str) None[source]

Set the client name.

set_predict_callback(callback: callable) None[source]

Set the predict callback.

set_train_callback(callback: callable) None[source]

Set the train callback.

set_validate_callback(callback: callable) None[source]

Set the validate callback.

unpack_compute_package() Tuple[bool, str][source]

Unpack the compute package.

update_local_model(request: TaskRequest) None[source]

Update the local model.

validate_compute_package(checksum: str) bool[source]

Validate the compute package.

validate_global_model(request: TaskRequest) None[source]

Validate the global model.

class fedn.network.clients.fedn_client.GrpcConnectionOptions(host: str, port: int, status: str = '', fqdn: str = '', package: str = '', ip: str = '', helper_type: str = '')[source]

Bases: object

Options for configuring the GRPC connection.

__init__(host: str, port: int, status: str = '', fqdn: str = '', package: str = '', ip: str = '', helper_type: str = '') None[source]

Initialize GrpcConnectionOptions.

classmethod from_dict(config: dict) GrpcConnectionOptions[source]

Create a GrpcConnectionOptions instance from a JSON string.

class fedn.network.clients.fedn_client.LoggingContext(*, step: int = 0, model_id: str | None = None, round_id: str | None = None, session_id: str | None = None, request: TaskRequest | None = None)[source]

Bases: object

Context for keeping track of the session, model and round IDs during a dispatched call from a request.

__init__(*, step: int = 0, model_id: str | None = None, round_id: str | None = None, session_id: str | None = None, request: TaskRequest | None = None) None[source]
fedn.network.clients.fedn_client.get_compute_package_dir_path() str[source]

Get the directory path for the compute package.

fedn.network.clients.grpc_handler module

GrpcHandler class for handling GRPC connections and operations.

class fedn.network.clients.grpc_handler.GrpcAuth(key: str)[source]

Bases: AuthMetadataPlugin

GRPC authentication plugin.

__init__(key: str) None[source]

Initialize GrpcAuth with a key.

class fedn.network.clients.grpc_handler.GrpcHandler(host: str, port: int, name: str, token: str, combiner_name: str)[source]

Bases: object

Handler for GRPC connections and operations.

__init__(host: str, port: int, name: str, token: str, combiner_name: str) None[source]

Initialize the GrpcHandler.

create_backward_completion_message(sender_name: str, receiver_name: str, receiver_role: <google.protobuf.internal.enum_type_wrapper.EnumTypeWrapper object at 0x797211da0550>, gradient_id: str, session_id: str, meta: dict)[source]
create_metric_message(sender_name: str, sender_client_id: str, metrics: dict, step: int, model_id: str, session_id: str, round_id: str) ModelMetric[source]

Create a metric message.

create_prediction_message(sender_name: str, receiver_name: str, receiver_role: <google.protobuf.internal.enum_type_wrapper.EnumTypeWrapper object at 0x797211da0550>, model_id: str, prediction_output: str, correlation_id: str, session_id: str) ModelPrediction[source]

Create a prediction message.

create_update_message(sender_name: str, model_id: str, model_update_id: str, receiver_name: str, receiver_role: <google.protobuf.internal.enum_type_wrapper.EnumTypeWrapper object at 0x797211da0550>, meta: dict) ModelUpdate[source]

Create an update message.

create_validation_message(sender_name: str, sender_client_id: str, receiver_name: str, receiver_role: <google.protobuf.internal.enum_type_wrapper.EnumTypeWrapper object at 0x797211da0550>, model_id: str, metrics: str, correlation_id: str, session_id: str) ModelValidation[source]

Create a validation message.

get_model_from_combiner(id: str, client_id: str, timeout: int = 20) BytesIO | None[source]

Fetch a model from the assigned combiner.

Downloads the model update object via a gRPC streaming channel.

Parameters:
  • id (str) – The id of the model update object.

  • client_id (str) – The id of the client.

  • timeout (int) – The timeout for the request.

Returns:

The model update object.

Return type:

Optional[BytesIO]

heartbeat(client_name: str, client_id: str, memory_utilisation: float | None = None, cpu_utilisation: float | None = None) Response[source]

Send a heartbeat to the combiner.

Returns:

Response from the combiner.

Return type:

fedn.Response

listen_to_task_stream(client_name: str, client_id: str, callback: Callable[[Any], None]) None[source]

Subscribe to the model update request stream.

send_attributes(attribute: AttributeMessage) bool[source]

Send a attribute message to the combiner.

send_backward_completion(update: BackwardCompletion)[source]

Send a backward completion message to the combiner.

send_heartbeats(client_name: str, client_id: str, update_frequency: float = 2.0) None[source]

Send heartbeats to the combiner at regular intervals.

send_model_metric(metric: ModelMetric) bool[source]

Send a model metric to the combiner.

send_model_prediction(prediction: ModelPrediction) bool[source]

Send a model prediction to the combiner.

send_model_to_combiner(model: BytesIO, id: str) BytesIO | None[source]

Send a model update to the assigned combiner.

Uploads the model updated object via a gRPC streaming channel, Upload.

Parameters:
  • model (BytesIO) – The model update object.

  • id (str) – The id of the model update object.

Returns:

The model update object.

Return type:

Optional[BytesIO]

send_model_update(update: ModelUpdate) bool[source]

Send a model update to the combiner.

send_model_validation(validation: ModelValidation) bool[source]

Send a model validation to the combiner.

send_status(msg: str, log_level: <google.protobuf.internal.enum_type_wrapper.EnumTypeWrapper object at 0x797211da0430> = 1, type: str | None = None, request: ~fedn_pb2.ModelUpdate | ~fedn_pb2.ModelValidation | ~fedn_pb2.TaskRequest | None = None, session_id: str | None = None, sender_name: str | None = None) None[source]

Send status message.

Parameters:
  • msg (str) – The message to send.

  • log_level (fedn.LogLevel.INFO, fedn.LogLevel.WARNING, fedn.LogLevel.ERROR) – The log level of the message.

  • type (str) – The type of the message.

  • request (fedn.Request) – The request message.

send_telemetry(telemetry: TelemetryMessage) bool[source]

Send a telemetry message to the combiner.

exception fedn.network.clients.grpc_handler.RetryException[source]

Bases: Exception

fedn.network.clients.grpc_handler.grpc_retry(max_retries: int = 3, retry_interval: float = 5, backoff: float = 2) Callable[source]

GRPC retry decorator.

Parameters:
  • max_retries (int) – The maximum number of retries. -1 means infinite retries.

  • retry_interval (float) – The interval between retries in seconds.

Returns:

The decorated function.

Return type:

Callable

fedn.network.clients.package_runtime module

Contains the PackageRuntime class, used to download, validate, and unpack compute packages.

class fedn.network.clients.package_runtime.PackageRuntime(package_path: str)[source]

Bases: object

PackageRuntime is used to download, validate, and unpack compute packages.

Parameters:

package_path (str) – Path to compute package.

__init__(package_path: str) None[source]

Initialize the PackageRuntime.

download_compute_package(url: str, token: str, name: str | None = None) bool[source]

Download compute package from controller.

Parameters:
  • url – URL of the controller.

  • token – Token for authentication.

  • name – Name of the package.

Returns:

True if download was successful, False otherwise.

Return type:

bool

get_dispatcher(run_path: str) Dispatcher | None[source]

Dispatch the compute package.

Parameters:

run_path (str) – Path to dispatch the compute package.

Returns:

Dispatcher object or None if an error occurred.

Return type:

Optional[Dispatcher]

set_checksum(url: str, token: str, name: str | None = None) bool[source]

Get checksum of compute package from controller.

Parameters:
  • url – URL of the controller.

  • token – Token for authentication.

  • name – Name of the package.

Returns:

True if checksum was set successfully, False otherwise.

Return type:

bool

unpack_compute_package() Tuple[bool, str][source]

Unpack the compute package.

Returns:

Tuple containing a boolean indicating success and the path to the unpacked package.

Return type:

Tuple[bool, str]

validate(expected_checksum: str) bool[source]

Validate the package against the checksum provided by the controller.

Parameters:

expected_checksum – Checksum provided by the controller.

Returns:

True if checksums match, False otherwise.

Return type:

bool

fedn.network.clients.state module

Module for representing and converting client states.

class fedn.network.clients.state.ClientState(value)[source]

Bases: Enum

Enum for representing the state of a client.

idle = 1
predicting = 4
training = 2
validating = 3
fedn.network.clients.state.client_state_to_string(state: ClientState) str[source]

Convert a ClientState to a string representation.

Parameters:

state (fedn.network.clients.state.ClientState) – the state to convert

Returns:

string representation of the state

Return type:

str