Streaming¶
Maven supports token-by-token streaming when both the LLM provider returns chunks and the destination channel implements channels.StreamChannel. Other channels collapse the stream into a single send.
Capability check¶
type StreamChannel interface {
Channel
SendStream(ctx context.Context, chatID string, metadata map[string]any, events <-chan api.StreamEvent) error
}
| Channel | Streaming |
|---|---|
| Telegram | Yes, when channels.telegram.streaming = true. Private DMs use Bot API sendMessageDraft; groups/supergroups fall back to placeholder + editMessageText. |
| Web UI | Yes, by default. Each delta is a WebSocket text frame; stream_done terminates. |
| Web UI voice | Yes; deltas drive sentence segmentation → TTS → PCM frames over WebSocket. |
| Feishu, WeCom, WhatsApp, Matrix | No. The pipeline collapses to a single Send. |
The pipeline forces sync (useStream = false) when bus.RoutingHints.ForceSync is true — slash flows that need a single post-action commit (compact, /new) set this.
Flow¶
sequenceDiagram
participant P as Pipeline
participant Bus as MessageBus
participant RT as Runtime
participant CH as StreamChannel
P->>Bus: OnStreamBegin(hints)
P->>RT: RunStream(prompt, sessionID)
RT-->>P: stream events channel
P->>CH: SendStream(chatID, metadata, events)
loop deltas
RT-->>CH: api.StreamEvent
CH-->>User: incremental update
end
CH-->>P: nil (or err)
P->>Bus: OnStreamEnd(hints, err)
The pipeline never reads stream events itself — it forwards the channel directly. SendStream is responsible for backpressure, batching, and formatting.
Stream events¶
Maven inherits the agentsdk event model. The channels that consume them care about a small subset:
| Event | Use |
|---|---|
EventIterationStart |
Reset content buffer; bump iteration counter on status cards. |
EventContentBlockStart (tool_use) |
Begin capturing tool input deltas for summary. |
EventContentBlockDelta (text) |
Append to user-visible content buffer. |
EventContentBlockDelta (input_json_delta) |
Buffer tool argument JSON for human-readable summary on EventToolExecutionStart. |
EventToolExecutionStart |
Add a row to the Telegram status card or equivalent. |
EventToolExecutionOutput |
Stream subprocess output into the tool row (e.g. ACP DelegateTask). |
EventToolExecutionResult |
Mark the row done or failed. |
EventError |
Surface streaming error to the user. |
Telegram status card¶
internal/plugins/channel/telegram renders a compact HTML "status card" message that updates in real time during a turn. The card shows:
- A bolded "Working..." header.
- The current iteration number.
- Each tool call with an emoji status icon and a short input summary.
- Streamed tool subprocess output (for
DelegateTask) inside<pre>blocks, truncated to 1200 runes. - Elapsed time.
A separate "content" message (private chats use Bot API sendMessageDraft) holds the model's textual reply as it grows. On stream end, the final report is sent as a normal sendMessage, and the intermediate status/content messages are deleted unless final send fails.
Web UI text frames¶
The Web channel sends three JSON frame types over /ws:
{"type": "stream", "delta": "Hello, "}
{"type": "stream", "delta": "world."}
{"type": "stream_done"}
Errors short-circuit with a synthetic message frame.
Web UI voice frames¶
The voice transport wraps a kernel/voice.Session. Each api.EventContentBlockDelta text chunk feeds a sentence segmenter (kernel/voice.TakeCompleteSentences); whole sentences become TTS requests; raw PCM (signed 16-bit LE, mono, 24 kHz) is written as binary WebSocket frames. A one-byte 0x00 sentinel tells the browser to flush the audio queue when the user starts speaking again (voice activity detection on the client triggers sess.Interrupt()).
See Guides: Voice.
Backpressure and cancellation¶
- The bus uses strict blocking backpressure for both inbound and outbound enqueue.
PublishInbound/PublishOutboundblock until the buffer has space, the context is canceled, or the bus is closed. - The pipeline holds
turnMu.RLockfor the entire turn. A streaming turn that hangs blocks gateway shutdown until its context cancels. - The
StreamDelegate(registered on the bus) seesOnStreamBegin/OnStreamEndfor every streaming turn — including failures. Use it for tracing or counters.
Error path¶
If SendStream returns an error, the pipeline:
- Emits
events.EventStreamFailedwith channel, chat, and error. - Pulses
health.SignalDeliveryFailed. - Logs at error level.
The user does not receive a sync fallback for stream failures — partial output is already visible in chat. The error is observable to the operator only.