Scheduling
Timer and task scheduling primitives
Pulse provides scheduling primitives for running code after delays, at intervals, or as background tasks. These are automatically tracked and cleaned up when the render session or app closes.
All scheduling functions are available from pulse.scheduling:
from pulse.scheduling import later, repeat, create_task, create_future, call_soonFor common cases, later and repeat are also available directly on the pulse module:
import pulse as ps
ps.later(1.0, do_something)
ps.repeat(5.0, poll_data)later
Schedule a callback to run after a delay.
def later(
delay: float,
fn: Callable[P, Any],
*args: P.args,
**kwargs: P.kwargs,
) -> TimerHandleLikeParameters
| Parameter | Type | Default | Description |
|---|---|---|---|
delay | float | required | Seconds to wait before running |
fn | Callable | required | Function to call (sync or async) |
*args | Any | Positional arguments for fn | |
**kwargs | Any | Keyword arguments for fn |
Returns
TimerHandleLike with .cancel() method to cancel the scheduled callback.
Example
import pulse as ps
class NotificationState(ps.State):
message: str = ""
visible: bool = False
def show(self, text: str):
self.message = text
self.visible = True
# Auto-hide after 3 seconds
ps.later(3.0, self.hide)
def hide(self):
self.visible = FalseAsync callbacks
If the callback is async or returns a coroutine, it's automatically awaited:
async def fetch_and_update():
data = await api.fetch()
state.data = data
# Works with async functions
ps.later(5.0, fetch_and_update)
# Also works with sync functions that return coroutines
def refresh():
return fetch_and_update()
ps.later(5.0, refresh)Cancelling
Call .cancel() on the returned handle to prevent the callback from running:
class TypeaheadState(ps.State):
query: str = ""
_debounce_handle: TimerHandleLike | None = None
def on_input(self, value: str):
self.query = value
# Cancel previous debounce
if self._debounce_handle:
self._debounce_handle.cancel()
# Schedule new search
self._debounce_handle = ps.later(0.3, self.search)
async def search(self):
results = await api.search(self.query)
self.results = resultsrepeat
Run a callback repeatedly at a fixed interval.
def repeat(
interval: float,
fn: Callable[P, Any],
*args: P.args,
**kwargs: P.kwargs,
) -> RepeatHandleParameters
| Parameter | Type | Default | Description |
|---|---|---|---|
interval | float | required | Seconds between executions |
fn | Callable | required | Function to call (sync or async) |
*args | Any | Positional arguments for fn | |
**kwargs | Any | Keyword arguments for fn |
Returns
RepeatHandle with .cancel() method to stop the repeating callback.
Example
import pulse as ps
class LiveDataState(ps.State):
data: dict = {}
_poll_handle: RepeatHandle | None = None
def start_polling(self):
if self._poll_handle:
return
self._poll_handle = ps.repeat(5.0, self.fetch_data)
def stop_polling(self):
if self._poll_handle:
self._poll_handle.cancel()
self._poll_handle = None
async def fetch_data(self):
self.data = await api.get_latest()Timing behavior
- First execution happens after the initial interval (not immediately)
- For async callbacks, the next interval starts after the callback completes
- If the callback takes longer than the interval, executions don't overlap
Error handling
Exceptions in the callback are logged via the event loop's exception handler but don't stop the repeat loop:
def might_fail():
if random.random() < 0.1:
raise Exception("Oops!")
print("Success")
# Keeps running even after exceptions
ps.repeat(1.0, might_fail)call_soon
Schedule a callback to run as soon as possible on the event loop.
def call_soon(
fn: Callable[P, Any],
*args: P.args,
**kwargs: P.kwargs,
) -> TimerHandleLike | NoneParameters
| Parameter | Type | Default | Description |
|---|---|---|---|
fn | Callable | required | Function to call (sync or async) |
*args | Any | Positional arguments for fn | |
**kwargs | Any | Keyword arguments for fn |
Returns
TimerHandleLike or None if scheduling failed (e.g., during tests without an event loop).
Example
def schedule_update():
# Run on next event loop iteration
ps.call_soon(state.refresh)This is useful when you need to defer execution to the next event loop iteration without a specific delay.
create_task
Create a tracked asyncio task.
def create_task(
coroutine: Awaitable[T],
*,
name: str | None = None,
on_done: Callable[[asyncio.Task[T]], None] | None = None,
) -> asyncio.Task[T]Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
coroutine | Awaitable[T] | required | Coroutine to run |
name | str | None | None | Task name for debugging |
on_done | Callable | None | Callback when task completes |
Returns
asyncio.Task[T] that's tracked by the current registry.
Example
from pulse.scheduling import create_task
class BackgroundWorkState(ps.State):
status: str = "idle"
def start_work(self):
self.status = "running"
create_task(self._do_work(), name="background-work")
async def _do_work(self):
await asyncio.sleep(5)
self.status = "done"Why use create_task?
Unlike asyncio.create_task(), tasks created with create_task are:
- Tracked by the render session or app registry
- Cancelled automatically when the session closes or app shuts down
- Cleaned up from the registry when they complete
This prevents orphaned tasks and resource leaks.
create_future
Create an asyncio Future on the main event loop.
def create_future() -> asyncio.Future[Any]Returns
asyncio.Future[Any] created on the main event loop.
Example
from pulse.scheduling import create_future
class PromiseState(ps.State):
result: str = ""
async def wait_for_signal(self):
future = create_future()
# Some external code will resolve this
register_callback(future.set_result)
self.result = await futureAutomatic cleanup
All scheduling primitives are scoped to the current render session (if inside a component render) or the app (if outside). When the session closes or app shuts down:
- Pending
latercallbacks are cancelled repeatloops are stopped- Tasks created with
create_taskare cancelled
This happens automatically—you don't need to manually track and cancel timers in cleanup functions.
@ps.component
def PollingComponent():
with ps.init():
state = PollingState()
# This repeat is automatically cancelled when the component unmounts
ps.repeat(5.0, state.refresh)
return ps.div(f"Data: {state.data}")Thread safety
These functions work from any thread. If called from a background thread, they schedule work on the main event loop using anyio.from_thread.run().
import threading
def background_thread():
# Works from any thread
ps.later(1.0, lambda: print("Hello from main loop"))
threading.Thread(target=background_thread).start()Untracking
Callbacks run with an untracked reactive context to avoid accidentally creating reactive dependencies:
class State(ps.State):
count: int = 0
@ps.effect
def setup_timer(self):
# The callback reads self.count but doesn't create a dependency
# This effect won't re-run when count changes
ps.later(1.0, lambda: print(f"Count is {self.count}"))See also
- Effects - reactive side effects
- Async - async patterns in Pulse
- Hooks reference - component lifecycle hooks