blob: 773593d2028ef6f6a4bcd7287d643d5e75bf12f2 [file] [log] [blame]
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.servlet;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertEquals;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ReadListener;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
import javax.servlet.ServletResponse;
import javax.servlet.WriteListener;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.DebugListener;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
// TODO need these on HTTP2 as well!
public class AsyncServletIOTest
{
private static final Logger LOG = Log.getLogger(AsyncServletIOTest.class);
protected AsyncIOServlet _servlet0=new AsyncIOServlet();
protected AsyncIOServlet2 _servlet2=new AsyncIOServlet2();
protected AsyncIOServlet3 _servlet3=new AsyncIOServlet3();
protected int _port;
protected Server _server = new Server();
protected ServletHandler _servletHandler;
protected ServerConnector _connector;
@Before
public void setUp() throws Exception
{
HttpConfiguration http_config = new HttpConfiguration();
http_config.setOutputBufferSize(4096);
_connector = new ServerConnector(_server,new HttpConnectionFactory(http_config));
_server.setConnectors(new Connector[]{ _connector });
ServletContextHandler context = new ServletContextHandler();
context.setContextPath("/ctx");
context.addEventListener(new DebugListener());
_server.setHandler(context);
_servletHandler=context.getServletHandler();
ServletHolder holder=new ServletHolder(_servlet0);
holder.setAsyncSupported(true);
_servletHandler.addServletWithMapping(holder,"/path/*");
ServletHolder holder2=new ServletHolder(_servlet2);
holder2.setAsyncSupported(true);
_servletHandler.addServletWithMapping(holder2,"/path2/*");
ServletHolder holder3=new ServletHolder(_servlet3);
holder3.setAsyncSupported(true);
_servletHandler.addServletWithMapping(holder3,"/path3/*");
_server.start();
_port=_connector.getLocalPort();
_owp.set(0);
_oda.set(0);
_read.set(0);
}
@After
public void tearDown() throws Exception
{
_server.stop();
}
@Test
public void testEmpty() throws Exception
{
process();
}
@Test
public void testWrite() throws Exception
{
process(10);
}
@Test
public void testWrites() throws Exception
{
process(10,1,20,10);
}
@Test
public void testWritesFlushWrites() throws Exception
{
process(10,1,0,20,10);
}
@Test
public void testBigWrite() throws Exception
{
process(102400);
}
@Test
public void testBigWrites() throws Exception
{
process(102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400,102400);
Assert.assertThat("On Write Possible",_owp.get(),greaterThanOrEqualTo(1));
}
@Test
public void testRead() throws Exception
{
process("Hello!!!\r\n");
}
@Test
public void testBigRead() throws Exception
{
process("Now is the time for all good men to come to the aid of the party. How now Brown Cow. The quick brown fox jumped over the lazy dog. The moon is blue to a fish in love.\r\n");
}
@Test
public void testReadWrite() throws Exception
{
process("Hello!!!\r\n",10);
}
@Test
public void testAsync2() throws Exception
{
StringBuilder request = new StringBuilder(512);
request.append("GET /ctx/path2/info HTTP/1.1\r\n")
.append("Host: localhost\r\n")
.append("Connection: close\r\n")
.append("\r\n");
int port=_port;
List<String> list = new ArrayList<>();
try (Socket socket = new Socket("localhost",port))
{
socket.setSoTimeout(1000000);
OutputStream out = socket.getOutputStream();
out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1));
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()),102400);
// response line
String line = in.readLine();
LOG.debug("response-line: "+line);
Assert.assertThat(line,startsWith("HTTP/1.1 200 OK"));
// Skip headers
while (line!=null)
{
line = in.readLine();
LOG.debug("header-line: "+line);
if (line.length()==0)
break;
}
// Get body slowly
while (true)
{
line = in.readLine();
LOG.debug("body: "+line);
if (line==null)
break;
list.add(line);
}
}
Assert.assertEquals(list.get(0),"data");
Assert.assertTrue(_servlet2.completed.await(5, TimeUnit.SECONDS));
}
@Test
public void testAsyncConsumeAll() throws Exception
{
StringBuilder request = new StringBuilder(512);
request.append("GET /ctx/path3/info HTTP/1.1\r\n")
.append("Host: localhost\r\n")
.append("Content-Type: text/plain\r\n")
.append("Content-Length: 10\r\n")
.append("\r\n")
.append("0");
int port=_port;
try (Socket socket = new Socket("localhost",port))
{
socket.setSoTimeout(1000000);
OutputStream out = socket.getOutputStream();
out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1));
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()),102400);
// response line
String line = in.readLine();
LOG.debug("response-line: "+line);
Assert.assertThat(line,startsWith("HTTP/1.1 200 OK"));
// Skip headers
while (line!=null)
{
line = in.readLine();
LOG.debug("header-line: "+line);
if (line.length()==0)
break;
}
// Get body
line = in.readLine();
LOG.debug("body: "+line);
Assert.assertEquals("DONE",line);
// The connection should be aborted
line = in.readLine();
Assert.assertNull(line);
}
}
public synchronized List<String> process(String content,int... writes) throws Exception
{
return process(content.getBytes(StandardCharsets.ISO_8859_1),writes);
}
public synchronized List<String> process(int... writes) throws Exception
{
return process((byte[])null,writes);
}
public synchronized List<String> process(byte[] content, int... writes) throws Exception
{
StringBuilder request = new StringBuilder(512);
request.append("GET /ctx/path/info");
char s='?';
for (int w: writes)
{
request.append(s).append("w=").append(w);
s='&';
}
request.append(" HTTP/1.1\r\n")
.append("Host: localhost\r\n")
.append("Connection: close\r\n");
if (content!=null)
request.append("Content-Length: ").append(content.length).append("\r\n")
.append("Content-Type: text/plain\r\n");
request.append("\r\n");
int port=_port;
List<String> list = new ArrayList<>();
try (Socket socket = new Socket("localhost",port))
{
socket.setSoTimeout(1000000);
OutputStream out = socket.getOutputStream();
out.write(request.toString().getBytes(StandardCharsets.ISO_8859_1));
if (content!=null && content.length>0)
{
Thread.sleep(100);
out.write(content[0]);
Thread.sleep(100);
int half=(content.length-1)/2;
out.write(content,1,half);
Thread.sleep(100);
out.write(content,1+half,content.length-half-1);
}
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()),102400);
// response line
String line = in.readLine();
LOG.debug("response-line: "+line);
Assert.assertThat(line,startsWith("HTTP/1.1 200 OK"));
// Skip headers
while (line!=null)
{
line = in.readLine();
LOG.debug("header-line: "+line);
if (line.length()==0)
break;
}
// Get body slowly
while (true)
{
line = in.readLine();
if (line==null)
break;
LOG.debug("body: "+brief(line));
list.add(line);
Thread.sleep(50);
}
}
// check lines
int w=0;
for (String line : list)
{
LOG.debug("line: "+brief(line));
if ("-".equals(line))
continue;
assertEquals("Line Length",writes[w],line.length());
assertEquals("Line Contents",line.charAt(0),'0'+(w%10));
w++;
if (w<writes.length && writes[w]<=0)
w++;
}
if (content!=null)
Assert.assertEquals("Content Length",content.length,_read.get());
return list;
}
private static String brief(String line)
{
return line.length()+"\t"+(line.length()>40?(line.substring(0,40)+"..."):line);
}
static AtomicInteger _owp = new AtomicInteger();
static AtomicInteger _oda = new AtomicInteger();
static AtomicInteger _read = new AtomicInteger();
private static class AsyncIOServlet extends HttpServlet
{
private static final long serialVersionUID = -8161977157098646562L;
public AsyncIOServlet()
{
}
@Override
public void doGet(final HttpServletRequest request, final HttpServletResponse response) throws ServletException, IOException
{
final AsyncContext async = request.startAsync();
final AtomicInteger complete = new AtomicInteger(2);
final AtomicBoolean onDataAvailable = new AtomicBoolean(false);
// Asynchronous Read
if (request.getContentLength()>0)
{
// System.err.println("reading "+request.getContentLength());
final ServletInputStream in=request.getInputStream();
in.setReadListener(new ReadListener()
{
byte[] _buf=new byte[32];
@Override
public void onError(Throwable t)
{
if (complete.decrementAndGet()==0)
async.complete();
}
@Override
public void onDataAvailable() throws IOException
{
if (!onDataAvailable.compareAndSet(false,true))
throw new IllegalStateException();
//System.err.println("ODA");
while (in.isReady() && !in.isFinished())
{
_oda.incrementAndGet();
int len=in.read(_buf);
//System.err.println("read "+len);
if (len>0)
_read.addAndGet(len);
}
if (!onDataAvailable.compareAndSet(true,false))
throw new IllegalStateException();
}
@Override
public void onAllDataRead() throws IOException
{
if (onDataAvailable.get())
{
LOG.warn("OADR too early!");
_read.set(-1);
}
// System.err.println("OADR");
if (complete.decrementAndGet()==0)
async.complete();
}
});
}
else
complete.decrementAndGet();
// Asynchronous Write
final String[] writes = request.getParameterValues("w");
final ServletOutputStream out = response.getOutputStream();
out.setWriteListener(new WriteListener()
{
int _w=0;
@Override
public void onWritePossible() throws IOException
{
LOG.debug("OWP");
_owp.incrementAndGet();
while (writes!=null && _w< writes.length)
{
int write=Integer.valueOf(writes[_w++]);
if (write==0)
out.flush();
else
{
byte[] buf=new byte[write+1];
Arrays.fill(buf,(byte)('0'+((_w-1)%10)));
buf[write]='\n';
out.write(buf);
}
if (!out.isReady())
return;
}
if (complete.decrementAndGet()==0)
async.complete();
}
@Override
public void onError(Throwable t)
{
async.complete();
}
});
}
}
@SuppressWarnings("serial")
public class AsyncIOServlet2 extends HttpServlet
{
public CountDownLatch completed = new CountDownLatch(1);
@Override
public void doGet(final HttpServletRequest request, final HttpServletResponse response) throws IOException
{
new SampleAsycListener(request,response);
}
class SampleAsycListener implements WriteListener, AsyncListener
{
final ServletResponse response;
final ServletOutputStream servletOutputStream;
final AsyncContext asyncContext;
volatile boolean written=false;
SampleAsycListener(HttpServletRequest request,HttpServletResponse response) throws IOException
{
asyncContext = request.startAsync();
asyncContext.setTimeout(10000L);
asyncContext.addListener(this);
servletOutputStream = response.getOutputStream();
servletOutputStream.setWriteListener(this);
this.response=response;
}
@Override
public void onWritePossible() throws IOException
{
if (!written)
{
written=true;
response.setContentLength(5);
servletOutputStream.write("data\n".getBytes());
}
if (servletOutputStream.isReady())
{
asyncContext.complete();
}
}
@Override
public void onError(final Throwable t)
{
t.printStackTrace();
asyncContext.complete();
}
@Override
public void onComplete(final AsyncEvent event) throws IOException
{
completed.countDown();
}
@Override
public void onTimeout(final AsyncEvent event) throws IOException
{
asyncContext.complete();
}
@Override
public void onError(final AsyncEvent event) throws IOException
{
asyncContext.complete();
}
@Override
public void onStartAsync(AsyncEvent event) throws IOException
{
}
}
}
@SuppressWarnings("serial")
public class AsyncIOServlet3 extends HttpServlet
{
public CountDownLatch completed = new CountDownLatch(1);
@Override
public void doGet(final HttpServletRequest request, final HttpServletResponse response) throws IOException
{
AsyncContext async = request.startAsync();
request.getInputStream().setReadListener(new ReadListener()
{
@Override
public void onError(Throwable t)
{
}
@Override
public void onDataAvailable() throws IOException
{
}
@Override
public void onAllDataRead() throws IOException
{
}
});
response.setStatus(200);
response.getOutputStream().print("DONE");
async.complete();
}
}
}