blob: 5530b389d01f3a7cc301b30c509507467275e559 [file] [log] [blame]
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client.util;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Response.Listener;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* Implementation of {@link Listener} that produces an {@link InputStream}
* that allows applications to read the response content.
* <p>
* Typical usage is:
* <pre>
* InputStreamResponseListener listener = new InputStreamResponseListener();
* client.newRequest(...).send(listener);
*
* // Wait for the response headers to arrive
* Response response = listener.get(5, TimeUnit.SECONDS);
* if (response.getStatus() == 200)
* {
* // Obtain the input stream on the response content
* try (InputStream input = listener.getInputStream())
* {
* // Read the response content
* }
* }
* </pre>
* <p>
* The {@link HttpClient} implementation (the producer) will feed the input stream
* asynchronously while the application (the consumer) is reading from it.
* <p>
* If the consumer is faster than the producer, then the consumer will block
* with the typical {@link InputStream#read()} semantic.
* If the consumer is slower than the producer, then the producer will block
* until the client consumes.
*/
public class InputStreamResponseListener extends Listener.Adapter
{
private static final Logger LOG = Log.getLogger(InputStreamResponseListener.class);
private static final DeferredContentProvider.Chunk EOF = new DeferredContentProvider.Chunk(BufferUtil.EMPTY_BUFFER, Callback.NOOP);
private final Object lock = this;
private final CountDownLatch responseLatch = new CountDownLatch(1);
private final CountDownLatch resultLatch = new CountDownLatch(1);
private final AtomicReference<InputStream> stream = new AtomicReference<>();
private final Queue<DeferredContentProvider.Chunk> chunks = new ArrayDeque<>();
private Response response;
private Result result;
private Throwable failure;
private boolean closed;
public InputStreamResponseListener()
{
}
/**
* @deprecated response content is not buffered anymore, but handled asynchronously.
*/
@Deprecated
public InputStreamResponseListener(long maxBufferSize)
{
}
@Override
public void onHeaders(Response response)
{
synchronized (lock)
{
this.response = response;
responseLatch.countDown();
}
}
@Override
public void onContent(Response response, ByteBuffer content, Callback callback)
{
if (content.remaining() == 0)
{
if (LOG.isDebugEnabled())
LOG.debug("Skipped empty content {}", content);
callback.succeeded();
return;
}
boolean closed;
synchronized (lock)
{
closed = this.closed;
if (!closed)
{
if (LOG.isDebugEnabled())
LOG.debug("Queueing content {}", content);
chunks.add(new DeferredContentProvider.Chunk(content, callback));
lock.notifyAll();
}
}
if (closed)
{
if (LOG.isDebugEnabled())
LOG.debug("InputStream closed, ignored content {}", content);
callback.failed(new AsynchronousCloseException());
}
}
@Override
public void onSuccess(Response response)
{
synchronized (lock)
{
if (!closed)
chunks.add(EOF);
lock.notifyAll();
}
if (LOG.isDebugEnabled())
LOG.debug("End of content");
}
@Override
public void onFailure(Response response, Throwable failure)
{
List<Callback> callbacks;
synchronized (lock)
{
if (this.failure != null)
return;
this.failure = failure;
callbacks = drain();
lock.notifyAll();
}
if (LOG.isDebugEnabled())
LOG.debug("Content failure", failure);
callbacks.forEach(callback -> callback.failed(failure));
}
@Override
public void onComplete(Result result)
{
Throwable failure = result.getFailure();
List<Callback> callbacks = Collections.emptyList();
synchronized (lock)
{
this.result = result;
if (result.isFailed() && this.failure == null)
{
this.failure = failure;
callbacks = drain();
}
// Notify the response latch in case of request failures.
responseLatch.countDown();
resultLatch.countDown();
lock.notifyAll();
}
if (LOG.isDebugEnabled())
{
if (failure == null)
LOG.debug("Result success");
else
LOG.debug("Result failure", failure);
}
callbacks.forEach(callback -> callback.failed(failure));
}
/**
* Waits for the given timeout for the response to be available, then returns it.
* <p>
* The wait ends as soon as all the HTTP headers have been received, without waiting for the content.
* To wait for the whole content, see {@link #await(long, TimeUnit)}.
*
* @param timeout the time to wait
* @param unit the timeout unit
* @return the response
* @throws InterruptedException if the thread is interrupted
* @throws TimeoutException if the timeout expires
* @throws ExecutionException if a failure happened
*/
public Response get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException, ExecutionException
{
boolean expired = !responseLatch.await(timeout, unit);
if (expired)
throw new TimeoutException();
synchronized (lock)
{
// If the request failed there is no response.
if (response == null)
throw new ExecutionException(failure);
return response;
}
}
/**
* Waits for the given timeout for the whole request/response cycle to be finished,
* then returns the corresponding result.
* <p>
*
* @param timeout the time to wait
* @param unit the timeout unit
* @return the result
* @throws InterruptedException if the thread is interrupted
* @throws TimeoutException if the timeout expires
* @see #get(long, TimeUnit)
*/
public Result await(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException
{
boolean expired = !resultLatch.await(timeout, unit);
if (expired)
throw new TimeoutException();
synchronized (lock)
{
return result;
}
}
/**
* Returns an {@link InputStream} providing the response content bytes.
* <p>
* The method may be invoked only once; subsequent invocations will return a closed {@link InputStream}.
*
* @return an input stream providing the response content
*/
public InputStream getInputStream()
{
InputStream result = new Input();
if (stream.compareAndSet(null, result))
return result;
return IO.getClosedStream();
}
private List<Callback> drain()
{
List<Callback> callbacks = new ArrayList<>();
synchronized (lock)
{
while (true)
{
DeferredContentProvider.Chunk chunk = chunks.peek();
if (chunk == null || chunk == EOF)
break;
callbacks.add(chunk.callback);
chunks.poll();
}
}
return callbacks;
}
private class Input extends InputStream
{
@Override
public int read() throws IOException
{
byte[] tmp = new byte[1];
int read = read(tmp);
if (read < 0)
return read;
return tmp[0] & 0xFF;
}
@Override
public int read(byte[] b, int offset, int length) throws IOException
{
try
{
int result;
Callback callback = null;
synchronized (lock)
{
DeferredContentProvider.Chunk chunk;
while (true)
{
chunk = chunks.peek();
if (chunk == EOF)
return -1;
if (chunk != null)
break;
if (failure != null)
throw toIOException(failure);
if (closed)
throw new AsynchronousCloseException();
lock.wait();
}
ByteBuffer buffer = chunk.buffer;
result = Math.min(buffer.remaining(), length);
buffer.get(b, offset, result);
if (!buffer.hasRemaining())
{
callback = chunk.callback;
chunks.poll();
}
}
if (callback != null)
callback.succeeded();
return result;
}
catch (InterruptedException x)
{
throw new InterruptedIOException();
}
}
private IOException toIOException(Throwable failure)
{
if (failure instanceof IOException)
return (IOException)failure;
else
return new IOException(failure);
}
@Override
public void close() throws IOException
{
List<Callback> callbacks;
synchronized (lock)
{
if (closed)
return;
closed = true;
callbacks = drain();
lock.notifyAll();
}
if (LOG.isDebugEnabled())
LOG.debug("InputStream close");
Throwable failure = new AsynchronousCloseException();
callbacks.forEach(callback -> callback.failed(failure));
super.close();
}
}
}