diff --git a/source/tristanable/encoding.d b/source/tristanable/encoding.d new file mode 100644 index 0000000..4c860cc --- /dev/null +++ b/source/tristanable/encoding.d @@ -0,0 +1,64 @@ +module tristanable.encoding; + +/** + * Represents a tagged message that has been decoded + * from its raw byte encoding, this is a tuple of + * a numeric tag and a byte array of payload data + * + * Also provides a static method to decode from such + * raw encoding and an instance method to do the reverse + */ +public final class TaggedMessage +{ + private ulong tag; + private byte[] data; + + this(ulong tag, byte[] data) + { + this.tag = tag; + this.data = data; + } + + public static TaggedMessage decode(byte[] encodedMessage) + { + TaggedMessage decodedMessage; + + // TODO: Implement me + + return decodedMessage; + } + + public byte[] encode() + { + byte[] encodedMessage; + + + return encodedMessage; + } + + public byte[] getPayload() + { + return data; + } + + public ulong getTag() + { + return tag; + } + + public void setPayload(byte[] newPayload) + { + this.data = newPayload; + } + + public void setTag(ulong newTag) + { + this.tag = newTag; + } +} + +unittest +{ + // TODO: Test encoding + // TODO: Test decoding +} \ No newline at end of file diff --git a/source/tristanable/exceptions.d b/source/tristanable/exceptions.d new file mode 100644 index 0000000..0e7926c --- /dev/null +++ b/source/tristanable/exceptions.d @@ -0,0 +1,15 @@ +module tristanable.exceptions; + +public enum Error +{ + QueueExists +} + +public class TristanableException : Exception +{ + this(Error err) + { + // TODO: Do this + super("TODO: Do this"); + } +} \ No newline at end of file diff --git a/source/tristanable/manager.d b/source/tristanable/manager.d index e3b6c37..bf6f85a 100644 --- a/source/tristanable/manager.d +++ b/source/tristanable/manager.d @@ -1,15 +1,55 @@ +/** + * Management of a tristanable instance + */ module tristanable.manager; +import std.socket; import tristanable.queue : Queue; +import core.sync.mutex : Mutex; /** - * Allows one to add new queues, control - * existing ones by waiting on them etc + * Manages a provided socket by spawning + * a watcher thread to read from it and file + * mail into the corresponding queues. + * + * Queues are managed via this an instance + * of a manager. */ public class Manager { - /* Queues */ - private Queue[] queues; + /** + * The underlying socket to read from + */ + private Socket socket; - + /** + * Currently registered queues + * + * NOTE: Make a ulong map to this later + */ + private Queue[] queues; + private Mutex queuesLock; + + /** + * Constructs a new manager which will read from + * this socket and file mail for us + * + * Params: + * socket = the underlying socket to use + */ + this(Socket socket) + { + this.socket = socket; + this.queuesLock = new Mutex(); + } + + + public void registerQueue(Queue queue) + { + // TODO: Lock queue + + // TODO: Insert queue only if non-existent, else throw an exception + + // TODO: Unlock queue + } } \ No newline at end of file diff --git a/source/tristanable/package.d b/source/tristanable/package.d index bdbabde..c4f6211 100644 --- a/source/tristanable/package.d +++ b/source/tristanable/package.d @@ -3,6 +3,25 @@ */ module tristanable; - +/** + * Interface which manages a provided socket + * and enqueuing and dequeuing of queues + */ public import tristanable.manager : Manager; -public import tristanable.queue : Queue, QueueItem; \ No newline at end of file + +// TODO: In future make `QueueItem` just `TaggedMessage` +/** + * A queue of queue items all of the same tag + */ +public import tristanable.queue : Queue; + +/** + * A decoded item that is placed on the queue + * for consumption + */ +public import tristanable.queueitem : QueueItem; + +/** + * Error handling type definitions + */ +public import tristanable.exceptions : TristanableException, Error; \ No newline at end of file diff --git a/source/tristanable/queue.d b/source/tristanable/queue.d index cc3e954..61ce8c5 100644 --- a/source/tristanable/queue.d +++ b/source/tristanable/queue.d @@ -1,6 +1,10 @@ module tristanable.queue; +// TODO: Examine the below import which seemingly fixes stuff for libsnooze +import libsnooze.clib; import libsnooze; + +import tristanable.queueitem : QueueItem; import core.sync.mutex : Mutex; public class Queue @@ -13,6 +17,11 @@ public class Queue private QueueItem queue; private Mutex queueLock; + + /** + * This queue's unique ID + */ + private ulong queueID; private this() @@ -26,7 +35,15 @@ public class Queue public void dequeue() { - // TODO: Make us wait on the event (optional with a time-out) + try + { + // TODO: Make us wait on the event (optional with a time-out) + event.wait(); + } + catch(SnoozeError snozErr) + { + // TODO: Add error handling for libsnooze exceptions here + } // TODO: Lock queue queueLock.lock(); @@ -45,9 +62,4 @@ public class Queue return queue; } -} - -public class QueueItem -{ - } \ No newline at end of file diff --git a/source/tristanable/queueitem.d b/source/tristanable/queueitem.d new file mode 100644 index 0000000..4f6337d --- /dev/null +++ b/source/tristanable/queueitem.d @@ -0,0 +1,6 @@ +module tristanable.queueitem; + +public class QueueItem +{ + +} \ No newline at end of file diff --git a/source/tristanable/watcher.d b/source/tristanable/watcher.d new file mode 100644 index 0000000..9550b40 --- /dev/null +++ b/source/tristanable/watcher.d @@ -0,0 +1,37 @@ +module tristanable.watcher; + +import core.thread : Thread; +import tristanable.manager : Manager; +import std.socket; + +/** + * Watches the socket on a thread of its own, + * performs the decoding of the incoming messages + * and places them into the correct queues via + * the associated Manager instance + */ +public class Watcher : Thread +{ + /** + * The associated manager to use + * such that we can place new mail + * into their respective inboxes (queues) + */ + private Manager manager; + + /** + * The underlying socket to read from + */ + private Socket socket; + + + + + private void watch() + { + while(true) + { + // TODO: Implement me + } + } +} \ No newline at end of file