API

fed.api module

fed.get(fed_objects: Union[ObjectRef, List[FedObject], FedObject]) -> Any

Gets the real data of the given fed_object.

If the object is located in the current party, it is returned immediately; otherwise it is returned after the real data has been received from the party that holds it.

fed.init(addresses: Dict = None, party: str = None, config: Dict = {}, tls_config: Dict = None, logging_level: str = 'info', sender_proxy_cls: SenderProxy = None, receiver_proxy_cls: ReceiverProxy = None, receiver_sender_proxy_cls: SenderReceiverProxy = None, job_name: str = None, sending_failure_handler: Callable[[Exception], None] = None)

Initialize a RayFed client.

Args:
addresses:

optional; a dict describing the address configuration of each party. E.g.

{
    # The address that can be connected to `alice` by other parties.
    'alice': '127.0.0.1:10001',
    # The address that can be connected to `bob` by other parties.
    'bob': '127.0.0.1:10002',
    # The address that can be connected to `carol` by other parties.
    'carol': '127.0.0.1:10003',
}
party:

optional; the name of the current party.

config:

optional; a dict describing general job configurations. The currently supported keys are cross_silo_comm and barrier_on_initializing.

cross_silo_comm

optional; a dict describing the cross-silo common configs. For the supported keys, see fed.config.CrossSiloMessageConfig and fed.config.GrpcCrossSiloMessageConfig. Note that cross_silo_comm.messages_max_size_in_bytes will be overridden if cross_silo_comm.grpc_channel_options is provided and contains grpc.max_send_message_length or grpc.max_receive_message_length.

barrier_on_initializing

optional; a bool indicating whether to wait for all parties to be ready before starting the job. If True, the job starts after all parties are ready; otherwise, the job starts immediately after the current party is ready.

E.g.

{
    "cross_silo_comm": {
        "messages_max_size_in_bytes": 500*1024,
        "timeout_in_ms": 1000,
        "exit_on_sending_failure": True,
        "expose_error_trace": True,
        "use_global_proxy": True,
    },
    "barrier_on_initializing": True,
}
tls_config:

optional; a dict describing the TLS config. E.g., for alice,

{
    "ca_cert": "root ca cert of other parties.",
    "cert": "alice's server cert",
    "key": "alice's server cert key",
}

For bob,

{
    "ca_cert": "root ca cert of other parties.",
    "cert": "bob's server cert",
    "key": "bob's server cert key",
}
logging_level:

optional; the logging level, one of debug, info, warning, error, or critical (not case sensitive).

job_name:

optional; the name of the current job. Note that the job name must be identical in all parties; otherwise, messages will be ignored because of the job name mismatch. If no job name is provided, a default fixed name is assigned, so the messages of all anonymous jobs will be mixed together. Omitting the job name should therefore be limited to single-job scenarios or test mode.

sending_failure_handler:

optional; a callback that is triggered if sending a cross-silo message fails and exit_on_sending_failure in config is True.

Examples:
>>> import fed
>>> import ray
>>> ray.init(address='local')
>>> addresses = {
...     'alice': '127.0.0.1:10001',
...     'bob': '127.0.0.1:10002',
...     'carol': '127.0.0.1:10003',
... }
>>> # Start as alice.
>>> fed.init(addresses=addresses, party='alice')
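Because every party shares the same addresses and job_name and only party differs, it can help to assemble the fed.init arguments in one place. The following is a minimal sketch, not part of RayFed: build_init_kwargs is a hypothetical helper and demo_job is a made-up job name. It only builds and checks the keyword arguments documented above; the fed.init call itself is left as a comment so the snippet runs without a Ray cluster.

```python
from typing import Dict


def build_init_kwargs(addresses: Dict[str, str], party: str, job_name: str) -> dict:
    """Assemble keyword arguments for fed.init (hypothetical helper)."""
    if party not in addresses:
        raise ValueError(f"party {party!r} has no entry in addresses")
    for name, address in addresses.items():
        host, _, port = address.rpartition(":")
        if not host or not port.isdigit():
            raise ValueError(f"address of {name!r} must look like 'host:port'")
    return {
        "addresses": addresses,
        "party": party,
        # Must be identical in all parties, otherwise messages are ignored.
        "job_name": job_name,
        "config": {"barrier_on_initializing": True},
    }


addresses = {
    "alice": "127.0.0.1:10001",
    "bob": "127.0.0.1:10002",
}
alice_kwargs = build_init_kwargs(addresses, "alice", "demo_job")
bob_kwargs = build_init_kwargs(addresses, "bob", "demo_job")
# In alice's process: fed.init(**alice_kwargs)
# In bob's process:   fed.init(**bob_kwargs)
```

Each party would run the same script and pass its own name, so a mismatch in addresses or job_name cannot creep in between parties.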
fed.kill(actor: FedActorHandle, *, no_restart=True)

Kill an actor forcefully.

Args:

actor: Handle to the actor to kill.

no_restart: Whether or not this actor should be restarted if it’s a restartable actor.

fed.remote(*args, **kwargs)

Defines a remote function or an actor class.

This function can be used as a decorator with no arguments to define a remote function or actor as follows:

import fed

@fed.remote
def f(a, b, c):
    return a + b + c

object_ref = f.party('alice').remote(1, 2, 3)
result = fed.get(object_ref)
assert result == (1 + 2 + 3)

@fed.remote
class Foo:
    def __init__(self, arg):
        self.x = arg

    def method(self, a):
        return self.x + a

actor_handle = Foo.party('alice').remote(123)
object_ref = actor_handle.method.remote(321)
result = fed.get(object_ref)
assert result == (123 + 321)

Equivalently, use a function call to create a remote function or actor.

def g(a, b, c):
    return a + b + c

remote_g = fed.remote(g)
object_ref = remote_g.party('alice').remote(1, 2, 3)
assert fed.get(object_ref) == (1 + 2 + 3)

class Bar:
    def __init__(self, arg):
        self.x = arg

    def method(self, a):
        return self.x + a

RemoteBar = fed.remote(Bar)
actor_handle = RemoteBar.party('alice').remote(123)
object_ref = actor_handle.method.remote(321)
result = fed.get(object_ref)
assert result == (123 + 321)

It can also be used with specific keyword arguments, in the same way as Ray options.

fed.shutdown()

Shutdown a RayFed client.

fed.config module

This module should be cached locally because all configurations are mutable.

class fed.config.ClusterConfig(raw_bytes: bytes)

A local cache of cluster configuration items.

class fed.config.CrossSiloMessageConfig(proxy_max_restarts: int = None, timeout_in_ms: int = 60000, messages_max_size_in_bytes: int = None, exit_on_sending_failure: Optional[bool] = False, serializing_allowed_list: Optional[Dict[str, str]] = None, send_resource_label: Optional[Dict[str, str]] = None, recv_resource_label: Optional[Dict[str, str]] = None, http_header: Optional[Dict[str, str]] = None, max_concurrency: Optional[int] = None, expose_error_trace: Optional[bool] = False, use_global_proxy: Optional[bool] = True)

A class to store parameters used for Proxy Actor.

Attributes:
proxy_max_restarts:

The maximum number of restarts for the sender proxy.

serializing_allowed_list:

The list of packages or classes allowed for serialization and deserialization across silos. It is used to prevent pickle deserialization execution attacks when crossing silos.

send_resource_label:

Customized resource label. The SenderProxyActor will be scheduled based on the declared resource label. For example, when set to {"my_label": 1}, the sender proxy actor will be started only on nodes with {"resource": {"my_label": $NUM}} where $NUM >= 1.

recv_resource_label:

Customized resource label. The ReceiverProxyActor will be scheduled based on the declared resource label. For example, when set to {"my_label": 1}, the receiver proxy actor will be started only on nodes with {"resource": {"my_label": $NUM}} where $NUM >= 1.

exit_on_sending_failure:

Whether to exit when cross-silo sending fails. If True, a SIGINT is sent to the current process when sending cross-silo data fails, and the process then exits.

messages_max_size_in_bytes:

The maximum length in bytes of cross-silo messages. If None, the default value of 500 MB is used.

timeout_in_ms:

The timeout in milliseconds of a cross-silo RPC call. It’s 60000 by default.

http_header:

The HTTP header, e.g. metadata in gRPC, sent with the RPC request. It does not override basic headers such as user-agent; the two are concatenated.

max_concurrency:

The max_concurrency of the sender/receiver proxy actor.

use_global_proxy:

Whether to use the global proxy actor or to create a new proxy actor for the current job.
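The scheduling condition described for send_resource_label and recv_resource_label can be illustrated with a small check. This is only a model of the stated rule ($NUM >= the requested amount), not Ray's scheduler; node_satisfies and the sample node resource dicts are hypothetical.

```python
from typing import Dict


def node_satisfies(resource_label: Dict[str, float],
                   node_resources: Dict[str, float]) -> bool:
    """Return True if the node offers at least the requested amount of every label."""
    return all(
        node_resources.get(name, 0) >= amount
        for name, amount in resource_label.items()
    )


send_resource_label = {"my_label": 1}

# A node that declares my_label with 2 units qualifies.
on_labelled_node = node_satisfies(send_resource_label, {"CPU": 8, "my_label": 2})
# A node without the label does not.
on_plain_node = node_satisfies(send_resource_label, {"CPU": 8})
```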

classmethod from_dict(data: Dict) -> CrossSiloMessageConfig

Initialize CrossSiloMessageConfig from a dictionary.

Args:

data (Dict): Dictionary with keys as member variable names.

Returns:

CrossSiloMessageConfig: An instance of CrossSiloMessageConfig.
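The pattern of building a config object from a dict keyed by member variable names can be sketched with a dataclass. MessageConfigSketch is a hypothetical stand-in that carries only three of the documented fields; dropping keys that are not fields is an assumption of this sketch rather than documented behavior of from_dict.

```python
from dataclasses import dataclass, fields
from typing import Dict, Optional


@dataclass
class MessageConfigSketch:
    """Hypothetical stand-in with a few of the documented fields."""

    timeout_in_ms: int = 60000
    messages_max_size_in_bytes: Optional[int] = None
    exit_on_sending_failure: bool = False

    @classmethod
    def from_dict(cls, data: Optional[Dict]) -> "MessageConfigSketch":
        known = {f.name for f in fields(cls)}
        # Assumption of this sketch: keys that are not fields are dropped.
        filtered = {k: v for k, v in (data or {}).items() if k in known}
        return cls(**filtered)


cfg = MessageConfigSketch.from_dict({"timeout_in_ms": 1000, "not_a_field": 1})
```

Fields missing from the dict keep their defaults, which mirrors how the keys of cross_silo_comm are optional.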

class fed.config.GrpcCrossSiloMessageConfig(proxy_max_restarts: int = None, timeout_in_ms: int = 60000, messages_max_size_in_bytes: int = None, exit_on_sending_failure: Optional[bool] = False, serializing_allowed_list: Optional[Dict[str, str]] = None, send_resource_label: Optional[Dict[str, str]] = None, recv_resource_label: Optional[Dict[str, str]] = None, http_header: Optional[Dict[str, str]] = None, max_concurrency: Optional[int] = None, expose_error_trace: Optional[bool] = False, use_global_proxy: Optional[bool] = True, grpc_channel_options: List = None, grpc_retry_policy: Dict[str, str] = None)

A class to store parameters used for gRPC communication.

Attributes:
grpc_retry_policy:

A dict describing the retry policy for cross-silo RPC calls. If None, the following default retry policy is used. For more details, refer to the gRPC retry-policy documentation.

{
    "maxAttempts": 4,
    "initialBackoff": "0.1s",
    "maxBackoff": "1s",
    "backoffMultiplier": 2,
    "retryableStatusCodes": [
        "UNAVAILABLE"
    ]
}
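To get a feel for what the default policy means in practice, the sketch below computes the nominal backoff ceiling before each retry. It assumes the usual gRPC interpretation, in which maxAttempts counts the original call and the backoff grows by backoffMultiplier up to maxBackoff; any jitter the gRPC library applies on top is ignored. nominal_backoffs is a hypothetical helper, not a RayFed or gRPC function.

```python
def nominal_backoffs(policy: dict) -> list:
    """Backoff ceiling in seconds before each retry, ignoring jitter."""
    initial = float(policy["initialBackoff"].rstrip("s"))
    maximum = float(policy["maxBackoff"].rstrip("s"))
    multiplier = policy["backoffMultiplier"]
    retries = policy["maxAttempts"] - 1  # the first attempt is not a retry
    return [min(initial * multiplier ** n, maximum) for n in range(retries)]


default_policy = {
    "maxAttempts": 4,
    "initialBackoff": "0.1s",
    "maxBackoff": "1s",
    "backoffMultiplier": 2,
    "retryableStatusCodes": ["UNAVAILABLE"],
}

backoffs = nominal_backoffs(default_policy)
total_wait_ceiling = sum(backoffs)
```

Under these assumptions the default policy allows three retries, so a persistently unavailable peer is given well under a second of backoff in total before the call fails.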
grpc_channel_options: A list of tuples to store GRPC channel options, e.g.
[
    ('grpc.enable_retries', 1),
    ('grpc.max_send_message_length', 50 * 1024 * 1024)
]
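The precedence between messages_max_size_in_bytes and grpc_channel_options noted under fed.init can be modeled as follows. This is an illustration of the documented rule, not RayFed's implementation: effective_limits is a hypothetical helper, and reading the 500 MB default as 500 * 1024 * 1024 bytes is an assumption.

```python
from typing import Dict, List, Optional, Tuple

# Assumption: the documented 500 MB default expressed in bytes.
DEFAULT_MAX_SIZE = 500 * 1024 * 1024

SIZE_OPTIONS = (
    "grpc.max_send_message_length",
    "grpc.max_receive_message_length",
)


def effective_limits(
    messages_max_size_in_bytes: Optional[int],
    grpc_channel_options: Optional[List[Tuple[str, int]]],
) -> Dict[str, int]:
    """Model which size limit wins for sending and receiving."""
    base = DEFAULT_MAX_SIZE if messages_max_size_in_bytes is None else messages_max_size_in_bytes
    limits = {name: base for name in SIZE_OPTIONS}
    # Explicit channel options take precedence over messages_max_size_in_bytes.
    for name, value in grpc_channel_options or []:
        if name in limits:
            limits[name] = value
    return limits


limits = effective_limits(
    500 * 1024,
    [
        ("grpc.enable_retries", 1),
        ("grpc.max_send_message_length", 50 * 1024 * 1024),
    ],
)
```

Here only the send limit is overridden; the receive limit still follows messages_max_size_in_bytes because no matching channel option was given.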