From e2b180838dd1c8fb3cb2269a1e28452c49cf1767 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 6 Jan 2014 15:23:37 -0800 Subject: [PATCH 1/2] Shuffle around ownership in concurrent queues Beforehand, using a concurrent queue always mandated that the "shared state" be stored internally to the queues in order to provide a safe interface. This isn't quite as flexible as one would want in some circumstances, so instead this commit moves the queues to not containing the shared state. The queues no longer have a "default useful safe" interface, but rather a "default safe" interface (minus the useful part). The queues have to be shared manually through an Arc or some other means. This allows them to be a little more flexible at the cost of a usability hindrance. I plan on using this new flexibility to upgrade a channel to a shared channel seamlessly. --- src/libgreen/lib.rs | 1 + src/libgreen/message_queue.rs | 61 +++++++ src/libgreen/sched.rs | 16 +- src/librustuv/queue.rs | 51 +++--- src/libstd/comm/mod.rs | 129 +++++++-------- src/libstd/comm/select.rs | 12 +- src/libstd/sync/mpsc_queue.rs | 154 +++++++----------- src/libstd/sync/spsc_queue.rs | 290 ++++++++++++++-------------------- 8 files changed, 344 insertions(+), 370 deletions(-) create mode 100644 src/libgreen/message_queue.rs diff --git a/src/libgreen/lib.rs b/src/libgreen/lib.rs index d7e94c6d48eba..aa062321b1329 100644 --- a/src/libgreen/lib.rs +++ b/src/libgreen/lib.rs @@ -47,6 +47,7 @@ use task::GreenTask; mod macros; mod simple; +mod message_queue; pub mod basic; pub mod context; diff --git a/src/libgreen/message_queue.rs b/src/libgreen/message_queue.rs new file mode 100644 index 0000000000000..3a118476affb7 --- /dev/null +++ b/src/libgreen/message_queue.rs @@ -0,0 +1,61 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use mpsc = std::sync::mpsc_queue; +use std::sync::arc::UnsafeArc; + +pub enum PopResult { + Inconsistent, + Empty, + Data(T), +} + +pub fn queue() -> (Consumer, Producer) { + let (a, b) = UnsafeArc::new2(mpsc::Queue::new()); + (Consumer { inner: a }, Producer { inner: b }) +} + +pub struct Producer { + priv inner: UnsafeArc>, +} + +pub struct Consumer { + priv inner: UnsafeArc>, +} + +impl Consumer { + pub fn pop(&mut self) -> PopResult { + match unsafe { (*self.inner.get()).pop() } { + mpsc::Inconsistent => Inconsistent, + mpsc::Empty => Empty, + mpsc::Data(t) => Data(t), + } + } + + pub fn casual_pop(&mut self) -> Option { + match unsafe { (*self.inner.get()).pop() } { + mpsc::Inconsistent => None, + mpsc::Empty => None, + mpsc::Data(t) => Some(t), + } + } +} + +impl Producer { + pub fn push(&mut self, t: T) { + unsafe { (*self.inner.get()).push(t); } + } +} + +impl Clone for Producer { + fn clone(&self) -> Producer { + Producer { inner: self.inner.clone() } + } +} diff --git a/src/libgreen/sched.rs b/src/libgreen/sched.rs index b0b88e4be7936..20eca48cff9e5 100644 --- a/src/libgreen/sched.rs +++ b/src/libgreen/sched.rs @@ -17,7 +17,6 @@ use std::rt::task::Task; use std::sync::deque; use std::unstable::mutex::Mutex; use std::unstable::raw; -use mpsc = std::sync::mpsc_queue; use TaskState; use context::Context; @@ -25,6 +24,7 @@ use coroutine::Coroutine; use sleeper_list::SleeperList; use stack::StackPool; use task::{TypeSched, GreenTask, HomeSched, AnySched}; +use msgq = message_queue; /// A scheduler is responsible for coordinating the execution of Tasks /// on a single thread. The scheduler runs inside a slightly modified @@ -47,9 +47,9 @@ pub struct Scheduler { /// The queue of incoming messages from other schedulers. /// These are enqueued by SchedHandles after which a remote callback /// is triggered to handle the message. - message_queue: mpsc::Consumer, + message_queue: msgq::Consumer, /// Producer used to clone sched handles from - message_producer: mpsc::Producer, + message_producer: msgq::Producer, /// A shared list of sleeping schedulers. We'll use this to wake /// up schedulers when pushing work onto the work queue. sleeper_list: SleeperList, @@ -143,7 +143,7 @@ impl Scheduler { state: TaskState) -> Scheduler { - let (consumer, producer) = mpsc::queue(()); + let (consumer, producer) = msgq::queue(); let mut sched = Scheduler { pool_id: pool_id, sleeper_list: sleeper_list, @@ -215,7 +215,7 @@ impl Scheduler { // Should not have any messages let message = stask.sched.get_mut_ref().message_queue.pop(); - rtassert!(match message { mpsc::Empty => true, _ => false }); + rtassert!(match message { msgq::Empty => true, _ => false }); stask.task.get_mut_ref().destroyed = true; } @@ -340,8 +340,8 @@ impl Scheduler { // // I have chosen to take route #2. match self.message_queue.pop() { - mpsc::Data(t) => Some(t), - mpsc::Empty | mpsc::Inconsistent => None + msgq::Data(t) => Some(t), + msgq::Empty | msgq::Inconsistent => None } }; @@ -849,7 +849,7 @@ pub enum SchedMessage { pub struct SchedHandle { priv remote: ~RemoteCallback, - priv queue: mpsc::Producer, + priv queue: msgq::Producer, sched_id: uint } diff --git a/src/librustuv/queue.rs b/src/librustuv/queue.rs index 32f8d8532a209..dc18c794d237a 100644 --- a/src/librustuv/queue.rs +++ b/src/librustuv/queue.rs @@ -24,6 +24,7 @@ use std::cast; use std::libc::{c_void, c_int}; use std::rt::task::BlockedTask; use std::unstable::sync::LittleLock; +use std::sync::arc::UnsafeArc; use mpsc = std::sync::mpsc_queue; use async::AsyncWatcher; @@ -39,46 +40,46 @@ enum Message { struct State { handle: *uvll::uv_async_t, lock: LittleLock, // see comments in async_cb for why this is needed + queue: mpsc::Queue, } /// This structure is intended to be stored next to the event loop, and it is /// used to create new `Queue` structures. pub struct QueuePool { - priv producer: mpsc::Producer, - priv consumer: mpsc::Consumer, + priv queue: UnsafeArc, priv refcnt: uint, } /// This type is used to send messages back to the original event loop. pub struct Queue { - priv queue: mpsc::Producer, + priv queue: UnsafeArc, } extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) { assert_eq!(status, 0); - let state: &mut QueuePool = unsafe { + let pool: &mut QueuePool = unsafe { cast::transmute(uvll::get_data_for_uv_handle(handle)) }; - let packet = unsafe { state.consumer.packet() }; + let state: &mut State = unsafe { cast::transmute(pool.queue.get()) }; // Remember that there is no guarantee about how many times an async // callback is called with relation to the number of sends, so process the // entire queue in a loop. loop { - match state.consumer.pop() { + match state.queue.pop() { mpsc::Data(Task(task)) => { task.wake().map(|t| t.reawaken(true)); } mpsc::Data(Increment) => unsafe { - if state.refcnt == 0 { - uvll::uv_ref((*packet).handle); + if pool.refcnt == 0 { + uvll::uv_ref(state.handle); } - state.refcnt += 1; + pool.refcnt += 1; }, mpsc::Data(Decrement) => unsafe { - state.refcnt -= 1; - if state.refcnt == 0 { - uvll::uv_unref((*packet).handle); + pool.refcnt -= 1; + if pool.refcnt == 0 { + uvll::uv_unref(state.handle); } }, mpsc::Empty | mpsc::Inconsistent => break @@ -99,9 +100,9 @@ extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) { // If we acquire the mutex here, then we are guaranteed that there are no // longer any senders which are holding on to their handles, so we can // safely allow the event loop to exit. - if state.refcnt == 0 { + if pool.refcnt == 0 { unsafe { - let _l = (*packet).lock.lock(); + let _l = state.lock.lock(); } } } @@ -109,14 +110,14 @@ extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) { impl QueuePool { pub fn new(loop_: &mut Loop) -> ~QueuePool { let handle = UvHandle::alloc(None::, uvll::UV_ASYNC); - let (c, p) = mpsc::queue(State { + let state = UnsafeArc::new(State { handle: handle, lock: LittleLock::new(), + queue: mpsc::Queue::new(), }); let q = ~QueuePool { - producer: p, - consumer: c, refcnt: 0, + queue: state, }; unsafe { @@ -132,23 +133,23 @@ impl QueuePool { pub fn queue(&mut self) -> Queue { unsafe { if self.refcnt == 0 { - uvll::uv_ref((*self.producer.packet()).handle); + uvll::uv_ref((*self.queue.get()).handle); } self.refcnt += 1; } - Queue { queue: self.producer.clone() } + Queue { queue: self.queue.clone() } } pub fn handle(&self) -> *uvll::uv_async_t { - unsafe { (*self.producer.packet()).handle } + unsafe { (*self.queue.get()).handle } } } impl Queue { pub fn push(&mut self, task: BlockedTask) { - self.queue.push(Task(task)); unsafe { - uvll::uv_async_send((*self.queue.packet()).handle); + (*self.queue.get()).queue.push(Task(task)); + uvll::uv_async_send((*self.queue.get()).handle); } } } @@ -161,7 +162,7 @@ impl Clone for Queue { // and if the queue is dropped later on it'll see the increment for the // decrement anyway. unsafe { - cast::transmute_mut(self).queue.push(Increment); + (*self.queue.get()).queue.push(Increment); } Queue { queue: self.queue.clone() } } @@ -172,9 +173,9 @@ impl Drop for Queue { // See the comments in the async_cb function for why there is a lock // that is acquired only on a drop. unsafe { - let state = self.queue.packet(); + let state = self.queue.get(); let _l = (*state).lock.lock(); - self.queue.push(Decrement); + (*state).queue.push(Decrement); uvll::uv_async_send((*state).handle); } } diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs index bf37e5fca6a5f..fc70ba8cd7c16 100644 --- a/src/libstd/comm/mod.rs +++ b/src/libstd/comm/mod.rs @@ -237,6 +237,7 @@ use result::{Ok, Err}; use rt::local::Local; use rt::task::{Task, BlockedTask}; use rt::thread::Thread; +use sync::arc::UnsafeArc; use sync::atomics::{AtomicInt, AtomicBool, SeqCst, Relaxed}; use vec::OwnedVector; @@ -272,24 +273,6 @@ macro_rules! test ( mod select; -/////////////////////////////////////////////////////////////////////////////// -// Helper type to abstract ports for channels and shared channels -/////////////////////////////////////////////////////////////////////////////// - -enum Consumer { - SPSC(spsc::Consumer), - MPSC(mpsc::Consumer), -} - -impl Consumer{ - unsafe fn packet(&self) -> *mut Packet { - match *self { - SPSC(ref c) => c.packet(), - MPSC(ref c) => c.packet(), - } - } -} - /////////////////////////////////////////////////////////////////////////////// // Public structs /////////////////////////////////////////////////////////////////////////////// @@ -298,7 +281,7 @@ impl Consumer{ /// one task #[no_freeze] // can't share ports in an arc pub struct Port { - priv queue: Consumer, + priv inner: PortInner, } /// An iterator over messages received on a port, this iterator will block @@ -312,7 +295,7 @@ pub struct PortIterator<'a, T> { /// task #[no_freeze] // can't share chans in an arc pub struct Chan { - priv queue: spsc::Producer, + priv inner: UnsafeArc>, } /// The sending-half of Rust's channel type. This half can be shared among many @@ -320,23 +303,33 @@ pub struct Chan { #[no_freeze] // technically this implementation is shareable, but it shouldn't // be required to be shareable in an arc pub struct SharedChan { - priv queue: mpsc::Producer, + priv inner: UnsafeArc>, } /////////////////////////////////////////////////////////////////////////////// // Internal struct definitions /////////////////////////////////////////////////////////////////////////////// +enum PortInner { + Single(UnsafeArc>), + Shared(UnsafeArc>), +} + +struct SingleInner { + queue: spsc::Queue, + packet: Packet, +} + +struct SharedInner { + queue: mpsc::Queue, + packet: Packet, +} + struct Packet { cnt: AtomicInt, // How many items are on this channel steals: int, // How many times has a port received without blocking? to_wake: Option, // Task to wake up - // This lock is used to wake up native threads blocked in select. The - // `lock` field is not used because the thread blocking in select must - // block on only one mutex. - //selection_lock: Option>, - // The number of channels which are currently using this packet. This is // used to reference count shared channels. channels: AtomicInt, @@ -355,6 +348,15 @@ struct Packet { static DISCONNECTED: int = int::min_value; static RESCHED_FREQ: int = 200; +impl PortInner { + fn packet<'a>(&'a mut self) -> &'a mut Packet { + match *self { + Single(ref arc) => unsafe { &mut (*arc.get()).packet }, + Shared(ref arc) => unsafe { &mut (*arc.get()).packet }, + } + } +} + impl Packet { fn new() -> Packet { Packet { @@ -528,9 +530,11 @@ impl Chan { pub fn new() -> (Port, Chan) { // arbitrary 128 size cache -- this is just a max cache size, not a // maximum buffer size - let (c, p) = spsc::queue(128, Packet::new()); - let c = SPSC(c); - (Port { queue: c }, Chan { queue: p }) + let (a, b) = UnsafeArc::new2(SingleInner { + queue: spsc::Queue::new(128), + packet: Packet::new(), + }); + (Port { inner: Single(a) }, Chan { inner: b }) } /// Sends a value along this channel to be received by the corresponding @@ -579,16 +583,15 @@ impl Chan { fn try(&self, t: T, can_resched: bool) -> bool { unsafe { - let this = cast::transmute_mut(self); - this.queue.push(t); - let packet = this.queue.packet(); - match (*packet).increment() { + let inner = self.inner.get(); + (*inner).queue.push(t); + match (*inner).packet.increment() { // As described above, -1 == wakeup - -1 => { (*packet).wakeup(can_resched); true } + -1 => { (*inner).packet.wakeup(can_resched); true } // Also as above, SPSC queues must be >= -2 -2 => true, // We succeeded if we sent data - DISCONNECTED => this.queue.is_empty(), + DISCONNECTED => (*inner).queue.is_empty(), // In order to prevent starvation of other tasks in situations // where a task sends repeatedly without ever receiving, we // occassionally yield instead of doing a send immediately. @@ -613,7 +616,7 @@ impl Chan { #[unsafe_destructor] impl Drop for Chan { fn drop(&mut self) { - unsafe { (*self.queue.packet()).drop_chan(); } + unsafe { (*self.inner.get()).packet.drop_chan(); } } } @@ -623,9 +626,11 @@ impl SharedChan { /// same time. All data sent on any channel will become available on the /// provided port as well. pub fn new() -> (Port, SharedChan) { - let (c, p) = mpsc::queue(Packet::new()); - let c = MPSC(c); - (Port { queue: c }, SharedChan { queue: p }) + let (a, b) = UnsafeArc::new2(SharedInner { + queue: mpsc::Queue::new(), + packet: Packet::new(), + }); + (Port { inner: Shared(a) }, SharedChan { inner: b }) } /// Equivalent method to `send` on the `Chan` type (using the same @@ -665,17 +670,15 @@ impl SharedChan { // preflight check serves as the definitive "this will never be // received". Once we get beyond this check, we have permanently // entered the realm of "this may be received" - let packet = self.queue.packet(); - if (*packet).cnt.load(Relaxed) < DISCONNECTED + 1024 { + let inner = self.inner.get(); + if (*inner).packet.cnt.load(Relaxed) < DISCONNECTED + 1024 { return false } - let this = cast::transmute_mut(self); - this.queue.push(t); - - match (*packet).increment() { + (*inner).queue.push(t); + match (*inner).packet.increment() { DISCONNECTED => {} // oh well, we tried - -1 => { (*packet).wakeup(true); } + -1 => { (*inner).packet.wakeup(true); } n => { if n > 0 && n % RESCHED_FREQ == 0 { let task: ~Task = Local::take(); @@ -690,15 +693,15 @@ impl SharedChan { impl Clone for SharedChan { fn clone(&self) -> SharedChan { - unsafe { (*self.queue.packet()).channels.fetch_add(1, SeqCst); } - SharedChan { queue: self.queue.clone() } + unsafe { (*self.inner.get()).packet.channels.fetch_add(1, SeqCst); } + SharedChan { inner: self.inner.clone() } } } #[unsafe_destructor] impl Drop for SharedChan { fn drop(&mut self) { - unsafe { (*self.queue.packet()).drop_chan(); } + unsafe { (*self.inner.get()).packet.drop_chan(); } } } @@ -750,18 +753,18 @@ impl Port { // See the comment about yielding on sends, but the same applies here. // If a thread is spinning in try_recv we should try - unsafe { - let packet = this.queue.packet(); - (*packet).recv_cnt += 1; - if (*packet).recv_cnt % RESCHED_FREQ == 0 { + { + let packet = this.inner.packet(); + packet.recv_cnt += 1; + if packet.recv_cnt % RESCHED_FREQ == 0 { let task: ~Task = Local::take(); task.maybe_yield(); } } - let ret = match this.queue { - SPSC(ref mut queue) => queue.pop(), - MPSC(ref mut queue) => match queue.pop() { + let ret = match this.inner { + Single(ref mut arc) => unsafe { (*arc.get()).queue.pop() }, + Shared(ref mut arc) => match unsafe { (*arc.get()).queue.pop() } { mpsc::Data(t) => Some(t), mpsc::Empty => None, @@ -794,7 +797,7 @@ impl Port { let data; loop { Thread::yield_now(); - match queue.pop() { + match unsafe { (*arc.get()).queue.pop() } { mpsc::Data(t) => { data = t; break } mpsc::Empty => fail!("inconsistent => empty"), mpsc::Inconsistent => {} @@ -805,7 +808,7 @@ impl Port { } }; if increment && ret.is_some() { - unsafe { (*this.queue.packet()).steals += 1; } + this.inner.packet().steals += 1; } return ret; } @@ -830,7 +833,7 @@ impl Port { let this; unsafe { this = cast::transmute_mut(self); - packet = this.queue.packet(); + packet = this.inner.packet(); let task: ~Task = Local::take(); task.deschedule(1, |task| { assert!((*packet).to_wake.is_none()); @@ -845,8 +848,8 @@ impl Port { let data = self.try_recv_inc(false); if data.is_none() && - unsafe { (*packet).cnt.load(SeqCst) } != DISCONNECTED { - fail!("bug: woke up too soon {}", unsafe { (*packet).cnt.load(SeqCst) }); + packet.cnt.load(SeqCst) != DISCONNECTED { + fail!("bug: woke up too soon {}", packet.cnt.load(SeqCst)); } return data; } @@ -868,9 +871,7 @@ impl Drop for Port { // All we need to do is store that we're disconnected. If the channel // half has already disconnected, then we'll just deallocate everything // when the shared packet is deallocated. - unsafe { - (*self.queue.packet()).cnt.store(DISCONNECTED, SeqCst); - } + self.inner.packet().cnt.store(DISCONNECTED, SeqCst); } } diff --git a/src/libstd/comm/select.rs b/src/libstd/comm/select.rs index 302c9d9ea469b..f2a79246df1bf 100644 --- a/src/libstd/comm/select.rs +++ b/src/libstd/comm/select.rs @@ -125,18 +125,18 @@ impl Select { let id = this.next_id; this.next_id += 1; unsafe { - let packet = port.queue.packet(); + let packet = port.inner.packet(); assert!(!(*packet).selecting.load(Relaxed)); assert_eq!((*packet).selection_id, 0); (*packet).selection_id = id; if this.head.is_null() { - this.head = packet; - this.tail = packet; + this.head = packet as *mut Packet; + this.tail = packet as *mut Packet; } else { (*packet).select_prev = this.tail; assert!((*packet).select_next.is_null()); - (*this.tail).select_next = packet; - this.tail = packet; + (*this.tail).select_next = packet as *mut Packet; + this.tail = packet as *mut Packet; } } Handle { id: id, selector: this, port: port } @@ -293,7 +293,7 @@ impl Drop for Select { #[unsafe_destructor] impl<'port, T: Send> Drop for Handle<'port, T> { fn drop(&mut self) { - unsafe { self.selector.remove(self.port.queue.packet()) } + unsafe { self.selector.remove(self.port.inner.packet()) } } } diff --git a/src/libstd/sync/mpsc_queue.rs b/src/libstd/sync/mpsc_queue.rs index a249d6ed2e8ce..c911c07fdf012 100644 --- a/src/libstd/sync/mpsc_queue.rs +++ b/src/libstd/sync/mpsc_queue.rs @@ -39,12 +39,10 @@ // /queues/non-intrusive-mpsc-node-based-queue use cast; -use clone::Clone; use kinds::Send; use ops::Drop; use option::{Option, None, Some}; use ptr::RawPtr; -use sync::arc::UnsafeArc; use sync::atomics::{AtomicPtr, Release, Acquire, AcqRel, Relaxed}; /// A result of the `pop` function. @@ -65,40 +63,12 @@ struct Node { value: Option, } -struct State { - head: AtomicPtr>, - tail: *mut Node, - packet: P, -} - -/// The consumer half of this concurrent queue. This half is used to receive -/// data from the producers. -pub struct Consumer { - priv state: UnsafeArc>, -} - -/// The production half of the concurrent queue. This handle may be cloned in -/// order to make handles for new producers. -pub struct Producer { - priv state: UnsafeArc>, -} - -impl Clone for Producer { - fn clone(&self) -> Producer { - Producer { state: self.state.clone() } - } -} - -/// Creates a new MPSC queue. The given argument `p` is a user-defined "packet" -/// of information which will be shared by the consumer and the producer which -/// can be re-acquired via the `packet` function. This is helpful when extra -/// state is shared between the producer and consumer, but note that there is no -/// synchronization performed of this data. -pub fn queue(p: P) -> (Consumer, Producer) { - unsafe { - let (a, b) = UnsafeArc::new2(State::new(p)); - (Consumer { state: a }, Producer { state: b }) - } +/// The multi-producer single-consumer structure. This is not cloneable, but it +/// may be safely shared so long as it is guaranteed that there is only one +/// popper at a time (many pushers are allowed). +pub struct Queue { + priv head: AtomicPtr>, + priv tail: *mut Node, } impl Node { @@ -110,67 +80,26 @@ impl Node { } } -impl State { - unsafe fn new(p: P) -> State { - let stub = Node::new(None); - State { +impl Queue { + /// Creates a new queue that is safe to share among multiple producers and + /// one consumer. + pub fn new() -> Queue { + let stub = unsafe { Node::new(None) }; + Queue { head: AtomicPtr::new(stub), tail: stub, - packet: p, } } - unsafe fn push(&mut self, t: T) { - let n = Node::new(Some(t)); - let prev = self.head.swap(n, AcqRel); - (*prev).next.store(n, Release); - } - - unsafe fn pop(&mut self) -> PopResult { - let tail = self.tail; - let next = (*tail).next.load(Acquire); - - if !next.is_null() { - self.tail = next; - assert!((*tail).value.is_none()); - assert!((*next).value.is_some()); - let ret = (*next).value.take_unwrap(); - let _: ~Node = cast::transmute(tail); - return Data(ret); - } - - if self.head.load(Acquire) == tail {Empty} else {Inconsistent} - } -} - -#[unsafe_destructor] -impl Drop for State { - fn drop(&mut self) { + /// Pushes a new value onto this queue. + pub fn push(&mut self, t: T) { unsafe { - let mut cur = self.tail; - while !cur.is_null() { - let next = (*cur).next.load(Relaxed); - let _: ~Node = cast::transmute(cur); - cur = next; - } + let n = Node::new(Some(t)); + let prev = self.head.swap(n, AcqRel); + (*prev).next.store(n, Release); } } -} - -impl Producer { - /// Pushes a new value onto this queue. - pub fn push(&mut self, value: T) { - unsafe { (*self.state.get()).push(value) } - } - /// Gets an unsafe pointer to the user-defined packet shared by the - /// producers and the consumer. Note that care must be taken to ensure that - /// the lifetime of the queue outlives the usage of the returned pointer. - pub unsafe fn packet(&self) -> *mut P { - &mut (*self.state.get()).packet as *mut P - } -} -impl Consumer { /// Pops some data from this queue. /// /// Note that the current implementation means that this function cannot @@ -182,8 +111,23 @@ impl Consumer { /// This inconsistent state means that this queue does indeed have data, but /// it does not currently have access to it at this time. pub fn pop(&mut self) -> PopResult { - unsafe { (*self.state.get()).pop() } + unsafe { + let tail = self.tail; + let next = (*tail).next.load(Acquire); + + if !next.is_null() { + self.tail = next; + assert!((*tail).value.is_none()); + assert!((*next).value.is_some()); + let ret = (*next).value.take_unwrap(); + let _: ~Node = cast::transmute(tail); + return Data(ret); + } + + if self.head.load(Acquire) == tail {Empty} else {Inconsistent} + } } + /// Attempts to pop data from this queue, but doesn't attempt too hard. This /// will canonicalize inconsistent states to a `None` value. pub fn casual_pop(&mut self) -> Option { @@ -192,10 +136,19 @@ impl Consumer { Empty | Inconsistent => None, } } - /// Gets an unsafe pointer to the underlying user-defined packet. See - /// `Producer.packet` for more information. - pub unsafe fn packet(&self) -> *mut P { - &mut (*self.state.get()).packet as *mut P +} + +#[unsafe_destructor] +impl Drop for Queue { + fn drop(&mut self) { + unsafe { + let mut cur = self.tail; + while !cur.is_null() { + let next = (*cur).next.load(Relaxed); + let _: ~Node = cast::transmute(cur); + cur = next; + } + } } } @@ -203,12 +156,12 @@ impl Consumer { mod tests { use prelude::*; - use super::{queue, Data, Empty, Inconsistent}; + use super::{Queue, Data, Empty, Inconsistent}; use native; #[test] fn test_full() { - let (_, mut p) = queue(()); + let mut q = Queue::new(); p.push(~1); p.push(~2); } @@ -217,20 +170,20 @@ mod tests { fn test() { let nthreads = 8u; let nmsgs = 1000u; - let (mut c, p) = queue(()); + let mut q = Queue::new(); match c.pop() { Empty => {} Inconsistent | Data(..) => fail!() } let (port, chan) = SharedChan::new(); + let q = UnsafeArc::new(q); for _ in range(0, nthreads) { - let q = p.clone(); let chan = chan.clone(); + let q = q.clone(); do native::task::spawn { - let mut q = q; for i in range(0, nmsgs) { - q.push(i); + unsafe { (*q.get()).push(i); } } chan.send(()); } @@ -238,11 +191,12 @@ mod tests { let mut i = 0u; while i < nthreads * nmsgs { - match c.pop() { + match unsafe { (*q.get()).pop() } { Empty | Inconsistent => {}, Data(_) => { i += 1 } } } + drop(chan); for _ in range(0, nthreads) { port.recv(); } diff --git a/src/libstd/sync/spsc_queue.rs b/src/libstd/sync/spsc_queue.rs index 6f1b887c27156..8743299d0bd9a 100644 --- a/src/libstd/sync/spsc_queue.rs +++ b/src/libstd/sync/spsc_queue.rs @@ -38,7 +38,6 @@ use kinds::Send; use ops::Drop; use option::{Some, None, Option}; use ptr::RawPtr; -use sync::arc::UnsafeArc; use sync::atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release}; // Node within the linked list queue of messages to send @@ -50,75 +49,25 @@ struct Node { next: AtomicPtr>, // next node in the queue } -// The producer/consumer halves both need access to the `tail` field, and if -// they both have access to that we may as well just give them both access -// to this whole structure. -struct State { +/// The single-producer single-consumer queue. This structure is not cloneable, +/// but it can be safely shared in an UnsafeArc if it is guaranteed that there +/// is only one popper and one pusher touching the queue at any one point in +/// time. +pub struct Queue { // consumer fields - tail: *mut Node, // where to pop from - tail_prev: AtomicPtr>, // where to pop from + priv tail: *mut Node, // where to pop from + priv tail_prev: AtomicPtr>, // where to pop from // producer fields - head: *mut Node, // where to push to - first: *mut Node, // where to get new nodes from - tail_copy: *mut Node, // between first/tail + priv head: *mut Node, // where to push to + priv first: *mut Node, // where to get new nodes from + priv tail_copy: *mut Node, // between first/tail // Cache maintenance fields. Additions and subtractions are stored // separately in order to allow them to use nonatomic addition/subtraction. - cache_bound: uint, - cache_additions: AtomicUint, - cache_subtractions: AtomicUint, - - packet: P, -} - -/// Producer half of this queue. This handle is used to push data to the -/// consumer. -pub struct Producer { - priv state: UnsafeArc>, -} - -/// Consumer half of this queue. This handle is used to receive data from the -/// producer. -pub struct Consumer { - priv state: UnsafeArc>, -} - -/// Creates a new queue. The producer returned is connected to the consumer to -/// push all data to the consumer. -/// -/// # Arguments -/// -/// * `bound` - This queue implementation is implemented with a linked list, -/// and this means that a push is always a malloc. In order to -/// amortize this cost, an internal cache of nodes is maintained -/// to prevent a malloc from always being necessary. This bound is -/// the limit on the size of the cache (if desired). If the value -/// is 0, then the cache has no bound. Otherwise, the cache will -/// never grow larger than `bound` (although the queue itself -/// could be much larger. -/// -/// * `p` - This is the user-defined packet of data which will also be shared -/// between the producer and consumer. -pub fn queue(bound: uint, - p: P) -> (Consumer, Producer) -{ - let n1 = Node::new(); - let n2 = Node::new(); - unsafe { (*n1).next.store(n2, Relaxed) } - let state = State { - tail: n2, - tail_prev: AtomicPtr::new(n1), - head: n2, - first: n1, - tail_copy: n1, - cache_bound: bound, - cache_additions: AtomicUint::new(0), - cache_subtractions: AtomicUint::new(0), - packet: p, - }; - let (arc1, arc2) = UnsafeArc::new2(state); - (Consumer { state: arc1 }, Producer { state: arc2 }) + priv cache_bound: uint, + priv cache_additions: AtomicUint, + priv cache_subtractions: AtomicUint, } impl Node { @@ -132,49 +81,49 @@ impl Node { } } -impl Producer { - /// Pushes data onto the queue - pub fn push(&mut self, t: T) { - unsafe { (*self.state.get()).push(t) } - } - /// Tests whether the queue is empty. Note that if this function returns - /// `false`, the return value is significant, but if the return value is - /// `true` then almost no meaning can be attached to the return value. - pub fn is_empty(&self) -> bool { - unsafe { (*self.state.get()).is_empty() } - } - /// Acquires an unsafe pointer to the underlying user-defined packet. Note - /// that care must be taken to ensure that the queue outlives the usage of - /// the packet (because it is an unsafe pointer). - pub unsafe fn packet(&self) -> *mut P { - &mut (*self.state.get()).packet as *mut P - } -} - -impl Consumer { - /// Pops some data from this queue, returning `None` when the queue is - /// empty. - pub fn pop(&mut self) -> Option { - unsafe { (*self.state.get()).pop() } - } - /// Same function as the producer's `packet` method. - pub unsafe fn packet(&self) -> *mut P { - &mut (*self.state.get()).packet as *mut P +impl Queue { + /// Creates a new queue. The producer returned is connected to the consumer + /// to push all data to the consumer. + /// + /// # Arguments + /// + /// * `bound` - This queue implementation is implemented with a linked + /// list, and this means that a push is always a malloc. In + /// order to amortize this cost, an internal cache of nodes is + /// maintained to prevent a malloc from always being + /// necessary. This bound is the limit on the size of the + /// cache (if desired). If the value is 0, then the cache has + /// no bound. Otherwise, the cache will never grow larger than + /// `bound` (although the queue itself could be much larger. + pub fn new(bound: uint) -> Queue { + let n1 = Node::new(); + let n2 = Node::new(); + unsafe { (*n1).next.store(n2, Relaxed) } + Queue { + tail: n2, + tail_prev: AtomicPtr::new(n1), + head: n2, + first: n1, + tail_copy: n1, + cache_bound: bound, + cache_additions: AtomicUint::new(0), + cache_subtractions: AtomicUint::new(0), + } } -} -impl State { - // remember that there is only one thread executing `push` (and only one - // thread executing `pop`) - unsafe fn push(&mut self, t: T) { - // Acquire a node (which either uses a cached one or allocates a new - // one), and then append this to the 'head' node. - let n = self.alloc(); - assert!((*n).value.is_none()); - (*n).value = Some(t); - (*n).next.store(0 as *mut Node, Relaxed); - (*self.head).next.store(n, Release); - self.head = n; + /// Pushes a new value onto this queue. Note that to use this function + /// safely, it must be externally guaranteed that there is only one pusher. + pub fn push(&mut self, t: T) { + unsafe { + // Acquire a node (which either uses a cached one or allocates a new + // one), and then append this to the 'head' node. + let n = self.alloc(); + assert!((*n).value.is_none()); + (*n).value = Some(t); + (*n).next.store(0 as *mut Node, Relaxed); + (*self.head).next.store(n, Release); + self.head = n; + } } unsafe fn alloc(&mut self) -> *mut Node { @@ -208,50 +157,57 @@ impl State { Node::new() } - // remember that there is only one thread executing `pop` (and only one - // thread executing `push`) - unsafe fn pop(&mut self) -> Option { - // The `tail` node is not actually a used node, but rather a - // sentinel from where we should start popping from. Hence, look at - // tail's next field and see if we can use it. If we do a pop, then - // the current tail node is a candidate for going into the cache. - let tail = self.tail; - let next = (*tail).next.load(Acquire); - if next.is_null() { return None } - assert!((*next).value.is_some()); - let ret = (*next).value.take(); - - self.tail = next; - if self.cache_bound == 0 { - self.tail_prev.store(tail, Release); - } else { - // XXX: this is dubious with overflow. - let additions = self.cache_additions.load(Relaxed); - let subtractions = self.cache_subtractions.load(Relaxed); - let size = additions - subtractions; + /// Attempts to pop a value from this queue. Remember that to use this type + /// safely you must ensure that there is only one popper at a time. + pub fn pop(&mut self) -> Option { + unsafe { + // The `tail` node is not actually a used node, but rather a + // sentinel from where we should start popping from. Hence, look at + // tail's next field and see if we can use it. If we do a pop, then + // the current tail node is a candidate for going into the cache. + let tail = self.tail; + let next = (*tail).next.load(Acquire); + if next.is_null() { return None } + assert!((*next).value.is_some()); + let ret = (*next).value.take(); - if size < self.cache_bound { + self.tail = next; + if self.cache_bound == 0 { self.tail_prev.store(tail, Release); - self.cache_additions.store(additions + 1, Relaxed); } else { - (*self.tail_prev.load(Relaxed)).next.store(next, Relaxed); - // We have successfully erased all references to 'tail', so - // now we can safely drop it. - let _: ~Node = cast::transmute(tail); + // XXX: this is dubious with overflow. + let additions = self.cache_additions.load(Relaxed); + let subtractions = self.cache_subtractions.load(Relaxed); + let size = additions - subtractions; + + if size < self.cache_bound { + self.tail_prev.store(tail, Release); + self.cache_additions.store(additions + 1, Relaxed); + } else { + (*self.tail_prev.load(Relaxed)).next.store(next, Relaxed); + // We have successfully erased all references to 'tail', so + // now we can safely drop it. + let _: ~Node = cast::transmute(tail); + } } + return ret; } - return ret; } - unsafe fn is_empty(&self) -> bool { - let tail = self.tail; - let next = (*tail).next.load(Acquire); - return next.is_null(); + /// Tests whether this queue is empty or not. Remember that there can only + /// be one tester/popper, and also keep in mind that the answer returned + /// from this is likely to change if it is `false`. + pub fn is_empty(&self) -> bool { + unsafe { + let tail = self.tail; + let next = (*tail).next.load(Acquire); + return next.is_null(); + } } } #[unsafe_destructor] -impl Drop for State { +impl Drop for Queue { fn drop(&mut self) { unsafe { let mut cur = self.first; @@ -267,44 +223,44 @@ impl Drop for State { #[cfg(test)] mod test { use prelude::*; - use super::queue; + use super::Queue; use native; #[test] fn smoke() { - let (mut c, mut p) = queue(0, ()); - p.push(1); - p.push(2); - assert_eq!(c.pop(), Some(1)); - assert_eq!(c.pop(), Some(2)); - assert_eq!(c.pop(), None); - p.push(3); - p.push(4); - assert_eq!(c.pop(), Some(3)); - assert_eq!(c.pop(), Some(4)); - assert_eq!(c.pop(), None); + let mut q = Queue::new(0); + q.push(1); + q.push(2); + assert_eq!(q.pop(), Some(1)); + assert_eq!(q.pop(), Some(2)); + assert_eq!(q.pop(), None); + q.push(3); + q.push(4); + assert_eq!(q.pop(), Some(3)); + assert_eq!(q.pop(), Some(4)); + assert_eq!(q.pop(), None); } #[test] fn drop_full() { - let (_, mut p) = queue(0, ()); - p.push(~1); - p.push(~2); + let mut q = Queue::new(0); + q.push(~1); + q.push(~2); } #[test] fn smoke_bound() { - let (mut c, mut p) = queue(1, ()); - p.push(1); - p.push(2); - assert_eq!(c.pop(), Some(1)); - assert_eq!(c.pop(), Some(2)); - assert_eq!(c.pop(), None); - p.push(3); - p.push(4); - assert_eq!(c.pop(), Some(3)); - assert_eq!(c.pop(), Some(4)); - assert_eq!(c.pop(), None); + let mut q = Queue::new(1); + q.push(1); + q.push(2); + assert_eq!(q.pop(), Some(1)); + assert_eq!(q.pop(), Some(2)); + assert_eq!(q.pop(), None); + q.push(3); + q.push(4); + assert_eq!(q.pop(), Some(3)); + assert_eq!(q.pop(), Some(4)); + assert_eq!(q.pop(), None); } #[test] @@ -313,13 +269,13 @@ mod test { stress_bound(1); fn stress_bound(bound: uint) { - let (c, mut p) = queue(bound, ()); + let (a, b) = UnsafeArc::new2(Queue::new(bound)); let (port, chan) = Chan::new(); do native::task::spawn { let mut c = c; for _ in range(0, 100000) { loop { - match c.pop() { + match unsafe { (*b.get()).pop() } { Some(1) => break, Some(_) => fail!(), None => {} @@ -329,7 +285,7 @@ mod test { chan.send(()); } for _ in range(0, 100000) { - p.push(1); + unsafe { (*a.get()).push(1); } } port.recv(); } From 3386a3bb85516b8bd8ab9ce12e8e9b0f7b3777ec Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Wed, 8 Jan 2014 18:31:48 -0800 Subject: [PATCH 2/2] Drop the full message queue when a `Port` is gone This fixes the deadlock where you send ports on channels, but then the receiving task fails, dropping the port, but the queue isn't dropped so if you then wait for data you'll wait forever. --- src/libstd/comm/mod.rs | 191 ++++++++++++++++++++++++++++++---- src/libstd/sync/mpsc_queue.rs | 9 +- src/libstd/sync/spsc_queue.rs | 15 +-- 3 files changed, 175 insertions(+), 40 deletions(-) diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs index fc70ba8cd7c16..c5cfea28fab77 100644 --- a/src/libstd/comm/mod.rs +++ b/src/libstd/comm/mod.rs @@ -227,7 +227,6 @@ use cast; use clone::Clone; -use container::Container; use int; use iter::Iterator; use kinds::Send; @@ -295,7 +294,7 @@ pub struct PortIterator<'a, T> { /// task #[no_freeze] // can't share chans in an arc pub struct Chan { - priv inner: UnsafeArc>, + priv inner: UnsafeArc>, } /// The sending-half of Rust's channel type. This half can be shared among many @@ -303,7 +302,7 @@ pub struct Chan { #[no_freeze] // technically this implementation is shareable, but it shouldn't // be required to be shareable in an arc pub struct SharedChan { - priv inner: UnsafeArc>, + priv inner: UnsafeArc>, } /////////////////////////////////////////////////////////////////////////////// @@ -311,16 +310,16 @@ pub struct SharedChan { /////////////////////////////////////////////////////////////////////////////// enum PortInner { - Single(UnsafeArc>), - Shared(UnsafeArc>), + SingleInner(UnsafeArc>), + SharedInner(UnsafeArc>), } -struct SingleInner { +struct Stream { queue: spsc::Queue, packet: Packet, } -struct SharedInner { +struct Shared { queue: mpsc::Queue, packet: Packet, } @@ -339,6 +338,11 @@ struct Packet { select_next: *mut Packet, select_prev: *mut Packet, recv_cnt: int, + + // See the discussion in Port::drop and the channel send methods for what + // these are used for + go_home: AtomicBool, + sender_drain: AtomicInt, } /////////////////////////////////////////////////////////////////////////////// @@ -351,8 +355,8 @@ static RESCHED_FREQ: int = 200; impl PortInner { fn packet<'a>(&'a mut self) -> &'a mut Packet { match *self { - Single(ref arc) => unsafe { &mut (*arc.get()).packet }, - Shared(ref arc) => unsafe { &mut (*arc.get()).packet }, + SingleInner(ref arc) => unsafe { &mut (*arc.get()).packet }, + SharedInner(ref arc) => unsafe { &mut (*arc.get()).packet }, } } } @@ -370,6 +374,9 @@ impl Packet { select_next: 0 as *mut Packet, select_prev: 0 as *mut Packet, recv_cnt: 0, + + go_home: AtomicBool::new(false), + sender_drain: AtomicInt::new(0), } } @@ -530,11 +537,11 @@ impl Chan { pub fn new() -> (Port, Chan) { // arbitrary 128 size cache -- this is just a max cache size, not a // maximum buffer size - let (a, b) = UnsafeArc::new2(SingleInner { + let (a, b) = UnsafeArc::new2(Stream { queue: spsc::Queue::new(128), packet: Packet::new(), }); - (Port { inner: Single(a) }, Chan { inner: b }) + (Port { inner: SingleInner(a) }, Chan { inner: b }) } /// Sends a value along this channel to be received by the corresponding @@ -584,14 +591,35 @@ impl Chan { fn try(&self, t: T, can_resched: bool) -> bool { unsafe { let inner = self.inner.get(); + + // See the discussion in Port::drop for what's going on here + if (*inner).packet.go_home.load(Relaxed) { return false } + (*inner).queue.push(t); match (*inner).packet.increment() { // As described above, -1 == wakeup -1 => { (*inner).packet.wakeup(can_resched); true } // Also as above, SPSC queues must be >= -2 -2 => true, - // We succeeded if we sent data - DISCONNECTED => (*inner).queue.is_empty(), + + DISCONNECTED => { + // After the go_home check, the port could have been + // dropped, so we need to be sure to drain the queue here + // (we own the queue now that the port is gone). Note that + // it is only possible for there to be one item in the queue + // because the port ensures that there is 0 data in the + // queue when it flags disconnected, and we could have only + // pushed on one more item + let first = (*inner).queue.pop(); + let second = (*inner).queue.pop(); + assert!(second.is_none()); + + match first { + Some(..) => false, // we failed to send the data + None => true, // we successfully sent data + } + } + // In order to prevent starvation of other tasks in situations // where a task sends repeatedly without ever receiving, we // occassionally yield instead of doing a send immediately. @@ -626,11 +654,11 @@ impl SharedChan { /// same time. All data sent on any channel will become available on the /// provided port as well. pub fn new() -> (Port, SharedChan) { - let (a, b) = UnsafeArc::new2(SharedInner { + let (a, b) = UnsafeArc::new2(Shared { queue: mpsc::Queue::new(), packet: Packet::new(), }); - (Port { inner: Shared(a) }, SharedChan { inner: b }) + (Port { inner: SharedInner(a) }, SharedChan { inner: b }) } /// Equivalent method to `send` on the `Chan` type (using the same @@ -645,6 +673,10 @@ impl SharedChan { /// semantics) pub fn try_send(&self, t: T) -> bool { unsafe { + let inner = self.inner.get(); + // See Port::drop for what's going on + if (*inner).packet.go_home.load(Relaxed) { return false } + // Note that the multiple sender case is a little tricker // semantically than the single sender case. The logic for // incrementing is "add and if disconnected store disconnected". @@ -670,15 +702,49 @@ impl SharedChan { // preflight check serves as the definitive "this will never be // received". Once we get beyond this check, we have permanently // entered the realm of "this may be received" - let inner = self.inner.get(); if (*inner).packet.cnt.load(Relaxed) < DISCONNECTED + 1024 { return false } (*inner).queue.push(t); match (*inner).packet.increment() { - DISCONNECTED => {} // oh well, we tried -1 => { (*inner).packet.wakeup(true); } + + // In this case, we have possibly failed to send our data, and + // we need to consider re-popping the data in order to fully + // destroy it. We must arbitrate among the multiple senders, + // however, because the queues that we're using are + // single-consumer queues. In order to do this, all exiting + // pushers will use an atomic count in order to count those + // flowing through. Pushers who see 0 are required to drain as + // much as possible, and then can only exit when they are the + // only pusher (otherwise they must try again). + n if n < DISCONNECTED + 1024 => { + if (*inner).packet.sender_drain.fetch_add(1, SeqCst) == 0 { + loop { + // drain the queue + loop { + match (*inner).queue.pop() { + mpsc::Data(..) => {} + mpsc::Empty => break, + mpsc::Inconsistent => Thread::yield_now(), + } + } + // maybe we're done, if we're not the last ones + // here, then we need to go try again. + if (*inner).packet.sender_drain.compare_and_swap( + 1, 0, SeqCst) == 1 { + break + } + } + + // At this point, there may still be data on the queue, + // but only if the count hasn't been incremented and + // some other sender hasn't finished pushing data just + // yet. + } + } + n => { if n > 0 && n % RESCHED_FREQ == 0 { let task: ~Task = Local::take(); @@ -763,8 +829,8 @@ impl Port { } let ret = match this.inner { - Single(ref mut arc) => unsafe { (*arc.get()).queue.pop() }, - Shared(ref mut arc) => match unsafe { (*arc.get()).queue.pop() } { + SingleInner(ref mut arc) => unsafe { (*arc.get()).queue.pop() }, + SharedInner(ref mut arc) => match unsafe { (*arc.get()).queue.pop() } { mpsc::Data(t) => Some(t), mpsc::Empty => None, @@ -868,10 +934,69 @@ impl<'a, T: Send> Iterator for PortIterator<'a, T> { #[unsafe_destructor] impl Drop for Port { fn drop(&mut self) { - // All we need to do is store that we're disconnected. If the channel - // half has already disconnected, then we'll just deallocate everything - // when the shared packet is deallocated. - self.inner.packet().cnt.store(DISCONNECTED, SeqCst); + // Dropping a port seems like a fairly trivial thing. In theory all we + // need to do is flag that we're disconnected and then everything else + // can take over (we don't have anyone to wake up). + // + // The catch for Ports is that we want to drop the entire contents of + // the queue. There are multiple reasons for having this property, the + // largest of which is that if another port is waiting in this channel + // (but not received yet), then waiting on that port will cause a + // deadlock. + // + // So if we accept that we must now destroy the entire contents of the + // queue, this code may make a bit more sense. The tricky part is that + // we can't let any in-flight sends go un-dropped, we have to make sure + // *everything* is dropped and nothing new will come onto the channel. + + // The first thing we do is set a flag saying that we're done for. All + // sends are gated on this flag, so we're immediately guaranteed that + // there are a bounded number of active sends that we'll have to deal + // with. + self.inner.packet().go_home.store(true, Relaxed); + + // Now that we're guaranteed to deal with a bounded number of senders, + // we need to drain the queue. This draining process happens atomically + // with respect to the "count" of the channel. If the count is nonzero + // (with steals taken into account), then there must be data on the + // channel. In this case we drain everything and then try again. We will + // continue to fail while active senders send data while we're dropping + // data, but eventually we're guaranteed to break out of this loop + // (because there is a bounded number of senders). + let mut steals = self.inner.packet().steals; + while { + let cnt = self.inner.packet().cnt.compare_and_swap( + steals, DISCONNECTED, SeqCst); + cnt != DISCONNECTED && cnt != steals + } { + match self.inner { + SingleInner(ref mut arc) => { + loop { + match unsafe { (*arc.get()).queue.pop() } { + Some(..) => { steals += 1; } + None => break + } + } + } + SharedInner(ref mut arc) => { + // See the discussion in 'try_recv_inc' for why we yield + // control of this thread. + loop { + match unsafe { (*arc.get()).queue.pop() } { + mpsc::Data(..) => { steals += 1; } + mpsc::Empty => break, + mpsc::Inconsistent => Thread::yield_now(), + } + } + } + } + } + + // At this point in time, we have gated all future senders from sending, + // and we have flagged the channel as being disconnected. The senders + // still have some responsibility, however, because some sends may not + // complete until after we flag the disconnection. There are more + // details in the sending methods that see DISCONNECTED } } @@ -1322,4 +1447,24 @@ mod test { drop(chan); assert_eq!(count_port.recv(), 4); }) + + test!(fn pending_dropped() { + let (a, b) = Chan::new(); + let (c, d) = Chan::<()>::new(); + b.send(d); + drop(a); + c.recv(); + } #[should_fail]) + + test!(fn pending_dropped_stress() { + let (a, b) = Chan::new(); + let (c, d) = Chan::new(); + do spawn { + while b.try_send(~3) { continue } + d.send(()); + } + a.recv(); + drop(a); + c.recv(); + }) } diff --git a/src/libstd/sync/mpsc_queue.rs b/src/libstd/sync/mpsc_queue.rs index c911c07fdf012..eef63d759021f 100644 --- a/src/libstd/sync/mpsc_queue.rs +++ b/src/libstd/sync/mpsc_queue.rs @@ -156,14 +156,15 @@ impl Drop for Queue { mod tests { use prelude::*; - use super::{Queue, Data, Empty, Inconsistent}; use native; + use super::{Queue, Data, Empty, Inconsistent}; + use sync::arc::UnsafeArc; #[test] fn test_full() { let mut q = Queue::new(); - p.push(~1); - p.push(~2); + q.push(~1); + q.push(~2); } #[test] @@ -171,7 +172,7 @@ mod tests { let nthreads = 8u; let nmsgs = 1000u; let mut q = Queue::new(); - match c.pop() { + match q.pop() { Empty => {} Inconsistent | Data(..) => fail!() } diff --git a/src/libstd/sync/spsc_queue.rs b/src/libstd/sync/spsc_queue.rs index 8743299d0bd9a..18e6cf1b6e444 100644 --- a/src/libstd/sync/spsc_queue.rs +++ b/src/libstd/sync/spsc_queue.rs @@ -193,17 +193,6 @@ impl Queue { return ret; } } - - /// Tests whether this queue is empty or not. Remember that there can only - /// be one tester/popper, and also keep in mind that the answer returned - /// from this is likely to change if it is `false`. - pub fn is_empty(&self) -> bool { - unsafe { - let tail = self.tail; - let next = (*tail).next.load(Acquire); - return next.is_null(); - } - } } #[unsafe_destructor] @@ -223,8 +212,9 @@ impl Drop for Queue { #[cfg(test)] mod test { use prelude::*; - use super::Queue; use native; + use super::Queue; + use sync::arc::UnsafeArc; #[test] fn smoke() { @@ -272,7 +262,6 @@ mod test { let (a, b) = UnsafeArc::new2(Queue::new(bound)); let (port, chan) = Chan::new(); do native::task::spawn { - let mut c = c; for _ in range(0, 100000) { loop { match unsafe { (*b.get()).pop() } {