//
//  ========================================================================
//  Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//

package org.eclipse.jetty.util;

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.eclipse.jetty.toolchain.test.AdvancedRunner;
import org.eclipse.jetty.toolchain.test.annotation.Stress;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;

/**
 * Stress test that times several {@link Queue} implementations under
 * concurrent producers and consumers.
 * <p>
 * For each queue, a number of writer threads append {@link #ELEMENT} a fixed
 * number of times followed by one {@link #END} marker per reader, while reader
 * threads drain the queue until they have seen one {@link #END} per writer.
 * A {@link CyclicBarrier} brackets each run so that the elapsed time can be
 * measured and logged by the test thread.
 */
@RunWith(AdvancedRunner.class)
public class QueueBenchmarkTest
{
    private static final Logger logger = Log.getLogger(QueueBenchmarkTest.class);
    // Payload and end-of-stream marker; consumers tell them apart by identity (==).
    private static final Runnable ELEMENT = () -> {};
    private static final Runnable END = () -> {};

    /**
     * Benchmarks the queues using non-blocking {@link Queue#poll()} on the consumer side.
     *
     * @throws Exception if the benchmark fails
     */
    @Stress("High CPU")
    @Test
    public void testQueues() throws Exception
    {
        int cores = Runtime.getRuntime().availableProcessors();
        Assume.assumeTrue(cores > 1);

        final int readers = cores / 2;
        final int writers = readers;
        final int iterations = 16 * 1024 * 1024;
        final int capacity = capacity(readers, writers, iterations);

        final List<Queue<Runnable>> queues = new ArrayList<>();
        queues.add(new ConcurrentArrayQueue<>()); // Jetty lock-free queue, allocating array blocks
        queues.add(new ConcurrentLinkedQueue<>()); // JDK lock-free queue, allocating nodes
        queues.add(new ArrayBlockingQueue<>(capacity)); // JDK lock-based, circular array queue
        queues.add(new BlockingArrayQueue<>(capacity)); // Jetty lock-based, circular array queue

        testQueues(readers, writers, iterations, queues, false);
    }

    /**
     * Benchmarks the blocking queues using {@link BlockingQueue#take()} on the consumer side.
     *
     * @throws Exception if the benchmark fails
     */
    @Stress("High CPU")
    @Test
    public void testBlockingQueues() throws Exception
    {
        int cores = Runtime.getRuntime().availableProcessors();
        Assume.assumeTrue(cores > 1);

        final int readers = cores / 2;
        final int writers = readers;
        final int iterations = 16 * 1024 * 1024;
        final int capacity = capacity(readers, writers, iterations);

        final List<Queue<Runnable>> queues = new ArrayList<>();
        queues.add(new LinkedBlockingQueue<>());
        queues.add(new ArrayBlockingQueue<>(capacity));
        queues.add(new BlockingArrayQueue<>(capacity));

        testQueues(readers, writers, iterations, queues, true);
    }

    /**
     * Computes the capacity a bounded queue needs so that no offer can ever be rejected:
     * every writer appends {@code iterations} payload elements plus one END marker per reader.
     *
     * @param readers the number of consumer threads
     * @param writers the number of producer threads
     * @param iterations the number of payload elements appended by each writer
     * @return {@code (iterations + readers) * writers}
     * @throws ArithmeticException if the capacity does not fit in an {@code int}
     */
    private static int capacity(int readers, int writers, int iterations)
    {
        // Exact arithmetic: a silent overflow would produce a bogus (negative or tiny) capacity.
        return Math.multiplyExact(Math.addExact(iterations, readers), writers);
    }

    /**
     * Runs the producer/consumer workload several times against each queue and logs
     * the elapsed time of every run.
     *
     * @param readers the number of consumer threads per run
     * @param writers the number of producer threads per run
     * @param iterations the number of payload elements appended by each writer
     * @param queues the queues to benchmark, reused across runs
     * @param blocking whether consumers use {@code take()} ({@code true}) or {@code poll()} ({@code false})
     * @throws Exception if the benchmark fails
     */
    private void testQueues(final int readers, final int writers, final int iterations, List<Queue<Runnable>> queues, final boolean blocking) throws Exception
    {
        final int runs = 8;
        int threads = readers + writers;
        // One extra party for the test thread, which uses the barrier to time each run.
        final CyclicBarrier barrier = new CyclicBarrier(threads + 1);

        for (final Queue<Runnable> queue : queues)
        {
            for (int r = 0; r < runs; ++r)
            {
                for (int i = 0; i < readers; ++i)
                {
                    new Thread(() ->
                    {
                        await(barrier);
                        consume(queue, writers, blocking);
                        await(barrier);
                    }).start();
                }
                for (int i = 0; i < writers; ++i)
                {
                    new Thread(() ->
                    {
                        await(barrier);
                        produce(queue, readers, iterations);
                        await(barrier);
                    }).start();
                }

                // First trip releases all workers at once, second trip means they are all done.
                await(barrier);
                long begin = System.nanoTime();
                await(barrier);
                long end = System.nanoTime();
                long elapsed = TimeUnit.NANOSECONDS.toMillis(end - begin);
                logger.info("{} Readers/Writers: {}/{} => {} ms", queue.getClass().getSimpleName(), readers, writers, elapsed);
            }
        }
    }

    /**
     * Drains the queue until one END marker per writer has been received.
     *
     * @param queue the queue to read from
     * @param writers the number of END markers to wait for
     * @param blocking whether to use {@code take()} ({@code true}) or {@code poll()} ({@code false})
     */
    private static void consume(Queue<Runnable> queue, int writers, boolean blocking)
    {
        int pending = writers;
        while (true)
        {
            Runnable element = blocking ? take(queue) : poll(queue);
            if (element == END && --pending == 0)
                break;
        }
    }

    /**
     * Appends {@code iterations} payload elements followed by one END marker per reader.
     *
     * @param queue the queue to write to
     * @param readers the number of END markers to append
     * @param iterations the number of payload elements to append
     */
    private static void produce(Queue<Runnable> queue, int readers, int iterations)
    {
        for (int i = 0; i < iterations; ++i)
            append(queue, ELEMENT);
        for (int i = 0; i < readers; ++i)
            append(queue, END);
    }

    /**
     * Offers an element to the queue, logging a warning if the queue rejects it.
     *
     * @param queue the queue to write to
     * @param element the element to append
     */
    private static void append(Queue<Runnable> queue, Runnable element)
    {
        // Log only the class name: formatting the queue itself would stringify
        // every element it holds, once per rejected offer.
        if (!queue.offer(element))
            logger.warn("Queue {} capacity is too small", queue.getClass().getSimpleName());
    }

    /**
     * Takes an element from the queue, blocking until one is available.
     *
     * @param queue the queue to read from, which must be a {@link BlockingQueue}
     * @return the element taken
     * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is interrupted
     */
    private static Runnable take(Queue<Runnable> queue)
    {
        try
        {
            return ((BlockingQueue<Runnable>)queue).take();
        }
        catch (InterruptedException x)
        {
            // Preserve the interrupt status for whoever handles the wrapped exception.
            Thread.currentThread().interrupt();
            throw new RuntimeException(x);
        }
    }

    /**
     * Polls the queue until an element is available, sleeping briefly between
     * attempts and logging a warning every 16 empty polls.
     *
     * @param queue the queue to read from
     * @return the first non-null element polled
     */
    private static Runnable poll(Queue<Runnable> queue)
    {
        int loops = 0;
        while (true)
        {
            Runnable element = queue.poll();
            if (element != null)
                return element;
            // Busy loop
            sleepMicros(1);
            ++loops;
            if (loops % 16 == 0)
                logger.warn("Spin looping while polling empty queue {}: {} spins", queue.getClass().getSimpleName(), loops);
        }
    }

    /**
     * Sleeps for the given number of microseconds.
     *
     * @param sleep the time to sleep, in microseconds
     * @throws RuntimeException wrapping the {@link InterruptedException} if the thread is interrupted
     */
    private static void sleepMicros(long sleep)
    {
        try
        {
            TimeUnit.MICROSECONDS.sleep(sleep);
        }
        catch (InterruptedException x)
        {
            // Preserve the interrupt status for whoever handles the wrapped exception.
            Thread.currentThread().interrupt();
            throw new RuntimeException(x);
        }
    }

    /**
     * Waits on the barrier, converting any failure into an unchecked exception.
     *
     * @param barrier the barrier to wait on
     * @throws RuntimeException wrapping whatever {@link CyclicBarrier#await()} threw
     */
    private static void await(CyclicBarrier barrier)
    {
        try
        {
            barrier.await();
        }
        catch (Exception x)
        {
            // Preserve the interrupt status when the wait was interrupted.
            if (x instanceof InterruptedException)
                Thread.currentThread().interrupt();
            throw new RuntimeException(x);
        }
    }
}
