diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index 9064be9ad34..e8b64281271 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -6024,13 +6024,13 @@ where 2*(1 + 71); // two signatures + sighash type flags if let Some(spk) = a_scriptpubkey { ret += ((8+1) + // output values and script length - spk.len() as u64) - * 4; // scriptpubkey and witness multiplier + spk.len() as u64) // scriptpubkey + * 4; // witness multiplier } if let Some(spk) = b_scriptpubkey { ret += ((8+1) + // output values and script length - spk.len() as u64) - * 4; // scriptpubkey and witness multiplier + spk.len() as u64) // scriptpubkey + * 4; // witness multiplier } ret } diff --git a/lightning/src/ln/msgs.rs b/lightning/src/ln/msgs.rs index bda4a41c97b..0379397e2fc 100644 --- a/lightning/src/ln/msgs.rs +++ b/lightning/src/ln/msgs.rs @@ -1933,9 +1933,11 @@ pub trait BaseMessageHandler { /// Handle a peer (re)connecting. /// - /// May return an `Err(())` if the features the peer supports are not sufficient to communicate - /// with us. Implementors should be somewhat conservative about doing so, however, as other - /// message handlers may still wish to communicate with this peer. + /// May return an `Err(())` to indicate that we should immediately disconnect from the peer + /// (e.g. because the features they support are not sufficient to communicate with us). + /// + /// Note, of course, that other message handlers may wish to communicate with the peer, which + /// disconnecting them will prevent. /// /// [`Self::peer_disconnected`] will not be called if `Err(())` is returned. fn peer_connected(&self, their_node_id: PublicKey, msg: &Init, inbound: bool) diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 51195679145..d5643604041 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -98,9 +98,11 @@ pub trait CustomMessageHandler: wire::CustomMessageReader { /// Handle a peer connecting. /// - /// May return an `Err(())` if the features the peer supports are not sufficient to communicate - /// with us. Implementors should be somewhat conservative about doing so, however, as other - /// message handlers may still wish to communicate with this peer. + /// May return an `Err(())` to indicate that we should immediately disconnect from the peer + /// (e.g. because the features they support are not sufficient to communicate with us). + /// + /// Note, of course, that other message handlers may wish to communicate with the peer, which + /// disconnecting them will prevent. /// /// [`Self::peer_disconnected`] will not be called if `Err(())` is returned. fn peer_connected(&self, their_node_id: PublicKey, msg: &Init, inbound: bool) @@ -581,7 +583,7 @@ where /// [`IgnoringMessageHandler`]. pub custom_message_handler: CustomM, - /// A message handler which only allows sending messages. This should generally be a + /// A message handler which can be used to send messages. This should generally be a /// [`ChainMonitor`]. /// /// [`ChainMonitor`]: crate::chain::chainmonitor::ChainMonitor @@ -1210,7 +1212,7 @@ struct OptionalFromDebugger<'a>(&'a Option<(PublicKey, NodeId)>); impl core::fmt::Display for OptionalFromDebugger<'_> { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), core::fmt::Error> { if let Some((node_id, _)) = self.0 { - write!(f, " from {}", log_pubkey!(node_id)) + write!(f, " from {}", node_id) } else { Ok(()) } @@ -1373,6 +1375,7 @@ where | self.message_handler.route_handler.provided_init_features(their_node_id) | self.message_handler.onion_message_handler.provided_init_features(their_node_id) | self.message_handler.custom_message_handler.provided_init_features(their_node_id) + | self.message_handler.send_only_message_handler.provided_init_features(their_node_id) } /// Indicates a new outbound connection has been established to a node with the given `node_id` @@ -1676,13 +1679,15 @@ where /// Append a message to a peer's pending outbound/write buffer fn enqueue_message(&self, peer: &mut Peer, message: &M) { let their_node_id = peer.their_node_id.map(|p| p.0); - let logger = WithContext::from(&self.logger, their_node_id, None, None); - // `unwrap` SAFETY: `their_node_id` is guaranteed to be `Some` after the handshake - let node_id = peer.their_node_id.unwrap().0; - if is_gossip_msg(message.type_id()) { - log_gossip!(logger, "Enqueueing message {:?} to {}", message, log_pubkey!(node_id)); + if let Some(node_id) = their_node_id { + let logger = WithContext::from(&self.logger, their_node_id, None, None); + if is_gossip_msg(message.type_id()) { + log_gossip!(logger, "Enqueueing message {:?} to {}", message, node_id); + } else { + log_trace!(logger, "Enqueueing message {:?} to {}", message, node_id); + } } else { - log_trace!(logger, "Enqueueing message {:?} to {}", message, log_pubkey!(node_id)); + debug_assert!(false, "node_id should be set by the time we send a message"); } peer.msgs_sent_since_pong += 1; peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(message)); @@ -1785,21 +1790,26 @@ where macro_rules! insert_node_id { () => { - let their_node_id = peer.their_node_id.map(|p| p.0); - let logger = WithContext::from(&self.logger, their_node_id, None, None); - match self.node_id_to_descriptor.lock().unwrap().entry(peer.their_node_id.unwrap().0) { + let their_node_id = if let Some((node_id, _)) = peer.their_node_id { + node_id + } else { + debug_assert!(false, "Should have a node_id to insert"); + return Err(PeerHandleError {}); + }; + let logger = WithContext::from(&self.logger, Some(their_node_id), None, None); + match self.node_id_to_descriptor.lock().unwrap().entry(their_node_id) { hash_map::Entry::Occupied(e) => { - log_trace!(logger, "Got second connection with {}, closing", log_pubkey!(peer.their_node_id.unwrap().0)); + log_trace!(logger, "Got second connection with {}, closing", their_node_id); // Unset `their_node_id` so that we don't generate a peer_disconnected event + peer.their_node_id = None; // Check that the peers map is consistent with the // node_id_to_descriptor map, as this has been broken // before. - peer.their_node_id = None; debug_assert!(peers.get(e.get()).is_some()); return Err(PeerHandleError { }) }, hash_map::Entry::Vacant(entry) => { - log_debug!(logger, "Finished noise handshake for connection with {}", log_pubkey!(peer.their_node_id.unwrap().0)); + log_debug!(logger, "Finished noise handshake for connection with {}", their_node_id); entry.insert(peer_descriptor.clone()) }, }; @@ -2038,7 +2048,7 @@ where logger, "Received commitment_signed batch {:?} from {}", batch, - log_pubkey!(their_node_id) + their_node_id, ); let chan_handler = &self.message_handler.chan_handler; chan_handler.handle_commitment_signed_batch(their_node_id, channel_id, batch); @@ -2093,7 +2103,7 @@ where log_debug!( logger, "Peer {} requires features unknown to us: {:?}", - log_pubkey!(their_node_id), + their_node_id, msg.features.required_unknown_bits_from(&our_features) ); return Err(PeerHandleError {}.into()); @@ -2103,7 +2113,7 @@ where log_debug!( logger, "We require features unknown to our peer {}: {:?}", - log_pubkey!(their_node_id), + their_node_id, our_features.required_unknown_bits_from(&msg.features) ); return Err(PeerHandleError {}.into()); @@ -2116,7 +2126,7 @@ where log_info!( logger, "Received peer Init message from {}: {}", - log_pubkey!(their_node_id), + their_node_id, msg.features ); @@ -2125,55 +2135,68 @@ where peer_lock.sync_status = InitSyncTracker::ChannelsSyncing(0); } - let connection = peer_lock.inbound_connection; + let inbound = peer_lock.inbound_connection; let route_handler = &self.message_handler.route_handler; - if let Err(()) = route_handler.peer_connected(their_node_id, &msg, connection) { + if let Err(()) = route_handler.peer_connected(their_node_id, &msg, inbound) { log_debug!( logger, "Route Handler decided we couldn't communicate with peer {}", - log_pubkey!(their_node_id) + their_node_id, ); return Err(PeerHandleError {}.into()); } let chan_handler = &self.message_handler.chan_handler; - if let Err(()) = chan_handler.peer_connected(their_node_id, &msg, connection) { + if let Err(()) = chan_handler.peer_connected(their_node_id, &msg, inbound) { log_debug!( logger, "Channel Handler decided we couldn't communicate with peer {}", - log_pubkey!(their_node_id) + their_node_id, ); self.message_handler.route_handler.peer_disconnected(their_node_id); return Err(PeerHandleError {}.into()); } let onion_message_handler = &self.message_handler.onion_message_handler; - if let Err(()) = onion_message_handler.peer_connected(their_node_id, &msg, connection) { + if let Err(()) = onion_message_handler.peer_connected(their_node_id, &msg, inbound) { log_debug!( logger, "Onion Message Handler decided we couldn't communicate with peer {}", - log_pubkey!(their_node_id) + their_node_id, ); self.message_handler.route_handler.peer_disconnected(their_node_id); self.message_handler.chan_handler.peer_disconnected(their_node_id); return Err(PeerHandleError {}.into()); } let custom_handler = &self.message_handler.custom_message_handler; - if let Err(()) = custom_handler.peer_connected(their_node_id, &msg, connection) { + if let Err(()) = custom_handler.peer_connected(their_node_id, &msg, inbound) { log_debug!( logger, "Custom Message Handler decided we couldn't communicate with peer {}", - log_pubkey!(their_node_id) + their_node_id, ); self.message_handler.route_handler.peer_disconnected(their_node_id); self.message_handler.chan_handler.peer_disconnected(their_node_id); self.message_handler.onion_message_handler.peer_disconnected(their_node_id); return Err(PeerHandleError {}.into()); } + let sends_handler = &self.message_handler.send_only_message_handler; + if let Err(()) = sends_handler.peer_connected(their_node_id, &msg, inbound) { + log_debug!( + logger, + "Sending-Only Message Handler decided we couldn't communicate with peer {}", + their_node_id, + ); + self.message_handler.route_handler.peer_disconnected(their_node_id); + self.message_handler.chan_handler.peer_disconnected(their_node_id); + self.message_handler.onion_message_handler.peer_disconnected(their_node_id); + self.message_handler.custom_message_handler.peer_disconnected(their_node_id); + return Err(PeerHandleError {}.into()); + } peer_lock.awaiting_pong_timer_tick_intervals = 0; peer_lock.their_features = Some(msg.features); return Ok(None); } else if peer_lock.their_features.is_none() { - log_debug!(logger, "Peer {} sent non-Init first message", log_pubkey!(their_node_id)); + log_debug!(logger, "Peer {} sent non-Init first message", their_node_id); return Err(PeerHandleError {}.into()); } @@ -2183,8 +2206,7 @@ where if peer_lock.message_batch.is_some() { let error = format!( "Peer {} sent start_batch for channel {} before previous batch completed", - log_pubkey!(their_node_id), - &msg.channel_id + their_node_id, &msg.channel_id ); log_debug!(logger, "{}", error); return Err(LightningError { @@ -2200,8 +2222,7 @@ where if batch_size <= 1 { let error = format!( "Peer {} sent start_batch for channel {} not strictly greater than 1", - log_pubkey!(their_node_id), - &msg.channel_id + their_node_id, &msg.channel_id ); log_debug!(logger, "{}", error); return Err(LightningError { @@ -2218,8 +2239,7 @@ where if batch_size > BATCH_SIZE_LIMIT { let error = format!( "Peer {} sent start_batch for channel {} exceeding the limit", - log_pubkey!(their_node_id), - &msg.channel_id + their_node_id, &msg.channel_id ); log_debug!(logger, "{}", error); return Err(LightningError { @@ -2240,7 +2260,7 @@ where log_debug!( logger, "Peer {} sent start_batch for channel {} without a known message type; ignoring", - log_pubkey!(their_node_id), + their_node_id, &msg.channel_id, ); return Ok(None); @@ -2259,7 +2279,7 @@ where &mut message_batch.messages; if msg.channel_id != message_batch.channel_id { - let error = format!("Peer {} sent batched commitment_signed for the wrong channel (expected: {}, actual: {})", log_pubkey!(their_node_id), message_batch.channel_id, &msg.channel_id); + let error = format!("Peer {} sent batched commitment_signed for the wrong channel (expected: {}, actual: {})", their_node_id, message_batch.channel_id, &msg.channel_id); log_debug!(logger, "{}", error); return Err(LightningError { err: error.clone(), @@ -2290,7 +2310,7 @@ where log_debug!( logger, "Peer {} sent an unexpected message for a commitment_signed batch", - log_pubkey!(their_node_id) + their_node_id, ); }, } @@ -2351,19 +2371,9 @@ where MessageHandlingError, > { if is_gossip_msg(message.type_id()) { - log_gossip!( - logger, - "Received message {:?} from {}", - message, - log_pubkey!(their_node_id) - ); + log_gossip!(logger, "Received message {:?} from {}", message, their_node_id); } else { - log_trace!( - logger, - "Received message {:?} from {}", - message, - log_pubkey!(their_node_id) - ); + log_trace!(logger, "Received message {:?} from {}", message, their_node_id); } let mut should_forward = None; @@ -2380,7 +2390,7 @@ where log_debug!( logger, "Got Err message from {}: {}", - log_pubkey!(their_node_id), + their_node_id, PrintableString(&msg.data) ); self.message_handler.chan_handler.handle_error(their_node_id, &msg); @@ -2392,7 +2402,7 @@ where log_debug!( logger, "Got warning message from {}: {}", - log_pubkey!(their_node_id), + their_node_id, PrintableString(&msg.data) ); }, @@ -2832,47 +2842,47 @@ where match event { MessageSendEvent::SendPeerStorage { ref node_id, ref msg } => { log_debug!( - self.logger, + WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendPeerStorage event in peer_handler for {}", - log_pubkey!(node_id) + node_id, ); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendPeerStorageRetrieval { ref node_id, ref msg } => { log_debug!( - self.logger, + WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendPeerStorageRetrieval event in peer_handler for {}", - log_pubkey!(node_id) + node_id, ); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendAcceptChannel { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendAcceptChannel event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), + node_id, &msg.common_fields.temporary_channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendAcceptChannelV2 { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendAcceptChannelV2 event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), + node_id, &msg.common_fields.temporary_channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendOpenChannel event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), + node_id, &msg.common_fields.temporary_channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendOpenChannelV2 { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendOpenChannelV2 event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), + node_id, &msg.common_fields.temporary_channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.temporary_channel_id), None), "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})", - log_pubkey!(node_id), + node_id, &msg.temporary_channel_id, ChannelId::v1_from_funding_txid(msg.funding_txid.as_byte_array(), msg.funding_output_index)); // TODO: If the peer is gone we should generate a DiscardFunding event @@ -2881,13 +2891,13 @@ where }, MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendFundingSigned event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), + node_id, &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendChannelReady { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendChannelReady event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), + node_id, &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, @@ -2899,7 +2909,7 @@ where None, ); log_debug!(logger, "Handling SendStfu event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), + node_id, &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, @@ -2911,7 +2921,7 @@ where None, ); log_debug!(logger, "Handling SendSpliceInit event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), + node_id, &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, @@ -2923,7 +2933,7 @@ where None, ); log_debug!(logger, "Handling SendSpliceAck event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), + node_id, &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, @@ -2935,67 +2945,67 @@ where None, ); log_debug!(logger, "Handling SendSpliceLocked event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), + node_id, &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendTxAddInput { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAddInput event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), + node_id, &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendTxAddOutput { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAddOutput event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), + node_id, &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendTxRemoveInput { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxRemoveInput event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), + node_id, &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendTxRemoveOutput { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxRemoveOutput event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), + node_id, &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendTxComplete { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxComplete event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), + node_id, &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendTxSignatures { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxSignatures event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), + node_id, &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendTxInitRbf { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxInitRbf event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), + node_id, &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendTxAckRbf { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAckRbf event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), + node_id, &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendTxAbort { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAbort event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), + node_id, &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})", - log_pubkey!(node_id), + node_id, &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, @@ -3013,7 +3023,7 @@ where }, } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(*channel_id), None), "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails, {} commits for channel {}", - log_pubkey!(node_id), + node_id, update_add_htlcs.len(), update_fulfill_htlcs.len(), update_fail_htlcs.len(), @@ -3049,37 +3059,37 @@ where }, MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), + node_id, &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendClosingSigned event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), + node_id, &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendClosingComplete { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendClosingComplete event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), + node_id, &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendClosingSig { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendClosingSig event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), + node_id, &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendShutdown { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling Shutdown event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), + node_id, &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendChannelReestablish event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), + node_id, &msg.channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, @@ -3089,7 +3099,7 @@ where ref update_msg, } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendChannelAnnouncement event in peer_handler for node {} for short channel id {}", - log_pubkey!(node_id), + node_id, msg.contents.short_channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); self.enqueue_message( @@ -3174,7 +3184,7 @@ where }, MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => { log_trace!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendChannelUpdate event in peer_handler for node {} for channel {}", - log_pubkey!(node_id), msg.contents.short_channel_id); + node_id, msg.contents.short_channel_id); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::HandleError { node_id, action } => { @@ -3183,10 +3193,10 @@ where msgs::ErrorAction::DisconnectPeer { msg } => { if let Some(msg) = msg.as_ref() { log_trace!(logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}", - log_pubkey!(node_id), msg.data); + node_id, msg.data); } else { log_trace!(logger, "Handling DisconnectPeer HandleError event in peer_handler for node {}", - log_pubkey!(node_id)); + node_id); } // We do not have the peers write lock, so we just store that we're // about to disconnect the peer and do it after we finish @@ -3198,7 +3208,7 @@ where }, msgs::ErrorAction::DisconnectPeerWithWarning { msg } => { log_trace!(logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}", - log_pubkey!(node_id), msg.data); + node_id, msg.data); // We do not have the peers write lock, so we just store that we're // about to disconnect the peer and do it after we finish // processing most messages. @@ -3210,7 +3220,7 @@ where logger, level, "Received a HandleError event to be ignored for node {}", - log_pubkey!(node_id) + node_id, ); }, msgs::ErrorAction::IgnoreDuplicateGossip => {}, @@ -3218,12 +3228,12 @@ where log_debug!( logger, "Received a HandleError event to be ignored for node {}", - log_pubkey!(node_id) + node_id, ); }, msgs::ErrorAction::SendErrorMessage { ref msg } => { log_trace!(logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}", - log_pubkey!(node_id), + node_id, msg.data); self.enqueue_message( &mut *get_peer_for_forwarding!(&node_id)?, @@ -3235,7 +3245,7 @@ where ref log_level, } => { log_given_level!(logger, *log_level, "Handling SendWarningMessage HandleError event in peer_handler for node {} with message {}", - log_pubkey!(node_id), + node_id, msg.data); self.enqueue_message( &mut *get_peer_for_forwarding!(&node_id)?, @@ -3246,20 +3256,20 @@ where }, MessageSendEvent::SendChannelRangeQuery { ref node_id, ref msg } => { log_gossip!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendChannelRangeQuery event in peer_handler for node {} with first_blocknum={}, number_of_blocks={}", - log_pubkey!(node_id), + node_id, msg.first_blocknum, msg.number_of_blocks); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendShortIdsQuery { ref node_id, ref msg } => { log_gossip!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendShortIdsQuery event in peer_handler for node {} with num_scids={}", - log_pubkey!(node_id), + node_id, msg.short_channel_ids.len()); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendReplyChannelRange { ref node_id, ref msg } => { log_gossip!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}", - log_pubkey!(node_id), + node_id, msg.short_channel_ids.len(), msg.first_blocknum, msg.number_of_blocks, @@ -3268,7 +3278,7 @@ where }, MessageSendEvent::SendGossipTimestampFilter { ref node_id, ref msg } => { log_gossip!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendGossipTimestampFilter event in peer_handler for node {} with first_timestamp={}, timestamp_range={}", - log_pubkey!(node_id), + node_id, msg.first_timestamp, msg.timestamp_range); self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); @@ -3387,6 +3397,7 @@ where self.message_handler.chan_handler.peer_disconnected(node_id); self.message_handler.onion_message_handler.peer_disconnected(node_id); self.message_handler.custom_message_handler.peer_disconnected(node_id); + self.message_handler.send_only_message_handler.peer_disconnected(node_id); } descriptor.disconnect_socket(); } @@ -3407,7 +3418,7 @@ where log_trace!( logger, "Handling disconnection of peer {} because {}", - log_pubkey!(node_id), + node_id, reason ); let removed = self.node_id_to_descriptor.lock().unwrap().remove(&node_id); @@ -3419,6 +3430,7 @@ where self.message_handler.chan_handler.peer_disconnected(node_id); self.message_handler.onion_message_handler.peer_disconnected(node_id); self.message_handler.custom_message_handler.peer_disconnected(node_id); + self.message_handler.send_only_message_handler.peer_disconnected(node_id); } }, }; @@ -3510,8 +3522,8 @@ where debug_assert!(peer.channel_encryptor.is_ready_for_encryption()); debug_assert!(peer.their_node_id.is_some()); + // We use a loop as a `goto` to skip writing the Ping message: loop { - // Used as a `goto` to skip writing a Ping message. if peer.awaiting_pong_timer_tick_intervals == -1 { // Magic value set in `maybe_send_extra_ping`. peer.awaiting_pong_timer_tick_intervals = 1; @@ -3604,7 +3616,8 @@ where let features = self.message_handler.chan_handler.provided_node_features() | self.message_handler.route_handler.provided_node_features() | self.message_handler.onion_message_handler.provided_node_features() - | self.message_handler.custom_message_handler.provided_node_features(); + | self.message_handler.custom_message_handler.provided_node_features() + | self.message_handler.send_only_message_handler.provided_node_features(); let announcement = msgs::UnsignedNodeAnnouncement { features, timestamp: self.last_node_announcement_serial.fetch_add(1, Ordering::AcqRel), @@ -3728,6 +3741,7 @@ mod tests { chan_handler: test_utils::TestChannelMessageHandler, routing_handler: test_utils::TestRoutingMessageHandler, custom_handler: TestCustomMessageHandler, + send_only_handler: TestBaseMsgHandler, logger: test_utils::TestLogger, node_signer: test_utils::TestNodeSigner, } @@ -3780,6 +3794,32 @@ mod tests { } } + struct TestBaseMsgHandler(test_utils::ConnectionTracker); + + impl BaseMessageHandler for TestBaseMsgHandler { + fn get_and_clear_pending_msg_events(&self) -> Vec { + Vec::new() + } + + fn peer_disconnected(&self, their_node_id: PublicKey) { + self.0.peer_disconnected(their_node_id); + } + + fn peer_connected( + &self, their_node_id: PublicKey, _msg: &Init, _inbound: bool, + ) -> Result<(), ()> { + self.0.peer_connected(their_node_id) + } + + fn provided_node_features(&self) -> NodeFeatures { + NodeFeatures::empty() + } + + fn provided_init_features(&self, _: PublicKey) -> InitFeatures { + InitFeatures::empty() + } + } + fn create_peermgr_cfgs(peer_count: usize) -> Vec { let mut cfgs = Vec::new(); for i in 0..peer_count { @@ -3796,6 +3836,7 @@ mod tests { logger: test_utils::TestLogger::with_id(i.to_string()), routing_handler: test_utils::TestRoutingMessageHandler::new(), custom_handler: TestCustomMessageHandler::new(features), + send_only_handler: TestBaseMsgHandler(test_utils::ConnectionTracker::new()), node_signer: test_utils::TestNodeSigner::new(node_secret), }); } @@ -3819,6 +3860,7 @@ mod tests { logger: test_utils::TestLogger::new(), routing_handler: test_utils::TestRoutingMessageHandler::new(), custom_handler: TestCustomMessageHandler::new(features), + send_only_handler: TestBaseMsgHandler(test_utils::ConnectionTracker::new()), node_signer: test_utils::TestNodeSigner::new(node_secret), }); } @@ -3837,6 +3879,7 @@ mod tests { logger: test_utils::TestLogger::new(), routing_handler: test_utils::TestRoutingMessageHandler::new(), custom_handler: TestCustomMessageHandler::new(features), + send_only_handler: TestBaseMsgHandler(test_utils::ConnectionTracker::new()), node_signer: test_utils::TestNodeSigner::new(node_secret), }); } @@ -3844,20 +3887,7 @@ mod tests { cfgs } - fn create_network<'a>( - peer_count: usize, cfgs: &'a Vec, - ) -> Vec< - PeerManager< - FileDescriptor, - &'a test_utils::TestChannelMessageHandler, - &'a test_utils::TestRoutingMessageHandler, - IgnoringMessageHandler, - &'a test_utils::TestLogger, - &'a TestCustomMessageHandler, - &'a test_utils::TestNodeSigner, - IgnoringMessageHandler, - >, - > { + fn create_network<'a>(peer_count: usize, cfgs: &'a Vec) -> Vec> { let mut peers = Vec::new(); for i in 0..peer_count { let ephemeral_bytes = [i as u8; 32]; @@ -3866,7 +3896,7 @@ mod tests { route_handler: &cfgs[i].routing_handler, onion_message_handler: IgnoringMessageHandler {}, custom_message_handler: &cfgs[i].custom_handler, - send_only_message_handler: IgnoringMessageHandler {}, + send_only_message_handler: &cfgs[i].send_only_handler, }; let peer = PeerManager::new( msg_handler, @@ -3889,7 +3919,7 @@ mod tests { &'a test_utils::TestLogger, &'a TestCustomMessageHandler, &'a test_utils::TestNodeSigner, - IgnoringMessageHandler, + &'a TestBaseMsgHandler, >; fn try_establish_connection<'a>( @@ -4265,6 +4295,7 @@ mod tests { let chan_handler = peers[handler & 1].message_handler.chan_handler; let route_handler = peers[handler & 1].message_handler.route_handler; let custom_message_handler = peers[handler & 1].message_handler.custom_message_handler; + let send_only_msg_handler = peers[handler & 1].message_handler.send_only_message_handler; match handler & !1 { 0 => { @@ -4276,6 +4307,9 @@ mod tests { 4 => { custom_message_handler.conn_tracker.fail_connections.store(true, Ordering::Release); }, + 6 => { + send_only_msg_handler.0.fail_connections.store(true, Ordering::Release); + }, _ => panic!(), } let (_sd1, _sd2, a_refused, b_refused) = try_establish_connection(&peers[0], &peers[1]); @@ -4291,16 +4325,18 @@ mod tests { chan_handler.conn_tracker.had_peers.load(Ordering::Acquire) || route_handler.conn_tracker.had_peers.load(Ordering::Acquire) || custom_message_handler.conn_tracker.had_peers.load(Ordering::Acquire) + || send_only_msg_handler.0.had_peers.load(Ordering::Acquire) ); // And both message handlers doing tracking should see the disconnection assert!(chan_handler.conn_tracker.connected_peers.lock().unwrap().is_empty()); assert!(route_handler.conn_tracker.connected_peers.lock().unwrap().is_empty()); assert!(custom_message_handler.conn_tracker.connected_peers.lock().unwrap().is_empty()); + assert!(send_only_msg_handler.0.connected_peers.lock().unwrap().is_empty()); } #[test] fn test_peer_connected_error_disconnects() { - for i in 0..6 { + for i in 0..8 { do_test_peer_connected_error_disconnects(i); } }