Shutdown mechanism should work now

This commit is contained in:
Tristan B. Velloza Kildaire 2022-05-10 10:44:55 +02:00
parent cb68e9f673
commit 2be8a80a07
1 changed files with 66 additions and 4 deletions

View File

@ -2,8 +2,8 @@ module tristanable.watcher;
import tristanable.manager : Manager;
import tristanable.request : Request;
import std.socket : Socket;
import core.thread : Thread;
import std.socket : Socket, SocketSet;
import core.thread : Thread, dur, Duration;
import bmessage : receiveMessage;
/* TODO: Watcher class to watch for stuff, and add to manager's queues */
@ -27,11 +27,20 @@ public final class Watcher : Thread
*/
private bool isActive;
this(Manager manager, Socket endpoint)
/**
* Timeout for select()
*/
private Duration timeOut;
this(Manager manager, Socket endpoint, Duration timeOut = dur!("msecs")(100))
{
super(&watchLoop);
this.manager = manager;
this.endpoint = endpoint;
this.timeOut = timeOut;
initSelect();
isActive = true;
}
@ -40,6 +49,22 @@ public final class Watcher : Thread
isActive = false;
}
private SocketSet socketSetR, socketSetW, socketSetE;
/**
* Initializes the SocketSet which is needed for the use
* of the select() method0
*/
private void initSelect()
{
/* We acre about `endpoint` status changes */
socketSetR = new SocketSet();
socketSetR.add(endpoint);
socketSetW = new SocketSet();
socketSetE = new SocketSet();
}
private void watchLoop()
{
while(isActive)
@ -53,11 +78,48 @@ public final class Watcher : Thread
/* The message's data */
byte[] receivedMessage;
/* Check if the endpoint has any data available */
int status = Socket.select(socketSetR, socketSetW, socketSetE, timeOut);
/* If we timed out on the select() */
if(status == 0)
{
/* Check if we need to exit */
continue;
}
/* Interrupt */
else if (status == -1)
{
/* TODO: Not sure what we should do here */
}
/* Either data is available or a network occurred */
else
{
/* If we have data */
if(socketSetR.isSet(endpoint))
{
/* Do nothing (fall through) */
}
/* We have an error */
else
{
/* TODO: Handle this */
}
}
/* Receive a message */
bool recvStatus = receiveMessage(endpoint, receivedPayload);
/* TODO: Status check */
/* If there was some reading error */
if(!recvStatus)
{
/* TODO: Either we work with signals (preferred) or select and blocking */
/**
* I am thinking we can peek, and see if we have a potential message header (bmessage)
* Under that condition do some sort of blocking wait (in bmessage)
*/
}
/* Fetch the `tag` */
receivedTag = *(cast(ulong*)receivedPayload.ptr);