Compare commits

...

10 Commits

Author SHA1 Message Date
Tristan B. Velloza Kildaire 815e411e64 Watcher (unitests)
- Renamed
2023-11-26 19:27:04 +02:00
Tristan B. Velloza Kildaire 8ac7a5485c Watcher
- When we exit the read loop (upon an error) call `stop_FailedWatcher()` on the associated `Manager`

Watcher (unittests)

- Added a unit test for the above test case
2023-11-26 19:25:36 +02:00
Tristan B. Velloza Kildaire c5a6612a4b Manager
- Updated documentation for `stop()`
- Added `stop(ErrorType)` which will let you set the error type used for unblocking the `dequeue()` calls
- Added `stop_FailedWatcher()` which sets the `ErrorType` to `WATCHER_FAILED`
- `shutdownAllQueues()` now takes in a `ErrorType`
2023-11-26 19:23:21 +02:00
Tristan B. Velloza Kildaire aff9787dba ErrorType
- Added new member `WATCHER_FAILED`
2023-11-26 19:16:30 +02:00
Tristan B. Velloza Kildaire 75547033d9 Watcher
- Don't call `stop()` on the `Manager` when exiting the loop
2023-11-26 19:11:52 +02:00
Tristan B. Velloza Kildaire 169f47dd8d Watcher
- Hoisted common imports into a `version(unittest)`

Watcher (unittests)

- Added a unittest for testing `stop()` to unblock `dequeue()`s
2023-11-26 19:11:25 +02:00
Tristan B. Velloza Kildaire 31f7b6355f Manager
- Implemented `shutdownAllQueues()`
- Calling `stop()` now shuts down all queues
2023-11-26 19:09:46 +02:00
Tristan B. Velloza Kildaire 08757f27f2 ErrorType
- Added new member `MANAGER_SHUTDOWN`
2023-11-26 19:00:56 +02:00
Tristan B. Velloza Kildaire 713c102da5 Queue
- Added an `exitReason` and an `alive` (set to `true` on construction)
- Calling `shutdownQueue(ErrorType)` will set the exit reason, will also set the aliveness to `false` and wake up ALL `dequeue()`'s blocking
- `dequeue()` first check in wakeup routine duty cycle is to check if we are alive
2023-11-26 18:58:53 +02:00
Tristan B. Velloza Kildaire 198cb52342 ErrorType
- Added enum member `UNSET`
2023-11-26 18:51:38 +02:00
4 changed files with 343 additions and 7 deletions

View File

@ -10,6 +10,23 @@ import std.conv : to;
*/
public enum ErrorType
{
/**
* Unset
*/
UNSET,
/**
* If the manager has already
* been shutdown
*/
MANAGER_SHUTDOWN,
/**
* If the watcher has failed
* to stay alive
*/
WATCHER_FAILED,
/**
* If the requested queue could not be found
*/

View File

@ -93,12 +93,67 @@ public class Manager
* Stops the management of the socket, resulting
* in ending the updating of queues and closing
* the underlying connection
*
* Calling this will also unblock any calls that
* were blocking whilst doing a `dequeue()`
*/
public void stop()
{
/* Stop with the given reason */
stop(ErrorType.MANAGER_SHUTDOWN);
}
/**
* Only called by the `Watcher` and for
* the purpose of setting a custom error
* type.
*
* Called when the network read fails
*/
void stop_FailedWatcher()
{
/* Stop with the given reason */
stop(ErrorType.WATCHER_FAILED);
}
/**
* Stops the watcher service and then
* unblocks all calls to `dequeue()`
* by shutting down each `Queue`
*
* Params:
* reason = the reason for the
* shutdown
*/
private void stop(ErrorType reason)
{
/* Stop the watcher */
watcher.shutdown();
// TODO: Unblock ALL queues here
/* Unblock all `dequeue()` calls */
shutdownAllQueues(reason);
}
/**
* Shuts down all registered queues
*/
protected void shutdownAllQueues(ErrorType reason)
{
/* Lock the queue of queues */
queuesLock.lock();
/* On return or error */
scope(exit)
{
/* Unlock the queue of queues */
queuesLock.unlock();
}
/* Shutdown each queue */
foreach(Queue queue; this.queues)
{
queue.shutdownQueue(reason);
}
}
/**

View File

@ -129,7 +129,13 @@ public class Watcher : Thread
}
}
// TODO: Unblock all `dequeue()`'s here
version(unittest) { writeln("Exited watcher loop"); }
// NOTE: This will also be run on normal user-initiated `stop()`
// ... but will just try shutdown an alreayd shutdown manager
// ... again and try shut our already-closed river stream
// Shutdown and unblock all `dequeue()` calls
this.manager.stop_FailedWatcher();
}
/**
@ -143,6 +149,13 @@ public class Watcher : Thread
}
}
version(unittest)
{
import std.socket;
import std.stdio;
import core.thread;
}
/**
* Set up a server which will send some tagged messages to us (the client),
* where we have setup a `Manager` to watch the queues with tags `42` and `69`,
@ -150,10 +163,6 @@ public class Watcher : Thread
*/
unittest
{
import std.socket;
import std.stdio;
import core.thread;
Address serverAddress = parseAddress("::1", 0);
Socket server = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
server.bind(serverAddress);
@ -296,4 +305,224 @@ unittest
/* Stop the manager */
manager.stop();
}
/**
* Setup a `Manager` and then block on a `dequeue()`
* but from another thread shutdown the `Manager`.
*
* This is to test the exception triggering mechanism
* for such a case
*/
unittest
{
writeln("<<<<< Test 3 start >>>>>");
Address serverAddress = parseAddress("::1", 0);
Socket server = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
server.bind(serverAddress);
server.listen(0);
class ServerThread : Thread
{
this()
{
super(&worker);
}
private void worker()
{
Socket clientSocket = server.accept();
BClient bClient = new BClient(clientSocket);
Thread.sleep(dur!("seconds")(7));
writeln("Server start");
/**
* Create a tagged message to send
*
* tag 42 payload Cucumber 😳
*/
TaggedMessage message = new TaggedMessage(42, cast(byte[])"Cucumber 😳️");
byte[] tEncoded = message.encode();
writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]");
sleep(dur!("seconds")(15));
writeln("Server ending");
}
}
ServerThread serverThread = new ServerThread();
serverThread.start();
Socket client = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
writeln(server.localAddress);
Manager manager = new Manager(client);
Queue sixtyNine = new Queue(69);
manager.registerQueue(sixtyNine);
/* Connect our socket to the server */
client.connect(server.localAddress);
/* Start the manager and let it manage the socket */
manager.start();
// The failing exception
TristanableException failingException;
class DequeueThread : Thread
{
private Queue testQueue;
this(Queue testQueue)
{
super(&worker);
this.testQueue = testQueue;
}
public void worker()
{
try
{
writeln("dequeuThread: Before dequeue()");
this.testQueue.dequeue();
writeln("dequeueThread: After dequeue() [should not get here]");
}
catch(TristanableException e)
{
writeln("Got tristanable exception during dequeue(): "~e.toString());
// TODO: Fliup boolean is all cgood and assret it later
failingException = e;
}
}
}
DequeueThread dequeueThread = new DequeueThread(sixtyNine);
dequeueThread.start();
// Stop the manager
manager.stop();
writeln("drop");
// Wait for the dequeueing thread to stop
dequeueThread.join();
// Check condition
assert(failingException !is null);
assert(failingException.getError() == ErrorType.MANAGER_SHUTDOWN);
}
/**
* Setup a server which dies (kills its connection to us)
* midway whilst we are doing a `dequeue()`
*
* This is to test the exception triggering mechanism
* for such a case
*/
unittest
{
writeln("<<<<< Test 4 start >>>>>");
Address serverAddress = parseAddress("::1", 0);
Socket server = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
server.bind(serverAddress);
server.listen(0);
class ServerThread : Thread
{
this()
{
super(&worker);
}
private void worker()
{
Socket clientSocket = server.accept();
BClient bClient = new BClient(clientSocket);
Thread.sleep(dur!("seconds")(7));
writeln("Server start");
sleep(dur!("seconds")(15));
writeln("Server ending");
// Close the connection
bClient.close();
}
}
ServerThread serverThread = new ServerThread();
serverThread.start();
Socket client = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
writeln(server.localAddress);
Manager manager = new Manager(client);
Queue sixtyNine = new Queue(69);
manager.registerQueue(sixtyNine);
/* Connect our socket to the server */
client.connect(server.localAddress);
/* Start the manager and let it manage the socket */
manager.start();
// The failing exception
TristanableException failingException;
class DequeueThread : Thread
{
private Queue testQueue;
this(Queue testQueue)
{
super(&worker);
this.testQueue = testQueue;
}
public void worker()
{
try
{
writeln("dequeuThread: Before dequeue()");
this.testQueue.dequeue();
writeln("dequeueThread: After dequeue() [should not get here]");
}
catch(TristanableException e)
{
writeln("Got tristanable exception during dequeue(): "~e.toString());
// TODO: Fliup boolean is all cgood and assret it later
failingException = e;
}
}
}
DequeueThread dequeueThread = new DequeueThread(sixtyNine);
dequeueThread.start();
// Wait for the dequeueing thread to stop
dequeueThread.join();
// Check condition
assert(failingException !is null);
assert(failingException.getError() == ErrorType.WATCHER_FAILED);
}

View File

@ -62,6 +62,13 @@ public class Queue
*/
private Duration wakeInterval;
/**
* Reason for a `dequeue()`
* to have failed
*/
private ErrorType exitReason;
private bool alive;
/**
* Constructs a new Queue and immediately sets up the notification
* sub-system for the calling thread (the thread constructing this
@ -85,6 +92,9 @@ public class Queue
/* Set the slumber interval */
this.wakeInterval = dur!("msecs")(50); // TODO: Decide on value
/* Set status to alive */
this.alive = true;
}
/**
@ -157,6 +167,25 @@ public class Queue
}
}
public void shutdownQueue(ErrorType reason)
{
// Set running state and reason
this.alive = false;
this.exitReason = reason;
// Wakeup sleeping dequeue()
// Lock the mutex
this.mutex.lock();
// Awake all condition variable sleepers
this.signal.notifyAll();
// Unlock the mutex
this.mutex.unlock();
}
// TODO: Make a version of this which can time out
/**
@ -188,6 +217,13 @@ public class Queue
/* Block till we dequeue a message successfully */
while(dequeuedMessage is null)
{
/* Check if this queue is still alive */
if(!this.alive)
{
// Throw an exception to unblock the calling `dequeue()`
throw new TristanableException(this.exitReason);
}
scope(exit)
{
// Unlock the mutex
@ -207,7 +243,6 @@ public class Queue
throw new TristanableException(ErrorType.DEQUEUE_FAILED);
}
/* Lock the item queue */
queueLock.lock();