tristanable/source/tristanable/manager/manager.d

608 lines
14 KiB
D

/**
* Management of a tristanable instance
*/
module tristanable.manager.manager;
import std.socket;
import tristanable.queue.queue : Queue;
import core.sync.mutex : Mutex;
import tristanable.manager.watcher : Watcher;
import tristanable.encoding : TaggedMessage;
import tristanable.exceptions;
import std.container.slist : SList;
import tristanable.manager.config;
import river.core;
import river.impls.sock : SockStream;
import bformat.client;
/**
* Manages a provided socket by spawning
* a watcher thread to read from it and file
* mail into the corresponding queues.
*
* Queues are registered, looked up and released
* via an instance of this manager.
*/
public class Manager
{
    /**
    * Configuration
    */
    private Config config;

    /**
    * The bformat client to read and write from
    */
    private BClient bClient;

    /**
    * Currently registered queues
    */
    private SList!(Queue) queues;

    /**
    * Lock for currently registered queues
    */
    private Mutex queuesLock;

    /**
    * Default queue (null when none has been set)
    */
    private Queue defaultQueue;

    /**
    * Watcher which manages the socket and
    * enqueues new messages into the respective
    * queue for us
    */
    private Watcher watcher;

    /**
    * Constructs a new manager which will read from
    * the provided stream and file mail for us
    *
    * Params:
    *   stream = the underlying stream to use
    *   config = the configuration to use (defaults
    *            to the result of `defaultConfig()`)
    */
    this(Stream stream, Config config = defaultConfig())
    {
        this.bClient = new BClient(stream);
        this.queuesLock = new Mutex();
        this.config = config;
        this.watcher = new Watcher(this, bClient);
    }

    /**
    * Constructs a new manager from a `Socket`
    *
    * Kept for backwards compatibility (whereby a
    * `Socket` was taken in); the socket is wrapped
    * in a `SockStream` and handed to the stream-based
    * constructor.
    *
    * Params:
    *   socket = the underlying socket to use
    *   config = the configuration to use (defaults
    *            to the result of `defaultConfig()`)
    */
    this(Socket socket, Config config = defaultConfig())
    {
        this(new SockStream(socket), config);
    }

    /**
    * Starts the management of the socket,
    * resulting in queues being updated upon
    * receiving messages tagged for them
    */
    public void start()
    {
        watcher.startWatcher();
    }

    /**
    * Stops the management of the socket, resulting
    * in ending the updating of queues and closing
    * the underlying connection
    *
    * Calling this will also unblock any calls that
    * were blocking whilst doing a `dequeue()` on a
    * registered queue
    */
    public void stop()
    {
        /* Stop with the given reason */
        stop(ErrorType.MANAGER_SHUTDOWN);
    }

    /**
    * Only called by the `Watcher` and for
    * the purpose of setting a custom error
    * type.
    *
    * Called when the network read fails
    */
    void stop_FailedWatcher()
    {
        /* Stop with the given reason */
        stop(ErrorType.WATCHER_FAILED);
    }

    /**
    * Stops the watcher service and then
    * unblocks all calls to `dequeue()`
    * by shutting down each registered `Queue`
    *
    * Params:
    *   reason = the reason for the
    *            shutdown
    */
    private void stop(ErrorType reason)
    {
        /* Stop the watcher */
        watcher.shutdown();

        /* Unblock all `dequeue()` calls */
        shutdownAllQueues(reason);
    }

    /**
    * Shuts down all registered queues
    *
    * Params:
    *   reason = the reason passed on to each
    *            queue's `shutdownQueue()`
    */
    protected void shutdownAllQueues(ErrorType reason)
    {
        /* Lock the queue of queues */
        queuesLock.lock();

        /* On return or error */
        scope(exit)
        {
            /* Unlock the queue of queues */
            queuesLock.unlock();
        }

        // TODO: Shutdown default queue - see mtsafety
        // NOTE(review): only the registered queues are shut down
        // here; the default queue (if set) is left untouched

        /* Shutdown each queue */
        foreach(Queue queue; this.queues)
        {
            queue.shutdownQueue(reason);
        }
    }

    /**
    * Retrieves the queue matching the provided id
    *
    * Params:
    *   id = the id to lookup by
    * Returns: the Queue
    * Throws: TristanableException if the queue is not found
    */
    public Queue getQueue(ulong id)
    {
        /* The found queue */
        Queue queue = getQueue_nothrow(id);

        /* If no queue is found then throw an error */
        if(queue is null)
        {
            throw new TristanableException(ErrorType.QUEUE_NOT_FOUND);
        }

        return queue;
    }

    /**
    * Retrieves the queue matching the provided id
    *
    * This is the nothrow version
    *
    * Params:
    *   id = the id to lookup by
    * Returns: the Queue if found, null otherwise
    */
    public Queue getQueue_nothrow(ulong id)
    {
        /* The found queue (stays null if there is no match) */
        Queue queue;

        /* Lock the queue of queues */
        queuesLock.lock();

        /* On return or error */
        scope(exit)
        {
            /* Unlock the queue of queues */
            queuesLock.unlock();
        }

        /* Search for the queue */
        foreach(Queue curQueue; queues)
        {
            if(curQueue.getID() == id)
            {
                queue = curQueue;
                break;
            }
        }

        return queue;
    }

    /**
    * Get a new queue that is unique in its tag
    * (unused/not registered yet), register it
    * and then return it
    *
    * The tag chosen is the lowest one, counting
    * up from 0, that no registered queue uses.
    *
    * Returns: the newly registered Queue
    */
    public Queue getUniqueQueue()
    {
        /* The newly created queue */
        Queue uniqueQueue;

        /* Lock the queue of queues */
        queuesLock.lock();

        /* On return or error */
        scope(exit)
        {
            /* Unlock the queue of queues */
            queuesLock.unlock();
        }

        // TODO: Throw exception if all tags used

        /* The unused tag */
        ulong unusedTag = 0;

        /* Try the current tag and ensure no queue uses it */
        tagLoop: for(ulong curPotentialTag = 0; true; curPotentialTag++)
        {
            foreach(Queue curQueue; queues)
            {
                if(curQueue.getID() == curPotentialTag)
                {
                    /* Tag is taken, move on to the next candidate */
                    continue tagLoop;
                }
            }

            /* Then we have found a unique tag */
            unusedTag = curPotentialTag;
            break;
        }

        /* Create the queue */
        uniqueQueue = new Queue(unusedTag);

        /**
        * Register it
        *
        * This re-acquires `queuesLock` whilst we still
        * hold it, which relies on `Mutex` supporting
        * recursive locking by the same thread
        */
        registerQueue(uniqueQueue);

        return uniqueQueue;
    }

    /**
    * Registers the given queue with the manager
    *
    * Params:
    *   queue = the queue to register
    * Throws:
    *   TristanableException if a queue with the provided id already exists
    */
    public void registerQueue(Queue queue)
    {
        /* Try to register the queue */
        bool status = registerQueue_nothrow(queue);

        /* If registration was not successful */
        if(!status)
        {
            throw new TristanableException(ErrorType.QUEUE_ALREADY_EXISTS);
        }
    }

    /**
    * Registers the given queue with the manager
    *
    * This is the nothrow version
    *
    * Params:
    *   queue = the queue to register
    * Returns: true if registration was successful, false
    * if a queue with the same id is already registered
    */
    public bool registerQueue_nothrow(Queue queue)
    {
        /* Lock the queue of queues */
        queuesLock.lock();

        /* On return or error */
        scope(exit)
        {
            /* Unlock the queue of queues */
            queuesLock.unlock();
        }

        /* Search for the queue, fail if one with this id exists */
        foreach(Queue curQueue; queues)
        {
            if(curQueue.getID() == queue.getID())
            {
                /* Registration failed */
                return false;
            }
        }

        /* Insert the queue as it does not exist */
        queues.insertAfter(queues[], queue);

        /* Registration was a success */
        return true;
    }

    /**
    * De-registers the given queue from the manager
    *
    * Params:
    *   queue = the queue to de-register
    * Throws:
    *   TristanableException if a queue with the provided id cannot be found
    */
    public void releaseQueue(Queue queue)
    {
        /* Try to de-register the queue */
        bool status = releaseQueue_nothrow(queue);

        /* If de-registration was not successful */
        if(!status)
        {
            throw new TristanableException(ErrorType.QUEUE_NOT_FOUND);
        }
    }

    /**
    * De-registers the given queue from the manager
    *
    * This is the nothrow version. The queue to remove
    * is identified by the id of the queue passed in.
    *
    * Params:
    *   queue = the queue to de-register
    * Returns: true if de-registration was successful, false otherwise
    */
    public bool releaseQueue_nothrow(Queue queue)
    {
        /* Lock the queue of queues */
        queuesLock.lock();

        /* On return or error */
        scope(exit)
        {
            /* Unlock the queue of queues */
            queuesLock.unlock();
        }

        /* Search for the queue, return false if it does NOT exist */
        foreach(Queue curQueue; queues)
        {
            if(curQueue.getID() == queue.getID())
            {
                /**
                * Remove the registered queue that matched.
                *
                * We remove `curQueue` rather than `queue` as the
                * match was made on the id; the caller may have
                * handed us a different object carrying the same
                * id, which would then never be found for removal
                * even though we report success.
                *
                * Returning straight away means the range being
                * iterated is never advanced after the removal.
                */
                queues.linearRemoveElement(curQueue);

                /* De-registration succeeded */
                return true;
            }
        }

        /* De-registration failed */
        return false;
    }

    /**
    * Sets the default queue
    *
    * The default queue, when set/enabled, is the queue that will
    * be used to enqueue messages that have a tag which doesn't
    * match any of the normally registered queues.
    *
    * Please note that the ID of the queue passed in here does not
    * mean anything in this context; only the queuing facilities
    * of the Queue object are used
    *
    * Params:
    *   queue = the default queue to use
    */
    public void setDefaultQueue(Queue queue)
    {
        this.defaultQueue = queue;
    }

    /**
    * Returns the default queue
    *
    * Returns: the default queue
    * Throws:
    *   TristanableException if there is no default queue
    */
    public Queue getDefaultQueue()
    {
        /* The potential default queue */
        Queue potentialDefaultQueue = getDefaultQueue_nothrow();

        if(potentialDefaultQueue is null)
        {
            throw new TristanableException(ErrorType.NO_DEFAULT_QUEUE);
        }

        return potentialDefaultQueue;
    }

    /**
    * Returns the default queue
    *
    * This is the nothrow version
    *
    * Returns: the default queue if found, null otherwise
    */
    public Queue getDefaultQueue_nothrow()
    {
        return defaultQueue;
    }

    /**
    * Sends the provided message over the socket
    *
    * Params:
    *   message = the TaggedMessage to send
    */
    public void sendMessage(TaggedMessage message)
    {
        /**
        * If a queue with the tag of the message does
        * not exist, then register it if the config
        * option was enabled
        */
        if(config.registerOnSend)
        {
            /* Create a Queue with the tag */
            Queue createdQueue = new Queue(message.getTag());

            /**
            * Attempt to register the queue; a failure only
            * means one with this tag is already registered,
            * hence the result is deliberately ignored
            */
            registerQueue_nothrow(createdQueue);
        }

        /* Encode the message */
        byte[] encodedMessage = message.encode();

        /* Send it using bformat (encode-and-send) */
        bClient.sendMessage(encodedMessage);
    }
}
// TODO: Fix this, write it in a nicer way
// ... or make a private constructor here that
// ... does not take it in
version(unittest)
{
    /**
    * Placeholder socket handed to `Manager` by the unit
    * tests below; a class reference without an initializer
    * defaults to `null`
    */
    Socket nullSock;
}
/**
* Test retrieving a queue which does not
* exist
*/
unittest
{
    /* A freshly created manager has no queues registered */
    Manager mgr = new Manager(nullSock);

    /* The throwing lookup must fail with QUEUE_NOT_FOUND */
    bool lookupFailed = false;
    try
    {
        mgr.getQueue(69);
    }
    catch(TristanableException err)
    {
        lookupFailed = true;
        assert(err.getError() == ErrorType.QUEUE_NOT_FOUND);
    }
    assert(lookupFailed);

    /* The nothrow lookup signals absence with null */
    assert(mgr.getQueue_nothrow(69) is null);
}
/**
* Test registering a queue and then fetching it
*/
unittest
{
    /* Manager under test and the queue (tag 69) to store in it */
    Manager mgr = new Manager(nullSock);
    Queue stored = new Queue(69);

    /* Neither registering nor fetching may raise an error */
    Queue fetched;
    bool errored = false;
    try
    {
        mgr.registerQueue(stored);
        fetched = mgr.getQueue(69);
    }
    catch(TristanableException err)
    {
        errored = true;
    }
    assert(!errored);

    /* What we fetched must be what we stored */
    assert(fetched == stored);

    /* The nothrow lookup must find it as well */
    assert(mgr.getQueue_nothrow(69) !is null);
}
/**
* Tests registering a queue and then registering
* another queue with the same id
*/
unittest
{
    /* Manager under test with a queue (tag 69) already registered */
    Manager mgr = new Manager(nullSock);
    Queue dup = new Queue(69);
    mgr.registerQueue(dup);

    /* A second registration must fail with QUEUE_ALREADY_EXISTS */
    bool rejected = false;
    try
    {
        mgr.registerQueue(dup);
    }
    catch(TristanableException err)
    {
        rejected = true;
        assert(err.getError() == ErrorType.QUEUE_ALREADY_EXISTS);
    }
    assert(rejected);
}
/**
* Tests registering a queue, de-registering it and
* then registering it again
*/
unittest
{
    /* Manager under test and the queue (tag 69) to cycle through it */
    Manager mgr = new Manager(nullSock);
    Queue cycled = new Queue(69);

    /* First registration: the queue must be retrievable */
    mgr.registerQueue(cycled);
    Queue afterFirstRegister = mgr.getQueue(69);
    assert(cycled == afterFirstRegister);

    /* Release: the queue must no longer be retrievable */
    mgr.releaseQueue(cycled);
    Queue afterRelease = mgr.getQueue_nothrow(69);
    assert(afterRelease is null);

    /* Second registration: the queue must be retrievable again */
    mgr.registerQueue(cycled);
    Queue afterSecondRegister = mgr.getQueue(69);
    assert(cycled == afterSecondRegister);
}
/**
* Tests registering a queue using the "next available queue"
* method
*/
unittest
{
    /* Manager under test, starting with no queues */
    Manager mgr = new Manager(nullSock);

    /* Successive unique queues should carry the tags 0, 1 and 2 */
    foreach(ulong expectedTag; 0 .. 3)
    {
        Queue fresh = mgr.getUniqueQueue();
        assert(fresh.getID() == expectedTag);
    }
}
// TODO: Add testing for queue existence (internal method)