Rewritten Netty Jersey implementation using direct ByteBuf consumption (#4312)
Signed-off-by: Venkat Ganesh <010gvr@gmail.com>
diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java
index 1a9cbe5..5531498 100644
--- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java
+++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java
@@ -16,7 +16,6 @@
package org.glassfish.jersey.netty.connector;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
@@ -31,6 +30,7 @@
import org.glassfish.jersey.netty.connector.internal.NettyInputStream;
import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.HttpContent;
@@ -50,7 +50,7 @@
class JerseyClientHandler extends SimpleChannelInboundHandler<HttpObject> {
private final NettyConnector connector;
- private final LinkedBlockingDeque<InputStream> isList = new LinkedBlockingDeque<>();
+ private final LinkedBlockingDeque<ByteBuf> isList = new LinkedBlockingDeque<>();
private final AsyncConnectorCallback asyncConnectorCallback;
private final ClientRequest jerseyRequest;
@@ -89,7 +89,7 @@
for (Map.Entry<String, String> entry : response.headers().entries()) {
jerseyResponse.getHeaders().add(entry.getKey(), entry.getValue());
}
-
+ isList.clear(); // clearing the content - possible leftover from previous request processing.
// request entity handling.
if ((response.headers().contains(HttpHeaderNames.CONTENT_LENGTH) && HttpUtil.getContentLength(response) > 0)
|| HttpUtil.isTransferEncodingChunked(response)) {
@@ -97,7 +97,7 @@
ctx.channel().closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
- isList.add(NettyInputStream.END_OF_INPUT_ERROR);
+ isList.add(Unpooled.EMPTY_BUFFER);
}
});
@@ -123,21 +123,16 @@
}
if (msg instanceof HttpContent) {
-
HttpContent httpContent = (HttpContent) msg;
ByteBuf content = httpContent.content();
-
if (content.isReadable()) {
- // copy bytes - when netty reads last chunk, it automatically closes the channel, which invalidates all
- // relates ByteBuffs.
- byte[] bytes = new byte[content.readableBytes()];
- content.getBytes(content.readerIndex(), bytes);
- isList.add(new ByteArrayInputStream(bytes));
+ content.retain();
+ isList.add(content);
}
if (msg instanceof LastHttpContent) {
- isList.add(NettyInputStream.END_OF_INPUT);
+ isList.add(Unpooled.EMPTY_BUFFER);
}
}
}
@@ -153,6 +148,6 @@
});
}
future.completeExceptionally(cause);
- isList.add(NettyInputStream.END_OF_INPUT_ERROR);
+ ctx.close();
}
}
diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/JerseyChunkedInput.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/JerseyChunkedInput.java
index ef14ba3..e5262d3 100644
--- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/JerseyChunkedInput.java
+++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/JerseyChunkedInput.java
@@ -42,10 +42,9 @@
public class JerseyChunkedInput extends OutputStream implements ChunkedInput<ByteBuf>, ChannelFutureListener {
private static final ByteBuffer VOID = ByteBuffer.allocate(0);
- private static final int CAPACITY = 8;
- // TODO this needs to be configurable, see JERSEY-3228
- private static final int WRITE_TIMEOUT = 10000;
- private static final int READ_TIMEOUT = 10000;
+ private static final int CAPACITY = Integer.getInteger("jersey.ci.capacity", 8);
+ private static final int WRITE_TIMEOUT = Integer.getInteger("jersey.ci.read.timeout", 10000);
+ private static final int READ_TIMEOUT = Integer.getInteger("jersey.ci.write.timeout", 10000);
private final LinkedBlockingDeque<ByteBuffer> queue = new LinkedBlockingDeque<>(CAPACITY);
private final Channel ctx;
diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java
index aab83b3..741121f 100644
--- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java
+++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java
@@ -20,77 +20,47 @@
import java.io.InputStream;
import java.util.concurrent.LinkedBlockingDeque;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
/**
* Input stream which servers as Request entity input.
* <p>
- * Converts Netty NIO buffers to an input streams and stores them in the queue,
- * waiting for Jersey to process it.
- *
- * @author Pavel Bucek
+ * Consumes a list of pending {@link ByteBuf}s and processes them on request by Jersey
*/
public class NettyInputStream extends InputStream {
- private volatile boolean end = false;
+ private final LinkedBlockingDeque<ByteBuf> isList;
- /**
- * End of input.
- */
- public static final InputStream END_OF_INPUT = new InputStream() {
- @Override
- public int read() throws IOException {
- return 0;
- }
-
- @Override
- public String toString() {
- return "END_OF_INPUT " + super.toString();
- }
- };
-
- /**
- * Unexpected end of input.
- */
- public static final InputStream END_OF_INPUT_ERROR = new InputStream() {
- @Override
- public int read() throws IOException {
- return 0;
- }
-
- @Override
- public String toString() {
- return "END_OF_INPUT_ERROR " + super.toString();
- }
- };
-
- private final LinkedBlockingDeque<InputStream> isList;
-
- public NettyInputStream(LinkedBlockingDeque<InputStream> isList) {
+ public NettyInputStream(LinkedBlockingDeque<ByteBuf> isList) {
this.isList = isList;
}
- private interface ISReader {
- int readFrom(InputStream take) throws IOException;
- }
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
- private int readInternal(ISReader isReader) throws IOException {
- if (end) {
- return -1;
- }
-
- InputStream take;
+ ByteBuf take;
try {
take = isList.take();
-
- if (checkEndOfInput(take)) {
+ boolean isReadable = take.isReadable();
+ int read = -1;
+ if (checkEndOfInputOrError(take)) {
+ take.release();
return -1;
}
- int read = isReader.readFrom(take);
-
- if (take.available() > 0) {
- isList.addFirst(take);
+ if (isReadable) {
+ int readableBytes = take.readableBytes();
+ read = Math.min(readableBytes, len);
+ take.readBytes(b, off, read);
+ if (read < len) {
+ take.release();
+ } else {
+ isList.addFirst(take);
+ }
} else {
- take.close();
+ read = 0;
+ take.release(); //We don't need `0`
}
return read;
@@ -100,33 +70,53 @@
}
@Override
- public int read(byte[] b, int off, int len) throws IOException {
- return readInternal(take -> take.read(b, off, len));
+ public int read() throws IOException {
+
+ ByteBuf take;
+ try {
+ take = isList.take();
+ boolean isReadable = take.isReadable();
+ if (checkEndOfInputOrError(take)) {
+ take.release();
+ return -1;
+ }
+
+ if (isReadable) {
+ return take.readInt();
+ } else {
+ take.release(); //We don't need `0`
+ }
+
+ return 0;
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted.", e);
+ }
}
@Override
- public int read() throws IOException {
- return readInternal(InputStream::read);
+ public void close() throws IOException {
+ if (isList != null) {
+ while (!isList.isEmpty()) {
+ try {
+ isList.take().release();
+ } catch (InterruptedException e) {
+ throw new IOException("Interrupted. Potential ByteBuf Leak.", e);
+ }
+ }
+ }
+ super.close();
}
@Override
public int available() throws IOException {
- InputStream peek = isList.peek();
- if (peek != null) {
- return peek.available();
+ ByteBuf peek = isList.peek();
+ if (peek != null && peek.isReadable()) {
+ return peek.readableBytes();
}
-
return 0;
}
- private boolean checkEndOfInput(InputStream take) throws IOException {
- if (take == END_OF_INPUT) {
- end = true;
- return true;
- } else if (take == END_OF_INPUT_ERROR) {
- end = true;
- throw new IOException("Connection was closed prematurely.");
- }
- return false;
+ private boolean checkEndOfInputOrError(ByteBuf take) throws IOException {
+ return take == Unpooled.EMPTY_BUFFER;
}
}
diff --git a/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyHttp2ServerHandler.java b/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyHttp2ServerHandler.java
index b8da5fe..e64476d 100644
--- a/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyHttp2ServerHandler.java
+++ b/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyHttp2ServerHandler.java
@@ -28,8 +28,8 @@
import java.util.concurrent.LinkedBlockingDeque;
import javax.ws.rs.core.SecurityContext;
-
-import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
@@ -55,7 +55,7 @@
class JerseyHttp2ServerHandler extends ChannelDuplexHandler {
private final URI baseUri;
- private final LinkedBlockingDeque<InputStream> isList = new LinkedBlockingDeque<>();
+ private final LinkedBlockingDeque<ByteBuf> isList = new LinkedBlockingDeque<>();
private final NettyHttpContainer container;
private final ResourceConfig resourceConfig;
@@ -92,9 +92,9 @@
* Process incoming data.
*/
private void onDataRead(ChannelHandlerContext ctx, Http2DataFrame data) throws Exception {
- isList.add(new ByteBufInputStream(data.content(), true));
+ isList.add(data.content());
if (data.isEndStream()) {
- isList.add(NettyInputStream.END_OF_INPUT);
+ isList.add(Unpooled.EMPTY_BUFFER);
}
}
@@ -163,7 +163,7 @@
ctx.channel().closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
@Override
public void operationComplete(Future<? super Void> future) throws Exception {
- isList.add(NettyInputStream.END_OF_INPUT_ERROR);
+ isList.add(Unpooled.EMPTY_BUFFER);
}
});
diff --git a/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyServerHandler.java b/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyServerHandler.java
index 06d1782..712cb1f 100644
--- a/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyServerHandler.java
+++ b/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyServerHandler.java
@@ -16,16 +16,17 @@
package org.glassfish.jersey.netty.httpserver;
-import java.io.IOException;
-import java.io.InputStream;
import java.net.URI;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingDeque;
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.MediaType;
+
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
@@ -36,8 +37,6 @@
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
import org.glassfish.jersey.internal.PropertiesDelegate;
import org.glassfish.jersey.netty.connector.internal.NettyInputStream;
import org.glassfish.jersey.server.ContainerRequest;
@@ -46,23 +45,26 @@
/**
* {@link io.netty.channel.ChannelInboundHandler} which servers as a bridge
- * between Netty and Jersey.
+ * between Netty and Jersey. Handles additional validation on the payload size
+ * that is controlled by a JVM property {@code max.http.request.entitySizeMb}.
*
- * @author Pavel Bucek
+ * @author Pavel Bucek (pavel.bucek at oracle.com)
*/
class JerseyServerHandler extends ChannelInboundHandlerAdapter {
private final URI baseUri;
- private final LinkedBlockingDeque<InputStream> isList = new LinkedBlockingDeque<>();
+ private final LinkedBlockingDeque<ByteBuf> isList = new LinkedBlockingDeque<>();
private final NettyHttpContainer container;
private final ResourceConfig resourceConfig;
+ private static final long MAX_REQUEST_ENTITY_BYTES = Long.getLong("jersey.max.http.request.entitySizeMb", new Long(50000))
+ .longValue() * 1024 * 1024; //50 MB default limit
+
/**
* Constructor.
*
* @param baseUri base {@link URI} of the container (includes context path, if any).
* @param container Netty container implementation.
- * @param resourceConfig the application {@link ResourceConfig}
*/
public JerseyServerHandler(URI baseUri, NettyHttpContainer container, ResourceConfig resourceConfig) {
this.baseUri = baseUri;
@@ -85,6 +87,33 @@
requestContext.setWriter(new NettyResponseWriter(ctx, req, container));
+ long contentLength = req.headers().contains(HttpHeaderNames.CONTENT_LENGTH) ? HttpUtil.getContentLength(req)
+ : -1L;
+ if (contentLength >= MAX_REQUEST_ENTITY_BYTES) {
+ requestContext.abortWith(javax.ws.rs.core.Response.status(Status.REQUEST_ENTITY_TOO_LARGE).build());
+ } else {
+ /**
+ * Jackson JSON decoder tries to read a minimum of 2 bytes (4
+ * for BOM). So, during an empty or 1-byte input, we'd want to
+ * avoid reading the entity to safely handle this edge case by
+ * eventually throwing a malformed JSON exception.
+ */
+ String contentType = req.headers().get(HttpHeaderNames.CONTENT_TYPE);
+ boolean isJson = contentType != null ? contentType.toLowerCase().contains(MediaType.APPLICATION_JSON)
+ : false;
+ //process entity streams only if there is an entity issued in the request (i.e., content-length >=0).
+ //Otherwise, it's safe to discard during next processing
+ if ((!isJson && contentLength != -1) || HttpUtil.isTransferEncodingChunked(req)
+ || (isJson && contentLength >= 2)) {
+ requestContext.setEntityStream(new NettyInputStream(isList));
+ }
+ }
+
+ // copying headers from netty request to jersey container request context.
+ for (String name : req.headers().names()) {
+ requestContext.headers(name, req.headers().getAll(name));
+ }
+
// must be like this, since there is a blocking read from Jersey
container.getExecutorService().execute(new Runnable() {
@Override
@@ -95,18 +124,17 @@
}
if (msg instanceof HttpContent) {
- HttpContent httpContent = (HttpContent) msg;
+ HttpContent httpContent = (HttpContent) msg;
- ByteBuf content = httpContent.content();
+ ByteBuf content = httpContent.content();
+ if (content.isReadable()) {
+ isList.add(content);
+ }
- if (content.isReadable()) {
- isList.add(new ByteBufInputStream(content, true));
- }
-
- if (msg instanceof LastHttpContent) {
- isList.add(NettyInputStream.END_OF_INPUT);
- }
- }
+ if (msg instanceof LastHttpContent) {
+ isList.add(Unpooled.EMPTY_BUFFER);
+ }
+ }
}
/**
@@ -148,35 +176,10 @@
}
}, resourceConfig);
- // request entity handling.
- if ((req.headers().contains(HttpHeaderNames.CONTENT_LENGTH) && HttpUtil.getContentLength(req) > 0)
- || HttpUtil.isTransferEncodingChunked(req)) {
-
- ctx.channel().closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
- @Override
- public void operationComplete(Future<? super Void> future) throws Exception {
- isList.add(NettyInputStream.END_OF_INPUT_ERROR);
- }
- });
-
- requestContext.setEntityStream(new NettyInputStream(isList));
- } else {
- requestContext.setEntityStream(new InputStream() {
- @Override
- public int read() throws IOException {
- return -1;
- }
- });
- }
-
- // copying headers from netty request to jersey container request context.
- for (String name : req.headers().names()) {
- requestContext.headers(name, req.headers().getAll(name));
- }
-
return requestContext;
}
+
private NettySecurityContext getSecurityContext(ChannelHandlerContext ctx) {
return new NettySecurityContext(ctx);
}
diff --git a/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/NettyResponseWriter.java b/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/NettyResponseWriter.java
index 00998c3..e2a4931 100644
--- a/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/NettyResponseWriter.java
+++ b/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/NettyResponseWriter.java
@@ -46,7 +46,7 @@
/**
* Netty implementation of {@link ContainerResponseWriter}.
*
- * @author Pavel Bucek
+ * @author Pavel Bucek (pavel.bucek at oracle.com)
*/
class NettyResponseWriter implements ContainerResponseWriter {
@@ -119,7 +119,11 @@
JerseyChunkedInput jerseyChunkedInput = new JerseyChunkedInput(ctx.channel());
- ctx.write(new HttpChunkedInput(jerseyChunkedInput)).addListener(FLUSH_FUTURE);
+ if (HttpUtil.isTransferEncodingChunked(response)) {
+ ctx.writeAndFlush(new HttpChunkedInput(jerseyChunkedInput));
+ } else {
+ ctx.write(new HttpChunkedInput(jerseyChunkedInput)).addListener(FLUSH_FUTURE);
+ }
return jerseyChunkedInput;
} else {