Compare commits
10 Commits
6c9df711f8
...
815e411e64
Author | SHA1 | Date |
---|---|---|
Tristan B. Velloza Kildaire | 815e411e64 | |
Tristan B. Velloza Kildaire | 8ac7a5485c | |
Tristan B. Velloza Kildaire | c5a6612a4b | |
Tristan B. Velloza Kildaire | aff9787dba | |
Tristan B. Velloza Kildaire | 75547033d9 | |
Tristan B. Velloza Kildaire | 169f47dd8d | |
Tristan B. Velloza Kildaire | 31f7b6355f | |
Tristan B. Velloza Kildaire | 08757f27f2 | |
Tristan B. Velloza Kildaire | 713c102da5 | |
Tristan B. Velloza Kildaire | 198cb52342 |
|
@ -10,6 +10,23 @@ import std.conv : to;
|
|||
*/
|
||||
public enum ErrorType
|
||||
{
|
||||
/**
|
||||
* Unset
|
||||
*/
|
||||
UNSET,
|
||||
|
||||
/**
|
||||
* If the manager has already
|
||||
* been shutdown
|
||||
*/
|
||||
MANAGER_SHUTDOWN,
|
||||
|
||||
/**
|
||||
* If the watcher has failed
|
||||
* to stay alive
|
||||
*/
|
||||
WATCHER_FAILED,
|
||||
|
||||
/**
|
||||
* If the requested queue could not be found
|
||||
*/
|
||||
|
|
|
@ -93,12 +93,67 @@ public class Manager
|
|||
* Stops the management of the socket, resulting
|
||||
* in ending the updating of queues and closing
|
||||
* the underlying connection
|
||||
*
|
||||
* Calling this will also unblock any calls that
|
||||
* were blocking whilst doing a `dequeue()`
|
||||
*/
|
||||
public void stop()
|
||||
{
|
||||
/* Stop with the given reason */
|
||||
stop(ErrorType.MANAGER_SHUTDOWN);
|
||||
}
|
||||
|
||||
/**
|
||||
* Only called by the `Watcher` and for
|
||||
* the purpose of setting a custom error
|
||||
* type.
|
||||
*
|
||||
* Called when the network read fails
|
||||
*/
|
||||
void stop_FailedWatcher()
|
||||
{
|
||||
/* Stop with the given reason */
|
||||
stop(ErrorType.WATCHER_FAILED);
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops the watcher service and then
|
||||
* unblocks all calls to `dequeue()`
|
||||
* by shutting down each `Queue`
|
||||
*
|
||||
* Params:
|
||||
* reason = the reason for the
|
||||
* shutdown
|
||||
*/
|
||||
private void stop(ErrorType reason)
|
||||
{
|
||||
/* Stop the watcher */
|
||||
watcher.shutdown();
|
||||
|
||||
// TODO: Unblock ALL queues here
|
||||
/* Unblock all `dequeue()` calls */
|
||||
shutdownAllQueues(reason);
|
||||
}
|
||||
|
||||
/**
|
||||
* Shuts down all registered queues
|
||||
*/
|
||||
protected void shutdownAllQueues(ErrorType reason)
|
||||
{
|
||||
/* Lock the queue of queues */
|
||||
queuesLock.lock();
|
||||
|
||||
/* On return or error */
|
||||
scope(exit)
|
||||
{
|
||||
/* Unlock the queue of queues */
|
||||
queuesLock.unlock();
|
||||
}
|
||||
|
||||
/* Shutdown each queue */
|
||||
foreach(Queue queue; this.queues)
|
||||
{
|
||||
queue.shutdownQueue(reason);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -129,7 +129,13 @@ public class Watcher : Thread
|
|||
}
|
||||
}
|
||||
|
||||
// TODO: Unblock all `dequeue()`'s here
|
||||
version(unittest) { writeln("Exited watcher loop"); }
|
||||
|
||||
// NOTE: This will also be run on normal user-initiated `stop()`
|
||||
// ... but will just try shutdown an alreayd shutdown manager
|
||||
// ... again and try shut our already-closed river stream
|
||||
// Shutdown and unblock all `dequeue()` calls
|
||||
this.manager.stop_FailedWatcher();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -143,6 +149,13 @@ public class Watcher : Thread
|
|||
}
|
||||
}
|
||||
|
||||
version(unittest)
|
||||
{
|
||||
import std.socket;
|
||||
import std.stdio;
|
||||
import core.thread;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set up a server which will send some tagged messages to us (the client),
|
||||
* where we have setup a `Manager` to watch the queues with tags `42` and `69`,
|
||||
|
@ -150,10 +163,6 @@ public class Watcher : Thread
|
|||
*/
|
||||
unittest
|
||||
{
|
||||
import std.socket;
|
||||
import std.stdio;
|
||||
import core.thread;
|
||||
|
||||
Address serverAddress = parseAddress("::1", 0);
|
||||
Socket server = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
|
||||
server.bind(serverAddress);
|
||||
|
@ -296,4 +305,224 @@ unittest
|
|||
|
||||
/* Stop the manager */
|
||||
manager.stop();
|
||||
}
|
||||
|
||||
/**
|
||||
* Setup a `Manager` and then block on a `dequeue()`
|
||||
* but from another thread shutdown the `Manager`.
|
||||
*
|
||||
* This is to test the exception triggering mechanism
|
||||
* for such a case
|
||||
*/
|
||||
unittest
|
||||
{
|
||||
writeln("<<<<< Test 3 start >>>>>");
|
||||
|
||||
Address serverAddress = parseAddress("::1", 0);
|
||||
Socket server = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
|
||||
server.bind(serverAddress);
|
||||
server.listen(0);
|
||||
|
||||
class ServerThread : Thread
|
||||
{
|
||||
this()
|
||||
{
|
||||
super(&worker);
|
||||
}
|
||||
|
||||
private void worker()
|
||||
{
|
||||
Socket clientSocket = server.accept();
|
||||
BClient bClient = new BClient(clientSocket);
|
||||
|
||||
Thread.sleep(dur!("seconds")(7));
|
||||
writeln("Server start");
|
||||
|
||||
/**
|
||||
* Create a tagged message to send
|
||||
*
|
||||
* tag 42 payload Cucumber 😳️
|
||||
*/
|
||||
TaggedMessage message = new TaggedMessage(42, cast(byte[])"Cucumber 😳️");
|
||||
byte[] tEncoded = message.encode();
|
||||
writeln("server send status: ", bClient.sendMessage(tEncoded));
|
||||
|
||||
writeln("server send [done]");
|
||||
|
||||
sleep(dur!("seconds")(15));
|
||||
|
||||
writeln("Server ending");
|
||||
}
|
||||
}
|
||||
|
||||
ServerThread serverThread = new ServerThread();
|
||||
serverThread.start();
|
||||
|
||||
Socket client = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
|
||||
|
||||
writeln(server.localAddress);
|
||||
|
||||
|
||||
Manager manager = new Manager(client);
|
||||
|
||||
Queue sixtyNine = new Queue(69);
|
||||
|
||||
manager.registerQueue(sixtyNine);
|
||||
|
||||
|
||||
/* Connect our socket to the server */
|
||||
client.connect(server.localAddress);
|
||||
|
||||
/* Start the manager and let it manage the socket */
|
||||
manager.start();
|
||||
|
||||
|
||||
// The failing exception
|
||||
TristanableException failingException;
|
||||
|
||||
class DequeueThread : Thread
|
||||
{
|
||||
private Queue testQueue;
|
||||
|
||||
this(Queue testQueue)
|
||||
{
|
||||
super(&worker);
|
||||
this.testQueue = testQueue;
|
||||
}
|
||||
|
||||
public void worker()
|
||||
{
|
||||
try
|
||||
{
|
||||
writeln("dequeuThread: Before dequeue()");
|
||||
this.testQueue.dequeue();
|
||||
writeln("dequeueThread: After dequeue() [should not get here]");
|
||||
}
|
||||
catch(TristanableException e)
|
||||
{
|
||||
writeln("Got tristanable exception during dequeue(): "~e.toString());
|
||||
|
||||
// TODO: Fliup boolean is all cgood and assret it later
|
||||
failingException = e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
DequeueThread dequeueThread = new DequeueThread(sixtyNine);
|
||||
dequeueThread.start();
|
||||
|
||||
// Stop the manager
|
||||
manager.stop();
|
||||
writeln("drop");
|
||||
|
||||
// Wait for the dequeueing thread to stop
|
||||
dequeueThread.join();
|
||||
|
||||
// Check condition
|
||||
assert(failingException !is null);
|
||||
assert(failingException.getError() == ErrorType.MANAGER_SHUTDOWN);
|
||||
}
|
||||
|
||||
/**
|
||||
* Setup a server which dies (kills its connection to us)
|
||||
* midway whilst we are doing a `dequeue()`
|
||||
*
|
||||
* This is to test the exception triggering mechanism
|
||||
* for such a case
|
||||
*/
|
||||
unittest
|
||||
{
|
||||
writeln("<<<<< Test 4 start >>>>>");
|
||||
|
||||
Address serverAddress = parseAddress("::1", 0);
|
||||
Socket server = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
|
||||
server.bind(serverAddress);
|
||||
server.listen(0);
|
||||
|
||||
class ServerThread : Thread
|
||||
{
|
||||
this()
|
||||
{
|
||||
super(&worker);
|
||||
}
|
||||
|
||||
private void worker()
|
||||
{
|
||||
Socket clientSocket = server.accept();
|
||||
BClient bClient = new BClient(clientSocket);
|
||||
|
||||
Thread.sleep(dur!("seconds")(7));
|
||||
writeln("Server start");
|
||||
|
||||
sleep(dur!("seconds")(15));
|
||||
|
||||
writeln("Server ending");
|
||||
|
||||
// Close the connection
|
||||
bClient.close();
|
||||
}
|
||||
}
|
||||
|
||||
ServerThread serverThread = new ServerThread();
|
||||
serverThread.start();
|
||||
|
||||
Socket client = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
|
||||
|
||||
writeln(server.localAddress);
|
||||
|
||||
|
||||
Manager manager = new Manager(client);
|
||||
|
||||
Queue sixtyNine = new Queue(69);
|
||||
|
||||
manager.registerQueue(sixtyNine);
|
||||
|
||||
|
||||
/* Connect our socket to the server */
|
||||
client.connect(server.localAddress);
|
||||
|
||||
/* Start the manager and let it manage the socket */
|
||||
manager.start();
|
||||
|
||||
|
||||
// The failing exception
|
||||
TristanableException failingException;
|
||||
|
||||
class DequeueThread : Thread
|
||||
{
|
||||
private Queue testQueue;
|
||||
|
||||
this(Queue testQueue)
|
||||
{
|
||||
super(&worker);
|
||||
this.testQueue = testQueue;
|
||||
}
|
||||
|
||||
public void worker()
|
||||
{
|
||||
try
|
||||
{
|
||||
writeln("dequeuThread: Before dequeue()");
|
||||
this.testQueue.dequeue();
|
||||
writeln("dequeueThread: After dequeue() [should not get here]");
|
||||
}
|
||||
catch(TristanableException e)
|
||||
{
|
||||
writeln("Got tristanable exception during dequeue(): "~e.toString());
|
||||
|
||||
// TODO: Fliup boolean is all cgood and assret it later
|
||||
failingException = e;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
DequeueThread dequeueThread = new DequeueThread(sixtyNine);
|
||||
dequeueThread.start();
|
||||
|
||||
// Wait for the dequeueing thread to stop
|
||||
dequeueThread.join();
|
||||
|
||||
// Check condition
|
||||
assert(failingException !is null);
|
||||
assert(failingException.getError() == ErrorType.WATCHER_FAILED);
|
||||
}
|
|
@ -62,6 +62,13 @@ public class Queue
|
|||
*/
|
||||
private Duration wakeInterval;
|
||||
|
||||
/**
|
||||
* Reason for a `dequeue()`
|
||||
* to have failed
|
||||
*/
|
||||
private ErrorType exitReason;
|
||||
private bool alive;
|
||||
|
||||
/**
|
||||
* Constructs a new Queue and immediately sets up the notification
|
||||
* sub-system for the calling thread (the thread constructing this
|
||||
|
@ -85,6 +92,9 @@ public class Queue
|
|||
|
||||
/* Set the slumber interval */
|
||||
this.wakeInterval = dur!("msecs")(50); // TODO: Decide on value
|
||||
|
||||
/* Set status to alive */
|
||||
this.alive = true;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -157,6 +167,25 @@ public class Queue
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
public void shutdownQueue(ErrorType reason)
|
||||
{
|
||||
// Set running state and reason
|
||||
this.alive = false;
|
||||
this.exitReason = reason;
|
||||
|
||||
// Wakeup sleeping dequeue()
|
||||
|
||||
// Lock the mutex
|
||||
this.mutex.lock();
|
||||
|
||||
// Awake all condition variable sleepers
|
||||
this.signal.notifyAll();
|
||||
|
||||
// Unlock the mutex
|
||||
this.mutex.unlock();
|
||||
}
|
||||
|
||||
// TODO: Make a version of this which can time out
|
||||
|
||||
/**
|
||||
|
@ -188,6 +217,13 @@ public class Queue
|
|||
/* Block till we dequeue a message successfully */
|
||||
while(dequeuedMessage is null)
|
||||
{
|
||||
/* Check if this queue is still alive */
|
||||
if(!this.alive)
|
||||
{
|
||||
// Throw an exception to unblock the calling `dequeue()`
|
||||
throw new TristanableException(this.exitReason);
|
||||
}
|
||||
|
||||
scope(exit)
|
||||
{
|
||||
// Unlock the mutex
|
||||
|
@ -207,7 +243,6 @@ public class Queue
|
|||
throw new TristanableException(ErrorType.DEQUEUE_FAILED);
|
||||
}
|
||||
|
||||
|
||||
/* Lock the item queue */
|
||||
queueLock.lock();
|
||||
|
||||
|
|
Loading…
Reference in New Issue