1
0
mirror of https://github.com/deavmi/birchwood synced 2024-09-20 11:43:22 +02:00
- Calling `sq(ubyte[])` now will wake up the condition variable

Receiver

- Calling `rq(ubyte[])` now will wake up the condition variable
- Updated `recvHandlerFunc()` to use the condition variable
- Callind `end()` will wakeup the sleeping thread
This commit is contained in:
Tristan B. Velloza Kildaire 2023-10-23 22:13:52 +02:00
parent ea2b07045a
commit c25c28c256

View File

@ -7,6 +7,7 @@ import core.thread : Thread, dur;
import std.container.slist : SList; import std.container.slist : SList;
import core.sync.mutex : Mutex; import core.sync.mutex : Mutex;
import core.sync.condition : Condition;
import eventy : EventyEvent = Event; import eventy : EventyEvent = Event;
@ -39,11 +40,10 @@ public final class ReceiverThread : Thread
private Mutex recvQueueLock; private Mutex recvQueueLock;
/** /**
* The libsnooze event to await on which * Condition variable for waking
* when we wake up signals a new message * up receive queue reader
* to be processed and received
*/ */
private Event receiveEvent; private Condition recvQueueCond;
/** /**
* The associated IRC client * The associated IRC client
@ -64,9 +64,8 @@ public final class ReceiverThread : Thread
{ {
super(&recvHandlerFunc); super(&recvHandlerFunc);
this.client = client; this.client = client;
this.receiveEvent = new Event();
this.recvQueueLock = new Mutex(); this.recvQueueLock = new Mutex();
this.receiveEvent.ensure(this); this.recvQueueCond = new Condition(this.recvQueueLock);
} }
/** /**
@ -84,14 +83,11 @@ public final class ReceiverThread : Thread
/* Add to queue */ /* Add to queue */
recvQueue.insertAfter(recvQueue[], encodedMessage); recvQueue.insertAfter(recvQueue[], encodedMessage);
/* Wake the sleeping message handler */
recvQueueCond.notify();
/* Unlock queue */ /* Unlock queue */
recvQueueLock.unlock(); recvQueueLock.unlock();
/**
* Wake up all threads waiting on this event
* (if any, and if so it would only be the receiver)
*/
receiveEvent.notifyAll();
} }
/** /**
@ -108,34 +104,12 @@ public final class ReceiverThread : Thread
{ {
while(client.isRunning()) while(client.isRunning())
{ {
// TODO: We could look at libsnooze wait starvation or mutex racing (future thought) /* Lock the queue */
try
{
receiveEvent.wait();
}
catch(InterruptedException e)
{
version(unittest)
{
writeln("wait() interrupted");
}
continue;
}
catch(FatalException e)
{
// TODO: This should crash and end
version(unittest)
{
writeln("wait() had a FATAL error!!!!!!!!!!!");
}
continue;
}
/* Lock the receieve queue */
recvQueueLock.lock(); recvQueueLock.lock();
/* Sleep till woken (new message) */
recvQueueCond.wait(); // TODO: Check SyncError?
/* Parsed messages */ /* Parsed messages */
SList!(Message) currentMessageQueue; SList!(Message) currentMessageQueue;
@ -237,14 +211,16 @@ public final class ReceiverThread : Thread
*/ */
public void end() public void end()
{ {
// TODO: See above notes about libsnooze behaviour due /* Lock the queue */
// ... to usage in our context recvQueueLock.lock();
receiveEvent.notifyAll();
/* Wake up sleeping thread (so it can exit) */
recvQueueCond.notify();
/* Unlock the queue */
recvQueueLock.unlock();
// Wait on the manager thread to end // Wait on the manager thread to end
join(); join();
// Dispose the eventy event (TODO: We could do this then join for same effect)
receiveEvent.dispose();
} }
} }