diff --git a/source/app.d b/source/app.d index 98e50f1..30a4c8c 100644 --- a/source/app.d +++ b/source/app.d @@ -33,10 +33,10 @@ unittest import tristanable.encoding; - DataMessage outMsg = new DataMessage(1, cast(byte[])"Hello there"); + DataMessage outMsg = new DataMessage(0, cast(byte[])"Hello there"); /* Await one byte (let it setup) */ - byte[] l1bytebuf = [1]; + byte[] l1bytebuf = [0]; client.receive(l1bytebuf); /* Encode tristanable encode */ @@ -62,10 +62,20 @@ unittest Socket clientSocket = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP); clientSocket.connect(servSocket.localAddress); + + Manager manager = new Manager(clientSocket); - TQueue q = new TQueue(1); + TQueue q = new TQueue(6); manager.addQueue(q); + TaskManager tMan = new TaskManager(manager); + tMan.start(); + + + ReverseTask task = new ReverseTask(""); + task.eventType = new ReverseEvent(1, "Hello"); + + tMan.pushJob(task); clientSocket.send([cast(byte)1]); @@ -78,7 +88,6 @@ unittest if(cmp(cast(string)item.getData(), "Hello there") == 0) { assert(true); - } else { @@ -93,6 +102,129 @@ unittest //TaskManager tman = new TaskManager(); } +// unittest +// { +// import std.socket; +// Socket servSocket = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP); +// servSocket.bind(parseAddress("::1", 0)); +// servSocket.listen(0); + +// auto serverThread = new class Thread +// { +// this() +// { +// super(&worker); +// } + +// private void worker() +// { +// runing = true; +// while(runing) +// { +// Socket client = servSocket.accept(); + +// import tristanable.encoding; + +// DataMessage outMsg = new DataMessage(1, cast(byte[])"Hello there"); + +// /* Await one byte (let it setup) */ +// byte[] l1bytebuf = [1]; +// client.receive(l1bytebuf); + +// /* Encode tristanable encode */ +// client.send(encodeForSend(outMsg)); + + + + +// } + +// } + +// private bool runing; + +// public void stopThread() +// { +// runing=false; +// } +// }; + +// serverThread.start(); + +// Socket clientSocket = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP); +// clientSocket.connect(servSocket.localAddress); + +// Manager manager = new Manager(clientSocket); +// TQueue q = new TQueue(1); +// manager.addQueue(q); + + + +// clientSocket.send([cast(byte)1]); + +// QueueItem item = q.dequeue(); +// writeln("Received:", cast(string)item.getData()); + +// import std.string; + +// if(cmp(cast(string)item.getData(), "Hello there") == 0) +// { +// assert(true); + +// } +// else +// { +// assert(false); +// } + +// manager.shutdown(); + +// serverThread.stopThread(); + +// //manager.start(); +// //TaskManager tman = new TaskManager(); +// } + + + + +public class ReverseEvent : Event +{ + public string message; + + this(ulong id, string message) + { + super(id); + + this.message = message; + } +} + +public final class ReverseTask : Task +{ + + + this(string words) + { + super(cast(byte[])words); + } + + public static void reverseHandler(Event e) + { + import std.stdio; + import std.string; + writeln(capitalize((cast(ReverseEvent)e).message)); + } + + + + private void sethandlers() + { + /* Set some signal handlers */ + handlers = [new Signal([eventType.id], &reverseHandler)]; + } +} + public class Task { /** @@ -108,7 +240,6 @@ public class Task return eventType; } - private byte[] dataToSend; this(byte[] dataToSend) @@ -170,6 +301,7 @@ public final class TaskManager : Thread this.currentTasksLock = new Mutex(); eventEngine = new Engine(); + eventEngine.start(); } private void worker() @@ -178,11 +310,16 @@ public final class TaskManager : Thread { currentTasksLock.lock(); + Task[] tasksToBeRemoved; + foreach(Task task; currentTasks) { /* Find the matching tristananble queue */ TQueue tQueue = manager.getQueue(task.getID()); + + writeln(tQueue); + /* TODO: Poll queue here */ if(tQueue.poll()) { @@ -194,13 +331,31 @@ public final class TaskManager : Thread /* TODO: Add dispatch here */ eventEngine.push(task.getEvent()); + + /* Add to list of tasks to delete (for-loop list safety) */ + tasksToBeRemoved~=task; } } + /* Delete the tasks marked for deletion */ + foreach(Task task; tasksToBeRemoved) + { + removeTask(task); + } + currentTasksLock.unlock(); } } + private void removeTask(Task task) + { + currentTasksLock.lock(); + + currentTasks.linearRemoveElement(task); + + currentTasksLock.unlock(); + } + /** * Given a Task, `task`, this will */