- Added an import for the `select(...)` function
- Added wrapper functions which handle the `FD_ZERO` and `FD_SET` macros which ImportC cannot refer to directly

Event

- Cleaned up imports
- `wait()` now calls `wait(Duration)` with a duration of `0`
- Implemented a wait with a timeout, `wait(Duration)` which returns `true` if not timed out and `false` on timeout
-  Implemented the underlying `select(...)`-based `wait(Duration)` inside of `wait(timeval)`
This commit is contained in:
Tristan B. Velloza Kildaire 2023-02-25 16:24:56 +02:00
parent 7ecac9e4cb
commit b82fb40f6f
2 changed files with 127 additions and 27 deletions

View File

@ -1,2 +1,13 @@
#include<unistd.h>
#include<sys/select.h>
void fdSetZero(fd_set* set)
{
FD_ZERO(set);
}
void fdSetSet(int fd, fd_set* set)
{
FD_SET(fd, set);
}

View File

@ -1,13 +1,12 @@
module libsnooze.event;
import libsnooze.clib : pipe, write, read;
import core.thread : Thread;
import libsnooze.clib : select, fd_set, fdSetZero, fdSetSet;
import libsnooze.clib : timeval, time_t, suseconds_t;
import core.thread : Thread, Duration, dur;
import core.sync.mutex : Mutex;
import libsnooze.exceptions : SnoozeError;
// TODO: Remove the below import - it is only for testing
import std.stdio : writeln;
public class Event
{
/* Array of [readFD, writeFD] pairs/arrays */
@ -45,28 +44,11 @@ public class Event
}
/**
* Wait on this event
* Wait on this event indefinately
*/
public final void wait()
{
import core.thread;
/* Get the thread object (TID) for the calling thread */
Thread callingThread = Thread.getThis();
/* Lock the pipe-pairs */
pipesLock.lock();
/* Checks if a pipe-pair exists, if not creates it */
int[2] pipePair = pipeExistenceEnsure(callingThread);
/* Unlock the pipe-pairs */
pipesLock.unlock();
/* Get the read end and read 1 byte (blockingly) */
int readFD = pipePair[0];
byte singleBuff;
read(readFD, &singleBuff, 1);
wait(dur!("seconds")(0));
}
private int[2] pipeExistenceEnsure(Thread thread)
@ -79,6 +61,7 @@ public class Event
/* If it is not in the pair, create a pipe-pair and save it */
if(!(thread in pipes))
{
// TODO: Add a catch here, then unlock then rethrow
pipes[thread] = newPipe(); //TODO: If bad (exception)
}
@ -91,6 +74,111 @@ public class Event
return pipePair;
}
// NOTE: Returns true on woken, false on timeout
private final bool wait(timeval timestruct)
{
/* Get the thread object (TID) for the calling thread */
Thread callingThread = Thread.getThis();
/* Lock the pipe-pairs */
pipesLock.lock();
/* Checks if a pipe-pair exists, if not creates it */
// TODO: Add a catch here, then unlock, rethrow
int[2] pipePair = pipeExistenceEnsure(callingThread);
/* Unlock the pipe-pairs */
pipesLock.unlock();
/* Get the reand-end of the pipe fd */
int readFD = pipePair[0];
// TODO: IO/queue block using select with a timeout
// select();
// NOTE: Not sure why but nfdsmust be the highest fd number that is being monitored+1
// ... so in our case that must be `pipePair[0]+1`
// Setup the fd_set for read fs struct
/**
* Setup the fd_set for read file descriptors
*
* 1. Initialize the struct with FD_ZERO
* 2. Add the file descriptor of interest i.e. `readFD`
*/
fd_set readFDs;
fdSetZero(&readFDs);
fdSetSet(readFD, &readFDs);
/**
* Now block till we have a change in `readFD`'s state
* (i.e. it becomes readbale without a block). However,
* if a timeout was specified we can then return after
* said timeout.
*/
int status = select(readFD+1, &readFDs, null, null, &timestruct);
/**
* If timeout was 0 then it blocks till readable and hence the
* status would then be non-zero. The only way it can be `0` is if
* the timeout was non-zero meaning it returned after timing out and
* nothing changed in any fd_set(s) (nothing became readable)
*/
if(status == 0)
{
// TODO: Handle timeout
return false;
}
// TODO: Check the -1 case
else
{
// TODO: Perform read now to clear sttaus for next wait()
/* Get the read end and read 1 byte (won't block) */
byte singleBuff;
read(readFD, &singleBuff, 1);
// TODO: ENsure no IO exception
return true;
}
// TODO: Then perform read to remove the status of "readbale"
// ... such that the next call to select still blocks if a notify()
// ... is yet to be called
}
/**
* Waits on the event with a given timeout
*
* Params:
* duration = the timeout
*/
public final bool wait(Duration duration)
{
/* Split out the duration into seconds and microseconds */
time_t seconds;
suseconds_t microseconds;
duration.split!("seconds", "msecs")(seconds, microseconds);
version(dbg)
{
/* If debugging enable, then print out these duirng compilation */
pragma(msg, time_t);
pragma(msg, suseconds_t);
}
/* Generate the timeval struct */
timeval timestruct;
timestruct.tv_sec = seconds;
timestruct.tv_usec = microseconds;
/* Call wait with this time duration */
return wait(timestruct);
}
/**
* Wakes up a single thread specified
*
@ -156,10 +244,6 @@ public class Event
private int[2] newPipe()
{
// writeln(pipes[0]);
/* Allocate space for the two FDs */
int[2] pipePair;
@ -181,6 +265,9 @@ unittest
{
import std.conv : to;
import core.thread : dur;
import std.stdio : writeln;
import libsnooze.clib : select, fd_set, fdSetZero, fdSetSet;
Event event = new Event();
@ -226,6 +313,8 @@ unittest
{
import std.conv : to;
import core.thread : dur;
import std.stdio : writeln;
import libsnooze.clib : select, fd_set, fdSetZero, fdSetSet;
Event event = new Event();