Pulse

Queries

Query, mutation, and infinite query APIs

Types

QueryKey

QueryKey: TypeAlias = tuple[Hashable, ...]

Tuple of hashable values identifying a query in the store.

QueryStatus

QueryStatus: TypeAlias = Literal["loading", "success", "error"]

ActionResult

ActionResult: TypeAlias = ActionSuccess[T] | ActionError

ActionSuccess

@dataclass(slots=True, frozen=True)
class ActionSuccess(Generic[T]):
    data: T
    status: Literal["success"] = "success"

ActionError

@dataclass(slots=True, frozen=True)
class ActionError:
    error: Exception
    status: Literal["error"] = "error"

@query

Decorator for async data fetching on State methods.

@overload
def query(
    fn: Callable[[TState], Awaitable[T]],
    *,
    stale_time: float = 0.0,
    gc_time: float | None = 300.0,
    refetch_interval: float | None = None,
    keep_previous_data: bool = False,
    retries: int = 3,
    retry_delay: float | None = None,
    initial_data_updated_at: float | datetime | None = None,
    enabled: bool = True,
    fetch_on_mount: bool = True,
    key: QueryKey | None = None,
) -> QueryProperty[T, TState]: ...

Parameters

Parameter                Type                     Default  Description
stale_time               float                    0.0      Seconds before data is considered stale
gc_time                  float | None             300.0    Seconds to keep unused query in cache
refetch_interval         float | None             None     Auto-refetch interval in seconds
keep_previous_data       bool                     False    Keep previous data while refetching
retries                  int                      3        Number of retry attempts
retry_delay              float | None             None     Delay between retries in seconds (None uses the default of 2.0)
initial_data_updated_at  float | datetime | None  None     Timestamp for initial data staleness
enabled                  bool                     True     Whether query is enabled
fetch_on_mount           bool                     True     Fetch when component mounts
key                      QueryKey | None          None     Static key for shared queries

Basic Usage

class UserState(ps.State):
    user_id: str = ""

    @ps.query
    async def user(self) -> User:
        return await api.get_user(self.user_id)

Keyed Query (Shared)

Use the .key decorator to share query results across instances:

class UserState(ps.State):
    user_id: str = ""

    @ps.query
    async def user(self) -> User:
        return await api.get_user(self.user_id)

    @user.key
    def _user_key(self):
        return ("user", self.user_id)

Or with a static key:

@ps.query(key=("users", "current"))
async def current_user(self) -> User:
    return await api.get_current_user()

Decorators

@query_prop.key

Define a dynamic query key:

@my_query.key
def _key(self) -> QueryKey:
    return ("resource", self.id)

@query_prop.initial_data

Provide initial data:

@my_query.initial_data
def _initial(self) -> T:
    return default_value

@query_prop.on_success

Handle successful fetch:

@my_query.on_success
def _on_success(self, data: T):
    self.last_fetched = time.time()

@query_prop.on_error

Handle fetch error:

@my_query.on_error
def _on_error(self, error: Exception):
    logger.error(f"Failed: {error}")

QueryResult

Result object returned when accessing a query property. The concrete type varies based on whether the query is keyed.

Properties

Property      Type              Description
data          T | None          Query data
error         Exception | None  Last error
status        QueryStatus       Current status
is_loading    bool              Status is "loading"
is_success    bool              Status is "success"
is_error      bool              Status is "error"
is_fetching   bool              Currently fetching
is_scheduled  bool              Fetch is scheduled

Methods

async refetch(cancel_refetch: bool = True) -> ActionResult[T]

Re-run the query. If cancel_refetch=True, cancels any in-flight request.

result = await state.user.refetch()
if result.status == "success":
    print(result.data)

async wait() -> ActionResult[T]

Wait for the current query to complete.

invalidate() -> None

Mark the query as stale and trigger a refetch.

state.user.invalidate()

set_data(data: T | Callable[[T | None], T]) -> None

Optimistically set data without fetching.

state.user.set_data(lambda prev: {**prev, "name": "Updated"})

set_initial_data(data: T | Callable[[], T], *, updated_at: float | datetime | None = None) -> None

Seed initial data while the query is still in the loading state.

set_error(error: Exception) -> None

Set error state on the query.

enable() -> None

Enable the query.

disable() -> None

Disable the query, preventing fetches.


@mutation

Decorator for async mutations (write operations) on State methods.

@overload
def mutation(
    fn: Callable[Concatenate[TState, P], Awaitable[T]],
) -> MutationProperty[T, TState, P]: ...

Usage

class UserState(ps.State):
    @ps.mutation
    async def update_name(self, name: str) -> User:
        return await api.update_user(name=name)

Decorators

@mutation_prop.on_success

@update_name.on_success
def _on_success(self, data: User):
    self.user.invalidate()

@mutation_prop.on_error

@update_name.on_error
def _on_error(self, error: Exception):
    logger.error(f"Update failed: {error}")

MutationResult

Result object returned when accessing a mutation property.

Properties

Property    Type              Description
data        T | None          Last mutation result
error       Exception | None  Last error
is_running  bool              Mutation in progress

Calling

Mutations are callable:

result = await state.update_name("New Name")

@infinite_query

Decorator for paginated queries.

@overload
def infinite_query(
    fn: Callable[[TState, TParam], Awaitable[T]],
    *,
    initial_page_param: TParam,
    max_pages: int = 0,
    stale_time: float = 0.0,
    gc_time: float | None = 300.0,
    refetch_interval: float | None = None,
    keep_previous_data: bool = False,
    retries: int = 3,
    retry_delay: float | None = None,
    initial_data_updated_at: float | datetime | None = None,
    enabled: bool = True,
    fetch_on_mount: bool = True,
    key: QueryKey | None = None,
) -> InfiniteQueryProperty[T, TParam, TState]: ...

Parameters

Parameter           Type    Default   Description
initial_page_param  TParam  required  First page parameter
max_pages           int     0         Max pages to keep (0 = unlimited)
(All other parameters are the same as for @query.)

Usage

class FeedState(ps.State):
    @ps.infinite_query(initial_page_param=None, key=("feed",))
    async def posts(self, cursor: str | None) -> list[Post]:
        return await api.get_posts(cursor=cursor)

    @posts.key
    def _posts_key(self):
        return ("feed", self.feed_type)

    @posts.get_next_page_param
    def _next_cursor(self, pages: list[Page]) -> str | None:
        if not pages:
            return None
        last = pages[-1]
        return last.data[-1].id if last.data else None

    @posts.get_previous_page_param
    def _prev_cursor(self, pages: list[Page]) -> str | None:
        if not pages:
            return None
        first = pages[0]
        return first.data[0].id if first.data else None

Required Decorators

@infinite_query_prop.key

Define the query key (required for infinite queries):

@my_query.key
def _key(self) -> QueryKey:
    return ("feed", self.feed_type)

@infinite_query_prop.get_next_page_param

Define how to get the next page parameter:

@my_query.get_next_page_param
def _next(self, pages: list[Page[T, TParam]]) -> TParam | None:
    return pages[-1].next_cursor if pages else None

Optional Decorators

@infinite_query_prop.get_previous_page_param

Define how to get the previous page parameter (for bi-directional pagination).

@infinite_query_prop.on_success

Handle successful fetch.

@infinite_query_prop.on_error

Handle fetch error.


InfiniteQueryResult

Result object for infinite queries.

Properties

Property                   Type                           Description
data                       list[Page[T, TParam]] | None   All pages
pages                      list[T] | None                 Page data only
page_params                list[TParam] | None            Page parameters
error                      Exception | None               Last error
status                     QueryStatus                    Current status
is_loading                 bool                           Status is "loading"
is_success                 bool                           Status is "success"
is_error                   bool                           Status is "error"
is_fetching                bool                           Currently fetching
has_next_page              bool                           More pages available
has_previous_page          bool                           Previous pages available
is_fetching_next_page      bool                           Fetching next page
is_fetching_previous_page  bool                           Fetching previous page

Methods

async fetch_next_page(*, cancel_fetch: bool = False) -> ActionResult[Page | None]

Fetch the next page.

result = await state.posts.fetch_next_page()

async fetch_previous_page(*, cancel_fetch: bool = False) -> ActionResult[Page | None]

Fetch the previous page.

async fetch_page(page_param: TParam, *, cancel_fetch: bool = False) -> ActionResult[T | None]

Refetch a specific page by its parameter.

async refetch(*, cancel_fetch: bool = False, refetch_page: Callable | None = None) -> ActionResult[list[Page]]

Refetch all pages. Optional refetch_page predicate: (data, index, all_data) -> bool.

async wait() -> ActionResult[list[Page]]

Wait for current fetch to complete.

invalidate() -> None

Mark as stale and trigger refetch.

set_data(pages: list[Page] | Callable, updated_at: float | datetime | None = None) -> None

Set pages directly.

set_initial_data(pages: list[Page] | Callable, updated_at: float | datetime | None = None) -> None

Seed initial data.

enable() / disable() -> None

Enable/disable the query.


Page

Named tuple for infinite query pages.

class Page(NamedTuple, Generic[T, TParam]):
    data: T
    param: TParam

On this page

Types
  QueryKey
  QueryStatus
  ActionResult
  ActionSuccess
  ActionError
@query
  Parameters
  Basic Usage
  Keyed Query (Shared)
  Decorators
    @query_prop.key
    @query_prop.initial_data
    @query_prop.on_success
    @query_prop.on_error
QueryResult
  Properties
  Methods
    async refetch(cancel_refetch: bool = True) -> ActionResult[T]
    async wait() -> ActionResult[T]
    invalidate() -> None
    set_data(data: T | Callable[[T | None], T]) -> None
    set_initial_data(data: T | Callable[[], T], *, updated_at: float | datetime | None = None) -> None
    set_error(error: Exception) -> None
    enable() -> None
    disable() -> None
@mutation
  Usage
  Decorators
    @mutation_prop.on_success
    @mutation_prop.on_error
MutationResult
  Properties
  Calling
@infinite_query
  Parameters
  Usage
  Required Decorators
    @infinite_query_prop.key
    @infinite_query_prop.get_next_page_param
  Optional Decorators
    @infinite_query_prop.get_previous_page_param
    @infinite_query_prop.on_success
    @infinite_query_prop.on_error
InfiniteQueryResult
  Properties
  Methods
    async fetch_next_page(*, cancel_fetch: bool = False) -> ActionResult[Page | None]
    async fetch_previous_page(*, cancel_fetch: bool = False) -> ActionResult[Page | None]
    async fetch_page(page_param: TParam, *, cancel_fetch: bool = False) -> ActionResult[T | None]
    async refetch(*, cancel_fetch: bool = False, refetch_page: Callable | None = None) -> ActionResult[list[Page]]
    async wait() -> ActionResult[list[Page]]
    invalidate() -> None
    set_data(pages: list[Page] | Callable, updated_at: float | datetime | None = None) -> None
    set_initial_data(pages: list[Page] | Callable, updated_at: float | datetime | None = None) -> None
    enable() / disable() -> None
Page