diff --git a/source/tristanable/queue.d b/source/tristanable/queue.d index 28764d4..00cddd9 100644 --- a/source/tristanable/queue.d +++ b/source/tristanable/queue.d @@ -8,7 +8,7 @@ import core.sync.condition : Condition; import core.sync.exception : SyncError; import std.container.slist : SList; import tristanable.encoding; -import core.thread : dur; +import core.time : Duration, dur; import tristanable.exceptions; version(unittest) @@ -50,6 +50,15 @@ public class Queue */ private ulong queueID; + /** + * If a message is enqueued prior + * to us sleeping then we won't + * wake up and return for it. + * + * Therefore a periodic wakeup + * is required. + */ + private Duration wakeInterval; /** * Constructs a new Queue and immediately sets up the notification @@ -71,6 +80,31 @@ public class Queue /* Set the queue id */ this.queueID = queueID; + + /* Set the slumber interval */ + this.wakeInterval = dur!("msecs")(50); // TODO: Decide on value + } + + /** + * Returns the current wake interval + * for the queue checker + * + * Returns: the `Duration` + */ + public Duration getWakeInterval() + { + return this.wakeInterval; + } + + /** + * Sets the wake up interval + * + * Params: + * interval = the new interval + */ + public void setWakeInterval(Duration interval) + { + this.wakeInterval = interval; } /** @@ -152,45 +186,25 @@ public class Queue /* Block till we dequeue a message successfully */ while(dequeuedMessage is null) { - /** - * Call `wait()` and catch any interrupts - * in which case loop back and call `wait()` - * again - */ - while(true) + scope(exit) { - try - { - // TODO: Make us wait on the event (optional with a time-out) - event.wait(); - } - catch(InterruptedException e) - { - version(unittest) - { - import std.stdio; - writeln("dequeue() had libsnooze wait() get interrupted!"); - } - - // Retry the wait() - continue; - } - catch(FatalException fatalErr) - { - version(unittest) - { - import std.stdio; - writeln("dequeue() had libsnooze wait() get FATALLY fail! Exception will now throw..."); - } - - // Throw an exception on a fatal exception - throw new TristanableException(ErrorType.DEQUEUE_FAILED); - } - - // On successful wait() wake-up exit this wait()-retry loop - break; + // Unlock the mutex + this.mutex.unlock(); } - + + // Lock the mutex + this.mutex.lock(); + + try + { + this.signal.wait(this.wakeInterval); + } + catch(SyncError e) + { + // Throw an exception on a fatal exception + throw new TristanableException(ErrorType.DEQUEUE_FAILED); + } + /* Lock the item queue */ queueLock.lock();