| // |
| // ======================================================================== |
| // Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd. |
| // ------------------------------------------------------------------------ |
| // All rights reserved. This program and the accompanying materials |
| // are made available under the terms of the Eclipse Public License v1.0 |
| // and Apache License v2.0 which accompanies this distribution. |
| // |
| // The Eclipse Public License is available at |
| // http://www.eclipse.org/legal/epl-v10.html |
| // |
| // The Apache License v2.0 is available at |
| // http://www.opensource.org/licenses/apache2.0.php |
| // |
| // You may elect to redistribute this code under either of these licenses. |
| // ======================================================================== |
| // |
| |
| package org.eclipse.jetty.util; |
| |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Queue; |
| import java.util.concurrent.ArrayBlockingQueue; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| import java.util.concurrent.CyclicBarrier; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.eclipse.jetty.toolchain.test.AdvancedRunner; |
| import org.eclipse.jetty.toolchain.test.annotation.Stress; |
| import org.eclipse.jetty.util.log.Log; |
| import org.eclipse.jetty.util.log.Logger; |
| import org.junit.Assume; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| |
| @RunWith(AdvancedRunner.class) |
| public class QueueBenchmarkTest |
| { |
| private static final Logger logger = Log.getLogger(QueueBenchmarkTest.class); |
| private static final Runnable ELEMENT = new Runnable() |
| { |
| @Override |
| public void run() |
| { |
| } |
| }; |
| private static final Runnable END = new Runnable() |
| { |
| @Override |
| public void run() |
| { |
| } |
| }; |
| |
| @Stress("High CPU") |
| @Test |
| public void testQueues() throws Exception |
| { |
| int cores = Runtime.getRuntime().availableProcessors(); |
| Assume.assumeTrue(cores > 1); |
| |
| final int readers = cores / 2; |
| final int writers = readers; |
| final int iterations = 16 * 1024 * 1024; |
| |
| final List<Queue<Runnable>> queues = new ArrayList<>(); |
| queues.add(new ConcurrentArrayQueue<Runnable>()); // Jetty lock-free queue, allocating array blocks |
| queues.add(new ConcurrentLinkedQueue<Runnable>()); // JDK lock-free queue, allocating nodes |
| queues.add(new ArrayBlockingQueue<Runnable>(iterations * writers)); // JDK lock-based, circular array queue |
| queues.add(new BlockingArrayQueue<Runnable>(iterations * writers)); // Jetty lock-based, circular array queue |
| |
| testQueues(readers, writers, iterations, queues, false); |
| } |
| |
| @Stress("High CPU") |
| @Test |
| public void testBlockingQueues() throws Exception |
| { |
| int cores = Runtime.getRuntime().availableProcessors(); |
| Assume.assumeTrue(cores > 1); |
| |
| final int readers = cores / 2; |
| final int writers = readers; |
| final int iterations = 16 * 1024 * 1024; |
| |
| final List<Queue<Runnable>> queues = new ArrayList<>(); |
| queues.add(new LinkedBlockingQueue<Runnable>()); |
| queues.add(new ArrayBlockingQueue<Runnable>(iterations * writers)); |
| queues.add(new BlockingArrayQueue<Runnable>(iterations * writers)); |
| |
| testQueues(readers, writers, iterations, queues, true); |
| } |
| |
| private void testQueues(final int readers, final int writers, final int iterations, List<Queue<Runnable>> queues, final boolean blocking) throws Exception |
| { |
| final int runs = 8; |
| int threads = readers + writers; |
| final CyclicBarrier barrier = new CyclicBarrier(threads + 1); |
| |
| for (final Queue<Runnable> queue : queues) |
| { |
| for (int r = 0; r < runs; ++r) |
| { |
| for (int i = 0; i < readers; ++i) |
| { |
| Thread thread = new Thread() |
| { |
| @Override |
| public void run() |
| { |
| await(barrier); |
| consume(queue, writers, blocking); |
| await(barrier); |
| } |
| }; |
| thread.start(); |
| } |
| for (int i = 0; i < writers; ++i) |
| { |
| Thread thread = new Thread() |
| { |
| @Override |
| public void run() |
| { |
| await(barrier); |
| produce(queue, readers, iterations); |
| await(barrier); |
| } |
| }; |
| thread.start(); |
| } |
| |
| await(barrier); |
| long begin = System.nanoTime(); |
| await(barrier); |
| long end = System.nanoTime(); |
| long elapsed = TimeUnit.NANOSECONDS.toMillis(end - begin); |
| logger.info("{} Readers/Writers: {}/{} => {} ms", queue.getClass().getSimpleName(), readers, writers, elapsed); |
| } |
| } |
| } |
| |
| private static void consume(Queue<Runnable> queue, int writers, boolean blocking) |
| { |
| while (true) |
| { |
| Runnable element = blocking ? take(queue) : poll(queue); |
| if (element == END) |
| if (--writers == 0) |
| break; |
| } |
| } |
| |
| private static void produce(Queue<Runnable> queue, int readers, int iterations) |
| { |
| for (int i = 0; i < iterations; ++i) |
| append(queue, ELEMENT); |
| for (int i = 0; i < readers; ++i) |
| append(queue, END); |
| } |
| |
| private static void append(Queue<Runnable> queue, Runnable element) |
| { |
| if (!queue.offer(element)) |
| logger.warn("Queue {} capacity is too small", queue); |
| } |
| |
| private static Runnable take(Queue<Runnable> queue) |
| { |
| try |
| { |
| return ((BlockingQueue<Runnable>)queue).take(); |
| } |
| catch (InterruptedException x) |
| { |
| throw new RuntimeException(x); |
| } |
| } |
| |
| private static Runnable poll(Queue<Runnable> queue) |
| { |
| int loops = 0; |
| while (true) |
| { |
| Runnable element = queue.poll(); |
| if (element != null) |
| return element; |
| // Busy loop |
| sleepMicros(1); |
| ++loops; |
| if (loops % 16 == 0) |
| logger.warn("Spin looping while polling empty queue: {} spins: ", loops); |
| } |
| } |
| |
| private static void sleepMicros(long sleep) |
| { |
| try |
| { |
| TimeUnit.MICROSECONDS.sleep(sleep); |
| } |
| catch (InterruptedException x) |
| { |
| throw new RuntimeException(x); |
| } |
| } |
| |
| private static void await(CyclicBarrier barrier) |
| { |
| try |
| { |
| barrier.await(); |
| } |
| catch (Exception x) |
| { |
| throw new RuntimeException(x); |
| } |
| } |
| } |