blob: e8f1299602170b61c74f532a8207225d2282787b [file] [log] [blame]
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.http2.server;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.MetaData;
import org.eclipse.jetty.http.MetaData.Request;
import org.eclipse.jetty.http2.ErrorCode;
import org.eclipse.jetty.http2.HTTP2Connection;
import org.eclipse.jetty.http2.ISession;
import org.eclipse.jetty.http2.IStream;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PrefaceFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.parser.ServerParser;
import org.eclipse.jetty.http2.parser.SettingsBodyParser;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
public class HTTP2ServerConnection extends HTTP2Connection implements Connection.UpgradeTo
{
private final Queue<HttpChannelOverHTTP2> channels = new ArrayDeque<>();
private final List<Frame> upgradeFrames = new ArrayList<>();
private final AtomicLong totalRequests = new AtomicLong();
private final AtomicLong totalResponses = new AtomicLong();
private final ServerSessionListener listener;
private final HttpConfiguration httpConfig;
public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ServerSessionListener listener)
{
this(byteBufferPool, executor, endPoint, httpConfig, parser, session, inputBufferSize, ExecutionStrategy.Factory.getDefault(), listener);
}
public HTTP2ServerConnection(ByteBufferPool byteBufferPool, Executor executor, EndPoint endPoint, HttpConfiguration httpConfig, ServerParser parser, ISession session, int inputBufferSize, ExecutionStrategy.Factory executionFactory, ServerSessionListener listener)
{
super(byteBufferPool, executor, endPoint, parser, session, inputBufferSize, executionFactory);
this.listener = listener;
this.httpConfig = httpConfig;
}
@Override
public int getMessagesIn()
{
return totalRequests.intValue();
}
@Override
public int getMessagesOut()
{
return totalResponses.intValue();
}
@Override
protected ServerParser getParser()
{
return (ServerParser)super.getParser();
}
@Override
public void onUpgradeTo(ByteBuffer buffer)
{
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 onUpgradeTo {} {}", this, BufferUtil.toDetailString(buffer));
setInputBuffer(buffer);
}
@Override
public void onOpen()
{
notifyAccept(getSession());
for (Frame frame : upgradeFrames)
getSession().onFrame(frame);
super.onOpen();
}
private void notifyAccept(ISession session)
{
try
{
listener.onAccept(session);
}
catch (Throwable x)
{
LOG.info("Failure while notifying listener " + listener, x);
}
}
public void onNewStream(Connector connector, IStream stream, HeadersFrame frame)
{
if (LOG.isDebugEnabled())
LOG.debug("Processing {} on {}", frame, stream);
HttpChannelOverHTTP2 channel = provideHttpChannel(connector, stream);
Runnable task = channel.onRequest(frame);
if (task != null)
offerTask(task, false);
}
public void onData(IStream stream, DataFrame frame, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("Processing {} on {}", frame, stream);
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
if (channel != null)
{
Runnable task = channel.onRequestContent(frame, callback);
if (task != null)
offerTask(task, false);
}
}
public boolean onStreamTimeout(IStream stream, Throwable failure)
{
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
boolean result = channel != null && channel.onStreamTimeout(failure);
if (LOG.isDebugEnabled())
LOG.debug("{} idle timeout on {}: {}", result ? "Processed" : "Ignored", stream, failure);
return result;
}
public void onStreamFailure(IStream stream, Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("Processing failure on {}: {}", stream, failure);
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
if (channel != null)
channel.onFailure(failure);
}
public boolean onSessionTimeout(Throwable failure)
{
ISession session = getSession();
boolean result = true;
for (Stream stream : session.getStreams())
{
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
if (channel != null)
result &= !channel.isRequestHandled();
}
if (LOG.isDebugEnabled())
LOG.debug("{} idle timeout on {}: {}", result ? "Processed" : "Ignored", session, failure);
return result;
}
public void onSessionFailure(Throwable failure)
{
ISession session = getSession();
if (LOG.isDebugEnabled())
LOG.debug("Processing failure on {}: {}", session, failure);
for (Stream stream : session.getStreams())
onStreamFailure((IStream)stream, failure);
}
public void push(Connector connector, IStream stream, MetaData.Request request)
{
if (LOG.isDebugEnabled())
LOG.debug("Processing push {} on {}", request, stream);
HttpChannelOverHTTP2 channel = provideHttpChannel(connector, stream);
Runnable task = channel.onPushRequest(request);
if (task != null)
offerTask(task, true);
}
private HttpChannelOverHTTP2 provideHttpChannel(Connector connector, IStream stream)
{
HttpChannelOverHTTP2 channel = pollChannel();
if (channel != null)
{
channel.getHttpTransport().setStream(stream);
if (LOG.isDebugEnabled())
LOG.debug("Recycling channel {} for {}", channel, this);
}
else
{
HttpTransportOverHTTP2 transport = new HttpTransportOverHTTP2(connector, this);
transport.setStream(stream);
channel = newServerHttpChannelOverHTTP2(connector, httpConfig, transport);
if (LOG.isDebugEnabled())
LOG.debug("Creating channel {} for {}", channel, this);
}
stream.setAttribute(IStream.CHANNEL_ATTRIBUTE, channel);
return channel;
}
protected ServerHttpChannelOverHTTP2 newServerHttpChannelOverHTTP2(Connector connector, HttpConfiguration httpConfig, HttpTransportOverHTTP2 transport)
{
return new ServerHttpChannelOverHTTP2(connector, httpConfig, getEndPoint(), transport);
}
private void offerChannel(HttpChannelOverHTTP2 channel)
{
synchronized (this)
{
channels.offer(channel);
}
}
private HttpChannelOverHTTP2 pollChannel()
{
synchronized (this)
{
return channels.poll();
}
}
public boolean upgrade(Request request)
{
if (HttpMethod.PRI.is(request.getMethod()))
{
getParser().directUpgrade();
}
else
{
HttpField settingsField = request.getFields().getField(HttpHeader.HTTP2_SETTINGS);
if (settingsField == null)
throw new BadMessageException("Missing " + HttpHeader.HTTP2_SETTINGS + " header");
String value = settingsField.getValue();
final byte[] settings = B64Code.decodeRFC4648URL(value == null ? "" : value);
if (LOG.isDebugEnabled())
LOG.debug("{} settings {}",this,TypeUtil.toHexString(settings));
SettingsFrame settingsFrame = SettingsBodyParser.parseBody(BufferUtil.toBuffer(settings));
if (settingsFrame == null)
{
LOG.warn("Invalid {} header value: {}", HttpHeader.HTTP2_SETTINGS, value);
throw new BadMessageException();
}
getParser().standardUpgrade();
upgradeFrames.add(new PrefaceFrame());
upgradeFrames.add(settingsFrame);
// Remember the request to send a response from onOpen().
upgradeFrames.add(new HeadersFrame(1, new Request(request), null, true));
}
return true;
}
protected class ServerHttpChannelOverHTTP2 extends HttpChannelOverHTTP2 implements ExecutionStrategy.Rejectable
{
public ServerHttpChannelOverHTTP2(Connector connector, HttpConfiguration configuration, EndPoint endPoint, HttpTransportOverHTTP2 transport)
{
super(connector, configuration, endPoint, transport);
}
@Override
public Runnable onRequest(HeadersFrame frame)
{
totalRequests.incrementAndGet();
return super.onRequest(frame);
}
@Override
public void onCompleted()
{
totalResponses.incrementAndGet();
super.onCompleted();
if (!getStream().isReset())
recycle();
}
@Override
public void recycle()
{
getStream().removeAttribute(IStream.CHANNEL_ATTRIBUTE);
super.recycle();
offerChannel(this);
}
@Override
public void reject()
{
IStream stream = getStream();
if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Request #{}/{} rejected", stream.getId(), Integer.toHexString(stream.getSession().hashCode()));
stream.reset(new ResetFrame(stream.getId(), ErrorCode.ENHANCE_YOUR_CALM_ERROR.code), Callback.NOOP);
// Consume the existing queued data frames to
// avoid stalling the session flow control.
consumeInput();
}
}
}