/*
 * Copyright (c) 2010, 2019 Oracle and/or its affiliates. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0, which is available at
 * http://www.eclipse.org/legal/epl-2.0.
 *
 * This Source Code may also be made available under the following Secondary
 * Licenses when the conditions for such availability set forth in the
 * Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
 * version 2 with the GNU Classpath Exception, which is available at
 * https://www.gnu.org/software/classpath/license.html.
 *
 * SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
 */

package org.glassfish.jersey.grizzly.connector;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.ws.rs.ProcessingException;
import javax.ws.rs.client.Client;
import javax.ws.rs.core.Configuration;
import javax.ws.rs.core.Response;

import org.glassfish.jersey.client.ClientProperties;
import org.glassfish.jersey.client.ClientRequest;
import org.glassfish.jersey.client.ClientResponse;
import org.glassfish.jersey.client.RequestEntityProcessing;
import org.glassfish.jersey.client.spi.AsyncConnectorCallback;
import org.glassfish.jersey.client.spi.Connector;
import org.glassfish.jersey.internal.Version;
import org.glassfish.jersey.internal.util.collection.ByteBufferInputStream;
import org.glassfish.jersey.internal.util.collection.NonBlockingInputStream;
import org.glassfish.jersey.message.internal.HeaderUtils;
import org.glassfish.jersey.message.internal.OutboundMessageContext;

import org.glassfish.grizzly.memory.Buffers;
import org.glassfish.grizzly.memory.MemoryManager;

import com.ning.http.client.AsyncHandler;
import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.AsyncHttpClientConfig;
import com.ning.http.client.HttpResponseBodyPart;
import com.ning.http.client.HttpResponseHeaders;
import com.ning.http.client.HttpResponseStatus;
import com.ning.http.client.ProxyServerSelector;
import com.ning.http.client.Request;
import com.ning.http.client.RequestBuilder;
import com.ning.http.client.providers.grizzly.FeedableBodyGenerator;
import com.ning.http.client.providers.grizzly.GrizzlyAsyncHttpProvider;
import com.ning.http.util.ProxyUtils;

/**
 * The transport using the AsyncHttpClient.
 *
 * @author Stepan Kopriva
 * @author Marek Potociar
 */
class GrizzlyConnector implements Connector {

    /** Underlying asynchronous HTTP client; created once in the constructor and closed in {@link #close()}. */
    private final AsyncHttpClient grizzlyClient;

    /**
     * Create new connector based on Grizzly asynchronous client library.
     * <p>
     * When {@code config} is not {@code null}, the executor size, the connect and request timeouts
     * (each falling back to {@code 10000} when the corresponding property is not set) and the optional
     * proxy are taken from it. Connection pooling is always enabled. The SSL context and hostname
     * verifier are copied from {@code client} when present. The customizer, if any, is applied last
     * and may replace the builder.
     *
     * @param client                Jersey client instance to create the connector for.
     * @param config                Jersey client runtime configuration to be used to configure the connector parameters.
     * @param asyncClientCustomizer Async HTTP Client configuration builder customizer.
     */
    GrizzlyConnector(final Client client,
                     final Configuration config,
                     final GrizzlyConnectorProvider.AsyncClientCustomizer asyncClientCustomizer) {
        AsyncHttpClientConfig.Builder builder = new AsyncHttpClientConfig.Builder();

        builder = builder.setExecutorService(createExecutorService(config));

        if (config != null) {
            builder.setConnectTimeout(ClientProperties.getValue(config.getProperties(),
                                                                ClientProperties.CONNECT_TIMEOUT, 10000));

            builder.setRequestTimeout(ClientProperties.getValue(config.getProperties(),
                                                                ClientProperties.READ_TIMEOUT, 10000));

            final Object proxyUri = config.getProperty(ClientProperties.PROXY_URI);
            if (proxyUri != null) {
                builder.setProxyServerSelector(createProxyServerSelector(config, proxyUri));
            }
        }

        builder.setAllowPoolingConnections(true);
        if (client.getSslContext() != null) {
            builder.setSSLContext(client.getSslContext());
        }
        if (client.getHostnameVerifier() != null) {
            builder.setHostnameVerifier(client.getHostnameVerifier());
        }

        if (asyncClientCustomizer != null) {
            builder = asyncClientCustomizer.customize(client, config, builder);
        }

        final AsyncHttpClientConfig asyncClientConfig = builder.build();

        this.grizzlyClient = new AsyncHttpClient(new GrizzlyAsyncHttpProvider(asyncClientConfig), asyncClientConfig);
    }

    /**
     * Create the executor service handed to the async client.
     * <p>
     * A fixed pool is used only when the configuration carries a positive {@code Integer} value for
     * {@link ClientProperties#ASYNC_THREADPOOL_SIZE}; in every other case (no configuration, missing
     * property, wrong type, non-positive value) a cached thread pool is used.
     *
     * @param config client runtime configuration, may be {@code null}.
     * @return executor service for the async client.
     */
    private static ExecutorService createExecutorService(final Configuration config) {
        if (config != null) {
            final Object threadPoolSize = config.getProperties().get(ClientProperties.ASYNC_THREADPOOL_SIZE);

            // instanceof is false for null, so no separate null check is needed
            if (threadPoolSize instanceof Integer && (Integer) threadPoolSize > 0) {
                return Executors.newFixedThreadPool((Integer) threadPoolSize);
            }
        }
        return Executors.newCachedThreadPool();
    }

    /**
     * Build a proxy server selector from the proxy URI and the optional proxy credentials found
     * in the configuration. The password is only read when a user name is configured.
     *
     * @param config   client runtime configuration holding the optional proxy credentials.
     * @param proxyUri value of the {@link ClientProperties#PROXY_URI} property (a {@link URI} or a {@link String}).
     * @return proxy server selector to be set on the async client configuration builder.
     * @throws ProcessingException if {@code proxyUri} is neither a {@link URI} nor a {@link String}.
     */
    private static ProxyServerSelector createProxyServerSelector(final Configuration config, final Object proxyUri) {
        final URI u = getProxyUri(proxyUri);
        final Properties proxyProperties = new Properties();
        proxyProperties.setProperty(ProxyUtils.PROXY_PROTOCOL, u.getScheme());
        proxyProperties.setProperty(ProxyUtils.PROXY_HOST, u.getHost());
        proxyProperties.setProperty(ProxyUtils.PROXY_PORT, String.valueOf(u.getPort()));

        final String userName = ClientProperties.getValue(
                config.getProperties(), ClientProperties.PROXY_USERNAME, String.class);
        if (userName != null) {
            proxyProperties.setProperty(ProxyUtils.PROXY_USER, userName);

            final String password = ClientProperties.getValue(
                    config.getProperties(), ClientProperties.PROXY_PASSWORD, String.class);
            if (password != null) {
                proxyProperties.setProperty(ProxyUtils.PROXY_PASSWORD, password);
            }
        }
        return ProxyUtils.createProxyServerSelector(proxyProperties);
    }

    /**
     * Convert the configured proxy property value to a {@link URI}.
     *
     * @param proxy configured proxy value; a {@link URI} is returned as is, a {@link String} is parsed.
     * @return proxy URI.
     * @throws ProcessingException if the value is of any other type.
     */
    @SuppressWarnings("ChainOfInstanceofChecks")
    private static URI getProxyUri(final Object proxy) {
        if (proxy instanceof URI) {
            return (URI) proxy;
        } else if (proxy instanceof String) {
            return URI.create((String) proxy);
        } else {
            throw new ProcessingException(LocalizationMessages.WRONG_PROXY_URI_TYPE(ClientProperties.PROXY_URI));
        }
    }

    /**
     * Get the underlying Grizzly {@link com.ning.http.client.AsyncHttpClient} instance.
     *
     * @return underlying Grizzly {@link com.ning.http.client.AsyncHttpClient} instance.
     */
    public AsyncHttpClient getGrizzlyClient() {
        return grizzlyClient;
    }

    /**
     * Sends the {@link javax.ws.rs.core.Request} via Grizzly transport and returns the {@link javax.ws.rs.core.Response}.
     * <p>
     * The calling thread blocks until the response headers have been received (or the request has
     * failed); the response entity is then streamed through the returned response's entity stream
     * as body parts arrive.
     *
     * @param request Jersey client request to be sent.
     * @return received response.
     * @throws ProcessingException if the request fails or the waiting thread is interrupted; in the latter
     *                             case the thread's interrupt status is restored before throwing.
     */
    @Override
    public ClientResponse apply(final ClientRequest request) {
        final Request connectorRequest = translate(request);
        final Map<String, String> clientHeadersSnapshot = writeOutBoundHeaders(request, connectorRequest);

        final CompletableFuture<ClientResponse> responseFuture = new CompletableFuture<>();
        final ByteBufferInputStream entityStream = new ByteBufferInputStream();
        // guards responseFuture so that it is completed at most once (headers vs. failure)
        final AtomicBoolean futureSet = new AtomicBoolean(false);

        try {
            grizzlyClient.executeRequest(connectorRequest, new AsyncHandler<Void>() {
                private volatile HttpResponseStatus status = null;

                @Override
                public STATE onStatusReceived(final HttpResponseStatus responseStatus) throws Exception {
                    status = responseStatus;
                    return STATE.CONTINUE;
                }

                @Override
                public STATE onHeadersReceived(HttpResponseHeaders headers) throws Exception {
                    if (!futureSet.compareAndSet(false, true)) {
                        return STATE.ABORT;
                    }

                    try {
                        HeaderUtils.checkHeaderChanges(clientHeadersSnapshot, request.getHeaders(),
                                                       GrizzlyConnector.this.getClass().getName(),
                                                       request.getConfiguration());

                        responseFuture.complete(translate(request, this.status, headers, entityStream));
                    } catch (final RuntimeException | Error e) {
                        // futureSet is already true, so onThrowable() would not complete the future;
                        // complete it here, otherwise the caller blocked in get() would wait forever
                        responseFuture.completeExceptionally(e);
                        throw e;
                    }
                    return STATE.CONTINUE;
                }

                @Override
                public STATE onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
                    entityStream.put(bodyPart.getBodyByteBuffer());
                    return STATE.CONTINUE;
                }

                @Override
                public Void onCompleted() throws Exception {
                    entityStream.closeQueue();
                    return null;
                }

                @Override
                public void onThrowable(Throwable t) {
                    entityStream.closeQueue(t);

                    if (futureSet.compareAndSet(false, true)) {
                        t = t instanceof IOException ? new ProcessingException(t.getMessage(), t) : t;
                        responseFuture.completeExceptionally(t);
                    }
                }
            });

            return responseFuture.get();
        } catch (ExecutionException ex) {
            final Throwable e = ex.getCause() == null ? ex : ex.getCause();
            throw new ProcessingException(e.getMessage(), e);
        } catch (InterruptedException ex) {
            // restore the interrupt status so that callers up the stack can still observe the interruption
            Thread.currentThread().interrupt();
            throw new ProcessingException(ex.getMessage(), ex);
        }
    }

    /**
     * Sends the request asynchronously via Grizzly transport.
     * <p>
     * The callback is notified at most once: with the response as soon as the response headers
     * have been received (the notification is submitted to the client's executor service), or with
     * a failure if the request fails before that point.
     *
     * @param request  Jersey client request to be sent.
     * @param callback callback to be notified about the response or the failure.
     * @return the future returned by the underlying client, or an exceptionally completed future
     *         when the request could not be submitted.
     */
    @Override
    public Future<?> apply(final ClientRequest request, final AsyncConnectorCallback callback) {
        final Request connectorRequest = translate(request);
        final Map<String, String> clientHeadersSnapshot = writeOutBoundHeaders(request, connectorRequest);
        final ByteBufferInputStream entityStream = new ByteBufferInputStream();
        // guards the callback so that it is notified at most once
        final AtomicBoolean callbackInvoked = new AtomicBoolean(false);

        try {
            return grizzlyClient.executeRequest(connectorRequest, new AsyncHandler<Void>() {
                private volatile HttpResponseStatus status = null;

                @Override
                public STATE onStatusReceived(final HttpResponseStatus responseStatus) throws Exception {
                    status = responseStatus;
                    return STATE.CONTINUE;
                }

                @Override
                public STATE onHeadersReceived(HttpResponseHeaders headers) throws Exception {
                    if (!callbackInvoked.compareAndSet(false, true)) {
                        return STATE.ABORT;
                    }

                    try {
                        HeaderUtils.checkHeaderChanges(clientHeadersSnapshot, request.getHeaders(),
                                GrizzlyConnector.this.getClass().getName(), request.getConfiguration());
                        // hand-off to grizzly's application thread pool for response processing
                        processResponse(new Runnable() {
                            @Override
                            public void run() {
                                final ClientResponse response;
                                try {
                                    response = translate(request, status, headers, entityStream);
                                } catch (final RuntimeException | Error e) {
                                    // the Future returned by submit() is discarded, so without this
                                    // the failure would vanish and the callback would never be notified
                                    callback.failure(e);
                                    throw e;
                                }
                                callback.response(response);
                            }
                        });
                    } catch (final RuntimeException | Error e) {
                        // callbackInvoked is already true, so onThrowable() would not notify the callback;
                        // report the failure here (e.g. a rejected task) before rethrowing
                        callback.failure(e);
                        throw e;
                    }
                    return STATE.CONTINUE;
                }

                @Override
                public STATE onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
                    entityStream.put(bodyPart.getBodyByteBuffer());
                    return STATE.CONTINUE;
                }

                @Override
                public Void onCompleted() throws Exception {
                    entityStream.closeQueue();
                    return null;
                }

                @Override
                public void onThrowable(Throwable t) {
                    entityStream.closeQueue(t);

                    if (callbackInvoked.compareAndSet(false, true)) {
                        t = t instanceof IOException ? new ProcessingException(t.getMessage(), t) : t;
                        callback.failure(t);
                    }
                }
            });
        } catch (final Throwable failure) {
            // the request could not be submitted: notify the callback (unless already notified)
            // and hand back a future that is already completed with the same failure
            if (callbackInvoked.compareAndSet(false, true)) {
                callback.failure(failure);
            }
            final CompletableFuture<Object> failedFuture = new CompletableFuture<>();
            failedFuture.completeExceptionally(failure);
            return failedFuture;
        }
    }

    /**
     * Closes the underlying Grizzly client.
     */
    @Override
    public void close() {
        grizzlyClient.close();
    }

    /**
     * Build a Jersey client response from the status and headers received from the Grizzly client.
     *
     * @param requestContext Jersey client request the response belongs to.
     * @param status         received response status; supplies the status code and reason phrase.
     * @param headers        received response headers; every value of every header is copied.
     * @param entityStream   stream the response entity is read from.
     * @return Jersey client response.
     */
    private ClientResponse translate(final ClientRequest requestContext,
                                     final HttpResponseStatus status,
                                     final HttpResponseHeaders headers,
                                     final NonBlockingInputStream entityStream) {

        final ClientResponse responseContext = new ClientResponse(new Response.StatusType() {
            @Override
            public int getStatusCode() {
                return status.getStatusCode();
            }

            @Override
            public Response.Status.Family getFamily() {
                return Response.Status.Family.familyOf(status.getStatusCode());
            }

            @Override
            public String getReasonPhrase() {
                return status.getStatusText();
            }
        }, requestContext);

        for (Map.Entry<String, List<String>> entry : headers.getHeaders().entrySet()) {
            for (String value : entry.getValue()) {
                responseContext.getHeaders().add(entry.getKey(), value);
            }
        }

        responseContext.setEntityStream(entityStream);

        return responseContext;
    }

    /**
     * Build the Grizzly client request from the Jersey client request.
     * <p>
     * With {@link RequestEntityProcessing#BUFFERED} the entity is serialized into a byte array up front;
     * otherwise a feedable body generator is installed and the entity is written when the feeder is flushed.
     * A {@link GrizzlyConnectorProvider.RequestCustomizer} resolved from the request, if any, is applied last.
     *
     * @param requestContext Jersey client request.
     * @return Grizzly client request.
     */
    private com.ning.http.client.Request translate(final ClientRequest requestContext) {
        final String strMethod = requestContext.getMethod();
        final URI uri = requestContext.getUri();

        RequestBuilder builder = new RequestBuilder(strMethod).setUrl(uri.toString());

        builder.setFollowRedirects(requestContext.resolveProperty(ClientProperties.FOLLOW_REDIRECTS, true));

        if (requestContext.hasEntity()) {

            final RequestEntityProcessing entityProcessing =
                    requestContext.resolveProperty(ClientProperties.REQUEST_ENTITY_PROCESSING, RequestEntityProcessing.class);

            if (entityProcessing == RequestEntityProcessing.BUFFERED) {
                final byte[] entityBytes = bufferEntity(requestContext);
                builder = builder.setBody(entityBytes);
            } else {
                final FeedableBodyGenerator bodyGenerator = new FeedableBodyGenerator();
                final Integer chunkSize = requestContext.resolveProperty(
                        ClientProperties.CHUNKED_ENCODING_SIZE, ClientProperties.DEFAULT_CHUNK_SIZE);
                bodyGenerator.setMaxPendingBytes(chunkSize);
                final FeedableBodyGenerator.Feeder feeder = new FeedableBodyGenerator.SimpleFeeder(bodyGenerator) {
                    @Override
                    public void flush() throws IOException {
                        // writing the entity pushes its bytes to this feeder via the FeederAdapter below
                        requestContext.writeEntity();
                    }
                };
                requestContext.setStreamProvider(new OutboundMessageContext.StreamProvider() {

                    @Override
                    public OutputStream getOutputStream(int contentLength) throws IOException {
                        return new FeederAdapter(feeder);
                    }
                });
                bodyGenerator.setFeeder(feeder);
                builder.setBody(bodyGenerator);
            }
        }

        final GrizzlyConnectorProvider.RequestCustomizer requestCustomizer = requestContext.resolveProperty(
                GrizzlyConnectorProvider.REQUEST_CUSTOMIZER,
                GrizzlyConnectorProvider.RequestCustomizer.class);
        if (requestCustomizer != null) {
            builder = requestCustomizer.customize(requestContext, builder);
        }

        return builder.build();
    }

    /**
     * Submits the response processing on Grizzly client's application thread pool.
     *
     * @param responseTask task to be processed on application thread pool.
     */
    private void processResponse(Runnable responseTask) {
        this.grizzlyClient.getConfig().executorService().submit(responseTask);
    }

    /**
     * Utility OutputStream implementation that can feed Grizzly chunk-encoded body generator.
     * <p>
     * Every write is forwarded to the feeder as a non-final buffer; closing the stream feeds
     * an empty buffer flagged as the last one.
     */
    private static final class FeederAdapter extends OutputStream {

        private final FeedableBodyGenerator.Feeder delegate;

        /**
         * Get me a new adapter for given feeder.
         *
         * @param bodyFeeder adaptee to get fed as an output stream.
         */
        FeederAdapter(FeedableBodyGenerator.Feeder bodyFeeder) {
            this.delegate = bodyFeeder;
        }

        @Override
        public void write(int b) throws IOException {
            final byte[] buffer = new byte[1];
            buffer[0] = (byte) b;
            delegate.feed(Buffers.wrap(MemoryManager.DEFAULT_MEMORY_MANAGER, buffer), false);
        }

        @Override
        public void write(byte[] b) throws IOException {
            delegate.feed(Buffers.wrap(MemoryManager.DEFAULT_MEMORY_MANAGER, b), false);
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            delegate.feed(Buffers.wrap(MemoryManager.DEFAULT_MEMORY_MANAGER, b, off, len), false);
        }

        @Override
        public void close() throws IOException {
            delegate.feed(Buffers.EMPTY_BUFFER, true);
        }
    }

    /**
     * Serialize the request entity into a byte array.
     *
     * @param requestContext Jersey client request whose entity is to be buffered.
     * @return serialized entity bytes.
     * @throws ProcessingException if writing the entity fails with an {@link IOException}.
     */
    @SuppressWarnings("MagicNumber")
    private byte[] bufferEntity(ClientRequest requestContext) {
        final ByteArrayOutputStream baos = new ByteArrayOutputStream(512);
        requestContext.setStreamProvider(new OutboundMessageContext.StreamProvider() {

            @Override
            public OutputStream getOutputStream(int contentLength) throws IOException {
                return baos;
            }
        });
        try {
            requestContext.writeEntity();
        } catch (IOException e) {
            throw new ProcessingException(LocalizationMessages.ERROR_BUFFERING_ENTITY(), e);
        }
        return baos.toByteArray();
    }

    /**
     * Copy the Jersey request headers, converted to single string values, onto the Grizzly client request.
     *
     * @param clientRequest Jersey client request providing the headers.
     * @param request       Grizzly client request receiving the headers.
     * @return the string headers that were written; callers keep them as a snapshot for later
     *         header-change detection.
     */
    private static Map<String, String> writeOutBoundHeaders(final ClientRequest clientRequest,
                                                            final com.ning.http.client.Request request) {
        final Map<String, String> stringHeaders =
                HeaderUtils.asStringHeadersSingleValue(clientRequest.getHeaders(), clientRequest.getConfiguration());

        for (Map.Entry<String, String> e : stringHeaders.entrySet()) {
            request.getHeaders().add(e.getKey(), e.getValue());
        }
        return stringHeaders;
    }

    /**
     * Get the connector name, which includes the Jersey version.
     *
     * @return connector name.
     */
    @Override
    public String getName() {
        return String.format("Async HTTP Grizzly Connector %s", Version.getVersion());
    }
}
