Pulse

Async

Real-world applications are full of asynchronous operations: fetching data from APIs, saving to databases, waiting for user input, and more. Pulse has first-class support for async Python, making it easy to build responsive applications that handle these operations gracefully.

Async callbacks

The simplest way to use async in Pulse is with async event handlers. Any callback can be an async function:

import asyncio
import pulse as ps

class SearchState(ps.State):
    query: str = ""
    results: list[str] = []
    loading: bool = False

    async def search(self):
        self.loading = True
        self.results = []

        # Simulate an API call
        await asyncio.sleep(1)
        self.results = [f"Result for '{self.query}' #{i}" for i in range(5)]

        self.loading = False

@ps.component
def SearchBox():
    with ps.init():
        state = SearchState()

    return ps.div()[
        ps.input(
            type="text",
            value=state.query,
            onChange=lambda e: setattr(state, "query", e["target"]["value"]),
            placeholder="Search...",
        ),
        ps.button("Search", onClick=state.search, disabled=state.loading),
        ps.p("Loading..." if state.loading else f"{len(state.results)} results"),
        ps.ul()[
            ps.For(state.results, lambda r, i: ps.li(r, key=i))
        ],
    ]

When the user clicks "Search", the async handler runs. The UI updates immediately to show the loading state, waits for the simulated API call, then updates again with the results.

Automatic batching

Pulse automatically batches state updates that happen synchronously. In an async function, updates are batched between await statements:

class CounterState(ps.State):
    count: int = 0

    async def increment_twice(self):
        # These two updates are batched - only one re-render
        self.count += 1
        self.count += 1

        await asyncio.sleep(1)

        # These two updates are also batched - one more re-render
        self.count += 1
        self.count += 1

This is important for performance. Without batching, each state update would trigger a separate re-render. With batching, Pulse waits until the synchronous code finishes before updating the UI.

Handling concurrent operations

When users can trigger multiple async operations, you need to think about concurrency. What if the user clicks "Search" twice quickly? You might want to:

  1. Cancel the first search when the second starts
  2. Only show results from the most recent search
  3. Prevent new searches while one is running

Here is a pattern for handling this:

class SearchState(ps.State):
    query: str = ""
    results: list[str] = []
    loading: bool = False
    _search_id: int = 0

    async def search(self):
        # Track which search this is
        self._search_id += 1
        current_search = self._search_id

        self.loading = True
        self.results = []

        await asyncio.sleep(1)  # Simulated API call

        # Only update if this is still the latest search
        if current_search == self._search_id:
            self.results = [f"Result for '{self.query}' #{i}" for i in range(5)]
            self.loading = False

The _search_id pattern ensures that only the most recent search updates the results. If a newer search starts, the older one's results are ignored.

Async effects

Effects can also be async. This is useful for background tasks or operations that need to run continuously:

import asyncio
import pulse as ps

class PollingState(ps.State):
    data: dict = None
    polling: bool = False

    @ps.effect(lazy=True)
    async def poll_data(self):
        while True:
            # Fetch fresh data
            await asyncio.sleep(5)
            self.data = await fetch_latest_data()

    def start_polling(self):
        self.polling = True
        self.poll_data.schedule()

    def stop_polling(self):
        self.polling = False
        self.poll_data.cancel()

The lazy=True parameter means the effect does not run automatically. Instead, you control when it starts using .schedule() and .cancel().

Error handling

Always handle errors in async code to prevent your application from crashing:

class DataState(ps.State):
    data: dict = None
    error: str = ""
    loading: bool = False

    async def fetch_data(self):
        self.loading = True
        self.error = ""

        try:
            await asyncio.sleep(1)
            # Simulated API call that might fail
            if random.random() < 0.3:
                raise Exception("Network error")
            self.data = {"message": "Success!"}
        except Exception as e:
            self.error = str(e)
            self.data = None
        finally:
            self.loading = False

Then in your component, display appropriate UI for each state:

@ps.component
def DataDisplay():
    with ps.init():
        state = DataState()

    if state.loading:
        return ps.div("Loading...")

    if state.error:
        return ps.div()[
            ps.p(f"Error: {state.error}", style={"color": "red"}),
            ps.button("Retry", onClick=state.fetch_data),
        ]

    if state.data:
        return ps.div(f"Data: {state.data['message']}")

    return ps.button("Load Data", onClick=state.fetch_data)

Complete example

Here is a more complete example showing a data fetching component with loading states, error handling, and refresh capability:

import asyncio
import pulse as ps
from dataclasses import dataclass

@dataclass
class User:
    id: int
    name: str
    email: str

class UserState(ps.State):
    user_id: int = 1
    user: User = None
    loading: bool = False
    error: str = ""

    async def fetch_user(self):
        self.loading = True
        self.error = ""
        self.user = None

        try:
            # Simulate API call
            await asyncio.sleep(1)
            self.user = User(
                id=self.user_id,
                name=f"User {self.user_id}",
                email=f"user{self.user_id}@example.com",
            )
        except Exception as e:
            self.error = str(e)
        finally:
            self.loading = False

    def next_user(self):
        self.user_id += 1
        # Note: we do not await here, just schedule the async work
        asyncio.create_task(self.fetch_user())

    def prev_user(self):
        if self.user_id > 1:
            self.user_id -= 1
            asyncio.create_task(self.fetch_user())

@ps.component
def UserProfile():
    with ps.init():
        state = UserState()
        # Fetch initial user
        asyncio.create_task(state.fetch_user())

    return ps.div()[
        ps.h2("User Profile"),

        # Navigation
        ps.div()[
            ps.button("Previous", onClick=state.prev_user, disabled=state.user_id <= 1),
            ps.span(f" User #{state.user_id} "),
            ps.button("Next", onClick=state.next_user),
            ps.button("Refresh", onClick=state.fetch_user, disabled=state.loading),
        ],

        # Content
        ps.div()[
            ps.p("Loading...") if state.loading else None,
            ps.p(f"Error: {state.error}", style={"color": "red"}) if state.error else None,
            ps.div()[
                ps.p(f"Name: {state.user.name}"),
                ps.p(f"Email: {state.user.email}"),
            ] if state.user else None,
        ],
    ]

Tips for async code

  1. Always set loading states: Users should know when something is happening in the background.

  2. Handle errors gracefully: Network requests fail. APIs return errors. Be prepared.

  3. Consider race conditions: When multiple async operations can run concurrently, think about which results should win.

  4. Use queries for data fetching: For most data fetching needs, Pulse queries (covered in the next section) provide a better abstraction than raw async handlers.

See also

What to read next

On this page