silverback.main

The silverback.main module contains the high-level implementation of the the user’s Silverback application, meant to be used to expose method handlers and other functionality.

class silverback.main.SharedState

Bases: defaultdict

Class containing the bot shared state that all workers can read from and write to.

`{warning} This is not networked in any way, nor is it multi-process safe, but will be accessible across multiple thread workers within a single process. `

Usage example:

@bot.on_(...)
def do_something_with_state(value):
    # Read from state using `getattr`
    ... = bot.state.something

    # Set state using `setattr`
    bot.state.something = ...

    # Read from state using `getitem`
    ... = bot.state["something"]

    # Set state using setitem
    bot.state["something"] = ...
class silverback.main.SilverbackBot(settings: Settings | None = None)

Bases: ManagerAccessMixin

The bot singleton. Must be initialized prior to use.

Usage example:

from silverback import SilverbackBot

bot = SilverbackBot()

...  # Connection has been initialized, can call broker methods e.g. `bot.on_(...)`
broker_task_decorator(task_type: TaskType, container: BlockContainer | ContractEvent | None = None) Callable[[Callable], AsyncTaskiqDecoratedTask]

Dynamically create a new broker task that handles tasks of task_type.

`{warning} Dynamically creating a task does not ensure that the runner will be aware of the task in order to trigger it. Use at your own risk. `

Parameters:
  • task_typeTaskType: The type of task to create.

  • container – (BlockContainer | ContractEvent): The event source to watch.

Returns:

A function wrapper that will register the task handler.

Return type:

Callable[[Callable], AsyncTaskiqDecoratedTask]

Raises:

ContainerTypeMismatchError – If there is a mismatch between task_type and the container type it should handle.

on_(container: BlockContainer | ContractEvent, new_block_timeout: int | None = None, start_block: int | None = None)

Create task to handle events created by the container trigger.

Parameters:
  • container – (BlockContainer | ContractEvent): The event source to watch.

  • new_block_timeout – (int | None): Override for block timeout that is acceptable. Defaults to whatever the bot’s settings are for default polling timeout are.

  • start_block (int | None) – block number to start processing events from. Defaults to whatever the latest block is.

Raises:

InvalidContainerTypeError – If the type of container is not configurable for the bot.

on_shutdown() Callable

Code that will be exected by one worker before worker shutdown, after the Runner has decided to put the bot into the “shutdown” state.

Usage example:

@bot.on_shutdown()
def do_something_on_shutdown():
    ...  # Record final state of bot
on_startup() Callable

Code that will be exected by one worker after worker startup, but before the bot is put into the “run” state by the Runner.

Usage example:

@bot.on_startup()
def do_something_on_startup(startup_state: StateSnapshot):
    ...  # Reprocess missed events or blocks
on_worker_shutdown() Callable

Code to execute on every worker immediately before broker shutdown.

`{note} This is where you should also release any resources you have loaded during worker startup. `

Usage example:

@bot.on_worker_shutdown()
def do_something_on_shutdown(state):
    ...  # Update some external service, perhaps using information from `state`.
on_worker_startup() Callable

Code to execute on every worker immediately after broker startup.

`{note} This is a great place to load heavy dependencies for the workers, such as database connections, ML models, etc. `

Usage example:

@bot.on_worker_startup()
def do_something_on_startup(state):
    ...  # Can provision resources, or add things to `state`.
class silverback.main.SystemConfig(*, sdk_version: str, task_types: list[str])

Bases: BaseModel

class silverback.main.TaskData(*, name: str, labels: dict[str, Any])

Bases: BaseModel