API

ESHET Client

class eshet.Client

Bases: object

ESHET client

__init__(base: str = '/', server: str = None, client_id=None, timeout_cfg: ~eshet.protocol.TimeoutConfig = TimeoutConfig(idle_ping=15, server_timeout=30, ping_timeout=5), logger=<Logger eshet.client (WARNING)>)
Parameters:
  • base – Base path for this client: relative paths passed to other functions will be relative to this.

  • server – ESHET server to connect to; defaults to the ESHET_SERVER environment variable, or localhost. This may also include the port number (in the form host:port), which defaults to 11236.

  • client_id – msgpack-serialisable ID for this client; will be allocated by the server if not provided

  • timeout_cfg – configuration for protocol-level timeouts

  • logger – logger for connection messages

property connected: bool

are we currently connected?

wait_for_connection()

wait until the connection has been established

async action_register(path, callback)

register an action

callback will be called with the action arguments to get the return value. if the return is awaitable, it will be awaited in a task

async action_call(path, *args)

call an action

async get(path)

get a state or property

async set(path, value)

set a state or property

async event_register(path)

register an event; returns an async callable which emits an event given a payload

async event_listen_cb(path, callback)

listen for events; callback will be called with the payload

if it returns a coroutine, it will be ran in a task

async event_listen(path)

listen for events; returns an async iterator of the payloads

class StateWrapper

Bases: object

wrapper around registered states which just stores a client and a path

async changed(value)

update the value of the state; equivalent to Client.state_changed()

async unknown()

update the value of the state; equivalent to Client.state_unknown()

__init__(client: Client, path: str) None
async state_register(path, set_callback=None) StateWrapper

register a state

async state_changed(path, value)

update the value of a registered state

if value is eshet.Unknown, it is marked as unknown

async state_unknown(path)

clear the value of a registered state

async state_observe(path, callback)

observe a state, returns the current value or Unknown, and calls callback with subsequent values

eshet.Unknown

value used for states when they are not known

class eshet.TimeoutConfig

Bases: object

configuration for protocol-level timeouts

idle_ping: int = 15

send a ping if we haven’t sent anything for this long

server_timeout: int = 30

tell the server to time out if it hasn’t received a message for this long; must be more than idle_ping

ping_timeout: int = 5

how long to wait for a ping before assuming the connection is dead

__init__(idle_ping: int = 15, server_timeout: int = 30, ping_timeout: int = 5) None

YARP Wrapper

YARP wrappers for ESHET use a default shared client if client is not specified; see get_default_eshet_client() and set_default_eshet_client().

async eshet.yarp.action_call(path, *args, client=None, strategy: TaskStrategy = RunInTask())

call an action whenever args does not contain NoValue/Unknown

Each argument can be a yarp.Value, yarp.Event, or a regular value. These are combined together as in yarp.fn(). If the overall value is an yarp.Event, the action will be called on every emission, whereas if it’s an yarp.Value it will be called with the initial and subsequent values.

async eshet.yarp.set_value(path, value: Value | Event, client=None, strategy: TaskStrategy = RunInTask())

set a state or prop whenever value does not contain NoValue/Unknown

Events

async eshet.yarp.event_listen(path, client=None) Event

make an Event which emits whenever the event at path does

States

async eshet.yarp.state_observe(path, client=None) Value

make a Value which has the value of the given state

the value will be set by the time this returns, and will be set to client.Unknown if the state is unknown. it will therefore never be NoValue.

async eshet.yarp.state_register(path, value: Value, client=None, settable=False, set_callback=None)

register a state which has the same value as value

if settable, a set callback is registered which sets the value

alternatively, set_callback can be a callback which accepts the new value

client.Unknown and yarp.NoValue are both mapped to unknown

async eshet.yarp.state_register_set_event(path, value: Value, client=None) Event

register a state as with state_register, but return an Event which emits whenever the value is set

Default Client

async eshet.yarp.get_default_eshet_client() Client

get the default (global) client used by yarp wrapper functions

If no client exists (because this has not been called by yarp functions and set_default_eshet_client() has not been set, a new Client with default settings will be created.

Use this if you want to mix yarp functions with plain Client use.

eshet.yarp.set_default_eshet_client(client: Client)

set the default (global) client used by yarp wrapper functions

Utilities

eshet.utils.in_task(f)

decorator: return a function which calls f in a task

class eshet.utils.TaskStrategy

Bases: object

a strategy for running tasks in the background

call build to get an implementation object, which can be called with tasks (no-argument async functions), and will run them in the background according to some defined strategy

build() Callable[[Callable[[], Awaitable[None]]], None]

instantiate this strategy

The return value can be called with a task to run it in the specified way.

wrap_fn(f)

the returned function calls the wrapped function in an instance of this strategy

class eshet.utils.RunSerially

Bases: TaskStrategy

strategy for running tasks serially, which allows retries and queue jumping

tasks are pushed to a queue, and are ran in a background task

only_latest: bool = False

skip tasks is there is more than one in the queue

this is useful for idempotent tasks

retry: bool = False

retry failed tasks

the delay between runs starts at retry_start and is multiplied by retry_multiplier each time, up to a maximum of retry_end

assume_failed: bool = False

assume that the task has failed, even if it succeeds

this is useful for tasks which do something inherently unreliable (e.g. sending a message on an unreliable medium); retry and only_latest must be set if this is

build()

instantiate this strategy

The return value can be called with a task to run it in the specified way.

__init__(only_latest: bool = False, retry: bool = False, assume_failed: bool = False, retry_start: float = 2.0, retry_end: float = 30.0, retry_multiplier: float = 2.0) None