blob: 4b803d5e4741f2ce35022c377167d0d0d8725854 [file] [log] [blame]
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* {@link HttpSender} abstracts the algorithm to send HTTP requests, so that subclasses only implement
* the transport-specific code to send requests over the wire, implementing
* {@link #sendHeaders(HttpExchange, HttpContent, Callback)} and
* {@link #sendContent(HttpExchange, HttpContent, Callback)}.
* <p>
* {@link HttpSender} governs two state machines.
* <p>
* The request state machine is updated by {@link HttpSender} as the various steps of sending a request
* are executed, see <code>RequestState</code>.
* At any point in time, a user thread may abort the request, which may (if the request has not been
* completely sent yet) move the request state machine to <code>RequestState#FAILURE</code>.
* The request state machine guarantees that the request steps are executed (by I/O threads) only if
* the request has not been failed already.
* <p>
* The sender state machine is updated by {@link HttpSender} from three sources: deferred content notifications
* (via {@link #onContent()}), 100-continue notifications (via {@link #proceed(HttpExchange, Throwable)})
* and normal request send (via {@link #sendContent(HttpExchange, HttpContent, Callback)}).
* This state machine must guarantee that the request sending is never executed concurrently: only one of
* those sources may trigger the call to {@link #sendContent(HttpExchange, HttpContent, Callback)}.
*
* @see HttpReceiver
*/
public abstract class HttpSender implements AsyncContentProvider.Listener
{
protected static final Logger LOG = Log.getLogger(HttpSender.class);
private final AtomicReference<RequestState> requestState = new AtomicReference<>(RequestState.QUEUED);
private final AtomicReference<SenderState> senderState = new AtomicReference<>(SenderState.IDLE);
private final Callback commitCallback = new CommitCallback();
private final IteratingCallback contentCallback = new ContentCallback();
private final Callback lastCallback = new LastContentCallback();
private final HttpChannel channel;
private HttpContent content;
private Throwable failure;
protected HttpSender(HttpChannel channel)
{
this.channel = channel;
}
protected HttpChannel getHttpChannel()
{
return channel;
}
protected HttpExchange getHttpExchange()
{
return channel.getHttpExchange();
}
@Override
public void onContent()
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return;
while (true)
{
SenderState current = senderState.get();
switch (current)
{
case IDLE:
{
SenderState newSenderState = SenderState.SENDING;
if (updateSenderState(current, newSenderState))
{
if (LOG.isDebugEnabled())
LOG.debug("Deferred content available, {} -> {}", current, newSenderState);
contentCallback.iterate();
return;
}
break;
}
case SENDING:
{
SenderState newSenderState = SenderState.SENDING_WITH_CONTENT;
if (updateSenderState(current, newSenderState))
{
if (LOG.isDebugEnabled())
LOG.debug("Deferred content available, {} -> {}", current, newSenderState);
return;
}
break;
}
case EXPECTING:
{
SenderState newSenderState = SenderState.EXPECTING_WITH_CONTENT;
if (updateSenderState(current, newSenderState))
{
if (LOG.isDebugEnabled())
LOG.debug("Deferred content available, {} -> {}", current, newSenderState);
return;
}
break;
}
case PROCEEDING:
{
SenderState newSenderState = SenderState.PROCEEDING_WITH_CONTENT;
if (updateSenderState(current, newSenderState))
{
if (LOG.isDebugEnabled())
LOG.debug("Deferred content available, {} -> {}", current, newSenderState);
return;
}
break;
}
case SENDING_WITH_CONTENT:
case EXPECTING_WITH_CONTENT:
case PROCEEDING_WITH_CONTENT:
case WAITING:
case COMPLETED:
case FAILED:
{
if (LOG.isDebugEnabled())
LOG.debug("Deferred content available, {}", current);
return;
}
default:
{
illegalSenderState(current);
return;
}
}
}
}
public void send(HttpExchange exchange)
{
if (!queuedToBegin(exchange))
return;
Request request = exchange.getRequest();
ContentProvider contentProvider = request.getContent();
HttpContent content = this.content = new HttpContent(contentProvider);
SenderState newSenderState = SenderState.SENDING;
if (expects100Continue(request))
newSenderState = content.hasContent() ? SenderState.EXPECTING_WITH_CONTENT : SenderState.EXPECTING;
out: while (true)
{
SenderState current = senderState.get();
switch (current)
{
case IDLE:
case COMPLETED:
{
if (updateSenderState(current, newSenderState))
break out;
break;
}
default:
{
illegalSenderState(current);
return;
}
}
}
// Setting the listener may trigger calls to onContent() by other
// threads so we must set it only after the sender state has been updated
if (contentProvider instanceof AsyncContentProvider)
((AsyncContentProvider)contentProvider).setListener(this);
if (!beginToHeaders(exchange))
return;
sendHeaders(exchange, content, commitCallback);
}
protected boolean expects100Continue(Request request)
{
return request.getHeaders().contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString());
}
protected boolean queuedToBegin(HttpExchange exchange)
{
if (!updateRequestState(RequestState.QUEUED, RequestState.TRANSIENT))
return false;
Request request = exchange.getRequest();
if (LOG.isDebugEnabled())
LOG.debug("Request begin {}", request);
RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
notifier.notifyBegin(request);
if (updateRequestState(RequestState.TRANSIENT, RequestState.BEGIN))
return true;
terminateRequest(exchange);
return false;
}
protected boolean beginToHeaders(HttpExchange exchange)
{
if (!updateRequestState(RequestState.BEGIN, RequestState.TRANSIENT))
return false;
Request request = exchange.getRequest();
if (LOG.isDebugEnabled())
LOG.debug("Request headers {}{}{}", request, System.lineSeparator(), request.getHeaders().toString().trim());
RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
notifier.notifyHeaders(request);
if (updateRequestState(RequestState.TRANSIENT, RequestState.HEADERS))
return true;
terminateRequest(exchange);
return false;
}
protected boolean headersToCommit(HttpExchange exchange)
{
if (!updateRequestState(RequestState.HEADERS, RequestState.TRANSIENT))
return false;
Request request = exchange.getRequest();
if (LOG.isDebugEnabled())
LOG.debug("Request committed {}", request);
RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
notifier.notifyCommit(request);
if (updateRequestState(RequestState.TRANSIENT, RequestState.COMMIT))
return true;
terminateRequest(exchange);
return false;
}
protected boolean someToContent(HttpExchange exchange, ByteBuffer content)
{
RequestState current = requestState.get();
switch (current)
{
case COMMIT:
case CONTENT:
{
if (!updateRequestState(current, RequestState.TRANSIENT))
return false;
Request request = exchange.getRequest();
if (LOG.isDebugEnabled())
LOG.debug("Request content {}{}{}", request, System.lineSeparator(), BufferUtil.toDetailString(content));
RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
notifier.notifyContent(request, content);
if (updateRequestState(RequestState.TRANSIENT, RequestState.CONTENT))
return true;
terminateRequest(exchange);
return false;
}
default:
{
return false;
}
}
}
protected boolean someToSuccess(HttpExchange exchange)
{
RequestState current = requestState.get();
switch (current)
{
case COMMIT:
case CONTENT:
{
// Mark atomically the request as completed, with respect
// to concurrency between request success and request failure.
if (!exchange.requestComplete(null))
return false;
requestState.set(RequestState.QUEUED);
// Reset to be ready for another request.
reset();
Request request = exchange.getRequest();
if (LOG.isDebugEnabled())
LOG.debug("Request success {}", request);
HttpDestination destination = getHttpChannel().getHttpDestination();
destination.getRequestNotifier().notifySuccess(exchange.getRequest());
// Mark atomically the request as terminated, with
// respect to concurrency between request and response.
Result result = exchange.terminateRequest();
terminateRequest(exchange, null, result);
return true;
}
default:
{
return false;
}
}
}
protected boolean anyToFailure(Throwable failure)
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return false;
// Mark atomically the request as completed, with respect
// to concurrency between request success and request failure.
if (exchange.requestComplete(failure))
return abort(exchange, failure);
return false;
}
private void terminateRequest(HttpExchange exchange)
{
// In abort(), the state is updated before the failure is recorded
// to avoid to overwrite it, so here we may read a null failure.
Throwable failure = this.failure;
if (failure == null)
failure = new HttpRequestException("Concurrent failure", exchange.getRequest());
Result result = exchange.terminateRequest();
terminateRequest(exchange, failure, result);
}
private void terminateRequest(HttpExchange exchange, Throwable failure, Result result)
{
Request request = exchange.getRequest();
if (LOG.isDebugEnabled())
LOG.debug("Terminating request {}", request);
if (result == null)
{
if (failure != null)
{
if (exchange.responseComplete(failure))
{
if (LOG.isDebugEnabled())
LOG.debug("Response failure from request {} {}", request, exchange);
getHttpChannel().abortResponse(exchange, failure);
}
}
}
else
{
HttpDestination destination = getHttpChannel().getHttpDestination();
boolean ordered = destination.getHttpClient().isStrictEventOrdering();
if (!ordered)
channel.exchangeTerminated(exchange, result);
if (LOG.isDebugEnabled())
LOG.debug("Request/Response {}: {}", failure == null ? "succeeded" : "failed", result);
HttpConversation conversation = exchange.getConversation();
destination.getResponseNotifier().notifyComplete(conversation.getResponseListeners(), result);
if (ordered)
channel.exchangeTerminated(exchange, result);
}
}
/**
* Implementations should send the HTTP headers over the wire, possibly with some content,
* in a single write, and notify the given {@code callback} of the result of this operation.
* <p>
* If there is more content to send, then {@link #sendContent(HttpExchange, HttpContent, Callback)}
* will be invoked.
*
* @param exchange the exchange to send
* @param content the content to send
* @param callback the callback to notify
*/
protected abstract void sendHeaders(HttpExchange exchange, HttpContent content, Callback callback);
/**
* Implementations should send the content at the {@link HttpContent} cursor position over the wire.
* <p>
* The {@link HttpContent} cursor is advanced by {@link HttpSender} at the right time, and if more
* content needs to be sent, this method is invoked again; subclasses need only to send the content
* at the {@link HttpContent} cursor position.
* <p>
* This method is invoked one last time when {@link HttpContent#isConsumed()} is true and therefore
* there is no actual content to send.
* This is done to allow subclasses to write "terminal" bytes (such as the terminal chunk when the
* transfer encoding is chunked) if their protocol needs to.
*
* @param exchange the exchange to send
* @param content the content to send
* @param callback the callback to notify
*/
protected abstract void sendContent(HttpExchange exchange, HttpContent content, Callback callback);
protected void reset()
{
HttpContent content = this.content;
this.content = null;
content.close();
senderState.set(SenderState.COMPLETED);
}
protected void dispose()
{
HttpContent content = this.content;
this.content = null;
if (content != null)
content.close();
senderState.set(SenderState.FAILED);
}
public void proceed(HttpExchange exchange, Throwable failure)
{
if (!expects100Continue(exchange.getRequest()))
return;
if (failure != null)
{
anyToFailure(failure);
return;
}
while (true)
{
SenderState current = senderState.get();
switch (current)
{
case EXPECTING:
{
// We are still sending the headers, but we already got the 100 Continue.
if (updateSenderState(current, SenderState.PROCEEDING))
{
if (LOG.isDebugEnabled())
LOG.debug("Proceeding while expecting");
return;
}
break;
}
case EXPECTING_WITH_CONTENT:
{
// More deferred content was submitted to onContent(), we already
// got the 100 Continue, but we may be still sending the headers
// (for example, with SSL we may have sent the encrypted data,
// received the 100 Continue but not yet updated the decrypted
// WriteFlusher so sending more content now may result in a
// WritePendingException).
if (updateSenderState(current, SenderState.PROCEEDING_WITH_CONTENT))
{
if (LOG.isDebugEnabled())
LOG.debug("Proceeding while scheduled");
return;
}
break;
}
case WAITING:
{
// We received the 100 Continue, now send the content if any.
if (updateSenderState(current, SenderState.SENDING))
{
if (LOG.isDebugEnabled())
LOG.debug("Proceeding while waiting");
contentCallback.iterate();
return;
}
break;
}
case FAILED:
{
return;
}
default:
{
illegalSenderState(current);
return;
}
}
}
}
public boolean abort(HttpExchange exchange, Throwable failure)
{
// Update the state to avoid more request processing.
boolean terminate;
out: while (true)
{
RequestState current = requestState.get();
switch (current)
{
case FAILURE:
{
return false;
}
default:
{
if (updateRequestState(current, RequestState.FAILURE))
{
terminate = current != RequestState.TRANSIENT;
break out;
}
break;
}
}
}
this.failure = failure;
dispose();
Request request = exchange.getRequest();
if (LOG.isDebugEnabled())
LOG.debug("Request failure {} {} on {}: {}", request, exchange, getHttpChannel(), failure);
HttpDestination destination = getHttpChannel().getHttpDestination();
destination.getRequestNotifier().notifyFailure(request, failure);
if (terminate)
{
// Mark atomically the request as terminated, with
// respect to concurrency between request and response.
Result result = exchange.terminateRequest();
terminateRequest(exchange, failure, result);
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Concurrent failure: request termination skipped, performed by helpers");
}
return true;
}
private boolean updateRequestState(RequestState from, RequestState to)
{
boolean updated = requestState.compareAndSet(from, to);
if (!updated && LOG.isDebugEnabled())
LOG.debug("RequestState update failed: {} -> {}: {}", from, to, requestState.get());
return updated;
}
private boolean updateSenderState(SenderState from, SenderState to)
{
boolean updated = senderState.compareAndSet(from, to);
if (!updated && LOG.isDebugEnabled())
LOG.debug("SenderState update failed: {} -> {}: {}", from, to, senderState.get());
return updated;
}
private void illegalSenderState(SenderState current)
{
anyToFailure(new IllegalStateException("Expected " + current + " found " + senderState.get() + " instead"));
}
@Override
public String toString()
{
return String.format("%s@%x(req=%s,snd=%s,failure=%s)",
getClass().getSimpleName(),
hashCode(),
requestState,
senderState,
failure);
}
/**
* The request states {@link HttpSender} goes through when sending a request.
*/
private enum RequestState
{
/**
* One of the state transition methods is being executed.
*/
TRANSIENT,
/**
* The request is queued, the initial state
*/
QUEUED,
/**
* The request has been dequeued
*/
BEGIN,
/**
* The request headers (and possibly some content) is about to be sent
*/
HEADERS,
/**
* The request headers (and possibly some content) have been sent
*/
COMMIT,
/**
* The request content is being sent
*/
CONTENT,
/**
* The request is failed
*/
FAILURE
}
/**
* The sender states {@link HttpSender} goes through when sending a request.
*/
private enum SenderState
{
/**
* {@link HttpSender} is not sending request headers nor request content
*/
IDLE,
/**
* {@link HttpSender} is sending the request header or request content
*/
SENDING,
/**
* {@link HttpSender} is currently sending the request, and deferred content is available to be sent
*/
SENDING_WITH_CONTENT,
/**
* {@link HttpSender} is sending the headers but will wait for 100 Continue before sending the content
*/
EXPECTING,
/**
* {@link HttpSender} is currently sending the headers, will wait for 100 Continue, and deferred content is available to be sent
*/
EXPECTING_WITH_CONTENT,
/**
* {@link HttpSender} has sent the headers and is waiting for 100 Continue
*/
WAITING,
/**
* {@link HttpSender} is sending the headers, while 100 Continue has arrived
*/
PROCEEDING,
/**
* {@link HttpSender} is sending the headers, while 100 Continue has arrived, and deferred content is available to be sent
*/
PROCEEDING_WITH_CONTENT,
/**
* {@link HttpSender} has finished to send the request
*/
COMPLETED,
/**
* {@link HttpSender} has failed to send the request
*/
FAILED
}
private class CommitCallback implements Callback
{
@Override
public boolean isNonBlocking()
{
return content.isNonBlocking();
}
@Override
public void succeeded()
{
try
{
HttpContent content = HttpSender.this.content;
if (content == null)
return;
content.succeeded();
process();
}
catch (Throwable x)
{
anyToFailure(x);
}
}
@Override
public void failed(Throwable failure)
{
HttpContent content = HttpSender.this.content;
if (content == null)
return;
content.failed(failure);
anyToFailure(failure);
}
private void process() throws Exception
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return;
if (!headersToCommit(exchange))
return;
HttpContent content = HttpSender.this.content;
if (content == null)
return;
if (!content.hasContent())
{
// No content to send, we are done.
someToSuccess(exchange);
}
else
{
// Was any content sent while committing ?
ByteBuffer contentBuffer = content.getContent();
if (contentBuffer != null)
{
if (!someToContent(exchange, contentBuffer))
return;
}
while (true)
{
SenderState current = senderState.get();
switch (current)
{
case SENDING:
{
contentCallback.iterate();
return;
}
case SENDING_WITH_CONTENT:
{
// We have deferred content to send.
updateSenderState(current, SenderState.SENDING);
break;
}
case EXPECTING:
{
// We sent the headers, wait for the 100 Continue response.
if (updateSenderState(current, SenderState.WAITING))
return;
break;
}
case EXPECTING_WITH_CONTENT:
{
// We sent the headers, we have deferred content to send,
// wait for the 100 Continue response.
if (updateSenderState(current, SenderState.WAITING))
return;
break;
}
case PROCEEDING:
{
// We sent the headers, we have the 100 Continue response,
// we have no content to send.
if (updateSenderState(current, SenderState.IDLE))
return;
break;
}
case PROCEEDING_WITH_CONTENT:
{
// We sent the headers, we have the 100 Continue response,
// we have deferred content to send.
updateSenderState(current, SenderState.SENDING);
break;
}
case FAILED:
{
return;
}
default:
{
illegalSenderState(current);
return;
}
}
}
}
}
}
private class ContentCallback extends IteratingCallback
{
@Override
protected Action process() throws Exception
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return Action.IDLE;
HttpContent content = HttpSender.this.content;
if (content == null)
return Action.IDLE;
while (true)
{
boolean advanced = content.advance();
boolean lastContent = content.isLast();
if (LOG.isDebugEnabled())
LOG.debug("Content present {}, last {}, consumed {} for {}", advanced, lastContent, content.isConsumed(), exchange.getRequest());
if (advanced)
{
sendContent(exchange, content, this);
return Action.SCHEDULED;
}
if (lastContent)
{
sendContent(exchange, content, lastCallback);
return Action.IDLE;
}
SenderState current = senderState.get();
switch (current)
{
case SENDING:
{
if (updateSenderState(current, SenderState.IDLE))
{
if (LOG.isDebugEnabled())
LOG.debug("Content is deferred for {}", exchange.getRequest());
return Action.IDLE;
}
break;
}
case SENDING_WITH_CONTENT:
{
updateSenderState(current, SenderState.SENDING);
break;
}
default:
{
illegalSenderState(current);
return Action.IDLE;
}
}
}
}
@Override
public void succeeded()
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return;
HttpContent content = HttpSender.this.content;
if (content == null)
return;
content.succeeded();
ByteBuffer buffer = content.getContent();
someToContent(exchange, buffer);
super.succeeded();
}
@Override
public void onCompleteFailure(Throwable failure)
{
HttpContent content = HttpSender.this.content;
if (content == null)
return;
content.failed(failure);
anyToFailure(failure);
}
@Override
protected void onCompleteSuccess()
{
// Nothing to do, since we always return false from process().
// Termination is obtained via LastContentCallback.
}
}
private class LastContentCallback implements Callback
{
@Override
public boolean isNonBlocking()
{
return content.isNonBlocking();
}
@Override
public void succeeded()
{
HttpExchange exchange = getHttpExchange();
if (exchange == null)
return;
HttpContent content = HttpSender.this.content;
if (content == null)
return;
content.succeeded();
someToSuccess(exchange);
}
@Override
public void failed(Throwable failure)
{
HttpContent content = HttpSender.this.content;
if (content == null)
return;
content.failed(failure);
anyToFailure(failure);
}
}
}