Pulse

Scheduling

Timer and task scheduling primitives

Pulse provides scheduling primitives for running code after a delay, at intervals, or as background tasks. Pulse tracks scheduled work and cleans it up automatically when the render session or app closes.

All scheduling functions are available from pulse.scheduling:

from pulse.scheduling import later, repeat, create_task, create_future, call_soon

For common cases, later and repeat are also available directly on the pulse module:

import pulse as ps

ps.later(1.0, do_something)
ps.repeat(5.0, poll_data)

later

Schedule a callback to run after a delay.

def later(
    delay: float,
    fn: Callable[P, Any],
    *args: P.args,
    **kwargs: P.kwargs,
) -> TimerHandleLike

Parameters

Parameter   Type       Default    Description
delay       float      required   Seconds to wait before running
fn          Callable   required   Function to call (sync or async)
*args       Any        -          Positional arguments for fn
**kwargs    Any        -          Keyword arguments for fn

Returns

TimerHandleLike with .cancel() method to cancel the scheduled callback.

Example

import pulse as ps

class NotificationState(ps.State):
    message: str = ""
    visible: bool = False

    def show(self, text: str):
        self.message = text
        self.visible = True
        # Auto-hide after 3 seconds
        ps.later(3.0, self.hide)

    def hide(self):
        self.visible = False

Async callbacks

If the callback is async or returns a coroutine, it's automatically awaited:

async def fetch_and_update():
    data = await api.fetch()
    state.data = data

# Works with async functions
ps.later(5.0, fetch_and_update)

# Also works with sync functions that return coroutines
def refresh():
    return fetch_and_update()

ps.later(5.0, refresh)
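
The "await it if it is awaitable" rule can be sketched with the standard library alone. This is a minimal illustration, not Pulse's implementation: run_callback is a hypothetical helper, and fetch, refresh, and plain are stand-ins for the callbacks above.

```python
import asyncio
import inspect
from typing import Any, Callable


async def run_callback(fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Call fn and, if the result is awaitable, await it (hypothetical helper)."""
    result = fn(*args, **kwargs)
    if inspect.isawaitable(result):
        result = await result
    return result


async def fetch() -> str:
    await asyncio.sleep(0)
    return "fetched"


def refresh():
    # Sync function that returns a coroutine object
    return fetch()


def plain() -> str:
    return "plain"


async def main() -> list:
    # All three callback styles go through the same code path
    return [await run_callback(f) for f in (fetch, refresh, plain)]


print(asyncio.run(main()))  # ['fetched', 'fetched', 'plain']
```

Checking the return value rather than the function itself is what lets a sync function that returns a coroutine behave the same as an async function.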

Cancelling

Call .cancel() on the returned handle to prevent the callback from running:

class TypeaheadState(ps.State):
    query: str = ""
    results: list = []
    _debounce_handle: TimerHandleLike | None = None

    def on_input(self, value: str):
        self.query = value
        # Cancel previous debounce
        if self._debounce_handle:
            self._debounce_handle.cancel()
        # Schedule new search
        self._debounce_handle = ps.later(0.3, self.search)

    async def search(self):
        results = await api.search(self.query)
        self.results = results
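
The same cancel-then-reschedule pattern can be tried outside Pulse with asyncio's own timer handles. The sketch below assumes plain asyncio: Debouncer is a hypothetical helper, and loop.call_later stands in for ps.later.

```python
import asyncio
from typing import Optional


class Debouncer:
    """Hypothetical helper: keeps only the most recently scheduled call."""

    def __init__(self, delay: float) -> None:
        self.delay = delay
        self._handle: Optional[asyncio.TimerHandle] = None

    def trigger(self, fn, *args) -> None:
        if self._handle is not None:
            self._handle.cancel()  # drop the pending call, if any
        loop = asyncio.get_running_loop()
        self._handle = loop.call_later(self.delay, fn, *args)


async def main() -> list:
    searched = []
    debouncer = Debouncer(0.1)
    # Three keystrokes arrive faster than the debounce delay
    for query in ("p", "pu", "pul"):
        debouncer.trigger(searched.append, query)
        await asyncio.sleep(0.01)
    await asyncio.sleep(0.3)  # wait for the last timer to fire
    return searched


print(asyncio.run(main()))  # ['pul']
```

Only the last query is searched because each new keystroke cancels the timer scheduled by the previous one.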

repeat

Run a callback repeatedly at a fixed interval.

def repeat(
    interval: float,
    fn: Callable[P, Any],
    *args: P.args,
    **kwargs: P.kwargs,
) -> RepeatHandle

Parameters

Parameter   Type       Default    Description
interval    float      required   Seconds between executions
fn          Callable   required   Function to call (sync or async)
*args       Any        -          Positional arguments for fn
**kwargs    Any        -          Keyword arguments for fn

Returns

RepeatHandle with .cancel() method to stop the repeating callback.

Example

import pulse as ps

class LiveDataState(ps.State):
    data: dict = {}
    _poll_handle: RepeatHandle | None = None

    def start_polling(self):
        if self._poll_handle:
            return
        self._poll_handle = ps.repeat(5.0, self.fetch_data)

    def stop_polling(self):
        if self._poll_handle:
            self._poll_handle.cancel()
            self._poll_handle = None

    async def fetch_data(self):
        self.data = await api.get_latest()

Timing behavior

  • First execution happens after the initial interval (not immediately)
  • For async callbacks, the next interval starts after the callback completes
  • If the callback takes longer than the interval, executions don't overlap
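
One way to get these three properties is a loop that sleeps first and then awaits the callback before sleeping again. The sketch below is an assumption about how such a loop could look, written with plain asyncio; repeat_loop and its runs parameter are hypothetical and exist only so the demo terminates.

```python
import asyncio
import time


async def repeat_loop(interval: float, fn, runs: int) -> None:
    """Sleep first, then await fn; the next sleep starts only after fn returns."""
    for _ in range(runs):
        await asyncio.sleep(interval)
        await fn()


async def main() -> list:
    start = time.monotonic()
    starts = []

    async def slow_callback() -> None:
        starts.append(time.monotonic() - start)
        await asyncio.sleep(0.1)  # deliberately longer than the interval

    await repeat_loop(0.05, slow_callback, runs=3)
    return starts


starts = asyncio.run(main())
print([round(s, 2) for s in starts])
```

With a 0.05 s interval and a 0.1 s callback, consecutive start times end up roughly 0.15 s apart: each run waits for the previous one to finish plus one full interval, so runs never overlap.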

Error handling

Exceptions in the callback are logged via the event loop's exception handler but don't stop the repeat loop:

import random

import pulse as ps

def might_fail():
    if random.random() < 0.1:
        raise Exception("Oops!")
    print("Success")

# Keeps running even after exceptions
ps.repeat(1.0, might_fail)
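
To make "logged but not fatal" concrete, here is a minimal sketch with plain asyncio. It assumes the loop catches the exception and forwards it to loop.call_exception_handler; repeat_loop is a hypothetical stand-in, and the custom handler installed in main only collects messages so the result can be inspected.

```python
import asyncio


async def repeat_loop(interval: float, fn, runs: int) -> None:
    """Hypothetical repeat loop that reports callback errors and keeps going."""
    loop = asyncio.get_running_loop()
    for _ in range(runs):
        await asyncio.sleep(interval)
        try:
            fn()
        except Exception as exc:
            # Hand the error to the loop's exception handler instead of re-raising
            loop.call_exception_handler(
                {"message": "repeat callback failed", "exception": exc}
            )


async def main() -> tuple:
    reported = []
    calls = []
    loop = asyncio.get_running_loop()
    loop.set_exception_handler(lambda _loop, ctx: reported.append(ctx["message"]))

    def flaky() -> None:
        calls.append(len(calls))
        if len(calls) == 2:
            raise RuntimeError("Oops!")  # fails on the second run only

    await repeat_loop(0.01, flaky, runs=3)
    return len(calls), reported


print(asyncio.run(main()))  # (3, ['repeat callback failed'])
```

The callback still runs a third time after the failure, and the error appears exactly once in the handler's output.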

call_soon

Schedule a callback to run as soon as possible on the event loop.

def call_soon(
    fn: Callable[P, Any],
    *args: P.args,
    **kwargs: P.kwargs,
) -> TimerHandleLike | None

Parameters

Parameter   Type       Default    Description
fn          Callable   required   Function to call (sync or async)
*args       Any        -          Positional arguments for fn
**kwargs    Any        -          Keyword arguments for fn

Returns

TimerHandleLike or None if scheduling failed (e.g., during tests without an event loop).

Example

def schedule_update():
    # Run on next event loop iteration
    ps.call_soon(state.refresh)

This is useful when you need to defer execution to the next event loop iteration without a specific delay.
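
The ordering this implies can be seen with asyncio's own loop.call_soon, used here as a stand-in for ps.call_soon (the sketch assumes the two order callbacks the same way, which this page does not state explicitly).

```python
import asyncio


async def main() -> list:
    order = []
    loop = asyncio.get_running_loop()

    loop.call_soon(order.append, "deferred callback")
    order.append("current code")  # runs first: call_soon only queues the callback

    await asyncio.sleep(0)  # yield to the event loop so the queued callback runs
    return order


print(asyncio.run(main()))  # ['current code', 'deferred callback']
```

The deferred callback does not interrupt the code that scheduled it; it runs once control returns to the event loop.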


create_task

Create a tracked asyncio task.

def create_task(
    coroutine: Awaitable[T],
    *,
    name: str | None = None,
    on_done: Callable[[asyncio.Task[T]], None] | None = None,
) -> asyncio.Task[T]

Parameters

Parameter   Type           Default    Description
coroutine   Awaitable[T]   required   Coroutine to run
name        str | None     None       Task name for debugging
on_done     Callable       None       Callback when task completes

Returns

asyncio.Task[T] that's tracked by the current registry.

Example

import asyncio

import pulse as ps
from pulse.scheduling import create_task

class BackgroundWorkState(ps.State):
    status: str = "idle"

    def start_work(self):
        self.status = "running"
        create_task(self._do_work(), name="background-work")

    async def _do_work(self):
        await asyncio.sleep(5)
        self.status = "done"

Why use create_task?

Unlike asyncio.create_task(), tasks created with create_task are:

  • Tracked by the render session or app registry
  • Cancelled automatically when the session closes or app shuts down
  • Cleaned up from the registry when they complete

This prevents orphaned tasks and resource leaks.
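
The tracking behavior listed above can be sketched with a small registry built on plain asyncio. TaskRegistry is a hypothetical class written for illustration; it is not Pulse's registry and makes no claim about how Pulse stores tasks internally.

```python
import asyncio
from typing import Optional


class TaskRegistry:
    """Hypothetical registry: tracks tasks and cancels pending ones on close."""

    def __init__(self) -> None:
        self.tasks: set = set()

    def create_task(self, coro, *, name: Optional[str] = None) -> asyncio.Task:
        task = asyncio.get_running_loop().create_task(coro, name=name)
        self.tasks.add(task)
        task.add_done_callback(self.tasks.discard)  # forget finished tasks
        return task

    def close(self) -> None:
        for task in list(self.tasks):
            task.cancel()


async def main() -> tuple:
    registry = TaskRegistry()
    quick = registry.create_task(asyncio.sleep(0, result="done"), name="quick")
    slow = registry.create_task(asyncio.sleep(60), name="slow")

    await quick
    await asyncio.sleep(0)  # give done callbacks a chance to run
    tracked_after_quick = len(registry.tasks)  # only "slow" is still tracked

    registry.close()  # simulates the session closing
    await asyncio.gather(slow, return_exceptions=True)
    return quick.result(), tracked_after_quick, slow.cancelled(), len(registry.tasks)


print(asyncio.run(main()))  # ('done', 1, True, 0)
```

Holding a reference in the registry also keeps a pending task from being garbage-collected, which bare asyncio.create_task() calls do not guarantee.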


create_future

Create an asyncio Future on the main event loop.

def create_future() -> asyncio.Future[Any]

Returns

asyncio.Future[Any] created on the main event loop.

Example

import pulse as ps
from pulse.scheduling import create_future

class PromiseState(ps.State):
    result: str = ""

    async def wait_for_signal(self):
        future = create_future()
        # Some external code will resolve this.
        # register_callback is a placeholder for whatever API delivers the result.
        register_callback(future.set_result)
        self.result = await future
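
The resolve-from-elsewhere pattern works the same way with a bare asyncio future. In this sketch, loop.create_future() stands in for create_future(), and a loop.call_later timer plays the role of the external code that eventually delivers the result.

```python
import asyncio


async def main() -> str:
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    # Stand-in for external code that reports a result later
    loop.call_later(0.01, future.set_result, "signal received")

    # Suspends here until set_result is called
    return await future


print(asyncio.run(main()))  # signal received
```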

Automatic cleanup

All scheduling primitives are scoped to the current render session (if inside a component render) or the app (if outside). When the session closes or app shuts down:

  • Pending later callbacks are cancelled
  • repeat loops are stopped
  • Tasks created with create_task are cancelled

This happens automatically, so you don't need to track and cancel timers manually in cleanup functions.

@ps.component
def PollingComponent():
    with ps.init():
        state = PollingState()
        # This repeat is cancelled automatically when the render session closes
        ps.repeat(5.0, state.refresh)

    return ps.div(f"Data: {state.data}")

Thread safety

These functions work from any thread. If called from a background thread, they schedule work on the main event loop using anyio.from_thread.run().

import threading

def background_thread():
    # Works from any thread
    ps.later(1.0, lambda: print("Hello from main loop"))

threading.Thread(target=background_thread).start()
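
The general shape of handing work from a background thread to the event loop can be sketched with the standard library. Note the substitution: this example uses asyncio's loop.call_soon_threadsafe rather than anyio.from_thread.run(), so it illustrates the idea and is not Pulse's actual code path.

```python
import asyncio
import threading


async def main() -> list:
    loop = asyncio.get_running_loop()
    loop_thread = threading.current_thread()
    done = asyncio.Event()
    messages = []

    def on_loop(text: str) -> None:
        # Record whether the callback really ran on the event loop's thread
        messages.append((text, threading.current_thread() is loop_thread))
        done.set()

    def background_thread() -> None:
        # Off the loop thread, only the thread-safe entry point may be used
        loop.call_soon_threadsafe(on_loop, "hello")

    threading.Thread(target=background_thread, name="worker").start()
    await asyncio.wait_for(done.wait(), timeout=1.0)
    return messages


print(asyncio.run(main()))  # [('hello', True)]
```

The callback is submitted from the worker thread but executes on the loop's thread, which is why it can touch loop-bound objects such as the asyncio.Event safely.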

Untracking

Callbacks run in an untracked reactive context, so reactive values read inside a callback are not registered as dependencies of the effect that scheduled it:

class State(ps.State):
    count: int = 0

    @ps.effect
    def setup_timer(self):
        # The callback reads self.count but doesn't create a dependency
        # This effect won't re-run when count changes
        ps.later(1.0, lambda: print(f"Count is {self.count}"))
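
For readers unfamiliar with dependency tracking, the toy model below shows what "untracked" means. Everything in it (Signal, tracking, untracked) is hypothetical and written for illustration; Pulse's reactive internals are not described on this page and may work differently.

```python
from contextlib import contextmanager
from contextvars import ContextVar

# Set of signals read by the currently running effect, or None when untracked
_current_deps = ContextVar("current_deps", default=None)


class Signal:
    """Toy reactive value: reads register themselves with the active tracker."""

    def __init__(self, value):
        self._value = value

    def read(self):
        deps = _current_deps.get()
        if deps is not None:
            deps.add(self)
        return self._value


@contextmanager
def tracking(deps: set):
    token = _current_deps.set(deps)
    try:
        yield
    finally:
        _current_deps.reset(token)


@contextmanager
def untracked():
    token = _current_deps.set(None)
    try:
        yield
    finally:
        _current_deps.reset(token)


count = Signal(0)
deps = set()
with tracking(deps):
    with untracked():
        count.read()  # read inside an untracked context: not recorded
    deps_after_untracked_read = len(deps)
    count.read()  # ordinary read: recorded as a dependency
deps_after_tracked_read = len(deps)

print(deps_after_untracked_read, deps_after_tracked_read)  # 0 1
```

In this model, a scheduler that wraps each callback in untracked() gets the behavior described above: the callback can read reactive values freely without causing the surrounding effect to re-run.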
