feat(postgres): add PostgreSQL event store implementation #169
No reviewers
Labels
No labels
adr
automated
bug
chore
dependencies
documentation
enhancement
epic
github-actions
P1-high
P2-medium
P3-low
release
research
rust
No milestone
No project
No assignees
1 participant
Notifications
Due date
No due date set.
Dependencies
No dependencies set.
Reference
jwilger/eventcore!169
Loading…
Add table
Add a link
Reference in a new issue
No description provided.
Delete branch "feature/eventcore-005-postgres-store"
Deleting a branch is permanent. Although the deleted branch may continue to exist for a short time before it actually gets removed, it CANNOT be undone in most cases. Continue?
Summary
eventcore-postgrescrate with sqlx-backed EventStore implementationChanges
eventcore-postgreswithPostgresEventStoreimplementing theEventStoretraiteventcore_eventstable with trigger-based version assignmentPostgresConfigfor pool tuning, plusfrom_pool()for shared pool scenariosTest plan
Pull request overview
This PR adds a production-ready PostgreSQL adapter for EventCore with comprehensive error handling and serialization support. It introduces the
eventcore-postgrescrate implementing theEventStoretrait, adds serialization requirements to theEventtrait, and refactors the storage layer to support JSON serialization of events.Key Changes
Serialize + DeserializeOwned) to theEventtrait to enable persistenceStreamWritesto capture event data and metadata at append time for adaptersReviewed changes
Copilot reviewed 30 out of 31 changed files in this pull request and generated 2 comments.
Show a summary per file
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
@ -0,0 +71,4 @@// Then: Tracing spans are emitted for both append and read operationsassert!(logs_contain("postgres.append_events") && logs_contain("postgres.read_stream"),The function
logs_containis called but not defined. Thetracing-testcrate provides alogs_contain!macro, not a function. Change tologs_contain!("postgres.append_events") && logs_contain!("postgres.read_stream").@ -0,0 +119,4 @@.as_ref().map(|reader| reader.len()).unwrap_or_default();let logs_contain_conflict = logs_contain("postgres.version_conflict");The function
logs_containis called but not defined in the test file. Thetracing-testcrate provides alogs_containmacro, not a function. The correct usage should belogs_contain!("postgres.version_conflict")instead oflogs_contain("postgres.version_conflict").Several issues need to be addressed.
Why? Any way to not put this limitation in place?
@ -64,14 +64,12 @@ jobs:- name: Run testsenv:Can we maybe pick one and normalize?
There are no "existing deployments" and we don't worry about backward compatibility for versions < 1.0.0
@ -0,0 +1,335 @@use std::time::Duration;Magic numbers for things that should ideally be configurable.
Yuck?
@ -0,0 +119,4 @@}})?;events.push(event);}How many events would need to be on a stream before just selecting the whole set at once (vs. streaming) becomes a problem?
@ -0,0 +193,4 @@Ok(EventStreamSlice)}}Wouldn't it be much cleaner to do an atomic insert where we insert all events in one statement along with a condition that the stream versions for each stream match the expected values, so that we only need to execute a single statement without locks?
@ -190,1 +219,3 @@derive(Debug, Clone, PartialEq, Eq, Hash, AsRef, Deref, Display)derive(Debug,Clone,Any particular reason we can't use derives with serde here?
@ -64,14 +64,12 @@ jobs:- name: Run testsenv:Normalized to
DATABASE_URLwhich is the standard convention used by sqlx and other database tools. Removed the customEVENTCORE_TEST_POSTGRES_URLandTEST_DATABASE_URLenv vars from CI. Commit:c0ccf9eRemoved the pre-1.0 migration upgrade guide section. Commit:
2e21fa9@ -0,0 +1,335 @@use std::time::Duration;Added
PostgresConfigstruct with configurablemax_connectionsandacquire_timeout. Defaults are now more sensible (10 connections, 30s timeout) and can be overridden viaPostgresEventStore::with_config(). Commit:6831640@ -0,0 +1,335 @@use std::time::Duration;Replaced with general-purpose
from_pool()that's not test-specific. Users who want full control over pool configuration (or want to share a pool) can use this. Commit:6831640@ -0,0 +119,4 @@}})?;events.push(event);}In event sourcing, streams typically represent bounded aggregates (single order, account, etc.) which should stay relatively small—usually under a few hundred events. Streams with thousands of events are a design smell suggesting the aggregate boundary is too large.
For typical aggregate sizes,
fetch_allis perfectly fine. The real pain point would be ~10K+ events where memory becomes a concern. If we see that pattern emerge, we could add:read_stream_from_version)sqlx::query().fetch()For now, keeping it simple. Happy to add streaming later if there's a concrete use case.
@ -190,1 +219,3 @@derive(Debug, Clone, PartialEq, Eq, Hash, AsRef, Deref, Display)derive(Debug,Clone,nutype generates a validated newtype where construction can fail (
try_newreturnsResult). With#[derive(Serialize, Deserialize)], deserialization would bypass validation and accept any string, including empty or overly long ones.The manual impl ensures deserialization goes through
StreamId::try_new(), preserving the invariants (non-empty, max 255 chars, trimmed). This is the standard pattern for validated newtypes—serialize the inner value, deserialize with validation.@ -0,0 +193,4 @@Ok(EventStreamSlice)}}Implemented trigger-based version management as discussed. The PostgreSQL trigger now:
stream_versionis automatically set toMAX + 1eventcore.expected_versionsand raisesP0001on mismatchApp code is now much simpler - just sets expected versions via
set_config()and INSERTs. No version calculation, no explicit transactions for locking.Commit:
9ea33cd@ -190,1 +219,3 @@derive(Debug, Clone, PartialEq, Eq, Hash, AsRef, Deref, Display)derive(Debug,Clone,I thought that the nutype library had specific serde support through a feature flag. Can you double check on that and make the adjustment if it does?
Addressed by removing the limitation entirely:
All 78 tests now run in parallel in ~4 seconds.
@ -190,1 +219,3 @@derive(Debug, Clone, PartialEq, Eq, Hash, AsRef, Deref, Display)derive(Debug,Clone,You're right - nutype does have serde support through the
serdefeature flag (which was already enabled in Cargo.toml). Updated to use the derive instead of manual implementation.