Connection
- Print out the message we are handling in `handle()` Daemon - Added some testing, but broken, config reading code - Enabled debugging in the logger Stream - Added a `STREAM`-based listener which produces `SockStream`-based `Connection` objects
This commit is contained in:
parent
f1fe8dd2c4
commit
c85a6a1fca
|
@ -79,7 +79,7 @@ public class Connection : Thread
|
|||
|
||||
private void handle(TaggedMessage incomingMessage)
|
||||
{
|
||||
|
||||
logger.dbg("Examining message '"~incomingMessage.toString()~"' ...");
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -7,9 +7,46 @@ public static __gshared GoggaLogger logger;
|
|||
__gshared static this()
|
||||
{
|
||||
logger = new GoggaLogger();
|
||||
logger.enableDebug();
|
||||
}
|
||||
|
||||
import std.stdio;
|
||||
import river.core;
|
||||
import river.impls.file : FileStream;
|
||||
import std.json;
|
||||
|
||||
void main()
|
||||
{
|
||||
logger.info("Starting renaissance...");
|
||||
|
||||
// TODO: Add command-line parsing here, using jcli
|
||||
JSONValue config = getConfig("renaissance.json");
|
||||
}
|
||||
|
||||
JSONValue getConfig(string configPath)
|
||||
{
|
||||
File configFile;
|
||||
configFile.open(configPath);
|
||||
|
||||
// TODO: Wait for FileSTream from river-streams package to become
|
||||
// ... available
|
||||
Stream fileStream = new FileStream(configFile);
|
||||
|
||||
byte[] fileData;
|
||||
logger.dbg("Before calling size(): ", configFile.tell());
|
||||
fileData.length =configFile.size(); //FIXME: THis seems to seek it ahead and not return it ?
|
||||
logger.dbg("AFTER calling size(): ", configFile.tell());
|
||||
configFile.rewind();
|
||||
logger.dbg("File size: ", fileData.length);
|
||||
|
||||
|
||||
// NOTE: Throws a StreamException on error
|
||||
fileStream.readFully(fileData);
|
||||
|
||||
logger.dbg(fileData);
|
||||
|
||||
|
||||
JSONValue json = parseJSON("");
|
||||
|
||||
return json;
|
||||
}
|
|
@ -0,0 +1,113 @@
|
|||
module renaissance.listeners.stream;
|
||||
|
||||
import renaissance.listeners;
|
||||
|
||||
import renaissance.server;
|
||||
import std.socket;
|
||||
import river.core;
|
||||
import river.impls.sock : SockStream;
|
||||
import core.thread;
|
||||
import renaissance.connection;
|
||||
import renaissance.daemon : logger;
|
||||
|
||||
public class StreamListener : Listener
|
||||
{
|
||||
/**
|
||||
* Address to bind and listen for
|
||||
* connections on
|
||||
*/
|
||||
private Address bindAddr;
|
||||
|
||||
/**
|
||||
* The server socket to listen for
|
||||
* incoming connections on
|
||||
*/
|
||||
private Socket servSock;
|
||||
|
||||
/**
|
||||
* The connetion loop thread
|
||||
*/
|
||||
private Thread workerThread;
|
||||
|
||||
/**
|
||||
* Whether or not we are running
|
||||
*
|
||||
* TODO: Look into making volatile for caching issues
|
||||
*/
|
||||
private bool isRunning = false;
|
||||
|
||||
private this(Server server, Address bindAddr)
|
||||
{
|
||||
super(server);
|
||||
|
||||
/* Create the socket */
|
||||
servSock = new Socket(bindAddr.addressFamily(), SocketType.STREAM);
|
||||
|
||||
/* When started, the thread should run the connectionLoop() */
|
||||
workerThread = new Thread(&connectionLoop);
|
||||
}
|
||||
|
||||
private void connectionLoop()
|
||||
{
|
||||
while(isRunning)
|
||||
{
|
||||
Socket clientSocket = servSock.accept();
|
||||
logger.info("New incoming connection on listener '"~this.toString()~"' from '"~clientSocket.toString()~"'");
|
||||
// TODO: add log here
|
||||
|
||||
/**
|
||||
* Create a `SockStream` from the `Socket`,
|
||||
* a new connection handler with the stream
|
||||
* and then start the handler on its own thread
|
||||
*/
|
||||
Stream clientStream = new SockStream(clientSocket);
|
||||
Connection clientConnection = Connection.newConnection(server, clientStream);
|
||||
clientConnection.start();
|
||||
}
|
||||
}
|
||||
|
||||
public override void startListener()
|
||||
{
|
||||
try
|
||||
{
|
||||
servSock.bind(bindAddr);
|
||||
}
|
||||
catch(SocketOSException sockErr)
|
||||
{
|
||||
throw new ListenerException("Could not bind listener to address '"~bindAddr.toString()~"'");
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
servSock.listen(0); // TODO: Make this configurable, the queueing limit (currently unlimited)
|
||||
}
|
||||
catch(SocketOSException sockErr)
|
||||
{
|
||||
throw new ListenerException("Could not listen on socket '"~servSock.toString()~"'");
|
||||
}
|
||||
|
||||
/* Start the worker thread */
|
||||
workerThread.start();
|
||||
}
|
||||
|
||||
// TODO: Look into not allowing it to run again, or maybe not, allow re-run
|
||||
public override void stopListener()
|
||||
{
|
||||
/* Set running status to false */
|
||||
isRunning = false;
|
||||
|
||||
/* Close the server socket, unblocking any `accept()` call */
|
||||
servSock.close();
|
||||
}
|
||||
|
||||
public static StreamListener create(Server server, Address bindAddress)
|
||||
{
|
||||
StreamListener streamListener = new StreamListener(server, bindAddress);
|
||||
|
||||
// TODO: Set bind address here
|
||||
|
||||
|
||||
|
||||
return streamListener;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue