diff --git a/.gitignore b/.gitignore index 1568aed..d16dea1 100644 --- a/.gitignore +++ b/.gitignore @@ -24,3 +24,4 @@ docs/ *.lst source/tristanable/queue.d dub.selections.json +tristanable-test-library diff --git a/README.md b/README.md index 8ade50f..f9033d0 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -![](https://code.dlang.org/packages/tristanable/logo?s=5ef1c9f1250f57dd4c37efbf) +![](branding/logo_small.png) tristanable =========== @@ -6,19 +6,19 @@ tristanable [![D](https://github.com/deavmi/tristanable/actions/workflows/d.yml/badge.svg)](https://github.com/deavmi/tristanable/actions/workflows/d.yml) ![DUB](https://img.shields.io/dub/v/tristanable?color=%23c10000ff%20&style=flat-square) ![DUB](https://img.shields.io/dub/dt/tristanable?style=flat-square) ![DUB](https://img.shields.io/dub/l/tristanable?style=flat-square) [![Coverage Status](https://coveralls.io/repos/github/deavmi/tristanable/badge.svg?branch=master)](https://coveralls.io/github/deavmi/tristanable?branch=master) -**Tristanable** is a library for D-based libraries and applications that need a way to receive variable-length messages of different types (via a `Socket`) and place these messages into their own resepctively tagged queues indicated by their _"type"_ or `id`. +**Tristanable** is a library for D-based libraries and applications that need a way to receive variable-length messages of different types (via a `Socket`) and place these messages into their own respectively tagged queues indicated by their _"type"_ or `id`. ## What problems does it solve? ### Human example -Say now you made a request to a server with a tag `1` and expect a reply with that same tag `1`. Now, for a moment, think about what would happen in a tagless system. You would be expecting a reply, say now the weather report for your area, but what if the server has another thread that writes an instant messenger notification to the server's socket before the weather message is sent? Now you will inetrpret those bytes as if they were a weather message. +Say now you made a request to a server with a tag `1` and expect a reply with that same tag `1`. Now, for a moment, think about what would happen in a tagless system. You would be expecting a reply, say now the weather report for your area, but what if the server has another thread that writes an instant messenger notification to the server's socket before the weather message is sent? Now you will interpret those bytes as if they were a weather message. -Tristanable provides a way for you to receive the "IM notification first" but block and dequeue (when it arrives in the queue) for the "weather report". Irresepctoive of wether (no pun intended) the weather report arrives before the "IM notification" or after. +Tristanable provides a way for you to receive the "IM notification first" but block and dequeue (when it arrives in the queue) for the "weather report". Irrespective of wether (no pun intended) the weather report arrives before the "IM notification" or after. ### Code example -If we wanted to implement the following we would do the following. One note is that instead of waiting on messages of a specific _"type"_ (or rather **tag**), tristanable provides not just a one-message lengthb uffer per tag but infact a full queue per tag, meaning any received message with tag `1` will be enqueued and not dropped after the first message of type `1` is buffered. +If we wanted to implement the following we would do the following. One note is that instead of waiting on messages of a specific _"type"_ (or rather **tag**), tristanable provides not just a one-message length buffer per tag but in fact a full queue per tag, meaning any received message with tag `1` will be enqueued and not dropped after the first message of type `1` is buffered. ```d import tristanable.manager; @@ -42,7 +42,7 @@ manager.addQueue(instantNotification); QueueItem message = weatherQueue.dequeue(); ``` -Surely, there must be some sort of encoding mechanism too? The messages afterall need to be encoded. **No problem!**, we have that sorted: +Surely, there must be some sort of encoding mechanism too? The messages after all need to be encoded. **No problem!**, we have that sorted: ```d import tristanable.encoding; @@ -58,7 +58,7 @@ DataMessage tristanEncoded = new DataMessage(tag, data); socket.send(encodeForSend(tristanEncoded)); ``` -And let tristanable handle it! We even handle the message lengths and everything using another great project [bformat](http://deavmi.assigned.network/projects/bformat). +And let tristanable handle it! We even handle the message lengths and everything using another great project [bformat](https://deavmi.assigned.network/projects/bformat). ## Format @@ -67,8 +67,8 @@ And let tristanable handle it! We even handle the message lengths and everything ``` ## Using tristanable in your D project -You can easily add the library (source-based) to your project by running the following command in your -project's root: + +You can easily add the library (source-based) to your project by running the following command in your project's root: ```bash dub add tristanable diff --git a/branding/logo_base.xcf b/branding/logo_base.xcf new file mode 100644 index 0000000..c6e7a0e Binary files /dev/null and b/branding/logo_base.xcf differ diff --git a/branding/logo_small.png b/branding/logo_small.png new file mode 100644 index 0000000..3bbb90d Binary files /dev/null and b/branding/logo_small.png differ diff --git a/branding/logo_small.xcf b/branding/logo_small.xcf new file mode 100644 index 0000000..7e1b1b8 Binary files /dev/null and b/branding/logo_small.xcf differ diff --git a/dub.json b/dub.json index b61459e..3e0a8db 100644 --- a/dub.json +++ b/dub.json @@ -2,12 +2,13 @@ "authors": [ "Tristan B. Velloza Kildaire" ], - "homepage": "https://deavmi.assigned.network/projects/tristanable", "copyright": "Copyright © 2023, Tristan B. Kildaire", "dependencies": { - "libsnooze": "0.3.3" + "bformat": ">=4.1.1", + "niknaks": ">=0.3.0" }, "description": "Tristanable network message queuing framework", + "homepage": "https://deavmi.assigned.network/projects/tristanable", "license": "LGPL-3.0", "name": "tristanable", "targetType": "library" diff --git a/source/tristanable/encoding.d b/source/tristanable/encoding.d new file mode 100644 index 0000000..6fec612 --- /dev/null +++ b/source/tristanable/encoding.d @@ -0,0 +1,168 @@ +/** + * Encoding/decoding of the tristanable format + */ +module tristanable.encoding; + +import std.conv : to; +import niknaks.bits : bytesToIntegral, Order, order, toBytes; + +/** + * Represents a tagged message that has been decoded + * from its raw byte encoding, this is a tuple of + * a numeric tag and a byte array of payload data + * + * Also provides a static method to decode from such + * raw encoding and an instance method to do the reverse + */ +public final class TaggedMessage +{ + /** + * This message's tag + */ + private ulong tag; + + /** + * The payload + */ + private byte[] data; + + /** + * Constructs a new TaggedMessage with the given tag and payload + * + * Params: + * tag = the tag to use + * data = the payload + */ + this(ulong tag, byte[] data) + { + this.tag = tag; + this.data = data; + } + + /** + * Parameterless constructor used for decoder + */ + private this() {} + + /** + * Decodes the wire-formatted tristanable bytes into an instance + * of TaggedMessage whereby the tag and data can be seperately + * accessed and manipulated + * + * Params: + * encodedMessage = the wire-format encoded bytes + * Returns: an instance of TaggedMessage + */ + public static TaggedMessage decode(byte[] encodedMessage) + { + /* The decoded message */ + TaggedMessage decodedMessage = new TaggedMessage(); + + /* The decoded tag */ + ulong decodedTag; + + /* Take ulong-many bytes and only flip them to LE if not on LE host */ + decodedTag = order(bytesToIntegral!(ushort)(cast(ubyte[])encodedMessage), Order.LE); + + + /* Set the tag */ + decodedMessage.setTag(decodedTag); + + /* Set the data *(9-th byte onwards) */ + decodedMessage.setPayload(encodedMessage[8..$]); + + return decodedMessage; + } + + /** + * Encodes the tagged message into the tristanable + * wire format ready for transmission + * + * Returns: the encoded bytes + */ + public byte[] encode() + { + /* The encoded bytes */ + byte[] encodedMessage; + + /* If on little endian then no re-order, if host is BE flip (the tag) */ + encodedMessage ~= toBytes(order(tag, Order.LE)); + + + /* Tack on the data */ + encodedMessage ~= data; + + return encodedMessage; + } + + /** + * Get the message's payload + * + * Returns: the payload + */ + public byte[] getPayload() + { + return data; + } + + /** + * Get the message's tag + * + * Returns: the tag + */ + public ulong getTag() + { + return tag; + } + + /** + * Set the message's payload + * + * Params: + * newPayload = the payload to use + */ + public void setPayload(byte[] newPayload) + { + this.data = newPayload; + } + + /** + * Set the message's tag + * + * Params: + * newTag = the tag to use + */ + public void setTag(ulong newTag) + { + this.tag = newTag; + } + + /** + * Returns a string representation of the TaggedMessage + * + * Returns: the string represenation + */ + public override string toString() + { + return "TMessage [Tag: "~to!(string)(tag)~", Payload: "~to!(string)(data)~"]"; + } +} + +/** + * Test encoding and decoding + */ +unittest +{ + /* Setup testing data */ + TaggedMessage testData = new TaggedMessage(420, [1,2,3]); + + /* Encode */ + byte[] encoded = testData.encode(); + + /* Decode */ + TaggedMessage decoded = TaggedMessage.decode(encoded); + + /* Now ensure that `decoded` == original `testData` */ + assert(decoded.getTag() == testData.getTag); + assert(decoded.getPayload() == testData.getPayload()); +} \ No newline at end of file diff --git a/source/tristanable/exceptions.d b/source/tristanable/exceptions.d new file mode 100644 index 0000000..e8d5208 --- /dev/null +++ b/source/tristanable/exceptions.d @@ -0,0 +1,73 @@ +/** + * Error handling type definitions + */ +module tristanable.exceptions; + +import std.conv : to; + +/** + * The type of sub-error of the `TristanableException` + */ +public enum ErrorType +{ + /** + * If the requested queue could not be found + */ + QUEUE_NOT_FOUND, + + /** + * If the queue wanting to be registered has already + * been registered under the same tag + */ + QUEUE_ALREADY_EXISTS, + + /** + * If no default queue is configured + */ + NO_DEFAULT_QUEUE, + + /** + * The blocking call to `dequeue()`, somehow, failed + */ + DEQUEUE_FAILED, + + /** + * The call to `enqueue()`, somehow, failed + */ + ENQUEUE_FAILED +} + +/** + * Any sort of error that occurs during runtime of the tristanable + * engine + */ +public class TristanableException : Exception +{ + /** + * The sub-error type + */ + private ErrorType err; + + /** + * Constructs a new `TristanableException` with the provided + * sub-error type + * + * Params: + * err = the `ErrorType` + */ + this(ErrorType err) + { + super(this.classinfo.name~": "~to!(string)(err)); + this.err = err; + } + + /** + * Retrieve the sub-error type + * + * Returns: the sub-error type as a `ErrorType` + */ + public ErrorType getError() + { + return err; + } +} \ No newline at end of file diff --git a/source/tristanable/manager.d b/source/tristanable/manager.d deleted file mode 100644 index e3b6c37..0000000 --- a/source/tristanable/manager.d +++ /dev/null @@ -1,15 +0,0 @@ -module tristanable.manager; - -import tristanable.queue : Queue; - -/** - * Allows one to add new queues, control - * existing ones by waiting on them etc - */ -public class Manager -{ - /* Queues */ - private Queue[] queues; - - -} \ No newline at end of file diff --git a/source/tristanable/manager/config.d b/source/tristanable/manager/config.d new file mode 100644 index 0000000..44c4f4a --- /dev/null +++ b/source/tristanable/manager/config.d @@ -0,0 +1,34 @@ +/** + * Configuration for the manager + */ +module tristanable.manager.config; + +/** + * Manager parameters + */ +public struct Config +{ + /** + * If set to true then when one uses + * `sendMessage(TaggedMessage)` the + * manager will check if a queue with + * said tag has not been registered + * and if so, then register it for + * us before encoding-and-sending + */ + public bool registerOnSend; +} + +/** + * Generates the default configuration to use for + * the manager + * + * Returns: the Config + */ +public Config defaultConfig() +{ + Config config; + config.registerOnSend = false; + + return config; +} \ No newline at end of file diff --git a/source/tristanable/manager/manager.d b/source/tristanable/manager/manager.d new file mode 100644 index 0000000..014e9db --- /dev/null +++ b/source/tristanable/manager/manager.d @@ -0,0 +1,549 @@ +/** + * Management of a tristanable instance + */ +module tristanable.manager.manager; + +import std.socket; +import tristanable.queue.queue : Queue; +import core.sync.mutex : Mutex; +import tristanable.manager.watcher : Watcher; +import tristanable.encoding : TaggedMessage; +import tristanable.exceptions; +import std.container.slist : SList; +import tristanable.manager.config; +import river.core; +import river.impls.sock : SockStream; +import bformat.client; + +/** + * Manages a provided socket by spawning + * a watcher thread to read from it and file + * mail into the corresponding queues. + * + * Queues are managed via this an instance + * of a manager. + */ +public class Manager +{ + /** + * Configuration + */ + private Config config; + + /** + * The bformat client to read and write from + */ + private BClient bClient; + + /** + * Currently registered queues + */ + private SList!(Queue) queues; + + /** + * Lock for currently registered queues + */ + private Mutex queuesLock; + + /** + * Default queue + */ + private Queue defaultQueue; + + /** + * Watcher which manages the socket and + * enqueues new messages into the respective + * quueue for us + */ + private Watcher watcher; + + /** + * Constructs a new manager which will read from + * this socket and file mail for us + * + * Params: + * stream = the underlying stream to use + */ + this(Stream stream, Config config = defaultConfig()) + { + this.bClient = new BClient(stream); + this.queuesLock = new Mutex(); + this.config = config; + this.watcher = new Watcher(this, bClient); + } + + // TODO: Comment this + // This is for backwards compatibility (whereby a `Socket` was taken in) + this(Socket socket, Config config = defaultConfig()) + { + this(new SockStream(socket), config); + } + + /** + * Starts the management of the socket, + * resulting in queues being updated upon + * reciving messages tagged for them + */ + public void start() + { + watcher.startWatcher(); + } + + /** + * Stops the management of the socket, resulting + * in ending the updating of queues and closing + * the underlying connection + */ + public void stop() + { + watcher.shutdown(); + } + + /** + * Retrieves the queue mathcing the provided id + * + * Params: + * id = the id to lookup by + * Returns: the Queue + * Throws: TristanableException if the queue is not found + */ + public Queue getQueue(ulong id) + { + /* The found queue */ + Queue queue = getQueue_nothrow(id); + + /* If no queue is found then throw an error */ + if(queue is null) + { + throw new TristanableException(ErrorType.QUEUE_NOT_FOUND); + } + + return queue; + } + + /** + * Retrieves the queue mathcing the provided id + * + * This is the nothrow version + * + * Params: + * id = the id to lookup by + * Returns: the Queue if found, null otherwise + */ + public Queue getQueue_nothrow(ulong id) + { + /* The found queue */ + Queue queue; + + /* Lock the queue of queues */ + queuesLock.lock(); + + /* On return or error */ + scope(exit) + { + /* Unlock the queue of queues */ + queuesLock.unlock(); + } + + /* Search for the queue */ + foreach(Queue curQueue; queues) + { + if(curQueue.getID() == id) + { + queue = curQueue; + break; + } + } + + return queue; + } + + /** + * Get a new queue thatis unique in its tag + * (unused/not regustered yet), register it + * and then return it + * + * Returns: the newly registered Queue + */ + public Queue getUniqueQueue() + { + /* The newly created queue */ + Queue uniqueQueue; + + /* Lock the queue of queues */ + queuesLock.lock(); + + /* On return or error */ + scope(exit) + { + /* Unlock the queue of queues */ + queuesLock.unlock(); + } + + // TODO: Throw exception if all tags used + /* The unused tag */ + ulong unusedTag = 0; + + /* Try the current tag and ensure no queue uses it */ + tagLoop: for(ulong curPotentialTag = 0; true; curPotentialTag++) + { + foreach(Queue curQueue; queues) + { + if(curQueue.getID() == curPotentialTag) + { + continue tagLoop; + } + } + + /* Then we have found a unique tag */ + unusedTag = curPotentialTag; + break; + } + + /* Create the queue */ + uniqueQueue = new Queue(unusedTag); + + /* Register it */ + registerQueue(uniqueQueue); + + return uniqueQueue; + } + + /** + * Registers the given queue with the manager + * + * Params: + * queue = the queue to register + * Throws: + * TristanableException if a queue with the provided id already exists + */ + public void registerQueue(Queue queue) + { + /* Try to register the queue */ + bool status = registerQueue_nothrow(queue); + + /* If registration was not successful */ + if(!status) + { + throw new TristanableException(ErrorType.QUEUE_ALREADY_EXISTS); + } + } + + /** + * Registers the given queue with the manager + * + * Params: + * queue = the queue to register + * Returns: true if registration was successful, false otherwise + */ + public bool registerQueue_nothrow(Queue queue) + { + /* Lock the queue of queues */ + queuesLock.lock(); + + /* On return or error */ + scope(exit) + { + /* Unlock the queue of queues */ + queuesLock.unlock(); + } + + /* Search for the queue, throw an exception if it exists */ + foreach(Queue curQueue; queues) + { + if(curQueue.getID() == queue.getID()) + { + /* Registration failed */ + return false; + } + } + + /* Insert the queue as it does not exist */ + queues.insertAfter(queues[], queue); + + /* Registration was a success */ + return true; + } + + /** + * De-registers the given queue from the manager + * + * Params: + * queue = the queue to de-register + * Throws: + * TristanableException if a queue with the provided id cannot be found + */ + public void releaseQueue(Queue queue) + { + /* Try to de-register the queue */ + bool status = releaseQueue_nothrow(queue); + + /* If de-registration was not successful */ + if(!status) + { + throw new TristanableException(ErrorType.QUEUE_NOT_FOUND); + } + } + + /** + * De-registers the given queue from the manager + * + * Params: + * queue = the queue to de-register + * Returns: true if de-registration was successful, false otherwise + */ + public bool releaseQueue_nothrow(Queue queue) + { + /* Lock the queue of queues */ + queuesLock.lock(); + + /* On return or error */ + scope(exit) + { + /* Unlock the queue of queues */ + queuesLock.unlock(); + } + + /* Search for the queue, return false if it does NOT exist */ + foreach(Queue curQueue; queues) + { + if(curQueue.getID() == queue.getID()) + { + /* Remove the queue */ + queues.linearRemoveElement(queue); + + /* De-registration succeeded */ + return true; + } + } + + /* De-registration failed */ + return false; + } + + /** + * Sets the default queue + * + * The default queue, when set/enabled, is the queue that will + * be used to enqueue messages that have a tag which doesn't + * match any of the normally registered queues. + * + * Please note that the ID of the queue passed in here does not + * mean anything in this context; only the queuing facilities + * of the Queue object are used + * + * Params: + * queue = the default queue to use + */ + public void setDefaultQueue(Queue queue) + { + this.defaultQueue = queue; + } + + /** + * Returns the default queue + * + * Returns: the default queue + * Throws: + * TristanableException if there is no default queue + */ + public Queue getDefaultQueue() + { + /* The potential default queue */ + Queue potentialDefaultQueue = getDefaultQueue_nothrow(); + + if(potentialDefaultQueue is null) + { + throw new TristanableException(ErrorType.NO_DEFAULT_QUEUE); + } + + return potentialDefaultQueue; + } + + /** + * Returns the default queue + * + * This is the nothrow version + * + * Returns: the default queue if found, null otherwise + */ + public Queue getDefaultQueue_nothrow() + { + return defaultQueue; + } + + /** + * Sends the provided message over the socket + * + * Params: + * message = the TaggedMessage to send + */ + public void sendMessage(TaggedMessage message) + { + /** + * If a queue with the tag of the message does + * not exist, then register it if the config + * option was enabled + */ + if(config.registerOnSend) + { + /* Create a Queue with the tag */ + Queue createdQueue = new Queue(message.getTag()); + + /* Attempt to register the queue */ + registerQueue_nothrow(createdQueue); + } + + /* Encode the message */ + byte[] encodedMessage = message.encode(); + + /* Send it using bformat (encode-and-send) */ + bClient.sendMessage(encodedMessage); + } +} + + + +// TODO: Fix this, write it in a nicer way +// ... or make a private constructor here that +// ... does not take it in +version(unittest) +{ + Socket nullSock = null; +} + +/** + * Test retrieving a queue which does not + * exist + */ +unittest +{ + /* Create a manager */ + Manager manager = new Manager(nullSock); + + /* Shouldn't be found */ + try + { + manager.getQueue(69); + assert(false); + } + catch(TristanableException e) + { + assert(e.getError() == ErrorType.QUEUE_NOT_FOUND); + } + + /* Shouldn't be found */ + assert(manager.getQueue_nothrow(69) is null); +} + +/** + * Test registering a queue and then fetching it + */ +unittest +{ + /* Create a manager */ + Manager manager = new Manager(nullSock); + + /* Create a new queue with tag 69 */ + Queue queue = new Queue(69); + + try + { + /* Register the queue */ + manager.registerQueue(queue); + + /* Fetch the queue */ + Queue fetchedQueue = manager.getQueue(69); + + /* Ensure the queue we fetched is the one we stored (the references would be equal) */ + assert(fetchedQueue == queue); + } + catch(TristanableException e) + { + assert(false); + } + + /* Should be found */ + assert(manager.getQueue_nothrow(69) !is null); +} + +/** + * Tests registering a queue and then registering + * another queue with the same id + */ +unittest +{ + /* Create a manager */ + Manager manager = new Manager(nullSock); + + /* Create a new queue with tag 69 */ + Queue queue = new Queue(69); + + /* Register the queue */ + manager.registerQueue(queue); + + try + { + /* Register the queue (try again) */ + manager.registerQueue(queue); + + assert(false); + } + catch(TristanableException e) + { + assert(e.getError() == ErrorType.QUEUE_ALREADY_EXISTS); + } +} + +/** + * Tests registering a queue, de-registering it and + * then registering it again + */ +unittest +{ + /* Create a manager */ + Manager manager = new Manager(nullSock); + + /* Create a new queue with tag 69 */ + Queue queue = new Queue(69); + + /* Register the queue */ + manager.registerQueue(queue); + + /* Ensure it is registered */ + assert(queue == manager.getQueue(69)); + + /* De-register the queue */ + manager.releaseQueue(queue); + + /* Ensure it is de-registered */ + assert(manager.getQueue_nothrow(69) is null); + + /* Register the queue (again) */ + manager.registerQueue(queue); + + /* Ensure it is registered (again) */ + assert(queue == manager.getQueue(69)); +} + +/** + * Tests registering a queue using the "next available queue" + * method + */ +unittest +{ + /* Create a manager */ + Manager manager = new Manager(nullSock); + + /* Get the next 3 available queues */ + Queue queue1 = manager.getUniqueQueue(); + Queue queue2 = manager.getUniqueQueue(); + Queue queue3 = manager.getUniqueQueue(); + + /* The queues should have tags [0, 1, 2] respectively */ + assert(queue1.getID() == 0); + assert(queue2.getID() == 1); + assert(queue3.getID() == 2); +} + +// TODO: Add testing for queue existence (internal method) \ No newline at end of file diff --git a/source/tristanable/manager/package.d b/source/tristanable/manager/package.d new file mode 100644 index 0000000..6ae10cb --- /dev/null +++ b/source/tristanable/manager/package.d @@ -0,0 +1,15 @@ +/** + * Interface which manages a provided socket + * and enqueuing and dequeuing of queues + */ +module tristanable.manager; + +/** + * The management facilities + */ +public import tristanable.manager.manager : Manager; + +/** + * Configuration for the manager + */ +public import tristanable.manager.config : Config, defaultConfig; \ No newline at end of file diff --git a/source/tristanable/manager/watcher.d b/source/tristanable/manager/watcher.d new file mode 100644 index 0000000..f07518f --- /dev/null +++ b/source/tristanable/manager/watcher.d @@ -0,0 +1,297 @@ +/** + * Facilitates the reading of messages from the socket, + * decoding thereof and final enqueuing thereof into their + * respective queus + */ +module tristanable.manager.watcher; + +import core.thread : Thread; +import tristanable.manager.manager : Manager; +import std.socket; +import bformat; +import tristanable.encoding; +import tristanable.exceptions; +import tristanable.queue.queue; +import bformat.client; + +/** + * Watches the socket on a thread of its own, + * performs the decoding of the incoming messages + * and places them into the correct queues via + * the associated Manager instance + */ +public class Watcher : Thread +{ + /** + * The associated manager to use + * such that we can place new mail + * into their respective inboxes (queues) + */ + private Manager manager; + + /** + * The BClient to read from + */ + private BClient bClient; + + /** + * Creates a new `Watcher` that is associated + * with the provided `Manager` such that it can + * add to its registered queues. The provided `Socket` + * is such that it can be read from and managed. + * + * Params: + * manager = the `Manager` to associate with + * bclient = the underlying `BClient` to read data from + */ + package this(Manager manager, BClient bClient) + { + this.manager = manager; + this.bClient = bClient; + + super(&watch); + } + + /** + * Starts the underlying thread + */ + package void startWatcher() + { + /* Start the watch method on a new thread */ + start(); + } + + /** + * Watches the socket for incoming messages + * and decodes them on the fly, placing + * the final message in the respective queue + */ + private void watch() + { + import std.stdio; + + while(true) + { + /* Do a bformat read-and-decode */ + byte[] wireTristan; + version(unittest) { writeln("Before bformat recv()"); } + bool recvStatus = bClient.receiveMessage(wireTristan); // TODO: Add a check for the status of read + version(unittest) { writeln("After bformat recv()"); } + version(unittest) { writeln("bformat recv() status: ", recvStatus); } + + if(recvStatus) + { + /* Decode the received bytes into a tagged message */ + TaggedMessage decodedMessage = TaggedMessage.decode(wireTristan); + version(unittest) { writeln("Watcher received: ", decodedMessage); } + + /* Search for the queue with the id provided */ + ulong messageTag = decodedMessage.getTag(); + Queue potentialQueue = manager.getQueue_nothrow(messageTag); + + /* If a queue can be found */ + if(potentialQueue !is null) + { + /* Enqueue the message */ + potentialQueue.enqueue(decodedMessage); + } + /* If the queue if not found */ + else + { + /** + * Look for a default queue, and if one is found + * then enqueue the message there. Otherwise, drop + * it by simply doing nothing. + */ + try + { + potentialQueue = manager.getDefaultQueue(); + + /* Enqueue the message */ + potentialQueue.enqueue(decodedMessage); + } + catch(TristanableException e) {} + } + + version(unittest) { writeln("drip"); } + } + /** + * If there was an error receiving on the socket. + * + * This can be either because we have shut the socket down + * or the remote end has closed the connection. + * + * In any case, exit the loop therefore ending this thread. + */ + else + { + break; + } + } + } + + /** + * Shuts down the watcher, unblocks the blocking read in the loop + * resulting in the watcher thread ending + */ + package void shutdown() + { + /* Closes the bformat reader */ + bClient.close(); + } +} + +/** + * Set up a server which will send some tagged messages to us (the client), + * where we have setup a `Manager` to watch the queues with tags `42` and `69`, + * we then dequeue some messages from both queus. Finally, we shut down the manager. + */ +unittest +{ + import std.socket; + import std.stdio; + import core.thread; + + Address serverAddress = parseAddress("::1", 0); + Socket server = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP); + server.bind(serverAddress); + server.listen(0); + + class ServerThread : Thread + { + this() + { + super(&worker); + } + + private void worker() + { + Socket clientSocket = server.accept(); + BClient bClient = new BClient(clientSocket); + + Thread.sleep(dur!("seconds")(7)); + writeln("Server start"); + + /** + * Create a tagged message to send + * + * tag 42 payload Cucumber 😳️ + */ + TaggedMessage message = new TaggedMessage(42, cast(byte[])"Cucumber 😳️"); + byte[] tEncoded = message.encode(); + writeln("server send status: ", bClient.sendMessage(tEncoded)); + + writeln("server send [done]"); + + /** + * Create a tagged message to send + * + * tag 69 payload Hello + */ + message = new TaggedMessage(69, cast(byte[])"Hello"); + tEncoded = message.encode(); + writeln("server send status: ", bClient.sendMessage(tEncoded)); + + writeln("server send [done]"); + + /** + * Create a tagged message to send + * + * tag 69 payload Bye + */ + message = new TaggedMessage(69, cast(byte[])"Bye"); + tEncoded = message.encode(); + writeln("server send status: ", bClient.sendMessage(tEncoded)); + + writeln("server send [done]"); + + /** + * Create a tagged message to send + * + * tag 100 payload Bye + */ + message = new TaggedMessage(100, cast(byte[])"DEFQUEUE_1"); + tEncoded = message.encode(); + writeln("server send status: ", bClient.sendMessage(tEncoded)); + + writeln("server send [done]"); + + /** + * Create a tagged message to send + * + * tag 200 payload Bye + */ + message = new TaggedMessage(200, cast(byte[])"DEFQUEUE_2"); + tEncoded = message.encode(); + writeln("server send status: ", bClient.sendMessage(tEncoded)); + + writeln("server send [done]"); + } + } + + ServerThread serverThread = new ServerThread(); + serverThread.start(); + + Socket client = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP); + + writeln(server.localAddress); + + + Manager manager = new Manager(client); + + Queue sixtyNine = new Queue(69); + Queue fortyTwo = new Queue(42); + + manager.registerQueue(sixtyNine); + manager.registerQueue(fortyTwo); + + // Register a default queue (tag ignored) + Queue defaultQueue = new Queue(2332); + manager.setDefaultQueue(defaultQueue); + + + /* Connect our socket to the server */ + client.connect(server.localAddress); + + /* Start the manager and let it manage the socket */ + manager.start(); + + /* Block on the unittest thread for a received message */ + writeln("unittest thread: Dequeue() blocking..."); + TaggedMessage dequeuedMessage = sixtyNine.dequeue(); + writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'"); + assert(dequeuedMessage.getTag() == 69); + assert(dequeuedMessage.getPayload() == cast(byte[])"Hello"); + + /* Block on the unittest thread for a received message */ + writeln("unittest thread: Dequeue() blocking..."); + dequeuedMessage = sixtyNine.dequeue(); + writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'"); + assert(dequeuedMessage.getTag() == 69); + assert(dequeuedMessage.getPayload() == cast(byte[])"Bye"); + + /* Block on the unittest thread for a received message */ + writeln("unittest thread: Dequeue() blocking..."); + dequeuedMessage = fortyTwo.dequeue(); + writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'"); + assert(dequeuedMessage.getTag() == 42); + assert(dequeuedMessage.getPayload() == cast(byte[])"Cucumber 😳️"); + + + /* Dequeue two messages from the default queue */ + writeln("unittest thread: Dequeue() blocking..."); + dequeuedMessage = defaultQueue.dequeue(); + writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'"); + assert(dequeuedMessage.getTag() == 100); + assert(dequeuedMessage.getPayload() == cast(byte[])"DEFQUEUE_1"); + + writeln("unittest thread: Dequeue() blocking..."); + dequeuedMessage = defaultQueue.dequeue(); + writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'"); + assert(dequeuedMessage.getTag() == 200); + assert(dequeuedMessage.getPayload() == cast(byte[])"DEFQUEUE_2"); + + + /* Stop the manager */ + manager.stop(); +} \ No newline at end of file diff --git a/source/tristanable/package.d b/source/tristanable/package.d index bdbabde..7e0fbf8 100644 --- a/source/tristanable/package.d +++ b/source/tristanable/package.d @@ -3,6 +3,23 @@ */ module tristanable; +/** + * Interface which manages a provided socket + * and enqueuing and dequeuing of queues + */ +public import tristanable.manager; -public import tristanable.manager : Manager; -public import tristanable.queue : Queue, QueueItem; \ No newline at end of file +/** + * A queue of queue items all of the same tag + */ +public import tristanable.queue.queue : Queue; + +/** + * Error handling type definitions + */ +public import tristanable.exceptions : TristanableException, ErrorType; + +/** + * Encoding/decoding of the tristanable format + */ +public import tristanable.encoding : TaggedMessage; \ No newline at end of file diff --git a/source/tristanable/queue.d b/source/tristanable/queue.d deleted file mode 100644 index cc3e954..0000000 --- a/source/tristanable/queue.d +++ /dev/null @@ -1,53 +0,0 @@ -module tristanable.queue; - -import libsnooze; -import core.sync.mutex : Mutex; - -public class Queue -{ - /** - * Everytime a thread calls `.dequeue()` on this queue - * - */ - private Event event; - - private QueueItem queue; - private Mutex queueLock; - - - private this() - { - /* Initialize the queue lock */ - this.queueLock = new Mutex(); - - /* Initialize the event */ - this.event = new Event(); - } - - public void dequeue() - { - // TODO: Make us wait on the event (optional with a time-out) - - // TODO: Lock queue - queueLock.lock(); - - // TODO: Get item off queue - - // TODO: Unlock queue - queueLock.unlock(); - } - - public static Queue newQueue(ulong queueID) - { - Queue queue; - - // TODO: Implement me - - return queue; - } -} - -public class QueueItem -{ - -} \ No newline at end of file diff --git a/source/tristanable/queue/queue.d b/source/tristanable/queue/queue.d new file mode 100644 index 0000000..8db3aa8 --- /dev/null +++ b/source/tristanable/queue/queue.d @@ -0,0 +1,240 @@ +/** + * A queue of queue items all of the same tag + */ +module tristanable.queue.queue; + +import core.sync.mutex : Mutex; +import core.sync.condition : Condition; +import core.sync.exception : SyncError; +import std.container.slist : SList; +import tristanable.encoding; +import core.time : Duration, dur; +import tristanable.exceptions; + +version(unittest) +{ + import std.stdio; + import std.conv : to; +} + +/** + * Represents a queue whereby messages of a certain tag/id + * can be enqueued to (by the `Watcher`) and dequeued from + * (by the user application) + */ +public class Queue +{ + /** + * This queue's unique ID + */ + private ulong queueID; + + /** + * The libsnooze event used to sleep/wake + * on queue events + * Mutex for the condition variable + */ + private Mutex mutex; + + /** + * The condition variable used to sleep/wake + * on queue of events + */ + private Condition signal; + + /** + * The queue of messages + */ + private SList!(TaggedMessage) queue; + + /** + * The lock for the message queue + */ + private Mutex queueLock; + + /** + * If a message is enqueued prior + * to us sleeping then we won't + * wake up and return for it. + * + * Therefore a periodic wakeup + * is required. + */ + private Duration wakeInterval; + + /** + * Constructs a new Queue and immediately sets up the notification + * sub-system for the calling thread (the thread constructing this + * object) which ensures that a call to dequeue will immediately + * unblock on the first message received under this tag + * + * Params: + * queueID = the id to use for this queue + */ + this(ulong queueID) + { + /* Initialize the queue lock */ + this.queueLock = new Mutex(); + + /* Initialize the condition variable */ + this.mutex = new Mutex(); + this.signal = new Condition(this.mutex); + + /* Set the queue id */ + this.queueID = queueID; + + /* Set the slumber interval */ + this.wakeInterval = dur!("msecs")(50); // TODO: Decide on value + } + + /** + * Returns the current wake interval + * for the queue checker + * + * Returns: the `Duration` + */ + public Duration getWakeInterval() + { + return this.wakeInterval; + } + + /** + * Sets the wake up interval + * + * Params: + * interval = the new interval + */ + public void setWakeInterval(Duration interval) + { + this.wakeInterval = interval; + } + + /** + * Enqueues the provided tagged message onto this queue + * and then wakes up any thread that has called dequeue + * on this queue as well + * + * On error enqueueing a `TristanableException` will be + * thrown. + * + * Params: + * message = the TaggedMessage to enqueue + */ + public void enqueue(TaggedMessage message) + { + version(unittest) + { + writeln("queue["~to!(string)(queueID)~"]: Enqueuing '"~to!(string)(message)~"'..."); + } + + scope(exit) + { + version(unittest) + { + writeln("queue["~to!(string)(queueID)~"]: Enqueued '"~to!(string)(message)~"'!"); + } + + /* Unlock the item queue */ + queueLock.unlock(); + } + + /* Lock the item queue */ + queueLock.lock(); + + /* Add the item to the queue */ + queue.insertAfter(queue[], message); + + /* Wake up anyone wanting to dequeue from us */ + try + { + // TODO: Make us wait on the event (optional with a time-out) + signal.notifyAll(); + } + catch(SyncError snozErr) + { + // Throw an exception on a fatal exception + throw new TristanableException(ErrorType.ENQUEUE_FAILED); + } + } + + // TODO: Make a version of this which can time out + + /** + * Blocks till a message can be dequeued from this queue + * + * On error dequeueing a `TristanableException` will be + * thrown. + * + * Returns: the dequeued TaggedMessage + */ + public TaggedMessage dequeue() + { + version(unittest) + { + writeln("queue["~to!(string)(queueID)~"]: Dequeueing..."); + } + + /* The dequeued message */ + TaggedMessage dequeuedMessage; + + scope(exit) + { + version(unittest) + { + writeln("queue["~to!(string)(queueID)~"]: Dequeued '"~to!(string)(dequeuedMessage)~"'!"); + } + } + + /* Block till we dequeue a message successfully */ + while(dequeuedMessage is null) + { + scope(exit) + { + // Unlock the mutex + this.mutex.unlock(); + } + + // Lock the mutex + this.mutex.lock(); + + try + { + this.signal.wait(this.wakeInterval); + } + catch(SyncError e) + { + // Throw an exception on a fatal exception + throw new TristanableException(ErrorType.DEQUEUE_FAILED); + } + + + /* Lock the item queue */ + queueLock.lock(); + + /* Consume the front of the queue (if non-empty) */ + if(!queue.empty()) + { + /* Pop the front item off */ + dequeuedMessage = queue.front(); + + /* Remove the front item from the queue */ + queue.linearRemoveElement(dequeuedMessage); + } + + /* Unlock the item queue */ + queueLock.unlock(); + } + + return dequeuedMessage; + } + + /** + * Get the id/tag of this queue + * + * Returns: the queue's id + */ + public ulong getID() + { + return queueID; + } +} \ No newline at end of file diff --git a/tristanable-test-library b/tristanable-test-library deleted file mode 100755 index e9c700c..0000000 Binary files a/tristanable-test-library and /dev/null differ