SubscriptionEventWatcher
The SubscriptionEventWatcher is a singleton Durable Object that polls on-chain subscription contract events from Arbitrum and Ethereum, then syncs them to each user's UserPaperTradePortfolio. It acts as a fallback reconciliation layer — when the frontend fails to notify the backend of a subscription change (e.g., mobile wallet transaction), this DO catches the event from the blockchain and applies it.
High-Level Design
Alarm-Driven Polling Loop
The watcher runs a 30-second alarm loop that processes both chains independently each cycle:
alarm() every 30s
├─ processChainEvents("ARBITRUM")
├─ processChainEvents("ETHEREUM")
├─ cleanupOldEvents()
└─ schedule next alarm
Each chain maintains its own block cursor, so one chain lagging doesn't block the other. A maximum of 1000 blocks per chain per cycle prevents RPC timeouts; if behind, the watcher catches up gradually over subsequent alarms.
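The capped catch-up behaviour can be sketched as a small range calculation. This is an illustrative sketch, not the watcher's actual code: `nextBlockRange` and `MAX_BLOCKS_PER_CYCLE` are hypothetical names, and it assumes the cursor stores the last fully processed block, so the next query starts one block later.

```typescript
// Hypothetical constant; the 1000-block cap is taken from the text above.
const MAX_BLOCKS_PER_CYCLE = 1000;

interface BlockRange {
  fromBlock: number;
  toBlock: number;
}

/**
 * Returns the next inclusive block range to query for one chain,
 * or null when the cursor has already reached the chain head.
 * Assumption: lastProcessedBlock is the last block fully handled.
 */
function nextBlockRange(lastProcessedBlock: number, chainHead: number): BlockRange | null {
  if (lastProcessedBlock >= chainHead) {
    return null; // nothing new to fetch this cycle
  }
  const fromBlock = lastProcessedBlock + 1;
  // Cap the range so a single cycle never covers more than 1000 blocks.
  const toBlock = Math.min(chainHead, fromBlock + MAX_BLOCKS_PER_CYCLE - 1);
  return { fromBlock, toBlock };
}
```

Under these assumptions, a chain that is 4,900 blocks behind needs five alarm cycles to catch up, while the other chain keeps advancing from its own cursor.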
Data Flow
Blockchain SubscriptionEventWatcher UserPaperTrade
(Arb/Eth) Portfolio
│ │ │
│ 1. eth_getLogs │ │
│ (fromBlock..toBlock) │ │
│<─────────────────────────────│ │
│ │ │
│ 2. Raw logs │ │
│─────────────────────────────>│ │
│ │ │
│ 3. Parse logs into events │
│ 4. Deduplicate (txHash-logIndex) │
│ 5. Group by transaction │
│ 6. Correlate PaymentCharged │
│ with state-change events │
│ │ │
│ │ 7. RPC calls per event │
│ │ (createSubscription, │
│ │ cancelSubscription, │
│ │ renewSubscription, etc.) │
│ │───────────────────────────>│
│ │ │
│ 8. Mark events processed │
│ 9. Advance cursor to toBlock │
Event Types
The watcher monitors 6 contract events from a single subscription contract deployed at the same address on both chains (via CREATE2):
| Event | Action on Portfolio |
|---|---|
| Subscribed | createSubscription() (or resumeSubscription() if already active) |
| Unsubscribed | cancelSubscription() (starts wind-down) |
| SubscriptionRenewed | renewSubscription() with correlated payment details |
| PaymentCharged | Correlation-only: payment data is attached to Subscribed/Renewed/Upgraded events in the same tx (no standalone portfolio call) |
| SubscriptionUpgraded | upgradePlan() (mid-cycle upgrade) |
| SubscriptionDowngraded | schedulePlanChange() (deferred downgrade) |
Tier indices map to plans: 0 → starter, 1 → standard, 2 → pro.
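The table and the tier mapping can be written as two lookup structures. The sketch below is illustrative only: `TIER_TO_PLAN`, `planForTier`, and `PORTFOLIO_ACTION` are hypothetical names, and the real handler may encode the same mapping differently (for example, the Subscribed row also covers resumeSubscription(), which is omitted here).

```typescript
type Plan = "starter" | "standard" | "pro";

// Tier index -> plan, as stated above (0 -> starter, 1 -> standard, 2 -> pro).
const TIER_TO_PLAN: readonly Plan[] = ["starter", "standard", "pro"];

function planForTier(tier: number): Plan {
  const plan = TIER_TO_PLAN[tier];
  if (plan === undefined) {
    // Out-of-range, negative, or fractional indices are rejected.
    throw new Error(`Unknown tier index: ${tier}`);
  }
  return plan;
}

type EventName =
  | "Subscribed"
  | "Unsubscribed"
  | "SubscriptionRenewed"
  | "PaymentCharged"
  | "SubscriptionUpgraded"
  | "SubscriptionDowngraded";

// Primary portfolio method per event; null means "correlation-only".
const PORTFOLIO_ACTION: Record<EventName, string | null> = {
  Subscribed: "createSubscription",
  Unsubscribed: "cancelSubscription",
  SubscriptionRenewed: "renewSubscription",
  PaymentCharged: null,
  SubscriptionUpgraded: "upgradePlan",
  SubscriptionDowngraded: "schedulePlanChange",
};
```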
Transaction Grouping & Payment Correlation
Events are grouped by transaction hash before processing. This allows the handler to find a PaymentCharged event in the same transaction as a Subscribed or SubscriptionRenewed event and attach the payment details (amount, token, chain) to the subscription RPC call. This avoids requiring separate lookups for billing information.
PaymentCharged is correlation-only: its data (amount, token) is passed as context to createSubscription(), renewSubscription(), or upgradePlan() if present in the same transaction. It does not trigger a standalone recordPayment() call — the correlated path in each subscription RPC produces richer payment records (with subscription ID, plan, promo code). For 100% discount upgrades where no PaymentCharged event exists on-chain, upgradePlan() falls back to the current subscription's billing token/chain, then "USDC"/"ARBITRUM". A warning is logged in processEventBatch() if a PaymentCharged event has no correlated subscription event in the same transaction.
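A minimal sketch of the grouping and correlation step follows. The `ParsedEvent` shape and the helper names `groupByTransaction` and `correlate` are assumptions made for illustration; only the event names and the orphan-payment warning condition come from the text above.

```typescript
// Hypothetical shape of a parsed log; field names are assumptions.
interface ParsedEvent {
  name: string;
  txHash: string;
  logIndex: number;
  args: Record<string, unknown>;
}

interface CorrelatedEvent {
  event: ParsedEvent;
  payment?: ParsedEvent; // PaymentCharged from the same transaction, if any
}

// Events that can carry payment context, per the section above.
const PAYMENT_CONSUMERS = new Set(["Subscribed", "SubscriptionRenewed", "SubscriptionUpgraded"]);

function groupByTransaction(events: ParsedEvent[]): Map<string, ParsedEvent[]> {
  const groups = new Map<string, ParsedEvent[]>();
  for (const event of events) {
    const group = groups.get(event.txHash);
    if (group) {
      group.push(event);
    } else {
      groups.set(event.txHash, [event]);
    }
  }
  return groups;
}

function correlate(group: ParsedEvent[]): { correlated: CorrelatedEvent[]; orphanPayment: boolean } {
  const payment = group.find((e) => e.name === "PaymentCharged");
  const stateChanges = group.filter((e) => e.name !== "PaymentCharged");
  // Attach the payment only to events that accept payment context.
  const correlated = stateChanges.map((event): CorrelatedEvent =>
    payment && PAYMENT_CONSUMERS.has(event.name) ? { event, payment } : { event }
  );
  // A payment with no consumer in the same tx is the case that gets a warning.
  const orphanPayment =
    payment !== undefined && !stateChanges.some((e) => PAYMENT_CONSUMERS.has(e.name));
  return { correlated, orphanPayment };
}
```

PaymentCharged never appears in `correlated` on its own, which mirrors the "no standalone portfolio call" rule.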
Deduplication
Each event is uniquely identified by ${txHash}-${logIndex}. Processed event hashes are stored in SQLite and checked before processing. This handles:
- Blockchain reorgs: Re-fetched logs for the same block range are safely skipped
- Cursor resets: Admin-triggered backfills won't double-process events
- Retention: Hashes older than 10,000 blocks are garbage-collected each cycle
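The dedup behaviour can be illustrated with an in-memory stand-in. The real store is SQLite-backed; `InMemoryDedupStore`, `LogRef`, and the exact cutoff comparison below are assumptions made for this sketch, while the key format and the 10,000-block retention window come from the list above.

```typescript
// Hypothetical minimal log reference; field names are assumptions.
interface LogRef {
  txHash: string;
  logIndex: number;
  blockNumber: number;
}

const RETENTION_BLOCKS = 10000;

class InMemoryDedupStore {
  // event key -> block number in which the event was seen
  private seen = new Map<string, number>();

  static key(log: LogRef): string {
    return `${log.txHash}-${log.logIndex}`;
  }

  has(log: LogRef): boolean {
    return this.seen.has(InMemoryDedupStore.key(log));
  }

  markProcessed(log: LogRef): void {
    this.seen.set(InMemoryDedupStore.key(log), log.blockNumber);
  }

  /** Drops entries more than RETENTION_BLOCKS behind currentBlock; returns the number removed. */
  cleanup(currentBlock: number): number {
    const cutoff = currentBlock - RETENTION_BLOCKS;
    let removed = 0;
    this.seen.forEach((blockNumber, key) => {
      if (blockNumber < cutoff) {
        this.seen.delete(key);
        removed++;
      }
    });
    return removed;
  }
}
```

One consequence of a bounded retention window: a backfill that rewinds the cursor further than 10,000 blocks would no longer be covered by this table and would depend on downstream idempotency instead.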
Idempotency
Beyond deduplication, each downstream RPC call is inherently idempotent via txHash — the portfolio uses the transaction hash as a unique key for subscription operations, so replaying the same event is a no-op.
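As a rough illustration of txHash-keyed idempotency, the stub below treats a repeated transaction hash as a no-op. `IdempotentPortfolioStub` and its method signature are hypothetical; the actual UserPaperTradePortfolio interface is not shown in this document.

```typescript
// Hypothetical stand-in for the portfolio; not the real RPC surface.
class IdempotentPortfolioStub {
  // txHash -> plan recorded for that transaction
  private applied = new Map<string, string>();

  /** Returns true if the operation was applied, false if it was a replay. */
  createSubscription(txHash: string, plan: string): boolean {
    if (this.applied.has(txHash)) {
      return false; // same txHash seen before: replay is a no-op
    }
    this.applied.set(txHash, plan);
    return true;
  }

  count(): number {
    return this.applied.size;
  }
}

const portfolio = new IdempotentPortfolioStub();
const first = portfolio.createSubscription("0xabc", "pro");  // applied
const replay = portfolio.createSubscription("0xabc", "pro"); // ignored
```

This is what makes it safe for the frontend notification and the watcher to both report the same on-chain transaction.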
Error Handling
- Per-event try-catch: One failing event doesn't block the rest of the batch
- Failed events are dropped: Only successfully handled events are added to the dedup table, but the cursor still advances past toBlock. This means a failed event in an already-processed block range will not be retried unless the cursor is manually reset via resetCursor()
- Health threshold: The watcher reports healthy when blocksBehind < 2000 per chain
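The per-event isolation and the health check could look roughly like the sketch below. `processBatch` and `isHealthy` are hypothetical helpers written for illustration; only the try-catch-per-event behaviour and the 2000-block threshold are taken from the list above.

```typescript
// Threshold from the text above; the constant name is an assumption.
const HEALTHY_MAX_BLOCKS_BEHIND = 2000;

/** Runs the handler for each event in order; a failure never aborts the batch. */
async function processBatch<E>(
  events: E[],
  handle: (event: E) => Promise<void>
): Promise<{ succeeded: E[]; failed: E[] }> {
  const succeeded: E[] = [];
  const failed: E[] = [];
  for (const event of events) {
    try {
      await handle(event);
      succeeded.push(event); // only these would be written to the dedup table
    } catch (err) {
      console.error(`event handling failed: ${String(err)}`);
      failed.push(event); // dropped: the cursor advances regardless
    }
  }
  return { succeeded, failed };
}

/** Healthy only when every chain is fewer than 2000 blocks behind. */
function isHealthy(blocksBehind: Record<string, number>): boolean {
  return Object.keys(blocksBehind).every(
    (chain) => blocksBehind[chain] < HEALTHY_MAX_BLOCKS_BEHIND
  );
}
```

Returning the failed events separately makes the trade-off visible: a caller could log or alert on them, since they will not be retried without a cursor reset.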
Managers
| Manager | Responsibility |
|---|---|
| CursorStateManager | Tracks last_processed_block per chain. Initializes 100 blocks behind current on first run. Supports admin resetCursor() for backfill. |
| DeduplicationManager | Stores processed event hashes (txHash-logIndex). Garbage-collects entries older than 10K blocks. |
| RpcManager | Executes eth_getLogs and eth_blockNumber calls. Uses environment-specific RPC endpoints (Tenderly proxy in dev/staging, public RPCs in production). |
| EventHandlerManager | Parses raw logs into typed events, groups by transaction, correlates payments, and dispatches RPC calls to UserPaperTradePortfolio. |
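For reference, the request that RpcManager sends for a block range follows the standard Ethereum JSON-RPC shape for eth_getLogs, with block numbers encoded as hex quantities. The sketch below only builds the payload; `buildGetLogsRequest` is a hypothetical helper, and the contract address is a zero-address placeholder, not the deployed address.

```typescript
interface JsonRpcRequest {
  jsonrpc: "2.0";
  id: number;
  method: string;
  params: unknown[];
}

// Placeholder only; the real subscription contract address is not given in this document.
const PLACEHOLDER_CONTRACT_ADDRESS = "0x0000000000000000000000000000000000000000";

/** Encodes a block number as a JSON-RPC hex quantity, e.g. 255 -> "0xff". */
function toHexQuantity(value: number): string {
  return "0x" + value.toString(16);
}

/** Builds an eth_getLogs request for one contract over an inclusive block range. */
function buildGetLogsRequest(
  address: string,
  fromBlock: number,
  toBlock: number,
  id: number = 1
): JsonRpcRequest {
  return {
    jsonrpc: "2.0",
    id,
    method: "eth_getLogs",
    params: [
      {
        address,
        fromBlock: toHexQuantity(fromBlock),
        toBlock: toHexQuantity(toBlock),
      },
    ],
  };
}

const exampleRequest = buildGetLogsRequest(PLACEHOLDER_CONTRACT_ADDRESS, 255, 256);
```

A `topics` filter could be added to the params object to restrict results to the six monitored event signatures; it is left out here because the signatures are not listed in this document.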