mirror of https://github.com/deavmi/tasky.git
Compare commits
75 Commits
|
@ -13,3 +13,6 @@ tasky-test-*
|
|||
*.o
|
||||
*.obj
|
||||
*.lst
|
||||
dub.selections.json
|
||||
.gitignore
|
||||
libtasky.a
|
||||
|
|
|
@ -0,0 +1,18 @@
|
|||
tasky
|
||||
=====
|
||||
|
||||
![](branding/logo.png)
|
||||
|
||||
## What is it?
|
||||
|
||||
TODO: Describe here.
|
||||
|
||||
|
||||
## Adding to your project
|
||||
|
||||
The tasky library is [available on DUB](https://code.dlang.org/packages/tasky) and can easily
|
||||
be added to your project ny running the following command:
|
||||
|
||||
```bash
|
||||
dub add tasky
|
||||
```
|
Binary file not shown.
After Width: | Height: | Size: 24 KiB |
Binary file not shown.
14
dub.json
14
dub.json
|
@ -1,15 +1,13 @@
|
|||
{
|
||||
"authors": [
|
||||
"Tristan B. Kildaire"
|
||||
"Tristan B. Velloza Kildaire"
|
||||
],
|
||||
"copyright": "Copyright © 2021, Tristan B. Kildaire",
|
||||
"copyright": "Copyright © 2023, Tristan B. Kildaire",
|
||||
"dependencies": {
|
||||
"bformat": "~>3.1.3",
|
||||
"eventy": "0.1.3",
|
||||
"tristanable": "~>2.3.10"
|
||||
"tristanable": "3.2.0-beta"
|
||||
},
|
||||
"description": "A minimal D application.",
|
||||
"license": "LGPL+",
|
||||
"description": "Tagged network-message task engine",
|
||||
"license": "LGPL v3",
|
||||
"name": "tasky",
|
||||
"targetType": "library"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,8 +0,0 @@
|
|||
{
|
||||
"fileVersion": 1,
|
||||
"versions": {
|
||||
"bformat": "3.1.3",
|
||||
"eventy": "0.1.3",
|
||||
"tristanable": "2.3.13"
|
||||
}
|
||||
}
|
|
@ -1,375 +1,54 @@
|
|||
module tasky.engine;
|
||||
|
||||
import std.container.dlist : DList;
|
||||
import core.sync.mutex : Mutex;
|
||||
import tristanable.manager;
|
||||
import std.socket;
|
||||
import tristanable.queue : Queue;
|
||||
import tristanable.queueitem;
|
||||
import tristanable.encoding : DataMessage, encodeForSend;
|
||||
import eventy;
|
||||
import tristanable : Manager, Queue, TaggedMessage;
|
||||
import tasky.request : Request;
|
||||
|
||||
import core.thread : Thread;
|
||||
|
||||
import std.stdio;
|
||||
|
||||
unittest
|
||||
public class Engine
|
||||
{
|
||||
import std.stdio;
|
||||
private Manager tManager;
|
||||
|
||||
/**
|
||||
* Server process
|
||||
*/
|
||||
Socket servSocket = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
|
||||
servSocket.bind(parseAddress("::1", 0));
|
||||
servSocket.listen(0);
|
||||
|
||||
auto serverThread = new class Thread
|
||||
this(Manager tristanableManager)
|
||||
{
|
||||
this()
|
||||
{
|
||||
super(&worker);
|
||||
}
|
||||
|
||||
private void worker()
|
||||
{
|
||||
|
||||
while(true)
|
||||
{
|
||||
Socket client = servSocket.accept();
|
||||
|
||||
import bmessage;
|
||||
|
||||
byte[] data;
|
||||
receiveMessage(client, data);
|
||||
writeln("Server received: ", data);
|
||||
|
||||
byte[] dataOut = [65,66,66,65];
|
||||
DataMessage dOut = new DataMessage(0, dataOut);
|
||||
client.send(bmessage.encodeBformat(dOut.encode()));
|
||||
|
||||
|
||||
/* Wait for a single byte (for preparation) */
|
||||
// byte[] k = [1];
|
||||
// client.receive(k);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
/* Start the server thread */
|
||||
serverThread.start();
|
||||
|
||||
/* Open a socket to the server */
|
||||
Socket conn = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
|
||||
conn.connect(servSocket.localAddress);
|
||||
|
||||
/* Start the task manager */
|
||||
TaskManager taskManager = new TaskManager(conn);
|
||||
|
||||
/* Create a Task to submit as a job */
|
||||
TestTask testTask = new TestTask("Hello, world, this is a test message");
|
||||
taskManager.submitTask(testTask);
|
||||
|
||||
|
||||
}
|
||||
|
||||
public final class TaskManager : Thread
|
||||
{
|
||||
/**
|
||||
* Job queue
|
||||
*/
|
||||
private DList!(Job) jobs;
|
||||
private Mutex jobsLock;
|
||||
|
||||
/*
|
||||
* Tristanable queue filter
|
||||
*/
|
||||
private Manager manager;
|
||||
|
||||
/**
|
||||
* Event-loop
|
||||
*/
|
||||
private Engine eventEngine;
|
||||
|
||||
this(Socket socket)
|
||||
{
|
||||
super(&worker);
|
||||
|
||||
/* Initialize tristanable */
|
||||
manager = new Manager(socket);
|
||||
|
||||
/* Initialize the event-loop */
|
||||
eventEngine = new Engine();
|
||||
|
||||
/* Start the event engine */
|
||||
eventEngine.start();
|
||||
|
||||
/* Initialize job queue lock */
|
||||
jobsLock = new Mutex();
|
||||
|
||||
/* Start the thread */
|
||||
start();
|
||||
this.tManager = tristanableManager;
|
||||
}
|
||||
|
||||
private void worker()
|
||||
// TODO: Continue working on this
|
||||
|
||||
// TODO: Allow registering ResponseAnonymous with handlers etc
|
||||
|
||||
/**
|
||||
* Takes a request and sends it through to the endpoint
|
||||
* afterwhich we block for a response and when we get one
|
||||
* we run the handler, specified by the original request,
|
||||
* on the response data
|
||||
*
|
||||
* Params:
|
||||
* req = the `Request` to send
|
||||
*/
|
||||
public void makeRequest(Request req)
|
||||
{
|
||||
while(true)
|
||||
/* Get a unique queue */
|
||||
Queue newQueue = tManager.getUniqueQueue();
|
||||
|
||||
/* Create a tagged message with the tag */
|
||||
ulong tag = newQueue.getID();
|
||||
TaggedMessage tReq = new TaggedMessage(tag, req.getRequestData());
|
||||
|
||||
/* Send the message */
|
||||
tManager.sendMessage(tReq);
|
||||
|
||||
/* Does this Request expect a response? */
|
||||
// TODO: We need not register the queue even if this is the case
|
||||
if(req.expectsResponse())
|
||||
{
|
||||
/* Lock the job queue */
|
||||
jobsLock.lock();
|
||||
/* Await for a response */
|
||||
byte[] resp = newQueue.dequeue().getPayload();
|
||||
|
||||
/* Clean list (list of jobs to be removed) */
|
||||
Job[] cleanList;
|
||||
|
||||
// writeln("Task: Loop begin");
|
||||
|
||||
foreach(Job job; jobs)
|
||||
{
|
||||
writeln("Tasky: Job process begin ", job);
|
||||
|
||||
/* If the job is fulfilled */
|
||||
if(job.isFulfilled())
|
||||
{
|
||||
/* Get the Event for dispatching */
|
||||
Event dispatchEvent = job.getEventForDispatch();
|
||||
|
||||
|
||||
writeln("Tasky: Job is fulfilled ", job);
|
||||
|
||||
/* Dispatch the event */
|
||||
eventEngine.push(dispatchEvent);
|
||||
|
||||
/* Free the tristanable tag for this job */
|
||||
job.complete();
|
||||
|
||||
/* Add job to the deletion queue */
|
||||
cleanList ~= job;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
/* Delete tje jobs */
|
||||
foreach(Job job; cleanList)
|
||||
{
|
||||
jobs.linearRemoveElement(job);
|
||||
}
|
||||
|
||||
/* Unlock the job queue */
|
||||
jobsLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Job
|
||||
*
|
||||
* Represents an enqueued (in-progress) task with
|
||||
* an associated tristanable tag
|
||||
*
|
||||
* Created by the task manager and not to be used
|
||||
* by the user at all
|
||||
*/
|
||||
private final class Job
|
||||
{
|
||||
private Task task;
|
||||
private Queue tristanableTag;
|
||||
|
||||
this(Task task, Queue tristanableTag)
|
||||
{
|
||||
this.task = task;
|
||||
this.tristanableTag = tristanableTag;
|
||||
/* Run the response handler with the response */
|
||||
req.process(resp);
|
||||
}
|
||||
|
||||
public Task getTask()
|
||||
{
|
||||
return task;
|
||||
}
|
||||
|
||||
public DataMessage encode()
|
||||
{
|
||||
/* Get the Task's data to be sent */
|
||||
byte[] taskPayload = task.getData();
|
||||
|
||||
/* Encode into tristanable format */
|
||||
DataMessage tEncoded = new DataMessage(tristanableTag.getTag(), taskPayload);
|
||||
|
||||
return tEncoded;
|
||||
}
|
||||
|
||||
public Event getEventForDispatch()
|
||||
{
|
||||
/* Dequeue the data from the tristanable queue */
|
||||
QueueItem queueItem = tristanableTag.dequeue();
|
||||
byte[] receivedData = queueItem.getData();
|
||||
|
||||
/* Parse into Event (based on the Job's task type) and return */
|
||||
Event eventToDispatch = task.getEvent(receivedData);
|
||||
|
||||
return eventToDispatch;
|
||||
}
|
||||
|
||||
public bool isFulfilled()
|
||||
{
|
||||
return tristanableTag.poll();
|
||||
}
|
||||
|
||||
public void complete()
|
||||
{
|
||||
manager.removeQueue(tristanableTag);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Registers the type of Task by the Event it returns
|
||||
*
|
||||
* This is always called by `submitTask` but is only
|
||||
* ever used once to
|
||||
*/
|
||||
public void registerTaskType(Task task)
|
||||
{
|
||||
/* Task typeID */
|
||||
ulong typeID = task.getTypeID();
|
||||
|
||||
/* Get the EventHandler */
|
||||
EventHandler handler = task.getHandler();
|
||||
|
||||
/* Check if there is already such a handler */
|
||||
/* FIXME: This should (in eventy) take a ulong, semantics of taking in EVent give it a weird meaning */
|
||||
bool signalExists = eventEngine.getSignalsForEvent(new Event(typeID)).length > 0;
|
||||
|
||||
/* If no such signal handler exists, then add it */
|
||||
if(!signalExists)
|
||||
{
|
||||
Signal signalHandler = new Signal([typeID], handler);
|
||||
eventEngine.addSignalHandler(signalHandler);
|
||||
|
||||
/* Because this happens at the same time and a queue for this type would exist add that too */
|
||||
/* TODO: Make eventy crash if typeID for non-existent queue */
|
||||
eventEngine.addQueue(typeID);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Submits a new Task, enqueues it as a job,
|
||||
* sends the payload
|
||||
*/
|
||||
public void submitTask(Task task)
|
||||
{
|
||||
/* Get a unique tristanable ID for the new job */
|
||||
Queue newQueue = manager.generateQueue();
|
||||
|
||||
/* If the queue generation was successful */
|
||||
if(newQueue)
|
||||
{
|
||||
/* Register the task (if not already done) */
|
||||
registerTaskType(task);
|
||||
|
||||
/* Create a new job */
|
||||
Job newJob = new Job(task, newQueue);
|
||||
|
||||
/* Lock the job queue */
|
||||
jobsLock.lock();
|
||||
|
||||
/* Enqueue the job */
|
||||
jobs ~= newJob;
|
||||
|
||||
/* Unlock the job queue */
|
||||
jobsLock.unlock();
|
||||
|
||||
/* Get the DataMessage of the job */
|
||||
DataMessage jobDMessage = newJob.encode();
|
||||
|
||||
/* Encode for sending (bformat) */
|
||||
byte[] bEncoded = encodeForSend(jobDMessage);
|
||||
|
||||
/* Send the payload */
|
||||
manager.getSocket().send(bEncoded);
|
||||
|
||||
writeln("Tasky: Sent payload");
|
||||
}
|
||||
/* If unsuccessful, throw exception */
|
||||
else
|
||||
{
|
||||
/* TODO: Add an exception */
|
||||
}
|
||||
|
||||
/* Lock the jobs */
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Represents a Task
|
||||
*/
|
||||
public abstract class Task
|
||||
{
|
||||
private byte[] data;
|
||||
private ulong typeID;
|
||||
private EventHandler handler;
|
||||
|
||||
/*
|
||||
* Constructs a new Task with the given data to be
|
||||
* sent and a typeID that reoresents which Signal
|
||||
* handler to call
|
||||
*/
|
||||
this(byte[] data, ulong typeID, EventHandler handler)
|
||||
{
|
||||
this.data = data;
|
||||
this.typeID = typeID;
|
||||
this.handler = handler;
|
||||
}
|
||||
|
||||
public byte[] getData()
|
||||
{
|
||||
return data;
|
||||
}
|
||||
|
||||
public ulong getTypeID()
|
||||
{
|
||||
return typeID;
|
||||
}
|
||||
|
||||
public EventHandler getHandler()
|
||||
{
|
||||
return handler;
|
||||
}
|
||||
|
||||
/**
|
||||
* Intended to take the received data from the Job's
|
||||
* tristanable queue and decode it as per this Task's
|
||||
* type
|
||||
*/
|
||||
public abstract Event getEvent(byte[] dataIn);
|
||||
}
|
||||
|
||||
public final class TestTask : Task
|
||||
{
|
||||
this(string payloadOut)
|
||||
{
|
||||
super(cast(byte[])payloadOut, 69, &TestTaskHandlerFunc);
|
||||
}
|
||||
|
||||
private static void TestTaskHandlerFunc(Event e)
|
||||
{
|
||||
import std.stdio;
|
||||
writeln("<<<<Tasky task diapatched>>>>", e);
|
||||
}
|
||||
|
||||
public override Event getEvent(byte[] dataIn)
|
||||
{
|
||||
auto event = new class Event
|
||||
{
|
||||
this()
|
||||
{
|
||||
/* TestTask is of type 69 for signal dispatching in Eventy */
|
||||
super(getTypeID());
|
||||
}
|
||||
};
|
||||
|
||||
return event;
|
||||
/* De-register the queue */
|
||||
tManager.releaseQueue(newQueue);
|
||||
}
|
||||
}
|
|
@ -1,3 +1,4 @@
|
|||
module tasky;
|
||||
|
||||
public import tasky.engine;
|
||||
public import tasky.engine;
|
||||
public import tasky.request : Request, ResponseHandler;
|
|
@ -0,0 +1,38 @@
|
|||
module tasky.request;
|
||||
|
||||
import tristanable.encoding : TaggedMessage;
|
||||
|
||||
public alias ResponseHandler = void function(byte[]);
|
||||
|
||||
public abstract class Request
|
||||
{
|
||||
private byte[] requestMessage;
|
||||
|
||||
private ResponseHandler respFunc;
|
||||
|
||||
protected this(byte[] requestMessage, ResponseHandler respFunc)
|
||||
{
|
||||
this.requestMessage = requestMessage;
|
||||
this.respFunc = respFunc;
|
||||
}
|
||||
|
||||
protected this(byte[] requestMessage)
|
||||
{
|
||||
this(requestMessage, null);
|
||||
}
|
||||
|
||||
package final byte[] getRequestData()
|
||||
{
|
||||
return requestMessage;
|
||||
}
|
||||
|
||||
package final void process(byte[] responseData)
|
||||
{
|
||||
respFunc(responseData);
|
||||
}
|
||||
|
||||
package final bool expectsResponse()
|
||||
{
|
||||
return respFunc !is null;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue