From 64163aed0ad403292ec1466b5df2b52ec872de59 Mon Sep 17 00:00:00 2001 From: "Tristan B. Velloza Kildaire" Date: Sun, 30 Apr 2023 19:30:11 +0200 Subject: [PATCH] - Upgraded to new `bformat` version `4.1.0` and migrated to using `BClient` (unit tests seem to pass) --- dub.json | 2 +- source/tristanable/manager/manager.d | 43 ++++++++++++++++++++-------- source/tristanable/manager/watcher.d | 28 +++++++++--------- 3 files changed, 45 insertions(+), 28 deletions(-) diff --git a/dub.json b/dub.json index fb9735a..279a9a8 100644 --- a/dub.json +++ b/dub.json @@ -4,7 +4,7 @@ ], "copyright": "Copyright © 2023, Tristan B. Kildaire", "dependencies": { - "bformat": "3.1.13", + "bformat": "4.1.0", "libsnooze": "0.3.3" }, "description": "Tristanable network message queuing framework", diff --git a/source/tristanable/manager/manager.d b/source/tristanable/manager/manager.d index f4a4042..ec0c047 100644 --- a/source/tristanable/manager/manager.d +++ b/source/tristanable/manager/manager.d @@ -11,7 +11,9 @@ import tristanable.encoding : TaggedMessage; import tristanable.exceptions; import std.container.slist : SList; import tristanable.manager.config; -import bformat.sockets : bformatSendMessage = sendMessage; +import river.core; +import river.impls.sock : SockStream; +import bformat.client; /** * Manages a provided socket by spawning @@ -29,9 +31,9 @@ public class Manager private Config config; /** - * The underlying socket to read from + * The bformat client to read and write from */ - private Socket socket; + private BClient bClient; /** * Currently registered queues @@ -60,14 +62,21 @@ public class Manager * this socket and file mail for us * * Params: - * socket = the underlying socket to use + * stream = the underlying stream to use */ - this(Socket socket, Config config = defaultConfig()) + this(Stream stream, Config config = defaultConfig()) { - this.socket = socket; + this.bClient = new BClient(stream); this.queuesLock = new Mutex(); this.config = config; - this.watcher = new Watcher(this, socket); + this.watcher = new Watcher(this, bClient); + } + + // TODO: Comment this + // This is for backwards compatibility (whereby a `Socket` was taken in) + this(Socket socket, Config config = defaultConfig()) + { + this(new SockStream(socket), config); } /** @@ -333,10 +342,20 @@ public class Manager byte[] encodedMessage = message.encode(); /* Send it using bformat (encode-and-send) */ - bformatSendMessage(socket, encodedMessage); + bClient.sendMessage(encodedMessage); } } + + +// TODO: Fix this, write it in a nicer way +// ... or make a private constructor here that +// ... does not take it in +version(unittest) +{ + Socket nullSock = null; +} + /** * Test retrieving a queue which does not * exist @@ -344,7 +363,7 @@ public class Manager unittest { /* Create a manager */ - Manager manager = new Manager(null); + Manager manager = new Manager(nullSock); /* Shouldn't be found */ try @@ -367,7 +386,7 @@ unittest unittest { /* Create a manager */ - Manager manager = new Manager(null); + Manager manager = new Manager(nullSock); /* Create a new queue with tag 69 */ Queue queue = new Queue(69); @@ -399,7 +418,7 @@ unittest unittest { /* Create a manager */ - Manager manager = new Manager(null); + Manager manager = new Manager(nullSock); /* Create a new queue with tag 69 */ Queue queue = new Queue(69); @@ -427,7 +446,7 @@ unittest unittest { /* Create a manager */ - Manager manager = new Manager(null); + Manager manager = new Manager(nullSock); /* Get the next 3 available queues */ Queue queue1 = manager.getUniqueQueue(); diff --git a/source/tristanable/manager/watcher.d b/source/tristanable/manager/watcher.d index 336d0a4..4c9ba38 100644 --- a/source/tristanable/manager/watcher.d +++ b/source/tristanable/manager/watcher.d @@ -12,6 +12,7 @@ import bformat; import tristanable.encoding; import tristanable.exceptions; import tristanable.queue; +import bformat.client; /** * Watches the socket on a thread of its own, @@ -29,9 +30,9 @@ public class Watcher : Thread private Manager manager; /** - * The underlying socket to read from + * The BClient to read from */ - private Socket socket; + private BClient bClient; /** * Creates a new `Watcher` that is associated @@ -41,12 +42,12 @@ public class Watcher : Thread * * Params: * manager = the `Manager` to associate with - * socket = the underlying `Socket` to read data from + * bclient = the underlying `BClient` to read data from */ - package this(Manager manager, Socket socket) + package this(Manager manager, BClient bClient) { this.manager = manager; - this.socket = socket; + this.bClient = bClient; super(&watch); } @@ -74,7 +75,7 @@ public class Watcher : Thread /* Do a bformat read-and-decode */ byte[] wireTristan; version(unittest) { writeln("Before bformat recv()"); } - bool recvStatus = receiveMessage(socket, wireTristan); // TODO: Add a check for the status of read + bool recvStatus = bClient.receiveMessage(wireTristan); // TODO: Add a check for the status of read version(unittest) { writeln("After bformat recv()"); } version(unittest) { writeln("bformat recv() status: ", recvStatus); } @@ -135,12 +136,8 @@ public class Watcher : Thread */ package void shutdown() { - /* Unblock all calls to `recv()` and disallow future ones */ - // TODO: Would we want to do the same for sends? */ - socket.shutdown(SocketShutdown.RECEIVE); - - /* Close the connection */ - socket.close(); + /* Closes the bformat reader */ + bClient.close(); } } @@ -170,6 +167,7 @@ unittest private void worker() { Socket clientSocket = server.accept(); + BClient bClient = new BClient(clientSocket); Thread.sleep(dur!("seconds")(7)); writeln("Server start"); @@ -181,7 +179,7 @@ unittest */ TaggedMessage message = new TaggedMessage(42, cast(byte[])"Cucumber 😳️"); byte[] tEncoded = message.encode(); - writeln("server send status: ", sendMessage(clientSocket, tEncoded)); + writeln("server send status: ", bClient.sendMessage(tEncoded)); writeln("server send [done]"); @@ -192,7 +190,7 @@ unittest */ message = new TaggedMessage(69, cast(byte[])"Hello"); tEncoded = message.encode(); - writeln("server send status: ", sendMessage(clientSocket, tEncoded)); + writeln("server send status: ", bClient.sendMessage(tEncoded)); writeln("server send [done]"); @@ -203,7 +201,7 @@ unittest */ message = new TaggedMessage(69, cast(byte[])"Bye"); tEncoded = message.encode(); - writeln("server send status: ", sendMessage(clientSocket, tEncoded)); + writeln("server send status: ", bClient.sendMessage(tEncoded)); writeln("server send [done]"); }