fix(provider): use incremental SSE stream reading for openai-codex responses

Replace full-body buffering (`response.text().await`) in
`decode_responses_body()` with incremental `bytes_stream()` chunk
processing.  The previous approach held the HTTP connection open until
every byte had arrived; on high-latency links the long-lived connection
would frequently drop mid-read, producing the "error decoding response
body" failure on the first attempt (succeeding only after retry).

Reading chunks incrementally lets each network segment complete within
its own timeout window, eliminating the systematic first-attempt failure.

Closes #3544

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
simianastronaut 2026-03-15 15:22:55 -04:00 committed by Roman Tataurov
parent 09ca915927
commit f9f9cc6b4a
No known key found for this signature in database
GPG Key ID: 70A51EF3185C334B

View File

@ -4,6 +4,7 @@ use crate::multimodal;
use crate::providers::traits::{ChatMessage, Provider, ProviderCapabilities};
use crate::providers::ProviderRuntimeOptions;
use async_trait::async_trait;
use futures_util::StreamExt;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use serde_json::Value;
@ -472,8 +473,24 @@ fn extract_stream_error_message(event: &Value) -> Option<String> {
None
}
/// Read the response body incrementally via `bytes_stream()` to avoid
/// buffering the entire SSE payload in memory. The previous implementation
/// used `response.text().await?` which holds the HTTP connection open until
/// every byte has arrived — on high-latency links the long-lived connection
/// often drops mid-read, producing the "error decoding response body" failure
/// reported in #3544.
async fn decode_responses_body(response: reqwest::Response) -> anyhow::Result<String> {
let body = response.text().await?;
let mut body = String::new();
let mut stream = response.bytes_stream();
while let Some(chunk) = stream.next().await {
let bytes = chunk
.map_err(|err| anyhow::anyhow!("error reading OpenAI Codex response stream: {err}"))?;
let text = std::str::from_utf8(&bytes).map_err(|err| {
anyhow::anyhow!("OpenAI Codex response contained invalid UTF-8: {err}")
})?;
body.push_str(text);
}
if let Some(text) = parse_sse_text(&body)? {
return Ok(text);