Source code for fedn.network.grpc.server

from concurrent import futures
from typing import TypedDict

import grpc
from grpc_health.v1 import health, health_pb2_grpc

import fedn.network.grpc.fedn_pb2_grpc as rpc
from fedn.common.log_config import logger, set_log_level_from_string, set_log_stream
from fedn.network.grpc.auth import JWTInterceptor


[docs] class ServerConfig(TypedDict): port: int secure: bool key: str certificate: str logfile: str verbosity: str
[docs] class Server: """Class for configuring and launching the gRPC server."""
[docs] def __init__(self, servicer, modelservicer, config: ServerConfig): set_log_level_from_string(config.get("verbosity", "INFO")) set_log_stream(config.get("logfile", None)) # Keepalive settings: these detect if the client is alive KEEPALIVE_TIME_MS = 60 * 1000 # send keepalive ping every 60 second # wait 30 seconds for keepalive ping ack before considering connection dead KEEPALIVE_TIMEOUT_MS = 30 * 1000 # max idle time before server terminates the connection (5 minutes) MAX_CONNECTION_IDLE_MS = 5 * 60 * 1000 MAX_PING_STRIKES = 2 self.server = grpc.server( futures.ThreadPoolExecutor(max_workers=350), interceptors=[JWTInterceptor()], options=[ ("grpc.keepalive_time_ms", KEEPALIVE_TIME_MS), ("grpc.keepalive_timeout_ms", KEEPALIVE_TIMEOUT_MS), ("grpc.max_connection_idle_ms", MAX_CONNECTION_IDLE_MS), ("grpc.http2.max_pings_without_data", 0), # Allow unlimited PINGs without data ("grpc.http2.max_ping_strikes", MAX_PING_STRIKES), ], ) self.certificate = None self.health_servicer = health.HealthServicer() if isinstance(servicer, rpc.CombinerServicer): rpc.add_CombinerServicer_to_server(servicer, self.server) if isinstance(servicer, rpc.ConnectorServicer): rpc.add_ConnectorServicer_to_server(servicer, self.server) if isinstance(servicer, rpc.ReducerServicer): rpc.add_ReducerServicer_to_server(servicer, self.server) if isinstance(modelservicer, rpc.ModelServiceServicer): rpc.add_ModelServiceServicer_to_server(modelservicer, self.server) if isinstance(servicer, rpc.CombinerServicer): rpc.add_ControlServicer_to_server(servicer, self.server) health_pb2_grpc.add_HealthServicer_to_server(self.health_servicer, self.server) if config["secure"]: logger.info("Creating secure gRPCS server using certificate") server_credentials = grpc.ssl_server_credentials( ( ( config["key"], config["certificate"], ), ) ) self.server.add_secure_port("[::]:" + str(config["port"]), server_credentials) else: logger.info("Creating gRPC server") self.server.add_insecure_port("[::]:" + str(config["port"]))
[docs] def start(self): """Start the gRPC server.""" logger.info("gRPC Server started") self.server.start()
[docs] def stop(self): """Stop the gRPC server.""" logger.info("gRPC Server stopped") self.server.stop(0)