Merge branch 'nextgen' into nextgen_listener

This commit is contained in:
Tristan B. Velloza Kildaire 2023-04-06 13:22:18 +02:00
commit ac7fea19d1
3 changed files with 104 additions and 38 deletions

View File

@ -72,7 +72,7 @@ public class Manager
// Starts the watcher
public void start()
{
watcher.start();
watcher.startWatcher();
}
// Stops the watcher

View File

@ -39,6 +39,12 @@ public class Watcher : Thread
super(&watch);
}
package void startWatcher()
{
/* Start the watch method on a new thread */
start();
}
/**
* Watches the socket for incoming messages
* and decodes them on the fly, placing
@ -46,54 +52,76 @@ public class Watcher : Thread
*/
private void watch()
{
import std.stdio;
while(true)
{
/* Do a bformat read-and-decode */
byte[] wireTristan;
receiveMessage(socket, wireTristan); // TODO: Add a check for the status of read
version(unittest) { writeln("Before bformat recv()"); }
bool recvStatus = receiveMessage(socket, wireTristan); // TODO: Add a check for the status of read
version(unittest) { writeln("After bformat recv()"); }
version(unittest) { writeln("bformat recv() status: ", recvStatus); }
/* Decode the received bytes into a tagged message */
TaggedMessage decodedMessage = TaggedMessage.decode(wireTristan);
import std.stdio;
writeln("Watcher received: ", decodedMessage);
/* Search for the queue with the id provided */
ulong messageTag = decodedMessage.getTag();
Queue potentialQueue = manager.getQueue_nothrow(messageTag);
/* If a queue can be found */
if(potentialQueue !is null)
if(recvStatus)
{
/* Enqueue the message */
potentialQueue.enqueue(decodedMessage);
}
/* If the queue if not found */
else
{
/**
* Look for a default queue, and if one is found
* then enqueue the message there. Otherwise, drop
* it by simply doing nothing.
*/
try
/* Decode the received bytes into a tagged message */
TaggedMessage decodedMessage = TaggedMessage.decode(wireTristan);
version(unittest) { writeln("Watcher received: ", decodedMessage); }
/* Search for the queue with the id provided */
ulong messageTag = decodedMessage.getTag();
Queue potentialQueue = manager.getQueue_nothrow(messageTag);
/* If a queue can be found */
if(potentialQueue !is null)
{
potentialQueue = manager.getDefaultQueue();
/* Enqueue the message */
potentialQueue.enqueue(decodedMessage);
}
catch(TristanableException e) {}
/* If the queue if not found */
else
{
/**
* Look for a default queue, and if one is found
* then enqueue the message there. Otherwise, drop
* it by simply doing nothing.
*/
try
{
potentialQueue = manager.getDefaultQueue();
/* Enqueue the message */
potentialQueue.enqueue(decodedMessage);
}
catch(TristanableException e) {}
}
version(unittest) { writeln("drip"); }
}
/**
* If there was an error receiving on the socket.
*
* This can be either because we have shut the socket down
* or the remote end has closed the connection.
*
* In any case, exit the loop therefore ending this thread.
*/
else
{
break;
}
// TODO: Implement me
}
}
package void shutdown()
{
/* Unblock all calls to `recv()` and disallow future ones */
// TODO: Would we want to do the same for sends? */
socket.shutdown(SocketShutdown.RECEIVE);
/* Close the connection */
socket.close();
}
}
@ -120,13 +148,13 @@ unittest
{
Socket clientSocket = server.accept();
Thread.sleep(dur!("seconds")(4));
Thread.sleep(dur!("seconds")(7));
writeln("Server start");
/**
* Create a tagged message to send
*
* tag 42 payload Hello
* tag 42 payload Cucumber 😳
*/
TaggedMessage message = new TaggedMessage(42, cast(byte[])"Cucumber 😳️");
byte[] tEncoded = message.encode();
@ -144,7 +172,17 @@ unittest
writeln("server send status: ", sendMessage(clientSocket, tEncoded));
writeln("server send [done]");
/**
* Create a tagged message to send
*
* tag 69 payload Bye
*/
message = new TaggedMessage(69, cast(byte[])"Bye");
tEncoded = message.encode();
writeln("server send status: ", sendMessage(clientSocket, tEncoded));
writeln("server send [done]");
}
}
@ -176,9 +214,24 @@ unittest
TaggedMessage dequeuedMessage = sixtyNine.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 69);
assert(dequeuedMessage.getPayload() == "Hello");
assert(dequeuedMessage.getPayload() == cast(byte[])"Hello");
/* Block on the unittest thread for a received message */
writeln("unittest thread: Dequeue() blocking...");
dequeuedMessage = sixtyNine.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 69);
assert(dequeuedMessage.getPayload() == cast(byte[])"Bye");
/* Block on the unittest thread for a received message */
writeln("unittest thread: Dequeue() blocking...");
dequeuedMessage = fortyTwo.dequeue();
writeln("unittest thread: Got '"~dequeuedMessage.toString()~"' decode payload to string '"~cast(string)dequeuedMessage.getPayload()~"'");
assert(dequeuedMessage.getTag() == 42);
assert(dequeuedMessage.getPayload() == cast(byte[])"Cucumber 😳️");
// while(true){}
/* Stop the manager */
manager.stop();
}

View File

@ -10,6 +10,7 @@ import libsnooze;
import core.sync.mutex : Mutex;
import std.container.slist : SList;
import tristanable.encoding;
import core.thread : dur;
version(unittest)
{
@ -34,6 +35,15 @@ public class Queue
private ulong queueID;
/**
* Constructs a new Queue and immediately sets up the notification
* sub-system for the calling thread (the thread constructing this
* object) which ensures that a call to dequeue will immediately
* unblock on the first message received under this tag
*
* Params:
* queueID = the id to use for this queue
*/
this(ulong queueID)
{
/* Initialize the queue lock */
@ -44,6 +54,9 @@ public class Queue
/* Set the queue id */
this.queueID = queueID;
/* Ensure pipe existence (see https://deavmi.assigned.network/git/deavmi/tristanable/issues/5) */
event.wait(dur!("seconds")(0));
}
/**