//
//  ========================================================================
//  Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//

package org.eclipse.jetty.http2;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.util.Atomics;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;

/**
 * <p>A flow control strategy that accumulates updates and emits window control
 * frames when the accumulated value reaches a threshold.</p>
 * <p>The sender flow control window is represented in the receiver as two
 * buckets: a bigger bucket, initially full, that is drained when data is
 * received, and a smaller bucket, initially empty, that is filled when data is
 * consumed. Only the smaller bucket can refill the bigger bucket.</p>
 * <p>The smaller bucket is defined as a fraction of the bigger bucket.</p>
 * <p>For a more visual representation, see the
 * <a href="http://en.wikipedia.org/wiki/Shishi-odoshi">rocking bamboo fountain</a>.</p>
 * <p>The algorithm works in this way.</p>
 * <p>The initial bigger bucket (BB) capacity is 100, and let's imagine the smaller
 * bucket (SB) being 40% of the bigger bucket: 40.</p>
 * <p>The receiver receives a data frame of 60, so now BB=40; the data frame is
 * passed to the application that consumes 25, so now SB=25. Since SB is not full,
 * no window control frames are emitted.</p>
 * <p>The application consumes another 20, so now SB=45. Since SB is full, its 45
 * are transferred to BB, which is now BB=85, and a window control frame is sent
 * with delta=45.</p>
 * <p>The application consumes the remaining 15, so now SB=15, and no window
 * control frame is emitted.</p>
 */
@ManagedObject
public class BufferingFlowControlStrategy extends AbstractFlowControlStrategy
{
    // Largest session receive window observed so far; the session threshold is a fraction of it.
    private final AtomicInteger maxSessionRecvWindow = new AtomicInteger(DEFAULT_WINDOW_SIZE);
    // Bytes consumed at session level that have not yet been returned to the peer.
    private final AtomicInteger sessionLevel = new AtomicInteger();
    // Per-stream count of bytes consumed that have not yet been returned to the peer.
    private final Map<IStream, AtomicInteger> streamLevels = new ConcurrentHashMap<>();
    private float bufferRatio;

    /**
     * Creates a strategy with the default initial stream send window.
     *
     * @param bufferRatio the fraction of the receive window that must be
     * consumed before a window update frame is emitted
     */
    public BufferingFlowControlStrategy(float bufferRatio)
    {
        this(DEFAULT_WINDOW_SIZE, bufferRatio);
    }

    /**
     * Creates a strategy with the given initial stream send window.
     *
     * @param initialStreamSendWindow the value passed to the superclass constructor
     * @param bufferRatio the fraction of the receive window that must be
     * consumed before a window update frame is emitted
     */
    public BufferingFlowControlStrategy(int initialStreamSendWindow, float bufferRatio)
    {
        super(initialStreamSendWindow);
        this.bufferRatio = bufferRatio;
    }

    /**
     * @return the ratio applied to the receive window to compute the
     * threshold above which consumed bytes are notified to the peer
     */
    @ManagedAttribute("The ratio between the receive buffer and the consume buffer")
    public float getBufferRatio()
    {
        return bufferRatio;
    }

    /**
     * @param bufferRatio the ratio applied to the receive window to compute the
     * threshold above which consumed bytes are notified to the peer
     */
    public void setBufferRatio(float bufferRatio)
    {
        this.bufferRatio = bufferRatio;
    }

    /**
     * Registers an empty consumed-bytes accumulator for the new stream.
     *
     * @param stream the stream that has been created
     */
    @Override
    public void onStreamCreated(IStream stream)
    {
        super.onStreamCreated(stream);
        streamLevels.put(stream, new AtomicInteger());
    }

    /**
     * Discards the consumed-bytes accumulator of the destroyed stream.
     *
     * @param stream the stream that has been destroyed
     */
    @Override
    public void onStreamDestroyed(IStream stream)
    {
        streamLevels.remove(stream);
        super.onStreamDestroyed(stream);
    }

    /**
     * Accumulates the consumed bytes at session and stream level and, when
     * an accumulated level exceeds its threshold, enlarges the corresponding
     * receive window and sends a window update frame carrying the accumulated
     * value.
     *
     * @param session the session the data was received on
     * @param stream the stream the data was received on, or null to only
     * account the bytes at session level
     * @param length the number of bytes consumed; non-positive values are ignored
     */
    @Override
    public void onDataConsumed(ISession session, IStream stream, int length)
    {
        if (length <= 0)
            return;

        // Read the ratio once so that session and stream thresholds
        // are computed consistently within this invocation.
        float ratio = bufferRatio;

        WindowUpdateFrame windowFrame = null;
        int level = sessionLevel.addAndGet(length);
        int maxLevel = (int)(maxSessionRecvWindow.get() * ratio);
        if (level > maxLevel)
        {
            // Only the thread that wins the CAS emits the update; if it fails,
            // another thread changed the level and the bytes stay accumulated.
            if (sessionLevel.compareAndSet(level, 0))
            {
                session.updateRecvWindow(level);
                if (LOG.isDebugEnabled())
                    LOG.debug("Data consumed, {} bytes, updated session recv window by {}/{} for {}", length, level, maxLevel, session);
                windowFrame = new WindowUpdateFrame(0, level);
            }
            else
            {
                if (LOG.isDebugEnabled())
                    LOG.debug("Data consumed, {} bytes, concurrent session recv window level {}/{} for {}", length, sessionLevel, maxLevel, session);
            }
        }
        else
        {
            if (LOG.isDebugEnabled())
                LOG.debug("Data consumed, {} bytes, session recv window level {}/{} for {}", length, level, maxLevel, session);
        }

        Frame[] windowFrames = Frame.EMPTY_ARRAY;
        if (stream != null)
        {
            if (stream.isClosed())
            {
                if (LOG.isDebugEnabled())
                    LOG.debug("Data consumed, {} bytes, ignoring update stream recv window for closed {}", length, stream);
            }
            else
            {
                AtomicInteger streamLevel = streamLevels.get(stream);
                if (streamLevel != null)
                {
                    level = streamLevel.addAndGet(length);
                    maxLevel = (int)(getInitialStreamRecvWindow() * ratio);
                    if (level > maxLevel)
                    {
                        // Drain whatever has accumulated, which may include bytes
                        // added by concurrent consumers after the addAndGet() above.
                        level = streamLevel.getAndSet(0);
                        // A concurrent consumer may have already drained the level,
                        // in which case there is nothing to notify: a window update
                        // with a zero increment is a protocol error for the peer.
                        if (level > 0)
                        {
                            stream.updateRecvWindow(level);
                            if (LOG.isDebugEnabled())
                                LOG.debug("Data consumed, {} bytes, updated stream recv window by {}/{} for {}", length, level, maxLevel, stream);
                            WindowUpdateFrame frame = new WindowUpdateFrame(stream.getId(), level);
                            // The first frame goes in the dedicated argument,
                            // the second (if any) in the additional frames array.
                            if (windowFrame == null)
                                windowFrame = frame;
                            else
                                windowFrames = new Frame[]{frame};
                        }
                        else
                        {
                            if (LOG.isDebugEnabled())
                                LOG.debug("Data consumed, {} bytes, concurrent stream recv window level {}/{} for {}", length, streamLevel, maxLevel, stream);
                        }
                    }
                    else
                    {
                        if (LOG.isDebugEnabled())
                            LOG.debug("Data consumed, {} bytes, stream recv window level {}/{} for {}", length, level, maxLevel, stream);
                    }
                }
            }
        }

        if (windowFrame != null)
            session.frames(stream, Callback.NOOP, windowFrame, windowFrames);
    }

    /**
     * Delegates to the superclass and, for session-level window updates
     * (stream id 0), records the largest session receive window seen so far.
     *
     * @param session the session the window update refers to
     * @param stream the stream the window update refers to, if any
     * @param frame the window update frame
     */
    @Override
    public void windowUpdate(ISession session, IStream stream, WindowUpdateFrame frame)
    {
        super.windowUpdate(session, stream, frame);

        // Window updates cannot be negative.
        // The SettingsFrame.INITIAL_WINDOW_SIZE setting
        // only influences the *stream* window size.
        // Therefore the session window can only be enlarged,
        // and here we keep track of its max value.

        // Updating the max session recv window is done here
        // so that if a peer decides to send a unilateral
        // window update to enlarge the session window,
        // without the corresponding data consumption, here
        // we can track it.
        // Note that it is not perfect, since there is a time
        // window between the session recv window being updated
        // before the window update frame is sent, and the
        // invocation of this method: in between data may arrive
        // and reduce the session recv window size.
        // But eventually the max value will be seen.

        // Note that we cannot avoid the time window described
        // above by updating the session recv window from here
        // because there is a race between the sender and the
        // receiver: the sender may receive a window update and
        // send more data, while this method has not yet been
        // invoked; when the data is received the session recv
        // window may become negative and the connection will
        // be closed (per specification).

        if (frame.getStreamId() == 0)
        {
            // A zero delta is used to obtain the session window value
            // without enlarging it.
            int sessionWindow = session.updateRecvWindow(0);
            Atomics.updateMax(maxSessionRecvWindow, sessionWindow);
        }
    }

    /**
     * @return a description with the buffer ratio, the current session
     * level and the session and streams stall times
     */
    @Override
    public String toString()
    {
        return String.format("%s@%x[ratio=%.2f,sessionLevel=%s,sessionStallTime=%dms,streamsStallTime=%dms]",
                getClass().getSimpleName(),
                hashCode(),
                bufferRatio,
                sessionLevel,
                getSessionStallTime(),
                getStreamsStallTime());
    }
}
