Watcher
- Hoisted common imports into a `version(unittest)` Watcher (unittests) - Added a unittest for testing `stop()` to unblock `dequeue()`s
This commit is contained in:
parent
31f7b6355f
commit
169f47dd8d
|
@ -130,6 +130,9 @@ public class Watcher : Thread
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Unblock all `dequeue()`'s here
|
// TODO: Unblock all `dequeue()`'s here
|
||||||
|
// TODO: Get a reason for exiting (either cause of error OR shutdoiwn (see below (which in turn is called by the Manager)))
|
||||||
|
version(unittest) { writeln("Exited watcher loop"); }
|
||||||
|
this.manager.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -143,6 +146,13 @@ public class Watcher : Thread
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
version(unittest)
|
||||||
|
{
|
||||||
|
import std.socket;
|
||||||
|
import std.stdio;
|
||||||
|
import core.thread;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set up a server which will send some tagged messages to us (the client),
|
* Set up a server which will send some tagged messages to us (the client),
|
||||||
* where we have setup a `Manager` to watch the queues with tags `42` and `69`,
|
* where we have setup a `Manager` to watch the queues with tags `42` and `69`,
|
||||||
|
@ -150,10 +160,6 @@ public class Watcher : Thread
|
||||||
*/
|
*/
|
||||||
unittest
|
unittest
|
||||||
{
|
{
|
||||||
import std.socket;
|
|
||||||
import std.stdio;
|
|
||||||
import core.thread;
|
|
||||||
|
|
||||||
Address serverAddress = parseAddress("::1", 0);
|
Address serverAddress = parseAddress("::1", 0);
|
||||||
Socket server = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
|
Socket server = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
|
||||||
server.bind(serverAddress);
|
server.bind(serverAddress);
|
||||||
|
@ -296,4 +302,132 @@ unittest
|
||||||
|
|
||||||
/* Stop the manager */
|
/* Stop the manager */
|
||||||
manager.stop();
|
manager.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Setup a server which dies (kills its connection to us)
|
||||||
|
* midway whilst we are doing a `dequeue()`
|
||||||
|
*
|
||||||
|
* This is to test the exception triggering mechanism
|
||||||
|
* for such a case
|
||||||
|
*/
|
||||||
|
unittest
|
||||||
|
{
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Setup a `Manager` and then block on a `dequeue()`
|
||||||
|
* but from another thread shutdown the `Manager`.
|
||||||
|
*
|
||||||
|
* This is to test the exception triggering mechanism
|
||||||
|
* for such a case
|
||||||
|
*/
|
||||||
|
unittest
|
||||||
|
{
|
||||||
|
writeln("<<<<< Test 4 start >>>>>");
|
||||||
|
|
||||||
|
Address serverAddress = parseAddress("::1", 0);
|
||||||
|
Socket server = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
|
||||||
|
server.bind(serverAddress);
|
||||||
|
server.listen(0);
|
||||||
|
|
||||||
|
class ServerThread : Thread
|
||||||
|
{
|
||||||
|
this()
|
||||||
|
{
|
||||||
|
super(&worker);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void worker()
|
||||||
|
{
|
||||||
|
Socket clientSocket = server.accept();
|
||||||
|
BClient bClient = new BClient(clientSocket);
|
||||||
|
|
||||||
|
Thread.sleep(dur!("seconds")(7));
|
||||||
|
writeln("Server start");
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create a tagged message to send
|
||||||
|
*
|
||||||
|
* tag 42 payload Cucumber 😳️
|
||||||
|
*/
|
||||||
|
TaggedMessage message = new TaggedMessage(42, cast(byte[])"Cucumber 😳️");
|
||||||
|
byte[] tEncoded = message.encode();
|
||||||
|
writeln("server send status: ", bClient.sendMessage(tEncoded));
|
||||||
|
|
||||||
|
writeln("server send [done]");
|
||||||
|
|
||||||
|
sleep(dur!("seconds")(15));
|
||||||
|
|
||||||
|
writeln("Server ending");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ServerThread serverThread = new ServerThread();
|
||||||
|
serverThread.start();
|
||||||
|
|
||||||
|
Socket client = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
|
||||||
|
|
||||||
|
writeln(server.localAddress);
|
||||||
|
|
||||||
|
|
||||||
|
Manager manager = new Manager(client);
|
||||||
|
|
||||||
|
Queue sixtyNine = new Queue(69);
|
||||||
|
|
||||||
|
manager.registerQueue(sixtyNine);
|
||||||
|
|
||||||
|
|
||||||
|
/* Connect our socket to the server */
|
||||||
|
client.connect(server.localAddress);
|
||||||
|
|
||||||
|
/* Start the manager and let it manage the socket */
|
||||||
|
manager.start();
|
||||||
|
|
||||||
|
|
||||||
|
// The failing exception
|
||||||
|
TristanableException failingException;
|
||||||
|
|
||||||
|
class DequeueThread : Thread
|
||||||
|
{
|
||||||
|
private Queue testQueue;
|
||||||
|
|
||||||
|
this(Queue testQueue)
|
||||||
|
{
|
||||||
|
super(&worker);
|
||||||
|
this.testQueue = testQueue;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void worker()
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
writeln("dequeuThread: Before dequeue()");
|
||||||
|
this.testQueue.dequeue();
|
||||||
|
writeln("dequeueThread: After dequeue() [should not get here]");
|
||||||
|
}
|
||||||
|
catch(TristanableException e)
|
||||||
|
{
|
||||||
|
writeln("Got tristanable exception during dequeue(): "~e.toString());
|
||||||
|
|
||||||
|
// TODO: Fliup boolean is all cgood and assret it later
|
||||||
|
failingException = e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
DequeueThread dequeueThread = new DequeueThread(sixtyNine);
|
||||||
|
dequeueThread.start();
|
||||||
|
|
||||||
|
// Stop the manager
|
||||||
|
manager.stop();
|
||||||
|
writeln("drop");
|
||||||
|
|
||||||
|
// Wait for the dequeueing thread to stop
|
||||||
|
dequeueThread.join();
|
||||||
|
|
||||||
|
// Check condition
|
||||||
|
assert(failingException !is null);
|
||||||
|
assert(failingException.getError() == ErrorType.MANAGER_SHUTDOWN);
|
||||||
}
|
}
|
Loading…
Reference in New Issue