Pulse

Channels

Realtime channels

Bidirectional communication channels for real-time messaging between server and client.

Functions

channel

def channel(identifier: str | None = None) -> Channel

Create a channel bound to the current render session.

Parameters:

  • identifier - Optional channel ID. Auto-generated UUID if not provided.

Returns: Channel instance.

Raises: RuntimeError if called outside an active render session.

import pulse as ps

@ps.component
def ChatRoom():
    ch = ps.channel("chat")

    @ch.on("message")
    def handle_message(payload):
        # Handle incoming message from client
        ch.emit("broadcast", payload)

    return ps.div("Chat room")

Classes

Channel

Bidirectional communication channel bound to a render session.

Attributes

AttributeTypeDescription
idstrChannel identifier
render_idstrAssociated render session ID
session_idstrAssociated user session ID
route_pathstr | NoneRoute path this channel is bound to
closedboolWhether the channel is closed

Methods

on
def on(self, event: str, handler: ChannelHandler) -> Callable[[], None]

Register a handler for an incoming event.

Parameters:

  • event - Event name to listen for.
  • handler - Callback function (payload: Any) -> Any | Awaitable[Any].

Returns: Callable that removes the handler when invoked.

Raises: ChannelClosed if channel is closed.

ch = ps.channel()

def cleanup():
    remove_handler()

remove_handler = ch.on("data", lambda payload: print(payload))
emit
def emit(self, event: str, payload: Any = None) -> None

Send a fire-and-forget event to the client.

Parameters:

  • event - Event name.
  • payload - Data to send (optional).

Raises: ChannelClosed if channel is closed.

ch.emit("notification", {"message": "Hello"})
request
async def request(
    self,
    event: str,
    payload: Any = None,
    *,
    timeout: float | None = None,
) -> Any

Send a request to the client and await the response.

Parameters:

  • event - Event name.
  • payload - Data to send (optional).
  • timeout - Timeout in seconds (optional).

Returns: Response payload from client.

Raises:

  • ChannelClosed if channel is closed.
  • ChannelTimeout if request times out.
result = await ch.request("get_value", timeout=5.0)
close
def close(self) -> None

Close the channel and clean up resources.

Exceptions

ChannelClosed

class ChannelClosed(RuntimeError)

Raised when interacting with a channel that has been closed.

ChannelTimeout

class ChannelTimeout(asyncio.TimeoutError)

Raised when a channel request times out waiting for a response.

Type Aliases

ChannelHandler

ChannelHandler = Callable[[Any], Any | Awaitable[Any]]

Handler function for channel events. Can be sync or async.

On this page