Compare commits

...

2 Commits

Author SHA1 Message Date
Tristan B. Velloza Kildaire 3c97e9db9d Sender
- Calling `sq(ubyte[])` now will wake up the condition variable
2023-10-23 22:14:05 +02:00
Tristan B. Velloza Kildaire c25c28c256 Sender
- Calling `sq(ubyte[])` now will wake up the condition variable

Receiver

- Calling `rq(ubyte[])` now will wake up the condition variable
- Updated `recvHandlerFunc()` to use the condition variable
- Callind `end()` will wakeup the sleeping thread
2023-10-23 22:13:52 +02:00
2 changed files with 28 additions and 56 deletions

View File

@ -7,6 +7,7 @@ import core.thread : Thread, dur;
import std.container.slist : SList;
import core.sync.mutex : Mutex;
import core.sync.condition : Condition;
import eventy : EventyEvent = Event;
@ -39,11 +40,10 @@ public final class ReceiverThread : Thread
private Mutex recvQueueLock;
/**
* The libsnooze event to await on which
* when we wake up signals a new message
* to be processed and received
* Condition variable for waking
* up receive queue reader
*/
private Event receiveEvent;
private Condition recvQueueCond;
/**
* The associated IRC client
@ -64,9 +64,8 @@ public final class ReceiverThread : Thread
{
super(&recvHandlerFunc);
this.client = client;
this.receiveEvent = new Event();
this.recvQueueLock = new Mutex();
this.receiveEvent.ensure(this);
this.recvQueueCond = new Condition(this.recvQueueLock);
}
/**
@ -84,14 +83,11 @@ public final class ReceiverThread : Thread
/* Add to queue */
recvQueue.insertAfter(recvQueue[], encodedMessage);
/* Wake the sleeping message handler */
recvQueueCond.notify();
/* Unlock queue */
recvQueueLock.unlock();
/**
* Wake up all threads waiting on this event
* (if any, and if so it would only be the receiver)
*/
receiveEvent.notifyAll();
}
/**
@ -108,34 +104,12 @@ public final class ReceiverThread : Thread
{
while(client.isRunning())
{
// TODO: We could look at libsnooze wait starvation or mutex racing (future thought)
try
{
receiveEvent.wait();
}
catch(InterruptedException e)
{
version(unittest)
{
writeln("wait() interrupted");
}
continue;
}
catch(FatalException e)
{
// TODO: This should crash and end
version(unittest)
{
writeln("wait() had a FATAL error!!!!!!!!!!!");
}
continue;
}
/* Lock the receieve queue */
/* Lock the queue */
recvQueueLock.lock();
/* Sleep till woken (new message) */
recvQueueCond.wait(); // TODO: Check SyncError?
/* Parsed messages */
SList!(Message) currentMessageQueue;
@ -237,14 +211,16 @@ public final class ReceiverThread : Thread
*/
public void end()
{
// TODO: See above notes about libsnooze behaviour due
// ... to usage in our context
receiveEvent.notifyAll();
/* Lock the queue */
recvQueueLock.lock();
/* Wake up sleeping thread (so it can exit) */
recvQueueCond.notify();
/* Unlock the queue */
recvQueueLock.unlock();
// Wait on the manager thread to end
join();
// Dispose the eventy event (TODO: We could do this then join for same effect)
receiveEvent.dispose();
}
}

View File

@ -7,6 +7,7 @@ import core.thread : Thread, dur;
import std.container.slist : SList;
import core.sync.mutex : Mutex;
import core.sync.condition : Condition;
import birchwood.client;
@ -31,11 +32,10 @@ public final class SenderThread : Thread
private Mutex sendQueueLock;
/**
* The libsnooze event to await on which
* when we wake up signals a new message
* to be processed and sent
* Condition variable for waking
* up send queue reader
*/
private Event sendEvent;
private Condition sendQueueCond;
/**
* The associated IRC client
@ -56,9 +56,8 @@ public final class SenderThread : Thread
{
super(&sendHandlerFunc);
this.client = client;
this.sendEvent = new Event();
this.sendQueueLock = new Mutex();
this.sendEvent.ensure(this);
this.sendQueueCond = new Condition(this.sendQueueLock);
}
/**
@ -76,14 +75,11 @@ public final class SenderThread : Thread
/* Add to queue */
sendQueue.insertAfter(sendQueue[], encodedMessage);
/* Wake the sleeping message handler */
sendQueueCond.notify();
/* Unlock queue */
sendQueueLock.unlock();
/**
* Wake up all threads waiting on this event
* (if any, and if so it would only be the sender)
*/
sendEvent.notifyAll();
}
/**