From 743744410cedf903772094d7cf81291a1eea56d6 Mon Sep 17 00:00:00 2001 From: "Tristan B. Kildaire" Date: Tue, 17 May 2022 19:13:12 +0200 Subject: [PATCH] WIP: Adding some handling exposed via tristanable --- dub.json | 2 +- dub.selections.json | 2 +- source/tasky/engine.d | 55 ++++++++++++++++++++++++++++----------- source/tasky/exceptions.d | 12 +++++++++ 4 files changed, 54 insertions(+), 17 deletions(-) diff --git a/dub.json b/dub.json index 2230fa5..35fb4f9 100644 --- a/dub.json +++ b/dub.json @@ -6,7 +6,7 @@ "dependencies": { "bformat": "~>3.1.3", "eventy": "0.2.2", - "tristanable": "2.6.1" + "tristanable": "2.6.9" }, "description": "Tagged network-message task engine", "license": "LGPL v3", diff --git a/dub.selections.json b/dub.selections.json index 0b1dc22..3cdaacc 100644 --- a/dub.selections.json +++ b/dub.selections.json @@ -3,6 +3,6 @@ "versions": { "bformat": "3.1.3", "eventy": "0.2.2", - "tristanable": "2.6.1" + "tristanable": "2.6.9" } } diff --git a/source/tasky/engine.d b/source/tasky/engine.d index eea53e0..69a3cde 100644 --- a/source/tasky/engine.d +++ b/source/tasky/engine.d @@ -12,7 +12,8 @@ import eventy.event : Event; import tasky.jobs : Descriptor; import tristanable; import std.socket : Socket; -import core.thread : Thread; +import core.thread : Thread, dur; +import tasky.exceptions : SessionError; public final class Engine : Thread { @@ -40,7 +41,7 @@ public final class Engine : Thread /* TODO: Check for exceptions */ /* Create a new tristanable manager */ - tmanager = new Manager(socket); + tmanager = new Manager(socket, dur!("msecs")(100), true); /* Start the loop */ running = true; @@ -80,17 +81,33 @@ public final class Engine : Thread /* Descriptor ID */ ulong descID = tQueue.getTag(); - /* Check if the queue has mail */ - if(tQueue.poll()) - { - /** - * Dequeue the data item and push - * event into the event loop containing - * it - */ - QueueItem data = tQueue.dequeue(); - evEngine.push(new TaskyEvent(descID, data.getData())); - } + + try + { + /* Check if the queue has mail */ + if(tQueue.poll()) + { + /** + * Dequeue the data item and push + * event into the event loop containing + * it + */ + QueueItem data = tQueue.dequeue(); + evEngine.push(new TaskyEvent(descID, data.getData())); + } + } + /* Catch the error when the underlying socket for Manager dies */ + catch(ManagerError e) + { + /* TODO: We can only enablke this if off thread, doesn't make sense on thread, in other words it maybe makes sense */ + /* TO call engine .run() that start a new thread seeing as thie point is to make this the backbone */ + import std.stdio; + // writeln("YOO"); + // throw new SessionError("Underlying socket (TManager) is dead"); + // break; + } + + } /* TODO: Yield away somehow */ @@ -126,7 +143,7 @@ public final class Engine : Thread evEngine.addSignalHandler(desc); /* Create a new queue for this Job */ - Queue tQueue = new Queue(desc.getDescriptorClass()); + Queue tQueue = new Queue(tmanager, desc.getDescriptorClass()); /* Add the Queue to tristanable */ tmanager.addQueue(tQueue); @@ -227,6 +244,8 @@ public final class Engine : Thread dMesg = new DataMessage(jobTypeDI, cast(byte[])"Hello 2"); writeln("Server send 2: ", clientSocket.send(encodeForSend(dMesg))); + sleep(dur!("seconds")(1)); + dMesg = new DataMessage(jobTypeDI2, cast(byte[])"Bye-bye! 3"); writeln("Server send 3: ", clientSocket.send(encodeForSend(dMesg))); dMesg = new DataMessage(jobTypeDI2, cast(byte[])"Bye-bye! 4"); @@ -247,6 +266,8 @@ public final class Engine : Thread clientSocket.connect(parseAddress("::1", to!(ushort)(serverAddress.toPortString()))); + + Engine e = new Engine(clientSocket); @@ -270,10 +291,14 @@ public final class Engine : Thread } } + writeln("Got to done testcase"); + runDone = true; /* TODO: Shutdown tasky here (shutdown eventy and tristanable) */ - e.shutdown(); + // e.shutdown(); + + // clientSocket.close; } } diff --git a/source/tasky/exceptions.d b/source/tasky/exceptions.d index 85dafe5..9f9c5d3 100644 --- a/source/tasky/exceptions.d +++ b/source/tasky/exceptions.d @@ -21,4 +21,16 @@ public final class SubmissionException : TaskyException { super("SubmissionException: "~msg); } +} + +/** +* Raised if the underlying socket dies (connection closes) +* or (TODO: check that Tasky shutdown does not cause this to weirdly go off by calling tmanager.shutdown()) +*/ +public final class SessionError : TaskyException +{ + this(string msg) + { + super("SessionError: "~msg); + } } \ No newline at end of file