blob: ae037ff7f3fed5cf2012a356198452b6b0618384 [file] [log] [blame]
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.io;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritePendingException;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.FutureCallback;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
public class WriteFlusherTest
{
@Test
public void testCompleteNoBlocking() throws Exception
{
testCompleteWrite(false);
}
@Test
public void testIgnorePreviousFailures() throws Exception
{
testCompleteWrite(true);
}
private void testCompleteWrite(boolean failBefore) throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint(new byte[0], 16);
endPoint.setGrowOutput(true);
AtomicBoolean incompleteFlush = new AtomicBoolean();
WriteFlusher flusher = new WriteFlusher(endPoint)
{
@Override
protected void onIncompleteFlush()
{
incompleteFlush.set(true);
}
};
if (failBefore)
flusher.onFail(new IOException("Ignored because no operation in progress"));
FutureCallback callback = new FutureCallback();
flusher.write(callback, BufferUtil.toBuffer("How "), BufferUtil.toBuffer("now "), BufferUtil.toBuffer("brown "), BufferUtil.toBuffer("cow!"));
Assert.assertTrue(callback.isDone());
Assert.assertFalse(incompleteFlush.get());
Assert.assertEquals("How now brown cow!", endPoint.takeOutputString());
Assert.assertTrue(flusher.isIdle());
}
@Test
public void testClosedNoBlocking() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint(new byte[0], 16);
endPoint.close();
AtomicBoolean incompleteFlush = new AtomicBoolean();
WriteFlusher flusher = new WriteFlusher(endPoint)
{
@Override
protected void onIncompleteFlush()
{
incompleteFlush.set(true);
}
};
FutureCallback callback = new FutureCallback();
flusher.write(callback, BufferUtil.toBuffer("foo"));
Assert.assertTrue(callback.isDone());
Assert.assertFalse(incompleteFlush.get());
try
{
callback.get();
Assert.fail();
}
catch (ExecutionException e)
{
Throwable cause = e.getCause();
Assert.assertTrue(cause instanceof IOException);
Assert.assertThat(cause.getMessage(), Matchers.containsString("CLOSED"));
}
Assert.assertEquals("", endPoint.takeOutputString());
Assert.assertTrue(flusher.isIdle());
}
@Test
public void testCompleteBlocking() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint(new byte[0], 10);
AtomicBoolean incompleteFlush = new AtomicBoolean();
WriteFlusher flusher = new WriteFlusher(endPoint)
{
@Override
protected void onIncompleteFlush()
{
incompleteFlush.set(true);
}
};
FutureCallback callback = new FutureCallback();
flusher.write(callback, BufferUtil.toBuffer("How now brown cow!"));
Assert.assertFalse(callback.isDone());
Assert.assertFalse(callback.isCancelled());
Assert.assertTrue(incompleteFlush.get());
try
{
callback.get(100, TimeUnit.MILLISECONDS);
Assert.fail();
}
catch (TimeoutException x)
{
incompleteFlush.set(false);
}
Assert.assertEquals("How now br", endPoint.takeOutputString());
flusher.completeWrite();
Assert.assertTrue(callback.isDone());
Assert.assertEquals("own cow!", endPoint.takeOutputString());
Assert.assertFalse(incompleteFlush.get());
Assert.assertTrue(flusher.isIdle());
}
@Test
public void testCloseWhileBlocking() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint(new byte[0], 10);
AtomicBoolean incompleteFlush = new AtomicBoolean();
WriteFlusher flusher = new WriteFlusher(endPoint)
{
@Override
protected void onIncompleteFlush()
{
incompleteFlush.set(true);
}
};
FutureCallback callback = new FutureCallback();
flusher.write(callback, BufferUtil.toBuffer("How now brown cow!"));
Assert.assertFalse(callback.isDone());
Assert.assertFalse(callback.isCancelled());
Assert.assertTrue(incompleteFlush.get());
incompleteFlush.set(false);
Assert.assertEquals("How now br", endPoint.takeOutputString());
endPoint.close();
flusher.completeWrite();
Assert.assertTrue(callback.isDone());
Assert.assertFalse(incompleteFlush.get());
try
{
callback.get();
Assert.fail();
}
catch (ExecutionException e)
{
Throwable cause = e.getCause();
Assert.assertTrue(cause instanceof IOException);
Assert.assertThat(cause.getMessage(), Matchers.containsString("CLOSED"));
}
Assert.assertEquals("", endPoint.takeOutputString());
Assert.assertTrue(flusher.isIdle());
}
@Test
public void testFailWhileBlocking() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint(new byte[0], 10);
AtomicBoolean incompleteFlush = new AtomicBoolean();
WriteFlusher flusher = new WriteFlusher(endPoint)
{
@Override
protected void onIncompleteFlush()
{
incompleteFlush.set(true);
}
};
FutureCallback callback = new FutureCallback();
flusher.write(callback, BufferUtil.toBuffer("How now brown cow!"));
Assert.assertFalse(callback.isDone());
Assert.assertFalse(callback.isCancelled());
Assert.assertTrue(incompleteFlush.get());
incompleteFlush.set(false);
Assert.assertEquals("How now br", endPoint.takeOutputString());
String reason = "Failure";
flusher.onFail(new IOException(reason));
flusher.completeWrite();
Assert.assertTrue(callback.isDone());
Assert.assertFalse(incompleteFlush.get());
try
{
callback.get();
Assert.fail();
}
catch (ExecutionException e)
{
Throwable cause = e.getCause();
Assert.assertTrue(cause instanceof IOException);
Assert.assertEquals(reason, cause.getMessage());
}
Assert.assertEquals("", endPoint.takeOutputString());
Assert.assertTrue(flusher.isIdle());
}
@Test
public void testConcurrent() throws Exception
{
Random random = new Random();
ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(100);
try
{
String reason = "THE_CAUSE";
ConcurrentWriteFlusher[] flushers = new ConcurrentWriteFlusher[50000];
FutureCallback[] futures = new FutureCallback[flushers.length];
for (int i = 0; i < flushers.length; ++i)
{
int size = 5 + random.nextInt(15);
ByteArrayEndPoint endPoint = new ByteArrayEndPoint(new byte[0], size);
ConcurrentWriteFlusher flusher = new ConcurrentWriteFlusher(endPoint, scheduler, random);
flushers[i] = flusher;
FutureCallback callback = new FutureCallback();
futures[i] = callback;
scheduler.schedule(() -> flusher.onFail(new Throwable(reason)), random.nextInt(75) + 1, TimeUnit.MILLISECONDS);
flusher.write(callback, BufferUtil.toBuffer("How Now Brown Cow."), BufferUtil.toBuffer(" The quick brown fox jumped over the lazy dog!"));
}
int completed = 0;
int failed = 0;
for (int i = 0; i < flushers.length; ++i)
{
try
{
futures[i].get(15, TimeUnit.SECONDS);
Assert.assertEquals("How Now Brown Cow. The quick brown fox jumped over the lazy dog!", flushers[i].getContent());
completed++;
}
catch (ExecutionException x)
{
Assert.assertEquals(reason, x.getCause().getMessage());
failed++;
}
}
Assert.assertThat(completed, Matchers.greaterThan(0));
Assert.assertThat(failed, Matchers.greaterThan(0));
Assert.assertEquals(flushers.length, completed + failed);
}
finally
{
scheduler.shutdown();
}
}
@Test
public void testPendingWriteDoesNotStoreConsumedBuffers() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint(new byte[0], 10);
int toWrite = endPoint.getOutput().capacity();
byte[] chunk1 = new byte[toWrite / 2];
Arrays.fill(chunk1, (byte)1);
ByteBuffer buffer1 = ByteBuffer.wrap(chunk1);
byte[] chunk2 = new byte[toWrite];
Arrays.fill(chunk1, (byte)2);
ByteBuffer buffer2 = ByteBuffer.wrap(chunk2);
AtomicBoolean incompleteFlush = new AtomicBoolean();
WriteFlusher flusher = new WriteFlusher(endPoint)
{
@Override
protected void onIncompleteFlush()
{
incompleteFlush.set(true);
}
};
flusher.write(Callback.NOOP, buffer1, buffer2);
Assert.assertTrue(incompleteFlush.get());
Assert.assertFalse(buffer1.hasRemaining());
// Reuse buffer1
buffer1.clear();
Arrays.fill(chunk1, (byte)3);
int remaining1 = buffer1.remaining();
// Complete the write
endPoint.takeOutput();
flusher.completeWrite();
// Make sure buffer1 is unchanged
Assert.assertEquals(remaining1, buffer1.remaining());
}
@Test(expected = WritePendingException.class)
public void testConcurrentWrites() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint(new byte[0], 16);
CountDownLatch flushLatch = new CountDownLatch(1);
WriteFlusher flusher = new WriteFlusher(endPoint)
{
@Override
protected ByteBuffer[] flush(ByteBuffer[] buffers) throws IOException
{
try
{
flushLatch.countDown();
Thread.sleep(2000);
return super.flush(buffers);
}
catch (InterruptedException x)
{
throw new InterruptedIOException();
}
}
@Override
protected void onIncompleteFlush()
{
}
};
// Two concurrent writes.
new Thread(() -> flusher.write(Callback.NOOP, BufferUtil.toBuffer("foo"))).start();
Assert.assertTrue(flushLatch.await(1, TimeUnit.SECONDS));
// The second write throws WritePendingException.
flusher.write(Callback.NOOP, BufferUtil.toBuffer("bar"));
}
@Test
public void testConcurrentWriteAndOnFail() throws Exception
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint(new byte[0], 16);
WriteFlusher flusher = new WriteFlusher(endPoint)
{
@Override
protected ByteBuffer[] flush(ByteBuffer[] buffers) throws IOException
{
ByteBuffer[] result = super.flush(buffers);
boolean notified = onFail(new Throwable());
Assert.assertFalse(notified);
return result;
}
@Override
protected void onIncompleteFlush()
{
}
};
FutureCallback callback = new FutureCallback();
flusher.write(callback, BufferUtil.toBuffer("foo"));
// Callback must be successfully completed.
callback.get(1, TimeUnit.SECONDS);
// Flusher must be idle - not failed - since the write succeeded.
Assert.assertTrue(flusher.isIdle());
}
@Test
public void testConcurrentIncompleteFlushAndOnFail() throws Exception
{
int capacity = 8;
ByteArrayEndPoint endPoint = new ByteArrayEndPoint(new byte[0], capacity);
String reason = "the_reason";
WriteFlusher flusher = new WriteFlusher(endPoint)
{
@Override
protected void onIncompleteFlush()
{
onFail(new Throwable(reason));
}
};
FutureCallback callback = new FutureCallback();
byte[] content = new byte[capacity * 2];
flusher.write(callback, BufferUtil.toBuffer(content));
try
{
// Callback must be failed.
callback.get(1, TimeUnit.SECONDS);
}
catch (ExecutionException x)
{
Assert.assertEquals(reason, x.getCause().getMessage());
}
}
private static class ConcurrentWriteFlusher extends WriteFlusher implements Runnable
{
private final ByteArrayEndPoint endPoint;
private final ScheduledExecutorService scheduler;
private final Random random;
private String content = "";
private ConcurrentWriteFlusher(ByteArrayEndPoint endPoint, ScheduledThreadPoolExecutor scheduler, Random random)
{
super(endPoint);
this.endPoint = endPoint;
this.scheduler = scheduler;
this.random = random;
}
@Override
protected void onIncompleteFlush()
{
scheduler.schedule(this, 1 + random.nextInt(9), TimeUnit.MILLISECONDS);
}
@Override
public void run()
{
content += endPoint.takeOutputString();
completeWrite();
}
private String getContent()
{
content += endPoint.takeOutputString();
return content;
}
}
}