Pulse

Reactive Core

Signals, computeds, effects, batching

Signal

A reactive value container. Reading registers a dependency; writing notifies observers.

class Signal(Generic[T]):
    def __init__(self, value: T, name: str | None = None)

Parameters

Parameter  Type        Default   Description
value      T           required  Initial value
name       str | None  None      Debug name

Attributes

Attribute    Type        Description
value        T           Current value (direct access, no tracking)
name         str | None  Debug name
last_change  int         Epoch when last changed

Methods

read() -> T

Read the value, registering a dependency in the current scope.

__call__() -> T

Alias for read().

signal = Signal(5)
value = signal()  # Same as signal.read()

write(value: T) -> None

Update the value and notify observers. No-op if value equals current.

unwrap() -> T

Alias for read().

Example

count = Signal(0, name="count")
print(count())     # 0 (registers dependency)
count.write(1)     # Updates and notifies observers
print(count.value) # 1 (no dependency tracking)
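
The no-op rule for write() can be illustrated with a small stand-in class. This is a minimal sketch, not Pulse's implementation: MiniSignal and its observers list are hypothetical names introduced only for this illustration, and the sketch assumes equality is checked with ==.

```python
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


class MiniSignal(Generic[T]):
    """Hypothetical stand-in for Signal; not Pulse's implementation."""

    def __init__(self, value: T) -> None:
        self.value = value
        self.observers: list[Callable[[], None]] = []

    def write(self, value: T) -> None:
        # Assumption: "equals current" means an == comparison.
        if value == self.value:
            return  # no-op: observers are not notified
        self.value = value
        for notify in list(self.observers):
            notify()


notifications: list[int] = []
count = MiniSignal(0)
count.observers.append(lambda: notifications.append(count.value))

count.write(1)  # value changed: observer is notified
count.write(1)  # equal value: nothing happens
count.write(2)  # value changed again
```

Only the first and third writes reach the observer, so redundant writes cost nothing downstream.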

Computed

A derived value that is marked dirty when its dependencies change and recomputed on the next read.

class Computed(Generic[T]):
    def __init__(self, fn: Callable[..., T], name: str | None = None)

Parameters

Parameter  Type              Default   Description
fn         Callable[..., T]  required  Function computing the value
name       str | None        None      Debug name

Attributes

Attribute    Type        Description
value        T           Cached computed value
name         str | None  Debug name
dirty        bool        Whether recompute is needed
last_change  int         Epoch when value last changed

Methods

read() -> T

Get the computed value, recomputing if dirty, and register a dependency.

__call__() -> T

Alias for read().

unwrap() -> T

Alias for read().

Behavior

  • Lazy evaluation: only recomputes when read and dirty
  • fn may accept an optional prev_value argument for incremental computation
  • Raises an exception if a signal is written inside the computed function
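
Lazy evaluation with a dirty flag can be sketched as follows. This is an illustration under stated assumptions, not Pulse's Computed: LazyValue, invalidate(), and the runs counter are hypothetical names, and invalidation is triggered by hand here rather than by dependency tracking.

```python
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


class LazyValue(Generic[T]):
    """Hypothetical dirty-flag cache; not Pulse's Computed."""

    def __init__(self, fn: Callable[[], T]) -> None:
        self.fn = fn
        self.dirty = True  # nothing cached yet
        self.value: T | None = None
        self.runs = 0  # counts how often fn actually executes

    def invalidate(self) -> None:
        # A dependency changed: mark stale, but do not recompute yet.
        self.dirty = True

    def read(self) -> T:
        if self.dirty:
            self.value = self.fn()
            self.runs += 1
            self.dirty = False
        return self.value  # type: ignore[return-value]


source = {"count": 5}
doubled = LazyValue(lambda: source["count"] * 2)

first = doubled.read()   # computes
second = doubled.read()  # served from the cache
source["count"] = 10
doubled.invalidate()     # marks dirty only
doubled.invalidate()     # still no recompute
third = doubled.read()   # recomputes once
```

Two invalidations followed by one read cost a single recomputation, which is the point of deferring work until read time.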

Example

count = Signal(5)

doubled = Computed(lambda: count() * 2)
print(doubled())  # 10

count.write(10)
print(doubled())  # 20

Effect

Runs a function when dependencies change.

class Effect(Disposable):
    def __init__(
        self,
        fn: EffectFn,
        name: str | None = None,
        immediate: bool = False,
        lazy: bool = False,
        on_error: Callable[[Exception], None] | None = None,
        deps: list[Signal[Any] | Computed[Any]] | None = None,
        interval: float | None = None,
    )

Type Aliases

EffectCleanup = Callable[[], None]
EffectFn = Callable[[], EffectCleanup | None]

Parameters

Parameter  Type                                Default   Description
fn         EffectFn                            required  Effect function
name       str | None                          None      Debug name
immediate  bool                                False     Run synchronously when scheduled
lazy       bool                                False     Don't run on creation
on_error   Callable[[Exception], None] | None  None      Error handler
deps       list[Signal | Computed] | None      None      Explicit deps (disables auto-tracking)
interval   float | None                        None      Re-run interval in seconds

Methods

run() -> None

Execute the effect immediately.

schedule() -> None

Schedule the effect to run in the current batch.

flush() -> None

If the effect is scheduled in a batch, remove it from the batch and run it immediately.

cancel(cancel_interval: bool = True) -> None

Cancel the scheduled run. When cancel_interval is True, also cancel the interval timer.

pause() -> None

Pause the effect; it won't run when dependencies change.

resume() -> None

Resume a paused effect and schedule it to run.

dispose() -> None

Dispose of the effect: run its cleanup function and remove it from its dependencies.

Example

count = Signal(0)

def log_count():
    print(f"Count: {count()}")
    return lambda: print("Cleanup")

effect = Effect(log_count)
count.write(1)  # Effect runs after batch flush
effect.dispose()
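
The lifecycle of a cleanup function returned by an effect can be sketched with a toy runner. MiniEffect is a hypothetical class, not Pulse's Effect, and the sketch assumes the previous run's cleanup fires before each re-run as well as on dispose (the reference above only confirms the dispose case).

```python
from typing import Callable, Optional

Cleanup = Callable[[], None]


class MiniEffect:
    """Hypothetical effect runner; not Pulse's Effect."""

    def __init__(self, fn: Callable[[], Optional[Cleanup]]) -> None:
        self.fn = fn
        self.cleanup: Optional[Cleanup] = None
        self.run()  # runs on creation, like a non-lazy effect

    def _run_cleanup(self) -> None:
        if self.cleanup is not None:
            self.cleanup()
            self.cleanup = None

    def run(self) -> None:
        # Assumption: the previous run's cleanup fires before the next run.
        self._run_cleanup()
        self.cleanup = self.fn()

    def dispose(self) -> None:
        self._run_cleanup()


log: list[str] = []
state = {"count": 0}


def log_count() -> Cleanup:
    seen = state["count"]
    log.append(f"run {seen}")
    return lambda: log.append(f"cleanup {seen}")


effect = MiniEffect(log_count)  # first run
state["count"] = 1
effect.run()                    # cleanup for 0, then run for 1
effect.dispose()                # cleanup for 1
```

Each cleanup closes over the value its own run saw, so resources acquired in one run are released before the next run acquires new ones.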

AsyncEffect

Async version of Effect for coroutine functions.

class AsyncEffect(Effect):
    def __init__(
        self,
        fn: AsyncEffectFn,
        name: str | None = None,
        lazy: bool = False,
        on_error: Callable[[Exception], None] | None = None,
        deps: list[Signal[Any] | Computed[Any]] | None = None,
        interval: float | None = None,
    )

Type Alias

AsyncEffectFn = Callable[[], Awaitable[EffectCleanup | None]]

Additional Methods

run() -> asyncio.Task[Any]

Start the async effect, cancelling any previous run. Returns the task.

wait() -> None

Wait for the current task to complete.

Behavior

  • Does not use batching; cancels and restarts on each dependency change
  • The immediate parameter is not supported; passing it raises an error
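
The cancel-and-restart behavior can be sketched with plain asyncio. RestartingTask is a hypothetical class written for this illustration, not Pulse's AsyncEffect; here the restart is triggered by calling run() by hand rather than by a dependency change.

```python
import asyncio


class RestartingTask:
    """Hypothetical cancel-and-restart runner; not Pulse's AsyncEffect."""

    def __init__(self, fn) -> None:
        self.fn = fn
        self.task: asyncio.Task | None = None

    def run(self) -> asyncio.Task:
        # Cancel the in-flight run (if any) before starting a new one.
        if self.task is not None and not self.task.done():
            self.task.cancel()
        self.task = asyncio.create_task(self.fn())
        return self.task

    async def wait(self) -> None:
        if self.task is not None:
            await self.task


async def demo() -> list[str]:
    events: list[str] = []
    state = {"query": "a"}

    async def fetch() -> None:
        query = state["query"]
        events.append(f"start {query}")
        try:
            await asyncio.sleep(0.05)  # stands in for slow I/O
            events.append(f"done {query}")
        except asyncio.CancelledError:
            events.append(f"cancelled {query}")
            raise

    runner = RestartingTask(fetch)
    runner.run()
    await asyncio.sleep(0)  # let the first run start
    state["query"] = "b"
    runner.run()            # cancels the first run, starts a second
    await runner.wait()
    return events


events = asyncio.run(demo())
```

The first run never reaches its "done" step, so a stale result for query "a" cannot overwrite the result for query "b".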

Batch

Groups reactive updates to run effects once after all writes.

class Batch:
    def __init__(self, effects: list[Effect] | None = None, name: str | None = None)

Methods

register_effect(effect: Effect) -> None

Add an effect to run when the batch flushes.

flush() -> None

Run all scheduled effects.

Usage as Context Manager

count = Signal(0)

with Batch() as batch:
    count.write(1)
    count.write(2)
    count.write(3)
# Effects run once here with final value 3
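
The run-once behavior can be sketched with a toy batch that queues each effect at most once and flushes on exit. MiniBatch is a hypothetical class, not Pulse's Batch, and effects are registered by hand here where a real signal write would schedule its observers.

```python
from typing import Callable


class MiniBatch:
    """Hypothetical batch; not Pulse's Batch."""

    def __init__(self) -> None:
        self.pending: list[Callable[[], None]] = []

    def register_effect(self, effect: Callable[[], None]) -> None:
        # Queue each effect at most once per batch.
        if effect not in self.pending:
            self.pending.append(effect)

    def flush(self) -> None:
        # Swap the queue out first so effects can schedule new work safely.
        pending, self.pending = self.pending, []
        for effect in pending:
            effect()

    def __enter__(self) -> "MiniBatch":
        return self

    def __exit__(self, *exc_info: object) -> None:
        self.flush()


state = {"count": 0}
seen: list[int] = []


def record() -> None:
    seen.append(state["count"])


with MiniBatch() as batch:
    for new_value in (1, 2, 3):
        state["count"] = new_value
        batch.register_effect(record)  # stands in for a signal write
    ran_inside_block = list(seen)      # snapshot: nothing has run yet
```

Three writes produce a single effect run that observes only the final value.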

Global Batch

By default, effects are scheduled in a global batch that flushes on the next event loop iteration.


Scope

Tracks dependencies and effects created within a context.

class Scope:
    def __init__(self)

Attributes

Attribute  Type                          Description
deps       dict[Signal | Computed, int]  Tracked dependencies
effects    list[Effect]                  Effects created in this scope

Usage

signal = Signal(0)

with Scope() as scope:
    value = signal()               # Dependency tracked
    effect = Effect(lambda: None)  # Effect registered
print(scope.deps)     # {signal: last_change}
print(scope.effects)  # [effect]

Untrack

A scope that disables dependency tracking.

class Untrack(Scope)

Usage

signal = Signal(0)

with Untrack():
    value = signal()  # No dependency registered

IgnoreBatch

A batch that ignores effect registrations and does nothing when flushed.

class IgnoreBatch(Batch)

Used internally during State initialization to prevent effects from running during setup.


ReactiveContext

Composite context holding epoch, batch, and scope.

class ReactiveContext:
    def __init__(
        self,
        epoch: Epoch | None = None,
        batch: Batch | None = None,
        scope: Scope | None = None,
        on_effect_error: Callable[[Effect, Exception], None] | None = None,
    )

Attributes

Attribute        Type             Description
epoch            Epoch            Global version counter
batch            Batch            Current batch for effect scheduling
scope            Scope | None     Current scope for dependency tracking
on_effect_error  Callable | None  Global effect error handler

Usage as Context Manager

ctx = ReactiveContext()
with ctx:
    # All reactive operations use this context
    pass

Helper Functions

flush_effects() -> None

Flush the current batch, running all scheduled effects.

count = Signal(0)
Effect(lambda: print(count()))
count.write(1)
flush_effects()  # Prints: 1

epoch() -> int

Get the current reactive epoch (version counter).

increment_epoch() -> None

Increment the reactive epoch. Called automatically on signal writes.
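
The relationship between the epoch and each value's last_change can be sketched with a shared counter. EpochCounter and VersionedValue are hypothetical names, and the sketch assumes last_change is set to the epoch value produced by the write; the reference above does not spell out that ordering.

```python
class EpochCounter:
    """Hypothetical version counter; not Pulse's Epoch."""

    def __init__(self) -> None:
        self.current = 0

    def increment(self) -> int:
        self.current += 1
        return self.current


class VersionedValue:
    """Hypothetical value stamped with the epoch of its last change."""

    def __init__(self, value: int, clock: EpochCounter) -> None:
        self.value = value
        self.clock = clock
        self.last_change = clock.current

    def write(self, value: int) -> None:
        if value == self.value:
            return  # equal writes do not advance the epoch
        self.value = value
        # Assumption: the write bumps the epoch and records the new value.
        self.last_change = self.clock.increment()


clock = EpochCounter()
x = VersionedValue(1, clock)
y = VersionedValue(2, clock)

# A consumer remembers the version it saw for each dependency.
seen = {"x": x.last_change, "y": y.last_change}

x.write(10)
x.write(10)  # no-op

# Comparing versions shows which dependencies changed since the snapshot.
changed = [name for name, v in (("x", x), ("y", y)) if v.last_change != seen[name]]
```

Comparing integers is cheaper than comparing values, which is why a mapping like Scope.deps can store a last_change number per dependency.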
