Advanced Patterns
Chat with Channels
Build real-time features using bidirectional messaging between your Python server and the JavaScript client.
The problem
You need server-initiated updates that go beyond the normal render cycle - live notifications, chat messages, or collaborative features where the server needs to push data to the client.
Solution
Create a channel on the server and use usePulseChannel on the client to send and receive messages.
Server side
import pulse as ps
from dataclasses import dataclass
@dataclass
class Message:
    """A single chat message: who sent it and what it says."""

    user: str
    text: str
class ChatState(ps.State):
    """Server-side chat state: the message history plus the channel to the client."""

    messages: list[Message]
    channel: ps.Channel

    def __init__(self):
        self.messages = []
        self.channel = ps.channel("chat")
        # Register handler for client messages
        self.channel.on("send_message", self._on_message)

    def _on_message(self, payload):
        """Handle an incoming "send_message" event from the client.

        Reads the "user" and "text" keys from ``payload``; a payload missing
        either key raises KeyError.
        """
        msg = Message(user=payload["user"], text=payload["text"])
        self.messages.append(msg)
        # Send the stored message back out over the channel.
        # NOTE(review): this page describes ps.channel() as scoped to the
        # current user session, so this presumably reaches this session's
        # client rather than every connected user -- confirm.
        self.channel.emit("new_message", {"user": msg.user, "text": msg.text})

    def send_notification(self, text: str):
        """Send a server-initiated "notification" event carrying ``text``."""
        self.channel.emit("notification", {"text": text})
@ps.component
def ChatRoom():
    """Render the server-side message list and mount the client chat component."""
    with ps.init():
        state = ChatState()
    return ps.div(
        ps.h2("Chat"),
        ps.ul([ps.li(f"{m.user}: {m.text}", key=i) for i, m in enumerate(state.messages)]),
        # Pass channel ID to client component
        ChatClient(channelId=state.channel.id),
    )


# Client side
from pulse.js.pulse import usePulseChannel
useState = ps.Import("useState", "react")
useEffect = ps.Import("useEffect", "react")
@ps.javascript(jsx=True)
def ChatClient(*, channelId: str):
    """Client-side chat UI connected to the server channel ``channelId``.

    Keeps its own list of received messages and the current input text,
    listens for "new_message" and "notification" events, and emits
    "send_message" when the user clicks Send.
    """
    bridge = usePulseChannel(channelId)
    messages, setMessages = useState([])
    inputText, setInputText = useState("")

    # Subscribe to server events
    def subscribe():
        def on_new_message(payload):
            # Functional update: append to the previous list.
            setMessages(lambda prev: [*prev, payload])

        def on_notification(payload):
            # Handle server notifications
            print("Server says:", payload["text"])

        off1 = bridge.on("new_message", on_new_message)
        off2 = bridge.on("notification", on_notification)

        def cleanup():
            # Call both values returned by bridge.on() to undo the subscriptions.
            off1()
            off2()

        return cleanup

    useEffect(subscribe, [bridge])

    # Send message to server
    def send():
        if inputText:
            bridge.emit("send_message", {"user": "me", "text": inputText})
            setInputText("")

    return ps.div()[
        ps.ul()[
            [ps.li(f"{m['user']}: {m['text']}", key=i) for i, m in enumerate(messages)]
        ],
        ps.input(value=inputText, onChange=lambda e: setInputText(e.target.value)),
        ps.button("Send", onClick=send),
    ]


# How it works
- ps.channel() creates a bidirectional channel scoped to the current user session
- channel.on(event, handler) registers a handler for client events
- channel.emit(event, payload) sends fire-and-forget messages to the client
- usePulseChannel(id) connects the client to the channel
- Channels are automatically cleaned up when the component unmounts
Request-response pattern
For operations where the server needs a response from the client:
# Server: request data from client
async def get_client_location(channel: ps.Channel):
    """Ask the client for its location over ``channel``.

    Sends a "get_location" request with a 5-second timeout and returns the
    client's response, or None if ps.ChannelTimeout is raised.
    """
    try:
        response = await channel.request("get_location", timeout=5.0)
        return response  # {"lat": 40.7, "lng": -74.0}
    except ps.ChannelTimeout:
        return None


# Client: respond to server requests
def subscribe():
    """Register a handler that answers the server's "get_location" requests.

    Returns whatever bridge.on() returns; the ChatClient example on this page
    calls such values to undo the subscription.
    """
    def on_get_location(payload):
        # Return value goes back to server
        return {"lat": 40.7, "lng": -74.0}

    off = bridge.on("get_location", on_get_location)
    return off