//
//  ========================================================================
//  Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//

package org.eclipse.jetty.continuation;

import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;

import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Timer;
import java.util.TimerTask;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;


public abstract class ContinuationBase
{
    protected SuspendServlet _servlet=new SuspendServlet();
    protected int _port;

    protected void doNormal(String type) throws Exception
    {
        String response=process(null,null);
        assertContains(type,response);
        assertContains("NORMAL",response);
        assertNotContains("history: onTimeout",response);
        assertNotContains("history: onComplete",response);
    }

    protected void doSleep() throws Exception
    {
        String response=process("sleep=200",null);
        assertContains("SLEPT",response);
        assertNotContains("history: onTimeout",response);
        assertNotContains("history: onComplete",response);
    }

    protected void doSuspend() throws Exception
    {
        String response=process("suspend=200",null);
        assertContains("TIMEOUT",response);
        assertContains("history: onTimeout",response);
        assertContains("history: onComplete",response);
    }

    protected void doSuspendWaitResume() throws Exception
    {
        String response=process("suspend=200&resume=10",null);
        assertContains("RESUMED",response);
        assertNotContains("history: onTimeout",response);
        assertContains("history: onComplete",response);
    }

    protected void doSuspendResume() throws Exception
    {
        String response=process("suspend=200&resume=0",null);
        assertContains("RESUMED",response);
        assertNotContains("history: onTimeout",response);
        assertContains("history: onComplete",response);
    }

    protected void doSuspendWaitComplete() throws Exception
    {
        String response=process("suspend=200&complete=50",null);
        assertContains("COMPLETED",response);
        assertContains("history: initial",response);
        assertNotContains("history: onTimeout",response);
        assertContains("history: onComplete",response);
        assertNotContains("history: !initial",response);
    }

    protected void doSuspendComplete() throws Exception
    {
        String response=process("suspend=200&complete=0",null);
        assertContains("COMPLETED",response);
        assertContains("history: initial",response);
        assertNotContains("history: onTimeout",response);
        assertContains("history: onComplete",response);
        assertNotContains("history: !initial",response);
    }

    protected void doSuspendWaitResumeSuspendWaitResume() throws Exception
    {
        String response=process("suspend=1000&resume=10&suspend2=1000&resume2=10",null);
        assertEquals(2,count(response,"history: suspend"));
        assertEquals(2,count(response,"history: resume"));
        assertEquals(0,count(response,"history: onTimeout"));
        assertEquals(1,count(response,"history: onComplete"));
        assertContains("RESUMED",response);
    }

    protected void doSuspendWaitResumeSuspendComplete() throws Exception
    {
        String response=process("suspend=1000&resume=10&suspend2=1000&complete2=10",null);
        assertEquals(2,count(response,"history: suspend"));
        assertEquals(1,count(response,"history: resume"));
        assertEquals(0,count(response,"history: onTimeout"));
        assertEquals(1,count(response,"history: onComplete"));
        assertContains("COMPLETED",response);
    }

    protected void doSuspendWaitResumeSuspend() throws Exception
    {
        String response=process("suspend=1000&resume=10&suspend2=10",null);
        assertEquals(2,count(response,"history: suspend"));
        assertEquals(1,count(response,"history: resume"));
        assertEquals(1,count(response,"history: onTimeout"));
        assertEquals(1,count(response,"history: onComplete"));
        assertContains("TIMEOUT",response);
    }

    protected void doSuspendTimeoutSuspendResume() throws Exception
    {
        String response=process("suspend=10&suspend2=1000&resume2=10",null);
        assertEquals(2,count(response,"history: suspend"));
        assertEquals(1,count(response,"history: resume"));
        assertEquals(1,count(response,"history: onTimeout"));
        assertEquals(1,count(response,"history: onComplete"));
        assertContains("RESUMED",response);
    }

    protected void doSuspendTimeoutSuspendComplete() throws Exception
    {
        String response=process("suspend=10&suspend2=1000&complete2=10",null);
        assertEquals(2,count(response,"history: suspend"));
        assertEquals(0,count(response,"history: resume"));
        assertEquals(1,count(response,"history: onTimeout"));
        assertEquals(1,count(response,"history: onComplete"));
        assertContains("COMPLETED",response);
    }

    protected void doSuspendTimeoutSuspend() throws Exception
    {
        String response=process("suspend=10&suspend2=10",null);
        assertEquals(2,count(response,"history: suspend"));
        assertEquals(0,count(response,"history: resume"));
        assertEquals(2,count(response,"history: onTimeout"));
        assertEquals(1,count(response,"history: onComplete"));
        assertContains("TIMEOUT",response);
    }

    protected void doSuspendThrowResume() throws Exception
    {
        String response=process("suspend=200&resume=10&undispatch=true",null);
        assertContains("RESUMED",response);
        assertNotContains("history: onTimeout",response);
        assertContains("history: onComplete",response);
    }

    protected void doSuspendResumeThrow() throws Exception
    {
        String response=process("suspend=200&resume=0&undispatch=true",null);
        assertContains("RESUMED",response);
        assertNotContains("history: onTimeout",response);
        assertContains("history: onComplete",response);
    }

    protected void doSuspendThrowComplete() throws Exception
    {
        String response=process("suspend=200&complete=10&undispatch=true",null);
        assertContains("COMPLETED",response);
        assertNotContains("history: onTimeout",response);
        assertContains("history: onComplete",response);
    }

    protected void doSuspendCompleteThrow() throws Exception
    {
        String response=process("suspend=200&complete=0&undispatch=true",null);
        assertContains("COMPLETED",response);
        assertNotContains("history: onTimeout",response);
        assertContains("history: onComplete",response);
    }


    private int count(String responses,String substring)
    {
        int count=0;
        int i=responses.indexOf(substring);
        while (i>=0)
        {
            count++;
            i=responses.indexOf(substring,i+substring.length());
        }

        return count;
    }

    protected void assertContains(String content,String response)
    {
        assertThat(response,startsWith("HTTP/1.1 200 OK"));
        assertThat(response,containsString(content));
    }

    protected void assertNotContains(String content,String response)
    {
        assertThat(response,startsWith("HTTP/1.1 200 OK"));
        assertThat(response,not(containsString(content)));
    }

    public synchronized String process(String query,String content) throws Exception
    {
        String request = "GET /";

        if (query!=null)
            request+="?"+query;
        request+=" HTTP/1.1\r\n"+
        "Host: localhost\r\n"+
        "Connection: close\r\n";
        if (content==null)
            request+="\r\n";
        else
        {
            request+="Content-Length: "+content.length()+"\r\n";
            request+="\r\n" + content;
        }

        int port=_port;
        String response=null;
        try (Socket socket = new Socket("localhost",port);)
        {
            socket.setSoTimeout(10000);
            socket.getOutputStream().write(request.getBytes(StandardCharsets.UTF_8));
            socket.getOutputStream().flush();

            response = toString(socket.getInputStream());
        }
        catch(Exception e)
        {
            System.err.println("failed on port "+port);
            e.printStackTrace();
            throw e;
        }
        return response;
    }


    protected abstract String toString(InputStream in) throws IOException;


    private static class SuspendServlet extends HttpServlet
    {
        private Timer _timer=new Timer();

        public SuspendServlet()
        {}

        /* ------------------------------------------------------------ */
        @Override
        protected void doGet(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException
        {
            final Continuation continuation = ContinuationSupport.getContinuation(request);

            response.addHeader("history",continuation.getClass().toString());

            int read_before=0;
            long sleep_for=-1;
            long suspend_for=-1;
            long suspend2_for=-1;
            long resume_after=-1;
            long resume2_after=-1;
            long complete_after=-1;
            long complete2_after=-1;
            boolean undispatch=false;

            if (request.getParameter("read")!=null)
                read_before=Integer.parseInt(request.getParameter("read"));
            if (request.getParameter("sleep")!=null)
                sleep_for=Integer.parseInt(request.getParameter("sleep"));
            if (request.getParameter("suspend")!=null)
                suspend_for=Integer.parseInt(request.getParameter("suspend"));
            if (request.getParameter("suspend2")!=null)
                suspend2_for=Integer.parseInt(request.getParameter("suspend2"));
            if (request.getParameter("resume")!=null)
                resume_after=Integer.parseInt(request.getParameter("resume"));
            if (request.getParameter("resume2")!=null)
                resume2_after=Integer.parseInt(request.getParameter("resume2"));
            if (request.getParameter("complete")!=null)
                complete_after=Integer.parseInt(request.getParameter("complete"));
            if (request.getParameter("complete2")!=null)
                complete2_after=Integer.parseInt(request.getParameter("complete2"));
            if (request.getParameter("undispatch")!=null)
                undispatch=Boolean.parseBoolean(request.getParameter("undispatch"));

            if (continuation.isInitial())
            {
                response.addHeader("history","initial");
                if (read_before>0)
                {
                    byte[] buf=new byte[read_before];
                    request.getInputStream().read(buf);
                }
                else if (read_before<0)
                {
                    InputStream in = request.getInputStream();
                    int b=in.read();
                    while(b!=-1)
                        b=in.read();
                }

                if (suspend_for>=0)
                {
                    if (suspend_for>0)
                        continuation.setTimeout(suspend_for);
                    continuation.addContinuationListener(__listener);
                    response.addHeader("history","suspend");
                    continuation.suspend(response);

                    if (complete_after>0)
                    {
                        TimerTask complete = new TimerTask()
                        {
                            @Override
                            public void run()
                            {
                                try
                                {
                                    response.setStatus(200);
                                    response.getOutputStream().println("COMPLETED\n");
                                    continuation.complete();
                                }
                                catch(Exception e)
                                {
                                    e.printStackTrace();
                                }
                            }
                        };
                        synchronized (_timer)
                        {
                            _timer.schedule(complete,complete_after);
                        }
                    }
                    else if (complete_after==0)
                    {
                        response.setStatus(200);
                        response.getOutputStream().println("COMPLETED\n");
                        continuation.complete();
                    }
                    else if (resume_after>0)
                    {
                        TimerTask resume = new TimerTask()
                        {
                            @Override
                            public void run()
                            {
                                ((HttpServletResponse)continuation.getServletResponse()).addHeader("history","resume");
                                continuation.resume();
                            }
                        };
                        synchronized (_timer)
                        {
                            _timer.schedule(resume,resume_after);
                        }
                    }
                    else if (resume_after==0)
                    {
                        ((HttpServletResponse)continuation.getServletResponse()).addHeader("history","resume");
                        continuation.resume();
                    }

                    if (undispatch)
                        continuation.undispatch();
                }
                else if (sleep_for>=0)
                {
                    try
                    {
                        Thread.sleep(sleep_for);
                    }
                    catch (InterruptedException e)
                    {
                        e.printStackTrace();
                    }
                    response.setStatus(200);
                    response.getOutputStream().println("SLEPT\n");
                }
                else
                {
                    response.setStatus(200);
                    response.getOutputStream().println("NORMAL\n");
                }
            }
            else
            {
                response.addHeader("history","!initial");
                if (suspend2_for>=0 && request.getAttribute("2nd")==null)
                {
                    request.setAttribute("2nd","cycle");

                    if (suspend2_for>0)
                        continuation.setTimeout(suspend2_for);
                    // continuation.addContinuationListener(__listener);
                    response.addHeader("history","suspend");
                    continuation.suspend(response);

                    if (complete2_after>0)
                    {
                        TimerTask complete = new TimerTask()
                        {
                            @Override
                            public void run()
                            {
                                try
                                {
                                    response.setStatus(200);
                                    response.getOutputStream().println("COMPLETED\n");
                                    continuation.complete();
                                }
                                catch(Exception e)
                                {
                                    e.printStackTrace();
                                }
                            }
                        };
                        synchronized (_timer)
                        {
                            _timer.schedule(complete,complete2_after);
                        }
                    }
                    else if (complete2_after==0)
                    {
                        response.setStatus(200);
                        response.getOutputStream().println("COMPLETED\n");
                        continuation.complete();
                    }
                    else if (resume2_after>0)
                    {
                        TimerTask resume = new TimerTask()
                        {
                            @Override
                            public void run()
                            {
                                response.addHeader("history","resume");
                                continuation.resume();
                            }
                        };
                        synchronized (_timer)
                        {
                            _timer.schedule(resume,resume2_after);
                        }
                    }
                    else if (resume2_after==0)
                    {
                        response.addHeader("history","resume");
                        continuation.resume();
                    }
                    if (undispatch)
                        continuation.undispatch();
                    return;
                }
                else if (continuation.isExpired())
                {
                    response.setStatus(200);
                    response.getOutputStream().println("TIMEOUT\n");
                }
                else if (continuation.isResumed())
                {
                    response.setStatus(200);
                    response.getOutputStream().println("RESUMED\n");
                }
                else
                {
                    response.setStatus(200);
                    response.getOutputStream().println("unknown???\n");
                }
            }
        }
    }


    private static ContinuationListener __listener = new ContinuationListener()
    {
        @Override
        public void onComplete(Continuation continuation)
        {
            ((HttpServletResponse)continuation.getServletResponse()).addHeader("history","onComplete");
        }

        @Override
        public void onTimeout(Continuation continuation)
        {
            ((HttpServletResponse)continuation.getServletResponse()).addHeader("history","onTimeout");
        }

    };
}
