diff --git a/.sqlx/query-74714e19742fdca05573c5ca8bac0f158ab909846bb80e82860f61816cab8809.json b/.sqlx/query-1ba2028c65e58648de38b54240031c03f754d1e1a04fc6755694bdf8abc87afc.json similarity index 53% rename from .sqlx/query-74714e19742fdca05573c5ca8bac0f158ab909846bb80e82860f61816cab8809.json rename to .sqlx/query-1ba2028c65e58648de38b54240031c03f754d1e1a04fc6755694bdf8abc87afc.json index e93bf2d39..45f1b5735 100644 --- a/.sqlx/query-74714e19742fdca05573c5ca8bac0f158ab909846bb80e82860f61816cab8809.json +++ b/.sqlx/query-1ba2028c65e58648de38b54240031c03f754d1e1a04fc6755694bdf8abc87afc.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\nSELECT\n low_tx.tx_id AS \"tx_id!\",\n low_tx_out.tx_out_id AS \"tx_out_id!\",\n low_ma_tx_out.ma_tx_out_id AS \"ma_tx_out_id!\",\n low_tx_in.tx_in_id AS \"tx_in_id!\"\nFROM\n (SELECT COALESCE ((SELECT id FROM block WHERE block_no = $1 LIMIT 1), 0) AS id) AS block,\n LATERAL (SELECT COALESCE((SELECT id FROM tx WHERE block_id < block.id ORDER BY block_id DESC LIMIT 1), 0) AS tx_id) AS low_tx,\n LATERAL (SELECT COALESCE((SELECT id FROM tx_out WHERE tx_id <= low_tx.tx_id ORDER BY tx_id DESC LIMIT 1), 0) AS tx_out_id) AS low_tx_out,\n LATERAL (SELECT COALESCE((SELECT id FROM ma_tx_out WHERE tx_out_id <= low_tx_out.tx_out_id ORDER BY tx_out_id DESC LIMIT 1), 0) AS ma_tx_out_id) AS low_ma_tx_out,\n LATERAL (SELECT COALESCE((SELECT id FROM tx_in WHERE tx_in.tx_in_id <= low_tx.tx_id ORDER BY tx_in_id DESC LIMIT 1), 0) AS tx_in_id) AS low_tx_in;\n", + "query": "\nSELECT\n low_tx.tx_id AS \"tx_id!\",\n low_tx_out.tx_out_id AS \"tx_out_id!\",\n low_ma_tx_out.ma_tx_out_id AS \"ma_tx_out_id!\",\n low_tx_in.tx_in_id AS \"tx_in_id!\"\nFROM\n (SELECT COALESCE ((SELECT id FROM block WHERE block_no = $1 ORDER BY id ASC LIMIT 1), 0) AS id) AS block,\n LATERAL (SELECT COALESCE((SELECT id FROM tx WHERE block_id < block.id ORDER BY block_id DESC LIMIT 1), 0) AS tx_id) AS low_tx,\n LATERAL (SELECT COALESCE((SELECT id FROM tx_out WHERE tx_id <= 
low_tx.tx_id ORDER BY tx_id DESC LIMIT 1), 0) AS tx_out_id) AS low_tx_out,\n LATERAL (SELECT COALESCE((SELECT id FROM ma_tx_out WHERE tx_out_id <= low_tx_out.tx_out_id ORDER BY tx_out_id DESC LIMIT 1), 0) AS ma_tx_out_id) AS low_ma_tx_out,\n LATERAL (SELECT COALESCE((SELECT id FROM tx_in WHERE tx_in.tx_in_id <= low_tx.tx_id ORDER BY tx_in_id DESC LIMIT 1), 0) AS tx_in_id) AS low_tx_in;\n", "describe": { "columns": [ { @@ -36,5 +36,5 @@ null ] }, - "hash": "74714e19742fdca05573c5ca8bac0f158ab909846bb80e82860f61816cab8809" + "hash": "1ba2028c65e58648de38b54240031c03f754d1e1a04fc6755694bdf8abc87afc" } diff --git a/.sqlx/query-542a58ed5ad9ff37e14402f920b5be5f5590909ba68a4c805ac779f86aa826de.json b/.sqlx/query-542a58ed5ad9ff37e14402f920b5be5f5590909ba68a4c805ac779f86aa826de.json new file mode 100644 index 000000000..730fc962e --- /dev/null +++ b/.sqlx/query-542a58ed5ad9ff37e14402f920b5be5f5590909ba68a4c805ac779f86aa826de.json @@ -0,0 +1,40 @@ +{ + "db_name": "PostgreSQL", + "query": "\nSELECT\n high_tx.tx_id AS \"tx_id!\",\n high_tx_out.tx_out_id AS \"tx_out_id!\",\n high_ma_tx_out.ma_tx_out_id AS \"ma_tx_out_id!\",\n high_tx_in.tx_in_id AS \"tx_in_id!\"\nFROM\n (SELECT id FROM block WHERE block_no = $1 ORDER BY id DESC LIMIT 1) AS block,\n LATERAL (SELECT COALESCE((SELECT id FROM tx WHERE block_id > block.id ORDER BY block_id ASC LIMIT 1), 9223372036854775807) AS tx_id) AS high_tx,\n LATERAL (SELECT COALESCE((SELECT id FROM tx_out WHERE tx_id >= high_tx.tx_id ORDER BY tx_id ASC LIMIT 1), 9223372036854775807) AS tx_out_id) AS high_tx_out,\n LATERAL (SELECT COALESCE((SELECT id FROM ma_tx_out WHERE tx_out_id >= high_tx_out.tx_out_id ORDER BY tx_out_id ASC LIMIT 1), 9223372036854775807) AS ma_tx_out_id) AS high_ma_tx_out,\n LATERAL (SELECT COALESCE((SELECT id FROM tx_in WHERE tx_in.tx_in_id >= high_tx.tx_id ORDER BY tx_in_id ASC LIMIT 1), 9223372036854775807) AS tx_in_id) AS high_tx_in;\n", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "tx_id!", + 
"type_info": "Int8" + }, + { + "ordinal": 1, + "name": "tx_out_id!", + "type_info": "Int8" + }, + { + "ordinal": 2, + "name": "ma_tx_out_id!", + "type_info": "Int8" + }, + { + "ordinal": 3, + "name": "tx_in_id!", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int4" + ] + }, + "nullable": [ + null, + null, + null, + null + ] + }, + "hash": "542a58ed5ad9ff37e14402f920b5be5f5590909ba68a4c805ac779f86aa826de" +} diff --git a/.sqlx/query-ff6315ab1c9f1ac5f7b5672157310fd75fa29f680ac5f19158267ecea0ece6a9.json b/.sqlx/query-ff6315ab1c9f1ac5f7b5672157310fd75fa29f680ac5f19158267ecea0ece6a9.json deleted file mode 100644 index cc99fa95d..000000000 --- a/.sqlx/query-ff6315ab1c9f1ac5f7b5672157310fd75fa29f680ac5f19158267ecea0ece6a9.json +++ /dev/null @@ -1,40 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\nSELECT\n high_tx.tx_id AS \"tx_id!\",\n high_tx_out.tx_out_id AS \"tx_out_id!\",\n high_ma_tx_out.ma_tx_out_id AS \"ma_tx_out_id!\",\n high_tx_in.tx_in_id AS \"tx_in_id!\"\nFROM\n (SELECT id FROM block WHERE block_no = $1 LIMIT 1) AS block,\n LATERAL (SELECT COALESCE((SELECT id FROM tx WHERE block_id > block.id ORDER BY block_id ASC LIMIT 1), 9223372036854775807) AS tx_id) AS high_tx,\n LATERAL (SELECT COALESCE((SELECT id FROM tx_out WHERE tx_id >= high_tx.tx_id ORDER BY tx_id ASC LIMIT 1), 9223372036854775807) AS tx_out_id) AS high_tx_out,\n LATERAL (SELECT COALESCE((SELECT id FROM ma_tx_out WHERE tx_out_id >= high_tx_out.tx_out_id ORDER BY tx_out_id ASC LIMIT 1), 9223372036854775807) AS ma_tx_out_id) AS high_ma_tx_out,\n LATERAL (SELECT COALESCE((SELECT id FROM tx_in WHERE tx_in.tx_in_id >= high_tx.tx_id ORDER BY tx_in_id ASC LIMIT 1), 9223372036854775807) AS tx_in_id) AS high_tx_in;\n", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "tx_id!", - "type_info": "Int8" - }, - { - "ordinal": 1, - "name": "tx_out_id!", - "type_info": "Int8" - }, - { - "ordinal": 2, - "name": "ma_tx_out_id!", - "type_info": "Int8" - }, - { - "ordinal": 3, - "name": 
"tx_in_id!", - "type_info": "Int8" - } - ], - "parameters": { - "Left": [ - "Int4" - ] - }, - "nullable": [ - null, - null, - null, - null - ] - }, - "hash": "ff6315ab1c9f1ac5f7b5672157310fd75fa29f680ac5f19158267ecea0ece6a9" -} diff --git a/changes/node/changed/cnight-observation-fetch-determinism.md b/changes/node/changed/cnight-observation-fetch-determinism.md new file mode 100644 index 000000000..18f434c39 --- /dev/null +++ b/changes/node/changed/cnight-observation-fetch-determinism.md @@ -0,0 +1,24 @@ +#node #runtime #security +# Make cNIGHT observation inherent generation deterministic and unskippable + +The cNIGHT inherent provider used a row-count over-fetch (`tx_capacity * factor`) +as a one-shot SQL `LIMIT`, then treated "fewer than `tx_capacity` distinct txs +returned" as "range complete" and advanced the Cardano cursor to the tip — +without checking whether the query was truncated by its limit. A range holding +more matching rows than the limit across fewer txs (many UTXOs per tx) was +silently skipped, and a node that fetched more rows derived a different inherent +→ `check_inherent` rejection (fork/liveness) plus corrupted mint/burn accounting. + +Fix: one deterministic path for both the in-memory cache and the db fallback. +Fetch the complete range (`bulk_pull` now reports whether its limit was hit) and +truncate whole-transaction to the runtime envelope (`tx_capacity * +UTXO_PER_TX_OVERESTIMATE`). The cursor reaches the tip only on a proven-complete +fetch; otherwise it stops at the last fully-observed tx. With no fetch-size input +left, every node derives identical inherents. + +`UTXO_PER_TX_OVERESTIMATE` moves to `midnight-primitives-cnight-observation` as +the single source shared by runtime and node. Byte-identical to the prior 64x +path on benign history, so finalized blocks replay unchanged. 
+ +PR: +Issue: diff --git a/ledger/src/lib.rs b/ledger/src/lib.rs index a62f0290a..dc9483a4e 100644 --- a/ledger/src/lib.rs +++ b/ledger/src/lib.rs @@ -154,6 +154,129 @@ pub fn drop_all_default_storage() { ledger_9::storage::drop_default_storage_if_exists(); } +/// Genesis bootstrap that dispatches on the ledger version embedded in the +/// genesis state, instead of assuming the newest. +/// +/// All ledger versions are compiled in, but a genesis blob is serialized with +/// exactly one version's `LedgerState`, and its deserializer rejects any other +/// version's tag. Hardcoding the latest (as each ledger bump historically did) +/// makes the node unable to bootstrap a chain still on an older genesis — e.g. +/// live mainnet, whose genesis is immutable. These wrappers read the blob's tag +/// and route to the matching module. This is *not* state migration: each chain +/// keeps running its own ledger version; we only stop forcing the newest at the +/// genesis boundary. +#[cfg(feature = "std")] +pub mod genesis_version { + /// Ledger storage version a tagged genesis blob was produced with. + #[derive(Debug, Clone, Copy, PartialEq, Eq)] + pub enum LedgerVersion { + /// ledger 7 + V7, + /// ledger 8 + V8, + /// ledger 9 + V9, + } + + /// Detect a genesis blob's ledger version by matching its leading tag + /// against each compiled-in module's own `LedgerState` tag. Newest first. + /// `None` if no known version matches (corrupt or future-version blob). 
+ pub fn detect(genesis_state: &[u8]) -> Option { + let head = &genesis_state[..genesis_state.len().min(64)]; + let has = |tag: &str| head.windows(tag.len()).any(|w| w == tag.as_bytes()); + if has(&super::ledger_9::storage::genesis_ledger_state_tag()) { + Some(LedgerVersion::V9) + } else if has(&super::ledger_8::storage::genesis_ledger_state_tag()) { + Some(LedgerVersion::V8) + } else if has(&super::ledger_7::storage::genesis_ledger_state_tag()) { + Some(LedgerVersion::V7) + } else { + None + } + } + + fn unknown_version(genesis_state: &[u8]) -> String { + format!( + "unrecognised ledger genesis version tag in {}-byte genesis blob (known: {}, {}, {})", + genesis_state.len(), + super::ledger_9::storage::genesis_ledger_state_tag(), + super::ledger_8::storage::genesis_ledger_state_tag(), + super::ledger_7::storage::genesis_ledger_state_tag(), + ) + } + + /// Version-dispatched [`get_root`](super::ledger_9::storage::get_root). + pub fn get_root(genesis_state: &[u8], network_id: Option<&str>) -> Result, String> { + match detect(genesis_state) { + Some(LedgerVersion::V9) => super::ledger_9::storage::get_root(genesis_state, network_id) + .map_err(|e| e.to_string()), + Some(LedgerVersion::V8) => super::ledger_8::storage::get_root(genesis_state, network_id) + .map_err(|e| e.to_string()), + Some(LedgerVersion::V7) => super::ledger_7::storage::get_root(genesis_state, network_id) + .map_err(|e| e.to_string()), + None => Err(unknown_version(genesis_state)), + } + } + + /// Version-dispatched separate-db genesis init. 
+ pub fn init_storage_paritydb_separate>( + dir: P, + genesis_state: &[u8], + cache_size: usize, + ) -> Vec { + match detect(genesis_state) { + Some(LedgerVersion::V9) => super::ledger_9::storage::init_storage_paritydb_separate( + dir, + genesis_state, + cache_size, + ), + Some(LedgerVersion::V8) => super::ledger_8::storage::init_storage_paritydb_separate( + dir, + genesis_state, + cache_size, + ), + Some(LedgerVersion::V7) => super::ledger_7::storage::init_storage_paritydb_separate( + dir, + genesis_state, + cache_size, + ), + None => panic!("{}", unknown_version(genesis_state)), + } + } + + /// Version-dispatched unified-db genesis init. + pub fn init_storage_paritydb_unified< + D: std::ops::Deref + Default + Send + Sync + 'static, + const COLUMN_OFFSET: u8, + >( + db_instance: D, + genesis_state: &[u8], + cache_size: usize, + ) -> Vec { + match detect(genesis_state) { + Some(LedgerVersion::V9) => + super::ledger_9::storage::init_storage_paritydb_unified::( + db_instance, + genesis_state, + cache_size, + ), + Some(LedgerVersion::V8) => + super::ledger_8::storage::init_storage_paritydb_unified::( + db_instance, + genesis_state, + cache_size, + ), + Some(LedgerVersion::V7) => + super::ledger_7::storage::init_storage_paritydb_unified::( + db_instance, + genesis_state, + cache_size, + ), + None => panic!("{}", unknown_version(genesis_state)), + } + } +} + mod common; pub mod types { diff --git a/ledger/src/versions/common/storage.rs b/ledger/src/versions/common/storage.rs index ef39ea8d9..ec3aebf26 100644 --- a/ledger/src/versions/common/storage.rs +++ b/ledger/src/versions/common/storage.rs @@ -74,6 +74,17 @@ impl core::fmt::Display for GetRootError { } } +/// The `LedgerState` serialization tag for this ledger version, e.g. +/// `ledger-state[v13]` (ledger 8) or `ledger-state[v16]` (ledger 9). A tagged +/// genesis blob carries this verbatim, so it lets the bootstrap pick the right +/// deserializer before touching the body (see [`crate::genesis_version`]). 
+#[cfg(feature = "std")] +pub fn genesis_ledger_state_tag() -> std::borrow::Cow<'static, str> { + use super::ledger_storage_local::DefaultDB; + use super::midnight_serialize_local::Tagged; + as Tagged>::tag() +} + pub fn get_root(state: &[u8], network_id: Option<&str>) -> Result, GetRootError> { // Get empty state key use super::api::Ledger; diff --git a/node/src/backend/custom_parity_db.rs b/node/src/backend/custom_parity_db.rs index 5cd5ab729..688380d89 100644 --- a/node/src/backend/custom_parity_db.rs +++ b/node/src/backend/custom_parity_db.rs @@ -109,7 +109,7 @@ pub fn open>( match storage_config.separation { StorageSeparation::Separate => { - midnight_node_ledger::ledger_9::storage::init_storage_paritydb_separate( + midnight_node_ledger::genesis_version::init_storage_paritydb_separate( &storage_config.db_path, &storage_config.genesis_state, storage_config.cache_size, @@ -117,7 +117,7 @@ pub fn open>( Ok((OwnedDb(db), LedgerStorageDb::SeparateDb(storage_config.db_path.clone()))) }, StorageSeparation::Unified => { - midnight_node_ledger::ledger_9::storage::init_storage_paritydb_unified::< + midnight_node_ledger::genesis_version::init_storage_paritydb_unified::< _, NUM_COLUMNS_POLKADOT, >(OwnedDb(db.clone()), &storage_config.genesis_state, storage_config.cache_size); diff --git a/node/src/chain_spec/mod.rs b/node/src/chain_spec/mod.rs index 7fafe3f32..b8a84cc9d 100644 --- a/node/src/chain_spec/mod.rs +++ b/node/src/chain_spec/mod.rs @@ -56,7 +56,7 @@ pub enum ChainSpecInitError { Missing(String), ParseError(String), Serialization(String), - GenesisStateError(midnight_node_ledger::ledger_9::storage::GetRootError), + GenesisStateError(String), } impl fmt::Display for ChainSpecInitError { @@ -268,7 +268,7 @@ fn genesis_config(genesis: T) -> Result u128 { + u128::try_from(quantity).unwrap_or_else(|_| { + panic!( + "negative cNIGHT asset quantity {quantity} from db-sync — data source corrupt or forged" + ) + }) +} + #[derive(thiserror::Error, Debug)] pub enum 
RegistrationDatumDecodeError { #[error("Cardano credential not bytes")] @@ -107,7 +119,7 @@ impl MidnightCNightObservationDataSource for MidnightCNightObservationDataSource start_position: &CardanoPosition, current_tip: McBlockHash, tx_capacity: usize, - utxo_overestimate: usize, + max_utxos: usize, ) -> Result> { // Resolve current_tip -> CardanoPosition. This must preserve the historic // replay semantics: query through the block's Cardano tip and only then @@ -121,18 +133,18 @@ impl MidnightCNightObservationDataSource for MidnightCNightObservationDataSource drop(_block_timer); let end = end.increment(); - // The over-fetch bound is consensus-affecting and runtime-supplied, so it - // must flow into the SQL row limit (see `bulk_pull`) rather than a fixed - // client-side constant. - let utxos = - bulk_pull(&self.pool, config, start_position, &end, utxo_overestimate).await?; - let (result, _full_window) = truncate_to_tx_capacity( - utxos, - tx_capacity, - start_position, - end, - ); - Ok(result) + // This single query serves exactly one inherent, so fetch only one + // envelope's worth plus a sentinel row. Anything past `max_utxos` is + // discarded by `truncate_to_tx_capacity` anyway; a subquery returning the + // full `max_utxos + 1` rows flags the range row-limited + // (`complete = false`), holding the cursor short of the tip. Each category + // query is `ORDER BY (block_no, block_index, ...) LIMIT n`, so this prefix + // holds the same first-`max_utxos` rows the old whole-range pull did: + // byte-identical inherent, far less resident. The cache path still pulls + // whole windows with `LARGE_LIMIT`; both truncate to the same envelope. 
+ let (utxos, complete) = + bulk_pull(&self.pool, config, start_position, &end, max_utxos.saturating_add(1)).await?; + Ok(truncate_to_tx_capacity(utxos, tx_capacity, max_utxos, complete, start_position, end)) } } ); @@ -333,7 +345,7 @@ impl MidnightCNightObservationDataSourceImpl { let utxo = ObservedUtxo { header, data: ObservedUtxoData::AssetCreate(CreateData { - value: row.quantity as u128, + value: checked_asset_quantity(row.quantity), owner, utxo_tx_hash: McTxHash(row.tx_hash.0), utxo_tx_index: row.utxo_index.0, @@ -390,7 +402,7 @@ impl MidnightCNightObservationDataSourceImpl { let utxo = ObservedUtxo { header, data: ObservedUtxoData::AssetSpend(SpendData { - value: row.quantity as u128, + value: checked_asset_quantity(row.quantity), owner, utxo_tx_hash: McTxHash(row.utxo_tx_hash.0), utxo_tx_index: row.utxo_index.0, @@ -404,3 +416,22 @@ impl MidnightCNightObservationDataSourceImpl { Ok(utxos) } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn checked_asset_quantity_accepts_non_negative() { + assert_eq!(checked_asset_quantity(0), 0); + assert_eq!(checked_asset_quantity(1), 1); + assert_eq!(checked_asset_quantity(i64::MAX), i64::MAX as u128); + } + + #[test] + #[should_panic(expected = "negative cNIGHT asset quantity")] + fn checked_asset_quantity_panics_on_negative() { + // A bare `as u128` would sign-extend -1 to u128::MAX (~3.4e38) and mint it as DUST. + let _ = checked_asset_quantity(-1); + } +} diff --git a/primitives/mainchain-follower/src/data_source/cnight_observation_bulk.rs b/primitives/mainchain-follower/src/data_source/cnight_observation_bulk.rs index ab9c1af7c..692d85b6c 100644 --- a/primitives/mainchain-follower/src/data_source/cnight_observation_bulk.rs +++ b/primitives/mainchain-follower/src/data_source/cnight_observation_bulk.rs @@ -36,10 +36,11 @@ use sqlx::PgPool; use std::collections::HashMap; use std::sync::{Arc, Mutex}; -/// Effectively-no-limit page size for bulk pulls.
The query path supports -/// `LIMIT` for paged use but the sliding window wants the whole range in -/// one shot. -const LARGE_LIMIT: usize = 5_000_000; +/// Row ceiling for a "whole range" pull. The sliding-window refresh passes this +/// and relies on completeness (not a row count) for determinism; the per-call db +/// path passes `max_utxos + 1` instead. A pull that returns `limit` rows is +/// treated as truncated (see `bulk_pull`). +pub const LARGE_LIMIT: usize = 5_000_000; /// Default number of cardano blocks to keep in the sliding window when the /// node config doesn't override it. Memory cost ≈ 5 KB × events-per-block, @@ -72,23 +73,25 @@ pub enum BulkPullError { Observation(#[from] MidnightCNightObservationDataSourceError), } -/// Pull every cnight observation event in `[start, end]` (inclusive) and -/// return them sorted ascending by `tx_position`. +/// Pull every cnight observation event in `[start, end]` (inclusive), sorted by +/// `tx_position`, with `complete = true` iff no query hit `limit` (so the result +/// is the whole range, not a row-limited prefix). /// -/// Both endpoints are full `CardanoPosition`s so the per-call data source can -/// pass exact `(block_number, tx_index_in_block)` boundaries while the bulk -/// /sliding-window paths can pass whole-block ranges via +/// `complete == false` means a query returned `limit` rows, so more may remain. At +/// [`LARGE_LIMIT`] that is pathological, so it is logged at error level; callers must +/// not advance the cursor to the tip on it (that would skip the unfetched rows). +/// +/// Both endpoints are full `CardanoPosition`s so callers can pass exact +/// `(block, tx_index)` boundaries or whole-block ranges via /// `CardanoPosition::{min,max}_for_block`. pub async fn bulk_pull( pool: &PgPool, cfg: &CNightAddresses, start: &CardanoPosition, end: &CardanoPosition, - // Per-query SQL row limit (over-fetch bound).
The consensus inherent path - // passes the runtime-supplied `utxo_overestimate`; the background cache - // refresh passes `LARGE_LIMIT` to pull a whole multi-block window. + // Per-query SQL row limit; callers pass `LARGE_LIMIT` for a whole-range pull. limit: usize, -) -> Result, BulkPullError> { +) -> Result<(Vec, bool), BulkPullError> { let data_source = MidnightCNightObservationDataSourceImpl::new(pool.clone(), None, 0); let mapping_validator_address = Address::from_bech32(&cfg.mapping_validator_address) @@ -156,51 +159,101 @@ pub async fn bulk_pull( all.extend(v); } all.sort(); + // A query that returned exactly `limit` rows may have more behind it, so the + // pull is not provably complete. + let complete = + counts.0 < limit && counts.1 < limit && counts.2 < limit && counts.3 < limit; log::info!( target: "cnight::sliding-window", - "bulk_pull [{}/{}, {}/{}] -> reg={} dereg={} create={} spend={} (auth_ident={:?} cnight_ident={:?})", + "bulk_pull [{}/{}, {}/{}] -> reg={} dereg={} create={} spend={} complete={} (auth_ident={:?} cnight_ident={:?})", start.block_number, start.tx_index_in_block, end.block_number, end.tx_index_in_block, - counts.0, counts.1, counts.2, counts.3, auth_token_ident, cnight_ident, + counts.0, counts.1, counts.2, counts.3, complete, auth_token_ident, cnight_ident, ); - Ok(all) + if !complete { + log::error!( + target: "cnight::sliding-window", + "bulk_pull hit the {limit}-row limit in [{}, {}] (reg={} dereg={} create={} spend={}); \ + the cNIGHT observation window may be incomplete and the cursor will not advance to the tip", + start.block_number, end.block_number, counts.0, counts.1, counts.2, counts.3, + ); + } + Ok((all, complete)) } -/// Truncate a sorted, unique-position event list to at most `tx_capacity` -/// whole transactions. 
Returns the truncated `ObservedUtxos` plus a flag -/// indicating whether the full input fit (`true`: all events accepted up to -/// `fallback_end`; `false`: capacity hit and `result.end` is the position -/// just past the last accepted event). +/// Build the observation inherent from the **complete** sorted event list for +/// `[start, fallback_end]`. Caps at `tx_capacity` whole transactions and +/// `max_utxos` UTXOs (the runtime `process_tokens` envelope). +/// +/// Consensus invariants: +/// - Transactions are admitted whole, so `end` lands on a tx boundary; resuming +/// there cannot skip a counted tx's UTXOs. +/// - The cursor reaches `fallback_end` (the tip) ONLY when `complete` (the fetch +/// wasn't row-limited); otherwise it stops at the last fully-observed tx. +/// +/// Every node feeds this the complete range and the same `max_utxos`, so there is +/// no fetch-size input left to disagree on — inherents are byte-identical. pub fn truncate_to_tx_capacity( events: Vec, tx_capacity: usize, + max_utxos: usize, + complete: bool, start_position: &CardanoPosition, fallback_end: CardanoPosition, -) -> (ObservedUtxos, bool) { - let mut truncated: Vec = Vec::with_capacity(events.len().min(tx_capacity * 64)); +) -> ObservedUtxos { + let n = events.len(); + let mut truncated: Vec = Vec::with_capacity(n.min(max_utxos)); let mut num_txs: usize = 0; - let mut cur_tx: Option = None; - for utxo in events { - if cur_tx.as_ref().is_none_or(|tx| tx < &utxo.header.tx_position) { - num_txs += 1; - cur_tx = Some(utxo.header.tx_position.clone()); + let mut capped = false; + let mut i = 0usize; + while i < n { + // Extent [i, j) of the whole tx at `i` — its UTXOs share a position. 
+ let mut j = i + 1; + while j < n + && events[j].header.tx_position.partial_cmp(&events[i].header.tx_position) + == Some(core::cmp::Ordering::Equal) + { + j += 1; } - if num_txs == tx_capacity { + let tx_len = j - i; + // A lone tx bigger than the whole envelope is still admitted (cap check + // skipped while empty) so the runtime's bound rejects it loudly instead + // of us stalling forever. + let exceeds_tx_cap = num_txs + 1 > tx_capacity; + let exceeds_max_utxos = !truncated.is_empty() && truncated.len() + tx_len > max_utxos; + if exceeds_tx_cap || exceeds_max_utxos { + capped = true; break; } - truncated.push(utxo); + truncated.extend_from_slice(&events[i..j]); + num_txs += 1; + i = j; } - let full_window = num_txs < tx_capacity; - let end = if full_window { + + let end = if capped { + // Caps are far below LARGE_LIMIT, so the last accepted tx is whole. + boundary_after(&truncated, start_position) + } else if complete { fallback_end } else { - truncated - .last() - .map(|u| u.header.tx_position.clone()) - .unwrap_or_else(|| start_position.clone()) - .increment() + // Row-limited: the final fetched tx may be missing UTXOs past the limit, + // so drop it and resume only as far as we proved complete. + if let Some(last) = truncated.last().map(|u| u.header.tx_position.clone()) { + truncated.retain(|u| u.header.tx_position < last); + } + boundary_after(&truncated, start_position) }; - (ObservedUtxos { start: start_position.clone(), end, utxos: truncated }, full_window) + + ObservedUtxos { start: start_position.clone(), end, utxos: truncated } +} + +/// Position just past the last accepted event, or `start_position.increment()` if none were accepted. +fn boundary_after(truncated: &[ObservedUtxo], start_position: &CardanoPosition) -> CardanoPosition { + truncated + .last() + .map(|u| u.header.tx_position.clone()) + .unwrap_or_else(|| start_position.clone()) + .increment() } /// Cached result of the previous `get_utxos_up_to_capacity` call. During
During @@ -421,9 +474,9 @@ impl RefreshContext { CardanoPosition::min_for_block(from_block), CardanoPosition::max_for_block(target_end), ); - // Warming the cache means pulling a whole multi-block window, so the per-query - // limit is the wide `LARGE_LIMIT` rather than the per-block over-fetch bound. - let extension = + // Whole multi-block window, so `LARGE_LIMIT`. `_complete`: bulk_pull + // already error-logs a truncated pull (which would leave a gap here). + let (extension, _complete) = bulk_pull(&self.pool, &self.cnight_addresses, &start, &end, LARGE_LIMIT).await?; { let mut events_guard = @@ -522,7 +575,7 @@ impl MidnightCNightObservationDataSource for BulkCachedCNightObservationDataSour start_position: &CardanoPosition, current_tip: McBlockHash, tx_capacity: usize, - utxo_overestimate: usize, + max_utxos: usize, ) -> Result> { // Same-tip cache: if `current_tip` and `start_position` are both // unchanged, the Cardano window hasn't grown, so reuse the previous @@ -598,7 +651,7 @@ impl MidnightCNightObservationDataSource for BulkCachedCNightObservationDataSour start_position, current_tip, tx_capacity, - utxo_overestimate, + max_utxos, ) .await; } @@ -612,7 +665,7 @@ impl MidnightCNightObservationDataSource for BulkCachedCNightObservationDataSour start_position, current_tip, tx_capacity, - utxo_overestimate, + max_utxos, ) .await; } @@ -625,8 +678,10 @@ impl MidnightCNightObservationDataSource for BulkCachedCNightObservationDataSour Ok(guard) => slice_range(&guard, start_position, &end).to_vec(), Err(_) => Vec::new(), }; - let (result, _full_window) = - truncate_to_tx_capacity(window, tx_capacity, start_position, end); + // Window holds the complete range, so `complete = true` — same inputs as + // the db fallback, hence the same inherent. 
+ let result = + truncate_to_tx_capacity(window, tx_capacity, max_utxos, true, start_position, end); if let Ok(mut guard) = self.last_observation.lock() { *guard = Some(LastObservation { @@ -780,4 +835,84 @@ mod tests { existing.iter().map(|u| u.header.tx_position.block_number).collect(); assert_eq!(block_numbers, (14..20).collect::>()); } + + /// `block_number`-th transaction, `utxo_index`-th UTXO within it. UTXOs that + /// share `block_number` belong to the same Cardano transaction (one distinct + /// `tx_position`). + fn utxo_with_index(block_number: u32, utxo_index: u16) -> ObservedUtxo { + let mut u = utxo(block_number, 0); + u.header.utxo_index = UtxoIndexInTx(utxo_index); + u + } + + /// 50 transactions, 5 UTXOs each — distinct `tx_position` per transaction. + fn fifty_txs_five_utxos() -> Vec { + (0..50u32).flat_map(|tx| (0..5u16).map(move |u| utxo_with_index(tx, u))).collect() + } + + /// The cNIGHT observation skip bug, fixed: a row-limited (incomplete) fetch + /// must NEVER advance `next_cardano_position` to the tip. The rows between + /// the last observed tx and the tip would be skipped forever, and a node + /// that DID fetch them would build a different inherent (check_inherent + /// split). Before the fix, "fewer than tx_capacity distinct txs" was treated + /// as "range complete" and the cursor jumped to the tip regardless. + #[test] + fn incomplete_fetch_must_not_advance_to_tip() { + // The range holds 50 txs but the fetch was row-limited to the first 40. 
+ let fetched: Vec = + (0..40u32).flat_map(|tx| (0..5u16).map(move |u| utxo_with_index(tx, u))).collect(); + let tip = pos(100, 0); + + let obs = truncate_to_tx_capacity( + fetched, /* tx_capacity */ 1000, /* max_utxos */ 100_000, /* complete */ false, + &pos(0, 0), tip.clone(), + ); + + assert_ne!(obs.end, tip, "advanced to tip on an incomplete fetch -> skips unfetched txs"); + // The last fetched tx may be partial, so it is dropped; resume at the last + // provably-whole tx (tx 38), never past observed data. + assert_eq!(obs.end, pos(38, 0).increment()); + assert!( + obs.utxos.iter().all(|u| u.header.tx_position < pos(39, 0)), + "retained a tx at/after the truncation frontier", + ); + } + + /// A complete fetch under both caps reports the whole range and advances to + /// the tip — the steady-state case, and what both the in-memory cache slice + /// and the LARGE_LIMIT db fetch produce for the same block (hence no + /// cache-vs-fallback divergence). + #[test] + fn complete_fetch_advances_to_tip() { + let events = fifty_txs_five_utxos(); + let tip = pos(100, 0); + let obs = + truncate_to_tx_capacity(events.clone(), 1000, 100_000, true, &pos(0, 0), tip.clone()); + assert_eq!(obs.end, tip); + assert_eq!(obs.utxos.len(), events.len()); + } + + /// The UTXO envelope cap truncates at a whole-tx boundary: never + /// mid-transaction (resuming would skip the rest of that tx's UTXOs) and + /// never above the runtime acceptance bound. + #[test] + fn utxo_envelope_cap_truncates_at_whole_tx() { + // 250 events. Cap 22 -> 4 whole txs (20 UTXOs); the 5th (would reach 25) + // is held back. + let obs = truncate_to_tx_capacity(fifty_txs_five_utxos(), 1000, 22, true, &pos(0, 0), pos(100, 0)); + assert_eq!(obs.utxos.len(), 20, "did not cut on a whole-tx boundary"); + assert!(obs.utxos.len() <= 22); + assert_eq!(obs.end, pos(3, 0).increment(), "must resume past the last whole tx"); + } + + /// The transaction-count cap admits at most `tx_capacity` whole transactions. 
+ #[test] + fn tx_capacity_cap_truncates_at_whole_tx() { + let obs = truncate_to_tx_capacity(fifty_txs_five_utxos(), 10, 100_000, true, &pos(0, 0), pos(100, 0)); + let distinct: std::collections::BTreeSet = + obs.utxos.iter().map(|u| u.header.tx_position.block_number).collect(); + assert_eq!(distinct.len(), 10); + assert_eq!(obs.utxos.len(), 50, "10 txs x 5 UTXOs"); + assert_eq!(obs.end, pos(9, 0).increment()); + } } diff --git a/primitives/mainchain-follower/src/data_source/cnight_observation_mock.rs b/primitives/mainchain-follower/src/data_source/cnight_observation_mock.rs index ff6d54dd6..8dc355681 100644 --- a/primitives/mainchain-follower/src/data_source/cnight_observation_mock.rs +++ b/primitives/mainchain-follower/src/data_source/cnight_observation_mock.rs @@ -84,7 +84,7 @@ impl MidnightCNightObservationDataSource for CNightObservationDataSourceMock { start: &CardanoPosition, _current_tip: McBlockHash, _tx_capacity: usize, - _utxo_overestimate: usize, + _max_utxos: usize, ) -> Result> { let mut end = start.clone(); end.block_number += 1; diff --git a/primitives/mainchain-follower/src/db/queries/cnight_observation.rs b/primitives/mainchain-follower/src/db/queries/cnight_observation.rs index f266e0c4a..e83eaaeb3 100644 --- a/primitives/mainchain-follower/src/db/queries/cnight_observation.rs +++ b/primitives/mainchain-follower/src/db/queries/cnight_observation.rs @@ -404,7 +404,7 @@ SELECT low_ma_tx_out.ma_tx_out_id AS "ma_tx_out_id!", low_tx_in.tx_in_id AS "tx_in_id!" 
FROM - (SELECT COALESCE ((SELECT id FROM block WHERE block_no = $1 LIMIT 1), 0) AS id) AS block, + (SELECT COALESCE ((SELECT id FROM block WHERE block_no = $1 ORDER BY id ASC LIMIT 1), 0) AS id) AS block, LATERAL (SELECT COALESCE((SELECT id FROM tx WHERE block_id < block.id ORDER BY block_id DESC LIMIT 1), 0) AS tx_id) AS low_tx, LATERAL (SELECT COALESCE((SELECT id FROM tx_out WHERE tx_id <= low_tx.tx_id ORDER BY tx_id DESC LIMIT 1), 0) AS tx_out_id) AS low_tx_out, LATERAL (SELECT COALESCE((SELECT id FROM ma_tx_out WHERE tx_out_id <= low_tx_out.tx_out_id ORDER BY tx_out_id DESC LIMIT 1), 0) AS ma_tx_out_id) AS low_ma_tx_out, @@ -435,7 +435,7 @@ SELECT high_ma_tx_out.ma_tx_out_id AS "ma_tx_out_id!", high_tx_in.tx_in_id AS "tx_in_id!" FROM - (SELECT id FROM block WHERE block_no = $1 LIMIT 1) AS block, + (SELECT id FROM block WHERE block_no = $1 ORDER BY id DESC LIMIT 1) AS block, LATERAL (SELECT COALESCE((SELECT id FROM tx WHERE block_id > block.id ORDER BY block_id ASC LIMIT 1), 9223372036854775807) AS tx_id) AS high_tx, LATERAL (SELECT COALESCE((SELECT id FROM tx_out WHERE tx_id >= high_tx.tx_id ORDER BY tx_id ASC LIMIT 1), 9223372036854775807) AS tx_out_id) AS high_tx_out, LATERAL (SELECT COALESCE((SELECT id FROM ma_tx_out WHERE tx_out_id >= high_tx_out.tx_out_id ORDER BY tx_out_id ASC LIMIT 1), 9223372036854775807) AS ma_tx_out_id) AS high_ma_tx_out, diff --git a/primitives/mainchain-follower/src/idp/cnight_observation.rs b/primitives/mainchain-follower/src/idp/cnight_observation.rs index d00df21bd..8bd9ccb65 100644 --- a/primitives/mainchain-follower/src/idp/cnight_observation.rs +++ b/primitives/mainchain-follower/src/idp/cnight_observation.rs @@ -16,7 +16,7 @@ use crate::{MidnightCNightObservationDataSource, MidnightObservationTokenMovement, ObservedUtxo}; use midnight_primitives_cnight_observation::{ CNightAddresses, CNightObservationApi, CardanoPosition, INHERENT_IDENTIFIER, InherentError, - TimestampUnixMillis, + TimestampUnixMillis, 
UTXO_PER_TX_OVERESTIMATE, }; use parity_scale_codec::Decode; use sidechain_domain::McBlockHash; @@ -100,17 +100,11 @@ impl MidnightCNightObservationInherentDataProvider { let mapping_validator_address = String::from_utf8(api.get_mapping_validator_address(parent_hash)?)?; let tx_capacity = api.get_utxo_capacity_per_block(parent_hash)?; - - // The over-fetch quantity used when querying db-sync is consensus-affecting: - // validators must agree on it to produce identical inherents. The reduction - // from 64x to 4x is therefore gated on the on-chain `CNightObservationApi` - // version: v2+ runtimes use the new factor, older runtimes keep the legacy - // 64x used by node binaries that shipped against v1. - let api_version = api - .api_version::>(parent_hash)? - .ok_or(IDPCreationError::CNightObservationApiUnavailable)?; - let overestimate_factor: u32 = if api_version >= 2 { 4 } else { 64 }; - let utxo_overestimate = tx_capacity.saturating_mul(overestimate_factor); + // UTXO acceptance envelope, derived here at the runtime-API boundary (where + // on-chain `tx_capacity` is read) and passed down — the db data source + // stays decoupled from runtime tokenomics. Matches the runtime's + // `process_tokens` bound so node and chain agree on what an inherent may hold. 
+ let max_utxos = tx_capacity.saturating_mul(UTXO_PER_TX_OVERESTIMATE) as usize; let (cnight_policy_id, cnight_asset_name) = api.get_cnight_token_identifier(parent_hash)?; let auth_token_asset_name: String = api @@ -136,7 +130,7 @@ impl MidnightCNightObservationInherentDataProvider { &cardano_position_start, mc_hash, tx_capacity as usize, - utxo_overestimate as usize, + max_utxos, ) .await .map_err(IDPCreationError::DataSourceError)?; diff --git a/primitives/mainchain-follower/src/lib.rs b/primitives/mainchain-follower/src/lib.rs index d9daa0b62..c04ef8d45 100644 --- a/primitives/mainchain-follower/src/lib.rs +++ b/primitives/mainchain-follower/src/lib.rs @@ -56,7 +56,7 @@ pub mod inherent_provider { start_position: &CardanoPosition, current_tip: McBlockHash, tx_capacity: usize, - utxo_overestimate: usize, + max_utxos: usize, ) -> Result>; } diff --git a/primitives/mainchain-follower/tests/cnight_equivalence.rs b/primitives/mainchain-follower/tests/cnight_equivalence.rs index 704530292..31dbf4252 100644 --- a/primitives/mainchain-follower/tests/cnight_equivalence.rs +++ b/primitives/mainchain-follower/tests/cnight_equivalence.rs @@ -35,7 +35,6 @@ //! - `CNIGHT_EQUIV_FROM_BLOCK` first Cardano block_no (default: 0) //! - `CNIGHT_EQUIV_TO_BLOCK` last Cardano block_no (default: max in db) //! - `CNIGHT_EQUIV_TX_CAPACITY` whole-tx capacity per call (default: 200) -//! - `CNIGHT_EQUIV_UTXO_OVERESTIMATE` per-query SQL over-fetch bound (default: 12800) //! - `CNIGHT_EQUIV_MAX_COMPARISONS` cap on number of (start,tip) queries (default: 500) //! //! 
NOTE: the standard source clips its SQL window to `LIVE_PULL_BLOCK_DELTA` @@ -47,7 +46,7 @@ use std::sync::Arc; use midnight_primitives_cnight_observation::{ - CNightAddresses, CardanoPosition, TimestampUnixMillis, + CNightAddresses, CardanoPosition, TimestampUnixMillis, UTXO_PER_TX_OVERESTIMATE, }; use midnight_primitives_mainchain_follower::data_source::{ BulkCacheConfig, BulkCachedCNightObservationDataSource, DEFAULT_WINDOW_SIZE, @@ -114,9 +113,9 @@ async fn bulk_source_matches_standard_over_block_range() { }; assert!(to_block > from_block, "empty block range [{from_block}, {to_block}]"); - // Consensus knobs — override to match the runtime config of the network. + // Consensus knob — override to match the runtime config of the network. let tx_capacity: usize = env_parsed("CNIGHT_EQUIV_TX_CAPACITY").unwrap_or(200); - let utxo_overestimate: usize = env_parsed("CNIGHT_EQUIV_UTXO_OVERESTIMATE").unwrap_or(12_800); + let max_utxos = tx_capacity * UTXO_PER_TX_OVERESTIMATE as usize; let rows = sqlx::query( "SELECT block_no::bigint AS block_no, hash FROM block \ @@ -151,9 +150,10 @@ async fn bulk_source_matches_standard_over_block_range() { let window_start = whole_block_position(window_from, 0); let window_end = whole_block_position(window_to, u32::try_from(i32::MAX).expect("i32::MAX is non-negative")); - let events = bulk_pull(&pool, &addresses, &window_start, &window_end, LARGE_LIMIT) + let (events, complete) = bulk_pull(&pool, &addresses, &window_start, &window_end, LARGE_LIMIT) .await .expect("bulk_pull window"); + assert!(complete, "test window exceeded LARGE_LIMIT rows; widen LARGE_LIMIT or narrow the range"); eprintln!("cached window [{window_from}, {window_to}] holds {} events", events.len()); let db_fallback = Arc::new(MidnightCNightObservationDataSourceImpl::new(pool.clone(), None, 0)); @@ -195,23 +195,11 @@ async fn bulk_source_matches_standard_over_block_range() { }; let tip = blocks[(i + tip_delta).min(blocks.len() - 1)].1.clone(); let standard_result = 
standard - .get_utxos_up_to_capacity( - &addresses, - &start, - tip.clone(), - tx_capacity, - utxo_overestimate, - ) + .get_utxos_up_to_capacity(&addresses, &start, tip.clone(), tx_capacity, max_utxos) .await .expect("standard source query"); let bulk_result = bulk - .get_utxos_up_to_capacity( - &addresses, - &start, - tip.clone(), - tx_capacity, - utxo_overestimate, - ) + .get_utxos_up_to_capacity(&addresses, &start, tip.clone(), tx_capacity, max_utxos) .await .expect("bulk source query");