| // |
| // ======================================================================== |
| // Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd. |
| // ------------------------------------------------------------------------ |
| // All rights reserved. This program and the accompanying materials |
| // are made available under the terms of the Eclipse Public License v1.0 |
| // and Apache License v2.0 which accompanies this distribution. |
| // |
| // The Eclipse Public License is available at |
| // http://www.eclipse.org/legal/epl-v10.html |
| // |
| // The Apache License v2.0 is available at |
| // http://www.opensource.org/licenses/apache2.0.php |
| // |
| // You may elect to redistribute this code under either of these licenses. |
| // ======================================================================== |
| // |
| |
| package org.eclipse.jetty.io; |
| |
| import java.io.IOException; |
| import java.net.InetSocketAddress; |
| import java.net.Socket; |
| import java.net.SocketAddress; |
| import java.nio.channels.SelectionKey; |
| import java.nio.channels.ServerSocketChannel; |
| import java.nio.channels.SocketChannel; |
| import java.util.concurrent.Executor; |
| |
| import org.eclipse.jetty.util.component.ContainerLifeCycle; |
| import org.eclipse.jetty.util.component.Dumpable; |
| import org.eclipse.jetty.util.log.Log; |
| import org.eclipse.jetty.util.log.Logger; |
| import org.eclipse.jetty.util.thread.ExecutionStrategy; |
| import org.eclipse.jetty.util.thread.Scheduler; |
| |
| /** |
| * <p>{@link SelectorManager} manages a number of {@link ManagedSelector}s that |
| * simplify the non-blocking primitives provided by the JVM via the {@code java.nio} package.</p> |
| * <p>{@link SelectorManager} subclasses implement methods to return protocol-specific |
| * {@link EndPoint}s and {@link Connection}s.</p> |
| */ |
| public abstract class SelectorManager extends ContainerLifeCycle implements Dumpable |
| { |
| public static final int DEFAULT_CONNECT_TIMEOUT = 15000; |
| protected static final Logger LOG = Log.getLogger(SelectorManager.class); |
| |
| private final Executor executor; |
| private final Scheduler scheduler; |
| private final ManagedSelector[] _selectors; |
| private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT; |
| private ExecutionStrategy.Factory _executionFactory = ExecutionStrategy.Factory.getDefault(); |
| private long _selectorIndex; |
| |
| protected SelectorManager(Executor executor, Scheduler scheduler) |
| { |
| this(executor, scheduler, (Runtime.getRuntime().availableProcessors() + 1) / 2); |
| } |
| |
| protected SelectorManager(Executor executor, Scheduler scheduler, int selectors) |
| { |
| if (selectors <= 0) |
| throw new IllegalArgumentException("No selectors"); |
| this.executor = executor; |
| this.scheduler = scheduler; |
| _selectors = new ManagedSelector[selectors]; |
| } |
| |
| public Executor getExecutor() |
| { |
| return executor; |
| } |
| |
| public Scheduler getScheduler() |
| { |
| return scheduler; |
| } |
| |
| /** |
| * Get the connect timeout |
| * |
| * @return the connect timeout (in milliseconds) |
| */ |
| public long getConnectTimeout() |
| { |
| return _connectTimeout; |
| } |
| |
| /** |
| * Set the connect timeout (in milliseconds) |
| * |
| * @param milliseconds the number of milliseconds for the timeout |
| */ |
| public void setConnectTimeout(long milliseconds) |
| { |
| _connectTimeout = milliseconds; |
| } |
| |
| /** |
| * @return the {@link ExecutionStrategy.Factory} used by {@link ManagedSelector} |
| */ |
| public ExecutionStrategy.Factory getExecutionStrategyFactory() |
| { |
| return _executionFactory; |
| } |
| |
| /** |
| * @param _executionFactory the {@link ExecutionStrategy.Factory} used by {@link ManagedSelector} |
| */ |
| public void setExecutionStrategyFactory(ExecutionStrategy.Factory _executionFactory) |
| { |
| if (isRunning()) |
| throw new IllegalStateException("Cannot change " + ExecutionStrategy.Factory.class.getSimpleName() + " after start()"); |
| this._executionFactory = _executionFactory; |
| } |
| |
| /** |
| * @return the selector priority delta |
| * @deprecated not implemented |
| */ |
| @Deprecated |
| public int getSelectorPriorityDelta() |
| { |
| return 0; |
| } |
| |
| /** |
| * @param selectorPriorityDelta the selector priority delta |
| * @deprecated not implemented |
| */ |
| @Deprecated |
| public void setSelectorPriorityDelta(int selectorPriorityDelta) |
| { |
| } |
| |
| /** |
| * Executes the given task in a different thread. |
| * |
| * @param task the task to execute |
| */ |
| protected void execute(Runnable task) |
| { |
| executor.execute(task); |
| } |
| |
| /** |
| * @return the number of selectors in use |
| */ |
| public int getSelectorCount() |
| { |
| return _selectors.length; |
| } |
| |
| private ManagedSelector chooseSelector(SocketChannel channel) |
| { |
| // Ideally we would like to have all connections from the same client end |
| // up on the same selector (to try to avoid smearing the data from a single |
| // client over all cores), but because of proxies, the remote address may not |
| // really be the client - so we have to hedge our bets to ensure that all |
| // channels don't end up on the one selector for a proxy. |
| ManagedSelector candidate1 = null; |
| if (channel != null) |
| { |
| try |
| { |
| SocketAddress remote = channel.getRemoteAddress(); |
| if (remote instanceof InetSocketAddress) |
| { |
| byte[] addr = ((InetSocketAddress)remote).getAddress().getAddress(); |
| if (addr != null) |
| { |
| int s = addr[addr.length - 1] & 0xFF; |
| candidate1 = _selectors[s % getSelectorCount()]; |
| } |
| } |
| } |
| catch (IOException x) |
| { |
| LOG.ignore(x); |
| } |
| } |
| |
| // The ++ increment here is not atomic, but it does not matter, |
| // so long as the value changes sometimes, then connections will |
| // be distributed over the available selectors. |
| long s = _selectorIndex++; |
| int index = (int)(s % getSelectorCount()); |
| ManagedSelector candidate2 = _selectors[index]; |
| |
| if (candidate1 == null || candidate1.size() >= candidate2.size() * 2) |
| return candidate2; |
| return candidate1; |
| } |
| |
| /** |
| * <p>Registers a channel to perform a non-blocking connect.</p> |
| * <p>The channel must be set in non-blocking mode, {@link SocketChannel#connect(SocketAddress)} |
| * must be called prior to calling this method, and the connect operation must not be completed |
| * (the return value of {@link SocketChannel#connect(SocketAddress)} must be false).</p> |
| * |
| * @param channel the channel to register |
| * @param attachment the attachment object |
| * @see #accept(SocketChannel, Object) |
| */ |
| public void connect(SocketChannel channel, Object attachment) |
| { |
| ManagedSelector set = chooseSelector(channel); |
| set.submit(set.new Connect(channel, attachment)); |
| } |
| |
| /** |
| * @param channel the channel to accept |
| * @see #accept(SocketChannel, Object) |
| */ |
| public void accept(SocketChannel channel) |
| { |
| accept(channel, null); |
| } |
| |
| /** |
| * <p>Registers a channel to perform non-blocking read/write operations.</p> |
| * <p>This method is called just after a channel has been accepted by {@link ServerSocketChannel#accept()}, |
| * or just after having performed a blocking connect via {@link Socket#connect(SocketAddress, int)}, or |
| * just after a non-blocking connect via {@link SocketChannel#connect(SocketAddress)} that completed |
| * successfully.</p> |
| * |
| * @param channel the channel to register |
| * @param attachment the attachment object |
| */ |
| public void accept(SocketChannel channel, Object attachment) |
| { |
| final ManagedSelector selector = chooseSelector(channel); |
| selector.submit(selector.new Accept(channel, attachment)); |
| } |
| |
| /** |
| * <p>Registers a server channel for accept operations. |
| * When a {@link SocketChannel} is accepted from the given {@link ServerSocketChannel} |
| * then the {@link #accepted(SocketChannel)} method is called, which must be |
| * overridden by a derivation of this class to handle the accepted channel |
| * |
| * @param server the server channel to register |
| */ |
| public void acceptor(ServerSocketChannel server) |
| { |
| final ManagedSelector selector = chooseSelector(null); |
| selector.submit(selector.new Acceptor(server)); |
| } |
| |
| /** |
| * Callback method when a channel is accepted from the {@link ServerSocketChannel} |
| * passed to {@link #acceptor(ServerSocketChannel)}. |
| * The default impl throws an {@link UnsupportedOperationException}, so it must |
| * be overridden by subclasses if a server channel is provided. |
| * |
| * @param channel the |
| * @throws IOException if unable to accept channel |
| */ |
| protected void accepted(SocketChannel channel) throws IOException |
| { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| protected void doStart() throws Exception |
| { |
| for (int i = 0; i < _selectors.length; i++) |
| { |
| ManagedSelector selector = newSelector(i); |
| _selectors[i] = selector; |
| addBean(selector); |
| } |
| super.doStart(); |
| } |
| |
| /** |
| * <p>Factory method for {@link ManagedSelector}.</p> |
| * |
| * @param id an identifier for the {@link ManagedSelector to create} |
| * @return a new {@link ManagedSelector} |
| */ |
| protected ManagedSelector newSelector(int id) |
| { |
| return new ManagedSelector(this, id, getExecutionStrategyFactory()); |
| } |
| |
| @Override |
| protected void doStop() throws Exception |
| { |
| super.doStop(); |
| for (ManagedSelector selector : _selectors) |
| removeBean(selector); |
| } |
| |
| /** |
| * <p>Callback method invoked when an endpoint is opened.</p> |
| * |
| * @param endpoint the endpoint being opened |
| */ |
| protected void endPointOpened(EndPoint endpoint) |
| { |
| endpoint.onOpen(); |
| } |
| |
| /** |
| * <p>Callback method invoked when an endpoint is closed.</p> |
| * |
| * @param endpoint the endpoint being closed |
| */ |
| protected void endPointClosed(EndPoint endpoint) |
| { |
| endpoint.onClose(); |
| } |
| |
| /** |
| * <p>Callback method invoked when a connection is opened.</p> |
| * |
| * @param connection the connection just opened |
| */ |
| public void connectionOpened(Connection connection) |
| { |
| try |
| { |
| connection.onOpen(); |
| } |
| catch (Throwable x) |
| { |
| if (isRunning()) |
| LOG.warn("Exception while notifying connection " + connection, x); |
| else |
| LOG.debug("Exception while notifying connection " + connection, x); |
| throw x; |
| } |
| } |
| |
| /** |
| * <p>Callback method invoked when a connection is closed.</p> |
| * |
| * @param connection the connection just closed |
| */ |
| public void connectionClosed(Connection connection) |
| { |
| try |
| { |
| connection.onClose(); |
| } |
| catch (Throwable x) |
| { |
| LOG.debug("Exception while notifying connection " + connection, x); |
| } |
| } |
| |
| protected boolean finishConnect(SocketChannel channel) throws IOException |
| { |
| return channel.finishConnect(); |
| } |
| |
| /** |
| * <p>Callback method invoked when a non-blocking connect cannot be completed.</p> |
| * <p>By default it just logs with level warning.</p> |
| * |
| * @param channel the channel that attempted the connect |
| * @param ex the exception that caused the connect to fail |
| * @param attachment the attachment object associated at registration |
| */ |
| protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment) |
| { |
| LOG.warn(String.format("%s - %s", channel, attachment), ex); |
| } |
| |
| /** |
| * <p>Factory method to create {@link EndPoint}.</p> |
| * <p>This method is invoked as a result of the registration of a channel via {@link #connect(SocketChannel, Object)} |
| * or {@link #accept(SocketChannel)}.</p> |
| * |
| * @param channel the channel associated to the endpoint |
| * @param selector the selector the channel is registered to |
| * @param selectionKey the selection key |
| * @return a new endpoint |
| * @throws IOException if the endPoint cannot be created |
| * @see #newConnection(SocketChannel, EndPoint, Object) |
| */ |
| protected abstract EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException; |
| |
| /** |
| * <p>Factory method to create {@link Connection}.</p> |
| * |
| * @param channel the channel associated to the connection |
| * @param endpoint the endpoint |
| * @param attachment the attachment |
| * @return a new connection |
| * @throws IOException if unable to create new connection |
| * @see #newEndPoint(SocketChannel, ManagedSelector, SelectionKey) |
| */ |
| public abstract Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException; |
| } |