diff --git a/dub.json b/dub.json index ca4d704..5f4f6b9 100644 --- a/dub.json +++ b/dub.json @@ -5,7 +5,7 @@ "copyright": "Copyright © 2021, Tristan B. Kildaire", "dependencies": { "bformat": "~>3.1.3", - "eventy": "0.2.4", + "eventy": "0.2.5", "tristanable": "2.6.12" }, "description": "Tagged network-message task engine", diff --git a/dub.selections.json b/dub.selections.json index 3014aef..8bd89ae 100644 --- a/dub.selections.json +++ b/dub.selections.json @@ -2,7 +2,7 @@ "fileVersion": 1, "versions": { "bformat": "3.1.3", - "eventy": "0.2.4", + "eventy": "0.2.5", "tristanable": "2.6.12" } } diff --git a/libtasky.a b/libtasky.a new file mode 100644 index 0000000..09660c0 Binary files /dev/null and b/libtasky.a differ diff --git a/source/tasky/engine.d b/source/tasky/engine.d deleted file mode 100644 index 8324729..0000000 --- a/source/tasky/engine.d +++ /dev/null @@ -1,325 +0,0 @@ -/** -* Engine -* -* Contains the core components of the tasky -* library, this is effectively the entry -* point to the library -*/ -module tasky.engine; - -import eventy.engine : EvEngine = Engine; -import eventy.event : Event; -import tasky.jobs : Descriptor; -import tristanable; -import std.socket : Socket; -import core.thread : Thread, dur; -import tasky.exceptions : SessionError; - -public class TaskyEvent : Event -{ - private byte[] payload; - - this(ulong descID, byte[] payload) - { - super(descID); - this.payload = payload; - } - - public byte[] getPayload() - { - return payload; - } -} - -public final class Engine : Thread -{ - /** - * Tristanable sub-system - */ - private Manager tmanager; - - /** - * Eventy sub-system - */ - private EvEngine evEngine; - - private bool running; - - this(Socket socket) - { - /* Set the worker function */ - super(&worker); - - /* Create a new event engine */ - evEngine = new EvEngine(); - - /* Create a new tristanable manager */ - tmanager = new Manager(socket, dur!("msecs")(100), true); - } - - /** - * Start the sub-systems - */ - private void startTasky() - { - /* Start the event engine */ - evEngine.start(); - - /* Start the tristanable queue filter */ - tmanager.start(); - - /* Start the loop */ - running = true; - } - - /** - * Worker thread function which checks the tristanable - * queues for whichever has messages on them and then - * dispatches a job-response for them via eventy - */ - private void worker() - { - /* Start all sub-systems */ - startTasky(); - - while(running) - { - /** - * Loop through each queue, poll for - * any new data, pull off one item - * at most - * - * TODO: Different queuing systems - */ - Queue[] tQueues = tmanager.getQueues(); - foreach(Queue tQueue; tQueues) - { - /* Descriptor ID */ - ulong descID = tQueue.getTag(); - - - try - { - /* Check if the queue has mail */ - if(tQueue.poll()) - { - /** - * Dequeue the data item and push - * event into the event loop containing - * it - */ - QueueItem data = tQueue.dequeue(); - evEngine.push(new TaskyEvent(descID, data.getData())); - } - } - /* Catch the error when the underlying socket for Manager dies */ - catch(ManagerError e) - { - /* TODO: We can only enablke this if off thread, doesn't make sense on thread, in other words it maybe makes sense */ - /* TO call engine .run() that start a new thread seeing as thie point is to make this the backbone */ - import std.stdio; - // writeln("YOO"); - // throw new SessionError("Underlying socket (TManager) is dead"); - // break; - } - - - } - - /* TODO: Yield away somehow */ - import core.thread : dur; - // sleep(dur!("msecs")(500)); - } - } - - /** - * Stop the task engine - */ - public void shutdown() - { - /* Stop the loop */ - running = false; - - /* TODO: Stop tristsnable (must be implemented in tristanable first) */ - tmanager.shutdown(); - - /* TODO: Stop eventy (mjst be implemented in eventy first) */ - evEngine.shutdown(); - } - - /** - * Register a Descriptor with tasky - */ - public void registerDescriptor(Descriptor desc) - { - /* Add a queue based on the descriptor ID */ - evEngine.addQueue(desc.getDescriptorClass()); - - /* Add a signal handler that handles said descriptor ID */ - evEngine.addSignalHandler(desc); - - /* Create a new queue for this Job */ - Queue tQueue = new Queue(tmanager, desc.getDescriptorClass()); - - /* Add the Queue to tristanable */ - tmanager.addQueue(tQueue); - } - - - unittest - { - /* Results array for unit testing */ - bool[4] results; - - import std.conv : to; - import core.thread : dur; - import std.string : cmp; - import std.datetime.stopwatch : StopWatch; - - bool runDone; - - /* Job type */ - Descriptor jobType = new class Descriptor { - public override void handler_TaskyEvent(TaskyEvent e) - { - import std.stdio : writeln; - writeln("Event id ", e.id); - - string data = cast(string)e.payload; - writeln(data); - - if(cmp(data, "Hello 1") == 0) - { - results[0] = true; - } - else if(cmp(data, "Hello 2") == 0) - { - results[1] = true; - } - } - }; - - ulong jobTypeDI = jobType.getDescriptorClass; - - ulong job2C = 0; - - /* Job type */ - Descriptor jobType2 = new class Descriptor { - public override void handler_TaskyEvent(TaskyEvent e) - { - import std.stdio : writeln; - writeln("Event id ", e.id); - - writeln("OTHER event type"); - - string data = cast(string)e.payload; - writeln(data); - // job2C++; - // assert(cmp(cast(string)eT.payload, "")) - - if(cmp(data, "Bye-bye! 3") == 0) - { - results[2] = true; - } - else if(cmp(data, "Bye-bye! 4") == 0) - { - results[3] = true; - } - } - }; - - ulong jobTypeDI2 = jobType2.getDescriptorClass; - - - import std.socket; - import std.stdio; - Socket serverSocket = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP); - serverSocket.bind(parseAddress("::1", 0)); - Address serverAddress = serverSocket.localAddress(); - - Thread serverThread = new class Thread { - this() - { - super(&worker); - serverSocket.listen(0); - - } - - public void worker() - { - - Socket clientSocket = serverSocket.accept(); - - sleep(dur!("seconds")(2)); - - import tristanable.encoding : DataMessage, encodeForSend; - DataMessage dMesg = new DataMessage(jobTypeDI, cast(byte[])"Hello 1"); - writeln("Server send 1: ", clientSocket.send(encodeForSend(dMesg))); - dMesg = new DataMessage(jobTypeDI, cast(byte[])"Hello 2"); - writeln("Server send 2: ", clientSocket.send(encodeForSend(dMesg))); - - sleep(dur!("seconds")(1)); - - dMesg = new DataMessage(jobTypeDI2, cast(byte[])"Bye-bye! 3"); - writeln("Server send 3: ", clientSocket.send(encodeForSend(dMesg))); - dMesg = new DataMessage(jobTypeDI2, cast(byte[])"Bye-bye! 4"); - writeln("Server send 4: ", clientSocket.send(encodeForSend(dMesg))); - - - while(!runDone) - { - - } - - } - }; - - serverThread.start(); - - Socket clientSocket = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP); - clientSocket.connect(parseAddress("::1", to!(ushort)(serverAddress.toPortString()))); - - - - - - /* Create a new Tasky engine */ - Engine e = new Engine(clientSocket); - - /** - * Setup the job types that are wanted - */ - e.registerDescriptor(jobType); - e.registerDescriptor(jobType2); - - - /* Start the tasky engine */ - e.start(); - - - /** - * Await the expected result, but if this does not complete - * within 4 seconds then expect it failed - */ - StopWatch watch; - watch.start(); - while(!results[0] || !results[1] || !results[2] || !results[3]) - { - if(watch.peek() > dur!("seconds")(4)) - { - runDone = true; - assert(false); - } - } - - writeln("Got to done testcase"); - - runDone = true; - - - /* TODO: Shutdown tasky here (shutdown eventy and tristanable) */ - // e.shutdown(); - - // clientSocket.close; - } -} diff --git a/source/tasky/exceptions.d b/source/tasky/exceptions.d deleted file mode 100644 index d901885..0000000 --- a/source/tasky/exceptions.d +++ /dev/null @@ -1,44 +0,0 @@ -/** -* Exceptions -* -* Base definitions for exceptions appear here -*/ -module tasky.exceptions; - -import std.exception; - -public abstract class TaskyException : Exception -{ - this(string msg) - { - super("TaskyException:"~msg); - } -} - -public final class SubmissionException : TaskyException -{ - this(string msg) - { - super("SubmissionException: "~msg); - } -} - -public final class DescriptorException : TaskyException -{ - this(string msg) - { - super("DescriptorException: "~msg); - } -} - -/** -* Raised if the underlying socket dies (connection closes) -* or (TODO: check that Tasky shutdown does not cause this to weirdly go off by calling tmanager.shutdown()) -*/ -public final class SessionError : TaskyException -{ - this(string msg) - { - super("SessionError: "~msg); - } -} \ No newline at end of file diff --git a/source/tasky/jobs.d b/source/tasky/jobs.d deleted file mode 100644 index 57b4b6c..0000000 --- a/source/tasky/jobs.d +++ /dev/null @@ -1,505 +0,0 @@ -/** -* Jobs -* -* Contains tools for describing different types -* of jobs and what event handlers will be triggered -* for them along with the creation of actual -* Jobs (schedulable units). -*/ -module tasky.jobs; - -/* TODO: Remove this import */ -import std.stdio; - -import tasky.exceptions : TaskyException, DescriptorException; -/* TODO: DList stuff */ -import std.container.dlist; -import core.sync.mutex : Mutex; - -import std.string : cmp; -import eventy.signal : Signal; -import eventy.event : Event; - -// <<<<<<< Updated upstream -import std.conv : to; -// ======= - -import tasky.engine : Engine, TaskyEvent; -// >>>>>>> Stashed changes - -/** -* A Job to be scheduled -*/ -public final class Job -{ - - - /** - * TODO: Comment - */ - private Descriptor descriptor; - private byte[] payload; - - /** - * Returns the classification of this Job, i.e. - * its Descriptor number - */ - public ulong getJobTypeID() - { - return descriptor.getDescriptorClass(); - } - - private this(Descriptor jobType, byte[] payload) - { - /* TODO: Extract needed information from here */ - this.descriptor = jobType; - this.payload = payload; - } - - protected Job newJob(Descriptor jobType, byte[] payload) - { - /** - * This is mark protected for a reason, don't - * try and call this directly - */ - assert(jobType); - assert(payload.length); - - return new Job(jobType, payload); - } - - - - -} - -public final class JobException : TaskyException -{ - this(string message) - { - super("(JobError) "~message); - } -} - -/** -* Descriptor -* -* This represents a type of Job, represented -* by a unique ID. Along with this is an associated -* signal handler provided by the user which is -* to be run on completion of said Job -* -* TODO: Add support for custom IDs -*/ -public abstract class Descriptor : Signal -{ - /** - * Descriptor ID reservation sub-system - */ - private static __gshared Mutex descQueueLock; - private static __gshared DList!(ulong) descQueue; - - /** - * All descriptors (pool) - */ - private static __gshared DList!(Descriptor) descPool; - - /** - * Descriptor data - * - * The signal handler that handles the running - * of any job associated with this Descriptor - * - * We should `alias can we? - */ - private immutable ulong descriptorClass; - - /** - * Static initialization of the descriptor - * class ID queue's lock - */ - __gshared static this() - { - descQueueLock = new Mutex(); - } - - /* TODO: Static (and _gshared cross threads) ID tracker */ - /** - * Checks whether a Descriptor class has been registered - * previously that has the same ID but not the same - * equality (i.e. not spawned from the same object) - */ - public static bool isDescriptorClass(Descriptor descriptor) - { - /* TODO: Add the implementation for this */ - return false; - } - - - /** - * Returns true if the given descriptor ID is in - * use, false otherwise - * - * @param - */ - private static bool isDescIDInUse(ulong descID) - { - descQueueLock.lock(); - - foreach(ulong descIDCurr; descQueue) - { - if(descID == descIDCurr) - { - descQueueLock.unlock(); - return true; - } - } - - descQueueLock.unlock(); - - return false; - } - - - - - /** - * Test unique descriptor class ID generation - * and tracking - */ - unittest - { - ulong s1 = addDescQueue(); - ulong s2 = addDescQueue(); - - assert(s1 != s2); - } - - - - /** - * Finds the next valid descriptor class ID, - * reserves it and returns it - */ - private static ulong addDescQueue() - { - ulong descID; - - descQueueLock.lock(); - - - do - { - descID = generateDescID(); - } - while(isDescIDInUse(descID)); - - - descQueue ~= descID; - - - descQueueLock.unlock(); - - return descID; - } - - /** - * Gneerates a Descriptor ID - * - * This returns a string that is a hash of - * the current time - */ - private static ulong generateDescID() - { - /* Get current time */ - import std.datetime.systime : Clock; - string time = Clock.currTime().toString(); - - /* Get random number */ - /* TODO: Get random number */ - string randnum; - - /* Create data string */ - string data = time~randnum; - - /* Calculate the hash */ - import std.digest.sha; - import std.digest; - - SHA1Digest sha = new SHA1Digest(); - - /** - * We will store the digest as the first 8 - * bytes of the hash - */ - ulong digest; - ubyte[] hashDigest = sha.digest(data); - - digest = *(cast(ulong*)hashDigest.ptr); - - return digest; - } - - - - - - - - - /** - * Creates a new Descriptor - * - * FIXME: What if we cannot get a valid ID? Throw an exception - */ - this() - { - /* Grab a descriptor ID */ - descriptorClass = addDescQueue(); - - /** - * Setup a new Eventy Signal handler - * which handles only the typeID - * of `descriptorClass` - */ - super([descriptorClass]); - } - - /** - * Given a descriptor class this will attempt adding it, - * on failure false is returned, on sucess, true - */ - private bool addClass(ulong descriptorClass) - { - bool status; - - descQueueLock.lock(); - - /* Check if we can add it */ - if(!isDescIDInUse(descriptorClass)) - { - /* Add it to the ID queue */ - descQueue ~= descriptorClass; - - /* Set status to successful */ - status = true; - } - else - { - /* Set status to failure */ - status = false; - } - - descQueueLock.unlock(); - - return status; - } - - - /** - * Creates a new Descriptor (with a given fixed descriptor class) - * - * TODO: Future support (add this in after TaskyEvent things) - */ - this(ulong descriptorClass) - { - /* Attempt adding */ - if(addClass(descriptorClass)) - { - /* Set the descriptor ID */ - this.descriptorClass = descriptorClass; - } - else - { - /* Throw an exception if the ID is already in use */ - throw new DescriptorException("Given ID '"~to!(string)(descriptorClass)~"' is already in use"); - } - - /** - * Setup a new Eventy Signal handler - * which handles only the typeID - * of `descriptorClass` - */ - super([descriptorClass]); - } - - - /** - * Tests custom descriptor IDs - * - * FIXME: The unittest ordering to be kinda important - */ - - unittest - { - /** - * Add a descriptor with ID 1, this should pass - */ - try - { - class DescTest : Descriptor - { - this() - { - super(1); - } - - public override void handler_TaskyEvent(TaskyEvent e) - { - writeln("Event id ", e.id); - } - } - - Descriptor newDesc = new DescTest(); - assert(newDesc.getDescriptorClass() == 1); - } - catch(DescriptorException e) - { - assert(false); - } - - } - - unittest - { - /** - * Add a descriptor with ID 2, this should pass - */ - try - { - class DescTest : Descriptor - { - this() - { - super(2); - } - - public override void handler_TaskyEvent(TaskyEvent e) - { - writeln("Event id ", e.id); - } - } - - Descriptor newDesc = new DescTest(); - assert(newDesc.getDescriptorClass() == 2); - } - catch(DescriptorException e) - { - assert(false); - } - } - - unittest - { - /** - * Add a descriptor with ID 2, this should pass - */ - try - { - class DescTest : Descriptor - { - this() - { - super(2); - } - - public override void handler_TaskyEvent(TaskyEvent e) - { - writeln("Event id ", e.id); - } - } - - Descriptor newDesc = new DescTest(); - assert(false); - } - catch(DescriptorException e) - { - assert(true); - } - } - - unittest - { - - - try - { - /** - * Create a uniqye Descriptor for a future - * Job that will run the function `test` - * on completion (reply) - */ - class DescTest : Descriptor - { - this() - { - - } - - - public override void handler_TaskyEvent(TaskyEvent e) - { - writeln("Event id ", e.id); - } - - } - - new DescTest(); - - assert(true); - } - catch(TaskyException) - { - assert(false); - } - } - - /** - * Instantiates a Job based on this Descriptor - * ("Job template") with the given payload - * to be sent - */ - public final Job spawnJob(byte[] payload) - { - Job instantiatedDescriptor; - - if(payload.length == 0) - { - throw new JobException("JobSpawnError: Empty payloads not allowed"); - } - else - { - instantiatedDescriptor = new Job(this, payload); - } - - return instantiatedDescriptor; - } - - public final ulong getDescriptorClass() - { - return descriptorClass; - } - - /** - * Override this to handle Event - * - * TODO: This should be final and non-abstract - * and take in `Event e`, then the suer overrides - * one that takes in a TaskyEvent, this handler - * must call that one and THAT one must be abstract. - * - * This would make a lot more sense seeing how we - * always want to pack data. - * - * TODO: Make this named _entry and other named handler - * - */ - public final override void handler(Event e) - { - handler_TaskyEvent(cast(TaskyEvent)e); - } - - public abstract void handler_TaskyEvent(TaskyEvent e); -} - - diff --git a/source/tasky/package.d b/source/tasky/package.d index 5de4fa9..4d1edc5 100644 --- a/source/tasky/package.d +++ b/source/tasky/package.d @@ -1,5 +1,3 @@ module tasky; -public import tasky.engine; -public import tasky.jobs; -public import tasky.exceptions; +// TODO: do this