Added garbage collection to the queue.

This commit is contained in:
Tristan B. Kildaire 2020-06-23 11:09:44 +02:00
parent c1712d8afe
commit 02ca3d9bb5
2 changed files with 68 additions and 0 deletions

View File

@ -0,0 +1,51 @@
module tristanable.garbage;
import tristanable.manager : Manager;
import tristanable.request : Request;
import std.socket : Socket;
import core.thread : Thread, Duration, dur;
import bmessage : receiveMessage;
public final class GarbageCollector : Thread
{
/**
* The associated manager
*/
private Manager manager;
/**
* The queue variable pointer
*/
private Request[]* requestQueueVariable;
this(Manager manager)
{
/* Set the worker function */
super(&cleaner);
/* Set the manager */
this.manager = manager;
/* Set the pointer */
requestQueueVariable = cast(Request[]*)manager.getQueueVariable();
}
private void cleaner()
{
while(true)
{
/* Lock the queue */
manager.lockQueue();
/* TODO: Add clean up here */
/* Unlock the queue */
manager.unlockQueue();
/* Sleep for 60 seconds after cleaning up */
sleep(dur!("seconds")(60));
}
}
}

View File

@ -2,6 +2,7 @@ module tristanable.manager;
import tristanable.watcher : Watcher;
import tristanable.request : Request;
import tristanable.garbage : GarbageCollector;
import std.socket : Socket;
import core.sync.mutex : Mutex;
import bmessage : bSendMessage = sendMessage;
@ -32,6 +33,11 @@ public final class Manager
*/
private Socket socket;
/**
* The garbage collector
*/
private GarbageCollector gc;
this(Socket endpoint)
{
/* Set the socket */
@ -40,11 +46,17 @@ public final class Manager
/* Create the watcher */
watcher = new Watcher(this, endpoint);
/* Create the garbage collector */
gc = new GarbageCollector(this);
/* Initialize the `requestQueue` mutex */
queueMutex = new Mutex();
/* Start the watcher */
watcher.start();
/* Start the garbage collector */
gc.start();
}
public void sendMessage(ulong tag, byte[] data)
@ -135,6 +147,11 @@ public final class Manager
return requestQueue;
}
public ref Request[] getQueueVariable()
{
return requestQueue;
}
public void lockQueue()
{
queueMutex.lock();