| // |
| // ======================================================================== |
| // Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd. |
| // ------------------------------------------------------------------------ |
| // All rights reserved. This program and the accompanying materials |
| // are made available under the terms of the Eclipse Public License v1.0 |
| // and Apache License v2.0 which accompanies this distribution. |
| // |
| // The Eclipse Public License is available at |
| // http://www.eclipse.org/legal/epl-v10.html |
| // |
| // The Apache License v2.0 is available at |
| // http://www.opensource.org/licenses/apache2.0.php |
| // |
| // You may elect to redistribute this code under either of these licenses. |
| // ======================================================================== |
| // |
| |
| package org.eclipse.jetty.spdy.server; |
| |
| import static org.hamcrest.Matchers.is; |
| import static org.hamcrest.Matchers.not; |
| import static org.junit.Assert.assertThat; |
| |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.ServerSocketChannel; |
| import java.nio.channels.SocketChannel; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.eclipse.jetty.io.MappedByteBufferPool; |
| import org.eclipse.jetty.spdy.StandardCompressionFactory; |
| import org.eclipse.jetty.spdy.api.BytesDataInfo; |
| import org.eclipse.jetty.spdy.api.DataInfo; |
| import org.eclipse.jetty.spdy.api.GoAwayInfo; |
| import org.eclipse.jetty.spdy.api.GoAwayResultInfo; |
| import org.eclipse.jetty.spdy.api.ReplyInfo; |
| import org.eclipse.jetty.spdy.api.SPDY; |
| import org.eclipse.jetty.spdy.api.Session; |
| import org.eclipse.jetty.spdy.api.SessionStatus; |
| import org.eclipse.jetty.spdy.api.Stream; |
| import org.eclipse.jetty.spdy.api.StreamFrameListener; |
| import org.eclipse.jetty.spdy.api.StringDataInfo; |
| import org.eclipse.jetty.spdy.api.SynInfo; |
| import org.eclipse.jetty.spdy.api.server.ServerSessionFrameListener; |
| import org.eclipse.jetty.spdy.frames.ControlFrame; |
| import org.eclipse.jetty.spdy.frames.GoAwayFrame; |
| import org.eclipse.jetty.spdy.frames.RstStreamFrame; |
| import org.eclipse.jetty.spdy.frames.SynReplyFrame; |
| import org.eclipse.jetty.spdy.frames.SynStreamFrame; |
| import org.eclipse.jetty.spdy.generator.Generator; |
| import org.eclipse.jetty.spdy.parser.Parser; |
| import org.eclipse.jetty.spdy.parser.Parser.Listener; |
| import org.eclipse.jetty.util.Callback; |
| import org.eclipse.jetty.util.Fields; |
| import org.junit.Assert; |
| import org.junit.Ignore; |
| import org.junit.Test; |
| |
| public class ClosedStreamTest extends AbstractTest |
| { |
| //TODO: Right now it sends a rst as the stream is unknown to the session once it's closed. |
| //TODO: But according to the spec we probably should just ignore the data?! |
| @Test |
| public void testDataSentOnClosedStreamIsIgnored() throws Exception |
| { |
| ServerSocketChannel server = ServerSocketChannel.open(); |
| server.bind(new InetSocketAddress("localhost", 0)); |
| |
| Session session = startClient(new InetSocketAddress("localhost", server.socket().getLocalPort()), null); |
| final CountDownLatch dataLatch = new CountDownLatch(2); |
| session.syn(new SynInfo(new Fields(), true), new StreamFrameListener.Adapter() |
| { |
| @Override |
| public void onData(Stream stream, DataInfo dataInfo) |
| { |
| dataLatch.countDown(); |
| } |
| }); |
| |
| SocketChannel channel = server.accept(); |
| ByteBuffer readBuffer = ByteBuffer.allocate(1024); |
| channel.read(readBuffer); |
| readBuffer.flip(); |
| int streamId = readBuffer.getInt(8); |
| |
| Generator generator = new Generator(new MappedByteBufferPool(), new StandardCompressionFactory.StandardCompressor()); |
| |
| ByteBuffer writeBuffer = generator.control(new SynReplyFrame(SPDY.V2, (byte)0, streamId, new Fields())); |
| channel.write(writeBuffer); |
| Assert.assertThat(writeBuffer.hasRemaining(), is(false)); |
| |
| byte[] bytes = new byte[1]; |
| writeBuffer = generator.data(streamId, bytes.length, new BytesDataInfo(bytes, true)); |
| channel.write(writeBuffer); |
| Assert.assertThat(writeBuffer.hasRemaining(), is(false)); |
| |
| // Write again to simulate the faulty condition |
| writeBuffer.flip(); |
| channel.write(writeBuffer); |
| Assert.assertThat(writeBuffer.hasRemaining(), is(false)); |
| |
| Assert.assertFalse(dataLatch.await(1, TimeUnit.SECONDS)); |
| |
| session.goAway(new GoAwayInfo(5, TimeUnit.SECONDS)); |
| |
| server.close(); |
| } |
| |
| @Test |
| public void testSendDataOnHalfClosedStreamCausesExceptionOnServer() throws Exception |
| { |
| final CountDownLatch replyReceivedLatch = new CountDownLatch(1); |
| final CountDownLatch clientReceivedDataLatch = new CountDownLatch(1); |
| final CountDownLatch exceptionWhenSendingData = new CountDownLatch(1); |
| |
| Session clientSession = startClient(startServer(new ServerSessionFrameListener.Adapter() |
| { |
| @Override |
| public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) |
| { |
| stream.reply(new ReplyInfo(true), new Callback.Adapter()); |
| try |
| { |
| replyReceivedLatch.await(5,TimeUnit.SECONDS); |
| } |
| catch (InterruptedException e) |
| { |
| e.printStackTrace(); |
| } |
| try |
| { |
| stream.data(new StringDataInfo("data send after half closed",false), new Callback.Adapter()); |
| } |
| catch (RuntimeException e) |
| { |
| // we expect an exception here, but we don't want it to be logged |
| exceptionWhenSendingData.countDown(); |
| } |
| |
| return null; |
| } |
| }),null); |
| |
| Stream stream = clientSession.syn(new SynInfo(new Fields(), false),new StreamFrameListener.Adapter() |
| { |
| @Override |
| public void onReply(Stream stream, ReplyInfo replyInfo) |
| { |
| replyReceivedLatch.countDown(); |
| } |
| |
| @Override |
| public void onData(Stream stream, DataInfo dataInfo) |
| { |
| clientReceivedDataLatch.countDown(); |
| } |
| }); |
| assertThat("reply has been received by client",replyReceivedLatch.await(5,TimeUnit.SECONDS),is(true)); |
| assertThat("stream is half closed from server",stream.isHalfClosed(),is(true)); |
| assertThat("client has not received any data sent after stream was half closed by server", |
| clientReceivedDataLatch.await(1,TimeUnit.SECONDS), is(false)); |
| assertThat("sending data threw an exception",exceptionWhenSendingData.await(5,TimeUnit.SECONDS), is(true)); |
| } |
| |
| @Test |
| public void testV2ReceiveDataOnHalfClosedStream() throws Exception |
| { |
| runReceiveDataOnHalfClosedStream(SPDY.V2); |
| } |
| |
| @Test |
| @Ignore("until v3 is properly implemented") |
| public void testV3ReceiveDataOnHalfClosedStream() throws Exception |
| { |
| runReceiveDataOnHalfClosedStream(SPDY.V3); |
| } |
| |
| private void runReceiveDataOnHalfClosedStream(short version) throws Exception |
| { |
| final CountDownLatch clientResetReceivedLatch = new CountDownLatch(1); |
| final CountDownLatch serverReplySentLatch = new CountDownLatch(1); |
| final CountDownLatch clientReplyReceivedLatch = new CountDownLatch(1); |
| final CountDownLatch serverDataReceivedLatch = new CountDownLatch(1); |
| final CountDownLatch goAwayReceivedLatch = new CountDownLatch(1); |
| |
| InetSocketAddress startServer = startServer(new ServerSessionFrameListener.Adapter() |
| { |
| @Override |
| public StreamFrameListener onSyn(Stream stream, SynInfo synInfo) |
| { |
| stream.reply(new ReplyInfo(false), new Callback.Adapter()); |
| serverReplySentLatch.countDown(); |
| try |
| { |
| clientReplyReceivedLatch.await(5,TimeUnit.SECONDS); |
| } |
| catch (InterruptedException e) |
| { |
| e.printStackTrace(); |
| } |
| return new StreamFrameListener.Adapter() |
| { |
| @Override |
| public void onData(Stream stream, DataInfo dataInfo) |
| { |
| serverDataReceivedLatch.countDown(); |
| } |
| }; |
| } |
| @Override |
| public void onGoAway(Session session, GoAwayResultInfo goAwayInfo) |
| { |
| goAwayReceivedLatch.countDown(); |
| } |
| }); |
| |
| final Generator generator = new Generator(new MappedByteBufferPool(), new StandardCompressionFactory().newCompressor()); |
| int streamId = 1; |
| ByteBuffer synData = generator.control(new SynStreamFrame(version,SynInfo.FLAG_CLOSE, streamId,0,(byte)0,(short)0,new Fields())); |
| |
| final SocketChannel socketChannel = SocketChannel.open(startServer); |
| socketChannel.write(synData); |
| assertThat("synData is fully written", synData.hasRemaining(), is(false)); |
| |
| assertThat("server: push reply is sent",serverReplySentLatch.await(5,TimeUnit.SECONDS),is(true)); |
| |
| Parser parser = new Parser(new StandardCompressionFactory.StandardDecompressor()); |
| parser.addListener(new Listener.Adapter() |
| { |
| @Override |
| public void onControlFrame(ControlFrame frame) |
| { |
| if (frame instanceof SynReplyFrame) |
| { |
| SynReplyFrame synReplyFrame = (SynReplyFrame)frame; |
| clientReplyReceivedLatch.countDown(); |
| int streamId = synReplyFrame.getStreamId(); |
| ByteBuffer data = generator.data(streamId,0,new StringDataInfo("data",false)); |
| try |
| { |
| socketChannel.write(data); |
| } |
| catch (IOException e) |
| { |
| e.printStackTrace(); |
| } |
| } |
| else if (frame instanceof RstStreamFrame) |
| { |
| clientResetReceivedLatch.countDown(); |
| } |
| } |
| }); |
| ByteBuffer response = ByteBuffer.allocate(28); |
| socketChannel.read(response); |
| response.flip(); |
| parser.parse(response); |
| |
| assertThat("server didn't receive data",serverDataReceivedLatch.await(1,TimeUnit.SECONDS),not(true)); |
| assertThat("client didn't receive reset",clientResetReceivedLatch.await(1,TimeUnit.SECONDS),not(true)); |
| |
| ByteBuffer buffer = generator.control(new GoAwayFrame(version, streamId, SessionStatus.OK.getCode())); |
| socketChannel.write(buffer); |
| Assert.assertThat(buffer.hasRemaining(), is(false)); |
| |
| assertThat("GoAway frame is received by server", goAwayReceivedLatch.await(5,TimeUnit.SECONDS), is(true)); |
| |
| socketChannel.close(); |
| } |
| } |