-
Notifications
You must be signed in to change notification settings - Fork 415
Direct connect for OnionMessage
sending
#2723
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
79f212b
8412e83
ddee928
17af8d5
1114c3c
b86f02a
ba2a822
06b05df
cfaa7f3
ae98517
36ecc8e
ce68f22
6ca81ff
e25af3e
210407e
89e630b
d46519b
0b83116
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,6 +30,7 @@ use lightning::events::{Event, PathFailure}; | |
#[cfg(feature = "std")] | ||
use lightning::events::{EventHandler, EventsProvider}; | ||
use lightning::ln::channelmanager::ChannelManager; | ||
use lightning::ln::msgs::OnionMessageHandler; | ||
use lightning::ln::peer_handler::APeerManager; | ||
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; | ||
use lightning::routing::utxo::UtxoLookup; | ||
|
@@ -104,6 +105,11 @@ const PING_TIMER: u64 = 30; | |
#[cfg(test)] | ||
const PING_TIMER: u64 = 1; | ||
|
||
#[cfg(not(test))] | ||
const ONION_MESSAGE_HANDLER_TIMER: u64 = 10; | ||
#[cfg(test)] | ||
const ONION_MESSAGE_HANDLER_TIMER: u64 = 1; | ||
|
||
/// Prune the network graph of stale entries hourly. | ||
const NETWORK_PRUNE_TIMER: u64 = 60 * 60; | ||
|
||
|
@@ -270,18 +276,20 @@ fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + Wri | |
} | ||
|
||
macro_rules! define_run_body { | ||
($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr, | ||
$channel_manager: ident, $process_channel_manager_events: expr, | ||
$gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident, | ||
$loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr, | ||
$check_slow_await: expr) | ||
=> { { | ||
( | ||
$persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr, | ||
$channel_manager: ident, $process_channel_manager_events: expr, | ||
$peer_manager: ident, $process_onion_message_handler_events: expr, $gossip_sync: ident, | ||
$logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr, | ||
$timer_elapsed: expr, $check_slow_await: expr | ||
) => { { | ||
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup"); | ||
$channel_manager.timer_tick_occurred(); | ||
log_trace!($logger, "Rebroadcasting monitor's pending claims on startup"); | ||
$chain_monitor.rebroadcast_pending_claims(); | ||
|
||
let mut last_freshness_call = $get_timer(FRESHNESS_TIMER); | ||
let mut last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER); | ||
let mut last_ping_call = $get_timer(PING_TIMER); | ||
let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER); | ||
let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER); | ||
|
@@ -291,6 +299,7 @@ macro_rules! define_run_body { | |
loop { | ||
$process_channel_manager_events; | ||
$process_chain_monitor_events; | ||
$process_onion_message_handler_events; | ||
|
||
// Note that the PeerManager::process_events may block on ChannelManager's locks, | ||
// hence it comes last here. When the ChannelManager finishes whatever it's doing, | ||
|
@@ -334,6 +343,11 @@ macro_rules! define_run_body { | |
$channel_manager.timer_tick_occurred(); | ||
last_freshness_call = $get_timer(FRESHNESS_TIMER); | ||
} | ||
if $timer_elapsed(&mut last_onion_message_handler_call, ONION_MESSAGE_HANDLER_TIMER) { | ||
log_trace!($logger, "Calling OnionMessageHandler's timer_tick_occurred"); | ||
$peer_manager.onion_message_handler().timer_tick_occurred(); | ||
last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER); | ||
} | ||
if await_slow { | ||
// On various platforms, we may be starved of CPU cycles for several reasons. | ||
// E.g. on iOS, if we've been in the background, we will be entirely paused. | ||
|
@@ -603,8 +617,7 @@ pub async fn process_events_async< | |
CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync, | ||
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync, | ||
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send, | ||
APM: APeerManager + Send + Sync, | ||
PM: 'static + Deref<Target = APM> + Send + Sync, | ||
PM: 'static + Deref + Send + Sync, | ||
S: 'static + Deref<Target = SC> + Send + Sync, | ||
SC: for<'b> WriteableScore<'b>, | ||
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin, | ||
|
@@ -627,6 +640,7 @@ where | |
L::Target: 'static + Logger, | ||
P::Target: 'static + Persist<<SP::Target as SignerProvider>::EcdsaSigner>, | ||
PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>, | ||
PM::Target: APeerManager + Send + Sync, | ||
{ | ||
let mut should_break = false; | ||
let async_event_handler = |event| { | ||
|
@@ -650,10 +664,12 @@ where | |
event_handler(event).await; | ||
} | ||
}; | ||
define_run_body!(persister, | ||
chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await, | ||
define_run_body!( | ||
persister, chain_monitor, | ||
chain_monitor.process_pending_events_async(async_event_handler).await, | ||
channel_manager, channel_manager.process_pending_events_async(async_event_handler).await, | ||
gossip_sync, peer_manager, logger, scorer, should_break, { | ||
peer_manager, process_onion_message_handler_events_async(&peer_manager, async_event_handler).await, | ||
gossip_sync, logger, scorer, should_break, { | ||
let fut = Selector { | ||
a: channel_manager.get_event_or_persistence_needed_future(), | ||
b: chain_monitor.get_update_future(), | ||
|
@@ -673,7 +689,29 @@ where | |
task::Poll::Ready(exit) => { should_break = exit; true }, | ||
task::Poll::Pending => false, | ||
} | ||
}, mobile_interruptable_platform) | ||
}, mobile_interruptable_platform | ||
) | ||
} | ||
|
||
#[cfg(feature = "futures")] | ||
async fn process_onion_message_handler_events_async< | ||
EventHandlerFuture: core::future::Future<Output = ()>, | ||
EventHandler: Fn(Event) -> EventHandlerFuture, | ||
PM: 'static + Deref + Send + Sync, | ||
>( | ||
peer_manager: &PM, handler: EventHandler | ||
) | ||
where | ||
PM::Target: APeerManager + Send + Sync, | ||
{ | ||
use lightning::events::EventsProvider; | ||
|
||
let events = core::cell::RefCell::new(Vec::new()); | ||
peer_manager.onion_message_handler().process_pending_events(&|e| events.borrow_mut().push(e)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe after async functions in traits are stabilized, we can add an async event processing method to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, possibly. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure I understand the reason for this now? Can we not just add a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
for event in events.into_inner() { | ||
handler(event).await | ||
} | ||
} | ||
|
||
#[cfg(feature = "std")] | ||
|
@@ -742,8 +780,7 @@ impl BackgroundProcessor { | |
CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync, | ||
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync, | ||
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send, | ||
APM: APeerManager + Send + Sync, | ||
PM: 'static + Deref<Target = APM> + Send + Sync, | ||
PM: 'static + Deref + Send + Sync, | ||
S: 'static + Deref<Target = SC> + Send + Sync, | ||
SC: for <'b> WriteableScore<'b>, | ||
>( | ||
|
@@ -763,6 +800,7 @@ impl BackgroundProcessor { | |
L::Target: 'static + Logger, | ||
P::Target: 'static + Persist<<SP::Target as SignerProvider>::EcdsaSigner>, | ||
PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>, | ||
PM::Target: APeerManager + Send + Sync, | ||
{ | ||
let stop_thread = Arc::new(AtomicBool::new(false)); | ||
let stop_thread_clone = stop_thread.clone(); | ||
|
@@ -782,14 +820,18 @@ impl BackgroundProcessor { | |
} | ||
event_handler.handle_event(event); | ||
}; | ||
define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler), | ||
define_run_body!( | ||
persister, chain_monitor, chain_monitor.process_pending_events(&event_handler), | ||
channel_manager, channel_manager.process_pending_events(&event_handler), | ||
gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire), | ||
peer_manager, | ||
peer_manager.onion_message_handler().process_pending_events(&event_handler), | ||
gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire), | ||
{ Sleeper::from_two_futures( | ||
channel_manager.get_event_or_persistence_needed_future(), | ||
chain_monitor.get_update_future() | ||
).wait_timeout(Duration::from_millis(100)); }, | ||
|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false) | ||
|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false | ||
) | ||
}); | ||
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) } | ||
} | ||
|
@@ -1362,9 +1404,11 @@ mod tests { | |
|
||
#[test] | ||
fn test_timer_tick_called() { | ||
// Test that `ChannelManager::timer_tick_occurred` is called every `FRESHNESS_TIMER`, | ||
// `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`, and | ||
// `PeerManager::timer_tick_occurred` every `PING_TIMER`. | ||
// Test that: | ||
// - `ChannelManager::timer_tick_occurred` is called every `FRESHNESS_TIMER`, | ||
// - `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`, | ||
// - `PeerManager::timer_tick_occurred` is called every `PING_TIMER`, and | ||
// - `OnionMessageHandler::timer_tick_occurred` is called every `ONION_MESSAGE_HANDLER_TIMER`. | ||
let (_, nodes) = create_nodes(1, "test_timer_tick_called"); | ||
let data_dir = nodes[0].kv_store.get_data_dir(); | ||
let persister = Arc::new(Persister::new(data_dir)); | ||
|
@@ -1375,9 +1419,11 @@ mod tests { | |
let desired_log_1 = "Calling ChannelManager's timer_tick_occurred".to_string(); | ||
let desired_log_2 = "Calling PeerManager's timer_tick_occurred".to_string(); | ||
let desired_log_3 = "Rebroadcasting monitor's pending claims".to_string(); | ||
let desired_log_4 = "Calling OnionMessageHandler's timer_tick_occurred".to_string(); | ||
if log_entries.get(&("lightning_background_processor", desired_log_1)).is_some() && | ||
log_entries.get(&("lightning_background_processor", desired_log_2)).is_some() && | ||
log_entries.get(&("lightning_background_processor", desired_log_3)).is_some() { | ||
log_entries.get(&("lightning_background_processor", desired_log_3)).is_some() && | ||
log_entries.get(&("lightning_background_processor", desired_log_4)).is_some() { | ||
break | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -530,6 +530,25 @@ pub enum Event { | |
/// serialized prior to LDK version 0.0.117. | ||
sender_intended_total_msat: Option<u64>, | ||
}, | ||
/// Indicates that a peer connection with a node is needed in order to send an [`OnionMessage`]. | ||
/// | ||
/// Typically, this happens when a [`MessageRouter`] is unable to find a complete path to a | ||
/// [`Destination`]. Once a connection is established, any messages buffered by an | ||
/// [`OnionMessageHandler`] may be sent. | ||
/// | ||
/// This event will not be generated for onion message forwards; only for sends including | ||
/// replies. Handlers should connect to the node otherwise any buffered messages may be lost. | ||
/// | ||
/// [`OnionMessage`]: msgs::OnionMessage | ||
/// [`MessageRouter`]: crate::onion_message::MessageRouter | ||
/// [`Destination`]: crate::onion_message::Destination | ||
/// [`OnionMessageHandler`]: crate::ln::msgs::OnionMessageHandler | ||
ConnectionNeeded { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why did you end up going the event route vs having an "start opening a connection async" trait (or even just doing it via the onion message router)? I guess an event could let us reuse it later in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would require adding another parameterization for users to implement (i.e., either a new trait on And since it needs to be async, the trait implementation wouldn't be enough. Users would need to write code to process connection requests in the background from what their trait enqueued. If we wrote some of this logic for them, then we'd need to do it for each language's IO layer. But that wouldn't fit well into any peer management logic that they may have (e.g., LDK Node's Seems much simpler just to surface an event for them so that no additional parameterization and processing code is needed, other than handling one more event variant. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yea, fair, I guess. I think the current piping through of the needs-connected nodes is also a bit gross, but I'm not sure its more gross. One further thing to consider is what we want the eventual end state to be. If we want to to generate There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we pipe the needs-connect nodes through There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm... that wouldn't work for uses of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The custom trait implementations would need to support connecting to peers outbound, would it not work in some other way? I'm not convinced it's cleaner than the current approach though :/ just nicer in terms of the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure that I follow. All custom implementations should work with the current approach of using There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Discussed offline. Updated to have events processed directly from any |
||
/// The node id for the node needing a connection. | ||
node_id: PublicKey, | ||
/// Sockets for connecting to the node. | ||
addresses: Vec<msgs::SocketAddress>, | ||
}, | ||
/// Indicates a request for an invoice failed to yield a response in a reasonable amount of time | ||
/// or was explicitly abandoned by [`ChannelManager::abandon_payment`]. This may be for an | ||
/// [`InvoiceRequest`] sent for an [`Offer`] or for a [`Refund`] that hasn't been redeemed. | ||
|
@@ -1190,6 +1209,10 @@ impl Writeable for Event { | |
(0, payment_id, required), | ||
}) | ||
}, | ||
&Event::ConnectionNeeded { .. } => { | ||
35u8.write(writer)?; | ||
// Never write ConnectionNeeded events as buffered onion messages aren't serialized. | ||
}, | ||
// Note that, going forward, all new events must only write data inside of | ||
// `write_tlv_fields`. Versions 0.0.101+ will ignore odd-numbered events that write | ||
// data via `write_tlv_fields`. | ||
|
@@ -1200,8 +1223,7 @@ impl Writeable for Event { | |
impl MaybeReadable for Event { | ||
fn read<R: io::Read>(reader: &mut R) -> Result<Option<Self>, msgs::DecodeError> { | ||
match Readable::read(reader)? { | ||
// Note that we do not write a length-prefixed TLV for FundingGenerationReady events, | ||
// unlike all other events, thus we return immediately here. | ||
// Note that we do not write a length-prefixed TLV for FundingGenerationReady events. | ||
0u8 => Ok(None), | ||
1u8 => { | ||
let f = || { | ||
|
@@ -1588,6 +1610,8 @@ impl MaybeReadable for Event { | |
}; | ||
f() | ||
}, | ||
// Note that we do not write a length-prefixed TLV for ConnectionNeeded events. | ||
35u8 => Ok(None), | ||
// Versions prior to 0.0.100 did not ignore odd types, instead returning InvalidValue. | ||
// Version 0.0.100 failed to properly ignore odd types, possibly resulting in corrupt | ||
// reads. | ||
|
Uh oh!
There was an error while loading. Please reload this page.