blob: 07976531aee6894ca0ca995655946cc5bf599e9d [file] [log] [blame]
/*
* Copyright (c) 2012, 2020 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/
package org.glassfish.jersey.server;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.ws.rs.HttpMethod;
import javax.ws.rs.InternalServerErrorException;
import javax.ws.rs.NotFoundException;
import javax.ws.rs.ServiceUnavailableException;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.CompletionCallback;
import javax.ws.rs.container.ConnectionCallback;
import javax.ws.rs.container.TimeoutHandler;
import javax.ws.rs.core.Configuration;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.ExceptionMapper;
import javax.inject.Provider;
import org.glassfish.jersey.internal.guava.Preconditions;
import org.glassfish.jersey.internal.inject.InjectionManager;
import org.glassfish.jersey.internal.inject.Injections;
import org.glassfish.jersey.internal.inject.Providers;
import org.glassfish.jersey.internal.util.Closure;
import org.glassfish.jersey.internal.util.Producer;
import org.glassfish.jersey.internal.util.PropertiesHelper;
import org.glassfish.jersey.internal.util.collection.Ref;
import org.glassfish.jersey.internal.util.collection.Refs;
import org.glassfish.jersey.internal.util.collection.Value;
import org.glassfish.jersey.message.internal.HeaderValueException;
import org.glassfish.jersey.message.internal.MessageBodyProviderNotFoundException;
import org.glassfish.jersey.message.internal.OutboundJaxrsResponse;
import org.glassfish.jersey.message.internal.OutboundMessageContext;
import org.glassfish.jersey.message.internal.TracingLogger;
import org.glassfish.jersey.process.internal.RequestContext;
import org.glassfish.jersey.process.internal.RequestScope;
import org.glassfish.jersey.process.internal.Stage;
import org.glassfish.jersey.process.internal.Stages;
import org.glassfish.jersey.server.internal.LocalizationMessages;
import org.glassfish.jersey.server.internal.ProcessingProviders;
import org.glassfish.jersey.server.internal.ServerTraceEvent;
import org.glassfish.jersey.server.internal.monitoring.EmptyRequestEventBuilder;
import org.glassfish.jersey.server.internal.monitoring.RequestEventBuilder;
import org.glassfish.jersey.server.internal.monitoring.RequestEventImpl;
import org.glassfish.jersey.server.internal.process.Endpoint;
import org.glassfish.jersey.server.internal.process.MappableException;
import org.glassfish.jersey.server.internal.process.RequestProcessingContext;
import org.glassfish.jersey.server.internal.routing.UriRoutingContext;
import org.glassfish.jersey.server.monitoring.ApplicationEventListener;
import org.glassfish.jersey.server.monitoring.RequestEvent;
import org.glassfish.jersey.server.monitoring.RequestEventListener;
import org.glassfish.jersey.server.spi.ContainerResponseWriter;
import org.glassfish.jersey.server.spi.ExternalRequestContext;
import org.glassfish.jersey.server.spi.ExternalRequestScope;
import org.glassfish.jersey.server.spi.ResponseErrorMapper;
import org.glassfish.jersey.spi.ExceptionMappers;
import static org.glassfish.jersey.server.AsyncContext.State.COMPLETED;
import static org.glassfish.jersey.server.AsyncContext.State.RESUMED;
import static org.glassfish.jersey.server.AsyncContext.State.RUNNING;
import static org.glassfish.jersey.server.AsyncContext.State.SUSPENDED;
/**
* Server-side request processing runtime.
*
* @author Marek Potociar
*/
public class ServerRuntime {
private final Stage<RequestProcessingContext> requestProcessingRoot;
private final ProcessingProviders processingProviders;
private final InjectionManager injectionManager;
private final ScheduledExecutorService backgroundScheduler;
private final Provider<ExecutorService> managedAsyncExecutor;
private final RequestScope requestScope;
private final ExceptionMappers exceptionMappers;
private final ApplicationEventListener applicationEventListener;
private final Configuration configuration;
private final ExternalRequestScope externalRequestScope;
private final TracingConfig tracingConfig;
private final TracingLogger.Level tracingThreshold;
private final boolean processResponseErrors;
/** Do not resolve relative URIs in the {@code Location} header */
private final boolean disableLocationHeaderRelativeUriResolution;
/** Resolve relative URIs according to RFC7231 (not JAX-RS 2.0 compliant */
private final boolean rfc7231LocationHeaderRelativeUriResolution;
static ServerRuntime createServerRuntime(
InjectionManager injectionManager,
ServerBootstrapBag bootstrapBag,
Stage<RequestProcessingContext> processingRoot,
ApplicationEventListener eventListener,
ProcessingProviders processingProviders) {
ScheduledExecutorService scheduledExecutorServiceSupplier =
injectionManager.getInstance(ScheduledExecutorService.class, BackgroundSchedulerLiteral.INSTANCE);
Provider<ExecutorService> asyncExecutorServiceSupplier =
() -> injectionManager.getInstance(ExecutorService.class, ManagedAsyncExecutorLiteral.INSTANCE);
return new ServerRuntime(
processingRoot,
processingProviders,
injectionManager,
scheduledExecutorServiceSupplier,
asyncExecutorServiceSupplier,
bootstrapBag.getRequestScope(),
bootstrapBag.getExceptionMappers(),
eventListener,
injectionManager.getInstance(ExternalRequestScope.class),
bootstrapBag.getConfiguration());
}
private ServerRuntime(final Stage<RequestProcessingContext> requestProcessingRoot,
final ProcessingProviders processingProviders,
final InjectionManager injectionManager,
final ScheduledExecutorService backgroundScheduler,
final Provider<ExecutorService> managedAsyncExecutorProvider,
final RequestScope requestScope,
final ExceptionMappers exceptionMappers,
final ApplicationEventListener applicationEventListener,
final ExternalRequestScope externalScope,
final Configuration configuration) {
this.requestProcessingRoot = requestProcessingRoot;
this.processingProviders = processingProviders;
this.injectionManager = injectionManager;
this.backgroundScheduler = backgroundScheduler;
this.managedAsyncExecutor = managedAsyncExecutorProvider;
this.requestScope = requestScope;
this.exceptionMappers = exceptionMappers;
this.applicationEventListener = applicationEventListener;
this.externalRequestScope = externalScope;
this.configuration = configuration;
this.tracingConfig = TracingUtils.getTracingConfig(configuration);
this.tracingThreshold = TracingUtils.getTracingThreshold(configuration);
this.processResponseErrors = PropertiesHelper.isProperty(
configuration.getProperty(ServerProperties.PROCESSING_RESPONSE_ERRORS_ENABLED));
this.disableLocationHeaderRelativeUriResolution = ServerProperties.getValue(configuration.getProperties(),
ServerProperties.LOCATION_HEADER_RELATIVE_URI_RESOLUTION_DISABLED,
Boolean.FALSE, Boolean.class);
this.rfc7231LocationHeaderRelativeUriResolution = ServerProperties.getValue(configuration.getProperties(),
ServerProperties.LOCATION_HEADER_RELATIVE_URI_RESOLUTION_RFC7231,
Boolean.FALSE, Boolean.class);
}
/**
* Process a container request.
*
* @param request container request to be processed.
*/
public void process(final ContainerRequest request) {
TracingUtils.initTracingSupport(tracingConfig, tracingThreshold, request);
TracingUtils.logStart(request);
final UriRoutingContext routingContext = request.getUriRoutingContext();
RequestEventBuilder monitoringEventBuilder = EmptyRequestEventBuilder.INSTANCE;
RequestEventListener monitoringEventListener = null;
if (applicationEventListener != null) {
monitoringEventBuilder = new RequestEventImpl.Builder()
.setContainerRequest(request)
.setExtendedUriInfo(routingContext);
monitoringEventListener = applicationEventListener.onRequest(
monitoringEventBuilder.build(RequestEvent.Type.START));
}
request.setProcessingProviders(processingProviders);
final RequestProcessingContext context = new RequestProcessingContext(injectionManager,
request,
routingContext,
monitoringEventBuilder,
monitoringEventListener);
request.checkState();
final Responder responder = new Responder(context, ServerRuntime.this);
final RequestContext requestScopeInstance = requestScope.createContext();
final AsyncResponderHolder asyncResponderHolder =
new AsyncResponderHolder(responder, externalRequestScope,
requestScopeInstance, externalRequestScope.open(injectionManager));
context.initAsyncContext(asyncResponderHolder);
requestScope.runInScope(requestScopeInstance, new Runnable() {
@Override
public void run() {
try {
// set base URI into response builder thread-local variable
// for later resolving of relative location URIs
if (!disableLocationHeaderRelativeUriResolution) {
final URI uriToUse =
rfc7231LocationHeaderRelativeUriResolution ? request.getRequestUri() : request.getBaseUri();
OutboundJaxrsResponse.Builder.setBaseUri(uriToUse);
}
final Ref<Endpoint> endpointRef = Refs.emptyRef();
final RequestProcessingContext data = Stages.process(context, requestProcessingRoot, endpointRef);
final Endpoint endpoint = endpointRef.get();
if (endpoint == null) {
// not found
throw new NotFoundException();
}
final ContainerResponse response = endpoint.apply(data);
if (!asyncResponderHolder.isAsync()) {
responder.process(response);
} else {
externalRequestScope.suspend(asyncResponderHolder.externalContext, injectionManager);
}
} catch (final Throwable throwable) {
responder.process(throwable);
} finally {
asyncResponderHolder.release();
// clear base URI from the thread
OutboundJaxrsResponse.Builder.clearBaseUri();
}
}
});
}
/**
* Get the Jersey server runtime background scheduler.
*
* @return server runtime background scheduler.
* @see BackgroundScheduler
*/
ScheduledExecutorService getBackgroundScheduler() {
return backgroundScheduler;
}
/**
* Ensure that the value a {@value HttpHeaders#LOCATION} header is an absolute URI, if present among headers.
* <p/>
* Relative URI value will be made absolute using a base request URI.
*
* @param location location URI; value of the HTTP {@value HttpHeaders#LOCATION} response header.
* @param headers mutable map of response headers.
* @param request container request.
* @param incompatible if set to {@code true}, uri will be resolved against the request uri, not the base uri;
* this is correct against RFC7231, but does violate the JAX-RS 2.0 specs
*/
private static void ensureAbsolute(final URI location, final MultivaluedMap<String, Object> headers,
final ContainerRequest request, final boolean incompatible) {
if (location == null || location.isAbsolute()) {
return;
}
// according to RFC7231 (HTTP/1.1), this field can contain one single URI reference
final URI uri = incompatible ? request.getRequestUri() : request.getBaseUri();
headers.putSingle(HttpHeaders.LOCATION, uri.resolve(location));
}
private static class AsyncResponderHolder implements Value<AsyncContext> {
private final Responder responder;
private final ExternalRequestScope externalScope;
private final RequestContext requestContext;
private final ExternalRequestContext<?> externalContext;
private volatile AsyncResponder asyncResponder;
private AsyncResponderHolder(final Responder responder,
final ExternalRequestScope externalRequestScope,
final RequestContext requestContext,
final ExternalRequestContext<?> externalContext) {
this.responder = responder;
this.externalScope = externalRequestScope;
this.requestContext = requestContext;
this.externalContext = externalContext;
}
@Override
public AsyncContext get() {
final AsyncResponder ar = new AsyncResponder(responder, requestContext, externalScope, externalContext);
asyncResponder = ar;
return ar;
}
public boolean isAsync() {
final AsyncResponder ar = asyncResponder;
return ar != null && !ar.isRunning();
}
public void release() {
if (asyncResponder == null) {
requestContext.release();
}
}
}
private static class Responder {
private static final Logger LOGGER = Logger.getLogger(Responder.class.getName());
private final RequestProcessingContext processingContext;
private final ServerRuntime runtime;
private final CompletionCallbackRunner completionCallbackRunner = new CompletionCallbackRunner();
private final ConnectionCallbackRunner connectionCallbackRunner = new ConnectionCallbackRunner();
private final TracingLogger tracingLogger;
public Responder(final RequestProcessingContext processingContext, final ServerRuntime runtime) {
this.processingContext = processingContext;
this.runtime = runtime;
this.tracingLogger = TracingLogger.getInstance(processingContext.request());
}
public void process(ContainerResponse response) {
processingContext.monitoringEventBuilder().setContainerResponse(response);
response = processResponse(response);
release(response);
}
private ContainerResponse processResponse(ContainerResponse response) {
final Stage<ContainerResponse> respondingRoot = processingContext.createRespondingRoot();
if (respondingRoot != null) {
response = Stages.process(response, respondingRoot);
}
writeResponse(response);
// no-exception zone
// the methods below are guaranteed to not throw any exceptions
completionCallbackRunner.onComplete(null);
return response;
}
/**
* Process {@code throwable} by using exception mappers and generating the mapped
* response if possible.
* <p>
* Note about logging:
* <ul>
* <li>
* we do not log exceptions that are mapped by ExceptionMappers.
* </li><li>
* All other exceptions are logged: WebApplicationExceptions with entities,
* exceptions that were unsuccessfully mapped
* </li>
* </ul>
* </p>
*
* @param throwable Exception to be processed.
*/
public void process(final Throwable throwable) {
final ContainerRequest request = processingContext.request();
processingContext.monitoringEventBuilder().setException(throwable, RequestEvent.ExceptionCause.ORIGINAL);
processingContext.triggerEvent(RequestEvent.Type.ON_EXCEPTION);
ContainerResponse response = null;
try {
final Response exceptionResponse = mapException(throwable);
try {
try {
response = convertResponse(exceptionResponse);
if (!runtime.disableLocationHeaderRelativeUriResolution) {
ensureAbsolute(response.getLocation(), response.getHeaders(), request,
runtime.rfc7231LocationHeaderRelativeUriResolution);
}
processingContext.monitoringEventBuilder().setContainerResponse(response)
.setResponseSuccessfullyMapped(true);
} finally {
processingContext.triggerEvent(RequestEvent.Type.EXCEPTION_MAPPING_FINISHED);
}
processResponse(response);
} catch (final Throwable respError) {
LOGGER.log(Level.SEVERE, LocalizationMessages.ERROR_PROCESSING_RESPONSE_FROM_ALREADY_MAPPED_EXCEPTION());
processingContext.monitoringEventBuilder()
.setException(respError, RequestEvent.ExceptionCause.MAPPED_RESPONSE);
processingContext.triggerEvent(RequestEvent.Type.ON_EXCEPTION);
throw respError;
}
} catch (final Throwable responseError) {
if (throwable != responseError
&& !(throwable instanceof MappableException && throwable.getCause() == responseError)) {
LOGGER.log(Level.FINE, LocalizationMessages.ERROR_EXCEPTION_MAPPING_ORIGINAL_EXCEPTION(), throwable);
}
if (!processResponseError(responseError)) {
// Pass the exception to the container.
LOGGER.log(Level.FINE, LocalizationMessages.ERROR_EXCEPTION_MAPPING_THROWN_TO_CONTAINER(), responseError);
try {
request.getResponseWriter().failure(responseError);
} finally {
completionCallbackRunner.onComplete(responseError);
}
}
} finally {
release(response);
}
}
/**
* If {@value org.glassfish.jersey.server.ServerProperties#PROCESSING_RESPONSE_ERRORS_ENABLED} is set to true then try to
* handle errors raised during response processing.
*
* @param responseError a throwable that occurred during response processing.
* @return {@code true} if the given response error has been processed, {@code false} otherwise.
*/
private boolean processResponseError(final Throwable responseError) {
boolean processed = false;
if (runtime.processResponseErrors) {
// Try to obtain response from response error mapper.
final Iterable<ResponseErrorMapper> mappers = Providers.getAllProviders(runtime.injectionManager,
ResponseErrorMapper.class);
try {
Response processedError = null;
for (final ResponseErrorMapper mapper : mappers) {
processedError = mapper.toResponse(responseError);
if (processedError != null) {
break;
}
}
if (processedError != null) {
processResponse(new ContainerResponse(processingContext.request(), processedError));
processed = true;
}
} catch (final Throwable throwable) {
LOGGER.log(Level.FINE, LocalizationMessages.ERROR_EXCEPTION_MAPPING_PROCESSED_RESPONSE_ERROR(), throwable);
}
}
return processed;
}
private ContainerResponse convertResponse(final Response exceptionResponse) {
final ContainerResponse containerResponse = new ContainerResponse(processingContext.request(), exceptionResponse);
containerResponse.setMappedFromException(true);
return containerResponse;
}
@SuppressWarnings("unchecked")
private Response mapException(final Throwable originalThrowable) throws Throwable {
LOGGER.log(Level.FINER, LocalizationMessages.EXCEPTION_MAPPING_START(), originalThrowable);
final ThrowableWrap wrap = new ThrowableWrap(originalThrowable);
wrap.tryMappableException();
do {
final Throwable throwable = wrap.getCurrent();
if (wrap.isInMappable() || throwable instanceof WebApplicationException) {
// in case ServerProperties.PROCESSING_RESPONSE_ERRORS_ENABLED is true, allow
// wrapped MessageBodyProviderNotFoundException to propagate
if (runtime.processResponseErrors && throwable instanceof InternalServerErrorException
&& throwable.getCause() instanceof MessageBodyProviderNotFoundException) {
throw throwable;
}
Response waeResponse = null;
if (throwable instanceof WebApplicationException) {
final WebApplicationException webApplicationException = (WebApplicationException) throwable;
// set mapped throwable
processingContext.routingContext().setMappedThrowable(throwable);
waeResponse = webApplicationException.getResponse();
if (waeResponse.hasEntity()) {
LOGGER.log(Level.FINE, LocalizationMessages
.EXCEPTION_MAPPING_WAE_ENTITY(waeResponse.getStatus()), throwable);
return waeResponse;
}
}
final long timestamp = tracingLogger.timestamp(ServerTraceEvent.EXCEPTION_MAPPING);
final ExceptionMapper mapper = runtime.exceptionMappers.findMapping(throwable);
if (mapper != null) {
processingContext.monitoringEventBuilder().setExceptionMapper(mapper);
processingContext.triggerEvent(RequestEvent.Type.EXCEPTION_MAPPER_FOUND);
try {
final Response mappedResponse = mapper.toResponse(throwable);
if (tracingLogger.isLogEnabled(ServerTraceEvent.EXCEPTION_MAPPING)) {
tracingLogger.logDuration(ServerTraceEvent.EXCEPTION_MAPPING,
timestamp, mapper, throwable, throwable.getLocalizedMessage(),
mappedResponse != null ? mappedResponse.getStatusInfo() : "-no-response-");
}
// set mapped throwable
processingContext.routingContext().setMappedThrowable(throwable);
if (mappedResponse != null) {
// response successfully mapped
if (LOGGER.isLoggable(Level.FINER)) {
final String message = String.format(
"Exception '%s' has been mapped by '%s' to response '%s' (%s:%s).",
throwable.getLocalizedMessage(),
mapper.getClass().getName(),
mappedResponse.getStatusInfo().getReasonPhrase(),
mappedResponse.getStatusInfo().getStatusCode(),
mappedResponse.getStatusInfo().getFamily());
LOGGER.log(Level.FINER, message);
}
return mappedResponse;
} else {
return Response.noContent().build();
}
} catch (final Throwable mapperThrowable) {
// spec: If the exception mapping provider throws an exception while creating a Response
// then return a server error (status code 500) response to the client.
LOGGER.log(Level.SEVERE, LocalizationMessages.EXCEPTION_MAPPER_THROWS_EXCEPTION(mapper.getClass()),
mapperThrowable);
LOGGER.log(Level.SEVERE, LocalizationMessages.EXCEPTION_MAPPER_FAILED_FOR_EXCEPTION(), throwable);
return Response.serverError().build();
}
}
if (waeResponse != null) {
LOGGER.log(Level.FINE, LocalizationMessages
.EXCEPTION_MAPPING_WAE_NO_ENTITY(waeResponse.getStatus()), throwable);
return waeResponse;
}
}
// internal mapping
if (throwable instanceof HeaderValueException) {
if (((HeaderValueException) throwable).getContext() == HeaderValueException.Context.INBOUND) {
return Response.status(Response.Status.BAD_REQUEST).build();
}
}
if (!wrap.isInMappable() || !wrap.isWrapped()) {
// user failures (thrown from Resource methods or provider methods)
// spec: Unchecked exceptions and errors that have not been mapped MUST be re-thrown and allowed to
// propagate to the underlying container.
// not logged on this level.
throw wrap.getWrappedOrCurrent();
}
} while (wrap.unwrap() != null);
// jersey failures (not thrown from Resource methods or provider methods) -> rethrow
throw originalThrowable;
}
private ContainerResponse writeResponse(final ContainerResponse response) {
final ContainerRequest request = processingContext.request();
final ContainerResponseWriter writer = request.getResponseWriter();
if (!runtime.disableLocationHeaderRelativeUriResolution) {
ServerRuntime.ensureAbsolute(response.getLocation(), response.getHeaders(), response.getRequestContext(),
runtime.rfc7231LocationHeaderRelativeUriResolution);
}
if (!response.hasEntity()) {
tracingLogger.log(ServerTraceEvent.FINISHED, response.getStatusInfo());
tracingLogger.flush(response.getHeaders());
writer.writeResponseStatusAndHeaders(0, response);
setWrittenResponse(response);
return response;
}
final Object entity = response.getEntity();
boolean skipFinally = false;
final boolean isHead = request.getMethod().equals(HttpMethod.HEAD);
try {
response.setStreamProvider(new OutboundMessageContext.StreamProvider() {
@Override
public OutputStream getOutputStream(final int contentLength) throws IOException {
if (!runtime.disableLocationHeaderRelativeUriResolution) {
ServerRuntime.ensureAbsolute(response.getLocation(), response.getHeaders(),
response.getRequestContext(), runtime.rfc7231LocationHeaderRelativeUriResolution);
}
final OutputStream outputStream = writer.writeResponseStatusAndHeaders(contentLength, response);
return isHead ? null : outputStream;
}
});
if ((writer.enableResponseBuffering() || isHead) && !response.isChunked()) {
response.enableBuffering(runtime.configuration);
}
try {
response.setEntityStream(request.getWorkers().writeTo(
entity,
entity.getClass(),
response.getEntityType(),
response.getEntityAnnotations(),
response.getMediaType(),
response.getHeaders(),
request.getPropertiesDelegate(),
response.getEntityStream(),
request.getWriterInterceptors()));
} catch (final MappableException mpe) {
if (mpe.getCause() instanceof IOException) {
connectionCallbackRunner.onDisconnect(processingContext.asyncContext());
}
throw mpe;
}
tracingLogger.log(ServerTraceEvent.FINISHED, response.getStatusInfo());
tracingLogger.flush(response.getHeaders());
setWrittenResponse(response);
} catch (final Throwable ex) {
if (response.isCommitted()) {
/**
* We're done with processing here. There's nothing we can do about the exception so
* let's just log it.
*/
LOGGER.log(Level.SEVERE, LocalizationMessages.ERROR_WRITING_RESPONSE_ENTITY(), ex);
} else {
skipFinally = true;
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
} else {
throw new MappableException(ex);
}
}
} finally {
if (!skipFinally) {
boolean close = !response.isChunked();
if (response.isChunked()) {
try {
response.commitStream();
} catch (final Exception e) {
LOGGER.log(Level.SEVERE, LocalizationMessages.ERROR_COMMITTING_OUTPUT_STREAM(), e);
close = true;
}
final ChunkedOutput chunked = (ChunkedOutput) entity;
try {
chunked.setContext(
runtime.requestScope,
runtime.requestScope.referenceCurrent(),
request,
response,
connectionCallbackRunner);
} catch (final IOException ex) {
LOGGER.log(Level.SEVERE, LocalizationMessages.ERROR_WRITING_RESPONSE_ENTITY_CHUNK(), ex);
close = true;
}
// suspend the writer indefinitely (passing null timeout handler is ok in such case) if the output is not
// already closed.
// TODO what to do if we detect that the writer has already been suspended? override the timeout value?
if (!chunked.isClosed()
&& !writer.suspend(AsyncResponder.NO_TIMEOUT, TimeUnit.SECONDS, null)) {
LOGGER.fine(LocalizationMessages.ERROR_SUSPENDING_CHUNKED_OUTPUT_RESPONSE());
}
}
if (close) {
try {
// the response must be closed here instead of just flushed or committed. Some
// output streams writes out bytes only on close (for example GZipOutputStream).
response.close();
} catch (final Exception e) {
LOGGER.log(Level.SEVERE, LocalizationMessages.ERROR_CLOSING_COMMIT_OUTPUT_STREAM(), e);
}
}
}
}
return response;
}
private void setWrittenResponse(final ContainerResponse response) {
processingContext.monitoringEventBuilder()
.setContainerResponse(response)
.setSuccess(response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode())
.setResponseWritten(true);
}
private void release(final ContainerResponse responseContext) {
try {
processingContext.closeableService().close();
// Commit the container response writer if not in chunked mode
// responseContext may be null in case the request processing was cancelled.
if (responseContext != null && !responseContext.isChunked()) {
// responseContext.commitStream();
responseContext.close();
}
} catch (final Throwable throwable) {
LOGGER.log(Level.WARNING, LocalizationMessages.RELEASING_REQUEST_PROCESSING_RESOURCES_FAILED(), throwable);
} finally {
runtime.externalRequestScope.close();
processingContext.triggerEvent(RequestEvent.Type.FINISHED);
}
}
}
private static class AsyncResponder implements AsyncContext, ContainerResponseWriter.TimeoutHandler, CompletionCallback {
private static final Logger LOGGER = Logger.getLogger(AsyncResponder.class.getName());
private static final TimeoutHandler DEFAULT_TIMEOUT_HANDLER = new TimeoutHandler() {
@Override
public void handleTimeout(final AsyncResponse asyncResponse) {
throw new ServiceUnavailableException();
}
};
private final Object stateLock = new Object();
private State state = RUNNING;
private boolean cancelled = false;
private final Responder responder;
// TODO this instance should be released once async invocation is finished.
private final RequestContext requestContext;
private final ExternalRequestContext<?> foreignScopeInstance;
private final ExternalRequestScope requestScopeListener;
private volatile TimeoutHandler timeoutHandler = DEFAULT_TIMEOUT_HANDLER;
private final List<AbstractCallbackRunner<?>> callbackRunners;
public AsyncResponder(final Responder responder,
final RequestContext requestContext,
final ExternalRequestScope requestScopeListener,
final ExternalRequestContext<?> foreignScopeInstance) {
this.responder = responder;
this.requestContext = requestContext;
this.foreignScopeInstance = foreignScopeInstance;
this.requestScopeListener = requestScopeListener;
this.callbackRunners = Collections.unmodifiableList(Arrays.asList(
responder.completionCallbackRunner, responder.connectionCallbackRunner));
responder.completionCallbackRunner.register(this);
}
@Override
public void onTimeout(final ContainerResponseWriter responseWriter) {
final TimeoutHandler handler = timeoutHandler;
try {
synchronized (stateLock) {
if (state == SUSPENDED) {
handler.handleTimeout(this);
}
}
} catch (final Throwable throwable) {
resume(throwable);
}
}
@Override
public void onComplete(final Throwable throwable) {
synchronized (stateLock) {
state = COMPLETED;
}
}
@Override
public void invokeManaged(final Producer<Response> producer) {
responder.runtime.managedAsyncExecutor.get().submit(new Runnable() {
@Override
public void run() {
responder.runtime.requestScope.runInScope(requestContext, new Runnable() {
@Override
public void run() {
try {
requestScopeListener.resume(foreignScopeInstance, responder.runtime.injectionManager);
final Response response = producer.call();
if (response != null) {
resume(response);
}
} catch (final Throwable t) {
resume(t);
}
}
});
}
});
}
@Override
public boolean suspend() {
synchronized (stateLock) {
if (state == RUNNING) {
if (responder.processingContext.request().getResponseWriter().suspend(
AsyncResponse.NO_TIMEOUT, TimeUnit.SECONDS, this)) {
state = SUSPENDED;
return true;
}
}
}
return false;
}
@Override
public boolean resume(final Object response) {
return resume(new Runnable() {
@Override
public void run() {
try {
requestScopeListener.resume(foreignScopeInstance, responder.runtime.injectionManager);
final Response jaxrsResponse =
(response instanceof Response) ? (Response) response : Response.ok(response).build();
if (!responder.runtime.disableLocationHeaderRelativeUriResolution) {
ServerRuntime.ensureAbsolute(jaxrsResponse.getLocation(), jaxrsResponse.getHeaders(),
responder.processingContext.request(),
responder.runtime.rfc7231LocationHeaderRelativeUriResolution);
}
responder.process(new ContainerResponse(responder.processingContext.request(), jaxrsResponse));
} catch (final Throwable t) {
responder.process(t);
}
}
});
}
@Override
public boolean resume(final Throwable error) {
return resume(new Runnable() {
@Override
public void run() {
try {
requestScopeListener.resume(foreignScopeInstance, responder.runtime.injectionManager);
responder.process(new MappableException(error));
} catch (final Throwable error) {
// Ignore the exception - already resumed but may be rethrown by ContainerResponseWriter#failure.
}
}
});
}
private boolean resume(final Runnable handler) {
synchronized (stateLock) {
if (state != SUSPENDED) {
return false;
}
state = RESUMED;
}
try {
responder.runtime.requestScope.runInScope(requestContext, handler);
} finally {
requestContext.release();
}
return true;
}
@Override
public boolean cancel() {
return cancel(new Value<Response>() {
@Override
public Response get() {
return Response.status(Response.Status.SERVICE_UNAVAILABLE).build();
}
});
}
@Override
public boolean cancel(final int retryAfter) {
return cancel(new Value<Response>() {
@Override
public Response get() {
return Response
.status(Response.Status.SERVICE_UNAVAILABLE)
.header(HttpHeaders.RETRY_AFTER, retryAfter)
.build();
}
});
}
@Override
public boolean cancel(final Date retryAfter) {
return cancel(new Value<Response>() {
@Override
public Response get() {
return Response
.status(Response.Status.SERVICE_UNAVAILABLE)
.header(HttpHeaders.RETRY_AFTER, retryAfter)
.build();
}
});
}
private boolean cancel(final Value<Response> responseValue) {
synchronized (stateLock) {
if (cancelled) {
return true;
}
if (state != SUSPENDED) {
return false;
}
state = RESUMED;
cancelled = true;
}
responder.runtime.requestScope.runInScope(requestContext, new Runnable() {
@Override
public void run() {
try {
requestScopeListener.resume(foreignScopeInstance, responder.runtime.injectionManager);
final Response response = responseValue.get();
responder.process(new ContainerResponse(responder.processingContext.request(), response));
} catch (final Throwable t) {
responder.process(t);
}
}
});
return true;
}
public boolean isRunning() {
synchronized (stateLock) {
return state == RUNNING;
}
}
@Override
public boolean isSuspended() {
synchronized (stateLock) {
return state == SUSPENDED;
}
}
@Override
public boolean isCancelled() {
synchronized (stateLock) {
return cancelled;
}
}
@Override
public boolean isDone() {
synchronized (stateLock) {
return state == COMPLETED;
}
}
@Override
public boolean setTimeout(final long time, final TimeUnit unit) {
try {
responder.processingContext.request().getResponseWriter().setSuspendTimeout(time, unit);
return true;
} catch (final IllegalStateException ex) {
LOGGER.log(Level.FINER, "Unable to set timeout on the AsyncResponse.", ex);
return false;
}
}
@Override
public void setTimeoutHandler(final TimeoutHandler handler) {
timeoutHandler = handler;
}
@Override
public Collection<Class<?>> register(final Class<?> callback) {
Preconditions.checkNotNull(callback, LocalizationMessages.PARAM_NULL("callback"));
return register(Injections.getOrCreate(responder.runtime.injectionManager, callback));
}
@Override
public Map<Class<?>, Collection<Class<?>>> register(final Class<?> callback, final Class<?>... callbacks) {
Preconditions.checkNotNull(callback, LocalizationMessages.PARAM_NULL("callback"));
Preconditions.checkNotNull(callbacks, LocalizationMessages.CALLBACK_ARRAY_NULL());
for (final Class<?> additionalCallback : callbacks) {
Preconditions.checkNotNull(additionalCallback, LocalizationMessages.CALLBACK_ARRAY_ELEMENT_NULL());
}
final Map<Class<?>, Collection<Class<?>>> results = new HashMap<>();
results.put(callback, register(callback));
for (final Class<?> c : callbacks) {
results.put(c, register(c));
}
return results;
}
@Override
public Collection<Class<?>> register(final Object callback) {
Preconditions.checkNotNull(callback, LocalizationMessages.PARAM_NULL("callback"));
final Collection<Class<?>> result = new LinkedList<>();
for (final AbstractCallbackRunner<?> runner : callbackRunners) {
if (runner.supports(callback.getClass())) {
if (runner.register(callback)) {
result.add(runner.getCallbackContract());
}
}
}
return result;
}
@Override
public Map<Class<?>, Collection<Class<?>>> register(final Object callback, final Object... callbacks) {
Preconditions.checkNotNull(callback, LocalizationMessages.PARAM_NULL("callback"));
Preconditions.checkNotNull(callbacks, LocalizationMessages.CALLBACK_ARRAY_NULL());
for (final Object additionalCallback : callbacks) {
Preconditions.checkNotNull(additionalCallback, LocalizationMessages.CALLBACK_ARRAY_ELEMENT_NULL());
}
final Map<Class<?>, Collection<Class<?>>> results = new HashMap<>();
results.put(callback.getClass(), register(callback));
for (final Object c : callbacks) {
results.put(c.getClass(), register(c));
}
return results;
}
}
/**
* Abstract composite callback runner.
* <p/>
* The runner supports registering multiple callbacks of a specific type and the execute the callback method
* on all the registered callbacks.
*
* @param <T> callback type
*/
abstract static class AbstractCallbackRunner<T> {
private final Queue<T> callbacks = new ConcurrentLinkedQueue<>();
private final Logger logger;
/**
* Create new callback runner.
*
* @param logger logger instance to be used by the runner to fire logging events.
*/
protected AbstractCallbackRunner(final Logger logger) {
this.logger = logger;
}
/**
* Return true if this callback runner supports the {@code callbackClass}.
*
* @param callbackClass Callback to be checked.
* @return True if this callback runner supports the {@code callbackClass}; false otherwise.
*/
public final boolean supports(final Class<?> callbackClass) {
return getCallbackContract().isAssignableFrom(callbackClass);
}
/**
* Get the callback contract supported by this callback runner.
*
* @return callback contract supported by this callback runner.
*/
public abstract Class<?> getCallbackContract();
/**
* Register new callback instance.
*
* @param callback new callback instance to be registered.
* @return {@code true} upon successful registration, {@code false} otherwise.
*/
@SuppressWarnings("unchecked")
public boolean register(final Object callback) {
return callbacks.offer((T) callback);
}
/**
* Execute all registered callbacks using the supplied invoker.
*
* @param invoker invoker responsible for to executing all registered callbacks.
*/
protected final void executeCallbacks(final Closure<T> invoker) {
for (final T callback : callbacks) {
try {
invoker.invoke(callback);
} catch (final Throwable t) {
logger.log(Level.WARNING, LocalizationMessages.ERROR_ASYNC_CALLBACK_FAILED(callback.getClass().getName()), t);
}
}
}
}
private static class CompletionCallbackRunner
extends AbstractCallbackRunner<CompletionCallback> implements CompletionCallback {
private static final Logger LOGGER = Logger.getLogger(CompletionCallbackRunner.class.getName());
private CompletionCallbackRunner() {
super(LOGGER);
}
@Override
public Class<?> getCallbackContract() {
return CompletionCallback.class;
}
@Override
public void onComplete(final Throwable throwable) {
executeCallbacks(new Closure<CompletionCallback>() {
@Override
public void invoke(final CompletionCallback callback) {
callback.onComplete(throwable);
}
});
}
}
private static class ConnectionCallbackRunner
extends AbstractCallbackRunner<ConnectionCallback> implements ConnectionCallback {
private static final Logger LOGGER = Logger.getLogger(ConnectionCallbackRunner.class.getName());
private ConnectionCallbackRunner() {
super(LOGGER);
}
@Override
public Class<?> getCallbackContract() {
return ConnectionCallback.class;
}
@Override
public void onDisconnect(final AsyncResponse disconnected) {
executeCallbacks(new Closure<ConnectionCallback>() {
@Override
public void invoke(final ConnectionCallback callback) {
callback.onDisconnect(disconnected);
}
});
}
}
/**
* The structure that holds original {@link Throwable}, top most wrapped {@link Throwable} for the cases where the
* exception is to be tried to be mapped but is wrapped in a known wrapping {@link Throwable}, and the current unwrapped
* {@link Throwable}. For instance, the original is {@link MappableException}, the wrapped is {@link CompletionException},
* and the current is {@code IllegalStateException}.
*/
private static class ThrowableWrap {
private final Throwable original;
private Throwable wrapped = null;
private Throwable current;
private boolean inMappable = false;
private ThrowableWrap(Throwable original) {
this.original = original;
this.current = original;
}
/**
* Gets the original {@link Throwable} to be mapped to an {@link ExceptionMapper}.
* @return the original Throwable.
*/
private Throwable getOriginal() {
return original;
}
/**
* Some exceptions can be unwrapped. If an {@link ExceptionMapper} is not found for them, the original wrapping
* {@link Throwable} is to be returned. If the exception was not wrapped, return current.
* @return the wrapped or current {@link Throwable}.
*/
private Throwable getWrappedOrCurrent() {
return wrapped != null ? wrapped : current;
}
/**
* Get current unwrapped {@link Throwable}.
* @return current {@link Throwable}.
*/
private Throwable getCurrent() {
return current;
}
/**
* Check whether the current is a known wrapping exception.
* @return true if the current is a known wrapping exception.
*/
private boolean isWrapped() {
final boolean isConcurrentWrap =
CompletionException.class.isInstance(current) || ExecutionException.class.isInstance(current);
return isConcurrentWrap;
}
/**
* Store the top most wrap exception and return the cause.
* @return the cause of the current {@link Throwable}.
*/
private Throwable unwrap() {
if (wrapped == null) {
wrapped = current;
}
current = current.getCause();
return current;
}
/**
* Set flag that the original {@link Throwable} is {@link MappableException} and unwrap the nested {@link Throwable}.
* @return true if the original {@link Throwable} is {@link MappableException}.
*/
private boolean tryMappableException() {
if (MappableException.class.isInstance(original)) {
inMappable = true;
current = original.getCause();
return true;
}
return false;
}
/**
* Return the flag that original {@link Throwable} is {@link MappableException}.
* @return true if the original {@link Throwable} is {@link MappableException}.
*/
private boolean isInMappable() {
return inMappable;
}
}
}