CoapClient

- Removed the `Socket` instance
- `init()` now calls `begin()` on the messaging layer and no longer creates a new `Socket` itself
- `close()` now just calls `close()` on the messaging layer
- `transmitRequest(CoapRequest)` now calls `send(CoapPacket)` on the messaging layer instead of via a socket's `send(ubyte[])`

CoapMessagingLayer

- Added a `running` flag
- Added `begin()` which sets the running flag (ours) to `true`, it then creates and opens a `Socket` and then starts the reading loop
- Added `send(CoapPacket)` whcih wire-encoded the packet and then sends it over our `Socket`
- Added `close()` which sets the running flag to false, shutsdown the `Socket` (unblocking any blocking receives) and then closes the `Socket` (releasing the bound datagram port)
- Removed references to the `CoapClient`'s now-non-existent `Socket`
This commit is contained in:
Tristan B. Velloza Kildaire 2023-09-22 17:19:38 +02:00
parent 76feaade27
commit 0c81ce635f
2 changed files with 59 additions and 24 deletions

View File

@ -24,13 +24,6 @@ public class CoapClient
*/ */
package bool running; package bool running;
/**
* The datagram socket
*
* TODO: Move this into the messaging layer
*/
package Socket socket;
/** /**
* The messaging layer which provides * The messaging layer which provides
* request-response message match-ups * request-response message match-ups
@ -94,18 +87,11 @@ public class CoapClient
*/ */
private void init() private void init()
{ {
// TODO: IF connect fails then don't start messaging
this.socket = new Socket(this.address.addressFamily(), SocketType.DGRAM, ProtocolType.UDP);
// this.socket.blocking(true);
this.socket.connect(address);
// Set status to running // Set status to running
this.running = true; this.running = true;
// Start the messaging layer // Start the messaging layer
this.messaging.start(); this.messaging.begin();
} }
/** /**
@ -121,11 +107,8 @@ public class CoapClient
// Set status to not running // Set status to not running
this.running = false; this.running = false;
// Shutdown the socket (stopping the messaging layer) // Shutdown the messaging layer
this.socket.shutdown(SocketShutdown.BOTH); this.messaging.close();
// Unbind (disallow incoming datagrams to source port (from device))
this.socket.close();
// TODO: We must wake up other sleeprs with an error // TODO: We must wake up other sleeprs with an error
// (somehow, pass it in, flag set) // (somehow, pass it in, flag set)
@ -229,7 +212,7 @@ public class CoapClient
private void transmitRequest(CoapRequest request) private void transmitRequest(CoapRequest request)
{ {
// Encode the request packet and send it // Encode the request packet and send it
this.socket.send(request.getRequestPacket().getBytes()); this.messaging.send(request.getRequestPacket());
// Now start ticking the timer // Now start ticking the timer
request.startTime(); request.startTime();

View File

@ -13,6 +13,8 @@ import std.stdio;
import std.socket : Socket, SocketSet; import std.socket : Socket, SocketSet;
import std.socket : Address; import std.socket : Address;
import std.socket : Socket, Address, SocketType, ProtocolType, getAddress, parseAddress, InternetAddress, SocketShutdown;
// TODO: Generalize this and then make // TODO: Generalize this and then make
// ... a UDP version of it // ... a UDP version of it
@ -30,6 +32,16 @@ class CoapMessagingLayer : Thread
*/ */
private CoapClient client; private CoapClient client;
/**
* Running status
*/
private bool running;
/**
* The datagram socket
*/
private Socket socket;
/** /**
* Constructs a new messaging layer instance * Constructs a new messaging layer instance
* associated with the provided client * associated with the provided client
@ -49,11 +61,51 @@ class CoapMessagingLayer : Thread
* *
* Returns: the endpoint address * Returns: the endpoint address
*/ */
protected final Address getEndpointAddress() protected final Address getEndpointAddress() // Final in Interface
{ {
return this.client.address; return this.client.address;
} }
public void begin() // Candidate for Interface
{
// TODO: Handle socket errors nicely?
// Set status to running
this.running = true;
// TODO: IF connect fails then don't start messaging
this.socket = new Socket(getEndpointAddress().addressFamily(), SocketType.DGRAM, ProtocolType.UDP);
// this.socket.blocking(true);
this.socket.connect(getEndpointAddress());
// Start the thread (TODO: In future hide threading)
this.start();
}
public void send(CoapPacket packet) // Candidate for Interface
{
// Encode the packet and send the bytes
ubyte[] encodedPacket = packet.getBytes();
this.socket.send(encodedPacket);
}
public void close() // Candidate for Interface
{
// Set status to not running
this.running = false;
// Shutdown the socket (stopping the messaging layer)
this.socket.shutdown(SocketShutdown.BOTH);
// Unbind (disallow incoming datagrams to source port (from device))
this.socket.close();
// Wait till the reading-loop thread exits
this.join();
}
/** /**
* Reading loop which reads datagrams * Reading loop which reads datagrams
* from the socket * from the socket
@ -101,13 +153,13 @@ class CoapMessagingLayer : Thread
SocketFlags flags = cast(SocketFlags)(SocketFlags.PEEK | MSG_TRUNC); SocketFlags flags = cast(SocketFlags)(SocketFlags.PEEK | MSG_TRUNC);
byte[] data; byte[] data;
data.length = 1; // At least one else never does underlying recv() data.length = 1; // At least one else never does underlying recv()
ptrdiff_t dgramSize = client.socket.receive(data, flags); ptrdiff_t dgramSize = this.socket.receive(data, flags);
// If we have received something then dequeue it of the peeked length // If we have received something then dequeue it of the peeked length
if(dgramSize > 0) if(dgramSize > 0)
{ {
data.length = dgramSize; data.length = dgramSize;
client.socket.receive(data); this.socket.receive(data);
writeln("received size: ", dgramSize); writeln("received size: ", dgramSize);
writeln("received bytes: ", data); writeln("received bytes: ", data);