Seems to work

This commit is contained in:
Tristan B. Velloza Kildaire 2021-09-09 19:51:42 +02:00
parent c17dbc8f2a
commit 4124ed889f
1 changed files with 160 additions and 5 deletions

View File

@ -33,10 +33,10 @@ unittest
import tristanable.encoding;
DataMessage outMsg = new DataMessage(1, cast(byte[])"Hello there");
DataMessage outMsg = new DataMessage(0, cast(byte[])"Hello there");
/* Await one byte (let it setup) */
byte[] l1bytebuf = [1];
byte[] l1bytebuf = [0];
client.receive(l1bytebuf);
/* Encode tristanable encode */
@ -62,10 +62,20 @@ unittest
Socket clientSocket = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
clientSocket.connect(servSocket.localAddress);
Manager manager = new Manager(clientSocket);
TQueue q = new TQueue(1);
TQueue q = new TQueue(6);
manager.addQueue(q);
TaskManager tMan = new TaskManager(manager);
tMan.start();
ReverseTask task = new ReverseTask("");
task.eventType = new ReverseEvent(1, "Hello");
tMan.pushJob(task);
clientSocket.send([cast(byte)1]);
@ -78,7 +88,6 @@ unittest
if(cmp(cast(string)item.getData(), "Hello there") == 0)
{
assert(true);
}
else
{
@ -93,6 +102,129 @@ unittest
//TaskManager tman = new TaskManager();
}
// unittest
// {
// import std.socket;
// Socket servSocket = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
// servSocket.bind(parseAddress("::1", 0));
// servSocket.listen(0);
// auto serverThread = new class Thread
// {
// this()
// {
// super(&worker);
// }
// private void worker()
// {
// runing = true;
// while(runing)
// {
// Socket client = servSocket.accept();
// import tristanable.encoding;
// DataMessage outMsg = new DataMessage(1, cast(byte[])"Hello there");
// /* Await one byte (let it setup) */
// byte[] l1bytebuf = [1];
// client.receive(l1bytebuf);
// /* Encode tristanable encode */
// client.send(encodeForSend(outMsg));
// }
// }
// private bool runing;
// public void stopThread()
// {
// runing=false;
// }
// };
// serverThread.start();
// Socket clientSocket = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
// clientSocket.connect(servSocket.localAddress);
// Manager manager = new Manager(clientSocket);
// TQueue q = new TQueue(1);
// manager.addQueue(q);
// clientSocket.send([cast(byte)1]);
// QueueItem item = q.dequeue();
// writeln("Received:", cast(string)item.getData());
// import std.string;
// if(cmp(cast(string)item.getData(), "Hello there") == 0)
// {
// assert(true);
// }
// else
// {
// assert(false);
// }
// manager.shutdown();
// serverThread.stopThread();
// //manager.start();
// //TaskManager tman = new TaskManager();
// }
public class ReverseEvent : Event
{
public string message;
this(ulong id, string message)
{
super(id);
this.message = message;
}
}
public final class ReverseTask : Task
{
this(string words)
{
super(cast(byte[])words);
}
public static void reverseHandler(Event e)
{
import std.stdio;
import std.string;
writeln(capitalize((cast(ReverseEvent)e).message));
}
private void sethandlers()
{
/* Set some signal handlers */
handlers = [new Signal([eventType.id], &reverseHandler)];
}
}
public class Task
{
/**
@ -108,7 +240,6 @@ public class Task
return eventType;
}
private byte[] dataToSend;
this(byte[] dataToSend)
@ -170,6 +301,7 @@ public final class TaskManager : Thread
this.currentTasksLock = new Mutex();
eventEngine = new Engine();
eventEngine.start();
}
private void worker()
@ -178,11 +310,16 @@ public final class TaskManager : Thread
{
currentTasksLock.lock();
Task[] tasksToBeRemoved;
foreach(Task task; currentTasks)
{
/* Find the matching tristananble queue */
TQueue tQueue = manager.getQueue(task.getID());
writeln(tQueue);
/* TODO: Poll queue here */
if(tQueue.poll())
{
@ -194,13 +331,31 @@ public final class TaskManager : Thread
/* TODO: Add dispatch here */
eventEngine.push(task.getEvent());
/* Add to list of tasks to delete (for-loop list safety) */
tasksToBeRemoved~=task;
}
}
/* Delete the tasks marked for deletion */
foreach(Task task; tasksToBeRemoved)
{
removeTask(task);
}
currentTasksLock.unlock();
}
}
private void removeTask(Task task)
{
currentTasksLock.lock();
currentTasks.linearRemoveElement(task);
currentTasksLock.unlock();
}
/**
* Given a Task, `task`, this will
*/