diff --git a/.sqlx/query-74714e19742fdca05573c5ca8bac0f158ab909846bb80e82860f61816cab8809.json b/.sqlx/query-1ba2028c65e58648de38b54240031c03f754d1e1a04fc6755694bdf8abc87afc.json
similarity index 53%
rename from .sqlx/query-74714e19742fdca05573c5ca8bac0f158ab909846bb80e82860f61816cab8809.json
rename to .sqlx/query-1ba2028c65e58648de38b54240031c03f754d1e1a04fc6755694bdf8abc87afc.json
index e93bf2d39..45f1b5735 100644
--- a/.sqlx/query-74714e19742fdca05573c5ca8bac0f158ab909846bb80e82860f61816cab8809.json
+++ b/.sqlx/query-1ba2028c65e58648de38b54240031c03f754d1e1a04fc6755694bdf8abc87afc.json
@@ -1,6 +1,6 @@
{
"db_name": "PostgreSQL",
- "query": "\nSELECT\n low_tx.tx_id AS \"tx_id!\",\n low_tx_out.tx_out_id AS \"tx_out_id!\",\n low_ma_tx_out.ma_tx_out_id AS \"ma_tx_out_id!\",\n low_tx_in.tx_in_id AS \"tx_in_id!\"\nFROM\n (SELECT COALESCE ((SELECT id FROM block WHERE block_no = $1 LIMIT 1), 0) AS id) AS block,\n LATERAL (SELECT COALESCE((SELECT id FROM tx WHERE block_id < block.id ORDER BY block_id DESC LIMIT 1), 0) AS tx_id) AS low_tx,\n LATERAL (SELECT COALESCE((SELECT id FROM tx_out WHERE tx_id <= low_tx.tx_id ORDER BY tx_id DESC LIMIT 1), 0) AS tx_out_id) AS low_tx_out,\n LATERAL (SELECT COALESCE((SELECT id FROM ma_tx_out WHERE tx_out_id <= low_tx_out.tx_out_id ORDER BY tx_out_id DESC LIMIT 1), 0) AS ma_tx_out_id) AS low_ma_tx_out,\n LATERAL (SELECT COALESCE((SELECT id FROM tx_in WHERE tx_in.tx_in_id <= low_tx.tx_id ORDER BY tx_in_id DESC LIMIT 1), 0) AS tx_in_id) AS low_tx_in;\n",
+ "query": "\nSELECT\n low_tx.tx_id AS \"tx_id!\",\n low_tx_out.tx_out_id AS \"tx_out_id!\",\n low_ma_tx_out.ma_tx_out_id AS \"ma_tx_out_id!\",\n low_tx_in.tx_in_id AS \"tx_in_id!\"\nFROM\n (SELECT COALESCE ((SELECT id FROM block WHERE block_no = $1 ORDER BY id ASC LIMIT 1), 0) AS id) AS block,\n LATERAL (SELECT COALESCE((SELECT id FROM tx WHERE block_id < block.id ORDER BY block_id DESC LIMIT 1), 0) AS tx_id) AS low_tx,\n LATERAL (SELECT COALESCE((SELECT id FROM tx_out WHERE tx_id <= low_tx.tx_id ORDER BY tx_id DESC LIMIT 1), 0) AS tx_out_id) AS low_tx_out,\n LATERAL (SELECT COALESCE((SELECT id FROM ma_tx_out WHERE tx_out_id <= low_tx_out.tx_out_id ORDER BY tx_out_id DESC LIMIT 1), 0) AS ma_tx_out_id) AS low_ma_tx_out,\n LATERAL (SELECT COALESCE((SELECT id FROM tx_in WHERE tx_in.tx_in_id <= low_tx.tx_id ORDER BY tx_in_id DESC LIMIT 1), 0) AS tx_in_id) AS low_tx_in;\n",
"describe": {
"columns": [
{
@@ -36,5 +36,5 @@
null
]
},
- "hash": "74714e19742fdca05573c5ca8bac0f158ab909846bb80e82860f61816cab8809"
+ "hash": "1ba2028c65e58648de38b54240031c03f754d1e1a04fc6755694bdf8abc87afc"
}
diff --git a/.sqlx/query-542a58ed5ad9ff37e14402f920b5be5f5590909ba68a4c805ac779f86aa826de.json b/.sqlx/query-542a58ed5ad9ff37e14402f920b5be5f5590909ba68a4c805ac779f86aa826de.json
new file mode 100644
index 000000000..730fc962e
--- /dev/null
+++ b/.sqlx/query-542a58ed5ad9ff37e14402f920b5be5f5590909ba68a4c805ac779f86aa826de.json
@@ -0,0 +1,40 @@
+{
+ "db_name": "PostgreSQL",
+ "query": "\nSELECT\n high_tx.tx_id AS \"tx_id!\",\n high_tx_out.tx_out_id AS \"tx_out_id!\",\n high_ma_tx_out.ma_tx_out_id AS \"ma_tx_out_id!\",\n high_tx_in.tx_in_id AS \"tx_in_id!\"\nFROM\n (SELECT id FROM block WHERE block_no = $1 ORDER BY id DESC LIMIT 1) AS block,\n LATERAL (SELECT COALESCE((SELECT id FROM tx WHERE block_id > block.id ORDER BY block_id ASC LIMIT 1), 9223372036854775807) AS tx_id) AS high_tx,\n LATERAL (SELECT COALESCE((SELECT id FROM tx_out WHERE tx_id >= high_tx.tx_id ORDER BY tx_id ASC LIMIT 1), 9223372036854775807) AS tx_out_id) AS high_tx_out,\n LATERAL (SELECT COALESCE((SELECT id FROM ma_tx_out WHERE tx_out_id >= high_tx_out.tx_out_id ORDER BY tx_out_id ASC LIMIT 1), 9223372036854775807) AS ma_tx_out_id) AS high_ma_tx_out,\n LATERAL (SELECT COALESCE((SELECT id FROM tx_in WHERE tx_in.tx_in_id >= high_tx.tx_id ORDER BY tx_in_id ASC LIMIT 1), 9223372036854775807) AS tx_in_id) AS high_tx_in;\n",
+ "describe": {
+ "columns": [
+ {
+ "ordinal": 0,
+ "name": "tx_id!",
+ "type_info": "Int8"
+ },
+ {
+ "ordinal": 1,
+ "name": "tx_out_id!",
+ "type_info": "Int8"
+ },
+ {
+ "ordinal": 2,
+ "name": "ma_tx_out_id!",
+ "type_info": "Int8"
+ },
+ {
+ "ordinal": 3,
+ "name": "tx_in_id!",
+ "type_info": "Int8"
+ }
+ ],
+ "parameters": {
+ "Left": [
+ "Int4"
+ ]
+ },
+ "nullable": [
+ null,
+ null,
+ null,
+ null
+ ]
+ },
+ "hash": "542a58ed5ad9ff37e14402f920b5be5f5590909ba68a4c805ac779f86aa826de"
+}
diff --git a/.sqlx/query-ff6315ab1c9f1ac5f7b5672157310fd75fa29f680ac5f19158267ecea0ece6a9.json b/.sqlx/query-ff6315ab1c9f1ac5f7b5672157310fd75fa29f680ac5f19158267ecea0ece6a9.json
deleted file mode 100644
index cc99fa95d..000000000
--- a/.sqlx/query-ff6315ab1c9f1ac5f7b5672157310fd75fa29f680ac5f19158267ecea0ece6a9.json
+++ /dev/null
@@ -1,40 +0,0 @@
-{
- "db_name": "PostgreSQL",
- "query": "\nSELECT\n high_tx.tx_id AS \"tx_id!\",\n high_tx_out.tx_out_id AS \"tx_out_id!\",\n high_ma_tx_out.ma_tx_out_id AS \"ma_tx_out_id!\",\n high_tx_in.tx_in_id AS \"tx_in_id!\"\nFROM\n (SELECT id FROM block WHERE block_no = $1 LIMIT 1) AS block,\n LATERAL (SELECT COALESCE((SELECT id FROM tx WHERE block_id > block.id ORDER BY block_id ASC LIMIT 1), 9223372036854775807) AS tx_id) AS high_tx,\n LATERAL (SELECT COALESCE((SELECT id FROM tx_out WHERE tx_id >= high_tx.tx_id ORDER BY tx_id ASC LIMIT 1), 9223372036854775807) AS tx_out_id) AS high_tx_out,\n LATERAL (SELECT COALESCE((SELECT id FROM ma_tx_out WHERE tx_out_id >= high_tx_out.tx_out_id ORDER BY tx_out_id ASC LIMIT 1), 9223372036854775807) AS ma_tx_out_id) AS high_ma_tx_out,\n LATERAL (SELECT COALESCE((SELECT id FROM tx_in WHERE tx_in.tx_in_id >= high_tx.tx_id ORDER BY tx_in_id ASC LIMIT 1), 9223372036854775807) AS tx_in_id) AS high_tx_in;\n",
- "describe": {
- "columns": [
- {
- "ordinal": 0,
- "name": "tx_id!",
- "type_info": "Int8"
- },
- {
- "ordinal": 1,
- "name": "tx_out_id!",
- "type_info": "Int8"
- },
- {
- "ordinal": 2,
- "name": "ma_tx_out_id!",
- "type_info": "Int8"
- },
- {
- "ordinal": 3,
- "name": "tx_in_id!",
- "type_info": "Int8"
- }
- ],
- "parameters": {
- "Left": [
- "Int4"
- ]
- },
- "nullable": [
- null,
- null,
- null,
- null
- ]
- },
- "hash": "ff6315ab1c9f1ac5f7b5672157310fd75fa29f680ac5f19158267ecea0ece6a9"
-}
diff --git a/changes/node/changed/cnight-observation-fetch-determinism.md b/changes/node/changed/cnight-observation-fetch-determinism.md
new file mode 100644
index 000000000..18f434c39
--- /dev/null
+++ b/changes/node/changed/cnight-observation-fetch-determinism.md
@@ -0,0 +1,24 @@
+#node #runtime #security
+# Make cNIGHT observation inherent generation deterministic and unskippable
+
+The cNIGHT inherent provider used a row-count over-fetch (`tx_capacity * factor`)
+as a one-shot SQL `LIMIT`, then treated "fewer than `tx_capacity` distinct txs
+returned" as "range complete" and advanced the Cardano cursor to the tip —
+without checking whether the query was truncated by its limit. A range holding
+more matching rows than the limit across fewer txs (many UTXOs per tx) was
+silently skipped, and a node that fetched more rows derived a different inherent,
+causing `check_inherent` rejection (fork/liveness risk) and corrupted mint/burn accounting.
+
+Fix: one deterministic path for both the in-memory cache and the db fallback.
+`bulk_pull` now reports whether any query hit its row limit, and the result is
+truncated at whole-transaction boundaries to the runtime envelope (`tx_capacity *
+UTXO_PER_TX_OVERESTIMATE`). The cursor reaches the tip only on a proven-complete
+fetch; otherwise it stops at the last fully-observed tx. With no fetch-size input
+left, every node derives identical inherents.
+
+`UTXO_PER_TX_OVERESTIMATE` moves to `midnight-primitives-cnight-observation` as
+the single source shared by runtime and node. Byte-identical to the prior 64x
+path on benign history, so finalized blocks replay unchanged.
+
+PR:
+Issue:
diff --git a/ledger/src/lib.rs b/ledger/src/lib.rs
index a62f0290a..dc9483a4e 100644
--- a/ledger/src/lib.rs
+++ b/ledger/src/lib.rs
@@ -154,6 +154,129 @@ pub fn drop_all_default_storage() {
ledger_9::storage::drop_default_storage_if_exists();
}
+/// Genesis bootstrap that dispatches on the ledger version embedded in the
+/// genesis state, instead of assuming the newest.
+///
+/// All ledger versions are compiled in, but a genesis blob is serialized with
+/// exactly one version's `LedgerState`, and its deserializer rejects any other
+/// version's tag. Hardcoding the latest (as each ledger bump historically did)
+/// makes the node unable to bootstrap a chain still on an older genesis — e.g.
+/// live mainnet, whose genesis is immutable. These wrappers read the blob's tag
+/// and route to the matching module. This is *not* state migration: each chain
+/// keeps running its own ledger version; we only stop forcing the newest at the
+/// genesis boundary.
+#[cfg(feature = "std")]
+pub mod genesis_version {
+ /// Ledger storage version a tagged genesis blob was produced with.
+ #[derive(Debug, Clone, Copy, PartialEq, Eq)]
+ pub enum LedgerVersion {
+ /// ledger 7
+ V7,
+ /// ledger 8
+ V8,
+ /// ledger 9
+ V9,
+ }
+
+ /// Detect a genesis blob's ledger version by searching its first 64 bytes
+ /// for each compiled-in module's own `LedgerState` tag, newest first.
+ /// `None` if no known version matches (corrupt or future-version blob).
+ pub fn detect(genesis_state: &[u8]) -> Option {
+ let head = &genesis_state[..genesis_state.len().min(64)];
+ let has = |tag: &str| head.windows(tag.len()).any(|w| w == tag.as_bytes());
+ if has(&super::ledger_9::storage::genesis_ledger_state_tag()) {
+ Some(LedgerVersion::V9)
+ } else if has(&super::ledger_8::storage::genesis_ledger_state_tag()) {
+ Some(LedgerVersion::V8)
+ } else if has(&super::ledger_7::storage::genesis_ledger_state_tag()) {
+ Some(LedgerVersion::V7)
+ } else {
+ None
+ }
+ }
+
+ fn unknown_version(genesis_state: &[u8]) -> String {
+ format!(
+ "unrecognised ledger genesis version tag in {}-byte genesis blob (known: {}, {}, {})",
+ genesis_state.len(),
+ super::ledger_9::storage::genesis_ledger_state_tag(),
+ super::ledger_8::storage::genesis_ledger_state_tag(),
+ super::ledger_7::storage::genesis_ledger_state_tag(),
+ )
+ }
+
+ /// Version-dispatched [`get_root`](super::ledger_9::storage::get_root).
+ pub fn get_root(genesis_state: &[u8], network_id: Option<&str>) -> Result, String> {
+ match detect(genesis_state) {
+ Some(LedgerVersion::V9) => super::ledger_9::storage::get_root(genesis_state, network_id)
+ .map_err(|e| e.to_string()),
+ Some(LedgerVersion::V8) => super::ledger_8::storage::get_root(genesis_state, network_id)
+ .map_err(|e| e.to_string()),
+ Some(LedgerVersion::V7) => super::ledger_7::storage::get_root(genesis_state, network_id)
+ .map_err(|e| e.to_string()),
+ None => Err(unknown_version(genesis_state)),
+ }
+ }
+
+ /// Version-dispatched separate-db genesis init.
+ pub fn init_storage_paritydb_separate>(
+ dir: P,
+ genesis_state: &[u8],
+ cache_size: usize,
+ ) -> Vec {
+ match detect(genesis_state) {
+ Some(LedgerVersion::V9) => super::ledger_9::storage::init_storage_paritydb_separate(
+ dir,
+ genesis_state,
+ cache_size,
+ ),
+ Some(LedgerVersion::V8) => super::ledger_8::storage::init_storage_paritydb_separate(
+ dir,
+ genesis_state,
+ cache_size,
+ ),
+ Some(LedgerVersion::V7) => super::ledger_7::storage::init_storage_paritydb_separate(
+ dir,
+ genesis_state,
+ cache_size,
+ ),
+ None => panic!("{}", unknown_version(genesis_state)),
+ }
+ }
+
+ /// Version-dispatched unified-db genesis init.
+ pub fn init_storage_paritydb_unified<
+ D: std::ops::Deref + Default + Send + Sync + 'static,
+ const COLUMN_OFFSET: u8,
+ >(
+ db_instance: D,
+ genesis_state: &[u8],
+ cache_size: usize,
+ ) -> Vec {
+ match detect(genesis_state) {
+ Some(LedgerVersion::V9) =>
+ super::ledger_9::storage::init_storage_paritydb_unified::(
+ db_instance,
+ genesis_state,
+ cache_size,
+ ),
+ Some(LedgerVersion::V8) =>
+ super::ledger_8::storage::init_storage_paritydb_unified::(
+ db_instance,
+ genesis_state,
+ cache_size,
+ ),
+ Some(LedgerVersion::V7) =>
+ super::ledger_7::storage::init_storage_paritydb_unified::(
+ db_instance,
+ genesis_state,
+ cache_size,
+ ),
+ None => panic!("{}", unknown_version(genesis_state)),
+ }
+ }
+}
+
mod common;
pub mod types {
diff --git a/ledger/src/versions/common/storage.rs b/ledger/src/versions/common/storage.rs
index ef39ea8d9..ec3aebf26 100644
--- a/ledger/src/versions/common/storage.rs
+++ b/ledger/src/versions/common/storage.rs
@@ -74,6 +74,17 @@ impl core::fmt::Display for GetRootError {
}
}
+/// The `LedgerState` serialization tag for this ledger version, e.g.
+/// `ledger-state[v13]` (ledger 8) or `ledger-state[v16]` (ledger 9). A tagged
+/// genesis blob carries this verbatim, so it lets the bootstrap pick the right
+/// deserializer before touching the body (see [`crate::genesis_version`]).
+#[cfg(feature = "std")]
+pub fn genesis_ledger_state_tag() -> std::borrow::Cow<'static, str> {
+ use super::ledger_storage_local::DefaultDB;
+ use super::midnight_serialize_local::Tagged;
+ as Tagged>::tag()
+}
+
pub fn get_root(state: &[u8], network_id: Option<&str>) -> Result, GetRootError> {
// Get empty state key
use super::api::Ledger;
diff --git a/node/src/backend/custom_parity_db.rs b/node/src/backend/custom_parity_db.rs
index 5cd5ab729..688380d89 100644
--- a/node/src/backend/custom_parity_db.rs
+++ b/node/src/backend/custom_parity_db.rs
@@ -109,7 +109,7 @@ pub fn open>(
match storage_config.separation {
StorageSeparation::Separate => {
- midnight_node_ledger::ledger_9::storage::init_storage_paritydb_separate(
+ midnight_node_ledger::genesis_version::init_storage_paritydb_separate(
&storage_config.db_path,
&storage_config.genesis_state,
storage_config.cache_size,
@@ -117,7 +117,7 @@ pub fn open>(
Ok((OwnedDb(db), LedgerStorageDb::SeparateDb(storage_config.db_path.clone())))
},
StorageSeparation::Unified => {
- midnight_node_ledger::ledger_9::storage::init_storage_paritydb_unified::<
+ midnight_node_ledger::genesis_version::init_storage_paritydb_unified::<
_,
NUM_COLUMNS_POLKADOT,
>(OwnedDb(db.clone()), &storage_config.genesis_state, storage_config.cache_size);
diff --git a/node/src/chain_spec/mod.rs b/node/src/chain_spec/mod.rs
index 7fafe3f32..b8a84cc9d 100644
--- a/node/src/chain_spec/mod.rs
+++ b/node/src/chain_spec/mod.rs
@@ -56,7 +56,7 @@ pub enum ChainSpecInitError {
Missing(String),
ParseError(String),
Serialization(String),
- GenesisStateError(midnight_node_ledger::ledger_9::storage::GetRootError),
+ GenesisStateError(String),
}
impl fmt::Display for ChainSpecInitError {
@@ -268,7 +268,7 @@ fn genesis_config(genesis: T) -> Result u128 {
+ u128::try_from(quantity).unwrap_or_else(|_| {
+ panic!(
+ "negative cNIGHT asset quantity {quantity} from db-sync — data source corrupt or forged"
+ )
+ })
+}
+
#[derive(thiserror::Error, Debug)]
pub enum RegistrationDatumDecodeError {
#[error("Cardano credential not bytes")]
@@ -107,7 +119,7 @@ impl MidnightCNightObservationDataSource for MidnightCNightObservationDataSource
start_position: &CardanoPosition,
current_tip: McBlockHash,
tx_capacity: usize,
- utxo_overestimate: usize,
+ max_utxos: usize,
) -> Result> {
// Resolve current_tip -> CardanoPosition. This must preserve the historic
// replay semantics: query through the block's Cardano tip and only then
@@ -121,18 +133,18 @@ impl MidnightCNightObservationDataSource for MidnightCNightObservationDataSource
drop(_block_timer);
let end = end.increment();
- // The over-fetch bound is consensus-affecting and runtime-supplied, so it
- // must flow into the SQL row limit (see `bulk_pull`) rather than a fixed
- // client-side constant.
- let utxos =
- bulk_pull(&self.pool, config, start_position, &end, utxo_overestimate).await?;
- let (result, _full_window) = truncate_to_tx_capacity(
- utxos,
- tx_capacity,
- start_position,
- end,
- );
- Ok(result)
+ // This single query serves exactly one inherent, so fetch only one
+ // envelope's worth plus a sentinel row. Anything past `max_utxos` is
+ // discarded by `truncate_to_tx_capacity` anyway; a subquery returning the
+ // full `max_utxos + 1` rows flags the range row-limited
+ // (`complete = false`), holding the cursor short of the tip. Each category
+ // query is `ORDER BY (block_no, block_index, ...) LIMIT n`, so this prefix
+ // holds the same first-`max_utxos` rows the old whole-range pull did:
+ // byte-identical inherent, far less data held in memory. The cache path still pulls
+ // whole windows with `LARGE_LIMIT`; both truncate to the same envelope.
+ let (utxos, complete) =
+ bulk_pull(&self.pool, config, start_position, &end, max_utxos.saturating_add(1)).await?;
+ Ok(truncate_to_tx_capacity(utxos, tx_capacity, max_utxos, complete, start_position, end))
}
}
);
@@ -333,7 +345,7 @@ impl MidnightCNightObservationDataSourceImpl {
let utxo = ObservedUtxo {
header,
data: ObservedUtxoData::AssetCreate(CreateData {
- value: row.quantity as u128,
+ value: checked_asset_quantity(row.quantity),
owner,
utxo_tx_hash: McTxHash(row.tx_hash.0),
utxo_tx_index: row.utxo_index.0,
@@ -390,7 +402,7 @@ impl MidnightCNightObservationDataSourceImpl {
let utxo = ObservedUtxo {
header,
data: ObservedUtxoData::AssetSpend(SpendData {
- value: row.quantity as u128,
+ value: checked_asset_quantity(row.quantity),
owner,
utxo_tx_hash: McTxHash(row.utxo_tx_hash.0),
utxo_tx_index: row.utxo_index.0,
@@ -404,3 +416,22 @@ impl MidnightCNightObservationDataSourceImpl {
Ok(utxos)
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn checked_asset_quantity_accepts_non_negative() {
+ assert_eq!(checked_asset_quantity(0), 0);
+ assert_eq!(checked_asset_quantity(1), 1);
+ assert_eq!(checked_asset_quantity(i64::MAX), i64::MAX as u128);
+ }
+
+ #[test]
+ #[should_panic(expected = "negative cNIGHT asset quantity")]
+ fn checked_asset_quantity_panics_on_negative() {
+ // A bare `as u128` would sign-extend -1 to u128::MAX (~3.4e38) and mint it as DUST.
+ let _ = checked_asset_quantity(-1);
+ }
+}
diff --git a/primitives/mainchain-follower/src/data_source/cnight_observation_bulk.rs b/primitives/mainchain-follower/src/data_source/cnight_observation_bulk.rs
index ab9c1af7c..692d85b6c 100644
--- a/primitives/mainchain-follower/src/data_source/cnight_observation_bulk.rs
+++ b/primitives/mainchain-follower/src/data_source/cnight_observation_bulk.rs
@@ -36,10 +36,11 @@ use sqlx::PgPool;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
-/// Effectively-no-limit page size for bulk pulls. The query path supports
-/// `LIMIT` for paged use but the sliding window wants the whole range in
-/// one shot.
-const LARGE_LIMIT: usize = 5_000_000;
+/// Row ceiling for a "whole range" pull. The sliding-window cache refresh
+/// passes this; the per-call db path passes `max_utxos + 1` instead. Either
+/// way, a pull in which any query returns `limit` rows is treated as
+/// truncated (see `bulk_pull`).
+pub const LARGE_LIMIT: usize = 5_000_000;
/// Default number of cardano blocks to keep in the sliding window when the
/// node config doesn't override it. Memory cost ≈ 5 KB × events-per-block,
@@ -72,23 +73,25 @@ pub enum BulkPullError {
Observation(#[from] MidnightCNightObservationDataSourceError),
}
-/// Pull every cnight observation event in `[start, end]` (inclusive) and
-/// return them sorted ascending by `tx_position`.
+/// Pull every cnight observation event in `[start, end]` (inclusive), sorted by
+/// `tx_position`, with `complete = true` iff no query hit `limit` (so the result
+/// is the whole range, not a row-limited prefix).
///
-/// Both endpoints are full `CardanoPosition`s so the per-call data source can
-/// pass exact `(block_number, tx_index_in_block)` boundaries while the bulk
-/// /sliding-window paths can pass whole-block ranges via
+/// `complete == false` means some query returned `limit` rows, so more may
+/// exist beyond it. That is logged at error level (pathological at [`LARGE_LIMIT`]);
+/// callers must not advance the cursor to the tip on it (that would skip the unfetched rows).
+///
+/// Both endpoints are full `CardanoPosition`s so callers can pass exact
+/// `(block, tx_index)` boundaries or whole-block ranges via
/// `CardanoPosition::{min,max}_for_block`.
pub async fn bulk_pull(
pool: &PgPool,
cfg: &CNightAddresses,
start: &CardanoPosition,
end: &CardanoPosition,
- // Per-query SQL row limit (over-fetch bound). The consensus inherent path
- // passes the runtime-supplied `utxo_overestimate`; the background cache
- // refresh passes `LARGE_LIMIT` to pull a whole multi-block window.
+ // Per-query SQL row limit; callers pass `LARGE_LIMIT` for a whole-range pull.
limit: usize,
-) -> Result, BulkPullError> {
+) -> Result<(Vec, bool), BulkPullError> {
let data_source = MidnightCNightObservationDataSourceImpl::new(pool.clone(), None, 0);
let mapping_validator_address = Address::from_bech32(&cfg.mapping_validator_address)
@@ -156,51 +159,101 @@ pub async fn bulk_pull(
all.extend(v);
}
all.sort();
+ // A query that returned exactly `limit` rows may have more behind it, so the
+ // pull is not provably complete.
+ let complete =
+ counts.0 < limit && counts.1 < limit && counts.2 < limit && counts.3 < limit;
log::info!(
target: "cnight::sliding-window",
- "bulk_pull [{}/{}, {}/{}] -> reg={} dereg={} create={} spend={} (auth_ident={:?} cnight_ident={:?})",
+ "bulk_pull [{}/{}, {}/{}] -> reg={} dereg={} create={} spend={} complete={} (auth_ident={:?} cnight_ident={:?})",
start.block_number, start.tx_index_in_block,
end.block_number, end.tx_index_in_block,
- counts.0, counts.1, counts.2, counts.3, auth_token_ident, cnight_ident,
+ counts.0, counts.1, counts.2, counts.3, complete, auth_token_ident, cnight_ident,
);
- Ok(all)
+ if !complete {
+ log::error!(
+ target: "cnight::sliding-window",
+ "bulk_pull hit the {limit}-row limit in [{}, {}] (reg={} dereg={} create={} spend={}); \
+ the cNIGHT observation window may be incomplete and the cursor will not advance to the tip",
+ start.block_number, end.block_number, counts.0, counts.1, counts.2, counts.3,
+ );
+ }
+ Ok((all, complete))
}
-/// Truncate a sorted, unique-position event list to at most `tx_capacity`
-/// whole transactions. Returns the truncated `ObservedUtxos` plus a flag
-/// indicating whether the full input fit (`true`: all events accepted up to
-/// `fallback_end`; `false`: capacity hit and `result.end` is the position
-/// just past the last accepted event).
+/// Build the observation inherent from the sorted event list fetched for
+/// `[start, fallback_end]` (`complete` tells whether that fetch was exhaustive).
+/// Caps at `tx_capacity` whole txs and `max_utxos` UTXOs (the runtime `process_tokens` envelope).
+///
+/// Consensus invariants:
+/// - Transactions are admitted whole, so `end` lands on a tx boundary; resuming
+/// there cannot skip a counted tx's UTXOs.
+/// - The cursor reaches `fallback_end` (the tip) ONLY when `complete` (the fetch
+/// wasn't row-limited); otherwise it stops at the last fully-observed tx.
+///
+/// Every node feeds this the same `max_utxos` and either the complete range or a
+/// row-limited prefix flagged `complete = false`; the intent is byte-identical inherents on every node.
pub fn truncate_to_tx_capacity(
events: Vec,
tx_capacity: usize,
+ max_utxos: usize,
+ complete: bool,
start_position: &CardanoPosition,
fallback_end: CardanoPosition,
-) -> (ObservedUtxos, bool) {
- let mut truncated: Vec = Vec::with_capacity(events.len().min(tx_capacity * 64));
+) -> ObservedUtxos {
+ let n = events.len();
+ let mut truncated: Vec = Vec::with_capacity(n.min(max_utxos));
let mut num_txs: usize = 0;
- let mut cur_tx: Option = None;
- for utxo in events {
- if cur_tx.as_ref().is_none_or(|tx| tx < &utxo.header.tx_position) {
- num_txs += 1;
- cur_tx = Some(utxo.header.tx_position.clone());
+ let mut capped = false;
+ let mut i = 0usize;
+ while i < n {
+ // Extent [i, j) of the whole tx at `i` — its UTXOs share a position.
+ let mut j = i + 1;
+ while j < n
+ && events[j].header.tx_position.partial_cmp(&events[i].header.tx_position)
+ == Some(core::cmp::Ordering::Equal)
+ {
+ j += 1;
}
- if num_txs == tx_capacity {
+ let tx_len = j - i;
+ // A lone tx bigger than the whole envelope is still admitted (cap check
+ // skipped while empty) so the runtime's bound rejects it loudly instead
+ // of us stalling forever.
+ let exceeds_tx_cap = num_txs + 1 > tx_capacity;
+ let exceeds_max_utxos = !truncated.is_empty() && truncated.len() + tx_len > max_utxos;
+ if exceeds_tx_cap || exceeds_max_utxos {
+ capped = true;
break;
}
- truncated.push(utxo);
+ truncated.extend_from_slice(&events[i..j]);
+ num_txs += 1;
+ i = j;
}
- let full_window = num_txs < tx_capacity;
- let end = if full_window {
+
+ let end = if capped {
+ // Accepted rows fit `max_utxos`, below any fetch row limit, so the last accepted tx is whole (NOTE(review): confirm for a lone oversized tx).
+ boundary_after(&truncated, start_position)
+ } else if complete {
fallback_end
} else {
- truncated
- .last()
- .map(|u| u.header.tx_position.clone())
- .unwrap_or_else(|| start_position.clone())
- .increment()
+ // Row-limited: the final fetched tx may be missing UTXOs past the limit,
+ // so drop it and resume only as far as we proved complete.
+ if let Some(last) = truncated.last().map(|u| u.header.tx_position.clone()) {
+ truncated.retain(|u| u.header.tx_position < last);
+ }
+ boundary_after(&truncated, start_position)
};
- (ObservedUtxos { start: start_position.clone(), end, utxos: truncated }, full_window)
+
+ ObservedUtxos { start: start_position.clone(), end, utxos: truncated }
+}
+
+/// Position just past the last accepted event, or just past `start_position` if none were accepted.
+fn boundary_after(truncated: &[ObservedUtxo], start_position: &CardanoPosition) -> CardanoPosition {
+ truncated
+ .last()
+ .map(|u| u.header.tx_position.clone())
+ .unwrap_or_else(|| start_position.clone())
+ .increment()
}
/// Cached result of the previous `get_utxos_up_to_capacity` call. During
@@ -421,9 +474,9 @@ impl RefreshContext {
CardanoPosition::min_for_block(from_block),
CardanoPosition::max_for_block(target_end),
);
- // Warming the cache means pulling a whole multi-block window, so the per-query
- // limit is the wide `LARGE_LIMIT` rather than the per-block over-fetch bound.
- let extension =
+ // Whole multi-block window, so `LARGE_LIMIT`. `_complete`: bulk_pull
+ // already error-logs a truncated pull (which would leave a gap here).
+ let (extension, _complete) =
bulk_pull(&self.pool, &self.cnight_addresses, &start, &end, LARGE_LIMIT).await?;
{
let mut events_guard =
@@ -522,7 +575,7 @@ impl MidnightCNightObservationDataSource for BulkCachedCNightObservationDataSour
start_position: &CardanoPosition,
current_tip: McBlockHash,
tx_capacity: usize,
- utxo_overestimate: usize,
+ max_utxos: usize,
) -> Result> {
// Same-tip cache: if `current_tip` and `start_position` are both
// unchanged, the Cardano window hasn't grown, so reuse the previous
@@ -598,7 +651,7 @@ impl MidnightCNightObservationDataSource for BulkCachedCNightObservationDataSour
start_position,
current_tip,
tx_capacity,
- utxo_overestimate,
+ max_utxos,
)
.await;
}
@@ -612,7 +665,7 @@ impl MidnightCNightObservationDataSource for BulkCachedCNightObservationDataSour
start_position,
current_tip,
tx_capacity,
- utxo_overestimate,
+ max_utxos,
)
.await;
}
@@ -625,8 +678,10 @@ impl MidnightCNightObservationDataSource for BulkCachedCNightObservationDataSour
Ok(guard) => slice_range(&guard, start_position, &end).to_vec(),
Err(_) => Vec::new(),
};
- let (result, _full_window) =
- truncate_to_tx_capacity(window, tx_capacity, start_position, end);
+ // Window is taken to hold the complete range (a truncated refresh pull is only
+ // error-logged), so `complete = true` — same inherent as the db fallback.
+ let result =
+ truncate_to_tx_capacity(window, tx_capacity, max_utxos, true, start_position, end);
if let Ok(mut guard) = self.last_observation.lock() {
*guard = Some(LastObservation {
@@ -780,4 +835,84 @@ mod tests {
existing.iter().map(|u| u.header.tx_position.block_number).collect();
assert_eq!(block_numbers, (14..20).collect::>());
}
+
+ /// `block_number`-th transaction, `utxo_index`-th UTXO within it. UTXOs that
+ /// share `block_number` belong to the same Cardano transaction (one distinct
+ /// `tx_position`).
+ fn utxo_with_index(block_number: u32, utxo_index: u16) -> ObservedUtxo {
+ let mut u = utxo(block_number, 0);
+ u.header.utxo_index = UtxoIndexInTx(utxo_index);
+ u
+ }
+
+ /// 50 transactions, 5 UTXOs each — distinct `tx_position` per transaction.
+ fn fifty_txs_five_utxos() -> Vec {
+ (0..50u32).flat_map(|tx| (0..5u16).map(move |u| utxo_with_index(tx, u))).collect()
+ }
+
+ /// The cNIGHT observation skip bug, fixed: a row-limited (incomplete) fetch
+ /// must NEVER advance `next_cardano_position` to the tip. The rows between
+ /// the last observed tx and the tip would be skipped forever, and a node
+ /// that DID fetch them would build a different inherent (check_inherent
+ /// split). Before the fix, "fewer than tx_capacity distinct txs" was treated
+ /// as "range complete" and the cursor jumped to the tip regardless.
+ #[test]
+ fn incomplete_fetch_must_not_advance_to_tip() {
+ // The range holds 50 txs but the fetch was row-limited to the first 40.
+ let fetched: Vec =
+ (0..40u32).flat_map(|tx| (0..5u16).map(move |u| utxo_with_index(tx, u))).collect();
+ let tip = pos(100, 0);
+
+ let obs = truncate_to_tx_capacity(
+ fetched, /* tx_capacity */ 1000, /* max_utxos */ 100_000, /* complete */ false,
+ &pos(0, 0), tip.clone(),
+ );
+
+ assert_ne!(obs.end, tip, "advanced to tip on an incomplete fetch -> skips unfetched txs");
+ // The last fetched tx may be partial, so it is dropped; resume at the last
+ // provably-whole tx (tx 38), never past observed data.
+ assert_eq!(obs.end, pos(38, 0).increment());
+ assert!(
+ obs.utxos.iter().all(|u| u.header.tx_position < pos(39, 0)),
+ "retained a tx at/after the truncation frontier",
+ );
+ }
+
+ /// A complete fetch under both caps reports the whole range and advances to
+ /// the tip — the steady-state case, and what both the in-memory cache slice
+ /// and a db fetch that stays under its row limit produce for the same block
+ /// (hence no cache-vs-fallback divergence).
+ #[test]
+ fn complete_fetch_advances_to_tip() {
+ let events = fifty_txs_five_utxos();
+ let tip = pos(100, 0);
+ let obs =
+ truncate_to_tx_capacity(events.clone(), 1000, 100_000, true, &pos(0, 0), tip.clone());
+ assert_eq!(obs.end, tip);
+ assert_eq!(obs.utxos.len(), events.len());
+ }
+
+ /// The UTXO envelope cap truncates at a whole-tx boundary: never
+ /// mid-transaction (resuming would skip the rest of that tx's UTXOs) and
+ /// never above the runtime acceptance bound.
+ #[test]
+ fn utxo_envelope_cap_truncates_at_whole_tx() {
+ // 250 events. Cap 22 -> 4 whole txs (20 UTXOs); the 5th (would reach 25)
+ // is held back.
+ let obs = truncate_to_tx_capacity(fifty_txs_five_utxos(), 1000, 22, true, &pos(0, 0), pos(100, 0));
+ assert_eq!(obs.utxos.len(), 20, "did not cut on a whole-tx boundary");
+ assert!(obs.utxos.len() <= 22);
+ assert_eq!(obs.end, pos(3, 0).increment(), "must resume past the last whole tx");
+ }
+
+ /// The transaction-count cap admits at most `tx_capacity` whole transactions.
+ #[test]
+ fn tx_capacity_cap_truncates_at_whole_tx() {
+ let obs = truncate_to_tx_capacity(fifty_txs_five_utxos(), 10, 100_000, true, &pos(0, 0), pos(100, 0));
+ let distinct: std::collections::BTreeSet =
+ obs.utxos.iter().map(|u| u.header.tx_position.block_number).collect();
+ assert_eq!(distinct.len(), 10);
+ assert_eq!(obs.utxos.len(), 50, "10 txs x 5 UTXOs");
+ assert_eq!(obs.end, pos(9, 0).increment());
+ }
}
diff --git a/primitives/mainchain-follower/src/data_source/cnight_observation_mock.rs b/primitives/mainchain-follower/src/data_source/cnight_observation_mock.rs
index ff6d54dd6..8dc355681 100644
--- a/primitives/mainchain-follower/src/data_source/cnight_observation_mock.rs
+++ b/primitives/mainchain-follower/src/data_source/cnight_observation_mock.rs
@@ -84,7 +84,7 @@ impl MidnightCNightObservationDataSource for CNightObservationDataSourceMock {
start: &CardanoPosition,
_current_tip: McBlockHash,
_tx_capacity: usize,
- _utxo_overestimate: usize,
+ _max_utxos: usize,
) -> Result> {
let mut end = start.clone();
end.block_number += 1;
diff --git a/primitives/mainchain-follower/src/db/queries/cnight_observation.rs b/primitives/mainchain-follower/src/db/queries/cnight_observation.rs
index f266e0c4a..e83eaaeb3 100644
--- a/primitives/mainchain-follower/src/db/queries/cnight_observation.rs
+++ b/primitives/mainchain-follower/src/db/queries/cnight_observation.rs
@@ -404,7 +404,7 @@ SELECT
low_ma_tx_out.ma_tx_out_id AS "ma_tx_out_id!",
low_tx_in.tx_in_id AS "tx_in_id!"
FROM
- (SELECT COALESCE ((SELECT id FROM block WHERE block_no = $1 LIMIT 1), 0) AS id) AS block,
+ (SELECT COALESCE ((SELECT id FROM block WHERE block_no = $1 ORDER BY id ASC LIMIT 1), 0) AS id) AS block,
LATERAL (SELECT COALESCE((SELECT id FROM tx WHERE block_id < block.id ORDER BY block_id DESC LIMIT 1), 0) AS tx_id) AS low_tx,
LATERAL (SELECT COALESCE((SELECT id FROM tx_out WHERE tx_id <= low_tx.tx_id ORDER BY tx_id DESC LIMIT 1), 0) AS tx_out_id) AS low_tx_out,
LATERAL (SELECT COALESCE((SELECT id FROM ma_tx_out WHERE tx_out_id <= low_tx_out.tx_out_id ORDER BY tx_out_id DESC LIMIT 1), 0) AS ma_tx_out_id) AS low_ma_tx_out,
@@ -435,7 +435,7 @@ SELECT
high_ma_tx_out.ma_tx_out_id AS "ma_tx_out_id!",
high_tx_in.tx_in_id AS "tx_in_id!"
FROM
- (SELECT id FROM block WHERE block_no = $1 LIMIT 1) AS block,
+ (SELECT id FROM block WHERE block_no = $1 ORDER BY id DESC LIMIT 1) AS block,
LATERAL (SELECT COALESCE((SELECT id FROM tx WHERE block_id > block.id ORDER BY block_id ASC LIMIT 1), 9223372036854775807) AS tx_id) AS high_tx,
LATERAL (SELECT COALESCE((SELECT id FROM tx_out WHERE tx_id >= high_tx.tx_id ORDER BY tx_id ASC LIMIT 1), 9223372036854775807) AS tx_out_id) AS high_tx_out,
LATERAL (SELECT COALESCE((SELECT id FROM ma_tx_out WHERE tx_out_id >= high_tx_out.tx_out_id ORDER BY tx_out_id ASC LIMIT 1), 9223372036854775807) AS ma_tx_out_id) AS high_ma_tx_out,
diff --git a/primitives/mainchain-follower/src/idp/cnight_observation.rs b/primitives/mainchain-follower/src/idp/cnight_observation.rs
index d00df21bd..8bd9ccb65 100644
--- a/primitives/mainchain-follower/src/idp/cnight_observation.rs
+++ b/primitives/mainchain-follower/src/idp/cnight_observation.rs
@@ -16,7 +16,7 @@
use crate::{MidnightCNightObservationDataSource, MidnightObservationTokenMovement, ObservedUtxo};
use midnight_primitives_cnight_observation::{
CNightAddresses, CNightObservationApi, CardanoPosition, INHERENT_IDENTIFIER, InherentError,
- TimestampUnixMillis,
+ TimestampUnixMillis, UTXO_PER_TX_OVERESTIMATE,
};
use parity_scale_codec::Decode;
use sidechain_domain::McBlockHash;
@@ -100,17 +100,11 @@ impl MidnightCNightObservationInherentDataProvider {
let mapping_validator_address =
String::from_utf8(api.get_mapping_validator_address(parent_hash)?)?;
let tx_capacity = api.get_utxo_capacity_per_block(parent_hash)?;
-
- // The over-fetch quantity used when querying db-sync is consensus-affecting:
- // validators must agree on it to produce identical inherents. The reduction
- // from 64x to 4x is therefore gated on the on-chain `CNightObservationApi`
- // version: v2+ runtimes use the new factor, older runtimes keep the legacy
- // 64x used by node binaries that shipped against v1.
- let api_version = api
- .api_version::>(parent_hash)?
- .ok_or(IDPCreationError::CNightObservationApiUnavailable)?;
- let overestimate_factor: u32 = if api_version >= 2 { 4 } else { 64 };
- let utxo_overestimate = tx_capacity.saturating_mul(overestimate_factor);
+ // UTXO acceptance envelope, derived here at the runtime-API boundary (where
+ // on-chain `tx_capacity` is read) and passed down — the db data source
+ // stays decoupled from runtime tokenomics. Matches the runtime's
+ // `process_tokens` bound so node and chain agree on what an inherent may hold.
+ let max_utxos = tx_capacity.saturating_mul(UTXO_PER_TX_OVERESTIMATE) as usize;
let (cnight_policy_id, cnight_asset_name) = api.get_cnight_token_identifier(parent_hash)?;
let auth_token_asset_name: String = api
@@ -136,7 +130,7 @@ impl MidnightCNightObservationInherentDataProvider {
&cardano_position_start,
mc_hash,
tx_capacity as usize,
- utxo_overestimate as usize,
+ max_utxos,
)
.await
.map_err(IDPCreationError::DataSourceError)?;
diff --git a/primitives/mainchain-follower/src/lib.rs b/primitives/mainchain-follower/src/lib.rs
index d9daa0b62..c04ef8d45 100644
--- a/primitives/mainchain-follower/src/lib.rs
+++ b/primitives/mainchain-follower/src/lib.rs
@@ -56,7 +56,7 @@ pub mod inherent_provider {
start_position: &CardanoPosition,
current_tip: McBlockHash,
tx_capacity: usize,
- utxo_overestimate: usize,
+ max_utxos: usize,
) -> Result>;
}
diff --git a/primitives/mainchain-follower/tests/cnight_equivalence.rs b/primitives/mainchain-follower/tests/cnight_equivalence.rs
index 704530292..31dbf4252 100644
--- a/primitives/mainchain-follower/tests/cnight_equivalence.rs
+++ b/primitives/mainchain-follower/tests/cnight_equivalence.rs
@@ -35,7 +35,6 @@
//! - `CNIGHT_EQUIV_FROM_BLOCK` first Cardano block_no (default: 0)
//! - `CNIGHT_EQUIV_TO_BLOCK` last Cardano block_no (default: max in db)
//! - `CNIGHT_EQUIV_TX_CAPACITY` whole-tx capacity per call (default: 200)
-//! - `CNIGHT_EQUIV_UTXO_OVERESTIMATE` per-query SQL over-fetch bound (default: 12800)
//! - `CNIGHT_EQUIV_MAX_COMPARISONS` cap on number of (start,tip) queries (default: 500)
//!
//! NOTE: the standard source clips its SQL window to `LIVE_PULL_BLOCK_DELTA`
@@ -47,7 +46,7 @@
use std::sync::Arc;
use midnight_primitives_cnight_observation::{
- CNightAddresses, CardanoPosition, TimestampUnixMillis,
+ CNightAddresses, CardanoPosition, TimestampUnixMillis, UTXO_PER_TX_OVERESTIMATE,
};
use midnight_primitives_mainchain_follower::data_source::{
BulkCacheConfig, BulkCachedCNightObservationDataSource, DEFAULT_WINDOW_SIZE,
@@ -114,9 +113,9 @@ async fn bulk_source_matches_standard_over_block_range() {
};
assert!(to_block > from_block, "empty block range [{from_block}, {to_block}]");
- // Consensus knobs — override to match the runtime config of the network.
+ // Consensus knob — override to match the runtime config of the network.
let tx_capacity: usize = env_parsed("CNIGHT_EQUIV_TX_CAPACITY").unwrap_or(200);
- let utxo_overestimate: usize = env_parsed("CNIGHT_EQUIV_UTXO_OVERESTIMATE").unwrap_or(12_800);
+ let max_utxos = tx_capacity * UTXO_PER_TX_OVERESTIMATE as usize;
let rows = sqlx::query(
"SELECT block_no::bigint AS block_no, hash FROM block \
@@ -151,9 +150,10 @@ async fn bulk_source_matches_standard_over_block_range() {
let window_start = whole_block_position(window_from, 0);
let window_end =
whole_block_position(window_to, u32::try_from(i32::MAX).expect("i32::MAX is non-negative"));
- let events = bulk_pull(&pool, &addresses, &window_start, &window_end, LARGE_LIMIT)
+ let (events, complete) = bulk_pull(&pool, &addresses, &window_start, &window_end, LARGE_LIMIT)
.await
.expect("bulk_pull window");
+ assert!(complete, "test window exceeded LARGE_LIMIT rows; widen LARGE_LIMIT or narrow the range");
eprintln!("cached window [{window_from}, {window_to}] holds {} events", events.len());
let db_fallback = Arc::new(MidnightCNightObservationDataSourceImpl::new(pool.clone(), None, 0));
@@ -195,23 +195,11 @@ async fn bulk_source_matches_standard_over_block_range() {
};
let tip = blocks[(i + tip_delta).min(blocks.len() - 1)].1.clone();
let standard_result = standard
- .get_utxos_up_to_capacity(
- &addresses,
- &start,
- tip.clone(),
- tx_capacity,
- utxo_overestimate,
- )
+ .get_utxos_up_to_capacity(&addresses, &start, tip.clone(), tx_capacity, max_utxos)
.await
.expect("standard source query");
let bulk_result = bulk
- .get_utxos_up_to_capacity(
- &addresses,
- &start,
- tip.clone(),
- tx_capacity,
- utxo_overestimate,
- )
+ .get_utxos_up_to_capacity(&addresses, &start, tip.clone(), tx_capacity, max_utxos)
.await
.expect("bulk source query");