Documentation Index
Fetch the complete documentation index at: https://docs.instructorphp.com/llms.txt
Use this file to discover all available pages before exploring further.
Overview
This example uses InferenceRuntime with streaming enabled and configures the
Langfuse connection inline, sending the full LLM and HTTP lifecycle,
including the complete response body, to Langfuse.
For streaming responses, the HTTP span stays open while chunks arrive and closes
only when the stream is exhausted (HttpStreamCompleted). With
captureStreamingChunks: true, each SSE chunk is also recorded as a log event
under the http.client.request span, which is useful for debugging but should
be left off in production.
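Because HttpStreamCompleted only fires once the stream generator is exhausted, a defensive pattern is to flush inside a finally block so telemetry is exported even if consumption aborts early. This is a minimal sketch, not part of the example below; it assumes a $stream and $hub wired exactly as shown on this page:

```php
<?php
// Sketch: guarantee telemetry export even when stream consumption fails.
// Assumes $stream (Inference stream) and $hub (Telemetry) from the full
// example below; no other APIs are used.
try {
    foreach ($stream->deltas() as $delta) {
        echo $delta->contentDelta;
    }
} finally {
    // If the loop finished normally, HttpStreamCompleted has already fired
    // and the HTTP span is closed with the full body. If consumption aborted,
    // flush() still exports whatever spans and events were recorded so far.
    $hub->flush();
}
```

Note that the doc's guidance still holds: only a fully consumed stream carries the complete response body on the HTTP span.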
Key concepts:
- explicit LangfuseConfig / LangfuseHttpTransport / LangfuseExporter setup
- withStreaming(): requests a server-sent-events stream from the LLM provider
- HttpClientTelemetryProjector($hub, captureStreamingChunks: true): records each chunk and closes the HTTP span with the full body on stream completion
- HttpStreamCompleted: new event fired when the stream generator is exhausted
- Telemetry::flush(): must be called after the stream is consumed
Example
<?php
require 'examples/boot.php';
use Cognesy\Config\Env;
use Cognesy\Events\Dispatchers\EventDispatcher;
use Cognesy\Http\Telemetry\HttpClientTelemetryProjector;
use Cognesy\Messages\Messages;
use Cognesy\Polyglot\Inference\Inference;
use Cognesy\Polyglot\Inference\InferenceRuntime;
use Cognesy\Polyglot\Inference\LLMProvider;
use Cognesy\Polyglot\Telemetry\PolyglotTelemetryProjector;
use Cognesy\Telemetry\Adapters\Langfuse\LangfuseConfig;
use Cognesy\Telemetry\Adapters\Langfuse\LangfuseExporter;
use Cognesy\Telemetry\Adapters\Langfuse\LangfuseHttpTransport;
use Cognesy\Telemetry\Application\Registry\TraceRegistry;
use Cognesy\Telemetry\Application\Telemetry;
use Cognesy\Telemetry\Application\Projector\CompositeTelemetryProjector;
use Cognesy\Telemetry\Application\Projector\RuntimeEventBridge;
$serviceName = 'examples.b03.telemetry-streaming-langfuse';
$baseUrl = (string) Env::get('LANGFUSE_BASE_URL', '');
if ($baseUrl === '') {
    throw new RuntimeException('Set LANGFUSE_BASE_URL in .env to run this example.');
}
$publicKey = (string) Env::get('LANGFUSE_PUBLIC_KEY', '');
if ($publicKey === '') {
    throw new RuntimeException('Set LANGFUSE_PUBLIC_KEY in .env to run this example.');
}
$secretKey = (string) Env::get('LANGFUSE_SECRET_KEY', '');
if ($secretKey === '') {
    throw new RuntimeException('Set LANGFUSE_SECRET_KEY in .env to run this example.');
}
$events = new EventDispatcher($serviceName);
$hub = new Telemetry(
    registry: new TraceRegistry(),
    exporter: new LangfuseExporter(
        transport: new LangfuseHttpTransport(new LangfuseConfig(
            baseUrl: $baseUrl,
            publicKey: $publicKey,
            secretKey: $secretKey,
        )),
    ),
);
(new RuntimeEventBridge(new CompositeTelemetryProjector([
    new PolyglotTelemetryProjector($hub),
    // captureStreamingChunks: true logs every SSE chunk as an event under
    // the http.client.request span: great for debugging, disable in production
    new HttpClientTelemetryProjector($hub, captureStreamingChunks: true),
])))->attachTo($events);
$runtime = InferenceRuntime::fromProvider(
    provider: LLMProvider::using('openai'),
    events: $events,
);
$stream = Inference::fromRuntime($runtime)
    ->with(
        messages: Messages::fromString('Explain in 3 bullet points why distributed tracing matters for streaming AI responses.'),
        options: ['max_tokens' => 200],
    )
    ->withStreaming()
    ->stream();
echo "Response (streaming):\n";
$fullContent = '';
foreach ($stream->deltas() as $delta) {
    echo $delta->contentDelta;
    $fullContent .= $delta->contentDelta;
}
echo "\n\n";
// Flush AFTER the stream is fully consumed: HttpStreamCompleted fires when
// the generator above is exhausted, carrying the full raw HTTP response body.
$hub->flush();
echo "Telemetry: flushed to Langfuse\n";
assert($fullContent !== '');
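For production, the same wiring can keep the HTTP spans while dropping the per-chunk log events. This is a sketch only, reusing the classes from the example above; it assumes passing captureStreamingChunks: false behaves symmetrically to true (i.e. it disables chunk logging without affecting span open/close):

```php
<?php
// Production variant of the projector wiring above: identical trace and span
// behavior, but individual SSE chunks are no longer recorded as log events.
(new RuntimeEventBridge(new CompositeTelemetryProjector([
    new PolyglotTelemetryProjector($hub),
    // The http.client.request span still opens on request start and closes
    // on HttpStreamCompleted; only the per-chunk events are suppressed.
    new HttpClientTelemetryProjector($hub, captureStreamingChunks: false),
])))->attachTo($events);
```

Everything else in the example (runtime construction, streaming consumption, and the post-stream flush) stays unchanged.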