blob: 5f963a79d2a32b373e446ea4499d6f5b33c44a09 [file] [log] [blame]
/*
* Copyright (c) 1998, 2022 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0,
* or the Eclipse Distribution License v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
// Contributors:
// Oracle - initial API and implementation from Oracle TopLink
package org.eclipse.persistence.testing.tests.distributedservers.rcm.jms;
import java.util.Properties;
import javax.jms.ObjectMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.Context;
import javax.naming.InitialContext;
import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;
import org.eclipse.persistence.exceptions.RemoteCommandManagerException;
import org.eclipse.persistence.internal.sessions.AbstractSession;
import org.eclipse.persistence.sessions.coordination.RemoteCommandManager;
import org.eclipse.persistence.sessions.coordination.jms.JMSTopicTransportManager;
import org.eclipse.persistence.testing.framework.oracle.OracleAqHelper;
import org.eclipse.persistence.testing.tests.distributedservers.rcm.broadcast.BroadcastSetupHelper;
public class JMSSetupHelper extends BroadcastSetupHelper {
protected static JMSSetupHelper helper;
public static JMSSetupHelper getHelper() {
if (helper == null) {
helper = new JMSSetupHelper();
}
return helper;
}
protected JMSSetupHelper() {
super();
factoryJndiName = JMSTopicTransportManager.DEFAULT_CONNECTION_FACTORY;
topicJndiName = JMSTopicTransportManager.DEFAULT_TOPIC;
}
// If removeConnectionOnError is set,
// local (listening) connection is removed in JMS case if subscriber.receive() throws exception;
// however in Oc4jJGroups case the only (local and external) connection is not removed
// unless message sending fails.
@Override
public boolean isLocalConnectionRemovedOnListeningError() {
return true;
}
// Returns errorCode of RemoteCommandManagerException thrown in case
// creation of localConnection has failed.
@Override
public int getRcmExceptionErrorCodeOnFailureToCreateLocalConnection() {
return RemoteCommandManagerException.ERROR_CREATING_LOCAL_JMS_CONNECTION;
}
// JMSTopicTransportManager has separate connection for sending (external) and receiving (local) messages.
// Oc4jJGroups uses a single connection for both sending and receiving messages.
@Override
public boolean isLocalConnectionAlsoExternalConnection() {
return false;
}
protected String user = OracleAqHelper.getAqUser();
protected String password = OracleAqHelper.getAqPassword();
protected String connectionString;
protected String queueName = "jms_test";
protected String queueTableName = "jms_test_table";
protected String newConnectionString;
protected oracle.jdbc.pool.OracleDataSource oracleDataSource;
public void setConnectionString(String newConnectionString) {
this.newConnectionString = newConnectionString;
}
public void setConnectionStringFromSession(AbstractSession session) {
this.newConnectionString = session.getLogin().getConnectionString();
}
protected void updateDbSettings() {
connectionString = newConnectionString;
}
// returns array of two objects: the first is factory, the second is topic
@Override
protected Object[] internalCreateFactory() throws Exception {
updateDbSettings();
createInDb();
oracleDataSource = new oracle.jdbc.pool.OracleDataSource();
oracleDataSource.setURL(connectionString);
oracleDataSource.setUser(user);
oracleDataSource.setPassword(password);
TopicConnectionFactory topicConnectionFactory = AQjmsFactory.getTopicConnectionFactory(oracleDataSource);
TopicConnection topicConnection = topicConnectionFactory.createTopicConnection();
TopicSession topicSession = topicConnection.createTopicSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
Topic topic = ((AQjmsSession)topicSession).getTopic(user, queueName);
topicSession.close();
topicConnection.close();
return new Object[] { topicConnectionFactory, topic };
}
@Override
protected void internalStartFactory() throws Exception {
startInDb();
}
@Override
protected void internalStopFactory() throws Exception {
stopInDb();
}
@Override
protected void internalDestroyFactory() throws Exception {
try {
stopInDb();
destroyInDb();
} catch (java.sql.SQLException ex) {
} finally {
oracleDataSource = null;
}
}
@Override
protected void createTransportManager(RemoteCommandManager rcm) {
JMSTopicTransportManager tm = new JMSTopicTransportManager(rcm);
Properties props = new Properties();
// should use the testing JNDI factory.
props.setProperty(Context.INITIAL_CONTEXT_FACTORY, TEST_CONTEXT_FACTORY);
// the testing JNDI doesn't require PROVIDER_URL property, but without it NPE is thrown
props.setProperty(Context.PROVIDER_URL, "");
tm.setRemoteContextProperties(props);
// the testing JNDI doesn't require password, but without it NPE is thrown
tm.setPassword("");
}
protected void createInDb() throws java.sql.SQLException {
// just in case the table hasn't been already dropped
try {
destroyInDb();
} catch (java.sql.SQLException ex) {
// ignore
}
java.sql.Connection conn = java.sql.DriverManager.getConnection(connectionString, user, password);
try {
java.sql.Statement stmt = conn.createStatement();
String createTable = "BEGIN DBMS_AQADM.CREATE_QUEUE_TABLE (queue_table => '" + queueTableName + "', multiple_consumers => true, queue_payload_type => 'SYS.AQ$_JMS_BYTES_MESSAGE'); END;";
stmt.executeUpdate(createTable);
String createQueue = "BEGIN DBMS_AQADM.CREATE_QUEUE (queue_name => '" + queueName + "', queue_table => '" + queueTableName + "'); END;";
stmt.executeUpdate(createQueue);
} finally {
conn.close();
}
}
protected void startInDb() throws java.sql.SQLException {
java.sql.Connection conn = java.sql.DriverManager.getConnection(connectionString, user, password);
try {
java.sql.Statement stmt = conn.createStatement();
String startQueue = "BEGIN DBMS_AQADM.START_QUEUE (queue_name => '" + queueName + "'); END;";
stmt.executeUpdate(startQueue);
} finally {
conn.close();
}
}
protected void stopInDb() throws java.sql.SQLException {
java.sql.Connection conn = java.sql.DriverManager.getConnection(connectionString, user, password);
try {
java.sql.Statement stmt = conn.createStatement();
String stopQueue = "BEGIN DBMS_AQADM.STOP_QUEUE (queue_name => '" + queueName + "'); END;";
stmt.executeUpdate(stopQueue);
} finally {
conn.close();
}
}
protected void destroyInDb() throws java.sql.SQLException {
java.sql.Connection conn = java.sql.DriverManager.getConnection(connectionString, user, password);
try {
java.sql.Statement stmt1 = conn.createStatement();
String dropQueue = "BEGIN DBMS_AQADM.DROP_QUEUE (queue_name => '" + queueName + "'); END;";
try {
stmt1.executeUpdate(dropQueue);
} catch (java.sql.SQLException ex) {
}
java.sql.Statement stmt2 = conn.createStatement();
String dropTable = "BEGIN DBMS_AQADM.DROP_QUEUE_TABLE (queue_table => '" + queueTableName + "', force => TRUE); END;";
stmt2.executeUpdate(dropTable);
} finally {
conn.close();
}
}
// Sends an arbitrary message to speed up shut down of listening threads.
@Override
protected void sendMessageToStopListenerThreads() throws Exception {
Context context = new InitialContext(CONTEXT_PROPERTIES);
TopicConnectionFactory topicConnectionFactory = (TopicConnectionFactory)context.lookup(this.factoryJndiName);
TopicConnection topicConnection = topicConnectionFactory.createTopicConnection();
try {
Topic topic = (Topic)context.lookup(this.topicJndiName);
TopicSession session = topicConnection.createTopicSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
TopicPublisher publisher = session.createPublisher(topic);
ObjectMessage objectMessage = session.createObjectMessage();
publisher.publish(objectMessage);
} finally {
topicConnection.close();
}
}
@Override
protected void createExternalConnection(AbstractSession session) {
((JMSTopicTransportManager)session.getCommandManager().getTransportManager()).createExternalConnection();
}
}