Tristan B. Velloza Kildaire 2023-03-19 18:09:58 +02:00
"copyright": "Copyright © 2021, Tristan B. Kildaire",
"dependencies": {
"bformat": "~>3.1.3",
"eventy": "0.2.4",
"eventy": "0.2.5",
"tristanable": "2.6.12"
"description": "Tagged network-message task engine",

"fileVersion": 1,
"versions": {
"bformat": "3.1.3",
"eventy": "0.2.4",
"eventy": "0.2.5",
"tristanable": "2.6.12"

* Engine
* Contains the core components of the tasky
* library, this is effectively the entry
* point to the library
module tasky.engine;
import eventy.engine : EvEngine = Engine;
import eventy.event : Event;
import tasky.jobs : Descriptor;
import tristanable;
import std.socket : Socket;
import core.thread : Thread, dur;
import tasky.exceptions : SessionError;
public class TaskyEvent : Event
private byte[] payload;
this(ulong descID, byte[] payload)
this.payload = payload;
public byte[] getPayload()
return payload;
public final class Engine : Thread
* Tristanable sub-system
private Manager tmanager;
* Eventy sub-system
private EvEngine evEngine;
private bool running;
this(Socket socket)
/* Set the worker function */
/* Create a new event engine */
evEngine = new EvEngine();
/* Create a new tristanable manager */
tmanager = new Manager(socket, dur!("msecs")(100), true);
* Start the sub-systems
private void startTasky()
/* Start the event engine */
/* Start the tristanable queue filter */
/* Start the loop */
running = true;
* Worker thread function which checks the tristanable
* queues for whichever has messages on them and then
* dispatches a job-response for them via eventy
private void worker()
/* Start all sub-systems */
* Loop through each queue, poll for
* any new data, pull off one item
* at most
* TODO: Different queuing systems
Queue[] tQueues = tmanager.getQueues();
foreach(Queue tQueue; tQueues)
/* Descriptor ID */
ulong descID = tQueue.getTag();
/* Check if the queue has mail */
* Dequeue the data item and push
* event into the event loop containing
* it
QueueItem data = tQueue.dequeue();
evEngine.push(new TaskyEvent(descID, data.getData()));
/* Catch the error when the underlying socket for Manager dies */
catch(ManagerError e)
/* TODO: We can only enablke this if off thread, doesn't make sense on thread, in other words it maybe makes sense */
/* TO call engine .run() that start a new thread seeing as thie point is to make this the backbone */
import std.stdio;
// writeln("YOO");
// throw new SessionError("Underlying socket (TManager) is dead");
// break;
/* TODO: Yield away somehow */
import core.thread : dur;
// sleep(dur!("msecs")(500));
* Stop the task engine
public void shutdown()
/* Stop the loop */
running = false;
/* TODO: Stop tristsnable (must be implemented in tristanable first) */
/* TODO: Stop eventy (mjst be implemented in eventy first) */
* Register a Descriptor with tasky
public void registerDescriptor(Descriptor desc)
/* Add a queue based on the descriptor ID */
/* Add a signal handler that handles said descriptor ID */
/* Create a new queue for this Job */
Queue tQueue = new Queue(tmanager, desc.getDescriptorClass());
/* Add the Queue to tristanable */
/* Results array for unit testing */
bool[4] results;
import std.conv : to;
import core.thread : dur;
import std.string : cmp;
import std.datetime.stopwatch : StopWatch;
bool runDone;
/* Job type */
Descriptor jobType = new class Descriptor {
public override void handler_TaskyEvent(TaskyEvent e)
import std.stdio : writeln;
writeln("Event id ", e.id);
string data = cast(string)e.payload;
if(cmp(data, "Hello 1") == 0)
results[0] = true;
else if(cmp(data, "Hello 2") == 0)
results[1] = true;
ulong jobTypeDI = jobType.getDescriptorClass;
ulong job2C = 0;
/* Job type */
Descriptor jobType2 = new class Descriptor {
public override void handler_TaskyEvent(TaskyEvent e)
import std.stdio : writeln;
writeln("Event id ", e.id);
writeln("OTHER event type");
string data = cast(string)e.payload;
// job2C++;
// assert(cmp(cast(string)eT.payload, ""))
if(cmp(data, "Bye-bye! 3") == 0)
results[2] = true;
else if(cmp(data, "Bye-bye! 4") == 0)
results[3] = true;
ulong jobTypeDI2 = jobType2.getDescriptorClass;
import std.socket;
import std.stdio;
Socket serverSocket = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
serverSocket.bind(parseAddress("::1", 0));
Address serverAddress = serverSocket.localAddress();
Thread serverThread = new class Thread {
public void worker()
Socket clientSocket = serverSocket.accept();
import tristanable.encoding : DataMessage, encodeForSend;
DataMessage dMesg = new DataMessage(jobTypeDI, cast(byte[])"Hello 1");
writeln("Server send 1: ", clientSocket.send(encodeForSend(dMesg)));
dMesg = new DataMessage(jobTypeDI, cast(byte[])"Hello 2");
writeln("Server send 2: ", clientSocket.send(encodeForSend(dMesg)));
dMesg = new DataMessage(jobTypeDI2, cast(byte[])"Bye-bye! 3");
writeln("Server send 3: ", clientSocket.send(encodeForSend(dMesg)));
dMesg = new DataMessage(jobTypeDI2, cast(byte[])"Bye-bye! 4");
writeln("Server send 4: ", clientSocket.send(encodeForSend(dMesg)));
Socket clientSocket = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
clientSocket.connect(parseAddress("::1", to!(ushort)(serverAddress.toPortString())));
/* Create a new Tasky engine */
Engine e = new Engine(clientSocket);
* Setup the job types that are wanted
/* Start the tasky engine */
* Await the expected result, but if this does not complete
* within 4 seconds then expect it failed
StopWatch watch;
while(!results[0] || !results[1] || !results[2] || !results[3])
if(watch.peek() > dur!("seconds")(4))
runDone = true;
writeln("Got to done testcase");
runDone = true;
/* TODO: Shutdown tasky here (shutdown eventy and tristanable) */
// e.shutdown();
// clientSocket.close;

* Exceptions
* Base definitions for exceptions appear here
module tasky.exceptions;
import std.exception;
public abstract class TaskyException : Exception
this(string msg)
public final class SubmissionException : TaskyException
this(string msg)
super("SubmissionException: "~msg);
public final class DescriptorException : TaskyException
this(string msg)
super("DescriptorException: "~msg);
* Raised if the underlying socket dies (connection closes)
* or (TODO: check that Tasky shutdown does not cause this to weirdly go off by calling tmanager.shutdown())
public final class SessionError : TaskyException
this(string msg)
super("SessionError: "~msg);

* Jobs
* Contains tools for describing different types
* of jobs and what event handlers will be triggered
* for them along with the creation of actual
* Jobs (schedulable units).
module tasky.jobs;
/* TODO: Remove this import */
import std.stdio;
import tasky.exceptions : TaskyException, DescriptorException;
/* TODO: DList stuff */
import std.container.dlist;
import core.sync.mutex : Mutex;
import std.string : cmp;
import eventy.signal : Signal;
import eventy.event : Event;
// <<<<<<< Updated upstream
import std.conv : to;
// =======
import tasky.engine : Engine, TaskyEvent;
// >>>>>>> Stashed changes
* A Job to be scheduled
public final class Job
* TODO: Comment
private Descriptor descriptor;
private byte[] payload;
* Returns the classification of this Job, i.e.
* its Descriptor number
public ulong getJobTypeID()
return descriptor.getDescriptorClass();
private this(Descriptor jobType, byte[] payload)
/* TODO: Extract needed information from here */
this.descriptor = jobType;
this.payload = payload;
protected Job newJob(Descriptor jobType, byte[] payload)
* This is mark protected for a reason, don't
* try and call this directly
return new Job(jobType, payload);
public final class JobException : TaskyException
this(string message)
super("(JobError) "~message);
* Descriptor
* This represents a type of Job, represented
* by a unique ID. Along with this is an associated
* signal handler provided by the user which is
* to be run on completion of said Job
* TODO: Add support for custom IDs
public abstract class Descriptor : Signal
* Descriptor ID reservation sub-system
private static __gshared Mutex descQueueLock;
private static __gshared DList!(ulong) descQueue;
* All descriptors (pool)
private static __gshared DList!(Descriptor) descPool;
* Descriptor data
* The signal handler that handles the running
* of any job associated with this Descriptor
* We should `alias can we?
private immutable ulong descriptorClass;
* Static initialization of the descriptor
* class ID queue's lock
__gshared static this()
descQueueLock = new Mutex();
/* TODO: Static (and _gshared cross threads) ID tracker */
* Checks whether a Descriptor class has been registered
* previously that has the same ID but not the same
* equality (i.e. not spawned from the same object)
public static bool isDescriptorClass(Descriptor descriptor)
/* TODO: Add the implementation for this */
return false;
* Returns true if the given descriptor ID is in
* use, false otherwise
* @param
private static bool isDescIDInUse(ulong descID)
foreach(ulong descIDCurr; descQueue)
if(descID == descIDCurr)
return true;
return false;
* Test unique descriptor class ID generation
* and tracking
ulong s1 = addDescQueue();
ulong s2 = addDescQueue();
assert(s1 != s2);
* Finds the next valid descriptor class ID,
* reserves it and returns it
private static ulong addDescQueue()
ulong descID;
descID = generateDescID();
descQueue ~= descID;
return descID;
* Gneerates a Descriptor ID
* This returns a string that is a hash of
* the current time
private static ulong generateDescID()
/* Get current time */
import std.datetime.systime : Clock;
string time = Clock.currTime().toString();
/* Get random number */
/* TODO: Get random number */
string randnum;
/* Create data string */
string data = time~randnum;
/* Calculate the hash */
import std.digest.sha;
import std.digest;
SHA1Digest sha = new SHA1Digest();
* We will store the digest as the first 8
* bytes of the hash
ulong digest;
ubyte[] hashDigest = sha.digest(data);
digest = *(cast(ulong*)hashDigest.ptr);
return digest;
* Creates a new Descriptor
* FIXME: What if we cannot get a valid ID? Throw an exception
/* Grab a descriptor ID */
descriptorClass = addDescQueue();
* Setup a new Eventy Signal handler
* which handles only the typeID
* of `descriptorClass`
* Given a descriptor class this will attempt adding it,
* on failure false is returned, on sucess, true
private bool addClass(ulong descriptorClass)
bool status;
/* Check if we can add it */
/* Add it to the ID queue */
descQueue ~= descriptorClass;
/* Set status to successful */
status = true;
/* Set status to failure */
status = false;
return status;
* Creates a new Descriptor (with a given fixed descriptor class)
* TODO: Future support (add this in after TaskyEvent things)
this(ulong descriptorClass)
/* Attempt adding */
/* Set the descriptor ID */
this.descriptorClass = descriptorClass;
/* Throw an exception if the ID is already in use */
throw new DescriptorException("Given ID '"~to!(string)(descriptorClass)~"' is already in use");
* Setup a new Eventy Signal handler
* which handles only the typeID
* of `descriptorClass`
* Tests custom descriptor IDs
* FIXME: The unittest ordering to be kinda important
* Add a descriptor with ID 1, this should pass
class DescTest : Descriptor
public override void handler_TaskyEvent(TaskyEvent e)
writeln("Event id ", e.id);
Descriptor newDesc = new DescTest();
assert(newDesc.getDescriptorClass() == 1);
catch(DescriptorException e)
* Add a descriptor with ID 2, this should pass
class DescTest : Descriptor
public override void handler_TaskyEvent(TaskyEvent e)
writeln("Event id ", e.id);
Descriptor newDesc = new DescTest();
assert(newDesc.getDescriptorClass() == 2);
catch(DescriptorException e)
* Add a descriptor with ID 2, this should pass
class DescTest : Descriptor
public override void handler_TaskyEvent(TaskyEvent e)
writeln("Event id ", e.id);
Descriptor newDesc = new DescTest();
catch(DescriptorException e)
* Create a uniqye Descriptor for a future
* Job that will run the function `test`
* on completion (reply)
class DescTest : Descriptor
public override void handler_TaskyEvent(TaskyEvent e)
writeln("Event id ", e.id);
new DescTest();
* Instantiates a Job based on this Descriptor
* ("Job template") with the given payload
* to be sent
public final Job spawnJob(byte[] payload)
Job instantiatedDescriptor;
if(payload.length == 0)
throw new JobException("JobSpawnError: Empty payloads not allowed");
instantiatedDescriptor = new Job(this, payload);
return instantiatedDescriptor;
public final ulong getDescriptorClass()
return descriptorClass;
* Override this to handle Event
* TODO: This should be final and non-abstract
* and take in `Event e`, then the suer overrides
* one that takes in a TaskyEvent, this handler
* must call that one and THAT one must be abstract.
* This would make a lot more sense seeing how we
* always want to pack data.
* TODO: Make this named _entry and other named handler
public final override void handler(Event e)
public abstract void handler_TaskyEvent(TaskyEvent e);

module tasky;
public import tasky.engine;
public import tasky.jobs;
public import tasky.exceptions;
// TODO: do this