// blob: 1db02503fe35101e25439dfbc3ed233204f50705 [file] [log] [blame]
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.rhttp.loadtest;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.client.Address;
import org.eclipse.jetty.client.ContentExchange;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.mortbay.jetty.rhttp.client.RHTTPClient;
import org.mortbay.jetty.rhttp.client.JettyClient;
import org.mortbay.jetty.rhttp.client.RHTTPListener;
import org.mortbay.jetty.rhttp.client.RHTTPRequest;
import org.mortbay.jetty.rhttp.client.RHTTPResponse;
/**
 * Interactive load generator that drives traffic through an RHTTP gateway.
 * <p>
 * The loader connects a configurable number of {@link RHTTPClient}s to the gateway; each client
 * echoes the body of every request it receives back as a 200 response (see {@link EchoListener}).
 * It then sends batches of external HTTP requests, each addressed to the target id of a randomly
 * chosen client, waits for the responses and prints throughput and latency statistics.
 * <p>
 * All parameters are read from standard input; prompts and reports are written to standard error.
 * Reaching the end of standard input at any prompt terminates the tool cleanly.
 *
 * @version $Revision$ $Date$
 */
public class Loader
{
    /** Number of rows (and columns) used to draw the latency histogram. */
    private static final int LATENCY_BUCKETS = 20;

    /** Gateway clients currently connected; only touched by the thread executing {@link #run()}. */
    private final List<RHTTPClient> clients = new ArrayList<RHTTPClient>();
    /** {@link System#nanoTime()} of the first response of the current round, 0 if none arrived yet. */
    private final AtomicLong start = new AtomicLong();
    /** {@link System#nanoTime()} of the most recently completed response of the current round. */
    private final AtomicLong end = new AtomicLong();
    /** Number of exchanges completed with status 200 in the current round. */
    private final AtomicLong responses = new AtomicLong();
    /** Number of exchanges completed with another status, or failed with an exception. */
    private final AtomicLong failures = new AtomicLong();
    /** Smallest latency seen in the current round, in nanoseconds ({@link Long#MAX_VALUE} if none). */
    private final AtomicLong minLatency = new AtomicLong();
    /** Largest latency seen in the current round, in nanoseconds. */
    private final AtomicLong maxLatency = new AtomicLong();
    /** Sum of all latencies recorded in the current round, in nanoseconds. */
    private final AtomicLong totLatency = new AtomicLong();
    /** Histogram of the current round: latency in nanoseconds to number of occurrences. */
    private final ConcurrentMap<Long, AtomicLong> latencies = new ConcurrentHashMap<Long, AtomicLong>();
    /** Prefix of the target id given to every gateway client created by this loader. */
    private final String nodeName;

    /**
     * Starts the interactive loader.
     *
     * @param args optional; {@code args[0]} is used as node name (defaults to the empty string)
     * @throws Exception if the HTTP client cannot be started or a request cannot be sent
     */
    public static void main(String[] args) throws Exception
    {
        String nodeName = "";
        if (args.length > 0)
            nodeName = args[0];
        Loader loader = new Loader(nodeName);
        loader.run();
    }

    /**
     * @param nodeName prefix used to build the target id of each gateway client
     */
    public Loader(String nodeName)
    {
        this.nodeName = nodeName;
    }

    /**
     * Starts the shared {@link HttpClient}, runs the interactive session and finally releases the
     * gateway clients and the HTTP client, whatever way the session ended.
     */
    private void run() throws Exception
    {
        HttpClient httpClient = new HttpClient();
        httpClient.setMaxConnectionsPerAddress(40000);
        QueuedThreadPool threadPool = new QueuedThreadPool();
        threadPool.setMaxThreads(500);
        threadPool.setDaemon(true);
        httpClient.setThreadPool(threadPool);
        httpClient.setIdleTimeout(5000);
        httpClient.start();
        try
        {
            interact(httpClient);
        }
        catch (EndOfInputException ignored)
        {
            // Standard input was closed: this is the regular way to quit the tool.
        }
        finally
        {
            shutdown(httpClient);
        }
    }

    /**
     * Reads the connection settings once, then loops forever: adjusts the number of gateway
     * clients, reads the batch parameters, sends the load and prints the report.
     *
     * @throws EndOfInputException when standard input is exhausted
     */
    private void interact(HttpClient httpClient) throws Exception
    {
        Random random = new Random();
        BufferedReader console = new BufferedReader(new InputStreamReader(System.in));

        String host = prompt(console, "server", "localhost");
        int port = (int)promptLong(console, "port", 8080, 0, 65535);
        String context = prompt(console, "context", "");
        String externalPath = prompt(console, "external path", "/");
        String gatewayPath = prompt(console, "gateway path", "/__gateway");

        Address gatewayAddress = new Address(host, port);
        String gatewayURI = context + gatewayPath;
        String externalURL = "http://" + host + ":" + port + context + externalPath;
        if (!externalURL.endsWith("/"))
            externalURL += "/";

        // Every answer becomes the default of the same prompt in the next round
        int clientCount = 100;
        int batchCount = 1000;
        int batchSize = 5;
        long batchPause = 5;
        int requestSize = 50;
        while (true)
        {
            System.err.println("-----");
            clientCount = (int)promptLong(console, "clients", clientCount, 0, Integer.MAX_VALUE);
            System.err.println("Waiting for clients to be ready...");
            resizeClients(httpClient, gatewayAddress, gatewayURI, clientCount);
            System.err.println("Clients ready");

            // Without clients there is no target to address requests to
            if (this.clients.isEmpty())
                continue;

            batchCount = (int)promptLong(console, "batch count", batchCount, 0, Integer.MAX_VALUE);
            batchSize = (int)promptLong(console, "batch size", batchSize, 0, Integer.MAX_VALUE);
            batchPause = promptLong(console, "batch pause", batchPause, 0, Long.MAX_VALUE);
            requestSize = (int)promptLong(console, "request size", requestSize, 0, Integer.MAX_VALUE);
            String requestBody = buildRequestBody(requestSize);

            reset();
            long begin = System.nanoTime();
            long expected = sendBatches(httpClient, random, externalURL, requestBody, batchCount, batchSize, batchPause);
            long elapsedNanos = System.nanoTime() - begin;
            if (elapsedNanos > 0)
            {
                System.err.print("Messages - Elapsed | Rate = ");
                System.err.print(TimeUnit.NANOSECONDS.toMillis(elapsedNanos));
                System.err.print(" ms | ");
                System.err.print(expected * 1000 * 1000 * 1000 / elapsedNanos);
                System.err.println(" requests/s ");
            }

            waitForResponses(expected);
            printReport(expected);
        }
    }

    /**
     * Prints {@code label [defaultValue]: } and reads one line from the console.
     *
     * @return the trimmed line, or {@code defaultValue} if the line is blank
     * @throws EndOfInputException if the console has reached end of stream
     */
    private static String prompt(BufferedReader console, String label, String defaultValue) throws IOException, EndOfInputException
    {
        System.err.print(label + " [" + defaultValue + "]: ");
        String line = console.readLine();
        // readLine() returns null (not an empty string) once the stream is exhausted
        if (line == null)
            throw new EndOfInputException();
        line = line.trim();
        return line.length() == 0 ? defaultValue : line;
    }

    /**
     * Prompts for an integral value, asking again until the answer parses and lies within
     * {@code [min, max]}.
     *
     * @return the accepted value, or {@code defaultValue} if the line is blank
     * @throws EndOfInputException if the console has reached end of stream
     */
    private static long promptLong(BufferedReader console, String label, long defaultValue, long min, long max) throws IOException, EndOfInputException
    {
        while (true)
        {
            String answer = prompt(console, label, String.valueOf(defaultValue));
            try
            {
                long parsed = Long.parseLong(answer);
                if (parsed >= min && parsed <= max)
                    return parsed;
                System.err.println("Value must be between " + min + " and " + max + ": " + answer);
            }
            catch (NumberFormatException x)
            {
                System.err.println("Not a valid number: " + answer);
            }
        }
    }

    /**
     * @return a string made of {@code size} 'x' characters (empty if {@code size} is not positive)
     */
    private static String buildRequestBody(int size)
    {
        StringBuilder body = new StringBuilder(Math.max(size, 0));
        for (int i = 0; i < size; i++)
            body.append('x');
        return body.toString();
    }

    /**
     * Connects new gateway clients or disconnects the most recently added ones until exactly
     * {@code target} clients are connected.
     *
     * @param target the wanted number of clients, must not be negative
     */
    private void resizeClients(HttpClient httpClient, Address gatewayAddress, String gatewayURI, int target) throws Exception
    {
        int current = clients.size();
        for (int i = 0; i < target - current; ++i)
        {
            RHTTPClient client = new JettyClient(httpClient, gatewayAddress, gatewayURI, nodeName + (current + i));
            client.addListener(new EchoListener(client));
            client.connect();
            clients.add(client);
            // Give some time to the server to accept connections and
            // reply to handshakes and connects
            if (i % 10 == 0)
                Thread.sleep(100);
        }
        while (clients.size() > target)
        {
            RHTTPClient client = clients.remove(clients.size() - 1);
            client.disconnect();
        }
    }

    /**
     * Sends {@code batchCount} batches of {@code batchSize} external requests, each addressed to a
     * randomly chosen gateway client, sleeping {@code batchPause} milliseconds after every batch.
     *
     * @return the number of requests sent
     */
    private long sendBatches(HttpClient httpClient, Random random, String externalURL, String requestBody,
                             int batchCount, int batchSize, long batchPause) throws Exception
    {
        long sent = 0;
        for (int i = 0; i < batchCount; ++i)
        {
            for (int j = 0; j < batchSize; ++j)
            {
                RHTTPClient client = clients.get(random.nextInt(clients.size()));
                ExternalExchange exchange = new ExternalExchange();
                exchange.setMethod("GET");
                exchange.setURL(externalURL + client.getTargetId());
                exchange.setRequestContent(new ByteArrayBuffer(requestBody, "UTF-8"));
                exchange.send(httpClient);
                ++sent;
            }
            if (batchPause > 0)
                Thread.sleep(batchPause);
        }
        return sent;
    }

    /**
     * Best-effort release of all resources: a failure to disconnect one client must not prevent
     * the others, nor the HTTP client, from being released.
     */
    private void shutdown(HttpClient httpClient)
    {
        for (RHTTPClient client : clients)
        {
            try
            {
                client.disconnect();
            }
            catch (Exception x)
            {
                System.err.println("Could not disconnect client: " + x);
            }
        }
        clients.clear();
        try
        {
            httpClient.stop();
        }
        catch (Exception x)
        {
            System.err.println("Could not stop HTTP client: " + x);
        }
    }

    /**
     * Clears all statistics before a new round of requests.
     */
    private void reset()
    {
        start.set(0L);
        end.set(0L);
        responses.set(0L);
        failures.set(0L);
        minLatency.set(Long.MAX_VALUE);
        maxLatency.set(0L);
        totLatency.set(0L);
        // The histogram must restart too: entries left over from a previous round may lie
        // outside the new [min, max] range and would be mapped to invalid buckets
        latencies.clear();
    }

    /**
     * Records one latency sample; called concurrently by the HTTP client threads.
     *
     * @param start the {@link System#nanoTime()} at which the request was sent
     * @param end the {@link System#nanoTime()} at which the response completed
     */
    private void updateLatencies(long start, long end)
    {
        long latency = end - start;

        // Update the latencies using a non-blocking algorithm
        long oldMinLatency = minLatency.get();
        while (latency < oldMinLatency)
        {
            if (minLatency.compareAndSet(oldMinLatency, latency))
                break;
            oldMinLatency = minLatency.get();
        }
        long oldMaxLatency = maxLatency.get();
        while (latency > oldMaxLatency)
        {
            if (maxLatency.compareAndSet(oldMaxLatency, latency))
                break;
            oldMaxLatency = maxLatency.get();
        }
        totLatency.addAndGet(latency);

        // Use the counter returned by putIfAbsent() rather than a second lookup: the entry may
        // be removed in between, and the common case (counter present) allocates nothing
        AtomicLong counter = latencies.get(latency);
        if (counter == null)
        {
            AtomicLong created = new AtomicLong(0L);
            counter = latencies.putIfAbsent(latency, created);
            if (counter == null)
                counter = created;
        }
        counter.incrementAndGet();
    }

    /**
     * Polls every 500 ms until {@code expected} exchanges have completed, giving up after 20
     * consecutive polls without progress.
     *
     * @param expected the number of exchanges that were sent
     * @return true if all the exchanges completed, false if the wait was abandoned
     * @throws InterruptedException if the calling thread is interrupted while sleeping
     */
    private boolean waitForResponses(long expected) throws InterruptedException
    {
        long arrived = responses.get() + failures.get();
        long lastArrived = 0;
        int maxRetries = 20;
        int retries = maxRetries;
        while (arrived < expected)
        {
            System.err.println("Waiting for responses to arrive " + arrived + "/" + expected);
            Thread.sleep(500);
            if (lastArrived == arrived)
            {
                // No progress since the previous poll
                --retries;
                if (retries == 0)
                    break;
            }
            else
            {
                lastArrived = arrived;
                retries = maxRetries;
            }
            arrived = responses.get() + failures.get();
        }

        if (arrived < expected)
        {
            System.err.println("Interrupting wait for responses " + arrived + "/" + expected);
            return false;
        }
        System.err.println("All responses arrived " + arrived + "/" + expected);
        return true;
    }

    /**
     * Prints to standard error the outcome counters, the response rate, the latency histogram
     * (when at least two distinct latencies were recorded) and the min/average/max latency.
     *
     * @param expectedCount the number of requests that were sent in the round
     */
    public void printReport(long expectedCount)
    {
        long responseCount = responses.get() + failures.get();
        System.err.print("Messages - Success/Failures/Expected = ");
        System.err.print(responses.get());
        System.err.print("/");
        System.err.print(failures.get());
        System.err.print("/");
        System.err.println(expectedCount);

        long elapsedNanos = end.get() - start.get();
        if (elapsedNanos > 0)
        {
            System.err.print("Messages - Elapsed | Rate = ");
            System.err.print(TimeUnit.NANOSECONDS.toMillis(elapsedNanos));
            System.err.print(" ms | ");
            System.err.print(responseCount * 1000 * 1000 * 1000 / elapsedNanos);
            System.err.println(" responses/s ");
        }

        // Read the bounds once so that every bucket is computed against the same range
        long min = minLatency.get();
        long max = maxLatency.get();
        long latencyRange = max - min;

        long samples = 0L;
        int distinctLatencies = 0;
        long maxLatencyBucketFrequency = 0L;
        long[] latencyBucketFrequencies = new long[LATENCY_BUCKETS];
        for (Map.Entry<Long, AtomicLong> entry : latencies.entrySet())
        {
            long occurrences = entry.getValue().get();
            samples += occurrences;
            ++distinctLatencies;
            if (latencyRange > 0)
            {
                long scaled = (entry.getKey() - min) * LATENCY_BUCKETS / latencyRange;
                // The maximum latency scales to LATENCY_BUCKETS and belongs to the last bucket;
                // clamping also protects against samples recorded while the report is printed
                int index = (int)Math.max(0L, Math.min(LATENCY_BUCKETS - 1, scaled));
                latencyBucketFrequencies[index] += occurrences;
                if (latencyBucketFrequencies[index] > maxLatencyBucketFrequency)
                    maxLatencyBucketFrequency = latencyBucketFrequencies[index];
            }
        }

        if (distinctLatencies > 1 && maxLatencyBucketFrequency > 0)
        {
            System.err.println("Messages - Latency Distribution Curve (X axis: Frequency, Y axis: Latency):");
            for (int i = 0; i < LATENCY_BUCKETS; i++)
            {
                long latencyBucketFrequency = latencyBucketFrequencies[i];
                // Column of the '@' marker, proportional to the fullest bucket
                int column = Math.round(latencyBucketFrequency * (float)LATENCY_BUCKETS / maxLatencyBucketFrequency);
                if (column == LATENCY_BUCKETS)
                    column = column - 1;
                for (int j = 0; j < column; ++j)
                    System.err.print(" ");
                System.err.print("@");
                for (int j = column + 1; j < LATENCY_BUCKETS; ++j)
                    System.err.print(" ");
                System.err.print(" _ ");
                System.err.print(TimeUnit.NANOSECONDS.toMillis((latencyRange * (i + 1) / LATENCY_BUCKETS) + min));
                System.err.println(" ms (" + latencyBucketFrequency + ")");
            }
        }

        System.err.print("Messages - Latency Min/Ave/Max = ");
        if (samples == 0)
        {
            // Nothing was measured: min still holds its Long.MAX_VALUE sentinel
            System.err.println("-/-/- ms");
        }
        else
        {
            // Average over the recorded samples only: exchanges that failed with an exception
            // count as failures but carry no latency
            System.err.print(TimeUnit.NANOSECONDS.toMillis(min) + "/");
            System.err.print(TimeUnit.NANOSECONDS.toMillis(totLatency.get() / samples) + "/");
            System.err.println(TimeUnit.NANOSECONDS.toMillis(max) + " ms");
        }
    }

    /**
     * Signals that standard input has been exhausted.
     */
    private static final class EndOfInputException extends Exception
    {
        private static final long serialVersionUID = 1L;
    }

    /**
     * External request that records its own outcome and latency in the enclosing loader.
     */
    private class ExternalExchange extends ContentExchange
    {
        /** {@link System#nanoTime()} taken right before the exchange is handed to the HTTP client. */
        private volatile long sendTime;

        private ExternalExchange()
        {
            super(true);
        }

        /**
         * Timestamps and sends this exchange.
         *
         * @throws IOException if the HTTP client cannot send the exchange
         */
        private void send(HttpClient httpClient) throws IOException
        {
            this.sendTime = System.nanoTime();
            httpClient.send(this);
        }

        @Override
        protected void onResponseComplete() throws IOException
        {
            long arrivalTime = System.nanoTime();
            // Only the first response of the round may set the start time
            start.compareAndSet(0L, arrivalTime);
            end.set(arrivalTime);
            updateLatencies(sendTime, arrivalTime);

            // Count the exchange last: the reporting thread proceeds as soon as all exchanges
            // are counted, and must then find this latency already recorded
            if (getResponseStatus() == 200)
                responses.incrementAndGet();
            else
                failures.incrementAndGet();
        }

        @Override
        protected void onException(Throwable x)
        {
            failures.incrementAndGet();
        }
    }

    /**
     * Gateway-side listener that answers every request with a 200 "OK" response carrying no
     * headers and the same body as the request.
     */
    private static class EchoListener implements RHTTPListener
    {
        private final RHTTPClient client;

        /**
         * @param client the client through which the responses are delivered
         */
        public EchoListener(RHTTPClient client)
        {
            this.client = client;
        }

        public void onRequest(RHTTPRequest request) throws Exception
        {
            RHTTPResponse response = new RHTTPResponse(request.getId(), 200, "OK", new HashMap<String, String>(), request.getBody());
            client.deliver(response);
        }
    }
}