diff --git a/src/libgreen/lib.rs b/src/libgreen/lib.rs
index d7e94c6d48eba..aa062321b1329 100644
--- a/src/libgreen/lib.rs
+++ b/src/libgreen/lib.rs
@@ -47,6 +47,7 @@ use task::GreenTask;
 
 mod macros;
 mod simple;
+mod message_queue;
 
 pub mod basic;
 pub mod context;
diff --git a/src/libgreen/message_queue.rs b/src/libgreen/message_queue.rs
new file mode 100644
index 0000000000000..3a118476affb7
--- /dev/null
+++ b/src/libgreen/message_queue.rs
@@ -0,0 +1,61 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use mpsc = std::sync::mpsc_queue;
+use std::sync::arc::UnsafeArc;
+
+pub enum PopResult<T> {
+    Inconsistent,
+    Empty,
+    Data(T),
+}
+
+pub fn queue<T: Send>() -> (Consumer<T>, Producer<T>) {
+    let (a, b) = UnsafeArc::new2(mpsc::Queue::new());
+    (Consumer { inner: a }, Producer { inner: b })
+}
+
+pub struct Producer<T> {
+    priv inner: UnsafeArc<mpsc::Queue<T>>,
+}
+
+pub struct Consumer<T> {
+    priv inner: UnsafeArc<mpsc::Queue<T>>,
+}
+
+impl<T: Send> Consumer<T> {
+    pub fn pop(&mut self) -> PopResult<T> {
+        match unsafe { (*self.inner.get()).pop() } {
+            mpsc::Inconsistent => Inconsistent,
+            mpsc::Empty => Empty,
+            mpsc::Data(t) => Data(t),
+        }
+    }
+
+    pub fn casual_pop(&mut self) -> Option<T> {
+        match unsafe { (*self.inner.get()).pop() } {
+            mpsc::Inconsistent => None,
+            mpsc::Empty => None,
+            mpsc::Data(t) => Some(t),
+        }
+    }
+}
+
+impl<T: Send> Producer<T> {
+    pub fn push(&mut self, t: T) {
+        unsafe { (*self.inner.get()).push(t); }
+    }
+}
+
+impl<T: Send> Clone for Producer<T> {
+    fn clone(&self) -> Producer<T> {
+        Producer { inner: self.inner.clone() }
+    }
+}
diff --git a/src/libgreen/sched.rs b/src/libgreen/sched.rs
index b0b88e4be7936..20eca48cff9e5 100644
--- a/src/libgreen/sched.rs
+++ b/src/libgreen/sched.rs
@@ -17,7 +17,6 @@ use std::rt::task::Task;
 use std::sync::deque;
 use std::unstable::mutex::Mutex;
 use std::unstable::raw;
-use mpsc = std::sync::mpsc_queue;
 
 use TaskState;
 use context::Context;
@@ -25,6 +24,7 @@ use coroutine::Coroutine;
 use sleeper_list::SleeperList;
 use stack::StackPool;
 use task::{TypeSched, GreenTask, HomeSched, AnySched};
+use msgq = message_queue;
 
 /// A scheduler is responsible for coordinating the execution of Tasks
 /// on a single thread. The scheduler runs inside a slightly modified
@@ -47,9 +47,9 @@ pub struct Scheduler {
     /// The queue of incoming messages from other schedulers.
     /// These are enqueued by SchedHandles after which a remote callback
     /// is triggered to handle the message.
-    message_queue: mpsc::Consumer<SchedMessage, ()>,
+    message_queue: msgq::Consumer<SchedMessage>,
     /// Producer used to clone sched handles from
-    message_producer: mpsc::Producer<SchedMessage, ()>,
+    message_producer: msgq::Producer<SchedMessage>,
     /// A shared list of sleeping schedulers. We'll use this to wake
     /// up schedulers when pushing work onto the work queue.
     sleeper_list: SleeperList,
@@ -143,7 +143,7 @@ impl Scheduler {
                state: TaskState)
                -> Scheduler {
 
-        let (consumer, producer) = mpsc::queue(());
+        let (consumer, producer) = msgq::queue();
         let mut sched = Scheduler {
             pool_id: pool_id,
             sleeper_list: sleeper_list,
@@ -215,7 +215,7 @@ impl Scheduler {
 
         // Should not have any messages
         let message = stask.sched.get_mut_ref().message_queue.pop();
-        rtassert!(match message { mpsc::Empty => true, _ => false });
+        rtassert!(match message { msgq::Empty => true, _ => false });
 
         stask.task.get_mut_ref().destroyed = true;
     }
@@ -340,8 +340,8 @@ impl Scheduler {
             //
             // I have chosen to take route #2.
             match self.message_queue.pop() {
-                mpsc::Data(t) => Some(t),
-                mpsc::Empty | mpsc::Inconsistent => None
+                msgq::Data(t) => Some(t),
+                msgq::Empty | msgq::Inconsistent => None
             }
         };
 
@@ -849,7 +849,7 @@ pub enum SchedMessage {
 
 pub struct SchedHandle {
     priv remote: ~RemoteCallback,
-    priv queue: mpsc::Producer<SchedMessage, ()>,
+    priv queue: msgq::Producer<SchedMessage>,
     sched_id: uint
 }
 
diff --git a/src/librustuv/queue.rs b/src/librustuv/queue.rs
index 32f8d8532a209..dc18c794d237a 100644
--- a/src/librustuv/queue.rs
+++ b/src/librustuv/queue.rs
@@ -24,6 +24,7 @@ use std::cast;
 use std::libc::{c_void, c_int};
 use std::rt::task::BlockedTask;
 use std::unstable::sync::LittleLock;
+use std::sync::arc::UnsafeArc;
 use mpsc = std::sync::mpsc_queue;
 
 use async::AsyncWatcher;
@@ -39,46 +40,46 @@ enum Message {
 
 struct State {
     handle: *uvll::uv_async_t,
     lock: LittleLock, // see comments in async_cb for why this is needed
+    queue: mpsc::Queue<Message>,
 }
 
 /// This structure is intended to be stored next to the event loop, and it is
 /// used to create new `Queue` structures.
 pub struct QueuePool {
-    priv producer: mpsc::Producer<Message, State>,
-    priv consumer: mpsc::Consumer<Message, State>,
+    priv queue: UnsafeArc<State>,
     priv refcnt: uint,
 }
 
 /// This type is used to send messages back to the original event loop.
 pub struct Queue {
-    priv queue: mpsc::Producer<Message, State>,
+    priv queue: UnsafeArc<State>,
 }
 
 extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) {
     assert_eq!(status, 0);
-    let state: &mut QueuePool = unsafe {
+    let pool: &mut QueuePool = unsafe {
         cast::transmute(uvll::get_data_for_uv_handle(handle))
     };
-    let packet = unsafe { state.consumer.packet() };
+    let state: &mut State = unsafe { cast::transmute(pool.queue.get()) };
 
     // Remember that there is no guarantee about how many times an async
    // callback is called with relation to the number of sends, so process the
    // entire queue in a loop.
     loop {
-        match state.consumer.pop() {
+        match state.queue.pop() {
            mpsc::Data(Task(task)) => {
                task.wake().map(|t| t.reawaken(true));
            }
            mpsc::Data(Increment) => unsafe {
-                if state.refcnt == 0 {
-                    uvll::uv_ref((*packet).handle);
+                if pool.refcnt == 0 {
+                    uvll::uv_ref(state.handle);
                }
-                state.refcnt += 1;
+                pool.refcnt += 1;
            },
            mpsc::Data(Decrement) => unsafe {
-                state.refcnt -= 1;
-                if state.refcnt == 0 {
-                    uvll::uv_unref((*packet).handle);
+                pool.refcnt -= 1;
+                if pool.refcnt == 0 {
+                    uvll::uv_unref(state.handle);
                }
            },
            mpsc::Empty | mpsc::Inconsistent => break
@@ -99,9 +100,9 @@ extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) {
     // If we acquire the mutex here, then we are guaranteed that there are no
     // longer any senders which are holding on to their handles, so we can
     // safely allow the event loop to exit.
-    if state.refcnt == 0 {
+    if pool.refcnt == 0 {
         unsafe {
-            let _l = (*packet).lock.lock();
+            let _l = state.lock.lock();
         }
     }
 }
@@ -109,14 +110,14 @@ impl QueuePool {
     pub fn new(loop_: &mut Loop) -> ~QueuePool {
         let handle = UvHandle::alloc(None::<AsyncWatcher>, uvll::UV_ASYNC);
-        let (c, p) = mpsc::queue(State {
+        let state = UnsafeArc::new(State {
             handle: handle,
             lock: LittleLock::new(),
+            queue: mpsc::Queue::new(),
         });
         let q = ~QueuePool {
-            producer: p,
-            consumer: c,
             refcnt: 0,
+            queue: state,
         };
 
         unsafe {
@@ -132,23 +133,23 @@ impl QueuePool {
     pub fn queue(&mut self) -> Queue {
         unsafe {
             if self.refcnt == 0 {
-                uvll::uv_ref((*self.producer.packet()).handle);
+                uvll::uv_ref((*self.queue.get()).handle);
             }
             self.refcnt += 1;
         }
-        Queue { queue: self.producer.clone() }
+        Queue { queue: self.queue.clone() }
     }
 
     pub fn handle(&self) -> *uvll::uv_async_t {
-        unsafe { (*self.producer.packet()).handle }
+        unsafe { (*self.queue.get()).handle }
     }
 }
 
 impl Queue {
     pub fn push(&mut self, task: BlockedTask) {
-        self.queue.push(Task(task));
         unsafe {
-            uvll::uv_async_send((*self.queue.packet()).handle);
+            (*self.queue.get()).queue.push(Task(task));
+            uvll::uv_async_send((*self.queue.get()).handle);
         }
     }
 }
@@ -161,7 +162,7 @@ impl Clone for Queue {
         // and if the queue is dropped later on it'll see the increment for the
         // decrement anyway.
         unsafe {
-            cast::transmute_mut(self).queue.push(Increment);
+            (*self.queue.get()).queue.push(Increment);
         }
         Queue { queue: self.queue.clone() }
     }
@@ -172,9 +173,9 @@ impl Drop for Queue {
         // See the comments in the async_cb function for why there is a lock
         // that is acquired only on a drop.
         unsafe {
-            let state = self.queue.packet();
+            let state = self.queue.get();
             let _l = (*state).lock.lock();
-            self.queue.push(Decrement);
+            (*state).queue.push(Decrement);
             uvll::uv_async_send((*state).handle);
         }
     }
diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs
index bf37e5fca6a5f..c5cfea28fab77 100644
--- a/src/libstd/comm/mod.rs
+++ b/src/libstd/comm/mod.rs
@@ -227,7 +227,6 @@ use cast;
 use clone::Clone;
-use container::Container;
 use int;
 use iter::Iterator;
 use kinds::Send;
@@ -237,6 +236,7 @@ use result::{Ok, Err};
 use rt::local::Local;
 use rt::task::{Task, BlockedTask};
 use rt::thread::Thread;
+use sync::arc::UnsafeArc;
 use sync::atomics::{AtomicInt, AtomicBool, SeqCst, Relaxed};
 use vec::OwnedVector;
 
@@ -272,24 +272,6 @@ macro_rules! test (
 
 mod select;
 
-///////////////////////////////////////////////////////////////////////////////
-// Helper type to abstract ports for channels and shared channels
-///////////////////////////////////////////////////////////////////////////////
-
-enum Consumer<T> {
-    SPSC(spsc::Consumer<T, Packet>),
-    MPSC(mpsc::Consumer<T, Packet>),
-}
-
-impl<T: Send> Consumer<T>{
-    unsafe fn packet(&self) -> *mut Packet {
-        match *self {
-            SPSC(ref c) => c.packet(),
-            MPSC(ref c) => c.packet(),
-        }
-    }
-}
-
 ///////////////////////////////////////////////////////////////////////////////
 // Public structs
 ///////////////////////////////////////////////////////////////////////////////
@@ -298,7 +280,7 @@ impl<T: Send> Consumer<T>{
 /// one task
 #[no_freeze] // can't share ports in an arc
 pub struct Port<T> {
-    priv queue: Consumer<T>,
+    priv inner: PortInner<T>,
 }
 
 /// An iterator over messages received on a port, this iterator will block
@@ -312,7 +294,7 @@ pub struct PortIterator<'a, T> {
 /// task
 #[no_freeze] // can't share chans in an arc
 pub struct Chan<T> {
-    priv queue: spsc::Producer<T, Packet>,
+    priv inner: UnsafeArc<Stream<T>>,
 }
 
 /// The sending-half of Rust's channel type. This half can be shared among many
@@ -320,23 +302,33 @@ pub struct Chan<T> {
 #[no_freeze] // technically this implementation is shareable, but it shouldn't
              // be required to be shareable in an arc
 pub struct SharedChan<T> {
-    priv queue: mpsc::Producer<T, Packet>,
+    priv inner: UnsafeArc<Shared<T>>,
 }
 
 ///////////////////////////////////////////////////////////////////////////////
 // Internal struct definitions
 ///////////////////////////////////////////////////////////////////////////////
 
+enum PortInner<T> {
+    SingleInner(UnsafeArc<Stream<T>>),
+    SharedInner(UnsafeArc<Shared<T>>),
+}
+
+struct Stream<T> {
+    queue: spsc::Queue<T>,
+    packet: Packet,
+}
+
+struct Shared<T> {
+    queue: mpsc::Queue<T>,
+    packet: Packet,
+}
+
 struct Packet {
     cnt: AtomicInt, // How many items are on this channel
     steals: int,    // How many times has a port received without blocking?
     to_wake: Option<BlockedTask>, // Task to wake up
 
-    // This lock is used to wake up native threads blocked in select. The
-    // `lock` field is not used because the thread blocking in select must
-    // block on only one mutex.
-    //selection_lock: Option<UnsafeArc<Mutex>>,
-
     // The number of channels which are currently using this packet. This is
     // used to reference count shared channels.
     channels: AtomicInt,
@@ -346,6 +338,11 @@ struct Packet {
     select_next: *mut Packet,
     select_prev: *mut Packet,
     recv_cnt: int,
+
+    // See the discussion in Port::drop and the channel send methods for what
+    // these are used for
+    go_home: AtomicBool,
+    sender_drain: AtomicInt,
 }
 
 ///////////////////////////////////////////////////////////////////////////////
@@ -355,6 +352,15 @@ struct Packet {
 static DISCONNECTED: int = int::min_value;
 static RESCHED_FREQ: int = 200;
 
+impl<T: Send> PortInner<T> {
+    fn packet<'a>(&'a mut self) -> &'a mut Packet {
+        match *self {
+            SingleInner(ref arc) => unsafe { &mut (*arc.get()).packet },
+            SharedInner(ref arc) => unsafe { &mut (*arc.get()).packet },
+        }
+    }
+}
+
 impl Packet {
     fn new() -> Packet {
         Packet {
@@ -368,6 +374,9 @@ impl Packet {
             select_next: 0 as *mut Packet,
             select_prev: 0 as *mut Packet,
             recv_cnt: 0,
+
+            go_home: AtomicBool::new(false),
+            sender_drain: AtomicInt::new(0),
         }
     }
 
@@ -528,9 +537,11 @@ impl<T: Send> Chan<T> {
     pub fn new() -> (Port<T>, Chan<T>) {
         // arbitrary 128 size cache -- this is just a max cache size, not a
         // maximum buffer size
-        let (c, p) = spsc::queue(128, Packet::new());
-        let c = SPSC(c);
-        (Port { queue: c }, Chan { queue: p })
+        let (a, b) = UnsafeArc::new2(Stream {
+            queue: spsc::Queue::new(128),
+            packet: Packet::new(),
+        });
+        (Port { inner: SingleInner(a) }, Chan { inner: b })
     }
 
     /// Sends a value along this channel to be received by the corresponding
@@ -579,16 +590,36 @@ impl<T: Send> Chan<T> {
     fn try(&self, t: T, can_resched: bool) -> bool {
         unsafe {
-            let this = cast::transmute_mut(self);
-            this.queue.push(t);
-            let packet = this.queue.packet();
-            match (*packet).increment() {
+            let inner = self.inner.get();
+
+            // See the discussion in Port::drop for what's going on here
+            if (*inner).packet.go_home.load(Relaxed) { return false }
+
+            (*inner).queue.push(t);
+            match (*inner).packet.increment() {
                 // As described above, -1 == wakeup
-                -1 => { (*packet).wakeup(can_resched); true }
+                -1 => { (*inner).packet.wakeup(can_resched); true }
                 // Also as above, SPSC queues must be >= -2
                 -2 => true,
-                // We succeeded if we sent data
-                DISCONNECTED => this.queue.is_empty(),
+
+                DISCONNECTED => {
+                    // After the go_home check, the port could have been
+                    // dropped, so we need to be sure to drain the queue here
+                    // (we own the queue now that the port is gone). Note that
+                    // it is only possible for there to be one item in the queue
+                    // because the port ensures that there is 0 data in the
+                    // queue when it flags disconnected, and we could have only
+                    // pushed on one more item
+                    let first = (*inner).queue.pop();
+                    let second = (*inner).queue.pop();
+                    assert!(second.is_none());
+
+                    match first {
+                        Some(..) => false, // we failed to send the data
+                        None => true,      // we successfully sent data
+                    }
+                }
+
                 // In order to prevent starvation of other tasks in situations
                 // where a task sends repeatedly without ever receiving, we
                 // occassionally yield instead of doing a send immediately.
@@ -613,7 +644,7 @@ impl<T: Send> Chan<T> {
 #[unsafe_destructor]
 impl<T: Send> Drop for Chan<T> {
     fn drop(&mut self) {
-        unsafe { (*self.queue.packet()).drop_chan(); }
+        unsafe { (*self.inner.get()).packet.drop_chan(); }
     }
 }
 
@@ -623,9 +654,11 @@ impl<T: Send> SharedChan<T> {
     /// same time. All data sent on any channel will become available on the
     /// provided port as well.
     pub fn new() -> (Port<T>, SharedChan<T>) {
-        let (c, p) = mpsc::queue(Packet::new());
-        let c = MPSC(c);
-        (Port { queue: c }, SharedChan { queue: p })
+        let (a, b) = UnsafeArc::new2(Shared {
+            queue: mpsc::Queue::new(),
+            packet: Packet::new(),
+        });
+        (Port { inner: SharedInner(a) }, SharedChan { inner: b })
     }
 
     /// Equivalent method to `send` on the `Chan` type (using the same
@@ -640,6 +673,10 @@ impl<T: Send> SharedChan<T> {
     /// semantics)
     pub fn try_send(&self, t: T) -> bool {
         unsafe {
+            let inner = self.inner.get();
+            // See Port::drop for what's going on
+            if (*inner).packet.go_home.load(Relaxed) { return false }
+
             // Note that the multiple sender case is a little tricker
             // semantically than the single sender case. The logic for
             // incrementing is "add and if disconnected store disconnected".
@@ -665,17 +702,49 @@ impl<T: Send> SharedChan<T> {
             // preflight check serves as the definitive "this will never be
             // received". Once we get beyond this check, we have permanently
             // entered the realm of "this may be received"
-            let packet = self.queue.packet();
-            if (*packet).cnt.load(Relaxed) < DISCONNECTED + 1024 {
+            if (*inner).packet.cnt.load(Relaxed) < DISCONNECTED + 1024 {
                 return false
             }
 
-            let this = cast::transmute_mut(self);
-            this.queue.push(t);
+            (*inner).queue.push(t);
+            match (*inner).packet.increment() {
+                -1 => { (*inner).packet.wakeup(true); }
+
+                // In this case, we have possibly failed to send our data, and
+                // we need to consider re-popping the data in order to fully
+                // destroy it. We must arbitrate among the multiple senders,
+                // however, because the queues that we're using are
+                // single-consumer queues. In order to do this, all exiting
+                // pushers will use an atomic count in order to count those
+                // flowing through. Pushers who see 0 are required to drain as
+                // much as possible, and then can only exit when they are the
+                // only pusher (otherwise they must try again).
+                n if n < DISCONNECTED + 1024 => {
+                    if (*inner).packet.sender_drain.fetch_add(1, SeqCst) == 0 {
+                        loop {
+                            // drain the queue
+                            loop {
+                                match (*inner).queue.pop() {
+                                    mpsc::Data(..) => {}
+                                    mpsc::Empty => break,
+                                    mpsc::Inconsistent => Thread::yield_now(),
+                                }
+                            }
+                            // maybe we're done, if we're not the last ones
+                            // here, then we need to go try again.
+                            if (*inner).packet.sender_drain.compare_and_swap(
+                                    1, 0, SeqCst) == 1 {
+                                break
+                            }
+                        }
+
+                        // At this point, there may still be data on the queue,
+                        // but only if the count hasn't been incremented and
+                        // some other sender hasn't finished pushing data just
+                        // yet.
+                    }
+                }
-            match (*packet).increment() {
-                DISCONNECTED => {} // oh well, we tried
-                -1 => { (*packet).wakeup(true); }
                 n => {
                     if n > 0 && n % RESCHED_FREQ == 0 {
                         let task: ~Task = Local::take();
@@ -690,15 +759,15 @@ impl<T: Send> SharedChan<T> {
 
 impl<T: Send> Clone for SharedChan<T> {
     fn clone(&self) -> SharedChan<T> {
-        unsafe { (*self.queue.packet()).channels.fetch_add(1, SeqCst); }
-        SharedChan { queue: self.queue.clone() }
+        unsafe { (*self.inner.get()).packet.channels.fetch_add(1, SeqCst); }
+        SharedChan { inner: self.inner.clone() }
     }
 }
 
 #[unsafe_destructor]
 impl<T: Send> Drop for SharedChan<T> {
     fn drop(&mut self) {
-        unsafe { (*self.queue.packet()).drop_chan(); }
+        unsafe { (*self.inner.get()).packet.drop_chan(); }
     }
 }
 
@@ -750,18 +819,18 @@ impl<T: Send> Port<T> {
 
         // See the comment about yielding on sends, but the same applies here.
         // If a thread is spinning in try_recv we should try
-        unsafe {
-            let packet = this.queue.packet();
-            (*packet).recv_cnt += 1;
-            if (*packet).recv_cnt % RESCHED_FREQ == 0 {
+        {
+            let packet = this.inner.packet();
+            packet.recv_cnt += 1;
+            if packet.recv_cnt % RESCHED_FREQ == 0 {
                 let task: ~Task = Local::take();
                 task.maybe_yield();
             }
         }
 
-        let ret = match this.queue {
-            SPSC(ref mut queue) => queue.pop(),
-            MPSC(ref mut queue) => match queue.pop() {
+        let ret = match this.inner {
+            SingleInner(ref mut arc) => unsafe { (*arc.get()).queue.pop() },
+            SharedInner(ref mut arc) => match unsafe { (*arc.get()).queue.pop() } {
                 mpsc::Data(t) => Some(t),
                 mpsc::Empty => None,
@@ -794,7 +863,7 @@ impl<T: Send> Port<T> {
                 let data;
                 loop {
                     Thread::yield_now();
-                    match queue.pop() {
+                    match unsafe { (*arc.get()).queue.pop() } {
                         mpsc::Data(t) => { data = t; break }
                         mpsc::Empty => fail!("inconsistent => empty"),
                         mpsc::Inconsistent => {}
@@ -805,7 +874,7 @@ impl<T: Send> Port<T> {
             }
         };
         if increment && ret.is_some() {
-            unsafe { (*this.queue.packet()).steals += 1; }
+            this.inner.packet().steals += 1;
         }
         return ret;
     }
@@ -830,7 +899,7 @@ impl<T: Send> Port<T> {
         let this;
         unsafe {
             this = cast::transmute_mut(self);
-            packet = this.queue.packet();
+            packet = this.inner.packet();
             let task: ~Task = Local::take();
             task.deschedule(1, |task| {
                 assert!((*packet).to_wake.is_none());
@@ -845,8 +914,8 @@ impl<T: Send> Port<T> {
 
         let data = self.try_recv_inc(false);
         if data.is_none() &&
-           unsafe { (*packet).cnt.load(SeqCst) } != DISCONNECTED {
-            fail!("bug: woke up too soon {}", unsafe { (*packet).cnt.load(SeqCst) });
+           packet.cnt.load(SeqCst) != DISCONNECTED {
+            fail!("bug: woke up too soon {}", packet.cnt.load(SeqCst));
         }
         return data;
     }
@@ -865,12 +934,69 @@ impl<'a, T: Send> Iterator<T> for PortIterator<'a, T> {
 #[unsafe_destructor]
 impl<T: Send> Drop for Port<T> {
     fn drop(&mut self) {
-        // All we need to do is store that we're disconnected. If the channel
-        // half has already disconnected, then we'll just deallocate everything
-        // when the shared packet is deallocated.
-        unsafe {
-            (*self.queue.packet()).cnt.store(DISCONNECTED, SeqCst);
+        // Dropping a port seems like a fairly trivial thing. In theory all we
+        // need to do is flag that we're disconnected and then everything else
+        // can take over (we don't have anyone to wake up).
+        //
+        // The catch for Ports is that we want to drop the entire contents of
+        // the queue. There are multiple reasons for having this property, the
+        // largest of which is that if another port is waiting in this channel
+        // (but not received yet), then waiting on that port will cause a
+        // deadlock.
+        //
+        // So if we accept that we must now destroy the entire contents of the
+        // queue, this code may make a bit more sense. The tricky part is that
+        // we can't let any in-flight sends go un-dropped, we have to make sure
+        // *everything* is dropped and nothing new will come onto the channel.
+
+        // The first thing we do is set a flag saying that we're done for. All
+        // sends are gated on this flag, so we're immediately guaranteed that
+        // there are a bounded number of active sends that we'll have to deal
+        // with.
+        self.inner.packet().go_home.store(true, Relaxed);
+
+        // Now that we're guaranteed to deal with a bounded number of senders,
+        // we need to drain the queue. This draining process happens atomically
+        // with respect to the "count" of the channel. If the count is nonzero
+        // (with steals taken into account), then there must be data on the
+        // channel. In this case we drain everything and then try again. We will
+        // continue to fail while active senders send data while we're dropping
+        // data, but eventually we're guaranteed to break out of this loop
+        // (because there is a bounded number of senders).
+        let mut steals = self.inner.packet().steals;
+        while {
+            let cnt = self.inner.packet().cnt.compare_and_swap(
+                steals, DISCONNECTED, SeqCst);
+            cnt != DISCONNECTED && cnt != steals
+        } {
+            match self.inner {
+                SingleInner(ref mut arc) => {
+                    loop {
+                        match unsafe { (*arc.get()).queue.pop() } {
+                            Some(..) => { steals += 1; }
+                            None => break
+                        }
+                    }
+                }
+                SharedInner(ref mut arc) => {
+                    // See the discussion in 'try_recv_inc' for why we yield
+                    // control of this thread.
+                    loop {
+                        match unsafe { (*arc.get()).queue.pop() } {
+                            mpsc::Data(..) => { steals += 1; }
+                            mpsc::Empty => break,
+                            mpsc::Inconsistent => Thread::yield_now(),
+                        }
+                    }
+                }
+            }
         }
+
+        // At this point in time, we have gated all future senders from sending,
+        // and we have flagged the channel as being disconnected. The senders
+        // still have some responsibility, however, because some sends may not
+        // complete until after we flag the disconnection. There are more
+        // details in the sending methods that see DISCONNECTED
     }
 }
 
@@ -1321,4 +1447,24 @@ mod test {
         drop(chan);
         assert_eq!(count_port.recv(), 4);
     })
+
+    test!(fn pending_dropped() {
+        let (a, b) = Chan::new();
+        let (c, d) = Chan::<()>::new();
+        b.send(d);
+        drop(a);
+        c.recv();
+    } #[should_fail])
+
+    test!(fn pending_dropped_stress() {
+        let (a, b) = Chan::new();
+        let (c, d) = Chan::new();
+        do spawn {
+            while b.try_send(~3) { continue }
+            d.send(());
+        }
+        a.recv();
+        drop(a);
+        c.recv();
+    })
 }
diff --git a/src/libstd/comm/select.rs b/src/libstd/comm/select.rs
index 302c9d9ea469b..f2a79246df1bf 100644
--- a/src/libstd/comm/select.rs
+++ b/src/libstd/comm/select.rs
@@ -125,18 +125,18 @@ impl Select {
         let id = this.next_id;
         this.next_id += 1;
         unsafe {
-            let packet = port.queue.packet();
+            let packet = port.inner.packet();
             assert!(!(*packet).selecting.load(Relaxed));
             assert_eq!((*packet).selection_id, 0);
             (*packet).selection_id = id;
             if this.head.is_null() {
-                this.head = packet;
-                this.tail = packet;
+                this.head = packet as *mut Packet;
+                this.tail = packet as *mut Packet;
             } else {
                 (*packet).select_prev = this.tail;
                 assert!((*packet).select_next.is_null());
-                (*this.tail).select_next = packet;
-                this.tail = packet;
+                (*this.tail).select_next = packet as *mut Packet;
+                this.tail = packet as *mut Packet;
             }
         }
         Handle { id: id, selector: this, port: port }
@@ -293,7 +293,7 @@ impl Drop for Select {
 #[unsafe_destructor]
 impl<'port, T: Send> Drop for Handle<'port, T> {
     fn drop(&mut self) {
-        unsafe { self.selector.remove(self.port.queue.packet()) }
+        unsafe { self.selector.remove(self.port.inner.packet()) }
     }
 }
 
diff --git a/src/libstd/sync/mpsc_queue.rs b/src/libstd/sync/mpsc_queue.rs
index a249d6ed2e8ce..eef63d759021f 100644
--- a/src/libstd/sync/mpsc_queue.rs
+++ b/src/libstd/sync/mpsc_queue.rs
@@ -39,12 +39,10 @@
 //                         /queues/non-intrusive-mpsc-node-based-queue
 
 use cast;
-use clone::Clone;
 use kinds::Send;
 use ops::Drop;
 use option::{Option, None, Some};
 use ptr::RawPtr;
-use sync::arc::UnsafeArc;
 use sync::atomics::{AtomicPtr, Release, Acquire, AcqRel, Relaxed};
 
 /// A result of the `pop` function.
@@ -65,40 +63,12 @@ struct Node<T> {
     value: Option<T>,
 }
 
-struct State<T, P> {
-    head: AtomicPtr<Node<T>>,
-    tail: *mut Node<T>,
-    packet: P,
-}
-
-/// The consumer half of this concurrent queue. This half is used to receive
-/// data from the producers.
-pub struct Consumer<T, P> {
-    priv state: UnsafeArc<State<T, P>>,
-}
-
-/// The production half of the concurrent queue. This handle may be cloned in
-/// order to make handles for new producers.
-pub struct Producer<T, P> {
-    priv state: UnsafeArc<State<T, P>>,
-}
-
-impl<T: Send, P: Send> Clone for Producer<T, P> {
-    fn clone(&self) -> Producer<T, P> {
-        Producer { state: self.state.clone() }
-    }
-}
-
-/// Creates a new MPSC queue. The given argument `p` is a user-defined "packet"
-/// of information which will be shared by the consumer and the producer which
-/// can be re-acquired via the `packet` function. This is helpful when extra
-/// state is shared between the producer and consumer, but note that there is no
-/// synchronization performed of this data.
-pub fn queue<T: Send, P: Send>(p: P) -> (Consumer<T, P>, Producer<T, P>) {
-    unsafe {
-        let (a, b) = UnsafeArc::new2(State::new(p));
-        (Consumer { state: a }, Producer { state: b })
-    }
+/// The multi-producer single-consumer structure. This is not cloneable, but it
+/// may be safely shared so long as it is guaranteed that there is only one
+/// popper at a time (many pushers are allowed).
+pub struct Queue<T> {
+    priv head: AtomicPtr<Node<T>>,
+    priv tail: *mut Node<T>,
 }
 
 impl<T> Node<T> {
@@ -110,67 +80,26 @@ impl<T> Node<T> {
     }
 }
 
-impl<T: Send, P: Send> State<T, P> {
-    unsafe fn new(p: P) -> State<T, P> {
-        let stub = Node::new(None);
-        State {
+impl<T: Send> Queue<T> {
+    /// Creates a new queue that is safe to share among multiple producers and
+    /// one consumer.
+    pub fn new() -> Queue<T> {
+        let stub = unsafe { Node::new(None) };
+        Queue {
             head: AtomicPtr::new(stub),
             tail: stub,
-            packet: p,
         }
     }
 
-    unsafe fn push(&mut self, t: T) {
-        let n = Node::new(Some(t));
-        let prev = self.head.swap(n, AcqRel);
-        (*prev).next.store(n, Release);
-    }
-
-    unsafe fn pop(&mut self) -> PopResult<T> {
-        let tail = self.tail;
-        let next = (*tail).next.load(Acquire);
-
-        if !next.is_null() {
-            self.tail = next;
-            assert!((*tail).value.is_none());
-            assert!((*next).value.is_some());
-            let ret = (*next).value.take_unwrap();
-            let _: ~Node<T> = cast::transmute(tail);
-            return Data(ret);
-        }
-
-        if self.head.load(Acquire) == tail {Empty} else {Inconsistent}
-    }
-}
-
-#[unsafe_destructor]
-impl<T: Send, P: Send> Drop for State<T, P> {
-    fn drop(&mut self) {
+    /// Pushes a new value onto this queue.
+    pub fn push(&mut self, t: T) {
         unsafe {
-            let mut cur = self.tail;
-            while !cur.is_null() {
-                let next = (*cur).next.load(Relaxed);
-                let _: ~Node<T> = cast::transmute(cur);
-                cur = next;
-            }
+            let n = Node::new(Some(t));
+            let prev = self.head.swap(n, AcqRel);
+            (*prev).next.store(n, Release);
         }
     }
-}
-
-impl<T: Send, P: Send> Producer<T, P> {
-    /// Pushes a new value onto this queue.
-    pub fn push(&mut self, value: T) {
-        unsafe { (*self.state.get()).push(value) }
-    }
-    /// Gets an unsafe pointer to the user-defined packet shared by the
-    /// producers and the consumer. Note that care must be taken to ensure that
-    /// the lifetime of the queue outlives the usage of the returned pointer.
-    pub unsafe fn packet(&self) -> *mut P {
-        &mut (*self.state.get()).packet as *mut P
-    }
-}
-
-impl<T: Send, P: Send> Consumer<T, P> {
     /// Pops some data from this queue.
     ///
     /// Note that the current implementation means that this function cannot
@@ -182,8 +111,23 @@ impl<T: Send, P: Send> Consumer<T, P> {
     /// This inconsistent state means that this queue does indeed have data, but
     /// it does not currently have access to it at this time.
     pub fn pop(&mut self) -> PopResult<T> {
-        unsafe { (*self.state.get()).pop() }
+        unsafe {
+            let tail = self.tail;
+            let next = (*tail).next.load(Acquire);
+
+            if !next.is_null() {
+                self.tail = next;
+                assert!((*tail).value.is_none());
+                assert!((*next).value.is_some());
+                let ret = (*next).value.take_unwrap();
+                let _: ~Node<T> = cast::transmute(tail);
+                return Data(ret);
+            }
+
+            if self.head.load(Acquire) == tail {Empty} else {Inconsistent}
+        }
     }
+
     /// Attempts to pop data from this queue, but doesn't attempt too hard. This
     /// will canonicalize inconsistent states to a `None` value.
     pub fn casual_pop(&mut self) -> Option<T> {
@@ -192,10 +136,19 @@ impl<T: Send, P: Send> Consumer<T, P> {
             Data(t) => Some(t),
             Empty | Inconsistent => None,
         }
     }
-    /// Gets an unsafe pointer to the underlying user-defined packet. See
-    /// `Producer.packet` for more information.
-    pub unsafe fn packet(&self) -> *mut P {
-        &mut (*self.state.get()).packet as *mut P
+}
+
+#[unsafe_destructor]
+impl<T: Send> Drop for Queue<T> {
+    fn drop(&mut self) {
+        unsafe {
+            let mut cur = self.tail;
+            while !cur.is_null() {
+                let next = (*cur).next.load(Relaxed);
+                let _: ~Node<T> = cast::transmute(cur);
+                cur = next;
+            }
+        }
     }
 }
 
@@ -203,34 +156,35 @@ mod tests {
     use prelude::*;
 
-    use super::{queue, Data, Empty, Inconsistent};
     use native;
+    use super::{Queue, Data, Empty, Inconsistent};
+    use sync::arc::UnsafeArc;
 
     #[test]
     fn test_full() {
-        let (_, mut p) = queue(());
-        p.push(~1);
-        p.push(~2);
+        let mut q = Queue::new();
+        q.push(~1);
+        q.push(~2);
     }
 
     #[test]
     fn test() {
         let nthreads = 8u;
         let nmsgs = 1000u;
-        let (mut c, p) = queue(());
-        match c.pop() {
+        let mut q = Queue::new();
+        match q.pop() {
             Empty => {}
             Inconsistent | Data(..) => fail!()
         }
         let (port, chan) = SharedChan::new();
+        let q = UnsafeArc::new(q);
 
         for _ in range(0, nthreads) {
-            let q = p.clone();
             let chan = chan.clone();
+            let q = q.clone();
             do native::task::spawn {
-                let mut q = q;
                 for i in range(0, nmsgs) {
-                    q.push(i);
+                    unsafe { (*q.get()).push(i); }
                 }
                 chan.send(());
             }
@@ -238,11 +192,12 @@ mod tests {
 
         let mut i = 0u;
         while i < nthreads * nmsgs {
-            match c.pop() {
+            match unsafe { (*q.get()).pop() } {
                 Empty | Inconsistent => {},
                 Data(_) => { i += 1 }
             }
         }
+        drop(chan);
         for _ in range(0, nthreads) {
             port.recv();
         }
diff --git a/src/libstd/sync/spsc_queue.rs b/src/libstd/sync/spsc_queue.rs
index 6f1b887c27156..18e6cf1b6e444 100644
--- a/src/libstd/sync/spsc_queue.rs
+++ b/src/libstd/sync/spsc_queue.rs
@@ -38,7 +38,6 @@ use kinds::Send;
 use ops::Drop;
 use option::{Some, None, Option};
 use ptr::RawPtr;
-use sync::arc::UnsafeArc;
 use sync::atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release};
 
 // Node within the linked list queue of messages to send
@@ -50,75 +49,25 @@ struct Node<T> {
     next: AtomicPtr<Node<T>>, // next node in the queue
 }
 
-// The producer/consumer halves both need access to the `tail` field, and if
-// they both have access to that we may as well just give them both access
-// to this whole structure.
-struct State<T, P> {
+/// The single-producer single-consumer queue. This structure is not cloneable,
+/// but it can be safely shared in an UnsafeArc if it is guaranteed that there
+/// is only one popper and one pusher touching the queue at any one point in
+/// time.
+pub struct Queue<T> {
     // consumer fields
-    tail: *mut Node<T>, // where to pop from
-    tail_prev: AtomicPtr<Node<T>>, // where to pop from
+    priv tail: *mut Node<T>, // where to pop from
+    priv tail_prev: AtomicPtr<Node<T>>, // where to pop from
 
     // producer fields
-    head: *mut Node<T>,      // where to push to
-    first: *mut Node<T>,     // where to get new nodes from
-    tail_copy: *mut Node<T>, // between first/tail
+    priv head: *mut Node<T>,      // where to push to
+    priv first: *mut Node<T>,     // where to get new nodes from
+    priv tail_copy: *mut Node<T>, // between first/tail
 
     // Cache maintenance fields. Additions and subtractions are stored
     // separately in order to allow them to use nonatomic addition/subtraction.
-    cache_bound: uint,
-    cache_additions: AtomicUint,
-    cache_subtractions: AtomicUint,
-
-    packet: P,
-}
-
-/// Producer half of this queue. This handle is used to push data to the
-/// consumer.
-pub struct Producer<T, P> {
-    priv state: UnsafeArc<State<T, P>>,
-}
-
-/// Consumer half of this queue. This handle is used to receive data from the
-/// producer.
-pub struct Consumer<T, P> {
-    priv state: UnsafeArc<State<T, P>>,
-}
-
-/// Creates a new queue. The producer returned is connected to the consumer to
-/// push all data to the consumer.
-///
-/// # Arguments
-///
-///   * `bound` - This queue implementation is implemented with a linked list,
-///               and this means that a push is always a malloc. In order to
-///               amortize this cost, an internal cache of nodes is maintained
-///               to prevent a malloc from always being necessary. This bound is
-///               the limit on the size of the cache (if desired). If the value
-///               is 0, then the cache has no bound. Otherwise, the cache will
-///               never grow larger than `bound` (although the queue itself
-///               could be much larger.
-///
-///   * `p` - This is the user-defined packet of data which will also be shared
-///           between the producer and consumer.
-pub fn queue<T: Send, P: Send>(bound: uint,
-                               p: P) -> (Consumer<T, P>, Producer<T, P>)
-{
-    let n1 = Node::new();
-    let n2 = Node::new();
-    unsafe { (*n1).next.store(n2, Relaxed) }
-    let state = State {
-        tail: n2,
-        tail_prev: AtomicPtr::new(n1),
-        head: n2,
-        first: n1,
-        tail_copy: n1,
-        cache_bound: bound,
-        cache_additions: AtomicUint::new(0),
-        cache_subtractions: AtomicUint::new(0),
-        packet: p,
-    };
-    let (arc1, arc2) = UnsafeArc::new2(state);
-    (Consumer { state: arc1 }, Producer { state: arc2 })
+    priv cache_bound: uint,
+    priv cache_additions: AtomicUint,
+    priv cache_subtractions: AtomicUint,
 }
 
 impl<T: Send> Node<T> {
@@ -132,49 +81,49 @@ impl<T: Send> Node<T> {
     }
 }
 
-impl<T: Send, P: Send> Producer<T, P> {
-    /// Pushes data onto the queue
-    pub fn push(&mut self, t: T) {
-        unsafe { (*self.state.get()).push(t) }
-    }
-    /// Tests whether the queue is empty. Note that if this function returns
-    /// `false`, the return value is significant, but if the return value is
-    /// `true` then almost no meaning can be attached to the return value.
-    pub fn is_empty(&self) -> bool {
-        unsafe { (*self.state.get()).is_empty() }
-    }
-    /// Acquires an unsafe pointer to the underlying user-defined packet. Note
-    /// that care must be taken to ensure that the queue outlives the usage of
-    /// the packet (because it is an unsafe pointer).
-    pub unsafe fn packet(&self) -> *mut P {
-        &mut (*self.state.get()).packet as *mut P
-    }
-}
-
-impl<T: Send, P: Send> Consumer<T, P> {
-    /// Pops some data from this queue, returning `None` when the queue is
-    /// empty.
-    pub fn pop(&mut self) -> Option<T> {
-        unsafe { (*self.state.get()).pop() }
-    }
-    /// Same function as the producer's `packet` method.
-    pub unsafe fn packet(&self) -> *mut P {
-        &mut (*self.state.get()).packet as *mut P
+impl<T: Send> Queue<T> {
+    /// Creates a new queue. The producer returned is connected to the consumer
+    /// to push all data to the consumer.
+    ///
+    /// # Arguments
+    ///
+    ///   * `bound` - This queue implementation is implemented with a linked
+    ///               list, and this means that a push is always a malloc. In
+    ///               order to amortize this cost, an internal cache of nodes is
+    ///               maintained to prevent a malloc from always being
+    ///               necessary. This bound is the limit on the size of the
+    ///               cache (if desired). If the value is 0, then the cache has
+    ///               no bound. Otherwise, the cache will never grow larger than
+    ///               `bound` (although the queue itself could be much larger.
+    pub fn new(bound: uint) -> Queue<T> {
+        let n1 = Node::new();
+        let n2 = Node::new();
+        unsafe { (*n1).next.store(n2, Relaxed) }
+        Queue {
+            tail: n2,
+            tail_prev: AtomicPtr::new(n1),
+            head: n2,
+            first: n1,
+            tail_copy: n1,
+            cache_bound: bound,
+            cache_additions: AtomicUint::new(0),
+            cache_subtractions: AtomicUint::new(0),
+        }
     }
-}
 
-impl<T: Send, P: Send> State<T, P> {
-    // remember that there is only one thread executing `push` (and only one
-    // thread executing `pop`)
-    unsafe fn push(&mut self, t: T) {
-        // Acquire a node (which either uses a cached one or allocates a new
-        // one), and then append this to the 'head' node.
-        let n = self.alloc();
-        assert!((*n).value.is_none());
-        (*n).value = Some(t);
-        (*n).next.store(0 as *mut Node<T>, Relaxed);
-        (*self.head).next.store(n, Release);
-        self.head = n;
+    /// Pushes a new value onto this queue. Note that to use this function
+    /// safely, it must be externally guaranteed that there is only one pusher.
+    pub fn push(&mut self, t: T) {
+        unsafe {
+            // Acquire a node (which either uses a cached one or allocates a new
+            // one), and then append this to the 'head' node.
+            let n = self.alloc();
+            assert!((*n).value.is_none());
+            (*n).value = Some(t);
+            (*n).next.store(0 as *mut Node<T>, Relaxed);
+            (*self.head).next.store(n, Release);
+            self.head = n;
+        }
     }
 
     unsafe fn alloc(&mut self) -> *mut Node<T> {
@@ -208,50 +157,46 @@ impl<T: Send, P: Send> State<T, P> {
         Node::new()
     }
 
-    // remember that there is only one thread executing `pop` (and only one
-    // thread executing `push`)
-    unsafe fn pop(&mut self) -> Option<T> {
-        // The `tail` node is not actually a used node, but rather a
-        // sentinel from where we should start popping from. Hence, look at
-        // tail's next field and see if we can use it. If we do a pop, then
-        // the current tail node is a candidate for going into the cache.
-        let tail = self.tail;
-        let next = (*tail).next.load(Acquire);
-        if next.is_null() { return None }
-        assert!((*next).value.is_some());
-        let ret = (*next).value.take();
-
-        self.tail = next;
-        if self.cache_bound == 0 {
-            self.tail_prev.store(tail, Release);
-        } else {
-            // XXX: this is dubious with overflow.
-            let additions = self.cache_additions.load(Relaxed);
-            let subtractions = self.cache_subtractions.load(Relaxed);
-            let size = additions - subtractions;
+    /// Attempts to pop a value from this queue. Remember that to use this type
+    /// safely you must ensure that there is only one popper at a time.
+    pub fn pop(&mut self) -> Option<T> {
+        unsafe {
+            // The `tail` node is not actually a used node, but rather a
+            // sentinel from where we should start popping from. Hence, look at
+            // tail's next field and see if we can use it. If we do a pop, then
+            // the current tail node is a candidate for going into the cache.
+            let tail = self.tail;
+            let next = (*tail).next.load(Acquire);
+            if next.is_null() { return None }
+            assert!((*next).value.is_some());
+            let ret = (*next).value.take();
 
-            if size < self.cache_bound {
+            self.tail = next;
+            if self.cache_bound == 0 {
                 self.tail_prev.store(tail, Release);
-                self.cache_additions.store(additions + 1, Relaxed);
             } else {
-                (*self.tail_prev.load(Relaxed)).next.store(next, Relaxed);
-                // We have successfully erased all references to 'tail', so
-                // now we can safely drop it.
-                let _: ~Node<T> = cast::transmute(tail);
+                // XXX: this is dubious with overflow.
+                let additions = self.cache_additions.load(Relaxed);
+                let subtractions = self.cache_subtractions.load(Relaxed);
+                let size = additions - subtractions;
+
+                if size < self.cache_bound {
+                    self.tail_prev.store(tail, Release);
+                    self.cache_additions.store(additions + 1, Relaxed);
+                } else {
+                    (*self.tail_prev.load(Relaxed)).next.store(next, Relaxed);
+                    // We have successfully erased all references to 'tail', so
+                    // now we can safely drop it.
+                    let _: ~Node<T> = cast::transmute(tail);
+                }
             }
+            return ret;
         }
-        return ret;
-    }
-
-    unsafe fn is_empty(&self) -> bool {
-        let tail = self.tail;
-        let next = (*tail).next.load(Acquire);
-        return next.is_null();
     }
 }
 
 #[unsafe_destructor]
-impl<T: Send, P: Send> Drop for State<T, P> {
+impl<T: Send> Drop for Queue<T> {
     fn drop(&mut self) {
         unsafe {
             let mut cur = self.first;
@@ -267,44 +212,45 @@
 #[cfg(test)]
 mod test {
     use prelude::*;
 
-    use super::queue;
     use native;
+    use super::Queue;
+    use sync::arc::UnsafeArc;
 
     #[test]
     fn smoke() {
-        let (mut c, mut p) = queue(0, ());
-        p.push(1);
-        p.push(2);
-        assert_eq!(c.pop(), Some(1));
-        assert_eq!(c.pop(), Some(2));
-        assert_eq!(c.pop(), None);
-        p.push(3);
-        p.push(4);
-        assert_eq!(c.pop(), Some(3));
-        assert_eq!(c.pop(), Some(4));
-        assert_eq!(c.pop(), None);
+        let mut q = Queue::new(0);
+        q.push(1);
+        q.push(2);
+        assert_eq!(q.pop(), Some(1));
+        assert_eq!(q.pop(), Some(2));
+        assert_eq!(q.pop(), None);
+        q.push(3);
+        q.push(4);
+        assert_eq!(q.pop(), Some(3));
+        assert_eq!(q.pop(), Some(4));
+        assert_eq!(q.pop(), None);
     }
 
     #[test]
     fn drop_full() {
-        let (_, mut p) = queue(0, ());
-        p.push(~1);
-        p.push(~2);
+        let mut q = Queue::new(0);
+        q.push(~1);
+        q.push(~2);
    }
 
     #[test]
     fn smoke_bound() {
-        let (mut c, mut p) = queue(1, ());
-        p.push(1);
-        p.push(2);
-        assert_eq!(c.pop(), Some(1));
-        assert_eq!(c.pop(), Some(2));
-        assert_eq!(c.pop(), None);
-        p.push(3);
-        p.push(4);
-        assert_eq!(c.pop(), Some(3));
-        assert_eq!(c.pop(), Some(4));
-        assert_eq!(c.pop(), None);
+        let mut q = Queue::new(1);
+        q.push(1);
+        q.push(2);
+        assert_eq!(q.pop(), Some(1));
+        assert_eq!(q.pop(), Some(2));
+        assert_eq!(q.pop(), None);
+        q.push(3);
+        q.push(4);
+        assert_eq!(q.pop(), Some(3));
+        assert_eq!(q.pop(), Some(4));
+        assert_eq!(q.pop(), None);
     }
 
     #[test]
@@ -313,13 +259,12 @@ mod test {
         stress_bound(1);
 
         fn stress_bound(bound: uint) {
-            let (c, mut p) = queue(bound, ());
+            let (a, b) = UnsafeArc::new2(Queue::new(bound));
             let (port, chan) = Chan::new();
             do native::task::spawn {
-                let mut c = c;
                 for _ in range(0, 100000) {
                     loop {
-                        match c.pop() {
+                        match unsafe { (*b.get()).pop() } {
                             Some(1) => break,
                             Some(_) => fail!(),
                             None => {}
                        }
                    }
                }
                chan.send(());
            }
            for _ in range(0, 100000) {
-                p.push(1);
+                unsafe { (*a.get()).push(1); }
            }
            port.recv();
        }
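
Reviewer note (not part of the patch): below is a minimal sketch of how the reworked queues are intended to be used after this change. The queues no longer carry a user-defined "packet", so callers bundle any shared state themselves and share the whole thing through an UnsafeArc, exactly as comm::Chan::new does above. The struct and field names here (Shared, queue) are illustrative only; the sketch assumes the spsc_queue::Queue API introduced in this diff (new(bound)/push/pop) and UnsafeArc::new2, in 2014-era Rust syntax.

// Illustrative sketch only, not part of the patch. Assumes the Queue and
// UnsafeArc APIs as shown in the diff above.
use std::sync::arc::UnsafeArc;
use spsc = std::sync::spsc_queue;

// Any extra state that used to live in the old "packet" now sits next to the
// queue in a caller-defined struct.
struct Shared {
    queue: spsc::Queue<int>,
}

fn main() {
    // Two handles over the same state: one for the single consumer, one for
    // the single producer.
    let (consumer, producer) = UnsafeArc::new2(Shared {
        queue: spsc::Queue::new(128),
    });

    // Pushing and popping go through the raw pointer returned by get(); the
    // one-pusher/one-popper guarantee is the caller's responsibility.
    unsafe { (*producer.get()).queue.push(1); }
    assert_eq!(unsafe { (*consumer.get()).queue.pop() }, Some(1));
}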