CoapClient

- Added a `Condition` variable for the `requestsLock` mutex
- Added `transmitRequest(CoapRequest request)` which will take in the `CoapRequest`, yank the `CoapPacket` from it, encode it, send it over the underlying transport and then start the request's timer
- Added some code for a future "watcher" routine

CoapRequest

- Added a `StopWatch` timer which does not auto start
- Added `startTime()` which starts the timer
- Added `getAndReset()` which gets the elapsed time and resets it
This commit is contained in:
Tristan B. Velloza Kildaire 2023-09-22 16:19:24 +02:00
parent 2716fd3986
commit 31b5d13943
2 changed files with 90 additions and 2 deletions

View File

@ -5,7 +5,9 @@ import doap.client.messaging : CoapMessagingLayer;
import doap.protocol;
import doap.client.request : CoapRequestBuilder, CoapRequest, CoapRequestFuture;
import core.sync.mutex : Mutex;
import core.sync.condition : Condition;
import std.container.slist : SList;
import core.thread : dur, Duration, Thread;
/**
* A CoAP client
@ -43,6 +45,11 @@ public class CoapClient
*/
private Mutex requestsLock;
/**
* Condition variable for watcher signalling
*/
private Condition watcherSignal;
/**
* Creates a new CoAP client to the
* provided endpoint address
@ -56,6 +63,7 @@ public class CoapClient
this.messaging = new CoapMessagingLayer(this);
this.requestsLock = new Mutex();
this.watcherSignal = new Condition(this.requestsLock);
init();
}
@ -92,6 +100,8 @@ public class CoapClient
// Set status to running
this.running = true;
// Start the messaging layer
this.messaging.start();
}
@ -156,8 +166,8 @@ public class CoapClient
// Store the request
storeRequest(request);
// Send
this.socket.send(requestPacket.getBytes());
// Transmit the request
transmitRequest(request);
return future;
}
@ -205,6 +215,54 @@ public class CoapClient
return foundRequest;
}
/**
* Transmits the given request's associated
* packet to the underlying transport
*
* Params:
* request = the `CoapRequest` to put into
* flight
*/
private void transmitRequest(CoapRequest request)
{
// Encode the request packet and send it
this.socket.send(request.getRequestPacket().getBytes());
// Now start ticking the timer
request.startTime();
}
// private Duration sweepInterval;
private Duration retransmitTimeout;
private void watch()
{
while(true)
{
// TODO: Sleep on a
/**
* Acquire the requests lock so we
* can sleep on the condition
* (temporarily unlock mutex)
*/
// requestsLock.lock();
// watcherSignal.wait();
requestsLock.lock();
foreach(CoapRequest curReq; outgoingRequests)
{
if(curReq.getAndReset() >= retransmitTimeout)
{
// TODO: Retransmit
}
}
requestsLock.unlock();
Thread.sleep(retransmitTimeout);
}
}
}
/**

View File

@ -3,6 +3,8 @@ module doap.client.request;
import doap.client.client : CoapClient;
import doap.protocol;
import doap.exceptions;
import core.time : Duration;
import std.datetime.stopwatch : StopWatch, AutoStart;
/**
* Represents a request that has been made. This is
@ -32,6 +34,11 @@ package class CoapRequest
*/
package CoapRequestFuture future;
/**
* Stopwatch for counting elapsed time
*/
private StopWatch timer;
/**
* Constructs a new request
*
@ -44,6 +51,7 @@ package class CoapRequest
{
this.requestPacket = requestPacket;
this.future = future;
this.timer = StopWatch(AutoStart.no);
}
public CoapPacket getRequestPacket()
@ -55,6 +63,28 @@ package class CoapRequest
{
return this.requestPacket.getToken();
}
/**
* Starts the timer
*/
package void startTime()
{
timer.start();
}
/**
* Gets the current elapsed time and
* then resets it
*
* Returns: the elapsed time
*/
package Duration getAndReset()
{
// Get the value and restart timer
Duration elapsed = timer.peek();
timer.reset();
return elapsed;
}
}
/**