Queries
Query, mutation, and infinite query APIs
Types
QueryKey
QueryKey: TypeAlias = tuple[Hashable, ...]
Tuple of hashable values identifying a query in the store.
QueryStatus
QueryStatus: TypeAlias = Literal["loading", "success", "error"]
ActionResult
ActionResult: TypeAlias = ActionSuccess[T] | ActionError
ActionSuccess
@dataclass(slots=True, frozen=True)
class ActionSuccess(Generic[T]):
    data: T
    status: Literal["success"] = "success"
ActionError
@dataclass(slots=True, frozen=True)
class ActionError:
    error: Exception
    status: Literal["error"] = "error"
@query
Decorator for async data fetching on State methods.
@overload
def query(
    fn: Callable[[TState], Awaitable[T]],
    *,
    stale_time: float = 0.0,
    gc_time: float | None = 300.0,
    refetch_interval: float | None = None,
    keep_previous_data: bool = False,
    retries: int = 3,
    retry_delay: float | None = None,
    initial_data_updated_at: float | datetime | None = None,
    enabled: bool = True,
    fetch_on_mount: bool = True,
    key: QueryKey | None = None,
) -> QueryProperty[T, TState]: ...
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| stale_time | float | 0.0 | Seconds before data is considered stale |
| gc_time | float \| None | 300.0 | Seconds to keep an unused query in the cache |
| refetch_interval | float \| None | None | Auto-refetch interval in seconds |
| keep_previous_data | bool | False | Keep previous data while refetching |
| retries | int | 3 | Number of retry attempts |
| retry_delay | float \| None | 2.0 | Delay between retries in seconds |
| initial_data_updated_at | float \| datetime \| None | None | Timestamp used to judge the staleness of initial data |
| enabled | bool | True | Whether the query is enabled |
| fetch_on_mount | bool | True | Fetch when the component mounts |
| key | QueryKey \| None | None | Static key for shared queries |
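
To make the interaction between stale_time and initial_data_updated_at concrete, here is a minimal sketch of a staleness check. It is an illustration under stated assumptions, not the library's implementation: the `to_timestamp` and `is_stale` helpers are hypothetical, and treating data as stale once at least stale_time seconds have elapsed is an assumed boundary rule.

```python
from datetime import datetime, timezone
from typing import Union


def to_timestamp(value: Union[float, datetime]) -> float:
    """Normalize a float or datetime to seconds since the epoch."""
    if isinstance(value, datetime):
        return value.timestamp()
    return float(value)


def is_stale(updated_at: Union[float, datetime], stale_time: float, now: float) -> bool:
    """Assumed rule: stale once at least stale_time seconds have elapsed."""
    return now - to_timestamp(updated_at) >= stale_time


fetched = datetime(2024, 1, 1, tzinfo=timezone.utc)
t0 = fetched.timestamp()

fresh = is_stale(fetched, stale_time=60.0, now=t0 + 30)    # 30 s old, 60 s window
expired = is_stale(fetched, stale_time=60.0, now=t0 + 90)  # 90 s old, 60 s window
default = is_stale(fetched, stale_time=0.0, now=t0)        # default stale_time of 0.0
```

Under this rule the default stale_time of 0.0 means data is considered stale as soon as it arrives.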
Basic Usage
class UserState(ps.State):
    user_id: str = ""

    @ps.query
    async def user(self) -> User:
        return await api.get_user(self.user_id)
Keyed Query (Shared)
Use the .key decorator to share query results across instances:
class UserState(ps.State):
    user_id: str = ""

    @ps.query
    async def user(self) -> User:
        return await api.get_user(self.user_id)

    @user.key
    def _user_key(self):
        return ("user", self.user_id)
Or with a static key:
@ps.query(key=("users", "current"))
async def current_user(self) -> User:
    return await api.get_current_user()
Decorators
@query_prop.key
Define a dynamic query key:
@my_query.key
def _key(self) -> QueryKey:
    return ("resource", self.id)
@query_prop.initial_data
Provide initial data:
@my_query.initial_data
def _initial(self) -> T:
    return default_value
@query_prop.on_success
Handle a successful fetch:
@my_query.on_success
def _on_success(self, data: T):
    self.last_fetched = time.time()
@query_prop.on_error
Handle a fetch error:
@my_query.on_error
def _on_error(self, error: Exception):
    logger.error(f"Failed: {error}")
QueryResult
Result object returned when accessing a query property. The concrete type varies depending on whether the query is keyed.
Properties
| Property | Type | Description |
|---|---|---|
| data | T \| None | Query data |
| error | Exception \| None | Last error |
| status | QueryStatus | Current status |
| is_loading | bool | Status is "loading" |
| is_success | bool | Status is "success" |
| is_error | bool | Status is "error" |
| is_fetching | bool | Currently fetching |
| is_scheduled | bool | Fetch is scheduled |
Methods
async refetch(cancel_refetch: bool = True) -> ActionResult[T]
Re-run the query. If cancel_refetch=True (the default), any in-flight request is cancelled.
result = await state.user.refetch()
if result.status == "success":
    print(result.data)
async wait() -> ActionResult[T]
Wait for the current query to complete.
invalidate() -> None
Mark the query as stale and trigger a refetch.
state.user.invalidate()
set_data(data: T | Callable[[T | None], T]) -> None
Optimistically set data without fetching. The callable form receives the previous data, which may be None.
state.user.set_data(lambda prev: {**(prev or {}), "name": "Updated"})
set_initial_data(data: T | Callable[[], T], *, updated_at: float | datetime | None = None) -> None
Seed initial data while still in loading state.
set_error(error: Exception) -> None
Set error state on the query.
enable() -> None
Enable the query.
disable() -> None
Disable the query, preventing fetches.
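
The set_data signature above accepts either a new value or an updater that receives the previous data, typed T | None. A minimal sketch of how such a value-or-updater argument could be resolved follows; `apply_update` is a hypothetical helper for illustration, not a library function.

```python
from typing import Callable, Optional, TypeVar, Union

T = TypeVar("T")


def apply_update(current: Optional[T], data: Union[T, Callable[[Optional[T]], T]]) -> T:
    """Hypothetical helper: resolve a value-or-updater into the new data."""
    if callable(data):
        return data(current)
    return data


def rename(prev: Optional[dict]) -> dict:
    # prev is None when nothing has been fetched yet, so fall back to {}.
    return {**(prev or {}), "name": "Updated"}


from_empty = apply_update(None, rename)
from_existing = apply_update({"id": 1, "name": "Old"}, rename)
replaced = apply_update({"id": 1}, {"id": 2})
```

One caveat of this style is that data which is itself callable cannot be told apart from an updater, so such values would need to be wrapped in a lambda.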
@mutation
Decorator for async mutations (write operations) on State methods.
@overload
def mutation(
    fn: Callable[Concatenate[TState, P], Awaitable[T]],
) -> MutationProperty[T, TState, P]: ...
Usage
class UserState(ps.State):
    @ps.mutation
    async def update_name(self, name: str) -> User:
        return await api.update_user(name=name)
Decorators
@mutation_prop.on_success
Handle a successful mutation:
@update_name.on_success
def _on_success(self, data: User):
    self.user.invalidate()
@mutation_prop.on_error
Handle a mutation error:
@update_name.on_error
def _on_error(self, error: Exception):
    logger.error(f"Update failed: {error}")
MutationResult
Result object returned when accessing a mutation property.
Properties
| Property | Type | Description |
|---|---|---|
| data | T \| None | Last mutation result |
| error | Exception \| None | Last error |
| is_running | bool | Mutation in progress |
Calling
Mutations are callable:
result = await state.update_name("New Name")
@infinite_query
Decorator for paginated queries.
@overload
def infinite_query(
    fn: Callable[[TState, TParam], Awaitable[T]],
    *,
    initial_page_param: TParam,
    max_pages: int = 0,
    stale_time: float = 0.0,
    gc_time: float | None = 300.0,
    refetch_interval: float | None = None,
    keep_previous_data: bool = False,
    retries: int = 3,
    retry_delay: float | None = None,
    initial_data_updated_at: float | datetime | None = None,
    enabled: bool = True,
    fetch_on_mount: bool = True,
    key: QueryKey | None = None,
) -> InfiniteQueryProperty[T, TParam, TState]: ...
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| initial_page_param | TParam | required | First page parameter |
| max_pages | int | 0 | Max pages to keep (0 = unlimited) |
All other parameters are the same as for @query.
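
The max_pages rule (0 means unlimited) can be sketched as a small trimming function. The page does not say which pages are discarded once the limit is exceeded, so the behavior below is an assumption: after fetching a next page the oldest pages are dropped from the front, and after fetching a previous page they are dropped from the end. `trim_pages` and its `added` parameter are hypothetical names.

```python
def trim_pages(pages: list, max_pages: int, *, added: str = "next") -> list:
    """Keep at most max_pages pages; max_pages <= 0 means no limit."""
    if max_pages <= 0 or len(pages) <= max_pages:
        return list(pages)
    if added == "next":
        # Assumed: a page was appended, so drop the oldest pages from the front.
        return pages[-max_pages:]
    # Assumed: a page was prepended, so drop pages from the end.
    return pages[:max_pages]


unlimited = trim_pages([1, 2, 3, 4], 0)
after_next = trim_pages([1, 2, 3, 4], 3)
after_previous = trim_pages([1, 2, 3, 4], 3, added="previous")
```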
Usage
class FeedState(ps.State):
    @ps.infinite_query(initial_page_param=None, key=("feed",))
    async def posts(self, cursor: str | None) -> list[Post]:
        return await api.get_posts(cursor=cursor)

    @posts.key
    def _posts_key(self):
        return ("feed", self.feed_type)

    @posts.get_next_page_param
    def _next_cursor(self, pages: list[Page]) -> str | None:
        if not pages:
            return None
        last = pages[-1]
        return last.data[-1].id if last.data else None

    @posts.get_previous_page_param
    def _prev_cursor(self, pages: list[Page]) -> str | None:
        if not pages:
            return None
        first = pages[0]
        return first.data[0].id if first.data else None
Required Decorators
@infinite_query_prop.key
Define the query key (required for infinite queries):
@my_query.key
def _key(self) -> QueryKey:
    return ("feed", self.feed_type)
@infinite_query_prop.get_next_page_param
Define how to get the next page parameter:
@my_query.get_next_page_param
def _next(self, pages: list[Page[T, TParam]]) -> TParam | None:
    return pages[-1].next_cursor if pages else None
Optional Decorators
@infinite_query_prop.get_previous_page_param
Define how to get the previous page parameter (for bi-directional pagination).
@infinite_query_prop.on_success
Handle successful fetch.
@infinite_query_prop.on_error
Handle fetch error.
InfiniteQueryResult
Result object for infinite queries.
Properties
| Property | Type | Description |
|---|---|---|
| data | list[Page[T, TParam]] \| None | All pages |
| pages | list[T] \| None | Page data only |
| page_params | list[TParam] \| None | Page parameters |
| error | Exception \| None | Last error |
| status | QueryStatus | Current status |
| is_loading | bool | Status is "loading" |
| is_success | bool | Status is "success" |
| is_error | bool | Status is "error" |
| is_fetching | bool | Currently fetching |
| has_next_page | bool | More pages available |
| has_previous_page | bool | Previous pages available |
| is_fetching_next_page | bool | Fetching next page |
| is_fetching_previous_page | bool | Fetching previous page |
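
The relationship between data, pages, and page_params in the table above can be illustrated with plain Python. This sketch assumes pages and page_params are simple projections of data, which matches their descriptions but is not confirmed as the implementation; the Page class here is a simplified, non-generic stand-in for the library's type.

```python
from typing import Any, NamedTuple


class Page(NamedTuple):
    """Simplified stand-in for the documented Page[T, TParam]."""
    data: Any
    param: Any


# Two pages of a cursor-paginated feed; the first was fetched with cursor None.
data = [Page(["a", "b"], None), Page(["c"], "b")]

pages = [page.data for page in data]          # page data only
page_params = [page.param for page in data]   # the parameter used for each page
```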
Methods
async fetch_next_page(*, cancel_fetch: bool = False) -> ActionResult[Page | None]
Fetch the next page.
result = await state.posts.fetch_next_page()
async fetch_previous_page(*, cancel_fetch: bool = False) -> ActionResult[Page | None]
Fetch the previous page.
async fetch_page(page_param: TParam, *, cancel_fetch: bool = False) -> ActionResult[T | None]
Refetch a specific page by its parameter.
async refetch(*, cancel_fetch: bool = False, refetch_page: Callable | None = None) -> ActionResult[list[Page]]
Refetch all pages. Optional refetch_page predicate: (data, index, all_data) -> bool.
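
A refetch_page predicate takes (data, index, all_data) and returns a bool. The sketch below shows one such predicate together with a hypothetical `pages_to_refetch` helper that applies it. It assumes data is a single page's data and all_data is the list of all page data, which the signature suggests but the page does not spell out.

```python
from typing import Any, Callable, Optional

# Assumed shape of the predicate: (data, index, all_data) -> bool
RefetchPredicate = Callable[[Any, int, list], bool]


def only_first_page(data: Any, index: int, all_data: list) -> bool:
    """Example predicate: refetch just the first page."""
    return index == 0


def pages_to_refetch(all_data: list, refetch_page: Optional[RefetchPredicate] = None) -> list:
    """Hypothetical helper: indices of the pages a refetch would re-run."""
    if refetch_page is None:
        return list(range(len(all_data)))  # no predicate: refetch every page
    return [i for i, data in enumerate(all_data) if refetch_page(data, i, all_data)]


feed = [["a"], ["b"], ["c"]]
selected = pages_to_refetch(feed, only_first_page)
everything = pages_to_refetch(feed)
```

With the real API, such a predicate would be passed as `refetch_page=only_first_page` to refetch().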
async wait() -> ActionResult[list[Page]]
Wait for current fetch to complete.
invalidate() -> None
Mark as stale and trigger refetch.
set_data(pages: list[Page] | Callable, updated_at: float | datetime | None = None) -> None
Set pages directly.
set_initial_data(pages: list[Page] | Callable, updated_at: float | datetime | None = None) -> None
Seed initial data.
enable() / disable() -> None
Enable/disable the query.
Page
Named tuple for infinite query pages.
class Page(NamedTuple, Generic[T, TParam]):
    data: T
    param: TParam
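
As an illustration of how Page values feed into a get_next_page_param callback, the following sketch runs the cursor logic from the Usage example as a standalone function. The Page class is a simplified, non-generic stand-in, and Post is a hypothetical record with an id field.

```python
from dataclasses import dataclass
from typing import Any, NamedTuple, Optional


class Page(NamedTuple):
    """Simplified stand-in for the documented Page[T, TParam]."""
    data: Any
    param: Any


@dataclass
class Post:
    """Hypothetical item type with an id usable as a cursor."""
    id: str


def next_cursor(pages: list) -> Optional[str]:
    """Use the id of the last item on the last page as the next cursor."""
    if not pages:
        return None
    last = pages[-1]
    return last.data[-1].id if last.data else None


loaded = [Page([Post("p1"), Post("p2")], None), Page([Post("p3")], "p2")]

cursor = next_cursor(loaded)
no_pages = next_cursor([])
empty_last_page = next_cursor([Page([], "p3")])
```

Returning None when the last page is empty signals that there is no further cursor to request.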