// blob: 783dcaa5875610843c62ffa413007c7f84ddc237 [file] [log] [blame]
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class HTTP2Flusher extends IteratingCallback
{
private static final Logger LOG = Log.getLogger(HTTP2Flusher.class);
// Window updates queued by window() and applied at the start of each process() pass.
private final Queue<WindowEntry> windows = new ArrayDeque<>();
// Entries waiting to be flushed; prepend() inserts at the head, append() at the tail.
private final List<Entry> frames = new ArrayList<>();
// Per-pass cache of stream send windows; filled and cleared inside process().
private final Map<IStream, Integer> streams = new HashMap<>();
// Entries removed from the queue during a pass because their stream was reset.
private final List<Entry> resets = new ArrayList<>();
// Entries gathered for the current write; completed in succeeded() or failed in onCompleteFailure().
private final List<Entry> actives = new ArrayList<>();
private final HTTP2Session session;
// Accumulates the buffers produced by Entry.generate(); recycled after each write or failure.
private final ByteBufferPool.Lease lease;
// Null while the flusher is usable; set by terminate() or onCompleteFailure(). Read and written under synchronized (this).
private Throwable terminated;
/**
 * Creates a flusher bound to the given session.
 * The buffer lease is backed by the byte buffer pool of the session's generator.
 *
 * @param session the session whose frames this flusher writes
 */
public HTTP2Flusher(HTTP2Session session)
{
    ByteBufferPool pool = session.getGenerator().getByteBufferPool();
    this.lease = new ByteBufferPool.Lease(pool);
    this.session = session;
}
/**
 * Queues a window update to be applied on the next flush pass and triggers an iteration.
 * If the flusher has already been terminated the update is dropped and nothing is triggered.
 *
 * @param stream the stream the update refers to, as passed on to the flow control strategy
 * @param frame the window update frame
 */
public void window(IStream stream, WindowUpdateFrame frame)
{
    boolean queued;
    synchronized (this)
    {
        queued = terminated == null;
        if (queued)
            windows.offer(new WindowEntry(stream, frame));
    }
    // Iterate outside the lock so that stalled data can be flushed.
    if (queued)
        iterate();
}
/**
 * Inserts an entry at the head of the frame queue.
 * If the flusher has been terminated, the entry is failed with the termination cause instead.
 *
 * @param entry the entry to queue
 * @return true if the entry was queued, false if it was failed because the flusher is terminated
 */
public boolean prepend(Entry entry)
{
    Throwable failure;
    synchronized (this)
    {
        failure = terminated;
        if (failure == null)
        {
            frames.add(0, entry);
            if (LOG.isDebugEnabled())
                LOG.debug("Prepended {}, frames={}", entry, frames.size());
        }
    }
    if (failure != null)
    {
        // Fail the entry outside the lock.
        closed(entry, failure);
        return false;
    }
    return true;
}
/**
 * Adds an entry at the tail of the frame queue.
 * If the flusher has been terminated, the entry is failed with the termination cause instead.
 *
 * @param entry the entry to queue
 * @return true if the entry was queued, false if it was failed because the flusher is terminated
 */
public boolean append(Entry entry)
{
    Throwable failure;
    synchronized (this)
    {
        failure = terminated;
        if (failure == null)
        {
            frames.add(entry);
            if (LOG.isDebugEnabled())
                LOG.debug("Appended {}, frames={}", entry, frames.size());
        }
    }
    if (failure != null)
    {
        // Fail the entry outside the lock.
        closed(entry, failure);
        return false;
    }
    return true;
}
/**
 * Removes the queued entry at the given position while holding the flusher lock.
 *
 * @param index the position in the frame queue
 * @return the entry that was removed
 */
private Entry remove(int index)
{
    Entry removed;
    synchronized (this)
    {
        removed = frames.remove(index);
    }
    return removed;
}
/**
 * @return the number of entries currently held in the frame queue
 */
public int getQueueSize()
{
    int queued;
    synchronized (this)
    {
        queued = frames.size();
    }
    return queued;
}
/**
 * Performs one flush pass: selects the queued entries that may be sent now and
 * writes them to the session's endpoint in a single write.
 * <p>
 * Under the lock, queued window updates are applied first, then the frame queue is scanned:
 * non-protocol entries of reset streams are moved to {@code resets}, flow-controlled entries
 * whose session or stream window is not positive are left in the queue, and every other
 * entry is moved to {@code actives}. Outside the lock, the gathered resets are performed,
 * the active entries are generated into the lease and the resulting buffers are written
 * with this flusher as the write callback.
 *
 * @return {@code Action.IDLE} when nothing was gathered for writing, {@code Action.SCHEDULED}
 * when a write was issued, or {@code Action.SUCCEEDED} after {@code failed(Throwable)} has been
 * invoked because an entry could not be generated
 * @throws Throwable the termination cause, if the flusher has been terminated
 */
@Override
protected Action process() throws Throwable
{
if (LOG.isDebugEnabled())
LOG.debug("Flushing {}", session);
synchronized (this)
{
// A terminated flusher stops here by rethrowing the recorded cause.
if (terminated != null)
throw terminated;
// First thing, update the window sizes, so we can
// reason about the frames to remove from the queue.
while (!windows.isEmpty())
{
WindowEntry entry = windows.poll();
entry.perform();
}
// From here on this pass works on a local snapshot of the session window.
// Window updates that arrive concurrently are queued by window()
// and processed on the next iteration.
int sessionWindow = session.getSendWindow();
// Index-based scan: 'index' only advances past entries that stay in the queue,
// 'size' shrinks whenever an entry is removed.
int index = 0;
int size = frames.size();
while (index < size)
{
Entry entry = frames.get(index);
IStream stream = entry.stream;
// If the stream has been reset, don't send the frame
// (protocol frames are still sent).
if (stream != null && stream.isReset() && !entry.isProtocol())
{
remove(index);
--size;
resets.add(entry);
if (LOG.isDebugEnabled())
LOG.debug("Gathered for reset {}", entry);
continue;
}
// Entries with data remaining are subject to flow control.
int remaining = entry.dataRemaining();
if (remaining > 0)
{
if (sessionWindow <= 0)
{
++index;
// Session stalled: leave the entry queued, but keep scanning
// because there may be *non* flow controlled frames to send.
continue;
}
if (stream != null)
{
// The stream may have a smaller window than the session.
Integer streamWindow = streams.get(stream);
if (streamWindow == null)
{
// NOTE(review): presumably a zero delta just reads the current stream window - confirm.
streamWindow = stream.updateSendWindow(0);
streams.put(stream, streamWindow);
}
// Is it a frame belonging to an already stalled stream ?
if (streamWindow <= 0)
{
++index;
// There may be *non* flow controlled frames to send.
continue;
}
}
// Both windows are positive: charge the entry's whole remaining length to the
// local counters. They may go negative, which stalls later flow controlled
// entries in this pass.
// NOTE(review): presumably generate() limits what is actually written to the window - confirm.
sessionWindow -= remaining;
if (stream != null)
streams.put(stream, streams.get(stream) - remaining);
}
// The frame will be written, remove it from the queue.
remove(index);
--size;
actives.add(entry);
if (LOG.isDebugEnabled())
LOG.debug("Gathered for write {}", entry);
}
// The per-stream window cache is only valid for this pass.
streams.clear();
}
// Perform resets outside the sync block.
for (int i = 0; i < resets.size(); ++i)
{
Entry entry = resets.get(i);
entry.reset();
}
resets.clear();
// Nothing to write: go idle until the next iterate().
if (actives.isEmpty())
{
if (LOG.isDebugEnabled())
LOG.debug("Flushed {}", session);
return Action.IDLE;
}
// Generate every gathered entry into the lease.
for (int i = 0; i < actives.size(); ++i)
{
Entry entry = actives.get(i);
Throwable failure = entry.generate(lease);
if (failure != null)
{
// Failure to generate the entry is catastrophic.
failed(failure);
return Action.SUCCEEDED;
}
}
List<ByteBuffer> byteBuffers = lease.getByteBuffers();
if (LOG.isDebugEnabled())
LOG.debug("Writing {} buffers ({} bytes) for {} frames {}", byteBuffers.size(), lease.getTotalLength(), actives.size(), actives);
// This flusher is passed as the callback of the write.
session.getEndPoint().write(this, byteBuffers.toArray(new ByteBuffer[byteBuffers.size()]));
return Action.SCHEDULED;
}
/**
 * Completes a write: recycles the lease, succeeds every entry that was part of the write,
 * empties the active list and then delegates to the superclass.
 */
@Override
public void succeeded()
{
    lease.recycle();
    if (LOG.isDebugEnabled())
    {
        int written = actives.size();
        LOG.debug("Written {} frames for {}", written, actives);
    }
    actives.forEach(entry -> entry.succeeded());
    actives.clear();
    super.succeeded();
}
/**
 * Always throws: this flusher treats a successful completion as an illegal state.
 *
 * @throws IllegalStateException always
 */
@Override
protected void onCompleteSuccess()
{
    throw new IllegalStateException();
}
/**
 * Handles the failure of the flusher: recycles the lease, records the failure as the
 * termination cause and fails both the active entries and those still queued.
 * The session is aborted only when no termination cause had been recorded before.
 *
 * @param x the failure
 */
@Override
protected void onCompleteFailure(Throwable x)
{
    lease.recycle();

    Throwable previous;
    synchronized (this)
    {
        previous = terminated;
        terminated = x;
        if (LOG.isDebugEnabled())
        {
            String phase = previous == null ? "Failing" : "Closing";
            LOG.debug("{}, active/queued={}/{}", phase, actives.size(), frames.size());
        }
        // Queued entries are failed together with the active ones.
        actives.addAll(frames);
        frames.clear();
    }

    actives.forEach(entry -> entry.failed(x));
    actives.clear();

    // Already terminated: whoever terminated the flusher is in charge of the connection.
    if (previous != null)
        return;
    // The failure came from within the flusher, so the connection must be closed.
    session.abort(x);
}
/**
 * Records the given cause as the termination cause, replacing any previous one.
 * An iteration is triggered only when no cause had been recorded before.
 *
 * @param cause the reason for the termination
 */
void terminate(Throwable cause)
{
    boolean first;
    synchronized (this)
    {
        first = terminated == null;
        terminated = cause;
        if (LOG.isDebugEnabled())
            LOG.debug("{}", first ? "Terminating" : "Terminated");
    }
    if (first)
        iterate();
}
/**
 * Fails an entry that could not be queued.
 *
 * @param entry the rejected entry
 * @param failure the cause passed to the entry's {@code failed} method
 */
private void closed(Entry entry, Throwable failure)
{
    entry.failed(failure);
}
/**
 * A frame queued for flushing, optionally tied to a stream, with a nested callback
 * that is notified of the outcome.
 */
public static abstract class Entry extends Callback.Nested
{
    protected final Frame frame;
    protected final IStream stream;

    /**
     * @param frame the frame to flush
     * @param stream the stream the frame belongs to; may be null, in which case no stream handling is done
     * @param callback the callback wrapped by this entry
     */
    protected Entry(Frame frame, IStream stream, Callback callback)
    {
        super(callback);
        this.frame = frame;
        this.stream = stream;
    }

    /**
     * @return the number of flow-controlled bytes still to send; this base implementation returns 0
     */
    public int dataRemaining()
    {
        return 0;
    }

    /**
     * Generates the bytes of this entry into the given lease.
     *
     * @param lease the lease that collects the generated buffers
     * @return the failure that prevented the generation, or null; this base implementation returns null
     */
    public Throwable generate(ByteBufferPool.Lease lease)
    {
        return null;
    }

    /**
     * Fails this entry with an {@link EofException} because its stream was reset.
     */
    public void reset()
    {
        EofException cause = new EofException("reset");
        failed(cause);
    }

    /**
     * Closes the stream and removes it from its session, when a stream is present,
     * then forwards the failure to the nested callback.
     *
     * @param x the failure
     */
    @Override
    public void failed(Throwable x)
    {
        if (stream != null)
        {
            stream.close();
            stream.getSession().removeStream(stream);
        }
        super.failed(x);
    }

    /**
     * @return true for PRIORITY, RST_STREAM, GO_AWAY, WINDOW_UPDATE and DISCONNECT frames, false otherwise
     */
    public boolean isProtocol()
    {
        boolean protocol;
        switch (frame.getType())
        {
            case PRIORITY:
            case RST_STREAM:
            case GO_AWAY:
            case WINDOW_UPDATE:
            case DISCONNECT:
                protocol = true;
                break;
            default:
                protocol = false;
                break;
        }
        return protocol;
    }

    /**
     * @return the string form of the wrapped frame
     */
    @Override
    public String toString()
    {
        return frame.toString();
    }
}
/**
 * A window update waiting to be handed to the session's flow control strategy.
 */
private class WindowEntry
{
    private final IStream stream;
    private final WindowUpdateFrame frame;

    public WindowEntry(IStream stream, WindowUpdateFrame frame)
    {
        this.stream = stream;
        this.frame = frame;
    }

    /**
     * Notifies the flow control strategy of the enclosing flusher's session about this window update.
     */
    public void perform()
    {
        session.getFlowControlStrategy().onWindowUpdate(session, stream, frame);
    }
}
}