Deleted old code

This commit is contained in:
Tristan B. Kildaire 2020-09-29 19:13:21 +02:00
parent 4fab6a3727
commit 176310761c
5 changed files with 0 additions and 628 deletions

View File

@ -1,77 +0,0 @@
module tristanable.garbage;
import tristanable.manager : Manager;
import tristanable.request : Request;
import std.socket : Socket;
import core.thread : Thread, Duration, dur;
import bmessage : receiveMessage;
public final class GarbageCollector : Thread
{
/**
* The associated manager
*/
private Manager manager;
/**
* The queue variable pointer
*/
private Request[]* requestQueueVariable;
/**
* Whether or not the watcher is active
*/
private bool isActive;
this(Manager manager)
{
/* Set the worker function */
super(&cleaner);
/* Set the manager */
this.manager = manager;
/* Set the pointer */
requestQueueVariable = cast(Request[]*)manager.getQueueVariable();
isActive = true;
}
public void stopGC()
{
isActive = false;
}
/* TODO: Add timeout ability */
private void cleaner()
{
while(isActive)
{
/* Lock the queue */
manager.lockQueue();
/* Construct a new list */
Request[] newList;
/* Only add to this list undead requests */
foreach(Request request; *requestQueueVariable)
{
if(!request.isDead)
{
newList ~= request;
}
}
/* Update the queue to the new queue */
*requestQueueVariable = newList;
/* Unlock the queue */
manager.unlockQueue();
/* Sleep for 60 seconds after cleaning up */
sleep(dur!("seconds")(60));
}
}
}

View File

@ -1,319 +0,0 @@
module tristanable.manager;
import tristanable.watcher : Watcher;
import tristanable.request : Request;
import tristanable.garbage : GarbageCollector;
import tristanable.encoding : DataMessage;
import tristanable.notifications : NotificationReply;
import std.socket : Socket;
import core.sync.mutex : Mutex;
import bmessage : bSendMessage = sendMessage;
/* TODO: Watcher class to watch for stuff, and add to manager's queues */
/* TODO: maneger class to use commands on, enqueue and wait for dequeue */
public final class Manager
{
/* TODO: Insert queues here */
/**
* The queue of outstanding requests
*/
private Request[] requestQueue;
/**
* Reserved tags
*/
private ulong[] reservedTags;
/**
* The queue of received notifications
*/
private NotificationReply[] notificationQueue;
/**
* The associated Watcher object for this manager.
*/
private Watcher watcher;
/**
* The list mutex
*/
private Mutex queueMutex;
/**
* The notification queue mutex
*/
private Mutex notificationMutex;
/**
* The reserved tags mutex
*/
private Mutex reservedTagsMutex;
/**
* The remote host
*/
private Socket socket;
/**
* The garbage collector
*/
private GarbageCollector gc;
this(Socket endpoint)
{
/* Set the socket */
socket = endpoint;
/* Create the watcher */
watcher = new Watcher(this, endpoint);
/* Create the garbage collector */
gc = new GarbageCollector(this);
/* Initialize the `requestQueue` mutex */
queueMutex = new Mutex();
/* Initialize the `notificationQueue` mutex */
notificationMutex = new Mutex();
/* Initialize the `reservedTags` mutex */
reservedTagsMutex = new Mutex();
/* Start the watcher */
watcher.start();
/* Start the garbage collector */
gc.start();
}
public void stopManager()
{
/* Will caue watcher to not block */
socket.close();
/* Stop watcher */
watcher.stopWatcher();
/* Stop gc */
gc.stopGC();
/* Wait for watcher thread to stop */
watcher.join();
/* Wait for garbage collector thread to stop */
gc.join();
}
public void sendMessage(ulong tag, byte[] data)
{
/* Encode the message */
DataMessage dataMessage = new DataMessage(tag, data);
/* Construct the message array */
byte[] messageData = dataMessage.encode();
/* Create a new Request */
Request newRequest = new Request(tag);
/* Lock the queue for reading */
lockQueue();
/* Add the request to the request queue */
requestQueue ~= newRequest;
/* Unlock the queue */
unlockQueue();
/* Send the message */
bSendMessage(socket, messageData);
}
public bool isValidTag(ulong tag)
{
for(ulong i = 0; i < requestQueue.length; i++)
{
/* Get the request */
Request request = requestQueue[i];
/**
* Only if the tag is found then return true
* and if it is the fresh tagged request (not
* ones that are dead using the) same tag.
*/
if(request.isDead == false && request.tag == tag)
{
return true;
}
}
return false;
}
public ulong getTagPosition(ulong tag)
{
for(ulong i = 0; i < requestQueue.length; i++)
{
/* Get the request */
Request request = requestQueue[i];
/**
* Only if the tag is found then return its
* posistion and if it is the fresh tagged
* request (not ones that are dead using the)
* same tag.
*/
if(request.isDead == false && request.tag == tag)
{
return i;
}
}
return 0;
}
public byte[] receiveMessage(ulong tag)
{
/* The received data */
byte[] receivedData;
bool active = true;
/* Loop till fulfilled */
while(active)
{
/* Lock the queue for reading */
lockQueue();
/* Throw an exception if it doesn't exist */
if(!isValidTag(tag))
{
/* Unlock the queue */
unlockQueue();
/* Throw exception here */
throw new TristanFokop("Invalid tag");
}
/* Get the request */
Request request = requestQueue[getTagPosition(tag)];
/* Check if the request has been fulfilled */
if(request.isFulfilled())
{
receivedData = request.pullData();
active = false;
}
/* Unlock the queue */
unlockQueue();
}
return receivedData;
}
public Request[] getQueue()
{
return requestQueue;
}
public Request[]* getQueueVariable()
{
return &requestQueue;
}
public void lockQueue()
{
queueMutex.lock();
}
public void unlockQueue()
{
queueMutex.unlock();
}
public void lockNotificationQueue()
{
notificationMutex.lock();
}
public void unlockNotificationQueue()
{
notificationMutex.unlock();
}
public NotificationReply[] popNotifications()
{
/* The notifications at this moment */
NotificationReply[] currentNotificationSet;
/* Lock the notification queue */
lockNotificationQueue();
/* Copy the current notifications */
currentNotificationSet = notificationQueue;
/* Empty the notification list */
notificationQueue.length = 0;
/* Unlock the notification queue */
unlockNotificationQueue();
return currentNotificationSet;
}
public void reserveTag(ulong tag)
{
/* Lock the reservedTags mutex */
reservedTagsMutex.lock();
/* Add the reserved tag */
reservedTags ~= tag;
/* Unlock the reservedTags mutex */
reservedTagsMutex.unlock();
}
public bool isReservedTag(ulong tag)
{
/* Lock the reservedTags mutex */
reservedTagsMutex.lock();
bool found;
foreach(ulong currentTag; reservedTags)
{
if(currentTag == tag)
{
found = true;
break;
}
}
/* Unlock the reservedTags mutex */
reservedTagsMutex.unlock();
return found;
}
public void addNotification(NotificationReply notificationReply)
{
/* Lock the notification queue */
lockNotificationQueue();
/* Append the notification */
notificationQueue ~= notificationReply;
/* Unlock the notification queue */
unlockNotificationQueue();
}
}
public final class TristanFokop : Exception
{
this(string message)
{
super(message);
}
}

View File

@ -1,39 +0,0 @@
/**
* NotificationReply
*
* When a tag is reserved and a message is received
* with such a tag then one of these is generated
* and added to the queue of notification replies.
*
* Multiple of these will be made and enqueued even
* if they have the same tag (duplicates allowed).
*
* This facilitates a notification system if one
* wants to use tristanable for that purpose (this
* is because notifications _just happen_ and have
* no prior request)
*/
module tristanable.notifications;
public class NotificationReply
{
private ulong tag;
private byte[] data;
this(ulong tag, byte[] data)
{
this.tag = tag;
this.data = data;
}
public byte[] getData()
{
return data;
}
public ulong getTag()
{
return tag;
}
}

View File

@ -1,77 +0,0 @@
module tristanable.request;
import std.conv : to;
/**
* Request
*
* This type represents a placeholder for an
* expected response caused by the sending of
* an original message with a matching tag.
*/
public final class Request
{
/**
* The data received
*/
public byte[] dataReceived;
/**
* Whether or not this request has been
* fulfilled or not.
*/
private bool fulfilled;
/**
* Whether the request has been depleted
*/
public bool isDead;
/**
* The tag for this request
*/
public ulong tag;
/**
* Make a new Request with the provided tag
* `tag`.
*/
this(ulong tag)
{
this.tag = tag;
}
public void fulfill(byte[] data)
{
dataReceived = data;
fulfilled = true;
}
public bool isFulfilled()
{
return fulfilled;
}
public byte[] pullData()
{
isDead = true;
return dataReceived;
}
override public string toString()
{
/* the toString string */
string toStringString;
/* Add the Request tag info */
toStringString ~= "Request (Tag: " ~ to!(string)(tag);
/* Add the Request arrival status */
toStringString ~= ", Arrived: " ~ to!(string)(fulfilled);
/* Add the IsDead tag info */
toStringString ~= ", Used: " ~ to!(string)(isDead) ~ ")";
return toStringString;
}
}

View File

@ -1,116 +0,0 @@
module tristanable.watcher;
import tristanable.manager : Manager;
import tristanable.request : Request;
import tristanable.notifications : NotificationReply;
import std.socket : Socket;
import core.thread : Thread;
import bmessage : receiveMessage;
/* TODO: Watcher class to watch for stuff, and add to manager's queues */
/* TODO: maneger class to use commands on, enqueue and wait for dequeue */
public final class Watcher : Thread
{
/**
* The associated Manager
*
* Used to access the queues.
*/
private Manager manager;
/**
* The endpoint host we are connected to
*/
private Socket endpoint;
/**
* Whether or not the watcher is active
*/
private bool isActive;
this(Manager manager, Socket endpoint)
{
super(&watchLoop);
this.manager = manager;
this.endpoint = endpoint;
isActive = true;
}
public void stopWatcher()
{
isActive = false;
}
private void watchLoop()
{
while(isActive)
{
/* The received message (tag+data) */
byte[] receivedPayload;
/* The message's tag */
ulong receivedTag;
/* The message's data */
byte[] receivedMessage;
/* Receive a message */
bool recvStatus = receiveMessage(endpoint, receivedPayload);
/* Only continue if the receive was a success */
if(recvStatus)
{
/* Fetch the `tag` */
receivedTag = *(cast(ulong*)receivedPayload.ptr);
/* Fetch the `data` */
receivedMessage = receivedPayload[8..receivedPayload.length];
/* Lock the queue for reading */
manager.lockQueue();
/* Get the queue */
Request[] currentQueue = manager.getQueue();
/* Check to see if this is a tag we are awaiting */
bool foundTag = manager.isValidTag(receivedTag);
ulong requestPosition = manager.getTagPosition(receivedTag);
/**
* Check if the tag was found
*
* This only accounts for tags requested
*/
if(foundTag)
{
/* Fulfill the request */
currentQueue[requestPosition].fulfill(receivedMessage);
}
/**
* Check if the tag was reservd
*/
else if(manager.isReservedTag(receivedTag))
{
/* Create the NotificationReply */
NotificationReply notifyReply = new NotificationReply(receivedTag, receivedMessage);
/* Add the notification */
manager.addNotification(notifyReply);
}
else
{
/* TODO: */
}
/* Unlock the queue */
manager.unlockQueue();
}
else
{
/* TODO: Add error handling */
}
}
}
}