Skip to content

fix(opal-server): reconnect broadcaster on backbone disconnect (PER-15065)#915

Open
zeevmoney wants to merge 8 commits into
masterfrom
per-15065/broadcaster-reconnect-resilience
Open

fix(opal-server): reconnect broadcaster on backbone disconnect (PER-15065)#915
zeevmoney wants to merge 8 commits into
masterfrom
per-15065/broadcaster-reconnect-resilience

Conversation

@zeevmoney

@zeevmoney zeevmoney commented Jun 5, 2026

Copy link
Copy Markdown
Contributor

Changes proposed

Fixes a production incident where a brief broadcaster-backbone (Postgres LISTEN/NOTIFY, Redis, Kafka) disconnect escalated into a self-sustaining, fleet-wide OPAL-client connection-drop storm that only a worker restart cleared — closes the cross-instance consistency gap the availability fix would otherwise open, and makes the failure visible to k8s so it can self-heal.

Root cause (in the pub/sub libraries OPAL wraps): with ignore_broadcaster_disconnected=False, every client websocket waits on a single shared broadcaster reader task. On a backbone drop the reader completes/never-recovers, so reconnecting clients are cancelled indefinitely; a non-idempotent disconnect adds a ValueError('list.remove(x): x not in list') storm. The same fragility has two faces: a loud storm (prod, when the reader was already running and the drop cancels clients) and a silent wedge (staging, when the drop made start_reader_task raise → leaked listener count → get_reader_task() returns Noneasyncio.wait([client, None]) TypeError crashes every new /ws handler) — both needing a manual restart.

This PR (Phase 1 — self-contained in OPAL):

  • Reconnecting reader (ReconnectingBroadcaster): the reader reconnects with bounded backoff + jitter and stays pending across a transient drop, so clients are never spuriously cancelled. Because its start_reader_task no longer connects-and-raises, the staging None-reader TypeError wedge cannot occur either.
  • Give-up → graceful restart: when OPAL_BROADCAST_RECONNECT_MAX_RETRIES (non-default) is exhausted, the broadcaster fires a give-up hook wired to the worker's graceful shutdown regardless of OPAL_STATISTICS_ENABLED (previously only the statistics path restarted on reader completion), so the worker actually restarts instead of re-entering the storm. Connect-OK/instant-close flaps now count toward the retry budget so MAX_RETRIES can trip on flap loops.
  • Idempotent disconnect (SafeConnectionManager): eliminates the list.remove storm.
  • Consistency across the gap (two layers): (B) a bounded outbound replay buffer replays broadcasts missed during the outage; (A) resync on recovery forces each worker's own clients to reconnect and re-fetch full policy+data — the guarantee that converges the fleet. Gap detection is reader-task-local (a fresh reader after all clients drained does not fire a spurious resync), and a gap that lands mid-recovery triggers a single-flight rerun rather than being dropped.
  • Broadcaster-aware /healthcheck (new): a wedged reader (reader task absent/dead while clients depend on it) now returns 503 so a k8s readiness/liveness probe can route away from / restart the worker instead of leaving it broken for hours. It stays healthy through a normal transient reconnect (no probe flap). / remains a trivial liveness 200.
  • Removed the superseded experiment flag OPAL_BROADCAST_CONN_LOSS_BUGFIX_EXPERIMENT_ENABLED (the earlier, partial fix attempt). ignore_broadcaster_disconnected is now derived from the broadcaster type — False for the reconnecting broadcaster (a completed reader surfaces the disconnect so clients reconnect), library-safe True for the stock EventBroadcaster rollback (OPAL_BROADCAST_RECONNECT_ENABLED=False) so the legacy path degrades to stale-but-connected rather than the storm. Blast radius: a deployment still setting that env var has it harmlessly ignored (Confi drops unknown keys).
  • New server config keys: OPAL_BROADCAST_RECONNECT_ENABLED, …_MAX_RETRIES, …_BACKOFF_MIN/MAX_SECONDS, OPAL_BROADCAST_REPLAY_BUFFER_SIZE, OPAL_BROADCAST_RESYNC_ON_RECONNECT, OPAL_BROADCAST_RESYNC_SETTLE_SECONDS, OPAL_BROADCAST_HEALTHCHECK_ENABLED (all default-on, reversible).
  • Tests: unit + integration with negative controls, a multi-instance consistency suite, the broadcaster-health contract + an end-to-end /healthcheck route test plus a simulated-DB-kill test (the probe stays 200 while the reader reconnects through a transient outage — no flap — and flips to 503 only when reconnect gives up), and an extended app-tests e2e (graceful + ungraceful backbone kill, publish-during-outage convergence, regression guards). Docs updated in broadcast-interface.mdx + configuration.mdx.
  • Housekeeping (in-diff): repaired the prek/pre-commit toolchain so hooks run on modern pre-commit>=4 and Python 3.13+ (docformatter v1.7.6 + a Python-3.12 default_language_version pin; byte-identical formatting), and stabilized the broadcaster e2e (deterministic replay timing + a clean teardown between retries).

A follow-up (Phase 2) upstreams the fixes to fastapi_websocket_rpc / fastapi_websocket_pubsub, bumps the pins, and removes these OPAL-side stop-gaps.

Check List (Check all the applicable boxes)

  • I sign off on contributing this submission to open-source
  • My code follows the code style of this project.
  • My change requires changes to the documentation.
  • I have updated the documentation accordingly.
  • All new and existing tests passed.
  • This PR does not contain plagiarized content.
  • The title of my pull request is a short description of the requested changes.

Note to reviewers

Internal tracking: Permit PER-15065. Deferred follow-up: PER-15129 (resync cooldown + byte-bounded replay buffer).

Reader-task lifecycle (the availability fix)

flowchart TD
  A["Backbone connection drops"] --> B{"Reconnect enabled?"}
  B -- "no (stock behavior)" --> C["Shared reader task completes or fails to start"]
  C --> D["get_reader_task done or None"]
  D --> E["Clients cancelled (storm) or new /ws crash (wedge)"]
  E --> F["Needs a manual restart"]
  B -- "yes (this PR)" --> G["Reader retries with backoff, stays pending"]
  G --> H["Clients keep their websocket; fan-out resumes on reconnect"]
  G -- "after max retries (give up)" --> GU["Fire give-up hook -> graceful worker restart"]
Loading

Consistency across a gap

flowchart TD
  X["Backbone gap; update published on worker A"] --> Y["A's clients get it locally; peers miss it"]
  Y --> Z["Backbone recovers"]
  Z --> R1["(B) A replays buffered updates -> peers that re-subscribed catch up"]
  Z --> R2["(A) every worker resyncs its own clients -> full policy + data refetch"]
  R1 --> OK["Fleet converges to current truth"]
  R2 --> OK
Loading

Ordering is enforced: on recovery a worker replays the buffer first, then fires the resync.

Why the broadcaster-aware healthcheck

Two real incidents showed the OPAL server can get wedged in pub/sub state it cannot self-heal, while k8s saw the pods as healthy the whole time (/healthcheck returned 200 unconditionally) — so nothing routed away or restarted, and a ~20-second backbone blip became a multi-hour outage. is_reader_healthy() is judged against the listener count (unhealthy only when listeners depend on a missing/completed reader), so it does not flap during a normal transient reconnect; the probe's failureThreshold absorbs any rare race. Recommended deployment follow-up: point the OPAL server liveness probe at /healthcheck (today only readiness uses it) so a wedged worker is restarted, and disable auto_minor_version_upgrade on the broadcaster RDS (the trigger in both incidents).

Reviewed by python-pro + fastapi-pro (two rounds); fixes applied

Round 1: reader self-cancellation during resync (pin a listening context); single-flight recovery + a lock around the buffer; cancel pending recovery tasks on shutdown; drop un-serializable buffered items; replaced a production assert with raise; per-worker jitter to avoid a fleet-wide resync stampede; corrected e2e log guards.

Round 2 (after removing the experiment flag): make the BROADCAST_RECONNECT_ENABLED=False rollback safe (ignore_broadcaster_disconnected=True for the stock broadcaster — degrade to stale, not storm); pin the whole post-gap recovery so the reader can't be cancelled mid-recovery; flush the replay buffer without holding the buffer lock across network I/O and re-enqueue the unsent tail on a mid-drain failure; cancel and join child tasks on shutdown; remove the write-only _buffer_overflowed flag and the phantom config-doc claim. Findings that were verified non-issues (a non-observable healthcheck startup race, infinite-retry "healthy while retrying", asyncio.Lock construction) were left as-is with rationale.

Round 3 — deep review by @EliMoshkovich; fixes applied

A thorough human review (verified against the upstream fastapi_websocket_pubsub 1.0.1 / fastapi_websocket_rpc internals) surfaced six lifecycle/consistency findings. 1–5 are fixed in this PR; F6 is tracked as PER-15129:

  • F1 (HIGH): gap detection is now reader-task-local instead of an instance flag — a fresh reader task (after the last client left and one returns) no longer fires a spurious fleet-wide resync on its first connect (this hit default, stats-off deployments).
  • F2: single-flight recovery now reruns for a gap that lands during the late recovery phase (rerun-requested flag) instead of permanently dropping that gap's flush + resync.
  • F3: on max-retries give-up, fire a give-up hook → the worker's graceful shutdown regardless of OPAL_STATISTICS_ENABLED (was only wired in the stats path); count connect-OK/instant-close flaps toward the retry budget so MAX_RETRIES can trip on flap loops.
  • F4 (docs): state the resync guarantee's actual scope (policy + the configured data sources, not runtime inline/one-off POST /data/config updates — those rely on the best-effort replay buffer) and the cross-worker replay-ordering caveat in broadcast-interface.mdx.
  • F5: partial-replay re-enqueue preserves drop-oldest (rebuild unsent + refill under the lock) instead of extendleft evicting the newest refill on a full deque.
  • F6 (LOW, deferred → PER-15129): a resync cooldown / minimum interval on a flapping backbone, and byte-bounding the replay buffer (currently count-bounded). F1 already removed the main amplifier, so this is hardening, not a correctness bug.

How it was tested

  • Unit + integration (negative controls reproducing both upstream bugs; a two-instance convergence test; the broadcaster-health state matrix; a simulated-DB-kill test driving a real reader through fault → reconnect → give-up → recover; the /healthcheck 200/503/kill-switch routes). Round-3 fixes add focused tests: fresh-reader-no-spurious-resync (F1), rerun-on-late-gap (F2), flap-loop-trips-give-up + give-up-hook-fires-only-on-give-up (F3), partial-replay-preserves-drop-oldest (F5). Full opal-server + opal-common suites pass; existing OpalServer startup test unaffected.
  • e2e: app-tests/run.sh (CI E2E Tests job) — graceful + ungraceful backbone kill, publish-during-outage consistency, no list.remove + replay-ran guards.

Blast radius

  • SafeConnectionManager and the healthcheck logic apply to every server; existing startup test confirms no regression. Reconnect / resync / healthcheck / give-up-restart activate only with a broadcaster configured and are each gated by a default-on, runtime-reversible flag (give-up-restart only when MAX_RETRIES > 0, which is non-default).

Generated with Claude Code

A brief broadcaster-backbone outage escalated into a fleet-wide client
connection-drop storm that only a worker restart cleared: the shared
broadcaster reader task completed on disconnect and was never restarted
while clients stayed connected, so every reconnecting client was cancelled.

Add ReconnectingBroadcaster (reader reconnects with bounded exponential
backoff, stays pending across a transient backbone loss) and an idempotent
SafeConnectionManager (eliminates the ValueError('list.remove(x): x not in
list') churn). Gate via OPAL_BROADCAST_RECONNECT_* config keys. Includes
unit + integration tests with negative controls that reproduce the bug, and
an extended app-tests e2e (graceful + ungraceful backbone drop).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@linear-code

linear-code Bot commented Jun 5, 2026

Copy link
Copy Markdown

PER-15065

PER-15129

@netlify

netlify Bot commented Jun 5, 2026

Copy link
Copy Markdown

Deploy Preview for opal-docs ready!

Name Link
🔨 Latest commit 671776e
🔍 Latest deploy log https://app.netlify.com/projects/opal-docs/deploys/6a2ebaa7f18f7f000808a8c8
😎 Deploy Preview https://deploy-preview-915--opal-docs.netlify.app
📱 Preview on mobile
Toggle QR Code...

QR Code

Use your smartphone camera to open QR code link.

To edit notification comments on pull requests, go to your Netlify project configuration.

zeevmoney and others added 4 commits June 9, 2026 16:24
… broadcaster gap

Builds on the reconnecting broadcaster so a transient backbone outage no longer
silently desyncs OPAL server instances:

- (B) bounded outbound replay buffer: broadcasts that fail while the backbone is
  down are queued and replayed on reconnect so peers that re-subscribe catch up.
- (A) resync on recovery (the guarantee): after any gap each worker forces its own
  clients to reconnect and re-fetch full policy + data state, so the fleet converges
  (a worker may have missed incoming peer updates during its gap).

Pins the broadcaster's listening context during a resync so recycling clients does
not drop the listener count to 0 and cancel the reader; single-flight recovery plus
a lock around the buffer/overflow flag; drops un-serializable buffered items so a
poison payload cannot wedge the buffer; cancels pending recovery tasks on shutdown.
Adds OPAL_BROADCAST_REPLAY_BUFFER_SIZE / _RESYNC_ON_RECONNECT / _RESYNC_SETTLE_SECONDS.
Includes multi-instance convergence tests and extends the app-tests e2e with a
publish-during-outage consistency scenario.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
A wedged broadcaster reader (reader task absent/dead while clients depend on it)
previously left /healthcheck returning 200, so k8s never routed away from or
restarted the bad worker — a transient backbone blip could wedge a pod for hours.

Add ReconnectingBroadcaster.is_reader_healthy(): unhealthy only when listeners are
present and the reader task is missing or done (crashed / gave up) — it stays healthy
through a normal transient reconnect, so the probe does not flap during a backbone
blip. /healthcheck returns 503 in that wedged state (/ stays a trivial liveness 200),
gated by OPAL_BROADCAST_HEALTHCHECK_ENABLED (default true). Unit tests for the health
contract plus an end-to-end route test (200 / 503 / kill-switch).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…d DB kill

Drives a real ReconnectingBroadcaster reader against a fault-injectable backbone:
- transient DB kill -> reader stays pending (reconnecting) -> is_reader_healthy()
  stays True / /healthcheck stays 200 (the no-flap property: no needless restart
  during a normal blip), and recovers on bus.recover();
- permanent kill that exhausts reconnect retries -> reader task done ->
  is_reader_healthy() False / /healthcheck 503.
Plus route-level checks (pending -> 200, gave-up -> 503 with body, / stays 200).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
The docformatter hook pinned at v1.7.5 ships a second hook (`docformatter-venv`)
declaring `language: python_venv`, which prek and pre-commit>=4 reject for the whole
manifest — so hooks could not initialize locally. CI passed only because it pins
`pre-commit<4` on Python 3.12.

- Bump docformatter v1.7.5 -> v1.7.6 (drops the `docformatter-venv` hook; keeps
  `language: python`). v1.7.6 is byte-identical to v1.7.5 on this codebase, so the
  bump introduces no reformatting (v1.7.7 changes docstring wrapping, hence v1.7.6).
- Pin the hook toolchain to Python 3.12 via default_language_version: the pinned
  black 23.1.0 / isort / docformatter do not run on 3.13+, so hooks failed on
  machines whose default interpreter is newer.
- Apply docformatter to healthcheck_db_kill_test.py (the one file that predated this
  fix and had not been run through the hook).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@zeevmoney zeevmoney marked this pull request as ready for review June 10, 2026 16:50
@zeevmoney zeevmoney requested a review from Copilot June 10, 2026 16:50

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds resilience to OPAL server’s pub/sub layer so transient broadcast-backbone disconnects don’t cascade into client disconnect storms or leave workers silently wedged, while also making broadcaster-reader failure visible via /healthcheck for k8s self-healing.

Changes:

  • Introduces ReconnectingBroadcaster (bounded backoff + jitter, replay buffer, post-gap resync hook) and SafeConnectionManager (idempotent disconnect + staggered close) to harden pub/sub behavior.
  • Updates server health endpoints so / remains trivial liveness, while /healthcheck can return 503 when a reconnecting broadcaster is wedged and clients depend on it.
  • Adds new config keys plus extensive unit/integration/e2e tests and docs updates; updates pre-commit config and app-tests to validate the regression scenarios.

Reviewed changes

Copilot reviewed 14 out of 14 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
packages/opal-server/opal_server/pubsub_resilience.py Adds reconnecting broadcaster + replay/resync logic and idempotent connection manager.
packages/opal-server/opal_server/pubsub.py Wires resilient broadcaster/config, installs SafeConnectionManager, and registers resync-on-reconnect behavior.
packages/opal-server/opal_server/server.py Makes /healthcheck broadcaster-aware (503 on wedged reader) and adds a trivial / liveness route.
packages/opal-server/opal_server/config.py Adds broadcaster reconnect/replay/resync/healthcheck configuration flags and tunables.
packages/opal-server/opal_server/tests/safe_connection_manager_test.py Unit tests for idempotent disconnect and staggered close behavior.
packages/opal-server/opal_server/tests/reconnecting_broadcaster_test.py Unit tests for reconnecting reader lifecycle, backoff bounds, and health predicate.
packages/opal-server/opal_server/tests/healthcheck_endpoint_test.py End-to-end tests for / and /healthcheck behavior (including kill switch).
packages/opal-server/opal_server/tests/healthcheck_db_kill_test.py Simulates DB/backbone kill to validate non-flapping health and 200/503 transitions.
packages/opal-server/opal_server/tests/broadcaster_reconnect_integration_test.py Integrates with PubSubEndpoint.main_loop to ensure clients aren’t cancelled across backbone drops.
packages/opal-server/opal_server/tests/broadcaster_consistency_integration_test.py Multi-instance convergence tests covering replay + resync single-flight behavior.
documentation/docs/getting-started/running-opal/run-opal-server/broadcast-interface.mdx Documents broadcaster reconnection/resync/replay configuration (needs one more env var entry).
app-tests/run.sh Extends e2e script to test graceful/ungraceful backbone outages, storm regression guards, and consistency across gaps.
app-tests/docker-compose-app-tests.yml Adds Postgres healthcheck for the broadcast channel container.
.pre-commit-config.yaml Pins hook runtime to Python 3.12 and bumps docformatter rev to avoid pre-commit manifest issues.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

zeevmoney and others added 2 commits June 11, 2026 13:22
…an retries)

The e2e failed in CI for two reasons:

1. The cross-instance consistency check was racy. A data update published to one
   server while the backbone is down reaches the second server's client only via
   the replay buffer (the base data config restores only /static on a resync), and
   with the default reconnect backoff (max 30s) the second server could still be
   mid-reconnect when the first replayed after the 2s settle, missing it. Pin the
   broadcaster timing in the compose env (BACKOFF_MIN=0.5, BACKOFF_MAX=2,
   RESYNC_SETTLE_SECONDS=3) so both replicas re-subscribe before the replay fires.

2. The retry loop re-ran main() without tearing the stack down, so generate_opal_keys
   could not bind host port 7002 and the previous attempt's stale transient client
   ERRORs tripped check_no_error. Tear the stack down between attempts.

Validated locally: the full app-tests suite (including the graceful + ungraceful
broadcaster kills and the consistency scenario) now passes on the first attempt.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…rden recovery

Remove the previous fix attempt for the broadcaster connection-loss storm — the
OPAL_BROADCAST_CONN_LOSS_BUGFIX_EXPERIMENT_ENABLED flag — now that the reconnecting
broadcaster is the real fix.

- Drop the experiment flag; derive ignore_broadcaster_disconnected from the broadcaster
  type: False for ReconnectingBroadcaster (a completed reader surfaces the disconnect so
  clients reconnect), True (library-safe) for the stock EventBroadcaster rollback so the
  legacy path degrades to "stale but connected" instead of the storm.
- Pin the whole post-gap recovery in a listening context so the reader cannot be cancelled
  mid-recovery (previously only close_all_staggered was pinned).
- Flush the replay buffer without holding the buffer lock across network I/O; re-enqueue
  the unsent tail on a mid-drain transport failure.
- Cancel AND join child tasks on reader shutdown.
- Remove the write-only _buffer_overflowed flag and the config-doc claim that overflow
  triggers a resync (it does not; the resync fires unconditionally on every gap).

Tests: add reconnect/disconnect/slow-connection coverage — flaky-reconnect-recovers,
slow-connect-stays-pending, subscribe-fail-reconnects, partial-replay-requeue, buffer
overflow/disabled, deterministic single-flight, max_retries>0-then-recover, skip-own,
shutdown-cancels-children, and the resync-pins-the-reader-alive invariant. The
consistency harness now holds a listening context to mirror a connected client.
Reviewed by python-pro + fastapi-pro; findings addressed.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>

@EliMoshkovich EliMoshkovich left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deep Review (correctness, data consistency/integrity, security)

Verdict

High-quality, well-tested PR that fixes a real production failure mode. Recommend approval after addressing findings 1–3. The core mechanism (reconnecting reader + idempotent disconnect + healthcheck) is sound, and I verified its assumptions against the actual upstream fastapi_websocket_pubsub 1.0.1 / fastapi_websocket_rpc sources. The findings below are about edge-case lifecycle interactions and the precise scope of the consistency "guarantee" — none invalidate the design.

What I verified against upstream (all correct)

  • __read_notifications__ / __broadcast_notifications__ have trailing double underscores → no name mangling, so the overrides genuinely dispatch (upstream _subscribe_to_all_topics calls self.__broadcast_notifications__ — resolves to the subclass).
  • The double-disconnect storm is real: WebsocketRPCEndpoint.main_loop calls handle_disconnect() (disconnect #1) and its outer bare except calls manager.disconnect() again (#2); upstream ConnectionManager.disconnect is an unguarded list.remove. PubSubEndpoint offers no manager injection point, so the endpoint.endpoint.manager attribute swap (with the loud RuntimeError guard) is the only option. The manager is swapped before any connection is served — safe.
  • The staging "wedge" is real: upstream start_reader_task connects-and-raises inside EventBroadcasterContextManager.__aenter__ after incrementing _listen_count; the exception skips __aexit__, leaking the count, leaving get_reader_task() → None and asyncio.wait([task, None])TypeError. The override (reader loop owns connection) eliminates the raise path.
  • The recovery pin (get_listening_context() around the whole recovery) correctly prevents the upstream context manager from cancelling the reader when close_all_staggered drives the client-held listen count down; upstream __aexit__ cancels without awaiting, so no deadlock with _cancel_pending_tasks.
  • The resync premise holds client-side: close 1012 → websockets raises ConnectionClosedError (not ClosedOK) → client retries → DataUpdater.on_connectget_base_policy_data() full refetch.
  • No ack-semantics regression: EventNotifier.callback_subscribers already swallowed broadcast exceptions upstream, so publish was always fire-and-forget; buffering strictly improves on silent loss.

Findings

1. HIGH — _had_prior_connection conflates "backbone gap" with "reader task restart" → spurious resync, worst-case self-sustaining close loop

_had_prior_connection is instance-level state (pubsub_resilience.py, __read_notifications__). When the last client disconnects, upstream's context manager cancels the reader and sets _subscription_task = None. When the next client connects, a fresh reader task starts, sees _had_prior_connection == True from its predecessor, and schedules a full gap recovery — flush + resync that closes the client that just connected.

  • Benign case: every reader restart costs each client one spurious 1012-close + full policy/data refetch.
  • Pathological case (small deployments): close client → 2s tail sleep → pin releases → count hits 0 → reader cancelled → client reconnects (its retry backoff has grown past the settle window) → fresh reader → spurious recovery → close again — a self-sustaining churn loop, ironically a miniature version of the storm being fixed.

This affects default deployments: OPAL_STATISTICS_ENABLED defaults to False, so nothing holds a permanent listening context and the listen count is driven purely by clients. The e2e runs with OPAL_STATISTICS_ENABLED=true (reader pinned forever at startup), so this path is untested.

Fix: make gap detection reader-task-local (a local variable in __read_notifications__ instead of the instance attribute). It's strictly more correct: a reader restart only happens after the listener count hit 0, i.e. no clients existed to miss anything, and the first client back does a full refetch anyway.

2. MEDIUM — single-flight recovery can permanently skip a gap's replay and resync

_schedule_gap_recovery drops the request if a recovery is in flight. The flap-during-settle case is handled (the in-flight recovery's later flush picks up the new buffer entries). But a gap that opens and closes after the in-flight recovery's flush (i.e., during _fire_reconnect, which sleeps up to ~2×settle ≈ 4s) gets nothing: its buffered broadcasts sit unflushed and no resync fires — until some future gap triggers the next recovery, which may be days away. A managed-Postgres restart is exactly a multi-drop flap on this timescale.

Fix: replace skip with a "rerun requested" flag — when _schedule_gap_recovery finds a live recovery, mark it; _recover_after_gap loops while the flag is set.

3. MEDIUM — after max-retries give-up, the worker doesn't actually restart; the storm regime returns until k8s acts

The docstrings say exhausting retries lets "the worker restart", but in-process nothing restarts it: the reader-done → _graceful_shutdown() callback exists only in the statistics-enabled path (server.py). With statistics off and BROADCAST_RECONNECT_MAX_RETRIES > 0: after give-up, the done reader stays installed (_subscription_task is reset to None only when the count reaches 0 — which churn may never allow), so every reconnecting client's asyncio.wait completes instantly and it's dropped — the original storm. Recovery depends on /healthcheck 503 + a liveness probe pointed at it, which the PR only recommends as a deployment follow-up.

The default (0 = retry forever) avoids this, but the knob is an operator footgun. Fix: on give-up, schedule the same graceful shutdown the statistics path uses (or document loudly that MAX_RETRIES > 0 requires the liveness probe). Related: a backbone that flaps connect-OK/insta-close never increments attempt (it's reset on connect), so MAX_RETRIES can never trip on flap loops — the counter only sees consecutive exceptions.

4. MEDIUM (docs/data-integrity) — the "resync = guarantee" claim is broader than the mechanism

Two precise caveats the broadcast-interface.mdx text should carry:

  • Scope of the guarantee. A client's resync refetch covers policy + the configured data sources (get_base_policy_data). Runtime-published incremental updates (e.g. POST /data/config with inline data or one-off URLs) are not part of that baseline — for those, the best-effort replay buffer is the only recovery path. The guarantee holds when data sources serve full current truth, but for generic OPAL users with ad-hoc inline updates, a dropped replay (buffer overflow, finding 2, mid-flush crash) is a permanently lost update. The e2e implicitly proves this: convergence of consistency_user requires the replay log lines.
  • Replay reordering. A buffered update replayed at settle-time can land on peers after a newer live update published post-recovery (per-worker ordering is enforced; cross-worker is not). With URL-fetch entries this self-heals (fetch returns current truth); with inline-data entries, stale-wins is possible if the peer's own resync happened to complete before the replay arrived. Narrow window, worth a sentence in the docs.

5. LOW — partial-replay re-enqueue can evict the newest entries

_outbound_buffer.extendleft(reversed(unsent)) on a bounded deque: if concurrent failures refilled the buffer during the lock-free publish phase, extendleft evicts from the right — the newest items — inverting the documented drop-oldest policy. Cheap fix: rebuild as unsent + existing, truncated from the front.

6. LOW — resync amplification on a flapping backbone

Every re-subscribe after a gap fires a full recovery; each resync = fleet-wide client recycle + full policy/data refetch against git/data sources. Single-flight dedups overlaps but each flap-recovery cycle fires anew. Consider a minimum interval / cooldown between resyncs. Also: the buffer is bounded by count (10000), not bytes — large inline payloads could pin significant per-worker memory during a long outage.

Security assessment

No significant concerns; a few observations:

  • No new auth surface. /healthcheck and / were already unauthenticated; the 503 body leaks one bit ("backbone wedged") to unauthenticated callers — standard for probes, acceptable. No external input can influence the health state.
  • Buffered payloads can contain secrets (data-update entries may embed fetcher credentials). They are held in memory only and never logged — _buffer_outbound/flush log only counts and exception reprs, consistent with the recent FetcherConfig log-redaction fix (a846b50). Good.
  • Forced reconnect doesn't weaken auth: clients closed with 1012 re-authenticate (JWT) on reconnect like any new connection.
  • DoS angle: anyone able to flap the backbone can trigger repeated fleet-wide refetch storms (finding 6) — but backbone access is already a trusted position; the resync kill-switch and a cooldown would bound it.
  • random.uniform for jitter is non-cryptographic use — fine.

Tests & quality

Test quality is genuinely strong: negative controls reproducing both upstream bugs, a real two-instance convergence test with NOTIFY-like semantics, the health-state matrix, and a live fault→reconnect→give-up driver. Gaps: the statistics-off reader-restart lifecycle (finding 1), a gap closing during the late recovery phase (finding 2), and partial-replay-with-refill eviction (finding 5). test_resync_is_single_flight...'s len(calls) <= 2 is a weak assertion (passes with 0 calls), though the follow-up assertions partially compensate. The .pre-commit-config.yaml toolchain pin is unrelated housekeeping — fine, well-explained, ideally a separate commit.


Bottom line: the availability fix is correct and verified against the real upstream internals; the consistency machinery works but has three lifecycle edge cases (findings 1–3) worth fixing before merge — they're each small, localized changes — and the docs should state the guarantee's actual scope (finding 4).

🤖 Generated with Claude Code

…rt, replay order

Addresses EliMoshkovich's deep-review findings 1-5 + the single-flight test note,
and Copilot's docs comment, on PR #915.

- F1 (HIGH): make gap detection reader-task-local instead of an instance flag, so a
  fresh reader task (after the last client left and one returns) does not fire a
  spurious resync on its first connect. Hits default stats-off deployments.
- F2: single-flight recovery now reruns for a gap that lands during the late recovery
  phase (rerun-requested flag) instead of permanently dropping its flush + resync.
- F3: on max-retries give-up, fire a give-up hook wired to the worker's graceful
  shutdown regardless of OPAL_STATISTICS_ENABLED (was only wired in the stats path);
  count connect-OK/instant-close flaps toward the retry budget so MAX_RETRIES can trip.
- F5: partial-replay re-enqueue preserves drop-oldest (rebuild unsent+refill) instead
  of extendleft evicting the newest refill on a full deque.
- F4 + Copilot (docs): document OPAL_BROADCAST_HEALTHCHECK_ENABLED in the resilience
  table; state the resync guarantee's scope (policy + configured data sources, not
  runtime inline updates) and the cross-worker replay-ordering caveat.
- Strengthen the single-flight test assertion (was len(calls) <= 2, passed on 0).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
@zeevmoney

Copy link
Copy Markdown
Contributor Author

Thanks @EliMoshkovich — excellent, precise review, and the upstream verification is much appreciated. Findings 1–5 + the test note are addressed in 671776e; F6 is intentionally deferred with rationale below.

1. HIGH — _had_prior_connection conflates "backbone gap" with "reader task restart"

Fixed. Gap detection is now reader-task-localhad_prior_connection is a local in __read_notifications__, set after the first subscribe within that task. A fresh reader task (last client left, then one returns) starts clean and no longer schedules a spurious recovery on its first connect; only a reconnect within the same task's loop (a real gap) does. New test test_fresh_reader_task_does_not_recover_on_first_connect.

2. MEDIUM — single-flight recovery can permanently skip a gap's replay and resync

Fixed with the rerun-requested flag you suggested. _schedule_gap_recovery sets _recovery_rerun_requested when a recovery is in flight; _recover_after_gap is now a loop that clears the flag at the top of each iteration and re-checks it after the full pin→settle→flush→fire body — so a gap landing at any point (including during _fire_reconnect, after the flush) triggers exactly one more iteration. New test test_recovery_reruns_for_gap_during_late_phase.

3. MEDIUM — after max-retries give-up, the worker doesn't actually restart

Fixed both parts:

  • (a) Added a broadcaster-level on_give_up hook, wired in server.py (_wire_broadcaster_give_up) to the same _graceful_shutdown() the stats path uses — independent of OPAL_STATISTICS_ENABLED. It fires only on give-up (the reader returns), never on cancellation, so clean shutdown doesn't re-trigger it; a redundant second SIGTERM to an already-terminating worker is a harmless no-op. (Went with the hook rather than touching the stats done-callback — the cleaner option you flagged.)
  • (b) attempt is no longer reset on connect, only after the subscriber sustains (first read), so a connect-OK/instant-close flap now increments it and MAX_RETRIES can trip on flap loops. New tests test_flap_loop_counts_toward_give_up, test_give_up_hook_not_fired_on_cancellation, + server-wiring tests.

4. MEDIUM (docs) — the "resync = guarantee" claim is broader than the mechanism

Fixed in broadcast-interface.mdx: (1) the resync refetch covers policy + the configured OPAL_DATA_CONFIG_SOURCES, but runtime incremental updates (POST /data/config inline data / one-off URLs) are recoverable only via the best-effort replay buffer; (2) cross-worker replay ordering is not enforced (self-heals for fetch-URL entries, stale-wins possible for inline-data).

5. LOW — partial-replay re-enqueue can evict the newest entries

Fixed. _requeue_unsent rebuilds under the lock as unsent + refill (clear() then extend), so the bounded deque drops from the front (oldest) on overflow — preserving drop-oldest and unsent-before-refill order. New test test_partial_replay_requeue_preserves_drop_oldest.

6. LOW — resync amplification on a flapping backbone

Deferred (no code change) — finding 1's fix already removes the main amplification source (spurious resyncs on reader restarts). A minimum-interval cooldown between resyncs and byte-bounding the buffer (vs the current 10000-count bound) are good enhancements I'll track as a follow-up rather than widen this PR.

test_resync_is_single_flight...'s len(calls) <= 2 passes with 0 calls

Fixed — now asserts 1 <= len(calls) <= 2, == 2 under the new rerun semantics, and the rerun-flag state.

The security assessment and upstream-internals verification match what we found — thanks again for the depth. Full suite green (58 opal-server + the timing-sensitive broadcaster/recovery/healthcheck subset stable across repeated runs).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants