Lesson 03 — In-band stream error replay parity

Grounding: issue #1575 (Part 1 — resumable replay). Code in packages/agents/src/chat/resumable-stream.ts, packages/ai-chat/src/index.ts, and packages/ai-chat/src/ws-chat-transport.ts. The companion @cloudflare/think paths mirror ai-chat.

1. What goes wrong on reconnect

A model provider can fail mid-stream in two ways. The first is an out-of-band failure: the SDK reader throws, the server catches it, and broadcasts a single error frame. The second is an in-band failure: the provider keeps the stream alive and emits a UI message chunk of the shape { type: "error", errorText }. This second case is the one the AI SDK uses for things like rate limits and provider policy denials.
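The two failure modes can be told apart mechanically: one arrives as a thrown exception, the other as ordinary data. The sketch below is a simplified illustration, not the SDK's parser. The `UIChunk` union is reduced to two members, and `Reader`, `consumeTurn`, and `readerFrom` are hypothetical names introduced only for this example.

```typescript
// Simplified, hypothetical chunk shapes; the real AI SDK union has more members.
type UIChunk =
  | { type: "text-delta"; delta: string }
  | { type: "error"; errorText?: string };

type TurnOutcome =
  | { status: "completed"; text: string }
  | { status: "error"; via: "in-band" | "out-of-band"; error: string; partialText: string };

// A reader returns the next chunk, null at end of stream, or throws.
type Reader = () => UIChunk | null;

function consumeTurn(read: Reader): TurnOutcome {
  let text = "";
  try {
    for (let chunk = read(); chunk !== null; chunk = read()) {
      if (chunk.type === "error") {
        // In-band: the stream stayed healthy and carried the error as data.
        const error = chunk.errorText ?? JSON.stringify({ type: chunk.type });
        return { status: "error", via: "in-band", error, partialText: text };
      }
      text += chunk.delta;
    }
  } catch (err) {
    // Out-of-band: the reader itself threw.
    const error = err instanceof Error ? err.message : String(err);
    return { status: "error", via: "out-of-band", error, partialText: text };
  }
  return { status: "completed", text };
}

// Helper for experiments: replays a fixed list, optionally throwing at an index.
function readerFrom(chunks: UIChunk[], throwAt?: number): Reader {
  let i = 0;
  return () => {
    if (i === throwAt) throw new Error("socket reset");
    const next = chunks[i];
    if (next === undefined) return null;
    i += 1;
    return next;
  };
}
```

In both branches the partial text accumulated before the failure is still available, which matters later when deciding what a reconnecting client should see.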

Live clients handle both fine. The interesting case is the client that disconnects (network blip, page refresh) during the turn and reconnects after the error has been emitted. With the current code, that client cannot tell that the turn failed: the server either answers STREAM_RESUME_NONE and replays nothing, or sends a bare done: true frame that is indistinguishable from a clean completion. The live observer and the reconnect observer disagree about terminal state.

This lesson reconstructs why, and what the fix has to preserve.

2. Primer: the resumable stream model

Every chat turn the SDK runs maps onto three concrete artefacts.

Layer | What it holds | Lifetime
--- | --- | ---
SQLite cf_ai_chat_stream_metadata | One row per stream. status is streaming / completed / error. | Persists across DO eviction. Garbage-collected after 24h.
SQLite cf_ai_chat_stream_chunks | Append-only log of serialized UI message chunks. | Same as metadata.
WebSocket frames | CF_AGENT_USE_CHAT_RESPONSE messages with body, done, optional error: true / replay: true. | Lives only while a client is connected.

ResumableStream in packages/agents/src/chat/resumable-stream.ts owns both tables. Three methods drive the lifecycle:

  • start(requestId) — insert a metadata row, set _activeStreamId, return the new id.
  • storeChunk(streamId, body) — buffer a chunk; flush to SQLite in batches of 10.
  • complete(streamId) or markError(streamId) — flush the buffer, update status, clear in-memory active state.
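
The lifecycle above can be modelled in a few lines. This is a minimal in-memory sketch, not the real class: `ResumableStreamModel` is a hypothetical stand-in, two `Map`s replace the SQLite tables, and the id format is invented. Only the method names, the batch size of 10, and the "flush, update status, clear active state" order come from the description above.

```typescript
type StreamStatus = "streaming" | "completed" | "error";

// Hypothetical in-memory stand-in; the real ResumableStream persists to SQLite.
class ResumableStreamModel {
  static readonly FLUSH_SIZE = 10;

  // Stand-ins for cf_ai_chat_stream_metadata and cf_ai_chat_stream_chunks.
  readonly metadata = new Map<string, { requestId: string; status: StreamStatus }>();
  readonly chunks = new Map<string, string[]>();

  private buffer: string[] = [];
  private activeStreamId: string | null = null;
  private nextId = 0;

  start(requestId: string): string {
    const id = `stream-${++this.nextId}`; // invented id format
    this.metadata.set(id, { requestId, status: "streaming" });
    this.chunks.set(id, []);
    this.activeStreamId = id;
    return id;
  }

  storeChunk(streamId: string, body: string): void {
    this.buffer.push(body);
    if (this.buffer.length >= ResumableStreamModel.FLUSH_SIZE) this.flush(streamId);
  }

  complete(streamId: string): void {
    this.finish(streamId, "completed");
  }

  markError(streamId: string): void {
    this.finish(streamId, "error");
  }

  hasActiveStream(): boolean {
    return this.activeStreamId !== null;
  }

  private flush(streamId: string): void {
    const stored = this.chunks.get(streamId);
    if (stored) stored.push(...this.buffer);
    this.buffer = [];
  }

  // Flush the buffer, update status, then clear the in-memory pointer.
  private finish(streamId: string, status: StreamStatus): void {
    this.flush(streamId);
    const row = this.metadata.get(streamId);
    if (row) row.status = status;
    this.activeStreamId = null;
  }
}
```

Note that in this model `markError` leaves both the metadata row and the flushed chunks in place. What it removes is only the in-memory notion of an active stream.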

On reconnect, a client sends CF_AGENT_STREAM_RESUME_REQUEST. The server either replies STREAM_RESUMING (and replays stored chunks after the client ACKs) or STREAM_RESUME_NONE.

3. The live error path

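In packages/ai-chat/src/index.ts:4169-4192, when the parser sees a type: "error" chunk it broadcasts an error frame, marks the stream as errored, emits message:error, cancels the reader, and broadcasts a closing done: true frame, in that order: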

case "error": {
  const error = data.errorText ?? JSON.stringify({ type: data.type });
  this._broadcastChatMessage({
    error: true,
    body: error,
    done: false,
    id,
    type: MessageType.CF_AGENT_USE_CHAT_RESPONSE,
    ...(continuation && { continuation: true })
  });
  this._markStreamError(streamId);     // metadata → 'error', clear active
  this._emit("message:error", { error });
  await reader.cancel().catch(() => {});
  streamCompleted.value = true;
  this._broadcastChatMessage({
    body: "",
    done: true,
    id,
    type: MessageType.CF_AGENT_USE_CHAT_RESPONSE,
    ...(continuation && { continuation: true })
  });
  return { status: "error", error };
}

Notice what is missing: there is no storeChunk call for the error frame. The wire-level information that this turn ended in error exists only in two transient messages, then it's gone. The durable record is the metadata row's status = 'error', plus whatever content chunks happened to be flushed before the error.

Why this is structurally awkward

The chunk buffer is meant to be the single source of truth for what the client sees, but it doesn't capture the error event. The metadata row is meant to track lifecycle, but it doesn't hold the error text. Neither projection alone is enough to reproduce the live experience on replay.

4. The replay path, and what it loses

When a client reconnects, the server runs this handler in packages/ai-chat/src/index.ts:892-925:

if (data.type === MessageType.CF_AGENT_STREAM_RESUME_REQUEST) {
  if (this._resumableStream.hasActiveStream()) {
    // … continuation handling, then _notifyStreamResuming(connection)
  } else {
    sendIfOpen(connection, JSON.stringify({
      type: MessageType.CF_AGENT_STREAM_RESUME_NONE
    }));
  }
  return;
}

But markError already cleared _activeStreamId, so hasActiveStream() returns false. The server takes the STREAM_RESUME_NONE branch. From the client's point of view, that's the same answer it would receive if no turn had ever happened.
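
A reduced model makes the collision visible. The sketch assumes the resume decision depends only on the in-memory pointer, as described above. `AgentState`, `markError`, and `onResumeRequest` here are hypothetical simplifications, not the real handler.

```typescript
type StreamStatus = "streaming" | "completed" | "error";
type ResumeReply = "STREAM_RESUMING" | "STREAM_RESUME_NONE";

interface AgentState {
  activeStreamId: string | null;                // in-memory pointer
  rows: { id: string; status: StreamStatus }[]; // stands in for the metadata table
}

// Mirrors the bookkeeping described above: update the row, clear the pointer.
function markError(state: AgentState, streamId: string): void {
  const row = state.rows.find((r) => r.id === streamId);
  if (row) row.status = "error";
  state.activeStreamId = null;
}

// The decision consults only in-memory state, never the durable rows.
function onResumeRequest(state: AgentState): ResumeReply {
  return state.activeStreamId !== null ? "STREAM_RESUMING" : "STREAM_RESUME_NONE";
}

// A server that never ran a turn.
const neverStarted: AgentState = { activeStreamId: null, rows: [] };

// A server whose only turn ended in an in-band error.
const errored: AgentState = {
  activeStreamId: "s1",
  rows: [{ id: "s1", status: "streaming" }]
};
markError(errored, "s1");

// Both replies are STREAM_RESUME_NONE, although `errored` still holds a row
// with status 'error' that could have told the client what happened.
const replyNeverStarted = onResumeRequest(neverStarted);
const replyErrored = onResumeRequest(errored);
```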

The late-ACK branch in the same file (:929-968) is no better. It falls through to replayCompletedChunksByRequestId, which selects only rows with status = 'completed':

const streams = this.sql<StreamMetadata>`
  select * from cf_ai_chat_stream_metadata
  where request_id = ${requestId}
  and status = 'completed'    -- errored streams are filtered out
  order by created_at desc
  limit 1
`;

Result: the query matches no row, the caller sees false, and it emits a bare done: true replay frame with no body and no error flag. That is exactly what a clean completion looks like on the wire.
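
The late-ACK behaviour can be sketched with an array in place of the SQL table. Everything here is a model under assumptions: `replayCompletedModel` and `handleLateAck` are hypothetical names, and the frames sent on the success path (one frame per stored chunk, then a terminal frame) are an assumed shape, not taken from the source. Only the status filter and the bare fallback frame follow the description above.

```typescript
type StreamStatus = "streaming" | "completed" | "error";

interface StreamRow {
  requestId: string;
  status: StreamStatus;
  createdAt: number;
  chunks: string[];
}

interface Frame {
  body: string;
  done: boolean;
  replay?: true;
  error?: true;
}

// Models the query: newest row for the request, restricted to 'completed'.
function replayCompletedModel(
  rows: StreamRow[],
  requestId: string,
  send: (frame: Frame) => void
): boolean {
  const match: StreamRow | undefined = rows
    .filter((r) => r.requestId === requestId && r.status === "completed")
    .sort((a, b) => b.createdAt - a.createdAt)[0];
  if (!match) return false; // errored streams end up here
  for (const body of match.chunks) send({ body, done: false, replay: true });
  send({ body: "", done: true, replay: true }); // assumed terminal frame
  return true;
}

// Models the caller: when nothing was replayed, send a bare terminal frame.
function handleLateAck(rows: StreamRow[], requestId: string): Frame[] {
  const sent: Frame[] = [];
  const replayed = replayCompletedModel(rows, requestId, (f) => {
    sent.push(f);
  });
  if (!replayed) sent.push({ body: "", done: true, replay: true });
  return sent;
}
```

Under these assumptions, an errored stream with stored chunks yields a single frame that is byte-for-byte the same as the last frame of a completed stream.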

sequenceDiagram
  participant L as Live client
  participant S as Server
  participant DB as SQLite
  participant R as Reconnecting client

  L->>S: subscribe
  S-->>L: chunk(text)
  S->>DB: storeChunk(text)
  Note over L,DB: turn-text flushed
  S-->>L: error:true, body=errorText, done:false
  S->>DB: markError → status='error', clear active
  S-->>L: body="", done:true
  Note over L: live observer sees error
  R->>S: STREAM_RESUME_REQUEST (after reconnect)
  S->>DB: hasActiveStream? no
  S-->>R: STREAM_RESUME_NONE
  Note over R,S: reconnect observer sees "nothing happened"
            

5. The information that exists but doesn't flow

At the point of markError the system actually knows three things:

  1. The stream ended terminally (status = 'error').
  2. The error text (held as a local variable in the broadcast call site, then dropped).
  3. Which chunks already made it into the buffer (so partial assistant content from before the error is recoverable, in principle).

Only (1) is consulted at replay time, and the replay code is written to treat it as "nothing to do". (2) is never persisted, and (3) sits in SQLite but is never surfaced. Part 1 of #1575 is fundamentally about making (1) drive a deliberate replay outcome, and ensuring (2) and (3) reach the reconnecting client alongside it.

6. Two design options

Option A: persist a terminal-error chunk

At the point of markError, call storeChunk with a sentinel body that the replay decoder recognises as a terminal error event. The chunk buffer becomes a complete projection of what a live observer saw, in order. Replay just walks the chunks.

Pros: single source of truth; existing chunk-ordering machinery does the work. Cons: introduces a new sentinel that has to survive schema/protocol evolution.
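
A minimal sketch of Option A, assuming the sentinel is simply the serialized in-band error chunk ({ type: "error", errorText }) and that replay re-emits it as the same pair of frames a live client saw. `StoredStream`, `markErrorWithChunk`, and `replayStream` are hypothetical names; the real storage and replay code will differ.

```typescript
type StreamStatus = "streaming" | "completed" | "error";

interface StoredStream {
  status: StreamStatus;
  chunks: string[]; // serialized UI message chunks, in arrival order
}

interface Frame {
  body: string;
  done: boolean;
  replay?: true;
  error?: true;
}

// Persist the error as the last chunk before flipping the status.
function markErrorWithChunk(stream: StoredStream, errorText: string): void {
  stream.chunks.push(JSON.stringify({ type: "error", errorText }));
  stream.status = "error";
}

// Replay just walks the chunks; the error chunk becomes an error frame.
function replayStream(stream: StoredStream): Frame[] {
  const frames: Frame[] = [];
  for (const body of stream.chunks) {
    const parsed = JSON.parse(body) as { type?: string; errorText?: string };
    if (parsed.type === "error") {
      frames.push({ body: parsed.errorText ?? body, done: false, error: true, replay: true });
    } else {
      frames.push({ body, done: false, replay: true });
    }
  }
  frames.push({ body: "", done: true, replay: true });
  return frames;
}
```

The design point is that `replayStream` needs no knowledge of the metadata status: order and terminal outcome both fall out of the chunk log.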

Option B: extend metadata with terminal context

Add columns to cf_ai_chat_stream_metadata for error_text (and maybe finish_reason). Have the replay paths read these on errored streams and synthesise a terminal error: true, done: true frame after the existing content chunks.

Pros: no new sentinel; schema clearly says "this is terminal state, not content". Cons: replay code has to remember to project the metadata into a synthetic chunk; second source of truth.
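
A comparable sketch of Option B, assuming a nullable error_text column (represented here as the hypothetical field `errorText`) and a replay function that synthesises the terminal frame from the metadata row. `MetadataRow` and `replayWithMetadata` are illustrative names only.

```typescript
type StreamStatus = "streaming" | "completed" | "error";

interface MetadataRow {
  status: StreamStatus;
  errorText: string | null; // hypothetical new column: error_text
}

interface Frame {
  body: string;
  done: boolean;
  replay?: true;
  error?: true;
}

// Content chunks are replayed as-is; the terminal frame is projected
// from the metadata row instead of being read from the chunk log.
function replayWithMetadata(row: MetadataRow, chunks: string[]): Frame[] {
  const frames = chunks.map((body): Frame => ({ body, done: false, replay: true }));
  if (row.status === "error") {
    frames.push({ body: row.errorText ?? "", done: true, error: true, replay: true });
  } else {
    frames.push({ body: "", done: true, replay: true });
  }
  return frames;
}
```

The branch on `row.status` is the "replay code has to remember" cost named above: every replay path needs the same projection, or the two sources of truth drift apart.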

Lean

Option A keeps the chunk stream as the single contract. replayChunks already emits synthetic terminal frames for orphaned streams (lines 354-373 in resumable-stream.ts); errored streams are just another shape of terminal. Worth confirming with threepointone before committing.

7. The test that pins the wrong behaviour

packages/ai-chat/src/tests/resumable-streaming.test.ts:733-786 is named:

it("does not replay stored chunks from an errored stream after a late ACK", ...)

It asserts that a late ACK after markError produces exactly one frame: { done: true, replay: true }, with no body and no error flag. That is the wire contract #1575 wants to change, so Part 1 has to rewrite this test, not work around it. Before doing so, check that no consumer outside the test relies on the old shape; the test name suggests it was written defensively, when partial replay after an error felt like the dangerous case.
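
One way to phrase the property a rewritten test could pin, independent of which option is chosen, is parity of terminal state between the live frames and the replayed frames. The helper below is a hypothetical sketch: `terminalState` is not part of the codebase, and the frame shape is reduced to the fields discussed in this lesson.

```typescript
interface Frame {
  body: string;
  done: boolean;
  error?: boolean;
  replay?: boolean;
}

type Terminal = "error" | "completed" | "incomplete";

// Derive what an observer concludes from a frame sequence:
// an error flag anywhere before the done frame means the turn failed.
function terminalState(frames: Frame[]): Terminal {
  let sawError = false;
  for (const frame of frames) {
    if (frame.error) sawError = true;
    if (frame.done) return sawError ? "error" : "completed";
  }
  return "incomplete";
}

// Frames a live client receives from the error path in section 3.
const liveFrames: Frame[] = [
  { body: "rate limited", done: false, error: true },
  { body: "", done: true }
];

// The single frame the current late-ACK path sends after an error.
const currentReplayFrames: Frame[] = [{ body: "", done: true, replay: true }];

const liveOutcome = terminalState(liveFrames);            // "error"
const replayOutcome = terminalState(currentReplayFrames); // "completed"
```

A parity test would assert that both outcomes are equal for the same turn; with the current behaviour they are not.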

8. Self check

Q1

Why does hasActiveStream() return false on reconnect after an in-band error, given that the metadata row still exists?

Answer

hasActiveStream() reads in-memory state, not SQLite. markError clears _activeStreamId as part of its bookkeeping. The metadata row persists with status = 'error', but the in-memory pointer to it is gone. Replay has to walk SQLite directly to recover the terminal state.

Q2

Suppose a turn streams three text-delta chunks, then emits an in-band error. A client reconnects after the error. Which of those three chunks does the current implementation replay?

Answer

None. STREAM_RESUME_REQUEST short-circuits with STREAM_RESUME_NONE because the stream is no longer active. Even on the late-ACK path, replayCompletedChunksByRequestId filters out errored streams. The chunks are still in SQLite — they're just not surfaced.

Q3

If you choose Option A (persist a terminal-error chunk), what has to be true about the encoding so existing replay decoders don't crash on it?

Answer

The chunk body is JSON parsed by client-side reducers. A new event type has to either be ignored by old clients (forward compatibility) or gated by a protocol version bump. The safest shape is something the parser already handles — possibly the same wire frame as the live error event, so the decoder needs no new branch.
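
The forward-compatibility argument can be illustrated with a toy client reducer. This is an assumption-laden sketch: `ClientState` and `reduceChunk` are hypothetical, real client reducers handle many more chunk types, and "terminal-error-v2" is an invented example of a sentinel an old client would not know.

```typescript
interface ClientState {
  text: string;
  error: string | null;
}

// Toy reducer: applies one serialized chunk body to the client state.
function reduceChunk(state: ClientState, body: string): ClientState {
  let chunk: unknown;
  try {
    chunk = JSON.parse(body);
  } catch {
    return state; // malformed bodies are skipped rather than crashing
  }
  if (typeof chunk !== "object" || chunk === null) return state;
  const c = chunk as { type?: unknown; delta?: unknown; errorText?: unknown };
  switch (c.type) {
    case "text-delta":
      return typeof c.delta === "string" ? { ...state, text: state.text + c.delta } : state;
    case "error":
      return { ...state, error: typeof c.errorText === "string" ? c.errorText : "unknown error" };
    default:
      return state; // unknown chunk types are ignored
  }
}

const initial: ClientState = { text: "", error: null };

// A brand-new sentinel type is silently dropped by this reducer...
const withNewSentinel = [
  JSON.stringify({ type: "text-delta", delta: "Hi" }),
  JSON.stringify({ type: "terminal-error-v2", errorText: "rate limited" })
].reduce(reduceChunk, initial);

// ...while reusing the existing error shape needs no new branch.
const withExistingShape = [
  JSON.stringify({ type: "text-delta", delta: "Hi" }),
  JSON.stringify({ type: "error", errorText: "rate limited" })
].reduce(reduceChunk, initial);
```

Ignoring the unknown sentinel keeps the old client alive but leaves it believing the turn succeeded, which is why reusing the existing shape is the safer encoding.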

Q4

Why is Think (packages/think/src/think.ts) relevant to a fix in ai-chat?

Answer

Both packages share ResumableStream and both emit in-band error frames via the same protocol. A fix in ResumableStream (Option A or B) propagates to Think automatically; a fix only at the ai-chat broadcaster level leaves Think emitting the old wire shape. A convergent fix at the shared layer is the right move.