tristanable/source/tristanable/manager.d

283 lines
6.1 KiB
D

module tristanable.manager;
import tristanable.watcher : Watcher;
import tristanable.request : Request;
import tristanable.garbage : GarbageCollector;
import tristanable.encoding : DataMessage;
import tristanable.notifications : NotificationReply;
import std.socket : Socket;
import core.sync.mutex : Mutex;
import bmessage : bSendMessage = sendMessage;
/* TODO: Watcher class to watch for stuff, and add to manager's queues */
/* TODO: Manager class to use commands on, enqueue and wait for dequeue */
/**
* Coordinates tagged request/reply traffic over a single socket.
*
* A `Manager` owns the queue of outstanding `Request`s, the list of
* reserved tags and the queue of received notifications. On construction
* it creates and starts a `Watcher` and a `GarbageCollector`, both of which
* are handed a reference to this manager.
*
* Locking: `requestQueue` is guarded by `queueMutex` (see `lockQueue` /
* `unlockQueue`) and `notificationQueue` by `notificationMutex`. Methods
* that take a lock themselves say so; the others leave locking to the
* caller.
*/
public final class Manager
{
    /**
    * The queue of outstanding requests
    */
    private Request[] requestQueue;

    /**
    * Reserved tags
    */
    private ulong[] reservedTags;

    /**
    * The queue of received notifications
    */
    private NotificationReply[] notificationQueue;

    /**
    * The associated Watcher object for this manager.
    */
    private Watcher watcher;

    /**
    * The request queue mutex
    */
    private Mutex queueMutex;

    /**
    * The notification queue mutex
    */
    private Mutex notificationMutex;

    /**
    * The remote host
    */
    private Socket socket;

    /**
    * The garbage collector
    */
    private GarbageCollector gc;

    /**
    * Creates a manager for `endpoint` and starts its helper threads.
    *
    * Params:
    *   endpoint = the socket used for sending; it is also passed to the
    *              `Watcher`
    */
    this(Socket endpoint)
    {
        /* Set the socket */
        socket = endpoint;

        /**
        * Create both mutexes before `this` is handed to any collaborator,
        * so that nothing can ever observe a manager without its locks.
        */
        queueMutex = new Mutex();
        notificationMutex = new Mutex();

        /* Create the watcher */
        watcher = new Watcher(this, endpoint);

        /* Create the garbage collector */
        gc = new GarbageCollector(this);

        /* Start the watcher */
        watcher.start();

        /* Start the garbage collector */
        gc.start();
    }

    /**
    * Shuts the manager down: closes the socket, asks the watcher and the
    * garbage collector to stop and then joins both of them.
    */
    public void stopManager()
    {
        /* Will cause the watcher to not block */
        socket.close();

        /* Stop watcher */
        watcher.stopWatcher();

        /* Stop gc */
        gc.stopGC();

        /* Wait for watcher thread to stop */
        watcher.join();

        /* Wait for garbage collector thread to stop */
        gc.join();
    }

    /**
    * Encodes `data` under `tag`, registers a new `Request` for that tag
    * and sends the encoded message over the socket.
    *
    * Takes the request queue lock internally.
    *
    * Params:
    *   tag = the tag identifying this request
    *   data = the payload to send
    */
    public void sendMessage(ulong tag, byte[] data)
    {
        /* Encode the message */
        DataMessage dataMessage = new DataMessage(tag, data);

        /* Construct the message array */
        byte[] messageData = dataMessage.encode();

        /* Create a new Request */
        Request newRequest = new Request(tag);

        /**
        * Register the request BEFORE transmitting. If the message were
        * sent first, a reply could arrive while no request with this tag
        * is in the queue yet.
        * NOTE(review): the Watcher is not visible from this file; this
        * assumes it matches incoming replies against `requestQueue`.
        */
        {
            /* Lock the queue for writing */
            lockQueue();
            scope(exit) unlockQueue();

            /* Add the request to the request queue */
            requestQueue ~= newRequest;
        }

        /* Send the message */
        bSendMessage(socket, messageData);
    }

    /**
    * Finds the first request in the queue that is not dead and carries
    * the given tag.
    *
    * Does not lock; the caller is expected to hold the queue lock.
    *
    * Params:
    *   tag = the tag to look for
    *   position = set to the index of the match (untouched if none)
    *
    * Returns: `true` if a live request with this tag was found
    */
    private bool findLiveRequest(ulong tag, ref ulong position)
    {
        for(ulong i = 0; i < requestQueue.length; i++)
        {
            /* Get the request */
            Request request = requestQueue[i];

            /* Dead requests re-using the same tag are skipped */
            if(!request.isDead && request.tag == tag)
            {
                position = i;
                return true;
            }
        }

        return false;
    }

    /**
    * Checks whether a live (not dead) request with the given tag is
    * currently in the queue.
    *
    * Does not lock; the caller is expected to hold the queue lock.
    *
    * Params:
    *   tag = the tag to look for
    *
    * Returns: `true` if such a request exists, `false` otherwise
    */
    public bool isValidTag(ulong tag)
    {
        ulong ignored;
        return findLiveRequest(tag, ignored);
    }

    /**
    * Returns the queue index of the first live (not dead) request with
    * the given tag.
    *
    * Does not lock; the caller is expected to hold the queue lock.
    *
    * Note that `0` is returned both for a match at index 0 and when
    * there is no match at all, so check `isValidTag` first.
    *
    * Params:
    *   tag = the tag to look for
    *
    * Returns: the index of the request, or `0` if none was found
    */
    public ulong getTagPosition(ulong tag)
    {
        ulong position = 0;
        findLiveRequest(tag, position);
        return position;
    }

    /**
    * Blocks until the request with the given tag has been fulfilled and
    * returns its data.
    *
    * The queue is polled; the lock is taken for each poll and released
    * (and the thread yields) between polls so that other threads get a
    * chance to acquire the queue lock.
    *
    * Params:
    *   tag = the tag of a previously sent request
    *
    * Returns: the data pulled from the fulfilled request
    *
    * Throws: `TristanFokop` if no live request with this tag is queued
    */
    public byte[] receiveMessage(ulong tag)
    {
        import core.thread : Thread;

        /* Loop till fulfilled */
        while(true)
        {
            {
                /* Lock the queue for reading */
                lockQueue();

                /* Released on every exit path: return, throw or fall-through */
                scope(exit) unlockQueue();

                /* Throw an exception if it doesn't exist */
                ulong position;
                if(!findLiveRequest(tag, position))
                {
                    throw new TristanFokop("Invalid tag");
                }

                /* Get the request */
                Request request = requestQueue[position];

                /* Check if the request has been fulfilled */
                if(request.isFulfilled())
                {
                    /* The data is pulled while the lock is still held */
                    return request.pullData();
                }
            }

            /* Not fulfilled yet: give other threads a turn before re-polling */
            Thread.yield();
        }
    }

    /**
    * Returns the request queue slice. No lock is taken here.
    */
    public Request[] getQueue()
    {
        return requestQueue;
    }

    /**
    * Returns a pointer to the request queue variable itself, allowing
    * the caller to replace or resize the queue. No lock is taken here.
    */
    public Request[]* getQueueVariable()
    {
        return &requestQueue;
    }

    /**
    * Acquires the request queue mutex.
    */
    public void lockQueue()
    {
        queueMutex.lock();
    }

    /**
    * Releases the request queue mutex.
    */
    public void unlockQueue()
    {
        queueMutex.unlock();
    }

    /**
    * Acquires the notification queue mutex.
    */
    public void lockNotificationQueue()
    {
        notificationMutex.lock();
    }

    /**
    * Releases the notification queue mutex.
    */
    public void unlockNotificationQueue()
    {
        notificationMutex.unlock();
    }

    /**
    * Removes and returns all notifications queued at this moment.
    *
    * Takes the notification queue lock internally.
    *
    * Returns: the notifications that were queued (possibly empty)
    */
    public NotificationReply[] popNotifications()
    {
        /* Lock the notification queue */
        lockNotificationQueue();
        scope(exit) unlockNotificationQueue();

        /* Take the current notifications */
        NotificationReply[] currentNotificationSet = notificationQueue;

        /* Empty the notification list (the manager drops its reference) */
        notificationQueue = null;

        return currentNotificationSet;
    }

    /**
    * Adds `tag` to the list of reserved tags. No lock is taken here.
    *
    * Params:
    *   tag = the tag to reserve
    */
    public void reserveTag(ulong tag)
    {
        reservedTags ~= tag;
    }

    /**
    * Checks whether `tag` was previously passed to `reserveTag`.
    * No lock is taken here.
    *
    * Params:
    *   tag = the tag to check
    *
    * Returns: `true` if the tag is reserved, `false` otherwise
    */
    public bool isReservedTag(ulong tag)
    {
        foreach(ulong currentTag; reservedTags)
        {
            if(currentTag == tag)
            {
                return true;
            }
        }

        return false;
    }
}
/**
* The exception thrown by this module on failure, e.g. by
* `Manager.receiveMessage` when given a tag with no live request.
*/
public final class TristanFokop : Exception
{
    /**
    * Params:
    *   message = the error message
    *   file = source file reported by the exception; defaults to the
    *          file of the code constructing it
    *   line = source line reported by the exception; defaults to the
    *          line of the code constructing it
    *   next = an optional chained `Throwable`
    */
    this(string message, string file = __FILE__, size_t line = __LINE__, Throwable next = null)
    {
        /**
        * Forward file/line explicitly: without this, the defaults are
        * evaluated at this `super(...)` call and every exception would
        * point here instead of at the throw site.
        */
        super(message, file, line, next);
    }
}