//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.spdy.server;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.spdy.StandardCompressionFactory;
import org.eclipse.jetty.spdy.api.BytesDataInfo;
import org.eclipse.jetty.spdy.api.DataInfo;
import org.eclipse.jetty.spdy.api.GoAwayInfo;
import org.eclipse.jetty.spdy.api.GoAwayResultInfo;
import org.eclipse.jetty.spdy.api.ReplyInfo;
import org.eclipse.jetty.spdy.api.SPDY;
import org.eclipse.jetty.spdy.api.Session;
import org.eclipse.jetty.spdy.api.SessionStatus;
import org.eclipse.jetty.spdy.api.Stream;
import org.eclipse.jetty.spdy.api.StreamFrameListener;
import org.eclipse.jetty.spdy.api.StringDataInfo;
import org.eclipse.jetty.spdy.api.SynInfo;
import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener;
import org.eclipse.jetty.spdy.frames.ControlFrame;
import org.eclipse.jetty.spdy.frames.GoAwayFrame;
import org.eclipse.jetty.spdy.frames.RstStreamFrame;
import org.eclipse.jetty.spdy.frames.SynReplyFrame;
import org.eclipse.jetty.spdy.frames.SynStreamFrame;
import org.eclipse.jetty.spdy.generator.Generator;
import org.eclipse.jetty.spdy.parser.Parser;
import org.eclipse.jetty.spdy.parser.Parser.Listener;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Fields;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
/**
 * Tests what happens when frames are sent on, or received for, a stream that is
 * already closed or half closed.
 * <p>
 * Two of the tests do not use the SPDY client/server API on one side of the
 * connection; instead they drive a raw socket and build or parse frames with
 * {@link Generator} and {@link Parser}, so that frames can be put on the wire
 * which the regular API would not send.
 */
public class ClosedStreamTest extends AbstractTest
{
    //TODO: Right now it sends a rst as the stream is unknown to the session once it's closed.
    //TODO: But according to the spec we probably should just ignore the data?!
    /**
     * A raw server socket answers the client's SYN with a SYN_REPLY followed by a data
     * frame, and then writes the very same data frame a second time. The test asserts that
     * the client's stream listener is not notified of data twice within one second.
     *
     * @throws Exception if the client cannot be started or the socket I/O fails
     */
    @Test
    public void testDataSentOnClosedStreamIsIgnored() throws Exception
    {
        // try-with-resources so that both channels are released even when an assertion fails
        try (ServerSocketChannel server = ServerSocketChannel.open())
        {
            server.bind(new InetSocketAddress("localhost", 0));

            Session session = startClient(new InetSocketAddress("localhost", server.socket().getLocalPort()), null);

            // Counts down once per onData() call; it only reaches zero if data is delivered twice
            final CountDownLatch dataLatch = new CountDownLatch(2);
            session.syn(new SynInfo(new Fields(), true), new StreamFrameListener.Adapter()
            {
                @Override
                public void onData(Stream stream, DataInfo dataInfo)
                {
                    dataLatch.countDown();
                }
            });

            try (SocketChannel channel = server.accept())
            {
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                // The stream id is read below as the int at absolute offset 8, so at least
                // 12 bytes are needed; a single read() is allowed to return fewer than that.
                while (readBuffer.position() < 12)
                {
                    if (channel.read(readBuffer) < 0)
                        throw new IOException("Connection closed before the stream id of the SYN frame was received");
                }
                readBuffer.flip();
                int streamId = readBuffer.getInt(8);

                Generator generator = new Generator(new MappedByteBufferPool(), new StandardCompressionFactory.StandardCompressor());

                ByteBuffer writeBuffer = generator.control(new SynReplyFrame(SPDY.V2, (byte)0, streamId, new Fields()));
                channel.write(writeBuffer);
                assertThat("reply frame is fully written", writeBuffer.hasRemaining(), is(false));

                byte[] bytes = new byte[1];
                writeBuffer = generator.data(streamId, bytes.length, new BytesDataInfo(bytes, true));
                channel.write(writeBuffer);
                assertThat("data frame is fully written", writeBuffer.hasRemaining(), is(false));

                // Write again to simulate the faulty condition.
                // The buffer was fully drained above (position == limit), so flip() just
                // resets the position to 0 and the same frame is sent once more.
                writeBuffer.flip();
                channel.write(writeBuffer);
                assertThat("duplicated data frame is fully written", writeBuffer.hasRemaining(), is(false));

                // The latch must NOT reach zero: that would mean onData() was called twice
                Assert.assertFalse(dataLatch.await(1, TimeUnit.SECONDS));

                session.goAway(new GoAwayInfo(5, TimeUnit.SECONDS));
            }
        }
    }

    /**
     * The server replies to a SYN with {@code new ReplyInfo(true)} and then tries to send
     * data on that same stream. The test asserts that the client sees the reply, that the
     * client-side stream reports itself half closed, that no data reaches the client and
     * that the server-side {@code stream.data(...)} call threw a {@link RuntimeException}.
     *
     * @throws Exception if the server or the client cannot be started
     */
    @Test
    public void testSendDataOnHalfClosedStreamCausesExceptionOnServer() throws Exception
    {
        final CountDownLatch replyReceivedLatch = new CountDownLatch(1);
        final CountDownLatch clientReceivedDataLatch = new CountDownLatch(1);
        final CountDownLatch exceptionWhenSendingData = new CountDownLatch(1);

        Session clientSession = startClient(startServer(new ServerSessionFrameListener.Adapter()
        {
            @Override
            public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
            {
                stream.reply(new ReplyInfo(true), new Callback.Adapter());
                try
                {
                    // Give the client the chance to process the reply before data is attempted
                    replyReceivedLatch.await(5, TimeUnit.SECONDS);
                }
                catch (InterruptedException e)
                {
                    // Keep the interrupt visible to the thread's owner instead of dropping it
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
                try
                {
                    stream.data(new StringDataInfo("data send after half closed", false), new Callback.Adapter());
                }
                catch (RuntimeException e)
                {
                    // we expect an exception here, but we don't want it to be logged
                    exceptionWhenSendingData.countDown();
                }
                return null;
            }
        }), null);

        Stream stream = clientSession.syn(new SynInfo(new Fields(), false), new StreamFrameListener.Adapter()
        {
            @Override
            public void onReply(Stream stream, ReplyInfo replyInfo)
            {
                replyReceivedLatch.countDown();
            }

            @Override
            public void onData(Stream stream, DataInfo dataInfo)
            {
                clientReceivedDataLatch.countDown();
            }
        });

        assertThat("reply has been received by client", replyReceivedLatch.await(5, TimeUnit.SECONDS), is(true));
        assertThat("stream is half closed from server", stream.isHalfClosed(), is(true));
        assertThat("client has not received any data sent after stream was half closed by server",
                clientReceivedDataLatch.await(1, TimeUnit.SECONDS), is(false));
        assertThat("sending data threw an exception", exceptionWhenSendingData.await(5, TimeUnit.SECONDS), is(true));
    }

    /**
     * Runs {@link #runReceiveDataOnHalfClosedStream(short)} for SPDY version 2.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    public void testV2ReceiveDataOnHalfClosedStream() throws Exception
    {
        runReceiveDataOnHalfClosedStream(SPDY.V2);
    }

    /**
     * Runs {@link #runReceiveDataOnHalfClosedStream(short)} for SPDY version 3.
     * Currently ignored.
     *
     * @throws Exception if the scenario fails
     */
    @Test
    @Ignore("until v3 is properly implemented")
    public void testV3ReceiveDataOnHalfClosedStream() throws Exception
    {
        runReceiveDataOnHalfClosedStream(SPDY.V3);
    }

    /**
     * A raw client socket opens a stream with {@code SynInfo.FLAG_CLOSE} set and, once the
     * server's SYN_REPLY has been parsed, writes a data frame on that stream anyway. The
     * test asserts that the server's stream listener is not notified of the data, that no
     * RST_STREAM frame was parsed by the client, and that a subsequent GO_AWAY frame is
     * still delivered to the server's session listener.
     *
     * @param version the SPDY protocol version used for the generated control frames
     * @throws Exception if the server cannot be started or the socket I/O fails
     */
    private void runReceiveDataOnHalfClosedStream(short version) throws Exception
    {
        final CountDownLatch clientResetReceivedLatch = new CountDownLatch(1);
        final CountDownLatch serverReplySentLatch = new CountDownLatch(1);
        final CountDownLatch clientReplyReceivedLatch = new CountDownLatch(1);
        final CountDownLatch serverDataReceivedLatch = new CountDownLatch(1);
        final CountDownLatch goAwayReceivedLatch = new CountDownLatch(1);

        InetSocketAddress serverAddress = startServer(new ServerSessionFrameListener.Adapter()
        {
            @Override
            public StreamFrameListener onSyn(Stream stream, SynInfo synInfo)
            {
                stream.reply(new ReplyInfo(false), new Callback.Adapter());
                serverReplySentLatch.countDown();
                try
                {
                    // Hold the callback until the raw client has parsed the reply
                    clientReplyReceivedLatch.await(5, TimeUnit.SECONDS);
                }
                catch (InterruptedException e)
                {
                    // Keep the interrupt visible to the thread's owner instead of dropping it
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
                return new StreamFrameListener.Adapter()
                {
                    @Override
                    public void onData(Stream stream, DataInfo dataInfo)
                    {
                        serverDataReceivedLatch.countDown();
                    }
                };
            }

            @Override
            public void onGoAway(Session session, GoAwayResultInfo goAwayInfo)
            {
                goAwayReceivedLatch.countDown();
            }
        });

        final Generator generator = new Generator(new MappedByteBufferPool(), new StandardCompressionFactory().newCompressor());
        int streamId = 1;
        ByteBuffer synData = generator.control(new SynStreamFrame(version, SynInfo.FLAG_CLOSE, streamId, 0, (byte)0, (short)0, new Fields()));

        // try-with-resources so that the socket is closed even when an assertion fails;
        // the resource variable is implicitly final and can be used by the listener below
        try (SocketChannel socketChannel = SocketChannel.open(serverAddress))
        {
            socketChannel.write(synData);
            assertThat("synData is fully written", synData.hasRemaining(), is(false));

            assertThat("server: push reply is sent", serverReplySentLatch.await(5, TimeUnit.SECONDS), is(true));

            Parser parser = new Parser(new StandardCompressionFactory.StandardDecompressor());
            parser.addListener(new Listener.Adapter()
            {
                @Override
                public void onControlFrame(ControlFrame frame)
                {
                    if (frame instanceof SynReplyFrame)
                    {
                        SynReplyFrame synReplyFrame = (SynReplyFrame)frame;
                        clientReplyReceivedLatch.countDown();
                        // Answer the reply with data on the stream that was opened with FLAG_CLOSE
                        int replyStreamId = synReplyFrame.getStreamId();
                        ByteBuffer data = generator.data(replyStreamId, 0, new StringDataInfo("data", false));
                        try
                        {
                            socketChannel.write(data);
                        }
                        catch (IOException e)
                        {
                            e.printStackTrace();
                        }
                    }
                    else if (frame instanceof RstStreamFrame)
                    {
                        clientResetReceivedLatch.countDown();
                    }
                }
            });

            // NOTE(review): this relies on the whole SYN_REPLY arriving in one read of at
            // most 28 bytes - confirm that this holds for every supported version.
            ByteBuffer response = ByteBuffer.allocate(28);
            socketChannel.read(response);
            response.flip();
            parser.parse(response);

            assertThat("server didn't receive data", serverDataReceivedLatch.await(1, TimeUnit.SECONDS), not(true));
            // NOTE(review): nothing reads from the socket after the single read above, so a
            // RST_STREAM sent later by the server would never reach the parser - confirm
            // that this assertion verifies what is intended.
            assertThat("client didn't receive reset", clientResetReceivedLatch.await(1, TimeUnit.SECONDS), not(true));

            ByteBuffer buffer = generator.control(new GoAwayFrame(version, streamId, SessionStatus.OK.getCode()));
            socketChannel.write(buffer);
            assertThat("GoAway frame is fully written", buffer.hasRemaining(), is(false));

            assertThat("GoAway frame is received by server", goAwayReceivedLatch.await(5, TimeUnit.SECONDS), is(true));
        }
    }
}