blob: 39f74fb6af6516080fece445d6fbb5536eaf2d64 [file] [log] [blame]
/*
* Copyright (c) 2010, 2019 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/
package org.glassfish.jersey.message.internal;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Objects;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.glassfish.jersey.internal.LocalizationMessages;
import org.glassfish.jersey.internal.guava.Preconditions;
/**
* A committing output stream with optional serialized entity buffering functionality
* which allows measuring of the entity size.
* <p>
* When buffering functionality is enabled the output stream buffers
* the written bytes into an internal buffer of a configurable size. After the last
* written byte the {@link #commit()} method is expected to be called to notify
* a {@link org.glassfish.jersey.message.internal.OutboundMessageContext.StreamProvider#getOutputStream(int) callback}
* with an actual measured entity size. If the entity is too large to
* fit into the internal buffer and the buffer exceeds before the {@link #commit()}
* is called then the stream is automatically committed and the callback is called
* with parameter {@code size} value of {@code -1}.
* </p>
* <p>
* Callback method also returns the output stream in which the output will be written. The committing output stream
* must be initialized with the callback using
* {@link #setStreamProvider(org.glassfish.jersey.message.internal.OutboundMessageContext.StreamProvider)}
* before first byte is written.
* </p>
* The buffering is by default disabled and can be enabled by calling {@link #enableBuffering()}
* or {@link #enableBuffering(int)} before writing the first byte into this output stream. The former
* method enables buffering with the default size
* <tt>{@value CommittingOutputStream#DEFAULT_BUFFER_SIZE}</tt> bytes specified in {@link #DEFAULT_BUFFER_SIZE}.
* </p>
*
* @author Paul Sandoz
* @author Marek Potociar
* @author Miroslav Fuksa
*/
public final class CommittingOutputStream extends OutputStream {
private static final Logger LOGGER = Logger.getLogger(CommittingOutputStream.class.getName());
/**
* Null stream provider.
*/
private static final OutboundMessageContext.StreamProvider NULL_STREAM_PROVIDER =
contentLength -> new NullOutputStream();
/**
* Default size of the buffer which will be used if no user defined size is specified.
*/
public static final int DEFAULT_BUFFER_SIZE = 8192;
/**
* Adapted output stream.
*/
private OutputStream adaptedOutput;
/**
* Buffering stream provider.
*/
private OutboundMessageContext.StreamProvider streamProvider;
/**
* Internal buffer size.
*/
private int bufferSize = 0;
/**
* Entity buffer.
*/
private ByteArrayOutputStream buffer;
/**
* When {@code true}, the data are written directly to output stream and not to the buffer.
*/
private boolean directWrite = true;
/**
* When {@code true}, the stream is already committed (redirected to adaptedOutput).
*/
private boolean isCommitted;
/**
* When {@code true}, the stream is already closed.
*/
private boolean isClosed;
private static final String STREAM_PROVIDER_NULL = LocalizationMessages.STREAM_PROVIDER_NULL();
private static final String COMMITTING_STREAM_BUFFERING_ILLEGAL_STATE = LocalizationMessages
.COMMITTING_STREAM_BUFFERING_ILLEGAL_STATE();
/**
* Creates new committing output stream. The returned stream instance still needs to be initialized before
* writing first bytes.
*/
public CommittingOutputStream() {
}
/**
* Set the buffering output stream provider. If the committing output stream works in buffering mode
* this method must be called before first bytes are written into this stream.
*
* @param streamProvider non-null stream provider callback.
*/
public void setStreamProvider(OutboundMessageContext.StreamProvider streamProvider) {
if (isClosed) {
throw new IllegalStateException(LocalizationMessages.OUTPUT_STREAM_CLOSED());
}
Objects.nonNull(streamProvider);
if (this.streamProvider != null) {
LOGGER.log(Level.WARNING, LocalizationMessages.COMMITTING_STREAM_ALREADY_INITIALIZED());
}
this.streamProvider = streamProvider;
}
/**
* Enable buffering of the serialized entity.
*
* @param bufferSize size of the buffer. When the value is less or equal to zero the buffering will be disabled and {@code -1}
* will be passed to the
* {@link org.glassfish.jersey.message.internal.OutboundMessageContext.StreamProvider#getOutputStream(int) callback}.
*/
public void enableBuffering(int bufferSize) {
Preconditions.checkState(!isCommitted && (this.buffer == null || this.buffer.size() == 0),
COMMITTING_STREAM_BUFFERING_ILLEGAL_STATE);
this.bufferSize = bufferSize;
if (bufferSize <= 0) {
this.directWrite = true;
this.buffer = null;
} else {
directWrite = false;
buffer = new ByteArrayOutputStream(bufferSize);
}
}
/**
* Enable buffering of the serialized entity with the {@link #DEFAULT_BUFFER_SIZE default buffer size }.
*/
void enableBuffering() {
enableBuffering(DEFAULT_BUFFER_SIZE);
}
/**
* Determine whether the stream was already committed or not.
*
* @return {@code true} if this stream was already committed, {@code false} otherwise.
*/
public boolean isCommitted() {
return isCommitted;
}
private void commitStream() throws IOException {
commitStream(-1);
}
private void commitStream(int currentSize) throws IOException {
if (!isCommitted) {
Preconditions.checkState(streamProvider != null, STREAM_PROVIDER_NULL);
adaptedOutput = streamProvider.getOutputStream(currentSize);
if (adaptedOutput == null) {
adaptedOutput = new NullOutputStream();
}
directWrite = true;
isCommitted = true;
}
}
@Override
public void write(byte b[]) throws IOException {
if (directWrite) {
commitStream();
adaptedOutput.write(b);
} else {
if (b.length + buffer.size() > bufferSize) {
flushBuffer(false);
adaptedOutput.write(b);
} else {
buffer.write(b);
}
}
}
@Override
public void write(byte b[], int off, int len) throws IOException {
if (directWrite) {
commitStream();
adaptedOutput.write(b, off, len);
} else {
if (len + buffer.size() > bufferSize) {
flushBuffer(false);
adaptedOutput.write(b, off, len);
} else {
buffer.write(b, off, len);
}
}
}
@Override
public void write(int b) throws IOException {
if (directWrite) {
commitStream();
adaptedOutput.write(b);
} else {
if (buffer.size() + 1 > bufferSize) {
flushBuffer(false);
adaptedOutput.write(b);
} else {
buffer.write(b);
}
}
}
/**
* Commit the output stream.
*
* @throws IOException when underlying stream returned from the callback method throws the io exception.
*/
public void commit() throws IOException {
flushBuffer(true);
commitStream();
}
@Override
public void close() throws IOException {
if (isClosed) {
return;
}
isClosed = true;
if (streamProvider == null) {
streamProvider = NULL_STREAM_PROVIDER;
}
commit();
adaptedOutput.close();
}
/**
* Check if the committing output stream has been closed already.
*
* @return {@code true} if the stream has been closed, {@code false} otherwise.
*/
public boolean isClosed() {
return isClosed;
}
@Override
public void flush() throws IOException {
if (isCommitted()) {
adaptedOutput.flush();
}
}
private void flushBuffer(boolean endOfStream) throws IOException {
if (!directWrite) {
int currentSize;
if (endOfStream) {
currentSize = buffer == null ? 0 : buffer.size();
} else {
currentSize = -1;
}
commitStream(currentSize);
if (buffer != null) {
buffer.writeTo(adaptedOutput);
}
}
}
}