Pulse

Queries

Query, mutation, and infinite query APIs

Types

QueryKey

QueryKey: TypeAlias = tuple[Hashable, ...]

Tuple of hashable values identifying a query in the store.

QueryStatus

QueryStatus: TypeAlias = Literal["loading", "success", "error"]

ActionResult

ActionResult: TypeAlias = ActionSuccess[T] | ActionError

ActionSuccess

@dataclass(slots=True, frozen=True)
class ActionSuccess(Generic[T]):
    data: T
    status: Literal["success"] = "success"

ActionError

@dataclass(slots=True, frozen=True)
class ActionError:
    error: Exception
    status: Literal["error"] = "error"

@query

Decorator for async data fetching on State methods.

@overload
def query(
    fn: Callable[[TState], Awaitable[T]],
    *,
    stale_time: float = 0.0,
    gc_time: float | None = 300.0,
    refetch_interval: float | None = None,
    keep_previous_data: bool = False,
    retries: int = 3,
    retry_delay: float | None = None,
    initial_data_updated_at: float | datetime | None = None,
    enabled: bool = True,
    fetch_on_mount: bool = True,
    key: QueryKey | None = None,
) -> QueryProperty[T, TState]: ...

Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `stale_time` | `float` | `0.0` | Seconds before data is considered stale. `0.0` means always stale (refetch on access). |
| `gc_time` | `float \| None` | `300.0` | Seconds to keep an unused query in the cache. |
| `refetch_interval` | `float \| None` | `None` | Auto-refetch interval in seconds. |
| `keep_previous_data` | `bool` | `False` | Keep previous data while refetching. Preserves data across key changes, even if the new key has `initial_data`. |
| `retries` | `int` | `3` | Number of retry attempts. |
| `retry_delay` | `float \| None` | `None` | Delay between retries in seconds. `None` falls back to the default delay of 2.0 seconds. |
| `initial_data_updated_at` | `float \| datetime \| None` | `None` | Timestamp used to judge the staleness of initial data. |
| `enabled` | `bool` | `True` | Whether the query is enabled. |
| `fetch_on_mount` | `bool` | `True` | Fetch when the component mounts. |
| `key` | `QueryKey \| None` | `None` | Static key for shared queries. |

Basic Usage

class UserState(ps.State):
    user_id: str = ""

    @ps.query
    async def user(self) -> User:
        return await api.get_user(self.user_id)

Keyed Query (Shared)

Use the .key decorator to share query results across instances:

class UserState(ps.State):
    user_id: str = ""

    @ps.query
    async def user(self) -> User:
        return await api.get_user(self.user_id)

    @user.key
    def _user_key(self):
        return ("user", self.user_id)

Or with static key:

@ps.query(key=("users", "current"))
async def current_user(self) -> User:
    return await api.get_current_user()

Decorators

@query_prop.key

Define dynamic query key:

@my_query.key
def _key(self) -> QueryKey:
    return ("resource", self.id)

@query_prop.initial_data

Provide initial data:

@my_query.initial_data
def _initial(self) -> T:
    return default_value

@query_prop.on_success

Handle successful fetch:

@my_query.on_success
def _on_success(self, data: T):
    self.last_fetched = time.time()

@query_prop.on_error

Handle fetch error:

@my_query.on_error
def _on_error(self, error: Exception):
    logger.error(f"Failed: {error}")

QueryResult

Result object returned when accessing a query property. The type varies based on whether the query is keyed.

Properties

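| Property | Type | Description |
| --- | --- | --- |
| `data` | `T \| None` | Query data. |
| `error` | `Exception \| None` | Last error. |
| `status` | `QueryStatus` | Current status. |
| `is_loading` | `bool` | Status is `"loading"`. |
| `is_success` | `bool` | Status is `"success"`. |
| `is_error` | `bool` | Status is `"error"`. |
| `is_fetching` | `bool` | Currently fetching. |
| `is_scheduled` | `bool` | A fetch is scheduled. |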

Methods

async refetch(cancel_refetch: bool = True) -> ActionResult[T]

Re-run the query. If cancel_refetch=True, cancels any in-flight request.

result = await state.user.refetch()
if result.status == "success":
    print(result.data)

async wait() -> ActionResult[T]

Wait for the current in-flight fetch to complete. Does not start a fetch.

async ensure() -> ActionResult[T]

Start the initial fetch if needed, then wait for completion.

invalidate() -> None

Mark the query as stale and trigger a refetch.

state.user.invalidate()

set_data(data: T | Callable[[T | None], T]) -> None

Optimistically set data without fetching.

state.user.set_data(lambda prev: {**prev, "name": "Updated"})

set_initial_data(data: T | Callable[[], T], *, updated_at: float | datetime | None = None) -> None

Seed initial data while still in loading state.

set_error(error: Exception) -> None

Set error state on the query.

enable() -> None

Enable the query.

disable() -> None

Disable the query, preventing fetches.


@mutation

Decorator for async mutations (write operations) on State methods.

@overload
def mutation(
    fn: Callable[Concatenate[TState, P], Awaitable[T]],
) -> MutationProperty[T, TState, P]: ...

Usage

class UserState(ps.State):
    @ps.mutation
    async def update_name(self, name: str) -> User:
        return await api.update_user(name=name)

Decorators

@mutation_prop.on_success

@update_name.on_success
def _on_success(self, data: User):
    self.user.invalidate()

@mutation_prop.on_error

@update_name.on_error
def _on_error(self, error: Exception):
    logger.error(f"Update failed: {error}")

MutationResult

Result object returned when accessing a mutation property.

Properties

| Property | Type | Description |
| --- | --- | --- |
| `data` | `T \| None` | Last mutation result. |
| `error` | `Exception \| None` | Last error. |
| `is_running` | `bool` | Mutation in progress. |

Calling

Mutations are callable:

result = await state.update_name("New Name")

@infinite_query

Decorator for paginated queries. Infinite queries always require a key—either via key= in the decorator or via the .key decorator on the infinite query property itself (for example, @posts.key).

Why? Unlike regular queries, infinite queries accumulate multiple pages of data. If they supported automatic dependency tracking (like unkeyed queries), a dependency change would reset all loaded pages. This can cause confusing behavior since users may not expect their scroll position and loaded content to reset. Requiring an explicit key makes the caching behavior predictable and forces you to think about when pages should reset.

@overload
def infinite_query(
    fn: Callable[[TState, TParam], Awaitable[T]],
    *,
    initial_page_param: TParam,
    max_pages: int = 0,
    stale_time: float = 0.0,
    gc_time: float | None = 300.0,
    refetch_interval: float | None = None,
    keep_previous_data: bool = False,
    retries: int = 3,
    retry_delay: float | None = None,
    initial_data_updated_at: float | datetime | None = None,
    enabled: bool = True,
    fetch_on_mount: bool = True,
    key: QueryKey = ...,  # required, or use @query.key
) -> InfiniteQueryProperty[T, TParam, TState]: ...

Parameters

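| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| `initial_page_param` | `TParam` | required | First page parameter. |
| `max_pages` | `int` | `0` | Maximum number of pages to keep (`0` = unlimited). |
| `stale_time` | `float` | `0.0` | Seconds before data is considered stale. `0.0` means always stale. |
| `gc_time` | `float \| None` | `300.0` | Seconds to keep an unused query in the cache. |
| `refetch_interval` | `float \| None` | `None` | Auto-refetch interval in seconds. |
| `keep_previous_data` | `bool` | `False` | Keep previous data while refetching. Preserves data across key changes. |
| `retries` | `int` | `3` | Number of retry attempts. |
| `retry_delay` | `float \| None` | `None` | Delay between retries in seconds. `None` falls back to the default delay of 2.0 seconds. |
| `initial_data_updated_at` | `float \| datetime \| None` | `None` | Timestamp used to judge the staleness of initial data. |
| `enabled` | `bool` | `True` | Whether the query is enabled. |
| `fetch_on_mount` | `bool` | `True` | Fetch when the component mounts. |
| `key` | `QueryKey` | required | Query key. May instead be supplied dynamically with the `@infinite_query_prop.key` decorator. |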

Usage

With a static key:

class FeedState(ps.State):
    @ps.infinite_query(initial_page_param=None, key=("feed",))
    async def posts(self, cursor: str | None) -> list[Post]:
        return await api.get_posts(cursor=cursor)

    @posts.get_next_page_param
    def _next_cursor(self, pages: list[Page]) -> str | None:
        return pages[-1].data[-1].id if pages and pages[-1].data else None

With a dynamic key (using the .key decorator on the infinite query property, here @posts.key):

class FeedState(ps.State):
    feed_type: str = "recent"

    @ps.infinite_query(initial_page_param=None)
    async def posts(self, cursor: str | None) -> list[Post]:
        return await api.get_posts(feed_type=self.feed_type, cursor=cursor)

    @posts.key
    def _posts_key(self):
        return ("feed", self.feed_type)  # pages reset when feed_type changes

    @posts.get_next_page_param
    def _next_cursor(self, pages: list[Page]) -> str | None:
        return pages[-1].data[-1].id if pages and pages[-1].data else None

Required Decorators

@infinite_query_prop.key

Define a dynamic query key. Required if you didn't pass key= to the decorator:

@my_query.key
def _key(self) -> QueryKey:
    return ("feed", self.feed_type)

@infinite_query_prop.get_next_page_param

Define how to get the next page parameter:

@my_query.get_next_page_param
def _next(self, pages: list[Page[T, TParam]]) -> TParam | None:
    return pages[-1].next_cursor if pages else None

Optional Decorators

@infinite_query_prop.get_previous_page_param

Define how to get the previous page parameter (for bi-directional pagination).

@infinite_query_prop.initial_data

Provide initial pages before the first fetch:

@my_query.initial_data
def _initial(self) -> list[Page[T, TParam]]:
    return [Page(data=[], param=None)]

@infinite_query_prop.on_success

Handle successful fetch.

@infinite_query_prop.on_error

Handle fetch error.


InfiniteQueryResult

Result object for infinite queries.

Properties

| Property | Type | Description |
| --- | --- | --- |
| `data` | `list[Page[T, TParam]] \| None` | All pages. |
| `pages` | `list[T] \| None` | Page data only. |
| `page_params` | `list[TParam] \| None` | Page parameters. |
| `error` | `Exception \| None` | Last error. |
| `status` | `QueryStatus` | Current status. |
| `is_loading` | `bool` | Status is `"loading"`. |
| `is_success` | `bool` | Status is `"success"`. |
| `is_error` | `bool` | Status is `"error"`. |
| `is_fetching` | `bool` | Currently fetching. |
| `has_next_page` | `bool` | More pages are available. |
| `has_previous_page` | `bool` | Previous pages are available. |
| `is_fetching_next_page` | `bool` | Fetching the next page. |
| `is_fetching_previous_page` | `bool` | Fetching the previous page. |

Methods

async fetch_next_page(*, cancel_fetch: bool = False) -> ActionResult[Page | None]

Fetch the next page.

result = await state.posts.fetch_next_page()

async fetch_previous_page(*, cancel_fetch: bool = False) -> ActionResult[Page | None]

Fetch the previous page.

async fetch_page(page_param: TParam, *, cancel_fetch: bool = False, clear: bool = False) -> ActionResult[T | None]

Fetch a specific page by its parameter. If clear=True, clears all other pages and keeps only the fetched page—useful for resetting pagination to a specific page.

async refetch(*, cancel_fetch: bool = False, refetch_page: Callable | None = None) -> ActionResult[list[Page]]

Refetch all pages. Optional refetch_page predicate: (data, index, all_data) -> bool.

async wait() -> ActionResult[list[Page]]

Wait for the current in-flight fetch to complete. Does not start a fetch.

async ensure() -> ActionResult[list[Page]]

Start the initial fetch if needed, then wait for completion.

invalidate() -> None

Mark the query as stale and trigger a refetch.

set_data(pages: list[Page] | Callable, updated_at: float | datetime | None = None) -> None

Set pages directly. Accepts either a list of pages or an updater function.

set_initial_data(pages: list[Page] | Callable, updated_at: float | datetime | None = None) -> None

Seed initial data while still in loading state. If the query has already loaded, this is a no-op.

enable() / disable() -> None

Enable/disable the query.


Page

Named tuple for infinite query pages.

class Page(NamedTuple, Generic[T, TParam]):
    data: T
    param: TParam

On this page

- Types
  - QueryKey
  - QueryStatus
  - ActionResult
  - ActionSuccess
  - ActionError
- @query
  - Parameters
  - Basic Usage
  - Keyed Query (Shared)
  - Decorators
    - @query_prop.key
    - @query_prop.initial_data
    - @query_prop.on_success
    - @query_prop.on_error
- QueryResult
  - Properties
  - Methods
    - async refetch(cancel_refetch: bool = True) -> ActionResult[T]
    - async wait() -> ActionResult[T]
    - async ensure() -> ActionResult[T]
    - invalidate() -> None
    - set_data(data: T | Callable[[T | None], T]) -> None
    - set_initial_data(data: T | Callable[[], T], *, updated_at: float | datetime | None = None) -> None
    - set_error(error: Exception) -> None
    - enable() -> None
    - disable() -> None
- @mutation
  - Usage
  - Decorators
    - @mutation_prop.on_success
    - @mutation_prop.on_error
- MutationResult
  - Properties
  - Calling
- @infinite_query
  - Parameters
  - Usage
  - Required Decorators
    - @infinite_query_prop.key
    - @infinite_query_prop.get_next_page_param
  - Optional Decorators
    - @infinite_query_prop.get_previous_page_param
    - @infinite_query_prop.initial_data
    - @infinite_query_prop.on_success
    - @infinite_query_prop.on_error
- InfiniteQueryResult
  - Properties
  - Methods
    - async fetch_next_page(*, cancel_fetch: bool = False) -> ActionResult[Page | None]
    - async fetch_previous_page(*, cancel_fetch: bool = False) -> ActionResult[Page | None]
    - async fetch_page(page_param: TParam, *, cancel_fetch: bool = False, clear: bool = False) -> ActionResult[T | None]
    - async refetch(*, cancel_fetch: bool = False, refetch_page: Callable | None = None) -> ActionResult[list[Page]]
    - async wait() -> ActionResult[list[Page]]
    - async ensure() -> ActionResult[list[Page]]
    - invalidate() -> None
    - set_data(pages: list[Page] | Callable, updated_at: float | datetime | None = None) -> None
    - set_initial_data(pages: list[Page] | Callable, updated_at: float | datetime | None = None) -> None
    - enable() / disable() -> None
- Page