Skip to content

feat(pipes): resolve table dependencies for cache invalidation#343

Open
taitelee wants to merge 21 commits into
mainfrom
pipe-cache-invalidation
Open

feat(pipes): resolve table dependencies for cache invalidation#343
taitelee wants to merge 21 commits into
mainfrom
pipe-cache-invalidation

Conversation

@taitelee

Copy link
Copy Markdown
Contributor

Summary

Pipes were cached TTL-only because they couldn't report which tables they read, so an ingest never invalidated a stale pipe result (#178). This teaches a pipe to resolve the ingested base tables its SQL reads — through views and aliases — and version-invalidate on the same namespace-versioned path structured queries already use.

When a pipe is created or updated, PipesHandler.Put resolves its base tables by running EXPLAIN QUERY TREE over dummy-bound SQL (the table set depends only on the SQL, not on per-request parameter values) and keeping the names the schema
registry knows in the configured database; the result is stored on the pipe (ResolvedTables, server-owned — never trusted from client input). Execute folds those tables into the cache key as whole-table namespaces, encoded exactly as the ingest worker encodes them, so an ingest into any of them invalidates the cached result. Resolution is best-effort and bounded: with no registry/ClickHouse wired or on any failure, the pipe falls back to the previous TTL-only behavior,

Related Issues

Closes #178

@coderabbitai

coderabbitai Bot commented Jun 11, 2026

Copy link
Copy Markdown

Review Change Stack

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review
📝 Walkthrough

Walkthrough

The PR implements dependency-aware pipe cache invalidation. VersionManager is refactored to a two-level tableVersions/namespaceVersions model with BumpTable/BumpNamespace APIs. A new pipe_deps.go module resolves a pipe's referenced base tables via EXPLAIN QUERY TREE with recursive view expansion and backtick-aware identifier parsing. NamedQuery stores resolved tables and DummyBind generates safe runnable SQL for analysis. PipesHandler populates resolved tables at Put time and uses them to key cache entries in Execute, enabling ingest-triggered invalidation to evict stale pipe results.

Changes

Pipe dependency-based cache invalidation

Layer / File(s) Summary
VersionManager two-level versioning refactor
internal/cache/version_manager.go
Replaces single versions map with tableVersions and namespaceVersions; adds Namespace struct, NamespaceKey, QueryKey, BumpTable, BumpNamespace; removes GetCacheKey, GetVersion, IncrementVersion.
NamedQuery.ResolvedTables and DummyBind helper
internal/pipes/pipes.go, internal/pipes/dummybind_test.go
NamedQuery gains a server-owned ResolvedTables []string field; DummyBind generates runnable SQL for EXPLAIN analysis by substituting type-appropriate dummy values for all parameters; tests validate typed, boolean, bare inline, inline-default, and no-param cases.
Pipe SQL dependency extraction via EXPLAIN QUERY TREE
internal/api/pipe_deps.go
New pipe_deps.go implements best-effort base-table resolution through EXPLAIN QUERY TREE with recursive view expansion, cycle prevention, identifier parsing (backtick/escape handling), and schema-registry filtering; helpers include resolvePipeDeps, collectReadTables, explainQueryTreeTables, parseQueryTreeTables, filterKnownTables, splitQualified, unquoteIdent, and pipeDeps.
Pipe deps unit tests
internal/api/pipe_deps_test.go
Unit tests cover all parsing/normalization helpers and the pipeDeps namespace converter; fakeCache test double records cache dependency flow; handler tests assert deps pass through on cache HIT/MISS and server-side ownership of resolved_tables.
PipesHandler dependency-based cache wiring
internal/api/pipes.go
PipesHandler adds Registry and Database fields for best-effort resolution; Put calls resolvePipeDeps to populate ResolvedTables (server-owned); Execute computes deps from resolved tables and passes them to Cache.Get/Cache.Set instead of always nil, enabling ingest invalidation to reach pipe cache entries.
Main wiring and integration tests
cmd/wavehouse/main.go, tests/integration/pipe_deps_test.go, clients/ts/src/types.ts, docs/src/content/docs/pipes.mdx, tests/integration/setup_test.go, CHANGELOG.md
main.go constructs pipesHandler separately to assign Registry/Database; integration tests exercise view→base-table resolution, materialized view source tracking, direct table references, UNION multi-table resolution, and MISS→HIT→invalidation→MISS cache cycles against a real ClickHouse instance; client type adds resolved_tables field; docs describe cache invalidation behavior and TTL fallback; setup improves table-name sanitization; CHANGELOG documents two-level versioning and dependency resolution.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant PipesHandler
    participant resolvePipeDeps
    participant ClickHouse
    participant SchemaRegistry
    participant Cache

    rect rgba(100, 149, 237, 0.5)
        Note over Client,Cache: PUT /v1/pipes/{name}
        Client->>PipesHandler: PUT pipe SQL
        PipesHandler->>resolvePipeDeps: pipe definition
        resolvePipeDeps->>ClickHouse: DummyBind SQL → EXPLAIN QUERY TREE
        ClickHouse-->>resolvePipeDeps: table_name identifiers
        resolvePipeDeps->>ClickHouse: system.tables as_select (view expansion)
        ClickHouse-->>resolvePipeDeps: view SQL (recursive)
        resolvePipeDeps->>SchemaRegistry: filterKnownTables
        SchemaRegistry-->>resolvePipeDeps: resolved base table names
        resolvePipeDeps-->>PipesHandler: ResolvedTables []string
        PipesHandler->>Cache: store pipe with ResolvedTables
    end

    rect rgba(60, 179, 113, 0.5)
        Note over Client,Cache: GET /v1/pipes/{name}/execute
        Client->>PipesHandler: Execute request
        PipesHandler->>PipesHandler: pipeDeps(q.ResolvedTables) → []Namespace
        PipesHandler->>Cache: Get(sha, deps)
        Cache-->>PipesHandler: HIT or MISS
        alt MISS
            PipesHandler->>ClickHouse: run pipe SQL
            ClickHouse-->>PipesHandler: results
            PipesHandler->>Cache: Set(sha, deps, results)
        end
        PipesHandler-->>Client: results + X-Cache header
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • Wave-RF/WaveHouse#314: Introduces the cache.Namespace struct and foundational VersionManager.QueryKey/BumpTable/BumpNamespace APIs that this PR's pipe invalidation logic is built on.
  • Wave-RF/WaveHouse#177: Modifies PipesHandler cache wiring in internal/api/pipes.go, directly overlapping with this PR's changes to Execute and Put cache integration.
  • Wave-RF/WaveHouse#172: Changes PipesHandler authorization in Execute, colliding with the same handler code paths modified here for dependency-aware cache keying.

Suggested labels

area/query, area/ingest

Suggested reviewers

  • EricAndrechek
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 50.94% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title 'feat(pipes): resolve table dependencies for cache invalidation' directly and clearly summarizes the main change: enabling pipes to resolve and use table dependencies for cache invalidation.
Description check ✅ Passed The description explains the problem (pipes cached TTL-only without table awareness) and the solution (resolving base tables via EXPLAIN QUERY TREE and version-invalidating), clearly relating to the changeset.
Linked Issues check ✅ Passed The PR fully addresses issue #178's acceptance criteria: pipe cache is now invalidated on table writes, resolved tables are extracted via EXPLAIN QUERY TREE at pipe registration, and multi-table pipes are supported with consistent cache namespace encoding.
Out of Scope Changes check ✅ Passed All changes are directly related to pipe dependency resolution and cache invalidation: query tree parsing, dependency discovery, version manager refactoring, and integration/unit tests validating the feature work.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch pipe-cache-invalidation
✨ Simplify code
  • Create PR with simplified code
  • Commit simplified code in branch pipe-cache-invalidation

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@github-actions github-actions Bot added go Pull requests that update go code area/api HTTP handlers, routing, middleware area/ingest Ingest pipeline (Bento, batching, DLQ) area/cache Local / shared / tiered caching area/pipes Named query pipes area/docs Documentation, site/, README area/infra CI, build, deploy, Docker, release labels Jun 11, 2026

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4


ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro Plus

Run ID: d7ff65f4-c449-4c3d-a7b3-4ef586eb7bbe

📥 Commits

Reviewing files that changed from the base of the PR and between 9661c82 and 27ae1f1.

📒 Files selected for processing (18)
  • CHANGELOG.md
  • cmd/wavehouse/main.go
  • internal/api/pipe_deps.go
  • internal/api/pipe_deps_test.go
  • internal/api/pipes.go
  • internal/api/structured_query.go
  • internal/cache/cache.go
  • internal/cache/cache_test.go
  • internal/cache/local.go
  • internal/cache/local_test.go
  • internal/cache/version_manager.go
  • internal/cache/version_manager_test.go
  • internal/ingest/worker.go
  • internal/ingest/worker_test.go
  • internal/pipes/dummybind_test.go
  • internal/pipes/pipes.go
  • internal/testutil/mocks.go
  • tests/integration/pipe_deps_test.go
💤 Files with no reviewable changes (1)
  • internal/cache/cache_test.go
📜 Review details
🧰 Additional context used
📓 Path-based instructions (7)
**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*.go: Use Go 1.26 with strict formatting enforced by gofumpt
Use structured logging with log/slog (JSON handler)
Use Chi v5 for HTTP routing
Return errors, don't panic. Wrap with fmt.Errorf("context: %w", err)
Use package naming: lowercase, single word (or abbreviated). internal/ enforces module privacy
No global state: Dependencies are passed explicitly (constructor injection)
Comment the why, not the what. Add a comment only when the reason isn't obvious from the code; a line that matches the surrounding pattern needs none. Keep comments to 1–2 lines
DRY — one source of truth. Before adding logic, look for an existing helper, type, or constant to reuse; before duplicating a rule, factor it into one place every caller reads
Leave it neater than you found it — within reason. Fix small, safe things in passing: a stale comment, an obvious typo, a misnamed local, dead code on your path

Files:

  • internal/api/structured_query.go
  • internal/testutil/mocks.go
  • internal/ingest/worker.go
  • internal/pipes/pipes.go
  • internal/pipes/dummybind_test.go
  • cmd/wavehouse/main.go
  • tests/integration/pipe_deps_test.go
  • internal/ingest/worker_test.go
  • internal/api/pipe_deps.go
  • internal/cache/local.go
  • internal/api/pipe_deps_test.go
  • internal/cache/local_test.go
  • internal/cache/cache.go
  • internal/api/pipes.go
  • internal/cache/version_manager.go
  • internal/cache/version_manager_test.go
internal/api/**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

Chi HTTP router, JWT/JWKS middleware (from auth/), ingest/query/structured-query/SSE/schema/DLQ/policy/pipes handlers, Hub

Files:

  • internal/api/structured_query.go
  • internal/api/pipe_deps.go
  • internal/api/pipe_deps_test.go
  • internal/api/pipes.go
internal/ingest/**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

Ingest worker pipeline (worker.go): JetStream input → per-table batch INSERT with DLQ output. The pipeline is insert-only. Wire format EventMessage carries {table_name, scope, received_timestamp, data} and nothing else

Files:

  • internal/ingest/worker.go
  • internal/ingest/worker_test.go
internal/pipes/**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

Named query pipes: NamedQuery type + NATS KV store (WAVEHOUSE_PIPES) + .sql file bootstrap. Pre-defined SQL templates with param binding + caching; per-pipe allowed_roles is the only execute-path gate via policy.RoleAllowed

Files:

  • internal/pipes/pipes.go
  • internal/pipes/dummybind_test.go
**/*_test.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*_test.go: Use table-driven tests with tests := []struct{ name string; ... } and t.Run(tt.name, ...)
Use shared mocks from internal/testutil/ (MockPublisher, MockCache, MockDeduplicator, MockSubscriber) instead of creating ad-hoc mocks
Use testutil.MakeJWT(t, claims) and testutil.MakeExpiredJWT(t, claims) for auth tests
Use testutil.NewTestSchemaRegistry(tables) or discovery.NewSchemaRegistryFromMap(tables) for schema-aware tests
Use policy.NewMemoryStore(p) for in-memory policy testing without NATS
Use pipes.NewMemoryStore(queries...) for in-memory pipes testing without NATS
Use testutil.AssertJSONResponse(t, rec, status, expected) and testutil.AssertJSONContains(t, rec, status, substring) for response assertions

Files:

  • internal/pipes/dummybind_test.go
  • tests/integration/pipe_deps_test.go
  • internal/ingest/worker_test.go
  • internal/api/pipe_deps_test.go
  • internal/cache/local_test.go
  • internal/cache/version_manager_test.go
tests/integration/**/*_test.go

📄 CodeRabbit inference engine (AGENTS.md)

Go integration tests (//go:build integration; ClickHouse testcontainer); run via make test-integration

Files:

  • tests/integration/pipe_deps_test.go
internal/cache/**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

Implement Cache interface → LocalCache (Ristretto) + SharedCache (TBD) + TieredCache (singleflight)

Files:

  • internal/cache/local.go
  • internal/cache/local_test.go
  • internal/cache/cache.go
  • internal/cache/version_manager.go
  • internal/cache/version_manager_test.go
🧠 Learnings (5)
📚 Learning: 2026-06-10T15:01:09.027Z
Learnt from: EricAndrechek
Repo: Wave-RF/WaveHouse PR: 312
File: docs/src/content/docs/development.md:0-0
Timestamp: 2026-06-10T15:01:09.027Z
Learning: In this repo’s Markdown review (all .md files), do not flag capitalization/style issues for literal paths starting with ".github/" (or any substring that is a path beginning with ".github/"). Treat ".github" as the correct lowercase dotfile directory name, even when it appears inside prose or code spans; automated checks such as LanguageTool’s "(GITHUB)" rule commonly produce false positives for this literal filesystem path.

Applied to files:

  • CHANGELOG.md
📚 Learning: 2026-05-25T11:24:21.130Z
Learnt from: EricAndrechek
Repo: Wave-RF/WaveHouse PR: 180
File: internal/cache/local.go:0-0
Timestamp: 2026-05-25T11:24:21.130Z
Learning: In WaveHouse’s cache packages (e.g., internal/cache/local.go), it’s acceptable to define package-level `var` constants that hold immutable OpenTelemetry metric attribute sets / `metric.MeasurementOption` values (for example: `cacheL1Attrs = metric.WithAttributes(attribute.String("tier","L1"))`). Treat these as stateless, pre-allocated option values (analogous to `regexp.MustCompile(...)`), not mutable global state. When applying the AGENTS.md “no global state / constructor injection” guideline, apply it to application dependencies (e.g., Cache, Publisher, Deduplicator) rather than to these immutable OTel attribute/measurement option variables—do not flag them as constructor-injection violations.

Applied to files:

  • internal/cache/local.go
  • internal/cache/local_test.go
  • internal/cache/cache.go
  • internal/cache/version_manager.go
  • internal/cache/version_manager_test.go
📚 Learning: 2026-05-20T01:02:00.784Z
Learnt from: EricAndrechek
Repo: Wave-RF/WaveHouse PR: 164
File: internal/api/router_test.go:289-350
Timestamp: 2026-05-20T01:02:00.784Z
Learning: In WaveHouse’s internal API tests (files matching internal/api/**/*_test.go), follow the existing separation-of-concerns convention for testing the RequireRole middleware: inject `ContextKeyRole` directly into the request `context.Context` instead of using `testutil.MakeJWT`/JWT-driven flows. Do not refactor role-gate tests to use JWT tokens—JWT parsing and token handling are covered separately in `middleware_test.go` (the dedicated JWT parsing tests), and mixing those concerns would expand the failure surface and reduce isolation.

Applied to files:

  • internal/api/pipe_deps_test.go
📚 Learning: 2026-05-23T01:23:59.268Z
Learnt from: EricAndrechek
Repo: Wave-RF/WaveHouse PR: 174
File: internal/api/ingest_test.go:111-111
Timestamp: 2026-05-23T01:23:59.268Z
Learning: In WaveHouse Go tests in internal/api/**/*_test.go, use internal/testutil.AssertJSONErrorResponse(t, w) for HTTP error-path JSON assertions. Do not use (or reintroduce) package-local assertJSONErrorResponse helpers. AssertJSONErrorResponse verifies the response Content-Type is application/json, includes the X-Content-Type-Options: nosniff header, and that the JSON body contains an "error" field.

Applied to files:

  • internal/api/pipe_deps_test.go
📚 Learning: 2026-05-20T20:30:15.808Z
Learnt from: taitelee
Repo: Wave-RF/WaveHouse PR: 172
File: internal/api/pipes_test.go:106-118
Timestamp: 2026-05-20T20:30:15.808Z
Learning: For WaveHouse pipes authorization allowlist checks, fix the empty-role fail-open behavior by (1) removing any outer guard that prevents allowlist evaluation when the incoming `role` is `""` (e.g., don’t short-circuit with `if role != "" { ... }`), and (2) during allowlist scanning, ensure only non-empty allowlist entries can match—e.g., require `ar != "" && ar == role` (so a malformed allowlist like `["" ]` cannot grant access to an empty incoming role via `"" == ""`).

Applied to files:

  • internal/api/pipes.go
🔇 Additional comments (30)
internal/cache/cache.go (1)

8-32: LGTM!

internal/cache/version_manager.go (3)

16-32: LGTM!


34-51: LGTM!


76-94: LGTM!

internal/cache/version_manager_test.go (1)

9-72: LGTM!

internal/cache/local.go (1)

31-75: LGTM!

internal/cache/local_test.go (1)

12-161: LGTM!

internal/api/structured_query.go (2)

129-143: LGTM!


172-174: LGTM!

internal/testutil/mocks.go (1)

117-136: LGTM!

internal/ingest/worker_test.go (4)

224-232: LGTM!


449-543: LGTM!


704-731: LGTM!


733-764: LGTM!

internal/ingest/worker.go (1)

461-509: LGTM!

internal/pipes/pipes.go (2)

29-37: LGTM!


204-243: LGTM!

internal/api/pipe_deps.go (7)

28-35: LGTM!


43-56: LGTM!


64-83: LGTM!


90-103: LGTM!


134-153: LGTM!


196-205: LGTM!


179-187: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Incorrect escape sequence handling for identifiers with both backslashes and backticks.

Lines 183-184 unescape ClickHouse backtick-quoted identifiers in the wrong order, causing incorrect results when an identifier contains both a literal backslash and a literal backtick. ClickHouse uses \\ for a backslash and ``` for a backtick inside backtick-quoted identifiers.

Example: The ClickHouse identifier a\b(a, backslash, backtick, b) is written in SQL as ``a\`b`` (where\` and \`` → `` ``). After stripping the outer backticks, the string is a\\\b`. The current code:

  1. Line 183 replaces \`` with `` ``: a\\\b` → `a\b`
  2. Line 184 tries to replace \\\\ with \\: no match (only two backslashes remain)
  3. Result: a\\b (wrong; expected a\b`)

Sequential ReplaceAll is unsafe here because the replacements can interact. The correct approach is to process escape sequences in a single pass:

  • Iterate byte-by-byte; when \ is seen, check the next byte:
    • If \, output one \ and advance two bytes
    • If `, output one ` and advance two bytes
    • Otherwise, output \ (or handle as an error)
🔧 Proposed fix for correct escape handling
 func unquoteIdent(s string) string {
 	s = strings.TrimSpace(s)
 	if len(s) >= 2 && s[0] == '`' && s[len(s)-1] == '`' {
-		s = s[1 : len(s)-1]
-		s = strings.ReplaceAll(s, "\\`", "`")
-		s = strings.ReplaceAll(s, "\\\\", "\\")
+		s = s[1 : len(s)-1] // strip outer backticks
+		// Unescape in a single pass to avoid interaction between \\ and \`
+		var out strings.Builder
+		for i := 0; i < len(s); i++ {
+			if s[i] == '\\' && i+1 < len(s) {
+				next := s[i+1]
+				if next == '\\' || next == '`' {
+					out.WriteByte(next)
+					i++ // skip the next byte (already consumed)
+					continue
+				}
+			}
+			out.WriteByte(s[i])
+		}
+		s = out.String()
 	}
 	return s
 }
			> Likely an incorrect or invalid review comment.
internal/api/pipes.go (3)

27-33: LGTM!


76-80: LGTM!


155-193: LGTM!

cmd/wavehouse/main.go (1)

367-372: LGTM!

internal/pipes/dummybind_test.go (1)

1-63: LGTM!

internal/api/pipe_deps_test.go (1)

1-192: LGTM!

Comment thread CHANGELOG.md Outdated
Comment thread internal/api/pipe_deps.go
Comment thread internal/cache/version_manager.go
Comment thread tests/integration/pipe_deps_test.go
@github-project-automation github-project-automation Bot moved this from Backlog to In review in WaveHouse Task Board Jun 11, 2026
coderabbitai[bot]
coderabbitai Bot previously approved these changes Jun 14, 2026
coderabbitai[bot]
coderabbitai Bot previously approved these changes Jun 14, 2026
@github-actions github-actions Bot removed the area/ingest Ingest pipeline (Bento, batching, DLQ) label Jun 14, 2026
@github-code-quality

github-code-quality Bot commented Jun 14, 2026

Copy link
Copy Markdown
Contributor

Code Coverage Overview

Languages: Go

Go

The overall coverage in the branch remains at 89%, unchanged from the branch.

Show a code coverage summary of the most impacted files.
File e583c45 d6f447a +/-
internal/cache/...sion_manager.go 100% 100% 0%
cmd/wavehouse/main.go 68% 69% +1%
internal/pipes/pipes.go 83% 85% +2%
internal/api/pipes.go 86% 90% +4%
internal/api/pipe_deps.go 0% 86% +86%

Updated June 16, 2026 19:47 UTC
Code Coverage is in Public Preview. Learn more and provide us with your feedback.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
internal/pipes/pipes.go (1)

263-271: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Return an array-shaped dummy for declared array parameters.

ParamDef.Type documents "array", but dummyForType falls through to "x". A pipe like WHERE id IN {{ids}} dummy-binds as IN 'x', making EXPLAIN fail and leaving ResolvedTables nil/TTL-only for a common filtered-pipe shape.

Proposed fix
 func dummyForType(t string) any {
 	switch strings.ToLower(t) {
 	case "number":
 		return 0
 	case "boolean":
 		return false
+	case "array":
+		return []any{nil}
 	default: // "string" and anything unrecognized
 		return "x"
 	}
 }
internal/api/pipes.go (1)

27-33: 🛠️ Refactor suggestion | 🟠 Major | 🏗️ Heavy lift

Pass dependency-resolution collaborators through the constructor.

Registry and Database are required for the new invalidation behavior, but they are optional mutable fields after NewPipesHandler, so any missed call site silently falls back to TTL-only caching. Prefer constructor args or an options struct that initializes them with the rest of the handler dependencies.

As per coding guidelines, “No global state: Dependencies are passed explicitly (constructor injection)”.

Also applies to: 47-48

Source: Coding guidelines


ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro Plus

Run ID: bf950fb1-b7db-47a4-975e-0611d95c9907

📥 Commits

Reviewing files that changed from the base of the PR and between 038a205 and b1e4bcc.

📒 Files selected for processing (6)
  • CHANGELOG.md
  • cmd/wavehouse/main.go
  • internal/api/pipe_deps.go
  • internal/api/pipes.go
  • internal/pipes/pipes.go
  • tests/integration/pipe_deps_test.go
📜 Review details
⏰ Context from checks skipped due to timeout of 300000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Coverage
  • GitHub Check: Integration tests
  • GitHub Check: Unit tests
  • GitHub Check: E2E tests
🧰 Additional context used
📓 Path-based instructions (5)
**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*.go: Use Go 1.26 with strict formatting enforced by gofumpt
Use structured logging with log/slog (JSON handler)
Use Chi v5 for HTTP routing
Return errors, don't panic. Wrap with fmt.Errorf("context: %w", err)
Use package naming: lowercase, single word (or abbreviated). internal/ enforces module privacy
No global state: Dependencies are passed explicitly (constructor injection)
Comment the why, not the what. Add a comment only when the reason isn't obvious from the code; a line that matches the surrounding pattern needs none. Keep comments to 1–2 lines
DRY — one source of truth. Before adding logic, look for an existing helper, type, or constant to reuse; before duplicating a rule, factor it into one place every caller reads
Leave it neater than you found it — within reason. Fix small, safe things in passing: a stale comment, an obvious typo, a misnamed local, dead code on your path

Files:

  • cmd/wavehouse/main.go
  • internal/pipes/pipes.go
  • internal/api/pipe_deps.go
  • internal/api/pipes.go
  • tests/integration/pipe_deps_test.go
internal/pipes/**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

Named query pipes: NamedQuery type + NATS KV store (WAVEHOUSE_PIPES) + .sql file bootstrap. Pre-defined SQL templates with param binding + caching; per-pipe allowed_roles is the only execute-path gate via policy.RoleAllowed

Files:

  • internal/pipes/pipes.go
internal/api/**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

Chi HTTP router, JWT/JWKS middleware (from auth/), ingest/query/structured-query/SSE/schema/DLQ/policy/pipes handlers, Hub

Files:

  • internal/api/pipe_deps.go
  • internal/api/pipes.go
**/*_test.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*_test.go: Use table-driven tests with tests := []struct{ name string; ... } and t.Run(tt.name, ...)
Use shared mocks from internal/testutil/ (MockPublisher, MockCache, MockDeduplicator, MockSubscriber) instead of creating ad-hoc mocks
Use testutil.MakeJWT(t, claims) and testutil.MakeExpiredJWT(t, claims) for auth tests
Use testutil.NewTestSchemaRegistry(tables) or discovery.NewSchemaRegistryFromMap(tables) for schema-aware tests
Use policy.NewMemoryStore(p) for in-memory policy testing without NATS
Use pipes.NewMemoryStore(queries...) for in-memory pipes testing without NATS
Use testutil.AssertJSONResponse(t, rec, status, expected) and testutil.AssertJSONContains(t, rec, status, substring) for response assertions

Files:

  • tests/integration/pipe_deps_test.go
tests/integration/**/*_test.go

📄 CodeRabbit inference engine (AGENTS.md)

Go integration tests (//go:build integration; ClickHouse testcontainer); run via make test-integration

Files:

  • tests/integration/pipe_deps_test.go
🧠 Learnings (2)
📚 Learning: 2026-05-20T20:30:15.808Z
Learnt from: taitelee
Repo: Wave-RF/WaveHouse PR: 172
File: internal/api/pipes_test.go:106-118
Timestamp: 2026-05-20T20:30:15.808Z
Learning: For WaveHouse pipes authorization allowlist checks, fix the empty-role fail-open behavior by (1) removing any outer guard that prevents allowlist evaluation when the incoming `role` is `""` (e.g., don’t short-circuit with `if role != "" { ... }`), and (2) during allowlist scanning, ensure only non-empty allowlist entries can match—e.g., require `ar != "" && ar == role` (so a malformed allowlist like `["" ]` cannot grant access to an empty incoming role via `"" == ""`).

Applied to files:

  • internal/api/pipes.go
📚 Learning: 2026-06-10T15:01:09.027Z
Learnt from: EricAndrechek
Repo: Wave-RF/WaveHouse PR: 312
File: docs/src/content/docs/development.md:0-0
Timestamp: 2026-06-10T15:01:09.027Z
Learning: In this repo’s Markdown review (all .md files), do not flag capitalization/style issues for literal paths starting with ".github/" (or any substring that is a path beginning with ".github/"). Treat ".github" as the correct lowercase dotfile directory name, even when it appears inside prose or code spans; automated checks such as LanguageTool’s "(GITHUB)" rule commonly produce false positives for this literal filesystem path.

Applied to files:

  • CHANGELOG.md
🔇 Additional comments (6)
internal/pipes/pipes.go (1)

29-37: LGTM!

Also applies to: 42-48, 152-165, 172-225, 274-355, 381-384

internal/api/pipe_deps.go (1)

16-68: LGTM!

Also applies to: 70-82, 112-297

internal/api/pipes.go (1)

39-44: LGTM!

Also applies to: 77-97, 157-178, 187-225

cmd/wavehouse/main.go (2)

391-391: LGTM!


107-117: The OTLP endpoint migration is complete and correct. cfg.OTel.Addr has been fully removed from the codebase (no references remain except a comment in the test fixture explicitly noting its absence). The config.yaml, documentation, and code consistently use the standard OTEL_EXPORTER_OTLP_ENDPOINT env var; no config-only deployments can silently fall back to a stale field.

tests/integration/pipe_deps_test.go (1)

145-250: LGTM!

Comment thread CHANGELOG.md Outdated
Comment thread internal/api/pipe_deps.go Outdated

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
internal/api/pipe_deps.go (1)

92-112: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Registry-known views can still survive as pipe dependencies. The resolver expands views, but it also keeps the original ref before expansion; the new test only checks that the base table is present, so it would not catch [base, view].

  • internal/api/pipe_deps.go#L92-L112: append r only on the non-view path, and continue after appending recursively expanded dependencies.
  • tests/integration/pipe_deps_test.go#L161-L165: assert exact resolved dependencies, or at least assert the view name is absent.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro Plus

Run ID: db06d93b-bf91-4b2f-b8bb-2bd9b91e4f57

📥 Commits

Reviewing files that changed from the base of the PR and between b1e4bcc and 394420d.

📒 Files selected for processing (3)
  • CHANGELOG.md
  • internal/api/pipe_deps.go
  • tests/integration/pipe_deps_test.go
📜 Review details
⏰ Context from checks skipped due to timeout of 300000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Coverage
  • GitHub Check: E2E tests
  • GitHub Check: Analyze (go)
  • GitHub Check: Lint
🧰 Additional context used
📓 Path-based instructions (4)
**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*.go: Use Go 1.26 with strict formatting enforced by gofumpt
Use structured logging with log/slog (JSON handler)
Use Chi v5 for HTTP routing
Return errors, don't panic. Wrap with fmt.Errorf("context: %w", err)
Use package naming: lowercase, single word (or abbreviated). internal/ enforces module privacy
No global state: Dependencies are passed explicitly (constructor injection)
Comment the why, not the what. Add a comment only when the reason isn't obvious from the code; a line that matches the surrounding pattern needs none. Keep comments to 1–2 lines
DRY — one source of truth. Before adding logic, look for an existing helper, type, or constant to reuse; before duplicating a rule, factor it into one place every caller reads
Leave it neater than you found it — within reason. Fix small, safe things in passing: a stale comment, an obvious typo, a misnamed local, dead code on your path

Files:

  • tests/integration/pipe_deps_test.go
  • internal/api/pipe_deps.go
**/*_test.go

📄 CodeRabbit inference engine (AGENTS.md)

**/*_test.go: Use table-driven tests with tests := []struct{ name string; ... } and t.Run(tt.name, ...)
Use shared mocks from internal/testutil/ (MockPublisher, MockCache, MockDeduplicator, MockSubscriber) instead of creating ad-hoc mocks
Use testutil.MakeJWT(t, claims) and testutil.MakeExpiredJWT(t, claims) for auth tests
Use testutil.NewTestSchemaRegistry(tables) or discovery.NewSchemaRegistryFromMap(tables) for schema-aware tests
Use policy.NewMemoryStore(p) for in-memory policy testing without NATS
Use pipes.NewMemoryStore(queries...) for in-memory pipes testing without NATS
Use testutil.AssertJSONResponse(t, rec, status, expected) and testutil.AssertJSONContains(t, rec, status, substring) for response assertions

Files:

  • tests/integration/pipe_deps_test.go
tests/integration/**/*_test.go

📄 CodeRabbit inference engine (AGENTS.md)

Go integration tests (//go:build integration; ClickHouse testcontainer); run via make test-integration

Files:

  • tests/integration/pipe_deps_test.go
internal/api/**/*.go

📄 CodeRabbit inference engine (AGENTS.md)

Chi HTTP router, JWT/JWKS middleware (from auth/), ingest/query/structured-query/SSE/schema/DLQ/policy/pipes handlers, Hub

Files:

  • internal/api/pipe_deps.go
🧠 Learnings (1)
📚 Learning: 2026-06-10T15:01:09.027Z
Learnt from: EricAndrechek
Repo: Wave-RF/WaveHouse PR: 312
File: docs/src/content/docs/development.md:0-0
Timestamp: 2026-06-10T15:01:09.027Z
Learning: In this repo’s Markdown review (all .md files), do not flag capitalization/style issues for literal paths starting with ".github/" (or any substring that is a path beginning with ".github/"). Treat ".github" as the correct lowercase dotfile directory name, even when it appears inside prose or code spans; automated checks such as LanguageTool’s "(GITHUB)" rule commonly produce false positives for this literal filesystem path.

Applied to files:

  • CHANGELOG.md
🔇 Additional comments (1)
CHANGELOG.md (1)

18-18: LGTM!

Also applies to: 63-64

Comment thread tests/integration/pipe_deps_test.go Outdated
coderabbitai[bot]
coderabbitai Bot previously approved these changes Jun 16, 2026
@github-actions github-actions Bot added documentation Improvements or additions to documentation area/sdk TypeScript SDK (clients/ts/) labels Jun 16, 2026
@github-actions

github-actions Bot commented Jun 16, 2026

Copy link
Copy Markdown

📚 Docs preview is livehttps://ea14c34b-wavehouse-docs.wave-rf.workers.dev

  • Commitd6f447a: fix(pipes): dummy-bind array params for table-dep resolution; tighten SDK type and document the resolver
  • Author@taitelee
  • Committed — 2026-06-16 15:42 (UTC-04:00)
  • Deployed — 2026-06-16 15:46 EDT

coderabbitai[bot]
coderabbitai Bot previously approved these changes Jun 16, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area/api HTTP handlers, routing, middleware area/cache Local / shared / tiered caching area/docs Documentation, site/, README area/infra CI, build, deploy, Docker, release area/pipes Named query pipes area/sdk TypeScript SDK (clients/ts/) documentation Improvements or additions to documentation go Pull requests that update go code

Projects

Status: In review

Development

Successfully merging this pull request may close these issues.

feat(cache): pipe cache invalidation via table/scope extraction from named queries

1 participant