blob: 2412d1282d12bfe929fd9600e3293547bcb2b07d [file] [log] [blame]
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.io;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.TimerScheduler;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
public class SelectChannelEndPointTest
{
private static final Logger LOG = Log.getLogger(SelectChannelEndPointTest.class);
protected CountDownLatch _lastEndPointLatch;
protected volatile EndPoint _lastEndPoint;
protected ServerSocketChannel _connector;
protected QueuedThreadPool _threadPool = new QueuedThreadPool();
protected Scheduler _scheduler = new TimerScheduler();
protected SelectorManager _manager = new SelectorManager(_threadPool, _scheduler)
{
@Override
public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment)
{
return SelectChannelEndPointTest.this.newConnection(channel, endpoint);
}
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException
{
SelectChannelEndPoint endp = new SelectChannelEndPoint(channel, selectSet, selectionKey, getScheduler(), 60000);
_lastEndPoint = endp;
_lastEndPointLatch.countDown();
return endp;
}
};
// Must be volatile or the test may fail spuriously
protected volatile int _blockAt = 0;
private volatile int _writeCount = 1;
@Before
public void startManager() throws Exception
{
_writeCount = 1;
_lastEndPoint = null;
_lastEndPointLatch = new CountDownLatch(1);
_connector = ServerSocketChannel.open();
_connector.socket().bind(null);
_scheduler.start();
_threadPool.start();
_manager.start();
}
@After
public void stopManager() throws Exception
{
_scheduler.stop();
_manager.stop();
_threadPool.stop();
_connector.close();
}
protected Socket newClient() throws IOException
{
return new Socket(_connector.socket().getInetAddress(), _connector.socket().getLocalPort());
}
protected Connection newConnection(SocketChannel channel, EndPoint endpoint)
{
return new TestConnection(endpoint);
}
public class TestConnection extends AbstractConnection
{
volatile FutureCallback _blockingRead;
ByteBuffer _in = BufferUtil.allocate(32 * 1024);
ByteBuffer _out = BufferUtil.allocate(32 * 1024);
long _last = -1;
final CountDownLatch _latch;
public TestConnection(EndPoint endp)
{
super(endp, _threadPool);
_latch=null;
}
public TestConnection(EndPoint endp,CountDownLatch latch)
{
super(endp, _threadPool);
_latch=latch;
}
@Override
public void onOpen()
{
super.onOpen();
fillInterested();
}
@Override
public void onFillInterestedFailed(Throwable cause)
{
Callback blocking = _blockingRead;
if (blocking!=null)
{
_blockingRead=null;
blocking.failed(cause);
return;
}
super.onFillInterestedFailed(cause);
}
@Override
public void onFillable()
{
if (_latch!=null)
{
try
{
_latch.await();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
Callback blocking = _blockingRead;
if (blocking!=null)
{
_blockingRead=null;
blocking.succeeded();
return;
}
EndPoint _endp = getEndPoint();
try
{
_last = System.currentTimeMillis();
boolean progress = true;
while (progress)
{
progress = false;
// Fill the input buffer with everything available
BufferUtil.compact(_in);
if (BufferUtil.isFull(_in))
throw new IllegalStateException("FULL " + BufferUtil.toDetailString(_in));
int filled = _endp.fill(_in);
if (filled > 0)
progress = true;
// If the tests wants to block, then block
while (_blockAt > 0 && _endp.isOpen() && _in.remaining() < _blockAt)
{
FutureCallback future = _blockingRead = new FutureCallback();
fillInterested();
future.get();
filled = _endp.fill(_in);
progress |= filled > 0;
}
// Copy to the out buffer
if (BufferUtil.hasContent(_in) && BufferUtil.append(_out, _in) > 0)
progress = true;
// Blocking writes
if (BufferUtil.hasContent(_out))
{
ByteBuffer out = _out.duplicate();
BufferUtil.clear(_out);
for (int i = 0; i < _writeCount; i++)
{
FutureCallback blockingWrite = new FutureCallback();
_endp.write(blockingWrite, out.asReadOnlyBuffer());
blockingWrite.get();
}
progress = true;
}
// are we done?
if (_endp.isInputShutdown())
_endp.shutdownOutput();
}
if (_endp.isOpen())
fillInterested();
}
catch (ExecutionException e)
{
// Timeout does not close, so echo exception then shutdown
try
{
FutureCallback blockingWrite = new FutureCallback();
_endp.write(blockingWrite, BufferUtil.toBuffer("EE: " + BufferUtil.toString(_in)));
blockingWrite.get();
_endp.shutdownOutput();
}
catch (Exception e2)
{
// e2.printStackTrace();
}
}
catch (InterruptedException | EofException e)
{
SelectChannelEndPoint.LOG.ignore(e);
}
catch (Exception e)
{
SelectChannelEndPoint.LOG.warn(e);
}
finally
{
}
}
}
@Test
public void testEcho() throws Exception
{
Socket client = newClient();
client.setSoTimeout(60000);
SocketChannel server = _connector.accept();
server.configureBlocking(false);
_manager.accept(server);
// Write client to server
client.getOutputStream().write("HelloWorld".getBytes(StandardCharsets.UTF_8));
// Verify echo server to client
for (char c : "HelloWorld".toCharArray())
{
int b = client.getInputStream().read();
assertTrue(b > 0);
assertEquals(c, (char)b);
}
// wait for read timeout
client.setSoTimeout(500);
long start = System.currentTimeMillis();
try
{
client.getInputStream().read();
Assert.fail();
}
catch (SocketTimeoutException e)
{
long duration = System.currentTimeMillis() - start;
Assert.assertThat("timeout duration", duration, greaterThanOrEqualTo(400L));
}
// write then shutdown
client.getOutputStream().write("Goodbye Cruel TLS".getBytes(StandardCharsets.UTF_8));
// Verify echo server to client
for (char c : "Goodbye Cruel TLS".toCharArray())
{
int b = client.getInputStream().read();
Assert.assertThat("expect valid char integer", b, greaterThan(0));
assertEquals("expect characters to be same", c, (char)b);
}
client.close();
for (int i = 0; i < 10; ++i)
{
if (server.isOpen())
Thread.sleep(10);
else
break;
}
assertFalse(server.isOpen());
}
@Test
public void testShutdown() throws Exception
{
Socket client = newClient();
client.setSoTimeout(500);
SocketChannel server = _connector.accept();
server.configureBlocking(false);
_manager.accept(server);
// Write client to server
client.getOutputStream().write("HelloWorld".getBytes(StandardCharsets.UTF_8));
// Verify echo server to client
for (char c : "HelloWorld".toCharArray())
{
int b = client.getInputStream().read();
assertTrue(b > 0);
assertEquals(c, (char)b);
}
// wait for read timeout
long start = System.currentTimeMillis();
try
{
client.getInputStream().read();
Assert.fail();
}
catch (SocketTimeoutException e)
{
assertTrue(System.currentTimeMillis() - start >= 400);
}
// write then shutdown
client.getOutputStream().write("Goodbye Cruel TLS".getBytes(StandardCharsets.UTF_8));
client.shutdownOutput();
// Verify echo server to client
for (char c : "Goodbye Cruel TLS".toCharArray())
{
int b = client.getInputStream().read();
assertTrue(b > 0);
assertEquals(c, (char)b);
}
// Read close
assertEquals(-1, client.getInputStream().read());
}
@Test
public void testReadBlocked() throws Exception
{
Socket client = newClient();
SocketChannel server = _connector.accept();
server.configureBlocking(false);
_manager.accept(server);
OutputStream clientOutputStream = client.getOutputStream();
InputStream clientInputStream = client.getInputStream();
int specifiedTimeout = 1000;
client.setSoTimeout(specifiedTimeout);
// Write 8 and cause block waiting for 10
_blockAt = 10;
clientOutputStream.write("12345678".getBytes(StandardCharsets.UTF_8));
clientOutputStream.flush();
Assert.assertTrue(_lastEndPointLatch.await(1, TimeUnit.SECONDS));
_lastEndPoint.setIdleTimeout(10 * specifiedTimeout);
Thread.sleep((11 * specifiedTimeout) / 10);
long start = System.currentTimeMillis();
try
{
int b = clientInputStream.read();
Assert.fail("Should have timed out waiting for a response, but read " + b);
}
catch (SocketTimeoutException e)
{
int elapsed = Long.valueOf(System.currentTimeMillis() - start).intValue();
Assert.assertThat("Expected timeout", elapsed, greaterThanOrEqualTo(3 * specifiedTimeout / 4));
}
// write remaining characters
clientOutputStream.write("90ABCDEF".getBytes(StandardCharsets.UTF_8));
clientOutputStream.flush();
// Verify echo server to client
for (char c : "1234567890ABCDEF".toCharArray())
{
int b = clientInputStream.read();
assertTrue(b > 0);
assertEquals(c, (char)b);
}
}
@Test
public void testIdle() throws Exception
{
Socket client = newClient();
client.setSoTimeout(3000);
SocketChannel server = _connector.accept();
server.configureBlocking(false);
_manager.accept(server);
// Write client to server
client.getOutputStream().write("HelloWorld".getBytes(StandardCharsets.UTF_8));
// Verify echo server to client
for (char c : "HelloWorld".toCharArray())
{
int b = client.getInputStream().read();
assertTrue(b > 0);
assertEquals(c, (char)b);
}
Assert.assertTrue(_lastEndPointLatch.await(1, TimeUnit.SECONDS));
int idleTimeout = 500;
_lastEndPoint.setIdleTimeout(idleTimeout);
// read until idle shutdown received
long start = System.currentTimeMillis();
int b = client.getInputStream().read();
assertEquals(-1, b);
long idle = System.currentTimeMillis() - start;
assertTrue(idle > idleTimeout / 2);
assertTrue(idle < idleTimeout * 2);
// But endpoint may still be open for a little bit.
for (int i = 0; i < 10; ++i)
{
if (_lastEndPoint.isOpen())
Thread.sleep(2 * idleTimeout / 10);
else
break;
}
assertFalse(_lastEndPoint.isOpen());
}
@Test
public void testBlockedReadIdle() throws Exception
{
Socket client = newClient();
InputStream clientInputStream = client.getInputStream();
OutputStream clientOutputStream = client.getOutputStream();
client.setSoTimeout(5000);
SocketChannel server = _connector.accept();
server.configureBlocking(false);
_manager.accept(server);
// Write client to server
clientOutputStream.write("HelloWorld".getBytes(StandardCharsets.UTF_8));
// Verify echo server to client
for (char c : "HelloWorld".toCharArray())
{
int b = clientInputStream.read();
assertTrue(b > 0);
assertEquals(c, (char)b);
}
Assert.assertTrue(_lastEndPointLatch.await(1, TimeUnit.SECONDS));
int idleTimeout = 500;
_lastEndPoint.setIdleTimeout(idleTimeout);
// Write 8 and cause block waiting for 10
_blockAt = 10;
clientOutputStream.write("12345678".getBytes(StandardCharsets.UTF_8));
clientOutputStream.flush();
// read until idle shutdown received
long start = System.currentTimeMillis();
int b = clientInputStream.read();
assertEquals('E', b);
long idle = System.currentTimeMillis() - start;
assertTrue(idle > idleTimeout / 2);
assertTrue(idle < idleTimeout * 2);
for (char c : "E: 12345678".toCharArray())
{
b = clientInputStream.read();
assertTrue(b > 0);
assertEquals(c, (char)b);
}
b = clientInputStream.read();
assertEquals(-1,b);
// But endpoint is still open.
if(_lastEndPoint.isOpen())
// Wait for another idle callback
Thread.sleep(idleTimeout * 2);
// endpoint is closed.
assertFalse(_lastEndPoint.isOpen());
}
@Test
public void testStress() throws Exception
{
Socket client = newClient();
client.setSoTimeout(30000);
SocketChannel server = _connector.accept();
server.configureBlocking(false);
_manager.accept(server);
final int writes = 200000;
final byte[] bytes = "HelloWorld-".getBytes(StandardCharsets.UTF_8);
byte[] count = "0\n".getBytes(StandardCharsets.UTF_8);
BufferedOutputStream out = new BufferedOutputStream(client.getOutputStream());
final CountDownLatch latch = new CountDownLatch(writes);
final InputStream in = new BufferedInputStream(client.getInputStream());
final long start = System.currentTimeMillis();
out.write(bytes);
out.write(count);
out.flush();
Assert.assertTrue(_lastEndPointLatch.await(1, TimeUnit.SECONDS));
_lastEndPoint.setIdleTimeout(5000);
new Thread()
{
@Override
public void run()
{
Thread.currentThread().setPriority(MAX_PRIORITY);
long last = -1;
int count = -1;
try
{
while (latch.getCount() > 0)
{
// Verify echo server to client
for (byte b0 : bytes)
{
int b = in.read();
Assert.assertThat(b, greaterThan(0));
assertEquals(0xff & b0, b);
}
count = 0;
int b = in.read();
while (b > 0 && b != '\n')
{
count = count * 10 + (b - '0');
b = in.read();
}
last = System.currentTimeMillis();
//if (latch.getCount()%1000==0)
// System.out.println(writes-latch.getCount());
latch.countDown();
}
}
catch (Throwable e)
{
long now = System.currentTimeMillis();
System.err.println("count=" + count);
System.err.println("latch=" + latch.getCount());
System.err.println("time=" + (now - start));
System.err.println("last=" + (now - last));
System.err.println("endp=" + _lastEndPoint);
System.err.println("conn=" + _lastEndPoint.getConnection());
e.printStackTrace();
}
}
}.start();
// Write client to server
for (int i = 1; i < writes; i++)
{
out.write(bytes);
out.write(Integer.toString(i).getBytes(StandardCharsets.ISO_8859_1));
out.write('\n');
if (i % 1000 == 0)
{
//System.err.println(i+"/"+writes);
out.flush();
}
Thread.yield();
}
out.flush();
long last = latch.getCount();
while (!latch.await(5, TimeUnit.SECONDS))
{
//System.err.println(latch.getCount());
if (latch.getCount() == last)
Assert.fail();
last = latch.getCount();
}
assertEquals(0, latch.getCount());
}
@Test
public void testWriteBlocked() throws Exception
{
Socket client = newClient();
client.setSoTimeout(10000);
SocketChannel server = _connector.accept();
server.configureBlocking(false);
_manager.accept(server);
// Write client to server
_writeCount = 10000;
String data = "Now is the time for all good men to come to the aid of the party";
client.getOutputStream().write(data.getBytes(StandardCharsets.UTF_8));
BufferedInputStream in = new BufferedInputStream(client.getInputStream());
int byteNum = 0;
try
{
for (int i = 0; i < _writeCount; i++)
{
if (i % 1000 == 0)
TimeUnit.MILLISECONDS.sleep(200);
// Verify echo server to client
for (int j = 0; j < data.length(); j++)
{
char c = data.charAt(j);
int b = in.read();
byteNum++;
assertTrue(b > 0);
assertEquals("test-" + i + "/" + j,c,(char)b);
}
if (i == 0)
_lastEndPoint.setIdleTimeout(60000);
}
}
catch (SocketTimeoutException e)
{
System.err.println("SelectorManager.dump() = " + _manager.dump());
LOG.warn("Server: " + server);
LOG.warn("Error reading byte #" + byteNum,e);
throw e;
}
client.close();
for (int i = 0; i < 10; ++i)
{
if (server.isOpen())
Thread.sleep(10);
else
break;
}
assertFalse(server.isOpen());
}
// TODO make this test reliable
@Test
@Ignore
public void testRejectedExecution() throws Exception
{
_manager.stop();
_threadPool.stop();
final CountDownLatch latch = new CountDownLatch(1);
BlockingQueue<Runnable> q = new ArrayBlockingQueue<>(4);
_threadPool = new QueuedThreadPool(4,4,60000,q);
_manager = new SelectorManager(_threadPool, _scheduler, 1)
{
@Override
public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment)
{
return new TestConnection(endpoint,latch);
}
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException
{
SelectChannelEndPoint endp = new SelectChannelEndPoint(channel, selectSet, selectionKey, getScheduler(), 60000);
_lastEndPoint = endp;
_lastEndPointLatch.countDown();
return endp;
}
};
_threadPool.start();
_manager.start();
AtomicInteger timeout = new AtomicInteger();
AtomicInteger rejections = new AtomicInteger();
AtomicInteger echoed = new AtomicInteger();
CountDownLatch closed = new CountDownLatch(20);
for (int i=0;i<20;i++)
{
new Thread()
{
public void run()
{
try(Socket client = newClient();)
{
client.setSoTimeout(5000);
SocketChannel server = _connector.accept();
server.configureBlocking(false);
_manager.accept(server);
// Write client to server
client.getOutputStream().write("HelloWorld".getBytes(StandardCharsets.UTF_8));
client.getOutputStream().flush();
client.shutdownOutput();
// Verify echo server to client
for (char c : "HelloWorld".toCharArray())
{
int b = client.getInputStream().read();
assertTrue(b > 0);
assertEquals(c, (char)b);
}
assertEquals(-1,client.getInputStream().read());
echoed.incrementAndGet();
}
catch(SocketTimeoutException x)
{
x.printStackTrace();
timeout.incrementAndGet();
}
catch(Throwable x)
{
rejections.incrementAndGet();
}
finally
{
closed.countDown();
}
}
}.start();
}
// unblock the handling
latch.countDown();
// wait for all clients to complete or fail
closed.await();
// assert some clients must have been rejected
Assert.assertThat(rejections.get(),Matchers.greaterThan(0));
// but not all of them
Assert.assertThat(rejections.get(),Matchers.lessThan(20));
// none should have timed out
Assert.assertThat(timeout.get(),Matchers.equalTo(0));
// and the rest should have worked
Assert.assertThat(echoed.get(),Matchers.equalTo(20-rejections.get()));
// and the selector is still working for new requests
try(Socket client = newClient();)
{
client.setSoTimeout(5000);
SocketChannel server = _connector.accept();
server.configureBlocking(false);
_manager.accept(server);
// Write client to server
client.getOutputStream().write("HelloWorld".getBytes(StandardCharsets.UTF_8));
client.getOutputStream().flush();
client.shutdownOutput();
// Verify echo server to client
for (char c : "HelloWorld".toCharArray())
{
int b = client.getInputStream().read();
assertTrue(b > 0);
assertEquals(c, (char)b);
}
assertEquals(-1,client.getInputStream().read());
}
}
}