PostgreSQL read_events silently drops events that fail deserialization #372

Closed
opened 2026-04-15 10:55:23 -07:00 by jwilger-ai-bot · 0 comments
jwilger-ai-bot commented 2026-04-15 10:55:23 -07:00 (Migrated from github.com)

Summary

The PostgreSQL EventReader::read_events implementation calls .ok() on the result of JSON deserialization, silently dropping any event that fails to deserialize. This can leave projections permanently stuck when the events table contains events of mixed types, because the projection pipeline interprets an empty result as "no more events" and stops.

Location

eventcore-postgres/src/lib.rs, lines 336-345:

let events: Vec<(E, StreamPosition)> = rows
    .into_iter()
    .filter_map(|row| {
        let event_data: Json<Value> = row.get("event_data");
        let event_id: Uuid = row.get("event_id");
        serde_json::from_value::<E>(event_data.0)
            .ok()                    // <-- silent failure
            .map(|e| (e, StreamPosition::new(event_id)))
    })
    .collect();

Problem

When read_events fetches N rows from the database but all N fail deserialization, it returns an empty Vec. The caller (projection pipeline) cannot distinguish between:

  • "No events exist" — the table is empty or all events have been processed
  • "Events exist but none match the requested type" — rows were fetched but deserialization failed

In batch mode, the projection pipeline reads one page and stops. If the first page consists entirely of non-matching events, the projection terminates and never reaches the matching events further along in the table.
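
The ambiguity can be reproduced without a database. The following is a minimal sketch, not the library's code: `Row`, `deserialize_type_a`, and the `"A:"`/`"B:"` payload prefixes are hypothetical stand-ins for the real rows and for `serde_json::from_value::<E>`, and a plain `u64` stands in for `StreamPosition`.

```rust
/// Hypothetical stand-in for one row of the events table.
struct Row {
    event_id: u64,
    event_data: &'static str,
}

/// Hypothetical stand-in for `serde_json::from_value::<E>`:
/// succeeds only for payloads tagged as type A.
fn deserialize_type_a(data: &str) -> Result<String, String> {
    data.strip_prefix("A:")
        .map(str::to_owned)
        .ok_or_else(|| format!("not a type-A event: {}", data))
}

/// Mirrors the shape of the current `read_events`:
/// deserialization failures vanish inside `.ok()`.
fn read_events(rows: &[Row]) -> Vec<(String, u64)> {
    rows.iter()
        .filter_map(|row| {
            deserialize_type_a(row.event_data)
                .ok() // the error is discarded here
                .map(|event| (event, row.event_id))
        })
        .collect()
}
```

Calling `read_events` with an empty slice and with a slice containing only `"B:..."` rows yields an empty `Vec` in both cases, so the caller has nothing to tell the two situations apart.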

Impact

This affects any scenario where the eventcore_events table contains events from multiple event types (which is the expected norm — all events share one table). A projection for EventTypeA can get stuck behind a block of EventTypeB events.

In continuous mode, this could cause infinite empty-poll loops with backoff, wasting resources without ever processing events that are waiting in the table.

Possible Fixes

Several approaches, not mutually exclusive:

1. Filter by event type in SQL (preferred)

Store the value of event_type_name() in a column and filter on it in the query:

SELECT event_id, event_data, stream_id
FROM eventcore_events
WHERE event_type = $1
ORDER BY event_id
LIMIT $2

This requires a schema migration to add an event_type column and backfill it.

2. Return position metadata alongside events

Change read_events to return the last position seen (from the raw rows) even if no events deserialized successfully. This lets the projection pipeline advance past non-matching events:

struct ReadResult<E> {
    events: Vec<(E, StreamPosition)>,
    last_position_seen: Option<StreamPosition>,
    rows_fetched: usize,
}
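
One way such a result might be populated is sketched below. This is an illustration under stated assumptions, not the crate's implementation: `StreamPosition` is simplified to a `u64` wrapper (the real one is built from a `Uuid`), rows are assumed to arrive ordered by position, and `build_read_result` and its `decode` parameter are hypothetical names standing in for the row loop and `serde_json::from_value::<E>`.

```rust
/// Simplified stand-in for the real `StreamPosition` (assumption: u64, not Uuid).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct StreamPosition(u64);

#[derive(Debug)]
struct ReadResult<E> {
    events: Vec<(E, StreamPosition)>,
    last_position_seen: Option<StreamPosition>,
    rows_fetched: usize,
}

/// Builds a `ReadResult` from raw `(position, payload)` rows, assumed to be
/// sorted by position. `decode` is a hypothetical stand-in for deserialization.
fn build_read_result<E, F>(rows: &[(u64, &str)], decode: F) -> ReadResult<E>
where
    F: Fn(&str) -> Option<E>,
{
    let mut events = Vec::new();
    let mut last_position_seen = None;
    for &(id, payload) in rows {
        let position = StreamPosition(id);
        // Record the position before decoding, so a failed decode
        // still moves `last_position_seen` forward.
        last_position_seen = Some(position);
        if let Some(event) = decode(payload) {
            events.push((event, position));
        }
    }
    ReadResult {
        events,
        last_position_seen,
        rows_fetched: rows.len(),
    }
}
```

With this shape, a page of only non-matching rows gives `events` empty but `rows_fetched > 0` and `last_position_seen` set, while a truly exhausted table gives `rows_fetched == 0` and `last_position_seen == None`.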

3. Log deserialization failures

At minimum, log a warning when deserialization fails so the problem is observable:

serde_json::from_value::<E>(event_data.0)
    .map_err(|e| {
        tracing::warn!(event_id = %event_id, error = %e, "Failed to deserialize event");
        e
    })
    .ok()

4. Advance checkpoint past non-matching events

Track the highest event_id seen in the raw rows and use it as the checkpoint position, so the next poll starts after all fetched rows regardless of deserialization success.
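
A rough sketch of that idea as a batch loop follows. Everything here is hypothetical scaffolding rather than the actual pipeline: `RawRow`, `fetch_page`, and `run_batch` are invented names, ids are plain `u64` values starting at 1 (so a checkpoint of 0 means "from the beginning"), and a `"A:"` prefix check stands in for deserialization.

```rust
/// Hypothetical raw row: (event_id, payload).
type RawRow = (u64, &'static str);

/// Fetches up to `limit` rows with `event_id > after`.
/// Assumes `table` is sorted by id, like `ORDER BY event_id`.
fn fetch_page(table: &[RawRow], after: u64, limit: usize) -> Vec<RawRow> {
    table
        .iter()
        .copied()
        .filter(|(id, _)| *id > after)
        .take(limit)
        .collect()
}

/// Runs a batch projection over type-A events and returns the decoded
/// events together with the final checkpoint.
fn run_batch(table: &[RawRow], page_size: usize) -> (Vec<String>, u64) {
    let mut checkpoint = 0;
    let mut projected = Vec::new();
    loop {
        let page = fetch_page(table, checkpoint, page_size);
        // Stop only when the database returned no rows at all,
        // not when zero rows happened to decode.
        let highest_id = match page.last() {
            Some(&(id, _)) => id,
            None => break,
        };
        for (_, payload) in &page {
            if let Some(event) = payload.strip_prefix("A:") {
                projected.push(event.to_owned());
            }
        }
        // Advance past every fetched row, matching or not.
        checkpoint = highest_id;
    }
    (projected, checkpoint)
}
```

The termination condition is tied to the raw page being empty, so a leading block of non-matching rows only costs extra page reads instead of ending the projection.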

Affected Backends

  • PostgreSQL — confirmed affected (all events in one table)
  • SQLite — likely affected (same shared table pattern) but harder to trigger since tests use in-memory databases
  • Memory — likely affected but similarly hard to trigger

Related

This was discovered via the stress test projection failure (see related issue), but it's a library-level bug that affects any production deployment with multiple event types.


Reference
jwilger/eventcore#372