Lesson 01 — Chat Stream Lifecycle

Grounding: @cloudflare/think, @cloudflare/ai-chat, agents/chat on main. Goal: build the mental model needed to work on #1575.

1. Why this lesson exists

#1575 is not a bug fix you can localise to a single function. It is a parity question: do live observers and reconnecting observers see the same terminal outcome? To answer that, you need a mental model of what a chat turn is as it moves through three layers in parallel:

  • the wire (WebSocket frames a client receives),
  • the durable store (cf_ai_chat_stream_metadata and cf_ai_chat_stream_chunks),
  • and the programmatic surfaces (SaveMessagesResult, onChatResponse, agent-tool child run rows).

#1567 already made the programmatic surfaces honest about errors. #1575 is about making the wire and the durable store agree.

2. Cast of characters

Before any diagrams, pin these down. They reappear constantly.

Name | What it is | Where it lives
--- | --- | ---
Think / AIChatAgent | The two opinionated chat-agent base classes. They share the same protocol shape and most of the same lifecycle. | packages/think/src/think.ts, packages/ai-chat/src/index.ts
_streamResult | Consumes a UI message stream chunk-by-chunk, broadcasts each chunk over WebSocket, stores it for replay, and decides whether the turn ended completed, error, or aborted. | think.ts:5675 and the equivalent _streamSSEReply in AIChat
ResumableStream | Standalone class that owns chunk persistence, terminal status, replay, and stale-stream cleanup. | packages/agents/src/chat/resumable-stream.ts
cf_ai_chat_stream_metadata / cf_ai_chat_stream_chunks | Two SQLite tables in the agent's Durable Object storage. The first carries one row per stream with a status. The second carries indexed chunk bodies. | resumable-stream.ts:103–117
cf_agent_use_chat_response | The wire frame the agent broadcasts for every streamed chunk. Carries body, done, error, replay. | Constants in agents/chat/protocol.ts; sent from _broadcastChat / _broadcastChatMessage
cf_agent_stream_resume_request / cf_agent_stream_resuming / cf_agent_stream_resume_ack / cf_agent_stream_resume_none | The four-frame reconnect handshake. Client asks if there is anything to resume, server either says "yes, here is the request id" or "no", client acks, server replays. | think.ts:5113, ai-chat/src/index.ts:900
SaveMessagesResult | The single canonical result shape for any programmatic chat turn entry point. Status is one of completed, error, aborted, skipped. | packages/agents/src/chat/lifecycle.ts:75
onChatResponse | The terminal lifecycle hook fired exactly once per turn. Receives the assistant message and the same status as the programmatic result. | lifecycle.ts:23; fired via _fireResponseHook
startAgentToolRun | The sub-agent's "run as a tool" entry point. Wraps saveMessages with a child-run row and a tailer registry. | think.ts:2892, ai-chat/src/index.ts:2310

3. A happy turn, end to end

Walk through a single successful chat turn before introducing any failure mode. Everything else is a deviation from this picture.

3.1 The pipeline

flowchart LR
  client[Browser useChat]
  ws[(WebSocket)]
  agent[Think or AIChatAgent]
  stream["_streamResult / _streamSSEReply"]
  rs[ResumableStream]
  meta[(cf_ai_chat_stream_metadata)]
  chunks[(cf_ai_chat_stream_chunks)]
  hook[onChatResponse]

  client -- cf_agent_use_chat_request --> ws --> agent
  agent --> stream
  stream -- toUIMessageStream --> stream
  stream -- broadcast cf_agent_use_chat_response --> ws --> client
  stream --> rs
  rs -- start --> meta
  rs -- storeChunk batched --> chunks
  stream -- complete --> rs
  rs -- status = 'completed' --> meta
  stream -- fire --> hook

3.2 Lifecycle, one frame at a time

sequenceDiagram
  autonumber
  participant C as Client
  participant A as Agent (Think)
  participant S as _streamResult
  participant R as ResumableStream
  participant DB as SQLite

  C->>A: cf_agent_use_chat_request (POST body)
  A->>S: invoke with requestId
  S->>R: start(requestId)
  R->>DB: INSERT cf_ai_chat_stream_metadata status='streaming'
  loop for each chunk from toUIMessageStream()
    S->>R: storeChunk(streamId, body)
    R->>DB: buffered INSERT cf_ai_chat_stream_chunks
    S->>C: cf_agent_use_chat_response { body, done:false }
  end
  S->>R: complete(streamId)
  R->>DB: UPDATE metadata status='completed'
  S->>C: cf_agent_use_chat_response { body:"", done:true }
  S->>A: _fireResponseHook(status:"completed")

Key observation

The wire and the durable store are written from the same loop. That is what makes the happy path easy: there is one moment when status transitions from streaming to completed, and one moment when done:true goes to the wire. Any divergence between live and replay starts with an extra moment.
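The "one loop, two writers" point can be made concrete with a small in-memory sketch. `MemoryStream`, `Frame` and `streamTurn` are hypothetical stand-ins, not the real `ResumableStream` or `_streamResult` (the real chunk writes are batched and land in SQLite); only the ordering mirrors the sequence diagram: store and broadcast each chunk, then one status transition and one done:true.

```typescript
// Hypothetical wire frame, modelled on the cf_agent_use_chat_response fields
// named in this lesson; not the library's exported type.
type Frame = { id: string; body: string; done: boolean };

type StreamStatus = "streaming" | "completed" | "error";

// In-memory stand-in for the metadata and chunk tables.
class MemoryStream {
  status = new Map<string, StreamStatus>();
  chunks = new Map<string, string[]>();

  start(requestId: string): string {
    const streamId = `stream-${requestId}`;
    this.status.set(streamId, "streaming");
    this.chunks.set(streamId, []);
    return streamId;
  }

  storeChunk(streamId: string, body: string): void {
    this.chunks.get(streamId)?.push(body);
  }

  complete(streamId: string): void {
    this.status.set(streamId, "completed");
  }
}

// A single loop feeds both the durable store and the wire.
function streamTurn(
  requestId: string,
  parts: string[],
  store: MemoryStream,
  send: (frame: Frame) => void,
): string {
  const streamId = store.start(requestId);
  for (const body of parts) {
    store.storeChunk(streamId, body); // durable copy, for replay
    send({ id: requestId, body, done: false }); // live copy, for the wire
  }
  store.complete(streamId); // the one status transition...
  send({ id: requestId, body: "", done: true }); // ...and the one done:true
  return streamId;
}

const wire: Frame[] = [];
const store = new MemoryStream();
const sid = streamTurn("req-1", ["Hel", "lo"], store, (f) => wire.push(f));
console.log(store.status.get(sid), wire.length); // completed 3
```

Every failure mode in the rest of the lesson can be read as a second `send` or a second status write sneaking into this picture.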

4. Four terminal statuses

SaveMessagesResult.status is the single source of truth. Memorise the matrix below, because #1575 is largely about making the wire and the durable store reach the same status as this enum.

Status | When it happens | Wire signal today | Metadata status | onChatResponse
--- | --- | --- | --- | ---
completed | Stream ended naturally. | done:true | completed | status:"completed"
error | Thrown error in the stream, OR an in-band { type: "error", errorText } chunk. | Thrown: done:true, error:true. In-band: error:true, done:false, then a separate done:true. | error | status:"error", with error populated
aborted | Abort signal fired mid-stream. | done:true (no error flag) | completed (the table has no aborted value, which is worth noting) | status:"aborted"
skipped | Turn was superseded by a CHAT_CLEAR. | No frame; the client clears state separately. | n/a | n/a (hook does not fire)

Subtlety

In-band errors and thrown errors both end up with metadata status = 'error', but their wire shape is different today. That asymmetry is one of the things #1575 wants resolved.
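The matrix is easy to misremember, so it can help to encode it as data. This is a hypothetical sketch: `METADATA_STATUS` and `HOOK_FIRES` are illustrative names, not exports of agents/chat. Typing them as `Record<TurnStatus, ...>` means the compiler rejects the table if a status row is ever missing.

```typescript
// The four SaveMessagesResult statuses from the matrix above.
type TurnStatus = "completed" | "error" | "aborted" | "skipped";
type MetadataStatus = "completed" | "error";

// Value written to cf_ai_chat_stream_metadata.status today; null means n/a.
const METADATA_STATUS: Record<TurnStatus, MetadataStatus | null> = {
  completed: "completed",
  error: "error",
  aborted: "completed", // the table has no 'aborted' value
  skipped: null, // n/a in the matrix: superseded by CHAT_CLEAR
};

// Whether onChatResponse fires for each status.
const HOOK_FIRES: Record<TurnStatus, boolean> = {
  completed: true,
  error: true,
  aborted: true,
  skipped: false,
};

// Statuses whose durable value differs from the enum value.
const drift = (Object.keys(METADATA_STATUS) as TurnStatus[]).filter(
  (s) => METADATA_STATUS[s] !== null && METADATA_STATUS[s] !== s,
);
console.log(drift); // [ 'aborted' ]
```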

5. The resumable stream tables

ResumableStream exists so that a client which drops a WebSocket connection can reconnect and recover whatever it missed without rerunning the LLM. There are two tables.

cf_ai_chat_stream_metadata
  id           text primary key
  request_id   text
  status       text   -- 'streaming' | 'completed' | 'error'
  created_at   integer
  completed_at integer

cf_ai_chat_stream_chunks
  id           text primary key
  stream_id    text
  body         text   -- the JSON-encoded UIMessage chunk
  chunk_index  integer
  created_at   integer

5.1 Three lifecycle states

stateDiagram-v2
  [*] --> streaming: start(requestId)
  streaming --> completed: complete(streamId)
  streaming --> error: markError(streamId)
  completed --> [*]
  error --> [*]
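The three-state lifecycle fits in a transition table. The sketch below is hypothetical: the lesson does not say whether ResumableStream rejects a second terminal write, so throwing on an illegal transition is an assumption, chosen to make the "one terminal moment per stream" rule visible.

```typescript
type StreamStatus = "streaming" | "completed" | "error";
type StreamEvent = "complete" | "markError";

// Legal transitions from the state diagram above; terminal states have none.
const TRANSITIONS: Record<
  StreamStatus,
  Partial<Record<StreamEvent, StreamStatus>>
> = {
  streaming: { complete: "completed", markError: "error" },
  completed: {},
  error: {},
};

function transition(from: StreamStatus, event: StreamEvent): StreamStatus {
  const to = TRANSITIONS[from][event];
  if (to === undefined) {
    // Assumption: a second terminal write is treated as a bug.
    throw new Error(`illegal transition: ${from} --${event}-->`);
  }
  return to;
}

console.log(transition("streaming", "markError")); // error
```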

5.2 The four-frame reconnect handshake

Connecting clients always negotiate before anything is replayed. For a live stream the server proactively says "there is a stream you can resume"; otherwise the client asks. Either way, the client decides whether to opt in.

sequenceDiagram
  autonumber
  participant C as New WS client
  participant A as Agent
  participant R as ResumableStream

  Note over C,A: Case A — stream is still live
  A->>C: cf_agent_stream_resuming { id: requestId }
  C->>A: cf_agent_stream_resume_ack { id: requestId }
  A->>R: replayChunks(connection, requestId)
  R-->>C: cf_agent_use_chat_response { body, replay:true } (xN)
  R-->>C: cf_agent_use_chat_response { body:"", replay:true, replayComplete:true }
  Note over A,R: live stream continues from here

  Note over C,A: Case B — stream completed before client reconnected
  C->>A: cf_agent_stream_resume_request
  A->>R: replayCompletedChunksByRequestId(connection, id)
  R-->>C: cf_agent_use_chat_response { body, replay:true } (xN)
  R-->>C: cf_agent_use_chat_response { body:"", done:true, replay:true }

  Note over C,A: Case C — nothing to resume
  C->>A: cf_agent_stream_resume_request
  A-->>C: cf_agent_stream_resume_none

The gap

There is no Case D for errored streams. If status = 'error', the server today falls through to the "nothing to resume" branch and sends a normal done:true frame. The reconnecting client sees a successful (empty) turn even though metadata clearly says it errored. That is one of the two fixes #1575 needs.

See think.ts:5157 and ai-chat/src/index.ts:951.
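The branch selection can be modelled as a pure function of the stored metadata row. `chooseResumeCase`, `chooseResumeCaseTarget` and the case labels are hypothetical; the real code at the locations cited above also deals with the connection and the ack frame. The sketch only shows which arm an errored row reaches today, and where a Case D would slot in.

```typescript
type StreamStatus = "streaming" | "completed" | "error";

interface StreamRow {
  requestId: string;
  status: StreamStatus;
}

// Labels for Cases A, B and C in the handshake diagram, plus the missing D.
type ResumeCase = "A" | "B" | "C" | "D";

// Today's behavior, as this lesson describes it.
function chooseResumeCase(row: StreamRow | undefined): ResumeCase {
  if (row?.status === "streaming") return "A"; // resuming, ack, replayChunks
  if (row?.status === "completed") return "B"; // replayCompletedChunksByRequestId
  return "C"; // resume_none: reached with no row AND with status === "error"
}

// The shape #1575 asks for: errored streams get their own arm.
function chooseResumeCaseTarget(row: StreamRow | undefined): ResumeCase {
  if (row?.status === "error") return "D";
  return chooseResumeCase(row);
}

const errored: StreamRow = { requestId: "req-7", status: "error" };
console.log(chooseResumeCase(errored), chooseResumeCaseTarget(errored)); // C D
```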

6. Where errors enter the picture

Three places a turn can end with status:"error":

flowchart TD
  start([_streamResult begins])
  loop{for await chunk}
  inband{chunk.type === error?}
  thrown{thrown error?}
  late{"captureOutput && result.output rejects?"}
  ok[status: completed]
  err[status: error]
  start --> loop
  loop -- next chunk --> inband
  inband -- yes --> err
  inband -- no --> loop
  loop -- iterator throws --> thrown
  thrown -- yes --> err
  loop -- iterator done --> late
  late -- yes --> err
  late -- no --> ok

6.1 In-band error chunk (the common case)

// think.ts:5720 (paraphrased)
// In-band error chunk: surface the error text to live clients, then stop consuming.
if (action?.type === "error") {
  streamError = action.error;
  this._emit("message:error", { error: streamError });
  this._broadcastChat({
    type: MSG_CHAT_RESPONSE,
    id: requestId,
    body: action.error,
    done: false,
    error: true,
  });
  break;
}
// ...later, after the loop: record the terminal status in metadata...
if (streamError) {
  this._resumableStream.markError(streamId);
} else {
  this._resumableStream.complete(streamId);
}
// ...then send the trailing done frame on both paths.
this._broadcastChat({ type: MSG_CHAT_RESPONSE, id: requestId, body: "", done: true });

Two wire frames go out: an error:true, done:false frame carrying the error text, then a normal done:true frame. The metadata moves to error. onChatResponse fires with status:"error". Partial parts before the break are persisted.

6.2 Thrown stream error

The catch branch in _streamResult sets streamError from error.message, calls markError, and (if no done was sent) emits a done:true, error:true frame in a single shot.

6.3 Structured output rejection

When captureOutput is true, awaiting result.output after the stream loop can reject. The terminal handling is the same; there is no extra wire frame because the stream is already done.
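The difference between 6.1 and 6.2 shows up in how many terminal frames each path sends. `terminalFrames` is a hypothetical model, not code from think.ts. It assumes the thrown path puts error.message in the frame body, and it leaves out 6.3 because that path adds no frame.

```typescript
type Frame = { id: string; body: string; done: boolean; error?: boolean };

type Ending =
  | { kind: "completed" }
  | { kind: "inband"; errorText: string }
  | { kind: "thrown"; message: string; doneAlreadySent: boolean };

function terminalFrames(id: string, ending: Ending): Frame[] {
  if (ending.kind === "inband") {
    // 6.1: the error frame goes out inside the loop, then `break` falls through
    // to the normal end-of-loop path and its trailing done:true.
    return [
      { id, body: ending.errorText, done: false, error: true },
      { id, body: "", done: true },
    ];
  }
  if (ending.kind === "thrown") {
    // 6.2: the catch branch folds error and done into one frame, but only if
    // done has not already been sent.
    return ending.doneAlreadySent
      ? []
      : [{ id, body: ending.message, done: true, error: true }];
  }
  // Natural completion: the lone done:true from the happy path.
  return [{ id, body: "", done: true }];
}

console.log(terminalFrames("r1", { kind: "inband", errorText: "boom" }).length); // 2
console.log(
  terminalFrames("r1", { kind: "thrown", message: "boom", doneAlreadySent: false }).length,
); // 1
```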

Why the wire shape matters

The client transport treats any frame with error: true as terminal: it errors the ReadableStream immediately. So in-band errors already work on the wire for live clients. But the trailing done:true still goes out, and that is the frame the reconnect path reproduces, which is why reconnecting clients currently see a "successful" turn.
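The client-side rule above explains both halves of the problem. The sketch is a hypothetical reducer, not the real transport: it treats `body` as plain text rather than a JSON-encoded UIMessage chunk, and it settles on the first terminal frame. Feeding it an in-band error sequence and a bare done:true replay frame shows the two observers reaching different outcomes.

```typescript
type Frame = { body: string; done: boolean; error?: boolean; replay?: boolean };

type ClientState =
  | { phase: "streaming"; text: string }
  | { phase: "completed"; text: string }
  | { phase: "errored"; text: string; errorText: string };

function applyFrame(state: ClientState, frame: Frame): ClientState {
  // Once settled, later frames (such as the trailing done:true) change nothing.
  if (state.phase !== "streaming") return state;
  if (frame.error) {
    return { phase: "errored", text: state.text, errorText: frame.body };
  }
  const text = state.text + frame.body;
  return frame.done ? { phase: "completed", text } : { phase: "streaming", text };
}

function observe(frames: Frame[]): ClientState {
  return frames.reduce<ClientState>(applyFrame, { phase: "streaming", text: "" });
}

// Live client: error arrives in-band, then the trailing done.
const live = observe([
  { body: "Hello", done: false },
  { body: "Wor", done: false },
  { body: "boom", done: false, error: true },
  { body: "", done: true },
]);
// Reconnecting client today: only an empty done:true replay frame.
const reconnecting = observe([{ body: "", done: true, replay: true }]);
console.log(live.phase, reconnecting.phase); // errored completed
```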

7. Live vs replay parity

This is the heart of #1575. Walk both observers through the same failing turn and compare what they see.

sequenceDiagram
  autonumber
  participant L as Live client
  participant A as Agent
  participant R as ResumableStream
  participant N as Reconnecting client

  L->>A: cf_agent_use_chat_request
  A->>R: start
  A->>R: storeChunk "Hello"
  A->>L: { body:"Hello", done:false }
  A->>R: storeChunk "Wor"
  A->>L: { body:"Wor", done:false }
  Note over A: provider emits { type:"error", errorText:"boom" }
  A->>L: { body:"boom", done:false, error:true }  ←  L's stream errors
  A->>R: markError
  A->>L: { body:"", done:true }
  Note over L,A: ...time passes, L's tab refreshes as N...
  N->>A: connect, cf_agent_stream_resume_request
  A-->>N: cf_agent_stream_resume_none  ←  bug: no error replay
  Note over N: N sees an empty turn, no partial content, no error

What #1575 wants instead:

Target wire shape for reconnect-after-error

  1. The server recognises status = 'error' for this requestId.
  2. It replays stored chunks ("Hello", "Wor") with replay:true.
  3. It emits a terminal { body: errorText, done: true, error: true, replay: true } frame so the client errors its stream identically to the live path.

That requires two changes: (a) storing the error text alongside metadata or as a final chunk, and (b) adding a replayTerminalChunksByRequestId that handles both completed and error.
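Change (b) can be sketched as follows. The function name comes from this lesson, but the signature is an assumption: it takes an already loaded row and a `send` callback instead of a connection and a request id, and the hypothetical `errorMessage` field stands in for whichever storage option change (a) picks. It replays every stored chunk with replay:true, then emits exactly one terminal frame whose error flag matches the stored status.

```typescript
type Frame = {
  id: string;
  body: string;
  done: boolean;
  error?: boolean;
  replay?: boolean;
};

// Hypothetical loaded row: status plus stored chunks; errorMessage stands in
// for whatever change (a) stores.
interface StoredStream {
  requestId: string;
  status: "streaming" | "completed" | "error";
  errorMessage: string | null;
  chunks: string[];
}

// Returns false when there is nothing terminal to replay (a live stream
// belongs to Case A).
function replayTerminalChunksByRequestId(
  stream: StoredStream,
  send: (frame: Frame) => void,
): boolean {
  if (stream.status === "streaming") return false;
  const id = stream.requestId;
  for (const body of stream.chunks) {
    send({ id, body, done: false, replay: true }); // partial content first
  }
  if (stream.status === "error") {
    // Same terminal shape a live client already treats as an error.
    send({ id, body: stream.errorMessage ?? "", done: true, error: true, replay: true });
  } else {
    send({ id, body: "", done: true, replay: true });
  }
  return true;
}

const frames: Frame[] = [];
replayTerminalChunksByRequestId(
  { requestId: "req-7", status: "error", errorMessage: "boom", chunks: ["Hello", "Wor"] },
  (f) => frames.push(f),
);
console.log(frames.length, frames[2]?.error); // 3 true
```

Handling completed and error in one function keeps the replay loop shared, so the two terminal shapes cannot drift apart in how they replay partial content.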

8. Agent-tool runs ride on top

An agent-tool run is a sub-agent invocation packaged so the parent can see its progress and final status. It is built on top of saveMessages, plus a child-run row and a tailer forwarder pipeline for live chunks.

flowchart LR
  parent[Parent agent]
  child[Child agent]
  save[saveMessages]
  childrow[(cf_agent_tool_child_runs)]
  fwd[broadcast forwarder]
  tailer["tailAgentToolRun (ReadableStream)"]
  forwarders["_agentToolForwarders Map>"]
  errors["_agentToolLastErrors Map"]

  parent -- runAgentTool(input, runId) --> child
  child -- startAgentToolRun(input, {runId}) --> childrow
  child -- saveMessages --> save
  save -- broadcast cf_agent_use_chat_response --> fwd
  fwd -- if forwarders exist --> forwarders
  fwd -- if error frame --> errors
  parent -- tailAgentToolRun(runId) --> tailer
  tailer -. registers forward fn .-> forwarders

8.1 How the run gets its terminal status

startAgentToolRun awaits saveMessages, then looks at three signals to classify the run:

const streamError = result.error ?? this._agentToolLastErrors.get(options.runId);
const status =
  result.status === "error" || skipped || streamError
    ? "error"
    : result.status === "aborted"
      ? "aborted"
      : "completed";

Post-#1567, result.error is populated whenever the child turn errored, so the run can be classified without ever observing a forwarded error frame. The _agentToolLastErrors map is mostly belt-and-braces.
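The classification rule can be lifted into a standalone function for testing. This is a sketch, not the real startAgentToolRun: `ResultSlice` keeps only the two SaveMessagesResult fields used here, and it assumes the snippet's `skipped` flag means `result.status === "skipped"`, which the lesson does not confirm.

```typescript
type RunStatus = "completed" | "error" | "aborted";

// Only the SaveMessagesResult fields the rule reads.
interface ResultSlice {
  status: "completed" | "error" | "aborted" | "skipped";
  error?: string;
}

function classifyRun(
  result: ResultSlice,
  lastErrors: Map<string, string>,
  runId: string,
): RunStatus {
  const skipped = result.status === "skipped"; // assumption, see lead-in
  // result.error is preferred; the forwarded-error map is only a fallback.
  const streamError = result.error ?? lastErrors.get(runId);
  if (result.status === "error" || skipped || streamError) return "error";
  return result.status === "aborted" ? "aborted" : "completed";
}

// No tailer ever ran, so the map is empty, yet the error is still seen.
console.log(classifyRun({ status: "error", error: "boom" }, new Map(), "run-a")); // error
// A clean result can still be flipped by a stale map entry for the same runId.
console.log(
  classifyRun({ status: "completed" }, new Map([["run-b", "boom"]]), "run-b"),
); // error
```

The second call is the failure mode the next subsection traces back to the broadcast override.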

8.2 Where the two packages diverge

flowchart TB
  subgraph think["@cloudflare/think"]
    tbroad["broadcast override"]
    tloop["for each forwarder, set _agentToolLastErrors[runId]"]
    tbroad --> tloop
  end
  subgraph ai["@cloudflare/ai-chat"]
    abroad["broadcast override"]
    aactive{"_agentToolActiveRunId set?"}
    aactive_yes["scope error/chunk to that one run"]
    aactive_no["fan out to all forwarders (legacy behavior)"]
    abroad --> aactive
    aactive -- yes --> aactive_yes
    aactive -- no --> aactive_no
  end

The Think-side gap

When two Think agent-tool runs are tailed concurrently and one of them errors, the broadcast override writes that error into every registered forwarder's slot in _agentToolLastErrors. So a sibling run that eventually completes successfully can still be classified error when its own result.error is undefined and _agentToolLastErrors.get(runId) happens to be set from a different run.

AIChat fixed this with _agentToolActiveRunId: it records which run is currently driving the broadcast() call (set around the saveMessages invocation) and scopes the writes to that one run.

Think: think.ts:1457-1486. AIChat: ai-chat/src/index.ts:494-538, 2345.
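The contamination and its fix can be reproduced with a small model of the broadcast override. `ErrorRecorder` is hypothetical: it records only error frames (the real overrides also forward chunks to tailers), it is synchronous where the real child run awaits saveMessages, and its `scoped` flag toggles between the Think behavior and AIChat's `_agentToolActiveRunId` scoping.

```typescript
type ChatFrame = { body: string; error?: boolean };

class ErrorRecorder {
  lastErrors = new Map<string, string>();
  forwarders = new Set<string>(); // runIds with a registered tailer
  activeRunId: string | undefined = undefined;
  readonly scoped: boolean;

  constructor(scoped: boolean) {
    this.scoped = scoped;
  }

  broadcast(frame: ChatFrame): void {
    if (!frame.error) return;
    if (this.scoped && this.activeRunId !== undefined) {
      // AIChat: attribute the error only to the run driving this broadcast.
      this.lastErrors.set(this.activeRunId, frame.body);
      return;
    }
    // Think today (and AIChat's legacy fallback): fan out to every forwarder.
    for (const runId of this.forwarders) this.lastErrors.set(runId, frame.body);
  }

  // Brackets a child turn the way AIChat wraps the child's saveMessages call.
  // In unscoped (Think) mode the value is set but never read.
  runAs(runId: string, turn: () => void): void {
    const previous = this.activeRunId;
    this.activeRunId = runId;
    try {
      turn();
    } finally {
      this.activeRunId = previous;
    }
  }
}

function demo(scoped: boolean): Map<string, string> {
  const rec = new ErrorRecorder(scoped);
  rec.forwarders.add("run-a");
  rec.forwarders.add("run-b");
  rec.runAs("run-a", () => rec.broadcast({ body: "boom", error: true }));
  rec.runAs("run-b", () => rec.broadcast({ body: "ok" }));
  return rec.lastErrors;
}

console.log(demo(false).has("run-b"), demo(true).has("run-b")); // true false
```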

9. Mapping back to #1575

Acceptance criterion | Current state on main | Action
--- | --- | ---
Live and reconnecting clients see the same terminal error | missing | Extend ResumableStream with terminal-error replay; store the error text.
Stream metadata distinguishes terminal error | done | status = 'error' is already written.
Partial content before error is still recoverable | half-done | The live path persists assistant parts. The replay path drops them because the errored stream is not replayed at all.
Tests for reconnect/replay of in-band errors | missing | Add them. One existing AIChat test locks in the wrong behavior; it should flip.
startAgentToolRun classifies error with no tailer | done | Already classified from result.error. Add a regression test.
Delayed-tailer classification unaffected | done | Add a regression test.
No cross-run contamination of agent-tool errors | done in ai-chat, not in think | Port _agentToolActiveRunId scoping into Think.broadcast().

10. Self check

Answer each question in your head, then read the answer beneath it to verify. If any answer surprises you, that is where to dig next.

Q1. Why does an in-band error today produce two wire frames where a thrown error produces one?


The in-band branch breaks out of the chunk loop, then falls through to the normal "loop ended" path which emits the trailing done:true. The thrown path lives in catch, where the not-yet-sent guard combines body and done into a single done:true, error:true frame. #1575 asks us to consider unifying these.

Q2. What does the reconnecting client receive today if it reconnects 200ms after an in-band error?


cf_agent_stream_resume_none (server replies "no active stream"), then on ACK the fallback path sends an empty done:true replay frame. The client's ReadableStream closes cleanly. No partial parts. No error. This is the visible #1575 bug.

Q3. Why can startAgentToolRun classify an in-band error without a tailer?


Because _streamResult returns { status: "error", error }, which propagates through saveMessages to SaveMessagesResult.error, which startAgentToolRun reads first. The tailer pipeline is no longer the source of truth for run status.

Q4. Why does the AIChat _agentToolActiveRunId scope help, and why does Think still need it?


Without scoping, every active forwarder's _agentToolLastErrors[runId] gets overwritten when any chat error frame is broadcast. Scoping it to the one currently running child means run A's error only affects run A, not concurrent sibling B. AIChat sets it around the child's saveMessages; Think never sets it and writes to every forwarder.

Q5. If you were to add a replayTerminalChunksByRequestId, what is the smallest change to cf_ai_chat_stream_metadata that lets it reconstruct the error text?


Add a nullable error_message column populated by markError(streamId, errorText). The alternative is encoding a final synthetic chunk for the error event, which avoids a schema migration but ties two table shapes together forever. Pick deliberately; either is defensible.
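For comparison with the column option, the synthetic-chunk alternative can be sketched as an encode/decode pair. The `terminal-error` marker type and both function names are hypothetical. Picking a type string that can never collide with a real UIMessage chunk type is exactly the coupling between the two table shapes that the answer warns about.

```typescript
// Hypothetical marker stored as the final row in cf_ai_chat_stream_chunks.
interface TerminalErrorChunk {
  type: "terminal-error";
  errorText: string;
}

// Written by the error path, e.g. right before markError.
function encodeTerminalError(errorText: string): string {
  const chunk: TerminalErrorChunk = { type: "terminal-error", errorText };
  return JSON.stringify(chunk);
}

function isTerminalErrorChunk(value: unknown): value is TerminalErrorChunk {
  return (
    typeof value === "object" &&
    value !== null &&
    (value as { type?: unknown }).type === "terminal-error" &&
    typeof (value as { errorText?: unknown }).errorText === "string"
  );
}

// On replay, peel the marker off the tail; everything before it is content.
function splitTerminalError(bodies: string[]): {
  content: string[];
  errorText: string | null;
} {
  const last = bodies[bodies.length - 1];
  if (last === undefined) return { content: bodies, errorText: null };
  let parsed: unknown;
  try {
    parsed = JSON.parse(last);
  } catch {
    return { content: bodies, errorText: null }; // not JSON: ordinary content
  }
  return isTerminalErrorChunk(parsed)
    ? { content: bodies.slice(0, -1), errorText: parsed.errorText }
    : { content: bodies, errorText: null };
}

const stored = [JSON.stringify({ type: "example", text: "Hello" }), encodeTerminalError("boom")];
const { content, errorText } = splitTerminalError(stored);
console.log(content.length, errorText); // 1 boom
```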