Compare commits

...

75 Commits

Author SHA1 Message Date
Tristan B. Velloza Kildaire a51df3b699 Engine
- If a `Request` is not expecting a response then do not call `dequeue()` on the `Queue` registered nor `process`byte[])`
- Added a TODO comment
2023-05-04 14:17:35 +02:00
Tristan B. Velloza Kildaire 8387d0ee78 Request
- Implemented `expectsResponse()` which returns `false` if no response handler is assigned (implying no response is expected), `true` otherwise
2023-05-04 14:15:19 +02:00
Tristan B. Velloza Kildaire 7af21a909a Request
- Allow constructing WITHOUT a `ResponseHandler` (it remains as `null)
2023-05-04 11:04:32 +02:00
Tristan B. Velloza Kildaire 7072c852dc Request
- Removed now-completed TODO
2023-05-04 11:03:11 +02:00
Tristan B. Velloza Kildaire 93d6d49c20 - Clean up 2023-05-04 10:54:20 +02:00
Tristan B. Velloza Kildaire bb1dcef991 Engine
- `makeRequest(Request)` now cleans up by removing the adhoc `Queue` it registered in the beginning ofd the process
2023-05-04 09:55:00 +02:00
Tristan B. Velloza Kildaire 20da370f0e - Upgraded to `tristanable` `v3.2.0-beta` which introduces the required `releaseQueue(Queue)` and `releaseQueue_nothrow(Queue)` methods 2023-05-04 09:54:15 +02:00
Tristan B. Velloza Kildaire 4ef69ac670 Engine
- Documented `makeRequest(Request req)`
2023-05-03 21:58:06 +02:00
Tristan B. Velloza Kildaire 0b1c8242fd Engine
- Consumes a tristanable `Manager`
- Provides a `makeRequest(Request)` function which will generate a unique tristanable `Queue`, construct a `TaggedMessage` of the queue's ID with the gievn request data, it will then send the message via the tristanable `Manager`'s `sendMessage(TaggedMessage)` method, then await on the queue and lastly run the `Request`'s `ResponseHandler` function with the received data from the dequeued `TaggedMessage`

Request

- Defines a request with data to send and a function to handle a response
2023-05-03 21:20:19 +02:00
Tristan B. Velloza Kildaire bed096df27 - Removed direct dependency `bformat` and uneeded dependency `eventy` 2023-05-03 21:14:21 +02:00
Tristan B. Velloza Kildaire b0d0b19a4a - Added `tasky.engine` module
- Added `tasky.engine` module to the `tasky` package
2023-05-03 18:41:03 +02:00
Tristan B. Velloza Kildaire 0051099c1f - Updated the `.gitignore` 2023-05-03 18:40:27 +02:00
Tristan B. Velloza Kildaire 85a7558d79 - Upgraded tristanable from version `2.6.12` to the new beta `3.0.1-beta` 2023-04-05 15:51:25 +02:00
Tristan B. Velloza Kildaire 1cf310d92a - Updated `.gitignore`
- Removed `libtasky.a`
2023-03-19 18:20:51 +02:00
Tristan B. Velloza Kildaire 958e678ed2 - Removed `dub.selections.json` 2023-03-19 18:19:50 +02:00
Tristan B. Velloza Kildaire 852619f4a0 Dub
- Updated package copyright and authors
2023-03-19 18:11:23 +02:00
Tristan B. Velloza Kildaire b59bbf9c36 Restart 2023-03-19 18:09:58 +02:00
Tristan B. Velloza Kildaire 8e35e282c1 Descriptor must override a method that takes in a TaskyEvent as those will only ever be created, this means you no longer need to do an annoying cast from Event to TaskyEvent 2022-05-24 20:45:55 +02:00
Tristan B. Velloza Kildaire 726ad57706 Make TaskyEvent accessible at the module level 2022-05-24 20:28:28 +02:00
Tristan B. Velloza Kildaire 246adcc1f9 Added some rudimentary unittests that should have their order enforced
else stuff can go really wrong, these things run as part of a single process,
no restarts etc. Hence state is persisted across unittests
2022-05-24 19:53:26 +02:00
Tristan B. Velloza Kildaire 0d850d0508 Fixed compilation error by adding an implementation
for the DescriptorException used in jobs.d
2022-05-24 19:40:45 +02:00
Tristan B. Velloza Kildaire a7c18d5e10 Added ability to use a custom descriptor ID, throws exception if the given
descriptor ID is in use already
2022-05-24 19:39:49 +02:00
Tristan B. Velloza Kildaire 89347cb6d2 Future: Added addClass(ulong) to add a custom descriptor ID 2022-05-24 19:32:44 +02:00
Tristan B. Velloza Kildaire 2251a00284 Descriptor queue safety fix
Fixed a bug whereby if one were to get new Descriptor classes across
several threads then the Mutex for the descriptor queue would be reinitialzied
upon static initialization of the thread as the static initialization block
was not set to __gshared, causing a global re-write of a re-initted Mutex,
never the same Mutex
2022-05-24 19:23:46 +02:00
Tristan B. Velloza Kildaire 8ede79bf49 Future: Add support for fixed descriptor IDs 2022-05-24 19:20:39 +02:00
Tristan B. Velloza Kildaire bc6c90411a Added comment 2022-05-24 19:16:23 +02:00
Tristan B. Velloza Kildaire bfbf02efdb Added getPayload() method to TaskyEvent class 2022-05-24 15:08:37 +02:00
Tristan B. Velloza Kildaire 4380019665 Upgraded to tristanable v2.6.12 2022-05-21 15:19:15 +02:00
Tristan B. Velloza Kildaire b8ed656536 Fixed unit test 2022-05-19 21:08:50 +02:00
Tristan B. Velloza Kildaire e3ecc30497 Running status should be set from within startTasky() and after starting all sub-systems 2022-05-19 21:06:57 +02:00
Tristan B. Velloza Kildaire 92d1b1b900 Upgraded to new Eventy 2022-05-19 17:03:20 +02:00
Tristan B. Velloza Kildaire 8f8a1e724f Added stub method for future usage along with comments related to it 2022-05-19 16:51:44 +02:00
Tristan B. Velloza Kildaire 743744410c WIP: Adding some handling exposed via tristanable 2022-05-17 19:13:12 +02:00
Tristan B. Velloza Kildaire 3561ed8fda Added TODO list 2022-05-17 19:10:41 +02:00
Tristan B. Velloza Kildaire e64827b770 Upgraded tristanable 2022-05-10 15:24:42 +02:00
Tristan B. Velloza Kildaire 16ae65937a Added a shutdown() method which should stop tristanable and eventy, along with stopping the tasky main loop 2022-04-07 10:18:54 +02:00
Tristan B. Velloza Kildaire bcdc6121ca Cleaned up 2022-04-07 09:59:16 +02:00
Tristan B. Velloza Kildaire 8539485352 On timing out after 4 seconds of not having a task complete, fail the assertion test 2022-04-07 09:51:25 +02:00
Tristan B. Velloza Kildaire 1db1fad397 Cleaned up 2022-03-19 15:20:36 +02:00
Tristan B. Velloza Kildaire 78c22d3b4e Cleaned up 2022-03-19 15:20:27 +02:00
Tristan B. Velloza Kildaire d786130427 Removed uneeded function 2022-03-19 15:08:19 +02:00
Tristan B. Velloza Kildaire dfd0171c67 Pass unit tessts 2022-03-19 15:03:27 +02:00
Tristan B. Velloza Kildaire da5489a6b7 Using fixed tristanable 2022-03-19 13:40:21 +02:00
Tristan B. Velloza Kildaire 915b0ad115 WIP: GOtta figure out timing and maybe hot loops and shit 2022-03-19 12:57:59 +02:00
Tristan B. Velloza Kildaire 295d9ff672 Removed TODO
Don't let socket close
2022-03-19 12:29:04 +02:00
Tristan B. Velloza Kildaire e65459633c There are bugs or either timing is off for my test 2022-03-18 15:54:54 +02:00
Tristan B. Velloza Kildaire 20ec129f69 Testing multiple jobs 2022-03-17 16:47:09 +02:00
Tristan B. Velloza Kildaire 20e60a6f30 Works! 2022-03-17 16:32:52 +02:00
Tristan B. Velloza Kildaire 4c798aef6a WIP: Event dispatching loop 2022-03-16 17:10:15 +02:00
Tristan B. Velloza Kildaire 21e1b91abc Initialize tristanable, add a new tristanable queue on Descriptor registration 2022-01-21 10:21:45 +02:00
Tristan B. Velloza Kildaire d15d4bdb84 Removed TODO 2022-01-16 18:16:42 +02:00
Tristan B. Velloza Kildaire 773a2eb77f Added Tasky Engine initialization (partly), initialize the Eventy engine, implemented Descriptor registration 2022-01-16 18:12:10 +02:00
Tristan B. Velloza Kildaire 826f110b17 Added comments, Added descPool 2022-01-16 18:11:34 +02:00
Tristan B. Velloza Kildaire bd5179ee3d Updated to README.md 2022-01-16 16:17:16 +02:00
Tristan B. Velloza Kildaire df7f731218 Upgraded to new Eventy, integrated Eventy Signal\(\) into Descriptor 2022-01-16 15:16:13 +02:00
Tristan B. Velloza Kildaire 2712585684 Switched to 8-byte ulong descIDs, set descID to immutable, added signal handler creation and register the provided EventHandler with said Signal handler 2022-01-16 13:44:05 +02:00
Tristan B. Velloza Kildaire 36c84999cc Updated Descriptor class to use new string descIDs, updated Job to use new Desc IDs, added unit test for Descriptor creation 2022-01-16 13:04:23 +02:00
Tristan B. Velloza Kildaire 71e3476afd Make use of recursive mutex 2022-01-16 12:57:05 +02:00
Tristan B. Velloza Kildaire f532b1ef8b New descriptor class ID implementation 2022-01-16 12:54:13 +02:00
Tristan B. Velloza Kildaire 0d677f8610 Added descriptor class ID generator-and-reserver 2022-01-13 00:07:48 +02:00
Tristan B. Velloza Kildaire d657c36ec6 Initailzie descQueue and its Mutex 2022-01-12 18:44:11 +02:00
Tristan B. Velloza Kildaire a9b75d36d1 Moved Descriptor usage facility to the Descriptor class 2022-01-12 18:32:54 +02:00
Tristan B. Velloza Kildaire 197d1d82ab Added logo to the README 2022-01-12 17:55:47 +02:00
Tristan B. Velloza Kildaire 5ac0ccd408 Added branding 2022-01-12 17:55:31 +02:00
Tristan B. Velloza Kildaire c477243596 Updated README 2022-01-12 17:55:25 +02:00
Tristan B. Velloza Kildaire 4e48bf28e2 WIP: Added README 2022-01-12 17:48:12 +02:00
Tristan B. Velloza Kildaire 934852c2a8 WIP: Added plumbing for descriptor class clash handling 2022-01-12 17:47:50 +02:00
Tristan B. Velloza Kildaire 63db311cdd Work begun on implementing Jobs and their Descriptors 2022-01-12 17:29:03 +02:00
Tristan B. Velloza Kildaire 9bdb98423d Included exceptions module in package declaration 2022-01-12 17:28:41 +02:00
Tristan B. Velloza Kildaire e24834bc6c Cleaned up engine module 2022-01-12 17:28:25 +02:00
Tristan B. Velloza Kildaire 45acf418ca Added base exception type 2022-01-12 17:28:08 +02:00
Tristan B. Velloza Kildaire 90044e8ff5 Updated dub dependencies 2021-09-26 10:42:31 +02:00
Tristan B. Velloza Kildaire 0a16c111fc Cleaned up 2021-09-15 15:10:19 +02:00
Tristan B. Velloza Kildaire ccfad5a5c8 Updated description 2021-09-15 15:09:18 +02:00
Tristan B. Velloza Kildaire 2937208ee3 Fixed build 2021-09-15 14:55:53 +02:00
9 changed files with 106 additions and 377 deletions

3
.gitignore vendored
View File

@ -13,3 +13,6 @@ tasky-test-*
*.o
*.obj
*.lst
dub.selections.json
.gitignore
libtasky.a

18
README.md Normal file
View File

@ -0,0 +1,18 @@
tasky
=====
![](branding/logo.png)
## What is it?
TODO: Describe here.
## Adding to your project
The tasky library is [available on DUB](https://code.dlang.org/packages/tasky) and can easily
be added to your project ny running the following command:
```bash
dub add tasky
```

BIN
branding/logo.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 24 KiB

BIN
branding/logo.xcf Normal file

Binary file not shown.

View File

@ -1,15 +1,13 @@
{
"authors": [
"Tristan B. Kildaire"
"Tristan B. Velloza Kildaire"
],
"copyright": "Copyright © 2021, Tristan B. Kildaire",
"copyright": "Copyright © 2023, Tristan B. Kildaire",
"dependencies": {
"bformat": "~>3.1.3",
"eventy": "0.1.3",
"tristanable": "~>2.3.10"
"tristanable": "3.2.0-beta"
},
"description": "A minimal D application.",
"license": "LGPL+",
"description": "Tagged network-message task engine",
"license": "LGPL v3",
"name": "tasky",
"targetType": "library"
}
}

View File

@ -1,8 +0,0 @@
{
"fileVersion": 1,
"versions": {
"bformat": "3.1.3",
"eventy": "0.1.3",
"tristanable": "2.3.13"
}
}

View File

@ -1,375 +1,54 @@
module tasky.engine;
import std.container.dlist : DList;
import core.sync.mutex : Mutex;
import tristanable.manager;
import std.socket;
import tristanable.queue : Queue;
import tristanable.queueitem;
import tristanable.encoding : DataMessage, encodeForSend;
import eventy;
import tristanable : Manager, Queue, TaggedMessage;
import tasky.request : Request;
import core.thread : Thread;
import std.stdio;
unittest
public class Engine
{
import std.stdio;
private Manager tManager;
/**
* Server process
*/
Socket servSocket = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
servSocket.bind(parseAddress("::1", 0));
servSocket.listen(0);
auto serverThread = new class Thread
this(Manager tristanableManager)
{
this()
{
super(&worker);
}
private void worker()
{
while(true)
{
Socket client = servSocket.accept();
import bmessage;
byte[] data;
receiveMessage(client, data);
writeln("Server received: ", data);
byte[] dataOut = [65,66,66,65];
DataMessage dOut = new DataMessage(0, dataOut);
client.send(bmessage.encodeBformat(dOut.encode()));
/* Wait for a single byte (for preparation) */
// byte[] k = [1];
// client.receive(k);
}
}
};
/* Start the server thread */
serverThread.start();
/* Open a socket to the server */
Socket conn = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
conn.connect(servSocket.localAddress);
/* Start the task manager */
TaskManager taskManager = new TaskManager(conn);
/* Create a Task to submit as a job */
TestTask testTask = new TestTask("Hello, world, this is a test message");
taskManager.submitTask(testTask);
}
public final class TaskManager : Thread
{
/**
* Job queue
*/
private DList!(Job) jobs;
private Mutex jobsLock;
/*
* Tristanable queue filter
*/
private Manager manager;
/**
* Event-loop
*/
private Engine eventEngine;
this(Socket socket)
{
super(&worker);
/* Initialize tristanable */
manager = new Manager(socket);
/* Initialize the event-loop */
eventEngine = new Engine();
/* Start the event engine */
eventEngine.start();
/* Initialize job queue lock */
jobsLock = new Mutex();
/* Start the thread */
start();
this.tManager = tristanableManager;
}
private void worker()
// TODO: Continue working on this
// TODO: Allow registering ResponseAnonymous with handlers etc
/**
* Takes a request and sends it through to the endpoint
* afterwhich we block for a response and when we get one
* we run the handler, specified by the original request,
* on the response data
*
* Params:
* req = the `Request` to send
*/
public void makeRequest(Request req)
{
while(true)
/* Get a unique queue */
Queue newQueue = tManager.getUniqueQueue();
/* Create a tagged message with the tag */
ulong tag = newQueue.getID();
TaggedMessage tReq = new TaggedMessage(tag, req.getRequestData());
/* Send the message */
tManager.sendMessage(tReq);
/* Does this Request expect a response? */
// TODO: We need not register the queue even if this is the case
if(req.expectsResponse())
{
/* Lock the job queue */
jobsLock.lock();
/* Await for a response */
byte[] resp = newQueue.dequeue().getPayload();
/* Clean list (list of jobs to be removed) */
Job[] cleanList;
// writeln("Task: Loop begin");
foreach(Job job; jobs)
{
writeln("Tasky: Job process begin ", job);
/* If the job is fulfilled */
if(job.isFulfilled())
{
/* Get the Event for dispatching */
Event dispatchEvent = job.getEventForDispatch();
writeln("Tasky: Job is fulfilled ", job);
/* Dispatch the event */
eventEngine.push(dispatchEvent);
/* Free the tristanable tag for this job */
job.complete();
/* Add job to the deletion queue */
cleanList ~= job;
}
}
/* Delete tje jobs */
foreach(Job job; cleanList)
{
jobs.linearRemoveElement(job);
}
/* Unlock the job queue */
jobsLock.unlock();
}
}
/**
* Job
*
* Represents an enqueued (in-progress) task with
* an associated tristanable tag
*
* Created by the task manager and not to be used
* by the user at all
*/
private final class Job
{
private Task task;
private Queue tristanableTag;
this(Task task, Queue tristanableTag)
{
this.task = task;
this.tristanableTag = tristanableTag;
/* Run the response handler with the response */
req.process(resp);
}
public Task getTask()
{
return task;
}
public DataMessage encode()
{
/* Get the Task's data to be sent */
byte[] taskPayload = task.getData();
/* Encode into tristanable format */
DataMessage tEncoded = new DataMessage(tristanableTag.getTag(), taskPayload);
return tEncoded;
}
public Event getEventForDispatch()
{
/* Dequeue the data from the tristanable queue */
QueueItem queueItem = tristanableTag.dequeue();
byte[] receivedData = queueItem.getData();
/* Parse into Event (based on the Job's task type) and return */
Event eventToDispatch = task.getEvent(receivedData);
return eventToDispatch;
}
public bool isFulfilled()
{
return tristanableTag.poll();
}
public void complete()
{
manager.removeQueue(tristanableTag);
}
}
/*
* Registers the type of Task by the Event it returns
*
* This is always called by `submitTask` but is only
* ever used once to
*/
public void registerTaskType(Task task)
{
/* Task typeID */
ulong typeID = task.getTypeID();
/* Get the EventHandler */
EventHandler handler = task.getHandler();
/* Check if there is already such a handler */
/* FIXME: This should (in eventy) take a ulong, semantics of taking in EVent give it a weird meaning */
bool signalExists = eventEngine.getSignalsForEvent(new Event(typeID)).length > 0;
/* If no such signal handler exists, then add it */
if(!signalExists)
{
Signal signalHandler = new Signal([typeID], handler);
eventEngine.addSignalHandler(signalHandler);
/* Because this happens at the same time and a queue for this type would exist add that too */
/* TODO: Make eventy crash if typeID for non-existent queue */
eventEngine.addQueue(typeID);
}
}
/**
* Submits a new Task, enqueues it as a job,
* sends the payload
*/
public void submitTask(Task task)
{
/* Get a unique tristanable ID for the new job */
Queue newQueue = manager.generateQueue();
/* If the queue generation was successful */
if(newQueue)
{
/* Register the task (if not already done) */
registerTaskType(task);
/* Create a new job */
Job newJob = new Job(task, newQueue);
/* Lock the job queue */
jobsLock.lock();
/* Enqueue the job */
jobs ~= newJob;
/* Unlock the job queue */
jobsLock.unlock();
/* Get the DataMessage of the job */
DataMessage jobDMessage = newJob.encode();
/* Encode for sending (bformat) */
byte[] bEncoded = encodeForSend(jobDMessage);
/* Send the payload */
manager.getSocket().send(bEncoded);
writeln("Tasky: Sent payload");
}
/* If unsuccessful, throw exception */
else
{
/* TODO: Add an exception */
}
/* Lock the jobs */
}
}
/**
* Represents a Task
*/
public abstract class Task
{
private byte[] data;
private ulong typeID;
private EventHandler handler;
/*
* Constructs a new Task with the given data to be
* sent and a typeID that reoresents which Signal
* handler to call
*/
this(byte[] data, ulong typeID, EventHandler handler)
{
this.data = data;
this.typeID = typeID;
this.handler = handler;
}
public byte[] getData()
{
return data;
}
public ulong getTypeID()
{
return typeID;
}
public EventHandler getHandler()
{
return handler;
}
/**
* Intended to take the received data from the Job's
* tristanable queue and decode it as per this Task's
* type
*/
public abstract Event getEvent(byte[] dataIn);
}
public final class TestTask : Task
{
this(string payloadOut)
{
super(cast(byte[])payloadOut, 69, &TestTaskHandlerFunc);
}
private static void TestTaskHandlerFunc(Event e)
{
import std.stdio;
writeln("<<<<Tasky task diapatched>>>>", e);
}
public override Event getEvent(byte[] dataIn)
{
auto event = new class Event
{
this()
{
/* TestTask is of type 69 for signal dispatching in Eventy */
super(getTypeID());
}
};
return event;
/* De-register the queue */
tManager.releaseQueue(newQueue);
}
}

View File

@ -1,3 +1,4 @@
module tasky;
public import tasky.engine;
public import tasky.engine;
public import tasky.request : Request, ResponseHandler;

38
source/tasky/request.d Normal file
View File

@ -0,0 +1,38 @@
module tasky.request;
import tristanable.encoding : TaggedMessage;
public alias ResponseHandler = void function(byte[]);
public abstract class Request
{
private byte[] requestMessage;
private ResponseHandler respFunc;
protected this(byte[] requestMessage, ResponseHandler respFunc)
{
this.requestMessage = requestMessage;
this.respFunc = respFunc;
}
protected this(byte[] requestMessage)
{
this(requestMessage, null);
}
package final byte[] getRequestData()
{
return requestMessage;
}
package final void process(byte[] responseData)
{
respFunc(responseData);
}
package final bool expectsResponse()
{
return respFunc !is null;
}
}