Channels
Realtime channels
Bidirectional communication channels for real-time messaging between server and client.
Functions
channel
def channel(identifier: str | None = None) -> ChannelCreate a channel bound to the current render session.
Parameters:
identifier- Optional channel ID. Auto-generated UUID if not provided.
Returns: Channel instance.
Raises: RuntimeError if called outside an active render session.
import pulse as ps
@ps.component
def ChatRoom():
ch = ps.channel("chat")
@ch.on("message")
def handle_message(payload):
# Handle incoming message from client
ch.emit("broadcast", payload)
return ps.div("Chat room")Classes
Channel
Bidirectional communication channel bound to a render session.
Attributes
| Attribute | Type | Description |
|---|---|---|
id | str | Channel identifier |
render_id | str | Associated render session ID |
session_id | str | Associated user session ID |
route_path | str | None | Route path this channel is bound to |
closed | bool | Whether the channel is closed |
Methods
on
def on(self, event: str, handler: ChannelHandler) -> Callable[[], None]Register a handler for an incoming event.
Parameters:
event- Event name to listen for.handler- Callback function(payload: Any) -> Any | Awaitable[Any].
Returns: Callable that removes the handler when invoked.
Raises: ChannelClosed if channel is closed.
ch = ps.channel()
def cleanup():
remove_handler()
remove_handler = ch.on("data", lambda payload: print(payload))emit
def emit(self, event: str, payload: Any = None) -> NoneSend a fire-and-forget event to the client.
Parameters:
event- Event name.payload- Data to send (optional).
Raises: ChannelClosed if channel is closed.
ch.emit("notification", {"message": "Hello"})request
async def request(
self,
event: str,
payload: Any = None,
*,
timeout: float | None = None,
) -> AnySend a request to the client and await the response.
Parameters:
event- Event name.payload- Data to send (optional).timeout- Timeout in seconds (optional).
Returns: Response payload from client.
Raises:
ChannelClosedif channel is closed.ChannelTimeoutif request times out.
result = await ch.request("get_value", timeout=5.0)close
def close(self) -> NoneClose the channel and clean up resources.
Exceptions
ChannelClosed
class ChannelClosed(RuntimeError)Raised when interacting with a channel that has been closed.
ChannelTimeout
class ChannelTimeout(asyncio.TimeoutError)Raised when a channel request times out waiting for a response.
Type Aliases
ChannelHandler
ChannelHandler = Callable[[Any], Any | Awaitable[Any]]Handler function for channel events. Can be sync or async.