mirror of
https://github.com/deavmi/birchwood
synced 2024-09-19 23:03:44 +02:00
ReceiverThread
- Copied across `recvHandlerFunc()` from `Client` SenderThread - Copied across `sendHandlerFunc()` from `Client`
This commit is contained in:
parent
86cc450a37
commit
8e9790a518
@ -9,7 +9,7 @@ import core.sync.mutex : Mutex;
|
||||
import libsnooze.clib;
|
||||
import libsnooze;
|
||||
|
||||
import birchwood.client.core : Client;
|
||||
import birchwood.client;
|
||||
|
||||
public final class ReceiverThread : Thread
|
||||
{
|
||||
@ -42,4 +42,134 @@ public final class ReceiverThread : Thread
|
||||
{
|
||||
this.client = client;
|
||||
}
|
||||
|
||||
/**
|
||||
* The receive queue worker function
|
||||
*
|
||||
* This has the job of dequeuing messages
|
||||
* in the receive queue, decoding them
|
||||
* into Message objects and then emitting
|
||||
* an event depending on the type of message
|
||||
*
|
||||
* Handles PINGs along with normal messages
|
||||
*
|
||||
* TODO: Our high load average is from here
|
||||
* ... it is getting lock a lot and spinning here
|
||||
* ... we should use libsnooze to avoid this
|
||||
*/
|
||||
private void recvHandlerFunc()
|
||||
{
|
||||
while(running)
|
||||
{
|
||||
// TODO: Insert libsnooze wait here
|
||||
|
||||
// TODO: Add a for-loop here which one can configure which is
|
||||
// ... a "per iteration" how much to process and act on
|
||||
|
||||
// TODO: We could look at libsnooze wait starvation or mutex racing (future thought)
|
||||
|
||||
/* Lock the receieve queue */
|
||||
recvQueueLock.lock();
|
||||
|
||||
/* Message being analysed */
|
||||
Message curMsg;
|
||||
|
||||
/* Search for a PING */
|
||||
ubyte[] pingMessage;
|
||||
|
||||
ulong pos = 0;
|
||||
foreach(ubyte[] message; recvQueue[])
|
||||
{
|
||||
if(indexOf(cast(string)message, "PING") > -1)
|
||||
{
|
||||
pingMessage = message;
|
||||
recvQueue.linearRemoveElement(message);
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
pos++;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* TODO: Plan of action
|
||||
*
|
||||
* 1. Firstly, we must run `parseReceivedMessage()` on the dequeued
|
||||
* ping message (if any)
|
||||
* 2. Then (if there was a PING) trigger said PING handler
|
||||
* 3. Normal message handling; `parseReceivedMessage()` on one of the messages
|
||||
* (make the dequeue amount configurable possibly)
|
||||
* 4. Trigger generic handler
|
||||
* 5. We might need to also have a queue for commands ISSUED and command-replies
|
||||
* RECEIVED and then match those first and do something with them (tasky-esque)
|
||||
* 6. We can just make a generic reply queue of these things - we have to maybe to this
|
||||
* - we can cache or remember stuff when we get 353
|
||||
*/
|
||||
|
||||
|
||||
|
||||
|
||||
/* If we found a PING */
|
||||
if(pingMessage.length > 0)
|
||||
{
|
||||
/* Decode the message and parse it */
|
||||
curMsg = Message.parseReceivedMessage(decodeMessage(pingMessage));
|
||||
logger.log("Found a ping: "~curMsg.toString());
|
||||
|
||||
// string ogMessage = cast(string)pingMessage;
|
||||
// long idxSigStart = indexOf(ogMessage, ":")+1;
|
||||
// long idxSigEnd = lastIndexOf(ogMessage, '\r');
|
||||
|
||||
// string pingID = ogMessage[idxSigStart..idxSigEnd];
|
||||
string pingID = curMsg.getParams();
|
||||
|
||||
|
||||
// this.socket.send(encodeMessage("PONG "~pingID));
|
||||
// string messageToSend = "PONG "~pingID;
|
||||
|
||||
// sendMessage(messageToSend);
|
||||
|
||||
// logger.log("Ponged");
|
||||
|
||||
/* TODO: Implement */
|
||||
// TODO: Remove the Eventy push and replace with a handler call (on second thought no)
|
||||
Event pongEvent = new PongEvent(pingID);
|
||||
engine.push(pongEvent);
|
||||
}
|
||||
|
||||
/* Now let's go message by message */
|
||||
if(!recvQueue.empty())
|
||||
{
|
||||
ubyte[] message = recvQueue.front();
|
||||
|
||||
/* Decode message */
|
||||
string messageNormal = decodeMessage(message);
|
||||
|
||||
recvQueue.linearRemoveElement(recvQueue.front());
|
||||
|
||||
// writeln("Normal message: "~messageNormal);
|
||||
|
||||
|
||||
|
||||
/* TODO: Parse message and call correct handler */
|
||||
curMsg = Message.parseReceivedMessage(messageNormal);
|
||||
|
||||
// TODO: Remove the Eventy push and replace with a handler call (on second thought no)
|
||||
Event ircEvent = new IRCEvent(curMsg);
|
||||
engine.push(ircEvent);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/* Unlock the receive queue */
|
||||
recvQueueLock.unlock();
|
||||
|
||||
/* TODO: Threading yield here */
|
||||
Thread.yield();
|
||||
}
|
||||
}
|
||||
}
|
@ -9,7 +9,7 @@ import core.sync.mutex : Mutex;
|
||||
import libsnooze.clib;
|
||||
import libsnooze;
|
||||
|
||||
import birchwood.client.core : Client;
|
||||
import birchwood.client;
|
||||
|
||||
public final class SenderThread : Thread
|
||||
{
|
||||
@ -42,4 +42,46 @@ public final class SenderThread : Thread
|
||||
{
|
||||
this.client = client;
|
||||
}
|
||||
|
||||
/**
|
||||
* The send queue worker function
|
||||
*
|
||||
* TODO: Same issue as recvHandlerFunc
|
||||
* ... we should I/O wait (sleep) here
|
||||
*/
|
||||
private void sendHandlerFunc()
|
||||
{
|
||||
/* TODO: Hoist up into ConnInfo */
|
||||
ulong fakeLagInBetween = 1;
|
||||
|
||||
while(running)
|
||||
{
|
||||
// TODO: Insert libsnooze wait here
|
||||
|
||||
// TODO: Add a for-loop here which one can configure which is
|
||||
// ... a "per iteration" how much to process and act on
|
||||
|
||||
// TODO: We could look at libsnooze wait starvation or mutex racing (future thought)
|
||||
|
||||
/* TODO: handle normal messages (xCount with fakeLagInBetween) */
|
||||
|
||||
/* Lock queue */
|
||||
sendQueueLock.lock();
|
||||
|
||||
foreach(ubyte[] message; sendQueue[])
|
||||
{
|
||||
this.socket.send(message);
|
||||
Thread.sleep(dur!("seconds")(fakeLagInBetween));
|
||||
}
|
||||
|
||||
/* Empty the send queue */
|
||||
sendQueue.clear();
|
||||
|
||||
/* Unlock queue */
|
||||
sendQueueLock.unlock();
|
||||
|
||||
/* TODO: Yield */
|
||||
Thread.yield();
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user