Compare commits

...

15 Commits

Author SHA1 Message Date
Tristan B. Velloza Kildaire 5d9c978629
Merge pull request #2 from besterprotocol/feature/river_stream
Feature/river stream
2023-04-30 13:04:33 +02:00
Tristan B. Velloza Kildaire 575db2d998 - Updated `README.md` to reflect support for river-based `Stream`s 2023-04-30 13:02:17 +02:00
Tristan B. Velloza Kildaire 40ce197bb7 Merge branch 'feature/oop' into feature/river_stream 2023-04-30 13:01:49 +02:00
Tristan B. Velloza Kildaire ed71224233 Merge branch 'master' into feature/oop 2023-04-30 13:01:03 +02:00
Tristan B. Velloza Kildaire 1094d3d61f - Updated description in `dub.json` and README 2023-04-30 12:59:58 +02:00
Tristan B. Velloza Kildaire d93e96f0f8 BClient
- Documented unit test
2023-04-30 12:54:11 +02:00
Tristan B. Velloza Kildaire 84e9b401a0 BClient
- Documented both constructors
2023-04-30 12:53:00 +02:00
Tristan B. Velloza Kildaire e1a167fdbf Package ( `bformat`)
- Updated comment for `bformat.client`

- Renamed `sockets` module to `client` module
2023-04-30 12:49:13 +02:00
Tristan B. Velloza Kildaire 00664bd844 Sockets
- Added a unittest which tests `BClient`
2023-04-30 12:42:59 +02:00
Tristan B. Velloza Kildaire 8e0c161e02 Sockets
- Cleaned up whitespace
2023-04-30 01:06:18 +02:00
Tristan B. Velloza Kildaire f49d8349ce BClient
- `sendMessage(byte[])` now encodes using `bformat.marshall`'s `encodeBformat(byte[])` rather than its own code
2023-04-30 01:03:19 +02:00
Tristan B. Velloza Kildaire 3b57b6c1ae BClient
- handle erroneous `writeFully(byte[])` call in `sendMessage(byte[])`
2023-04-30 00:55:39 +02:00
Tristan B. Velloza Kildaire 3a9fccb3f2 BClient
- Translated `receiveMessage(ref byte[])` over to using the `Stream`, `stream`
- Translated `sendMessage(byte[])` over to using the `Stream`, `stream`
2023-04-30 00:41:39 +02:00
Tristan B. Velloza Kildaire b94c433115 Sockets
- Underlying transport is now a river-based `Stream`
- Second constructor added which takes in a `Stream`, the original constructor now calls it with an adhoc created `SockStream` to wrap the consumed `Socket`
2023-04-30 00:14:05 +02:00
Tristan B. Velloza Kildaire d22455c2f4 - Added `river` as a dependency 2023-04-30 00:03:21 +02:00
5 changed files with 218 additions and 167 deletions

View File

@ -3,7 +3,7 @@ bformat
[![D](https://github.com/besterprotocol/bformat/actions/workflows/d.yml/badge.svg)](https://github.com/besterprotocol/bformat/actions/workflows/d.yml)
A simple message format for length prefixed messages
A simple message format for automatically length-prefixing messages over any [`Socket`](https://dlang.org/phobos/std_socket.html#.Socket) or [River-based](https://github.com/deavmi/river) `Stream`.
## What is bformat?

View File

@ -2,9 +2,12 @@
"authors": [
"Tristan B. Velloza Kildaire"
],
"homepage": "https://deavmi.assigned.network/projects/bformat/",
"copyright": "Copyright © 2023, Tristan B. Kildaire",
"dependencies": {
"river": "~>0.3.6"
},
"description": "bformat socket writer and reader",
"homepage": "https://deavmi.assigned.network/projects/bformat/",
"license": "LGPL v3",
"name": "bformat",
"targetType": "library"

209
source/bformat/client.d Normal file
View File

@ -0,0 +1,209 @@
/**
* Socket encoding/decoding functions
*/
module bformat.client;
import std.socket : Socket;
import river.core;
import river.impls.sock : SockStream;
/**
* Bformat client to encode and decode via a
* `Socket` or river-based `Stream`
*/
public class BClient
{
/**
* Underlying stream
*/
private Stream stream;
/**
* Constructs a new `BClient` for encoding and decoding
* to and from the provided `Socket`
*
* Params:
* socket = the `Socket` to use for writing and reading
*/
this(Socket socket)
{
this(new SockStream(socket));
}
/**
* Constructs a new `BClient` for encoding and decoding
* to and from the provided river-based `Stream`
*
* Params:
* stream = the `Stream` to use for writing and reading
*/
this(Stream stream)
{
this.stream = stream;
}
/**
* Receives a message from the provided socket
* by decoding the streamed bytes into bformat
* and finally placing the resulting payload in
* the provided array
*
* Params:
* originator = the socket to receive from
* receiveMessage = the nbuffer to receive into
*
* Returns: true if the receive succeeded, false otheriwse
*/
public bool receiveMessage(ref byte[] receiveMessage)
{
/* Construct a buffer to receive into */
byte[] receiveBuffer;
/* Get the length of the message */
byte[4] messageLengthBytes;
try
{
stream.readFully(messageLengthBytes);
}
catch(StreamException streamErr)
{
/* If there was an error reading from the socket */
return false;
}
/* Response message length */
uint messageLength;
/* Little endian version you simply read if off the bone (it's already in the correct order) */
version(LittleEndian)
{
messageLength = *cast(int*)messageLengthBytes.ptr;
}
/* Big endian requires we byte-sapped the little-endian encoded number */
version(BigEndian)
{
byte[] swappedLength;
swappedLength.length = 4;
swappedLength[0] = messageLengthBytes[3];
swappedLength[1] = messageLengthBytes[2];
swappedLength[2] = messageLengthBytes[1];
swappedLength[3] = messageLengthBytes[0];
messageLength = *cast(int*)swappedLength.ptr;
}
/* Read the full message */
receiveBuffer.length = messageLength;
try
{
stream.readFully(receiveBuffer);
receiveMessage = receiveBuffer;
/* If there was no error receiving the message */
return true;
}
catch(StreamException streamErr)
{
/* If there was an error reading from the socket */
return false;
}
}
/**
* Encodes the provided message into the bformat format
* and sends it over the provided socket
*
* Params:
* recipient = the socket to send over
* message = the message to encode and send
*
* Returns: true if the send succeeded, false otherwise
*/
public bool sendMessage(byte[] message)
{
/* The message buffer */
byte[] messageBuffer;
import bformat.marshall : encodeBformat;
messageBuffer = encodeBformat(message);
try
{
/* Send the message */
stream.writeFully(messageBuffer);
return true;
}
catch(StreamException streamError)
{
return false;
}
}
}
version(unittest)
{
import std.socket;
import core.thread;
import std.stdio;
}
/**
* Create a server that encodes a message to the client
* and then let the client decode it from us; both making
* use of `BClient` to accomplish this
*/
unittest
{
UnixAddress unixAddr = new UnixAddress("/tmp/bformatServer.sock");
scope(exit)
{
import std.stdio;
remove(cast(char*)unixAddr.path());
}
Socket serverSocket = new Socket(AddressFamily.UNIX, SocketType.STREAM);
serverSocket.bind(unixAddr);
serverSocket.listen(0);
class ServerThread : Thread
{
private Socket servSock;
this(Socket servSock)
{
this.servSock = servSock;
super(&worker);
}
private void worker()
{
Socket clientSock = servSock.accept();
BClient bClient = new BClient(clientSock);
byte[] message = cast(byte[])"ABBA";
bClient.sendMessage(message);
}
}
Thread serverThread = new ServerThread(serverSocket);
serverThread.start();
Socket client = new Socket(AddressFamily.UNIX, SocketType.STREAM);
client.connect(unixAddr);
BClient bClient = new BClient(client);
byte[] receivedMessage;
bClient.receiveMessage(receivedMessage);
assert(receivedMessage == "ABBA");
writeln(receivedMessage);
writeln(cast(string)receivedMessage);
}

View File

@ -4,18 +4,11 @@
module bformat;
/**
* Encodes the provided message into the bformat format
* and sends it over the provided socket
* Provides a client which consumes a stream
* which can encode and decode messages to
* and from it
*/
public import bformat.sockets : BClient;
/**
* Receives a message from the provided socket
* by decoding the streamed bytes into bformat
* and finally placing the resulting payload in
* the provided array
*/
// public import bformat.sockets : receiveMessage;
public import bformat.client : BClient;
/**
* Encodes the provided message into the bformat format

View File

@ -1,154 +0,0 @@
/**
* Socket encoding/decoding functions
*/
module bformat.sockets;
import std.socket : Socket, SocketFlags, MSG_WAITALL;
public class BClient
{
/**
* Underlying socket
*/
private Socket socket;
// TODO: comment
this(Socket socket)
{
this.socket = socket;
}
/**
* Receives a message from the provided socket
* by decoding the streamed bytes into bformat
* and finally placing the resulting payload in
* the provided array
*
* Params:
* originator = the socket to receive from
* receiveMessage = the nbuffer to receive into
*
* Returns: true if the receive succeeded, false otheriwse
*/
public bool receiveMessage(ref byte[] receiveMessage)
{
/* Construct a buffer to receive into */
byte[] receiveBuffer;
bool status = true;
/* The amount of bytes received */
long bytesReceived;
/* Get the length of the message */
byte[4] messageLengthBytes;
bytesReceived = socket.receive(messageLengthBytes, cast(SocketFlags)MSG_WAITALL);
/* If there was an error reading from the socket */
if(!(bytesReceived > 0))
{
status = false;
}
/* If the receive was successful */
else
{
/* Response message length */
uint messageLength;
/* Little endian version you simply read if off the bone (it's already in the correct order) */
version(LittleEndian)
{
messageLength = *cast(int*)messageLengthBytes.ptr;
}
/* Big endian requires we byte-sapped the little-endian encoded number */
version(BigEndian)
{
byte[] swappedLength;
swappedLength.length = 4;
swappedLength[0] = messageLengthBytes[3];
swappedLength[1] = messageLengthBytes[2];
swappedLength[2] = messageLengthBytes[1];
swappedLength[3] = messageLengthBytes[0];
messageLength = *cast(int*)swappedLength.ptr;
}
/* Read the full message */
receiveBuffer.length = messageLength;
bytesReceived = socket.receive(receiveBuffer, cast(SocketFlags)MSG_WAITALL);
/* If there was an error reading from the socket */
if(!(bytesReceived > 0))
{
status = false;
}
/* If there was no error receiving the message */
else
{
receiveMessage = receiveBuffer;
}
}
return status;
}
/**
* Encodes the provided message into the bformat format
* and sends it over the provided socket
*
* Params:
* recipient = the socket to send over
* message = the message to encode and send
*
* Returns: true if the send succeeded, false otherwise
*/
public bool sendMessage(byte[] message)
{
/* The message buffer */
byte[] messageBuffer;
/* Encode the 4 byte message length header (little endian) */
int payloadLength = cast(int)message.length;
byte* lengthBytes = cast(byte*)&payloadLength;
/* On little endian simply get the bytes as is (it would be encoded as little endian) */
version(LittleEndian)
{
messageBuffer ~= *(lengthBytes+0);
messageBuffer ~= *(lengthBytes+1);
messageBuffer ~= *(lengthBytes+2);
messageBuffer ~= *(lengthBytes+3);
}
/* On Big Endian you must swap the big-endian-encoded number to be in little endian ordering */
version(BigEndian)
{
messageBuffer ~= *(lengthBytes+3);
messageBuffer ~= *(lengthBytes+2);
messageBuffer ~= *(lengthBytes+1);
messageBuffer ~= *(lengthBytes+0);
}
/* Add the message to the buffer */
messageBuffer ~= cast(byte[])message;
/* Send the message */
long bytesSent = socket.send(messageBuffer);
/* TODO: Compact this */
return bytesSent > 0;
}
}