WIP: Adding some handling exposed via tristanable

This commit is contained in:
Tristan B. Velloza Kildaire 2022-05-17 19:13:12 +02:00
parent 3561ed8fda
commit 743744410c
4 changed files with 54 additions and 17 deletions

View File

@ -6,7 +6,7 @@
"dependencies": {
"bformat": "~>3.1.3",
"eventy": "0.2.2",
"tristanable": "2.6.1"
"tristanable": "2.6.9"
},
"description": "Tagged network-message task engine",
"license": "LGPL v3",

View File

@ -3,6 +3,6 @@
"versions": {
"bformat": "3.1.3",
"eventy": "0.2.2",
"tristanable": "2.6.1"
"tristanable": "2.6.9"
}
}

View File

@ -12,7 +12,8 @@ import eventy.event : Event;
import tasky.jobs : Descriptor;
import tristanable;
import std.socket : Socket;
import core.thread : Thread;
import core.thread : Thread, dur;
import tasky.exceptions : SessionError;
public final class Engine : Thread
{
@ -40,7 +41,7 @@ public final class Engine : Thread
/* TODO: Check for exceptions */
/* Create a new tristanable manager */
tmanager = new Manager(socket);
tmanager = new Manager(socket, dur!("msecs")(100), true);
/* Start the loop */
running = true;
@ -80,17 +81,33 @@ public final class Engine : Thread
/* Descriptor ID */
ulong descID = tQueue.getTag();
/* Check if the queue has mail */
if(tQueue.poll())
{
/**
* Dequeue the data item and push
* event into the event loop containing
* it
*/
QueueItem data = tQueue.dequeue();
evEngine.push(new TaskyEvent(descID, data.getData()));
}
try
{
/* Check if the queue has mail */
if(tQueue.poll())
{
/**
* Dequeue the data item and push
* event into the event loop containing
* it
*/
QueueItem data = tQueue.dequeue();
evEngine.push(new TaskyEvent(descID, data.getData()));
}
}
/* Catch the error when the underlying socket for Manager dies */
catch(ManagerError e)
{
/* TODO: We can only enablke this if off thread, doesn't make sense on thread, in other words it maybe makes sense */
/* TO call engine .run() that start a new thread seeing as thie point is to make this the backbone */
import std.stdio;
// writeln("YOO");
// throw new SessionError("Underlying socket (TManager) is dead");
// break;
}
}
/* TODO: Yield away somehow */
@ -126,7 +143,7 @@ public final class Engine : Thread
evEngine.addSignalHandler(desc);
/* Create a new queue for this Job */
Queue tQueue = new Queue(desc.getDescriptorClass());
Queue tQueue = new Queue(tmanager, desc.getDescriptorClass());
/* Add the Queue to tristanable */
tmanager.addQueue(tQueue);
@ -227,6 +244,8 @@ public final class Engine : Thread
dMesg = new DataMessage(jobTypeDI, cast(byte[])"Hello 2");
writeln("Server send 2: ", clientSocket.send(encodeForSend(dMesg)));
sleep(dur!("seconds")(1));
dMesg = new DataMessage(jobTypeDI2, cast(byte[])"Bye-bye! 3");
writeln("Server send 3: ", clientSocket.send(encodeForSend(dMesg)));
dMesg = new DataMessage(jobTypeDI2, cast(byte[])"Bye-bye! 4");
@ -247,6 +266,8 @@ public final class Engine : Thread
clientSocket.connect(parseAddress("::1", to!(ushort)(serverAddress.toPortString())));
Engine e = new Engine(clientSocket);
@ -270,10 +291,14 @@ public final class Engine : Thread
}
}
writeln("Got to done testcase");
runDone = true;
/* TODO: Shutdown tasky here (shutdown eventy and tristanable) */
e.shutdown();
// e.shutdown();
// clientSocket.close;
}
}

View File

@ -21,4 +21,16 @@ public final class SubmissionException : TaskyException
{
super("SubmissionException: "~msg);
}
}
/**
* Raised if the underlying socket dies (connection closes)
* or (TODO: check that Tasky shutdown does not cause this to weirdly go off by calling tmanager.shutdown())
*/
public final class SessionError : TaskyException
{
this(string msg)
{
super("SessionError: "~msg);
}
}