Skip to main content

Overview

This example uses InferenceRuntime with streaming enabled and configures the Logfire connection inline, sending the full LLM and HTTP lifecycle — including the complete response body — to Logfire. For streaming responses, the HTTP span stays open while chunks arrive and closes only when the stream is exhausted (HttpStreamCompleted). With captureStreamingChunks: true, each SSE chunk is also recorded as a log event under the http.client.request span; this is useful for debugging but should be left off in production.

Key concepts:
  • explicit LogfireConfig / LogfireExporter setup
  • withStreaming(): requests a server-sent-events stream from the LLM provider
  • HttpClientTelemetryProjector($hub, captureStreamingChunks: true): records each chunk and closes the HTTP span with the full body on stream completion
  • HttpStreamCompleted: new event fired when the stream generator is exhausted
  • Telemetry::flush(): must be called after the stream is consumed

Example

<?php
require 'examples/boot.php';

use Cognesy\Config\Env;
use Cognesy\Events\Dispatchers\EventDispatcher;
use Cognesy\Http\Telemetry\HttpClientTelemetryProjector;
use Cognesy\Messages\Messages;
use Cognesy\Polyglot\Inference\Inference;
use Cognesy\Polyglot\Inference\InferenceRuntime;
use Cognesy\Polyglot\Inference\LLMProvider;
use Cognesy\Polyglot\Telemetry\PolyglotTelemetryProjector;
use Cognesy\Telemetry\Adapters\Logfire\LogfireConfig;
use Cognesy\Telemetry\Adapters\Logfire\LogfireExporter;
use Cognesy\Telemetry\Application\Registry\TraceRegistry;
use Cognesy\Telemetry\Application\Telemetry;
use Cognesy\Telemetry\Application\Projector\CompositeTelemetryProjector;
use Cognesy\Telemetry\Application\Projector\RuntimeEventBridge;

// Name reported to Logfire and used to label the event dispatcher.
$serviceName = 'examples.b03.telemetry-streaming-logfire';

// Both Logfire settings are required: stop immediately with a hint if one is missing.
$logfireToken = (string) Env::get('LOGFIRE_TOKEN', '');
if ($logfireToken === '') {
    throw new RuntimeException('Set LOGFIRE_TOKEN in .env to run this example.');
}
$otlpEndpoint = (string) Env::get('LOGFIRE_OTLP_ENDPOINT', '');
if ($otlpEndpoint === '') {
    throw new RuntimeException('Set LOGFIRE_OTLP_ENDPOINT in .env to run this example.');
}

$events = new EventDispatcher($serviceName);

// Explicit Logfire wiring: trace registry + exporter -> telemetry hub.
// The endpoint is normalised by stripping any trailing slash; the token is
// passed as the Authorization header value.
$traceRegistry = new TraceRegistry();
$logfireExporter = new LogfireExporter(new LogfireConfig(
    endpoint: rtrim($otlpEndpoint, '/'),
    serviceName: $serviceName,
    headers: ['Authorization' => $logfireToken],
));
$hub = new Telemetry(
    registry: $traceRegistry,
    exporter: $logfireExporter,
);

// Projectors that translate runtime events into telemetry on the hub.
// With captureStreamingChunks: true every SSE chunk is logged as an event
// under the http.client.request span — handy while debugging, turn it off
// in production.
$projectors = [
    new PolyglotTelemetryProjector($hub),
    new HttpClientTelemetryProjector($hub, captureStreamingChunks: true),
];
$eventBridge = new RuntimeEventBridge(new CompositeTelemetryProjector($projectors));
$eventBridge->attachTo($events);

// Runtime bound to the 'openai' provider and to the instrumented dispatcher.
$llmProvider = LLMProvider::using('openai');
$runtime = InferenceRuntime::fromProvider(
    provider: $llmProvider,
    events: $events,
);

// Build the request and ask for a streamed response.
$inference = Inference::fromRuntime($runtime);
$prompt = Messages::fromString('Explain in 3 bullet points why distributed tracing matters for streaming AI responses.');
$stream = $inference
    ->with(
        messages: $prompt,
        options: ['max_tokens' => 200],
    )
    ->withStreaming()
    ->stream();

// Print each delta as it arrives while accumulating the complete text.
echo "Response (streaming):\n";
$collected = '';
foreach ($stream->deltas() as $delta) {
    $piece = $delta->contentDelta;
    echo $piece;
    $collected .= $piece;
}
echo "\n\n";

// Only flush once the loop above has drained the stream: HttpStreamCompleted
// is emitted when the generator is exhausted and carries the full raw HTTP
// response body, so flushing earlier would miss it.
$hub->flush();

echo "Telemetry: flushed to Logfire\n";

// Sanity check for the example: the model must have produced some text.
assert($collected !== '');
?>