diff --git a/source/tasky/engine.d b/source/tasky/engine.d new file mode 100644 index 0000000..4aea105 --- /dev/null +++ b/source/tasky/engine.d @@ -0,0 +1,276 @@ +module tasky.engine; + +import std.container.dlist : DList; +import core.sync.mutex : Mutex; +import tristanable.manager; +import std.socket : Socket; +import tristanable.queue : Queue; +import tristanable.encoding : DataMessage, encodeForSend; +import eventy; + +import core.thread : Thread; + +public final class TaskManager : Thread +{ + /** + * Job queue + */ + private DList!(Job) jobs; + private Mutex jobsLock; + + /* + * Tristanable queue filter + */ + private Manager manager; + + /** + * Event-loop + */ + private Engine eventEngine; + + this(Socket socket) + { + super(&worker); + + /* Initialize tristanable */ + manager = new Manager(socket); + + /* Initialize the event-loop */ + eventEngine = new Engine(); + + /* Start the thread */ + start(); + } + + private void worker() + { + while(true) + { + /* Lock the job queue */ + jobsLock.lock(); + + /* Clean list (list of jobs to be removed) */ + Job[] cleanList; + + foreach(Job job; jobs) + { + /* If the job is fulfilled */ + if(job.isFulfilled()) + { + /* Get the job's task */ + Task jobTask = job.getTask(); + + /* Free the tristanable tag for this job */ + job.complete(); + + /* Add job to the deletion queue */ + cleanList ~= job; + } + } + + /* Delete tje jobs */ + foreach(Job job; cleanList) + { + jobs.linearRemoveElement(job); + } + + /* Unlock the job queue */ + jobsLock.unlock(); + } + } + + + /** + * Job + * + * Represents an enqueued (in-progress) task with + * an associated tristanable tag + * + * Created by the task manager and not to be used + * by the user at all + */ + private final class Job + { + private Task task; + private Queue tristanableTag; + + this(Task task, Queue tristanableTag) + { + this.task = task; + this.tristanableTag = tristanableTag; + } + + public Task getTask() + { + return task; + } + + public DataMessage encode() + { + /* Get the Task's data to be sent */ + byte[] taskPayload = task.getData(); + + /* Encode into tristanable format */ + DataMessage tEncoded = new DataMessage(tristanableTag.getTag(), taskPayload); + + return tEncoded; + } + + public bool isFulfilled() + { + return tristanableTag.poll(); + } + + public void complete() + { + manager.removeQueue(tristanableTag); + } + } + + /* + * Registers the type of Task by the Event it returns + * + * This is always called by `submitTask` but is only + * ever used once to + */ + public void registerTaskType(Task task) + { + /* Task typeID */ + ulong typeID = task.getTypeID(); + + /* Get the EventHandler */ + EventHandler handler = task.getHandler(); + + /* Check if there is already such a handler */ + /* FIXME: This should (in eventy) take a ulong, semantics of taking in EVent give it a weird meaning */ + bool signalExists = eventEngine.getSignalsForEvent(new Event(typeID)).length > 0; + + /* If no such signal handler exists, then add it */ + if(!signalExists) + { + Signal signalHandler = new Signal([typeID], handler); + eventEngine.addSignalHandler(signalHandler); + } + + } + + /** + * Submits a new Task, enqueues it as a job, + * sends the payload + */ + public void submitTask(Task task) + { + /* Get a unique tristanable ID for the new job */ + Queue newQueue = manager.generateQueue(); + + /* If the queue generation was successful */ + if(newQueue) + { + /* Register the task (if not already done) */ + registerTaskType(task); + + /* Create a new job */ + Job newJob = new Job(task, newQueue); + + /* Lock the job queue */ + jobsLock.lock(); + + /* Enqueue the job */ + jobs ~= newJob; + + /* Unlock the job queue */ + jobsLock.unlock(); + + /* Get the DataMessage of the job */ + DataMessage jobDMessage = newJob.encode(); + + /* Encode for sending (bformat) */ + byte[] bEncoded = encodeForSend(jobDMessage); + + /* Send the payload */ + manager.getSocket().send(bEncoded); + } + /* If unsuccessful, throw exception */ + else + { + /* TODO: Add an exception */ + } + + /* Lock the jobs */ + } + +} + + + +/** +* Represents a Task +*/ +public abstract class Task +{ + private byte[] data; + private ulong typeID; + private EventHandler handler; + + /* + * Constructs a new Task with the given data to be + * sent and a typeID that reoresents which Signal + * handler to call + */ + this(byte[] data, ulong typeID, EventHandler handler) + { + this.data = data; + this.typeID = typeID; + this.handler = handler; + } + + public byte[] getData() + { + return data; + } + + public ulong getTypeID() + { + return typeID; + } + + public EventHandler getHandler() + { + return handler; + } + + /** + * Intended to take the received data from the Job's + * tristanable queue and decode it as per this Task's + * type + */ + public abstract Event getEvent(byte[] dataIn); +} + +public final class TestTask : Task +{ + this(string payloadOut) + { + super(cast(byte[])payloadOut, 69, &TestTaskHandlerFunc); + } + + private static void TestTaskHandlerFunc(Event e) + { + import std.stdio; + writeln("Poes", e); + } + + public override Event getEvent(byte[] dataIn) + { + auto event = new class Event + { + this() + { + /* TestTask is of type 69 for signal dispatching in Eventy */ + super(getTypeID()); + } + }; + + return event; + } +} \ No newline at end of file diff --git a/source/tasky/package.d b/source/tasky/package.d new file mode 100644 index 0000000..05da23b --- /dev/null +++ b/source/tasky/package.d @@ -0,0 +1,3 @@ +module tasky; + +public import tasky.engine; \ No newline at end of file