- Added a default queue
- `getQueue(ulong)` now calls `getQueue_nothrow(ulong)` with the same id
- Implemented `getQueue_nothrow(ulong)` which returns the `Queue` if found, `null` otherwise
- Added `getDefaultQueue()` which gets the default queue by calling `getDefaultQueue_nothrow(ulong)` with the same id
- Added `getDefaultQueue_nothrow(ulong)` which returns the default queue as a `Queue` object if it exists, else `null`
- Added `setDefaultQueue(Queue)` which sets the provided queue as the default queue (i.e. the queue where messages tagged with a tag of a queue not registered will be dumped into - if the default queue is set)

Watcher

- Set the worker thread, `watch`, in the constructor
- Added a TODO relating to checking if the socket read succeeded or not
- Added a debug print for the received `TaggedMessage` post-decode
- Extract the tag of the message and find the matching queue (potentially, if it exists)
- If the queue exists then add the `TaggedMessage` to said `Queue`
- If the queue doesn't exist then, get the so-called "Default queue", if it doesn't exist don't do anything, if it does then enqueue the message (the `TaggedMessage`) to said `Queue`

Unit test

- Added a unit test (WIP) for testing the `Manager` and `Watcher` mechanism
- Updated unittest to test the `getQueue_nothrow(ulong)` method
- Added a unit test to test adding a `Queue` with a tag that already exists in a `Queue` registered prior
This commit is contained in:
Tristan B. Velloza Kildaire 2023-03-31 11:31:56 +02:00
parent 8cf731089e
commit 16bbeeece4
2 changed files with 215 additions and 7 deletions

View File

@ -34,6 +34,11 @@ public class Manager
private SList!(Queue) queues;
private Mutex queuesLock;
/**
* Default queue
*/
private Queue defaultQueue;
/**
* Watcher which manages the socket and
* enqueues new messages into the respective
@ -71,6 +76,29 @@ public class Manager
* Throws: TristanableException if the queue is not found
*/
public Queue getQueue(ulong id)
{
/* The found queue */
Queue queue = getQueue_nothrow(id);
/* If no queue is found then throw an error */
if(queue is null)
{
throw new TristanableException(ErrorType.QUEUE_NOT_FOUND);
}
return queue;
}
/**
* Retrieves the queue mathcing the provided id
*
* This is the nothrow version
*
* Params:
* id = the id to lookup by
* Returns: the Queue if found, null otherwise
*/
public Queue getQueue_nothrow(ulong id)
{
/* The found queue */
Queue queue;
@ -95,15 +123,11 @@ public class Manager
}
}
/* If no queue is found then throw an error */
if(queue is null)
{
throw new TristanableException(ErrorType.QUEUE_NOT_FOUND);
}
return queue;
}
/**
* Registers the given queue with the manager
*
@ -137,6 +161,61 @@ public class Manager
queues.insertAfter(queues[], queue);
}
/**
* Sets the default queue
*
* The default queue, when set/enabled, is the queue that will
* be used to enqueue messages that have a tag which doesn't
* match any of the normally registered queues.
*
* Please note that the ID of the queue passed in here does not
* mean anything in this context; only the queuing facilities
* of the Queue object are used
*
* Params:
* queue = the default queue to use
*/
public void setDefaultQueue(Queue queue)
{
this.defaultQueue = queue;
}
/**
* Returns the default queue
*
* Returns: the default queue
* Throws:
* TristanableException if there is no default queue
*/
public Queue getDefaultQueue()
{
/* The potential default queue */
Queue potentialDefaultQueue = getDefaultQueue_nothrow();
if(potentialDefaultQueue is null)
{
throw new TristanableException(ErrorType.NO_DEFAULT_QUEUE);
}
return potentialDefaultQueue;
}
/**
* Returns the default queue
*
* This is the nothrow version
*
* Returns: the default queue if found, null otherwise
*/
public Queue getDefaultQueue_nothrow()
{
return defaultQueue;
}
public void sendMessage(TaggedMessage tag)
{
// TODO: Send the given message
@ -178,6 +257,9 @@ unittest
{
assert(e.getError() == ErrorType.QUEUE_NOT_FOUND);
}
/* Shouldn't be found */
assert(manager.getQueue_nothrow(69) is null);
}
/**
@ -206,4 +288,35 @@ unittest
{
assert(false);
}
/* Should be found */
assert(manager.getQueue_nothrow(69) !is null);
}
/**
* tests registering a queue and then registering
* another queue with the same id
*/
unittest
{
/* Create a manager */
Manager manager = new Manager(null);
/* Create a new queue with tag 69 */
Queue queue = new Queue(69);
/* Register the queue */
manager.registerQueue(queue);
try
{
/* Register the queue (try again) */
manager.registerQueue(queue);
assert(false);
}
catch(TristanableException e)
{
assert(e.getError() == ErrorType.QUEUE_ALREADY_EXISTS);
}
}

View File

@ -5,6 +5,8 @@ import tristanable.manager.manager : Manager;
import std.socket;
import bformat;
import tristanable.encoding;
import tristanable.exceptions;
import tristanable.queue;
/**
* Watches the socket on a thread of its own,
@ -33,6 +35,8 @@ public class Watcher : Thread
{
this.manager = manager;
this.socket = socket;
super(&watch);
}
/**
@ -46,12 +50,103 @@ public class Watcher : Thread
{
/* Do a bformat read-and-decode */
byte[] wireTristan;
receiveMessage(socket, wireTristan);
receiveMessage(socket, wireTristan); // TODO: Add a check for the status of read
/* Decode the received bytes into a tagged message */
TaggedMessage decodedMessage = TaggedMessage.decode(wireTristan);
import std.stdio;
writeln("Watcher received: ", decodedMessage);
/* Search for the queue with the id provided */
ulong messageTag = decodedMessage.getTag();
Queue potentialQueue = manager.getQueue_nothrow(messageTag);
/* If a queue can be found */
if(potentialQueue !is null)
{
/* Enqueue the message */
potentialQueue.enqueue(decodedMessage);
}
/* If the queue if not found */
else
{
/**
* Look for a default queue, and if one is found
* then enqueue the message there. Otherwise, drop
* it by simply doing nothing.
*/
try
{
potentialQueue = manager.getDefaultQueue();
/* Enqueue the message */
potentialQueue.enqueue(decodedMessage);
}
catch(TristanableException e) {}
}
// TODO: Implement me
}
}
}
unittest
{
import std.socket;
import std.stdio;
Address serverAddress = parseAddress("::1", 0);
Socket server = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
server.bind(serverAddress);
server.listen(0);
class ServerThread : Thread
{
this()
{
super(&worker);
}
private void worker()
{
Socket clientSocket = server.accept();
/**
* Create a tagged message to send
*
* tag 69 payload Hello
*/
TaggedMessage message = new TaggedMessage(69, cast(byte[])"Hello");
byte[] tEncoded = message.encode();
writeln("server send status: ", sendMessage(clientSocket, tEncoded));
writeln("server send [done]");
}
}
ServerThread serverThread = new ServerThread();
serverThread.start();
Socket client = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
writeln(server.localAddress);
Manager manager = new Manager(client);
Queue sixtyNine = new Queue(69);
manager.registerQueue(sixtyNine);
client.connect(server.localAddress);
manager.start();
// while(true){}
}