| // |
| // ======================================================================== |
| // Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd. |
| // ------------------------------------------------------------------------ |
| // All rights reserved. This program and the accompanying materials |
| // are made available under the terms of the Eclipse Public License v1.0 |
| // and Apache License v2.0 which accompanies this distribution. |
| // |
| // The Eclipse Public License is available at |
| // http://www.eclipse.org/legal/epl-v10.html |
| // |
| // The Apache License v2.0 is available at |
| // http://www.opensource.org/licenses/apache2.0.php |
| // |
| // You may elect to redistribute this code under either of these licenses. |
| // ======================================================================== |
| // |
| |
| package org.eclipse.jetty.util; |
| |
| import java.util.AbstractQueue; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.NoSuchElementException; |
| import java.util.Objects; |
| import java.util.Queue; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| import java.util.concurrent.atomic.AtomicIntegerArray; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.concurrent.atomic.AtomicReferenceArray; |
| |
| /** |
| * A concurrent, unbounded implementation of {@link Queue} that uses singly-linked array blocks |
| * to store elements. |
| * <p> |
| * This class is a drop-in replacement for {@link ConcurrentLinkedQueue}, with similar performance |
| * but producing less garbage because arrays are used to store elements rather than nodes. |
| * </p> |
| * <p> |
| * The algorithm used is a variation of the algorithm from Gidenstam, Sundell and Tsigas |
| * (http://www.adm.hb.se/~AGD/Presentations/CacheAwareQueue_OPODIS.pdf). |
| * </p> |
| * |
| * @param <T> the Array entry type |
| */ |
| public class ConcurrentArrayQueue<T> extends AbstractQueue<T> |
| { |
| public static final int DEFAULT_BLOCK_SIZE = 512; |
| public static final Object REMOVED_ELEMENT = new Object() |
| { |
| @Override |
| public String toString() |
| { |
| return "X"; |
| } |
| }; |
| |
| private static final int HEAD_OFFSET = MemoryUtils.getIntegersPerCacheLine() - 1; |
| private static final int TAIL_OFFSET = MemoryUtils.getIntegersPerCacheLine()*2 -1; |
| |
| private final AtomicReferenceArray<Block<T>> _blocks = new AtomicReferenceArray<>(TAIL_OFFSET + 1); |
| private final int _blockSize; |
| |
| public ConcurrentArrayQueue() |
| { |
| this(DEFAULT_BLOCK_SIZE); |
| } |
| |
| public ConcurrentArrayQueue(int blockSize) |
| { |
| _blockSize = blockSize; |
| Block<T> block = newBlock(); |
| _blocks.set(HEAD_OFFSET,block); |
| _blocks.set(TAIL_OFFSET,block); |
| } |
| |
| public int getBlockSize() |
| { |
| return _blockSize; |
| } |
| |
| protected Block<T> getHeadBlock() |
| { |
| return _blocks.get(HEAD_OFFSET); |
| } |
| |
| protected Block<T> getTailBlock() |
| { |
| return _blocks.get(TAIL_OFFSET); |
| } |
| |
| @Override |
| public boolean offer(T item) |
| { |
| item = Objects.requireNonNull(item); |
| |
| final Block<T> initialTailBlock = getTailBlock(); |
| Block<T> currentTailBlock = initialTailBlock; |
| int tail = currentTailBlock.tail(); |
| while (true) |
| { |
| if (tail == getBlockSize()) |
| { |
| Block<T> nextTailBlock = currentTailBlock.next(); |
| if (nextTailBlock == null) |
| { |
| nextTailBlock = newBlock(); |
| if (currentTailBlock.link(nextTailBlock)) |
| { |
| // Linking succeeded, loop |
| currentTailBlock = nextTailBlock; |
| } |
| else |
| { |
| // Concurrent linking, use other block and loop |
| currentTailBlock = currentTailBlock.next(); |
| } |
| } |
| else |
| { |
| // Not at last block, loop |
| currentTailBlock = nextTailBlock; |
| } |
| tail = currentTailBlock.tail(); |
| } |
| else |
| { |
| if (currentTailBlock.peek(tail) == null) |
| { |
| if (currentTailBlock.store(tail, item)) |
| { |
| // Item stored |
| break; |
| } |
| else |
| { |
| // Concurrent store, try next index |
| ++tail; |
| } |
| } |
| else |
| { |
| // Not free, try next index |
| ++tail; |
| } |
| } |
| } |
| |
| updateTailBlock(initialTailBlock, currentTailBlock); |
| |
| return true; |
| } |
| |
| private void updateTailBlock(Block<T> oldTailBlock, Block<T> newTailBlock) |
| { |
| // Update the tail block pointer if needs to |
| if (oldTailBlock != newTailBlock) |
| { |
| // The tail block pointer is allowed to lag behind. |
| // If this update fails, it means that other threads |
| // have filled this block and installed a new one. |
| casTailBlock(oldTailBlock, newTailBlock); |
| } |
| } |
| |
| protected boolean casTailBlock(Block<T> current, Block<T> update) |
| { |
| return _blocks.compareAndSet(TAIL_OFFSET,current,update); |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Override |
| public T poll() |
| { |
| final Block<T> initialHeadBlock = getHeadBlock(); |
| Block<T> currentHeadBlock = initialHeadBlock; |
| int head = currentHeadBlock.head(); |
| T result = null; |
| while (true) |
| { |
| if (head == getBlockSize()) |
| { |
| Block<T> nextHeadBlock = currentHeadBlock.next(); |
| if (nextHeadBlock == null) |
| { |
| // We could have read that the next head block was null |
| // but another thread allocated a new block and stored a |
| // new item. This thread could not detect this, but that |
| // is ok, otherwise we would not be able to exit this loop. |
| |
| // Queue is empty |
| break; |
| } |
| else |
| { |
| // Use next block and loop |
| currentHeadBlock = nextHeadBlock; |
| head = currentHeadBlock.head(); |
| } |
| } |
| else |
| { |
| Object element = currentHeadBlock.peek(head); |
| if (element == REMOVED_ELEMENT) |
| { |
| // Already removed, try next index |
| ++head; |
| } |
| else |
| { |
| result = (T)element; |
| if (result != null) |
| { |
| if (currentHeadBlock.remove(head, result, true)) |
| { |
| // Item removed |
| break; |
| } |
| else |
| { |
| // Concurrent remove, try next index |
| ++head; |
| } |
| } |
| else |
| { |
| // Queue is empty |
| break; |
| } |
| } |
| } |
| } |
| |
| updateHeadBlock(initialHeadBlock, currentHeadBlock); |
| |
| return result; |
| } |
| |
| private void updateHeadBlock(Block<T> oldHeadBlock, Block<T> newHeadBlock) |
| { |
| // Update the head block pointer if needs to |
| if (oldHeadBlock != newHeadBlock) |
| { |
| // The head block pointer lagged behind. |
| // If this update fails, it means that other threads |
| // have emptied this block and pointed to a new one. |
| casHeadBlock(oldHeadBlock, newHeadBlock); |
| } |
| } |
| |
| protected boolean casHeadBlock(Block<T> current, Block<T> update) |
| { |
| return _blocks.compareAndSet(HEAD_OFFSET,current,update); |
| } |
| |
| @Override |
| public T peek() |
| { |
| Block<T> currentHeadBlock = getHeadBlock(); |
| int head = currentHeadBlock.head(); |
| while (true) |
| { |
| if (head == getBlockSize()) |
| { |
| Block<T> nextHeadBlock = currentHeadBlock.next(); |
| if (nextHeadBlock == null) |
| { |
| // Queue is empty |
| return null; |
| } |
| else |
| { |
| // Use next block and loop |
| currentHeadBlock = nextHeadBlock; |
| head = currentHeadBlock.head(); |
| } |
| } |
| else |
| { |
| T element = currentHeadBlock.peek(head); |
| if (element == REMOVED_ELEMENT) |
| { |
| // Already removed, try next index |
| ++head; |
| } |
| else |
| { |
| return element; |
| } |
| } |
| } |
| } |
| |
| @Override |
| public boolean remove(Object o) |
| { |
| Block<T> currentHeadBlock = getHeadBlock(); |
| int head = currentHeadBlock.head(); |
| boolean result = false; |
| while (true) |
| { |
| if (head == getBlockSize()) |
| { |
| Block<T> nextHeadBlock = currentHeadBlock.next(); |
| if (nextHeadBlock == null) |
| { |
| // Not found |
| break; |
| } |
| else |
| { |
| // Use next block and loop |
| currentHeadBlock = nextHeadBlock; |
| head = currentHeadBlock.head(); |
| } |
| } |
| else |
| { |
| Object element = currentHeadBlock.peek(head); |
| if (element == REMOVED_ELEMENT) |
| { |
| // Removed, try next index |
| ++head; |
| } |
| else |
| { |
| if (element == null) |
| { |
| // Not found |
| break; |
| } |
| else |
| { |
| if (element.equals(o)) |
| { |
| // Found |
| if (currentHeadBlock.remove(head, o, false)) |
| { |
| result = true; |
| break; |
| } |
| else |
| { |
| ++head; |
| } |
| } |
| else |
| { |
| // Not the one we're looking for |
| ++head; |
| } |
| } |
| } |
| } |
| } |
| |
| return result; |
| } |
| |
| @Override |
| public boolean removeAll(Collection<?> c) |
| { |
| // TODO: super invocations are based on iterator.remove(), which throws |
| return super.removeAll(c); |
| } |
| |
| @Override |
| public boolean retainAll(Collection<?> c) |
| { |
| // TODO: super invocations are based on iterator.remove(), which throws |
| return super.retainAll(c); |
| } |
| |
| @Override |
| public Iterator<T> iterator() |
| { |
| final List<Object[]> blocks = new ArrayList<>(); |
| Block<T> currentHeadBlock = getHeadBlock(); |
| while (currentHeadBlock != null) |
| { |
| Object[] elements = currentHeadBlock.arrayCopy(); |
| blocks.add(elements); |
| currentHeadBlock = currentHeadBlock.next(); |
| } |
| return new Iterator<T>() |
| { |
| private int blockIndex; |
| private int index; |
| |
| @Override |
| public boolean hasNext() |
| { |
| while (true) |
| { |
| if (blockIndex == blocks.size()) |
| return false; |
| |
| Object element = blocks.get(blockIndex)[index]; |
| |
| if (element == null) |
| return false; |
| |
| if (element != REMOVED_ELEMENT) |
| return true; |
| |
| advance(); |
| } |
| } |
| |
| @Override |
| public T next() |
| { |
| while (true) |
| { |
| if (blockIndex == blocks.size()) |
| throw new NoSuchElementException(); |
| |
| Object element = blocks.get(blockIndex)[index]; |
| |
| if (element == null) |
| throw new NoSuchElementException(); |
| |
| advance(); |
| |
| if (element != REMOVED_ELEMENT) { |
| @SuppressWarnings("unchecked") |
| T e = (T)element; |
| return e; |
| } |
| } |
| } |
| |
| private void advance() |
| { |
| if (++index == getBlockSize()) |
| { |
| index = 0; |
| ++blockIndex; |
| } |
| } |
| |
| @Override |
| public void remove() |
| { |
| throw new UnsupportedOperationException(); |
| } |
| }; |
| } |
| |
| @Override |
| public int size() |
| { |
| Block<T> currentHeadBlock = getHeadBlock(); |
| int head = currentHeadBlock.head(); |
| int size = 0; |
| while (true) |
| { |
| if (head == getBlockSize()) |
| { |
| Block<T> nextHeadBlock = currentHeadBlock.next(); |
| if (nextHeadBlock == null) |
| { |
| break; |
| } |
| else |
| { |
| // Use next block and loop |
| currentHeadBlock = nextHeadBlock; |
| head = currentHeadBlock.head(); |
| } |
| } |
| else |
| { |
| Object element = currentHeadBlock.peek(head); |
| if (element == REMOVED_ELEMENT) |
| { |
| // Already removed, try next index |
| ++head; |
| } |
| else if (element != null) |
| { |
| ++size; |
| ++head; |
| } |
| else |
| { |
| break; |
| } |
| } |
| } |
| return size; |
| } |
| |
| protected Block<T> newBlock() |
| { |
| return new Block<>(getBlockSize()); |
| } |
| |
| protected int getBlockCount() |
| { |
| int result = 0; |
| Block<T> headBlock = getHeadBlock(); |
| while (headBlock != null) |
| { |
| ++result; |
| headBlock = headBlock.next(); |
| } |
| return result; |
| } |
| |
| protected static final class Block<E> |
| { |
| private static final int headOffset = MemoryUtils.getIntegersPerCacheLine()-1; |
| private static final int tailOffset = MemoryUtils.getIntegersPerCacheLine()*2-1; |
| |
| private final AtomicReferenceArray<Object> elements; |
| private final AtomicReference<Block<E>> next = new AtomicReference<>(); |
| private final AtomicIntegerArray indexes = new AtomicIntegerArray(TAIL_OFFSET+1); |
| |
| protected Block(int blockSize) |
| { |
| elements = new AtomicReferenceArray<>(blockSize); |
| } |
| |
| @SuppressWarnings("unchecked") |
| public E peek(int index) |
| { |
| return (E)elements.get(index); |
| } |
| |
| public boolean store(int index, E item) |
| { |
| boolean result = elements.compareAndSet(index, null, item); |
| if (result) |
| indexes.incrementAndGet(tailOffset); |
| return result; |
| } |
| |
| public boolean remove(int index, Object item, boolean updateHead) |
| { |
| boolean result = elements.compareAndSet(index, item, REMOVED_ELEMENT); |
| if (result && updateHead) |
| indexes.incrementAndGet(headOffset); |
| return result; |
| } |
| |
| public Block<E> next() |
| { |
| return next.get(); |
| } |
| |
| public boolean link(Block<E> nextBlock) |
| { |
| return next.compareAndSet(null, nextBlock); |
| } |
| |
| public int head() |
| { |
| return indexes.get(headOffset); |
| } |
| |
| public int tail() |
| { |
| return indexes.get(tailOffset); |
| } |
| |
| public Object[] arrayCopy() |
| { |
| Object[] result = new Object[elements.length()]; |
| for (int i = 0; i < result.length; ++i) |
| result[i] = elements.get(i); |
| return result; |
| } |
| } |
| } |