silverback.main
The silverback.main module contains the high-level implementation of the user’s Silverback application, meant to be used to expose method handlers and other functionality.
Bases: defaultdict
Class containing the bot shared state that all workers can read from and write to.
Warning: This is not networked in any way, nor is it multi-process safe, but it will be accessible across multiple thread workers within a single process.
Usage example:
    @bot.on_(...)
    def do_something_with_state(value):
        # Read from state using `getattr`
        ... = bot.state.something

        # Set state using `setattr`
        bot.state.something = ...

        # Read from state using `getitem`
        ... = bot.state["something"]

        # Set state using `setitem`
        bot.state["something"] = ...
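The two access styles in the usage example can be illustrated with a minimal sketch. It assumes only what the description states (a `defaultdict` base with attribute and item access); the class name `SketchState` and the `None` default factory are hypothetical and are not taken from the library's implementation.

```python
from collections import defaultdict


class SketchState(defaultdict):
    """Hypothetical stand-in for the shared state class (an assumption)."""

    def __getattr__(self, name):
        # Only called when normal attribute lookup fails, so dict methods
        # such as .keys() or .items() keep working as usual.
        if name.startswith("__"):
            raise AttributeError(name)
        return self[name]

    def __setattr__(self, name, value):
        # Route attribute assignment to item assignment.
        self[name] = value


# Assumption: unset keys default to None.
state = SketchState(lambda: None)

state.something = 1   # set via attribute
state["other"] = 2    # set via item
```

Both styles read and write the same underlying mapping, so `state.something` and `state["something"]` refer to the same value. As the warning above says, an object like this lives in one process only.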
- class silverback.main.SilverbackBot(settings: Settings | None = None)
Bases: ManagerAccessMixin
The bot singleton. Must be initialized prior to use.
Usage example:
    from silverback import SilverbackBot

    bot = SilverbackBot()

    ...  # Connection has been initialized, can call broker methods e.g. `bot.on_(...)`
- broker_task_decorator(task_type: TaskType, container: BlockContainer | ContractEvent | None = None) → Callable[[Callable], AsyncTaskiqDecoratedTask]
Dynamically create a new broker task that handles tasks of task_type.
Warning: Dynamically creating a task does not ensure that the runner will be aware of the task in order to trigger it. Use at your own risk.
- Parameters:
task_type – (TaskType): The type of task to create.
container – (BlockContainer | ContractEvent): The event source to watch.
- Returns:
A function wrapper that will register the task handler.
- Return type:
Callable[[Callable], AsyncTaskiqDecoratedTask]
- Raises:
ContainerTypeMismatchError – If there is a mismatch between task_type and the container type it should handle.
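The pattern described for broker_task_decorator (a function that validates its arguments, then returns a wrapper that registers the handler) can be sketched with the standard library alone. Every name below (`SketchTaskType`, `BlockSource`, `EventSource`, `SketchMismatchError`, `task_decorator`, `registry`) is hypothetical; the sketch does not use taskiq and does not reproduce the library's actual checks.

```python
from enum import Enum
from typing import Callable, Dict, List


class SketchTaskType(Enum):
    """Hypothetical stand-in for TaskType."""
    NEW_BLOCK = "new_block"
    EVENT_LOG = "event_log"


class BlockSource:
    """Hypothetical stand-in for a block container."""


class EventSource:
    """Hypothetical stand-in for a contract event."""


class SketchMismatchError(Exception):
    """Hypothetical stand-in for ContainerTypeMismatchError."""


# Assumption: each task type accepts exactly one kind of event source.
EXPECTED_SOURCE = {
    SketchTaskType.NEW_BLOCK: BlockSource,
    SketchTaskType.EVENT_LOG: EventSource,
}

registry: Dict[SketchTaskType, List[Callable]] = {}


def task_decorator(task_type, container=None):
    """Validate the pairing, then return a wrapper that registers a handler."""
    expected = EXPECTED_SOURCE[task_type]
    if container is not None and not isinstance(container, expected):
        raise SketchMismatchError(
            f"{task_type.name} expects {expected.__name__}, "
            f"got {type(container).__name__}"
        )

    def register(handler):
        registry.setdefault(task_type, []).append(handler)
        return handler

    return register


@task_decorator(SketchTaskType.NEW_BLOCK, BlockSource())
def handle_block(block):
    return block
```

Note that the mismatch is detected when the decorator is created, before any handler is registered, which matches the idea of failing early on a wrong task_type and container pairing.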
- on_(container: BlockContainer | ContractEvent, new_block_timeout: int | None = None, start_block: int | None = None)
Create a task to handle events created by the container trigger.
- Parameters:
container – (BlockContainer | ContractEvent): The event source to watch.
new_block_timeout – (int | None): Override for the acceptable block timeout. Defaults to the default polling timeout in the bot’s settings.
start_block – (int | None): Block number to start processing events from. Defaults to the latest block.
- Raises:
InvalidContainerTypeError – If the type of container is not configurable for the bot.
- on_shutdown() → Callable
Code that will be executed by one worker before worker shutdown, after the Runner has decided to put the bot into the “shutdown” state.
Usage example:
    @bot.on_shutdown()
    def do_something_on_shutdown():
        ...  # Record final state of bot
- on_startup() → Callable
Code that will be executed by one worker after worker startup, but before the bot is put into the “run” state by the Runner.
Usage example:
    @bot.on_startup()
    def do_something_on_startup(startup_state: StateSnapshot):
        ...  # Reprocess missed events or blocks
- on_worker_shutdown() → Callable
Code to execute on every worker immediately before broker shutdown.
Note: This is where you should also release any resources you have loaded during worker startup.
Usage example:
    @bot.on_worker_shutdown()
    def do_something_on_shutdown(state):
        ...  # Update some external service, perhaps using information from `state`.
- on_worker_startup() → Callable
Code to execute on every worker immediately after broker startup.
Note: This is a great place to load heavy dependencies for the workers, such as database connections, ML models, etc.
Usage example:
    @bot.on_worker_startup()
    def do_something_on_startup(state):
        ...  # Can provision resources, or add things to `state`.
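The ordering implied by the four lifecycle hooks (per-worker startup, then one-time startup, then the “run” state, then one-time shutdown, then per-worker shutdown) can be sketched as a small hook registry. `SketchLifecycle` and its methods are hypothetical, the workers are simulated sequentially rather than concurrently, and the one-time hooks take no arguments here for brevity; none of this reflects how the runner or broker is actually implemented.

```python
from typing import Callable, Dict, List


class SketchLifecycle:
    """Hypothetical hook registry; not the library's runner or broker."""

    def __init__(self, num_workers: int):
        self.num_workers = num_workers
        self.hooks: Dict[str, List[Callable]] = {
            "worker_startup": [],
            "startup": [],
            "shutdown": [],
            "worker_shutdown": [],
        }

    def on(self, phase: str):
        def register(fn):
            self.hooks[phase].append(fn)
            return fn
        return register

    def run(self):
        # One private state dict per simulated worker.
        states = [{} for _ in range(self.num_workers)]
        for state in states:                      # every worker
            for fn in self.hooks["worker_startup"]:
                fn(state)
        for fn in self.hooks["startup"]:          # one worker only
            fn()
        # ... the "run" state would process blocks and events here ...
        for fn in self.hooks["shutdown"]:         # one worker only
            fn()
        for state in states:                      # every worker
            for fn in self.hooks["worker_shutdown"]:
                fn(state)


log = []
life = SketchLifecycle(num_workers=2)


@life.on("worker_startup")
def load_resources(state):
    state["db"] = "connection"  # placeholder for a heavy dependency
    log.append("worker_startup")


@life.on("startup")
def catch_up():
    log.append("startup")


@life.on("shutdown")
def record_final_state():
    log.append("shutdown")


@life.on("worker_shutdown")
def release_resources(state):
    state.pop("db")  # release what worker startup acquired
    log.append("worker_shutdown")


life.run()
```

With two simulated workers, the per-worker hooks each fire twice and the one-time hooks fire once, and resources acquired in worker startup are released in worker shutdown.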
- class silverback.main.SystemConfig(*, sdk_version: str, task_types: list[str])
Bases: BaseModel
- class silverback.main.TaskData(*, name: str, labels: dict[str, Any])
Bases: BaseModel