/*
 * Copyright (c) 1998, 2021 Oracle and/or its affiliates. All rights reserved.
 *
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v. 2.0 which is available at
 * http://www.eclipse.org/legal/epl-2.0,
 * or the Eclipse Distribution License v. 1.0 which is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
 */

// Contributors:
//     Oracle - initial API and implementation from Oracle TopLink
package org.eclipse.persistence.internal.sessions.coordination.broadcast;

import org.eclipse.persistence.internal.helper.Helper;
import org.eclipse.persistence.internal.sessions.coordination.RemoteConnection;
import org.eclipse.persistence.sessions.coordination.broadcast.BroadcastTransportManager;
import org.eclipse.persistence.sessions.coordination.Command;
import org.eclipse.persistence.sessions.coordination.RemoteCommandManager;
import org.eclipse.persistence.exceptions.CommunicationException;
import org.eclipse.persistence.exceptions.RemoteCommandManagerException;

/**
 * <p>
 * <b>Purpose</b>: Base class extending RemoteConnection for broadcasting RCM protocols: JMS and Oc4jJGroups.
 * </p><p>
 * <b>Description</b>: Defines lifecycle states and most of the methods,
 * as well as exception processing and info logging.
 * </p>
 * @author Andrei Ilitchev
 * @since OracleAS TopLink 11<i>g</i> (11.1.1)
 */
public abstract class BroadcastRemoteConnection extends RemoteConnection {
    protected RemoteCommandManager rcm;
    protected String topicName;

    // Working state - connection can send and/or receive messages.
    public static final String STATE_ACTIVE         = "ACTIVE";
    // close method called but not all resources used by connection are freed yet.
    // Note that even after close method returns the state still may be CLOSING.
    public static final String STATE_CLOSING        = "CLOSING";
    // All resources are freed.
    public static final String STATE_CLOSED         = "CLOSED";
    // STATE_ACTIVE -> STATE_CLOSING -> STATE_CLOSED
    protected String state = STATE_ACTIVE;

    // Connection information String.
    protected String displayString;
    // Array containing a single element - displayString. Used for warnings and debug logging.
    protected Object[] info;
    // Array containing a two elements - displayString and an empty String.
    // Used for debug logging which require messageId in case messageId is null
    // (so that a new array is not created each time).
    protected Object[] infoExt;

    protected BroadcastRemoteConnection(RemoteCommandManager rcm) {
        this.serviceId = rcm.getServiceId();
        this.rcm = rcm;
        this.topicName = ((BroadcastTransportManager)rcm.getTransportManager()).getTopicName();
    }

    /**
     * INTERNAL:
     * Publish the remote command. The result of execution is returned.
     * This method is used only by external (publishing) connection.
     */
    @Override
    public Object executeCommand(Command command) throws CommunicationException {
        if (isActive()) {
            try {
                return executeCommandInternal(command);
            } catch (Exception exception) {
                // Note that there is no need to removeConnection here - it's removed by the calling method:
                // org.eclipse.persistence.internal.sessions.coordination.CommandPropagator.propagateCommand.
                // This method catches CommunicationException and processes it in handleCommunicationException method.
                // The latter method, in case shouldRemoveConnectionOnError==true, removes the connection;
                // otherwise it wraps the CommunicationException into RemoteCommandManagerException
                // (with errorCode RemoteCommandManagerException.ERROR_PROPAGATING_COMMAND)
                // and gives the use a chance to handle it - this is an opportunity for the user to
                // stop remote command processing.
                throw CommunicationException.errorSendingMessage(getServiceId().getId(), exception);
            }
        } else {
            rcm.logWarning("broadcast_ignored_command_while_closing_connection", getInfo());
            return null;
        }
    }

    /**
     * INTERNAL:
     * Publish the remote command. The result of execution is returned.
     * This method is used only by external (publishing) connection.
     */
    @Override
    public Object executeCommand(byte[] command) throws CommunicationException {
        if (isActive()) {
            try {
                return executeCommandInternal(command);
            } catch (Exception exception) {
                // Note that there is no need to removeConnection here - it's removed by the calling method:
                // org.eclipse.persistence.internal.sessions.coordination.CommandPropagator.propagateCommand.
                // This method catches CommunicationException and processes it in handleCommunicationException method.
                // The latter method, in case shouldRemoveConnectionOnError==true, removes the connection;
                // otherwise it wraps the CommunicationException into RemoteCommandManagerException
                // (with errorCode RemoteCommandManagerException.ERROR_PROPAGATING_COMMAND)
                // and gives the use a chance to handle it - this is an opportunity for the user to
                // stop remote command processing.
                throw CommunicationException.errorSendingMessage(getServiceId().getId(), exception);
            }
        } else {
            rcm.logWarning("broadcast_ignored_command_while_closing_connection", getInfo());
            return null;
        }
    }

    /**
     * INTERNAL:
     * Publish the remote command. The result of execution is returned.
     * This method is used only by external (publishing) connection.
     */
    protected abstract Object executeCommandInternal(Object command) throws Exception;

    /**
     * INTERNAL:
     * Called from executeCommandInternal to log debugInfo right before sending the message.
     * Returns array {toString(), messageId }.
     * In case messageId is null returns getInfoExt() avoiding creation of a new array.
     */
    protected Object[] logDebugBeforePublish(String messageId) {
        Object[] debugInfo = null;
        if(messageId == null) {
            debugInfo = getInfoExt();
        } else {
            debugInfo = new Object[] {toString(), messageId};
        }
        // call logDebugWithoutLevelCheck to avoid the second rcm.shouldLogDebugMessage() check
        rcm.logDebugWithoutLevelCheck("broadcast_sending_message", debugInfo);
        return debugInfo;
    }

    /**
     * INTERNAL:
     * Called from executeCommandInternal to log debugInfo right after sending the message.
     * Only call this method in case logDebugBeforePublish returned non-null
     * this is indication that debug logging is enabled.
     * Pass to this method debugInfo returned by logDebugBeforePublish.
     * Need to pass messageId only in case it has changed since logDebugBeforePublish:
     * some broadcasting protocols (JMS) don't generate messageId until the message is published.
     */
    protected void logDebugAfterPublish(Object[] debugInfo, String messageId) {
        if(messageId != null) {
            if(debugInfo == getInfoExt()) {
                // need to create a new debugInfo object - the original is the cached info.
                debugInfo = new Object[] {toString(), messageId};
            } else {
                // need only update messageId on existing debugInfo
                debugInfo[3] = messageId;
            }
        }
        // call logDebugWithoutLevelCheck to avoid the second rcm.shouldLogDebugMessage() check
        rcm.logDebugWithoutLevelCheck("broadcast_sent_message", debugInfo);
    }

    /**
     * INTERNAL:
     * Called when a message is received to log debugInfo:
     * { toString(), messageId }.
     * This method is used by local (listening) connection only.
     */
    protected void logDebugOnReceiveMessage(String messageId) {
        Object[] debugInfo = null;
        if(messageId == null) {
            debugInfo = getInfoExt();
        } else {
            debugInfo = new Object[] {toString(), messageId};
        }
        // call logDebugWithoutLevelCheck to avoid the second rcm.shouldLogDebugMessage() check
        rcm.logDebugWithoutLevelCheck("broadcast_retreived_message", debugInfo);
    }

    /**
     * INTERNAL:
     * Process the object extracted from the received message.
     * Pass to this method messageInfo created by logDebugOnReceiveMessage method.
     * This method is used by local (listening) connection only.
     */
    protected void processReceivedObject(Object object, String messageId) {
        Command remoteCommand = null;
        if (object instanceof Command) {
            remoteCommand = (Command)object;
            try {
                // prevent the processing of messages sent with the same serviceId
                if (shouldCheckServiceId()) {
                    if (remoteCommand.getServiceId().getId().equals(this.serviceId.getId())) {
                        return;
                    }
                }
                if (remoteCommand.getServiceId().getChannel().equals(this.serviceId.getChannel())) {
                    if(rcm.shouldLogDebugMessage()) {
                        Object[] args = { toString(), messageId, remoteCommand.getServiceId().toString(), Helper.getShortClassName(remoteCommand) };
                        rcm.logDebugWithoutLevelCheck("broadcast_processing_remote_command", args);
                    }
                    rcm.processCommandFromRemoteConnection(remoteCommand);
                } else {
                    if(rcm.shouldLogWarningMessage()) {
                        Object[] args = { toString(), messageId, remoteCommand.getServiceId().toString(), Helper.getShortClassName(remoteCommand)};
                        rcm.logWarningWithoutLevelCheck("broadcast_ignore_remote_command_from_different_channel", args);
                    }
                }
            } catch (RuntimeException e) {
                try {
                    rcm.handleException(RemoteCommandManagerException.errorProcessingRemoteCommand(toString(), messageId, remoteCommand.getServiceId().toString(), Helper.getShortClassName(remoteCommand), e));
                } catch (RuntimeException ex) {
                    // User had a chance to handle the exception.
                    // The method is called by a listener thread - no one could catch this exception.
                }
            }
        } else if (object == null) {
            Object[] args = { toString(), messageId};
            rcm.logWarning("broadcast_remote_command_is_null", args);
        } else {
            if(rcm.shouldLogWarningMessage()) {
                String className = object.getClass().getName();
                Object[] args = { toString(), messageId, className };
                rcm.logWarningWithoutLevelCheck("broadcast_remote_command_wrong_type", args);
            }
        }
    }

    /**
     * INTERNAL:
     * Call this message in case there is failure to extract the object
     * (to be passed to processReceivedObject) from the message.
     * Pass to this method debugInfo created by logDebugOnReceiveMessage method.
     * This method is used by local (listening) connection only.
     */
    protected void failDeserializeMessage(String messageId, Exception exception) {
        try {
            rcm.handleException(RemoteCommandManagerException.errorDeserializeRemoteCommand(toString(), messageId, exception));
        } catch (Exception ex) {
            // User had a chance to handle the exception.
            // The method is called by a listener thread - no one could catch this exception.
        }
    }

    /**
     * INTERNAL:
     * This method is called when connection in no longer used and it's resources should be freed.
     * As soon as this method is called the state is CLOSING.
     * Usually the state is CLOSED just before the method returns,
     * but there are some special cases (see comment to areAllResourcesFreedOnClose method)
     * when the state is still CLOSING after the method returns.
     */
    @Override
    public void close() {
        synchronized(this) {
            if(isClosing()) {
                rcm.logWarning("broadcast_connection_already_closing", getInfo());
                return;
            } else if(isClosed()) {
                rcm.logWarning("broadcast_connection_already_closed", getInfo());
                return;
            } else {
                state = STATE_CLOSING;
            }
        }
        try {
            rcm.logDebug("broadcast_closing_connection", getInfo());
            closeInternal();
        } catch (Exception exception) {
            Object[] args = { toString(), exception };
            rcm.logWarning("broadcast_exception_thrown_when_attempting_to_close_connection", args);
        } finally {
            if(areAllResourcesFreedOnClose()) {
                rcm.logDebug("broadcast_connection_closed", getInfo());
                state = STATE_CLOSED;
            }
        }
    }

    /**
     * INTERNAL:
     * State of the connection.
     */
    public String getState() {
        return state;
    }

    /**
     * INTERNAL:
     * Connection is open for business.
     */
    public boolean isActive() {
        return state == STATE_ACTIVE;
    }

    /**
     * INTERNAL:
     * close method has been called.
     */
    public boolean isClosing() {
        return state == STATE_CLOSING;
    }

    /**
     * INTERNAL:
     * Connection is closed - all resources were freed.
     */
    public boolean isClosed() {
        return state == STATE_CLOSED;
    }

    /**
     * INTERNAL:
     * This method is called by close method.
     * This method usually
     * (but not always see comment to areAllResourcesFreedOnClose method)
     * frees all the resources.
     */
    protected abstract void closeInternal() throws Exception;

    /**
     * INTERNAL:
     */
    public String getTopicName() {
        return topicName;
    }

    /**
     * INTERNAL:
     */
    protected Object[] getInfo() {
        if(info == null) {
            info = new Object[] {toString()};
        }
        return info;
    }

    /**
     * INTERNAL:
     */
    protected Object[] getInfoExt() {
        if(infoExt == null) {
            infoExt = new Object[] {toString(), ""};
        }
        return infoExt;
    }

    /**
     * INTERNAL:
     * Indicates whether all the resources used by connection are freed after close method returns.
     * Usually that's the case. However in case of local (listening) JMSTopicRemoteConnection
     * close merely indicates to the listening thread that it should free TopicConnection and exit.
     * Note that it may take a while: the listening thread waits until subscriber.receive method either
     * returns a message or throws an exception.
     */
    protected boolean areAllResourcesFreedOnClose() {
        return true;
    }

    /**
     * INTERNAL:
     */
    @Override
    public String toString() {
        if(displayString == null) {
            createDisplayString();
        }
        return displayString;
    }

    /**
     * INTERNAL:
     */
    protected void createDisplayString() {
        this.displayString = Helper.getShortClassName(this) + "[" + serviceId.toString() + ", topic " + topicName +"]";
    }

    /**
     * INTERNAL:
     * Return whether a BroadcastConnection should check a ServiceId against its
     * own ServiceId to avoid the processing of Commands with the same ServiceId.
     * @return boolean
     */
    protected boolean shouldCheckServiceId() {
        return false;
    }

}
