Lesson 01 — Chat Stream Lifecycle
Grounding: @cloudflare/think, @cloudflare/ai-chat, agents/chat on main. Goal: build the model needed for #1575.
1. Why this lesson exists
#1575 is not a bug fix you can localise to a single function. It is a parity question: do live observers and reconnecting observers see the same terminal outcome? To answer that, you need a mental model of what a chat turn is as it moves through three layers in parallel:
- the wire (WebSocket frames a client receives),
- the durable store (cf_ai_chat_stream_metadata and cf_ai_chat_stream_chunks),
- and the programmatic surfaces (SaveMessagesResult, onChatResponse, agent-tool child run rows).
#1567 already made the programmatic surfaces honest about errors. #1575 is about making the wire and the durable store agree.
2. Cast of characters
Before any diagrams, pin these down. They reappear constantly.
| Name | What it is | Where it lives |
|---|---|---|
| Think / AIChatAgent | The two opinionated chat-agent base classes. They share the same protocol shape and most of the same lifecycle. | packages/think/src/think.ts, packages/ai-chat/src/index.ts |
| _streamResult | Consumes a UI message stream chunk-by-chunk, broadcasts each chunk over WebSocket, stores it for replay, and decides whether the turn ended completed, error, or aborted. | think.ts:5675 and the equivalent _streamSSEReply in AIChat |
| ResumableStream | Standalone class that owns chunk persistence, terminal status, replay, and stale-stream cleanup. | packages/agents/src/chat/resumable-stream.ts |
| cf_ai_chat_stream_metadata / cf_ai_chat_stream_chunks | Two SQLite tables in the agent's Durable Object storage. The first carries one row per stream with a status. The second carries indexed chunk bodies. | resumable-stream.ts:103–117 |
| cf_agent_use_chat_response | The wire frame the agent broadcasts for every streamed chunk. Carries body, done, error, replay. | Constants in agents/chat/protocol.ts; sent from _broadcastChat / _broadcastChatMessage |
| cf_agent_stream_resume_request / cf_agent_stream_resuming / cf_agent_stream_resume_ack / cf_agent_stream_resume_none | The four-frame reconnect handshake. Client asks if there is anything to resume, server either says "yes, here is the request id" or "no", client acks, server replays. | think.ts:5113, ai-chat/src/index.ts:900 |
| SaveMessagesResult | The single canonical result shape for any programmatic chat turn entry point. Status is one of completed, error, aborted, skipped. | packages/agents/src/chat/lifecycle.ts:75 |
| onChatResponse | The terminal lifecycle hook fired exactly once per turn. Receives the assistant message and the same status as the programmatic result. | lifecycle.ts:23; fired via _fireResponseHook |
| startAgentToolRun | The sub-agent's "run as a tool" entry point. Wraps saveMessages with a child-run row and a tailer registry. | think.ts:2892, ai-chat/src/index.ts:2310 |
3. A happy turn, end to end
Walk through a single successful chat turn before introducing any failure mode. Everything else is a deviation from this picture.
3.1 The pipeline
flowchart LR
client[Browser useChat]
ws[(WebSocket)]
agent[Think or AIChatAgent]
stream["_streamResult / _streamSSEReply"]
rs[ResumableStream]
meta[(cf_ai_chat_stream_metadata)]
chunks[(cf_ai_chat_stream_chunks)]
hook[onChatResponse]
client -- cf_agent_use_chat_request --> ws --> agent
agent --> stream
stream -- toUIMessageStream --> stream
stream -- broadcast cf_agent_use_chat_response --> ws --> client
stream --> rs
rs -- start --> meta
rs -- storeChunk batched --> chunks
stream -- complete --> rs
rs -- status = 'completed' --> meta
stream -- fire --> hook
3.2 Lifecycle, one frame at a time
sequenceDiagram
autonumber
participant C as Client
participant A as Agent (Think)
participant S as _streamResult
participant R as ResumableStream
participant DB as SQLite
C->>A: cf_agent_use_chat_request (POST body)
A->>S: invoke with requestId
S->>R: start(requestId)
R->>DB: INSERT cf_ai_chat_stream_metadata status='streaming'
loop for each chunk from toUIMessageStream()
S->>R: storeChunk(streamId, body)
R->>DB: buffered INSERT cf_ai_chat_stream_chunks
S->>C: cf_agent_use_chat_response { body, done:false }
end
S->>R: complete(streamId)
R->>DB: UPDATE metadata status='completed'
S->>C: cf_agent_use_chat_response { body:"", done:true }
S->>A: _fireResponseHook(status:"completed")
Key observation
The wire and the durable store are written from the
same loop. That is what makes the happy path easy: there
is one moment when status transitions from
streaming to completed, and one moment
when done:true goes to the wire. Any divergence
between live and replay starts with an extra moment.
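The single-loop property can be sketched with a small in-memory model. This is an illustration only, not the code in think.ts: Frame, MemoryStore, and runHappyTurn are hypothetical names, and the real path batches chunk writes and goes through ResumableStream.

```typescript
// Hypothetical in-memory model of the happy path; not the real _streamResult.
type Frame = { body: string; done: boolean; error?: boolean; replay?: boolean };

class MemoryStore {
  status: "streaming" | "completed" | "error" = "streaming";
  chunks: string[] = [];
}

// Writes each chunk to the store and to the wire from the same loop,
// then performs the two terminal writes back to back.
function runHappyTurn(chunks: string[], store: MemoryStore, wire: Frame[]): void {
  for (const body of chunks) {
    store.chunks.push(body); // durable store (stands in for storeChunk)
    wire.push({ body, done: false }); // wire (stands in for the broadcast)
  }
  store.status = "completed"; // stands in for complete(streamId)
  wire.push({ body: "", done: true }); // terminal wire frame
}

const happyStore = new MemoryStore();
const happyWire: Frame[] = [];
runHappyTurn(["Hello", " world"], happyStore, happyWire);
```

After the call, the store holds both chunks with status completed and the wire holds two chunk frames followed by one done:true frame. Every failure mode in the later sections adds a branch to this loop.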
4. Four terminal statuses
SaveMessagesResult.status is the single source of
truth. Memorise the matrix below, because #1575 is largely about
making the wire and the durable store reach the same status
as this enum.
| Status | When it happens | Wire signal today | Metadata status | onChatResponse |
|---|---|---|---|---|
| completed | Stream ended naturally. | done:true | completed | status:"completed" |
| error | Thrown error in the stream, OR an in-band { type: "error", errorText } chunk. | Thrown: done:true, error:true. In-band: error:true, done:false then a separate done:true. | error | status:"error", error: populated |
| aborted | Abort signal fired mid-stream. | done:true (no error flag) | completed in the table, which has no aborted state | status:"aborted" |
| skipped | Turn was superseded by a CHAT_CLEAR. | No frame: client clears state separately. | n/a | n/a (hook does not fire) |
Subtlety
In-band errors and thrown errors both end up with metadata
status = 'error', but their wire shape is
different today. That asymmetry is one of the things
#1575 wants resolved.
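One way to keep the matrix in your head is to encode it as two small lookup functions. The sketch below only restates the table; TurnStatus, ErrorSource, terminalFrames, and metadataStatus are hypothetical names and do not exist in the packages.

```typescript
// Hypothetical encoding of the status matrix; names are illustrative only.
type TurnStatus = "completed" | "error" | "aborted" | "skipped";
type ErrorSource = "thrown" | "in-band";
type WireFrame = { done: boolean; error?: boolean };

// Terminal wire frames a live client receives today for each outcome.
function terminalFrames(status: TurnStatus, source: ErrorSource = "thrown"): WireFrame[] {
  switch (status) {
    case "completed":
    case "aborted":
      return [{ done: true }]; // aborted carries no error flag
    case "error":
      return source === "thrown"
        ? [{ done: true, error: true }]
        : [{ done: false, error: true }, { done: true }];
    case "skipped":
      return []; // no frame: the client clears state separately
  }
}

// Metadata status written today; null where the matrix says n/a.
function metadataStatus(status: TurnStatus): "completed" | "error" | null {
  switch (status) {
    case "completed":
    case "aborted":
      return "completed"; // aborted turns are recorded as completed
    case "error":
      return "error";
    case "skipped":
      return null;
  }
}
```

The asymmetry is visible in the error case: the same metadata status is reached through one frame or two, depending on how the error arrived.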
5. The resumable stream tables
ResumableStream exists so that a client which drops a
WebSocket connection can reconnect and recover whatever it missed
without rerunning the LLM. There are two tables.
cf_ai_chat_stream_metadata
id text primary key
request_id text
status text -- 'streaming' | 'completed' | 'error'
created_at integer
completed_at integer
cf_ai_chat_stream_chunks
id text primary key
stream_id text
body text -- the JSON-encoded UIMessage chunk
chunk_index integer
created_at integer
5.1 Three lifecycle states
stateDiagram-v2
[*] --> streaming: start(requestId)
streaming --> completed: complete(streamId)
streaming --> error: markError(streamId)
completed --> [*]
error --> [*]
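The three states behave like a small state machine in which only a streaming row may move. The sketch below models that with a Map instead of SQLite. StreamStates is a hypothetical class, and throwing on an illegal transition is an assumption made for illustration; the real ResumableStream may handle that case differently.

```typescript
// Hypothetical in-memory stand-in for the metadata table's status column.
type StreamStatus = "streaming" | "completed" | "error";

class StreamStates {
  private rows = new Map<string, StreamStatus>();

  start(streamId: string): void {
    this.rows.set(streamId, "streaming");
  }

  complete(streamId: string): void {
    this.transition(streamId, "completed");
  }

  markError(streamId: string): void {
    this.transition(streamId, "error");
  }

  status(streamId: string): StreamStatus | undefined {
    return this.rows.get(streamId);
  }

  // Only streaming rows may move; completed and error are final.
  // Throwing here is an assumption of this sketch, not documented behavior.
  private transition(streamId: string, next: StreamStatus): void {
    if (this.rows.get(streamId) !== "streaming") {
      throw new Error(`stream ${streamId} is not streaming`);
    }
    this.rows.set(streamId, next);
  }
}

const states = new StreamStates();
states.start("s1");
states.markError("s1");
```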
5.2 The four-frame reconnect handshake
Connecting clients always negotiate. The server proactively says "there is a stream you can resume"; the client decides whether to opt in.
sequenceDiagram
autonumber
participant C as New WS client
participant A as Agent
participant R as ResumableStream
Note over C,A: Case A — stream is still live
A->>C: cf_agent_stream_resuming { id: requestId }
C->>A: cf_agent_stream_resume_ack { id: requestId }
A->>R: replayChunks(connection, requestId)
R-->>C: cf_agent_use_chat_response { body, replay:true } (xN)
R-->>C: cf_agent_use_chat_response { body:"", replay:true, replayComplete:true }
Note over A,R: live stream continues from here
Note over C,A: Case B — stream completed before client reconnected
C->>A: cf_agent_stream_resume_request
A->>R: replayCompletedChunksByRequestId(connection, id)
R-->>C: cf_agent_use_chat_response { body, replay:true } (xN)
R-->>C: cf_agent_use_chat_response { body:"", done:true, replay:true }
Note over C,A: Case C — nothing to resume
C->>A: cf_agent_stream_resume_request
A-->>C: cf_agent_stream_resume_none
The gap
There is no Case D for errored streams. If
status = 'error', the server today falls through to
the "nothing to resume" branch and sends a normal
done:true frame. The reconnecting client sees a
successful (empty) turn even though metadata clearly says it
errored. That is one of the two fixes #1575 needs.
See think.ts:5157 and
ai-chat/src/index.ts:951.
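The three cases, and the missing fourth, reduce to a decision on the metadata status. The following is a sketch of that decision as described in this section, not the handler in think.ts. ResumeReply, replyToResume, and the "replay_completed" label (standing in for the call to replayCompletedChunksByRequestId) are hypothetical.

```typescript
// Hypothetical decision table for a resume request; not the real handler.
type StreamStatus = "streaming" | "completed" | "error";
type ResumeReply =
  | { type: "cf_agent_stream_resuming"; id: string }
  | { type: "replay_completed"; id: string }
  | { type: "cf_agent_stream_resume_none" };

function replyToResume(requestId: string, status: StreamStatus | undefined): ResumeReply {
  switch (status) {
    case "streaming":
      return { type: "cf_agent_stream_resuming", id: requestId }; // Case A
    case "completed":
      return { type: "replay_completed", id: requestId }; // Case B
    default:
      // Case C, and today also status === "error": the gap described above.
      return { type: "cf_agent_stream_resume_none" };
  }
}
```

Adding a Case D would mean giving "error" its own branch instead of letting it fall into the default.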
6. Where errors enter the picture
Three places a turn can end with status:"error":
flowchart TD
start([_streamResult begins])
loop{for await chunk}
inband{chunk.type === error?}
thrown{thrown error?}
late{"captureOutput && result.output rejects?"}
ok[status: completed]
err[status: error]
start --> loop
loop -- next chunk --> inband
inband -- yes --> err
inband -- no --> loop
loop -- iterator throws --> thrown
thrown -- yes --> err
loop -- iterator done --> late
late -- yes --> err
late -- no --> ok
6.1 In-band error chunk (the common case)
// think.ts:5720 (paraphrased)
if (action?.type === "error") {
  // In-band error chunk: record it and tell live clients right away.
  streamError = action.error;
  this._emit("message:error", { error: streamError });
  this._broadcastChat({
    type: MSG_CHAT_RESPONSE,
    id: requestId,
    body: action.error,
    done: false,
    error: true,
  });
  break;
}
// ...later
// Persist the terminal status, then send the trailing done frame.
if (streamError) {
  this._resumableStream.markError(streamId);
} else {
  this._resumableStream.complete(streamId);
}
this._broadcastChat({ type: MSG_CHAT_RESPONSE, id: requestId, body: "", done: true });
Two wire frames go out: an error:true, done:false
frame carrying the error text, then a normal done:true
frame. The metadata moves to error.
onChatResponse fires with
status:"error". Partial parts before the break are
persisted.
6.2 Thrown stream error
The catch branch in _streamResult sets
streamError from error.message, calls
markError, and (if no done was sent) emits a
done:true, error:true frame in a single shot.
6.3 Structured output rejection
When captureOutput is true, awaiting
result.output after the stream loop can reject. Same
terminal handling, no extra wire frame because the stream is
already done.
Why the wire shape matters
The client transport treats any frame with
error: true as terminal — it errors the
ReadableStream immediately. So in practice in-band
errors work on the wire today. But the trailing
done:true still goes through, and that is the frame
the reconnect path reproduces, which is why reconnecting clients
currently see a "successful" turn.
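The difference between what a live client and a reconnecting client conclude can be modelled as a fold over the frames each one receives. This is a simplified sketch, assuming a transport that treats error:true as terminal as described above; Outcome and consume are hypothetical names, and the real client works on a ReadableStream rather than an array.

```typescript
// Hypothetical model of a client transport folding frames into an outcome.
type Frame = { body: string; done: boolean; error?: boolean };
type Outcome = { text: string; state: "open" | "closed" | "errored"; errorText?: string };

function consume(frames: Frame[]): Outcome {
  const out: Outcome = { text: "", state: "open" };
  for (const frame of frames) {
    if (out.state !== "open") break; // frames after a terminal one are ignored
    if (frame.error) {
      out.state = "errored"; // error:true is terminal, even with done:false
      out.errorText = frame.body;
    } else if (frame.done) {
      out.state = "closed";
    } else {
      out.text += frame.body;
    }
  }
  return out;
}

// Live client: sees the in-band error frame before the trailing done:true.
const liveOutcome = consume([
  { body: "Hello", done: false },
  { body: "boom", done: false, error: true },
  { body: "", done: true },
]);

// Reconnecting client today: only the empty done:true frame is reproduced.
const replayOutcome = consume([{ body: "", done: true }]);
```

The live client ends errored with the partial text "Hello", while the reconnecting client ends closed with no text and no error.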
7. Live vs replay parity
This is the heart of #1575. Walk both observers through the same failing turn and compare what they see.
sequenceDiagram
autonumber
participant L as Live client
participant A as Agent
participant R as ResumableStream
participant N as Reconnecting client
L->>A: cf_agent_use_chat_request
A->>R: start
A->>R: storeChunk "Hello"
A->>L: { body:"Hello", done:false }
A->>R: storeChunk "Wor"
A->>L: { body:"Wor", done:false }
Note over A: provider emits { type:"error", errorText:"boom" }
A->>L: { body:"boom", done:false, error:true } ← L's stream errors
A->>R: markError
A->>L: { body:"", done:true }
Note over L,A: ...time passes, L's tab refreshes as N...
N->>A: connect, cf_agent_stream_resume_request
A-->>N: cf_agent_stream_resume_none ← bug: no error replay
Note over N: N sees an empty turn, no partial content, no error
What #1575 wants instead:
Target wire shape for reconnect-after-error
- The server recognises status = 'error' for this requestId.
- It replays stored chunks ("Hello", "Wor") with replay:true.
- It emits a terminal { body: errorText, done: true, error: true, replay: true } frame so the client errors its stream identically to the live path.
That requires two changes: (a) storing the error text alongside
metadata or as a final chunk, and (b) adding a
replayTerminalChunksByRequestId that handles both
completed and error.
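A minimal sketch of the target replay, assuming the error text has been stored next to the stream (change (a)). StoredStream and buildTerminalReplayFrames are hypothetical: the function only builds the frames a replayTerminalChunksByRequestId could send, and it takes an already-loaded record rather than a connection and a request id.

```typescript
// Hypothetical sketch of the proposed replay; the storage shape is an assumption.
type Frame = { body: string; done: boolean; replay: true; error?: boolean };
type StoredStream = {
  status: "streaming" | "completed" | "error";
  chunks: string[];
  errorText?: string; // assumed to be persisted by change (a)
};

// Builds the frames a reconnecting client would receive for a finished stream.
function buildTerminalReplayFrames(stream: StoredStream): Frame[] {
  if (stream.status === "streaming") {
    throw new Error("stream is still live; use the resume handshake instead");
  }
  const frames = stream.chunks.map(
    (body): Frame => ({ body, done: false, replay: true }),
  );
  if (stream.status === "error") {
    // Terminal error frame mirrors what the live path should converge on.
    frames.push({ body: stream.errorText ?? "", done: true, error: true, replay: true });
  } else {
    frames.push({ body: "", done: true, replay: true });
  }
  return frames;
}

const replayed = buildTerminalReplayFrames({
  status: "error",
  chunks: ["Hello", "Wor"],
  errorText: "boom",
});
```

For the failing turn in the diagram this yields "Hello" and "Wor" as replay frames, followed by one terminal frame carrying "boom" with done:true and error:true.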
8. Agent-tool runs ride on top
An agent-tool run is a sub-agent invocation packaged so the parent
can see its progress and final status. It is built on top of
saveMessages, plus a child-run row and a tailer
forwarder pipeline for live chunks.
flowchart LR
parent[Parent agent]
child[Child agent]
save[saveMessages]
childrow[(cf_agent_tool_child_runs)]
fwd[broadcast forwarder]
tailer["tailAgentToolRun (ReadableStream)"]
forwarders["_agentToolForwarders Map"]
errors["_agentToolLastErrors Map"]
parent -- runAgentTool(input, runId) --> child
child -- startAgentToolRun(input, {runId}) --> childrow
child -- saveMessages --> save
save -- broadcast cf_agent_use_chat_response --> fwd
fwd -- if forwarders exist --> forwarders
fwd -- if error frame --> errors
parent -- tailAgentToolRun(runId) --> tailer
tailer -. registers forward fn .-> forwarders
8.1 How the run gets its terminal status
startAgentToolRun awaits saveMessages,
then looks at three signals to classify the run:
const streamError = result.error ?? this._agentToolLastErrors.get(options.runId);
const status =
  result.status === "error" || skipped || streamError
    ? "error"
    : result.status === "aborted"
      ? "aborted"
      : "completed";
Post-#1567, result.error is populated whenever the
child turn errored, so the run can be classified
without ever observing a forwarded error frame. The
_agentToolLastErrors map is mostly belt-and-braces.
8.2 Where the two packages diverge
flowchart TB
subgraph think["@cloudflare/think"]
tbroad["broadcast override"]
tloop["for each forwarder: set _agentToolLastErrors[runId]"]
tbroad --> tloop
end
subgraph ai["@cloudflare/ai-chat"]
abroad["broadcast override"]
aactive{"_agentToolActiveRunId set?"}
aactive_yes["scope error/chunk to that one run"]
aactive_no["fan out to all forwarders (legacy behavior)"]
abroad --> aactive
aactive -- yes --> aactive_yes
aactive -- no --> aactive_no
end
The Think-side gap
When two Think agent-tool runs are tailed concurrently and one of
them errors, the broadcast override writes that error into
every registered forwarder's slot in
_agentToolLastErrors. So a sibling run that
eventually completes successfully can still be classified
error when its own
result.error is undefined and
_agentToolLastErrors.get(runId) happens to be set
from a different run.
AIChat fixed this with _agentToolActiveRunId: it
records which run is currently driving the
broadcast() call (set around the
saveMessages invocation) and scopes the writes to
that one run.
Think: think.ts:1457-1486. AIChat:
ai-chat/src/index.ts:494-538,
2345.
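The contamination is easy to reproduce in a toy model. The sketch below is not the package code: ErrorRecorder, recordFanOut, recordScoped, and classify are hypothetical names that mimic the two broadcast strategies and a reduced form of the status classification (result.error first, then the last-error map).

```typescript
// Hypothetical model of the two broadcast strategies; not the package code.
type RunId = string;

class ErrorRecorder {
  readonly lastErrors = new Map<RunId, string>();
  private forwarders = new Set<RunId>();
  activeRunId: RunId | undefined;

  tail(runId: RunId): void {
    this.forwarders.add(runId);
  }

  // Fan-out (Think today): every tailed run is tagged with the error.
  recordFanOut(error: string): void {
    this.forwarders.forEach((runId) => this.lastErrors.set(runId, error));
  }

  // Scoped (AIChat): only the run currently driving the broadcast is tagged.
  recordScoped(error: string): void {
    if (this.activeRunId !== undefined) {
      this.lastErrors.set(this.activeRunId, error);
    } else {
      this.recordFanOut(error); // legacy behavior when no run is active
    }
  }
}

// Reduced classification: result.error first, then the last-error map.
function classify(resultError: string | undefined, lastError: string | undefined): "error" | "completed" {
  return (resultError ?? lastError) ? "error" : "completed";
}

// Runs A and B are tailed concurrently; A errors, B finishes cleanly.
const fanOut = new ErrorRecorder();
fanOut.tail("A");
fanOut.tail("B");
fanOut.recordFanOut("boom");
const siblingWithFanOut = classify(undefined, fanOut.lastErrors.get("B"));

const scoped = new ErrorRecorder();
scoped.tail("A");
scoped.tail("B");
scoped.activeRunId = "A";
scoped.recordScoped("boom");
const siblingWithScoping = classify(undefined, scoped.lastErrors.get("B"));
```

With fan-out, sibling B is classified error even though its own result carried none. With scoping, only A's slot is written and B stays completed.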
9. Mapping back to #1575
| Acceptance criterion | Current state on main | Action |
|---|---|---|
| Live and reconnecting clients see same terminal error | missing | Extend ResumableStream with terminal-error replay; store error text. |
| Stream metadata distinguishes terminal error | done | status = 'error' already written. |
| Partial content before error still recoverable | half-done | Live path persists assistant parts. Replay path drops them because the errored stream is not replayed at all. |
| Tests for reconnect/replay of in-band errors | missing | Add. There is also one existing AIChat test that locks in the wrong behavior; it should flip. |
| startAgentToolRun classifies error with no tailer | done | Already classified from result.error. Add a regression test. |
| Delayed-tailer classification unaffected | done | Add a regression test. |
| No cross-run contamination of agent-tool errors | done in ai-chat, not in think | Port _agentToolActiveRunId scoping into Think.broadcast(). |
10. Self check
Answer in your head, then expand to verify. If any answer surprises you, that is where to dig next.
Q1. Why does an in-band error today produce two wire frames where a thrown error produces one?
Show answer
The in-band branch breaks out of the chunk loop, then falls
through to the normal "loop ended" path which emits the
trailing done:true. The thrown path lives in
catch, where the not-yet-sent guard combines body
and done into a single done:true, error:true frame.
#1575 asks us to consider unifying these.
Q2. What does the reconnecting client receive today if it reconnects 200ms after an in-band error?
Show answer
cf_agent_stream_resume_none (server replies "no
active stream"), then on ACK the fallback path sends an empty
done:true replay frame. The client's
ReadableStream closes cleanly. No partial parts.
No error. This is the visible #1575 bug.
Q3. Why can startAgentToolRun classify an in-band error without a tailer?
Show answer
Because _streamResult returns
{ status: "error", error }, which propagates
through saveMessages to
SaveMessagesResult.error, which
startAgentToolRun reads first. The tailer pipeline
is no longer the source of truth for run status.
Q4. Why does the AIChat _agentToolActiveRunId scope help, and why does Think still need it?
Show answer
Without scoping, every active forwarder's
_agentToolLastErrors[runId] gets overwritten when
any chat error frame is broadcast. Scoping it to the
one currently running child means run A's error only affects run
A, not concurrent sibling B. AIChat sets it around the child's
saveMessages; Think never sets it and writes to
every forwarder.
Q5. If you were to add a replayTerminalChunksByRequestId, what is the smallest change to cf_ai_chat_stream_metadata that lets it reconstruct the error text?
Show answer
Add a nullable error_message column populated by
markError(streamId, errorText). The alternative
is encoding a final synthetic chunk for the error event, which
avoids a schema migration but ties two table shapes together
forever. Pick deliberately; either is defensible.