/**
* Facilitates the reading of messages from the socket,
* the decoding thereof and the final enqueuing thereof
* into their respective queues
*/
module tristanable.manager.watcher;
import core.thread : Thread;
import tristanable.manager.manager : Manager;
import std.socket;
import bformat;
import tristanable.encoding;
import tristanable.exceptions;
import tristanable.queue.queue;
import bformat.client;
/**
* Watches the socket on a thread of its own,
* performs the decoding of the incoming messages
* and places them into the correct queues via
* the associated Manager instance
*/
public class Watcher : Thread
{
/**
* The associated manager to use
* such that we can place new mail
* into their respective inboxes (queues)
*/
private Manager manager;
/**
* The BClient to read from
*/
private BClient bClient;
/**
* Creates a new `Watcher` that is associated
* with the provided `Manager` such that it can
* add to its registered queues. The provided `BClient`
* is the bformat client that messages are read from.
*
* Params:
* manager = the `Manager` to associate with
* bClient = the underlying `BClient` to read data from
*/
package this(Manager manager, BClient bClient)
{
this.manager = manager;
this.bClient = bClient;
super(&watch);
}
/**
* Starts the underlying thread
*/
package void startWatcher()
{
/* Start the watch method on a new thread */
start();
}
/**
* Watches the socket for incoming messages
* and decodes them on the fly, placing
* the final message in the respective queue
*/
private void watch()
{
import std.stdio;
while(true)
{
/* Do a bformat read-and-decode */
byte[] wireTristan;
version(unittest) { writeln("Before bformat recv()"); }
bool recvStatus = bClient.receiveMessage(wireTristan);
version(unittest) { writeln("After bformat recv()"); }
version(unittest) { writeln("bformat recv() status: ", recvStatus); }
if(recvStatus)
{
/* Decode the received bytes into a tagged message */
TaggedMessage decodedMessage = TaggedMessage.decode(wireTristan);
version(unittest) { writeln("Watcher received: ", decodedMessage); }
/* Search for the queue with the id provided */
ulong messageTag = decodedMessage.getTag();
Queue potentialQueue = manager.getQueue_nothrow(messageTag);
/* If a queue can be found */
if(potentialQueue !is null)
{
/* Enqueue the message */
potentialQueue.enqueue(decodedMessage);
}
/* If the queue is not found */
else
{
/**
* Look for a default queue, and if one is found
* then enqueue the message there. Otherwise, drop
* it by simply doing nothing.
*/
try
{
potentialQueue = manager.getDefaultQueue();
/* Enqueue the message */
potentialQueue.enqueue(decodedMessage);
}
catch(TristanableException e)
{
/* No default queue is set (or the enqueue failed), so drop the message */
}
}
version(unittest) { writeln("drip"); }
}
/**
* If there was an error receiving on the socket.
*
* This can be either because we have shut the socket down
* or the remote end has closed the connection.
*
* In any case, exit the loop therefore ending this thread.
*/
else
{
break;
}
}
}
/**
* Shuts down the watcher by closing the underlying `BClient`.
* This unblocks the blocking read in `watch()`, which then
* exits its loop, ending the watcher thread
*/
package void shutdown()
{
/* Closes the bformat reader */
bClient.close();
}
}
/**
* Sets up a server which sends some tagged messages to us (the client),
* where we have set up a `Manager` to watch the queues with tags `42` and `69`
* as well as a default queue for all other tags. We then dequeue the messages
* from these queues and, finally, shut down the manager.
*/
unittest
{
import std.socket;
import std.stdio;
import core.thread;
Address serverAddress = parseAddress("::1", 0);
Socket server = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
server.bind(serverAddress);
server.listen(0);
class ServerThread : Thread
{
this()
{
super(&worker);
}
private void worker()
{
Socket clientSocket = server.accept();
BClient bClient = new BClient(clientSocket);
Thread.sleep(dur!("seconds")(7));
writeln("Server start");
/**
* Create a tagged message to send
*
* tag 42 payload Cucumber 😳
*/
TaggedMessage message = new TaggedMessage(42, cast(byte[])"Cucumber 😳️");
byte[] tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]");
/**
* Create a tagged message to send
*
* tag 69 payload Hello
*/
message = new TaggedMessage(69, cast(byte[])"Hello");
tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]");
/**
* Create a tagged message to send
*
* tag 69 payload Bye
*/
message = new TaggedMessage(69, cast(byte[])"Bye");
tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]");
/**
* Create a tagged message to send
*
* tag 100 payload DEFQUEUE_1
*/
message = new TaggedMessage(100, cast(byte[])"DEFQUEUE_1");
tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]");
/**
* Create a tagged message to send
*
* tag 200 payload DEFQUEUE_2
*/
message = new TaggedMessage(200, cast(byte[])"DEFQUEUE_2");
tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]");
}
}
ServerThread serverThread = new ServerThread();
serverThread.start();
Socket client = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
writeln(server.localAddress);
Manager manager = new Manager(client);
Queue sixtyNine = new Queue(69);
Queue fortyTwo = new Queue(42);
manager.registerQueue(sixtyNine);
manager.registerQueue(fortyTwo);
// Register a default queue (tag ignored)
Queue defaultQueue = new Queue(2332);
manager.setDefaultQueue(defaultQueue);
/* Connect our socket to the server */
client.connect(server.localAddress);
/* Start the manager and let it manage the socket */
manager.start();
/* Block on the unittest thread for a received message */
writeln("unittest thread: Dequeue() blocking...");
TaggedMessage dequeuedMessage = sixtyNine.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 69);
assert(dequeuedMessage.getPayload() == cast(byte[])"Hello");
/* Block on the unittest thread for a received message */
writeln("unittest thread: Dequeue() blocking...");
dequeuedMessage = sixtyNine.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 69);
assert(dequeuedMessage.getPayload() == cast(byte[])"Bye");
/* Block on the unittest thread for a received message */
writeln("unittest thread: Dequeue() blocking...");
dequeuedMessage = fortyTwo.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 42);
assert(dequeuedMessage.getPayload() == cast(byte[])"Cucumber 😳️");
/* Dequeue two messages from the default queue */
writeln("unittest thread: Dequeue() blocking...");
dequeuedMessage = defaultQueue.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 100);
assert(dequeuedMessage.getPayload() == cast(byte[])"DEFQUEUE_1");
writeln("unittest thread: Dequeue() blocking...");
dequeuedMessage = defaultQueue.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 200);
assert(dequeuedMessage.getPayload() == cast(byte[])"DEFQUEUE_2");
/* Stop the manager */
manager.stop();
}
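/**
 * A minimal sketch (an illustration, not part of the original
 * test suite) of the routing decision made in `watch()`, carried
 * out by hand and without any network I/O.
 *
 * Assumptions: a `Manager` may be constructed around an unconnected
 * `Socket` so long as it is never started, and `dequeue()` returns
 * immediately when a message is already waiting in the queue.
 * The names `unusedSocket`, `stray` and `received` are hypothetical.
 */
unittest
{
    /* The socket is never connected, it only satisfies the constructor */
    Socket unusedSocket = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
    Manager manager = new Manager(unusedSocket);

    /* One registered queue and one default (fallback) queue */
    Queue fortyTwo = new Queue(42);
    manager.registerQueue(fortyTwo);
    Queue defaultQueue = new Queue(2332);
    manager.setDefaultQueue(defaultQueue);

    /* Round-trip a message through the wire encoding, as the watcher would see it */
    byte[] wireBytes = new TaggedMessage(100, cast(byte[])"Stray").encode();
    TaggedMessage stray = TaggedMessage.decode(wireBytes);

    /* No queue is registered for tag 100 ... */
    assert(manager.getQueue_nothrow(stray.getTag()) is null);

    /* ... so, as in `watch()`, the message falls back to the default queue */
    manager.getDefaultQueue().enqueue(stray);

    TaggedMessage received = defaultQueue.dequeue();
    assert(received.getTag() == 100);
    assert(received.getPayload() == cast(byte[])"Stray");

    unusedSocket.close();
}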