silverback.utils

silverback.utils.async_wrap_iter(it: Iterator) AsyncIterator

Wrap blocking iterator into an asynchronous one

silverback.utils.clean_hexbytes_dict(data: dict, recurse_count: int = 0) dict

Strips HexBtes objects from dictionary values, as they do not encode well

silverback.utils.decode_topics_from_string(encoded_topics: str) list[list[HexStr] | HexStr | None]

Decode a topic list from a TaskIQ label into Web3py topics

silverback.utils.encode_topics_to_string(topics: list[list[HexStr] | HexStr | None]) str

Encode a topic list to a string, for TaskIQ label

silverback.utils.parse_hexbytes_dict(data: dict, recurse_count: int = 0) dict

Converts any hex string values in a flat dictionary to HexBytes.