Skip to content

Commit 5bd0ea3

Browse files
joostjagerclaude
andcommitted
Add DeferredChainMonitor wrapper for batched monitor persistence
Introduce DeferredChainMonitor, a wrapper around ChainMonitor that queues watch_channel and update_channel operations, returning InProgress until flush() is called. This enables batched persistence of monitor updates after ChannelManager persistence, ensuring correct ordering. The wrapper implements all public traits that ChainMonitor supports (Listen, Confirm, EventsProvider, etc.) as pass-throughs, allowing it to be used as a drop-in replacement. Includes comprehensive tests covering the full channel lifecycle with payment flows using DeferredChainMonitor. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 1f5cef4 commit 5bd0ea3

File tree

4 files changed

+1147
-291
lines changed

4 files changed

+1147
-291
lines changed

lightning-background-processor/src/lib.rs

Lines changed: 40 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ use fwd_batch::BatchDelay;
3232

3333
use lightning::chain;
3434
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
35-
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
35+
use lightning::chain::chainmonitor::Persist;
36+
use lightning::chain::deferred::DeferredChainMonitor;
3637
#[cfg(feature = "std")]
3738
use lightning::events::EventHandler;
3839
#[cfg(feature = "std")]
@@ -853,7 +854,8 @@ use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutp
853854
/// # fn send_data(&mut self, _data: &[u8], _continue_read: bool) -> usize { 0 }
854855
/// # fn disconnect_socket(&mut self) {}
855856
/// # }
856-
/// # type ChainMonitor<B, F, FE> = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<StoreSync>, Arc<lightning::sign::KeysManager>>;
857+
/// # type InnerChainMonitor<B, F, FE> = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<StoreSync>, Arc<lightning::sign::KeysManager>>;
858+
/// # type ChainMonitor<B, F, FE> = lightning::chain::deferred::DeferredChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<StoreSync>, Arc<lightning::sign::KeysManager>>;
857859
/// # type NetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<Logger>>;
858860
/// # type P2PGossipSync<UL> = lightning::routing::gossip::P2PGossipSync<Arc<NetworkGraph>, Arc<UL>, Arc<Logger>>;
859861
/// # type ChannelManager<B, F, FE> = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor<B, F, FE>, B, FE, Logger>;
@@ -964,7 +966,9 @@ pub async fn process_events_async<
964966
EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
965967
EventHandler: Fn(Event) -> EventHandlerFuture,
966968
ES: Deref,
967-
M: Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>,
969+
M: Deref<
970+
Target = DeferredChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>,
971+
>,
968972
CM: Deref,
969973
OM: Deref,
970974
PGS: Deref<Target = P2PGossipSync<G, UL, L>>,
@@ -1155,7 +1159,7 @@ where
11551159
// Capture the number of pending monitor writes before persisting the channel manager.
11561160
// We'll only flush this many writes after the manager is persisted, to avoid flushing
11571161
// monitor updates that arrived after the manager state was captured.
1158-
let pending_monitor_writes = chain_monitor.pending_monitor_operation_count();
1162+
let pending_monitor_writes = chain_monitor.pending_operation_count();
11591163

11601164
if channel_manager.get_cm().get_and_clear_needs_persistence() {
11611165
log_trace!(logger, "Persisting ChannelManager...");
@@ -1427,7 +1431,7 @@ where
14271431
.await?;
14281432

14291433
// Flush all pending monitor writes after final channel manager persistence.
1430-
let pending_monitor_writes = chain_monitor.pending_monitor_operation_count();
1434+
let pending_monitor_writes = chain_monitor.pending_operation_count();
14311435
if pending_monitor_writes > 0 {
14321436
log_trace!(logger, "Flushing {} monitor writes on shutdown", pending_monitor_writes);
14331437
chain_monitor.flush(pending_monitor_writes);
@@ -1485,7 +1489,9 @@ pub async fn process_events_async_with_kv_store_sync<
14851489
EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
14861490
EventHandler: Fn(Event) -> EventHandlerFuture,
14871491
ES: Deref,
1488-
M: Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>,
1492+
M: Deref<
1493+
Target = DeferredChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>,
1494+
>,
14891495
CM: Deref,
14901496
OM: Deref,
14911497
PGS: Deref<Target = P2PGossipSync<G, UL, L>>,
@@ -1600,7 +1606,15 @@ impl BackgroundProcessor {
16001606
ES: 'static + Deref + Send,
16011607
M: 'static
16021608
+ Deref<
1603-
Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>,
1609+
Target = DeferredChainMonitor<
1610+
<CM::Target as AChannelManager>::Signer,
1611+
CF,
1612+
T,
1613+
F,
1614+
L,
1615+
P,
1616+
ES,
1617+
>,
16041618
>
16051619
+ Send
16061620
+ Sync,
@@ -1744,7 +1758,7 @@ impl BackgroundProcessor {
17441758
}
17451759

17461760
// Capture the number of pending monitor writes before persisting the channel manager.
1747-
let pending_monitor_writes = chain_monitor.pending_monitor_operation_count();
1761+
let pending_monitor_writes = chain_monitor.pending_operation_count();
17481762

17491763
if channel_manager.get_cm().get_and_clear_needs_persistence() {
17501764
log_trace!(logger, "Persisting ChannelManager...");
@@ -1885,7 +1899,7 @@ impl BackgroundProcessor {
18851899
)?;
18861900

18871901
// Flush all pending monitor writes after final channel manager persistence.
1888-
let pending_monitor_writes = chain_monitor.pending_monitor_operation_count();
1902+
let pending_monitor_writes = chain_monitor.pending_operation_count();
18891903
if pending_monitor_writes > 0 {
18901904
log_trace!(
18911905
logger,
@@ -1978,7 +1992,7 @@ mod tests {
19781992
use core::sync::atomic::{AtomicBool, Ordering};
19791993
use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
19801994
use lightning::chain::transaction::OutPoint;
1981-
use lightning::chain::{chainmonitor, BestBlock, Confirm, Filter};
1995+
use lightning::chain::{deferred, BestBlock, Confirm, Filter};
19821996
use lightning::events::{Event, PathFailure, ReplayEvent};
19831997
use lightning::ln::channelmanager;
19841998
use lightning::ln::channelmanager::{
@@ -2068,7 +2082,7 @@ mod tests {
20682082
Arc<test_utils::TestLogger>,
20692083
>;
20702084

2071-
type ChainMonitor = chainmonitor::ChainMonitor<
2085+
type ChainMonitor = deferred::DeferredChainMonitor<
20722086
InMemorySigner,
20732087
Arc<test_utils::TestChainSource>,
20742088
Arc<test_utils::TestBroadcaster>,
@@ -2496,7 +2510,7 @@ mod tests {
24962510
let now = Duration::from_secs(genesis_block.header.time as u64);
24972511
let keys_manager =
24982512
Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos(), true));
2499-
let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(
2513+
let chain_monitor = Arc::new(deferred::DeferredChainMonitor::new(
25002514
Some(Arc::clone(&chain_source)),
25012515
Arc::clone(&tx_broadcaster),
25022516
Arc::clone(&logger),
@@ -2640,19 +2654,25 @@ mod tests {
26402654
tx.clone(),
26412655
)
26422656
.unwrap();
2657+
// Flush deferred monitor operations so messages aren't held back
2658+
$node_a.chain_monitor.flush_all();
26432659
let msg_a = get_event_msg!(
26442660
$node_a,
26452661
MessageSendEvent::SendFundingCreated,
26462662
$node_b.node.get_our_node_id()
26472663
);
26482664
$node_b.node.handle_funding_created($node_a.node.get_our_node_id(), &msg_a);
2665+
// Flush node_b's monitor so it releases the FundingSigned message
2666+
$node_b.chain_monitor.flush_all();
26492667
get_event!($node_b, Event::ChannelPending);
26502668
let msg_b = get_event_msg!(
26512669
$node_b,
26522670
MessageSendEvent::SendFundingSigned,
26532671
$node_a.node.get_our_node_id()
26542672
);
26552673
$node_a.node.handle_funding_signed($node_b.node.get_our_node_id(), &msg_b);
2674+
// Flush node_a's monitor for the final update
2675+
$node_a.chain_monitor.flush_all();
26562676
get_event!($node_a, Event::ChannelPending);
26572677
tx
26582678
}};
@@ -3099,11 +3119,17 @@ mod tests {
30993119
.node
31003120
.funding_transaction_generated(temporary_channel_id, node_1_id, funding_tx.clone())
31013121
.unwrap();
3122+
// Flush node_0's deferred monitor operations so the FundingCreated message is released
3123+
nodes[0].chain_monitor.flush_all();
31023124
let msg_0 = get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, node_1_id);
31033125
nodes[1].node.handle_funding_created(node_0_id, &msg_0);
3126+
// Flush node_1's deferred monitor operations so events and FundingSigned are released
3127+
nodes[1].chain_monitor.flush_all();
31043128
get_event!(nodes[1], Event::ChannelPending);
31053129
let msg_1 = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, node_0_id);
31063130
nodes[0].node.handle_funding_signed(node_1_id, &msg_1);
3131+
// Flush node_0's monitor for the funding_signed update
3132+
nodes[0].chain_monitor.flush_all();
31073133
channel_pending_recv
31083134
.recv_timeout(EVENT_DEADLINE)
31093135
.expect("ChannelPending not handled within deadline");
@@ -3164,6 +3190,8 @@ mod tests {
31643190
error_message.to_string(),
31653191
)
31663192
.unwrap();
3193+
// Flush the monitor update triggered by force close so the commitment tx is broadcasted
3194+
nodes[0].chain_monitor.flush_all();
31673195
let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
31683196
confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
31693197

0 commit comments

Comments
 (0)