Stream Package Cheatsheet
Code-verified reference for `packages/stream`.
Core API
Copy
use Cognesy\Stream\Transformation;
use Cognesy\Stream\Transform\Filter\Transducers\Filter;
use Cognesy\Stream\Transform\Limit\Transducers\TakeN;
use Cognesy\Stream\Transform\Map\Transducers\Map;
// Build a pipeline from three transducers; the result below shows they apply in the order listed.
$pipeline = Transformation::define(
new Map(fn($x) => $x * 2), // double each value: [2, 4, 6, 8, 10, 12]
new Filter(fn($x) => $x > 5), // keep values greater than 5: [6, 8, 10, 12]
new TakeN(3), // keep only the first three remaining values
);
// Run the pipeline against a concrete input.
$result = $pipeline->executeOn([1, 2, 3, 4, 5, 6]);
// [6, 8, 10]
Transformation Methods
Copy
use Cognesy\Stream\Sinks\ToStringReducer;
// define() may be called with no transducers (here) or with transducers passed directly (see $other below).
$pipeline = Transformation::define();
// The result of the builder chain is reassigned to $pipeline.
// NOTE(review): presumably each call returns a new Transformation rather than mutating in place — confirm against the class.
$pipeline = $pipeline
->through($transducerA, $transducerB)
->withSink(new ToStringReducer(separator: ', '))
->withInput($iterable);
// execute() takes no argument, so it relies on the input bound via withInput();
// executeOn() receives its input at the call site instead.
$result = $pipeline->execute();
$result = $pipeline->executeOn($iterable);
// Alternative ways to consume the pipeline: an execution handle or an iterator.
// NOTE(review): the exact effect of buffered: true is not shown here — confirm against Transformation::iterator().
$execution = $pipeline->execution();
$iterator = $pipeline->iterator();
$bufferedIterator = $pipeline->iterator(buffered: true);
// Combining two pipelines.
// NOTE(review): which side runs first for before()/after() is not shown here — confirm against Transformation.
$other = Transformation::define($transducerC);
$combined1 = $pipeline->before($other);
$combined2 = $pipeline->after($other);
TransformationStream (Lazy Stream)
Copy
use Cognesy\Stream\TransformationStream;
use Cognesy\Stream\Transform\Filter\Transducers\Filter;
use Cognesy\Stream\Transform\Map\Transducers\Map;
$stream = TransformationStream::from([1, 2, 3, 4])
->through(new Map(fn($x) => $x * 2))
->through(new Filter(fn($x) => $x > 4));
foreach ($stream as $value) {
// 6, 8
}
$all = $stream->getCompleted(); // [6, 8]
TransformationStream is emission-oriented:
- `getCompleted()` returns the values emitted by the stream
- completed streams retain emitted values in memory so they can be replayed
- custom sinks on a reused `Transformation` are ignored
- use `Transformation::execute()` / `executeOn()` when you need sink-specific completion results
Copy
$normalize = Transformation::define(
new Map(fn($x) => trim((string) $x)),
new Filter(fn($x) => $x !== ''),
);
$stream = TransformationStream::from([' foo ', '', ' bar '])
->using($normalize);
Tee (Split One Stream Into Many)
Copy
use Cognesy\Stream\Support\Tee;
// Split one source into two branches; the returned list is destructured into one iterator per branch.
// NOTE(review): presumably every branch sees all source items — confirm against Tee.
[$left, $right] = Tee::split([1, 2, 3, 4], branches: 2);
// The second argument (preserve_keys = false) makes iterator_to_array() reindex from 0
// instead of reusing the keys yielded by the iterator.
$leftItems = iterator_to_array($left, false);
$rightItems = iterator_to_array($right, false);
Source Facades
Array
Copy
use Cognesy\Stream\Sources\Array\ArrayStream;
$stream = ArrayStream::from([1, 2, 3]);
Filesystem
Copy
use Cognesy\Stream\Sources\Filesystem\DirectoryStream;
use Cognesy\Stream\Sources\Filesystem\FileStream;
$file = FileStream::fromPath('data.txt');
$lines = $file->lines(dropEmpty: true);
$chunks = $file->chunks(4096);
$bytes = $file->bytes();
$chars = $file->chars(); // alias for bytes()
$dir = DirectoryStream::from('/tmp')->withExtensions('php', 'md');
$files = $dir->files(recursive: true); // default true
$dirs = $dir->dirs(recursive: true); // default true
$any = $dir->any(recursive: true); // default true
CSV / JSONL
Copy
use Cognesy\Stream\Sources\Csv\CsvStream;
use Cognesy\Stream\Sources\Json\JsonlStream;
$csvRows = CsvStream::fromPath('data.csv', delimiter: ',', enclosure: '"', escape: '\\')->rows();
$csvAssocRows = CsvStream::fromPath('data.csv')->rowsAssoc();
$jsonlLines = JsonlStream::fromPath('data.jsonl')->lines();
$jsonlDecoded = JsonlStream::fromPath('data.jsonl')->decoded(assoc: true);
Text
Copy
use Cognesy\Stream\Sources\Text\TextStream;
$text = TextStream::from("Hello world\nNext line");
$chars = $text->chars();
$lines = $text->lines(eol: PHP_EOL, dropEmpty: false);
$words = $text->words(pattern: "\\w+");
HTTP Chunks
Copy
use Cognesy\Stream\Sources\Http\HttpStream;
use Generator;
$chunks = (function (): Generator {
yield "data: one\n\n";
yield "data: two\n\n";
})();
$http = HttpStream::from($chunks);
$bytes = $http->bytes();
$lines = $http->lines();
$events = $http->events();
Transducers — Map
Copy
use Cognesy\Stream\Transform\Map\Transducers\Map;
use Cognesy\Stream\Transform\Map\Transducers\MapIndexed;
use Cognesy\Stream\Transform\Map\Transducers\Replace;
new Map(fn($x) => $x * 2); // transform each element
new MapIndexed(fn($value, int $index) => "$index:$value"); // transform with index
new Replace(['a' => 'alpha', 'b' => 'beta']); // substitute matching values via lookup map
Transducers — Filter
Copy
use Cognesy\Stream\Transform\Filter\Transducers\Filter;
use Cognesy\Stream\Transform\Filter\Transducers\Keep;
use Cognesy\Stream\Transform\Filter\Transducers\Remove;
new Filter(fn($x) => $x > 0); // keep elements where predicate returns true
new Remove(fn($x) => $x < 0); // drop elements where predicate returns true (inverse of Filter)
new Keep(fn($x) => $x['name'] ?? null); // apply fn, keep result only when non-null
Transducers — Limit
Copy
use Cognesy\Stream\Transform\Limit\Transducers\TakeN;
use Cognesy\Stream\Transform\Limit\Transducers\TakeLast;
use Cognesy\Stream\Transform\Limit\Transducers\TakeNth;
use Cognesy\Stream\Transform\Limit\Transducers\TakeWhile;
use Cognesy\Stream\Transform\Limit\Transducers\TakeUntil;
use Cognesy\Stream\Transform\Limit\Transducers\DropFirst;
use Cognesy\Stream\Transform\Limit\Transducers\DropLast;
use Cognesy\Stream\Transform\Limit\Transducers\DropWhile;
use Cognesy\Stream\Transform\Limit\Transducers\DropUntil;
use Cognesy\Stream\Transform\Limit\Transducers\RandomSample;
new TakeN(10); // first N elements
new TakeLast(3); // last N elements (buffered)
new TakeNth(2); // every Nth element (1-based)
new TakeWhile(fn($x) => $x !== null); // take while predicate holds
new TakeUntil(fn($x) => $x === 'STOP'); // take until predicate becomes true
new DropFirst(2); // skip first N elements
new DropLast(2); // skip last N elements (buffered)
new DropWhile(fn($x) => $x < 5); // skip while predicate holds
new DropUntil(fn($x) => $x >= 5); // skip until predicate becomes true
new RandomSample(0.5); // keep each element with given probability (0.0-1.0)
Transducers — Flatten
Copy
use Cognesy\Stream\Transform\Flatten\Transducers\Cat;
use Cognesy\Stream\Transform\Flatten\Transducers\FlatMap;
use Cognesy\Stream\Transform\Flatten\Transducers\Flatten;
new Cat(); // concatenate inner iterables: [[1,2],[3]] -> [1,2,3]
new FlatMap(fn($x) => [$x, $x * 10]); // map then flatten one level
new Flatten(depth: 2); // flatten nested structure up to given depth (default 1)
Transducers — Group
Copy
use Cognesy\Stream\Transform\Group\Transducers\Chunk;
use Cognesy\Stream\Transform\Group\Transducers\Pairwise;
use Cognesy\Stream\Transform\Group\Transducers\PartitionBy;
use Cognesy\Stream\Transform\Group\Transducers\SlidingWindow;
new Chunk(100); // group into fixed-size arrays
new Pairwise(); // emit [prev, current] pairs (SlidingWindow(2))
new PartitionBy(fn($x) => $x['type']); // group consecutive elements sharing same key
new SlidingWindow(3); // emit sliding windows of given size
Transducers — Deduplicate
Copy
use Cognesy\Stream\Transform\Deduplicate\Transducers\Distinct;
use Cognesy\Stream\Transform\Deduplicate\Transducers\DistinctBy;
use Cognesy\Stream\Transform\Deduplicate\Transducers\CollapseRepetitions;
new Distinct(); // remove duplicate values (tracks seen set)
new DistinctBy(fn($x) => $x['id']); // deduplicate by key function
new CollapseRepetitions(); // collapse consecutive equal elements into one
new CollapseRepetitions(fn($x) => $x['category']); // collapse by optional key function
Transducers — Combine
Copy
use Cognesy\Stream\Transform\Combine\Transducers\Append;
use Cognesy\Stream\Transform\Combine\Transducers\Prepend;
use Cognesy\Stream\Transform\Combine\Transducers\Interleave;
use Cognesy\Stream\Transform\Combine\Transducers\Interpose;
use Cognesy\Stream\Transform\Combine\Transducers\Zip;
use Cognesy\Stream\Transform\Combine\Transducers\ZipWith;
new Append('x', 'y'); // append values after stream ends
new Prepend('a', 'b'); // prepend values before stream starts
new Interleave([10, 20, 30]); // interleave elements from other iterables
new Interpose(','); // insert separator between every element
new Zip([10, 20, 30]); // pair elements with other iterables into arrays
new ZipWith(fn($a, $b) => $a + $b, [10, 20]); // zip with custom combine function
Transducers — Accumulate
Copy
use Cognesy\Stream\Transform\Accumulate\Transducers\Scan;
new Scan(
scanFn: fn($state, $value) => $state + $value,
initialState: 0,
); // running accumulation -- emits intermediate state at each step
Transducers — Repeat
Copy
use Cognesy\Stream\Transform\Repeat\Transducers\Repeat;
use Cognesy\Stream\Transform\Repeat\Transducers\Cycle;
new Repeat(3); // emit each element 3 times consecutively
new Cycle(2); // replay entire input 2 times (null = infinite)
Transducers — Misc
Copy
use Cognesy\Stream\Transform\Misc\Transducers\Tap;
use Cognesy\Stream\Transform\Misc\Transducers\TryCatch;
use Cognesy\Stream\Transform\Misc\Transducers\Compose;
use Throwable;
new Tap(fn($x) => logger()->info($x)); // side-effect without modifying values
new TryCatch(
tryFn: fn($x) => riskyOperation($x),
onError: fn(Throwable $e, mixed $value) => null, // optional error handler
throwOnError: false, // default true
);
Compose::from(new Map(fn($x) => $x + 1), new Filter(fn($x) => $x > 0)); // compose transducers
Transducers — Result
Operates on `Cognesy\Utils\Result\Result` values (success/failure monads).
Copy
use Cognesy\Stream\Transform\Result\Transducers\WrapResult;
use Cognesy\Stream\Transform\Result\Transducers\UnwrapResult;
use Cognesy\Stream\Transform\Result\Transducers\UnwrapResultOr;
use Cognesy\Stream\Transform\Result\Transducers\MapResult;
use Cognesy\Stream\Transform\Result\Transducers\MapErrorResult;
use Cognesy\Stream\Transform\Result\Transducers\ThenResult;
use Cognesy\Stream\Transform\Result\Transducers\FilterSuccess;
use Cognesy\Stream\Transform\Result\Transducers\FilterFailure;
use Cognesy\Stream\Transform\Result\Transducers\EnsureResult;
use Cognesy\Stream\Transform\Result\Transducers\RecoverResult;
use Cognesy\Stream\Transform\Result\Transducers\TapResult;
use Cognesy\Stream\Transform\Result\Transducers\PartitionResults;
new WrapResult(); // wrap plain values in Result::success()
new UnwrapResult(); // unwrap successes, skip failures
new UnwrapResultOr(defaultValue: 0); // unwrap successes, use default for failures
new MapResult(fn($val) => $val * 2); // map over success values, pass failures through
new MapErrorResult(fn($err) => "Err: $err"); // map over failure values, pass successes through
new ThenResult(fn($id) => $repo->find($id)); // chain Result-returning fn (flatMap for Result)
new FilterSuccess(); // keep only successful Results
new FilterFailure(); // keep only failed Results
new EnsureResult(
conditionFn: fn($x) => $x > 0,
errorMessage: fn($x) => "Must be positive, got $x", // also accepts plain string
); // validate success values, convert to failure if check fails
new RecoverResult(fn($error) => fallback()); // convert failures to successes via recovery fn
// TapResult runs a side-effect callback depending on the Result state; both callbacks are optional.
// The callbacks use a logger() helper rather than log(): PHP's built-in log() is the
// natural-logarithm function and throws a TypeError when given a message string.
new TapResult(
    onSuccess: fn($val) => logger()->info("ok: $val"), // optional
    onFailure: fn($err) => logger()->error("err: $err"), // optional
); // side-effects based on Result state
new PartitionResults(); // accumulate into ['successes' => [...], 'failures' => [...]]
Common Sinks (Reducers)
Collection
Copy
use Cognesy\Stream\Sinks\ToArrayReducer;
use Cognesy\Stream\Sinks\ToStringReducer;
use Cognesy\Stream\Sinks\GroupByReducer;
new ToArrayReducer();
new ToStringReducer(separator: ', ', prefix: '[', suffix: ']');
new GroupByReducer(fn($x) => $x['type']);
Selection
Copy
use Cognesy\Stream\Sinks\Select\FindReducer;
use Cognesy\Stream\Sinks\Select\FirstReducer;
use Cognesy\Stream\Sinks\Select\LastReducer;
new FindReducer(fn($x) => $x > 10, default: null);
new FirstReducer(default: null);
new LastReducer(default: null);
Statistics
Copy
use Cognesy\Stream\Sinks\Stats\AverageReducer;
use Cognesy\Stream\Sinks\Stats\CountReducer;
use Cognesy\Stream\Sinks\Stats\FrequenciesReducer;
use Cognesy\Stream\Sinks\Stats\MaxReducer;
use Cognesy\Stream\Sinks\Stats\MinReducer;
use Cognesy\Stream\Sinks\Stats\SumReducer;
new SumReducer(mapFn: fn($x) => $x['price']); // optional mapFn
new AverageReducer(mapFn: fn($x) => $x['score']); // optional mapFn
new CountReducer(predicateFn: fn($x) => $x > 0); // optional predicate
new FrequenciesReducer(keyFn: fn($x) => $x['category']); // optional keyFn
new MaxReducer(keyFn: fn($x) => $x['age']); // optional keyFn
new MinReducer(keyFn: fn($x) => $x['age']); // optional keyFn
Boolean
Copy
use Cognesy\Stream\Sinks\Bool\MatchesAllReducer;
use Cognesy\Stream\Sinks\Bool\MatchesAnyReducer;
use Cognesy\Stream\Sinks\Bool\MatchesNoneReducer;
new MatchesAllReducer(fn($x) => $x > 0); // true if all match; short-circuits on first non-match
new MatchesAnyReducer(fn($x) => $x < 0); // true if any match; short-circuits on first match
new MatchesNoneReducer(fn($x) => $x < 0); // true if none match; short-circuits on first match
Side Effects
Copy
use Cognesy\Stream\Sinks\SideEffect\ForEachReducer;
new ForEachReducer(fn($x) => print($x)); // executes callback per element, returns count
Result Sinks
For streams of `Cognesy\Utils\Result\Result` objects:
Copy
use Cognesy\Stream\Transform\Result\Sinks\AllSuccessReducer;
use Cognesy\Stream\Transform\Result\Sinks\CollectErrorsReducer;
use Cognesy\Stream\Transform\Result\Sinks\CountFailureReducer;
use Cognesy\Stream\Transform\Result\Sinks\CountSuccessReducer;
use Cognesy\Stream\Transform\Result\Sinks\FirstSuccessReducer;
use Cognesy\Stream\Transform\Result\Sinks\PartitionResultsReducer;
use Cognesy\Stream\Transform\Result\Sinks\ToResultReducer;
new AllSuccessReducer(); // Result::success(true) if all succeed, else first failure
new FirstSuccessReducer(); // first successful Result, or Result::failure(...) if none
new CountSuccessReducer(); // int count of successes
new CountFailureReducer(); // int count of failures
new CollectErrorsReducer(); // array of error values from failed Results
new PartitionResultsReducer(); // ['successes' => [...], 'failures' => [...]]
new ToResultReducer(); // Result::success([values]) if all succeed, else Result::failure(first error)
Custom Transducer / Reducer
Copy
use Cognesy\Stream\Contracts\Reducer;
use Cognesy\Stream\Contracts\Transducer;
/**
 * Skeleton for a custom transducer.
 *
 * Invoking the transducer wraps the given reducer in an anonymous decorator
 * that forwards init(), step() and complete() to it. As written, every
 * element is passed through unchanged; put the per-element transformation
 * inside step().
 */
final readonly class MyTransducer implements Transducer
{
    /**
     * Wrap the downstream reducer with this transducer's step logic.
     *
     * @param Reducer $reducer The reducer that receives the transformed elements.
     * @return Reducer A decorator delegating to $reducer.
     */
    public function __invoke(Reducer $reducer): Reducer
    {
        return new class($reducer) implements Reducer {
            public function __construct(private Reducer $downstream) {}

            // Delegate creation of the initial accumulator.
            public function init(): mixed
            {
                return $this->downstream->init();
            }

            // Called once per element; this is the hook for custom behaviour.
            public function step(mixed $accumulator, mixed $reducible): mixed
            {
                // Replace the right-hand side with the actual transformation.
                $output = $reducible;

                return $this->downstream->step($accumulator, $output);
            }

            // Delegate finalisation of the accumulator.
            public function complete(mixed $accumulator): mixed
            {
                return $this->downstream->complete($accumulator);
            }
        };
    }
}