blob: 9abd36016c17f0fc14058d9177fa30e6aa33c61a [file] [log] [blame]
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.proxy;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritePendingException;
import javax.servlet.ReadListener;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletInputStream;
import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.util.DeferredContentProvider;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IteratingCallback;
/**
* <p>Servlet 3.1 asynchronous proxy servlet.</p>
* <p>Both the request processing and the I/O are asynchronous.</p>
*
* @see ProxyServlet
* @see AsyncMiddleManServlet
* @see ConnectHandler
*/
public class AsyncProxyServlet extends ProxyServlet
{
private static final String WRITE_LISTENER_ATTRIBUTE = AsyncProxyServlet.class.getName() + ".writeListener";
@Override
protected ContentProvider proxyRequestContent(HttpServletRequest request, HttpServletResponse response, Request proxyRequest) throws IOException
{
ServletInputStream input = request.getInputStream();
DeferredContentProvider provider = new DeferredContentProvider();
input.setReadListener(newReadListener(request, response, proxyRequest, provider));
return provider;
}
protected ReadListener newReadListener(HttpServletRequest request, HttpServletResponse response, Request proxyRequest, DeferredContentProvider provider)
{
return new StreamReader(request, response, proxyRequest, provider);
}
@Override
protected void onResponseContent(HttpServletRequest request, HttpServletResponse response, Response proxyResponse, byte[] buffer, int offset, int length, Callback callback)
{
try
{
if (_log.isDebugEnabled())
_log.debug("{} proxying content to downstream: {} bytes", getRequestId(request), length);
StreamWriter writeListener = (StreamWriter)request.getAttribute(WRITE_LISTENER_ATTRIBUTE);
if (writeListener == null)
{
writeListener = newWriteListener(request, proxyResponse);
request.setAttribute(WRITE_LISTENER_ATTRIBUTE, writeListener);
// Set the data to write before calling setWriteListener(), because
// setWriteListener() may trigger the call to onWritePossible() on
// a different thread and we would have a race.
writeListener.data(buffer, offset, length, callback);
// Setting the WriteListener triggers an invocation to onWritePossible().
response.getOutputStream().setWriteListener(writeListener);
}
else
{
writeListener.data(buffer, offset, length, callback);
writeListener.onWritePossible();
}
}
catch (Throwable x)
{
callback.failed(x);
proxyResponse.abort(x);
}
}
protected StreamWriter newWriteListener(HttpServletRequest request, Response proxyResponse)
{
return new StreamWriter(request, proxyResponse);
}
/**
* <p>Convenience extension of {@link AsyncProxyServlet} that offers transparent proxy functionalities.</p>
*
* @see org.eclipse.jetty.proxy.AbstractProxyServlet.TransparentDelegate
*/
public static class Transparent extends AsyncProxyServlet
{
private final TransparentDelegate delegate = new TransparentDelegate(this);
@Override
public void init(ServletConfig config) throws ServletException
{
super.init(config);
delegate.init(config);
}
@Override
protected String rewriteTarget(HttpServletRequest clientRequest)
{
return delegate.rewriteTarget(clientRequest);
}
}
protected class StreamReader extends IteratingCallback implements ReadListener
{
private final byte[] buffer = new byte[getHttpClient().getRequestBufferSize()];
private final HttpServletRequest request;
private final HttpServletResponse response;
private final Request proxyRequest;
private final DeferredContentProvider provider;
protected StreamReader(HttpServletRequest request, HttpServletResponse response, Request proxyRequest, DeferredContentProvider provider)
{
this.request = request;
this.response = response;
this.proxyRequest = proxyRequest;
this.provider = provider;
}
@Override
public void onDataAvailable() throws IOException
{
iterate();
}
@Override
public void onAllDataRead() throws IOException
{
if (_log.isDebugEnabled())
_log.debug("{} proxying content to upstream completed", getRequestId(request));
provider.close();
}
@Override
public void onError(Throwable t)
{
onClientRequestFailure(request, proxyRequest, response, t);
}
@Override
protected Action process() throws Exception
{
int requestId = _log.isDebugEnabled() ? getRequestId(request) : 0;
ServletInputStream input = request.getInputStream();
// First check for isReady() because it has
// side effects, and then for isFinished().
while (input.isReady() && !input.isFinished())
{
int read = input.read(buffer);
if (_log.isDebugEnabled())
_log.debug("{} asynchronous read {} bytes on {}", requestId, read, input);
if (read > 0)
{
if (_log.isDebugEnabled())
_log.debug("{} proxying content to upstream: {} bytes", requestId, read);
onRequestContent(request, proxyRequest, provider, buffer, 0, read, this);
return Action.SCHEDULED;
}
}
if (input.isFinished())
{
if (_log.isDebugEnabled())
_log.debug("{} asynchronous read complete on {}", requestId, input);
return Action.SUCCEEDED;
}
else
{
if (_log.isDebugEnabled())
_log.debug("{} asynchronous read pending on {}", requestId, input);
return Action.IDLE;
}
}
protected void onRequestContent(HttpServletRequest request, Request proxyRequest, DeferredContentProvider provider, byte[] buffer, int offset, int length, Callback callback)
{
provider.offer(ByteBuffer.wrap(buffer, offset, length), callback);
}
@Override
public void failed(Throwable x)
{
super.failed(x);
onError(x);
}
}
protected class StreamWriter implements WriteListener
{
private final HttpServletRequest request;
private final Response proxyResponse;
private WriteState state;
private byte[] buffer;
private int offset;
private int length;
private Callback callback;
protected StreamWriter(HttpServletRequest request, Response proxyResponse)
{
this.request = request;
this.proxyResponse = proxyResponse;
this.state = WriteState.IDLE;
}
protected void data(byte[] bytes, int offset, int length, Callback callback)
{
if (state != WriteState.IDLE)
throw new WritePendingException();
this.state = WriteState.READY;
this.buffer = bytes;
this.offset = offset;
this.length = length;
this.callback = callback;
}
@Override
public void onWritePossible() throws IOException
{
int requestId = getRequestId(request);
ServletOutputStream output = request.getAsyncContext().getResponse().getOutputStream();
if (state == WriteState.READY)
{
// There is data to write.
if (_log.isDebugEnabled())
_log.debug("{} asynchronous write start of {} bytes on {}", requestId, length, output);
output.write(buffer, offset, length);
state = WriteState.PENDING;
if (output.isReady())
{
if (_log.isDebugEnabled())
_log.debug("{} asynchronous write of {} bytes completed on {}", requestId, length, output);
complete();
}
else
{
if (_log.isDebugEnabled())
_log.debug("{} asynchronous write of {} bytes pending on {}", requestId, length, output);
}
}
else if (state == WriteState.PENDING)
{
// The write blocked but is now complete.
if (_log.isDebugEnabled())
_log.debug("{} asynchronous write of {} bytes completing on {}", requestId, length, output);
complete();
}
else
{
throw new IllegalStateException();
}
}
protected void complete()
{
buffer = null;
offset = 0;
length = 0;
Callback c = callback;
callback = null;
state = WriteState.IDLE;
// Call the callback only after the whole state has been reset,
// because the callback may trigger a reentrant call and
// the state must already be the new one that we reset here.
c.succeeded();
}
@Override
public void onError(Throwable failure)
{
proxyResponse.abort(failure);
}
}
private enum WriteState
{
READY, PENDING, IDLE
}
}