Removed old files

This commit is contained in:
Tristan B. Velloza Kildaire 2022-11-22 17:55:19 +02:00
parent 2ad4e92792
commit 3ea8e90a94
13 changed files with 41 additions and 937 deletions

View File

@ -1,19 +0,0 @@
tasky
=====
![](branding/logo.png)
## Creating a `Descriptor`
Before we can spawn any jobs one needs to first create a description
of the type of `Job` that will be spawned.
## Adding to your project
The tasky library is [available on DUB](https://code.dlang.org/packages/tasky) and can easily
be added to your project ny running the following command:
```bash
dub add tasky
```

Binary file not shown.

Before

Width:  |  Height:  |  Size: 24 KiB

Binary file not shown.

5
docs/api.md Normal file
View File

@ -0,0 +1,5 @@
API
===
This document contains documentation on each user-facing component of the Tasky library.

6
docs/index.md Normal file
View File

@ -0,0 +1,6 @@
# Tasky
Welcome to the Tasky project homepage. Here you will find all information pertaining to the library - how to use it in your projects, API documentation and how you can
contribute!

30
docs/setup.md Normal file
View File

@ -0,0 +1,30 @@
Setting up
==========
Setting up your project to be able to use the Tasky framework is a very easy process. For this you will need the
D package manager, [`dub`](https://code.dlang.org/), installed. If you had installed the D programming language
via the [`dmd`](https://dlang.org/download.html#dmd) package then dub would already be installed along with it.
### Setting up dub repository
Once that is done, you will want to initialize your current project directory as a dub repository by running the
following and then filling in the queries as you are prompted:
```bash
cd my-project/
dub init
```
### Adding Tasky
You can now add the Tasky library dependency to your project with one command:
```
dub add tasky
```
Upon your next build (via dub) the package will be fetched, however to force it _now_ just run:
```
dub build
```

View File

@ -1,15 +0,0 @@
{
"authors": [
"Tristan B. Kildaire"
],
"copyright": "Copyright © 2021, Tristan B. Kildaire",
"dependencies": {
"bformat": "~>3.1.3",
"eventy": "0.2.4",
"tristanable": "2.6.12"
},
"description": "Tagged network-message task engine",
"license": "LGPL v3",
"name": "tasky",
"targetType": "library"
}

View File

@ -1,8 +0,0 @@
{
"fileVersion": 1,
"versions": {
"bformat": "3.1.3",
"eventy": "0.2.4",
"tristanable": "2.6.12"
}
}

View File

@ -1,325 +0,0 @@
/**
* Engine
*
* Contains the core components of the tasky
* library, this is effectively the entry
* point to the library
*/
module tasky.engine;
import eventy.engine : EvEngine = Engine;
import eventy.event : Event;
import tasky.jobs : Descriptor;
import tristanable;
import std.socket : Socket;
import core.thread : Thread, dur;
import tasky.exceptions : SessionError;
public class TaskyEvent : Event
{
private byte[] payload;
this(ulong descID, byte[] payload)
{
super(descID);
this.payload = payload;
}
public byte[] getPayload()
{
return payload;
}
}
public final class Engine : Thread
{
/**
* Tristanable sub-system
*/
private Manager tmanager;
/**
* Eventy sub-system
*/
private EvEngine evEngine;
private bool running;
this(Socket socket)
{
/* Set the worker function */
super(&worker);
/* Create a new event engine */
evEngine = new EvEngine();
/* Create a new tristanable manager */
tmanager = new Manager(socket, dur!("msecs")(100), true);
}
/**
* Start the sub-systems
*/
private void startTasky()
{
/* Start the event engine */
evEngine.start();
/* Start the tristanable queue filter */
tmanager.start();
/* Start the loop */
running = true;
}
/**
* Worker thread function which checks the tristanable
* queues for whichever has messages on them and then
* dispatches a job-response for them via eventy
*/
private void worker()
{
/* Start all sub-systems */
startTasky();
while(running)
{
/**
* Loop through each queue, poll for
* any new data, pull off one item
* at most
*
* TODO: Different queuing systems
*/
Queue[] tQueues = tmanager.getQueues();
foreach(Queue tQueue; tQueues)
{
/* Descriptor ID */
ulong descID = tQueue.getTag();
try
{
/* Check if the queue has mail */
if(tQueue.poll())
{
/**
* Dequeue the data item and push
* event into the event loop containing
* it
*/
QueueItem data = tQueue.dequeue();
evEngine.push(new TaskyEvent(descID, data.getData()));
}
}
/* Catch the error when the underlying socket for Manager dies */
catch(ManagerError e)
{
/* TODO: We can only enablke this if off thread, doesn't make sense on thread, in other words it maybe makes sense */
/* TO call engine .run() that start a new thread seeing as thie point is to make this the backbone */
import std.stdio;
// writeln("YOO");
// throw new SessionError("Underlying socket (TManager) is dead");
// break;
}
}
/* TODO: Yield away somehow */
import core.thread : dur;
// sleep(dur!("msecs")(500));
}
}
/**
* Stop the task engine
*/
public void shutdown()
{
/* Stop the loop */
running = false;
/* TODO: Stop tristsnable (must be implemented in tristanable first) */
tmanager.shutdown();
/* TODO: Stop eventy (mjst be implemented in eventy first) */
evEngine.shutdown();
}
/**
* Register a Descriptor with tasky
*/
public void registerDescriptor(Descriptor desc)
{
/* Add a queue based on the descriptor ID */
evEngine.addQueue(desc.getDescriptorClass());
/* Add a signal handler that handles said descriptor ID */
evEngine.addSignalHandler(desc);
/* Create a new queue for this Job */
Queue tQueue = new Queue(tmanager, desc.getDescriptorClass());
/* Add the Queue to tristanable */
tmanager.addQueue(tQueue);
}
unittest
{
/* Results array for unit testing */
bool[4] results;
import std.conv : to;
import core.thread : dur;
import std.string : cmp;
import std.datetime.stopwatch : StopWatch;
bool runDone;
/* Job type */
Descriptor jobType = new class Descriptor {
public override void handler_TaskyEvent(TaskyEvent e)
{
import std.stdio : writeln;
writeln("Event id ", e.id);
string data = cast(string)e.payload;
writeln(data);
if(cmp(data, "Hello 1") == 0)
{
results[0] = true;
}
else if(cmp(data, "Hello 2") == 0)
{
results[1] = true;
}
}
};
ulong jobTypeDI = jobType.getDescriptorClass;
ulong job2C = 0;
/* Job type */
Descriptor jobType2 = new class Descriptor {
public override void handler_TaskyEvent(TaskyEvent e)
{
import std.stdio : writeln;
writeln("Event id ", e.id);
writeln("OTHER event type");
string data = cast(string)e.payload;
writeln(data);
// job2C++;
// assert(cmp(cast(string)eT.payload, ""))
if(cmp(data, "Bye-bye! 3") == 0)
{
results[2] = true;
}
else if(cmp(data, "Bye-bye! 4") == 0)
{
results[3] = true;
}
}
};
ulong jobTypeDI2 = jobType2.getDescriptorClass;
import std.socket;
import std.stdio;
Socket serverSocket = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
serverSocket.bind(parseAddress("::1", 0));
Address serverAddress = serverSocket.localAddress();
Thread serverThread = new class Thread {
this()
{
super(&worker);
serverSocket.listen(0);
}
public void worker()
{
Socket clientSocket = serverSocket.accept();
sleep(dur!("seconds")(2));
import tristanable.encoding : DataMessage, encodeForSend;
DataMessage dMesg = new DataMessage(jobTypeDI, cast(byte[])"Hello 1");
writeln("Server send 1: ", clientSocket.send(encodeForSend(dMesg)));
dMesg = new DataMessage(jobTypeDI, cast(byte[])"Hello 2");
writeln("Server send 2: ", clientSocket.send(encodeForSend(dMesg)));
sleep(dur!("seconds")(1));
dMesg = new DataMessage(jobTypeDI2, cast(byte[])"Bye-bye! 3");
writeln("Server send 3: ", clientSocket.send(encodeForSend(dMesg)));
dMesg = new DataMessage(jobTypeDI2, cast(byte[])"Bye-bye! 4");
writeln("Server send 4: ", clientSocket.send(encodeForSend(dMesg)));
while(!runDone)
{
}
}
};
serverThread.start();
Socket clientSocket = new Socket(AddressFamily.INET6, SocketType.STREAM, ProtocolType.TCP);
clientSocket.connect(parseAddress("::1", to!(ushort)(serverAddress.toPortString())));
/* Create a new Tasky engine */
Engine e = new Engine(clientSocket);
/**
* Setup the job types that are wanted
*/
e.registerDescriptor(jobType);
e.registerDescriptor(jobType2);
/* Start the tasky engine */
e.start();
/**
* Await the expected result, but if this does not complete
* within 4 seconds then expect it failed
*/
StopWatch watch;
watch.start();
while(!results[0] || !results[1] || !results[2] || !results[3])
{
if(watch.peek() > dur!("seconds")(4))
{
runDone = true;
assert(false);
}
}
writeln("Got to done testcase");
runDone = true;
/* TODO: Shutdown tasky here (shutdown eventy and tristanable) */
// e.shutdown();
// clientSocket.close;
}
}

View File

@ -1,44 +0,0 @@
/**
* Exceptions
*
* Base definitions for exceptions appear here
*/
module tasky.exceptions;
import std.exception;
public abstract class TaskyException : Exception
{
this(string msg)
{
super("TaskyException:"~msg);
}
}
public final class SubmissionException : TaskyException
{
this(string msg)
{
super("SubmissionException: "~msg);
}
}
public final class DescriptorException : TaskyException
{
this(string msg)
{
super("DescriptorException: "~msg);
}
}
/**
* Raised if the underlying socket dies (connection closes)
* or (TODO: check that Tasky shutdown does not cause this to weirdly go off by calling tmanager.shutdown())
*/
public final class SessionError : TaskyException
{
this(string msg)
{
super("SessionError: "~msg);
}
}

View File

@ -1,505 +0,0 @@
/**
* Jobs
*
* Contains tools for describing different types
* of jobs and what event handlers will be triggered
* for them along with the creation of actual
* Jobs (schedulable units).
*/
module tasky.jobs;
/* TODO: Remove this import */
import std.stdio;
import tasky.exceptions : TaskyException, DescriptorException;
/* TODO: DList stuff */
import std.container.dlist;
import core.sync.mutex : Mutex;
import std.string : cmp;
import eventy.signal : Signal;
import eventy.event : Event;
// <<<<<<< Updated upstream
import std.conv : to;
// =======
import tasky.engine : Engine, TaskyEvent;
// >>>>>>> Stashed changes
/**
* A Job to be scheduled
*/
public final class Job
{
/**
* TODO: Comment
*/
private Descriptor descriptor;
private byte[] payload;
/**
* Returns the classification of this Job, i.e.
* its Descriptor number
*/
public ulong getJobTypeID()
{
return descriptor.getDescriptorClass();
}
private this(Descriptor jobType, byte[] payload)
{
/* TODO: Extract needed information from here */
this.descriptor = jobType;
this.payload = payload;
}
protected Job newJob(Descriptor jobType, byte[] payload)
{
/**
* This is mark protected for a reason, don't
* try and call this directly
*/
assert(jobType);
assert(payload.length);
return new Job(jobType, payload);
}
}
public final class JobException : TaskyException
{
this(string message)
{
super("(JobError) "~message);
}
}
/**
* Descriptor
*
* This represents a type of Job, represented
* by a unique ID. Along with this is an associated
* signal handler provided by the user which is
* to be run on completion of said Job
*
* TODO: Add support for custom IDs
*/
public abstract class Descriptor : Signal
{
/**
* Descriptor ID reservation sub-system
*/
private static __gshared Mutex descQueueLock;
private static __gshared DList!(ulong) descQueue;
/**
* All descriptors (pool)
*/
private static __gshared DList!(Descriptor) descPool;
/**
* Descriptor data
*
* The signal handler that handles the running
* of any job associated with this Descriptor
*
* We should `alias can we?
*/
private immutable ulong descriptorClass;
/**
* Static initialization of the descriptor
* class ID queue's lock
*/
__gshared static this()
{
descQueueLock = new Mutex();
}
/* TODO: Static (and _gshared cross threads) ID tracker */
/**
* Checks whether a Descriptor class has been registered
* previously that has the same ID but not the same
* equality (i.e. not spawned from the same object)
*/
public static bool isDescriptorClass(Descriptor descriptor)
{
/* TODO: Add the implementation for this */
return false;
}
/**
* Returns true if the given descriptor ID is in
* use, false otherwise
*
* @param
*/
private static bool isDescIDInUse(ulong descID)
{
descQueueLock.lock();
foreach(ulong descIDCurr; descQueue)
{
if(descID == descIDCurr)
{
descQueueLock.unlock();
return true;
}
}
descQueueLock.unlock();
return false;
}
/**
* Test unique descriptor class ID generation
* and tracking
*/
unittest
{
ulong s1 = addDescQueue();
ulong s2 = addDescQueue();
assert(s1 != s2);
}
/**
* Finds the next valid descriptor class ID,
* reserves it and returns it
*/
private static ulong addDescQueue()
{
ulong descID;
descQueueLock.lock();
do
{
descID = generateDescID();
}
while(isDescIDInUse(descID));
descQueue ~= descID;
descQueueLock.unlock();
return descID;
}
/**
* Gneerates a Descriptor ID
*
* This returns a string that is a hash of
* the current time
*/
private static ulong generateDescID()
{
/* Get current time */
import std.datetime.systime : Clock;
string time = Clock.currTime().toString();
/* Get random number */
/* TODO: Get random number */
string randnum;
/* Create data string */
string data = time~randnum;
/* Calculate the hash */
import std.digest.sha;
import std.digest;
SHA1Digest sha = new SHA1Digest();
/**
* We will store the digest as the first 8
* bytes of the hash
*/
ulong digest;
ubyte[] hashDigest = sha.digest(data);
digest = *(cast(ulong*)hashDigest.ptr);
return digest;
}
/**
* Creates a new Descriptor
*
* FIXME: What if we cannot get a valid ID? Throw an exception
*/
this()
{
/* Grab a descriptor ID */
descriptorClass = addDescQueue();
/**
* Setup a new Eventy Signal handler
* which handles only the typeID
* of `descriptorClass`
*/
super([descriptorClass]);
}
/**
* Given a descriptor class this will attempt adding it,
* on failure false is returned, on sucess, true
*/
private bool addClass(ulong descriptorClass)
{
bool status;
descQueueLock.lock();
/* Check if we can add it */
if(!isDescIDInUse(descriptorClass))
{
/* Add it to the ID queue */
descQueue ~= descriptorClass;
/* Set status to successful */
status = true;
}
else
{
/* Set status to failure */
status = false;
}
descQueueLock.unlock();
return status;
}
/**
* Creates a new Descriptor (with a given fixed descriptor class)
*
* TODO: Future support (add this in after TaskyEvent things)
*/
this(ulong descriptorClass)
{
/* Attempt adding */
if(addClass(descriptorClass))
{
/* Set the descriptor ID */
this.descriptorClass = descriptorClass;
}
else
{
/* Throw an exception if the ID is already in use */
throw new DescriptorException("Given ID '"~to!(string)(descriptorClass)~"' is already in use");
}
/**
* Setup a new Eventy Signal handler
* which handles only the typeID
* of `descriptorClass`
*/
super([descriptorClass]);
}
/**
* Tests custom descriptor IDs
*
* FIXME: The unittest ordering to be kinda important
*/
unittest
{
/**
* Add a descriptor with ID 1, this should pass
*/
try
{
class DescTest : Descriptor
{
this()
{
super(1);
}
public override void handler_TaskyEvent(TaskyEvent e)
{
writeln("Event id ", e.id);
}
}
Descriptor newDesc = new DescTest();
assert(newDesc.getDescriptorClass() == 1);
}
catch(DescriptorException e)
{
assert(false);
}
}
unittest
{
/**
* Add a descriptor with ID 2, this should pass
*/
try
{
class DescTest : Descriptor
{
this()
{
super(2);
}
public override void handler_TaskyEvent(TaskyEvent e)
{
writeln("Event id ", e.id);
}
}
Descriptor newDesc = new DescTest();
assert(newDesc.getDescriptorClass() == 2);
}
catch(DescriptorException e)
{
assert(false);
}
}
unittest
{
/**
* Add a descriptor with ID 2, this should pass
*/
try
{
class DescTest : Descriptor
{
this()
{
super(2);
}
public override void handler_TaskyEvent(TaskyEvent e)
{
writeln("Event id ", e.id);
}
}
Descriptor newDesc = new DescTest();
assert(false);
}
catch(DescriptorException e)
{
assert(true);
}
}
unittest
{
try
{
/**
* Create a uniqye Descriptor for a future
* Job that will run the function `test`
* on completion (reply)
*/
class DescTest : Descriptor
{
this()
{
}
public override void handler_TaskyEvent(TaskyEvent e)
{
writeln("Event id ", e.id);
}
}
new DescTest();
assert(true);
}
catch(TaskyException)
{
assert(false);
}
}
/**
* Instantiates a Job based on this Descriptor
* ("Job template") with the given payload
* to be sent
*/
public final Job spawnJob(byte[] payload)
{
Job instantiatedDescriptor;
if(payload.length == 0)
{
throw new JobException("JobSpawnError: Empty payloads not allowed");
}
else
{
instantiatedDescriptor = new Job(this, payload);
}
return instantiatedDescriptor;
}
public final ulong getDescriptorClass()
{
return descriptorClass;
}
/**
* Override this to handle Event
*
* TODO: This should be final and non-abstract
* and take in `Event e`, then the suer overrides
* one that takes in a TaskyEvent, this handler
* must call that one and THAT one must be abstract.
*
* This would make a lot more sense seeing how we
* always want to pack data.
*
* TODO: Make this named _entry and other named handler
*
*/
public final override void handler(Event e)
{
handler_TaskyEvent(cast(TaskyEvent)e);
}
public abstract void handler_TaskyEvent(TaskyEvent e);
}

View File

@ -1,5 +0,0 @@
module tasky;
public import tasky.engine;
public import tasky.jobs;
public import tasky.exceptions;

16
todo.md
View File

@ -1,16 +0,0 @@
# TODO
I think when we throw that session exception (even though that is the wrong) place,
by throwing we may throw it when the next queue we'd iterate to was filled and we looped
pver something flushed, but throw because invalid (now) queue operation
We should think about this and maybe chekc rather using a public, isAlice, and if so wthen finish up to try read as
much as possible from the queues even when socket is dead but THEN never loop again (kill the Tasky loop).
It is always passing, but I should consider the above as it is a possible case.
But I do have something hanging, I need to check what that is, I believe wait that may be tristanable threads etc, I don't think newSys is erroring correctly
It might with or without newSys be hanging, socket error shit fr do be annoying me, idk needs more testing.
wait no shit looks fine lmao, we are reaching `"Err"` which is good