- Added package-level accessible `startWatcher()` method which calls `start()` for us
- Added some debugging prints which will now only be compiled-in during unittest builds
- If the bformat `receiveMessage(Socket, ref byte[])` method fails (returns `false`) then exit the loop, only continue decoding if it is `true`
- Implemented package-level accesible `shutdown()` method

Manager

- `start()` now calls `watcher.startWatcher()` instead of `watcher.start()`
This commit is contained in:
Tristan B. Velloza Kildaire 2023-04-06 13:13:38 +02:00
parent 2fa77e639f
commit 375a611a82
2 changed files with 61 additions and 34 deletions

View File

@ -72,7 +72,7 @@ public class Manager
// Starts the watcher
public void start()
{
watcher.start();
watcher.startWatcher();
}
// Stops the watcher

View File

@ -39,6 +39,12 @@ public class Watcher : Thread
super(&watch);
}
package void startWatcher()
{
/* Start the watch method on a new thread */
start();
}
/**
* Watches the socket for incoming messages
* and decodes them on the fly, placing
@ -46,54 +52,76 @@ public class Watcher : Thread
*/
private void watch()
{
import std.stdio;
while(true)
{
/* Do a bformat read-and-decode */
byte[] wireTristan;
receiveMessage(socket, wireTristan); // TODO: Add a check for the status of read
version(unittest) { writeln("Before bformat recv()"); }
bool recvStatus = receiveMessage(socket, wireTristan); // TODO: Add a check for the status of read
version(unittest) { writeln("After bformat recv()"); }
version(unittest) { writeln("bformat recv() status: ", recvStatus); }
/* Decode the received bytes into a tagged message */
TaggedMessage decodedMessage = TaggedMessage.decode(wireTristan);
import std.stdio;
writeln("Watcher received: ", decodedMessage);
/* Search for the queue with the id provided */
ulong messageTag = decodedMessage.getTag();
Queue potentialQueue = manager.getQueue_nothrow(messageTag);
/* If a queue can be found */
if(potentialQueue !is null)
if(recvStatus)
{
/* Enqueue the message */
potentialQueue.enqueue(decodedMessage);
}
/* If the queue if not found */
else
{
/**
* Look for a default queue, and if one is found
* then enqueue the message there. Otherwise, drop
* it by simply doing nothing.
*/
try
/* Decode the received bytes into a tagged message */
TaggedMessage decodedMessage = TaggedMessage.decode(wireTristan);
version(unittest) { writeln("Watcher received: ", decodedMessage); }
/* Search for the queue with the id provided */
ulong messageTag = decodedMessage.getTag();
Queue potentialQueue = manager.getQueue_nothrow(messageTag);
/* If a queue can be found */
if(potentialQueue !is null)
{
potentialQueue = manager.getDefaultQueue();
/* Enqueue the message */
potentialQueue.enqueue(decodedMessage);
}
catch(TristanableException e) {}
/* If the queue if not found */
else
{
/**
* Look for a default queue, and if one is found
* then enqueue the message there. Otherwise, drop
* it by simply doing nothing.
*/
try
{
potentialQueue = manager.getDefaultQueue();
/* Enqueue the message */
potentialQueue.enqueue(decodedMessage);
}
catch(TristanableException e) {}
}
version(unittest) { writeln("drip"); }
}
/**
* If there was an error receiving on the socket.
*
* This can be either because we have shut the socket down
* or the remote end has closed the connection.
*
* In any case, exit the loop therefore ending this thread.
*/
else
{
break;
}
// TODO: Implement me
}
}
package void shutdown()
{
/* Unblock all calls to `recv()` and disallow future ones */
// TODO: Would we want to do the same for sends? */
socket.shutdown(SocketShutdown.RECEIVE);
/* Close the connection */
socket.close();
}
}
@ -155,7 +183,6 @@ unittest
writeln("server send status: ", sendMessage(clientSocket, tEncoded));
writeln("server send [done]");
}
}