blob: 08319639e52d0881495a855630e92d79fc5034cf [file] [log] [blame]
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.server;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritePendingException;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.RequestDispatcher;
import javax.servlet.ServletOutputStream;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.WriteListener;
import org.eclipse.jetty.http.HttpContent;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.IteratingNestedCallback;
import org.eclipse.jetty.util.SharedBlockingCallback;
import org.eclipse.jetty.util.SharedBlockingCallback.Blocker;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* <p>{@link HttpOutput} implements {@link ServletOutputStream}
* as required by the Servlet specification.</p>
* <p>{@link HttpOutput} buffers content written by the application until a
* further write will overflow the buffer, at which point it triggers a commit
* of the response.</p>
* <p>{@link HttpOutput} can be closed and reopened, to allow requests included
* via {@link RequestDispatcher#include(ServletRequest, ServletResponse)} to
* close the stream, to be reopened after the inclusion ends.</p>
*/
public class HttpOutput extends ServletOutputStream implements Runnable
{
/**
* The HttpOutput.Interceptor is a single intercept point for all
* output written to the HttpOutput: via writer; via output stream;
* asynchronously; or blocking.
* <p>
* The Interceptor can be used to implement translations (eg Gzip) or
* additional buffering that acts on all output. Interceptors are
* created in a chain, so that multiple concerns may intercept.
* <p>
* The {@link HttpChannel} is an {@link Interceptor} and is always the
* last link in any Interceptor chain.
* <p>
* Responses are committed by the first call to
* {@link #write(ByteBuffer, boolean, Callback)}
* and closed by a call to {@link #write(ByteBuffer, boolean, Callback)}
* with the last boolean set true. If no content is available to commit
* or close, then a null buffer is passed.
*/
public interface Interceptor
{
/**
* Write content.
* The response is committed by the first call to write and is closed by
* a call with last == true. Empty content buffers may be passed to
* force a commit or close.
*
* @param content The content to be written or an empty buffer.
* @param last True if this is the last call to write
* @param callback The callback to use to indicate {@link Callback#succeeded()}
* or {@link Callback#failed(Throwable)}.
*/
void write(ByteBuffer content, boolean last, Callback callback);
/**
* @return The next Interceptor in the chain or null if this is the
* last Interceptor in the chain.
*/
Interceptor getNextInterceptor();
/**
* @return True if the Interceptor is optimized to receive direct
* {@link ByteBuffer}s in the {@link #write(ByteBuffer, boolean, Callback)}
* method. If false is returned, then passing direct buffers may cause
* inefficiencies.
*/
boolean isOptimizedForDirectBuffers();
/**
* Reset the buffers.
* <p>If the Interceptor contains buffers then reset them.
*
* @throws IllegalStateException Thrown if the response has been
* committed and buffers and/or headers cannot be reset.
*/
default void resetBuffer() throws IllegalStateException
{
Interceptor next = getNextInterceptor();
if (next != null)
next.resetBuffer();
}
}
private static Logger LOG = Log.getLogger(HttpOutput.class);
private final HttpChannel _channel;
private final SharedBlockingCallback _writeBlocker;
private Interceptor _interceptor;
/**
* Bytes written via the write API (excludes bytes written via sendContent). Used to autocommit once content length is written.
*/
private long _written;
private ByteBuffer _aggregate;
private int _bufferSize;
private int _commitSize;
private WriteListener _writeListener;
private volatile Throwable _onError;
/*
ACTION OPEN ASYNC READY PENDING UNREADY CLOSED
-------------------------------------------------------------------------------------------
setWriteListener() READY->owp ise ise ise ise ise
write() OPEN ise PENDING wpe wpe eof
flush() OPEN ise PENDING wpe wpe eof
close() CLOSED CLOSED CLOSED CLOSED wpe CLOSED
isReady() OPEN:true READY:true READY:true UNREADY:false UNREADY:false CLOSED:true
write completed - - - ASYNC READY->owp -
*/
private enum OutputState
{
OPEN, ASYNC, READY, PENDING, UNREADY, ERROR, CLOSED
}
private final AtomicReference<OutputState> _state = new AtomicReference<>(OutputState.OPEN);
public HttpOutput(HttpChannel channel)
{
_channel = channel;
_interceptor = channel;
_writeBlocker = new WriteBlocker(channel);
HttpConfiguration config = channel.getHttpConfiguration();
_bufferSize = config.getOutputBufferSize();
_commitSize = config.getOutputAggregationSize();
if (_commitSize > _bufferSize)
{
LOG.warn("OutputAggregationSize {} exceeds bufferSize {}", _commitSize, _bufferSize);
_commitSize = _bufferSize;
}
}
public HttpChannel getHttpChannel()
{
return _channel;
}
public Interceptor getInterceptor()
{
return _interceptor;
}
public void setInterceptor(Interceptor filter)
{
_interceptor = filter;
}
public boolean isWritten()
{
return _written > 0;
}
public long getWritten()
{
return _written;
}
public void reopen()
{
_state.set(OutputState.OPEN);
}
private boolean isLastContentToWrite(int len)
{
_written += len;
return _channel.getResponse().isAllContentWritten(_written);
}
public boolean isAllContentWritten()
{
return _channel.getResponse().isAllContentWritten(_written);
}
protected Blocker acquireWriteBlockingCallback() throws IOException
{
return _writeBlocker.acquire();
}
private void write(ByteBuffer content, boolean complete) throws IOException
{
try (Blocker blocker = _writeBlocker.acquire())
{
write(content, complete, blocker);
blocker.block();
}
catch (Exception failure)
{
if (LOG.isDebugEnabled())
LOG.debug(failure);
abort(failure);
if (failure instanceof IOException)
throw failure;
throw new IOException(failure);
}
}
protected void write(ByteBuffer content, boolean complete, Callback callback)
{
_interceptor.write(content, complete, callback);
}
private void abort(Throwable failure)
{
closed();
_channel.abort(failure);
}
@Override
public void close()
{
while (true)
{
OutputState state = _state.get();
switch (state)
{
case CLOSED:
{
return;
}
case UNREADY:
{
if (_state.compareAndSet(state, OutputState.ERROR))
_writeListener.onError(_onError == null ? new EofException("Async close") : _onError);
break;
}
default:
{
if (!_state.compareAndSet(state, OutputState.CLOSED))
break;
try
{
write(BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER, !_channel.getResponse().isIncluding());
}
catch (IOException x)
{
// Ignore it, it's been already logged in write().
}
finally
{
releaseBuffer();
}
// Return even if an exception is thrown by write().
return;
}
}
}
}
/**
* Called to indicate that the last write has been performed.
* It updates the state and performs cleanup operations.
*/
void closed()
{
while (true)
{
OutputState state = _state.get();
switch (state)
{
case CLOSED:
{
return;
}
case UNREADY:
{
if (_state.compareAndSet(state, OutputState.ERROR))
_writeListener.onError(_onError == null ? new EofException("Async closed") : _onError);
break;
}
default:
{
if (!_state.compareAndSet(state, OutputState.CLOSED))
break;
try
{
_channel.getResponse().closeOutput();
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug(x);
abort(x);
}
finally
{
releaseBuffer();
}
// Return even if an exception is thrown by closeOutput().
return;
}
}
}
}
private void releaseBuffer()
{
if (_aggregate != null)
{
_channel.getConnector().getByteBufferPool().release(_aggregate);
_aggregate = null;
}
}
public boolean isClosed()
{
return _state.get() == OutputState.CLOSED;
}
@Override
public void flush() throws IOException
{
while (true)
{
switch (_state.get())
{
case OPEN:
write(BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER, false);
return;
case ASYNC:
throw new IllegalStateException("isReady() not called");
case READY:
if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
continue;
new AsyncFlush().iterate();
return;
case PENDING:
case UNREADY:
throw new WritePendingException();
case ERROR:
throw new EofException(_onError);
case CLOSED:
return;
default:
throw new IllegalStateException();
}
}
}
@Override
public void write(byte[] b, int off, int len) throws IOException
{
// Async or Blocking ?
while (true)
{
switch (_state.get())
{
case OPEN:
// process blocking below
break;
case ASYNC:
throw new IllegalStateException("isReady() not called");
case READY:
if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
continue;
// Should we aggregate?
boolean last = isLastContentToWrite(len);
if (!last && len <= _commitSize)
{
if (_aggregate == null)
_aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _interceptor.isOptimizedForDirectBuffers());
// YES - fill the aggregate with content from the buffer
int filled = BufferUtil.fill(_aggregate, b, off, len);
// return if we are not complete, not full and filled all the content
if (filled == len && !BufferUtil.isFull(_aggregate))
{
if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
throw new IllegalStateException();
return;
}
// adjust offset/length
off += filled;
len -= filled;
}
// Do the asynchronous writing from the callback
new AsyncWrite(b, off, len, last).iterate();
return;
case PENDING:
case UNREADY:
throw new WritePendingException();
case ERROR:
throw new EofException(_onError);
case CLOSED:
throw new EofException("Closed");
default:
throw new IllegalStateException();
}
break;
}
// handle blocking write
// Should we aggregate?
int capacity = getBufferSize();
boolean last = isLastContentToWrite(len);
if (!last && len <= _commitSize)
{
if (_aggregate == null)
_aggregate = _channel.getByteBufferPool().acquire(capacity, _interceptor.isOptimizedForDirectBuffers());
// YES - fill the aggregate with content from the buffer
int filled = BufferUtil.fill(_aggregate, b, off, len);
// return if we are not complete, not full and filled all the content
if (filled == len && !BufferUtil.isFull(_aggregate))
return;
// adjust offset/length
off += filled;
len -= filled;
}
// flush any content from the aggregate
if (BufferUtil.hasContent(_aggregate))
{
write(_aggregate, last && len == 0);
// should we fill aggregate again from the buffer?
if (len > 0 && !last && len <= _commitSize && len <= BufferUtil.space(_aggregate))
{
BufferUtil.append(_aggregate, b, off, len);
return;
}
}
// write any remaining content in the buffer directly
if (len > 0)
{
// write a buffer capacity at a time to avoid JVM pooling large direct buffers
// http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6210541
ByteBuffer view = ByteBuffer.wrap(b, off, len);
while (len > getBufferSize())
{
int p = view.position();
int l = p + getBufferSize();
view.limit(p + getBufferSize());
write(view, false);
len -= getBufferSize();
view.limit(l + Math.min(len, getBufferSize()));
view.position(l);
}
write(view, last);
}
else if (last)
{
write(BufferUtil.EMPTY_BUFFER, true);
}
if (last)
closed();
}
public void write(ByteBuffer buffer) throws IOException
{
// Async or Blocking ?
while (true)
{
switch (_state.get())
{
case OPEN:
// process blocking below
break;
case ASYNC:
throw new IllegalStateException("isReady() not called");
case READY:
if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
continue;
// Do the asynchronous writing from the callback
boolean last = isLastContentToWrite(buffer.remaining());
new AsyncWrite(buffer, last).iterate();
return;
case PENDING:
case UNREADY:
throw new WritePendingException();
case ERROR:
throw new EofException(_onError);
case CLOSED:
throw new EofException("Closed");
default:
throw new IllegalStateException();
}
break;
}
// handle blocking write
int len = BufferUtil.length(buffer);
boolean last = isLastContentToWrite(len);
// flush any content from the aggregate
if (BufferUtil.hasContent(_aggregate))
write(_aggregate, last && len == 0);
// write any remaining content in the buffer directly
if (len > 0)
write(buffer, last);
else if (last)
write(BufferUtil.EMPTY_BUFFER, true);
if (last)
closed();
}
@Override
public void write(int b) throws IOException
{
_written += 1;
boolean complete = _channel.getResponse().isAllContentWritten(_written);
// Async or Blocking ?
while (true)
{
switch (_state.get())
{
case OPEN:
if (_aggregate == null)
_aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _interceptor.isOptimizedForDirectBuffers());
BufferUtil.append(_aggregate, (byte)b);
// Check if all written or full
if (complete || BufferUtil.isFull(_aggregate))
{
write(_aggregate, complete);
if (complete)
closed();
}
break;
case ASYNC:
throw new IllegalStateException("isReady() not called");
case READY:
if (!_state.compareAndSet(OutputState.READY, OutputState.PENDING))
continue;
if (_aggregate == null)
_aggregate = _channel.getByteBufferPool().acquire(getBufferSize(), _interceptor.isOptimizedForDirectBuffers());
BufferUtil.append(_aggregate, (byte)b);
// Check if all written or full
if (!complete && !BufferUtil.isFull(_aggregate))
{
if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
throw new IllegalStateException();
return;
}
// Do the asynchronous writing from the callback
new AsyncFlush().iterate();
return;
case PENDING:
case UNREADY:
throw new WritePendingException();
case ERROR:
throw new EofException(_onError);
case CLOSED:
throw new EofException("Closed");
default:
throw new IllegalStateException();
}
break;
}
}
@Override
public void print(String s) throws IOException
{
if (isClosed())
throw new IOException("Closed");
write(s.getBytes(_channel.getResponse().getCharacterEncoding()));
}
/**
* Blocking send of whole content.
*
* @param content The whole content to send
* @throws IOException if the send fails
*/
public void sendContent(ByteBuffer content) throws IOException
{
if (LOG.isDebugEnabled())
LOG.debug("sendContent({})", BufferUtil.toDetailString(content));
write(content, true);
closed();
}
/**
* Blocking send of stream content.
*
* @param in The stream content to send
* @throws IOException if the send fails
*/
public void sendContent(InputStream in) throws IOException
{
try (Blocker blocker = _writeBlocker.acquire())
{
new InputStreamWritingCB(in, blocker).iterate();
blocker.block();
}
catch (Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug(failure);
abort(failure);
throw failure;
}
}
/**
* Blocking send of channel content.
*
* @param in The channel content to send
* @throws IOException if the send fails
*/
public void sendContent(ReadableByteChannel in) throws IOException
{
try (Blocker blocker = _writeBlocker.acquire())
{
new ReadableByteChannelWritingCB(in, blocker).iterate();
blocker.block();
}
catch (Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug(failure);
abort(failure);
throw failure;
}
}
/**
* Blocking send of HTTP content.
*
* @param content The HTTP content to send
* @throws IOException if the send fails
*/
public void sendContent(HttpContent content) throws IOException
{
try (Blocker blocker = _writeBlocker.acquire())
{
sendContent(content, blocker);
blocker.block();
}
catch (Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug(failure);
abort(failure);
throw failure;
}
}
/**
* Asynchronous send of whole content.
*
* @param content The whole content to send
* @param callback The callback to use to notify success or failure
*/
public void sendContent(ByteBuffer content, final Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("sendContent(buffer={},{})", BufferUtil.toDetailString(content), callback);
write(content, true, new Callback.Nested(callback)
{
@Override
public void succeeded()
{
closed();
super.succeeded();
}
@Override
public void failed(Throwable x)
{
abort(x);
super.failed(x);
}
});
}
/**
* Asynchronous send of stream content.
* The stream will be closed after reading all content.
*
* @param in The stream content to send
* @param callback The callback to use to notify success or failure
*/
public void sendContent(InputStream in, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("sendContent(stream={},{})", in, callback);
new InputStreamWritingCB(in, callback).iterate();
}
/**
* Asynchronous send of channel content.
* The channel will be closed after reading all content.
*
* @param in The channel content to send
* @param callback The callback to use to notify success or failure
*/
public void sendContent(ReadableByteChannel in, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("sendContent(channel={},{})", in, callback);
new ReadableByteChannelWritingCB(in, callback).iterate();
}
/**
* Asynchronous send of HTTP content.
*
* @param httpContent The HTTP content to send
* @param callback The callback to use to notify success or failure
*/
public void sendContent(HttpContent httpContent, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("sendContent(http={},{})", httpContent, callback);
if (BufferUtil.hasContent(_aggregate))
{
callback.failed(new IOException("cannot sendContent() after write()"));
return;
}
if (_channel.isCommitted())
{
callback.failed(new IOException("cannot sendContent(), output already committed"));
return;
}
while (true)
{
switch (_state.get())
{
case OPEN:
if (!_state.compareAndSet(OutputState.OPEN, OutputState.PENDING))
continue;
break;
case ERROR:
callback.failed(new EofException(_onError));
return;
case CLOSED:
callback.failed(new EofException("Closed"));
return;
default:
throw new IllegalStateException();
}
break;
}
ByteBuffer buffer = _channel.useDirectBuffers() ? httpContent.getDirectBuffer() : null;
if (buffer == null)
buffer = httpContent.getIndirectBuffer();
if (buffer != null)
{
sendContent(buffer, callback);
return;
}
try
{
ReadableByteChannel rbc = httpContent.getReadableByteChannel();
if (rbc != null)
{
// Close of the rbc is done by the async sendContent
sendContent(rbc, callback);
return;
}
InputStream in = httpContent.getInputStream();
if (in != null)
{
sendContent(in, callback);
return;
}
throw new IllegalArgumentException("unknown content for " + httpContent);
}
catch (Throwable th)
{
abort(th);
callback.failed(th);
}
}
public int getBufferSize()
{
return _bufferSize;
}
public void setBufferSize(int size)
{
_bufferSize = size;
_commitSize = size;
}
public void recycle()
{
_interceptor = _channel;
HttpConfiguration config = _channel.getHttpConfiguration();
_bufferSize = config.getOutputBufferSize();
_commitSize = config.getOutputAggregationSize();
if (_commitSize > _bufferSize)
_commitSize = _bufferSize;
releaseBuffer();
_written = 0;
_writeListener = null;
_onError = null;
reopen();
}
public void resetBuffer()
{
_interceptor.resetBuffer();
if (BufferUtil.hasContent(_aggregate))
BufferUtil.clear(_aggregate);
_written = 0;
reopen();
}
@Override
public void setWriteListener(WriteListener writeListener)
{
if (!_channel.getState().isAsync())
throw new IllegalStateException("!ASYNC");
if (_state.compareAndSet(OutputState.OPEN, OutputState.READY))
{
_writeListener = writeListener;
if (_channel.getState().onWritePossible())
_channel.execute(_channel);
}
else
throw new IllegalStateException();
}
/**
* @see javax.servlet.ServletOutputStream#isReady()
*/
@Override
public boolean isReady()
{
while (true)
{
switch (_state.get())
{
case OPEN:
return true;
case ASYNC:
if (!_state.compareAndSet(OutputState.ASYNC, OutputState.READY))
continue;
return true;
case READY:
return true;
case PENDING:
if (!_state.compareAndSet(OutputState.PENDING, OutputState.UNREADY))
continue;
return false;
case UNREADY:
return false;
case ERROR:
return true;
case CLOSED:
return true;
default:
throw new IllegalStateException();
}
}
}
@Override
public void run()
{
loop:
while (true)
{
OutputState state = _state.get();
if (_onError != null)
{
switch (state)
{
case CLOSED:
case ERROR:
{
_onError = null;
break loop;
}
default:
{
if (_state.compareAndSet(state, OutputState.ERROR))
{
Throwable th = _onError;
_onError = null;
if (LOG.isDebugEnabled())
LOG.debug("onError", th);
_writeListener.onError(th);
close();
break loop;
}
}
}
continue;
}
switch (_state.get())
{
case ASYNC:
case READY:
case PENDING:
case UNREADY:
// Even though a write is not possible, because a close has
// occurred, we need to call onWritePossible to tell async
// producer that the last write completed, so fall through.
case CLOSED:
try
{
_writeListener.onWritePossible();
break loop;
}
catch (Throwable e)
{
_onError = e;
}
break;
default:
_onError = new IllegalStateException("state=" + _state.get());
}
}
}
private void close(Closeable resource)
{
try
{
resource.close();
}
catch (Throwable x)
{
LOG.ignore(x);
}
}
@Override
public String toString()
{
return String.format("%s@%x{%s}", this.getClass().getSimpleName(), hashCode(), _state.get());
}
private abstract class AsyncICB extends IteratingCallback
{
final boolean _last;
AsyncICB(boolean last)
{
_last = last;
}
@Override
protected void onCompleteSuccess()
{
while (true)
{
OutputState last = _state.get();
switch (last)
{
case PENDING:
if (!_state.compareAndSet(OutputState.PENDING, OutputState.ASYNC))
continue;
break;
case UNREADY:
if (!_state.compareAndSet(OutputState.UNREADY, OutputState.READY))
continue;
if (_last)
closed();
if (_channel.getState().onWritePossible())
_channel.execute(_channel);
break;
case CLOSED:
break;
default:
throw new IllegalStateException();
}
break;
}
}
@Override
public void onCompleteFailure(Throwable e)
{
_onError = e == null ? new IOException() : e;
if (_channel.getState().onWritePossible())
_channel.execute(_channel);
}
}
private class AsyncFlush extends AsyncICB
{
protected volatile boolean _flushed;
public AsyncFlush()
{
super(false);
}
@Override
protected Action process()
{
if (BufferUtil.hasContent(_aggregate))
{
_flushed = true;
write(_aggregate, false, this);
return Action.SCHEDULED;
}
if (!_flushed)
{
_flushed = true;
write(BufferUtil.EMPTY_BUFFER, false, this);
return Action.SCHEDULED;
}
return Action.SUCCEEDED;
}
}
private class AsyncWrite extends AsyncICB
{
private final ByteBuffer _buffer;
private final ByteBuffer _slice;
private final int _len;
protected volatile boolean _completed;
public AsyncWrite(byte[] b, int off, int len, boolean last)
{
super(last);
_buffer = ByteBuffer.wrap(b, off, len);
_len = len;
// always use a view for large byte arrays to avoid JVM pooling large direct buffers
_slice = _len < getBufferSize() ? null : _buffer.duplicate();
}
public AsyncWrite(ByteBuffer buffer, boolean last)
{
super(last);
_buffer = buffer;
_len = buffer.remaining();
// Use a slice buffer for large indirect to avoid JVM pooling large direct buffers
if (_buffer.isDirect() || _len < getBufferSize())
_slice = null;
else
{
_slice = _buffer.duplicate();
}
}
@Override
protected Action process()
{
// flush any content from the aggregate
if (BufferUtil.hasContent(_aggregate))
{
_completed = _len == 0;
write(_aggregate, _last && _completed, this);
return Action.SCHEDULED;
}
// Can we just aggregate the remainder?
if (!_last && _len < BufferUtil.space(_aggregate) && _len < _commitSize)
{
int position = BufferUtil.flipToFill(_aggregate);
BufferUtil.put(_buffer, _aggregate);
BufferUtil.flipToFlush(_aggregate, position);
return Action.SUCCEEDED;
}
// Is there data left to write?
if (_buffer.hasRemaining())
{
// if there is no slice, just write it
if (_slice == null)
{
_completed = true;
write(_buffer, _last, this);
return Action.SCHEDULED;
}
// otherwise take a slice
int p = _buffer.position();
int l = Math.min(getBufferSize(), _buffer.remaining());
int pl = p + l;
_slice.limit(pl);
_buffer.position(pl);
_slice.position(p);
_completed = !_buffer.hasRemaining();
write(_slice, _last && _completed, this);
return Action.SCHEDULED;
}
// all content written, but if we have not yet signal completion, we
// need to do so
if (_last && !_completed)
{
_completed = true;
write(BufferUtil.EMPTY_BUFFER, true, this);
return Action.SCHEDULED;
}
if (LOG.isDebugEnabled() && _completed)
LOG.debug("EOF of {}", this);
return Action.SUCCEEDED;
}
}
/**
* An iterating callback that will take content from an
* InputStream and write it to the associated {@link HttpChannel}.
* A non direct buffer of size {@link HttpOutput#getBufferSize()} is used.
* This callback is passed to the {@link HttpChannel#write(ByteBuffer, boolean, Callback)} to
* be notified as each buffer is written and only once all the input is consumed will the
* wrapped {@link Callback#succeeded()} method be called.
*/
private class InputStreamWritingCB extends IteratingNestedCallback
{
private final InputStream _in;
private final ByteBuffer _buffer;
private boolean _eof;
public InputStreamWritingCB(InputStream in, Callback callback)
{
super(callback);
_in = in;
_buffer = _channel.getByteBufferPool().acquire(getBufferSize(), false);
}
@Override
protected Action process() throws Exception
{
// Only return if EOF has previously been read and thus
// a write done with EOF=true
if (_eof)
{
if (LOG.isDebugEnabled())
LOG.debug("EOF of {}", this);
// Handle EOF
_in.close();
closed();
_channel.getByteBufferPool().release(_buffer);
return Action.SUCCEEDED;
}
// Read until buffer full or EOF
int len = 0;
while (len < _buffer.capacity() && !_eof)
{
int r = _in.read(_buffer.array(), _buffer.arrayOffset() + len, _buffer.capacity() - len);
if (r < 0)
_eof = true;
else
len += r;
}
// write what we have
_buffer.position(0);
_buffer.limit(len);
write(_buffer, _eof, this);
return Action.SCHEDULED;
}
@Override
public void onCompleteFailure(Throwable x)
{
abort(x);
_channel.getByteBufferPool().release(_buffer);
HttpOutput.this.close(_in);
super.onCompleteFailure(x);
}
}
/**
* An iterating callback that will take content from a
* ReadableByteChannel and write it to the {@link HttpChannel}.
* A {@link ByteBuffer} of size {@link HttpOutput#getBufferSize()} is used that will be direct if
* {@link HttpChannel#useDirectBuffers()} is true.
* This callback is passed to the {@link HttpChannel#write(ByteBuffer, boolean, Callback)} to
* be notified as each buffer is written and only once all the input is consumed will the
* wrapped {@link Callback#succeeded()} method be called.
*/
private class ReadableByteChannelWritingCB extends IteratingNestedCallback
{
private final ReadableByteChannel _in;
private final ByteBuffer _buffer;
private boolean _eof;
public ReadableByteChannelWritingCB(ReadableByteChannel in, Callback callback)
{
super(callback);
_in = in;
_buffer = _channel.getByteBufferPool().acquire(getBufferSize(), _channel.useDirectBuffers());
}
@Override
protected Action process() throws Exception
{
// Only return if EOF has previously been read and thus
// a write done with EOF=true
if (_eof)
{
if (LOG.isDebugEnabled())
LOG.debug("EOF of {}", this);
_in.close();
closed();
_channel.getByteBufferPool().release(_buffer);
return Action.SUCCEEDED;
}
// Read from stream until buffer full or EOF
BufferUtil.clearToFill(_buffer);
while (_buffer.hasRemaining() && !_eof)
_eof = (_in.read(_buffer)) < 0;
// write what we have
BufferUtil.flipToFlush(_buffer, 0);
write(_buffer, _eof, this);
return Action.SCHEDULED;
}
@Override
public void onCompleteFailure(Throwable x)
{
abort(x);
_channel.getByteBufferPool().release(_buffer);
HttpOutput.this.close(_in);
super.onCompleteFailure(x);
}
}
private static class WriteBlocker extends SharedBlockingCallback
{
private final HttpChannel _channel;
private WriteBlocker(HttpChannel channel)
{
_channel = channel;
}
@Override
protected long getIdleTimeout()
{
long blockingTimeout = _channel.getHttpConfiguration().getBlockingTimeout();
if (blockingTimeout == 0)
return _channel.getIdleTimeout();
return blockingTimeout;
}
}
}