Skip to content

Commit 98b0be1

Browse files
Add MPP ID to pending_outbound_htlcs
We'll use this to correlate MPP shards in upcoming commits
1 parent e3297a2 commit 98b0be1

File tree

1 file changed

+45
-16
lines changed

1 file changed

+45
-16
lines changed

lightning/src/ln/channelmanager.rs

Lines changed: 45 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -493,7 +493,7 @@ pub struct ChannelManager<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref,
493493
/// after reloading from disk while replaying blocks against ChannelMonitors.
494494
///
495495
/// Locked *after* channel_state.
496-
pending_outbound_payments: Mutex<HashSet<[u8; 32]>>,
496+
pending_outbound_payments: Mutex<HashSet<([u8; 32], Option<MppId>)>>,
497497

498498
our_network_key: SecretKey,
499499
our_network_pubkey: PublicKey,
@@ -1854,7 +1854,7 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
18541854
let onion_packet = onion_utils::construct_onion_packet(onion_payloads, onion_keys, prng_seed, payment_hash);
18551855

18561856
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier);
1857-
assert!(self.pending_outbound_payments.lock().unwrap().insert(session_priv_bytes));
1857+
assert!(self.pending_outbound_payments.lock().unwrap().insert((session_priv_bytes, mpp_id)));
18581858

18591859
let err: Result<(), _> = loop {
18601860
let mut channel_lock = self.channel_state.lock().unwrap();
@@ -2835,11 +2835,11 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
28352835
self.fail_htlc_backwards_internal(channel_state,
28362836
htlc_src, &payment_hash, HTLCFailReason::Reason { failure_code, data: onion_failure_data});
28372837
},
2838-
HTLCSource::OutboundRoute { session_priv, .. } => {
2838+
HTLCSource::OutboundRoute { session_priv, mpp_id, .. } => {
28392839
if {
28402840
let mut session_priv_bytes = [0; 32];
28412841
session_priv_bytes.copy_from_slice(&session_priv[..]);
2842-
self.pending_outbound_payments.lock().unwrap().remove(&session_priv_bytes)
2842+
self.pending_outbound_payments.lock().unwrap().remove(&(session_priv_bytes, mpp_id))
28432843
} {
28442844
self.pending_events.lock().unwrap().push(
28452845
events::Event::PaymentFailed {
@@ -2875,11 +2875,11 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
28752875
// from block_connected which may run during initialization prior to the chain_monitor
28762876
// being fully configured. See the docs for `ChannelManagerReadArgs` for more.
28772877
match source {
2878-
HTLCSource::OutboundRoute { ref path, session_priv, .. } => {
2878+
HTLCSource::OutboundRoute { ref path, session_priv, mpp_id, .. } => {
28792879
if {
28802880
let mut session_priv_bytes = [0; 32];
28812881
session_priv_bytes.copy_from_slice(&session_priv[..]);
2882-
!self.pending_outbound_payments.lock().unwrap().remove(&session_priv_bytes)
2882+
!self.pending_outbound_payments.lock().unwrap().remove(&(session_priv_bytes, mpp_id))
28832883
} {
28842884
log_trace!(self.logger, "Received duplicative fail for HTLC with payment_hash {}", log_bytes!(payment_hash.0));
28852885
return;
@@ -3126,12 +3126,12 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> ChannelMana
31263126

31273127
fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard<ChannelHolder<Signer>>, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option<u64>, from_onchain: bool) {
31283128
match source {
3129-
HTLCSource::OutboundRoute { session_priv, .. } => {
3129+
HTLCSource::OutboundRoute { session_priv, mpp_id, .. } => {
31303130
mem::drop(channel_state_lock);
31313131
if {
31323132
let mut session_priv_bytes = [0; 32];
31333133
session_priv_bytes.copy_from_slice(&session_priv[..]);
3134-
self.pending_outbound_payments.lock().unwrap().remove(&session_priv_bytes)
3134+
self.pending_outbound_payments.lock().unwrap().remove(&(session_priv_bytes, mpp_id))
31353135
} {
31363136
let mut pending_events = self.pending_events.lock().unwrap();
31373137
pending_events.push(events::Event::PaymentSent {
@@ -5068,11 +5068,15 @@ impl<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Writeable f
50685068

50695069
let pending_outbound_payments = self.pending_outbound_payments.lock().unwrap();
50705070
(pending_outbound_payments.len() as u64).write(writer)?;
5071-
for session_priv in pending_outbound_payments.iter() {
5071+
let mut pending_outbound_mpp_ids = Vec::new();
5072+
for (session_priv, mpp_id) in pending_outbound_payments.iter() {
50725073
session_priv.write(writer)?;
5074+
pending_outbound_mpp_ids.push(mpp_id);
50735075
}
50745076

5075-
write_tlv_fields!(writer, {});
5077+
write_tlv_fields!(writer, {
5078+
(0, pending_outbound_mpp_ids, vec_type),
5079+
});
50765080

50775081
Ok(())
50785082
}
@@ -5326,14 +5330,39 @@ impl<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
53265330
}
53275331

53285332
let pending_outbound_payments_count: u64 = Readable::read(reader)?;
5329-
let mut pending_outbound_payments: HashSet<[u8; 32]> = HashSet::with_capacity(cmp::min(pending_outbound_payments_count as usize, MAX_ALLOC_SIZE/32));
5333+
let mut pending_outbound_payments: HashSet<([u8; 32], Option<MppId>)> = HashSet::with_capacity(cmp::min(pending_outbound_payments_count as usize, MAX_ALLOC_SIZE/32));
5334+
let mut pending_outbound_session_privs = Vec::new();
5335+
53305336
for _ in 0..pending_outbound_payments_count {
5331-
if !pending_outbound_payments.insert(Readable::read(reader)?) {
5332-
return Err(DecodeError::InvalidValue);
5333-
}
5337+
pending_outbound_session_privs.push(Readable::read(reader)?);
53345338
}
53355339

5336-
read_tlv_fields!(reader, {});
5340+
let mut pending_outbound_mpp_ids_opt = Some(Vec::new());
5341+
read_tlv_fields!(reader, {
5342+
(0, pending_outbound_mpp_ids_opt, vec_type),
5343+
});
5344+
5345+
let pending_outbound_mpp_ids = match pending_outbound_mpp_ids_opt {
5346+
Some(outbounds) => outbounds,
5347+
None => Vec::new()
5348+
};
5349+
5350+
if pending_outbound_mpp_ids.len() == pending_outbound_session_privs.len() {
5351+
for (session_priv, mpp_id) in pending_outbound_session_privs.iter().zip(
5352+
pending_outbound_mpp_ids.iter()) {
5353+
if !pending_outbound_payments.insert((*session_priv, *mpp_id)) {
5354+
return Err(DecodeError::InvalidValue)
5355+
}
5356+
}
5357+
} else if pending_outbound_mpp_ids.len() == 0 {
5358+
for session_priv in pending_outbound_session_privs.iter() {
5359+
if !pending_outbound_payments.insert((*session_priv, None)) {
5360+
return Err(DecodeError::InvalidValue);
5361+
}
5362+
}
5363+
} else {
5364+
return Err(DecodeError::InvalidValue);
5365+
}
53375366

53385367
let mut secp_ctx = Secp256k1::new();
53395368
secp_ctx.seeded_randomize(&args.keys_manager.get_secure_random_bytes());
@@ -5577,7 +5606,7 @@ mod tests {
55775606
expect_payment_failed!(nodes[0], our_payment_hash, true);
55785607

55795608
// Send the second half of the original MPP payment.
5580-
nodes[0].node.send_payment_along_path(&route.paths[0], &our_payment_hash, &Some(payment_secret), 200_000, cur_height, payment_id, &None).unwrap();
5609+
nodes[0].node.send_payment_along_path(&route.paths[0], &our_payment_hash, &Some(payment_secret), 200_000, cur_height, mpp_id, &None).unwrap();
55815610
check_added_monitors!(nodes[0], 1);
55825611
let mut events = nodes[0].node.get_and_clear_pending_msg_events();
55835612
assert_eq!(events.len(), 1);

0 commit comments

Comments
 (0)