Rewritten Netty Jersey implementation using direct ByteBuf consumption (#4312)

Signed-off-by: Venkat Ganesh <010gvr@gmail.com>
diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java
index 1a9cbe5..5531498 100644
--- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java
+++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/JerseyClientHandler.java
@@ -16,7 +16,6 @@
 
 package org.glassfish.jersey.netty.connector;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Map;
@@ -31,6 +30,7 @@
 import org.glassfish.jersey.netty.connector.internal.NettyInputStream;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.codec.http.HttpContent;
@@ -50,7 +50,7 @@
 class JerseyClientHandler extends SimpleChannelInboundHandler<HttpObject> {
 
     private final NettyConnector connector;
-    private final LinkedBlockingDeque<InputStream> isList = new LinkedBlockingDeque<>();
+    private final LinkedBlockingDeque<ByteBuf> isList = new LinkedBlockingDeque<>();
 
     private final AsyncConnectorCallback asyncConnectorCallback;
     private final ClientRequest jerseyRequest;
@@ -89,7 +89,7 @@
             for (Map.Entry<String, String> entry : response.headers().entries()) {
                 jerseyResponse.getHeaders().add(entry.getKey(), entry.getValue());
             }
-
+            isList.clear(); // clearing the content - possible leftover from previous request processing.
             // request entity handling.
             if ((response.headers().contains(HttpHeaderNames.CONTENT_LENGTH) && HttpUtil.getContentLength(response) > 0)
                     || HttpUtil.isTransferEncodingChunked(response)) {
@@ -97,7 +97,7 @@
                 ctx.channel().closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
                     @Override
                     public void operationComplete(Future<? super Void> future) throws Exception {
-                        isList.add(NettyInputStream.END_OF_INPUT_ERROR);
+                        isList.add(Unpooled.EMPTY_BUFFER);
                     }
                 });
 
@@ -123,21 +123,16 @@
 
         }
         if (msg instanceof HttpContent) {
-
             HttpContent httpContent = (HttpContent) msg;
 
             ByteBuf content = httpContent.content();
-
             if (content.isReadable()) {
-                // copy bytes - when netty reads last chunk, it automatically closes the channel, which invalidates all
-                // relates ByteBuffs.
-                byte[] bytes = new byte[content.readableBytes()];
-                content.getBytes(content.readerIndex(), bytes);
-                isList.add(new ByteArrayInputStream(bytes));
+                content.retain();
+                isList.add(content);
             }
 
             if (msg instanceof LastHttpContent) {
-                isList.add(NettyInputStream.END_OF_INPUT);
+                isList.add(Unpooled.EMPTY_BUFFER);
             }
         }
     }
@@ -153,6 +148,6 @@
             });
         }
         future.completeExceptionally(cause);
-        isList.add(NettyInputStream.END_OF_INPUT_ERROR);
+        ctx.close();
     }
 }
diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/JerseyChunkedInput.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/JerseyChunkedInput.java
index ef14ba3..e5262d3 100644
--- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/JerseyChunkedInput.java
+++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/JerseyChunkedInput.java
@@ -42,10 +42,9 @@
 public class JerseyChunkedInput extends OutputStream implements ChunkedInput<ByteBuf>, ChannelFutureListener {
 
     private static final ByteBuffer VOID = ByteBuffer.allocate(0);
-    private static final int CAPACITY = 8;
-    // TODO this needs to be configurable, see JERSEY-3228
-    private static final int WRITE_TIMEOUT = 10000;
-    private static final int READ_TIMEOUT = 10000;
+    private static final int CAPACITY = Integer.getInteger("jersey.ci.capacity", 8);
+    private static final int WRITE_TIMEOUT = Integer.getInteger("jersey.ci.read.timeout", 10000);
+    private static final int READ_TIMEOUT = Integer.getInteger("jersey.ci.write.timeout", 10000);
 
     private final LinkedBlockingDeque<ByteBuffer> queue = new LinkedBlockingDeque<>(CAPACITY);
     private final Channel ctx;
diff --git a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java
index aab83b3..741121f 100644
--- a/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java
+++ b/connectors/netty-connector/src/main/java/org/glassfish/jersey/netty/connector/internal/NettyInputStream.java
@@ -20,77 +20,47 @@
 import java.io.InputStream;
 import java.util.concurrent.LinkedBlockingDeque;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
 /**
  * Input stream which servers as Request entity input.
  * <p>
- * Converts Netty NIO buffers to an input streams and stores them in the queue,
- * waiting for Jersey to process it.
- *
- * @author Pavel Bucek
+ * Consumes a list of pending {@link ByteBuf}s and processes them on request by Jersey
  */
 public class NettyInputStream extends InputStream {
 
-    private volatile boolean end = false;
+    private final LinkedBlockingDeque<ByteBuf> isList;
 
-    /**
-     * End of input.
-     */
-    public static final InputStream END_OF_INPUT = new InputStream() {
-        @Override
-        public int read() throws IOException {
-            return 0;
-        }
-
-        @Override
-        public String toString() {
-            return "END_OF_INPUT " + super.toString();
-        }
-    };
-
-    /**
-     * Unexpected end of input.
-     */
-    public static final InputStream END_OF_INPUT_ERROR = new InputStream() {
-        @Override
-        public int read() throws IOException {
-            return 0;
-        }
-
-        @Override
-        public String toString() {
-            return "END_OF_INPUT_ERROR " + super.toString();
-        }
-    };
-
-    private final LinkedBlockingDeque<InputStream> isList;
-
-    public NettyInputStream(LinkedBlockingDeque<InputStream> isList) {
+    public NettyInputStream(LinkedBlockingDeque<ByteBuf> isList) {
         this.isList = isList;
     }
 
-    private interface ISReader {
-        int readFrom(InputStream take) throws IOException;
-    }
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
 
-    private int readInternal(ISReader isReader) throws IOException {
-        if (end) {
-            return -1;
-        }
-
-        InputStream take;
+        ByteBuf take;
         try {
             take = isList.take();
-
-            if (checkEndOfInput(take)) {
+            boolean isReadable = take.isReadable();
+            int read = -1;
+            if (checkEndOfInputOrError(take)) {
+                take.release();
                 return -1;
             }
 
-            int read = isReader.readFrom(take);
-
-            if (take.available() > 0) {
-                isList.addFirst(take);
+            if (isReadable) {
+                int readableBytes = take.readableBytes();
+                read = Math.min(readableBytes, len);
+                take.readBytes(b, off, read);
+                if (read < len) {
+                    take.release();
+                } else {
+                    isList.addFirst(take);
+                }
             } else {
-                take.close();
+                read = 0;
+                take.release(); //We don't need `0`
             }
 
             return read;
@@ -100,33 +70,53 @@
     }
 
     @Override
-    public int read(byte[] b, int off, int len) throws IOException {
-        return readInternal(take -> take.read(b, off, len));
+    public int read() throws IOException {
+
+        ByteBuf take;
+        try {
+            take = isList.take();
+            boolean isReadable = take.isReadable();
+            if (checkEndOfInputOrError(take)) {
+                take.release();
+                return -1;
+            }
+
+            if (isReadable) {
+                return take.readInt();
+            } else {
+                take.release(); //We don't need `0`
+            }
+
+            return 0;
+        } catch (InterruptedException e) {
+            throw new IOException("Interrupted.", e);
+        }
     }
 
     @Override
-    public int read() throws IOException {
-        return readInternal(InputStream::read);
+    public void close() throws IOException {
+        if (isList != null) {
+            while (!isList.isEmpty()) {
+                try {
+                    isList.take().release();
+                } catch (InterruptedException e) {
+                    throw new IOException("Interrupted. Potential ByteBuf Leak.", e);
+                }
+            }
+        }
+        super.close();
     }
 
     @Override
     public int available() throws IOException {
-        InputStream peek = isList.peek();
-        if (peek != null) {
-            return peek.available();
+        ByteBuf peek = isList.peek();
+        if (peek != null && peek.isReadable()) {
+            return peek.readableBytes();
         }
-
         return 0;
     }
 
-    private boolean checkEndOfInput(InputStream take) throws IOException {
-        if (take == END_OF_INPUT) {
-            end = true;
-            return true;
-        } else if (take == END_OF_INPUT_ERROR) {
-            end = true;
-            throw new IOException("Connection was closed prematurely.");
-        }
-        return false;
+    private boolean checkEndOfInputOrError(ByteBuf take) throws IOException {
+        return take == Unpooled.EMPTY_BUFFER;
     }
 }
diff --git a/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyHttp2ServerHandler.java b/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyHttp2ServerHandler.java
index b8da5fe..e64476d 100644
--- a/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyHttp2ServerHandler.java
+++ b/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyHttp2ServerHandler.java
@@ -28,8 +28,8 @@
 import java.util.concurrent.LinkedBlockingDeque;
 
 import javax.ws.rs.core.SecurityContext;
-
-import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelDuplexHandler;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
@@ -55,7 +55,7 @@
 class JerseyHttp2ServerHandler extends ChannelDuplexHandler {
 
     private final URI baseUri;
-    private final LinkedBlockingDeque<InputStream> isList = new LinkedBlockingDeque<>();
+    private final LinkedBlockingDeque<ByteBuf> isList = new LinkedBlockingDeque<>();
     private final NettyHttpContainer container;
     private final ResourceConfig resourceConfig;
 
@@ -92,9 +92,9 @@
      * Process incoming data.
      */
     private void onDataRead(ChannelHandlerContext ctx, Http2DataFrame data) throws Exception {
-        isList.add(new ByteBufInputStream(data.content(), true));
+        isList.add(data.content());
         if (data.isEndStream()) {
-            isList.add(NettyInputStream.END_OF_INPUT);
+            isList.add(Unpooled.EMPTY_BUFFER);
         }
     }
 
@@ -163,7 +163,7 @@
             ctx.channel().closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
                 @Override
                 public void operationComplete(Future<? super Void> future) throws Exception {
-                    isList.add(NettyInputStream.END_OF_INPUT_ERROR);
+                    isList.add(Unpooled.EMPTY_BUFFER);
                 }
             });
 
diff --git a/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyServerHandler.java b/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyServerHandler.java
index 06d1782..712cb1f 100644
--- a/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyServerHandler.java
+++ b/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/JerseyServerHandler.java
@@ -16,16 +16,17 @@
 
 package org.glassfish.jersey.netty.httpserver;
 
-import java.io.IOException;
-import java.io.InputStream;
 import java.net.URI;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.LinkedBlockingDeque;
 
+import javax.ws.rs.core.Response.Status;
+import javax.ws.rs.core.MediaType;
+
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.handler.codec.http.DefaultFullHttpResponse;
@@ -36,8 +37,6 @@
 import io.netty.handler.codec.http.HttpUtil;
 import io.netty.handler.codec.http.HttpVersion;
 import io.netty.handler.codec.http.LastHttpContent;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
 import org.glassfish.jersey.internal.PropertiesDelegate;
 import org.glassfish.jersey.netty.connector.internal.NettyInputStream;
 import org.glassfish.jersey.server.ContainerRequest;
@@ -46,23 +45,26 @@
 
 /**
  * {@link io.netty.channel.ChannelInboundHandler} which servers as a bridge
- * between Netty and Jersey.
+ * between Netty and Jersey. Handles additional validation on the payload size
+ * that is controlled by a JVM property {@code max.http.request.entitySizeMb}.
  *
- * @author Pavel Bucek
+ * @author Pavel Bucek (pavel.bucek at oracle.com)
  */
 class JerseyServerHandler extends ChannelInboundHandlerAdapter {
 
     private final URI baseUri;
-    private final LinkedBlockingDeque<InputStream> isList = new LinkedBlockingDeque<>();
+    private final LinkedBlockingDeque<ByteBuf> isList = new LinkedBlockingDeque<>();
     private final NettyHttpContainer container;
     private final ResourceConfig resourceConfig;
 
+    private static final long MAX_REQUEST_ENTITY_BYTES = Long.getLong("jersey.max.http.request.entitySizeMb", new Long(50000))
+            .longValue() * 1024 * 1024; //50 MB default limit
+
     /**
      * Constructor.
      *
      * @param baseUri   base {@link URI} of the container (includes context path, if any).
      * @param container Netty container implementation.
-     * @param resourceConfig  the application {@link ResourceConfig}
      */
     public JerseyServerHandler(URI baseUri, NettyHttpContainer container, ResourceConfig resourceConfig) {
         this.baseUri = baseUri;
@@ -85,6 +87,33 @@
 
             requestContext.setWriter(new NettyResponseWriter(ctx, req, container));
 
+            long contentLength = req.headers().contains(HttpHeaderNames.CONTENT_LENGTH) ? HttpUtil.getContentLength(req)
+                    : -1L;
+            if (contentLength >= MAX_REQUEST_ENTITY_BYTES) {
+                requestContext.abortWith(javax.ws.rs.core.Response.status(Status.REQUEST_ENTITY_TOO_LARGE).build());
+            } else {
+                /**
+                 * Jackson JSON decoder tries to read a minimum of 2 bytes (4
+                 * for BOM). So, during an empty or 1-byte input, we'd want to
+                 * avoid reading the entity to safely handle this edge case by
+                 * eventually throwing a malformed JSON exception.
+                 */
+                String contentType = req.headers().get(HttpHeaderNames.CONTENT_TYPE);
+                boolean isJson = contentType != null ? contentType.toLowerCase().contains(MediaType.APPLICATION_JSON)
+                        : false;
+                //process entity streams only if there is an entity issued in the request (i.e., content-length >=0).
+                //Otherwise, it's safe to discard during next processing
+                if ((!isJson && contentLength != -1) || HttpUtil.isTransferEncodingChunked(req)
+                        || (isJson && contentLength >= 2)) {
+                    requestContext.setEntityStream(new NettyInputStream(isList));
+                }
+            }
+
+            // copying headers from netty request to jersey container request context.
+            for (String name : req.headers().names()) {
+                requestContext.headers(name, req.headers().getAll(name));
+            }
+
             // must be like this, since there is a blocking read from Jersey
             container.getExecutorService().execute(new Runnable() {
                 @Override
@@ -95,18 +124,17 @@
         }
 
         if (msg instanceof HttpContent) {
-            HttpContent httpContent = (HttpContent) msg;
+          HttpContent httpContent = (HttpContent) msg;
 
-            ByteBuf content = httpContent.content();
+          ByteBuf content = httpContent.content();
+          if (content.isReadable()) {
+              isList.add(content);
+          }
 
-            if (content.isReadable()) {
-                isList.add(new ByteBufInputStream(content, true));
-            }
-
-            if (msg instanceof LastHttpContent) {
-                isList.add(NettyInputStream.END_OF_INPUT);
-            }
-        }
+          if (msg instanceof LastHttpContent) {
+              isList.add(Unpooled.EMPTY_BUFFER);
+          }
+      }
     }
 
     /**
@@ -148,35 +176,10 @@
                     }
                 }, resourceConfig);
 
-        // request entity handling.
-        if ((req.headers().contains(HttpHeaderNames.CONTENT_LENGTH) && HttpUtil.getContentLength(req) > 0)
-                || HttpUtil.isTransferEncodingChunked(req)) {
-
-            ctx.channel().closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
-                @Override
-                public void operationComplete(Future<? super Void> future) throws Exception {
-                    isList.add(NettyInputStream.END_OF_INPUT_ERROR);
-                }
-            });
-
-            requestContext.setEntityStream(new NettyInputStream(isList));
-        } else {
-            requestContext.setEntityStream(new InputStream() {
-                @Override
-                public int read() throws IOException {
-                    return -1;
-                }
-            });
-        }
-
-        // copying headers from netty request to jersey container request context.
-        for (String name : req.headers().names()) {
-            requestContext.headers(name, req.headers().getAll(name));
-        }
-
         return requestContext;
     }
 
+
     private NettySecurityContext getSecurityContext(ChannelHandlerContext ctx) {
         return new NettySecurityContext(ctx);
     }
diff --git a/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/NettyResponseWriter.java b/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/NettyResponseWriter.java
index 00998c3..e2a4931 100644
--- a/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/NettyResponseWriter.java
+++ b/containers/netty-http/src/main/java/org/glassfish/jersey/netty/httpserver/NettyResponseWriter.java
@@ -46,7 +46,7 @@
 /**
  * Netty implementation of {@link ContainerResponseWriter}.
  *
- * @author Pavel Bucek
+ * @author Pavel Bucek (pavel.bucek at oracle.com)
  */
 class NettyResponseWriter implements ContainerResponseWriter {
 
@@ -119,7 +119,11 @@
 
             JerseyChunkedInput jerseyChunkedInput = new JerseyChunkedInput(ctx.channel());
 
-            ctx.write(new HttpChunkedInput(jerseyChunkedInput)).addListener(FLUSH_FUTURE);
+            if (HttpUtil.isTransferEncodingChunked(response)) {
+                ctx.writeAndFlush(new HttpChunkedInput(jerseyChunkedInput));
+            } else {
+                ctx.write(new HttpChunkedInput(jerseyChunkedInput)).addListener(FLUSH_FUTURE);
+            }
             return jerseyChunkedInput;
 
         } else {