mirror of
https://github.com/deavmi/birchwood
synced 2024-09-19 17:43:20 +02:00
Events
- Moved `IRCEventType`, `IRCEvent` and `PongEvent` to a new module `events` Client - Removed old `recvQueue`, `sendQueue`, their respective locks and threads - Removed any initializations of the aforementioned - Added calls to start the `ReceiverThread` and `SenderThread` on call to `connect()` - Updated `disconnect()`, `sendMessage(string)` and `receiveQ(ubyte[])` to use the new `ReceiverThread` and `SenderThread` Receiver - Added missing import
This commit is contained in:
parent
d89fe0fd06
commit
9d7682bfdb
@ -14,6 +14,7 @@ import birchwood.protocol.messages : Message, encodeMessage, decodeMessage, isVa
|
||||
// import birchwood.protocol.constants : ReplyType;
|
||||
import birchwood.client.receiver : ReceiverThread;
|
||||
import birchwood.client.sender : SenderThread;
|
||||
import birchwood.client.events;
|
||||
|
||||
|
||||
// TODO: Remove this import
|
||||
@ -26,54 +27,6 @@ __gshared static this()
|
||||
logger = new DefaultLogger();
|
||||
}
|
||||
|
||||
|
||||
public final enum IRCEventType : ulong
|
||||
{
|
||||
GENERIC_EVENT = 1,
|
||||
PONG_EVENT
|
||||
}
|
||||
|
||||
|
||||
/* TODO: Move to an events.d class */
|
||||
public final class IRCEvent : EventyEvent
|
||||
{
|
||||
private Message msg;
|
||||
|
||||
this(Message msg)
|
||||
{
|
||||
super(IRCEventType.GENERIC_EVENT);
|
||||
|
||||
this.msg = msg;
|
||||
}
|
||||
|
||||
public Message getMessage()
|
||||
{
|
||||
return msg;
|
||||
}
|
||||
|
||||
public override string toString()
|
||||
{
|
||||
return msg.toString();
|
||||
}
|
||||
}
|
||||
|
||||
/* TODO: make PongEvent (id 2 buit-in) */
|
||||
public final class PongEvent : EventyEvent
|
||||
{
|
||||
private string pingID;
|
||||
|
||||
this(string pingID)
|
||||
{
|
||||
super(IRCEventType.PONG_EVENT);
|
||||
this.pingID = pingID;
|
||||
}
|
||||
|
||||
public string getID()
|
||||
{
|
||||
return pingID;
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Make abstract and for unit tests make a `DefaultClient`
|
||||
// ... which logs outputs for the `onX()` handler functions
|
||||
public class Client : Thread
|
||||
@ -87,10 +40,6 @@ public class Client : Thread
|
||||
|
||||
package Socket socket;
|
||||
|
||||
/* Message queues (and handlers) */
|
||||
private SList!(ubyte[]) recvQueue, sendQueue;
|
||||
private Mutex recvQueueLock, sendQueueLock;
|
||||
private Thread recvHandler, sendHandler;
|
||||
|
||||
private ReceiverThread receiver;
|
||||
private SenderThread sender;
|
||||
@ -618,22 +567,17 @@ public class Client : Thread
|
||||
this.socket = new Socket(connInfo.getAddr().addressFamily(), SocketType.STREAM, ProtocolType.TCP);
|
||||
this.socket.connect(connInfo.getAddr());
|
||||
|
||||
/* Initialize queue locks */
|
||||
this.recvQueueLock = new Mutex();
|
||||
this.sendQueueLock = new Mutex();
|
||||
|
||||
/* Start the event engine */
|
||||
this.engine = new Engine();
|
||||
|
||||
/* Regsiter default handler */
|
||||
/* Register default handler */
|
||||
initEvents();
|
||||
|
||||
/* TODO: Clean this up and place elsewhere */
|
||||
this.recvHandler = new Thread(&recvHandlerFunc);
|
||||
this.recvHandler.start();
|
||||
|
||||
this.sendHandler = new Thread(&sendHandlerFunc);
|
||||
this.sendHandler.start();
|
||||
/**
|
||||
* Start the receive and send queue manager
|
||||
*/
|
||||
this.receiver.start();
|
||||
this.sender.start();
|
||||
|
||||
/* Set running sttaus to true */
|
||||
running = true;
|
||||
@ -662,193 +606,10 @@ public class Client : Thread
|
||||
*/
|
||||
private void receiveQ(ubyte[] message)
|
||||
{
|
||||
/* Lock queue */
|
||||
recvQueueLock.lock();
|
||||
|
||||
// TODO: Update the below to call `receiver.newRecv()`
|
||||
/* Add to queue */
|
||||
recvQueue.insertAfter(recvQueue[], message);
|
||||
|
||||
/* Unlock queue */
|
||||
recvQueueLock.unlock();
|
||||
/* Enqueue the message to the receive queue */
|
||||
receiver.rq(message);
|
||||
}
|
||||
|
||||
/* TODO: Spawn a thread worker that reacts */
|
||||
|
||||
/**
|
||||
* This function is run as part of the "reactor"
|
||||
* thread and its job is to effectively dequeue
|
||||
* messages from the receive queue and call the
|
||||
* correct handler function with the message as
|
||||
* the event payload.
|
||||
*
|
||||
* It pays high priority to looking for a PING
|
||||
* message first and handling those and then doing
|
||||
* a second pass for other messages
|
||||
*
|
||||
* TODO: Do decode here and triggering of events here
|
||||
*/
|
||||
|
||||
/**
|
||||
* The receive queue worker function
|
||||
*
|
||||
* This has the job of dequeuing messages
|
||||
* in the receive queue, decoding them
|
||||
* into Message objects and then emitting
|
||||
* an event depending on the type of message
|
||||
*
|
||||
* Handles PINGs along with normal messages
|
||||
*
|
||||
* TODO: Our high load average is from here
|
||||
* ... it is getting lock a lot and spinning here
|
||||
* ... we should use libsnooze to avoid this
|
||||
*/
|
||||
private void recvHandlerFunc()
|
||||
{
|
||||
while(running)
|
||||
{
|
||||
/* Lock the receieve queue */
|
||||
recvQueueLock.lock();
|
||||
|
||||
/* Message being analysed */
|
||||
Message curMsg;
|
||||
|
||||
/* Search for a PING */
|
||||
ubyte[] pingMessage;
|
||||
|
||||
ulong pos = 0;
|
||||
foreach(ubyte[] message; recvQueue[])
|
||||
{
|
||||
if(indexOf(cast(string)message, "PING") > -1)
|
||||
{
|
||||
pingMessage = message;
|
||||
recvQueue.linearRemoveElement(message);
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
pos++;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* TODO: Plan of action
|
||||
*
|
||||
* 1. Firstly, we must run `parseReceivedMessage()` on the dequeued
|
||||
* ping message (if any)
|
||||
* 2. Then (if there was a PING) trigger said PING handler
|
||||
* 3. Normal message handling; `parseReceivedMessage()` on one of the messages
|
||||
* (make the dequeue amount configurable possibly)
|
||||
* 4. Trigger generic handler
|
||||
* 5. We might need to also have a queue for commands ISSUED and command-replies
|
||||
* RECEIVED and then match those first and do something with them (tasky-esque)
|
||||
* 6. We can just make a generic reply queue of these things - we have to maybe to this
|
||||
* - we can cache or remember stuff when we get 353
|
||||
*/
|
||||
|
||||
|
||||
|
||||
|
||||
/* If we found a PING */
|
||||
if(pingMessage.length > 0)
|
||||
{
|
||||
/* Decode the message and parse it */
|
||||
curMsg = Message.parseReceivedMessage(decodeMessage(pingMessage));
|
||||
logger.log("Found a ping: "~curMsg.toString());
|
||||
|
||||
// string ogMessage = cast(string)pingMessage;
|
||||
// long idxSigStart = indexOf(ogMessage, ":")+1;
|
||||
// long idxSigEnd = lastIndexOf(ogMessage, '\r');
|
||||
|
||||
// string pingID = ogMessage[idxSigStart..idxSigEnd];
|
||||
string pingID = curMsg.getParams();
|
||||
|
||||
|
||||
// this.socket.send(encodeMessage("PONG "~pingID));
|
||||
// string messageToSend = "PONG "~pingID;
|
||||
|
||||
// sendMessage(messageToSend);
|
||||
|
||||
// logger.log("Ponged");
|
||||
|
||||
/* TODO: Implement */
|
||||
// TODO: Remove the Eventy push and replace with a handler call (on second thought no)
|
||||
EventyEvent pongEvent = new PongEvent(pingID);
|
||||
engine.push(pongEvent);
|
||||
}
|
||||
|
||||
/* Now let's go message by message */
|
||||
if(!recvQueue.empty())
|
||||
{
|
||||
ubyte[] message = recvQueue.front();
|
||||
|
||||
/* Decode message */
|
||||
string messageNormal = decodeMessage(message);
|
||||
|
||||
recvQueue.linearRemoveElement(recvQueue.front());
|
||||
|
||||
// writeln("Normal message: "~messageNormal);
|
||||
|
||||
|
||||
|
||||
/* TODO: Parse message and call correct handler */
|
||||
curMsg = Message.parseReceivedMessage(messageNormal);
|
||||
|
||||
// TODO: Remove the Eventy push and replace with a handler call (on second thought no)
|
||||
EventyEvent ircEvent = new IRCEvent(curMsg);
|
||||
engine.push(ircEvent);
|
||||
}
|
||||
|
||||
|
||||
|
||||
/* Unlock the receive queue */
|
||||
recvQueueLock.unlock();
|
||||
|
||||
/* TODO: Threading yield here */
|
||||
Thread.yield();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The send queue worker function
|
||||
*
|
||||
* TODO: Same issue as recvHandlerFunc
|
||||
* ... we should I/O wait (sleep) here
|
||||
*/
|
||||
private void sendHandlerFunc()
|
||||
{
|
||||
/* TODO: Hoist up into ConnInfo */
|
||||
ulong fakeLagInBetween = 1;
|
||||
|
||||
while(running)
|
||||
{
|
||||
|
||||
/* TODO: handle normal messages (xCount with fakeLagInBetween) */
|
||||
|
||||
/* Lock queue */
|
||||
sendQueueLock.lock();
|
||||
|
||||
foreach(ubyte[] message; sendQueue[])
|
||||
{
|
||||
this.socket.send(message);
|
||||
Thread.sleep(dur!("seconds")(fakeLagInBetween));
|
||||
}
|
||||
|
||||
/* Empty the send queue */
|
||||
sendQueue.clear();
|
||||
|
||||
/* Unlock queue */
|
||||
sendQueueLock.unlock();
|
||||
|
||||
/* TODO: Yield */
|
||||
Thread.yield();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* TODO: Make send queue which is used on another thread to send messages
|
||||
*
|
||||
@ -868,15 +629,8 @@ public class Client : Thread
|
||||
/* Encode the mesage */
|
||||
ubyte[] encodedMessage = encodeMessage(messageOut);
|
||||
|
||||
/* Lock queue */
|
||||
sendQueueLock.lock();
|
||||
|
||||
// TODO: Update the below to call `sender.newSend()`
|
||||
/* Add to queue */
|
||||
sendQueue.insertAfter(sendQueue[], encodedMessage);
|
||||
|
||||
/* Unlock queue */
|
||||
sendQueueLock.unlock();
|
||||
/* Enqueue the message to the send queue */
|
||||
sender.sq(encodedMessage);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -910,13 +664,13 @@ public class Client : Thread
|
||||
socket.close();
|
||||
logger.log("disconnect() socket closed");
|
||||
|
||||
/* Wait for reeceive handler to realise it needs to stop */
|
||||
recvHandler.join();
|
||||
logger.log("disconnect() recvHandler stopped");
|
||||
/* Wait for receive queue manager to realise it needs to stop */
|
||||
receiver.join();
|
||||
logger.log("disconnect() recvQueue manager stopped");
|
||||
|
||||
/* Wait for the send handler to realise it needs to stop */
|
||||
sendHandler.join();
|
||||
logger.log("disconnect() sendHandler stopped");
|
||||
/* Wait for the send queue manager to realise it needs to stop */
|
||||
sender.join();
|
||||
logger.log("disconnect() sendQueue manager stopped");
|
||||
|
||||
/* TODO: Stop eventy (FIXME: I don't know if this is implemented in Eventy yet, do this!) */
|
||||
engine.shutdown();
|
||||
|
51
source/birchwood/client/events.d
Normal file
51
source/birchwood/client/events.d
Normal file
@ -0,0 +1,51 @@
|
||||
module birchwood.client.events;
|
||||
|
||||
import eventy : EventyEvent = Event;
|
||||
import birchwood.protocol.messages : Message;
|
||||
|
||||
public final enum IRCEventType : ulong
|
||||
{
|
||||
GENERIC_EVENT = 1,
|
||||
PONG_EVENT
|
||||
}
|
||||
|
||||
|
||||
/* TODO: Move to an events.d class */
|
||||
public final class IRCEvent : EventyEvent
|
||||
{
|
||||
private Message msg;
|
||||
|
||||
this(Message msg)
|
||||
{
|
||||
super(IRCEventType.GENERIC_EVENT);
|
||||
|
||||
this.msg = msg;
|
||||
}
|
||||
|
||||
public Message getMessage()
|
||||
{
|
||||
return msg;
|
||||
}
|
||||
|
||||
public override string toString()
|
||||
{
|
||||
return msg.toString();
|
||||
}
|
||||
}
|
||||
|
||||
/* TODO: make PongEvent (id 2 buit-in) */
|
||||
public final class PongEvent : EventyEvent
|
||||
{
|
||||
private string pingID;
|
||||
|
||||
this(string pingID)
|
||||
{
|
||||
super(IRCEventType.PONG_EVENT);
|
||||
this.pingID = pingID;
|
||||
}
|
||||
|
||||
public string getID()
|
||||
{
|
||||
return pingID;
|
||||
}
|
||||
}
|
@ -14,6 +14,7 @@ import libsnooze;
|
||||
import birchwood.client;
|
||||
import birchwood.protocol.messages : Message, decodeMessage;
|
||||
import std.string : indexOf;
|
||||
import birchwood.client.events : PongEvent, IRCEvent;
|
||||
|
||||
public final class ReceiverThread : Thread
|
||||
{
|
||||
|
Loading…
Reference in New Issue
Block a user