From 0279b899827abacbc46cd9394c41025b5869987b Mon Sep 17 00:00:00 2001 From: Jordi Boggiano Date: Wed, 30 Apr 2014 16:27:39 +0200 Subject: [PATCH 1/4] Add TimeoutReceiver to allow recv()ing with a timeout --- src/libstd/comm/mod.rs | 65 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs index ce1c09af07cad..63770cc2c9003 100644 --- a/src/libstd/comm/mod.rs +++ b/src/libstd/comm/mod.rs @@ -120,6 +120,18 @@ //! }); //! rx.recv(); //! ``` +//! +//! Reading from a channel with a timeout: +//! +//! ``` +//! let (tx, rx) = channel(); +//! let mut rx = TimeoutReceiver::new(rx, 10000); +//! match rx.recv() { +//! Some(val) => println!("Received {}", val), +//! None => println!("timed out, no message received in 10 seconds"), +//! } +//! ``` + // A description of how Rust's channel implementation works // @@ -284,6 +296,7 @@ use result::{Ok, Err, Result}; use rt::local::Local; use rt::task::{Task, BlockedTask}; use sync::arc::UnsafeArc; +use io::timer::Timer; pub use comm::select::{Select, Handle}; @@ -331,6 +344,13 @@ pub struct Receiver { marker: marker::NoShare, } +/// Wraps a Receiver and allows receiving messages with a timeout strategy +pub struct TimeoutReceiver { + inner: Receiver, + timer: Timer, + timeout: u64, +} + /// An iterator over messages on a receiver, this iterator will block /// whenever `next` is called, waiting for a new message, and `None` will be /// returned when the corresponding channel has hung up. @@ -937,6 +957,51 @@ impl select::Packet for Receiver { } } +impl TimeoutReceiver { + pub fn new(inner: Receiver, timeout: u64) -> TimeoutReceiver { + // create a timer object + let timer = Timer::new().unwrap(); + TimeoutReceiver { inner: inner, timer: timer, timeout: timeout } + } + + /// Blocks waiting for a value on the inner receiver or returns None after + /// timeout has passed + /// + /// This method can not fail as opposed to the original Receiver::recv, but + /// it returns an Option instead of the raw value. None is returned if the + /// recv timed out. + pub fn recv(&mut self) -> Option { + let oneshot = self.timer.oneshot(self.timeout); + + // alternately read from the oneshot timer and the wrapped channel + loop { + println!("Trying"); + match self.inner.try_recv() { + Ok(v) => return Some(v), + _ => () + } + if oneshot.try_recv().is_ok() { + return None; + } + } + } + + /// Wrapper for the inner Receiver's try_recv method. + pub fn try_recv(&self) -> Result { + self.inner.try_recv() + } + + /// Wrapper for the inner Receiver's recv_opt method. + pub fn recv_opt(&self) -> Result { + self.inner.recv_opt() + } + + /// Wrapper for the inner Receiver's iter method. + pub fn iter<'a>(&'a self) -> Messages<'a, T> { + self.inner.iter() + } +} + impl<'a, T: Send> Iterator for Messages<'a, T> { fn next(&mut self) -> Option { self.rx.recv_opt().ok() } } From aa7df15d806b0f23c58e72a0021370e91d7e5039 Mon Sep 17 00:00:00 2001 From: Jordi Boggiano Date: Thu, 1 May 2014 17:13:12 +0200 Subject: [PATCH 2/4] Update recv to use select properly --- src/libstd/comm/mod.rs | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs index 63770cc2c9003..f52b60eaeb649 100644 --- a/src/libstd/comm/mod.rs +++ b/src/libstd/comm/mod.rs @@ -973,17 +973,19 @@ impl TimeoutReceiver { pub fn recv(&mut self) -> Option { let oneshot = self.timer.oneshot(self.timeout); - // alternately read from the oneshot timer and the wrapped channel - loop { - println!("Trying"); - match self.inner.try_recv() { - Ok(v) => return Some(v), - _ => () - } - if oneshot.try_recv().is_ok() { - return None; - } + let sel = Select::new(); + let mut timeout_rx = sel.handle(&oneshot); + let mut inner_rx = sel.handle(&self.inner); + unsafe { + timeout_rx.add(); + inner_rx.add(); + } + let ret = sel.wait(); + if ret == inner_rx.id() { + return Some(inner_rx.recv()); } + + return None; } /// Wrapper for the inner Receiver's try_recv method. From d07e298712c6a545d1038f877449e5bba86c0e1f Mon Sep 17 00:00:00 2001 From: Jordi Boggiano Date: Thu, 1 May 2014 17:22:28 +0200 Subject: [PATCH 3/4] Remove trailing white-space --- src/libstd/comm/mod.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs index f52b60eaeb649..d5fe42ade34fe 100644 --- a/src/libstd/comm/mod.rs +++ b/src/libstd/comm/mod.rs @@ -120,9 +120,9 @@ //! }); //! rx.recv(); //! ``` -//! +//! //! Reading from a channel with a timeout: -//! +//! //! ``` //! let (tx, rx) = channel(); //! let mut rx = TimeoutReceiver::new(rx, 10000); @@ -968,7 +968,7 @@ impl TimeoutReceiver { /// timeout has passed /// /// This method can not fail as opposed to the original Receiver::recv, but - /// it returns an Option instead of the raw value. None is returned if the + /// it returns an Option instead of the raw value. None is returned if the /// recv timed out. pub fn recv(&mut self) -> Option { let oneshot = self.timer.oneshot(self.timeout); @@ -981,7 +981,7 @@ impl TimeoutReceiver { inner_rx.add(); } let ret = sel.wait(); - if ret == inner_rx.id() { + if ret == inner_rx.id() { return Some(inner_rx.recv()); } From 2653af7131d657f09208ab4e4be84a1162575cc2 Mon Sep 17 00:00:00 2001 From: Jordi Boggiano Date: Thu, 1 May 2014 17:42:54 +0200 Subject: [PATCH 4/4] Add docblock to please the CI gods --- src/libstd/comm/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs index d5fe42ade34fe..a42bf3ed4499b 100644 --- a/src/libstd/comm/mod.rs +++ b/src/libstd/comm/mod.rs @@ -958,8 +958,8 @@ impl select::Packet for Receiver { } impl TimeoutReceiver { + /// Creates a TimeoutReceiver pub fn new(inner: Receiver, timeout: u64) -> TimeoutReceiver { - // create a timer object let timer = Timer::new().unwrap(); TimeoutReceiver { inner: inner, timer: timer, timeout: timeout } }