//
//  ========================================================================
//  Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//

package org.eclipse.jetty.util;

import java.util.AbstractList;
import java.util.Collection;
import java.util.Iterator;
import java.util.ListIterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * A BlockingQueue backed by a circular array capable or growing.
 * <p/>
 * This queue is uses a variant of the two lock queue algorithm to provide an efficient queue or list backed by a growable circular array.
 * <p/>
 * Unlike {@link java.util.concurrent.ArrayBlockingQueue}, this class is able to grow and provides a blocking put call.
 * <p/>
 * The queue has both a capacity (the size of the array currently allocated) and a max capacity (the maximum size that may be allocated), which defaults to
 * {@link Integer#MAX_VALUE}.
 * 
 * @param <E>
 *            The element type
 */
public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQueue<E>
{
    /**
     * The head offset in the {@link #_indexes} array, displaced by 15 slots to avoid false sharing with the array length (stored before the first element of
     * the array itself).
     */
    private static final int HEAD_OFFSET = MemoryUtils.getIntegersPerCacheLine() - 1;
    /**
     * The tail offset in the {@link #_indexes} array, displaced by 16 slots from the head to avoid false sharing with it.
     */
    private static final int TAIL_OFFSET = HEAD_OFFSET + MemoryUtils.getIntegersPerCacheLine();
    /**
     * Default initial capacity, 128.
     */
    public static final int DEFAULT_CAPACITY = 128;
    /**
     * Default growth factor, 64.
     */
    public static final int DEFAULT_GROWTH = 64;

    private final int _maxCapacity;
    private final int _growCapacity;
    /**
     * Array that holds the head and tail indexes, separated by a cache line to avoid false sharing
     */
    private final int[] _indexes = new int[TAIL_OFFSET + 1];
    private final Lock _tailLock = new ReentrantLock();
    private final AtomicInteger _size = new AtomicInteger();
    private final Lock _headLock = new ReentrantLock();
    private final Condition _notEmpty = _headLock.newCondition();
    private Object[] _elements;

    /**
     * Creates an unbounded {@link BlockingArrayQueue} with default initial capacity and grow factor.
     * 
     * @see #DEFAULT_CAPACITY
     * @see #DEFAULT_GROWTH
     */
    public BlockingArrayQueue()
    {
        _elements = new Object[DEFAULT_CAPACITY];
        _growCapacity = DEFAULT_GROWTH;
        _maxCapacity = Integer.MAX_VALUE;
    }

    /**
     * Creates a bounded {@link BlockingArrayQueue} that does not grow. The capacity of the queue is fixed and equal to the given parameter.
     * 
     * @param maxCapacity
     *            the maximum capacity
     */
    public BlockingArrayQueue(int maxCapacity)
    {
        _elements = new Object[maxCapacity];
        _growCapacity = -1;
        _maxCapacity = maxCapacity;
    }

    /**
     * Creates an unbounded {@link BlockingArrayQueue} that grows by the given parameter.
     * 
     * @param capacity
     *            the initial capacity
     * @param growBy
     *            the growth factor
     */
    public BlockingArrayQueue(int capacity, int growBy)
    {
        _elements = new Object[capacity];
        _growCapacity = growBy;
        _maxCapacity = Integer.MAX_VALUE;
    }

    /**
     * Create a bounded {@link BlockingArrayQueue} that grows by the given parameter.
     * 
     * @param capacity
     *            the initial capacity
     * @param growBy
     *            the growth factor
     * @param maxCapacity
     *            the maximum capacity
     */
    public BlockingArrayQueue(int capacity, int growBy, int maxCapacity)
    {
        if (capacity > maxCapacity)
            throw new IllegalArgumentException();
        _elements = new Object[capacity];
        _growCapacity = growBy;
        _maxCapacity = maxCapacity;
    }

    /*----------------------------------------------------------------------------*/
    /* Collection methods */
    /*----------------------------------------------------------------------------*/

    @Override
    public void clear()
    {

        _tailLock.lock();
        try
        {

            _headLock.lock();
            try
            {
                _indexes[HEAD_OFFSET] = 0;
                _indexes[TAIL_OFFSET] = 0;
                _size.set(0);
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    @Override
    public int size()
    {
        return _size.get();
    }

    @Override
    public Iterator<E> iterator()
    {
        return listIterator();
    }

    /*----------------------------------------------------------------------------*/
    /* Queue methods */
    /*----------------------------------------------------------------------------*/

    @SuppressWarnings("unchecked")
    @Override
    public E poll()
    {
        if (_size.get() == 0)
            return null;

        E e = null;

        _headLock.lock(); // Size cannot shrink
        try
        {
            if (_size.get() > 0)
            {
                final int head = _indexes[HEAD_OFFSET];
                e = (E)_elements[head];
                _elements[head] = null;
                _indexes[HEAD_OFFSET] = (head + 1) % _elements.length;
                if (_size.decrementAndGet() > 0)
                    _notEmpty.signal();
            }
        }
        finally
        {
            _headLock.unlock();
        }
        return e;
    }

    @SuppressWarnings("unchecked")
    @Override
    public E peek()
    {
        if (_size.get() == 0)
            return null;

        E e = null;

        _headLock.lock(); // Size cannot shrink
        try
        {
            if (_size.get() > 0)
                e = (E)_elements[_indexes[HEAD_OFFSET]];
        }
        finally
        {
            _headLock.unlock();
        }
        return e;
    }

    @Override
    public E remove()
    {
        E e = poll();
        if (e == null)
            throw new NoSuchElementException();
        return e;
    }

    @Override
    public E element()
    {
        E e = peek();
        if (e == null)
            throw new NoSuchElementException();
        return e;
    }

    /*----------------------------------------------------------------------------*/
    /* BlockingQueue methods */
    /*----------------------------------------------------------------------------*/

    @Override
    public boolean offer(E e)
    {
        Objects.requireNonNull(e);

        boolean notEmpty = false;
        _tailLock.lock(); // Size cannot grow... only shrink
        try
        {
            int size = _size.get();
            if (size >= _maxCapacity)
                return false;

            // Should we expand array?
            if (size == _elements.length)
            {
                _headLock.lock();
                try
                {
                    if (!grow())
                        return false;
                }
                finally
                {
                    _headLock.unlock();
                }
            }

            // Re-read head and tail after a possible grow
            int tail = _indexes[TAIL_OFFSET];
            _elements[tail] = e;
            _indexes[TAIL_OFFSET] = (tail + 1) % _elements.length;
            notEmpty = _size.getAndIncrement() == 0;
        }
        finally
        {
            _tailLock.unlock();
        }

        if (notEmpty)
        {
            _headLock.lock();
            try
            {
                _notEmpty.signal();
            }
            finally
            {
                _headLock.unlock();
            }
        }

        return true;
    }

    @Override
    public boolean add(E e)
    {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException();
    }

    @Override
    public void put(E o) throws InterruptedException
    {
        // The mechanism to await and signal when the queue is full is not implemented
        throw new UnsupportedOperationException();
    }

    @Override
    public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException
    {
        // The mechanism to await and signal when the queue is full is not implemented
        throw new UnsupportedOperationException();
    }

    @SuppressWarnings("unchecked")
    @Override
    public E take() throws InterruptedException
    {
        E e = null;

        _headLock.lockInterruptibly(); // Size cannot shrink
        try
        {
            try
            {
                while (_size.get() == 0)
                {
                    _notEmpty.await();
                }
            }
            catch (InterruptedException ie)
            {
                _notEmpty.signal();
                throw ie;
            }

            final int head = _indexes[HEAD_OFFSET];
            e = (E)_elements[head];
            _elements[head] = null;
            _indexes[HEAD_OFFSET] = (head + 1) % _elements.length;

            if (_size.decrementAndGet() > 0)
                _notEmpty.signal();
        }
        finally
        {
            _headLock.unlock();
        }
        return e;
    }

    @SuppressWarnings("unchecked")
    @Override
    public E poll(long time, TimeUnit unit) throws InterruptedException
    {
        long nanos = unit.toNanos(time);
        E e = null;

        _headLock.lockInterruptibly(); // Size cannot shrink
        try
        {
            try
            {
                while (_size.get() == 0)
                {
                    if (nanos <= 0)
                        return null;
                    nanos = _notEmpty.awaitNanos(nanos);
                }
            }
            catch (InterruptedException x)
            {
                _notEmpty.signal();
                throw x;
            }

            int head = _indexes[HEAD_OFFSET];
            e = (E)_elements[head];
            _elements[head] = null;
            _indexes[HEAD_OFFSET] = (head + 1) % _elements.length;

            if (_size.decrementAndGet() > 0)
                _notEmpty.signal();
        }
        finally
        {
            _headLock.unlock();
        }
        return e;
    }

    @Override
    public boolean remove(Object o)
    {

        _tailLock.lock();
        try
        {

            _headLock.lock();
            try
            {
                if (isEmpty())
                    return false;

                final int head = _indexes[HEAD_OFFSET];
                final int tail = _indexes[TAIL_OFFSET];
                final int capacity = _elements.length;

                int i = head;
                while (true)
                {
                    if (Objects.equals(_elements[i],o))
                    {
                        remove(i >= head?i - head:capacity - head + i);
                        return true;
                    }
                    ++i;
                    if (i == capacity)
                        i = 0;
                    if (i == tail)
                        return false;
                }
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    @Override
    public int remainingCapacity()
    {

        _tailLock.lock();
        try
        {

            _headLock.lock();
            try
            {
                return getCapacity() - size();
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    @Override
    public int drainTo(Collection<? super E> c)
    {
        throw new UnsupportedOperationException();
    }

    @Override
    public int drainTo(Collection<? super E> c, int maxElements)
    {
        throw new UnsupportedOperationException();
    }

    /*----------------------------------------------------------------------------*/
    /* List methods */
    /*----------------------------------------------------------------------------*/

    @SuppressWarnings("unchecked")
    @Override
    public E get(int index)
    {

        _tailLock.lock();
        try
        {

            _headLock.lock();
            try
            {
                if (index < 0 || index >= _size.get())
                    throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
                int i = _indexes[HEAD_OFFSET] + index;
                int capacity = _elements.length;
                if (i >= capacity)
                    i -= capacity;
                return (E)_elements[i];
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    @Override
    public void add(int index, E e)
    {
        if (e == null)
            throw new NullPointerException();

        _tailLock.lock();
        try
        {

            _headLock.lock();
            try
            {
                final int size = _size.get();

                if (index < 0 || index > size)
                    throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");

                if (index == size)
                {
                    add(e);
                }
                else
                {
                    if (_indexes[TAIL_OFFSET] == _indexes[HEAD_OFFSET])
                        if (!grow())
                            throw new IllegalStateException("full");

                    // Re-read head and tail after a possible grow
                    int i = _indexes[HEAD_OFFSET] + index;
                    int capacity = _elements.length;

                    if (i >= capacity)
                        i -= capacity;

                    _size.incrementAndGet();
                    int tail = _indexes[TAIL_OFFSET];
                    _indexes[TAIL_OFFSET] = tail = (tail + 1) % capacity;

                    if (i < tail)
                    {
                        System.arraycopy(_elements,i,_elements,i + 1,tail - i);
                        _elements[i] = e;
                    }
                    else
                    {
                        if (tail > 0)
                        {
                            System.arraycopy(_elements,0,_elements,1,tail);
                            _elements[0] = _elements[capacity - 1];
                        }

                        System.arraycopy(_elements,i,_elements,i + 1,capacity - i - 1);
                        _elements[i] = e;
                    }
                }
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    @Override
    public E set(int index, E e)
    {
        Objects.requireNonNull(e);

        _tailLock.lock();
        try
        {

            _headLock.lock();
            try
            {
                if (index < 0 || index >= _size.get())
                    throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");

                int i = _indexes[HEAD_OFFSET] + index;
                int capacity = _elements.length;
                if (i >= capacity)
                    i -= capacity;
                E old = (E)_elements[i];
                _elements[i] = e;
                return old;
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    @Override
    public E remove(int index)
    {

        _tailLock.lock();
        try
        {

            _headLock.lock();
            try
            {
                if (index < 0 || index >= _size.get())
                    throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");

                int i = _indexes[HEAD_OFFSET] + index;
                int capacity = _elements.length;
                if (i >= capacity)
                    i -= capacity;
                E old = (E)_elements[i];

                int tail = _indexes[TAIL_OFFSET];
                if (i < tail)
                {
                    System.arraycopy(_elements,i + 1,_elements,i,tail - i);
                    --_indexes[TAIL_OFFSET];
                }
                else
                {
                    System.arraycopy(_elements,i + 1,_elements,i,capacity - i - 1);
                    _elements[capacity - 1] = _elements[0];
                    if (tail > 0)
                    {
                        System.arraycopy(_elements,1,_elements,0,tail);
                        --_indexes[TAIL_OFFSET];
                    }
                    else
                    {
                        _indexes[TAIL_OFFSET] = capacity - 1;
                    }
                    _elements[_indexes[TAIL_OFFSET]] = null;
                }

                _size.decrementAndGet();

                return old;
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    @Override
    public ListIterator<E> listIterator(int index)
    {

        _tailLock.lock();
        try
        {

            _headLock.lock();
            try
            {
                Object[] elements = new Object[size()];
                if (size() > 0)
                {
                    int head = _indexes[HEAD_OFFSET];
                    int tail = _indexes[TAIL_OFFSET];
                    if (head < tail)
                    {
                        System.arraycopy(_elements,head,elements,0,tail - head);
                    }
                    else
                    {
                        int chunk = _elements.length - head;
                        System.arraycopy(_elements,head,elements,0,chunk);
                        System.arraycopy(_elements,0,elements,chunk,tail);
                    }
                }
                return new Itr(elements,index);
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /*----------------------------------------------------------------------------*/
    /* Additional methods */
    /*----------------------------------------------------------------------------*/

    /**
     * @return the current capacity of this queue
     */
    public int getCapacity()
    {
        _tailLock.lock();
        try
        {
            return _elements.length;
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    /**
     * @return the max capacity of this queue, or -1 if this queue is unbounded
     */
    public int getMaxCapacity()
    {
        return _maxCapacity;
    }

    /*----------------------------------------------------------------------------*/
    /* Implementation methods */
    /*----------------------------------------------------------------------------*/

    private boolean grow()
    {
        if (_growCapacity <= 0)
            return false;

        _tailLock.lock();
        try
        {

            _headLock.lock();
            try
            {
                final int head = _indexes[HEAD_OFFSET];
                final int tail = _indexes[TAIL_OFFSET];
                final int newTail;
                final int capacity = _elements.length;

                Object[] elements = new Object[capacity + _growCapacity];

                if (head < tail)
                {
                    newTail = tail - head;
                    System.arraycopy(_elements,head,elements,0,newTail);
                }
                else if (head > tail || _size.get() > 0)
                {
                    newTail = capacity + tail - head;
                    int cut = capacity - head;
                    System.arraycopy(_elements,head,elements,0,cut);
                    System.arraycopy(_elements,0,elements,cut,tail);
                }
                else
                {
                    newTail = 0;
                }

                _elements = elements;
                _indexes[HEAD_OFFSET] = 0;
                _indexes[TAIL_OFFSET] = newTail;
                return true;
            }
            finally
            {
                _headLock.unlock();
            }
        }
        finally
        {
            _tailLock.unlock();
        }
    }

    private class Itr implements ListIterator<E>
    {
        private final Object[] _elements;
        private int _cursor;

        public Itr(Object[] elements, int offset)
        {
            _elements = elements;
            _cursor = offset;
        }

        @Override
        public boolean hasNext()
        {
            return _cursor < _elements.length;
        }

        @SuppressWarnings("unchecked")
        @Override
        public E next()
        {
            return (E)_elements[_cursor++];
        }

        @Override
        public boolean hasPrevious()
        {
            return _cursor > 0;
        }

        @SuppressWarnings("unchecked")
        @Override
        public E previous()
        {
            return (E)_elements[--_cursor];
        }

        @Override
        public int nextIndex()
        {
            return _cursor + 1;
        }

        @Override
        public int previousIndex()
        {
            return _cursor - 1;
        }

        @Override
        public void remove()
        {
            throw new UnsupportedOperationException();
        }

        @Override
        public void set(E e)
        {
            throw new UnsupportedOperationException();
        }

        @Override
        public void add(E e)
        {
            throw new UnsupportedOperationException();
        }
    }
}
