blob: 873e70d1845682930aa5ef70a0920db979b07811 [file] [log] [blame]
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common.io;
import java.io.EOFException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.ArrayQueue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.OpCode;
import org.eclipse.jetty.websocket.common.frames.BinaryFrame;
/**
* Interface for working with bytes destined for {@link EndPoint#write(Callback, ByteBuffer...)}
*/
public class FrameFlusher
{
private class Flusher extends IteratingCallback
{
private final List<FrameEntry> entries;
private final List<ByteBuffer> buffers;
private ByteBuffer aggregate;
private BatchMode batchMode;
public Flusher(int maxGather)
{
entries = new ArrayList<>(maxGather);
buffers = new ArrayList<>((maxGather * 2) + 1);
}
private Action batch()
{
if (aggregate == null)
{
aggregate = bufferPool.acquire(bufferSize,true);
if (LOG.isDebugEnabled())
{
LOG.debug("{} acquired aggregate buffer {}",FrameFlusher.this,aggregate);
}
}
// Do not allocate the iterator here.
for (int i = 0; i < entries.size(); ++i)
{
FrameEntry entry = entries.get(i);
entry.generateHeaderBytes(aggregate);
ByteBuffer payload = entry.frame.getPayload();
if (BufferUtil.hasContent(payload))
{
BufferUtil.append(aggregate,payload);
}
}
if (LOG.isDebugEnabled())
{
LOG.debug("{} aggregated {} frames: {}",FrameFlusher.this,entries.size(),entries);
}
succeeded();
return Action.SCHEDULED;
}
@Override
protected void onCompleteSuccess()
{
// This IteratingCallback never completes.
}
@Override
public void onCompleteFailure(Throwable x)
{
for (FrameEntry entry : entries)
{
notifyCallbackFailure(entry.callback,x);
entry.release();
}
entries.clear();
failure = x;
onFailure(x);
}
private Action flush()
{
if (!BufferUtil.isEmpty(aggregate))
{
buffers.add(aggregate);
if (LOG.isDebugEnabled())
{
LOG.debug("{} flushing aggregate {}",FrameFlusher.this,aggregate);
}
}
// Do not allocate the iterator here.
for (int i = 0; i < entries.size(); ++i)
{
FrameEntry entry = entries.get(i);
// Skip the "synthetic" frame used for flushing.
if (entry.frame == FLUSH_FRAME)
{
continue;
}
buffers.add(entry.generateHeaderBytes());
ByteBuffer payload = entry.frame.getPayload();
if (BufferUtil.hasContent(payload))
{
buffers.add(payload);
}
}
if (LOG.isDebugEnabled())
{
LOG.debug("{} flushing {} frames: {}",FrameFlusher.this,entries.size(),entries);
}
if (buffers.isEmpty())
{
releaseAggregate();
// We may have the FLUSH_FRAME to notify.
succeedEntries();
return Action.IDLE;
}
endpoint.write(this,buffers.toArray(new ByteBuffer[buffers.size()]));
buffers.clear();
return Action.SCHEDULED;
}
@Override
protected Action process() throws Exception
{
int space = aggregate == null?bufferSize:BufferUtil.space(aggregate);
BatchMode currentBatchMode = BatchMode.AUTO;
synchronized (lock)
{
while ((entries.size() <= maxGather) && !queue.isEmpty())
{
FrameEntry entry = queue.remove(0);
currentBatchMode = BatchMode.max(currentBatchMode,entry.batchMode);
// Force flush if we need to.
if (entry.frame == FLUSH_FRAME)
{
currentBatchMode = BatchMode.OFF;
}
int payloadLength = BufferUtil.length(entry.frame.getPayload());
int approxFrameLength = Generator.MAX_HEADER_LENGTH + payloadLength;
// If it is a "big" frame, avoid copying into the aggregate buffer.
if (approxFrameLength > (bufferSize >> 2))
{
currentBatchMode = BatchMode.OFF;
}
// If the aggregate buffer overflows, do not batch.
space -= approxFrameLength;
if (space <= 0)
{
currentBatchMode = BatchMode.OFF;
}
entries.add(entry);
}
}
if (LOG.isDebugEnabled())
{
LOG.debug("{} processing {} entries: {}",FrameFlusher.this,entries.size(),entries);
}
if (entries.isEmpty())
{
if (batchMode != BatchMode.AUTO)
{
// Nothing more to do, release the aggregate buffer if we need to.
// Releasing it here rather than in succeeded() allows for its reuse.
releaseAggregate();
return Action.IDLE;
}
LOG.debug("{} auto flushing",FrameFlusher.this);
return flush();
}
batchMode = currentBatchMode;
return currentBatchMode == BatchMode.OFF?flush():batch();
}
private void releaseAggregate()
{
if ((aggregate != null) && BufferUtil.isEmpty(aggregate))
{
bufferPool.release(aggregate);
aggregate = null;
}
}
@Override
public void succeeded()
{
succeedEntries();
super.succeeded();
}
private void succeedEntries()
{
// Do not allocate the iterator here.
for (int i = 0; i < entries.size(); ++i)
{
FrameEntry entry = entries.get(i);
notifyCallbackSuccess(entry.callback);
entry.release();
}
entries.clear();
}
}
private class FrameEntry
{
private final Frame frame;
private final WriteCallback callback;
private final BatchMode batchMode;
private ByteBuffer headerBuffer;
private FrameEntry(Frame frame, WriteCallback callback, BatchMode batchMode)
{
this.frame = Objects.requireNonNull(frame);
this.callback = callback;
this.batchMode = batchMode;
}
private ByteBuffer generateHeaderBytes()
{
return headerBuffer = generator.generateHeaderBytes(frame);
}
private void generateHeaderBytes(ByteBuffer buffer)
{
generator.generateHeaderBytes(frame,buffer);
}
private void release()
{
if (headerBuffer != null)
{
generator.getBufferPool().release(headerBuffer);
headerBuffer = null;
}
}
@Override
public String toString()
{
return String.format("%s[%s,%s,%s,%s]",getClass().getSimpleName(),frame,callback,batchMode,failure);
}
}
public static final BinaryFrame FLUSH_FRAME = new BinaryFrame();
private static final Logger LOG = Log.getLogger(FrameFlusher.class);
private final ByteBufferPool bufferPool;
private final EndPoint endpoint;
private final int bufferSize;
private final Generator generator;
private final int maxGather;
private final Object lock = new Object();
private final ArrayQueue<FrameEntry> queue = new ArrayQueue<>(16,16,lock);
private final Flusher flusher;
private final AtomicBoolean closed = new AtomicBoolean();
private volatile Throwable failure;
public FrameFlusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint, int bufferSize, int maxGather)
{
this.bufferPool = bufferPool;
this.endpoint = endpoint;
this.bufferSize = bufferSize;
this.generator = Objects.requireNonNull(generator);
this.maxGather = maxGather;
this.flusher = new Flusher(maxGather);
}
public void close()
{
if (closed.compareAndSet(false,true))
{
LOG.debug("{} closing {}",this);
EOFException eof = new EOFException("Connection has been closed locally");
flusher.failed(eof);
// Fail also queued entries.
List<FrameEntry> entries = new ArrayList<>();
synchronized (lock)
{
entries.addAll(queue);
queue.clear();
}
// Notify outside sync block.
for (FrameEntry entry : entries)
{
notifyCallbackFailure(entry.callback,eof);
}
}
}
public void enqueue(Frame frame, WriteCallback callback, BatchMode batchMode)
{
if (closed.get())
{
notifyCallbackFailure(callback,new EOFException("Connection has been closed locally"));
return;
}
if (flusher.isFailed())
{
notifyCallbackFailure(callback,failure);
return;
}
FrameEntry entry = new FrameEntry(frame,callback,batchMode);
synchronized (lock)
{
switch (frame.getOpCode())
{
case OpCode.PING:
{
// Prepend PINGs so they are processed first.
queue.add(0,entry);
break;
}
case OpCode.CLOSE:
{
// There may be a chance that other frames are
// added after this close frame, but we will
// fail them later to keep it simple here.
closed.set(true);
queue.add(entry);
break;
}
default:
{
queue.add(entry);
break;
}
}
}
if (LOG.isDebugEnabled())
{
LOG.debug("{} queued {}",this,entry);
}
flusher.iterate();
}
protected void notifyCallbackFailure(WriteCallback callback, Throwable failure)
{
try
{
if (callback != null)
{
callback.writeFailed(failure);
}
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Exception while notifying failure of callback " + callback,x);
}
}
protected void notifyCallbackSuccess(WriteCallback callback)
{
try
{
if (callback != null)
{
callback.writeSuccess();
}
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Exception while notifying success of callback " + callback,x);
}
}
protected void onFailure(Throwable x)
{
LOG.warn(x);
}
@Override
public String toString()
{
ByteBuffer aggregate = flusher.aggregate;
return String.format("%s[queueSize=%d,aggregateSize=%d,failure=%s]",getClass().getSimpleName(),queue.size(),aggregate == null?0:aggregate.position(),
failure);
}
}