Skip to main content

Stream Package Cheatsheet

Code-verified reference for packages/stream.

Core API

use Cognesy\Stream\Transformation;
use Cognesy\Stream\Transform\Filter\Transducers\Filter;
use Cognesy\Stream\Transform\Limit\Transducers\TakeN;
use Cognesy\Stream\Transform\Map\Transducers\Map;

$pipeline = Transformation::define(
    new Map(fn($x) => $x * 2),
    new Filter(fn($x) => $x > 5),
    new TakeN(3),
);

$result = $pipeline->executeOn([1, 2, 3, 4, 5, 6]);
// [6, 8, 10]

Transformation Methods

use Cognesy\Stream\Sinks\ToStringReducer;

$pipeline = Transformation::define();

$pipeline = $pipeline
    ->through($transducerA, $transducerB)
    ->withSink(new ToStringReducer(separator: ', '))
    ->withInput($iterable);

$result = $pipeline->execute();
$result = $pipeline->executeOn($iterable);

$execution = $pipeline->execution();
$iterator = $pipeline->iterator();
$bufferedIterator = $pipeline->iterator(buffered: true);

$other = Transformation::define($transducerC);
$combined1 = $pipeline->before($other);
$combined2 = $pipeline->after($other);

TransformationStream (Lazy Stream)

use Cognesy\Stream\TransformationStream;
use Cognesy\Stream\Transform\Filter\Transducers\Filter;
use Cognesy\Stream\Transform\Map\Transducers\Map;

$stream = TransformationStream::from([1, 2, 3, 4])
    ->through(new Map(fn($x) => $x * 2))
    ->through(new Filter(fn($x) => $x > 4));

foreach ($stream as $value) {
    // 6, 8
}

$all = $stream->getCompleted(); // [6, 8]
TransformationStream is emission-oriented:
  • getCompleted() returns the values emitted by the stream
  • completed streams retain emitted values in memory so they can be replayed
  • custom sinks on a reused Transformation are ignored
  • use Transformation::execute() / executeOn() when you need sink-specific completion results
Use a predefined transformation:
$normalize = Transformation::define(
    new Map(fn($x) => trim((string) $x)),
    new Filter(fn($x) => $x !== ''),
);

$stream = TransformationStream::from(['  foo  ', '', ' bar '])
    ->using($normalize);

Tee (Split One Stream Into Many)

use Cognesy\Stream\Support\Tee;

[$left, $right] = Tee::split([1, 2, 3, 4], branches: 2);

$leftItems = iterator_to_array($left, false);
$rightItems = iterator_to_array($right, false);
Unused branches still count as active until they are released: while a branch is alive, items it has not yet consumed must be buffered. Release (drop) never-consumed branches promptly if another branch may run far ahead, otherwise the buffer grows with the gap between branches.

Source Facades

Array

use Cognesy\Stream\Sources\Array\ArrayStream;

$stream = ArrayStream::from([1, 2, 3]);

Filesystem

use Cognesy\Stream\Sources\Filesystem\DirectoryStream;
use Cognesy\Stream\Sources\Filesystem\FileStream;

$file = FileStream::fromPath('data.txt');
$lines = $file->lines(dropEmpty: true);
$chunks = $file->chunks(4096);
$bytes = $file->bytes();
$chars = $file->chars(); // alias for bytes()

$dir = DirectoryStream::from('/tmp')->withExtensions('php', 'md');
$files = $dir->files(recursive: true);   // default true
$dirs = $dir->dirs(recursive: true);     // default true
$any = $dir->any(recursive: true);       // default true

CSV / JSONL

use Cognesy\Stream\Sources\Csv\CsvStream;
use Cognesy\Stream\Sources\Json\JsonlStream;

$csvRows = CsvStream::fromPath('data.csv', delimiter: ',', enclosure: '"', escape: '\\')->rows();
$csvAssocRows = CsvStream::fromPath('data.csv')->rowsAssoc();

$jsonlLines = JsonlStream::fromPath('data.jsonl')->lines();
$jsonlDecoded = JsonlStream::fromPath('data.jsonl')->decoded(assoc: true);

Text

use Cognesy\Stream\Sources\Text\TextStream;

$text = TextStream::from("Hello world\nNext line");
$chars = $text->chars();
$lines = $text->lines(eol: PHP_EOL, dropEmpty: false);
$words = $text->words(pattern: "\\w+");

HTTP Chunks

use Cognesy\Stream\Sources\Http\HttpStream;
use Generator;

$chunks = (function (): Generator {
    yield "data: one\n\n";
    yield "data: two\n\n";
})();

$http = HttpStream::from($chunks);
$bytes = $http->bytes();
$lines = $http->lines();
$events = $http->events();

Transducers — Map

use Cognesy\Stream\Transform\Map\Transducers\Map;
use Cognesy\Stream\Transform\Map\Transducers\MapIndexed;
use Cognesy\Stream\Transform\Map\Transducers\Replace;

new Map(fn($x) => $x * 2);                        // transform each element
new MapIndexed(fn($value, int $index) => "$index:$value"); // transform with index
new Replace(['a' => 'alpha', 'b' => 'beta']);      // substitute matching values via lookup map

Transducers — Filter

use Cognesy\Stream\Transform\Filter\Transducers\Filter;
use Cognesy\Stream\Transform\Filter\Transducers\Keep;
use Cognesy\Stream\Transform\Filter\Transducers\Remove;

new Filter(fn($x) => $x > 0);       // keep elements where predicate returns true
new Remove(fn($x) => $x < 0);       // drop elements where predicate returns true (inverse of Filter)
new Keep(fn($x) => $x['name'] ?? null); // apply fn, keep result only when non-null

Transducers — Limit

use Cognesy\Stream\Transform\Limit\Transducers\TakeN;
use Cognesy\Stream\Transform\Limit\Transducers\TakeLast;
use Cognesy\Stream\Transform\Limit\Transducers\TakeNth;
use Cognesy\Stream\Transform\Limit\Transducers\TakeWhile;
use Cognesy\Stream\Transform\Limit\Transducers\TakeUntil;
use Cognesy\Stream\Transform\Limit\Transducers\DropFirst;
use Cognesy\Stream\Transform\Limit\Transducers\DropLast;
use Cognesy\Stream\Transform\Limit\Transducers\DropWhile;
use Cognesy\Stream\Transform\Limit\Transducers\DropUntil;
use Cognesy\Stream\Transform\Limit\Transducers\RandomSample;

new TakeN(10);                              // first N elements
new TakeLast(3);                            // last N elements (buffered)
new TakeNth(2);                             // every Nth element (1-based)
new TakeWhile(fn($x) => $x !== null);      // take while predicate holds
new TakeUntil(fn($x) => $x === 'STOP');    // take until predicate becomes true
new DropFirst(2);                           // skip first N elements
new DropLast(2);                            // skip last N elements (buffered)
new DropWhile(fn($x) => $x < 5);           // skip while predicate holds
new DropUntil(fn($x) => $x >= 5);          // skip until predicate becomes true
new RandomSample(0.5);                      // keep each element with given probability (0.0-1.0)

Transducers — Flatten

use Cognesy\Stream\Transform\Flatten\Transducers\Cat;
use Cognesy\Stream\Transform\Flatten\Transducers\FlatMap;
use Cognesy\Stream\Transform\Flatten\Transducers\Flatten;

new Cat();                                  // concatenate inner iterables: [[1,2],[3]] -> [1,2,3]
new FlatMap(fn($x) => [$x, $x * 10]);      // map then flatten one level
new Flatten(depth: 2);                      // flatten nested structure up to given depth (default 1)

Transducers — Group

use Cognesy\Stream\Transform\Group\Transducers\Chunk;
use Cognesy\Stream\Transform\Group\Transducers\Pairwise;
use Cognesy\Stream\Transform\Group\Transducers\PartitionBy;
use Cognesy\Stream\Transform\Group\Transducers\SlidingWindow;

new Chunk(100);                             // group into fixed-size arrays
new Pairwise();                             // emit [prev, current] pairs (equivalent to SlidingWindow(2))
new PartitionBy(fn($x) => $x['type']);      // group consecutive elements sharing same key
new SlidingWindow(3);                       // emit sliding windows of given size

Transducers — Deduplicate

use Cognesy\Stream\Transform\Deduplicate\Transducers\Distinct;
use Cognesy\Stream\Transform\Deduplicate\Transducers\DistinctBy;
use Cognesy\Stream\Transform\Deduplicate\Transducers\CollapseRepetitions;

new Distinct();                             // remove duplicate values (tracks seen set)
new DistinctBy(fn($x) => $x['id']);         // deduplicate by key function
new CollapseRepetitions();                  // collapse consecutive equal elements into one
new CollapseRepetitions(fn($x) => $x['category']); // collapse by optional key function

Transducers — Combine

use Cognesy\Stream\Transform\Combine\Transducers\Append;
use Cognesy\Stream\Transform\Combine\Transducers\Prepend;
use Cognesy\Stream\Transform\Combine\Transducers\Interleave;
use Cognesy\Stream\Transform\Combine\Transducers\Interpose;
use Cognesy\Stream\Transform\Combine\Transducers\Zip;
use Cognesy\Stream\Transform\Combine\Transducers\ZipWith;

new Append('x', 'y');                       // append values after stream ends
new Prepend('a', 'b');                      // prepend values before stream starts
new Interleave([10, 20, 30]);               // interleave elements from other iterables
new Interpose(',');                         // insert separator between every element
new Zip([10, 20, 30]);                      // pair elements with other iterables into arrays
new ZipWith(fn($a, $b) => $a + $b, [10, 20]); // zip with custom combine function

Transducers — Accumulate

use Cognesy\Stream\Transform\Accumulate\Transducers\Scan;

new Scan(
    scanFn: fn($state, $value) => $state + $value,
    initialState: 0,
);  // running accumulation -- emits intermediate state at each step

Transducers — Repeat

use Cognesy\Stream\Transform\Repeat\Transducers\Repeat;
use Cognesy\Stream\Transform\Repeat\Transducers\Cycle;

new Repeat(3);       // emit each element 3 times consecutively
new Cycle(2);        // replay entire input 2 times (null = infinite)

Transducers — Misc

use Cognesy\Stream\Transform\Misc\Transducers\Tap;
use Cognesy\Stream\Transform\Misc\Transducers\TryCatch;
use Cognesy\Stream\Transform\Misc\Transducers\Compose;
use Throwable;

new Tap(fn($x) => logger()->info($x));     // side-effect without modifying values
new TryCatch(
    tryFn: fn($x) => riskyOperation($x),
    onError: fn(Throwable $e, mixed $value) => null,  // optional error handler
    throwOnError: false,                               // default true
);
Compose::from(new Map(fn($x) => $x + 1), new Filter(fn($x) => $x > 0)); // compose transducers

Transducers — Result

Operates on Cognesy\Utils\Result\Result values (success/failure monads).
use Cognesy\Stream\Transform\Result\Transducers\WrapResult;
use Cognesy\Stream\Transform\Result\Transducers\UnwrapResult;
use Cognesy\Stream\Transform\Result\Transducers\UnwrapResultOr;
use Cognesy\Stream\Transform\Result\Transducers\MapResult;
use Cognesy\Stream\Transform\Result\Transducers\MapErrorResult;
use Cognesy\Stream\Transform\Result\Transducers\ThenResult;
use Cognesy\Stream\Transform\Result\Transducers\FilterSuccess;
use Cognesy\Stream\Transform\Result\Transducers\FilterFailure;
use Cognesy\Stream\Transform\Result\Transducers\EnsureResult;
use Cognesy\Stream\Transform\Result\Transducers\RecoverResult;
use Cognesy\Stream\Transform\Result\Transducers\TapResult;
use Cognesy\Stream\Transform\Result\Transducers\PartitionResults;

new WrapResult();                              // wrap plain values in Result::success()
new UnwrapResult();                            // unwrap successes, skip failures
new UnwrapResultOr(defaultValue: 0);           // unwrap successes, use default for failures
new MapResult(fn($val) => $val * 2);           // map over success values, pass failures through
new MapErrorResult(fn($err) => "Err: $err");   // map over failure values, pass successes through
new ThenResult(fn($id) => $repo->find($id));   // chain Result-returning fn (flatMap for Result)
new FilterSuccess();                           // keep only successful Results
new FilterFailure();                           // keep only failed Results
new EnsureResult(
    conditionFn: fn($x) => $x > 0,
    errorMessage: fn($x) => "Must be positive, got $x",  // also accepts plain string
);                                             // validate success values, convert to failure if check fails
new RecoverResult(fn($error) => fallback());   // convert failures to successes via recovery fn
new TapResult(
    onSuccess: fn($val) => log("ok: $val"),    // optional
    onFailure: fn($err) => log("err: $err"),   // optional
);                                             // side-effects based on Result state
new PartitionResults();                        // accumulate into ['successes' => [...], 'failures' => [...]]

Common Sinks (Reducers)

Collection

use Cognesy\Stream\Sinks\ToArrayReducer;
use Cognesy\Stream\Sinks\ToStringReducer;
use Cognesy\Stream\Sinks\GroupByReducer;

new ToArrayReducer();
new ToStringReducer(separator: ', ', prefix: '[', suffix: ']');
new GroupByReducer(fn($x) => $x['type']);

Selection

use Cognesy\Stream\Sinks\Select\FindReducer;
use Cognesy\Stream\Sinks\Select\FirstReducer;
use Cognesy\Stream\Sinks\Select\LastReducer;

new FindReducer(fn($x) => $x > 10, default: null);
new FirstReducer(default: null);
new LastReducer(default: null);

Statistics

use Cognesy\Stream\Sinks\Stats\AverageReducer;
use Cognesy\Stream\Sinks\Stats\CountReducer;
use Cognesy\Stream\Sinks\Stats\FrequenciesReducer;
use Cognesy\Stream\Sinks\Stats\MaxReducer;
use Cognesy\Stream\Sinks\Stats\MinReducer;
use Cognesy\Stream\Sinks\Stats\SumReducer;

new SumReducer(mapFn: fn($x) => $x['price']);     // optional mapFn
new AverageReducer(mapFn: fn($x) => $x['score']); // optional mapFn
new CountReducer(predicateFn: fn($x) => $x > 0);  // optional predicate
new FrequenciesReducer(keyFn: fn($x) => $x['category']); // optional keyFn
new MaxReducer(keyFn: fn($x) => $x['age']);        // optional keyFn
new MinReducer(keyFn: fn($x) => $x['age']);        // optional keyFn

Boolean

use Cognesy\Stream\Sinks\Bool\MatchesAllReducer;
use Cognesy\Stream\Sinks\Bool\MatchesAnyReducer;
use Cognesy\Stream\Sinks\Bool\MatchesNoneReducer;

new MatchesAllReducer(fn($x) => $x > 0);   // true if all match; short-circuits on first non-match
new MatchesAnyReducer(fn($x) => $x < 0);   // true if any match; short-circuits on first match
new MatchesNoneReducer(fn($x) => $x < 0);  // true if none match; short-circuits on first match

Side Effects

use Cognesy\Stream\Sinks\SideEffect\ForEachReducer;

new ForEachReducer(fn($x) => print($x)); // executes callback per element, returns count

Result Sinks

For streams of Cognesy\Utils\Result\Result objects:
use Cognesy\Stream\Transform\Result\Sinks\AllSuccessReducer;
use Cognesy\Stream\Transform\Result\Sinks\CollectErrorsReducer;
use Cognesy\Stream\Transform\Result\Sinks\CountFailureReducer;
use Cognesy\Stream\Transform\Result\Sinks\CountSuccessReducer;
use Cognesy\Stream\Transform\Result\Sinks\FirstSuccessReducer;
use Cognesy\Stream\Transform\Result\Sinks\PartitionResultsReducer;
use Cognesy\Stream\Transform\Result\Sinks\ToResultReducer;

new AllSuccessReducer();            // Result::success(true) if all succeed, else first failure
new FirstSuccessReducer();          // first successful Result, or Result::failure(...) if none
new CountSuccessReducer();          // int count of successes
new CountFailureReducer();          // int count of failures
new CollectErrorsReducer();         // array of error values from failed Results
new PartitionResultsReducer();      // ['successes' => [...], 'failures' => [...]]
new ToResultReducer();              // Result::success([values]) if all succeed, else Result::failure(first error)

Custom Transducer / Reducer

use Cognesy\Stream\Contracts\Reducer;
use Cognesy\Stream\Contracts\Transducer;

/**
 * Template for a custom transducer.
 *
 * A transducer decorates the downstream Reducer it is given. As written this
 * is an identity transducer: init/step/complete are forwarded unchanged.
 * Insert your per-element transformation inside step() before delegating.
 */
final readonly class MyTransducer implements Transducer
{
    public function __invoke(Reducer $reducer): Reducer {
        return new class($reducer) implements Reducer {
            public function __construct(private Reducer $downstream) {}

            /** Delegate creation of the initial accumulator downstream. */
            public function init(): mixed {
                return $this->downstream->init();
            }

            /** Transform $reducible here, then hand it to the wrapped reducer. */
            public function step(mixed $accumulator, mixed $reducible): mixed {
                return $this->downstream->step($accumulator, $reducible);
            }

            /** Completion (flush/finalize) also passes straight through. */
            public function complete(mixed $accumulator): mixed {
                return $this->downstream->complete($accumulator);
            }
        };
    }
}