blob: b1e3b91a7c5c7a3a459af3b8db6f63999681de76 [file] [log] [blame]
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http.client;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.AsyncContext;
import javax.servlet.ServletException;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.client.util.InputStreamContentProvider;
import org.eclipse.jetty.client.util.InputStreamResponseListener;
import org.eclipse.jetty.client.util.OutputStreamContentProvider;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
public class HttpClientStreamTest extends AbstractTest
{
public HttpClientStreamTest(Transport transport)
{
super(transport);
}
@Test
public void testFileUpload() throws Exception
{
// Prepare a big file to upload
Path targetTestsDir = MavenTestingUtils.getTargetTestingDir().toPath();
Files.createDirectories(targetTestsDir);
Path upload = Paths.get(targetTestsDir.toString(), "http_client_upload.big");
try (OutputStream output = Files.newOutputStream(upload, StandardOpenOption.CREATE))
{
byte[] kb = new byte[1024];
for (int i = 0; i < 10 * 1024; ++i)
output.write(kb);
}
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.setStatus(200);
response.setContentLength(0);
response.flushBuffer();
InputStream in = request.getInputStream();
byte[] buffer = new byte[1024];
while (true)
{
int read = in.read(buffer);
if (read < 0)
break;
}
}
});
final AtomicLong requestTime = new AtomicLong();
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.scheme(getScheme())
.file(upload)
.onRequestSuccess(request -> requestTime.set(System.nanoTime()))
.timeout(30, TimeUnit.SECONDS)
.send();
long responseTime = System.nanoTime();
Assert.assertEquals(200, response.getStatus());
Assert.assertTrue(requestTime.get() <= responseTime);
// Give some time to the server to consume the request content
// This is just to avoid exception traces in the test output
Thread.sleep(1000);
}
@Test
public void testDownload() throws Exception
{
final byte[] data = new byte[128 * 1024];
byte value = 1;
Arrays.fill(data, value);
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.getOutputStream().write(data);
}
});
InputStreamResponseListener listener = new InputStreamResponseListener();
client.newRequest("localhost", connector.getLocalPort())
.scheme(getScheme())
.send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.getStatus());
InputStream input = listener.getInputStream();
Assert.assertNotNull(input);
int length = 0;
while (input.read() == value)
{
if (length % 100 == 0)
Thread.sleep(1);
++length;
}
Assert.assertEquals(data.length, length);
Result result = listener.await(5, TimeUnit.SECONDS);
Assert.assertNotNull(result);
Assert.assertFalse(result.isFailed());
Assert.assertSame(response, result.getResponse());
}
@Test
public void testDownloadOfUTF8Content() throws Exception
{
final byte[] data = new byte[]{(byte)0xC3, (byte)0xA8}; // UTF-8 representation of &egrave;
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.getOutputStream().write(data);
}
});
InputStreamResponseListener listener = new InputStreamResponseListener();
client.newRequest("localhost", connector.getLocalPort())
.scheme(getScheme())
.send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.getStatus());
InputStream input = listener.getInputStream();
Assert.assertNotNull(input);
for (byte b : data)
{
int read = input.read();
Assert.assertTrue(read >= 0);
Assert.assertEquals(b & 0xFF, read);
}
Assert.assertEquals(-1, input.read());
Result result = listener.await(5, TimeUnit.SECONDS);
Assert.assertNotNull(result);
Assert.assertFalse(result.isFailed());
Assert.assertSame(response, result.getResponse());
}
@Test
public void testDownloadWithFailure() throws Exception
{
final byte[] data = new byte[64 * 1024];
byte value = 1;
Arrays.fill(data, value);
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
// Say we want to send this much...
response.setContentLength(2 * data.length);
// ...but write only half...
response.getOutputStream().write(data);
// ...then shutdown output
baseRequest.getHttpChannel().getEndPoint().shutdownOutput();
}
});
InputStreamResponseListener listener = new InputStreamResponseListener();
client.newRequest("localhost", connector.getLocalPort())
.scheme(getScheme())
.send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.getStatus());
InputStream input = listener.getInputStream();
Assert.assertNotNull(input);
int length = 0;
try
{
length = 0;
while (input.read() == value)
{
if (length % 100 == 0)
Thread.sleep(1);
++length;
}
Assert.fail();
}
catch (IOException x)
{
// Expected.
}
Assert.assertThat(length, Matchers.lessThanOrEqualTo(data.length));
Result result = listener.await(5, TimeUnit.SECONDS);
Assert.assertNotNull(result);
Assert.assertTrue(result.isFailed());
}
@Test(expected = AsynchronousCloseException.class)
public void testInputStreamResponseListenerClosedBeforeReading() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
IO.copy(request.getInputStream(), response.getOutputStream());
}
});
InputStreamResponseListener listener = new InputStreamResponseListener();
InputStream stream = listener.getInputStream();
// Close the stream immediately.
stream.close();
client.newRequest("localhost", connector.getLocalPort())
.scheme(getScheme())
.content(new BytesContentProvider(new byte[]{0, 1, 2, 3}))
.send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertEquals(200, response.getStatus());
stream.read(); // Throws
}
@Test(expected = AsynchronousCloseException.class)
public void testInputStreamResponseListenerClosedBeforeContent() throws Exception
{
AtomicReference<AsyncContext> contextRef = new AtomicReference<>();
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
contextRef.set(request.startAsync());
response.flushBuffer();
}
});
CountDownLatch latch = new CountDownLatch(1);
InputStreamResponseListener listener = new InputStreamResponseListener()
{
@Override
public void onContent(Response response, ByteBuffer content, Callback callback)
{
super.onContent(response, content, new Callback()
{
@Override
public void failed(Throwable x)
{
latch.countDown();
callback.failed(x);
}
});
}
};
client.newRequest("localhost", connector.getLocalPort())
.scheme(getScheme())
.send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
InputStream input = listener.getInputStream();
input.close();
AsyncContext asyncContext = contextRef.get();
asyncContext.getResponse().getOutputStream().write(new byte[1024]);
asyncContext.complete();
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
input.read(); // Throws
}
@Test
public void testInputStreamResponseListenerClosedWhileWaiting() throws Exception
{
byte[] chunk1 = new byte[]{0, 1};
byte[] chunk2 = new byte[]{2, 3};
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.setContentLength(chunk1.length + chunk2.length);
ServletOutputStream output = response.getOutputStream();
output.write(chunk1);
output.flush();
output.write(chunk2);
}
});
CountDownLatch failedLatch = new CountDownLatch(1);
CountDownLatch contentLatch = new CountDownLatch(1);
InputStreamResponseListener listener = new InputStreamResponseListener()
{
@Override
public void onContent(Response response, ByteBuffer content, Callback callback)
{
super.onContent(response, content, new Callback()
{
@Override
public void failed(Throwable x)
{
failedLatch.countDown();
callback.failed(x);
}
});
contentLatch.countDown();
}
};
client.newRequest("localhost", connector.getLocalPort())
.scheme(getScheme())
.send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
// Wait until we get some content.
Assert.assertTrue(contentLatch.await(5, TimeUnit.SECONDS));
// Close the stream.
InputStream stream = listener.getInputStream();
stream.close();
// Make sure that the callback has been invoked.
Assert.assertTrue(failedLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testInputStreamResponseListenerFailedWhileWaiting() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
byte[] data = new byte[1024];
response.setContentLength(data.length);
ServletOutputStream output = response.getOutputStream();
output.write(data);
}
});
CountDownLatch failedLatch = new CountDownLatch(1);
CountDownLatch contentLatch = new CountDownLatch(1);
InputStreamResponseListener listener = new InputStreamResponseListener()
{
@Override
public void onContent(Response response, ByteBuffer content, Callback callback)
{
super.onContent(response, content, new Callback()
{
@Override
public void failed(Throwable x)
{
failedLatch.countDown();
callback.failed(x);
}
});
contentLatch.countDown();
}
};
client.newRequest("localhost", connector.getLocalPort())
.scheme(getScheme())
.send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
// Wait until we get some content.
Assert.assertTrue(contentLatch.await(5, TimeUnit.SECONDS));
// Abort the response.
response.abort(new Exception());
// Make sure that the callback has been invoked.
Assert.assertTrue(failedLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testInputStreamResponseListenerFailedBeforeResponse() throws Exception
{
start(new EmptyServerHandler());
int port = connector.getLocalPort();
server.stop();
InputStreamResponseListener listener = new InputStreamResponseListener();
// Connect to the wrong port
client.newRequest("localhost", port)
.scheme(getScheme())
.send(listener);
Result result = listener.await(5, TimeUnit.SECONDS);
Assert.assertNotNull(result);
}
@Test(expected = ExecutionException.class)
public void testInputStreamContentProviderThrowingWhileReading() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
IO.copy(request.getInputStream(), response.getOutputStream());
}
});
final byte[] data = new byte[]{0, 1, 2, 3};
client.newRequest("localhost", connector.getLocalPort())
.scheme(getScheme())
.content(new InputStreamContentProvider(new InputStream()
{
private int index = 0;
@Override
public int read() throws IOException
{
// Will eventually throw ArrayIndexOutOfBounds
return data[index++];
}
}, data.length / 2))
.timeout(5, TimeUnit.SECONDS)
.send();
}
@Test(expected = AsynchronousCloseException.class)
public void testDownloadWithCloseBeforeContent() throws Exception
{
final byte[] data = new byte[128 * 1024];
byte value = 3;
Arrays.fill(data, value);
final CountDownLatch latch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.flushBuffer();
try
{
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
catch (InterruptedException e)
{
throw new InterruptedIOException();
}
response.getOutputStream().write(data);
}
});
InputStreamResponseListener listener = new InputStreamResponseListener();
client.newRequest("localhost", connector.getLocalPort())
.scheme(getScheme())
.send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.getStatus());
InputStream input = listener.getInputStream();
Assert.assertNotNull(input);
input.close();
latch.countDown();
input.read(); // Throws
}
@Test(expected = AsynchronousCloseException.class)
public void testDownloadWithCloseMiddleOfContent() throws Exception
{
final byte[] data1 = new byte[1024];
final byte[] data2 = new byte[1024];
final CountDownLatch latch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.getOutputStream().write(data1);
response.flushBuffer();
try
{
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
catch (InterruptedException e)
{
throw new InterruptedIOException();
}
response.getOutputStream().write(data2);
}
});
InputStreamResponseListener listener = new InputStreamResponseListener();
client.newRequest("localhost", connector.getLocalPort())
.scheme(getScheme())
.send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.getStatus());
InputStream input = listener.getInputStream();
Assert.assertNotNull(input);
for (byte datum1 : data1)
Assert.assertEquals(datum1, input.read());
input.close();
latch.countDown();
input.read(); // Throws
}
@Test
public void testDownloadWithCloseEndOfContent() throws Exception
{
final byte[] data = new byte[1024];
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
response.getOutputStream().write(data);
response.flushBuffer();
}
});
InputStreamResponseListener listener = new InputStreamResponseListener();
client.newRequest("localhost", connector.getLocalPort())
.scheme(getScheme())
.send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertNotNull(response);
Assert.assertEquals(200, response.getStatus());
InputStream input = listener.getInputStream();
Assert.assertNotNull(input);
for (byte datum : data)
Assert.assertEquals(datum, input.read());
// Read EOF
Assert.assertEquals(-1, input.read());
input.close();
// Must not throw
Assert.assertEquals(-1, input.read());
}
@Slow
@Test
public void testUploadWithDeferredContentProviderFromInputStream() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
IO.copy(request.getInputStream(), new ByteArrayOutputStream());
}
});
final CountDownLatch latch = new CountDownLatch(1);
try (DeferredContentProvider content = new DeferredContentProvider())
{
client.newRequest("localhost", connector.getLocalPort())
.scheme(getScheme())
.content(content)
.send(result ->
{
if (result.isSucceeded() && result.getResponse().getStatus() == 200)
latch.countDown();
});
// Make sure we provide the content *after* the request has been "sent".
Thread.sleep(1000);
try (ByteArrayInputStream input = new ByteArrayInputStream(new byte[1024]))
{
byte[] buffer = new byte[200];
int read;
while ((read = input.read(buffer)) >= 0)
content.offer(ByteBuffer.wrap(buffer, 0, read));
}
}
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testUploadWithDeferredContentAvailableCallbacksNotifiedOnce() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
IO.copy(request.getInputStream(), new ByteArrayOutputStream());
}
});
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger succeeds = new AtomicInteger();
try (DeferredContentProvider content = new DeferredContentProvider())
{
// Make the content immediately available.
content.offer(ByteBuffer.allocate(1024), new Callback()
{
@Override
public void succeeded()
{
succeeds.incrementAndGet();
}
});
client.newRequest("localhost", connector.getLocalPort())
.scheme(getScheme())
.content(content)
.send(result ->
{
if (result.isSucceeded() && result.getResponse().getStatus() == 200)
latch.countDown();
});
}
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
Assert.assertEquals(1, succeeds.get());
}
@Test
public void testUploadWithDeferredContentProviderRacingWithSend() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
IO.copy(request.getInputStream(), response.getOutputStream());
}
});
final CountDownLatch latch = new CountDownLatch(1);
final byte[] data = new byte[512];
final DeferredContentProvider content = new DeferredContentProvider()
{
@Override
public void setListener(Listener listener)
{
super.setListener(listener);
// Simulate a concurrent call
offer(ByteBuffer.wrap(data));
close();
}
};
client.newRequest("localhost", connector.getLocalPort())
.scheme(getScheme())
.content(content)
.send(new BufferingResponseListener()
{
@Override
public void onComplete(Result result)
{
if (result.isSucceeded() &&
result.getResponse().getStatus() == 200 &&
Arrays.equals(data, getContent()))
latch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testUploadWithDeferredContentProviderRacingWithIterator() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
IO.copy(request.getInputStream(), response.getOutputStream());
}
});
final CountDownLatch latch = new CountDownLatch(1);
final byte[] data = new byte[512];
final AtomicReference<DeferredContentProvider> contentRef = new AtomicReference<>();
final DeferredContentProvider content = new DeferredContentProvider()
{
@Override
public Iterator<ByteBuffer> iterator()
{
return new Iterator<ByteBuffer>()
{
// Data for the deferred content iterator:
// [0] => deferred
// [1] => deferred
// [2] => data
private final byte[][] iteratorData = new byte[3][];
private final AtomicInteger index = new AtomicInteger();
{
iteratorData[0] = null;
iteratorData[1] = null;
iteratorData[2] = data;
}
@Override
public boolean hasNext()
{
return index.get() < iteratorData.length;
}
@Override
public ByteBuffer next()
{
byte[] chunk = iteratorData[index.getAndIncrement()];
ByteBuffer result = chunk == null ? null : ByteBuffer.wrap(chunk);
if (index.get() < iteratorData.length)
{
contentRef.get().offer(result == null ? BufferUtil.EMPTY_BUFFER : result);
contentRef.get().close();
}
return result;
}
@Override
public void remove()
{
}
};
}
};
contentRef.set(content);
client.newRequest("localhost", connector.getLocalPort())
.scheme(getScheme())
.content(content)
.send(new BufferingResponseListener()
{
@Override
public void onComplete(Result result)
{
if (result.isSucceeded() &&
result.getResponse().getStatus() == 200 &&
Arrays.equals(data, getContent()))
latch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testUploadWithOutputStream() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
IO.copy(request.getInputStream(), response.getOutputStream());
}
});
final byte[] data = new byte[512];
final CountDownLatch latch = new CountDownLatch(1);
OutputStreamContentProvider content = new OutputStreamContentProvider();
client.newRequest("localhost", connector.getLocalPort())
.scheme(getScheme())
.content(content)
.send(new BufferingResponseListener()
{
@Override
public void onComplete(Result result)
{
if (result.isSucceeded() &&
result.getResponse().getStatus() == 200 &&
Arrays.equals(data, getContent()))
latch.countDown();
}
});
// Make sure we provide the content *after* the request has been "sent".
Thread.sleep(1000);
try (OutputStream output = content.getOutputStream())
{
output.write(data);
}
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testBigUploadWithOutputStreamFromInputStream() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
IO.copy(request.getInputStream(), response.getOutputStream());
}
});
final byte[] data = new byte[16 * 1024 * 1024];
new Random().nextBytes(data);
final CountDownLatch latch = new CountDownLatch(1);
OutputStreamContentProvider content = new OutputStreamContentProvider();
client.newRequest("localhost", connector.getLocalPort())
.scheme(getScheme())
.content(content)
.send(new BufferingResponseListener(data.length)
{
@Override
public void onComplete(Result result)
{
Assert.assertTrue(result.isSucceeded());
Assert.assertEquals(200, result.getResponse().getStatus());
Assert.assertArrayEquals(data, getContent());
latch.countDown();
}
});
// Make sure we provide the content *after* the request has been "sent".
Thread.sleep(1000);
try (InputStream input = new ByteArrayInputStream(data); OutputStream output = content.getOutputStream())
{
byte[] buffer = new byte[1024];
while (true)
{
int read = input.read(buffer);
if (read < 0)
break;
output.write(buffer, 0, read);
}
}
Assert.assertTrue(latch.await(30, TimeUnit.SECONDS));
}
@Test
public void testUploadWithOutputStreamFailureToConnect() throws Exception
{
start(new EmptyServerHandler());
final byte[] data = new byte[512];
final CountDownLatch latch = new CountDownLatch(1);
OutputStreamContentProvider content = new OutputStreamContentProvider();
client.newRequest("0.0.0.1", connector.getLocalPort())
.scheme(getScheme())
.content(content)
.send(result ->
{
if (result.isFailed())
latch.countDown();
});
try (OutputStream output = content.getOutputStream())
{
output.write(data);
Assert.fail();
}
catch (IOException x)
{
// Expected
}
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testUploadWithDeferredContentProviderFailsMultipleOffers() throws Exception
{
start(new EmptyServerHandler());
final CountDownLatch failLatch = new CountDownLatch(2);
final Callback callback = new Callback()
{
@Override
public void failed(Throwable x)
{
failLatch.countDown();
}
};
final CountDownLatch completeLatch = new CountDownLatch(1);
final DeferredContentProvider content = new DeferredContentProvider();
client.newRequest("localhost", connector.getLocalPort())
.scheme(getScheme())
.content(content)
.onRequestBegin(request ->
{
content.offer(ByteBuffer.wrap(new byte[256]), callback);
content.offer(ByteBuffer.wrap(new byte[256]), callback);
request.abort(new Exception("explicitly_thrown_by_test"));
})
.send(result ->
{
if (result.isFailed())
completeLatch.countDown();
});
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(failLatch.await(5, TimeUnit.SECONDS));
// Make sure that adding more content results in the callback to be failed.
final CountDownLatch latch = new CountDownLatch(1);
content.offer(ByteBuffer.wrap(new byte[128]), new Callback()
{
@Override
public void failed(Throwable x)
{
latch.countDown();
}
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Test
public void testUploadWithConnectFailureClosesStream() throws Exception
{
start(new EmptyServerHandler());
final CountDownLatch closeLatch = new CountDownLatch(1);
InputStream stream = new ByteArrayInputStream("test".getBytes(StandardCharsets.UTF_8))
{
@Override
public void close() throws IOException
{
super.close();
closeLatch.countDown();
}
};
InputStreamContentProvider content = new InputStreamContentProvider(stream);
final CountDownLatch completeLatch = new CountDownLatch(1);
client.newRequest("0.0.0.1", connector.getLocalPort())
.scheme(getScheme())
.content(content)
.send(result ->
{
Assert.assertTrue(result.isFailed());
completeLatch.countDown();
});
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testUploadWithConcurrentServerCloseClosesStream() throws Exception
{
final CountDownLatch serverLatch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);
serverLatch.countDown();
}
});
final AtomicBoolean commit = new AtomicBoolean();
final CountDownLatch closeLatch = new CountDownLatch(1);
InputStream stream = new InputStream()
{
@Override
public int read() throws IOException
{
// This method will be called few times before
// the request is committed.
// We wait for the request to commit, and we
// wait for the request to reach the server,
// to be sure that the server endPoint has
// been created, before stopping the connector.
if (commit.get())
{
try
{
Assert.assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
connector.stop();
return 0;
}
catch (Throwable x)
{
throw new IOException(x);
}
}
else
{
return connector.isStopped() ? -1 : 0;
}
}
@Override
public void close() throws IOException
{
super.close();
closeLatch.countDown();
}
};
InputStreamContentProvider provider = new InputStreamContentProvider(stream, 1);
final CountDownLatch completeLatch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(getScheme())
.content(provider)
.onRequestCommit(request -> commit.set(true))
.send(result ->
{
Assert.assertTrue(result.isFailed());
completeLatch.countDown();
});
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
}
@Test
public void testInputStreamResponseListenerBufferedRead() throws Exception
{
AtomicReference<AsyncContext> asyncContextRef = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
start(new AbstractHandler()
{
@Override
public void handle(String target, org.eclipse.jetty.server.Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
asyncContextRef.set(request.startAsync());
latch.countDown();
}
});
InputStreamResponseListener listener = new InputStreamResponseListener();
client.newRequest("localhost", connector.getLocalPort())
.scheme(getScheme())
.timeout(5, TimeUnit.SECONDS)
.send(listener);
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
AsyncContext asyncContext = asyncContextRef.get();
Assert.assertNotNull(asyncContext);
Random random = new Random();
byte[] chunk = new byte[64];
random.nextBytes(chunk);
ServletOutputStream output = asyncContext.getResponse().getOutputStream();
output.write(chunk);
output.flush();
// Use a buffer larger than the data
// written to test that the read returns.
byte[] buffer = new byte[2 * chunk.length];
InputStream stream = listener.getInputStream();
int totalRead = 0;
while (totalRead < chunk.length)
{
int read = stream.read(buffer);
Assert.assertTrue(read > 0);
totalRead += read;
}
asyncContext.complete();
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertEquals(200, response.getStatus());
}
@Test
public void testInputStreamResponseListenerWithRedirect() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
if (target.startsWith("/303"))
response.sendRedirect("/200");
}
});
InputStreamResponseListener listener = new InputStreamResponseListener();
client.newRequest("localhost", connector.getLocalPort())
.scheme(getScheme())
.path("/303")
.followRedirects(true)
.send(listener);
Response response = listener.get(5, TimeUnit.SECONDS);
Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
Result result = listener.await(5, TimeUnit.SECONDS);
Assert.assertTrue(result.isSucceeded());
}
}