Compare commits

...

26 Commits

Author SHA1 Message Date
Tristan B. Velloza Kildaire a51df3b699 Engine
- If a `Request` is not expecting a response then do not call `dequeue()` on the registered `Queue`, nor `process(byte[])`
- Added a TODO comment
2023-05-04 14:17:35 +02:00
Tristan B. Velloza Kildaire 8387d0ee78 Request
- Implemented `expectsResponse()` which returns `false` if no response handler is assigned (implying no response is expected), `true` otherwise
2023-05-04 14:15:19 +02:00
Tristan B. Velloza Kildaire 7af21a909a Request
- Allow constructing WITHOUT a `ResponseHandler` (it remains as `null`)
2023-05-04 11:04:32 +02:00
Tristan B. Velloza Kildaire 7072c852dc Request
- Removed now-completed TODO
2023-05-04 11:03:11 +02:00
Tristan B. Velloza Kildaire 93d6d49c20 - Clean up 2023-05-04 10:54:20 +02:00
Tristan B. Velloza Kildaire bb1dcef991 Engine
- `makeRequest(Request)` now cleans up by removing the adhoc `Queue` it registered at the beginning of the process
2023-05-04 09:55:00 +02:00
Tristan B. Velloza Kildaire 20da370f0e - Upgraded to `tristanable` `v3.2.0-beta` which introduces the required `releaseQueue(Queue)` and `releaseQueue_nothrow(Queue)` methods 2023-05-04 09:54:15 +02:00
Tristan B. Velloza Kildaire 4ef69ac670 Engine
- Documented `makeRequest(Request req)`
2023-05-03 21:58:06 +02:00
Tristan B. Velloza Kildaire 0b1c8242fd Engine
- Consumes a tristanable `Manager`
- Provides a `makeRequest(Request)` function which generates a unique tristanable `Queue`, constructs a `TaggedMessage` of the queue's ID with the given request data, sends the message via the tristanable `Manager`'s `sendMessage(TaggedMessage)` method, then awaits on the queue and lastly runs the `Request`'s `ResponseHandler` function with the received data from the dequeued `TaggedMessage` (see the usage sketch after the commit list)

Request

- Defines a request with data to send and a function to handle a response
2023-05-03 21:20:19 +02:00
Tristan B. Velloza Kildaire bed096df27 - Removed direct dependency `bformat` and unneeded dependency `eventy` 2023-05-03 21:14:21 +02:00
Tristan B. Velloza Kildaire b0d0b19a4a - Added `tasky.engine` module
- Added `tasky.engine` module to the `tasky` package
2023-05-03 18:41:03 +02:00
Tristan B. Velloza Kildaire 0051099c1f - Updated the `.gitignore` 2023-05-03 18:40:27 +02:00
Tristan B. Velloza Kildaire 85a7558d79 - Upgraded tristanable from version `2.6.12` to the new beta `3.0.1-beta` 2023-04-05 15:51:25 +02:00
Tristan B. Velloza Kildaire 1cf310d92a - Updated `.gitignore`
- Removed `libtasky.a`
2023-03-19 18:20:51 +02:00
Tristan B. Velloza Kildaire 958e678ed2 - Removed `dub.selections.json` 2023-03-19 18:19:50 +02:00
Tristan B. Velloza Kildaire 852619f4a0 Dub
- Updated package copyright and authors
2023-03-19 18:11:23 +02:00
Tristan B. Velloza Kildaire b59bbf9c36 Restart 2023-03-19 18:09:58 +02:00
Tristan B. Velloza Kildaire 8e35e282c1 Descriptor must override a method that takes a TaskyEvent, as those are the only events that will ever be created; this means you no longer need to do an annoying cast from Event to TaskyEvent 2022-05-24 20:45:55 +02:00
Tristan B. Velloza Kildaire 726ad57706 Make TaskyEvent accessible at the module level 2022-05-24 20:28:28 +02:00
Tristan B. Velloza Kildaire 246adcc1f9 Added some rudimentary unittests whose order should be enforced,
else things can go really wrong; these run as part of a single process,
with no restarts etc., hence state is persisted across unittests
2022-05-24 19:53:26 +02:00
Tristan B. Velloza Kildaire 0d850d0508 Fixed compilation error by adding an implementation
for the DescriptorException used in jobs.d
2022-05-24 19:40:45 +02:00
Tristan B. Velloza Kildaire a7c18d5e10 Added ability to use a custom descriptor ID, throws exception if the given
descriptor ID is in use already
2022-05-24 19:39:49 +02:00
Tristan B. Velloza Kildaire 89347cb6d2 Future: Added addClass(ulong) to add a custom descriptor ID 2022-05-24 19:32:44 +02:00
Tristan B. Velloza Kildaire 2251a00284 Descriptor queue safety fix
Fixed a bug whereby, if one were to get new Descriptor classes across
several threads, the Mutex for the descriptor queue would be reinitialized
upon each thread's static initialization, because the static initialization
block was not marked __gshared, causing a global re-write of a freshly
re-initialized Mutex rather than use of one shared Mutex
2022-05-24 19:23:46 +02:00
Tristan B. Velloza Kildaire 8ede79bf49 Future: Add support for fixed descriptor IDs 2022-05-24 19:20:39 +02:00
Tristan B. Velloza Kildaire bc6c90411a Added comment 2022-05-24 19:16:23 +02:00
10 changed files with 88 additions and 711 deletions
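Taken together, these commits replace the old eventy-based Descriptor/Job machinery with a blocking Request/Engine API. Below is a minimal usage sketch; the `EchoRequest` subclass, the port, and the one-argument `Manager` construction are illustrative assumptions, since only `Engine`, `Request`, and `makeRequest(Request)` appear in the diffs that follow:

```d
import std.socket : Socket, AddressFamily, SocketType, ProtocolType, parseAddress;
import tristanable : Manager;
import tasky.engine : Engine;
import tasky.request : Request;

/**
 * Hypothetical Request subclass which prints the response.
 * The handler must be a `function` (not a delegate), matching
 * `ResponseHandler = void function(byte[])`.
 */
final class EchoRequest : Request
{
    this(byte[] data)
    {
        super(data, function(byte[] resp) {
            import std.stdio : writeln;
            writeln("Response: ", cast(string)resp);
        });
    }
}

void main()
{
    /* Connect to some endpoint (address and port are placeholders) */
    Socket sock = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
    sock.connect(parseAddress("::1", 9091));

    /* Assumption: tristanable v3's Manager is constructible from a Socket
     * (its watcher may also need starting before replies can be received) */
    Manager manager = new Manager(sock);

    /* The Engine now simply consumes the Manager (commit 0b1c8242fd) */
    Engine engine = new Engine(manager);

    /* Blocks until the tagged response arrives, then runs the handler */
    engine.makeRequest(new EchoRequest(cast(byte[])"hello"));
}
```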

.gitignore

@@ -13,3 +13,6 @@ tasky-test-*
 *.o
 *.obj
 *.lst
+dub.selections.json
+.gitignore
+libtasky.a

README.md

@@ -3,10 +3,9 @@ tasky
 ![](branding/logo.png)
-## Creating a `Descriptor`
+## What is it?
-Before we can spawn any jobs one needs to first create a description
-of the type of `Job` that will be spawned.
+TODO: Describe here.
 ## Adding to your project

dub.json

@@ -1,12 +1,10 @@
 {
 "authors": [
-"Tristan B. Kildaire"
+"Tristan B. Velloza Kildaire"
 ],
-"copyright": "Copyright © 2021, Tristan B. Kildaire",
+"copyright": "Copyright © 2023, Tristan B. Kildaire",
 "dependencies": {
-"bformat": "~>3.1.3",
-"eventy": "0.2.4",
-"tristanable": "2.6.12"
+"tristanable": "3.2.0-beta"
 },
 "description": "Tagged network-message task engine",
 "license": "LGPL v3",

dub.selections.json

@@ -1,8 +0,0 @@
-{
-"fileVersion": 1,
-"versions": {
-"bformat": "3.1.3",
-"eventy": "0.2.4",
-"tristanable": "2.6.12"
-}
-}

source/tasky/engine.d

@@ -1,327 +1,54 @@
/**
* Engine
*
* Contains the core components of the tasky
* library, this is effectively the entry
* point to the library
*/
module tasky.engine;
import eventy.engine : EvEngine = Engine;
import eventy.event : Event;
import tasky.jobs : Descriptor;
import tristanable;
import std.socket : Socket;
import core.thread : Thread, dur;
import tasky.exceptions : SessionError;
import tristanable : Manager, Queue, TaggedMessage;
import tasky.request : Request;
public final class Engine : Thread
public class Engine
{
/**
* Tristanable sub-system
*/
private Manager tmanager;
private Manager tManager;
/**
* Eventy sub-system
*/
private EvEngine evEngine;
this(Manager tristanableManager)
{
this.tManager = tristanableManager;
}
private bool running;
// TODO: Continue working on this
this(Socket socket)
{
/* Set the worker function */
super(&worker);
// TODO: Allow registering ResponseAnonymous with handlers etc
/* Create a new event engine */
evEngine = new EvEngine();
/* Create a new tristanable manager */
tmanager = new Manager(socket, dur!("msecs")(100), true);
}
/**
* Takes a request and sends it through to the endpoint
* after which we block for a response and when we get one
* we run the handler, specified by the original request,
* on the response data
*
* Params:
* req = the `Request` to send
*/
public void makeRequest(Request req)
{
/* Get a unique queue */
Queue newQueue = tManager.getUniqueQueue();
/**
* Start the sub-systems
*/
private void startTasky()
{
/* Start the event engine */
evEngine.start();
/* Create a tagged message with the tag */
ulong tag = newQueue.getID();
TaggedMessage tReq = new TaggedMessage(tag, req.getRequestData());
/* Start the tristanable queue filter */
tmanager.start();
/* Send the message */
tManager.sendMessage(tReq);
/* Start the loop */
running = true;
}
/* Does this Request expect a response? */
// TODO: We need not register the queue when no response is expected
if(req.expectsResponse())
{
/* Await for a response */
byte[] resp = newQueue.dequeue().getPayload();
public class TaskyEvent : Event
{
private byte[] payload;
/* Run the response handler with the response */
req.process(resp);
}
this(ulong descID, byte[] payload)
{
super(descID);
this.payload = payload;
}
public byte[] getPayload()
{
return payload;
}
}
/**
* Worker thread function which checks the tristanable
* queues for whichever has messages on them and then
* dispatches a job-response for them via eventy
*/
private void worker()
{
/* Start all sub-systems */
startTasky();
while(running)
{
/**
* Loop through each queue, poll for
* any new data, pull off one item
* at most
*
* TODO: Different queuing systems
*/
Queue[] tQueues = tmanager.getQueues();
foreach(Queue tQueue; tQueues)
{
/* Descriptor ID */
ulong descID = tQueue.getTag();
try
{
/* Check if the queue has mail */
if(tQueue.poll())
{
/**
* Dequeue the data item and push
* event into the event loop containing
* it
*/
QueueItem data = tQueue.dequeue();
evEngine.push(new TaskyEvent(descID, data.getData()));
}
}
/* Catch the error when the underlying socket for Manager dies */
catch(ManagerError e)
{
/* TODO: We can only enable this if off-thread; it doesn't make sense on-thread (or, in other words, maybe it does) */
/* TODO: call engine.run() to start a new thread, seeing as the point is to make this the backbone */
import std.stdio;
// writeln("YOO");
// throw new SessionError("Underlying socket (TManager) is dead");
// break;
}
}
/* TODO: Yield away somehow */
import core.thread : dur;
// sleep(dur!("msecs")(500));
}
}
/**
* Stop the task engine
*/
public void shutdown()
{
/* Stop the loop */
running = false;
/* TODO: Stop tristanable (must be implemented in tristanable first) */
tmanager.shutdown();
/* TODO: Stop eventy (must be implemented in eventy first) */
evEngine.shutdown();
}
/**
* Register a Descriptor with tasky
*/
public void registerDescriptor(Descriptor desc)
{
/* Add a queue based on the descriptor ID */
evEngine.addQueue(desc.getDescriptorClass());
/* Add a signal handler that handles said descriptor ID */
evEngine.addSignalHandler(desc);
/* Create a new queue for this Job */
Queue tQueue = new Queue(tmanager, desc.getDescriptorClass());
/* Add the Queue to tristanable */
tmanager.addQueue(tQueue);
}
unittest
{
/* Results array for unit testing */
bool[4] results;
import std.conv : to;
import core.thread : dur;
import std.string : cmp;
import std.datetime.stopwatch : StopWatch;
bool runDone;
/* Job type */
Descriptor jobType = new class Descriptor {
public override void handler(Event e)
{
import std.stdio : writeln;
writeln("Event id ", e.id);
TaskyEvent eT = cast(TaskyEvent)e;
string data = cast(string)eT.payload;
writeln(data);
if(cmp(data, "Hello 1") == 0)
{
results[0] = true;
}
else if(cmp(data, "Hello 2") == 0)
{
results[1] = true;
}
}
};
ulong jobTypeDI = jobType.getDescriptorClass;
ulong job2C = 0;
/* Job type */
Descriptor jobType2 = new class Descriptor {
public override void handler(Event e)
{
import std.stdio : writeln;
writeln("Event id ", e.id);
writeln("OTHER event type");
TaskyEvent eT = cast(TaskyEvent)e;
string data = cast(string)eT.payload;
writeln(data);
// job2C++;
// assert(cmp(cast(string)eT.payload, ""))
if(cmp(data, "Bye-bye! 3") == 0)
{
results[2] = true;
}
else if(cmp(data, "Bye-bye! 4") == 0)
{
results[3] = true;
}
}
};
ulong jobTypeDI2 = jobType2.getDescriptorClass;
import std.socket;
import std.stdio;
Socket serverSocket = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
serverSocket.bind(parseAddress("::1", 0));
Address serverAddress = serverSocket.localAddress();
Thread serverThread = new class Thread {
this()
{
super(&worker);
serverSocket.listen(0);
}
public void worker()
{
Socket clientSocket = serverSocket.accept();
sleep(dur!("seconds")(2));
import tristanable.encoding : DataMessage, encodeForSend;
DataMessage dMesg = new DataMessage(jobTypeDI, cast(byte[])"Hello 1");
writeln("Server send 1: ", clientSocket.send(encodeForSend(dMesg)));
dMesg = new DataMessage(jobTypeDI, cast(byte[])"Hello 2");
writeln("Server send 2: ", clientSocket.send(encodeForSend(dMesg)));
sleep(dur!("seconds")(1));
dMesg = new DataMessage(jobTypeDI2, cast(byte[])"Bye-bye! 3");
writeln("Server send 3: ", clientSocket.send(encodeForSend(dMesg)));
dMesg = new DataMessage(jobTypeDI2, cast(byte[])"Bye-bye! 4");
writeln("Server send 4: ", clientSocket.send(encodeForSend(dMesg)));
while(!runDone)
{
}
}
};
serverThread.start();
Socket clientSocket = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
clientSocket.connect(parseAddress("::1", to!(ushort)(serverAddress.toPortString())));
/* Create a new Tasky engine */
Engine e = new Engine(clientSocket);
/**
* Setup the job types that are wanted
*/
e.registerDescriptor(jobType);
e.registerDescriptor(jobType2);
/* Start the tasky engine */
e.start();
/**
* Await the expected result, but if this does not complete
* within 4 seconds then expect it failed
*/
StopWatch watch;
watch.start();
while(!results[0] || !results[1] || !results[2] || !results[3])
{
if(watch.peek() > dur!("seconds")(4))
{
runDone = true;
assert(false);
}
}
writeln("Got to done testcase");
runDone = true;
/* TODO: Shutdown tasky here (shutdown eventy and tristanable) */
// e.shutdown();
// clientSocket.close;
}
}
/* De-register the queue */
tManager.releaseQueue(newQueue);
}
}
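For the case added in 7af21a909a and a51df3b699, a request constructed without a handler skips the dequeue/process step entirely; a short sketch (the `NotifyRequest` name is hypothetical):

```d
import tasky.request : Request;

/* Hypothetical fire-and-forget request: no ResponseHandler is supplied,
 * so expectsResponse() returns false and makeRequest() will send the
 * message, release the adhoc Queue and return without blocking */
final class NotifyRequest : Request
{
    this(byte[] data)
    {
        super(data); // handler remains null
    }
}
```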

source/tasky/exceptions.d

@@ -1,36 +0,0 @@
/**
* Exceptions
*
* Base definitions for exceptions appear here
*/
module tasky.exceptions;
import std.exception;
public abstract class TaskyException : Exception
{
this(string msg)
{
super("TaskyException:"~msg);
}
}
public final class SubmissionException : TaskyException
{
this(string msg)
{
super("SubmissionException: "~msg);
}
}
/**
* Raised if the underlying socket dies (connection closes)
* or (TODO: check that Tasky shutdown does not cause this to weirdly go off by calling tmanager.shutdown())
*/
public final class SessionError : TaskyException
{
this(string msg)
{
super("SessionError: "~msg);
}
}

source/tasky/jobs.d

@@ -1,327 +0,0 @@
/**
* Jobs
*
* Contains tools for describing different types
* of jobs and what event handlers will be triggered
* for them along with the creation of actual
* Jobs (schedulable units).
*/
module tasky.jobs;
/* TODO: Remove this import */
import std.stdio;
import tasky.exceptions : TaskyException;
/* TODO: DList stuff */
import std.container.dlist;
import core.sync.mutex : Mutex;
import std.string : cmp;
import eventy.signal : Signal;
import eventy.event : Event;
/**
* A Job to be scheduled
*/
public final class Job
{
/**
* TODO: Comment
*/
private Descriptor descriptor;
private byte[] payload;
/**
* Returns the classification of this Job, i.e.
* its Descriptor number
*/
public ulong getJobTypeID()
{
return descriptor.getDescriptorClass();
}
private this(Descriptor jobType, byte[] payload)
{
/* TODO: Extract needed information from here */
this.descriptor = jobType;
this.payload = payload;
}
protected Job newJob(Descriptor jobType, byte[] payload)
{
/**
* This is marked protected for a reason, don't
* try and call this directly
*/
assert(jobType);
assert(payload.length);
return new Job(jobType, payload);
}
}
public final class JobException : TaskyException
{
this(string message)
{
super("(JobError) "~message);
}
}
/**
* Descriptor
*
* This represents a type of Job, represented
* by a unique ID. Along with this is an associated
* signal handler provided by the user which is
* to be run on completion of said Job
*/
public abstract class Descriptor : Signal
{
/**
* Descriptor ID reservation sub-system
*/
private static __gshared Mutex descQueueLock;
private static __gshared DList!(ulong) descQueue;
/**
* All descriptors (pool)
*/
private static __gshared DList!(Descriptor) descPool;
/**
* Descriptor data
*
* The signal handler that handles the running
* of any job associated with this Descriptor
*
* We should `alias` this, can we?
*/
private immutable ulong descriptorClass;
/**
* Static initialization of the descriptor
* class ID queue's lock
*/
static this()
{
descQueueLock = new Mutex();
}
/* TODO: Static (and _gshared cross threads) ID tracker */
/**
* Checks whether a Descriptor class has been registered
* previously that has the same ID but not the same
* equality (i.e. not spawned from the same object)
*/
public static bool isDescriptorClass(Descriptor descriptor)
{
/* TODO: Add the implementation for this */
return false;
}
/**
* Returns true if the given descriptor ID is in
* use, false otherwise
*
* @param
*/
private static bool isDescIDInUse(ulong descID)
{
descQueueLock.lock();
foreach(ulong descIDCurr; descQueue)
{
if(descID == descIDCurr)
{
descQueueLock.unlock();
return true;
}
}
descQueueLock.unlock();
return false;
}
/**
* Test unique descriptor class ID generation
* and tracking
*/
unittest
{
ulong s1 = addDescQueue();
ulong s2 = addDescQueue();
assert(s1 != s2);
}
/**
* Finds the next valid descriptor class ID,
* reserves it and returns it
*/
private static ulong addDescQueue()
{
ulong descID;
descQueueLock.lock();
do
{
descID = generateDescID();
}
while(isDescIDInUse(descID));
descQueue ~= descID;
descQueueLock.unlock();
return descID;
}
/**
* Generates a Descriptor ID
*
* This returns a `ulong` formed from the first
* 8 bytes of a hash of the current time
*/
private static ulong generateDescID()
{
/* Get current time */
import std.datetime.systime : Clock;
string time = Clock.currTime().toString();
/* Get random number */
/* TODO: Get random number */
string randnum;
/* Create data string */
string data = time~randnum;
/* Calculate the hash */
import std.digest.sha;
import std.digest;
SHA1Digest sha = new SHA1Digest();
/**
* We will store the digest as the first 8
* bytes of the hash
*/
ulong digest;
ubyte[] hashDigest = sha.digest(data);
digest = *(cast(ulong*)hashDigest.ptr);
return digest;
}
/**
* Creates a new Descriptor
*/
this()
{
/* Grab a descriptor ID */
descriptorClass = addDescQueue();
/**
* Setup a new Eventy Signal handler
* which handles only the typeID
* of `descriptorClass`
*/
super([descriptorClass]);
}
unittest
{
try
{
/**
* Create a unique Descriptor for a future
* Job that will run the function `test`
* on completion (reply)
*/
class DescTest : Descriptor
{
this()
{
}
public override void handler(Event e)
{
writeln("Event id ", e.id);
}
}
new DescTest();
assert(true);
}
catch(TaskyException)
{
assert(false);
}
}
/**
* Instantiates a Job based on this Descriptor
* ("Job template") with the given payload
* to be sent
*/
public final Job spawnJob(byte[] payload)
{
Job instantiatedDescriptor;
if(payload.length == 0)
{
throw new JobException("JobSpawnError: Empty payloads not allowed");
}
else
{
instantiatedDescriptor = new Job(this, payload);
}
return instantiatedDescriptor;
}
public final ulong getDescriptorClass()
{
return descriptorClass;
}
/**
* Override this to handle Event
*/
public abstract override void handler(Event e);
}

source/tasky/package.d

@@ -1,5 +1,4 @@
 module tasky;
 public import tasky.engine;
-public import tasky.jobs;
-public import tasky.exceptions;
+public import tasky.request : Request, ResponseHandler;

source/tasky/request.d (new file)

@@ -0,0 +1,38 @@
module tasky.request;
import tristanable.encoding : TaggedMessage;
public alias ResponseHandler = void function(byte[]);
public abstract class Request
{
private byte[] requestMessage;
private ResponseHandler respFunc;
protected this(byte[] requestMessage, ResponseHandler respFunc)
{
this.requestMessage = requestMessage;
this.respFunc = respFunc;
}
protected this(byte[] requestMessage)
{
this(requestMessage, null);
}
package final byte[] getRequestData()
{
return requestMessage;
}
package final void process(byte[] responseData)
{
respFunc(responseData);
}
package final bool expectsResponse()
{
return respFunc !is null;
}
}
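One consequence of `ResponseHandler` being a function pointer rather than a delegate is that handlers cannot capture surrounding locals; a small sketch of a conforming handler (the `printResponse` name is illustrative):

```d
import tasky.request : ResponseHandler;
import std.stdio : writeln;

/* A module-level function matching `void function(byte[])` */
void printResponse(byte[] resp)
{
    writeln("Got ", resp.length, " byte(s)");
}

/* Its address is a valid ResponseHandler */
static assert(is(typeof(&printResponse) == ResponseHandler));
```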

todo.md

@@ -1,16 +0,0 @@
# TODO
I think when we throw that session exception (even though that is the wrong place),
by throwing we may throw it when the next queue we'd iterate to was filled and we looped
over something flushed, but throw because of a now-invalid queue operation.
We should think about this and maybe check rather using a public isAlive, and if so then finish up to try to read as
much as possible from the queues even when the socket is dead but THEN never loop again (kill the Tasky loop).
It is always passing, but I should consider the above as it is a possible case.
But I do have something hanging; I need to check what that is. Wait, I believe that may be tristanable threads etc.; I don't think newSys is erroring correctly.
It might be hanging with or without newSys; socket errors really do be annoying me, idk, it needs more testing.
Wait, no, it looks fine lmao, we are reaching `"Err"` which is good