From 3010619d3dc7bf08cf024f376391ce37bef2c38d Mon Sep 17 00:00:00 2001 From: Joost Jager Date: Mon, 9 Jun 2025 14:54:25 +0200 Subject: [PATCH] Persist sweeper state as part of background process To prepare for an async kv store trait that must be awaited, this commit moves the kv store calls from the chain notification handlers to the background process. It uses a dirty flag to communicate that there is something to persist. The block height is part of the persisted data. If that data does not make it to disk, the chain notifications are replayed after restart. --- lightning/src/util/sweep.rs | 60 +++++++++++++++++++++++-------------- 1 file changed, 37 insertions(+), 23 deletions(-) diff --git a/lightning/src/util/sweep.rs b/lightning/src/util/sweep.rs index 0fae91bebc2..182d70c11e4 100644 --- a/lightning/src/util/sweep.rs +++ b/lightning/src/util/sweep.rs @@ -382,7 +382,7 @@ where output_spender: O, change_destination_source: D, kv_store: K, logger: L, ) -> Self { let outputs = Vec::new(); - let sweeper_state = Mutex::new(SweeperState { outputs, best_block }); + let sweeper_state = Mutex::new(SweeperState { outputs, best_block, dirty: false }); Self { sweeper_state, pending_sweep: AtomicBool::new(false), @@ -446,7 +446,10 @@ where } self.persist_state(&*state_lock).map_err(|e| { log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); - }) + })?; + state_lock.dirty = false; + + Ok(()) } /// Returns a list of the currently tracked spendable outputs. @@ -503,11 +506,19 @@ where // See if there is anything to sweep before requesting a change address. { - let sweeper_state = self.sweeper_state.lock().unwrap(); + let mut sweeper_state = self.sweeper_state.lock().unwrap(); let cur_height = sweeper_state.best_block.height; let has_respends = sweeper_state.outputs.iter().any(|o| filter_fn(o, cur_height)); if !has_respends { + // If there is nothing to sweep, we still persist the state if it is dirty. + if sweeper_state.dirty { + self.persist_state(&sweeper_state).map_err(|e| { + log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); + })?; + sweeper_state.dirty = false; + } + return Ok(()); } } @@ -531,7 +542,15 @@ where .collect(); if respend_descriptors.is_empty() { - // It could be that a tx confirmed and there is now nothing to sweep anymore. + // It could be that a tx confirmed and there is now nothing to sweep anymore. We still persist the state + // if it is dirty. + if sweeper_state.dirty { + self.persist_state(&sweeper_state).map_err(|e| { + log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); + })?; + sweeper_state.dirty = false; + } + return Ok(()); } @@ -563,6 +582,7 @@ where self.persist_state(&sweeper_state).map_err(|e| { log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); })?; + sweeper_state.dirty = false; self.broadcaster.broadcast_transactions(&[&spending_tx]); } @@ -588,6 +608,8 @@ where } true }); + + sweeper_state.dirty = true; } fn persist_state(&self, sweeper_state: &SweeperState) -> Result<(), io::Error> { @@ -641,6 +663,8 @@ where } } } + + sweeper_state.dirty = true; } fn best_block_updated_internal( @@ -648,6 +672,8 @@ where ) { sweeper_state.best_block = BestBlock::new(header.block_hash(), height); self.prune_confirmed_outputs(sweeper_state); + + sweeper_state.dirty = true; } } @@ -671,12 +697,8 @@ where assert_eq!(state_lock.best_block.height, height - 1, "Blocks must be connected in chain-order - the connected block height must be one greater than the previous height"); - self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height); - self.best_block_updated_internal(&mut *state_lock, header, height); - - let _ = self.persist_state(&*state_lock).map_err(|e| { - log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); - }); + self.transactions_confirmed_internal(&mut state_lock, header, txdata, height); + self.best_block_updated_internal(&mut state_lock, header, height); } fn block_disconnected(&self, header: &Header, height: u32) { @@ -698,9 +720,7 @@ where } } - self.persist_state(&*state_lock).unwrap_or_else(|e| { - log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); - }); + state_lock.dirty = true; } } @@ -720,9 +740,6 @@ where ) { let mut state_lock = self.sweeper_state.lock().unwrap(); self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height); - self.persist_state(&*state_lock).unwrap_or_else(|e| { - log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); - }); } fn transaction_unconfirmed(&self, txid: &Txid) { @@ -743,18 +760,13 @@ where .filter(|o| o.status.confirmation_height() >= Some(unconf_height)) .for_each(|o| o.status.unconfirmed()); - self.persist_state(&*state_lock).unwrap_or_else(|e| { - log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); - }); + state_lock.dirty = true; } } fn best_block_updated(&self, header: &Header, height: u32) { let mut state_lock = self.sweeper_state.lock().unwrap(); - self.best_block_updated_internal(&mut *state_lock, header, height); - let _ = self.persist_state(&*state_lock).map_err(|e| { - log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e); - }); + self.best_block_updated_internal(&mut state_lock, header, height); } fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option)> { @@ -783,11 +795,13 @@ where struct SweeperState { outputs: Vec, best_block: BestBlock, + dirty: bool, } impl_writeable_tlv_based!(SweeperState, { (0, outputs, required_vec), (2, best_block, required), + (_unused, dirty, (static_value, false)), }); /// A `enum` signalling to the [`OutputSweeper`] that it should delay spending an output until a