diff --git a/source/birchwood/client/client.d b/source/birchwood/client/client.d index 4fe0dcf..48aaaa4 100644 --- a/source/birchwood/client/client.d +++ b/source/birchwood/client/client.d @@ -14,6 +14,7 @@ import birchwood.protocol.messages : Message, encodeMessage, decodeMessage, isVa // import birchwood.protocol.constants : ReplyType; import birchwood.client.receiver : ReceiverThread; import birchwood.client.sender : SenderThread; +import birchwood.client.events; // TODO: Remove this import @@ -26,54 +27,6 @@ __gshared static this() logger = new DefaultLogger(); } - -public final enum IRCEventType : ulong -{ - GENERIC_EVENT = 1, - PONG_EVENT -} - - -/* TODO: Move to an events.d class */ -public final class IRCEvent : EventyEvent -{ - private Message msg; - - this(Message msg) - { - super(IRCEventType.GENERIC_EVENT); - - this.msg = msg; - } - - public Message getMessage() - { - return msg; - } - - public override string toString() - { - return msg.toString(); - } -} - -/* TODO: make PongEvent (id 2 buit-in) */ -public final class PongEvent : EventyEvent -{ - private string pingID; - - this(string pingID) - { - super(IRCEventType.PONG_EVENT); - this.pingID = pingID; - } - - public string getID() - { - return pingID; - } -} - // TODO: Make abstract and for unit tests make a `DefaultClient` // ... which logs outputs for the `onX()` handler functions public class Client : Thread @@ -87,10 +40,6 @@ public class Client : Thread package Socket socket; - /* Message queues (and handlers) */ - private SList!(ubyte[]) recvQueue, sendQueue; - private Mutex recvQueueLock, sendQueueLock; - private Thread recvHandler, sendHandler; private ReceiverThread receiver; private SenderThread sender; @@ -618,22 +567,17 @@ public class Client : Thread this.socket = new Socket(connInfo.getAddr().addressFamily(), SocketType.STREAM, ProtocolType.TCP); this.socket.connect(connInfo.getAddr()); - /* Initialize queue locks */ - this.recvQueueLock = new Mutex(); - this.sendQueueLock = new Mutex(); - /* Start the event engine */ this.engine = new Engine(); - /* Regsiter default handler */ + /* Register default handler */ initEvents(); - /* TODO: Clean this up and place elsewhere */ - this.recvHandler = new Thread(&recvHandlerFunc); - this.recvHandler.start(); - - this.sendHandler = new Thread(&sendHandlerFunc); - this.sendHandler.start(); + /** + * Start the receive and send queue manager + */ + this.receiver.start(); + this.sender.start(); /* Set running sttaus to true */ running = true; @@ -662,193 +606,10 @@ public class Client : Thread */ private void receiveQ(ubyte[] message) { - /* Lock queue */ - recvQueueLock.lock(); - - // TODO: Update the below to call `receiver.newRecv()` - /* Add to queue */ - recvQueue.insertAfter(recvQueue[], message); - - /* Unlock queue */ - recvQueueLock.unlock(); + /* Enqueue the message to the receive queue */ + receiver.rq(message); } - /* TODO: Spawn a thread worker that reacts */ - - /** - * This function is run as part of the "reactor" - * thread and its job is to effectively dequeue - * messages from the receive queue and call the - * correct handler function with the message as - * the event payload. - * - * It pays high priority to looking for a PING - * message first and handling those and then doing - * a second pass for other messages - * - * TODO: Do decode here and triggering of events here - */ - - /** - * The receive queue worker function - * - * This has the job of dequeuing messages - * in the receive queue, decoding them - * into Message objects and then emitting - * an event depending on the type of message - * - * Handles PINGs along with normal messages - * - * TODO: Our high load average is from here - * ... it is getting lock a lot and spinning here - * ... we should use libsnooze to avoid this - */ - private void recvHandlerFunc() - { - while(running) - { - /* Lock the receieve queue */ - recvQueueLock.lock(); - - /* Message being analysed */ - Message curMsg; - - /* Search for a PING */ - ubyte[] pingMessage; - - ulong pos = 0; - foreach(ubyte[] message; recvQueue[]) - { - if(indexOf(cast(string)message, "PING") > -1) - { - pingMessage = message; - recvQueue.linearRemoveElement(message); - break; - } - - - - - - pos++; - } - - - /** - * TODO: Plan of action - * - * 1. Firstly, we must run `parseReceivedMessage()` on the dequeued - * ping message (if any) - * 2. Then (if there was a PING) trigger said PING handler - * 3. Normal message handling; `parseReceivedMessage()` on one of the messages - * (make the dequeue amount configurable possibly) - * 4. Trigger generic handler - * 5. We might need to also have a queue for commands ISSUED and command-replies - * RECEIVED and then match those first and do something with them (tasky-esque) - * 6. We can just make a generic reply queue of these things - we have to maybe to this - * - we can cache or remember stuff when we get 353 - */ - - - - - /* If we found a PING */ - if(pingMessage.length > 0) - { - /* Decode the message and parse it */ - curMsg = Message.parseReceivedMessage(decodeMessage(pingMessage)); - logger.log("Found a ping: "~curMsg.toString()); - - // string ogMessage = cast(string)pingMessage; - // long idxSigStart = indexOf(ogMessage, ":")+1; - // long idxSigEnd = lastIndexOf(ogMessage, '\r'); - - // string pingID = ogMessage[idxSigStart..idxSigEnd]; - string pingID = curMsg.getParams(); - - - // this.socket.send(encodeMessage("PONG "~pingID)); - // string messageToSend = "PONG "~pingID; - - // sendMessage(messageToSend); - - // logger.log("Ponged"); - - /* TODO: Implement */ - // TODO: Remove the Eventy push and replace with a handler call (on second thought no) - EventyEvent pongEvent = new PongEvent(pingID); - engine.push(pongEvent); - } - - /* Now let's go message by message */ - if(!recvQueue.empty()) - { - ubyte[] message = recvQueue.front(); - - /* Decode message */ - string messageNormal = decodeMessage(message); - - recvQueue.linearRemoveElement(recvQueue.front()); - - // writeln("Normal message: "~messageNormal); - - - - /* TODO: Parse message and call correct handler */ - curMsg = Message.parseReceivedMessage(messageNormal); - - // TODO: Remove the Eventy push and replace with a handler call (on second thought no) - EventyEvent ircEvent = new IRCEvent(curMsg); - engine.push(ircEvent); - } - - - - /* Unlock the receive queue */ - recvQueueLock.unlock(); - - /* TODO: Threading yield here */ - Thread.yield(); - } - } - - /** - * The send queue worker function - * - * TODO: Same issue as recvHandlerFunc - * ... we should I/O wait (sleep) here - */ - private void sendHandlerFunc() - { - /* TODO: Hoist up into ConnInfo */ - ulong fakeLagInBetween = 1; - - while(running) - { - - /* TODO: handle normal messages (xCount with fakeLagInBetween) */ - - /* Lock queue */ - sendQueueLock.lock(); - - foreach(ubyte[] message; sendQueue[]) - { - this.socket.send(message); - Thread.sleep(dur!("seconds")(fakeLagInBetween)); - } - - /* Empty the send queue */ - sendQueue.clear(); - - /* Unlock queue */ - sendQueueLock.unlock(); - - /* TODO: Yield */ - Thread.yield(); - } - } - - /** * TODO: Make send queue which is used on another thread to send messages * @@ -868,15 +629,8 @@ public class Client : Thread /* Encode the mesage */ ubyte[] encodedMessage = encodeMessage(messageOut); - /* Lock queue */ - sendQueueLock.lock(); - - // TODO: Update the below to call `sender.newSend()` - /* Add to queue */ - sendQueue.insertAfter(sendQueue[], encodedMessage); - - /* Unlock queue */ - sendQueueLock.unlock(); + /* Enqueue the message to the send queue */ + sender.sq(encodedMessage); } /** @@ -910,13 +664,13 @@ public class Client : Thread socket.close(); logger.log("disconnect() socket closed"); - /* Wait for reeceive handler to realise it needs to stop */ - recvHandler.join(); - logger.log("disconnect() recvHandler stopped"); + /* Wait for receive queue manager to realise it needs to stop */ + receiver.join(); + logger.log("disconnect() recvQueue manager stopped"); - /* Wait for the send handler to realise it needs to stop */ - sendHandler.join(); - logger.log("disconnect() sendHandler stopped"); + /* Wait for the send queue manager to realise it needs to stop */ + sender.join(); + logger.log("disconnect() sendQueue manager stopped"); /* TODO: Stop eventy (FIXME: I don't know if this is implemented in Eventy yet, do this!) */ engine.shutdown(); diff --git a/source/birchwood/client/events.d b/source/birchwood/client/events.d new file mode 100644 index 0000000..df64b8e --- /dev/null +++ b/source/birchwood/client/events.d @@ -0,0 +1,51 @@ +module birchwood.client.events; + +import eventy : EventyEvent = Event; +import birchwood.protocol.messages : Message; + +public final enum IRCEventType : ulong +{ + GENERIC_EVENT = 1, + PONG_EVENT +} + + +/* TODO: Move to an events.d class */ +public final class IRCEvent : EventyEvent +{ + private Message msg; + + this(Message msg) + { + super(IRCEventType.GENERIC_EVENT); + + this.msg = msg; + } + + public Message getMessage() + { + return msg; + } + + public override string toString() + { + return msg.toString(); + } +} + +/* TODO: make PongEvent (id 2 buit-in) */ +public final class PongEvent : EventyEvent +{ + private string pingID; + + this(string pingID) + { + super(IRCEventType.PONG_EVENT); + this.pingID = pingID; + } + + public string getID() + { + return pingID; + } +} \ No newline at end of file diff --git a/source/birchwood/client/receiver.d b/source/birchwood/client/receiver.d index eab99b9..0d71d3e 100644 --- a/source/birchwood/client/receiver.d +++ b/source/birchwood/client/receiver.d @@ -14,6 +14,7 @@ import libsnooze; import birchwood.client; import birchwood.protocol.messages : Message, decodeMessage; import std.string : indexOf; +import birchwood.client.events : PongEvent, IRCEvent; public final class ReceiverThread : Thread {