CandleAggregationManager

Generates 1-minute and 1-hour OHLC candles from raw 10-second price buckets using cursor-based chronological processing.

High-Level Design

Data Flow

PriceStorageManager.flushPriceData()
  │  (for each flushed bucket)

onBucketWritten(timestampBucket)

  ├──▶ tryAdvanceCursor1m()
  │      Check if next 1m period (6 × 10s buckets) is ready
  │      ├─ complete       → generate candle, store blob, advance cursor
  │      ├─ incomplete     → stop (wait for backfill)
  │      ├─ all failed     → skip period, advance cursor
  │      └─ too recent     → stop (period still in progress)
  │      Loop up to MAX_ADVANCE_1M (1000) times

  ├──▶ tryAdvanceCursor1h()
  │      Same logic for 360 × 10s buckets
  │      Loop up to MAX_ADVANCE_1H (72) times

  └──▶ maybeRunCleanup()
         Delete candles older than 30 days (once per day)

Cursor-Based Processing

Two independent cursors track the last completed period for each resolution. On every bucket write, the manager checks whether the cursor's next period is now complete and advances as far as possible in a single call.
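The advancement loop shared by both resolutions can be sketched as a pure function. The `statusOf` and `generate` callbacks (and the status strings) are illustrative assumptions standing in for the real database checks and candle generation, not the actual API:

```typescript
// Possible period states, mirroring the outcomes in the data-flow diagram.
type PeriodStatus = "complete" | "incomplete" | "all_failed" | "too_recent";

// Advance one cursor as far as possible in a single call (names are assumptions).
function advanceCursor(
  cursor: number,                                 // timestamp of last completed period
  periodSec: number,                              // 60 for 1m, 3600 for 1h
  maxAdvance: number,                             // MAX_ADVANCE_1M / MAX_ADVANCE_1H
  statusOf: (periodStart: number) => PeriodStatus,
  generate: (periodStart: number) => void,        // build + store the candle blob
): number {
  for (let i = 0; i < maxAdvance; i++) {
    const next = cursor + periodSec;
    const status = statusOf(next);
    if (status === "incomplete" || status === "too_recent") break; // stop and wait
    if (status === "complete") generate(next);    // "all_failed" skips generation
    cursor = next;                                // advance past the period either way
  }
  return cursor;
}
```

The `maxAdvance` bound caps how much catch-up work one bucket write can trigger, which keeps a post-eviction restart from blocking on a long backlog.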

Cursor     Period   Buckets per Period   Max Catch-Up
cursor1m   60s      6                    1000 periods (~16.7 hours)
cursor1h   3600s    360                  72 periods (3 days)

Cursors are persisted to collector_metadata after every advance, making the system DO-eviction safe — on restart, processing resumes from the last persisted position.
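The persist/restore round trip can be sketched with an in-memory map standing in for the collector_metadata table; the key names `candle_cursor_1m`/`candle_cursor_1h` are assumptions:

```typescript
// In-memory stand-in for collector_metadata; the real code issues
// SQLite statements (INSERT OR IGNORE + UPDATE) with the same semantics.
const metadata = new Map<string, string>();

function persistCursors(cursor1m: number, cursor1h: number): void {
  metadata.set("candle_cursor_1m", String(cursor1m)); // key names are assumptions
  metadata.set("candle_cursor_1h", String(cursor1h));
}

function restoreCursor(key: string, fallback: number): number {
  // On first run there is no persisted row; fall back to the
  // earliest price_data bucket (passed in by the caller).
  const v = metadata.get(key);
  return v !== undefined ? Number(v) : fallback;
}
```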

Gap-Aware Advancement

The cursor cannot blindly advance. For each period it checks completeness against two data sources:

  1. price_data table — counts how many of the expected 10s buckets exist
  2. backfill_gaps table — determines if missing buckets are still recoverable or permanently failed

This creates three possible states for an incomplete period:

State               Condition                                           Action
Incomplete          Missing buckets with pending/in-progress backfill   Block: wait for BackfillManager
Permanently failed  All missing buckets have status='failed'            Skip: advance cursor past period
Too recent          Period end ≥ Date.now()                             Stop: period still accumulating data

This design ensures candles are never generated from partial data, while also never permanently blocking on gaps that will never be filled.
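The completeness check above can be sketched as a classifier over one period. The `presentBuckets` set and `gapStatus` callback stand in for queries against price_data and backfill_gaps; their names and shapes are assumptions:

```typescript
// Classify one candle period against the two data sources (names are assumptions).
function classifyPeriod(
  periodStart: number,                     // epoch seconds, period-aligned
  periodSec: number,                       // 60 or 3600
  bucketSec: number,                       // 10 for the raw buckets
  presentBuckets: Set<number>,             // bucket timestamps found in price_data
  gapStatus: (bucketTs: number) => "pending" | "in_progress" | "failed",
  nowSec: number,
): "complete" | "incomplete" | "all_failed" | "too_recent" {
  if (periodStart + periodSec >= nowSec) return "too_recent"; // still accumulating

  const missing: number[] = [];
  for (let ts = periodStart; ts < periodStart + periodSec; ts += bucketSec) {
    if (!presentBuckets.has(ts)) missing.push(ts);
  }
  if (missing.length === 0) return "complete";

  // Recoverable while any missing bucket still has an active backfill.
  return missing.every((ts) => gapStatus(ts) === "failed") ? "all_failed" : "incomplete";
}
```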

OHLC Calculation

Buckets within a period are processed in chronological order:

  • Open = first price point of the first bucket
  • High = maximum price across all buckets
  • Low = minimum price across all buckets
  • Close = last price point of the last bucket
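The fold above can be sketched in TypeScript. The `PricePoint` shape is an assumption; prices are int64 (`bigint`) to match the blob format:

```typescript
interface PricePoint { ts: number; price: bigint }  // shape is an assumption
interface Candle { open: bigint; high: bigint; low: bigint; close: bigint }

// Fold chronologically ordered price points into one OHLC candle.
// Assumes at least one point (an empty period never reaches generation).
function computeOHLC(points: PricePoint[]): Candle {
  const first = points[0].price;
  const candle: Candle = { open: first, high: first, low: first, close: first };
  for (const p of points) {
    if (p.price > candle.high) candle.high = p.price;
    if (p.price < candle.low) candle.low = p.price;
    candle.close = p.price;                         // last point wins
  }
  return candle;
}
```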

All assets are packed into a single binary shard blob per candle timestamp (same format as price_data), stored via INSERT OR REPLACE for idempotency.

Storage

Table              Key                        Content
price_candles_1m   minute-aligned timestamp   Binary shard blob with OHLC per asset (36 bytes/asset)
price_candles_1h   hour-aligned timestamp     Binary shard blob with OHLC per asset (36 bytes/asset)

Binary format per candle: 4B timestamp + 32B OHLC (4 × int64) = 36 bytes per asset.
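A packing sketch for one asset's 36-byte record; the field order and little-endian layout are assumptions about the actual shard format:

```typescript
// Pack one asset record: 4-byte timestamp + 4 × int64 OHLC = 36 bytes.
function packCandle(ts: number, o: bigint, h: bigint, l: bigint, c: bigint): Uint8Array {
  const buf = new ArrayBuffer(36);
  const view = new DataView(buf);
  view.setUint32(0, ts, true);       // epoch seconds; endianness is an assumption
  view.setBigInt64(4, o, true);      // open
  view.setBigInt64(12, h, true);     // high
  view.setBigInt64(20, l, true);     // low
  view.setBigInt64(28, c, true);     // close
  return new Uint8Array(buf);
}
```

The full shard blob would concatenate one such record per asset under a single candle timestamp key.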

Retention

A daily cleanup job (triggered inside onBucketWritten) deletes candles older than 30 days from both tables.
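The once-per-day gate and retention cutoff can be sketched as follows; the function names and the millisecond bookkeeping are assumptions:

```typescript
const DAY_MS = 24 * 60 * 60 * 1000;
const RETENTION_DAYS = 30;

// Run at most once per day, keyed off the last recorded run time.
function shouldRunCleanup(lastRunMs: number, nowMs: number): boolean {
  return nowMs - lastRunMs >= DAY_MS;
}

// Candles with timestamps (epoch seconds) below this are deleted from both tables.
function cutoffTimestamp(nowMs: number): number {
  return Math.floor((nowMs - RETENTION_DAYS * DAY_MS) / 1000);
}
```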

Edge Cases

  • DO eviction: Cursors are recovered from SQLite on restart; the MAX_ADVANCE limits cap how much catch-up work a single call performs
  • First run: Cursors initialized to earliest price_data bucket
  • Idempotent writes: INSERT OR REPLACE makes regeneration after backfill safe
  • Missing metadata row: persistCursors() uses INSERT OR IGNORE defensively

See Also