//
//  ========================================================================
//  Copyright (c) 1995-2017 Mort Bay Consulting Pty. Ltd.
//  ------------------------------------------------------------------------
//  All rights reserved. This program and the accompanying materials
//  are made available under the terms of the Eclipse Public License v1.0
//  and Apache License v2.0 which accompanies this distribution.
//
//      The Eclipse Public License is available at
//      http://www.eclipse.org/legal/epl-v10.html
//
//      The Apache License v2.0 is available at
//      http://www.opensource.org/licenses/apache2.0.php
//
//  You may elect to redistribute this code under either of these licenses.
//  ========================================================================
//

package org.eclipse.jetty.util.thread.strategy;

import java.io.Closeable;
import java.util.concurrent.Executor;

import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutionStrategy;
import org.eclipse.jetty.util.thread.Locker;
import org.eclipse.jetty.util.thread.Locker.Lock;
import org.eclipse.jetty.util.thread.ThreadPool;

/**
 * <p>A strategy where the thread that produces will always run the resulting task.</p>
 * <p>The strategy may then dispatch another thread to continue production.</p>
 * <p>The strategy is also known by the nickname 'eat what you kill', which comes from
 * the hunting ethic that says a person should not kill anything he or she does not
 * plan on eating. In this case, the phrase is used to mean that a thread should
 * not produce a task that it does not intend to run. By making producers run the
 * task that they have just produced avoids execution delays and avoids parallel slow
 * down by running the task in the same core, with good chances of having a hot CPU
 * cache. It also avoids the creation of a queue of produced tasks that the system
 * does not yet have capacity to consume, which can save memory and exert back
 * pressure on producers.</p>
 */
public class ExecuteProduceConsume extends ExecutingExecutionStrategy implements ExecutionStrategy, Runnable
{
    private static final Logger LOG = Log.getLogger(ExecuteProduceConsume.class);

    private final Locker _locker = new Locker();
    private final Runnable _runExecute = new RunExecute();
    private final Producer _producer;
    private final ThreadPool _threadPool;
    private boolean _idle = true;
    private boolean _execute;
    private boolean _producing;
    private boolean _pending;
    private boolean _lowThreads;

    public ExecuteProduceConsume(Producer producer, Executor executor)
    {
        super(executor);
        this._producer = producer;
        _threadPool = executor instanceof ThreadPool ? (ThreadPool)executor : null;
    }

    @Deprecated
    public ExecuteProduceConsume(Producer producer, Executor executor, ExecutionStrategy lowResourceStrategy)
    {
        this(producer, executor);
    }

    @Override
    public void execute()
    {
        if (LOG.isDebugEnabled())
            LOG.debug("{} execute", this);

        boolean produce = false;
        try (Lock locked = _locker.lock())
        {
            // If we are idle and a thread is not producing
            if (_idle)
            {
                if (_producing)
                    throw new IllegalStateException();

                // Then this thread will do the producing
                produce = _producing = true;
                // and we are no longer idle
                _idle = false;
            }
            else
            {
                // Otherwise, lets tell the producing thread
                // that it should call produce again before going idle
                _execute = true;
            }
        }

        if (produce)
            produceConsume();
    }

    @Override
    public void dispatch()
    {
        if (LOG.isDebugEnabled())
            LOG.debug("{} spawning", this);
        boolean dispatch = false;
        try (Lock locked = _locker.lock())
        {
            if (_idle)
                dispatch = true;
            else
                _execute = true;
        }
        if (dispatch)
            execute(_runExecute);
    }

    @Override
    public void run()
    {
        if (LOG.isDebugEnabled())
            LOG.debug("{} run", this);
        boolean produce = false;
        try (Lock locked = _locker.lock())
        {
            _pending = false;
            if (!_idle && !_producing)
            {
                produce = _producing = true;
            }
        }

        if (produce)
            produceConsume();
    }

    private void produceConsume()
    {
        if (_threadPool != null && _threadPool.isLowOnThreads())
        {
            // If we are low on threads we must not produce and consume
            // in the same thread, but produce and execute to consume.
            if (!produceExecuteConsume())
                return;
        }
        executeProduceConsume();
    }

    public boolean isLowOnThreads()
    {
        return _lowThreads;
    }

    /**
     * @return true if we are still producing
     */
    private boolean produceExecuteConsume()
    {
        if (LOG.isDebugEnabled())
            LOG.debug("{} enter low threads mode", this);
        _lowThreads = true;
        try
        {
            boolean idle = false;
            while (_threadPool.isLowOnThreads())
            {
                Runnable task = _producer.produce();
                if (LOG.isDebugEnabled())
                    LOG.debug("{} produced {}", _producer, task);

                if (task == null)
                {
                    // No task, so we are now idle
                    try (Lock locked = _locker.lock())
                    {
                        if (_execute)
                        {
                            _execute = false;
                            _producing = true;
                            _idle = false;
                            continue;
                        }

                        _producing = false;
                        idle = _idle = true;
                        break;
                    }
                }

                // Execute the task.
                executeProduct(task);
            }
            return !idle;
        }
        finally
        {
            _lowThreads = false;
            if (LOG.isDebugEnabled())
                LOG.debug("{} exit low threads mode", this);
        }
    }

    /**
     * <p>Only called when in {@link #isLowOnThreads() low threads mode}
     * to execute the task produced by the producer.</p>
     * <p>Because </p>
     * <p>If the task implements {@link Rejectable}, then {@link Rejectable#reject()}
     * is immediately called on the task object. If the task also implements
     * {@link Closeable}, then {@link Closeable#close()} is called on the task object.</p>
     * <p>If the task does not implement {@link Rejectable}, then it is
     * {@link #execute(Runnable) executed}.</p>
     *
     * @param task the produced task to execute
     */
    protected void executeProduct(Runnable task)
    {
        if (task instanceof Rejectable)
        {
            try
            {
                ((Rejectable)task).reject();
                if (task instanceof Closeable)
                    ((Closeable)task).close();
            }
            catch (Throwable x)
            {
                LOG.debug(x);
            }
        }
        else
        {
            execute(task);
        }
    }

    private void executeProduceConsume()
    {
        if (LOG.isDebugEnabled())
            LOG.debug("{} produce enter", this);

        while (true)
        {
            // If we got here, then we are the thread that is producing.
            if (LOG.isDebugEnabled())
                LOG.debug("{} producing", this);

            Runnable task = _producer.produce();

            if (LOG.isDebugEnabled())
                LOG.debug("{} produced {}", this, task);

            boolean dispatch = false;
            try (Lock locked = _locker.lock())
            {
                // Finished producing
                _producing = false;

                // Did we produced a task?
                if (task == null)
                {
                    // There is no task.
                    // Could another one just have been queued with an execute?
                    if (_execute)
                    {
                        _idle = false;
                        _producing = true;
                        _execute = false;
                        continue;
                    }

                    // ... and no additional calls to execute, so we are idle
                    _idle = true;
                    break;
                }

                // We have a task, which we will run ourselves,
                // so if we don't have another thread pending
                if (!_pending)
                {
                    // dispatch one
                    dispatch = _pending = true;
                }

                _execute = false;
            }

            // If we became pending
            if (dispatch)
            {
                // Spawn a new thread to continue production by running the produce loop.
                if (LOG.isDebugEnabled())
                    LOG.debug("{} dispatch", this);
                if (!execute(this))
                    task = null;
            }

            // Run the task.
            if (LOG.isDebugEnabled())
                LOG.debug("{} run {}", this, task);
            if (task != null)
                task.run();
            if (LOG.isDebugEnabled())
                LOG.debug("{} ran {}", this, task);

            // Once we have run the task, we can try producing again.
            try (Lock locked = _locker.lock())
            {
                // Is another thread already producing or we are now idle?
                if (_producing || _idle)
                    break;
                _producing = true;
            }
        }

        if (LOG.isDebugEnabled())
            LOG.debug("{} produce exit", this);
    }

    public Boolean isIdle()
    {
        try (Lock locked = _locker.lock())
        {
            return _idle;
        }
    }

    public String toString()
    {
        StringBuilder builder = new StringBuilder();
        builder.append("EPC ");
        try (Lock locked = _locker.lock())
        {
            builder.append(_idle ? "Idle/" : "");
            builder.append(_producing ? "Prod/" : "");
            builder.append(_pending ? "Pend/" : "");
            builder.append(_execute ? "Exec/" : "");
        }
        builder.append(_producer);
        return builder.toString();
    }

    private class RunExecute implements Runnable
    {
        @Override
        public void run()
        {
            execute();
        }
    }

    public static class Factory implements ExecutionStrategy.Factory
    {
        @Override
        public ExecutionStrategy newExecutionStrategy(Producer producer, Executor executor)
        {
            return new ExecuteProduceConsume(producer, executor);
        }
    }
}
