| // |
| // ======================================================================== |
| // Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd. |
| // ------------------------------------------------------------------------ |
| // All rights reserved. This program and the accompanying materials |
| // are made available under the terms of the Eclipse Public License v1.0 |
| // and Apache License v2.0 which accompanies this distribution. |
| // |
| // The Eclipse Public License is available at |
| // http://www.eclipse.org/legal/epl-v10.html |
| // |
| // The Apache License v2.0 is available at |
| // http://www.opensource.org/licenses/apache2.0.php |
| // |
| // You may elect to redistribute this code under either of these licenses. |
| // ======================================================================== |
| // |
| |
| package org.eclipse.jetty.io; |
| |
| import static org.hamcrest.Matchers.greaterThan; |
| import static org.hamcrest.Matchers.greaterThanOrEqualTo; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertTrue; |
| |
| import java.io.BufferedInputStream; |
| import java.io.BufferedOutputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.net.Socket; |
| import java.net.SocketTimeoutException; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.SelectionKey; |
| import java.nio.channels.ServerSocketChannel; |
| import java.nio.channels.SocketChannel; |
| import java.nio.charset.StandardCharsets; |
| import java.util.concurrent.ArrayBlockingQueue; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import org.eclipse.jetty.util.BufferUtil; |
| import org.eclipse.jetty.util.Callback; |
| import org.eclipse.jetty.util.FutureCallback; |
| import org.eclipse.jetty.util.log.Log; |
| import org.eclipse.jetty.util.log.Logger; |
| import org.eclipse.jetty.util.thread.QueuedThreadPool; |
| import org.eclipse.jetty.util.thread.Scheduler; |
| import org.eclipse.jetty.util.thread.TimerScheduler; |
| import org.hamcrest.Matchers; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Ignore; |
| import org.junit.Test; |
| |
| public class SelectChannelEndPointTest |
| { |
| private static final Logger LOG = Log.getLogger(SelectChannelEndPointTest.class); |
| protected CountDownLatch _lastEndPointLatch; |
| protected volatile EndPoint _lastEndPoint; |
| protected ServerSocketChannel _connector; |
| protected QueuedThreadPool _threadPool = new QueuedThreadPool(); |
| protected Scheduler _scheduler = new TimerScheduler(); |
| protected SelectorManager _manager = new SelectorManager(_threadPool, _scheduler) |
| { |
| @Override |
| public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) |
| { |
| return SelectChannelEndPointTest.this.newConnection(channel, endpoint); |
| } |
| |
| @Override |
| protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException |
| { |
| SelectChannelEndPoint endp = new SelectChannelEndPoint(channel, selectSet, selectionKey, getScheduler(), 60000); |
| _lastEndPoint = endp; |
| _lastEndPointLatch.countDown(); |
| return endp; |
| } |
| }; |
| |
| // Must be volatile or the test may fail spuriously |
| protected volatile int _blockAt = 0; |
| private volatile int _writeCount = 1; |
| |
| @Before |
| public void startManager() throws Exception |
| { |
| _writeCount = 1; |
| _lastEndPoint = null; |
| _lastEndPointLatch = new CountDownLatch(1); |
| _connector = ServerSocketChannel.open(); |
| _connector.socket().bind(null); |
| _scheduler.start(); |
| _threadPool.start(); |
| _manager.start(); |
| } |
| |
| @After |
| public void stopManager() throws Exception |
| { |
| _scheduler.stop(); |
| _manager.stop(); |
| _threadPool.stop(); |
| _connector.close(); |
| } |
| |
| protected Socket newClient() throws IOException |
| { |
| return new Socket(_connector.socket().getInetAddress(), _connector.socket().getLocalPort()); |
| } |
| |
| protected Connection newConnection(SocketChannel channel, EndPoint endpoint) |
| { |
| return new TestConnection(endpoint); |
| } |
| |
| public class TestConnection extends AbstractConnection |
| { |
| volatile FutureCallback _blockingRead; |
| ByteBuffer _in = BufferUtil.allocate(32 * 1024); |
| ByteBuffer _out = BufferUtil.allocate(32 * 1024); |
| long _last = -1; |
| final CountDownLatch _latch; |
| |
| public TestConnection(EndPoint endp) |
| { |
| super(endp, _threadPool); |
| _latch=null; |
| } |
| |
| public TestConnection(EndPoint endp,CountDownLatch latch) |
| { |
| super(endp, _threadPool); |
| _latch=latch; |
| } |
| |
| @Override |
| public void onOpen() |
| { |
| super.onOpen(); |
| fillInterested(); |
| } |
| |
| @Override |
| public void onFillInterestedFailed(Throwable cause) |
| { |
| Callback blocking = _blockingRead; |
| if (blocking!=null) |
| { |
| _blockingRead=null; |
| blocking.failed(cause); |
| return; |
| } |
| super.onFillInterestedFailed(cause); |
| } |
| |
| @Override |
| public void onFillable() |
| { |
| if (_latch!=null) |
| { |
| try |
| { |
| _latch.await(); |
| } |
| catch (InterruptedException e) |
| { |
| e.printStackTrace(); |
| } |
| } |
| |
| Callback blocking = _blockingRead; |
| if (blocking!=null) |
| { |
| _blockingRead=null; |
| blocking.succeeded(); |
| return; |
| } |
| |
| EndPoint _endp = getEndPoint(); |
| try |
| { |
| _last = System.currentTimeMillis(); |
| boolean progress = true; |
| while (progress) |
| { |
| progress = false; |
| |
| // Fill the input buffer with everything available |
| BufferUtil.compact(_in); |
| if (BufferUtil.isFull(_in)) |
| throw new IllegalStateException("FULL " + BufferUtil.toDetailString(_in)); |
| int filled = _endp.fill(_in); |
| if (filled > 0) |
| progress = true; |
| |
| // If the tests wants to block, then block |
| while (_blockAt > 0 && _endp.isOpen() && _in.remaining() < _blockAt) |
| { |
| FutureCallback future = _blockingRead = new FutureCallback(); |
| fillInterested(); |
| future.get(); |
| filled = _endp.fill(_in); |
| progress |= filled > 0; |
| } |
| |
| // Copy to the out buffer |
| if (BufferUtil.hasContent(_in) && BufferUtil.append(_out, _in) > 0) |
| progress = true; |
| |
| // Blocking writes |
| if (BufferUtil.hasContent(_out)) |
| { |
| ByteBuffer out = _out.duplicate(); |
| BufferUtil.clear(_out); |
| for (int i = 0; i < _writeCount; i++) |
| { |
| FutureCallback blockingWrite = new FutureCallback(); |
| _endp.write(blockingWrite, out.asReadOnlyBuffer()); |
| blockingWrite.get(); |
| } |
| progress = true; |
| } |
| |
| // are we done? |
| if (_endp.isInputShutdown()) |
| _endp.shutdownOutput(); |
| } |
| |
| if (_endp.isOpen()) |
| fillInterested(); |
| } |
| catch (ExecutionException e) |
| { |
| // Timeout does not close, so echo exception then shutdown |
| try |
| { |
| FutureCallback blockingWrite = new FutureCallback(); |
| _endp.write(blockingWrite, BufferUtil.toBuffer("EE: " + BufferUtil.toString(_in))); |
| blockingWrite.get(); |
| _endp.shutdownOutput(); |
| } |
| catch (Exception e2) |
| { |
| // e2.printStackTrace(); |
| } |
| } |
| catch (InterruptedException | EofException e) |
| { |
| SelectChannelEndPoint.LOG.ignore(e); |
| } |
| catch (Exception e) |
| { |
| SelectChannelEndPoint.LOG.warn(e); |
| } |
| finally |
| { |
| } |
| } |
| } |
| |
| @Test |
| public void testEcho() throws Exception |
| { |
| Socket client = newClient(); |
| |
| client.setSoTimeout(60000); |
| |
| SocketChannel server = _connector.accept(); |
| server.configureBlocking(false); |
| |
| _manager.accept(server); |
| |
| // Write client to server |
| client.getOutputStream().write("HelloWorld".getBytes(StandardCharsets.UTF_8)); |
| |
| // Verify echo server to client |
| for (char c : "HelloWorld".toCharArray()) |
| { |
| int b = client.getInputStream().read(); |
| assertTrue(b > 0); |
| assertEquals(c, (char)b); |
| } |
| |
| // wait for read timeout |
| client.setSoTimeout(500); |
| long start = System.currentTimeMillis(); |
| try |
| { |
| client.getInputStream().read(); |
| Assert.fail(); |
| } |
| catch (SocketTimeoutException e) |
| { |
| long duration = System.currentTimeMillis() - start; |
| Assert.assertThat("timeout duration", duration, greaterThanOrEqualTo(400L)); |
| } |
| |
| // write then shutdown |
| client.getOutputStream().write("Goodbye Cruel TLS".getBytes(StandardCharsets.UTF_8)); |
| |
| // Verify echo server to client |
| for (char c : "Goodbye Cruel TLS".toCharArray()) |
| { |
| int b = client.getInputStream().read(); |
| Assert.assertThat("expect valid char integer", b, greaterThan(0)); |
| assertEquals("expect characters to be same", c, (char)b); |
| } |
| client.close(); |
| |
| for (int i = 0; i < 10; ++i) |
| { |
| if (server.isOpen()) |
| Thread.sleep(10); |
| else |
| break; |
| } |
| assertFalse(server.isOpen()); |
| } |
| |
| @Test |
| public void testShutdown() throws Exception |
| { |
| Socket client = newClient(); |
| |
| client.setSoTimeout(500); |
| |
| SocketChannel server = _connector.accept(); |
| server.configureBlocking(false); |
| |
| _manager.accept(server); |
| |
| // Write client to server |
| client.getOutputStream().write("HelloWorld".getBytes(StandardCharsets.UTF_8)); |
| |
| // Verify echo server to client |
| for (char c : "HelloWorld".toCharArray()) |
| { |
| int b = client.getInputStream().read(); |
| assertTrue(b > 0); |
| assertEquals(c, (char)b); |
| } |
| |
| // wait for read timeout |
| long start = System.currentTimeMillis(); |
| try |
| { |
| client.getInputStream().read(); |
| Assert.fail(); |
| } |
| catch (SocketTimeoutException e) |
| { |
| assertTrue(System.currentTimeMillis() - start >= 400); |
| } |
| |
| // write then shutdown |
| client.getOutputStream().write("Goodbye Cruel TLS".getBytes(StandardCharsets.UTF_8)); |
| client.shutdownOutput(); |
| |
| // Verify echo server to client |
| for (char c : "Goodbye Cruel TLS".toCharArray()) |
| { |
| int b = client.getInputStream().read(); |
| assertTrue(b > 0); |
| assertEquals(c, (char)b); |
| } |
| |
| // Read close |
| assertEquals(-1, client.getInputStream().read()); |
| } |
| |
| @Test |
| public void testReadBlocked() throws Exception |
| { |
| Socket client = newClient(); |
| |
| SocketChannel server = _connector.accept(); |
| server.configureBlocking(false); |
| |
| _manager.accept(server); |
| |
| OutputStream clientOutputStream = client.getOutputStream(); |
| InputStream clientInputStream = client.getInputStream(); |
| |
| int specifiedTimeout = 1000; |
| client.setSoTimeout(specifiedTimeout); |
| |
| // Write 8 and cause block waiting for 10 |
| _blockAt = 10; |
| clientOutputStream.write("12345678".getBytes(StandardCharsets.UTF_8)); |
| clientOutputStream.flush(); |
| |
| Assert.assertTrue(_lastEndPointLatch.await(1, TimeUnit.SECONDS)); |
| _lastEndPoint.setIdleTimeout(10 * specifiedTimeout); |
| Thread.sleep((11 * specifiedTimeout) / 10); |
| |
| long start = System.currentTimeMillis(); |
| try |
| { |
| int b = clientInputStream.read(); |
| Assert.fail("Should have timed out waiting for a response, but read " + b); |
| } |
| catch (SocketTimeoutException e) |
| { |
| int elapsed = Long.valueOf(System.currentTimeMillis() - start).intValue(); |
| Assert.assertThat("Expected timeout", elapsed, greaterThanOrEqualTo(3 * specifiedTimeout / 4)); |
| } |
| |
| // write remaining characters |
| clientOutputStream.write("90ABCDEF".getBytes(StandardCharsets.UTF_8)); |
| clientOutputStream.flush(); |
| |
| // Verify echo server to client |
| for (char c : "1234567890ABCDEF".toCharArray()) |
| { |
| int b = clientInputStream.read(); |
| assertTrue(b > 0); |
| assertEquals(c, (char)b); |
| } |
| } |
| |
| @Test |
| public void testIdle() throws Exception |
| { |
| Socket client = newClient(); |
| |
| client.setSoTimeout(3000); |
| |
| SocketChannel server = _connector.accept(); |
| server.configureBlocking(false); |
| |
| _manager.accept(server); |
| |
| // Write client to server |
| client.getOutputStream().write("HelloWorld".getBytes(StandardCharsets.UTF_8)); |
| |
| // Verify echo server to client |
| for (char c : "HelloWorld".toCharArray()) |
| { |
| int b = client.getInputStream().read(); |
| assertTrue(b > 0); |
| assertEquals(c, (char)b); |
| } |
| |
| Assert.assertTrue(_lastEndPointLatch.await(1, TimeUnit.SECONDS)); |
| int idleTimeout = 500; |
| _lastEndPoint.setIdleTimeout(idleTimeout); |
| |
| // read until idle shutdown received |
| long start = System.currentTimeMillis(); |
| int b = client.getInputStream().read(); |
| assertEquals(-1, b); |
| long idle = System.currentTimeMillis() - start; |
| assertTrue(idle > idleTimeout / 2); |
| assertTrue(idle < idleTimeout * 2); |
| |
| // But endpoint may still be open for a little bit. |
| for (int i = 0; i < 10; ++i) |
| { |
| if (_lastEndPoint.isOpen()) |
| Thread.sleep(2 * idleTimeout / 10); |
| else |
| break; |
| } |
| assertFalse(_lastEndPoint.isOpen()); |
| } |
| |
| @Test |
| public void testBlockedReadIdle() throws Exception |
| { |
| Socket client = newClient(); |
| InputStream clientInputStream = client.getInputStream(); |
| OutputStream clientOutputStream = client.getOutputStream(); |
| |
| client.setSoTimeout(5000); |
| |
| SocketChannel server = _connector.accept(); |
| server.configureBlocking(false); |
| |
| _manager.accept(server); |
| |
| // Write client to server |
| clientOutputStream.write("HelloWorld".getBytes(StandardCharsets.UTF_8)); |
| |
| // Verify echo server to client |
| for (char c : "HelloWorld".toCharArray()) |
| { |
| int b = clientInputStream.read(); |
| assertTrue(b > 0); |
| assertEquals(c, (char)b); |
| } |
| |
| Assert.assertTrue(_lastEndPointLatch.await(1, TimeUnit.SECONDS)); |
| int idleTimeout = 500; |
| _lastEndPoint.setIdleTimeout(idleTimeout); |
| |
| // Write 8 and cause block waiting for 10 |
| _blockAt = 10; |
| clientOutputStream.write("12345678".getBytes(StandardCharsets.UTF_8)); |
| clientOutputStream.flush(); |
| |
| // read until idle shutdown received |
| long start = System.currentTimeMillis(); |
| int b = clientInputStream.read(); |
| assertEquals('E', b); |
| long idle = System.currentTimeMillis() - start; |
| assertTrue(idle > idleTimeout / 2); |
| assertTrue(idle < idleTimeout * 2); |
| |
| for (char c : "E: 12345678".toCharArray()) |
| { |
| b = clientInputStream.read(); |
| assertTrue(b > 0); |
| assertEquals(c, (char)b); |
| } |
| b = clientInputStream.read(); |
| assertEquals(-1,b); |
| |
| // But endpoint is still open. |
| if(_lastEndPoint.isOpen()) |
| // Wait for another idle callback |
| Thread.sleep(idleTimeout * 2); |
| |
| // endpoint is closed. |
| assertFalse(_lastEndPoint.isOpen()); |
| } |
| |
| @Test |
| public void testStress() throws Exception |
| { |
| Socket client = newClient(); |
| client.setSoTimeout(30000); |
| |
| SocketChannel server = _connector.accept(); |
| server.configureBlocking(false); |
| |
| _manager.accept(server); |
| final int writes = 200000; |
| |
| final byte[] bytes = "HelloWorld-".getBytes(StandardCharsets.UTF_8); |
| byte[] count = "0\n".getBytes(StandardCharsets.UTF_8); |
| BufferedOutputStream out = new BufferedOutputStream(client.getOutputStream()); |
| final CountDownLatch latch = new CountDownLatch(writes); |
| final InputStream in = new BufferedInputStream(client.getInputStream()); |
| final long start = System.currentTimeMillis(); |
| out.write(bytes); |
| out.write(count); |
| out.flush(); |
| |
| Assert.assertTrue(_lastEndPointLatch.await(1, TimeUnit.SECONDS)); |
| _lastEndPoint.setIdleTimeout(5000); |
| |
| new Thread() |
| { |
| @Override |
| public void run() |
| { |
| Thread.currentThread().setPriority(MAX_PRIORITY); |
| long last = -1; |
| int count = -1; |
| try |
| { |
| while (latch.getCount() > 0) |
| { |
| // Verify echo server to client |
| for (byte b0 : bytes) |
| { |
| int b = in.read(); |
| Assert.assertThat(b, greaterThan(0)); |
| assertEquals(0xff & b0, b); |
| } |
| |
| count = 0; |
| int b = in.read(); |
| while (b > 0 && b != '\n') |
| { |
| count = count * 10 + (b - '0'); |
| b = in.read(); |
| } |
| last = System.currentTimeMillis(); |
| |
| //if (latch.getCount()%1000==0) |
| // System.out.println(writes-latch.getCount()); |
| |
| latch.countDown(); |
| } |
| } |
| catch (Throwable e) |
| { |
| |
| long now = System.currentTimeMillis(); |
| System.err.println("count=" + count); |
| System.err.println("latch=" + latch.getCount()); |
| System.err.println("time=" + (now - start)); |
| System.err.println("last=" + (now - last)); |
| System.err.println("endp=" + _lastEndPoint); |
| System.err.println("conn=" + _lastEndPoint.getConnection()); |
| |
| e.printStackTrace(); |
| } |
| } |
| }.start(); |
| |
| // Write client to server |
| for (int i = 1; i < writes; i++) |
| { |
| out.write(bytes); |
| out.write(Integer.toString(i).getBytes(StandardCharsets.ISO_8859_1)); |
| out.write('\n'); |
| if (i % 1000 == 0) |
| { |
| //System.err.println(i+"/"+writes); |
| out.flush(); |
| } |
| Thread.yield(); |
| } |
| out.flush(); |
| |
| long last = latch.getCount(); |
| while (!latch.await(5, TimeUnit.SECONDS)) |
| { |
| //System.err.println(latch.getCount()); |
| if (latch.getCount() == last) |
| Assert.fail(); |
| last = latch.getCount(); |
| } |
| |
| assertEquals(0, latch.getCount()); |
| } |
| |
| @Test |
| public void testWriteBlocked() throws Exception |
| { |
| Socket client = newClient(); |
| |
| client.setSoTimeout(10000); |
| |
| SocketChannel server = _connector.accept(); |
| server.configureBlocking(false); |
| |
| _manager.accept(server); |
| |
| // Write client to server |
| _writeCount = 10000; |
| String data = "Now is the time for all good men to come to the aid of the party"; |
| client.getOutputStream().write(data.getBytes(StandardCharsets.UTF_8)); |
| BufferedInputStream in = new BufferedInputStream(client.getInputStream()); |
| |
| int byteNum = 0; |
| try |
| { |
| for (int i = 0; i < _writeCount; i++) |
| { |
| if (i % 1000 == 0) |
| TimeUnit.MILLISECONDS.sleep(200); |
| |
| // Verify echo server to client |
| for (int j = 0; j < data.length(); j++) |
| { |
| char c = data.charAt(j); |
| int b = in.read(); |
| byteNum++; |
| assertTrue(b > 0); |
| assertEquals("test-" + i + "/" + j,c,(char)b); |
| } |
| |
| if (i == 0) |
| _lastEndPoint.setIdleTimeout(60000); |
| } |
| } |
| catch (SocketTimeoutException e) |
| { |
| System.err.println("SelectorManager.dump() = " + _manager.dump()); |
| LOG.warn("Server: " + server); |
| LOG.warn("Error reading byte #" + byteNum,e); |
| throw e; |
| } |
| |
| client.close(); |
| |
| for (int i = 0; i < 10; ++i) |
| { |
| if (server.isOpen()) |
| Thread.sleep(10); |
| else |
| break; |
| } |
| assertFalse(server.isOpen()); |
| } |
| |
| |
| // TODO make this test reliable |
| @Test |
| @Ignore |
| public void testRejectedExecution() throws Exception |
| { |
| _manager.stop(); |
| _threadPool.stop(); |
| |
| final CountDownLatch latch = new CountDownLatch(1); |
| |
| BlockingQueue<Runnable> q = new ArrayBlockingQueue<>(4); |
| _threadPool = new QueuedThreadPool(4,4,60000,q); |
| _manager = new SelectorManager(_threadPool, _scheduler, 1) |
| { |
| @Override |
| public Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) |
| { |
| return new TestConnection(endpoint,latch); |
| } |
| |
| @Override |
| protected SelectChannelEndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException |
| { |
| SelectChannelEndPoint endp = new SelectChannelEndPoint(channel, selectSet, selectionKey, getScheduler(), 60000); |
| _lastEndPoint = endp; |
| _lastEndPointLatch.countDown(); |
| return endp; |
| } |
| }; |
| |
| _threadPool.start(); |
| _manager.start(); |
| |
| AtomicInteger timeout = new AtomicInteger(); |
| AtomicInteger rejections = new AtomicInteger(); |
| AtomicInteger echoed = new AtomicInteger(); |
| |
| CountDownLatch closed = new CountDownLatch(20); |
| for (int i=0;i<20;i++) |
| { |
| new Thread() |
| { |
| public void run() |
| { |
| try(Socket client = newClient();) |
| { |
| client.setSoTimeout(5000); |
| |
| SocketChannel server = _connector.accept(); |
| server.configureBlocking(false); |
| |
| _manager.accept(server); |
| |
| // Write client to server |
| client.getOutputStream().write("HelloWorld".getBytes(StandardCharsets.UTF_8)); |
| client.getOutputStream().flush(); |
| client.shutdownOutput(); |
| |
| // Verify echo server to client |
| for (char c : "HelloWorld".toCharArray()) |
| { |
| int b = client.getInputStream().read(); |
| assertTrue(b > 0); |
| assertEquals(c, (char)b); |
| } |
| assertEquals(-1,client.getInputStream().read()); |
| echoed.incrementAndGet(); |
| } |
| catch(SocketTimeoutException x) |
| { |
| x.printStackTrace(); |
| timeout.incrementAndGet(); |
| } |
| catch(Throwable x) |
| { |
| rejections.incrementAndGet(); |
| } |
| finally |
| { |
| closed.countDown(); |
| } |
| } |
| }.start(); |
| } |
| |
| // unblock the handling |
| latch.countDown(); |
| |
| // wait for all clients to complete or fail |
| closed.await(); |
| |
| // assert some clients must have been rejected |
| Assert.assertThat(rejections.get(),Matchers.greaterThan(0)); |
| // but not all of them |
| Assert.assertThat(rejections.get(),Matchers.lessThan(20)); |
| // none should have timed out |
| Assert.assertThat(timeout.get(),Matchers.equalTo(0)); |
| // and the rest should have worked |
| Assert.assertThat(echoed.get(),Matchers.equalTo(20-rejections.get())); |
| |
| // and the selector is still working for new requests |
| try(Socket client = newClient();) |
| { |
| client.setSoTimeout(5000); |
| |
| SocketChannel server = _connector.accept(); |
| server.configureBlocking(false); |
| |
| _manager.accept(server); |
| |
| // Write client to server |
| client.getOutputStream().write("HelloWorld".getBytes(StandardCharsets.UTF_8)); |
| client.getOutputStream().flush(); |
| client.shutdownOutput(); |
| |
| // Verify echo server to client |
| for (char c : "HelloWorld".toCharArray()) |
| { |
| int b = client.getInputStream().read(); |
| assertTrue(b > 0); |
| assertEquals(c, (char)b); |
| } |
| assertEquals(-1,client.getInputStream().read()); |
| } |
| |
| } |
| } |