| /* |
| * Copyright (c) 1998, 2021 Oracle and/or its affiliates. All rights reserved. |
| * |
| * This program and the accompanying materials are made available under the |
| * terms of the Eclipse Public License v. 2.0 which is available at |
| * http://www.eclipse.org/legal/epl-2.0, |
| * or the Eclipse Distribution License v. 1.0 which is available at |
| * http://www.eclipse.org/org/documents/edl-v10.php. |
| * |
| * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause |
| */ |
| |
| // Contributors: |
| // Oracle - initial API and implementation from Oracle TopLink |
| package org.eclipse.persistence.internal.sessions.coordination.broadcast; |
| |
| import org.eclipse.persistence.internal.helper.Helper; |
| import org.eclipse.persistence.internal.sessions.coordination.RemoteConnection; |
| import org.eclipse.persistence.sessions.coordination.broadcast.BroadcastTransportManager; |
| import org.eclipse.persistence.sessions.coordination.Command; |
| import org.eclipse.persistence.sessions.coordination.RemoteCommandManager; |
| import org.eclipse.persistence.exceptions.CommunicationException; |
| import org.eclipse.persistence.exceptions.RemoteCommandManagerException; |
| |
| /** |
| * <p> |
| * <b>Purpose</b>: Base class extending RemoteConnection for broadcasting RCM protocols: JMS and Oc4jJGroups. |
| * </p><p> |
| * <b>Description</b>: Defines lifecycle states and most of the methods, |
| * as well as exception processing and info logging. |
| * </p> |
| * @author Andrei Ilitchev |
| * @since OracleAS TopLink 11<i>g</i> (11.1.1) |
| */ |
| public abstract class BroadcastRemoteConnection extends RemoteConnection { |
| protected RemoteCommandManager rcm; |
| protected String topicName; |
| |
| // Working state - connection can send and/or receive messages. |
| public static final String STATE_ACTIVE = "ACTIVE"; |
| // close method called but not all resources used by connection are freed yet. |
| // Note that even after close method returns the state still may be CLOSING. |
| public static final String STATE_CLOSING = "CLOSING"; |
| // All resources are freed. |
| public static final String STATE_CLOSED = "CLOSED"; |
| // STATE_ACTIVE -> STATE_CLOSING -> STATE_CLOSED |
| protected String state = STATE_ACTIVE; |
| |
| // Connection information String. |
| protected String displayString; |
| // Array containing a single element - displayString. Used for warnings and debug logging. |
| protected Object[] info; |
| // Array containing a two elements - displayString and an empty String. |
| // Used for debug logging which require messageId in case messageId is null |
| // (so that a new array is not created each time). |
| protected Object[] infoExt; |
| |
| protected BroadcastRemoteConnection(RemoteCommandManager rcm) { |
| this.serviceId = rcm.getServiceId(); |
| this.rcm = rcm; |
| this.topicName = ((BroadcastTransportManager)rcm.getTransportManager()).getTopicName(); |
| } |
| |
| /** |
| * INTERNAL: |
| * Publish the remote command. The result of execution is returned. |
| * This method is used only by external (publishing) connection. |
| */ |
| @Override |
| public Object executeCommand(Command command) throws CommunicationException { |
| if (isActive()) { |
| try { |
| return executeCommandInternal(command); |
| } catch (Exception exception) { |
| // Note that there is no need to removeConnection here - it's removed by the calling method: |
| // org.eclipse.persistence.internal.sessions.coordination.CommandPropagator.propagateCommand. |
| // This method catches CommunicationException and processes it in handleCommunicationException method. |
| // The latter method, in case shouldRemoveConnectionOnError==true, removes the connection; |
| // otherwise it wraps the CommunicationException into RemoteCommandManagerException |
| // (with errorCode RemoteCommandManagerException.ERROR_PROPAGATING_COMMAND) |
| // and gives the use a chance to handle it - this is an opportunity for the user to |
| // stop remote command processing. |
| throw CommunicationException.errorSendingMessage(getServiceId().getId(), exception); |
| } |
| } else { |
| rcm.logWarning("broadcast_ignored_command_while_closing_connection", getInfo()); |
| return null; |
| } |
| } |
| |
| /** |
| * INTERNAL: |
| * Publish the remote command. The result of execution is returned. |
| * This method is used only by external (publishing) connection. |
| */ |
| @Override |
| public Object executeCommand(byte[] command) throws CommunicationException { |
| if (isActive()) { |
| try { |
| return executeCommandInternal(command); |
| } catch (Exception exception) { |
| // Note that there is no need to removeConnection here - it's removed by the calling method: |
| // org.eclipse.persistence.internal.sessions.coordination.CommandPropagator.propagateCommand. |
| // This method catches CommunicationException and processes it in handleCommunicationException method. |
| // The latter method, in case shouldRemoveConnectionOnError==true, removes the connection; |
| // otherwise it wraps the CommunicationException into RemoteCommandManagerException |
| // (with errorCode RemoteCommandManagerException.ERROR_PROPAGATING_COMMAND) |
| // and gives the use a chance to handle it - this is an opportunity for the user to |
| // stop remote command processing. |
| throw CommunicationException.errorSendingMessage(getServiceId().getId(), exception); |
| } |
| } else { |
| rcm.logWarning("broadcast_ignored_command_while_closing_connection", getInfo()); |
| return null; |
| } |
| } |
| |
| /** |
| * INTERNAL: |
| * Publish the remote command. The result of execution is returned. |
| * This method is used only by external (publishing) connection. |
| */ |
| protected abstract Object executeCommandInternal(Object command) throws Exception; |
| |
| /** |
| * INTERNAL: |
| * Called from executeCommandInternal to log debugInfo right before sending the message. |
| * Returns array {toString(), messageId }. |
| * In case messageId is null returns getInfoExt() avoiding creation of a new array. |
| */ |
| protected Object[] logDebugBeforePublish(String messageId) { |
| Object[] debugInfo = null; |
| if(messageId == null) { |
| debugInfo = getInfoExt(); |
| } else { |
| debugInfo = new Object[] {toString(), messageId}; |
| } |
| // call logDebugWithoutLevelCheck to avoid the second rcm.shouldLogDebugMessage() check |
| rcm.logDebugWithoutLevelCheck("broadcast_sending_message", debugInfo); |
| return debugInfo; |
| } |
| |
| /** |
| * INTERNAL: |
| * Called from executeCommandInternal to log debugInfo right after sending the message. |
| * Only call this method in case logDebugBeforePublish returned non-null |
| * this is indication that debug logging is enabled. |
| * Pass to this method debugInfo returned by logDebugBeforePublish. |
| * Need to pass messageId only in case it has changed since logDebugBeforePublish: |
| * some broadcasting protocols (JMS) don't generate messageId until the message is published. |
| */ |
| protected void logDebugAfterPublish(Object[] debugInfo, String messageId) { |
| if(messageId != null) { |
| if(debugInfo == getInfoExt()) { |
| // need to create a new debugInfo object - the original is the cached info. |
| debugInfo = new Object[] {toString(), messageId}; |
| } else { |
| // need only update messageId on existing debugInfo |
| debugInfo[3] = messageId; |
| } |
| } |
| // call logDebugWithoutLevelCheck to avoid the second rcm.shouldLogDebugMessage() check |
| rcm.logDebugWithoutLevelCheck("broadcast_sent_message", debugInfo); |
| } |
| |
| /** |
| * INTERNAL: |
| * Called when a message is received to log debugInfo: |
| * { toString(), messageId }. |
| * This method is used by local (listening) connection only. |
| */ |
| protected void logDebugOnReceiveMessage(String messageId) { |
| Object[] debugInfo = null; |
| if(messageId == null) { |
| debugInfo = getInfoExt(); |
| } else { |
| debugInfo = new Object[] {toString(), messageId}; |
| } |
| // call logDebugWithoutLevelCheck to avoid the second rcm.shouldLogDebugMessage() check |
| rcm.logDebugWithoutLevelCheck("broadcast_retreived_message", debugInfo); |
| } |
| |
| /** |
| * INTERNAL: |
| * Process the object extracted from the received message. |
| * Pass to this method messageInfo created by logDebugOnReceiveMessage method. |
| * This method is used by local (listening) connection only. |
| */ |
| protected void processReceivedObject(Object object, String messageId) { |
| Command remoteCommand = null; |
| if (object instanceof Command) { |
| remoteCommand = (Command)object; |
| try { |
| // prevent the processing of messages sent with the same serviceId |
| if (shouldCheckServiceId()) { |
| if (remoteCommand.getServiceId().getId().equals(this.serviceId.getId())) { |
| return; |
| } |
| } |
| if (remoteCommand.getServiceId().getChannel().equals(this.serviceId.getChannel())) { |
| if(rcm.shouldLogDebugMessage()) { |
| Object[] args = { toString(), messageId, remoteCommand.getServiceId().toString(), Helper.getShortClassName(remoteCommand) }; |
| rcm.logDebugWithoutLevelCheck("broadcast_processing_remote_command", args); |
| } |
| rcm.processCommandFromRemoteConnection(remoteCommand); |
| } else { |
| if(rcm.shouldLogWarningMessage()) { |
| Object[] args = { toString(), messageId, remoteCommand.getServiceId().toString(), Helper.getShortClassName(remoteCommand)}; |
| rcm.logWarningWithoutLevelCheck("broadcast_ignore_remote_command_from_different_channel", args); |
| } |
| } |
| } catch (RuntimeException e) { |
| try { |
| rcm.handleException(RemoteCommandManagerException.errorProcessingRemoteCommand(toString(), messageId, remoteCommand.getServiceId().toString(), Helper.getShortClassName(remoteCommand), e)); |
| } catch (RuntimeException ex) { |
| // User had a chance to handle the exception. |
| // The method is called by a listener thread - no one could catch this exception. |
| } |
| } |
| } else if (object == null) { |
| Object[] args = { toString(), messageId}; |
| rcm.logWarning("broadcast_remote_command_is_null", args); |
| } else { |
| if(rcm.shouldLogWarningMessage()) { |
| String className = object.getClass().getName(); |
| Object[] args = { toString(), messageId, className }; |
| rcm.logWarningWithoutLevelCheck("broadcast_remote_command_wrong_type", args); |
| } |
| } |
| } |
| |
| /** |
| * INTERNAL: |
| * Call this message in case there is failure to extract the object |
| * (to be passed to processReceivedObject) from the message. |
| * Pass to this method debugInfo created by logDebugOnReceiveMessage method. |
| * This method is used by local (listening) connection only. |
| */ |
| protected void failDeserializeMessage(String messageId, Exception exception) { |
| try { |
| rcm.handleException(RemoteCommandManagerException.errorDeserializeRemoteCommand(toString(), messageId, exception)); |
| } catch (Exception ex) { |
| // User had a chance to handle the exception. |
| // The method is called by a listener thread - no one could catch this exception. |
| } |
| } |
| |
| /** |
| * INTERNAL: |
| * This method is called when connection in no longer used and it's resources should be freed. |
| * As soon as this method is called the state is CLOSING. |
| * Usually the state is CLOSED just before the method returns, |
| * but there are some special cases (see comment to areAllResourcesFreedOnClose method) |
| * when the state is still CLOSING after the method returns. |
| */ |
| @Override |
| public void close() { |
| synchronized(this) { |
| if(isClosing()) { |
| rcm.logWarning("broadcast_connection_already_closing", getInfo()); |
| return; |
| } else if(isClosed()) { |
| rcm.logWarning("broadcast_connection_already_closed", getInfo()); |
| return; |
| } else { |
| state = STATE_CLOSING; |
| } |
| } |
| try { |
| rcm.logDebug("broadcast_closing_connection", getInfo()); |
| closeInternal(); |
| } catch (Exception exception) { |
| Object[] args = { toString(), exception }; |
| rcm.logWarning("broadcast_exception_thrown_when_attempting_to_close_connection", args); |
| } finally { |
| if(areAllResourcesFreedOnClose()) { |
| rcm.logDebug("broadcast_connection_closed", getInfo()); |
| state = STATE_CLOSED; |
| } |
| } |
| } |
| |
| /** |
| * INTERNAL: |
| * State of the connection. |
| */ |
| public String getState() { |
| return state; |
| } |
| |
| /** |
| * INTERNAL: |
| * Connection is open for business. |
| */ |
| public boolean isActive() { |
| return state == STATE_ACTIVE; |
| } |
| |
| /** |
| * INTERNAL: |
| * close method has been called. |
| */ |
| public boolean isClosing() { |
| return state == STATE_CLOSING; |
| } |
| |
| /** |
| * INTERNAL: |
| * Connection is closed - all resources were freed. |
| */ |
| public boolean isClosed() { |
| return state == STATE_CLOSED; |
| } |
| |
| /** |
| * INTERNAL: |
| * This method is called by close method. |
| * This method usually |
| * (but not always see comment to areAllResourcesFreedOnClose method) |
| * frees all the resources. |
| */ |
| protected abstract void closeInternal() throws Exception; |
| |
| /** |
| * INTERNAL: |
| */ |
| public String getTopicName() { |
| return topicName; |
| } |
| |
| /** |
| * INTERNAL: |
| */ |
| protected Object[] getInfo() { |
| if(info == null) { |
| info = new Object[] {toString()}; |
| } |
| return info; |
| } |
| |
| /** |
| * INTERNAL: |
| */ |
| protected Object[] getInfoExt() { |
| if(infoExt == null) { |
| infoExt = new Object[] {toString(), ""}; |
| } |
| return infoExt; |
| } |
| |
| /** |
| * INTERNAL: |
| * Indicates whether all the resources used by connection are freed after close method returns. |
| * Usually that's the case. However in case of local (listening) JMSTopicRemoteConnection |
| * close merely indicates to the listening thread that it should free TopicConnection and exit. |
| * Note that it may take a while: the listening thread waits until subscriber.receive method either |
| * returns a message or throws an exception. |
| */ |
| protected boolean areAllResourcesFreedOnClose() { |
| return true; |
| } |
| |
| /** |
| * INTERNAL: |
| */ |
| @Override |
| public String toString() { |
| if(displayString == null) { |
| createDisplayString(); |
| } |
| return displayString; |
| } |
| |
| /** |
| * INTERNAL: |
| */ |
| protected void createDisplayString() { |
| this.displayString = Helper.getShortClassName(this) + "[" + serviceId.toString() + ", topic " + topicName +"]"; |
| } |
| |
| /** |
| * INTERNAL: |
| * Return whether a BroadcastConnection should check a ServiceId against its |
| * own ServiceId to avoid the processing of Commands with the same ServiceId. |
| * @return boolean |
| */ |
| protected boolean shouldCheckServiceId() { |
| return false; |
| } |
| |
| } |