Added new changes

This commit is contained in:
Tristan B. Kildaire 2020-09-29 19:13:36 +02:00
parent 176310761c
commit 24d56c93a9
2 changed files with 91 additions and 9 deletions

View File

@ -1,3 +1,7 @@
import std.socket : Socket;
import core.sync.mutex : Mutex;
import bmessage : bSendMessage = sendMessage;
public final class Manager
{
/* All queues */
@ -6,15 +10,64 @@ public final class Manager
/* TODO Add drop queue? */
this()
/**
* The remote host
*/
private Socket socket;
/**
* Constructs a new Manager with the given
* endpoint Socket
*
*/
this(Socket socket)
{
/* Set the socket */
this.socket = socket;
/* Initialize the queues mutex */
queuesLock = new Mutex();
/* Initialize the watcher */
}
public void addQueue()
public Queue getQueue(ulong tag)
{
Queue matchingQueue;
queuesLock.lock();
foreach(Queue queue; queues)
{
if(queue.getTag() == tag)
{
matchingQueue = queue;
break;
}
}
queuesLock.unlock();
return matchingQueue;
}
public void addQueue(Queue queue)
{
queuesLock.lock();
/* Make sure such a tag does not exist already */
if(!isValidTag_callerThreadSafe(queue.getTag()))
{
queues ~= queue;
}
else
{
/* TODO: Throw an error here */
}
queuesLock.unlock();
}
private bool isValidTag_callerThreadSafe(ulong tag)
@ -49,4 +102,9 @@ public final class Manager
return tagExists;
}
public void shutdown()
{
/* TODO: Implement me */
}
}

View File

@ -1,10 +1,18 @@
//module
module tristanable.watcher;
public final class Watcher : Thread
{
this()
/* The manager */
private Manager manager;
/* The socket to read from */
private Socket socket;
this(Manager manager, Socket endpoint)
{
super(&run);
this.manager = manager;
socket = endpoint;
}
private void run()
@ -24,8 +32,24 @@ public final class Watcher : Thread
/* Decode the ttag-encoded message */
DataMessage message = DataMessage.decode(receivedPayload);
/* TODO: Remove isTag, improve later, oneshot */
/* The matching queue (if any) */
Queue queue = manager.getQueue(message.getTag());
/* If the tag belongs to a queue */
if(queue)
{
/* Add an item to this queue */
queue.enqueue(new QueueItem(message.getData()));
}
/* If the tag is unknwon */
else
{
/* TODO: Add to dropped queue? */
/* Do nothing */
}
}
/* If the receive failed */
else