Queries
Query, mutation, and infinite query APIs
Types
QueryKey
QueryKey: TypeAlias = tuple[Hashable, ...]
Tuple of hashable values identifying a query in the store.

QueryStatus
QueryStatus: TypeAlias = Literal["loading", "success", "error"]

ActionResult
ActionResult: TypeAlias = ActionSuccess[T] | ActionError

ActionSuccess
@dataclass(slots=True, frozen=True)
class ActionSuccess(Generic[T]):
    data: T
    status: Literal["success"] = "success"

ActionError
@dataclass(slots=True, frozen=True)
class ActionError:
    error: Exception
    status: Literal["error"] = "error"

@query
Decorator for async data fetching on State methods.
@overload
def query(
    fn: Callable[[TState], Awaitable[T]],
    *,
    stale_time: float = 0.0,
    gc_time: float | None = 300.0,
    refetch_interval: float | None = None,
    keep_previous_data: bool = False,
    retries: int = 3,
    retry_delay: float | None = None,
    initial_data_updated_at: float | datetime | None = None,
    enabled: bool = True,
    fetch_on_mount: bool = True,
    key: QueryKey | None = None,
) -> QueryProperty[T, TState]: ...

Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| stale_time | float | 0.0 | Seconds before data is considered stale. 0.0 means always stale (refetch on access). |
| gc_time | float \| None | 300.0 | Seconds to keep an unused query in the cache |
| refetch_interval | float \| None | None | Auto-refetch interval in seconds |
| keep_previous_data | bool | False | Keep previous data while refetching. Preserves data across key changes, even if the new key has initial_data. |
| retries | int | 3 | Number of retry attempts |
| retry_delay | float \| None | None | Delay between retries in seconds. None falls back to 2.0. |
| initial_data_updated_at | float \| datetime \| None | None | Timestamp used to judge the staleness of initial data |
| enabled | bool | True | Whether the query is enabled |
| fetch_on_mount | bool | True | Fetch when the component mounts |
| key | QueryKey \| None | None | Static key for shared queries |
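
To make the stale_time and initial_data_updated_at descriptions concrete, here is a minimal sketch of a staleness check. The helpers to_timestamp and is_stale are hypothetical names, not part of the library, and the comparison rule (data is stale once stale_time seconds have elapsed) is an assumption inferred from the table above.

```python
from __future__ import annotations

from datetime import datetime, timezone


def to_timestamp(value: float | datetime | None) -> float | None:
    """Normalize a float-or-datetime value to epoch seconds (hypothetical helper)."""
    if isinstance(value, datetime):
        return value.timestamp()
    return value


def is_stale(updated_at: float | datetime | None, stale_time: float, now: float) -> bool:
    """Assumed rule: stale once at least `stale_time` seconds have passed."""
    ts = to_timestamp(updated_at)
    if ts is None:
        return True  # nothing has been stored yet, so treat it as stale
    return now - ts >= stale_time


# With stale_time=0.0 the data is stale immediately.
print(is_stale(100.0, 0.0, now=100.0))
# With stale_time=30.0 the data stays fresh for 30 seconds.
print(is_stale(100.0, 30.0, now=110.0))
# A datetime is accepted as well as a float timestamp.
print(is_stale(datetime.fromtimestamp(100, tz=timezone.utc), 30.0, now=130.0))
```

Under this reading, the default stale_time of 0.0 makes every access eligible for a refetch, while a larger value lets cached data be served without refetching until it ages out.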
Basic Usage
class UserState(ps.State):
    user_id: str = ""

    @ps.query
    async def user(self) -> User:
        return await api.get_user(self.user_id)

Keyed Query (Shared)
Use the .key decorator to share query results across instances:
class UserState(ps.State):
    user_id: str = ""

    @ps.query
    async def user(self) -> User:
        return await api.get_user(self.user_id)

    @user.key
    def _user_key(self):
        return ("user", self.user_id)

Or with a static key:
@ps.query(key=("users", "current"))
async def current_user(self) -> User:
    return await api.get_current_user()

Decorators
@query_prop.key
Define a dynamic query key:
@my_query.key
def _key(self) -> QueryKey:
    return ("resource", self.id)

@query_prop.initial_data
Provide initial data:
@my_query.initial_data
def _initial(self) -> T:
    return default_value

@query_prop.on_success
Handle a successful fetch:
@my_query.on_success
def _on_success(self, data: T):
    self.last_fetched = time.time()

@query_prop.on_error
Handle a fetch error:
@my_query.on_error
def _on_error(self, error: Exception):
    logger.error(f"Failed: {error}")

QueryResult
Result object returned when accessing a query property. Type varies based on whether query is keyed.
Properties
| Property | Type | Description |
|---|---|---|
| data | T \| None | Query data |
| error | Exception \| None | Last error |
| status | QueryStatus | Current status |
| is_loading | bool | Status is "loading" |
| is_success | bool | Status is "success" |
| is_error | bool | Status is "error" |
| is_fetching | bool | Currently fetching |
| is_scheduled | bool | Fetch is scheduled |
Methods
async refetch(cancel_refetch: bool = True) -> ActionResult[T]
Re-run the query. If cancel_refetch=True, cancels any in-flight request.
result = await state.user.refetch()
if result.status == "success":
    print(result.data)

async wait() -> ActionResult[T]
Wait for the current in-flight fetch to complete. Does not start a fetch.
async ensure() -> ActionResult[T]
Start the initial fetch if needed, then wait for completion.
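
refetch, wait, and ensure all return an ActionResult[T]. The sketch below shows one way to consume that union. The dataclasses are local stand-ins that mirror the definitions in the Types section (slots=True is left out to stay portable), and unwrap and unwrap_or are hypothetical helpers, not library functions.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Generic, Literal, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class ActionSuccess(Generic[T]):
    """Stand-in mirroring the documented ActionSuccess."""
    data: T
    status: Literal["success"] = "success"


@dataclass(frozen=True)
class ActionError:
    """Stand-in mirroring the documented ActionError."""
    error: Exception
    status: Literal["error"] = "error"


def unwrap(result: ActionSuccess[T] | ActionError) -> T:
    """Return the data on success, re-raise the stored exception on error."""
    if isinstance(result, ActionError):
        raise result.error
    return result.data


def unwrap_or(result: ActionSuccess[T] | ActionError, default: T) -> T:
    """Return the data on success, or `default` on error."""
    if isinstance(result, ActionError):
        return default
    return result.data


ok = ActionSuccess(data={"name": "Ada"})
failed = ActionError(error=TimeoutError("request timed out"))

print(unwrap(ok))             # the data carried by the success result
print(unwrap_or(failed, {}))  # the fallback value
```

Checking result.status == "success", as in the refetch example above, works equally well; isinstance is used here only because it also narrows the type for static checkers.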
invalidate() -> None
Mark the query as stale and trigger a refetch.
state.user.invalidate()

set_data(data: T | Callable[[T | None], T]) -> None
Optimistically set data without fetching.
# prev may be None before the first successful fetch, so fall back to an empty dict
state.user.set_data(lambda prev: {**(prev or {}), "name": "Updated"})

set_initial_data(data: T | Callable[[], T], *, updated_at: float | datetime | None = None) -> None
Seed initial data while still in loading state.
set_error(error: Exception) -> None
Set error state on the query.
enable() -> None
Enable the query.
disable() -> None
Disable the query, preventing fetches.
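
set_data accepts either a plain value or an updater function. The following is a minimal sketch of how such an argument can be resolved, assuming the updater is called with the current data, which may be None before the first fetch. resolve_update and rename are hypothetical names used only for illustration.

```python
from __future__ import annotations

from typing import Callable, TypeVar

T = TypeVar("T")


def resolve_update(current: T | None, update: T | Callable[[T | None], T]) -> T:
    """Return the next value: call `update` with the current value if it is callable."""
    if callable(update):
        return update(current)
    return update


def rename(prev: dict | None) -> dict:
    """Updater that tolerates missing data by starting from an empty dict."""
    return {**(prev or {}), "name": "Updated"}


print(resolve_update({"id": 1, "name": "Old"}, rename))  # updater applied to existing data
print(resolve_update(None, rename))                      # updater applied with no data yet
print(resolve_update(None, {"id": 2, "name": "New"}))    # plain value replaces the data
```

One consequence of this design is that a value which is itself callable cannot be stored directly; it would have to be wrapped in an updater that returns it.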
@mutation
Decorator for async mutations (write operations) on State methods.
@overload
def mutation(
    fn: Callable[Concatenate[TState, P], Awaitable[T]],
) -> MutationProperty[T, TState, P]: ...

Usage
class UserState(ps.State):
    @ps.mutation
    async def update_name(self, name: str) -> User:
        return await api.update_user(name=name)

Decorators
@mutation_prop.on_success
Handle a successful mutation:
@update_name.on_success
def _on_success(self, data: User):
    self.user.invalidate()

@mutation_prop.on_error
Handle a mutation error:
@update_name.on_error
def _on_error(self, error: Exception):
    logger.error(f"Update failed: {error}")

MutationResult
Result object returned when accessing a mutation property.
Properties
| Property | Type | Description |
|---|---|---|
| data | T \| None | Last mutation result |
| error | Exception \| None | Last error |
| is_running | bool | Mutation in progress |
Calling
Mutations are callable:
result = await state.update_name("New Name")

@infinite_query
Decorator for paginated queries. Infinite queries always require a key, either via key= in the decorator or via @query.key.
Why? Unlike regular queries, infinite queries accumulate multiple pages of data. If they supported automatic dependency tracking (like unkeyed queries), a dependency change would reset all loaded pages. This can cause confusing behavior since users may not expect their scroll position and loaded content to reset. Requiring an explicit key makes the caching behavior predictable and forces you to think about when pages should reset.
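
The reset behavior described above can be illustrated with a toy store that accumulates pages per key. PageStore is a hypothetical stand-in, not the library's cache; it only shows that pages loaded under one key are not visible under another, so a key change starts from an empty page list.

```python
from __future__ import annotations

from collections import defaultdict
from typing import Hashable

QueryKey = tuple[Hashable, ...]


class PageStore:
    """Toy cache (illustration only): each key owns its own list of pages."""

    def __init__(self) -> None:
        self._pages: dict[QueryKey, list[list[str]]] = defaultdict(list)

    def append_page(self, key: QueryKey, page: list[str]) -> None:
        self._pages[key].append(page)

    def pages(self, key: QueryKey) -> list[list[str]]:
        # .get avoids creating an entry for keys that were never fetched
        return list(self._pages.get(key, []))


store = PageStore()
store.append_page(("feed", "recent"), ["post-1", "post-2"])
store.append_page(("feed", "recent"), ["post-3"])

print(store.pages(("feed", "recent")))   # both loaded pages
print(store.pages(("feed", "popular")))  # a different key has no pages yet
```

Because the key is explicit, the point at which accumulated pages are dropped is exactly the point at which the key changes, rather than any incidental dependency change.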
@overload
def infinite_query(
    fn: Callable[[TState, TParam], Awaitable[T]],
    *,
    initial_page_param: TParam,
    max_pages: int = 0,
    stale_time: float = 0.0,
    gc_time: float | None = 300.0,
    refetch_interval: float | None = None,
    keep_previous_data: bool = False,
    retries: int = 3,
    retry_delay: float | None = None,
    initial_data_updated_at: float | datetime | None = None,
    enabled: bool = True,
    fetch_on_mount: bool = True,
    key: QueryKey = ...,  # required, or use @query.key
) -> InfiniteQueryProperty[T, TParam, TState]: ...

Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| initial_page_param | TParam | required | First page parameter |
| max_pages | int | 0 | Max pages to keep (0 = unlimited) |
| stale_time | float | 0.0 | Seconds before data is considered stale. 0.0 means always stale. |
| gc_time | float \| None | 300.0 | Seconds to keep an unused query in the cache |
| refetch_interval | float \| None | None | Auto-refetch interval in seconds |
| keep_previous_data | bool | False | Keep previous data while refetching. Preserves data across key changes. |
| retries | int | 3 | Number of retry attempts |
| retry_delay | float \| None | None | Delay between retries in seconds. None falls back to 2.0. |
| initial_data_updated_at | float \| datetime \| None | None | Timestamp used to judge the staleness of initial data |
| enabled | bool | True | Whether the query is enabled |
| fetch_on_mount | bool | True | Fetch when the component mounts |
| key | QueryKey | required | Query key (use @query.key for dynamic keys) |
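
As an illustration of max_pages, the sketch below caps a page list after a new page is added. Which pages get dropped is an assumption here (the pages furthest from the newly fetched end), since the table only states the limit; append_page and prepend_page are hypothetical helpers, not library functions.

```python
from __future__ import annotations

from typing import TypeVar

T = TypeVar("T")


def append_page(pages: list[T], new_page: T, max_pages: int = 0) -> list[T]:
    """Add a page at the end; when over the limit, drop pages from the front."""
    updated = [*pages, new_page]
    if max_pages > 0 and len(updated) > max_pages:
        updated = updated[-max_pages:]
    return updated


def prepend_page(pages: list[T], new_page: T, max_pages: int = 0) -> list[T]:
    """Add a page at the front; when over the limit, drop pages from the end."""
    updated = [new_page, *pages]
    if max_pages > 0 and len(updated) > max_pages:
        updated = updated[:max_pages]
    return updated


print(append_page([1, 2, 3], 4, max_pages=3))   # oldest page at the front is dropped
print(prepend_page([2, 3, 4], 1, max_pages=3))  # page at the far end is dropped
print(append_page([1, 2, 3], 4))                # max_pages=0 keeps everything
```

A bounded page list keeps memory use flat for long feeds, at the cost of refetching pages the user scrolls back to.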
Usage
With a static key:
class FeedState(ps.State):
    @ps.infinite_query(initial_page_param=None, key=("feed",))
    async def posts(self, cursor: str | None) -> list[Post]:
        return await api.get_posts(cursor=cursor)

    @posts.get_next_page_param
    def _next_cursor(self, pages: list[Page]) -> str | None:
        return pages[-1].data[-1].id if pages and pages[-1].data else None

With a dynamic key (using @query.key decorator):
class FeedState(ps.State):
    feed_type: str = "recent"

    @ps.infinite_query(initial_page_param=None)
    async def posts(self, cursor: str | None) -> list[Post]:
        return await api.get_posts(feed_type=self.feed_type, cursor=cursor)

    @posts.key
    def _posts_key(self):
        return ("feed", self.feed_type)  # pages reset when feed_type changes

    @posts.get_next_page_param
    def _next_cursor(self, pages: list[Page]) -> str | None:
        return pages[-1].data[-1].id if pages and pages[-1].data else None

Required Decorators
@infinite_query_prop.key
Define a dynamic query key. Required if you didn't pass key= to the decorator:
@my_query.key
def _key(self) -> QueryKey:
    return ("feed", self.feed_type)

@infinite_query_prop.get_next_page_param
Define how to get the next page parameter:
@my_query.get_next_page_param
def _next(self, pages: list[Page[T, TParam]]) -> TParam | None:
    # Page only has data and param, so the cursor is read from the page data
    # (this assumes the fetched data exposes a next_cursor attribute)
    return pages[-1].data.next_cursor if pages else None

Optional Decorators
@infinite_query_prop.get_previous_page_param
Define how to get the previous page parameter (for bi-directional pagination).
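
For bi-directional pagination, the two page-parameter functions usually mirror each other. The sketch below uses page-index pagination and assumes that get_previous_page_param receives the same pages list as get_next_page_param and returns None when there is nothing more to load; the source does not show its signature. Page here is a simplified, non-generic stand-in, and PAGE_SIZE is a hypothetical constant.

```python
from __future__ import annotations

from typing import NamedTuple

PAGE_SIZE = 2  # hypothetical page size used by the example backend


class Page(NamedTuple):
    """Simplified stand-in for the documented Page (data plus its page param)."""
    data: list[str]
    param: int


def next_page_param(pages: list[Page]) -> int | None:
    """A full last page suggests there may be another page after it."""
    if not pages:
        return None
    last = pages[-1]
    return last.param + 1 if len(last.data) == PAGE_SIZE else None


def previous_page_param(pages: list[Page]) -> int | None:
    """Any loaded first page with an index above 0 has a page before it."""
    if not pages:
        return None
    first = pages[0]
    return first.param - 1 if first.param > 0 else None


loaded = [Page(data=["c", "d"], param=1), Page(data=["e"], param=2)]
print(next_page_param(loaded))      # last page is short, so no next page
print(previous_page_param(loaded))  # page 0 has not been loaded yet
```

Under these assumptions, a None return is what would drive has_next_page and has_previous_page to False.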
@infinite_query_prop.initial_data
Provide initial pages before the first fetch:
@my_query.initial_data
def _initial(self) -> list[Page[T, TParam]]:
    return [Page(data=[], param=None)]

@infinite_query_prop.on_success
Handle successful fetch.
@infinite_query_prop.on_error
Handle fetch error.
InfiniteQueryResult
Result object for infinite queries.
Properties
| Property | Type | Description |
|---|---|---|
| data | list[Page[T, TParam]] \| None | All pages |
| pages | list[T] \| None | Page data only |
| page_params | list[TParam] \| None | Page parameters |
| error | Exception \| None | Last error |
| status | QueryStatus | Current status |
| is_loading | bool | Status is "loading" |
| is_success | bool | Status is "success" |
| is_error | bool | Status is "error" |
| is_fetching | bool | Currently fetching |
| has_next_page | bool | More pages available |
| has_previous_page | bool | Previous pages available |
| is_fetching_next_page | bool | Fetching next page |
| is_fetching_previous_page | bool | Fetching previous page |
Methods
async fetch_next_page(*, cancel_fetch: bool = False) -> ActionResult[Page | None]
Fetch the next page.
result = await state.posts.fetch_next_page()

async fetch_previous_page(*, cancel_fetch: bool = False) -> ActionResult[Page | None]
Fetch the previous page.
async fetch_page(page_param: TParam, *, cancel_fetch: bool = False, clear: bool = False) -> ActionResult[T | None]
Fetch a specific page by its parameter. If clear=True, all other pages are cleared and only the fetched page is kept, which is useful for resetting pagination to a specific page.
async refetch(*, cancel_fetch: bool = False, refetch_page: Callable | None = None) -> ActionResult[list[Page]]
Refetch all pages. The optional refetch_page predicate, (data, index, all_data) -> bool, selects which pages are refetched.
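
A refetch_page predicate can be sketched as a plain function of (data, index, all_data). The example assumes that data is one page's data, that all_data is the list of every page's data, and that returning True marks the page for refetching; pages_to_refetch is a hypothetical helper that only shows how such a predicate would be evaluated.

```python
from __future__ import annotations

from typing import Callable, Sequence, TypeVar

T = TypeVar("T")
RefetchPredicate = Callable[[T, int, Sequence[T]], bool]


def pages_to_refetch(all_data: Sequence[T], predicate: RefetchPredicate | None = None) -> list[int]:
    """Return the indexes of the pages the predicate selects (all pages if None)."""
    if predicate is None:
        return list(range(len(all_data)))
    return [i for i, data in enumerate(all_data) if predicate(data, i, all_data)]


def only_first_page(data: list[str], index: int, all_data: Sequence[list[str]]) -> bool:
    """Refetch just the first page, e.g. to pick up new items at the top of a feed."""
    return index == 0


def only_last_page(data: list[str], index: int, all_data: Sequence[list[str]]) -> bool:
    """Refetch just the most recently loaded page."""
    return index == len(all_data) - 1


loaded = [["a", "b"], ["c", "d"], ["e"]]
print(pages_to_refetch(loaded))                   # no predicate: every page
print(pages_to_refetch(loaded, only_first_page))  # first page only
print(pages_to_refetch(loaded, only_last_page))   # last page only
```

With the real API, such a function would be passed as refetch_page= to refetch.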
async wait() -> ActionResult[list[Page]]
Wait for the current in-flight fetch to complete. Does not start a fetch.
async ensure() -> ActionResult[list[Page]]
Start the initial fetch if needed, then wait for completion.
invalidate() -> None
Mark as stale and trigger refetch.
set_data(pages: list[Page] | Callable, updated_at: float | datetime | None = None) -> None
Set pages directly. Accepts either a list of pages or an updater function.
set_initial_data(pages: list[Page] | Callable, updated_at: float | datetime | None = None) -> None
Seed initial data while still in loading state. If the query has already loaded, this is a no-op.
enable() / disable() -> None
Enable/disable the query.
Page
Named tuple for infinite query pages.
class Page(NamedTuple, Generic[T, TParam]):
    data: T
    param: TParam
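
The sketch below shows how a list of Page tuples relates to the pages and page_params views listed under InfiniteQueryResult. Page is redefined locally as a simplified, non-generic named tuple so the example runs on its own; the derived lists are computed by hand here and only illustrate what those properties are described as containing.

```python
from __future__ import annotations

from typing import NamedTuple


class Page(NamedTuple):
    """Simplified stand-in for the documented Page named tuple."""
    data: list[str]
    param: str | None


all_pages = [
    Page(data=["post-1", "post-2"], param=None),   # first page, fetched with the initial param
    Page(data=["post-3"], param="post-2"),         # second page, fetched with a cursor
]

# "Page data only" and "Page parameters", derived from the full page list
page_data = [page.data for page in all_pages]
page_params = [page.param for page in all_pages]

# Flatten every page into one list of items, e.g. for rendering a feed
items = [item for page in all_pages for item in page.data]

# Named tuples also support positional unpacking
first_data, first_param = all_pages[0]

print(page_data)
print(page_params)
print(items)
```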