//
//  ========================================================================
//  Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//

package org.eclipse.jetty.http2;

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;

public class HTTP2Flusher extends IteratingCallback
{
    private static final Logger LOG = Log.getLogger(HTTP2Flusher.class);

    // Window updates queued by window(); drained and applied at the start of process().
    private final Queue<WindowEntry> windows = new ArrayDeque<>();
    // Entries waiting to be written; accessed while holding the lock on "this".
    private final List<Entry> frames = new ArrayList<>();
    // Per-stream send windows cached during a single process() pass; cleared at the end of the pass.
    private final Map<IStream, Integer> streams = new HashMap<>();
    // Entries gathered by process() whose stream was reset; they are reset() outside the lock.
    private final List<Entry> resets = new ArrayList<>();
    // Entries gathered by process() for the current write; completed in succeeded() or onCompleteFailure().
    private final List<Entry> actives = new ArrayList<>();
    private final HTTP2Session session;
    // Collects the buffers produced by Entry.generate() for the current write; recycled after the write.
    private final ByteBufferPool.Lease lease;
    // Non-null once terminate() or onCompleteFailure() has been called; accessed under the lock on "this".
    private Throwable terminated;

    /**
     * Creates a flusher for the given session.
     * <p>The buffer lease used for writes is backed by the
     * {@link ByteBufferPool} of the session's generator.</p>
     *
     * @param session the session whose frames this flusher writes
     */
    public HTTP2Flusher(HTTP2Session session)
    {
        ByteBufferPool bufferPool = session.getGenerator().getByteBufferPool();
        this.session = session;
        this.lease = new ByteBufferPool.Lease(bufferPool);
    }

    /**
     * Queues a window update so that it is applied by the next
     * {@link #process()} pass, then triggers an iteration.
     * <p>If this flusher has already been terminated, the update
     * is dropped and no iteration is triggered.</p>
     *
     * @param stream the stream the update refers to, as passed to the flow control strategy
     * @param frame the window update frame
     */
    public void window(IStream stream, WindowUpdateFrame frame)
    {
        synchronized (this)
        {
            // Nothing to do once terminated: neither queue nor iterate.
            if (terminated != null)
                return;
            windows.offer(new WindowEntry(stream, frame));
        }
        // Flush stalled data (outside the lock).
        iterate();
    }

    /**
     * Inserts the given entry at the head of the frame queue.
     * <p>If this flusher has already been terminated, the entry is
     * not queued and is failed with the termination cause instead.</p>
     *
     * @param entry the entry to queue first
     * @return true if the entry was queued, false if it was failed
     */
    public boolean prepend(Entry entry)
    {
        Throwable failure;
        synchronized (this)
        {
            failure = terminated;
            if (failure == null)
            {
                frames.add(0, entry);
                if (LOG.isDebugEnabled())
                    LOG.debug("Prepended {}, frames={}", entry, frames.size());
                return true;
            }
        }
        // Already terminated: fail the entry outside the lock.
        closed(entry, failure);
        return false;
    }

    /**
     * Adds the given entry at the tail of the frame queue.
     * <p>If this flusher has already been terminated, the entry is
     * not queued and is failed with the termination cause instead.</p>
     *
     * @param entry the entry to queue last
     * @return true if the entry was queued, false if it was failed
     */
    public boolean append(Entry entry)
    {
        Throwable failure;
        synchronized (this)
        {
            failure = terminated;
            if (failure == null)
            {
                frames.add(entry);
                if (LOG.isDebugEnabled())
                    LOG.debug("Appended {}, frames={}", entry, frames.size());
                return true;
            }
        }
        // Already terminated: fail the entry outside the lock.
        closed(entry, failure);
        return false;
    }

    /**
     * Removes the entry at the given position of the frame queue,
     * while holding the lock on this flusher.
     *
     * @param index the position of the entry to remove
     * @return the removed entry
     */
    private Entry remove(int index)
    {
        Entry removed;
        synchronized (this)
        {
            removed = frames.remove(index);
        }
        return removed;
    }

    /**
     * @return the number of entries currently in the frame queue,
     * read while holding the lock on this flusher
     */
    public int getQueueSize()
    {
        int queued;
        synchronized (this)
        {
            queued = frames.size();
        }
        return queued;
    }

    /**
     * Gathers the queued entries that can be written now and writes them.
     * <p>While holding the lock, this method: rethrows the termination cause
     * if one was recorded; applies all queued window updates; then scans the
     * frame queue, moving entries of reset streams to {@code resets} and
     * writable entries to {@code actives}. Entries with data remaining are
     * left in the queue when the session window or their stream window is
     * not positive.</p>
     * <p>Outside the lock, the gathered reset entries are reset, the active
     * entries are generated into the lease and the resulting buffers are
     * written to the session's end point with this object as callback.</p>
     *
     * @return {@link Action#IDLE} if there is nothing to write,
     * {@link Action#SCHEDULED} if a write was issued, or
     * {@link Action#SUCCEEDED} after {@code failed(...)} has been called
     * because an entry could not be generated
     * @throws Throwable the termination cause, if this flusher was terminated
     */
    @Override
    protected Action process() throws Throwable
    {
        if (LOG.isDebugEnabled())
            LOG.debug("Flushing {}", session);

        synchronized (this)
        {
            if (terminated != null)
                throw terminated;

            // First thing, update the window sizes, so we can
            // reason about the frames to remove from the queue.
            while (!windows.isEmpty())
            {
                WindowEntry entry = windows.poll();
                entry.perform();
            }

            // Now the window sizes cannot change.
            // Window updates that happen concurrently will
            // be queued and processed on the next iteration.
            int sessionWindow = session.getSendWindow();

            // "index" only advances past entries that are left in the queue;
            // removing an entry shrinks "size" instead.
            int index = 0;
            int size = frames.size();
            while (index < size)
            {
                Entry entry = frames.get(index);
                IStream stream = entry.stream;

                // If the stream has been reset, don't send the frame.
                if (stream != null && stream.isReset() && !entry.isProtocol())
                {
                    remove(index);
                    --size;
                    resets.add(entry);
                    if (LOG.isDebugEnabled())
                        LOG.debug("Gathered for reset {}", entry);
                    continue;
                }

                // Check if the frame fits in the flow control windows.
                // Only entries reporting remaining data are subject to this check.
                int remaining = entry.dataRemaining();
                if (remaining > 0)
                {
                    if (sessionWindow <= 0)
                    {
                        ++index;
                        // There may be *non* flow controlled frames to send.
                        continue;
                    }

                    if (stream != null)
                    {
                        // The stream may have a smaller window than the session.
                        // The window is read once per stream per pass and cached in "streams".
                        Integer streamWindow = streams.get(stream);
                        if (streamWindow == null)
                        {
                            streamWindow = stream.updateSendWindow(0);
                            streams.put(stream, streamWindow);
                        }

                        // Is it a frame belonging to an already stalled stream ?
                        if (streamWindow <= 0)
                        {
                            ++index;
                            // There may be *non* flow controlled frames to send.
                            continue;
                        }
                    }

                    // The frame fits both flow control windows, reduce them.
                    // NOTE(review): only positivity of the windows is checked above, so
                    // "remaining" may exceed them and the local values may go negative;
                    // presumably generate() limits the data to the available window - confirm.
                    sessionWindow -= remaining;
                    if (stream != null)
                        streams.put(stream, streams.get(stream) - remaining);
                }

                // The frame will be written, remove it from the queue.
                remove(index);
                --size;
                actives.add(entry);

                if (LOG.isDebugEnabled())
                    LOG.debug("Gathered for write {}", entry);
            }
            // The cached stream windows are only valid for this pass.
            streams.clear();
        }

        // Perform resets outside the sync block.
        for (int i = 0; i < resets.size(); ++i)
        {
            Entry entry = resets.get(i);
            entry.reset();
        }
        resets.clear();

        if (actives.isEmpty())
        {
            if (LOG.isDebugEnabled())
                LOG.debug("Flushed {}", session);
            return Action.IDLE;
        }

        // Generate the bytes of every gathered entry into the lease.
        for (int i = 0; i < actives.size(); ++i)
        {
            Entry entry = actives.get(i);
            Throwable failure = entry.generate(lease);
            if (failure != null)
            {
                // Failure to generate the entry is catastrophic.
                failed(failure);
                return Action.SUCCEEDED;
            }
        }

        // Issue a single write for all generated buffers; this object is the write callback.
        List<ByteBuffer> byteBuffers = lease.getByteBuffers();
        if (LOG.isDebugEnabled())
            LOG.debug("Writing {} buffers ({} bytes) for {} frames {}", byteBuffers.size(), lease.getTotalLength(), actives.size(), actives);
        session.getEndPoint().write(this, byteBuffers.toArray(new ByteBuffer[byteBuffers.size()]));
        return Action.SCHEDULED;
    }

    /**
     * Callback invoked when the write issued by {@link #process()} completes.
     * <p>Recycles the lease, notifies every active entry of the success,
     * clears the active entries and then delegates to the superclass.</p>
     */
    @Override
    public void succeeded()
    {
        lease.recycle();

        if (LOG.isDebugEnabled())
        {
            int written = actives.size();
            LOG.debug("Written {} frames for {}", written, actives);
        }

        // Complete the entries before clearing them for the next pass.
        actives.forEach(entry -> entry.succeeded());
        actives.clear();

        super.succeeded();
    }

    /**
     * Always throws: {@link #process()} returns {@code Action.SUCCEEDED}
     * only after {@code failed(...)} has been called, so this flusher
     * treats a successful completion as an illegal state.
     *
     * @throws IllegalStateException always
     */
    @Override
    protected void onCompleteSuccess()
    {
        throw new IllegalStateException();
    }

    /**
     * Fails all active and queued entries with the given cause.
     * <p>The cause is recorded as the termination cause. If no termination
     * cause was recorded before this call, the session is aborted too.</p>
     *
     * @param x the failure cause
     */
    @Override
    protected void onCompleteFailure(Throwable x)
    {
        lease.recycle();

        Throwable previous;
        synchronized (this)
        {
            previous = terminated;
            terminated = x;
            if (LOG.isDebugEnabled())
            {
                String action = previous == null ? "Failing" : "Closing";
                LOG.debug("{}, active/queued={}/{}", action, actives.size(), frames.size());
            }
            // Move the queued entries to the active ones, so all get failed below.
            actives.addAll(frames);
            frames.clear();
        }

        // Fail the entries outside the lock.
        actives.forEach(entry -> entry.failed(x));
        actives.clear();

        // A termination was already recorded: nothing more to do.
        if (previous != null)
            return;

        // If the failure came from within the
        // flusher, we need to close the connection.
        session.abort(x);
    }

    /**
     * Records the given cause as the termination cause.
     * <p>If no termination cause was recorded before this call, an
     * iteration is triggered, so that {@link #process()} observes the
     * cause and throws it.</p>
     *
     * @param cause the termination cause
     */
    void terminate(Throwable cause)
    {
        boolean first;
        synchronized (this)
        {
            first = terminated == null;
            terminated = cause;
            if (LOG.isDebugEnabled())
                LOG.debug("{}", first ? "Terminating" : "Terminated");
        }
        // Iterate outside the lock, and only for the first termination.
        if (first)
            iterate();
    }

    /**
     * Fails an entry that could not be queued because
     * this flusher was already terminated.
     *
     * @param entry the rejected entry
     * @param failure the termination cause to fail the entry with
     */
    private void closed(Entry entry, Throwable failure)
    {
        entry.failed(failure);
    }

    /**
     * A frame queued for writing, together with the stream it belongs to
     * (possibly null) and the callback to notify on completion.
     */
    public static abstract class Entry extends Callback.Nested
    {
        protected final Frame frame;
        protected final IStream stream;

        /**
         * @param frame the frame to write
         * @param stream the stream the frame belongs to, may be null
         * @param callback the nested callback notified of the outcome
         */
        protected Entry(Frame frame, IStream stream, Callback callback)
        {
            super(callback);
            this.frame = frame;
            this.stream = stream;
        }

        /**
         * @return the amount of data still to be written; a positive value
         * makes the flusher check the flow control windows for this entry.
         * This base implementation returns 0.
         */
        public int dataRemaining()
        {
            return 0;
        }

        /**
         * Generates the bytes of this entry into the given lease.
         * This base implementation generates nothing.
         *
         * @param lease the lease collecting the buffers to write
         * @return the failure, or null if the generation was successful
         */
        public Throwable generate(ByteBufferPool.Lease lease)
        {
            return null;
        }

        /**
         * Fails this entry with an {@link EofException}; invoked by
         * the flusher for entries whose stream has been reset.
         */
        public void reset()
        {
            EofException cause = new EofException("reset");
            failed(cause);
        }

        /**
         * Closes the stream (if any) and removes it from its
         * session before propagating the failure.
         *
         * @param x the failure cause
         */
        @Override
        public void failed(Throwable x)
        {
            IStream failedStream = stream;
            if (failedStream != null)
            {
                failedStream.close();
                failedStream.getSession().removeStream(failedStream);
            }
            super.failed(x);
        }

        /**
         * @return true for PRIORITY, RST_STREAM, GO_AWAY, WINDOW_UPDATE and
         * DISCONNECT frames, false for any other frame type
         */
        public boolean isProtocol()
        {
            boolean protocol;
            switch (frame.getType())
            {
                case PRIORITY:
                case RST_STREAM:
                case GO_AWAY:
                case WINDOW_UPDATE:
                case DISCONNECT:
                    protocol = true;
                    break;
                default:
                    protocol = false;
                    break;
            }
            return protocol;
        }

        /**
         * @return the string representation of the frame
         */
        @Override
        public String toString()
        {
            return frame.toString();
        }
    }

    /**
     * A queued window update, applied through the
     * flow control strategy of the enclosing flusher's session.
     */
    private class WindowEntry
    {
        private final IStream stream;
        private final WindowUpdateFrame frame;

        /**
         * @param stream the stream passed to the flow control strategy
         * @param frame the window update frame
         */
        public WindowEntry(IStream stream, WindowUpdateFrame frame)
        {
            this.stream = stream;
            this.frame = frame;
        }

        /**
         * Notifies the session's flow control strategy of this window update.
         */
        public void perform()
        {
            session.getFlowControlStrategy().onWindowUpdate(session, stream, frame);
        }
    }
}
