SubscriptionEventWatcher

The SubscriptionEventWatcher is a singleton Durable Object that polls on-chain subscription contract events from Arbitrum and Ethereum, then syncs them to each user's UserPaperTradePortfolio. It acts as a fallback reconciliation layer — when the frontend fails to notify the backend of a subscription change (e.g., mobile wallet transaction), this DO catches the event from the blockchain and applies it.

High-Level Design

Alarm-Driven Polling Loop

The watcher runs a 30-second alarm loop that processes both chains independently each cycle:

alarm() every 30s
  ├─ processChainEvents("ARBITRUM")
  ├─ processChainEvents("ETHEREUM")
  ├─ cleanupOldEvents()
  └─ schedule next alarm

Each chain maintains its own block cursor — one chain lagging doesn't block the other. A maximum of 1000 blocks per chain per cycle prevents RPC timeouts; if behind, the watcher catches up gradually over subsequent alarms.
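
The block cap described above can be sketched as a small range calculation. This is a minimal illustration, not the watcher's actual code: the names nextBlockRange and MAX_BLOCKS_PER_CYCLE are hypothetical, and it assumes the cursor stores the last fully processed block, so scanning resumes at the next one.

```typescript
// Hypothetical helper; names are illustrative and not taken from the codebase.
const MAX_BLOCKS_PER_CYCLE = 1000; // per-chain, per-cycle cap stated in the text

interface BlockRange {
  fromBlock: number;
  toBlock: number;
}

// Returns the next inclusive range to scan, or null when the cursor is
// already at the chain head. Assumes the cursor holds the last processed block.
function nextBlockRange(lastProcessedBlock: number, currentBlock: number): BlockRange | null {
  if (lastProcessedBlock >= currentBlock) return null;
  const fromBlock = lastProcessedBlock + 1;
  // Scan at most MAX_BLOCKS_PER_CYCLE blocks; the rest waits for later alarms.
  const toBlock = Math.min(currentBlock, fromBlock + MAX_BLOCKS_PER_CYCLE - 1);
  return { fromBlock, toBlock };
}
```

Under these assumptions, a chain that is 4,900 blocks behind would need five alarm cycles to catch up, while the other chain keeps advancing on its own cursor.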

Data Flow

  Blockchain              SubscriptionEventWatcher           UserPaperTrade
  (Arb/Eth)                                                    Portfolio
      │                              │                            │
      │  1. eth_getLogs              │                            │
      │  (fromBlock..toBlock)        │                            │
      │<─────────────────────────────│                            │
      │                              │                            │
      │  2. Raw logs                 │                            │
      │─────────────────────────────>│                            │
      │                              │                            │
      │                   3. Parse logs into events               │
      │                   4. Deduplicate (txHash-logIndex)        │
      │                   5. Group by transaction                 │
      │                   6. Correlate PaymentCharged             │
      │                      with state-change events             │
      │                              │                            │
      │                              │  7. RPC calls per event    │
      │                              │  (createSubscription,      │
      │                              │   cancelSubscription,      │
      │                              │   renewSubscription, etc.) │
      │                              │───────────────────────────>│
      │                              │                            │
      │                   8. Mark events processed                │
      │                   9. Advance cursor to toBlock            │
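
Step 1 of the flow above uses the standard eth_getLogs JSON-RPC method, which takes block numbers as hex strings. The sketch below shows one way to build that request body. The function names are hypothetical and the contract address is supplied by the caller, since the real address is not given here; any topic filtering the watcher may apply is omitted.

```typescript
// Shape of a standard Ethereum JSON-RPC eth_getLogs request.
interface GetLogsRequest {
  jsonrpc: "2.0";
  id: number;
  method: "eth_getLogs";
  params: [{ address: string; fromBlock: string; toBlock: string }];
}

// JSON-RPC encodes quantities as 0x-prefixed hex without leading zeros.
function toHexQuantity(n: number): string {
  return "0x" + n.toString(16);
}

// Hypothetical builder; the watcher's RpcManager may structure this differently.
function buildGetLogsRequest(
  address: string,
  fromBlock: number,
  toBlock: number,
  id: number = 1,
): GetLogsRequest {
  return {
    jsonrpc: "2.0",
    id,
    method: "eth_getLogs",
    params: [
      {
        address,
        fromBlock: toHexQuantity(fromBlock),
        toBlock: toHexQuantity(toBlock),
      },
    ],
  };
}
```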

Event Types

The watcher monitors 6 contract events from a single subscription contract deployed at the same address on both chains (via CREATE2):

Event                    Action on Portfolio
Subscribed               createSubscription() (or resumeSubscription() if already active)
Unsubscribed             cancelSubscription(), which starts wind-down
SubscriptionRenewed      renewSubscription() with correlated payment details
PaymentCharged           Correlation-only: payment data is attached to Subscribed/Renewed/Upgraded events in the same tx (no standalone portfolio call)
SubscriptionUpgraded     upgradePlan() for a mid-cycle upgrade
SubscriptionDowngraded   schedulePlanChange() for a deferred downgrade

Tier indices map to plans: 0 → starter, 1 → standard, 2 → pro.
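
The tier mapping can be expressed as a lookup that rejects unknown indices. This is a sketch only; planForTier and the Plan type are hypothetical names, and throwing on an unknown tier is an assumption about the desired behavior.

```typescript
type Plan = "starter" | "standard" | "pro";

// Index position corresponds to the on-chain tier index.
const TIER_TO_PLAN: readonly Plan[] = ["starter", "standard", "pro"];

// Hypothetical helper: throws for any index outside 0..2 (an assumed policy).
function planForTier(tier: number): Plan {
  const plan = TIER_TO_PLAN[tier];
  if (plan === undefined) {
    throw new Error(`Unknown tier index: ${tier}`);
  }
  return plan;
}
```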

Transaction Grouping & Payment Correlation

Events are grouped by transaction hash before processing. This allows the handler to find a PaymentCharged event in the same transaction as a Subscribed, SubscriptionRenewed, or SubscriptionUpgraded event and attach the payment details (amount, token, chain) to the subscription RPC call, so no separate lookup for billing information is needed.

PaymentCharged is correlation-only: when it appears in the same transaction, its data (amount, token) is passed as context to createSubscription(), renewSubscription(), or upgradePlan(). It does not trigger a standalone recordPayment() call, because the correlated path in each subscription RPC produces richer payment records (with subscription ID, plan, promo code). For 100% discount upgrades, where no PaymentCharged event exists on-chain, upgradePlan() falls back to the current subscription's billing token and chain, and then to "USDC" on "ARBITRUM". processEventBatch() logs a warning if a PaymentCharged event has no correlated subscription event in the same transaction.
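
The grouping and correlation described above might look roughly like the following. The event shape, field names, and the correlatePayments function are all assumptions made for illustration; the real EventHandlerManager may differ. Orphaned payments are returned separately so the caller can log the warning.

```typescript
type EventName =
  | "Subscribed"
  | "Unsubscribed"
  | "SubscriptionRenewed"
  | "PaymentCharged"
  | "SubscriptionUpgraded"
  | "SubscriptionDowngraded";

// Illustrative parsed-event shape; field names are assumptions.
interface ParsedEvent {
  name: EventName;
  txHash: string;
  logIndex: number;
  amount?: string;
  token?: string;
}

interface Correlated {
  event: ParsedEvent;
  payment?: { amount: string; token: string };
}

// Events that receive payment context when a PaymentCharged shares their tx.
const PAYMENT_CONSUMERS = new Set<EventName>([
  "Subscribed",
  "SubscriptionRenewed",
  "SubscriptionUpgraded",
]);

function correlatePayments(events: ParsedEvent[]): {
  correlated: Correlated[];
  orphanedPayments: ParsedEvent[];
} {
  // Group events by transaction hash, preserving first-seen order.
  const byTx = new Map<string, ParsedEvent[]>();
  for (const e of events) {
    const group = byTx.get(e.txHash) ?? [];
    group.push(e);
    byTx.set(e.txHash, group);
  }

  const correlated: Correlated[] = [];
  const orphanedPayments: ParsedEvent[] = [];
  byTx.forEach((group) => {
    const paymentEvent = group.find((e) => e.name === "PaymentCharged");
    const payment =
      paymentEvent && paymentEvent.amount !== undefined && paymentEvent.token !== undefined
        ? { amount: paymentEvent.amount, token: paymentEvent.token }
        : undefined;
    let consumed = false;
    for (const e of group) {
      if (e.name === "PaymentCharged") continue; // never dispatched on its own
      if (payment && PAYMENT_CONSUMERS.has(e.name)) {
        correlated.push({ event: e, payment });
        consumed = true;
      } else {
        correlated.push({ event: e });
      }
    }
    // A payment with no subscription event in the same tx is only reported.
    if (paymentEvent && !consumed) orphanedPayments.push(paymentEvent);
  });
  return { correlated, orphanedPayments };
}
```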

Deduplication

Each event is uniquely identified by ${txHash}-${logIndex}. Processed event hashes are stored in SQLite and checked before processing. This handles:

  • Blockchain reorgs: Re-fetched logs for the same block range are safely skipped
  • Cursor resets: Admin-triggered backfills won't double-process events
  • Retention: Hashes older than 10,000 blocks are garbage-collected each cycle
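
As a rough illustration of the key format and retention rule, the sketch below uses an in-memory Map as a stand-in for the SQLite table. The class and method names are hypothetical, and measuring the 10,000-block retention window from the current block is an assumption; the source does not say which reference block is used.

```typescript
// In-memory stand-in for the SQLite-backed dedup table (illustrative only).
class InMemoryDedupStore {
  // event key -> block number in which the event was seen
  private seen = new Map<string, number>();

  // Key format from the text: `${txHash}-${logIndex}`.
  static keyFor(txHash: string, logIndex: number): string {
    return `${txHash}-${logIndex}`;
  }

  has(key: string): boolean {
    return this.seen.has(key);
  }

  markProcessed(key: string, blockNumber: number): void {
    this.seen.set(key, blockNumber);
  }

  // Removes entries older than the retention window and returns how many
  // were removed. The window is assumed to be measured from currentBlock.
  cleanup(currentBlock: number, retentionBlocks: number = 10000): number {
    const cutoff = currentBlock - retentionBlocks;
    let removed = 0;
    this.seen.forEach((block, key) => {
      if (block < cutoff) {
        this.seen.delete(key);
        removed++;
      }
    });
    return removed;
  }
}
```

Keying on the log index as well as the transaction hash matters because a single transaction can emit several events, for example Subscribed and PaymentCharged together.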

Idempotency

Beyond deduplication, each downstream RPC call is inherently idempotent via txHash — the portfolio uses the transaction hash as a unique key for subscription operations, so replaying the same event is a no-op.

Error Handling

  • Per-event try-catch: One failing event doesn't block the rest of the batch
  • Failed events are dropped: Only successfully handled events are added to the dedup table, but the cursor still advances to toBlock. A failed event in an already-processed block range is therefore not retried unless the cursor is manually reset via resetCursor().
  • Health threshold: The watcher reports healthy when blocksBehind < 2000 per chain
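
The behavior in the list above can be sketched as follows. The function names and result shape are hypothetical. The sketch shows only the stated semantics: each event is handled in its own try-catch, only successes are reported for the dedup table, the cursor moves to toBlock regardless, and health is a per-chain threshold check.

```typescript
interface BatchResult {
  processed: string[]; // keys to add to the dedup table
  failed: string[]; // keys that threw; not retried automatically
  newCursor: number;
}

// Hypothetical batch loop: one failing event does not stop the others.
async function processBatch(
  events: { key: string }[],
  toBlock: number,
  handle: (event: { key: string }) => Promise<void>,
): Promise<BatchResult> {
  const processed: string[] = [];
  const failed: string[] = [];
  for (const event of events) {
    try {
      await handle(event);
      processed.push(event.key);
    } catch (err) {
      console.warn("event failed", event.key, err);
      failed.push(event.key);
    }
  }
  // The cursor advances to toBlock even when some events failed.
  return { processed, failed, newCursor: toBlock };
}

const HEALTHY_MAX_BLOCKS_BEHIND = 2000; // threshold stated in the text

// Healthy only if every chain is fewer than 2000 blocks behind.
function isHealthy(blocksBehindByChain: Record<string, number>): boolean {
  return Object.keys(blocksBehindByChain).every(
    (chain) => blocksBehindByChain[chain] < HEALTHY_MAX_BLOCKS_BEHIND,
  );
}
```

The trade-off is visible in the return value: a non-empty failed list combined with an advanced cursor means those events are recovered only through a manual resetCursor().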

Managers

Manager                Responsibility
CursorStateManager     Tracks last_processed_block per chain. Initializes 100 blocks behind the current block on first run. Supports admin resetCursor() for backfill.
DeduplicationManager   Stores processed event hashes (txHash-logIndex). Garbage-collects entries older than 10,000 blocks.
RpcManager             Executes eth_getLogs and eth_blockNumber calls. Uses environment-specific RPC endpoints (Tenderly proxy in dev/staging, public RPCs in production).
EventHandlerManager    Parses raw logs into typed events, groups by transaction, correlates payments, and dispatches RPC calls to UserPaperTradePortfolio.