| /* |
| * Copyright (c) 1998, 2021 Oracle and/or its affiliates. All rights reserved. |
| * |
| * This program and the accompanying materials are made available under the |
| * terms of the Eclipse Public License v. 2.0 which is available at |
| * http://www.eclipse.org/legal/epl-2.0, |
| * or the Eclipse Distribution License v. 1.0 which is available at |
| * http://www.eclipse.org/org/documents/edl-v10.php. |
| * |
| * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause |
| */ |
| |
| // Contributors: |
| // Oracle - initial API and implementation from Oracle TopLink |
| // cdelahun - Bug 214534: added JMS Cache Coordination for publishing only |
| package org.eclipse.persistence.internal.sessions.coordination.jms; |
| |
| import java.io.Serializable; |
| |
| import jakarta.jms.BytesMessage; |
| import jakarta.jms.JMSException; |
| import jakarta.jms.Message; |
| import jakarta.jms.ObjectMessage; |
| import jakarta.jms.Topic; |
| import jakarta.jms.TopicConnection; |
| import jakarta.jms.TopicConnectionFactory; |
| import jakarta.jms.TopicPublisher; |
| import jakarta.jms.TopicSession; |
| import jakarta.jms.TopicSubscriber; |
| |
| import org.eclipse.persistence.exceptions.RemoteCommandManagerException; |
| import org.eclipse.persistence.internal.sessions.AbstractSession; |
| import org.eclipse.persistence.internal.sessions.coordination.broadcast.BroadcastRemoteConnection; |
| import org.eclipse.persistence.sessions.coordination.RemoteCommandManager; |
| import org.eclipse.persistence.sessions.coordination.jms.JMSTopicTransportManager; |
| import org.eclipse.persistence.sessions.serializers.JavaSerializer; |
| import org.eclipse.persistence.sessions.serializers.Serializer; |
| |
| /** |
| * <p> |
| * <b>Purpose</b>: Define the implementation of the abstract RemoteConnection for JMS. |
| * <p> |
| * <b>Description</b>: Executing commands implementation of RemoteConnection is done via JMS Publisher. |
| * |
| * Using a single TopicConnection for both publishing and subscribing would |
| * allow subscriber to ignore messages sent through the same TopicConnection - and therefore |
| * allow JMSTopicRemoteConnection to ignore messages that it has itself published. |
| * Unfortunately J2EE doesn't allow that: |
| * J2EE spec. (J2EE.6.6 in v1.4) states: |
| * "The following methods may only be used by application components executing |
| * in the application client container: javax.jms.Session method setMessageListener ... |
| * Application components in the web and EJB containers must not attempt to create more than one |
| * active (not closed) Session object per connection." |
| * Because of these restrictions |
| * a) two JMSTopicRemoteConnection are required - one for publishing (external) and another one for listening (local); |
| * b) listening should be done using subscriber.receive() in an infinite loop in a separate thread, |
| * that's why the class implements Runnable interface. |
| * c) publishing connection (external) could be used concurrently to send messages, so it cannot use the same publisher/session/topicConnection |
| * Instead, it will store the TopicConnectionFactory and use it to create connections when executeCommandInternal is called (much like |
| * DatabaseAccessor when an external pool is used) |
| * |
| * @author Steven Vo |
| * @since OracleAS TopLink 10<i>g</i> (10.0.3) |
| */ |
| public class JMSTopicRemoteConnection extends BroadcastRemoteConnection implements Runnable { |
| protected TopicConnectionFactory topicConnectionFactory; |
| protected Topic topic; |
| // indicates whether it's a local connection. |
| protected boolean isLocal; |
| |
| // not used by External (publishing) connection unless it is set to reuse the JMSTopicPublisher |
| private TopicPublisher publisher; |
| protected TopicConnection topicConnection; |
| protected TopicSession topicSession; |
| |
| // Used only by local (listening) connection |
| protected TopicSubscriber subscriber; |
| |
| |
| // Used only by local (listening) connection and |
| // only in case shouldRemoveConnectionOnError==false. |
| // Time to wait after error receiving jms message: |
| // wait to avoid a busy thread throwing barrage of exceptions |
| // in case JMS server goes down. |
| // User can avoid the wait by handling ERROR_RECEIVING_JMS_MESSAGE exception. |
| public static long WAIT_ON_ERROR_RECEIVING_JMS_MESSAGE = 10000; |
| |
| /** |
| * INTERNAL: |
| * Constructor creating either a local or external connection. Local connections created this way connect to the topicSession and cache |
| * the session and subscriber. External connections cache only the topicConnection and will obtain the session/publisher when needed. |
| */ |
| public JMSTopicRemoteConnection(RemoteCommandManager rcm, TopicConnectionFactory topicConnectionFactory, Topic topic, boolean isLocalConnectionBeingCreated, boolean reuseJMSTopicPublisher) throws JMSException { |
| super(rcm); |
| this.topicConnectionFactory = topicConnectionFactory; |
| this.topic = topic; |
| this.isLocal = isLocalConnectionBeingCreated; |
| rcm.logDebug("creating_broadcast_connection", getInfo()); |
| try { |
| if(isLocalConnectionBeingCreated) { |
| // it's a local connection |
| this.topicConnection = topicConnectionFactory.createTopicConnection(); |
| this.topicSession = topicConnection.createTopicSession(false, jakarta.jms.Session.AUTO_ACKNOWLEDGE); |
| this.subscriber = topicSession.createSubscriber(topic); |
| topicConnection.start(); |
| rcm.logDebug("broadcast_connection_created", getInfo()); |
| rcm.getServerPlatform().launchContainerRunnable(this); |
| } else if (reuseJMSTopicPublisher) { |
| // it's an external connection and is set to reuse the TopicPublisher (legacy) |
| this.topicConnection = topicConnectionFactory.createTopicConnection(); |
| this.topicSession = topicConnection.createTopicSession(false, jakarta.jms.Session.AUTO_ACKNOWLEDGE); |
| this.setPublisher(topicSession.createPublisher(topic)); |
| rcm.logDebug("broadcast_connection_created", getInfo()); |
| } //else bug214534: it's an external connection, with TopicConnections, sessions and publishers created as needed |
| } catch (JMSException ex) { |
| rcm.logDebug("failed_to_create_broadcast_connection", getInfo()); |
| close(); |
| throw ex; |
| } |
| } |
| |
| /** |
| * Creates local connections that do not use a TopicConnection or TopicSession, |
| * useful only for processing already received JMS messages |
| * @see onMessage |
| */ |
| public JMSTopicRemoteConnection(RemoteCommandManager rcm){ |
| super(rcm); |
| this.isLocal = true; |
| } |
| |
| /** |
| * INTERNAL: |
| * Indicates whether connection is local (subscriber) |
| * or external (publisher). |
| */ |
| public boolean isLocal() { |
| return isLocal; |
| } |
| |
| /** |
| * INTERNAL: |
| * Execute the remote command. The result of execution is returned. |
| * This method is used only by external (publishing) connection. |
| */ |
| @Override |
| protected Object executeCommandInternal(Object command) throws Exception { |
| TopicConnection jmsConnection = null; |
| try { |
| TopicPublisher topicPublisher = this.publisher; |
| TopicSession publishingSession = this.topicSession; |
| //if the publisher is set, reuse it. Otherwise, create it (and the connection and session) from the topicConnectionFactory |
| if ( topicPublisher == null ){ |
| jmsConnection = topicConnectionFactory.createTopicConnection(); |
| publishingSession = jmsConnection.createTopicSession(false, jakarta.jms.Session.AUTO_ACKNOWLEDGE); |
| topicPublisher = publishingSession.createPublisher(topic); |
| } |
| |
| Message message; |
| if (command instanceof byte[]) { |
| message = publishingSession.createBytesMessage(); |
| ((BytesMessage)message).writeBytes((byte[])command); |
| } else { |
| message = publishingSession.createObjectMessage(); |
| ((ObjectMessage)message).setObject((Serializable)command); |
| } |
| |
| Object[] debugInfo = null; |
| if (rcm.shouldLogDebugMessage()) { |
| // null passed because JMSMessageId is not yet created. |
| debugInfo = logDebugBeforePublish(null); |
| } |
| |
| topicPublisher.publish(message); |
| |
| // debug logging is on |
| if (debugInfo != null) { |
| // now messageId has been created - let's use it. |
| logDebugAfterPublish(debugInfo, message.getJMSMessageID()); |
| } |
| |
| return null; |
| } finally { |
| //only need to close the topicConnection, not the session or publisher, and only if it was created in this method. |
| if (jmsConnection != null) { |
| jmsConnection.close(); |
| } |
| } |
| } |
| |
| /** |
| * INTERNAL: |
| * Process received JMS message. |
| * This method is used only by local (listening) connection. |
| */ |
| public void onMessage(Message message) { |
| String topic = null; |
| String messageId = ""; |
| if (rcm.shouldLogDebugMessage()) { |
| try { |
| messageId = message.getJMSMessageID(); |
| logDebugOnReceiveMessage(messageId); |
| topic = logDebugJMSTopic(message); |
| } catch (JMSException ex) { |
| // ignore |
| } |
| } |
| |
| Object object = null; |
| try { |
| if (message instanceof ObjectMessage) { |
| object = ((ObjectMessage)message).getObject(); |
| } else if (message instanceof BytesMessage) { |
| BytesMessage byteMessage = (BytesMessage)message; |
| byte[] bytes = new byte[(int)byteMessage.getBodyLength()]; |
| byteMessage.readBytes(bytes); |
| Serializer serializer = this.rcm.getSerializer(); |
| if (serializer == null) { |
| serializer = JavaSerializer.instance; |
| } |
| object = serializer.deserialize(bytes, (AbstractSession)this.rcm.getCommandProcessor()); |
| } else { |
| if (this.rcm.shouldLogWarningMessage() && (topic == null)) { |
| try { |
| topic = ((Topic)message.getJMSDestination()).getTopicName(); |
| } catch (JMSException ex) { |
| // ignore |
| topic = ""; |
| } |
| Object[] args = { message.getClass().getName(), topic }; |
| this.rcm.logWarningWithoutLevelCheck("received_unexpected_message_type", args); |
| } |
| return; |
| } |
| } catch (Exception exception) { |
| if (messageId.length() == 0) { |
| try { |
| messageId = message.getJMSMessageID(); |
| } catch (JMSException ex) { |
| // ignore |
| } |
| } |
| failDeserializeMessage(messageId, exception); |
| return; |
| } |
| |
| processReceivedObject(object, messageId); |
| } |
| |
| /** |
| * INTERNAL: |
| * Indicates whether all the resources used by connection are freed after close method returns. |
| * Usually that's the case. However in case of local (listening) JMSTopicRemoteConnection |
| * close merely indicates to the listening thread that it should free TopicConnection and exit. |
| * Note that it may take a while: the listening thread waits until subscriber.receive method either |
| * returns a message or throws an exception. |
| */ |
| @Override |
| protected boolean areAllResourcesFreedOnClose() { |
| return !isLocal(); |
| } |
| |
| /** |
| * INTERNAL: |
| * This method is called by close method. |
| * This method usually |
| * (but not always see comment to areAllResourcesFreedOnClose method) |
| * frees all the resources. |
| */ |
| @Override |
| protected void closeInternal() throws JMSException { |
| //this method should be a no-op now that external connections open/close TopicConnection when needed. Close on Local |
| //connections will eventually cause topicConnection.close() in their listening thread, so it should not be called here |
| if(areAllResourcesFreedOnClose() && topicConnection!=null) { |
| // There is no need to close the sessions, producers, and consumers of a closed TopicConnection. |
| topicConnection.close(); |
| } |
| } |
| |
| /** |
| * INTERNAL: |
| */ |
| protected String logDebugJMSTopic(Message message) throws JMSException { |
| String topic = ((Topic)message.getJMSDestination()).getTopicName(); |
| Object[] args = { topic }; |
| // call logDebugWithoutLevelCheck to avoid the second rcm.shouldLogDebugMessage() check |
| rcm.logDebugWithoutLevelCheck("retreived_remote_message_from_JMS_topic", args); |
| return topic; |
| } |
| |
| /** |
| * INTERNAL: |
| * This method is used by local (listening) connection only. |
| * The only way to exit the loop is to set isActive to false - |
| * there should be no uncaught exceptions thrown from inside the loop. |
| * The execution exits the loop either in case of exception in remove connection on error mode; |
| * or by trasportManager.removeLocalConnection() call |
| * (which calls connection.close(), which sets isActive to false). |
| */ |
| @Override |
| public void run() { |
| JMSTopicTransportManager tm = (JMSTopicTransportManager)rcm.getTransportManager(); |
| rcm.logDebug("broadcast_connection_start_listening", getInfo()); |
| |
| // indicates whether to create a new connection before exiting the thread. |
| boolean shouldReconnect = false; |
| // exception indicating that the received message is null. |
| RuntimeException messageIsNullException = null; |
| // isActive() returning false indicates that close method has been called. |
| // Should never exit this loop because of an uncaught exception. |
| while (isActive()) { |
| try { |
| Message message = subscriber.receive(); |
| // need the second isActive check here: |
| // close method could have been called while subscriber.receive() was waiting. |
| if (isActive()) { |
| if(message == null) { |
| try { |
| // user has a chance to handle exception - for instance to ignore it. |
| rcm.handleException(RemoteCommandManagerException.errorJMSMessageIsNull()); |
| // exception has been handled, go to the next iteration. |
| continue; |
| } catch (RuntimeException ex) { |
| messageIsNullException = ex; |
| // throw a dummy JMSException to get into catch block |
| throw new JMSException(""); |
| } |
| } |
| // process the message and log a warning without throwing exception if there was exception. |
| rcm.getServerPlatform().launchContainerRunnable(new JMSOnMessageHelper(message)); |
| } |
| } catch (JMSException e) { |
| // need the second isActive check here: |
| // close method could have been called while subscriber.receive() was waiting. |
| if (isActive()) { |
| RemoteCommandManagerException rcmException; |
| if(messageIsNullException != null) { |
| rcmException = RemoteCommandManagerException.errorReceivingJMSMessage(messageIsNullException); |
| messageIsNullException = null; |
| } else { |
| rcmException = RemoteCommandManagerException.errorReceivingJMSMessage(e); |
| } |
| if (tm.shouldRemoveConnectionOnError()) { |
| shouldReconnect = true; |
| Object[] args = { getServiceId(), rcmException }; |
| rcm.logWarning("drop_connection_on_error", args); |
| // after connection is closed isActive will return false. |
| tm.removeLocalConnection(); |
| } else { |
| try { |
| // user has a chance to handle exception: |
| // for instance to shut down the command manager. |
| rcm.handleException(rcmException); |
| } catch (RuntimeException ex) { |
| // Ignore the exception, sleep before going back to listening. |
| Object[] args = { toString(), rcmException, WAIT_ON_ERROR_RECEIVING_JMS_MESSAGE }; |
| rcm.logWarning("broadcast_listening_sleep_on_error", args); |
| try { |
| Thread.sleep(WAIT_ON_ERROR_RECEIVING_JMS_MESSAGE); |
| } catch (InterruptedException interruptedEception) { |
| // Ignore |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| // Out of the loop - that means close method has been called. |
| rcm.logDebug("broadcast_connection_stop_listening", getInfo()); |
| if(isClosing()) { |
| try { |
| // There is no need to close the sessions, producers, and consumers of a closed TopicConnection. |
| topicConnection.close(); |
| } catch (JMSException closeException) { |
| Object[] args = { displayString, closeException }; |
| rcm.logWarning("broadcast_exception_thrown_when_attempting_to_close_connection", args); |
| } finally { |
| rcm.logDebug("broadcast_connection_closed", getInfo()); |
| state = STATE_CLOSED; |
| } |
| } |
| |
| if(shouldReconnect && !tm.getRemoteCommandManager().isStopped()) { |
| try { |
| tm.createLocalConnection(); |
| } catch (RemoteCommandManagerException ex) { |
| // Ignore exception - user had a chance to handle it in createLocalConnection method: |
| // for instance to change host url and create a new local connection. |
| } |
| } |
| } |
| |
| /** |
| * INTERNAL: |
| * Used for debug logging |
| */ |
| @Override |
| protected void createDisplayString() { |
| super.createDisplayString(); |
| displayString = (isLocal() ? "Local " : "External ") + displayString; |
| } |
| |
| /** |
| * INTERNAL: |
| * Return whether a BroadcastConnection should check a ServiceId against its |
| * own ServiceId to avoid the processing of Commands with the same ServiceId. |
| * This should take place (return true) for a JMSTopicRemoteConnection. |
| * @return boolean |
| */ |
| @Override |
| protected boolean shouldCheckServiceId() { |
| return true; |
| } |
| |
| /** |
| * INTERNAL: |
| * set the TopicPublisher to be used when this RemoteConnection executes a command. |
| * Setting the TopicPublisher avoids having it obtained on each executeCommandInternal |
| * call. Passing in a publisher requires a TopicSession to also be set. These will |
| * not be closed until the external RemoteConnection is closed, and then only if the |
| * TopicConnection is also set. |
| */ |
| public void setPublisher(TopicPublisher publisher) { |
| this.publisher = publisher; |
| } |
| |
| public TopicPublisher getPublisher() { |
| return publisher; |
| } |
| |
| /** |
| * INTERNAL: |
| * set the TopicSubscriber on a local RemoteConnection for reading JMS messages when |
| * this runnable connection is started in a thread. If setting this, a TopicConnection |
| * is also required to be set, in order for it to be closed when the thread completes. |
| * This is only to be used when using the JMSTopicRemoteConnection(rcm) constructor. |
| */ |
| public void setSuscriber(TopicSubscriber subscriber) { |
| this.subscriber = subscriber; |
| } |
| |
| public TopicSubscriber getSubscriber() { |
| return subscriber; |
| } |
| |
| /** |
| * INTERNAL: |
| * set the TopicSession to be used when this RemoteConnection executes a command if |
| * the publisher is also set. Setting the TopicSession and Publisher avoids having |
| * them obtained on each executeCommandInternal call. Passing in a TopicSession |
| * requires a TopicPublisher to also be set. These will not be closed until the |
| * external RemoteConnection is closed, and then only if the TopicConnection is also |
| * set. |
| */ |
| public void setTopicSession(TopicSession topicSession) { |
| this.topicSession = topicSession; |
| } |
| |
| public TopicSession getTopicSession() { |
| return topicSession; |
| } |
| |
| /** |
| * INTERNAL: |
| * Set the TopicConnectionFactory, which is used if the publisher is not set to |
| * obtain the TopicConnection, TopicSession and TopicPublisher |
| */ |
| public void setTopicConnectionFactory(TopicConnectionFactory topicConnectionFactory){ |
| this.topicConnectionFactory = topicConnectionFactory; |
| } |
| |
| public TopicConnection getTopicConnectionFactory() { |
| return topicConnection; |
| } |
| |
| /** |
| * INTERNAL: |
| * Set the TopicConnection. If this is set, a Publisher and TopicSession must also |
| * be set, or a new TopicConnection will be obtained on each executeCommandInternal |
| * call. This TopicConnection is only used on close, as closing the TopicConnection |
| * also closes any open TopicSessions and Publishers obtained from it. |
| */ |
| public void setTopicConnection(TopicConnection topicConnection){ |
| this.topicConnection = topicConnection; |
| } |
| |
| public TopicConnection getTopicConnection() { |
| return topicConnection; |
| } |
| |
| /** |
| * INTERNAL: |
| * Set the Topic. The Topic is required with the TopicConnectionFactory to obtain connections |
| * if the TopicPublisher is not set. |
| */ |
| public void setTopic(Topic topic){ |
| this.topic = topic; |
| } |
| |
| public Topic getTopic() { |
| return topic; |
| } |
| |
| class JMSOnMessageHelper implements Runnable { |
| Message message = null; |
| |
| public JMSOnMessageHelper(Message message) { |
| this.message = message; |
| } |
| |
| @Override |
| public void run() { |
| onMessage(message); |
| } |
| } |
| } |