blob: 3a80593282e9ba30c748be0c385368b388762f16 [file] [log] [blame]
//
// ========================================================================
// Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.io;
import java.io.Closeable;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Scheduler;
/**
* <p>{@link ManagedSelector} wraps a {@link Selector} simplifying non-blocking operations on channels.</p>
* <p>{@link ManagedSelector} runs the select loop, which waits on {@link Selector#select()} until events
* happen for registered channels. When events happen, it notifies the {@link EndPoint} associated
* with the channel.</p>
*/
public class ManagedSelector extends AbstractLifeCycle implements Runnable, Dumpable
{
private static final Logger LOG = Log.getLogger(ManagedSelector.class);
private final Locker _locker = new Locker();
private boolean _selecting = false;
private final Queue<Runnable> _actions = new ArrayDeque<>();
private final SelectorManager _selectorManager;
private final int _id;
private final ExecutionStrategy _strategy;
private Selector _selector;
public ManagedSelector(SelectorManager selectorManager, int id)
{
this(selectorManager, id, ExecutionStrategy.Factory.getDefault());
}
public ManagedSelector(SelectorManager selectorManager, int id, ExecutionStrategy.Factory executionFactory)
{
_selectorManager = selectorManager;
_id = id;
_strategy = executionFactory.newExecutionStrategy(new SelectorProducer(), selectorManager.getExecutor());
setStopTimeout(5000);
}
public ExecutionStrategy getExecutionStrategy()
{
return _strategy;
}
@Override
protected void doStart() throws Exception
{
super.doStart();
_selector = newSelector();
_selectorManager.execute(this);
}
protected Selector newSelector() throws IOException
{
return Selector.open();
}
public int size()
{
Selector s = _selector;
if (s == null)
return 0;
return s.keys().size();
}
@Override
protected void doStop() throws Exception
{
if (LOG.isDebugEnabled())
LOG.debug("Stopping {}", this);
CloseEndPoints close_endps = new CloseEndPoints();
submit(close_endps);
close_endps.await(getStopTimeout());
super.doStop();
CloseSelector close_selector = new CloseSelector();
submit(close_selector);
close_selector.await(getStopTimeout());
if (LOG.isDebugEnabled())
LOG.debug("Stopped {}", this);
}
public void submit(Runnable change)
{
if (LOG.isDebugEnabled())
LOG.debug("Queued change {} on {}", change, this);
Selector selector = null;
try (Locker.Lock lock = _locker.lock())
{
_actions.offer(change);
if (_selecting)
{
selector = _selector;
// To avoid the extra select wakeup.
_selecting = false;
}
}
if (selector != null)
selector.wakeup();
}
@Override
public void run()
{
_strategy.execute();
}
/**
* A {@link SelectableEndPoint} is an {@link EndPoint} that wish to be
* notified of non-blocking events by the {@link ManagedSelector}.
*/
public interface SelectableEndPoint extends EndPoint
{
/**
* Callback method invoked when a read or write events has been
* detected by the {@link ManagedSelector} for this endpoint.
*
* @return a job that may block or null
*/
Runnable onSelected();
/**
* Callback method invoked when all the keys selected by the
* {@link ManagedSelector} for this endpoint have been processed.
*/
void updateKey();
}
private class SelectorProducer implements ExecutionStrategy.Producer
{
private Set<SelectionKey> _keys = Collections.emptySet();
private Iterator<SelectionKey> _cursor = Collections.emptyIterator();
@Override
public Runnable produce()
{
while (true)
{
Runnable task = processSelected();
if (task != null)
return task;
Runnable action = runActions();
if (action != null)
return action;
update();
if (!select())
return null;
}
}
private Runnable runActions()
{
while (true)
{
Runnable action;
try (Locker.Lock lock = _locker.lock())
{
action = _actions.poll();
if (action == null)
{
// No more actions, so we need to select
_selecting = true;
return null;
}
}
if (action instanceof Product)
return action;
// Running the change may queue another action.
runChange(action);
}
}
private void runChange(Runnable change)
{
try
{
if (LOG.isDebugEnabled())
LOG.debug("Running change {}", change);
change.run();
}
catch (Throwable x)
{
LOG.debug("Could not run change " + change, x);
}
}
private boolean select()
{
try
{
Selector selector = _selector;
if (selector != null && selector.isOpen())
{
if (LOG.isDebugEnabled())
LOG.debug("Selector loop waiting on select");
int selected = selector.select();
if (LOG.isDebugEnabled())
LOG.debug("Selector loop woken up from select, {}/{} selected", selected, selector.keys().size());
try (Locker.Lock lock = _locker.lock())
{
// finished selecting
_selecting = false;
}
_keys = selector.selectedKeys();
_cursor = _keys.iterator();
return true;
}
}
catch (Throwable x)
{
closeNoExceptions(_selector);
if (isRunning())
LOG.warn(x);
else
LOG.debug(x);
}
return false;
}
private Runnable processSelected()
{
while (_cursor.hasNext())
{
SelectionKey key = _cursor.next();
if (key.isValid())
{
Object attachment = key.attachment();
try
{
if (attachment instanceof SelectableEndPoint)
{
// Try to produce a task
Runnable task = ((SelectableEndPoint)attachment).onSelected();
if (task != null)
return task;
}
else if (key.isConnectable())
{
Runnable task = processConnect(key, (Connect)attachment);
if (task != null)
return task;
}
else if (key.isAcceptable())
{
processAccept(key);
}
else
{
throw new IllegalStateException("key=" + key + ", att=" + attachment + ", iOps=" + key.interestOps() + ", rOps=" + key.readyOps());
}
}
catch (CancelledKeyException x)
{
LOG.debug("Ignoring cancelled key for channel {}", key.channel());
if (attachment instanceof EndPoint)
closeNoExceptions((EndPoint)attachment);
}
catch (Throwable x)
{
LOG.warn("Could not process key for channel " + key.channel(), x);
if (attachment instanceof EndPoint)
closeNoExceptions((EndPoint)attachment);
}
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Selector loop ignoring invalid key for channel {}", key.channel());
Object attachment = key.attachment();
if (attachment instanceof EndPoint)
closeNoExceptions((EndPoint)attachment);
}
}
return null;
}
private void update()
{
for (SelectionKey key : _keys)
updateKey(key);
_keys.clear();
}
private void updateKey(SelectionKey key)
{
Object attachment = key.attachment();
if (attachment instanceof SelectableEndPoint)
((SelectableEndPoint)attachment).updateKey();
}
}
private interface Product extends Runnable
{
}
private Runnable processConnect(SelectionKey key, final Connect connect)
{
SocketChannel channel = (SocketChannel)key.channel();
try
{
key.attach(connect.attachment);
boolean connected = _selectorManager.finishConnect(channel);
if (LOG.isDebugEnabled())
LOG.debug("Connected {} {}", connected, channel);
if (connected)
{
if (connect.timeout.cancel())
{
key.interestOps(0);
return new CreateEndPoint(channel, key)
{
@Override
protected void failed(Throwable failure)
{
super.failed(failure);
connect.failed(failure);
}
};
}
else
{
throw new SocketTimeoutException("Concurrent Connect Timeout");
}
}
else
{
throw new ConnectException();
}
}
catch (Throwable x)
{
connect.failed(x);
return null;
}
}
private void processAccept(SelectionKey key)
{
ServerSocketChannel server = (ServerSocketChannel)key.channel();
SocketChannel channel = null;
try
{
while ((channel = server.accept()) != null)
{
_selectorManager.accepted(channel);
}
}
catch (Throwable x)
{
closeNoExceptions(channel);
LOG.warn("Accept failed for channel " + channel, x);
}
}
private void closeNoExceptions(Closeable closeable)
{
try
{
if (closeable != null)
closeable.close();
}
catch (Throwable x)
{
LOG.ignore(x);
}
}
private EndPoint createEndPoint(SocketChannel channel, SelectionKey selectionKey) throws IOException
{
EndPoint endPoint = _selectorManager.newEndPoint(channel, this, selectionKey);
_selectorManager.endPointOpened(endPoint);
Connection connection = _selectorManager.newConnection(channel, endPoint, selectionKey.attachment());
endPoint.setConnection(connection);
selectionKey.attach(endPoint);
_selectorManager.connectionOpened(connection);
if (LOG.isDebugEnabled())
LOG.debug("Created {}", endPoint);
return endPoint;
}
public void destroyEndPoint(final EndPoint endPoint)
{
final Connection connection = endPoint.getConnection();
submit(new Product()
{
@Override
public void run()
{
if (LOG.isDebugEnabled())
LOG.debug("Destroyed {}", endPoint);
if (connection != null)
_selectorManager.connectionClosed(connection);
_selectorManager.endPointClosed(endPoint);
}
});
}
@Override
public String dump()
{
return ContainerLifeCycle.dump(this);
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
out.append(String.valueOf(this)).append(" id=").append(String.valueOf(_id)).append(System.lineSeparator());
Selector selector = _selector;
if (selector != null && selector.isOpen())
{
final ArrayList<Object> dump = new ArrayList<>(selector.keys().size() * 2);
DumpKeys dumpKeys = new DumpKeys(dump);
submit(dumpKeys);
dumpKeys.await(5, TimeUnit.SECONDS);
ContainerLifeCycle.dump(out, indent, dump);
}
}
@Override
public String toString()
{
Selector selector = _selector;
return String.format("%s id=%s keys=%d selected=%d",
super.toString(),
_id,
selector != null && selector.isOpen() ? selector.keys().size() : -1,
selector != null && selector.isOpen() ? selector.selectedKeys().size() : -1);
}
private class DumpKeys implements Runnable
{
private final CountDownLatch latch = new CountDownLatch(1);
private final List<Object> _dumps;
private DumpKeys(List<Object> dumps)
{
this._dumps = dumps;
}
@Override
public void run()
{
Selector selector = _selector;
if (selector != null && selector.isOpen())
{
Set<SelectionKey> keys = selector.keys();
_dumps.add(selector + " keys=" + keys.size());
for (SelectionKey key : keys)
{
try
{
_dumps.add(String.format("SelectionKey@%x{i=%d}->%s", key.hashCode(), key.interestOps(), key.attachment()));
}
catch (Throwable x)
{
LOG.ignore(x);
}
}
}
latch.countDown();
}
public boolean await(long timeout, TimeUnit unit)
{
try
{
return latch.await(timeout, unit);
}
catch (InterruptedException x)
{
return false;
}
}
}
class Acceptor implements Runnable
{
private final ServerSocketChannel _channel;
public Acceptor(ServerSocketChannel channel)
{
this._channel = channel;
}
@Override
public void run()
{
try
{
SelectionKey key = _channel.register(_selector, SelectionKey.OP_ACCEPT, null);
if (LOG.isDebugEnabled())
LOG.debug("{} acceptor={}", this, key);
}
catch (Throwable x)
{
closeNoExceptions(_channel);
LOG.warn(x);
}
}
}
class Accept implements Runnable, Closeable
{
private final SocketChannel channel;
private final Object attachment;
Accept(SocketChannel channel, Object attachment)
{
this.channel = channel;
this.attachment = attachment;
}
@Override
public void close()
{
LOG.debug("closed accept of {}", channel);
closeNoExceptions(channel);
}
@Override
public void run()
{
try
{
final SelectionKey key = channel.register(_selector, 0, attachment);
submit(new CreateEndPoint(channel, key));
}
catch (Throwable x)
{
closeNoExceptions(channel);
LOG.debug(x);
}
}
}
private class CreateEndPoint implements Product, Closeable
{
private final SocketChannel channel;
private final SelectionKey key;
public CreateEndPoint(SocketChannel channel, SelectionKey key)
{
this.channel = channel;
this.key = key;
}
@Override
public void run()
{
try
{
createEndPoint(channel, key);
}
catch (Throwable x)
{
LOG.debug(x);
failed(x);
}
}
@Override
public void close()
{
LOG.debug("closed creation of {}", channel);
closeNoExceptions(channel);
}
protected void failed(Throwable failure)
{
closeNoExceptions(channel);
LOG.debug(failure);
}
}
class Connect implements Runnable
{
private final AtomicBoolean failed = new AtomicBoolean();
private final SocketChannel channel;
private final Object attachment;
private final Scheduler.Task timeout;
Connect(SocketChannel channel, Object attachment)
{
this.channel = channel;
this.attachment = attachment;
this.timeout = ManagedSelector.this._selectorManager.getScheduler().schedule(new ConnectTimeout(this), ManagedSelector.this._selectorManager.getConnectTimeout(), TimeUnit.MILLISECONDS);
}
@Override
public void run()
{
try
{
channel.register(_selector, SelectionKey.OP_CONNECT, this);
}
catch (Throwable x)
{
failed(x);
}
}
private void failed(Throwable failure)
{
if (failed.compareAndSet(false, true))
{
timeout.cancel();
closeNoExceptions(channel);
ManagedSelector.this._selectorManager.connectionFailed(channel, failure, attachment);
}
}
}
private class ConnectTimeout implements Runnable
{
private final Connect connect;
private ConnectTimeout(Connect connect)
{
this.connect = connect;
}
@Override
public void run()
{
SocketChannel channel = connect.channel;
if (channel.isConnectionPending())
{
if (LOG.isDebugEnabled())
LOG.debug("Channel {} timed out while connecting, closing it", channel);
connect.failed(new SocketTimeoutException("Connect Timeout"));
}
}
}
private class CloseEndPoints implements Runnable
{
private final CountDownLatch _latch = new CountDownLatch(1);
private CountDownLatch _allClosed;
@Override
public void run()
{
List<EndPoint> end_points = new ArrayList<>();
for (SelectionKey key : _selector.keys())
{
if (key.isValid())
{
Object attachment = key.attachment();
if (attachment instanceof EndPoint)
end_points.add((EndPoint)attachment);
}
}
int size = end_points.size();
if (LOG.isDebugEnabled())
LOG.debug("Closing {} endPoints on {}", size, ManagedSelector.this);
_allClosed = new CountDownLatch(size);
_latch.countDown();
for (EndPoint endp : end_points)
submit(new EndPointCloser(endp, _allClosed));
if (LOG.isDebugEnabled())
LOG.debug("Closed {} endPoints on {}", size, ManagedSelector.this);
}
public boolean await(long timeout)
{
try
{
return _latch.await(timeout, TimeUnit.MILLISECONDS) &&
_allClosed.await(timeout, TimeUnit.MILLISECONDS);
}
catch (InterruptedException x)
{
return false;
}
}
}
private class EndPointCloser implements Product
{
private final EndPoint _endPoint;
private final CountDownLatch _latch;
private EndPointCloser(EndPoint endPoint, CountDownLatch latch)
{
_endPoint = endPoint;
_latch = latch;
}
@Override
public void run()
{
closeNoExceptions(_endPoint.getConnection());
_latch.countDown();
}
}
private class CloseSelector implements Runnable
{
private CountDownLatch _latch = new CountDownLatch(1);
@Override
public void run()
{
Selector selector = _selector;
_selector = null;
closeNoExceptions(selector);
_latch.countDown();
}
public boolean await(long timeout)
{
try
{
return _latch.await(timeout, TimeUnit.MILLISECONDS);
}
catch (InterruptedException x)
{
return false;
}
}
}
}