eventy/source/eventy/engine.d

577 lines
14 KiB
D

module eventy.engine;
import eventy.queues : Queue;
import eventy.signal : Signal;
import eventy.event : Event;
import eventy.config;
import std.container.dlist;
import core.sync.mutex : Mutex;
import core.thread : Thread, dur, Duration;
import eventy.exceptions;
import std.stdio;
// /* TODO: Move elsewhere, this thing thinks it's a delegate in the unit test, idk why */
// private void runner(Event e)
// {
// import std.stdio;
// writeln("Running event", e.id);
// }
unittest
{
Engine engine = new Engine();
engine.start();
/**
* Let the event engine know what typeIDs are
* allowed to be queued
*/
engine.addQueue(1);
engine.addQueue(2);
/**
* Create a new Signal Handler that will handles
* event types `1` and `2` with the given `handler()`
* function
*/
class SignalHandler1 : Signal
{
this()
{
super([1, 2]);
}
public override void handler(Event e)
{
import std.stdio;
writeln("Running event", e.id);
}
}
/**
* Tell the event engine that I want to register
* the following handler for its queues `1` and `2`
*/
Signal j = new SignalHandler1();
engine.addSignalHandler(j);
Event eTest = new Event(1);
engine.push(eTest);
eTest = new Event(2);
engine.push(eTest);
Thread.sleep(dur!("seconds")(2));
engine.push(eTest);
writeln("done with main thread code");
while(engine.hasPendingEvents()) {}
/* TODO: Before shutting down, actually test it out (i.e. all events ran) */
engine.shutdown();
}
// unittest
// {
// EngineSettings customSettings = {holdOffMode: HoldOffMode.YIELD};
// Engine engine = new Engine(customSettings);
// engine.start();
// /**
// * Let the event engine know what typeIDs are
// * allowed to be queued
// */
// engine.addQueue(1);
// engine.addQueue(2);
// /**
// * Create a new Signal Handler that will handles
// * event types `1` and `2` with the given `handler()`
// * function
// */
// class SignalHandler1 : Signal
// {
// this()
// {
// super([1, 2]);
// }
// public override void handler(Event e)
// {
// import std.stdio;
// writeln("Running event", e.id);
// }
// }
// /**
// * Tell the event engine that I want to register
// * the following handler for its queues `1` and `2`
// */
// Signal j = new SignalHandler1();
// engine.addSignalHandler(j);
// Event eTest = new Event(1);
// engine.push(eTest);
// eTest = new Event(2);
// engine.push(eTest);
// Thread.sleep(dur!("seconds")(2));
// engine.push(eTest);
// writeln("done with main thread code");
// /* TODO: Before shutting down, actually test it out (i.e. all events ran) */
// engine.shutdown();
// }
/**
* Engine
*
* An instance of this represents an engine that
* can, at any time, handle the delivery of new
* events, trigger the correct signal handlers
* for the respective events, remove signal
* handlers, add signal handlers, among many
* other things
*/
public final class Engine : Thread
{
/* TODO: Or use a queue data structure */
private DList!(Queue) queues;
private Mutex queueLock;
/* TODO: Or use a queue data structure */
private DList!(Signal) handlers;
private Mutex handlerLock;
/* Engine configuration */
private EngineSettings settings;
private bool running;
private DList!(DispatchWrapper) threadStore;
private Mutex threadStoreLock;
this(EngineSettings settings)
{
super(&run);
queueLock = new Mutex();
handlerLock = new Mutex();
threadStoreLock = new Mutex();
this.settings = settings;
}
/**
* Instantiates a new Eventy engine with the default
* settings
*/
this()
{
EngineSettings defaultSettings;
/* Yield if a lock fails (prevent potential thread starvation) */
defaultSettings.agressiveTryLock = false;
/* Make the event engine loop sleep (1) and for 200ms (2) (TODO: Adjust this) */
defaultSettings.holdOffMode = HoldOffMode.SLEEP;
defaultSettings.sleepTime = dur!("msecs")(200);
this(defaultSettings);
}
/**
* Set the event loop sleep time
*
* The load average will sky rocket if it is 0,
* which is just because it is calculated on how
* full the run queue is, length but also over time
* and even just one task continousy in it will
* make the average high
*
* Reason why it's always runnable is the process
* (the "thread") is a tight loop with no sleeps
* that would dequeue it from the run queue and/or
* no I/O system calls that would put it into the
* waiting queue
*/
public void setSleep(Duration time)
{
// sleepTime = time;
}
/**
* Adds the given Signal handler
*
* @param e the Signal handler to add
*/
/**
* Attaches a new signal handler to the engine
*
* Params:
* e = the signal handler to add
*/
public void addSignalHandler(Signal e)
{
/* Lock the signal-set */
handlerLock.lock();
/* Add the new handler */
handlers ~= e;
/* Unlock the signal-set */
handlerLock.unlock();
}
/**
* The main event loop
*
* This checks at a certain interval (see HoldOffMode) if
* there are any events in any of the queues, if so,
* the dispatcher for said event type is called
*/
private void run()
{
running = true;
while (running)
{
/**
* Lock the queue-set
*
* Additionally:
* Don't waste time spinning on mutex,
* if it is not lockable then yield
*/
while (!queueLock.tryLock_nothrow())
{
yield();
}
foreach (Queue queue; queues)
{
/* If the queue has evenets queued */
if (queue.hasEvents())
{
/* TODO: Add different dequeuing techniques */
/* Pop the first Event */
Event headEvent = queue.popEvent();
/* Get all signal-handlers for this event type */
Signal[] handlersMatched = getSignalsForEvent(headEvent);
/* Dispatch the signal handlers */
dispatch(handlersMatched, headEvent);
}
}
/* Unlock the queue set */
queueLock.unlock();
/* Activate hold off (dependening on the type) */
if(settings.holdOffMode == HoldOffMode.YIELD)
{
/* Yield to stop mutex starvation */
yield();
}
else if(settings.holdOffMode == HoldOffMode.SLEEP)
{
/* Sleep the thread (for given time) to stop mutex starvation */
sleep(settings.sleepTime);
}
else
{
/* This should never happen */
assert(false);
}
}
}
/**
* Stops the engine
*
* TODO: Examine cases where this may not work
* TODO: Should we perhaps kill all other things
* i.e. stopping running threads
*/
public void shutdown()
{
/* TODO: Insert a lock here, that dispatch should adhere too as well */
/* Stop the loop */
running = false;
}
/**
* Dispatch(Signal[] set, Event e)
*
* Creates a new thread per signal and dispatches the event to them
*
* TODO: Add ability to dispatch on this thread
*/
private void dispatch(Signal[] signalSet, Event e)
{
foreach (Signal signal; signalSet)
{
/* Create a new Thread */
// Thread handlerThread = getThread(signal, e);
DispatchWrapper handlerThread = new DispatchWrapper(signal, e);
/**
* TODO
*
* When we call `shutdown()` there may very well be a case of
* where the threadStoreLock unlocks after the clean up
* loop, but storeThread hangs here during that time,
* then proceeds to start the thread, we should therefore,
* either block on running changed (solution 1, not as granular)
*
* Solution 2: Block on dispatch being called <- use this method rather
* But still needs a running check, it must not go ahead if running is now
* false
*/
/* Store the thread */
storeThread(handlerThread);
/* Start the thread */
handlerThread.start();
}
}
/**
* Store the thread
*
* TODO: This can only be implemented if we use
* wrapper threads that exit, and we can signal
* removal from thread store then
*/
private void storeThread(DispatchWrapper t)
{
/* Lock the thread store from editing */
threadStoreLock.lock();
/* Add the thread */
threadStore ~= t;
/* Unlock the thread store for editing */
threadStoreLock.unlock();
}
/**
* Removes a thread from the thread store
*/
private void removeThread(DispatchWrapper t)
{
/* Lock the thread store from editing */
threadStoreLock.lock();
/* Remove the thread */
threadStore.linearRemoveElement(t);
/* Unlock the thread store for editing */
threadStoreLock.unlock();
}
/**
* DispatchWrapper
*
* Effectively a thread but with the Signal,
* Event included with clean-up routines
*/
private class DispatchWrapper : Thread
{
private Signal signal;
private Event e;
this(Signal signal, Event e)
{
super(&run);
this.signal = signal;
this.e = e;
}
private void run()
{
/* Run the signal handler */
signal.handler(e);
/* Remove myself from the thread store */
removeThread(this);
}
}
/**
* returns all signal(s) responsible for
* handling the type of Event provided
*
* @param e the Event type to match to
* @returns Signal[] the list of signal
* handlers that handle event e
*/
public Signal[] getSignalsForEvent(Event e)
{
/* Matched handlers */
Signal[] matchedHandlers;
/* Lock the signal-set */
handlerLock.lock();
/* Find all handlers matching */
foreach (Signal signal; handlers)
{
if (signal.handles(e.id))
{
matchedHandlers ~= signal;
}
}
/* Unlock the signal-set */
handlerLock.unlock();
return matchedHandlers;
}
/**
* Checks if there is a signal handler that handles
* the given event id
* Params:
* id = the event ID to check
* Returns:
*/
public bool isSignalExists(ulong id)
{
return getSignalsForEvent(new Event(id)).length != 0;
}
/**
* push(Event e)
*
* Provided an Event, `e`, this will enqueue the event
* to
*/
public void push(Event e)
{
Queue matchedQueue = findQueue(e.id);
if (matchedQueue)
{
/* Append to the queue */
matchedQueue.add(e);
}
}
/**
* Creates a new queue with the given id
* and then adds it
*
* @param id the id of the new queue to add
* @throws EventyException if a queue with
* the given id already exists
*/
public void addQueue(ulong id)
{
/* Create a new queue with the given id */
Queue newQueue = new Queue(id);
/* Lock the queue collection */
queueLock.lock();
/* If no such queue exists then add it (recursive mutex used) */
if (!findQueue(id))
{
/* Add the queue */
queues ~= newQueue;
}
else
{
throw new EventyException("Failure to add queue with ID already in use");
}
/* Unlock the queue collection */
queueLock.unlock();
}
/**
* Given an id, this will return
* the Queue associated with said
* id
*
* @param id the id of the Queue
* @returns The Queue if found but
* null otherwise
*/
public Queue findQueue(ulong id)
{
/* Lock the queue collection */
queueLock.lock();
/* Find the matching queue */
Queue matchedQueue;
foreach (Queue queue; queues)
{
if (queue.id == id)
{
matchedQueue = queue;
break;
}
}
/* Unlock the queue collection */
queueLock.unlock();
return matchedQueue;
}
/* TODO: Add coumentation */
public ulong[] getTypes()
{
/* TODO: Implement me */
return null;
}
/**
* Checks if any of the queues in the event engine
* have any pending events in them waiting dispatch
*
* Returns: <code>true</code> if there are pending events,
* <code>false</code> otherwise
*/
public bool hasPendingEvents()
{
bool isPending = false;
/* Lock the queues */
queueLock.lock();
foreach (Queue queue; queues)
{
if (queue.hasEvents())
{
isPending = true;
break;
}
}
/* Unlock the queues */
queueLock.unlock();
return isPending;
}
}