feat(eventcore)!: streaming reads for read_stream (#364) #414

Merged
jwilger merged 1 commit from feat/364-streaming-reads into main 2026-06-13 09:18:56 -07:00
Owner

Summary

Makes EventStore::read_stream return a lazy async stream instead of a fully-materialized Vec, so large streams are no longer all decoded into memory at once and the executor folds events as they arrive. Breaking change to the EventStore trait (intentional, pre-1.0 — see ADR-0049 for the decision rationale: a clean stream return type over a lazy EventStreamReader wrapper that would secretly buffer for len()/first()).

API

  • read_stream<E>(id) -> Result<EventStream<E>, EventStoreError>, where EventStream<E> is a named newtype over Pin<Box<dyn Stream<Item = Result<E, EventStoreError>> + Send>> and implements Stream. Events are yielded oldest→newest; per-event decode failures surface as Err items (preserving the read_stream_errors_on_type_mismatch behavior).
  • EventStreamReader is removed; a free async helper collect_events(stream) -> Result<Vec<E>, _> (re-exported from eventcore and eventcore-types) covers the "I want them all" case.

Executor: incremental fold (the actual memory win)

The execute pipeline is refactored to a push model: the async shell (pump_stream_reads) pulls one event at a time and pushes each into the pipeline via StoreEffectResult::StreamEvent, which folds it into command state and increments a counter; StreamEnded sets expected_version to the streamed count (== the old reader.len()) and runs dynamic stream discovery. The shell never buffers the whole stream into a Vec; the fold stays in the pipeline (which owns the command). Multi-stream handling, retries, and error propagation are preserved.

Backends

  • postgres: lazy sqlx query(...).fetch(pool) over a cloned pool (real streaming win).
  • sqlite: rows read in spawn_blocking, streamed over a bounded tokio::sync::mpsc channel (incremental + backpressure).
  • memory / fs: yield events one at a time from in-process data (API uniformity; data already resident).

Tests / verification

  • cargo clippy --workspace --all-targets --all-features -- -D warnings clean
  • cargo nextest run --workspace275 tests pass (all four backend contract suites incl. type-mismatch, executor push-fold + per-event-error tests, collect_events tests, lib.rs mock stores).

ADR-0049 records the decision; event-sourcing, store-backends, and command-execution blueprints updated.

Closes #364

## Summary Makes `EventStore::read_stream` return a lazy async stream instead of a fully-materialized `Vec`, so large streams are no longer all decoded into memory at once and the executor folds events as they arrive. **Breaking change to the `EventStore` trait** (intentional, pre-1.0 — see ADR-0049 for the decision rationale: a clean stream return type over a lazy `EventStreamReader` wrapper that would secretly buffer for `len()`/`first()`). ## API - `read_stream<E>(id) -> Result<EventStream<E>, EventStoreError>`, where `EventStream<E>` is a named newtype over `Pin<Box<dyn Stream<Item = Result<E, EventStoreError>> + Send>>` and implements `Stream`. Events are yielded oldest→newest; per-event decode failures surface as `Err` items (preserving the `read_stream_errors_on_type_mismatch` behavior). - `EventStreamReader` is removed; a free async helper `collect_events(stream) -> Result<Vec<E>, _>` (re-exported from `eventcore` and `eventcore-types`) covers the "I want them all" case. ## Executor: incremental fold (the actual memory win) The execute pipeline is refactored to a push model: the async shell (`pump_stream_reads`) pulls one event at a time and pushes each into the pipeline via `StoreEffectResult::StreamEvent`, which folds it into command state and increments a counter; `StreamEnded` sets `expected_version` to the streamed count (== the old `reader.len()`) and runs dynamic stream discovery. The shell never buffers the whole stream into a `Vec`; the fold stays in the pipeline (which owns the command). Multi-stream handling, retries, and error propagation are preserved. ## Backends - **postgres**: lazy `sqlx` `query(...).fetch(pool)` over a cloned pool (real streaming win). - **sqlite**: rows read in `spawn_blocking`, streamed over a bounded `tokio::sync::mpsc` channel (incremental + backpressure). - **memory / fs**: yield events one at a time from in-process data (API uniformity; data already resident). ## Tests / verification - `cargo clippy --workspace --all-targets --all-features -- -D warnings` clean - `cargo nextest run --workspace` — **275 tests pass** (all four backend contract suites incl. type-mismatch, executor push-fold + per-event-error tests, `collect_events` tests, lib.rs mock stores). ADR-0049 records the decision; `event-sourcing`, `store-backends`, and `command-execution` blueprints updated. Closes #364
feat(eventcore)!: streaming reads for read_stream (#364)
All checks were successful
CI / Detect Changes (pull_request) Successful in 4s
CI / Request auto_review semantic review (pull_request) Successful in 2s
CI / Format (pull_request) Successful in 15s
auto_review auto_review: no findings
CI / Clippy (pull_request) Successful in 2m19s
CI / Security Audit (pull_request) Successful in 29s
CI / Mutation (pull_request) Has been skipped
CI / CI Gate (pull_request) Successful in 2s
CI / Test (pull_request) Successful in 3m24s
34de966183
BREAKING CHANGE: EventStore::read_stream now returns an async stream
instead of a collected EventStreamReader. For large streams this bounds
peak memory by the in-progress command state rather than the full stream
length, and lets the executor fold events as they arrive.

- read_stream returns Result<EventStream<E>, EventStoreError>, where
  EventStream<E> is a named newtype over a boxed Send Stream yielding
  Result<E, EventStoreError> per event in stream-version order. A named
  newtype (not RPITIT) keeps the trait usable as a generic bound and lets
  the effect plumbing and wrapper stores name the type.
- Remove EventStreamReader; add the free async helper collect_events for
  callers that want a Vec (re-exported from eventcore-types and eventcore).
- Executor push-fold: the shell opens the stream and pushes one event at a
  time into the pipeline (StoreEffectResult::StreamEvent), which folds it
  into state and stays in AwaitingStreamRead (PipelineStep::WaitForResult);
  on StreamEnded it sets expected_version to the streamed event count and
  runs dynamic stream discovery; StreamReadError completes with an error.
  The shell never buffers the whole stream into a Vec.
- Per-backend streaming: postgres uses sqlx fetch (lazy) over a cloned pool;
  sqlite reads rows in spawn_blocking and sends them over a bounded mpsc
  channel consumed by the async stream; memory and fs materialize per-event
  results under their lock then yield incrementally. Per-event decode
  failures surface as Err items, preserving read_stream_errors_on_type_mismatch.
- Update the eventcore-testing contract suite, chaos/deterministic wrapper
  stores, scenario helpers, all mock stores, integration tests, bench, and
  stress scenarios to the streaming API (open stream then collect_events).
- ADR-0049 records the decision; event-sourcing, store-backends, and
  command-execution blueprints document streaming reads.

Closes #364
auto-review left a comment

This PR introduces a significant change to the EventStore::read_stream method, transitioning it to return a lazy async stream instead of a fully-materialized Vec. This change aims to improve memory efficiency by processing events incrementally. The update is a breaking change, but it aligns with pre-1.0 expectations and is well-documented in ADR-0049.

Walkthrough

  • ADR-0049: Provides a comprehensive rationale for the change, detailing the benefits of streaming reads and the decision to remove EventStreamReader.
  • Executor Changes: The executor now folds events incrementally, which is a key memory optimization.
  • Backend Implementations:
    • PostgreSQL: Uses lazy fetching to avoid buffering the entire result set.
    • SQLite: Implements a bounded channel to handle synchronous reads with backpressure.
    • In-Memory and File: Maintain API uniformity but do not gain memory benefits.
  • Testing: Ensure that tests cover the new incremental processing behavior and that all dependent code is updated to use collect_events where necessary.

LLM usage and cost

This PR introduces a significant change to the `EventStore::read_stream` method, transitioning it to return a lazy async stream instead of a fully-materialized `Vec`. This change aims to improve memory efficiency by processing events incrementally. The update is a breaking change, but it aligns with pre-1.0 expectations and is well-documented in ADR-0049. ## Walkthrough - **ADR-0049**: Provides a comprehensive rationale for the change, detailing the benefits of streaming reads and the decision to remove `EventStreamReader`. - **Executor Changes**: The executor now folds events incrementally, which is a key memory optimization. - **Backend Implementations**: - **PostgreSQL**: Uses lazy fetching to avoid buffering the entire result set. - **SQLite**: Implements a bounded channel to handle synchronous reads with backpressure. - **In-Memory and File**: Maintain API uniformity but do not gain memory benefits. - **Testing**: Ensure that tests cover the new incremental processing behavior and that all dependent code is updated to use `collect_events` where necessary. ## LLM usage and cost - Reasoning (gpt-4o) in=25375 out=707 cost=$0.137480 - Cheap (gpt-4o-mini) in=11076 out=94 cost=$0.001718 Estimated total USD: $0.139198 via https://api.openai.com and https://api.openai.com
jwilger deleted branch feat/364-streaming-reads 2026-06-13 09:18:56 -07:00
Sign in to join this conversation.
No description provided.