Streaming responses let you process data as it arrives from the server, rather than waiting for the entire response to be received. This is particularly valuable when:
- Working with large responses that might exceed memory limits
- Processing real-time data streams
- Handling responses from AI models that generate content token by token
- Building user interfaces that show progressive updates
The Instructor HTTP client API provides robust support for streaming responses across all supported HTTP client implementations.
## Enabling Streaming
To receive a streaming response, configure the request with the `stream` option set to `true`:
```php
use Cognesy\Http\HttpClient;
use Cognesy\Http\Data\HttpClientRequest;

// Create a streaming request
$request = new HttpClientRequest(
    url: 'https://api.example.com/stream',
    method: 'GET',
    headers: [
        'Accept' => 'text/event-stream',
    ],
    body: [],
    options: [
        'stream' => true, // Enable streaming
    ]
);

// Or use the withStreaming method on an existing request
$streamingRequest = $request->withStreaming(true);

// Create a client and send the request
$client = new HttpClient();
$response = $client->handle($streamingRequest);
```
The `stream` option tells the HTTP client to treat the response as a stream, which means:
- It won’t buffer the entire response in memory
- It will provide a way to read the response incrementally
- The connection will remain open until all data is received or the stream is closed
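One practical consequence of incremental reading is that you can stop consuming the stream as soon as you have what you need. Here is a minimal sketch, assuming the underlying client releases the connection once the generator is no longer consumed:

```php
$response = $client->handle($streamingRequest);

$preview = '';
foreach ($response->stream() as $chunk) {
    $preview .= $chunk;
    // Stop reading once we have enough data for a preview
    if (strlen($preview) >= 4096) {
        break; // remaining data is never downloaded
    }
}
```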
## Processing Streamed Data
Once you have a streaming response, you can process it using the `stream()` method, which returns a PHP Generator:
```php
$response = $client->handle($streamingRequest);

// Process the stream chunk by chunk
foreach ($response->stream() as $chunk) {
    // Process each chunk of data as it arrives
    echo "Received chunk: $chunk\n";

    // You could parse JSON chunks, update progress, etc.
    // If this is a streaming JSON response, you might need to buffer until
    // you have complete JSON objects
}
```
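For instance, if the endpoint emits newline-delimited JSON (NDJSON), a minimal buffering sketch, assuming each record is terminated by a single newline, might look like this:

```php
$buffer = '';
foreach ($response->stream() as $chunk) {
    $buffer .= $chunk;

    // Pull every complete line out of the buffer; anything after the
    // last newline stays buffered until the next chunk arrives
    while (($pos = strpos($buffer, "\n")) !== false) {
        $line = substr($buffer, 0, $pos);
        $buffer = substr($buffer, $pos + 1);

        $record = json_decode($line, true);
        if ($record !== null) {
            // Process one complete JSON object
            echo "Got record: " . json_encode($record) . "\n";
        }
    }
}
```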
By default, the `stream()` method reads the response in small chunks. You can control the chunk size by passing a parameter:
```php
// Read in chunks of 1024 bytes
foreach ($response->stream(1024) as $chunk) {
    // Process larger chunks of data
    echo "Received chunk of approximately 1KB: $chunk\n";
}
```
## Example: Downloading a Large File
Here’s an example of downloading a large file with streaming to avoid memory issues:
```php
use Cognesy\Http\HttpClient;
use Cognesy\Http\Data\HttpClientRequest;
use Cognesy\Http\Exceptions\RequestException;

// Create a streaming request
$request = new HttpClientRequest(
    url: 'https://example.com/large-file.zip',
    method: 'GET',
    headers: [],
    body: [],
    options: ['stream' => true]
);

$client = new HttpClient();

try {
    $response = $client->handle($request);

    // Open a file handle to save the file
    $fileHandle = fopen('downloaded-file.zip', 'wb');
    if (!$fileHandle) {
        throw new \RuntimeException("Could not open file for writing");
    }

    // Keep track of bytes received
    $totalBytes = 0;

    // Process the stream and write to file
    foreach ($response->stream(8192) as $chunk) {
        fwrite($fileHandle, $chunk);
        $totalBytes += strlen($chunk);

        // Display progress (if not in a web request)
        echo "\rDownloaded: " . number_format($totalBytes / 1024 / 1024, 2) . " MB";
    }

    // Close the file handle
    fclose($fileHandle);
    echo "\nDownload complete!\n";
} catch (RequestException $e) {
    echo "Download failed: {$e->getMessage()}\n";

    // Clean up if file was partially downloaded
    if (isset($fileHandle) && is_resource($fileHandle)) {
        fclose($fileHandle);
    }
    if (file_exists('downloaded-file.zip')) {
        unlink('downloaded-file.zip');
    }
}
```
This approach allows downloading very large files without loading the entire file into memory.
## Example: Processing Server-Sent Events (SSE)
Server-Sent Events (SSE) are a common streaming format used by many APIs. Here’s how to process them:
```php
$request = new HttpClientRequest(
    url: 'https://api.example.com/events',
    method: 'GET',
    headers: [
        'Accept' => 'text/event-stream',
        'Cache-Control' => 'no-cache',
    ],
    body: [],
    options: ['stream' => true]
);

$response = $client->handle($request);

$buffer = '';
foreach ($response->stream() as $chunk) {
    // Add the chunk to our buffer
    $buffer .= $chunk;

    // Process complete events (SSE events are separated by double newlines)
    while (($pos = strpos($buffer, "\n\n")) !== false) {
        // Extract and process the event
        $event = substr($buffer, 0, $pos);
        $buffer = substr($buffer, $pos + 2);

        // Parse the event (SSE format: "field: value")
        $parsedEvent = [];
        foreach (explode("\n", $event) as $line) {
            if (preg_match('/^([^:]+):\s*(.*)$/', $line, $matches)) {
                $field = $matches[1];
                $value = $matches[2];
                $parsedEvent[$field] = $value;
            }
        }

        // Process the parsed event
        if (isset($parsedEvent['event'], $parsedEvent['data'])) {
            $eventType = $parsedEvent['event'];
            $eventData = $parsedEvent['data'];

            echo "Received event type: $eventType\n";
            echo "Event data: $eventData\n";

            // You could also parse the data as JSON if appropriate
            if ($eventType === 'update') {
                $data = json_decode($eventData, true);
                if ($data) {
                    echo "Processed update: {$data['message']}\n";
                }
            }
        }
    }
}
```
While this works, processing streaming responses line by line is common enough that the library provides a dedicated middleware for it, as we’ll see in the next section.
## Line-by-Line Processing
For many streaming APIs, especially those that send event streams or line-delimited JSON, it’s useful to process the response line by line. The library provides the `StreamByLineMiddleware` to simplify this task:
```php
use Cognesy\Http\HttpClient;
use Cognesy\Http\Data\HttpClientRequest;
use Cognesy\Http\Middleware\StreamByLine\StreamByLineMiddleware;

// Create a client with the StreamByLineMiddleware
$client = new HttpClient();
$client->withMiddleware(new StreamByLineMiddleware());

// Create a streaming request
$request = new HttpClientRequest(
    url: 'https://api.example.com/events',
    method: 'GET',
    headers: [],
    body: [],
    options: ['stream' => true]
);

$response = $client->handle($request);

// Process the stream line by line
foreach ($response->stream() as $line) {
    // Each $line is a complete line from the response
    echo "Received line: $line\n";

    // Parse the line (e.g., as JSON)
    $event = json_decode($line, true);
    if ($event) {
        // Process the event
        echo "Event type: {$event['type']}\n";
    }
}
```
### Customizing Line Processing
You can customize how lines are parsed by providing a parser function to the middleware:
```php
$lineParser = function (string $line) {
    // Pre-process each line before yielding it
    $trimmedLine = trim($line);
    if (empty($trimmedLine)) {
        return null; // Skip empty lines
    }
    return $trimmedLine;
};

$client->withMiddleware(new StreamByLineMiddleware($lineParser));
```
If your parser returns `null`, that line will be skipped in the stream.
## Example: Processing OpenAI Chat Completions
Here’s a practical example of using the `StreamByLineMiddleware` to process streaming responses from the OpenAI API:
```php
use Cognesy\Http\HttpClient;
use Cognesy\Http\Data\HttpClientRequest;
use Cognesy\Http\Middleware\StreamByLine\StreamByLineMiddleware;

// OpenAI API requires a parser that handles their SSE format
$openAiParser = function (string $line) {
    // Skip empty lines
    if (trim($line) === '') {
        return null;
    }

    // Remove "data: " prefix from each line
    if (strpos($line, 'data: ') === 0) {
        $line = substr($line, 6);

        // Skip the "[DONE]" message
        if ($line === '[DONE]') {
            return null;
        }

        // Return the parsed line
        return $line;
    }

    return null; // Skip non-data lines
};

// Create a client with the StreamByLineMiddleware
$client = new HttpClient('guzzle'); // Use Guzzle for better streaming support
$client->withMiddleware(new StreamByLineMiddleware($openAiParser));

// Create a request to the OpenAI API ($apiKey holds your OpenAI API key)
$request = new HttpClientRequest(
    url: 'https://api.openai.com/v1/chat/completions',
    method: 'POST',
    headers: [
        'Content-Type' => 'application/json',
        'Authorization' => 'Bearer ' . $apiKey,
    ],
    body: [
        'model' => 'gpt-3.5-turbo',
        'messages' => [
            ['role' => 'user', 'content' => 'Write a short poem about coding.'],
        ],
        'stream' => true,
    ],
    options: ['stream' => true]
);

try {
    $response = $client->handle($request);
    $fullResponse = '';

    // Process the streaming response
    foreach ($response->stream() as $chunk) {
        // Parse the chunk as JSON
        $data = json_decode($chunk, true);
        if ($data && isset($data['choices'][0]['delta']['content'])) {
            $content = $data['choices'][0]['delta']['content'];
            $fullResponse .= $content;

            // Print each piece as it arrives
            echo $content;
            flush(); // Ensure output is sent immediately
        }
    }

    echo "\n\nFull response:\n$fullResponse\n";
} catch (\Exception $e) {
    echo "Error: {$e->getMessage()}\n";
}
```
This approach allows you to display the AI-generated content to the user in real-time as it’s being generated, providing a more responsive user experience.
## Considerations for Streaming
When working with streaming responses, keep these considerations in mind:
1. **Memory Usage**: While streaming reduces memory usage overall, be careful not to accumulate the entire response in memory by appending to a variable unless necessary.
2. **Connection Stability**: Streaming connections can be more sensitive to network issues. Consider implementing error handling and retry logic for more robust applications (see the sketch after this list).
3. **Server Timeouts**: Some servers or proxies might time out long-running connections. Make sure your infrastructure is configured to allow the necessary connection times.
4. **Middleware Order**: When using middleware that processes streaming responses, the order of middleware can be important. Middleware is executed in the order it’s added to the stack.
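As an illustration of the second point, here is a minimal retry sketch, reusing the `$client` and `$request` from earlier examples. It restarts the whole request on failure; the retry count and backoff delay are arbitrary assumptions, and depending on the client driver a mid-stream failure may surface as a different exception type:

```php
use Cognesy\Http\Exceptions\RequestException;

$maxAttempts = 3; // assumed retry budget
$attempt = 0;

while (true) {
    $attempt++;
    try {
        $response = $client->handle($request);
        foreach ($response->stream() as $chunk) {
            // Process each chunk...
        }
        break; // stream finished without errors
    } catch (RequestException $e) {
        if ($attempt >= $maxAttempts) {
            throw $e; // give up after the last attempt
        }
        // Simple linear backoff before restarting the whole request
        sleep($attempt);
    }
}
```

Note that this restarts the stream from the beginning on every retry, so any partially processed data must be handled idempotently or discarded.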
In the next chapter, we’ll explore how to make multiple concurrent requests using request pools, which can significantly improve performance when fetching data from multiple endpoints.