| /* |
| * Copyright (c) 2010, 2019 Oracle and/or its affiliates. All rights reserved. |
| * |
| * This program and the accompanying materials are made available under the |
| * terms of the Eclipse Public License v. 2.0, which is available at |
| * http://www.eclipse.org/legal/epl-2.0. |
| * |
| * This Source Code may also be made available under the following Secondary |
| * Licenses when the conditions for such availability set forth in the |
| * Eclipse Public License v. 2.0 are satisfied: GNU General Public License, |
| * version 2 with the GNU Classpath Exception, which is available at |
| * https://www.gnu.org/software/classpath/license.html. |
| * |
| * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0 |
| */ |
| |
| package org.glassfish.jersey.grizzly.connector; |
| |
| import java.io.ByteArrayOutputStream; |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.net.URI; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import javax.ws.rs.ProcessingException; |
| import javax.ws.rs.client.Client; |
| import javax.ws.rs.core.Configuration; |
| import javax.ws.rs.core.Response; |
| |
| import org.glassfish.jersey.client.ClientProperties; |
| import org.glassfish.jersey.client.ClientRequest; |
| import org.glassfish.jersey.client.ClientResponse; |
| import org.glassfish.jersey.client.RequestEntityProcessing; |
| import org.glassfish.jersey.client.spi.AsyncConnectorCallback; |
| import org.glassfish.jersey.client.spi.Connector; |
| import org.glassfish.jersey.internal.Version; |
| import org.glassfish.jersey.internal.util.collection.ByteBufferInputStream; |
| import org.glassfish.jersey.internal.util.collection.NonBlockingInputStream; |
| import org.glassfish.jersey.message.internal.HeaderUtils; |
| import org.glassfish.jersey.message.internal.OutboundMessageContext; |
| |
| import org.glassfish.grizzly.memory.Buffers; |
| import org.glassfish.grizzly.memory.MemoryManager; |
| |
| import com.ning.http.client.AsyncHandler; |
| import com.ning.http.client.AsyncHttpClient; |
| import com.ning.http.client.AsyncHttpClientConfig; |
| import com.ning.http.client.HttpResponseBodyPart; |
| import com.ning.http.client.HttpResponseHeaders; |
| import com.ning.http.client.HttpResponseStatus; |
| import com.ning.http.client.ProxyServerSelector; |
| import com.ning.http.client.Request; |
| import com.ning.http.client.RequestBuilder; |
| import com.ning.http.client.providers.grizzly.FeedableBodyGenerator; |
| import com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider; |
| import com.ning.http.util.ProxyUtils; |
| |
| /** |
| * The transport using the AsyncHttpClient. |
| * |
| * @author Stepan Kopriva |
| * @author Marek Potociar |
| */ |
| class GrizzlyConnector implements Connector { |
| |
| private final AsyncHttpClient grizzlyClient; |
| |
| /** |
| * Create new connector based on Grizzly asynchronous client library. |
| * |
| * @param client Jersey client instance to create the connector for. |
| * @param config Jersey client runtime configuration to be used to configure the connector parameters. |
| * @param asyncClientCustomizer Async HTTP Client configuration builder customizer. |
| */ |
| GrizzlyConnector(final Client client, |
| final Configuration config, |
| final GrizzlyConnectorProvider.AsyncClientCustomizer asyncClientCustomizer) { |
| AsyncHttpClientConfig.Builder builder = new AsyncHttpClientConfig.Builder(); |
| |
| ExecutorService executorService; |
| if (config != null) { |
| final Object threadPoolSize = config.getProperties().get(ClientProperties.ASYNC_THREADPOOL_SIZE); |
| |
| if (threadPoolSize != null && threadPoolSize instanceof Integer && (Integer) threadPoolSize > 0) { |
| executorService = Executors.newFixedThreadPool((Integer) threadPoolSize); |
| } else { |
| executorService = Executors.newCachedThreadPool(); |
| } |
| |
| builder = builder.setExecutorService(executorService); |
| |
| builder.setConnectTimeout(ClientProperties.getValue(config.getProperties(), |
| ClientProperties.CONNECT_TIMEOUT, 10000)); |
| |
| builder.setRequestTimeout(ClientProperties.getValue(config.getProperties(), |
| ClientProperties.READ_TIMEOUT, 10000)); |
| |
| Object proxyUri; |
| proxyUri = config.getProperty(ClientProperties.PROXY_URI); |
| if (proxyUri != null) { |
| final URI u = getProxyUri(proxyUri); |
| final Properties proxyProperties = new Properties(); |
| proxyProperties.setProperty(ProxyUtils.PROXY_PROTOCOL, u.getScheme()); |
| proxyProperties.setProperty(ProxyUtils.PROXY_HOST, u.getHost()); |
| proxyProperties.setProperty(ProxyUtils.PROXY_PORT, String.valueOf(u.getPort())); |
| |
| final String userName = ClientProperties.getValue( |
| config.getProperties(), ClientProperties.PROXY_USERNAME, String.class); |
| if (userName != null) { |
| proxyProperties.setProperty(ProxyUtils.PROXY_USER, userName); |
| |
| final String password = ClientProperties.getValue( |
| config.getProperties(), ClientProperties.PROXY_PASSWORD, String.class); |
| if (password != null) { |
| proxyProperties.setProperty(ProxyUtils.PROXY_PASSWORD, password); |
| } |
| } |
| ProxyServerSelector proxyServerSelector = ProxyUtils.createProxyServerSelector(proxyProperties); |
| builder.setProxyServerSelector(proxyServerSelector); |
| } |
| } else { |
| executorService = Executors.newCachedThreadPool(); |
| builder.setExecutorService(executorService); |
| } |
| |
| builder.setAllowPoolingConnections(true); |
| if (client.getSslContext() != null) { |
| builder.setSSLContext(client.getSslContext()); |
| } |
| if (client.getHostnameVerifier() != null) { |
| builder.setHostnameVerifier(client.getHostnameVerifier()); |
| } |
| |
| if (asyncClientCustomizer != null) { |
| builder = asyncClientCustomizer.customize(client, config, builder); |
| } |
| |
| AsyncHttpClientConfig asyncClientConfig = builder.build(); |
| |
| this.grizzlyClient = new AsyncHttpClient(new GrizzlyAsyncHttpProvider(asyncClientConfig), asyncClientConfig); |
| } |
| |
| @SuppressWarnings("ChainOfInstanceofChecks") |
| private static URI getProxyUri(final Object proxy) { |
| if (proxy instanceof URI) { |
| return (URI) proxy; |
| } else if (proxy instanceof String) { |
| return URI.create((String) proxy); |
| } else { |
| throw new ProcessingException(LocalizationMessages.WRONG_PROXY_URI_TYPE(ClientProperties.PROXY_URI)); |
| } |
| } |
| |
| /** |
| * Get the underlying Grizzly {@link com.ning.http.client.AsyncHttpClient} instance. |
| * |
| * @return underlying Grizzly {@link com.ning.http.client.AsyncHttpClient} instance. |
| */ |
| public AsyncHttpClient getGrizzlyClient() { |
| return grizzlyClient; |
| } |
| |
| /** |
| * Sends the {@link javax.ws.rs.core.Request} via Grizzly transport and returns the {@link javax.ws.rs.core.Response}. |
| * |
| * @param request Jersey client request to be sent. |
| * @return received response. |
| */ |
| @Override |
| public ClientResponse apply(final ClientRequest request) { |
| final Request connectorRequest = translate(request); |
| final Map<String, String> clientHeadersSnapshot = writeOutBoundHeaders(request, connectorRequest); |
| |
| final CompletableFuture<ClientResponse> responseFuture = new CompletableFuture<>(); |
| final ByteBufferInputStream entityStream = new ByteBufferInputStream(); |
| final AtomicBoolean futureSet = new AtomicBoolean(false); |
| |
| try { |
| grizzlyClient.executeRequest(connectorRequest, new AsyncHandler<Void>() { |
| private volatile HttpResponseStatus status = null; |
| |
| @Override |
| public STATE onStatusReceived(final HttpResponseStatus responseStatus) throws Exception { |
| status = responseStatus; |
| return STATE.CONTINUE; |
| } |
| |
| @Override |
| public STATE onHeadersReceived(HttpResponseHeaders headers) throws Exception { |
| if (!futureSet.compareAndSet(false, true)) { |
| return STATE.ABORT; |
| } |
| |
| HeaderUtils.checkHeaderChanges(clientHeadersSnapshot, request.getHeaders(), |
| GrizzlyConnector.this.getClass().getName(), |
| request.getConfiguration()); |
| |
| responseFuture.complete(translate(request, this.status, headers, entityStream)); |
| return STATE.CONTINUE; |
| } |
| |
| @Override |
| public STATE onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception { |
| entityStream.put(bodyPart.getBodyByteBuffer()); |
| return STATE.CONTINUE; |
| } |
| |
| @Override |
| public Void onCompleted() throws Exception { |
| entityStream.closeQueue(); |
| return null; |
| } |
| |
| @Override |
| public void onThrowable(Throwable t) { |
| entityStream.closeQueue(t); |
| |
| if (futureSet.compareAndSet(false, true)) { |
| t = t instanceof IOException ? new ProcessingException(t.getMessage(), t) : t; |
| responseFuture.completeExceptionally(t); |
| } |
| } |
| }); |
| |
| return responseFuture.get(); |
| } catch (ExecutionException ex) { |
| Throwable e = ex.getCause() == null ? ex : ex.getCause(); |
| throw new ProcessingException(e.getMessage(), e); |
| } catch (InterruptedException ex) { |
| throw new ProcessingException(ex.getMessage(), ex); |
| } |
| } |
| |
| @Override |
| public Future<?> apply(final ClientRequest request, final AsyncConnectorCallback callback) { |
| final Request connectorRequest = translate(request); |
| final Map<String, String> clientHeadersSnapshot = writeOutBoundHeaders(request, connectorRequest); |
| final ByteBufferInputStream entityStream = new ByteBufferInputStream(); |
| final AtomicBoolean callbackInvoked = new AtomicBoolean(false); |
| |
| Throwable failure; |
| try { |
| return grizzlyClient.executeRequest(connectorRequest, new AsyncHandler<Void>() { |
| private volatile HttpResponseStatus status = null; |
| |
| @Override |
| public STATE onStatusReceived(final HttpResponseStatus responseStatus) throws Exception { |
| status = responseStatus; |
| return STATE.CONTINUE; |
| } |
| |
| @Override |
| public STATE onHeadersReceived(HttpResponseHeaders headers) throws Exception { |
| if (!callbackInvoked.compareAndSet(false, true)) { |
| return STATE.ABORT; |
| } |
| |
| HeaderUtils.checkHeaderChanges(clientHeadersSnapshot, request.getHeaders(), |
| GrizzlyConnector.this.getClass().getName(), request.getConfiguration()); |
| // hand-off to grizzly's application thread pool for response processing |
| processResponse(new Runnable() { |
| @Override |
| public void run() { |
| callback.response(translate(request, status, headers, entityStream)); |
| } |
| }); |
| return STATE.CONTINUE; |
| } |
| |
| @Override |
| public STATE onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception { |
| entityStream.put(bodyPart.getBodyByteBuffer()); |
| return STATE.CONTINUE; |
| } |
| |
| @Override |
| public Void onCompleted() throws Exception { |
| entityStream.closeQueue(); |
| return null; |
| } |
| |
| @Override |
| public void onThrowable(Throwable t) { |
| entityStream.closeQueue(t); |
| |
| if (callbackInvoked.compareAndSet(false, true)) { |
| t = t instanceof IOException ? new ProcessingException(t.getMessage(), t) : t; |
| callback.failure(t); |
| } |
| } |
| }); |
| } catch (Throwable t) { |
| failure = t; |
| } |
| |
| if (callbackInvoked.compareAndSet(false, true)) { |
| callback.failure(failure); |
| } |
| CompletableFuture<Object> future = new CompletableFuture<>(); |
| future.completeExceptionally(failure); |
| return future; |
| } |
| |
| @Override |
| public void close() { |
| grizzlyClient.close(); |
| } |
| |
| private ClientResponse translate(final ClientRequest requestContext, |
| final HttpResponseStatus status, |
| final HttpResponseHeaders headers, |
| final NonBlockingInputStream entityStream) { |
| |
| final ClientResponse responseContext = new ClientResponse(new Response.StatusType() { |
| @Override |
| public int getStatusCode() { |
| return status.getStatusCode(); |
| } |
| |
| @Override |
| public Response.Status.Family getFamily() { |
| return Response.Status.Family.familyOf(status.getStatusCode()); |
| } |
| |
| @Override |
| public String getReasonPhrase() { |
| return status.getStatusText(); |
| } |
| }, requestContext); |
| |
| for (Map.Entry<String, List<String>> entry : headers.getHeaders().entrySet()) { |
| for (String value : entry.getValue()) { |
| responseContext.getHeaders().add(entry.getKey(), value); |
| } |
| } |
| |
| responseContext.setEntityStream(entityStream); |
| |
| return responseContext; |
| } |
| |
| private com.ning.http.client.Request translate(final ClientRequest requestContext) { |
| final String strMethod = requestContext.getMethod(); |
| final URI uri = requestContext.getUri(); |
| |
| RequestBuilder builder = new RequestBuilder(strMethod).setUrl(uri.toString()); |
| |
| builder.setFollowRedirects(requestContext.resolveProperty(ClientProperties.FOLLOW_REDIRECTS, true)); |
| |
| if (requestContext.hasEntity()) { |
| |
| final RequestEntityProcessing entityProcessing = |
| requestContext.resolveProperty(ClientProperties.REQUEST_ENTITY_PROCESSING, RequestEntityProcessing.class); |
| |
| if (entityProcessing == RequestEntityProcessing.BUFFERED) { |
| byte[] entityBytes = bufferEntity(requestContext); |
| builder = builder.setBody(entityBytes); |
| } else { |
| final FeedableBodyGenerator bodyGenerator = new FeedableBodyGenerator(); |
| final Integer chunkSize = requestContext.resolveProperty( |
| ClientProperties.CHUNKED_ENCODING_SIZE, ClientProperties.DEFAULT_CHUNK_SIZE); |
| bodyGenerator.setMaxPendingBytes(chunkSize); |
| final FeedableBodyGenerator.Feeder feeder = new FeedableBodyGenerator.SimpleFeeder(bodyGenerator) { |
| @Override |
| public void flush() throws IOException { |
| requestContext.writeEntity(); |
| } |
| }; |
| requestContext.setStreamProvider(new OutboundMessageContext.StreamProvider() { |
| |
| @Override |
| public OutputStream getOutputStream(int contentLength) throws IOException { |
| return new FeederAdapter(feeder); |
| } |
| }); |
| bodyGenerator.setFeeder(feeder); |
| builder.setBody(bodyGenerator); |
| } |
| } |
| |
| final GrizzlyConnectorProvider.RequestCustomizer requestCustomizer = requestContext.resolveProperty( |
| GrizzlyConnectorProvider.REQUEST_CUSTOMIZER, |
| GrizzlyConnectorProvider.RequestCustomizer.class); |
| if (requestCustomizer != null) { |
| builder = requestCustomizer.customize(requestContext, builder); |
| } |
| |
| return builder.build(); |
| } |
| |
| /** |
| * Submits the response processing on Grizzly client's application thread pool. |
| * |
| * @param responseTask task to be processed on application thread pool. |
| */ |
| private void processResponse(Runnable responseTask) { |
| this.grizzlyClient.getConfig().executorService().submit(responseTask); |
| } |
| |
| /** |
| * Utility OutputStream implementation that can feed Grizzly chunk-encoded body generator. |
| */ |
| private class FeederAdapter extends OutputStream { |
| |
| final FeedableBodyGenerator.Feeder delegate; |
| |
| /** |
| * Get me a new adapter for given feeder. |
| * |
| * @param bodyFeeder adaptee to get fed as an output stream. |
| */ |
| FeederAdapter(FeedableBodyGenerator.Feeder bodyFeeder) { |
| this.delegate = bodyFeeder; |
| } |
| |
| @Override |
| public void write(int b) throws IOException { |
| final byte[] buffer = new byte[1]; |
| buffer[0] = (byte) b; |
| delegate.feed(Buffers.wrap(MemoryManager.DEFAULT_MEMORY_MANAGER, buffer), false); |
| } |
| |
| @Override |
| public void write(byte[] b) throws IOException { |
| delegate.feed(Buffers.wrap(MemoryManager.DEFAULT_MEMORY_MANAGER, b), false); |
| } |
| |
| @Override |
| public void write(byte[] b, int off, int len) throws IOException { |
| delegate.feed(Buffers.wrap(MemoryManager.DEFAULT_MEMORY_MANAGER, b, off, len), false); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| delegate.feed(Buffers.EMPTY_BUFFER, true); |
| } |
| } |
| |
| @SuppressWarnings("MagicNumber") |
| private byte[] bufferEntity(ClientRequest requestContext) { |
| final ByteArrayOutputStream baos = new ByteArrayOutputStream(512); |
| requestContext.setStreamProvider(new OutboundMessageContext.StreamProvider() { |
| |
| @Override |
| public OutputStream getOutputStream(int contentLength) throws IOException { |
| return baos; |
| } |
| }); |
| try { |
| requestContext.writeEntity(); |
| } catch (IOException e) { |
| throw new ProcessingException(LocalizationMessages.ERROR_BUFFERING_ENTITY(), e); |
| } |
| return baos.toByteArray(); |
| } |
| |
| private static Map<String, String> writeOutBoundHeaders(final ClientRequest clientRequest, |
| final com.ning.http.client.Request request) { |
| Map<String, String> stringHeaders = |
| HeaderUtils.asStringHeadersSingleValue(clientRequest.getHeaders(), clientRequest.getConfiguration()); |
| |
| for (Map.Entry<String, String> e : stringHeaders.entrySet()) { |
| request.getHeaders().add(e.getKey(), e.getValue()); |
| } |
| return stringHeaders; |
| } |
| |
| @Override |
| public String getName() { |
| return String.format("Async HTTP Grizzly Connector %s", Version.getVersion()); |
| } |
| } |