//
//  ========================================================================
//  Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//

package org.eclipse.jetty.io;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.Executor;

import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Scheduler;

/**
 * <p>{@link SelectorManager} manages a number of {@link ManagedSelector}s that
 * simplify the non-blocking primitives provided by the JVM via the {@code java.nio} package.</p>
 * <p>{@link SelectorManager} subclasses implement methods to return protocol-specific
 * {@link EndPoint}s and {@link Connection}s.</p>
 * <p>The {@link ManagedSelector}s are created and added as beans when this component
 * is started, and removed again when it is stopped.</p>
 */
public abstract class SelectorManager extends ContainerLifeCycle implements Dumpable
{
    /** Default value of the connect timeout, in milliseconds. */
    public static final int DEFAULT_CONNECT_TIMEOUT = 15000;
    protected static final Logger LOG = Log.getLogger(SelectorManager.class);

    private final Executor executor;
    private final Scheduler scheduler;
    // Slots are populated by doStart(); until then every entry is null.
    private final ManagedSelector[] _selectors;
    private long _connectTimeout = DEFAULT_CONNECT_TIMEOUT;
    private ExecutionStrategy.Factory _executionFactory = ExecutionStrategy.Factory.getDefault();
    // Round-robin counter used by chooseSelector(); deliberately not atomic.
    private long _selectorIndex;

    /**
     * Creates a manager whose number of selectors is half the number of
     * available processors, rounded up.
     *
     * @param executor  the executor returned by {@link #getExecutor()} and used by {@link #execute(Runnable)}
     * @param scheduler the scheduler returned by {@link #getScheduler()}
     */
    protected SelectorManager(Executor executor, Scheduler scheduler)
    {
        this(executor, scheduler, (Runtime.getRuntime().availableProcessors() + 1) / 2);
    }

    /**
     * Creates a manager with an explicit number of selectors.
     *
     * @param executor  the executor returned by {@link #getExecutor()} and used by {@link #execute(Runnable)}
     * @param scheduler the scheduler returned by {@link #getScheduler()}
     * @param selectors the number of {@link ManagedSelector}s to manage, must be positive
     * @throws IllegalArgumentException if {@code selectors} is zero or negative
     */
    protected SelectorManager(Executor executor, Scheduler scheduler, int selectors)
    {
        if (selectors <= 0)
            throw new IllegalArgumentException("No selectors");
        this.executor = executor;
        this.scheduler = scheduler;
        _selectors = new ManagedSelector[selectors];
    }

    /**
     * @return the executor given at construction time
     */
    public Executor getExecutor()
    {
        return executor;
    }

    /**
     * @return the scheduler given at construction time
     */
    public Scheduler getScheduler()
    {
        return scheduler;
    }

    /**
     * Get the connect timeout
     *
     * @return the connect timeout (in milliseconds)
     */
    public long getConnectTimeout()
    {
        return _connectTimeout;
    }

    /**
     * Set the connect timeout (in milliseconds)
     *
     * @param milliseconds the number of milliseconds for the timeout
     */
    public void setConnectTimeout(long milliseconds)
    {
        _connectTimeout = milliseconds;
    }

    /**
     * @return the {@link ExecutionStrategy.Factory} used by {@link ManagedSelector}
     */
    public ExecutionStrategy.Factory getExecutionStrategyFactory()
    {
        return _executionFactory;
    }

    /**
     * @param _executionFactory the {@link ExecutionStrategy.Factory} used by {@link ManagedSelector}
     * @throws IllegalStateException if this component is running
     */
    public void setExecutionStrategyFactory(ExecutionStrategy.Factory _executionFactory)
    {
        if (isRunning())
            throw new IllegalStateException("Cannot change " + ExecutionStrategy.Factory.class.getSimpleName() + " after start()");
        this._executionFactory = _executionFactory;
    }

    /**
     * @return the selector priority delta, always {@code 0}
     * @deprecated not implemented
     */
    @Deprecated
    public int getSelectorPriorityDelta()
    {
        return 0;
    }

    /**
     * @param selectorPriorityDelta the selector priority delta, ignored
     * @deprecated not implemented
     */
    @Deprecated
    public void setSelectorPriorityDelta(int selectorPriorityDelta)
    {
    }

    /**
     * Submits the given task to the {@link Executor} given at construction time.
     *
     * @param task the task to execute
     */
    protected void execute(Runnable task)
    {
        executor.execute(task);
    }

    /**
     * @return the number of selectors in use
     */
    public int getSelectorCount()
    {
        return _selectors.length;
    }

    /**
     * Picks the {@link ManagedSelector} a channel should be registered with.
     * <p>Two candidates are computed: one derived from the last byte of the remote
     * IP address (when available) and one chosen round-robin. The address-based
     * candidate is used unless it is missing or already holds at least twice as
     * many entries as the round-robin candidate.</p>
     *
     * @param channel the channel to place, or {@code null} to use round-robin only
     * @return the chosen selector
     */
    private ManagedSelector chooseSelector(SocketChannel channel)
    {
        // Ideally we would like to have all connections from the same client end
        // up on the same selector (to try to avoid smearing the data from a single
        // client over all cores), but because of proxies, the remote address may not
        // really be the client - so we have to hedge our bets to ensure that all
        // channels don't end up on the one selector for a proxy.
        ManagedSelector candidate1 = null;
        if (channel != null)
        {
            try
            {
                SocketAddress remote = channel.getRemoteAddress();
                if (remote instanceof InetSocketAddress)
                {
                    InetSocketAddress inet = (InetSocketAddress)remote;
                    // An unresolved InetSocketAddress has a null InetAddress, so
                    // dereferencing getAddress() without this check would throw NPE.
                    if (!inet.isUnresolved())
                    {
                        byte[] addr = inet.getAddress().getAddress();
                        if (addr != null && addr.length > 0)
                        {
                            // Mask to treat the byte as unsigned so the index is never negative.
                            int lastByte = addr[addr.length - 1] & 0xFF;
                            candidate1 = _selectors[lastByte % getSelectorCount()];
                        }
                    }
                }
            }
            catch (IOException x)
            {
                // Best effort only: fall back to the round-robin candidate.
                LOG.ignore(x);
            }
        }

        // The ++ increment here is not atomic, but it does not matter,
        // so long as the value changes sometimes, then connections will
        // be distributed over the available selectors.
        long s = _selectorIndex++;
        int index = (int)(s % getSelectorCount());
        ManagedSelector candidate2 = _selectors[index];

        if (candidate1 == null || candidate1.size() >= candidate2.size() * 2)
            return candidate2;
        return candidate1;
    }

    /**
     * <p>Registers a channel to perform a non-blocking connect.</p>
     * <p>The channel must be set in non-blocking mode, {@link SocketChannel#connect(SocketAddress)}
     * must be called prior to calling this method, and the connect operation must not be completed
     * (the return value of {@link SocketChannel#connect(SocketAddress)} must be false).</p>
     *
     * @param channel    the channel to register
     * @param attachment the attachment object
     * @see #accept(SocketChannel, Object)
     */
    public void connect(SocketChannel channel, Object attachment)
    {
        ManagedSelector set = chooseSelector(channel);
        set.submit(set.new Connect(channel, attachment));
    }

    /**
     * <p>Registers a channel to perform non-blocking read/write operations,
     * with a {@code null} attachment.</p>
     *
     * @param channel the channel to accept
     * @see #accept(SocketChannel, Object)
     */
    public void accept(SocketChannel channel)
    {
        accept(channel, null);
    }

    /**
     * <p>Registers a channel to perform non-blocking read/write operations.</p>
     * <p>This method is called just after a channel has been accepted by {@link ServerSocketChannel#accept()},
     * or just after having performed a blocking connect via {@link Socket#connect(SocketAddress, int)}, or
     * just after a non-blocking connect via {@link SocketChannel#connect(SocketAddress)} that completed
     * successfully.</p>
     *
     * @param channel    the channel to register
     * @param attachment the attachment object
     */
    public void accept(SocketChannel channel, Object attachment)
    {
        final ManagedSelector selector = chooseSelector(channel);
        selector.submit(selector.new Accept(channel, attachment));
    }

    /**
     * <p>Registers a server channel for accept operations.
     * When a {@link SocketChannel} is accepted from the given {@link ServerSocketChannel}
     * then the {@link #accepted(SocketChannel)} method is called, which must be
     * overridden by a derivation of this class to handle the accepted channel.</p>
     *
     * @param server the server channel to register
     */
    public void acceptor(ServerSocketChannel server)
    {
        // No client channel is involved here, so the selector is picked round-robin.
        final ManagedSelector selector = chooseSelector(null);
        selector.submit(selector.new Acceptor(server));
    }

    /**
     * Callback method when a channel is accepted from the {@link ServerSocketChannel}
     * passed to {@link #acceptor(ServerSocketChannel)}.
     * The default impl throws an {@link UnsupportedOperationException}, so it must
     * be overridden by subclasses if a server channel is provided.
     *
     * @param channel the channel that has been accepted
     * @throws IOException if unable to accept channel
     */
    protected void accepted(SocketChannel channel) throws IOException
    {
        throw new UnsupportedOperationException();
    }

    /**
     * Creates one {@link ManagedSelector} per slot via {@link #newSelector(int)},
     * adds each as a bean, then delegates to the superclass start.
     */
    @Override
    protected void doStart() throws Exception
    {
        for (int i = 0; i < _selectors.length; i++)
        {
            ManagedSelector selector = newSelector(i);
            _selectors[i] = selector;
            addBean(selector);
        }
        super.doStart();
    }

    /**
     * <p>Factory method for {@link ManagedSelector}.</p>
     *
     * @param id an identifier for the {@link ManagedSelector} to create
     * @return a new {@link ManagedSelector}
     */
    protected ManagedSelector newSelector(int id)
    {
        return new ManagedSelector(this, id, getExecutionStrategyFactory());
    }

    /**
     * Delegates to the superclass stop and then removes the selector beans
     * that were added by {@link #doStart()}.
     */
    @Override
    protected void doStop() throws Exception
    {
        try
        {
            super.doStop();
        }
        finally
        {
            // Remove the beans even if stopping failed, otherwise a subsequent
            // doStart() would add a second set of selectors next to the old ones.
            for (ManagedSelector selector : _selectors)
            {
                // A slot is null if doStart() never got as far as filling it.
                if (selector != null)
                    removeBean(selector);
            }
        }
    }

    /**
     * <p>Callback method invoked when an endpoint is opened.</p>
     *
     * @param endpoint the endpoint being opened
     */
    protected void endPointOpened(EndPoint endpoint)
    {
        endpoint.onOpen();
    }

    /**
     * <p>Callback method invoked when an endpoint is closed.</p>
     *
     * @param endpoint the endpoint being closed
     */
    protected void endPointClosed(EndPoint endpoint)
    {
        endpoint.onClose();
    }

    /**
     * <p>Callback method invoked when a connection is opened.</p>
     * <p>Any failure thrown by {@link Connection#onOpen()} is logged (at warning
     * level while running, at debug level otherwise) and then rethrown.</p>
     *
     * @param connection the connection just opened
     */
    public void connectionOpened(Connection connection)
    {
        try
        {
            connection.onOpen();
        }
        catch (Throwable x)
        {
            if (isRunning())
                LOG.warn("Exception while notifying connection " + connection, x);
            else
                LOG.debug("Exception while notifying connection " + connection, x);
            throw x;
        }
    }

    /**
     * <p>Callback method invoked when a connection is closed.</p>
     * <p>Any failure thrown by {@link Connection#onClose()} is logged at debug
     * level and not propagated.</p>
     *
     * @param connection the connection just closed
     */
    public void connectionClosed(Connection connection)
    {
        try
        {
            connection.onClose();
        }
        catch (Throwable x)
        {
            LOG.debug("Exception while notifying connection " + connection, x);
        }
    }

    /**
     * <p>Completes a non-blocking connect by delegating to
     * {@link SocketChannel#finishConnect()}.</p>
     *
     * @param channel the channel whose connect operation should be finished
     * @return the value returned by {@link SocketChannel#finishConnect()}
     * @throws IOException if {@link SocketChannel#finishConnect()} fails
     */
    protected boolean finishConnect(SocketChannel channel) throws IOException
    {
        return channel.finishConnect();
    }

    /**
     * <p>Callback method invoked when a non-blocking connect cannot be completed.</p>
     * <p>By default it just logs with level warning.</p>
     *
     * @param channel    the channel that attempted the connect
     * @param ex         the exception that caused the connect to fail
     * @param attachment the attachment object associated at registration
     */
    protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
    {
        LOG.warn(String.format("%s - %s", channel, attachment), ex);
    }

    /**
     * <p>Factory method to create {@link EndPoint}.</p>
     * <p>This method is invoked as a result of the registration of a channel via {@link #connect(SocketChannel, Object)}
     * or {@link #accept(SocketChannel)}.</p>
     *
     * @param channel      the channel associated to the endpoint
     * @param selector     the selector the channel is registered to
     * @param selectionKey the selection key
     * @return a new endpoint
     * @throws IOException if the endPoint cannot be created
     * @see #newConnection(SocketChannel, EndPoint, Object)
     */
    protected abstract EndPoint newEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey selectionKey) throws IOException;

    /**
     * <p>Factory method to create {@link Connection}.</p>
     *
     * @param channel    the channel associated to the connection
     * @param endpoint   the endpoint
     * @param attachment the attachment
     * @return a new connection
     * @throws IOException if unable to create new connection
     * @see #newEndPoint(SocketChannel, ManagedSelector, SelectionKey)
     */
    public abstract Connection newConnection(SocketChannel channel, EndPoint endpoint, Object attachment) throws IOException;
}
