- Upgraded to new `bformat` version `4.1.0` and migrated to using `BClient` (unit tests seem to pass)

This commit is contained in:
Tristan B. Velloza Kildaire 2023-04-30 19:30:11 +02:00
parent 97ecbd86bb
commit 64163aed0a
3 changed files with 45 additions and 28 deletions

View File

@ -4,7 +4,7 @@
], ],
"copyright": "Copyright © 2023, Tristan B. Kildaire", "copyright": "Copyright © 2023, Tristan B. Kildaire",
"dependencies": { "dependencies": {
"bformat": "3.1.13", "bformat": "4.1.0",
"libsnooze": "0.3.3" "libsnooze": "0.3.3"
}, },
"description": "Tristanable network message queuing framework", "description": "Tristanable network message queuing framework",

View File

@ -11,7 +11,9 @@ import tristanable.encoding : TaggedMessage;
import tristanable.exceptions; import tristanable.exceptions;
import std.container.slist : SList; import std.container.slist : SList;
import tristanable.manager.config; import tristanable.manager.config;
import bformat.sockets : bformatSendMessage = sendMessage; import river.core;
import river.impls.sock : SockStream;
import bformat.client;
/** /**
* Manages a provided socket by spawning * Manages a provided socket by spawning
@ -29,9 +31,9 @@ public class Manager
private Config config; private Config config;
/** /**
* The underlying socket to read from * The bformat client to read and write from
*/ */
private Socket socket; private BClient bClient;
/** /**
* Currently registered queues * Currently registered queues
@ -60,14 +62,21 @@ public class Manager
* this socket and file mail for us * this socket and file mail for us
* *
* Params: * Params:
* socket = the underlying socket to use * stream = the underlying stream to use
*/ */
this(Socket socket, Config config = defaultConfig()) this(Stream stream, Config config = defaultConfig())
{ {
this.socket = socket; this.bClient = new BClient(stream);
this.queuesLock = new Mutex(); this.queuesLock = new Mutex();
this.config = config; this.config = config;
this.watcher = new Watcher(this, socket); this.watcher = new Watcher(this, bClient);
}
// TODO: Comment this
// This is for backwards compatibility (whereby a `Socket` was taken in)
this(Socket socket, Config config = defaultConfig())
{
this(new SockStream(socket), config);
} }
/** /**
@ -333,10 +342,20 @@ public class Manager
byte[] encodedMessage = message.encode(); byte[] encodedMessage = message.encode();
/* Send it using bformat (encode-and-send) */ /* Send it using bformat (encode-and-send) */
bformatSendMessage(socket, encodedMessage); bClient.sendMessage(encodedMessage);
} }
} }
// TODO: Fix this, write it in a nicer way
// ... or make a private constructor here that
// ... does not take it in
version(unittest)
{
Socket nullSock = null;
}
/** /**
* Test retrieving a queue which does not * Test retrieving a queue which does not
* exist * exist
@ -344,7 +363,7 @@ public class Manager
unittest unittest
{ {
/* Create a manager */ /* Create a manager */
Manager manager = new Manager(null); Manager manager = new Manager(nullSock);
/* Shouldn't be found */ /* Shouldn't be found */
try try
@ -367,7 +386,7 @@ unittest
unittest unittest
{ {
/* Create a manager */ /* Create a manager */
Manager manager = new Manager(null); Manager manager = new Manager(nullSock);
/* Create a new queue with tag 69 */ /* Create a new queue with tag 69 */
Queue queue = new Queue(69); Queue queue = new Queue(69);
@ -399,7 +418,7 @@ unittest
unittest unittest
{ {
/* Create a manager */ /* Create a manager */
Manager manager = new Manager(null); Manager manager = new Manager(nullSock);
/* Create a new queue with tag 69 */ /* Create a new queue with tag 69 */
Queue queue = new Queue(69); Queue queue = new Queue(69);
@ -427,7 +446,7 @@ unittest
unittest unittest
{ {
/* Create a manager */ /* Create a manager */
Manager manager = new Manager(null); Manager manager = new Manager(nullSock);
/* Get the next 3 available queues */ /* Get the next 3 available queues */
Queue queue1 = manager.getUniqueQueue(); Queue queue1 = manager.getUniqueQueue();

View File

@ -12,6 +12,7 @@ import bformat;
import tristanable.encoding; import tristanable.encoding;
import tristanable.exceptions; import tristanable.exceptions;
import tristanable.queue; import tristanable.queue;
import bformat.client;
/** /**
* Watches the socket on a thread of its own, * Watches the socket on a thread of its own,
@ -29,9 +30,9 @@ public class Watcher : Thread
private Manager manager; private Manager manager;
/** /**
* The underlying socket to read from * The BClient to read from
*/ */
private Socket socket; private BClient bClient;
/** /**
* Creates a new `Watcher` that is associated * Creates a new `Watcher` that is associated
@ -41,12 +42,12 @@ public class Watcher : Thread
* *
* Params: * Params:
* manager = the `Manager` to associate with * manager = the `Manager` to associate with
* socket = the underlying `Socket` to read data from * bclient = the underlying `BClient` to read data from
*/ */
package this(Manager manager, Socket socket) package this(Manager manager, BClient bClient)
{ {
this.manager = manager; this.manager = manager;
this.socket = socket; this.bClient = bClient;
super(&watch); super(&watch);
} }
@ -74,7 +75,7 @@ public class Watcher : Thread
/* Do a bformat read-and-decode */ /* Do a bformat read-and-decode */
byte[] wireTristan; byte[] wireTristan;
version(unittest) { writeln("Before bformat recv()"); } version(unittest) { writeln("Before bformat recv()"); }
bool recvStatus = receiveMessage(socket, wireTristan); // TODO: Add a check for the status of read bool recvStatus = bClient.receiveMessage(wireTristan); // TODO: Add a check for the status of read
version(unittest) { writeln("After bformat recv()"); } version(unittest) { writeln("After bformat recv()"); }
version(unittest) { writeln("bformat recv() status: ", recvStatus); } version(unittest) { writeln("bformat recv() status: ", recvStatus); }
@ -135,12 +136,8 @@ public class Watcher : Thread
*/ */
package void shutdown() package void shutdown()
{ {
/* Unblock all calls to `recv()` and disallow future ones */ /* Closes the bformat reader */
// TODO: Would we want to do the same for sends? */ bClient.close();
socket.shutdown(SocketShutdown.RECEIVE);
/* Close the connection */
socket.close();
} }
} }
@ -170,6 +167,7 @@ unittest
private void worker() private void worker()
{ {
Socket clientSocket = server.accept(); Socket clientSocket = server.accept();
BClient bClient = new BClient(clientSocket);
Thread.sleep(dur!("seconds")(7)); Thread.sleep(dur!("seconds")(7));
writeln("Server start"); writeln("Server start");
@ -181,7 +179,7 @@ unittest
*/ */
TaggedMessage message = new TaggedMessage(42, cast(byte[])"Cucumber 😳️"); TaggedMessage message = new TaggedMessage(42, cast(byte[])"Cucumber 😳️");
byte[] tEncoded = message.encode(); byte[] tEncoded = message.encode();
writeln("server send status: ", sendMessage(clientSocket, tEncoded)); writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]"); writeln("server send [done]");
@ -192,7 +190,7 @@ unittest
*/ */
message = new TaggedMessage(69, cast(byte[])"Hello"); message = new TaggedMessage(69, cast(byte[])"Hello");
tEncoded = message.encode(); tEncoded = message.encode();
writeln("server send status: ", sendMessage(clientSocket, tEncoded)); writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]"); writeln("server send [done]");
@ -203,7 +201,7 @@ unittest
*/ */
message = new TaggedMessage(69, cast(byte[])"Bye"); message = new TaggedMessage(69, cast(byte[])"Bye");
tEncoded = message.encode(); tEncoded = message.encode();
writeln("server send status: ", sendMessage(clientSocket, tEncoded)); writeln("server send status: ", bClient.sendMessage(tEncoded));
writeln("server send [done]"); writeln("server send [done]");
} }