StreamListener

- Backoff mechanism now uses a condition variable
- Implemented `nudge()`
This commit is contained in:
Tristan B. Velloza Kildaire 2023-11-24 00:28:44 +02:00
parent ec3748f916
commit 4adcb7197e
1 changed files with 43 additions and 3 deletions

View File

@ -9,6 +9,8 @@ import river.impls.sock : SockStream;
import core.thread;
import renaissance.connection;
import renaissance.logging;
import core.sync.mutex : Mutex;
import core.sync.condition : Condition;
public class StreamListener : Listener
{
@ -29,6 +31,9 @@ public class StreamListener : Listener
*/
private Thread workerThread;
private Mutex backoffMutex;
private Condition backoffCond;
/**
* Whether or not we are running
*
@ -48,9 +53,13 @@ public class StreamListener : Listener
/* When started, the thread should run the connectionLoop() */
workerThread = new Thread(&connectionLoop);
/* Initialize the backoff facilities */
this.backoffMutex = new Mutex();
this.backoffCond = new Condition(this.backoffMutex);
}
Duration backoff = dur!("seconds")(1);
Duration backoffDuration = dur!("seconds")(1);
private void connectionLoop()
{
@ -70,8 +79,8 @@ public class StreamListener : Listener
// TODO: Handling accept (which creates a new socket pair) is a problem
// ... we must code a backoff in hopes some client disconnects freeing
// ... up space for a new fd pair to be created
logger.warn("Waiting ", this.backoff, " many seconds before retrying...");
Thread.sleep(this.backoff);
logger.warn("Waiting ", this.backoffDuration, " many seconds before retrying...");
backoff();
logger.warn("Retrying the accept");
continue;
}
@ -87,6 +96,18 @@ public class StreamListener : Listener
}
}
private void backoff()
{
// Lock the mutex
this.backoffMutex.lock();
// Wait on the condition for `backoffDuration`-many duration
this.backoffCond.wait(this.backoffDuration);
// Unlock the mutex
this.backoffMutex.unlock();
}
public override void startListener()
{
try
@ -124,6 +145,25 @@ public class StreamListener : Listener
servSock.close();
}
/**
* Wakes up the sleeping
* backoff sleeper which
* may have been activated
* in the case the `accept()`
* call was failing
*/
public override void nudge()
{
// Lock the mutex
this.backoffMutex.lock();
// Wake up any sleeper (only one possible)
this.backoffCond.notify();
// Unlock the mutex
this.backoffMutex.unlock();
}
public static StreamListener create(Server server, Address bindAddress)
{
StreamListener streamListener = new StreamListener(server, bindAddress);