| // |
| // ======================================================================== |
| // Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd. |
| // ------------------------------------------------------------------------ |
| // All rights reserved. This program and the accompanying materials |
| // are made available under the terms of the Eclipse Public License v1.0 |
| // and Apache License v2.0 which accompanies this distribution. |
| // |
| // The Eclipse Public License is available at |
| // http://www.eclipse.org/legal/epl-v10.html |
| // |
| // The Apache License v2.0 is available at |
| // http://www.opensource.org/licenses/apache2.0.php |
| // |
| // You may elect to redistribute this code under either of these licenses. |
| // ======================================================================== |
| // |
| |
| package org.eclipse.jetty.http.client; |
| |
| import java.io.ByteArrayInputStream; |
| import java.io.IOException; |
| import java.io.InterruptedIOException; |
| import java.io.UncheckedIOException; |
| import java.nio.ByteBuffer; |
| import java.nio.charset.StandardCharsets; |
| import java.util.Deque; |
| import java.util.Queue; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.LinkedBlockingDeque; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import javax.servlet.AsyncContext; |
| import javax.servlet.DispatcherType; |
| import javax.servlet.ReadListener; |
| import javax.servlet.ServletException; |
| import javax.servlet.ServletInputStream; |
| import javax.servlet.ServletOutputStream; |
| import javax.servlet.WriteListener; |
| import javax.servlet.http.HttpServlet; |
| import javax.servlet.http.HttpServletRequest; |
| import javax.servlet.http.HttpServletResponse; |
| |
| import org.eclipse.jetty.client.api.ContentResponse; |
| import org.eclipse.jetty.client.api.Destination; |
| import org.eclipse.jetty.client.api.Response; |
| import org.eclipse.jetty.client.api.Result; |
| import org.eclipse.jetty.client.http.HttpConnectionOverHTTP; |
| import org.eclipse.jetty.client.util.BufferingResponseListener; |
| import org.eclipse.jetty.client.util.DeferredContentProvider; |
| import org.eclipse.jetty.client.util.InputStreamContentProvider; |
| import org.eclipse.jetty.client.util.StringContentProvider; |
| import org.eclipse.jetty.http.HttpHeader; |
| import org.eclipse.jetty.http.HttpMethod; |
| import org.eclipse.jetty.http.HttpStatus; |
| import org.eclipse.jetty.http2.HTTP2Session; |
| import org.eclipse.jetty.http2.api.Session; |
| import org.eclipse.jetty.http2.client.http.HttpConnectionOverHTTP2; |
| import org.eclipse.jetty.io.Connection; |
| import org.eclipse.jetty.server.Handler; |
| import org.eclipse.jetty.server.HttpChannel; |
| import org.eclipse.jetty.server.Request; |
| import org.eclipse.jetty.server.handler.ContextHandler; |
| import org.eclipse.jetty.server.handler.ContextHandler.Context; |
| import org.eclipse.jetty.util.FuturePromise; |
| import org.eclipse.jetty.util.log.StacklessLogging; |
| import org.hamcrest.Matchers; |
| import org.junit.Assert; |
| import org.junit.Assume; |
| import org.junit.Test; |
| |
| import static org.hamcrest.Matchers.containsString; |
| import static org.hamcrest.Matchers.instanceOf; |
| import static org.hamcrest.Matchers.is; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertThat; |
| import static org.junit.Assert.assertTrue; |
| |
| public class AsyncIOServletTest extends AbstractTest |
| { |
/**
 * Per-thread scope marker: non-null while the current thread is between
 * a context {@code enterScope} and the matching {@code exitScope}.
 * The stored exception is created when the scope is entered.
 */
private static final ThreadLocal<RuntimeException> scope = new ThreadLocal<>();

/**
 * Creates the test for the given transport.
 * The FCGI transport is replaced by {@code null} before being passed to the superclass.
 *
 * @param transport the transport this test instance runs with
 */
public AsyncIOServletTest(Transport transport)
{
    super(Transport.FCGI == transport ? null : transport);
}
| |
| @Override |
| protected void startServer(Handler handler) throws Exception |
| { |
| if (handler == context) |
| { |
| // Add this listener before the context is started, so it's durable. |
| context.addEventListener(new ContextHandler.ContextScopeListener() |
| { |
| @Override |
| public void enterScope(Context context, Request request, Object reason) |
| { |
| checkScope(); |
| scope.set(new RuntimeException()); |
| } |
| |
| @Override |
| public void exitScope(Context context, Request request) |
| { |
| assertScope(); |
| scope.set(null); |
| } |
| }); |
| } |
| super.startServer(handler); |
| } |
| |
| private void assertScope() |
| { |
| Assert.assertNotNull("Not in scope", scope.get()); |
| } |
| |
| private void checkScope() |
| { |
| RuntimeException callScope = scope.get(); |
| if (callScope != null) |
| throw callScope; |
| } |
| |
| protected void stopServer() throws Exception |
| { |
| super.stopServer(); |
| checkScope(); |
| scope.set(null); |
| } |
| |
| private void sleep(long ms) |
| { |
| try |
| { |
| Thread.sleep(ms); |
| } |
| catch (InterruptedException e) |
| { |
| throw new UncheckedIOException(new InterruptedIOException()); |
| } |
| } |
| |
| @Test |
| public void testAsyncReadThrowsException() throws Exception |
| { |
| testAsyncReadThrows(new NullPointerException("explicitly_thrown_by_test")); |
| } |
| |
| @Test |
| public void testAsyncReadThrowsError() throws Exception |
| { |
| testAsyncReadThrows(new Error("explicitly_thrown_by_test")); |
| } |
| |
| private void testAsyncReadThrows(Throwable throwable) throws Exception |
| { |
| CountDownLatch latch = new CountDownLatch(1); |
| start(new HttpServlet() |
| { |
| @Override |
| protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException |
| { |
| assertScope(); |
| AsyncContext asyncContext = request.startAsync(request, response); |
| request.getInputStream().setReadListener(new ReadListener() |
| { |
| @Override |
| public void onDataAvailable() throws IOException |
| { |
| assertScope(); |
| if (throwable instanceof RuntimeException) |
| throw (RuntimeException)throwable; |
| if (throwable instanceof Error) |
| throw (Error)throwable; |
| throw new IOException(throwable); |
| } |
| |
| @Override |
| public void onAllDataRead() throws IOException |
| { |
| assertScope(); |
| } |
| |
| @Override |
| public void onError(Throwable t) |
| { |
| assertScope(); |
| Assert.assertThat("onError type", t, instanceOf(throwable.getClass())); |
| Assert.assertThat("onError message", t.getMessage(), is(throwable.getMessage())); |
| latch.countDown(); |
| response.setStatus(500); |
| asyncContext.complete(); |
| } |
| }); |
| } |
| }); |
| |
| ContentResponse response = client.newRequest(newURI()) |
| .method(HttpMethod.POST) |
| .path(servletPath) |
| .content(new StringContentProvider("0123456789")) |
| .timeout(5, TimeUnit.SECONDS) |
| .send(); |
| |
| assertTrue(latch.await(5, TimeUnit.SECONDS)); |
| assertEquals(HttpStatus.INTERNAL_SERVER_ERROR_500, response.getStatus()); |
| } |
| |
| @Test |
| public void testAsyncReadIdleTimeout() throws Exception |
| { |
| int status = 567; |
| start(new HttpServlet() |
| { |
| @Override |
| protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException |
| { |
| assertScope(); |
| AsyncContext asyncContext = request.startAsync(request, response); |
| asyncContext.setTimeout(0); |
| ServletInputStream inputStream = request.getInputStream(); |
| inputStream.setReadListener(new ReadListener() |
| { |
| @Override |
| public void onDataAvailable() throws IOException |
| { |
| assertScope(); |
| while (inputStream.isReady() && !inputStream.isFinished()) |
| inputStream.read(); |
| } |
| |
| @Override |
| public void onAllDataRead() throws IOException |
| { |
| assertScope(); |
| } |
| |
| @Override |
| public void onError(Throwable t) |
| { |
| assertScope(); |
| response.setStatus(status); |
| // Do not put Connection: close header here, the test |
| // verifies that the server closes no matter what. |
| asyncContext.complete(); |
| } |
| }); |
| } |
| }); |
| connector.setIdleTimeout(1000); |
| CountDownLatch closeLatch = new CountDownLatch(1); |
| connector.addBean(new Connection.Listener() |
| { |
| @Override |
| public void onOpened(Connection connection) |
| { |
| } |
| |
| @Override |
| public void onClosed(Connection connection) |
| { |
| closeLatch.countDown(); |
| } |
| }); |
| |
| String data = "0123456789"; |
| DeferredContentProvider content = new DeferredContentProvider(); |
| content.offer(ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8))); |
| CountDownLatch responseLatch = new CountDownLatch(1); |
| CountDownLatch clientLatch = new CountDownLatch(1); |
| client.newRequest(newURI()) |
| .method(HttpMethod.POST) |
| .path(servletPath) |
| .content(content) |
| .onResponseSuccess(r -> responseLatch.countDown()) |
| .timeout(5, TimeUnit.SECONDS) |
| .send(result -> |
| { |
| assertEquals(status, result.getResponse().getStatus()); |
| clientLatch.countDown(); |
| }); |
| |
| assertTrue(closeLatch.await(5, TimeUnit.SECONDS)); |
| assertTrue(responseLatch.await(5, TimeUnit.SECONDS)); |
| content.close(); |
| assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); |
| } |
| |
| @Test |
| public void testOnErrorThrows() throws Exception |
| { |
| AtomicInteger errors = new AtomicInteger(); |
| start(new HttpServlet() |
| { |
| @Override |
| protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException |
| { |
| assertScope(); |
| if (request.getDispatcherType() == DispatcherType.ERROR) |
| { |
| response.flushBuffer(); |
| return; |
| } |
| |
| request.startAsync(request, response); |
| request.getInputStream().setReadListener(new ReadListener() |
| { |
| @Override |
| public void onDataAvailable() throws IOException |
| { |
| assertScope(); |
| throw new NullPointerException("explicitly_thrown_by_test_1"); |
| } |
| |
| @Override |
| public void onAllDataRead() throws IOException |
| { |
| assertScope(); |
| } |
| |
| @Override |
| public void onError(Throwable t) |
| { |
| assertScope(); |
| errors.incrementAndGet(); |
| throw new NullPointerException("explicitly_thrown_by_test_2") |
| {{ |
| this.initCause(t); |
| }}; |
| } |
| }); |
| } |
| }); |
| |
| try (StacklessLogging stackless = new StacklessLogging(HttpChannel.class)) |
| { |
| ContentResponse response = client.newRequest(newURI()) |
| .path(servletPath) |
| .content(new StringContentProvider("0123456789")) |
| .timeout(5, TimeUnit.SECONDS) |
| .send(); |
| |
| assertEquals(HttpStatus.INTERNAL_SERVER_ERROR_500, response.getStatus()); |
| Assert.assertEquals(1, errors.get()); |
| } |
| } |
| |
| @Test |
| public void testAsyncWriteThrowsException() throws Exception |
| { |
| testAsyncWriteThrows(new NullPointerException("explicitly_thrown_by_test")); |
| } |
| |
| @Test |
| public void testAsyncWriteThrowsError() throws Exception |
| { |
| testAsyncWriteThrows(new Error("explicitly_thrown_by_test")); |
| } |
| |
| private void testAsyncWriteThrows(Throwable throwable) throws Exception |
| { |
| CountDownLatch latch = new CountDownLatch(1); |
| start(new HttpServlet() |
| { |
| @Override |
| protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException |
| { |
| assertScope(); |
| AsyncContext asyncContext = request.startAsync(request, response); |
| response.getOutputStream().setWriteListener(new WriteListener() |
| { |
| @Override |
| public void onWritePossible() throws IOException |
| { |
| assertScope(); |
| if (throwable instanceof RuntimeException) |
| throw (RuntimeException)throwable; |
| if (throwable instanceof Error) |
| throw (Error)throwable; |
| throw new IOException(throwable); |
| } |
| |
| @Override |
| public void onError(Throwable t) |
| { |
| assertScope(); |
| latch.countDown(); |
| response.setStatus(500); |
| asyncContext.complete(); |
| Assert.assertSame(throwable, t); |
| } |
| }); |
| } |
| }); |
| |
| ContentResponse response = client.newRequest(newURI()) |
| .path(servletPath) |
| .timeout(5, TimeUnit.SECONDS) |
| .send(); |
| |
| assertTrue(latch.await(5, TimeUnit.SECONDS)); |
| assertEquals(HttpStatus.INTERNAL_SERVER_ERROR_500, response.getStatus()); |
| } |
| |
| @Test |
| public void testAsyncWriteClosed() throws Exception |
| { |
| String text = "Now is the winter of our discontent. How Now Brown Cow. The quick brown fox jumped over the lazy dog.\n"; |
| for (int i = 0; i < 10; i++) |
| text = text + text; |
| byte[] data = text.getBytes(StandardCharsets.UTF_8); |
| |
| CountDownLatch errorLatch = new CountDownLatch(1); |
| start(new HttpServlet() |
| { |
| @Override |
| protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException |
| { |
| assertScope(); |
| response.flushBuffer(); |
| |
| AsyncContext async = request.startAsync(); |
| ServletOutputStream out = response.getOutputStream(); |
| out.setWriteListener(new WriteListener() |
| { |
| @Override |
| public void onWritePossible() throws IOException |
| { |
| assertScope(); |
| |
| // Wait for the failure to arrive to |
| // the server while we are about to write. |
| sleep(1000); |
| |
| out.write(data); |
| } |
| |
| @Override |
| public void onError(Throwable t) |
| { |
| assertScope(); |
| async.complete(); |
| errorLatch.countDown(); |
| } |
| }); |
| } |
| }); |
| |
| CountDownLatch clientLatch = new CountDownLatch(1); |
| client.newRequest(newURI()) |
| .path(servletPath) |
| .onResponseHeaders(response -> |
| { |
| if (response.getStatus() == HttpStatus.OK_200) |
| response.abort(new IOException("explicitly_closed_by_test")); |
| }) |
| .send(result -> |
| { |
| if (result.isFailed()) |
| clientLatch.countDown(); |
| }); |
| |
| assertTrue(errorLatch.await(5, TimeUnit.SECONDS)); |
| assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); |
| } |
| |
| @Test |
| public void testIsReadyAtEOF() throws Exception |
| { |
| String text = "TEST\n"; |
| byte[] data = text.getBytes(StandardCharsets.UTF_8); |
| |
| start(new HttpServlet() |
| { |
| @Override |
| protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException |
| { |
| assertScope(); |
| response.flushBuffer(); |
| |
| AsyncContext async = request.startAsync(); |
| ServletInputStream input = request.getInputStream(); |
| ServletOutputStream output = response.getOutputStream(); |
| |
| input.setReadListener(new ReadListener() |
| { |
| transient int _i = 0; |
| transient boolean _minusOne = false; |
| transient boolean _finished = false; |
| |
| @Override |
| public void onDataAvailable() throws IOException |
| { |
| assertScope(); |
| while (input.isReady() && !input.isFinished()) |
| { |
| int b = input.read(); |
| if (b == -1) |
| _minusOne = true; |
| else if (data[_i++] != b) |
| throw new IllegalStateException(); |
| } |
| |
| if (input.isFinished()) |
| _finished = true; |
| } |
| |
| @Override |
| public void onAllDataRead() throws IOException |
| { |
| assertScope(); |
| output.write(String.format("i=%d eof=%b finished=%b", _i, _minusOne, _finished).getBytes(StandardCharsets.UTF_8)); |
| async.complete(); |
| } |
| |
| @Override |
| public void onError(Throwable t) |
| { |
| assertScope(); |
| t.printStackTrace(); |
| async.complete(); |
| } |
| }); |
| } |
| }); |
| |
| ContentResponse response = client.newRequest(newURI()) |
| .method(HttpMethod.POST) |
| .path(servletPath) |
| .header(HttpHeader.CONNECTION, "close") |
| .content(new StringContentProvider(text)) |
| .timeout(5, TimeUnit.SECONDS) |
| .send(); |
| |
| String responseContent = response.getContentAsString(); |
| assertThat(responseContent, containsString("i=" + data.length + " eof=true finished=true")); |
| } |
| |
| @Test |
| public void testOnAllDataRead() throws Exception |
| { |
| String success = "SUCCESS"; |
| start(new HttpServlet() |
| { |
| @Override |
| protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException |
| { |
| assertScope(); |
| response.flushBuffer(); |
| |
| AsyncContext async = request.startAsync(); |
| async.setTimeout(5000); |
| ServletInputStream in = request.getInputStream(); |
| ServletOutputStream out = response.getOutputStream(); |
| |
| in.setReadListener(new ReadListener() |
| { |
| @Override |
| public void onDataAvailable() throws IOException |
| { |
| assertScope(); |
| try |
| { |
| sleep(1000); |
| if (!in.isReady()) |
| throw new IllegalStateException(); |
| if (in.read() != 'X') |
| throw new IllegalStateException(); |
| if (!in.isReady()) |
| throw new IllegalStateException(); |
| if (in.read() != -1) |
| throw new IllegalStateException(); |
| } |
| catch (IOException x) |
| { |
| throw new UncheckedIOException(x); |
| } |
| } |
| |
| @Override |
| public void onAllDataRead() throws IOException |
| { |
| assertScope(); |
| out.write(success.getBytes(StandardCharsets.UTF_8)); |
| async.complete(); |
| } |
| |
| @Override |
| public void onError(Throwable t) |
| { |
| assertScope(); |
| t.printStackTrace(); |
| async.complete(); |
| } |
| }); |
| } |
| }); |
| |
| byte[] data = "X".getBytes(StandardCharsets.UTF_8); |
| CountDownLatch clientLatch = new CountDownLatch(1); |
| DeferredContentProvider content = new DeferredContentProvider() |
| { |
| @Override |
| public long getLength() |
| { |
| return data.length; |
| } |
| }; |
| client.newRequest(newURI()) |
| .method(HttpMethod.POST) |
| .path(servletPath) |
| .content(content) |
| .timeout(5, TimeUnit.SECONDS) |
| .send(new BufferingResponseListener() |
| { |
| @Override |
| public void onComplete(Result result) |
| { |
| if (result.isSucceeded()) |
| { |
| Response response = result.getResponse(); |
| String content = getContentAsString(); |
| if (response.getStatus() == HttpStatus.OK_200 && success.equals(content)) |
| clientLatch.countDown(); |
| } |
| } |
| }); |
| |
| sleep(100); |
| content.offer(ByteBuffer.wrap(data)); |
| content.close(); |
| |
| assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); |
| } |
| |
| @Test |
| public void testOtherThreadOnAllDataRead() throws Exception |
| { |
| String success = "SUCCESS"; |
| start(new HttpServlet() |
| { |
| @Override |
| protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException |
| { |
| assertScope(); |
| response.flushBuffer(); |
| |
| AsyncContext async = request.startAsync(); |
| async.setTimeout(0); |
| ServletInputStream input = request.getInputStream(); |
| ServletOutputStream output = response.getOutputStream(); |
| |
| if (request.getDispatcherType() == DispatcherType.ERROR) |
| throw new IllegalStateException(); |
| |
| input.setReadListener(new ReadListener() |
| { |
| @Override |
| public void onDataAvailable() throws IOException |
| { |
| assertScope(); |
| async.start(() -> |
| { |
| assertScope(); |
| try |
| { |
| sleep(1000); |
| if (!input.isReady()) |
| throw new IllegalStateException(); |
| if (input.read() != 'X') |
| throw new IllegalStateException(); |
| if (!input.isReady()) |
| throw new IllegalStateException(); |
| if (input.read() != -1) |
| throw new IllegalStateException(); |
| } |
| catch (IOException x) |
| { |
| throw new UncheckedIOException(x); |
| } |
| }); |
| } |
| |
| @Override |
| public void onAllDataRead() throws IOException |
| { |
| output.write(success.getBytes(StandardCharsets.UTF_8)); |
| async.complete(); |
| } |
| |
| @Override |
| public void onError(Throwable t) |
| { |
| assertScope(); |
| t.printStackTrace(); |
| async.complete(); |
| } |
| }); |
| } |
| }); |
| |
| byte[] data = "X".getBytes(StandardCharsets.UTF_8); |
| CountDownLatch clientLatch = new CountDownLatch(1); |
| DeferredContentProvider content = new DeferredContentProvider(); |
| client.newRequest(newURI()) |
| .method(HttpMethod.POST) |
| .path(servletPath) |
| .content(content) |
| .timeout(5, TimeUnit.SECONDS) |
| .send(new BufferingResponseListener() |
| { |
| @Override |
| public void onComplete(Result result) |
| { |
| if (result.isSucceeded()) |
| { |
| Response response = result.getResponse(); |
| String content = getContentAsString(); |
| if (response.getStatus() == HttpStatus.OK_200 && success.equals(content)) |
| clientLatch.countDown(); |
| } |
| } |
| }); |
| |
| sleep(100); |
| content.offer(ByteBuffer.wrap(data)); |
| content.close(); |
| |
| assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); |
| } |
| |
| @Test |
| public void testCompleteBeforeOnAllDataRead() throws Exception |
| { |
| String success = "SUCCESS"; |
| AtomicBoolean allDataRead = new AtomicBoolean(false); |
| |
| start(new HttpServlet() |
| { |
| @Override |
| protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException |
| { |
| assertScope(); |
| response.flushBuffer(); |
| |
| AsyncContext async = request.startAsync(); |
| ServletInputStream input = request.getInputStream(); |
| ServletOutputStream output = response.getOutputStream(); |
| |
| input.setReadListener(new ReadListener() |
| { |
| @Override |
| public void onDataAvailable() throws IOException |
| { |
| assertScope(); |
| while (input.isReady()) |
| { |
| int b = input.read(); |
| if (b < 0) |
| { |
| output.write(success.getBytes(StandardCharsets.UTF_8)); |
| async.complete(); |
| return; |
| } |
| } |
| } |
| |
| @Override |
| public void onAllDataRead() throws IOException |
| { |
| assertScope(); |
| output.write("FAILURE".getBytes(StandardCharsets.UTF_8)); |
| allDataRead.set(true); |
| throw new IllegalStateException(); |
| } |
| |
| @Override |
| public void onError(Throwable t) |
| { |
| assertScope(); |
| t.printStackTrace(); |
| } |
| }); |
| } |
| }); |
| |
| ContentResponse response = client.newRequest(newURI()) |
| .method(HttpMethod.POST) |
| .path(servletPath) |
| .header(HttpHeader.CONNECTION, "close") |
| .content(new StringContentProvider("XYZ")) |
| .timeout(5, TimeUnit.SECONDS) |
| .send(); |
| |
| assertThat(response.getStatus(), Matchers.equalTo(HttpStatus.OK_200)); |
| assertThat(response.getContentAsString(), Matchers.equalTo(success)); |
| } |
| |
| @Test |
| public void testEmptyAsyncRead() throws Exception |
| { |
| AtomicBoolean oda = new AtomicBoolean(); |
| CountDownLatch latch = new CountDownLatch(1); |
| |
| start(new HttpServlet() |
| { |
| @Override |
| protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException |
| { |
| assertScope(); |
| AsyncContext asyncContext = request.startAsync(request, response); |
| response.setStatus(200); |
| response.getOutputStream().close(); |
| request.getInputStream().setReadListener(new ReadListener() |
| { |
| @Override |
| public void onDataAvailable() throws IOException |
| { |
| assertScope(); |
| oda.set(true); |
| } |
| |
| @Override |
| public void onAllDataRead() throws IOException |
| { |
| assertScope(); |
| asyncContext.complete(); |
| latch.countDown(); |
| } |
| |
| @Override |
| public void onError(Throwable t) |
| { |
| assertScope(); |
| t.printStackTrace(); |
| asyncContext.complete(); |
| } |
| }); |
| } |
| }); |
| |
| ContentResponse response = client.newRequest(newURI()) |
| .path(servletPath) |
| .header(HttpHeader.CONNECTION, "close") |
| .timeout(5, TimeUnit.SECONDS) |
| .send(); |
| |
| assertThat(response.getStatus(), Matchers.equalTo(HttpStatus.OK_200)); |
| assertTrue(latch.await(5, TimeUnit.SECONDS)); |
| // onDataAvailable must not be called. |
| Assert.assertFalse(oda.get()); |
| } |
| |
| @Test |
| public void testWriteFromOnDataAvailable() throws Exception |
| { |
| Queue<Throwable> errors = new ConcurrentLinkedQueue<>(); |
| CountDownLatch writeLatch = new CountDownLatch(1); |
| start(new HttpServlet() |
| { |
| @Override |
| protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException |
| { |
| AsyncContext asyncContext = request.startAsync(); |
| request.getInputStream().setReadListener(new ReadListener() |
| { |
| @Override |
| public void onDataAvailable() throws IOException |
| { |
| ServletInputStream input = request.getInputStream(); |
| ServletOutputStream output = response.getOutputStream(); |
| while (input.isReady()) |
| { |
| byte[] buffer = new byte[512]; |
| int read = input.read(buffer); |
| if (read < 0) |
| { |
| asyncContext.complete(); |
| break; |
| } |
| if (output.isReady()) |
| output.write(buffer, 0, read); |
| else |
| Assert.fail(); |
| } |
| } |
| |
| @Override |
| public void onAllDataRead() throws IOException |
| { |
| } |
| |
| @Override |
| public void onError(Throwable t) |
| { |
| errors.offer(t); |
| } |
| }); |
| response.getOutputStream().setWriteListener(new WriteListener() |
| { |
| @Override |
| public void onWritePossible() throws IOException |
| { |
| writeLatch.countDown(); |
| } |
| |
| @Override |
| public void onError(Throwable t) |
| { |
| errors.offer(t); |
| } |
| }); |
| } |
| }); |
| |
| String content = "0123456789ABCDEF"; |
| DeferredContentProvider contentProvider = new DeferredContentProvider(); |
| contentProvider.offer(ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8))); |
| CountDownLatch clientLatch = new CountDownLatch(1); |
| client.newRequest(newURI()) |
| .method(HttpMethod.POST) |
| .path(servletPath) |
| .content(contentProvider) |
| .send(new BufferingResponseListener() |
| { |
| @Override |
| public void onComplete(Result result) |
| { |
| if (result.isSucceeded()) |
| { |
| Response response = result.getResponse(); |
| assertThat(response.getStatus(), Matchers.equalTo(HttpStatus.OK_200)); |
| assertThat(getContentAsString(), Matchers.equalTo(content)); |
| assertThat(errors, Matchers.hasSize(0)); |
| clientLatch.countDown(); |
| } |
| } |
| }); |
| |
| assertTrue(writeLatch.await(5, TimeUnit.SECONDS)); |
| |
| contentProvider.close(); |
| |
| assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); |
| } |
| |
/**
 * Sends part of a request body over an explicitly created connection, then shuts
 * down the output of the client endpoint before the content is closed. The servlet
 * answers from {@code ReadListener.onError()} with a 204, which the client is
 * expected to receive.
 */
@Test
public void testAsyncReadEarlyEOF() throws Exception
{
    // SSLEngine receives the close alert from the client, and when
    // the server passes the response to encrypt and write, SSLEngine
    // only generates the close alert back, without encrypting the
    // response, so we need to skip the transports over TLS.
    Assume.assumeThat(transport, Matchers.not(Matchers.isOneOf(Transport.HTTPS, Transport.H2)));

    String content = "jetty";
    int responseCode = HttpStatus.NO_CONTENT_204;
    // One count per content byte read by the servlet.
    CountDownLatch readLatch = new CountDownLatch(content.length());
    CountDownLatch errorLatch = new CountDownLatch(1);
    start(new HttpServlet()
    {
        @Override
        protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
        {
            AsyncContext asyncContext = request.startAsync();
            ServletInputStream input = request.getInputStream();
            input.setReadListener(new ReadListener()
            {
                @Override
                public void onDataAvailable() throws IOException
                {
                    while (input.isReady() && !input.isFinished())
                    {
                        // The value read is irrelevant, only the number of reads is counted.
                        input.read();
                        readLatch.countDown();
                    }
                }

                @Override
                public void onAllDataRead() throws IOException
                {
                }

                @Override
                public void onError(Throwable x)
                {
                    response.setStatus(responseCode);
                    asyncContext.complete();
                    errorLatch.countDown();
                }
            });
        }
    });

    CountDownLatch responseLatch = new CountDownLatch(1);
    DeferredContentProvider contentProvider = new DeferredContentProvider();
    contentProvider.offer(ByteBuffer.wrap(content.getBytes(StandardCharsets.UTF_8)));
    org.eclipse.jetty.client.api.Request request = client.newRequest(newURI())
        .method(HttpMethod.POST)
        .path(servletPath)
        .content(contentProvider)
        .onResponseSuccess(response -> responseLatch.countDown());

    // Use an explicit connection so that its endpoint can be shut down below.
    Destination destination = client.getDestination(getScheme(), "localhost", connector.getLocalPort());
    FuturePromise<org.eclipse.jetty.client.api.Connection> promise = new FuturePromise<>();
    destination.newConnection(promise);
    org.eclipse.jetty.client.api.Connection connection = promise.get(5, TimeUnit.SECONDS);
    CountDownLatch clientLatch = new CountDownLatch(1);
    connection.send(request, result ->
    {
        assertThat(result.getResponse().getStatus(), Matchers.equalTo(responseCode));
        clientLatch.countDown();
    });

    assertTrue(readLatch.await(5, TimeUnit.SECONDS));

    switch (transport)
    {
        case HTTP:
        case HTTPS:
            ((HttpConnectionOverHTTP)connection).getEndPoint().shutdownOutput();
            break;
        case H2C:
        case H2:
            // In case of HTTP/2, we not only send the request, but also the preface and
            // SETTINGS frames. SETTINGS frame need to be replied, so we want to wait to
            // write the reply before shutting output down, so that the test does not fail.
            Thread.sleep(1000);
            Session session = ((HttpConnectionOverHTTP2)connection).getSession();
            ((HTTP2Session)session).getEndPoint().shutdownOutput();
            break;
        default:
            Assert.fail();
    }

    // Wait for the response to arrive before finishing the request.
    assertTrue(responseLatch.await(5, TimeUnit.SECONDS));
    contentProvider.close();

    assertTrue(errorLatch.await(5, TimeUnit.SECONDS));
    assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
}
| |
| @Test |
| public void testWriteListenerFromOtherThread() throws Exception |
| { |
| start(new HttpServlet() |
| { |
| @Override |
| protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException |
| { |
| AsyncContext asyncContext = request.startAsync(); |
| asyncContext.setTimeout(0); |
| request.getInputStream().setReadListener(new Listener(asyncContext)); |
| } |
| }); |
| |
| int cores = 4; |
| int iterations = 10; |
| CountDownLatch latch = new CountDownLatch(cores * iterations); |
| Deque<Throwable> failures = new LinkedBlockingDeque<>(); |
| for (int i = 0; i < cores; ++i) |
| { |
| client.getExecutor().execute(() -> |
| { |
| for (int j = 0; j < iterations; ++j) |
| { |
| try |
| { |
| ContentResponse response = client.newRequest(newURI()) |
| .method(HttpMethod.POST) |
| .path(servletPath) |
| .content(new InputStreamContentProvider(new ByteArrayInputStream(new byte[16 * 1024]) |
| { |
| @Override |
| public int read(byte[] b, int off, int len) |
| { |
| sleep(5); |
| return super.read(b, off, Math.min(len, 4242)); |
| } |
| })) |
| .send(); |
| Assert.assertEquals(HttpStatus.OK_200, response.getStatus()); |
| latch.countDown(); |
| } |
| catch (Throwable x) |
| { |
| failures.offer(x); |
| } |
| } |
| }); |
| } |
| |
| Assert.assertTrue(latch.await(30, TimeUnit.SECONDS)); |
| Assert.assertTrue(failures.isEmpty()); |
| } |
| |
| private class Listener implements ReadListener, WriteListener |
| { |
| private final Executor executor = Executors.newFixedThreadPool(32); |
| private final CompletableFuture<?> inputComplete = new CompletableFuture<>(); |
| private final CompletableFuture<?> outputComplete = new CompletableFuture<>(); |
| private final AtomicBoolean responseWritten = new AtomicBoolean(); |
| private final AsyncContext asyncContext; |
| private final HttpServletResponse response; |
| private final ServletInputStream input; |
| private final ServletOutputStream output; |
| |
| public Listener(AsyncContext asyncContext) throws IOException |
| { |
| this.asyncContext = asyncContext; |
| this.response = (HttpServletResponse)asyncContext.getResponse(); |
| this.input = asyncContext.getRequest().getInputStream(); |
| this.output = response.getOutputStream(); |
| CompletableFuture.allOf(inputComplete, outputComplete) |
| .whenComplete((ignoredResult, ignoredThrowable) -> asyncContext.complete()); |
| // Dispatch setting the write listener to another thread. |
| executor.execute(() -> output.setWriteListener(this)); |
| } |
| |
| @Override |
| public void onDataAvailable() throws IOException |
| { |
| byte[] buffer = new byte[16 * 1024]; |
| while (input.isReady()) |
| { |
| if (input.read(buffer) < 0) |
| return; |
| } |
| } |
| |
| @Override |
| public void onAllDataRead() throws IOException |
| { |
| inputComplete.complete(null); |
| } |
| |
| @Override |
| public void onWritePossible() throws IOException |
| { |
| // Dispatch OWP to another thread. |
| executor.execute(() -> |
| { |
| while (output.isReady()) |
| { |
| if (responseWritten.compareAndSet(false, true)) |
| { |
| try |
| { |
| response.setStatus(HttpServletResponse.SC_OK); |
| response.setContentType("text/plain;charset=utf-8"); |
| output.write("Hello world".getBytes()); |
| } |
| catch (IOException x) |
| { |
| throw new UncheckedIOException(x); |
| } |
| } |
| else |
| { |
| outputComplete.complete(null); |
| return; |
| } |
| } |
| }); |
| } |
| |
| @Override |
| public void onError(Throwable t) |
| { |
| response.setStatus(HttpServletResponse.SC_INTERNAL_SERVER_ERROR); |
| asyncContext.complete(); |
| } |
| } |
| } |