Merge pull request #3 from deavmi/feature/cancellable_future

Cancellable future's
This commit is contained in:
Tristan B. Velloza Kildaire 2023-09-25 13:45:17 +02:00 committed by GitHub
commit 718b644535
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 76 additions and 5 deletions

View File

@ -111,8 +111,13 @@ public class CoapClient
// Shutdown the messaging layer
this.messaging.close();
// TODO: We must wake up other sleeprs with an error
// (somehow, pass it in, flag set)
// Cancel all active request futures
this.requestsLock.lock();
foreach(CoapRequest curReq; outgoingRequests)
{
curReq.future.cancel();
}
this.requestsLock.unlock();
}
/**

View File

@ -1,4 +1,4 @@
module doap.client;
public import doap.client.client : CoapClient;
public import doap.client.request : CoapRequestBuilder, CoapRequestFuture;
public import doap.client.request : CoapRequestBuilder, CoapRequestFuture, RequestState;

View File

@ -257,6 +257,29 @@ package class CoapRequestBuilder
import core.sync.mutex : Mutex;
import core.sync.condition : Condition;
/**
* The state of a `CoapRequestFuture`
*/
public enum RequestState
{
/**
* The future has been created
*/
CREATED,
/**
* The future has completed
* successfully
*/
COMPLETED,
/**
* The future was cancelled
*/
CANCELLED
}
/**
* This is returned to the user when he
* does a finalizing call on a `CoapRequestBuilder`
@ -277,7 +300,12 @@ public class CoapRequestFuture
* This is filled in by the
* messaging layer.
*/
private CoapPacket response;
private CoapPacket response; // TODO: Volatility?
/**
* State of the future
*/
private RequestState state; // TODO: Volatility?
/**
* Mutex (for the condition to use)
@ -298,6 +326,7 @@ public class CoapRequestFuture
*/
package this()
{
this.state = RequestState.CREATED;
this.mutex = new Mutex();
this.condition = new Condition(mutex);
}
@ -316,6 +345,23 @@ public class CoapRequestFuture
// Set the received response
this.response = response;
// Set completion state
this.state = RequestState.COMPLETED;
// Wake up the sleepers
this.condition.notifyAll();
}
/**
* Cancels this future such that
* all calls to `get()` will
* unblock and throw an exception
*/
package void cancel()
{
// Set cancelled state
this.state = RequestState.CANCELLED;
// Wake up the sleepers
this.condition.notifyAll();
}
@ -324,6 +370,8 @@ public class CoapRequestFuture
* Blocks until the response is received
*
* Returns: the response as a `CoapPacket`
* Throws:
* CoapException on cancelled request
*/
public CoapPacket get()
{
@ -337,7 +385,25 @@ public class CoapRequestFuture
// Upon waking up release lock
this.mutex.unlock();
return this.response;
// If successfully completed
if(this.state == RequestState.COMPLETED)
{
return this.response;
}
// On error
else
{
throw new CoapException("Request future cancelled");
}
}
/**
* Returns the state of this future
*
* Returns: the state
*/
public RequestState getState()
{
return this.state;
}
}