blob: f441257092a1f93aafa536186bfcd0a8a1c322a6 [file] [log] [blame]
/*
* Copyright (c) 1998, 2020 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 1998, 2018 IBM Corporation. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0,
* or the Eclipse Distribution License v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
// Contributors:
// Oracle - initial API and implementation from Oracle TopLink
// 11/07/2017 - Dalia Abo Sheasha
// - 526957 : Split the logging and trace messages
package org.eclipse.persistence.sessions.coordination;
import java.net.InetAddress;
import org.eclipse.persistence.exceptions.RemoteCommandManagerException;
import org.eclipse.persistence.internal.helper.Helper;
import org.eclipse.persistence.internal.localization.LoggingLocalization;
import org.eclipse.persistence.internal.localization.TraceLocalization;
import org.eclipse.persistence.internal.sessions.AbstractSession;
import org.eclipse.persistence.internal.sessions.DatabaseSessionImpl;
import org.eclipse.persistence.internal.sessions.UnitOfWorkChangeSet;
import org.eclipse.persistence.internal.sessions.coordination.CommandPropagator;
import org.eclipse.persistence.internal.sessions.coordination.RCMCommand;
import org.eclipse.persistence.internal.sessions.coordination.RemoteConnection;
import org.eclipse.persistence.platform.server.ServerPlatform;
import org.eclipse.persistence.sessions.Session;
import org.eclipse.persistence.sessions.SessionProfiler;
import org.eclipse.persistence.sessions.coordination.rmi.RMITransportManager;
import org.eclipse.persistence.sessions.serializers.JavaSerializer;
import org.eclipse.persistence.sessions.serializers.Serializer;
/**
* <p>
* <b>Purpose</b>: Provide a CommandManager implementation for cache coordination.
* <p>
* <b>Description</b>: A RemoteCommandManager (or RCM) instance is the primary component
* of an RCM service instance. It manages the other components of the service, and
* directs the overall service operation. Its ServiceId uniquely distinguishes it
* from other service instances in the cluster.
* <p>
* Each RCM has a logical channel to which it subscribes and publishes. This channel
* determines which other instances in the cluster the service instance sends and
* receives remote commands to/from. All RCM's on the same channel should have the same
* discovery manager settings (be communicating on the same multicast) so that
* the discovery managers may be able to discover one another. RCM's on different
* channels may operate on the same or on different multicast groups.
* <p>
* An RCM instance knows about other instances in the cluster through its DiscoveryManager.
* Its TransportManager is responsible for setting up the connections to other instances
* once they are discovered.
* <p>
* An RCM is instructed to "propagate", or execute on all remote service instances
* in the cluster that subscribe to the same channel, a remote command by its
* CommandProcessor. Likewise, when an RCM receives a remote command to be executed
* then it passes the command off to the CommandProcessor for the processing of the
* command to occur. CommandProcessors pass commands to the RCM as an Object (in a
* format that may be specific to the application) and the RCM uses its CommandConverter
* to convert it to a EclipseLink Command object before sending the Command off to the
* cluster. Similarly, when a EclipseLink Command object is received then the RCM invokes
* its CommandConverter to convert the object into the application format that will be
* passed to the CommandProcessor to process the command.
*
* @see CommandManager
* @see Command
* @see CommandProcessor
* @see CommandConverter
* @see DiscoveryManager
* @author Steven Vo
* @since OracleAS TopLink 10<i>g</i> (9.0.4)
*/
public class RemoteCommandManager implements org.eclipse.persistence.sessions.coordination.CommandManager {
public static final String DEFAULT_CHANNEL = "EclipseLinkCommandChannel";
public static final boolean DEFAULT_ASYNCHRONOUS_MODE = true;
/** Uniquely identifies this service in the cluster */
protected ServiceId serviceId;
/** Manages the detection of new services as they join the cluster */
protected DiscoveryManager discoveryManager;
/** Manages the transport level connections between command managers */
protected TransportManager transportManager;
/** Invoked to process a command when it is received from the cluster */
protected CommandProcessor commandProcessor;
/** Used for converting commands between EclipseLink Command and app command formats */
protected CommandConverter commandConverter;
/** Determines whether propagation should be synchronous or asynchronous */
protected boolean isAsynchronous;
/** Determines whether profiling command should be send */
protected boolean isEclipseLinkSession;
/** Uniquely identifies ServerPlatform in the cluster */
protected ServerPlatform serverPlatform;
/** Set the Serializer to use for serialization of commands. */
protected Serializer serializer;
//** Indicates whether RCM is active. In case there's discoveryManager it mirrors discoveryManager.isDiscoveryStopped()
protected boolean isStopped = true;
public RemoteCommandManager(CommandProcessor commandProcessor) {
this.serviceId = new ServiceId();
// BUG - 3824040 we must call the setCommandProcessor method to
// ensure the isEclipseLinkSession flag is set correctly.
setCommandProcessor(commandProcessor);
// Set default values
this.transportManager = new RMITransportManager(this);
this.discoveryManager = this.transportManager.createDiscoveryManager();
this.serviceId.setChannel(DEFAULT_CHANNEL);
this.isAsynchronous = DEFAULT_ASYNCHRONOUS_MODE;
this.serializer = JavaSerializer.instance;
// Set the command processor to point back to this command manager
commandProcessor.setCommandManager(this);
}
public RemoteCommandManager(CommandProcessor commandProcessor, TransportManager transportManager) {
this(commandProcessor);
this.transportManager = transportManager;
this.discoveryManager = this.transportManager.createDiscoveryManager();
}
/**
* PUBLIC:
* Initialize the remote command manager. This will also trigger the
* DiscoveryManager to start establishing the EclipseLink cluster.
*/
@Override
public void initialize() {
Object[] args = { this.getServiceId() };
logDebug("starting_rcm", args);
// replace the $HOST substring of the URL with the discovered ipAddress
if ((getUrl() != null) && (getUrl().indexOf(ServiceId.HOST_TOKEN) >= 0)) {
try {
// discover local IP address
String ipAddress = InetAddress.getLocalHost().getHostAddress();
replaceLocalHostIPAddress(ipAddress);
} catch (Exception ex) {
// catch different exceptions that are due to security or IP address of a host could not be determined.
// i.e. java.lang.SecurityException or java.net.UnknownHostException
throw RemoteCommandManagerException.errorDiscoveringLocalHostIPAddress(ex);
}
}
this.isStopped = false;
if (this.discoveryManager != null) {
this.discoveryManager.startDiscovery();
} else {
this.transportManager.createConnections();
}
Serializer serializer = getSerializer();
if (serializer != null) {
serializer.initialize(UnitOfWorkChangeSet.class, null, (AbstractSession)getCommandProcessor());
}
}
/**
* PUBLIC:
* Indicates whether the RCM has been stopped:
* either initialize hasn't been called or shutdown has been called.
*/
public boolean isStopped() {
return isStopped;
}
/**
* PUBLIC:
* Shut down the remote command manager. This will also trigger the
* DiscoveryManager to stop.
* NOTE: Although this call initiates the shutdown process,
* no guarantees are made as to when it will actually complete.
*/
@Override
public void shutdown() {
Object[] args = { this.getServiceId() };
logDebug("stopping_rcm", args);
if(discoveryManager != null) {
discoveryManager.stopDiscovery();
// use a new Discovery thread with same settings as the previous one.
DiscoveryManager newDmgr = transportManager.createDiscoveryManager();
newDmgr.shallowCopy(discoveryManager);
discoveryManager = newDmgr;
}
isStopped = true;
transportManager.discardConnections();
}
/**
* ADVANCED:
* Propagate a remote command to all remote RCM services participating
* in the EclipseLink cluster.
*
* @param command An object representing a EclipseLink command
*/
@Override
public void propagateCommand(Object command) {
Command newCommand;
CommandPropagator propagator;
this.commandProcessor.startOperationProfile(SessionProfiler.CacheCoordination);
try {
if (this.commandConverter != null) {
// Use the converter if we have one
Object[] args = { command };
logDebug("converting_to_toplink_command", args);
this.commandProcessor.incrementProfile(SessionProfiler.RcmSent);
newCommand = this.commandConverter.convertToEclipseLinkCommand(command);
} else if (command instanceof Command) {
// If converter is not set then maybe it just doesn't need converting
newCommand = (Command)command;
} else {
// We can't convert the thing - we may as well chuck it!
Object[] args = { command };
logWarning("missing_converter", args);
return;
}
// Set our service id on the command to indicate that it came from us
newCommand.setServiceId(getServiceId());
// PERF: Support plugable serialization.
Serializer serializer = getSerializer();
byte[] commandBytes = null;
if (serializer != null) {
this.commandProcessor.startOperationProfile(SessionProfiler.CacheCoordinationSerialize);
try {
commandBytes = (byte[])serializer.serialize(command, (AbstractSession)getCommandProcessor());
} finally {
this.commandProcessor.endOperationProfile(SessionProfiler.CacheCoordinationSerialize);
}
}
// Propagate the command (synchronously or asynchronously)
propagator = new CommandPropagator(this, newCommand, commandBytes);
if (shouldPropagateAsynchronously()) {
propagator.asynchronousPropagateCommand();
} else {
propagator.synchronousPropagateCommand();
}
} finally {
this.commandProcessor.endOperationProfile(SessionProfiler.CacheCoordination);
}
}
/**
* INTERNAL:
* Deserialize the command and execute it.
*/
public void processCommandFromRemoteConnection(byte[] commandBytes) {
this.commandProcessor.startOperationProfile(SessionProfiler.CacheCoordinationSerialize);
Command command = null;
try {
Serializer serializer = getSerializer();
if (serializer == null) {
serializer = JavaSerializer.instance;
}
command = (Command)serializer.deserialize(commandBytes, (AbstractSession)getCommandProcessor());
} finally {
this.commandProcessor.endOperationProfile(SessionProfiler.CacheCoordinationSerialize);
}
processCommandFromRemoteConnection(command);
}
/**
* INTERNAL:
* Delegate to command processor
*/
public void processCommandFromRemoteConnection(Command command) {
Object[] args = { command.getClass().getName(), command.getServiceId() };
logDebug("received_remote_command", args);
this.commandProcessor.incrementProfile(SessionProfiler.RcmReceived);
this.commandProcessor.startOperationProfile(SessionProfiler.CacheCoordination);
try {
// If the command is internal then execute it on this RCM
if (command.isInternalCommand() || command instanceof RCMCommand) {
logDebug("processing_internal_command", args);
((RCMCommand)command).executeWithRCM(this);
return;
}
// Convert command if neccessary
Object newCommand = command;
if (commandConverter != null) {
logDebug("converting_to_user_command", args);
newCommand = commandConverter.convertToUserCommand(command);
}
// process command with command processor
logDebug("processing_remote_command", args);
this.commandProcessor.processCommand(newCommand);
} finally {
this.commandProcessor.endOperationProfile(SessionProfiler.CacheCoordination);
}
this.commandProcessor.incrementProfile(SessionProfiler.RemoteChangeSet);
}
@Override
public CommandProcessor getCommandProcessor() {
return commandProcessor;
}
@Override
public void setCommandProcessor(CommandProcessor newCommandProcessor) {
commandProcessor = newCommandProcessor;
if (newCommandProcessor instanceof Session) {
isEclipseLinkSession = true;
}
}
@Override
public TransportManager getTransportManager() {
return transportManager;
}
@Override
public void setTransportManager(TransportManager newTransportManager) {
transportManager = newTransportManager;
newTransportManager.setRemoteCommandManager(this);
discoveryManager = transportManager.createDiscoveryManager();
}
/**
* INTERNAL:
* Delegate to the command procesor to handle the exception.
*/
public void handleException(RuntimeException exception) {
commandProcessor.handleException(exception);
}
/**
* INTERNAL:
* A new service has been detected by the discovery manager. Take the appropriate
* action to connect to the service.
*/
public void newServiceDiscovered(ServiceId service) {
RemoteConnection connection = transportManager.createConnection(service);
transportManager.addConnectionToExternalService(connection);
}
/**
* PUBLIC:
* Return the discovery manager that detects the arrival of new cluster members
*/
@Override
public DiscoveryManager getDiscoveryManager() {
return discoveryManager;
}
/**
* PUBLIC:
* Return the converter instance used to convert between EclipseLink Command
* objects and an application command format.
*/
@Override
public CommandConverter getCommandConverter() {
return commandConverter;
}
/**
* ADVANCED:
* Set the converter instance that will be invoked by this CommandProcessor
* to convert commands from their application command format into EclipseLink
* Command objects before being propagated to remote command manager services.
* The converter will also be invoked to convert EclipseLink Command objects into
* application format before being sent to the CommandProcessor for execution.
*/
@Override
public void setCommandConverter(CommandConverter newCommandConverter) {
commandConverter = newCommandConverter;
}
/**
* INTERNAL:
*/
public boolean shouldLogMessage(int logLevel) {
return commandProcessor.shouldLogMessages(logLevel);
}
public boolean shouldLogDebugMessage() {
return commandProcessor.shouldLogMessages(CommandProcessor.LOG_DEBUG);
}
public boolean shouldLogWarningMessage() {
return commandProcessor.shouldLogMessages(CommandProcessor.LOG_WARNING);
}
/**
* INTERNAL:
*/
public void logMessage(int logLevel, String message, Object[] args) {
if (commandProcessor.shouldLogMessages(logLevel)) {
logMessageWithoutLevelCheck(logLevel, message, args);
}
}
/**
* INTERNAL:
* Use this method in case the necessary logLevel has been confirmed
* by calling commandProcessor.shouldLogMessages method
*/
public void logMessageWithoutLevelCheck(int logLevel, String message, Object[] args) {
String i18nmsg = message;
if ((logLevel == CommandProcessor.LOG_ERROR) || (logLevel == CommandProcessor.LOG_WARNING)) {
i18nmsg = LoggingLocalization.buildMessage(message, args);
} else if ((logLevel == CommandProcessor.LOG_INFO) || (logLevel == CommandProcessor.LOG_DEBUG)) {
i18nmsg = TraceLocalization.buildMessage(message, args);
}
commandProcessor.logMessage(logLevel, i18nmsg);
}
/**
* INTERNAL:
* Convenience logging methods.
*/
public void logDebug(String message, Object[] args) {
this.logMessage(CommandProcessor.LOG_DEBUG, message, args);
}
public void logDebugWithoutLevelCheck(String message, Object[] args) {
this.logMessageWithoutLevelCheck(CommandProcessor.LOG_DEBUG, message, args);
}
public void logInfo(String message, Object[] args) {
this.logMessage(CommandProcessor.LOG_INFO, message, args);
}
public void logWarning(String message, Object[] args) {
this.logMessage(CommandProcessor.LOG_WARNING, message, args);
}
public void logWarningWithoutLevelCheck(String message, Object[] args) {
this.logMessageWithoutLevelCheck(CommandProcessor.LOG_WARNING, message, args);
}
public void logError(String message, Object[] args) {
this.logMessage(CommandProcessor.LOG_ERROR, message, args);
}
/**
* INTERNAL:
* Return the service info that identifies this service instance
*/
public ServiceId getServiceId() {
return serviceId;
}
/**
* PUBLIC:
* Return the service channel for this command manager. All command managers
* with the same service channel will send and receive commands from each other.
* Commands sent on other service channels will not be exchanged with this
* command manager.
*/
@Override
public String getChannel() {
return this.getServiceId().getChannel();
}
/**
* ADVANCED:
* Set the service channel for this command manager. All command managers
* with the same service channel will send and receive commands from each other.
* Commands sent on other service channels will not be exchanged with this
* command manager.
*/
@Override
public void setChannel(String channel) {
this.getServiceId().setChannel(channel);
}
/**
* INTERNAL:
* Return whether this command manager should process profile commands
*/
@Override
public boolean isCommandProcessorASession() {
return this.isEclipseLinkSession;
}
/**
* PUBLIC:
* Return the URL for this command manager.
*/
@Override
public String getUrl() {
return this.getServiceId().getURL();
}
/**
* ADVANCED:
* Set the URL for this command manager.
*/
@Override
public void setUrl(String url) {
this.getServiceId().setURL(url);
}
/**
* PUBLIC:
* Return whether this command manager should propagate commands
* synchronously or asynchronously. If set to synchronous propagation then
* propagateCommand() will not return to the caller until the command has
* been executed on all of the services in the cluster. In asynchronous mode
* the command manager will create a separate thread for each of the remote
* service executions, and then promptly return to the caller.
*/
@Override
public boolean shouldPropagateAsynchronously() {
return isAsynchronous;
}
/**
* ADVANCED:
* Set whether this command manager should propagate commands
* synchronously or asynchronously. If set to synchronous propagation then
* propagateCommand() will not return to the caller until the command has
* been executed on all of the services in the cluster. In asynchronous mode
* the command manager will create a separate thread for each of the remote
* service executions, and then promptly return to the caller.
*/
@Override
public void setShouldPropagateAsynchronously(boolean asyncMode) {
isAsynchronous = asyncMode;
}
/**
* ADVANCED:
* Allow user to replace the $HOST subString of the local host URL with the user user input at runtime.
* By default, EclipseLink will try to discovery the local host IP and may fail due to security or network restrictions.
* In this case, user can call this API to specify the IP address or host name during pre-login session event or before session login.
* Example:
* If input is 145.23.127.79, the local host URL of ormi://$HOST:2971:/app_name will become ormi://145.23.127.79:2971:/app_name
*/
public void replaceLocalHostIPAddress(String ipAddress) {
String newURL = Helper.replaceFirstSubString(this.getUrl(), ServiceId.HOST_TOKEN, ipAddress);
if (newURL != null) {
this.setUrl(newURL);
}
}
/**
* ADVANCED:
* Allow user to replace the $PORT subString of the local host URL with the user user input at runtime.
* In this case, user can call this API to specify the port number for a specific transport during pre-login session event or before session login.
* Example:
* If input is 7799, the local host URL of ormi://145.23.127.79:$PORT/app_name will become ormi://145.23.127.79:7799/app_name
*/
public void replaceTransportPortNumber(String portNumber) {
String newURL = Helper.replaceFirstSubString(this.getUrl(), ServiceId.PORT_TOKEN, portNumber);
if (newURL != null) {
this.setUrl(newURL);
}
}
/**
* INTERNAL:
* Return the serverPlatform that identifies the application server
*/
public ServerPlatform getServerPlatform() {
if (isCommandProcessorASession()) {
return ((DatabaseSessionImpl)getCommandProcessor()).getServerPlatform();
} else {
if (serverPlatform != null) {
return this.serverPlatform;
} else {
throw RemoteCommandManagerException.errorGettingServerPlatform();
}
}
}
/**
* PUBLIC:
* The ServerPlatform must be set manually when the RemoteCommandManager'CommandProcessor is not EclipseLink Session.
* When the CommandProcessor is a EclipseLink Session, the ServerPlatform is automatically gotten from the Session.
*/
public void setServerPlatform(ServerPlatform theServerPlatform) {
this.serverPlatform = theServerPlatform;
}
/**
* PUBLIC:
* Return the Serializer to use for serialization of commands.
*/
public Serializer getSerializer() {
return serializer;
}
/**
* PUBLIC:
* Set the Serializer to use for serialization of commands.
*/
public void setSerializer(Serializer serializer) {
this.serializer = serializer;
}
}