Pool Indexer crate for uniswap-v3 liquidity from subgraphs #4422
AryanGodara wants to merge 14 commits into
Reminder: Please update the DB Readme and comment whether migrations are reversible (include rollback scripts if applicable).
Goerli is no longer up. Removes the variant and all match arms across the chain, solvers, liquidity-sources, and price-estimation crates. Extracted from cowprotocol#4422 for independent review. Original comment: cowprotocol#4422 (comment)
Code Review
This pull request introduces the pool-indexer service, a standalone component designed to index Uniswap V3 pool state and ticks into a local Postgres database and serve this data via HTTP. The changes include new database migrations, configuration updates to allow the driver to use the indexer as a data source, and a shared retry utility. Review feedback identified critical compilation errors in the E2E test suite due to invalid literal syntax and in the shared retry module due to the use of deprecated or non-existent rand crate methods.
What happened with the PR description formatting?
I've updated it now; I drifted a bit from the original template while re-editing the description multiple times 😅
squadgazzz left a comment
A first round. I hope I got the idea. Correct me if I am wrong somewhere.
impl V3PoolDataSource for PoolIndexerClient {
    async fn get_registered_pools(&self) -> Result<RegisteredPools> {
        // We pin the snapshot to the first page's block_number; later pages
        // may report a higher block: bounded drift, picked up by the
The driver does this at startup:
A. GET /pools -> envelope says block_number = N
B. pick top 100 by liquidity
C. GET /pools/by-ids + /pools/ticks -> returns at indexer's current head N+k
D. event-replay from chain RPC, starting at N+1
Step D re-applies any Mint/Burn that happened between A and C, because the pool state stored in step C already reflects them but the start block is from step A. Swap is idempotent, Mint and Burn are not. Effect: over- or under-counted liquidity until the next Swap overwrites the pool's state.
Fix: driver picks its own init block from RPC, then waits for the indexer to catch up to it before calling /pools/by-ids.
let head = block_retriever.current().await?;
loop {
    let probe: PoolsResponse = http.get(format!("{base}/pools?limit=1")).send().await?.json().await?;
    if probe.block_number >= head { break; }
    tokio::time::sleep(Duration::from_millis(500)).await;
}
// safe to proceed: indexer is at >= head

Drop the fetched_block_number field on RegisteredPools since callers shouldn't trust it.
I went through this, and I follow the argument. One thing I want to flag, because it took me a while to convince myself: just waiting for the indexer to reach target_block doesn't fully close the race. The indexer keeps advancing in the background, so by the time C's HTTP response returns the data is at some block >= target_block but usually strictly greater. If we anchor at target_block (or A's block), events in (anchor, C_block] still get double-applied.
The fix I went with: take the snapshot block from C's response envelope, not from A's.
(Please correct me if I misunderstood it.)
- Both trait methods now take `target_block`. Subgraph honors it via `block: { number: ... }`; pool-indexer waits via `wait_until` (500 ms polling, no upper bound, since the surrounding `BackgroundInitLiquiditySource` already caps init at 10 minutes).
- `get_pools_with_ticks_by_ids` now returns a `PoolsWithTicks{fetched_block_number, pools}` struct so the caller sees the actual snapshot block: for pool-indexer this is what comes back in the response envelope; for subgraph it's just `target_block` echoed back.
- `PoolsCheckpointHandler::new` picks `target_block` from `block_retriever.current_block()`, calls A, then calls C with A's returned block, then anchors the checkpoint at C's returned block. That's the later of the two, so any event reflected in C's pools is also accounted for in the anchor.
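To make the double-application concrete, here is a minimal, self-contained sketch. The `Event` enum, the `replay` helper, and every block number and amount are hypothetical, and the active-tick-range check is left out for brevity. It only illustrates why the anchor should come from C's response envelope rather than A's.

```rust
/// Hypothetical, simplified liquidity events (tick-range check omitted).
#[derive(Clone, Copy)]
enum Event {
    Mint(u128),
    Burn(u128),
}

/// Replays every event strictly after `anchor` on top of a snapshot's liquidity.
fn replay(snapshot_liquidity: u128, anchor: u64, events: &[(u64, Event)]) -> u128 {
    events
        .iter()
        .filter(|(block, _)| *block > anchor)
        .fold(snapshot_liquidity, |liquidity, (_, event)| match event {
            Event::Mint(amount) => liquidity + amount,
            Event::Burn(amount) => liquidity - amount,
        })
}
```

If the snapshot was served at block 102 and already contains a mint from block 101, replaying from A's block 100 applies that mint a second time, while replaying from 102 applies only the later events.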
/// mainnet pool: USDC/WETH 0.05% (0x88e6A0c2dDD26FEEb64F039a2c41296FcB3f5640)
/// had 1533 active ticks on 2026-04-22. Callers that hit this limit get a
/// `warn_truncated` log; bump if that starts firing on real pools.
pub const MAX_TICKS_PER_POOL: u32 = 5_000;
If a pool has >5000 active ticks, the response carries a partial set. The driver computes a wrong price curve and quotes are off. Today's biggest pool has ~1500, so there's headroom, but the failure mode is silent quoting bugs.
Either drop the cap (callers already cap pool count at 500, query is bounded) or return truncated: true per pool so the driver can refuse to quote that one.
Went with the simpler of your two options and dropped the cap entirely.
Changes:
- Deleted the `MAX_TICKS_PER_POOL` constant.
- `get_ticks_for_pools` SQL: dropped the `LATERAL` + `LIMIT` setup (only needed because of the per-pool cap), now a plain `WHERE pool_address = ANY($2)` query. More straightforward. `get_ticks` (single-pool) similarly loses its `LIMIT`.
- Deleted the `warn_truncated` and `warn_on_truncated_pools` helpers; no callers left.
- Updated the `api/uniswap_v3/ticks.rs` doc comment to reference the actual caller-side bound (MAX_POOL_IDS_PER_REQUEST) instead of the now-gone per-pool cap.
}
}

pub async fn run(self, poll_interval: std::time::Duration) -> ! {
tokio::spawn(backfill_symbols(...));
tokio::spawn(backfill_decimals(...));

The JoinHandles get dropped. If either task panics (e.g. one bad row in a batch update), the indexer keeps running but stops backfilling until somebody restarts the process. No metric, no log beyond the panic stderr.
Move both into the same JoinSet as the network indexer tasks in run.rs so the process exits on backfill failure, matching the existing supervisor pattern.
I applied this fix. 🫡
I lifted both spawns into the per-network JoinSet in run_network_indexer (run.rs). Now a backfill panic propagates through factory_set.join_next() exactly like an indexer-loop panic: the process exits, k8s restarts the pod, and the existing restarts metric pages on-call. Same pattern as elsewhere.
One thing I missed earlier: backfill is per-network (it iterates all tokens missing symbol/decimals of factory), but the spawns lived in the per-factory run(). So a network with N factories was technically running 2N backfill tasks racing on the same DB rows. Lifting to the network level deduplicates that.
(Missed this earlier since the repeat SQL statements were idempotent) 👀
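The difference between a dropped handle and a supervised one can be shown with a std-only analog. This sketch uses `std::thread` instead of tokio's `JoinSet`, and the `supervise` helper is hypothetical; it is not the code in run.rs, only an illustration of "join every handle and fail loudly if any task panicked".

```rust
use std::thread;

/// Runs every task on its own thread and reports whether any of them panicked.
/// A dropped `JoinHandle` would hide the panic; joining it surfaces the failure.
fn supervise(tasks: Vec<Box<dyn FnOnce() + Send + 'static>>) -> Result<(), String> {
    let handles: Vec<_> = tasks.into_iter().map(|task| thread::spawn(task)).collect();

    let mut failed = 0usize;
    for handle in handles {
        // `join` returns `Err` when the thread panicked.
        if handle.join().is_err() {
            failed += 1;
        }
    }

    if failed == 0 {
        Ok(())
    } else {
        Err(format!("{} task(s) panicked", failed))
    }
}
```

A caller that gets `Err` back can exit the process, which is the behavior the supervisor pattern relies on for restarts and alerting.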
/// Drop pools where either token's `decimals` is missing. Treating missing
/// as `0` would mis-scale prices by 10^18; fail closed until backfill.
fn pools_tokens_have_decimals(p: &IndexerPool) -> bool {
On a fresh deploy, decimals backfill takes a few minutes to drain ~1700 tokens. During that window, every driver call to /pools returns pools with missing decimals, and the driver logs a multi-line WARN per dropped pool.
Drop to debug! and aggregate:
fn pools_tokens_have_decimals(p: &IndexerPool) -> bool {
let mut dropped = 0usize;
let filtered: Vec<_> = pools.into_iter()
    .filter(|p| {
        if p.token0.decimals.is_none() || p.token1.decimals.is_none() {
            dropped += 1;
            false
        } else {
            true
        }
    })
    .collect();
if dropped > 0 {
    tracing::debug!(dropped, "pool-indexer returned pools missing decimals");
}
Applied the aggregation fix 🫡
Pulled the per-pool warn out of the predicate and into a new helper drop_pools_missing_decimals(Vec<IndexerPool>) -> Vec<IndexerPool> that filters and emits a single debug! per call with both dropped and total counts.
During steady state this is silent; during initial decimals backfill you'd see one log line per page (e.g. dropped=124 total=1000) instead of ~6 lines per dropped pool at WARN.
Both call sites use it:
- the `get_registered_pools` pagination loop (after the zero-liquidity pre-filter, so the counter reflects what was dropped for the decimals reason specifically),
- and the `fetch_pools_by_ids` helper.
Doc comment on the helper notes the deploy-cold-start expectation so the next reader doesn't have to derive it.
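A rough sketch of what such a helper might look like, assuming simplified stand-ins for `Token` and `IndexerPool` (the real types carry more fields). Unlike the helper described above, this version returns the dropped count alongside the kept pools so it can be checked without a logging backend; the place where a single `debug!` would go is marked in a comment.

```rust
/// Simplified stand-ins for the real types (assumption: only `decimals` matters here).
#[derive(Debug, Clone, PartialEq)]
struct Token {
    decimals: Option<u8>,
}

#[derive(Debug, Clone, PartialEq)]
struct IndexerPool {
    token0: Token,
    token1: Token,
}

/// Keeps only pools whose two tokens both have known decimals.
/// Returns the kept pools and how many were dropped.
fn drop_pools_missing_decimals(pools: Vec<IndexerPool>) -> (Vec<IndexerPool>, usize) {
    let total = pools.len();
    let kept: Vec<IndexerPool> = pools
        .into_iter()
        .filter(|p| p.token0.decimals.is_some() && p.token1.decimals.is_some())
        .collect();
    let dropped = total - kept.len();
    // In the real helper, one aggregated log line would be emitted here when
    // `dropped > 0`, carrying both `dropped` and `total`.
    (kept, dropped)
}
```

Counting after the filter, rather than mutating a counter inside the closure, keeps the predicate free of side effects.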
}
}

async fn fetch_pool_liquidity(provider: &AlloyProvider, pool: Address, block: u64) -> Option<u128> {
Why these calls exist. Mint and Burn events emit (tickLower, tickUpper, amount, amount0, amount1) but not the pool's post-event liquidity. Swap and Initialize do carry it in the event payload. So to keep uniswap_v3_pool_states.liquidity correct after a Mint or Burn, the indexer currently reaches back to the chain with pool.liquidity().block(N) for every (pool, block) pair touched by a Mint/Burn in the chunk.
How far back N can be. During steady-state polling, N is within ~12 blocks of head, fine on any node. During catch-up after seeding, N walks forward from seed_block + 1. If seed_block is days or weeks behind chain head, those calls hit historical state that pruned nodes have thrown away. Default geth keeps 128 blocks (~25 minutes on mainnet), well short of any realistic seed.
Why the failure is silent. A pruned node returns "missing trie node" as a JSON-RPC error. The retry policy classifies it as transport, retries 5 times in ~400 ms, then gives up. The chunk commits without that liquidity update. The pool's liquidity row stays stale until the next Swap event for that pool overwrites it (Swap is "set", not "delta"). For a quiet pool that's hours of wrong state.
The fix. Remove fetch_pool_liquidity entirely and compute the new liquidity from the event alone. The driver already does this in append_events (pool_fetching.rs:407-477):
// Mint
if tick_lower <= pool.tick && pool.tick < tick_upper {
    pool.liquidity += amount;
}
// Burn: same, but subtract

The indexer already has the pool's tick in scope (from a Swap/Initialize earlier in the chunk via LogAccumulator.full_states, or from uniswap_v3_pool_states at chunk start). Apply the same rule in handle_mint / handle_burn instead of consulting a prefetch cache.
Net effect. No more historical RPC calls. Archive-vs-pruned distinction stops mattering. Indexer becomes correct on any node that can serve eth_getLogs for the seed-block range. Drop prefetch_liquidities, LiquidityCache, update_liquidity_from_cache, and batch_update_pool_liquidity while you're at it.
Hi Ilya, I went through this carefully. This bug totally went under my radar 😅, but I've applied the fix now 🫡
The suggested fix makes sense. The driver's pool_fetching.rs:429 already implements exactly the same active-range delta rule, so it's really just porting that logic to the indexer side and dropping the chain call.
Here's what I changed:
- Deleted fetch_pool_liquidity, prefetch_liquidities and related structs.
- Added a new apply_position_delta_to_pool_liq on the accumulator that does if tick_lower <= pool.tick < tick_upper: pool.liquidity ± amount, exactly like the driver does.
- For the pre-event (tick, liquidity), the accumulator now looks, in priority order: prior `Swap`/`Initialize` in this chunk → prior `Mint`/`Burn` in this chunk → a single DB load from `uniswap_v3_pool_states`. That last source replaces the `eth_call`(s) entirely.
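The active-range delta rule described above can be sketched as follows. `PoolState`, `PositionChange`, and `apply_position_delta` are hypothetical names used for illustration, not the accumulator's actual API; the sketch stays in `u128` with checked arithmetic so that an underflowing burn is reported rather than wrapped.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
struct PoolState {
    tick: i32,
    liquidity: u128,
}

#[derive(Debug, Clone, Copy)]
enum PositionChange {
    Mint,
    Burn,
}

/// Applies a Mint/Burn to the pool's active liquidity.
/// Only positions whose range contains the current tick
/// (`tick_lower <= tick < tick_upper`) affect active liquidity.
/// Returns `None` if the arithmetic would overflow or underflow.
fn apply_position_delta(
    pool: PoolState,
    change: PositionChange,
    tick_lower: i32,
    tick_upper: i32,
    amount: u128,
) -> Option<PoolState> {
    let in_range = tick_lower <= pool.tick && pool.tick < tick_upper;
    if !in_range {
        return Some(pool);
    }
    let liquidity = match change {
        PositionChange::Mint => pool.liquidity.checked_add(amount)?,
        PositionChange::Burn => pool.liquidity.checked_sub(amount)?,
    };
    Some(PoolState { liquidity, ..pool })
}
```

Note that the upper bound is exclusive: a position ending exactly at the current tick leaves active liquidity unchanged.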
let tick_idxs: Vec<i32> = deltas.iter().map(|delta| delta.tick_idx).collect();
let delta_values: Vec<BigDecimal> = deltas.iter().map(|delta| sql_i128(delta.delta)).collect();

sqlx::query(
The DELETE ... WHERE upserted.liquidity_net = 0 shares a snapshot with the surrounding UPSERT, so it can only see rows that existed before the statement. New rows inserted with net=0 in the same statement would be invisible to the DELETE and leak.
Currently safe because into_chunk_changes filters delta != 0 upstream. Add a guard so a future caller doesn't get surprised:
sqlx::query(
ON CONFLICT (chain_id, pool_address, tick_idx) DO UPDATE
SET liquidity_net = uniswap_v3_ticks.liquidity_net + EXCLUDED.liquidity_net
WHERE EXCLUDED.liquidity_net <> 0
I went through your suggestion and I think the patch above closes one face of the issue but not the central one.
Wanted to walk through why, and what I ended up doing.
Your suggested clause:
ON CONFLICT (...) DO UPDATE
SET liquidity_net = uniswap_v3_ticks.liquidity_net + EXCLUDED.liquidity_net
WHERE EXCLUDED.liquidity_net <> 0

This gates the DO UPDATE action on the incoming delta being nonzero. So it correctly skips a no-op UPDATE when an in-batch delta is zero, but it doesn't stop the INSERT branch from running with liquidity_net = 0 if the target (pool, tick) doesn't exist yet. That's the case the DELETE-after-UPSERT structurally can't catch: Postgres modifying-CTE rules mean the DELETE's snapshot of uniswap_v3_ticks is from statement start, so freshly INSERTed rows (in the same statement) are invisible to it. Sibling CTEs run against the pre-statement snapshot.
The minimum-correct fix I went with is to guard the INSERT SELECT directly:
INSERT INTO uniswap_v3_ticks (...)
SELECT $1, i.addr, i.tick_idx, i.total_delta
FROM input i
WHERE i.total_delta <> 0
  AND EXISTS (
    SELECT 1 FROM uniswap_v3_pools
    WHERE chain_id = $1 AND address = i.addr AND factory = $2
  )
ON CONFLICT (chain_id, pool_address, tick_idx) DO UPDATE
SET liquidity_net = uniswap_v3_ticks.liquidity_net + EXCLUDED.liquidity_net

Now zero-net rows never reach the INSERT in the first place, and the existing DELETE-USING-upserted continues to handle the UPDATE-going-to-zero case (which it does correctly: those rows exist in the pre-statement snapshot, so DELETE can see them).
Q. Why not split into two statements (UPSERT then DELETE WHERE liquidity_net = 0)?
Considered it, and it's cleaner conceptually and obviously correct.
But the reason I kept the one-statement form: the existing DELETE's USING upserted scopes it to only the rows this batch touched. A standalone DELETE WHERE liquidity_net = 0 would be unscoped.
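The intended end state of the statement can be modeled in memory. This is only an illustrative model, not the SQL: `TickKey` and `apply_tick_deltas` are hypothetical, and it assumes deltas are summed per (pool, tick) before being applied, as the `total_delta` column name suggests.

```rust
use std::collections::BTreeMap;

/// Hypothetical key: (pool id, tick_idx).
type TickKey = (u64, i32);

/// Applies a batch of liquidity_net deltas so that no zero-net row is ever stored.
fn apply_tick_deltas(ticks: &mut BTreeMap<TickKey, i128>, deltas: &[(TickKey, i128)]) {
    // Sum the batch per key first (the assumed meaning of `total_delta`).
    let mut totals: BTreeMap<TickKey, i128> = BTreeMap::new();
    for &(key, delta) in deltas {
        *totals.entry(key).or_insert(0) += delta;
    }

    for (key, total) in totals {
        // Guard on the insert side: a zero total never creates a row.
        if total == 0 {
            continue;
        }
        let net = ticks.entry(key).or_insert(0);
        *net += total;
        // Existing row updated down to zero: delete it, scoped to touched keys only.
        if *net == 0 {
            ticks.remove(&key);
        }
    }
}
```

Because the removal only ever looks at keys touched by the batch, it mirrors the scoping argument for keeping the `USING upserted` form.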
/// Deserializes an optional String from *either* an environment variable,
/// with the format `%<ENV_VAR_NAME>`, or directly from the field value.
/// Missing field or missing env var (when referenced) → `None`.
pub fn deserialize_optional_string_from_env<'de, D>(
If an operator writes subgraph-bearer-token = "%GRAPH_TOKEN" and forgets to set GRAPH_TOKEN, the indexer starts up unauthenticated and 401s on every subgraph call.
pub fn deserialize_optional_string_from_env<'de, D>(
match value.strip_prefix(ENV_VAR_PREFIX) {
    Some(env_var_name) => Ok(std::env::var(env_var_name).ok().or_else(|| {
        tracing::warn!(%env_var_name, "optional env var not set, field is None");
        None
    })),
    None => Ok(Some(value)),
}
Applied. 🫡 Added the tracing::warn! in deserialize_optional_string_from_env as you suggested.
An unset %-referenced env var now logs "optional env var referenced in config but not set; field is None" before returning None.
Expanded the doc comment to explain the rationale.
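The resolution rule under discussion, stripped of serde and logging, might look like the sketch below. `resolve_optional` and its injected `lookup` closure are hypothetical (the closure stands in for `std::env::var(..).ok()` so the behavior can be exercised without touching the process environment), and the `%` prefix value is taken from the doc comment.

```rust
const ENV_VAR_PREFIX: &str = "%";

/// Resolves an optional config string:
/// - missing field            -> None
/// - "%NAME" with NAME set    -> Some(value of NAME)
/// - "%NAME" with NAME unset  -> None
/// - any other string         -> Some(the string itself)
fn resolve_optional(
    value: Option<String>,
    lookup: impl Fn(&str) -> Option<String>,
) -> Option<String> {
    let value = value?;
    match value.strip_prefix(ENV_VAR_PREFIX) {
        Some(env_var_name) => lookup(env_var_name),
        None => Some(value),
    }
}
```

The third case is the one the thread is about: the field silently becomes `None`, which is indistinguishable from the operator never configuring it.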
It doesn't make sense to do this here. The whole point of the function is that the string is optional.
If you need the field to authenticate, you don't use the optional
// driver's event replay.
let mut cursor: Option<String> = None;
let mut pools: Vec<PoolData> = Vec::new();
let mut fetched_block_number: u64 = 0;
let mut fetched_block_number: u64 = 0;
if fetched_block_number == 0 {
    fetched_block_number = page.block_number;
}

Anvil reports block 0 as genesis, so this conflates "no pages yet" with "really at block 0." Use Option<u64> and unwrap at the return site.
Note: if comment #1 is taken, this field becomes unused and gets deleted instead.
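For reference, the `Option<u64>` variant could be sketched like this. `first_page_block` is a hypothetical helper that takes the per-page block numbers as a slice instead of making HTTP calls; it only shows that "no pages yet" and "block 0" stay distinguishable.

```rust
/// Returns the block number reported by the first page, if any page was seen.
fn first_page_block(page_block_numbers: &[u64]) -> Option<u64> {
    let mut fetched_block_number: Option<u64> = None;
    for &block_number in page_block_numbers {
        // Only the first page pins the snapshot; block 0 is a legitimate value.
        if fetched_block_number.is_none() {
            fetched_block_number = Some(block_number);
        }
    }
    fetched_block_number
}
```

With the `u64 = 0` sentinel, a first page at block 0 followed by a page at block 3 would have pinned the snapshot to 3; here it stays at 0.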
Folded this into comment #1 when applying the fix 🫡
Ok(self.snapshot_block)
}

async fn seed_pools(&self) -> Result<Vec<String>> {
The configured factory.address is written into every uniswap_v3_pools.factory row, but the seeder never sends it to the subgraph. If somebody points a Uniswap V3 config at a PancakeSwap V3 subgraph URL by mistake, the seeder pulls PancakeSwap pools and stamps them with the Uniswap V3 factory address. Live indexing then ignores real PancakeSwap PoolCreated events (log.address() != configured_factory) but the seeded rows are already wrong.
Add one probe before seeding: fetch any one pool from the subgraph and assert its factory field matches the configured address.
I applied the factory probe. New SubgraphClient::fetch_factory_attestation(block) does a one-pool, one-field GraphQL query for pools(first: 1) { factory } at the same snapshot block as the rest of seeding, and SubgraphSeeder::seed runs it before seed_pools() so no rows get written under a mismatched factory.
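The comparison at the heart of such a probe is simple but easy to get wrong on casing. The sketch below is an assumption about how the check could be written, not the code in `SubgraphClient`: `factory_matches` is a hypothetical helper, and it assumes both sides are hex address strings that may differ in case or in the `0x` prefix.

```rust
/// Compares a configured factory address with the one reported by the subgraph,
/// ignoring surrounding whitespace, hex casing, and an optional `0x` prefix.
fn factory_matches(configured: &str, reported: &str) -> bool {
    fn normalize(addr: &str) -> String {
        let addr = addr.trim();
        let hex = addr
            .strip_prefix("0x")
            .or_else(|| addr.strip_prefix("0X"))
            .unwrap_or(addr);
        hex.to_ascii_lowercase()
    }
    normalize(configured) == normalize(reported)
}
```

A seeder would abort before writing any rows when this returns `false`.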
It didn't help.
…alloy imports, remove georli
…rrier, and remove silent MAX_TICKS_PER_POOL truncation
I'll analyze this and get back to you.
Code Review
This pull request introduces the pool-indexer service, a standalone component for indexing Uniswap V3 pool state and ticks into Postgres. It refactors the driver's liquidity fetching logic to support both the existing subgraph and the new indexer via a common V3PoolDataSource trait. A critical logic error was identified in the indexer's liquidity calculation where casting u128 values to signed i128 could cause overflows and incorrect state updates for high-liquidity pools.
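The class of bug named in this review is easy to reproduce in isolation. The sketch below does not quote the indexer's code; `checked_signed` is a hypothetical helper that shows the difference between an `as` cast, which silently wraps, and a checked conversion, which reports values above `i128::MAX`.

```rust
use std::convert::TryFrom;

/// Converts an unsigned liquidity amount to a signed delta,
/// returning `None` instead of wrapping when it does not fit in `i128`.
fn checked_signed(amount: u128) -> Option<i128> {
    i128::try_from(amount).ok()
}
```

With a plain cast, `u128::MAX as i128` evaluates to `-1`, so an oversized mint would be applied as a tiny burn; the checked form makes the caller decide how to handle that case.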

Description
Replaces our driver's third-party Uniswap V3 subgraph dependency with our own pool-indexer service. This is the subgraph-bootstrap slice of the larger #4349, scoped down to keep review focused. Includes a few fixes surfaced during local testing.
Out of scope (deferred to follow-up PRs):
Changes
New service: `crates/pool-indexer/`
- `eth_getLogs` and persists incremental state to Postgres
- `/pools`, `/pools/by-ids`, `/pools/{addr}/ticks`, `/pools/ticks`
Driver-side abstraction (same as original PR, already reviewed)
- `V3PoolDataSource` trait in `liquidity-sources/src/uniswap_v3/mod.rs`
- `UniV3SubgraphClient` (no behavior change) + new `PoolIndexerClient`
- `build_pool_data_source` selects the impl based on the optional `pool-indexer-url` driver config; defaults to subgraph
Database migration: `V110__pool_indexer_uniswap_v3.sql`
- `pool_indexer_checkpoints`, `uniswap_v3_pools`, `uniswap_v3_pool_states`, `uniswap_v3_ticks`
- `IS NULL` columns for the backfill hot path
E2E tests (`crates/e2e/tests/e2e/pool_indexer.rs`)
- `driver_integration`: driver → pool-indexer wiring, asserted via per-route request counters on all four endpoints
- `checkpoint_resume`: restart idempotency; pool count, per-pool state, and checkpoint advance all survive a stop+start
- `api_errors`: input validation; 400 on unparseable address, 200 + empty ticks on valid-but-unknown address
- `pagination`: cursor traversal with `limit=1` across multiple pools, no duplicates, terminates on `next_cursor = null`
- `alloy::sol!` with embedded compiled bytecode; no additions to the `contracts` crate
How to test
1. e2e tests
Expect 4 passes in ~10s after the build settles. These cover the driver↔indexer wiring, restart idempotency, the input-validation surface, and cursor pagination. (Same as the original PR.)
2. Manual (against a real network with a Uniswap V3 subgraph)
Before running, create `crates/pool-indexer/config.local.toml` (schema = `Configuration` struct in `src/config.rs`). String fields accept `%ENV_VAR`, so RPC URLs and subgraph bearer tokens can come from the environment.
Once the indexer is live, point a local driver at it by setting `pool-indexer-url = "http://localhost:8080"` in the driver's Uniswap V3 liquidity config (replacing the usual `graph-url`). Then submit a quote and confirm the log line `uniswap v3: using pool-indexer as data source url=...` appears; that's the driver picking the indexer path.
Verified manually: Ink mainnet (chain 57073), Uniswap V3 subgraph (via bearer auth), USDT0 ↔ WETH quotes in both directions. Prices were internally consistent and matched the live market.