//
//  ========================================================================
//  Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//

package org.eclipse.jetty.rhttp.gateway;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.eclipse.jetty.rhttp.client.RHTTPRequest;
import org.eclipse.jetty.rhttp.client.RHTTPResponse;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;

/**
 * The servlet that handles the communication with the gateway clients.
 * @version $Revision$ $Date$
 */
public class ConnectorServlet extends HttpServlet
{
    private final Logger logger = Log.getLogger(getClass().toString());
    private final TargetIdRetriever targetIdRetriever = new StandardTargetIdRetriever();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final ConcurrentMap<String, Future<?>> expirations = new ConcurrentHashMap<String, Future<?>>();
    private final Gateway gateway;
    private long clientTimeout=15000;

    public ConnectorServlet(Gateway gateway)
    {
        this.gateway = gateway;
    }

    @Override
    public void init() throws ServletException 
    {
        String t = getInitParameter("clientTimeout");
        if (t!=null && !"".equals(t))
            clientTimeout=Long.parseLong(t);
    }

    @Override
    protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
    {
        String targetId = targetIdRetriever.retrieveTargetId(request);

        String uri = request.getRequestURI();
        String path = uri.substring(request.getServletPath().length());
        String[] segments = path.split("/");
        if (segments.length < 3)
            throw new ServletException("Invalid request to " + getClass().getSimpleName() + ": " + uri);

        String action = segments[2];
        if ("handshake".equals(action))
            serviceHandshake(targetId, request, response);
        else if ("connect".equals(action))
            serviceConnect(targetId, request, response);
        else if ("deliver".equals(action))
            serviceDeliver(targetId, request, response);
        else if ("disconnect".equals(action))
            serviceDisconnect(targetId, request, response);
        else
            throw new ServletException("Invalid request to " + getClass().getSimpleName() + ": " + uri);
    }

    private void serviceHandshake(String targetId, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws IOException
    {
        ClientDelegate client = gateway.getClientDelegate(targetId);
        if (client != null)
            throw new IOException("Client with targetId " + targetId + " is already connected");

        client = gateway.newClientDelegate(targetId);
        ClientDelegate existing = gateway.addClientDelegate(targetId, client);
        if (existing != null)
            throw new IOException("Client with targetId " + targetId + " is already connected");

        flush(client, httpRequest, httpResponse);
    }

    private void flush(ClientDelegate client, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws IOException
    {
        List<RHTTPRequest> requests = client.process(httpRequest);
        if (requests != null)
        {
            // Schedule before sending the requests, to avoid that the remote client
            // reconnects before we have scheduled the expiration timeout.
            if (!client.isClosed())
                schedule(client);

            ServletOutputStream output = httpResponse.getOutputStream();
            for (RHTTPRequest request : requests)
                output.write(request.getFrameBytes());
            // I could count the framed bytes of all requests and set a Content-Length header,
            // but the implementation of ServletOutputStream takes care of everything:
            // if the request was HTTP/1.1, then flushing result in a chunked response, but the
            // client know how to handle it; if the request was HTTP/1.0, then no chunking.
            // To avoid chunking in HTTP/1.1 I must set the Content-Length header.
            output.flush();
            logger.debug("Delivered to device {} requests {} ", client.getTargetId(), requests);
        }
    }

    private void schedule(ClientDelegate client)
    {
        Future<?> task = scheduler.schedule(new ClientExpirationTask(client), clientTimeout, TimeUnit.MILLISECONDS);
        Future<?> existing = expirations.put(client.getTargetId(), task);
        assert existing == null;
    }

    private void unschedule(String targetId)
    {
        Future<?> task = expirations.remove(targetId);
        if (task != null)
            task.cancel(false);
    }

    private void serviceConnect(String targetId, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws IOException
    {
        unschedule(targetId);

        ClientDelegate client = gateway.getClientDelegate(targetId);
        if (client == null)
        {
            // Expired client tries to connect without handshake
            httpResponse.sendError(HttpServletResponse.SC_UNAUTHORIZED);
            return;
        }

        flush(client, httpRequest, httpResponse);

        if (client.isClosed())
            gateway.removeClientDelegate(targetId);
    }

    private void expireConnect(ClientDelegate client, long time)
    {
        String targetId = client.getTargetId();
        logger.info("Client with targetId {} missing, last seen {} ms ago, closing it", targetId, System.currentTimeMillis() - time);
        client.close();
        // If the client expired, means that it did not connect,
        // so there no request to resume, and we cleanup here
        // (while normally this cleanup is done in serviceConnect())
        unschedule(targetId);
        gateway.removeClientDelegate(targetId);
    }

    private void serviceDeliver(String targetId, HttpServletRequest httpRequest, HttpServletResponse httpResponse) throws ServletException, IOException
    {
        if (gateway.getClientDelegate(targetId) == null)
        {
            // Expired client tries to deliver without handshake
            httpResponse.sendError(HttpServletResponse.SC_UNAUTHORIZED);
            return;
        }

        byte[] body = Utils.read(httpRequest.getInputStream());

        RHTTPResponse response = RHTTPResponse.fromFrameBytes(body);

        ExternalRequest externalRequest = gateway.removeExternalRequest(response.getId());
        if (externalRequest != null)
        {
            externalRequest.respond(response);
            logger.debug("Deliver request from device {}, gateway request {}, response {}", new Object[] {targetId, externalRequest, response});
        }
        else
        {
            // We can arrive here for a race with the continuation expiration, which expired just before
            // the gateway client responded with a valid response; log this case ignore it.
            logger.debug("Deliver request from device {}, missing gateway request, response {}", targetId, response);
        }
    }

    private void serviceDisconnect(String targetId, HttpServletRequest request, HttpServletResponse response)
    {
        // Do not remove the ClientDelegate from the gateway here,
        // since closing the ClientDelegate will resume the connect request
        // and we remove the ClientDelegate from the gateway there
        ClientDelegate client = gateway.getClientDelegate(targetId);
        if (client != null)
            client.close();
    }

    private class ClientExpirationTask implements Runnable
    {
        private final long time = System.currentTimeMillis();
        private final ClientDelegate client;

        public ClientExpirationTask(ClientDelegate client)
        {
            this.client = client;
        }

        public void run()
        {
            expireConnect(client, time);
        }
    }
}
