Restarting project

This commit is contained in:
Tristan B. Velloza Kildaire 2023-02-26 21:55:13 +02:00
parent 586835a627
commit 17da826d07
21 changed files with 20 additions and 783 deletions

View File

@ -2,13 +2,12 @@
"authors": [
"Tristan B. Kildaire"
],
"copyright": "Copyright © 2020, Tristan B. Kildaire",
"copyright": "Copyright © 2023, Tristan B. Kildaire",
"dependencies": {
"bformat": "~>3.1.3"
"libsnooze": "0.2.5"
},
"description": "Tag-based asynchronous messaging framework",
"license": "LGPL-3.0",
"name": "tristanable",
"targetType": "library",
"sourcePaths": ["source/tristanable"]
"targetType": "library"
}

View File

@ -1,6 +1,7 @@
{
"fileVersion": 1,
"versions": {
"bformat": "3.1.3"
"bformat": "3.1.3",
"libsnooze": "0.2.5"
}
}

View File

@ -1,15 +0,0 @@
.dub
docs.json
__dummy.html
docs/
/example
example.so
example.dylib
example.dll
example.a
example.lib
example-test-*
*.exe
*.o
*.obj
*.lst

View File

@ -1,12 +0,0 @@
{
"authors": [
"Tristan B. Kildaire"
],
"copyright": "Copyright © 2020, Tristan B. Kildaire",
"dependencies": {
"tristanable": "~>2.2.1"
},
"description": "A minimal D application.",
"license": "proprietary",
"name": "example"
}

View File

@ -1,7 +0,0 @@
{
"fileVersion": 1,
"versions": {
"bformat": "1.0.8",
"tristanable": "2.2.1"
}
}

View File

@ -1,59 +0,0 @@
import std.stdio;
import tristanable.manager : Manager;
import std.socket;
import core.thread;
void main()
{
writeln("Edit source/app.d to start your project.");
Socket socket = new Socket(AddressFamily.INET, SocketType.STREAM, ProtocolType.TCP);
socket.connect(parseAddress("127.0.0.1",7777));
Manager manager = new Manager(socket);
class bruh : Thread
{
this()
{
super(&run);
}
private void run()
{
while(true)
{
manager.lockQueue();
writeln(manager.getQueue());
manager.unlockQueue();
import core.thread;
Thread.sleep(dur!("seconds")(1));
}
}
}
new bruh().start();
manager.sendMessage(69, [77]);
manager.sendMessage(70, [78]);
byte[] receivedKaka = manager.receiveMessage(69);
writeln(receivedKaka);
receivedKaka = manager.receiveMessage(70);
writeln(receivedKaka);
manager.sendMessage(70, [78]);
receivedKaka = manager.receiveMessage(70);
writeln(receivedKaka);
}

View File

View File

@ -1,15 +0,0 @@
.dub
docs.json
__dummy.html
docs/
/server
server.so
server.dylib
server.dll
server.a
server.lib
server-test-*
*.exe
*.o
*.obj
*.lst

View File

@ -1,13 +0,0 @@
{
"authors": [
"Tristan B. Kildaire"
],
"copyright": "Copyright © 2020, Tristan B. Kildaire",
"dependencies": {
"bformat": "~>1.0.8",
"tristanable": "~>2.2.1"
},
"description": "A minimal D application.",
"license": "proprietary",
"name": "server"
}

View File

@ -1,7 +0,0 @@
{
"fileVersion": 1,
"versions": {
"bformat": "1.0.8",
"tristanable": "2.2.1"
}
}

View File

@ -1,37 +0,0 @@
import std.stdio;
import std.socket;
import tristanable.encoding : DataMessage;
import bmessage;
import core.thread;
void main()
{
writeln("Edit source/app.d to start your project.");
Socket socket = new Socket(AddressFamily.INET, SocketType.STREAM, ProtocolType.TCP);
socket.bind(parseAddress("127.0.0.1",7777));
socket.listen(1);
Socket conn = socket.accept();
byte[] receivedData;
while(true)
{
receiveMessage(conn, receivedData);
DataMessage message = DataMessage.decode(receivedData);
writeln("Tag: ", message.tag);
writeln("Data: ", message.data);
DataMessage d = new DataMessage(70, [2]);
sendMessage(conn, d.encode());
d = new DataMessage(69, [1]);
Thread.sleep(dur!("seconds")(5));
sendMessage(conn, d.encode());
}
}

View File

@ -1,67 +0,0 @@
module tristanable.encoding;
public final class DataMessage
{
private ulong tag;
private byte[] data;
public static DataMessage decode(byte[] bytes)
{
/* Fetch the `tag` */
ulong receivedTag = *(cast(ulong*)bytes.ptr);
/* Fetch the `data` */
byte[] receivedData = bytes[8..bytes.length];
return new DataMessage(receivedTag, receivedData);
}
/**
* Constructs a new DataMessage with
* the give `tag` and bytes `data`
*/
this(ulong tag, byte[] data)
{
this.tag = tag;
this.data = data;
}
public byte[] encode()
{
/* Construct the message array */
byte[] messageData;
/* Add the `tag` bytes */
messageData ~= *(cast(byte*)&tag);
messageData ~= *(cast(byte*)&tag+1);
messageData ~= *(cast(byte*)&tag+2);
messageData ~= *(cast(byte*)&tag+3);
messageData ~= *(cast(byte*)&tag+4);
messageData ~= *(cast(byte*)&tag+5);
messageData ~= *(cast(byte*)&tag+6);
messageData ~= *(cast(byte*)&tag+7);
/* Add the `data` bytes (the actual message) */
messageData ~= data;
return messageData;
}
public byte[] getData()
{
return data;
}
public ulong getTag()
{
return tag;
}
}
public static byte[] encodeForSend(DataMessage message)
{
import bmessage;
return encodeBformat(message.encode());
}

View File

@ -1,29 +0,0 @@
module tristanable.exceptions;
import tristanable.manager;
import tristanable.queue : Queue;
public final class TristanableException : Exception
{
this(Manager manager, string message)
{
super(generateMessage(message));
}
private string generateMessage(string errMesg)
{
string msg;
// msg = "TRistanable failure: "~errMesg~"\n\n";
// msg ~= "Queue stats:\n\n"
// Queue[] queues = manager.getQueues();
// foreach(Queue queue; queues)
// {
// msg ~= "Queue["~to!(string)(queue.getTag())~"]: "~
// }
// msg ~= manager.getQueues()
return msg;
}
}

View File

@ -1,217 +1,10 @@
module tristanable.manager;
import std.socket : Socket;
import core.sync.mutex : Mutex;
import bmessage : bSendMessage = sendMessage;
import tristanable.queue : Queue;
import tristanable.watcher;
import std.container.dlist;
import tristanable.exceptions;
/**
* Manager
*
* This is the core class that is to be instantiated
* that represents an instance of the tristanable
* framework. It is passed a Socket from which it
* reads from (using a bformat block reader).
*
* It contains a Watcher which does the reading and
* appending to respective queues (the user need not
* worry about this factum).
*
* The functions provided allow users to wait in a
* tight loop to dequeue ("receive" in a blcoking mannger)
* from a specified queue.
*/
public final class Manager
/**
* Allows one to add new queues, control
* existing ones by waiting on them etc
*/
public class Manager
{
/* All queues */
private DList!(Queue) queues;
private Mutex queuesLock;
/* TODO Add drop queue? */
/**
* The remote host
*/
private Socket socket;
private Watcher watcher;
/**
* Constructs a new Manager with the given
* endpoint Socket
*
*/
this(Socket socket)
{
/* TODO: Make sure the socket is in STREAM mode */
/* Set the socket */
this.socket = socket;
/* Initialize the queues mutex */
queuesLock = new Mutex();
/* Initialize the watcher */
watcher = new Watcher(this, socket);
}
public Queue getQueue(ulong tag)
{
Queue matchingQueue;
queuesLock.lock();
foreach(Queue queue; queues)
{
if(queue.getTag() == tag)
{
matchingQueue = queue;
break;
}
}
queuesLock.unlock();
return matchingQueue;
}
/* TODO: Probably remove this or keep it */
public bool isValidTag(ulong tag)
{
return !(getQueue(tag) is null);
}
/**
* Returns a new queue with a new ID,
* if all IDs are used then it returns
* null
*
* Use this if you don't care about reserving
* queues IDs and just want a throwaway queue
*
* FIXME: All tags in use, this won't handle it
*/
public Queue generateQueue()
{
/* Newly generated queue */
Queue newQueue;
queuesLock.lock();
ulong curGuess = 0;
bool bad = true;
reguess: while(bad)
{
if(isValidTag(curGuess))
{
curGuess++;
continue reguess;
}
bad = false;
}
/* Create the new queue with the free id found */
newQueue = new Queue(curGuess);
/* Add the queue (recursive mutex) */
addQueue(newQueue);
queuesLock.unlock();
return newQueue;
}
public Queue[] getQueues()
{
Queue[] queues;
queuesLock.lock();
foreach(Queue queue; this.queues)
{
queues ~= queue;
}
queuesLock.unlock();
return queues;
}
/**
* Removes the given Queue, `queue`, from the manager
*
* Throws a TristanableException if the id of the
* queue wanting to be removed is not in use by any
* queue already added
*/
public void removeQueue(Queue queue)
{
queuesLock.lock();
/* Make sure such a tag exists */
if(isValidTag(queue.getTag()))
{
queues.linearRemoveElement(queue);
}
else
{
/* Unlock queue before throwing an exception */
queuesLock.unlock();
throw new TristanableException(this, "Cannot remove a queue with an id not in use");
}
queuesLock.unlock();
}
/**
* Adds the given Queue, `queue`, to the manager
*
* Throws a TristanableException if the id of the
* queue wanting to be added is already in use by
* another already added queue
*/
public void addQueue(Queue queue)
{
queuesLock.lock();
/* Make sure such a tag does not exist already */
if(!isValidTag(queue.getTag()))
{
queues ~= queue;
}
else
{
/* Unlock queue before throwing an exception */
queuesLock.unlock();
throw new TristanableException(this, "Cannot add queue with id already in use");
}
queuesLock.unlock();
}
public Socket getSocket()
{
return socket;
}
/**
* TODO: Comment
* TODO: Testing
*/
public void shutdown()
{
/* TODO: Implement me */
/* Make the loop stop whenever it does */
watcher.shutdown();
/* Wait for the thread to end */
watcher.join();
}
}

View File

@ -1,7 +1,2 @@
module tristanable;
public import tristanable.encoding;
public import tristanable.manager;
public import tristanable.queue;
public import tristanable.queueitem;
public import tristanable.watcher;

View File

@ -1,172 +1,18 @@
/**
* Queue
*
* Represents a queue with a tag.
*
* Any messages that are received with
* the matching tag (to this queue) are
* then enqueued to this queue
*/
module tristanable.queue;
import tristanable.queueitem : QueueItem;
import std.socket : Socket;
import core.sync.mutex : Mutex;
import bmessage : bSendMessage = sendMessage;
import core.thread : Thread;
import std.container.dlist;
import std.range : walkLength;
public enum QueuePolicy : ubyte
public class Queue
{
LENGTH_CAP = 1
}
private this()
{
public final class Queue
{
/* This queue's tag */
private ulong tag;
}
/* The queue */
private DList!(QueueItem) queue;
public static Queue newQueue(ulong queueID)
{
Queue queue;
/* The queue mutex */
private Mutex queueLock;
// TODO: Implement me
/**
* Construct a new queue with the given
* tag
*/
this(ulong tag, QueuePolicy flags = cast(QueuePolicy)0)
{
this.tag = tag;
/* Initialize the mutex */
queueLock = new Mutex();
this.flags = flags;
}
public void setLengthCap(ulong lengthCap)
{
this.lengthCap = lengthCap;
}
public ulong getLengthCap(ulong lengthCap)
{
return lengthCap;
}
/**
* Queue policy settings
*/
private ulong lengthCap = 1;
private QueuePolicy flags;
public void enqueue(QueueItem item)
{
/* Lock the queue */
queueLock.lock();
/**
* Check to see if the queue has a length cap
*
* If so then determine whether to drop or
* keep dependent on current capacity
*/
if(flags & QueuePolicy.LENGTH_CAP)
{
if(walkLength(queue[]) == lengthCap)
{
goto unlock;
}
}
/* Add it to the queue */
queue ~= item;
unlock:
/* Unlock the queue */
queueLock.unlock();
}
/**
* Returns true if this queue has items ready
* to be dequeued, false otherwise
*/
public bool poll()
{
/* Status */
bool status;
/* Lock the queue */
queueLock.lock();
status = !queue.empty();
/* Unlock the queue */
queueLock.unlock();
return status;
}
/**
* Attempts to coninuously dequeue the
* head of the queue
*
* TODO: Add a timeout capability
* TODO: Add tryLock, yield on failure (with loop for recheck ofc)
* TODO: Possible multiple dequeue feature? Like .receive
*/
public QueueItem dequeue()
{
/* The head of the queue */
QueueItem queueHead;
while(!queueHead)
{
/* Lock the queue */
queueLock.lock();
/* Check if we can dequeue anything */
if(!queue.empty())
{
/* If we can then dequeue */
queueHead = queue.front();
queue.removeFront();
/* Chop off the head */
// offWithTheHead();
}
/* Unlock the queue */
queueLock.unlock();
/**
* Move away from this thread, let
* the watcher (presumably) try
* access our queue (successfully)
* by getting a lock on it
*
* Prevents us possibly racing back
* and locking queue again hence
* starving the system
*/
Thread.getThis().yield();
}
return queueHead;
}
/**
* Returns the tag for this queue
*/
public ulong getTag()
{
return tag;
}
return queue;
}
}

View File

@ -1,18 +0,0 @@
module tristanable.queueitem;
public final class QueueItem
{
/* This item's data */
private byte[] data;
/* TODO: */
this(byte[] data)
{
this.data = data;
}
public byte[] getData()
{
return data;
}
}

View File

@ -1,107 +0,0 @@
module tristanable.watcher;
import std.socket : Socket;
import core.sync.mutex : Mutex;
import bmessage : receiveMessage;
import tristanable.queue : Queue;
import tristanable.queueitem : QueueItem;
import tristanable.manager : Manager;
import core.thread : Thread;
import tristanable.encoding;
import tristanable.exceptions;
public final class Watcher : Thread
{
/* The manager */
private Manager manager;
/* The socket to read from */
private Socket socket;
private bool running;
this(Manager manager, Socket endpoint)
{
super(&run);
this.manager = manager;
socket = endpoint;
running = true;
start();
}
public void shutdown()
{
running=false;
/* Close the socket, causing an error, breaking the event loop */
socket.close();
}
private void run()
{
/* Continuously dequeue tristanable packets from socket */
while(true)
{
/* Receive payload (tag+data) */
byte[] receivedPayload;
/* Block for socket response */
bool recvStatus = receiveMessage(socket, receivedPayload);
/* If the receive was successful */
if(recvStatus)
{
/* Decode the ttag-encoded message */
DataMessage message = DataMessage.decode(receivedPayload);
/* TODO: Remove isTag, improve later, oneshot */
/* The matching queue (if any) */
Queue queue = manager.getQueue(message.getTag());
/* If the tag belongs to a queue */
if(queue)
{
/* Add an item to this queue */
queue.enqueue(new QueueItem(message.getData()));
}
/* If the tag is unknwon */
else
{
/* TODO: Add to dropped queue? */
/* Do nothing */
}
}
/* If the receive failed */
else
{
/* TODO: depending on `running`, different error */
/* TODO: Stop everything */
break;
}
/**
* Like in `dequeue` we don't want the possibility
* of racing back to the top of the loop and locking
* the mutex again right before a thread switch,
* so we make sure that a switch occurs to a different
* thread
*/
Thread.getThis().yield();
}
/* Check if we had an error */
if(running)
{
throw new TristanableException(manager, "bformat socket error");
}
else
{
/* Actual shut down, do nothing */
}
}
}

10
todo
View File

@ -1,10 +0,0 @@
- [x] Use queues
- [x] Immediate head chop after dequeue
- [x] Thread.yields
- [ ] Sleep option
- [x] Use linked list for queues (increase performance)
- [ ] shutdown option
- [x] Queue policies
- [x] Length cap
- [ ] Exceptions
- [x] Queue deletion

@ -1 +0,0 @@
Subproject commit cb68e9f673dd7b2f490ee4ff2262e45b41a90a0f

Binary file not shown.