| /* |
| * Copyright (c) 1998, 2020 Oracle and/or its affiliates. All rights reserved. |
| * Copyright (c) 1998, 2018 IBM Corporation. All rights reserved. |
| * |
| * This program and the accompanying materials are made available under the |
| * terms of the Eclipse Public License v. 2.0 which is available at |
| * http://www.eclipse.org/legal/epl-2.0, |
| * or the Eclipse Distribution License v. 1.0 which is available at |
| * http://www.eclipse.org/org/documents/edl-v10.php. |
| * |
| * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause |
| */ |
| |
| // Contributors: |
| // Oracle - initial API and implementation from Oracle TopLink |
| // 11/07/2017 - Dalia Abo Sheasha |
| // - 526957 : Split the logging and trace messages |
| package org.eclipse.persistence.sessions.coordination; |
| |
| import java.net.InetAddress; |
| |
| import org.eclipse.persistence.exceptions.RemoteCommandManagerException; |
| import org.eclipse.persistence.internal.helper.Helper; |
| import org.eclipse.persistence.internal.localization.LoggingLocalization; |
| import org.eclipse.persistence.internal.localization.TraceLocalization; |
| import org.eclipse.persistence.internal.sessions.AbstractSession; |
| import org.eclipse.persistence.internal.sessions.DatabaseSessionImpl; |
| import org.eclipse.persistence.internal.sessions.UnitOfWorkChangeSet; |
| import org.eclipse.persistence.internal.sessions.coordination.CommandPropagator; |
| import org.eclipse.persistence.internal.sessions.coordination.RCMCommand; |
| import org.eclipse.persistence.internal.sessions.coordination.RemoteConnection; |
| import org.eclipse.persistence.platform.server.ServerPlatform; |
| import org.eclipse.persistence.sessions.Session; |
| import org.eclipse.persistence.sessions.SessionProfiler; |
| import org.eclipse.persistence.sessions.coordination.rmi.RMITransportManager; |
| import org.eclipse.persistence.sessions.serializers.JavaSerializer; |
| import org.eclipse.persistence.sessions.serializers.Serializer; |
| |
| /** |
| * <p> |
| * <b>Purpose</b>: Provide a CommandManager implementation for cache coordination. |
| * <p> |
| * <b>Description</b>: A RemoteCommandManager (or RCM) instance is the primary component |
| * of an RCM service instance. It manages the other components of the service, and |
| * directs the overall service operation. Its ServiceId uniquely distinguishes it |
| * from other service instances in the cluster. |
| * <p> |
| * Each RCM has a logical channel to which it subscribes and publishes. This channel |
| * determines which other instances in the cluster the service instance sends and |
| * receives remote commands to/from. All RCM's on the same channel should have the same |
| * discovery manager settings (be communicating on the same multicast) so that |
| * the discovery managers may be able to discover one another. RCM's on different |
| * channels may operate on the same or on different multicast groups. |
| * <p> |
| * An RCM instance knows about other instances in the cluster through its DiscoveryManager. |
| * Its TransportManager is responsible for setting up the connections to other instances |
| * once they are discovered. |
| * <p> |
| * An RCM is instructed to "propagate", or execute on all remote service instances |
| * in the cluster that subscribe to the same channel, a remote command by its |
| * CommandProcessor. Likewise, when an RCM receives a remote command to be executed |
| * then it passes the command off to the CommandProcessor for the processing of the |
| * command to occur. CommandProcessors pass commands to the RCM as an Object (in a |
| * format that may be specific to the application) and the RCM uses its CommandConverter |
| * to convert it to a EclipseLink Command object before sending the Command off to the |
| * cluster. Similarly, when a EclipseLink Command object is received then the RCM invokes |
| * its CommandConverter to convert the object into the application format that will be |
| * passed to the CommandProcessor to process the command. |
| * |
| * @see CommandManager |
| * @see Command |
| * @see CommandProcessor |
| * @see CommandConverter |
| * @see DiscoveryManager |
| * @author Steven Vo |
| * @since OracleAS TopLink 10<i>g</i> (9.0.4) |
| */ |
| public class RemoteCommandManager implements org.eclipse.persistence.sessions.coordination.CommandManager { |
| public static final String DEFAULT_CHANNEL = "EclipseLinkCommandChannel"; |
| public static final boolean DEFAULT_ASYNCHRONOUS_MODE = true; |
| |
| /** Uniquely identifies this service in the cluster */ |
| protected ServiceId serviceId; |
| |
| /** Manages the detection of new services as they join the cluster */ |
| protected DiscoveryManager discoveryManager; |
| |
| /** Manages the transport level connections between command managers */ |
| protected TransportManager transportManager; |
| |
| /** Invoked to process a command when it is received from the cluster */ |
| protected CommandProcessor commandProcessor; |
| |
| /** Used for converting commands between EclipseLink Command and app command formats */ |
| protected CommandConverter commandConverter; |
| |
| /** Determines whether propagation should be synchronous or asynchronous */ |
| protected boolean isAsynchronous; |
| |
| /** Determines whether profiling command should be send */ |
| protected boolean isEclipseLinkSession; |
| |
| /** Uniquely identifies ServerPlatform in the cluster */ |
| protected ServerPlatform serverPlatform; |
| |
| /** Set the Serializer to use for serialization of commands. */ |
| protected Serializer serializer; |
| |
| //** Indicates whether RCM is active. In case there's discoveryManager it mirrors discoveryManager.isDiscoveryStopped() |
| protected boolean isStopped = true; |
| |
| public RemoteCommandManager(CommandProcessor commandProcessor) { |
| this.serviceId = new ServiceId(); |
| |
| // BUG - 3824040 we must call the setCommandProcessor method to |
| // ensure the isEclipseLinkSession flag is set correctly. |
| setCommandProcessor(commandProcessor); |
| |
| // Set default values |
| this.transportManager = new RMITransportManager(this); |
| this.discoveryManager = this.transportManager.createDiscoveryManager(); |
| this.serviceId.setChannel(DEFAULT_CHANNEL); |
| this.isAsynchronous = DEFAULT_ASYNCHRONOUS_MODE; |
| this.serializer = JavaSerializer.instance; |
| |
| // Set the command processor to point back to this command manager |
| commandProcessor.setCommandManager(this); |
| } |
| |
| public RemoteCommandManager(CommandProcessor commandProcessor, TransportManager transportManager) { |
| this(commandProcessor); |
| this.transportManager = transportManager; |
| this.discoveryManager = this.transportManager.createDiscoveryManager(); |
| } |
| |
| /** |
| * PUBLIC: |
| * Initialize the remote command manager. This will also trigger the |
| * DiscoveryManager to start establishing the EclipseLink cluster. |
| */ |
| @Override |
| public void initialize() { |
| Object[] args = { this.getServiceId() }; |
| logDebug("starting_rcm", args); |
| |
| // replace the $HOST substring of the URL with the discovered ipAddress |
| if ((getUrl() != null) && (getUrl().indexOf(ServiceId.HOST_TOKEN) >= 0)) { |
| try { |
| // discover local IP address |
| String ipAddress = InetAddress.getLocalHost().getHostAddress(); |
| replaceLocalHostIPAddress(ipAddress); |
| } catch (Exception ex) { |
| // catch different exceptions that are due to security or IP address of a host could not be determined. |
| // i.e. java.lang.SecurityException or java.net.UnknownHostException |
| throw RemoteCommandManagerException.errorDiscoveringLocalHostIPAddress(ex); |
| } |
| } |
| this.isStopped = false; |
| if (this.discoveryManager != null) { |
| this.discoveryManager.startDiscovery(); |
| } else { |
| this.transportManager.createConnections(); |
| } |
| |
| Serializer serializer = getSerializer(); |
| if (serializer != null) { |
| serializer.initialize(UnitOfWorkChangeSet.class, null, (AbstractSession)getCommandProcessor()); |
| } |
| } |
| |
| /** |
| * PUBLIC: |
| * Indicates whether the RCM has been stopped: |
| * either initialize hasn't been called or shutdown has been called. |
| */ |
| public boolean isStopped() { |
| return isStopped; |
| } |
| |
| /** |
| * PUBLIC: |
| * Shut down the remote command manager. This will also trigger the |
| * DiscoveryManager to stop. |
| * NOTE: Although this call initiates the shutdown process, |
| * no guarantees are made as to when it will actually complete. |
| */ |
| @Override |
| public void shutdown() { |
| Object[] args = { this.getServiceId() }; |
| logDebug("stopping_rcm", args); |
| |
| if(discoveryManager != null) { |
| discoveryManager.stopDiscovery(); |
| |
| // use a new Discovery thread with same settings as the previous one. |
| DiscoveryManager newDmgr = transportManager.createDiscoveryManager(); |
| newDmgr.shallowCopy(discoveryManager); |
| discoveryManager = newDmgr; |
| } |
| isStopped = true; |
| transportManager.discardConnections(); |
| } |
| |
| /** |
| * ADVANCED: |
| * Propagate a remote command to all remote RCM services participating |
| * in the EclipseLink cluster. |
| * |
| * @param command An object representing a EclipseLink command |
| */ |
| @Override |
| public void propagateCommand(Object command) { |
| Command newCommand; |
| CommandPropagator propagator; |
| |
| this.commandProcessor.startOperationProfile(SessionProfiler.CacheCoordination); |
| try { |
| if (this.commandConverter != null) { |
| // Use the converter if we have one |
| Object[] args = { command }; |
| logDebug("converting_to_toplink_command", args); |
| this.commandProcessor.incrementProfile(SessionProfiler.RcmSent); |
| newCommand = this.commandConverter.convertToEclipseLinkCommand(command); |
| } else if (command instanceof Command) { |
| // If converter is not set then maybe it just doesn't need converting |
| newCommand = (Command)command; |
| } else { |
| // We can't convert the thing - we may as well chuck it! |
| Object[] args = { command }; |
| logWarning("missing_converter", args); |
| return; |
| } |
| |
| // Set our service id on the command to indicate that it came from us |
| newCommand.setServiceId(getServiceId()); |
| |
| // PERF: Support plugable serialization. |
| Serializer serializer = getSerializer(); |
| byte[] commandBytes = null; |
| if (serializer != null) { |
| this.commandProcessor.startOperationProfile(SessionProfiler.CacheCoordinationSerialize); |
| try { |
| commandBytes = (byte[])serializer.serialize(command, (AbstractSession)getCommandProcessor()); |
| } finally { |
| this.commandProcessor.endOperationProfile(SessionProfiler.CacheCoordinationSerialize); |
| } |
| } |
| |
| // Propagate the command (synchronously or asynchronously) |
| propagator = new CommandPropagator(this, newCommand, commandBytes); |
| |
| if (shouldPropagateAsynchronously()) { |
| propagator.asynchronousPropagateCommand(); |
| } else { |
| propagator.synchronousPropagateCommand(); |
| } |
| } finally { |
| this.commandProcessor.endOperationProfile(SessionProfiler.CacheCoordination); |
| } |
| } |
| |
| /** |
| * INTERNAL: |
| * Deserialize the command and execute it. |
| */ |
| public void processCommandFromRemoteConnection(byte[] commandBytes) { |
| this.commandProcessor.startOperationProfile(SessionProfiler.CacheCoordinationSerialize); |
| Command command = null; |
| try { |
| Serializer serializer = getSerializer(); |
| if (serializer == null) { |
| serializer = JavaSerializer.instance; |
| } |
| command = (Command)serializer.deserialize(commandBytes, (AbstractSession)getCommandProcessor()); |
| } finally { |
| this.commandProcessor.endOperationProfile(SessionProfiler.CacheCoordinationSerialize); |
| } |
| processCommandFromRemoteConnection(command); |
| } |
| |
| /** |
| * INTERNAL: |
| * Delegate to command processor |
| */ |
| public void processCommandFromRemoteConnection(Command command) { |
| Object[] args = { command.getClass().getName(), command.getServiceId() }; |
| logDebug("received_remote_command", args); |
| |
| this.commandProcessor.incrementProfile(SessionProfiler.RcmReceived); |
| this.commandProcessor.startOperationProfile(SessionProfiler.CacheCoordination); |
| try { |
| // If the command is internal then execute it on this RCM |
| if (command.isInternalCommand() || command instanceof RCMCommand) { |
| logDebug("processing_internal_command", args); |
| ((RCMCommand)command).executeWithRCM(this); |
| return; |
| } |
| |
| // Convert command if neccessary |
| Object newCommand = command; |
| if (commandConverter != null) { |
| logDebug("converting_to_user_command", args); |
| newCommand = commandConverter.convertToUserCommand(command); |
| } |
| |
| // process command with command processor |
| logDebug("processing_remote_command", args); |
| this.commandProcessor.processCommand(newCommand); |
| } finally { |
| this.commandProcessor.endOperationProfile(SessionProfiler.CacheCoordination); |
| } |
| this.commandProcessor.incrementProfile(SessionProfiler.RemoteChangeSet); |
| } |
| |
| @Override |
| public CommandProcessor getCommandProcessor() { |
| return commandProcessor; |
| } |
| |
| @Override |
| public void setCommandProcessor(CommandProcessor newCommandProcessor) { |
| commandProcessor = newCommandProcessor; |
| if (newCommandProcessor instanceof Session) { |
| isEclipseLinkSession = true; |
| } |
| } |
| |
| @Override |
| public TransportManager getTransportManager() { |
| return transportManager; |
| } |
| |
| @Override |
| public void setTransportManager(TransportManager newTransportManager) { |
| transportManager = newTransportManager; |
| newTransportManager.setRemoteCommandManager(this); |
| discoveryManager = transportManager.createDiscoveryManager(); |
| } |
| |
| /** |
| * INTERNAL: |
| * Delegate to the command procesor to handle the exception. |
| */ |
| public void handleException(RuntimeException exception) { |
| commandProcessor.handleException(exception); |
| } |
| |
| /** |
| * INTERNAL: |
| * A new service has been detected by the discovery manager. Take the appropriate |
| * action to connect to the service. |
| */ |
| public void newServiceDiscovered(ServiceId service) { |
| RemoteConnection connection = transportManager.createConnection(service); |
| transportManager.addConnectionToExternalService(connection); |
| } |
| |
| /** |
| * PUBLIC: |
| * Return the discovery manager that detects the arrival of new cluster members |
| */ |
| @Override |
| public DiscoveryManager getDiscoveryManager() { |
| return discoveryManager; |
| } |
| |
| /** |
| * PUBLIC: |
| * Return the converter instance used to convert between EclipseLink Command |
| * objects and an application command format. |
| */ |
| @Override |
| public CommandConverter getCommandConverter() { |
| return commandConverter; |
| } |
| |
| /** |
| * ADVANCED: |
| * Set the converter instance that will be invoked by this CommandProcessor |
| * to convert commands from their application command format into EclipseLink |
| * Command objects before being propagated to remote command manager services. |
| * The converter will also be invoked to convert EclipseLink Command objects into |
| * application format before being sent to the CommandProcessor for execution. |
| */ |
| @Override |
| public void setCommandConverter(CommandConverter newCommandConverter) { |
| commandConverter = newCommandConverter; |
| } |
| |
| /** |
| * INTERNAL: |
| */ |
| public boolean shouldLogMessage(int logLevel) { |
| return commandProcessor.shouldLogMessages(logLevel); |
| } |
| public boolean shouldLogDebugMessage() { |
| return commandProcessor.shouldLogMessages(CommandProcessor.LOG_DEBUG); |
| } |
| public boolean shouldLogWarningMessage() { |
| return commandProcessor.shouldLogMessages(CommandProcessor.LOG_WARNING); |
| } |
| |
| /** |
| * INTERNAL: |
| */ |
| public void logMessage(int logLevel, String message, Object[] args) { |
| if (commandProcessor.shouldLogMessages(logLevel)) { |
| logMessageWithoutLevelCheck(logLevel, message, args); |
| } |
| } |
| |
| /** |
| * INTERNAL: |
| * Use this method in case the necessary logLevel has been confirmed |
| * by calling commandProcessor.shouldLogMessages method |
| */ |
| public void logMessageWithoutLevelCheck(int logLevel, String message, Object[] args) { |
| String i18nmsg = message; |
| if ((logLevel == CommandProcessor.LOG_ERROR) || (logLevel == CommandProcessor.LOG_WARNING)) { |
| i18nmsg = LoggingLocalization.buildMessage(message, args); |
| } else if ((logLevel == CommandProcessor.LOG_INFO) || (logLevel == CommandProcessor.LOG_DEBUG)) { |
| i18nmsg = TraceLocalization.buildMessage(message, args); |
| } |
| commandProcessor.logMessage(logLevel, i18nmsg); |
| } |
| |
| /** |
| * INTERNAL: |
| * Convenience logging methods. |
| */ |
| public void logDebug(String message, Object[] args) { |
| this.logMessage(CommandProcessor.LOG_DEBUG, message, args); |
| } |
| public void logDebugWithoutLevelCheck(String message, Object[] args) { |
| this.logMessageWithoutLevelCheck(CommandProcessor.LOG_DEBUG, message, args); |
| } |
| |
| public void logInfo(String message, Object[] args) { |
| this.logMessage(CommandProcessor.LOG_INFO, message, args); |
| } |
| |
| public void logWarning(String message, Object[] args) { |
| this.logMessage(CommandProcessor.LOG_WARNING, message, args); |
| } |
| public void logWarningWithoutLevelCheck(String message, Object[] args) { |
| this.logMessageWithoutLevelCheck(CommandProcessor.LOG_WARNING, message, args); |
| } |
| |
| public void logError(String message, Object[] args) { |
| this.logMessage(CommandProcessor.LOG_ERROR, message, args); |
| } |
| |
| /** |
| * INTERNAL: |
| * Return the service info that identifies this service instance |
| */ |
| public ServiceId getServiceId() { |
| return serviceId; |
| } |
| |
| /** |
| * PUBLIC: |
| * Return the service channel for this command manager. All command managers |
| * with the same service channel will send and receive commands from each other. |
| * Commands sent on other service channels will not be exchanged with this |
| * command manager. |
| */ |
| @Override |
| public String getChannel() { |
| return this.getServiceId().getChannel(); |
| } |
| |
| /** |
| * ADVANCED: |
| * Set the service channel for this command manager. All command managers |
| * with the same service channel will send and receive commands from each other. |
| * Commands sent on other service channels will not be exchanged with this |
| * command manager. |
| */ |
| @Override |
| public void setChannel(String channel) { |
| this.getServiceId().setChannel(channel); |
| } |
| |
| /** |
| * INTERNAL: |
| * Return whether this command manager should process profile commands |
| */ |
| @Override |
| public boolean isCommandProcessorASession() { |
| return this.isEclipseLinkSession; |
| } |
| |
| /** |
| * PUBLIC: |
| * Return the URL for this command manager. |
| */ |
| @Override |
| public String getUrl() { |
| return this.getServiceId().getURL(); |
| } |
| |
| /** |
| * ADVANCED: |
| * Set the URL for this command manager. |
| */ |
| @Override |
| public void setUrl(String url) { |
| this.getServiceId().setURL(url); |
| } |
| |
| /** |
| * PUBLIC: |
| * Return whether this command manager should propagate commands |
| * synchronously or asynchronously. If set to synchronous propagation then |
| * propagateCommand() will not return to the caller until the command has |
| * been executed on all of the services in the cluster. In asynchronous mode |
| * the command manager will create a separate thread for each of the remote |
| * service executions, and then promptly return to the caller. |
| */ |
| @Override |
| public boolean shouldPropagateAsynchronously() { |
| return isAsynchronous; |
| } |
| |
| /** |
| * ADVANCED: |
| * Set whether this command manager should propagate commands |
| * synchronously or asynchronously. If set to synchronous propagation then |
| * propagateCommand() will not return to the caller until the command has |
| * been executed on all of the services in the cluster. In asynchronous mode |
| * the command manager will create a separate thread for each of the remote |
| * service executions, and then promptly return to the caller. |
| */ |
| @Override |
| public void setShouldPropagateAsynchronously(boolean asyncMode) { |
| isAsynchronous = asyncMode; |
| } |
| |
| /** |
| * ADVANCED: |
| * Allow user to replace the $HOST subString of the local host URL with the user user input at runtime. |
| * By default, EclipseLink will try to discovery the local host IP and may fail due to security or network restrictions. |
| * In this case, user can call this API to specify the IP address or host name during pre-login session event or before session login. |
| * Example: |
| * If input is 145.23.127.79, the local host URL of ormi://$HOST:2971:/app_name will become ormi://145.23.127.79:2971:/app_name |
| */ |
| public void replaceLocalHostIPAddress(String ipAddress) { |
| String newURL = Helper.replaceFirstSubString(this.getUrl(), ServiceId.HOST_TOKEN, ipAddress); |
| if (newURL != null) { |
| this.setUrl(newURL); |
| } |
| } |
| |
| /** |
| * ADVANCED: |
| * Allow user to replace the $PORT subString of the local host URL with the user user input at runtime. |
| * In this case, user can call this API to specify the port number for a specific transport during pre-login session event or before session login. |
| * Example: |
| * If input is 7799, the local host URL of ormi://145.23.127.79:$PORT/app_name will become ormi://145.23.127.79:7799/app_name |
| */ |
| public void replaceTransportPortNumber(String portNumber) { |
| String newURL = Helper.replaceFirstSubString(this.getUrl(), ServiceId.PORT_TOKEN, portNumber); |
| if (newURL != null) { |
| this.setUrl(newURL); |
| } |
| } |
| |
| /** |
| * INTERNAL: |
| * Return the serverPlatform that identifies the application server |
| */ |
| public ServerPlatform getServerPlatform() { |
| if (isCommandProcessorASession()) { |
| return ((DatabaseSessionImpl)getCommandProcessor()).getServerPlatform(); |
| } else { |
| if (serverPlatform != null) { |
| return this.serverPlatform; |
| } else { |
| throw RemoteCommandManagerException.errorGettingServerPlatform(); |
| } |
| } |
| } |
| |
| /** |
| * PUBLIC: |
| * The ServerPlatform must be set manually when the RemoteCommandManager'CommandProcessor is not EclipseLink Session. |
| * When the CommandProcessor is a EclipseLink Session, the ServerPlatform is automatically gotten from the Session. |
| */ |
| public void setServerPlatform(ServerPlatform theServerPlatform) { |
| this.serverPlatform = theServerPlatform; |
| } |
| |
| /** |
| * PUBLIC: |
| * Return the Serializer to use for serialization of commands. |
| */ |
| public Serializer getSerializer() { |
| return serializer; |
| } |
| |
| /** |
| * PUBLIC: |
| * Set the Serializer to use for serialization of commands. |
| */ |
| public void setSerializer(Serializer serializer) { |
| this.serializer = serializer; |
| } |
| } |