Skip to content

Commit 3010619

Browse files
committed
Persist sweeper state as part of background process
To prepare for an async kv store trait that must be awaited, this commit moves the kv store calls from the chain notification handlers to the background process. It uses a dirty flag to communicate that there is something to persist. The block height is part of the persisted data. If that data does not make it to disk, the chain notifications are replayed after restart.
1 parent afe4285 commit 3010619

File tree

1 file changed

+37
-23
lines changed

1 file changed

+37
-23
lines changed

lightning/src/util/sweep.rs

Lines changed: 37 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,7 @@ where
382382
output_spender: O, change_destination_source: D, kv_store: K, logger: L,
383383
) -> Self {
384384
let outputs = Vec::new();
385-
let sweeper_state = Mutex::new(SweeperState { outputs, best_block });
385+
let sweeper_state = Mutex::new(SweeperState { outputs, best_block, dirty: false });
386386
Self {
387387
sweeper_state,
388388
pending_sweep: AtomicBool::new(false),
@@ -446,7 +446,10 @@ where
446446
}
447447
self.persist_state(&*state_lock).map_err(|e| {
448448
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
449-
})
449+
})?;
450+
state_lock.dirty = false;
451+
452+
Ok(())
450453
}
451454

452455
/// Returns a list of the currently tracked spendable outputs.
@@ -503,11 +506,19 @@ where
503506

504507
// See if there is anything to sweep before requesting a change address.
505508
{
506-
let sweeper_state = self.sweeper_state.lock().unwrap();
509+
let mut sweeper_state = self.sweeper_state.lock().unwrap();
507510

508511
let cur_height = sweeper_state.best_block.height;
509512
let has_respends = sweeper_state.outputs.iter().any(|o| filter_fn(o, cur_height));
510513
if !has_respends {
514+
// If there is nothing to sweep, we still persist the state if it is dirty.
515+
if sweeper_state.dirty {
516+
self.persist_state(&sweeper_state).map_err(|e| {
517+
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
518+
})?;
519+
sweeper_state.dirty = false;
520+
}
521+
511522
return Ok(());
512523
}
513524
}
@@ -531,7 +542,15 @@ where
531542
.collect();
532543

533544
if respend_descriptors.is_empty() {
534-
// It could be that a tx confirmed and there is now nothing to sweep anymore.
545+
// It could be that a tx confirmed and there is now nothing to sweep anymore. We still persist the state
546+
// if it is dirty.
547+
if sweeper_state.dirty {
548+
self.persist_state(&sweeper_state).map_err(|e| {
549+
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
550+
})?;
551+
sweeper_state.dirty = false;
552+
}
553+
535554
return Ok(());
536555
}
537556

@@ -563,6 +582,7 @@ where
563582
self.persist_state(&sweeper_state).map_err(|e| {
564583
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
565584
})?;
585+
sweeper_state.dirty = false;
566586

567587
self.broadcaster.broadcast_transactions(&[&spending_tx]);
568588
}
@@ -588,6 +608,8 @@ where
588608
}
589609
true
590610
});
611+
612+
sweeper_state.dirty = true;
591613
}
592614

593615
fn persist_state(&self, sweeper_state: &SweeperState) -> Result<(), io::Error> {
@@ -641,13 +663,17 @@ where
641663
}
642664
}
643665
}
666+
667+
sweeper_state.dirty = true;
644668
}
645669

646670
fn best_block_updated_internal(
647671
&self, sweeper_state: &mut SweeperState, header: &Header, height: u32,
648672
) {
649673
sweeper_state.best_block = BestBlock::new(header.block_hash(), height);
650674
self.prune_confirmed_outputs(sweeper_state);
675+
676+
sweeper_state.dirty = true;
651677
}
652678
}
653679

@@ -671,12 +697,8 @@ where
671697
assert_eq!(state_lock.best_block.height, height - 1,
672698
"Blocks must be connected in chain-order - the connected block height must be one greater than the previous height");
673699

674-
self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
675-
self.best_block_updated_internal(&mut *state_lock, header, height);
676-
677-
let _ = self.persist_state(&*state_lock).map_err(|e| {
678-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
679-
});
700+
self.transactions_confirmed_internal(&mut state_lock, header, txdata, height);
701+
self.best_block_updated_internal(&mut state_lock, header, height);
680702
}
681703

682704
fn block_disconnected(&self, header: &Header, height: u32) {
@@ -698,9 +720,7 @@ where
698720
}
699721
}
700722

701-
self.persist_state(&*state_lock).unwrap_or_else(|e| {
702-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
703-
});
723+
state_lock.dirty = true;
704724
}
705725
}
706726

@@ -720,9 +740,6 @@ where
720740
) {
721741
let mut state_lock = self.sweeper_state.lock().unwrap();
722742
self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
723-
self.persist_state(&*state_lock).unwrap_or_else(|e| {
724-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
725-
});
726743
}
727744

728745
fn transaction_unconfirmed(&self, txid: &Txid) {
@@ -743,18 +760,13 @@ where
743760
.filter(|o| o.status.confirmation_height() >= Some(unconf_height))
744761
.for_each(|o| o.status.unconfirmed());
745762

746-
self.persist_state(&*state_lock).unwrap_or_else(|e| {
747-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
748-
});
763+
state_lock.dirty = true;
749764
}
750765
}
751766

752767
fn best_block_updated(&self, header: &Header, height: u32) {
753768
let mut state_lock = self.sweeper_state.lock().unwrap();
754-
self.best_block_updated_internal(&mut *state_lock, header, height);
755-
let _ = self.persist_state(&*state_lock).map_err(|e| {
756-
log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
757-
});
769+
self.best_block_updated_internal(&mut state_lock, header, height);
758770
}
759771

760772
fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
@@ -783,11 +795,13 @@ where
783795
struct SweeperState {
784796
outputs: Vec<TrackedSpendableOutput>,
785797
best_block: BestBlock,
798+
dirty: bool,
786799
}
787800

788801
impl_writeable_tlv_based!(SweeperState, {
789802
(0, outputs, required_vec),
790803
(2, best_block, required),
804+
(_unused, dirty, (static_value, false)),
791805
});
792806

793807
/// A `enum` signalling to the [`OutputSweeper`] that it should delay spending an output until a

0 commit comments

Comments
 (0)