Merge branch 'nextgen' into nextgen_listener

This commit is contained in:
Tristan B. Velloza Kildaire 2023-04-07 12:28:32 +02:00
commit 077dfe457b
5 changed files with 74 additions and 15 deletions

View File

@ -3,6 +3,8 @@
*/
module tristanable.exceptions;
import std.conv : to;
/**
* The type of sub-error of the `TristanableException`
*/
@ -31,16 +33,29 @@ public enum ErrorType
*/
public class TristanableException : Exception
{
/**
* The sub-error type
*/
private ErrorType err;
/**
* Constructs a new `TristanableException` with the provided
* sub-error type
*
* Params:
* err = the `ErrorType`
*/
this(ErrorType err)
{
// TODO: Do this
super("TODO: Do this");
super(this.classinfo.name~": "~to!(string)(err));
this.err = err;
}
/**
* Retrieve the sub-error type
*
* Returns: the sub-error type as a `ErrorType`
*/
public ErrorType getError()
{
return err;

View File

@ -35,10 +35,12 @@ public class Manager
/**
* Currently registered queues
*
* NOTE: Make a ulong map to this later
*/
private SList!(Queue) queues;
/**
* Lock for currently registered queues
*/
private Mutex queuesLock;
/**
@ -68,14 +70,21 @@ public class Manager
this.watcher = new Watcher(this, socket);
}
// TODO: comment
// Starts the watcher
/**
* Starts the management of the socket,
* resulting in queues being updated upon
* reciving messages tagged for them
*/
public void start()
{
watcher.startWatcher();
}
// Stops the watcher
/**
* Stops the management of the socket, resulting
* in ending the updating of queues and closing
* the underlying connection
*/
public void stop()
{
watcher.shutdown();
@ -286,7 +295,6 @@ public class Manager
return potentialDefaultQueue;
}
/**
* Returns the default queue
*

View File

@ -1,3 +1,8 @@
/**
* Facilitates the reading of messages from the socket,
* decoding thereof and final enqueuing thereof into their
* respective queus
*/
module tristanable.manager.watcher;
import core.thread : Thread;
@ -28,9 +33,16 @@ public class Watcher : Thread
*/
private Socket socket;
// TODO: make package-level in a way such
// ... that only Manager can access this constructor
// TODO: Add constructor doc
/**
* Creates a new `Watcher` that is associated
* with the provided `Manager` such that it can
* add to its registered queues. The provided `Socket`
* is such that it can be read from and managed.
*
* Params:
* manager = the `Manager` to associate with
* socket = the underlying `Socket` to read data from
*/
package this(Manager manager, Socket socket)
{
this.manager = manager;
@ -39,6 +51,9 @@ public class Watcher : Thread
super(&watch);
}
/**
* Starts the underlying thread
*/
package void startWatcher()
{
/* Start the watch method on a new thread */
@ -114,6 +129,10 @@ public class Watcher : Thread
}
}
/**
* Shuts down the watcher, unblocks the blocking read in the loop
* resulting in the watcher thread ending
*/
package void shutdown()
{
/* Unblock all calls to `recv()` and disallow future ones */

View File

@ -22,4 +22,4 @@ public import tristanable.exceptions : TristanableException, ErrorType;
/**
* Encoding/decoding of the tristanable format
*/
public import tristanable.encoding : TaggedMessage;
public import tristanable.encoding : TaggedMessage;

View File

@ -18,15 +18,27 @@ version(unittest)
import std.conv : to;
}
/**
* Represents a queue whereby messages of a certain tag/id
* can be enqueued to (by the `Watcher`) and dequeued from
* (by the user application)
*/
public class Queue
{
/**
* Everytime a thread calls `.dequeue()` on this queue
*
* The libsnooze event used to sleep/wake
* on queue events
*/
private Event event;
/**
* The queue of messages
*/
private SList!(TaggedMessage) queue;
/**
* The lock for the message queue
*/
private Mutex queueLock;
/**
@ -161,6 +173,11 @@ public class Queue
return dequeuedMessage;
}
/**
* Get the id/tag of this queue
*
* Returns: the queue's id
*/
public ulong getID()
{
return queueID;