Reactive Core
Signals, computeds, effects, batching
Signal
A reactive value container. Reading registers a dependency; writing notifies observers.
class Signal(Generic[T]):
def __init__(self, value: T, name: str | None = None)Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
value | T | required | Initial value |
name | str | None | None | Debug name |
Attributes
| Attribute | Type | Description |
|---|---|---|
value | T | Current value (direct access, no tracking) |
name | str | None | Debug name |
last_change | int | Epoch when last changed |
Methods
read() -> T
Read the value, registering a dependency in the current scope.
__call__() -> T
Alias for read().
signal = Signal(5)
value = signal() # Same as signal.read()write(value: T) -> None
Update the value and notify observers. No-op if value equals current.
unwrap() -> T
Alias for read().
Example
count = Signal(0, name="count")
print(count()) # 0 (registers dependency)
count.write(1) # Updates and notifies observers
print(count.value) # 1 (no dependency tracking)Computed
A derived value that auto-updates when dependencies change.
class Computed(Generic[T]):
def __init__(self, fn: Callable[..., T], name: str | None = None)Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
fn | Callable[..., T] | required | Function computing the value |
name | str | None | None | Debug name |
Attributes
| Attribute | Type | Description |
|---|---|---|
value | T | Cached computed value |
name | str | None | Debug name |
dirty | bool | Whether recompute is needed |
last_change | int | Epoch when value last changed |
Methods
read() -> T
Get the computed value, recomputing if dirty, and register a dependency.
__call__() -> T
Alias for read().
unwrap() -> T
Alias for read().
Behavior
- Lazy evaluation: only recomputes when read and dirty
- Accepts optional
prev_valueargument for incremental computation - Throws if a signal is written inside the computed function
Example
count = Signal(5)
doubled = Computed(lambda: count() * 2)
print(doubled()) # 10
count.write(10)
print(doubled()) # 20Effect
Runs a function when dependencies change.
class Effect(Disposable):
def __init__(
self,
fn: EffectFn,
name: str | None = None,
immediate: bool = False,
lazy: bool = False,
on_error: Callable[[Exception], None] | None = None,
deps: list[Signal[Any] | Computed[Any]] | None = None,
interval: float | None = None,
)Type Aliases
EffectCleanup = Callable[[], None]
EffectFn = Callable[[], EffectCleanup | None]Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
fn | EffectFn | required | Effect function |
name | str | None | None | Debug name |
immediate | bool | False | Run synchronously when scheduled |
lazy | bool | False | Don't run on creation |
on_error | Callable[[Exception], None] | None | None | Error handler |
deps | list[Signal | Computed] | None | None | Explicit deps (disables auto-tracking) |
interval | float | None | None | Re-run interval in seconds |
Methods
run() -> None
Execute the effect immediately.
schedule() -> None
Schedule the effect to run in the current batch.
flush() -> None
If scheduled in a batch, remove and run immediately.
cancel(cancel_interval: bool = True) -> None
Cancel the scheduled effect. Optionally cancel interval timer.
pause() -> None
Pause the effect; it won't run when dependencies change.
resume() -> None
Resume a paused effect and schedule it to run.
dispose() -> None
Clean up the effect, run cleanup function, remove from dependencies.
Example
count = Signal(0)
def log_count():
print(f"Count: {count()}")
return lambda: print("Cleanup")
effect = Effect(log_count)
count.write(1) # Effect runs after batch flush
effect.dispose()AsyncEffect
Async version of Effect for coroutine functions.
class AsyncEffect(Effect):
def __init__(
self,
fn: AsyncEffectFn,
name: str | None = None,
lazy: bool = False,
on_error: Callable[[Exception], None] | None = None,
deps: list[Signal[Any] | Computed[Any]] | None = None,
interval: float | None = None,
)Type Alias
AsyncEffectFn = Callable[[], Awaitable[EffectCleanup | None]]Additional Methods
run() -> asyncio.Task[Any]
Start the async effect, cancelling any previous run. Returns the task.
wait() -> None
Wait for the current task to complete.
Behavior
- Does not use batching; cancels and restarts on each dependency change
immediateparameter not supported (raises if passed)
Batch
Groups reactive updates to run effects once after all writes.
class Batch:
def __init__(self, effects: list[Effect] | None = None, name: str | None = None)Methods
register_effect(effect: Effect) -> None
Add an effect to run when the batch flushes.
flush() -> None
Run all scheduled effects.
Usage as Context Manager
count = Signal(0)
with Batch() as batch:
count.write(1)
count.write(2)
count.write(3)
# Effects run once here with final value 3Global Batch
By default, effects are scheduled in a global batch that flushes on the next event loop iteration.
Scope
Tracks dependencies and effects created within a context.
class Scope:
def __init__(self)Attributes
| Attribute | Type | Description |
|---|---|---|
deps | dict[Signal | Computed, int] | Tracked dependencies |
effects | list[Effect] | Effects created in this scope |
Usage
with Scope() as scope:
value = signal() # Dependency tracked
effect = Effect(fn) # Effect registered
print(scope.deps) # {signal: last_change}
print(scope.effects) # [effect]Untrack
A scope that disables dependency tracking.
class Untrack(Scope)Usage
with Untrack():
value = signal() # No dependency registeredIgnoreBatch
A batch that ignores effect registrations and does nothing when flushed.
class IgnoreBatch(Batch)Used internally during State initialization to prevent effects from running during setup.
ReactiveContext
Composite context holding epoch, batch, and scope.
class ReactiveContext:
def __init__(
self,
epoch: Epoch | None = None,
batch: Batch | None = None,
scope: Scope | None = None,
on_effect_error: Callable[[Effect, Exception], None] | None = None,
)Attributes
| Attribute | Type | Description |
|---|---|---|
epoch | Epoch | Global version counter |
batch | Batch | Current batch for effect scheduling |
scope | Scope | None | Current scope for dependency tracking |
on_effect_error | Callable | None | Global effect error handler |
Usage as Context Manager
ctx = ReactiveContext()
with ctx:
# All reactive operations use this context
passHelper Functions
flush_effects() -> None
Flush the current batch, running all scheduled effects.
count = Signal(0)
Effect(lambda: print(count()))
count.write(1)
flush_effects() # Prints: 1epoch() -> int
Get the current reactive epoch (version counter).
increment_epoch() -> None
Increment the reactive epoch. Called automatically on signal writes.