WIP: tristanable-ng

This commit is contained in:
Tristan B. Kildaire 2020-09-29 11:57:25 +02:00
parent 02a29b5293
commit 1f07b06316
4 changed files with 240 additions and 0 deletions

View File

@ -0,0 +1,52 @@
public final class Manager
{
/* All queues */
private Queue[] queues;
private Mutex queuesLock;
/* TODO Add drop queue? */
this()
{
}
public void addQueue()
{
}
private bool isValidTag_callerThreadSafe(ulong tag)
{
bool tagExists;
foreach(Queue queue; queues)
{
if(queue.getTag() == tag)
{
tagExists = true;
break;
}
}
return tagExists;
}
public bool isValidTag(ulong tag)
{
/* Whether or not such a tagged queue exists */
bool tagExists;
queuesLock.lock();
tagExists = isValidTag_callerThreadSafe(tag);
queuesLock.unlock();
return tagExists;
}
}

123
source/tristanable/queue.d Normal file
View File

@ -0,0 +1,123 @@
/**
* Queue
*
* Represents a queue with a tag.
*
* Any messages that are received with
* the matching tag (to this queue) are
* then enqueued to this queue
*/
module tristanable.queue;
import tristanable.queueitem : QueueItem;
public final class Queue
{
/* This queue's tag */
private ulong tag;
/* The queue */
private QueueItem[] queue;
/* The queue mutex */
private Mutex queueLock;
/**
* Construct a new queue with the given
* tag
*/
this(ulong tag)
{
this.tag = tag;
/* Initialize the mutex */
queueLock = new Mutex();
}
public void enqueue(QueueItem item)
{
/* Lock the queue */
queueLock.lock();
/* Add it to the queue */
queue ~= item;
/* Unlock the queue */
queueLock.unlock();
}
/**
* Attempts to coninuously dequeue the
* head of the queue
*/
public QueueItem dequeue()
{
/* The head of the queue */
QueueItem queueHead;
while(!queueHead)
{
/* Lock the queue */
queueLock.lock();
/* Check if we can dequeue anything */
if(queue.length)
{
/* If we can then dequeue */
queueHead = queue[0];
/* Chop off the head */
offWithTheHead();
}
/* Unlock the queue */
queueLock.unlock();
/**
* Move away from this thread, let
* the watcher (presumably) try
* access our queue (successfully)
* by getting a lock on it
*
* Prevents us possibly racing back
* and locking queue again hence
* starving the system
*/
Thread.getThis().yield();
}
return queueHead;
}
/**
* Shifts the list and regenerates it to remove
* the current head
*
* Not thread safe but only called by thread
* safe (mutex locking) method
*/
private void offWithTheHead()
{
/* The new queue */
QueueItem[] newQueue;
/* Add everything but the first */
for(ulong i = 1; i < queue.length; i++)
{
newQueue ~= queue[i];
}
/* Make the the new queue */
queue = newQueue;
}
/**
* Returns the tag for this queue
*/
public ulong getTag()
{
return tag;
}
}

View File

@ -0,0 +1,18 @@
module tristanable.queueitem;
public final class QueueItem
{
/* This item's data */
private byte[] data;
/* TODO: */
this(byte[] data)
{
this.data = data;
}
public byte[] getData()
{
return data;
}
}

View File

@ -0,0 +1,47 @@
//module
public final class Watcher : Thread
{
this()
{
}
private void run()
{
/* Continuously dequeue tristanable packets from socket */
while(true)
{
/* Receive payload (tag+data) */
byte[] receivedPayload;
/* Block for socket response */
bool recvStatus = receiveMessage(endpoint, receivedPayload);
/* If the receive was successful */
if(recvStatus)
{
/* Decode the ttag-encoded message */
DataMessage message = DataMessage.decode(receivedPayload);
}
/* If the receive failed */
else
{
/* TODO: Stop everything */
break;
}
/**
* Like in `dequeue` we don't want the possibility
* of racing back to the top of the loop and locking
* the mutex again right before a thread switch,
* so we make sure that a switch occurs to a different
* thread
*/
Thread.getThis().yield();
}
}
}