/*******************************************************************************
 * Copyright (c) 1998, 2013 Oracle and/or its affiliates. All rights reserved.
 * This program and the accompanying materials are made available under the
 * terms of the Eclipse Public License v1.0 and Eclipse Distribution License v. 1.0
 * which accompanies this distribution.
 * The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html
 * and the Eclipse Distribution License is available at
 * http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * Contributors:
 *     Oracle - initial API and implementation from Oracle TopLink
 ******************************************************************************/
package org.eclipse.persistence.sessions.coordination; | |
import org.eclipse.persistence.exceptions.RemoteCommandManagerException; | |
import org.eclipse.persistence.internal.sessions.coordination.RemoteConnection; | |
import org.eclipse.persistence.internal.sessions.coordination.RCMCommand; | |
import org.eclipse.persistence.internal.sessions.coordination.CommandPropagator; | |
import org.eclipse.persistence.sessions.coordination.rmi.RMITransportManager; | |
import org.eclipse.persistence.sessions.serializers.JavaSerializer; | |
import org.eclipse.persistence.sessions.serializers.Serializer; | |
import org.eclipse.persistence.internal.helper.Helper; | |
import org.eclipse.persistence.internal.localization.LoggingLocalization; | |
import org.eclipse.persistence.internal.localization.TraceLocalization; | |
import org.eclipse.persistence.internal.sessions.AbstractSession; | |
import org.eclipse.persistence.internal.sessions.DatabaseSessionImpl; | |
import org.eclipse.persistence.internal.sessions.UnitOfWorkChangeSet; | |
import org.eclipse.persistence.sessions.*; | |
import org.eclipse.persistence.platform.server.ServerPlatform; | |
import java.net.InetAddress; | |
/** | |
* <p> | |
* <b>Purpose</b>: Provide a CommandManager implementation for cache coordination. | |
* <p> | |
* <b>Description</b>: A RemoteCommandManager (or RCM) instance is the primary component | |
* of an RCM service instance. It manages the other components of the service, and | |
* directs the overall service operation. Its ServiceId uniquely distinguishes it | |
* from other service instances in the cluster. | |
* <p> | |
* Each RCM has a logical channel to which it subscribes and publishes. This channel | |
* determines which other instances in the cluster the service instance sends and | |
* receives remote commands to/from. All RCM's on the same channel should have the same | |
* discovery manager settings (be communicating on the same multicast) so that | |
* the discovery managers may be able to discover one another. RCM's on different | |
* channels may operate on the same or on different multicast groups. | |
* <p> | |
* An RCM instance knows about other instances in the cluster through its DiscoveryManager. | |
* Its TransportManager is responsible for setting up the connections to other instances | |
* once they are discovered. | |
* <p> | |
* An RCM is instructed to "propagate", or execute on all remote service instances | |
* in the cluster that subscribe to the same channel, a remote command by its | |
* CommandProcessor. Likewise, when an RCM receives a remote command to be executed | |
* then it passes the command off to the CommandProcessor for the processing of the | |
* command to occur. CommandProcessors pass commands to the RCM as an Object (in a | |
* format that may be specific to the application) and the RCM uses its CommandConverter | |
* to convert it to a EclipseLink Command object before sending the Command off to the | |
* cluster. Similarly, when a EclipseLink Command object is received then the RCM invokes | |
* its CommandConverter to convert the object into the application format that will be | |
* passed to the CommandProcessor to process the command. | |
* | |
* @see CommandManager | |
* @see Command | |
* @see CommandProcessor | |
* @see CommandConverter | |
* @see CommandTransporter | |
* @see DiscoveryManager | |
* @author Steven Vo | |
* @since OracleAS TopLink 10<i>g</i> (9.0.4) | |
*/ | |
public class RemoteCommandManager implements org.eclipse.persistence.sessions.coordination.CommandManager { | |
public static final String DEFAULT_CHANNEL = "EclipseLinkCommandChannel"; | |
public static final boolean DEFAULT_ASYNCHRONOUS_MODE = true; | |
/** Uniquely identifies this service in the cluster */ | |
protected ServiceId serviceId; | |
/** Manages the detection of new services as they join the cluster */ | |
protected DiscoveryManager discoveryManager; | |
/** Manages the transport level connections between command managers */ | |
protected TransportManager transportManager; | |
/** Invoked to process a command when it is received from the cluster */ | |
protected CommandProcessor commandProcessor; | |
/** Used for converting commands between EclipseLink Command and app command formats */ | |
protected CommandConverter commandConverter; | |
/** Determines whether propagation should be synchronous or asynchronous */ | |
protected boolean isAsynchronous; | |
/** Determines whether profiling command should be send */ | |
protected boolean isEclipseLinkSession; | |
/** Uniquely identifies ServerPlatform in the cluster */ | |
protected ServerPlatform serverPlatform; | |
/** Set the Serializer to use for serialization of commands. */ | |
protected Serializer serializer; | |
//** Indicates whether RCM is active. In case there's discoveryManager it mirrors discoveryManager.isDiscoveryStopped() | |
protected boolean isStopped = true; | |
public RemoteCommandManager(CommandProcessor commandProcessor) { | |
this.serviceId = new ServiceId(); | |
// BUG - 3824040 we must call the setCommandProcessor method to | |
// ensure the isEclipseLinkSession flag is set correctly. | |
setCommandProcessor(commandProcessor); | |
// Set default values | |
this.transportManager = new RMITransportManager(this); | |
this.discoveryManager = this.transportManager.createDiscoveryManager(); | |
this.serviceId.setChannel(DEFAULT_CHANNEL); | |
this.isAsynchronous = DEFAULT_ASYNCHRONOUS_MODE; | |
this.serializer = JavaSerializer.instance; | |
// Set the command processor to point back to this command manager | |
commandProcessor.setCommandManager(this); | |
} | |
public RemoteCommandManager(CommandProcessor commandProcessor, TransportManager transportManager) { | |
this(commandProcessor); | |
this.transportManager = transportManager; | |
this.discoveryManager = this.transportManager.createDiscoveryManager(); | |
} | |
/** | |
* PUBLIC: | |
* Initialize the remote command manager. This will also trigger the | |
* DiscoveryManager to start establishing the EclipseLink cluster. | |
*/ | |
public void initialize() { | |
Object[] args = { this.getServiceId() }; | |
logDebug("starting_rcm", args); | |
// replace the $HOST substring of the URL with the discovered ipAddress | |
if ((getUrl() != null) && (getUrl().indexOf(ServiceId.HOST_TOKEN) >= 0)) { | |
try { | |
// discover local IP address | |
String ipAddress = InetAddress.getLocalHost().getHostAddress(); | |
replaceLocalHostIPAddress(ipAddress); | |
} catch (Exception ex) { | |
// catch different exceptions that are due to security or IP address of a host could not be determined. | |
// i.e. java.lang.SecurityException or java.net.UnknownHostException | |
throw RemoteCommandManagerException.errorDiscoveringLocalHostIPAddress(ex); | |
} | |
} | |
this.isStopped = false; | |
if (this.discoveryManager != null) { | |
this.discoveryManager.startDiscovery(); | |
} else { | |
this.transportManager.createConnections(); | |
} | |
Serializer serializer = getSerializer(); | |
if (serializer != null) { | |
serializer.initialize(UnitOfWorkChangeSet.class, null, (AbstractSession)getCommandProcessor()); | |
} | |
} | |
/** | |
* PUBLIC: | |
* Indicates whether the RCM has been stopped: | |
* either initialize hasn't been called or shutdown has been called. | |
*/ | |
public boolean isStopped() { | |
return isStopped; | |
} | |
/** | |
* PUBLIC: | |
* Shut down the remote command manager. This will also trigger the | |
* DiscoveryManager to stop. | |
* NOTE: Although this call initiates the shutdown process, | |
* no guarantees are made as to when it will actually complete. | |
*/ | |
public void shutdown() { | |
Object[] args = { this.getServiceId() }; | |
logDebug("stopping_rcm", args); | |
if(discoveryManager != null) { | |
discoveryManager.stopDiscovery(); | |
// use a new Discovery thread with same settings as the previous one. | |
DiscoveryManager newDmgr = transportManager.createDiscoveryManager(); | |
newDmgr.shallowCopy(discoveryManager); | |
discoveryManager = newDmgr; | |
} | |
isStopped = true; | |
transportManager.discardConnections(); | |
} | |
/** | |
* ADVANCED: | |
* Propagate a remote command to all remote RCM services participating | |
* in the EclipseLink cluster. | |
* | |
* @param command An object representing a EclipseLink command | |
*/ | |
public void propagateCommand(Object command) { | |
Command newCommand; | |
CommandPropagator propagator; | |
this.commandProcessor.startOperationProfile(SessionProfiler.CacheCoordination); | |
try { | |
if (this.commandConverter != null) { | |
// Use the converter if we have one | |
Object[] args = { command }; | |
logDebug("converting_to_toplink_command", args); | |
this.commandProcessor.incrementProfile(SessionProfiler.RcmSent); | |
newCommand = this.commandConverter.convertToEclipseLinkCommand(command); | |
} else if (command instanceof Command) { | |
// If converter is not set then maybe it just doesn't need converting | |
newCommand = (Command)command; | |
} else { | |
// We can't convert the thing - we may as well chuck it! | |
Object[] args = { command }; | |
logWarning("missing_converter", args); | |
return; | |
} | |
// Set our service id on the command to indicate that it came from us | |
newCommand.setServiceId(getServiceId()); | |
// PERF: Support plugable serialization. | |
Serializer serializer = getSerializer(); | |
byte[] commandBytes = null; | |
if (serializer != null) { | |
this.commandProcessor.startOperationProfile(SessionProfiler.CacheCoordinationSerialize); | |
try { | |
commandBytes = (byte[])serializer.serialize(command, (AbstractSession)getCommandProcessor()); | |
} finally { | |
this.commandProcessor.endOperationProfile(SessionProfiler.CacheCoordinationSerialize); | |
} | |
} | |
// Propagate the command (synchronously or asynchronously) | |
propagator = new CommandPropagator(this, newCommand, commandBytes); | |
if (shouldPropagateAsynchronously()) { | |
propagator.asynchronousPropagateCommand(); | |
} else { | |
propagator.synchronousPropagateCommand(); | |
} | |
} finally { | |
this.commandProcessor.endOperationProfile(SessionProfiler.CacheCoordination); | |
} | |
} | |
/** | |
* INTERNAL: | |
* Deserialize the command and execute it. | |
*/ | |
public void processCommandFromRemoteConnection(byte[] commandBytes) { | |
this.commandProcessor.startOperationProfile(SessionProfiler.CacheCoordinationSerialize); | |
Command command = null; | |
try { | |
Serializer serializer = getSerializer(); | |
if (serializer == null) { | |
serializer = JavaSerializer.instance; | |
} | |
command = (Command)serializer.deserialize(commandBytes, (AbstractSession)getCommandProcessor()); | |
} finally { | |
this.commandProcessor.endOperationProfile(SessionProfiler.CacheCoordinationSerialize); | |
} | |
processCommandFromRemoteConnection(command); | |
} | |
/** | |
* INTERNAL: | |
* Delegate to command processor | |
*/ | |
public void processCommandFromRemoteConnection(Command command) { | |
Object[] args = { command.getClass().getName(), command.getServiceId() }; | |
logDebug("received_remote_command", args); | |
this.commandProcessor.incrementProfile(SessionProfiler.RcmReceived); | |
this.commandProcessor.startOperationProfile(SessionProfiler.CacheCoordination); | |
try { | |
// If the command is internal then execute it on this RCM | |
if (command.isInternalCommand() || command instanceof RCMCommand) { | |
logDebug("processing_internal_command", args); | |
((RCMCommand)command).executeWithRCM(this); | |
return; | |
} | |
// Convert command if neccessary | |
Object newCommand = command; | |
if (commandConverter != null) { | |
logDebug("converting_to_user_command", args); | |
newCommand = commandConverter.convertToUserCommand(command); | |
} | |
// process command with command processor | |
logDebug("processing_remote_command", args); | |
this.commandProcessor.processCommand(newCommand); | |
} finally { | |
this.commandProcessor.endOperationProfile(SessionProfiler.CacheCoordination); | |
} | |
this.commandProcessor.incrementProfile(SessionProfiler.RemoteChangeSet); | |
} | |
public CommandProcessor getCommandProcessor() { | |
return commandProcessor; | |
} | |
public void setCommandProcessor(CommandProcessor newCommandProcessor) { | |
commandProcessor = newCommandProcessor; | |
if (newCommandProcessor instanceof Session) { | |
isEclipseLinkSession = true; | |
} | |
} | |
public TransportManager getTransportManager() { | |
return transportManager; | |
} | |
public void setTransportManager(TransportManager newTransportManager) { | |
transportManager = newTransportManager; | |
newTransportManager.setRemoteCommandManager(this); | |
discoveryManager = transportManager.createDiscoveryManager(); | |
} | |
/** | |
* INTERNAL: | |
* Delegate to the command procesor to handle the exception. | |
*/ | |
public void handleException(RuntimeException exception) { | |
commandProcessor.handleException(exception); | |
} | |
/** | |
* INTERNAL: | |
* A new service has been detected by the discovery manager. Take the appropriate | |
* action to connect to the service. | |
*/ | |
public void newServiceDiscovered(ServiceId service) { | |
RemoteConnection connection = transportManager.createConnection(service); | |
transportManager.addConnectionToExternalService(connection); | |
} | |
/** | |
* PUBLIC: | |
* Return the discovery manager that detects the arrival of new cluster members | |
*/ | |
public DiscoveryManager getDiscoveryManager() { | |
return discoveryManager; | |
} | |
/** | |
* PUBLIC: | |
* Return the converter instance used to convert between EclipseLink Command | |
* objects and an application command format. | |
*/ | |
public CommandConverter getCommandConverter() { | |
return commandConverter; | |
} | |
/** | |
* ADVANCED: | |
* Set the converter instance that will be invoked by this CommandProcessor | |
* to convert commands from their application command format into EclipseLink | |
* Command objects before being propagated to remote command manager services. | |
* The converter will also be invoked to convert EclipseLink Command objects into | |
* application format before being sent to the CommandProcessor for execution. | |
*/ | |
public void setCommandConverter(CommandConverter newCommandConverter) { | |
commandConverter = newCommandConverter; | |
} | |
/** | |
* INTERNAL: | |
*/ | |
public boolean shouldLogMessage(int logLevel) { | |
return commandProcessor.shouldLogMessages(logLevel); | |
} | |
public boolean shouldLogDebugMessage() { | |
return commandProcessor.shouldLogMessages(CommandProcessor.LOG_DEBUG); | |
} | |
public boolean shouldLogWarningMessage() { | |
return commandProcessor.shouldLogMessages(CommandProcessor.LOG_WARNING); | |
} | |
/** | |
* INTERNAL: | |
*/ | |
public void logMessage(int logLevel, String message, Object[] args) { | |
if (commandProcessor.shouldLogMessages(logLevel)) { | |
logMessageWithoutLevelCheck(logLevel, message, args); | |
} | |
} | |
/** | |
* INTERNAL: | |
* Use this method in case the necessary logLevel has been confirmed | |
* by calling commandProcessor.shouldLogMessages method | |
*/ | |
public void logMessageWithoutLevelCheck(int logLevel, String message, Object[] args) { | |
String i18nmsg = message; | |
if ((logLevel == CommandProcessor.LOG_ERROR) || (logLevel == CommandProcessor.LOG_WARNING)) { | |
i18nmsg = LoggingLocalization.buildMessage(message, args); | |
} else if ((logLevel == CommandProcessor.LOG_INFO) || (logLevel == CommandProcessor.LOG_DEBUG)) { | |
i18nmsg = TraceLocalization.buildMessage(message, args); | |
} | |
commandProcessor.logMessage(logLevel, i18nmsg); | |
} | |
/** | |
* INTERNAL: | |
* Convenience logging methods. | |
*/ | |
public void logDebug(String message, Object[] args) { | |
this.logMessage(CommandProcessor.LOG_DEBUG, message, args); | |
} | |
public void logDebugWithoutLevelCheck(String message, Object[] args) { | |
this.logMessageWithoutLevelCheck(CommandProcessor.LOG_DEBUG, message, args); | |
} | |
public void logInfo(String message, Object[] args) { | |
this.logMessage(CommandProcessor.LOG_INFO, message, args); | |
} | |
public void logWarning(String message, Object[] args) { | |
this.logMessage(CommandProcessor.LOG_WARNING, message, args); | |
} | |
public void logWarningWithoutLevelCheck(String message, Object[] args) { | |
this.logMessageWithoutLevelCheck(CommandProcessor.LOG_WARNING, message, args); | |
} | |
public void logError(String message, Object[] args) { | |
this.logMessage(CommandProcessor.LOG_ERROR, message, args); | |
} | |
/** | |
* INTERNAL: | |
* Return the service info that identifies this service instance | |
*/ | |
public ServiceId getServiceId() { | |
return serviceId; | |
} | |
/** | |
* PUBLIC: | |
* Return the service channel for this command manager. All command managers | |
* with the same service channel will send and receive commands from each other. | |
* Commands sent on other service channels will not be exchanged with this | |
* command manager. | |
*/ | |
public String getChannel() { | |
return this.getServiceId().getChannel(); | |
} | |
/** | |
* ADVANCED: | |
* Set the service channel for this command manager. All command managers | |
* with the same service channel will send and receive commands from each other. | |
* Commands sent on other service channels will not be exchanged with this | |
* command manager. | |
*/ | |
public void setChannel(String channel) { | |
this.getServiceId().setChannel(channel); | |
} | |
/** | |
* INTERNAL: | |
* Return whether this command manager should process profile commands | |
*/ | |
public boolean isCommandProcessorASession() { | |
return this.isEclipseLinkSession; | |
} | |
/** | |
* PUBLIC: | |
* Return the URL for this command manager. | |
*/ | |
public String getUrl() { | |
return this.getServiceId().getURL(); | |
} | |
/** | |
* ADVANCED: | |
* Set the URL for this command manager. | |
*/ | |
public void setUrl(String url) { | |
this.getServiceId().setURL(url); | |
} | |
/** | |
* PUBLIC: | |
* Return whether this command manager should propagate commands | |
* synchronously or asynchronously. If set to synchronous propagation then | |
* propagateCommand() will not return to the caller until the command has | |
* been executed on all of the services in the cluster. In asynchronous mode | |
* the command manager will create a separate thread for each of the remote | |
* service executions, and then promptly return to the caller. | |
*/ | |
public boolean shouldPropagateAsynchronously() { | |
return isAsynchronous; | |
} | |
/** | |
* ADVANCED: | |
* Set whether this command manager should propagate commands | |
* synchronously or asynchronously. If set to synchronous propagation then | |
* propagateCommand() will not return to the caller until the command has | |
* been executed on all of the services in the cluster. In asynchronous mode | |
* the command manager will create a separate thread for each of the remote | |
* service executions, and then promptly return to the caller. | |
*/ | |
public void setShouldPropagateAsynchronously(boolean asyncMode) { | |
isAsynchronous = asyncMode; | |
} | |
/** | |
* ADVANCED: | |
* Allow user to replace the $HOST subString of the local host URL with the user user input at runtime. | |
* By default, EclipseLink will try to discovery the local host IP and may fail due to security or network restrictions. | |
* In this case, user can call this API to specify the IP address or host name during pre-login session event or before session login. | |
* Example: | |
* If input is 145.23.127.79, the local host URL of ormi://$HOST:2971:/app_name will become ormi://145.23.127.79:2971:/app_name | |
*/ | |
public void replaceLocalHostIPAddress(String ipAddress) { | |
String newURL = Helper.replaceFirstSubString(this.getUrl(), ServiceId.HOST_TOKEN, ipAddress); | |
if (newURL != null) { | |
this.setUrl(newURL); | |
} | |
} | |
/** | |
* ADVANCED: | |
* Allow user to replace the $PORT subString of the local host URL with the user user input at runtime. | |
* In this case, user can call this API to specify the port number for a specific transport during pre-login session event or before session login. | |
* Example: | |
* If input is 7799, the local host URL of ormi://145.23.127.79:$PORT/app_name will become ormi://145.23.127.79:7799/app_name | |
*/ | |
public void replaceTransportPortNumber(String portNumber) { | |
String newURL = Helper.replaceFirstSubString(this.getUrl(), ServiceId.PORT_TOKEN, portNumber); | |
if (newURL != null) { | |
this.setUrl(newURL); | |
} | |
} | |
/** | |
* INTERNAL: | |
* Return the serverPlatform that identifies the application server | |
*/ | |
public ServerPlatform getServerPlatform() { | |
if (isCommandProcessorASession()) { | |
return ((DatabaseSessionImpl)getCommandProcessor()).getServerPlatform(); | |
} else { | |
if (serverPlatform != null) { | |
return this.serverPlatform; | |
} else { | |
throw RemoteCommandManagerException.errorGettingServerPlatform(); | |
} | |
} | |
} | |
/** | |
* PUBLIC: | |
* The ServerPlatform must be set manually when the RemoteCommandManager'CommandProcessor is not EclipseLink Session. | |
* When the CommandProcessor is a EclipseLink Session, the ServerPlatform is automatically gotten from the Session. | |
*/ | |
public void setServerPlatform(ServerPlatform theServerPlatform) { | |
this.serverPlatform = theServerPlatform; | |
} | |
/** | |
* PUBLIC: | |
* Return the Serializer to use for serialization of commands. | |
*/ | |
public Serializer getSerializer() { | |
return serializer; | |
} | |
/** | |
* PUBLIC: | |
* Set the Serializer to use for serialization of commands. | |
*/ | |
public void setSerializer(Serializer serializer) { | |
this.serializer = serializer; | |
} | |
} |