Lesson 03 — In-band stream error replay parity
Grounding: issue #1575
(Part 1 — resumable replay).
Code in
packages/agents/src/chat/resumable-stream.ts,
packages/ai-chat/src/index.ts, and
packages/ai-chat/src/ws-chat-transport.ts. The
companion @cloudflare/think paths mirror ai-chat.
1. What goes wrong on reconnect
A model provider can fail mid-stream in two ways. The first is an
out-of-band failure: the SDK reader throws, the
server catches it, and broadcasts a single error frame. The second
is an in-band failure: the provider keeps the
stream alive and emits a UI message chunk of the shape
{ type: "error", errorText }. This second case is the
one the AI SDK uses for things like rate limits and provider
policy denials.
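To make the distinction concrete, here is a minimal sketch of how a consumer might classify the two failure modes. Only the `{ type: "error", errorText }` shape comes from the text above; the `UIChunk` union, the `Reader` type, and the `consume` and `scripted` helpers are hypothetical names for illustration, not the SDK's API.

```typescript
// Hypothetical chunk union; only the "error" shape is taken from the text.
type UIChunk =
  | { type: "text-delta"; delta: string }
  | { type: "error"; errorText?: string };

type Outcome =
  | { kind: "completed"; text: string }
  | { kind: "in-band-error"; text: string; errorText: string }
  | { kind: "out-of-band-error"; text: string; cause: unknown };

// Hypothetical reader: returns null at end of stream, and may throw.
type Reader = () => UIChunk | null;

// Drains the reader and classifies how the turn ended.
function consume(read: Reader): Outcome {
  let text = "";
  try {
    for (let chunk = read(); chunk !== null; chunk = read()) {
      if (chunk.type === "error") {
        // In-band: the failure arrived as data on a still-open stream.
        return { kind: "in-band-error", text, errorText: chunk.errorText ?? "unknown error" };
      }
      text += chunk.delta;
    }
  } catch (cause) {
    // Out-of-band: the reader threw instead of yielding a chunk.
    return { kind: "out-of-band-error", text, cause };
  }
  return { kind: "completed", text };
}

// Builds a reader over a fixed script; an Error entry is thrown, not returned.
function scripted(steps: Array<UIChunk | Error>): Reader {
  let i = 0;
  return () => {
    const step = steps[i++];
    if (step === undefined) return null;
    if (step instanceof Error) throw step;
    return step;
  };
}

const inBand = consume(scripted([
  { type: "text-delta", delta: "Hel" },
  { type: "error", errorText: "rate limited" }
]));
const outOfBand = consume(scripted([
  { type: "text-delta", delta: "Hel" },
  new Error("reader failed")
]));
```

In both cases the partial text received before the failure is still available to the consumer; what differs is whether the failure was delivered as a chunk or as an exception.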
Live clients handle both fine. The interesting case is the client
that disconnects (network blip, page refresh) during the turn and
reconnects after the error has been emitted. With the current
code, that client cannot tell that the turn failed: the server
replays nothing or sends a bare done: true frame
indistinguishable from a clean completion. The live observer and
the reconnect observer disagree about terminal state.
This lesson reconstructs why, and what the fix has to preserve.
2. Primer: the resumable stream model
Every chat turn the SDK runs maps onto three concrete artefacts.
| Layer | What it holds | Lifetime |
|---|---|---|
| SQLite cf_ai_chat_stream_metadata | One row per stream. status is streaming / completed / error. | Persists across DO eviction. Garbage-collected after 24h. |
| SQLite cf_ai_chat_stream_chunks | Append-only log of serialized UI message chunks. | Same as metadata. |
| WebSocket frames | CF_AGENT_USE_CHAT_RESPONSE messages with body, done, optional error: true / replay: true. | Lives only while a client is connected. |
ResumableStream in
packages/agents/src/chat/resumable-stream.ts owns
both tables. Three methods drive the lifecycle:
- start(requestId): insert a metadata row, set _activeStreamId, return the new id.
- storeChunk(streamId, body): buffer a chunk; flush to SQLite in batches of 10.
- complete(streamId) or markError(streamId): flush the buffer, update status, clear in-memory active state.
On reconnect, a client sends
CF_AGENT_STREAM_RESUME_REQUEST. The server either
replies STREAM_RESUMING (and replays stored chunks
after the client ACKs) or STREAM_RESUME_NONE.
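The lifecycle and the resume decision can be modeled in memory. This is a toy sketch under stated assumptions, not the real class: plain objects stand in for the two SQLite tables, and `ResumableStreamModel` and `resumeReply` are hypothetical names. The method names and the batch size of 10 come from the text above.

```typescript
type Status = "streaming" | "completed" | "error";

// Toy stand-in for ResumableStream: plain objects replace the SQLite tables.
class ResumableStreamModel {
  private static readonly BATCH_SIZE = 10; // batch size stated in the text
  private activeStreamId: string | null = null;
  private buffer: string[] = [];
  private nextId = 1;
  readonly metadata: Record<string, { requestId: string; status: Status }> = {};
  readonly chunks: Record<string, string[]> = {}; // "durable" chunk log per stream

  start(requestId: string): string {
    const id = `stream-${this.nextId++}`;
    this.metadata[id] = { requestId, status: "streaming" };
    this.chunks[id] = [];
    this.activeStreamId = id;
    return id;
  }

  storeChunk(streamId: string, body: string): void {
    this.buffer.push(body);
    if (this.buffer.length >= ResumableStreamModel.BATCH_SIZE) this.flush(streamId);
  }

  complete(streamId: string): void { this.finish(streamId, "completed"); }
  markError(streamId: string): void { this.finish(streamId, "error"); }

  hasActiveStream(): boolean { return this.activeStreamId !== null; }

  // Moves buffered chunks into the durable log.
  private flush(streamId: string): void {
    const log = this.chunks[streamId];
    if (log) log.push(...this.buffer);
    this.buffer = [];
  }

  // Shared terminal path: flush, update status, clear in-memory active state.
  private finish(streamId: string, status: Status): void {
    this.flush(streamId);
    const row = this.metadata[streamId];
    if (row) row.status = status;
    this.activeStreamId = null;
  }
}

// Resume decision as described above: only an active stream is resumable.
function resumeReply(stream: ResumableStreamModel): "STREAM_RESUMING" | "STREAM_RESUME_NONE" {
  return stream.hasActiveStream() ? "STREAM_RESUMING" : "STREAM_RESUME_NONE";
}
```

In this model, calling `markError` leaves the metadata row and the flushed chunks in place but makes `hasActiveStream()` return false, so a later resume request gets `STREAM_RESUME_NONE`.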
3. The live error path
In packages/ai-chat/src/index.ts:4169-4192, when the
parser sees a type: "error" chunk it broadcasts the error frame,
marks the stream as errored, and then broadcasts a terminal done frame:
    case "error": {
      const error = data.errorText ?? JSON.stringify({ type: data.type });
      this._broadcastChatMessage({
        error: true,
        body: error,
        done: false,
        id,
        type: MessageType.CF_AGENT_USE_CHAT_RESPONSE,
        ...(continuation && { continuation: true })
      });
      this._markStreamError(streamId); // metadata → 'error', clear active
      this._emit("message:error", { error });
      await reader.cancel().catch(() => {});
      streamCompleted.value = true;
      this._broadcastChatMessage({
        body: "",
        done: true,
        id,
        type: MessageType.CF_AGENT_USE_CHAT_RESPONSE,
        ...(continuation && { continuation: true })
      });
      return { status: "error", error };
    }
Notice what is missing: there is no storeChunk call
for the error frame. The wire-level information that this turn
ended in error exists only in two transient messages, then it's
gone. The durable record is the metadata row's
status = 'error', plus whatever content chunks
happened to be flushed before the error.
Why this is structurally awkward
The chunk buffer is meant to be the single source of truth for what the client sees, but it doesn't capture the error event. The metadata row is meant to track lifecycle, but it doesn't hold the error text. Neither projection alone is enough to reproduce the live experience on replay.
4. The replay path, and what it loses
When a client reconnects, the server runs this handler in
packages/ai-chat/src/index.ts:892-925:
    if (data.type === MessageType.CF_AGENT_STREAM_RESUME_REQUEST) {
      if (this._resumableStream.hasActiveStream()) {
        // … continuation handling, then _notifyStreamResuming(connection)
      } else {
        sendIfOpen(connection, JSON.stringify({
          type: MessageType.CF_AGENT_STREAM_RESUME_NONE
        }));
      }
      return;
    }
But markError already cleared
_activeStreamId, so hasActiveStream()
returns false. The server takes the STREAM_RESUME_NONE
branch. From the client's point of view, that's the same answer
it would receive if no turn had ever happened.
The late-ACK branch in the same file
(:929-968) is no better. It falls through to
replayCompletedChunksByRequestId, which selects only
rows with status = 'completed':
    const streams = this.sql<StreamMetadata>`
      select * from cf_ai_chat_stream_metadata
      where request_id = ${requestId}
        and status = 'completed' -- ← errored streams are filtered out
      order by created_at desc
      limit 1
    `;
Result: replayCompletedChunksByRequestId returns false, so the caller emits a bare
done: true replay frame with no body
and no error flag. That's exactly what a clean
completion looks like on the wire.
sequenceDiagram
participant L as Live client
participant S as Server
participant DB as SQLite
participant R as Reconnecting client
L->>S: subscribe
S-->>L: chunk(text)
S->>DB: storeChunk(text)
Note over L,DB: turn-text flushed
S-->>L: error:true, body=errorText, done:false
S->>DB: markError → status='error', clear active
S-->>L: body="", done:true
Note over L: live observer sees error
R->>S: STREAM_RESUME_REQUEST (after reconnect)
S->>DB: hasActiveStream? no
S-->>R: STREAM_RESUME_NONE
Note over R,S: reconnect observer sees "nothing happened"
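The late-ACK outcome can be reproduced with a small model. This is a sketch of the behaviour described above, not the code in index.ts: `latestCompleted`, `lateAckFrames`, and the `Row` and `Frame` types are hypothetical, an in-memory array stands in for the SQL query, and the shape of the completed-stream replay is an assumption.

```typescript
type Status = "streaming" | "completed" | "error";

interface Row {
  id: string;
  requestId: string;
  status: Status;
  createdAt: number;
  chunks: string[];
}

interface Frame {
  body?: string;
  done: boolean;
  replay: boolean;
  error?: boolean;
}

// Mirrors the query: newest row for the request, restricted to 'completed'.
function latestCompleted(rows: Row[], requestId: string): Row | undefined {
  return rows
    .filter((r) => r.requestId === requestId && r.status === "completed")
    .sort((a, b) => b.createdAt - a.createdAt)[0];
}

// Frames a late-ACKing client would receive in this model.
function lateAckFrames(rows: Row[], requestId: string): Frame[] {
  const row = latestCompleted(rows, requestId);
  const stored = row ? row.chunks : []; // errored rows never reach this point
  const content = stored.map((body): Frame => ({ body, done: false, replay: true }));
  // Bare terminal frame: no body, no error flag.
  return [...content, { done: true, replay: true }];
}

const erroredRows: Row[] = [
  { id: "s1", requestId: "r1", status: "error", createdAt: 1, chunks: ["a", "b", "c"] }
];
const afterError = JSON.stringify(lateAckFrames(erroredRows, "r1"));
const neverStored = JSON.stringify(lateAckFrames([], "r1"));
```

In this model `afterError` and `neverStored` serialize to the same single frame, which is the ambiguity the reconnecting client faces: three stored chunks and an error are indistinguishable from a request with no stored stream.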
5. The information that exists but doesn't flow
At the point of markError the system actually knows
three things:
1. The stream ended terminally (status = 'error').
2. The error text (held as a local variable in the broadcast call site, then dropped).
3. Which chunks already made it into the buffer (so partial assistant content from before the error is recoverable, in principle).
Only (1) survives to replay time, and replay code is written to treat (1) as "nothing to do". Part 1 of #1575 is fundamentally about making (1) drive a deliberate replay outcome, and ensuring (2) and (3) survive alongside it.
6. Two design options
Option A: persist a terminal-error chunk
At the point of markError, call
storeChunk with a sentinel body that the replay
decoder recognises as a terminal error event. The chunk buffer
becomes a complete projection of what a live observer saw, in
order. Replay just walks the chunks.
Pros: single source of truth; existing chunk-ordering machinery does the work.
Cons: introduces a new sentinel that has to survive schema/protocol evolution.
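A minimal sketch of Option A, assuming the stored terminal chunk reuses the live error shape `{ type: "error", errorText }` as its body. The `StoredStream` type and the `recordError` and `replay` helpers are hypothetical, and whether the real replay path would project the chunk into frames this way is an open design question.

```typescript
interface StoredStream {
  status: "streaming" | "completed" | "error";
  chunks: string[]; // serialized UI message chunks, in arrival order
}

interface Frame {
  body: string;
  done: boolean;
  replay: boolean;
  error?: boolean;
}

// On an in-band error: append the error event to the chunk log, then mark the stream.
function recordError(stream: StoredStream, errorText: string): void {
  stream.chunks.push(JSON.stringify({ type: "error", errorText }));
  stream.status = "error";
}

// Replay walks the log in order; an error chunk becomes an error frame.
// Assumes every stored body is valid JSON, as in this sketch.
function replay(stream: StoredStream): Frame[] {
  const frames = stream.chunks.map((body): Frame => {
    const parsed = JSON.parse(body) as { type?: string; errorText?: string };
    return parsed.type === "error"
      ? { body: parsed.errorText ?? body, done: false, replay: true, error: true }
      : { body, done: false, replay: true };
  });
  frames.push({ body: "", done: true, replay: true });
  return frames;
}

const turn: StoredStream = {
  status: "streaming",
  chunks: [JSON.stringify({ type: "text-delta", delta: "Hi" })]
};
recordError(turn, "rate limited");
const replayed = replay(turn);
```

Here the replayed sequence mirrors the live one: content, then an error frame carrying the error text, then the terminal done frame. No metadata lookup is needed to reconstruct the outcome.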
Option B: extend metadata with terminal context
Add columns to cf_ai_chat_stream_metadata for
error_text (and maybe finish_reason).
Have the replay paths read these on errored streams and
synthesise a terminal error: true, done: true frame
after the existing content chunks.
Pros: no new sentinel; schema clearly says "this is terminal state, not content".
Cons: replay code has to remember to project the metadata into a synthetic chunk; second source of truth.
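A comparable sketch of Option B, assuming a nullable `error_text` column surfaced as `errorText`. The `StreamMetadata` shape here and the `replayWithMetadata` helper are hypothetical and only illustrate the projection step the replay paths would have to perform.

```typescript
interface StreamMetadata {
  status: "streaming" | "completed" | "error";
  errorText: string | null; // hypothetical error_text column
}

interface Frame {
  body: string;
  done: boolean;
  replay: boolean;
  error?: boolean;
}

// Replays stored content, then projects terminal metadata into a final frame.
function replayWithMetadata(meta: StreamMetadata, chunks: string[]): Frame[] {
  const frames = chunks.map((body): Frame => ({ body, done: false, replay: true }));
  if (meta.status === "error") {
    // Synthetic terminal frame: the error text comes from metadata, not the chunk log.
    frames.push({ body: meta.errorText ?? "", done: true, replay: true, error: true });
  } else {
    frames.push({ body: "", done: true, replay: true });
  }
  return frames;
}

const erroredReplay = replayWithMetadata(
  { status: "error", errorText: "policy denial" },
  ["a", "b"]
);
const cleanReplay = replayWithMetadata({ status: "completed", errorText: null }, ["a"]);
```

The branch on `meta.status` is the projection step that every replay path would need to repeat, which is the "second source of truth" cost named above.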
Lean
Option A keeps the chunk stream as the single contract.
replayChunks already emits synthetic terminal
frames for orphaned streams (lines 354-373 in
resumable-stream.ts); errored streams are just
another shape of terminal. Worth confirming with threepointone
before committing.
7. The test that pins the wrong behaviour
packages/ai-chat/src/tests/resumable-streaming.test.ts:733-786
is named:
it("does not replay stored chunks from an errored stream after a late ACK", ...)
It asserts that a late ACK after markError produces
exactly one frame: { done: true, replay: true }, no
body, no error flag. That is exactly the wire contract #1575
wants to change. Part 1 has to rewrite this test, not work
around it. Before doing so, check that no consumer outside the
test relies on the old shape — the test name suggests it was
written defensively when partial-replay-after-error felt like
the dangerous case.
8. Self check
Q1
Why does hasActiveStream() return false on
reconnect after an in-band error, given that the metadata row
still exists?
Answer
hasActiveStream() reads in-memory state, not
SQLite. markError clears
_activeStreamId as part of its bookkeeping. The
metadata row persists with status = 'error',
but the in-memory pointer to it is gone. Replay has to walk
SQLite directly to recover the terminal state.
Q2
Suppose a turn streams three text-delta chunks, then emits an in-band error. A client reconnects after the error. Which of those three chunks does the current implementation replay?
Answer
None. STREAM_RESUME_REQUEST short-circuits with
STREAM_RESUME_NONE because the stream is no
longer active. Even on the late-ACK path,
replayCompletedChunksByRequestId filters out
errored streams. The chunks are still in SQLite — they're
just not surfaced.
Q3
If you choose Option A (persist a terminal-error chunk), what has to be true about the encoding so existing replay decoders don't crash on it?
Answer
The chunk body is JSON parsed by client-side reducers. A new event type has to either be ignored by old clients (forward compatibility) or gated by a protocol version bump. The safest shape is something the parser already handles — possibly the same wire frame as the live error event, so the decoder needs no new branch.
Q4
Why is Think (packages/think/src/think.ts)
relevant to a fix in ai-chat?
Answer
Both packages share ResumableStream and both
emit in-band error frames via the same protocol. A fix in
ResumableStream (Option A or B) propagates to
Think automatically; a fix only at the ai-chat broadcaster
level leaves Think emitting the old wire shape. A convergent
fix at the shared layer is the right move.