blob: be81b0b30cf8a3a4673de050322870ae7da11d9f [file] [log] [blame]
/*
* Copyright (c) 1998, 2021 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0,
* or the Eclipse Distribution License v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
// Contributors:
// Oracle - initial API and implementation from Oracle TopLink
package org.eclipse.persistence.testing.tests.dbchangenotification;
import java.util.Vector;
import org.eclipse.persistence.sessions.*;
import org.eclipse.persistence.queries.SQLCall;
import org.eclipse.persistence.testing.framework.*;
import org.eclipse.persistence.testing.models.employee.domain.*;
import org.eclipse.persistence.testing.models.employee.relational.EmployeeProject;
public class DbChangeNotificationTest extends TestCase {
int nMax = 2;
int n; // n <= nMax
Session session[] = new Session[nMax];
Vector employees[] = new Vector[nMax];
Vector smallProjects[] = new Vector[nMax];
Vector largeProjects[] = new Vector[nMax];
Vector phoneNumbers[] = new Vector[nMax];
String aqUser;
String aqPassword;
String queueName;
boolean useMultipleConsumers;
boolean useListener;
long timeToWait = 1000;
long timeDead = 5000;
CacheInvalidationHandler handler[] = new CacheInvalidationHandler[nMax];
public DbChangeNotificationTest(String aqUser, String aqPassword, String queueName, boolean useMultipleConsumers, boolean useListener) {
super();
this.aqUser = aqUser;
this.aqPassword = aqPassword;
this.queueName = queueName;
this.useMultipleConsumers = useMultipleConsumers;
this.useListener = useListener;
String uses;
if (useListener) {
uses = " listener";
} else {
uses = " thread";
}
String type;
if (useMultipleConsumers) {
n = nMax;
type = " broadcast";
} else {
n = 1;
type = " point to point";
}
setName(getName() + uses + type);
}
@Override
protected void setup() throws Exception {
javax.jms.TopicConnectionFactory topicConnectionFactory = null;
javax.jms.QueueConnectionFactory queueConnectionFactory = null;
if (useMultipleConsumers) {
topicConnectionFactory = oracle.jms.AQjmsFactory.getTopicConnectionFactory(getSession().getLogin().getConnectionString(), null);
} else {
queueConnectionFactory = oracle.jms.AQjmsFactory.getQueueConnectionFactory(getSession().getLogin().getConnectionString(), null);
}
for (int i = 0; i < n; i++) {
DatabaseLogin login = (DatabaseLogin)getSession().getLogin().clone();
session[i] = new org.eclipse.persistence.sessions.Project(login).createDatabaseSession();
((DatabaseSession)session[i]).addDescriptors(new EmployeeProject());
session[i].setSessionLog(getSession().getSessionLog());
session[i].setLogLevel(getSession().getLogLevel());
session[i].setName("session[" + Integer.toString(i) + "]");
((DatabaseSession)session[i]).login();
employees[i] = session[i].readAllObjects(Employee.class);
smallProjects[i] = session[i].readAllObjects(SmallProject.class);
largeProjects[i] = session[i].readAllObjects(LargeProject.class);
phoneNumbers[i] = session[i].readAllObjects(PhoneNumber.class);
session[i].executeNonSelectingCall(new SQLCall("BEGIN NOTIFY_SET_APPID('" + session[i].getName() + "'); END;"));
// String selector = "(JMSXUserID IS NULL) OR (JMSXUserID <> " + "'" + session[i].getName() + "')";
String selector = "(APP IS NULL) OR (APP <> " + "'" + session[i].getName() + "')";
javax.jms.Connection jmsConnection;
javax.jms.MessageConsumer messageConsumer;
if (useMultipleConsumers) {
javax.jms.TopicConnection topicConnection = topicConnectionFactory.createTopicConnection(aqUser, aqPassword);
jmsConnection = topicConnection;
javax.jms.TopicSession topicSession = topicConnection.createTopicSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
javax.jms.Topic topic = ((oracle.jms.AQjmsSession)topicSession).getTopic(aqUser, queueName);
messageConsumer = topicSession.createSubscriber(topic, selector, false);
} else {
javax.jms.QueueConnection queueConnection = queueConnectionFactory.createQueueConnection(aqUser, aqPassword);
jmsConnection = queueConnection;
javax.jms.QueueSession queueSession = queueConnection.createQueueSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = ((oracle.jms.AQjmsSession)queueSession).getQueue(aqUser, queueName);
messageConsumer = queueSession.createReceiver(queue, selector);
}
jmsConnection.start();
if (useListener) {
// every timeToWait listener.askToStopAfter() verifies whether it should stop
CacheInvalidationMessageListener listener = new CacheInvalidationMessageListener(session[i], jmsConnection, timeToWait);
messageConsumer.setMessageListener(listener);
handler[i] = listener;
} else {
// runnable calls messageConsumer.receive(timeToWait) in a loop, verifying whether it should stop
CacheInvalidationRunnable runnable = new CacheInvalidationRunnable(session[i], jmsConnection, messageConsumer, timeToWait);
new Thread(runnable).start();
handler[i] = runnable;
}
}
// enable sending notification messages - set appId to non-null
getSession().executeNonSelectingCall(new SQLCall("BEGIN NOTIFY_SET_APPID('original session'); END;"));
// enableTriggers(true);
Thread.sleep(1000);
}
@Override
protected void test() throws Exception {
int[] numMessagesExpected = new int[n];
int numUpdated;
// 1 ********************************************************
// Original session: Begin transaction, update SmallProjects, rollback transaction
// No change notification should be sent
getAbstractSession().beginTransaction();
SQLCall updateSmallProjectsCall = new SQLCall("UPDATE PROJECT SET DESCRIP = CONCAT('BLA ', DESCRIP) WHERE PROJ_TYPE = 'S'");
getSession().executeNonSelectingCall(updateSmallProjectsCall);
getAbstractSession().rollbackTransaction();
// 2 ********************************************************
// Original session updates LargeProjects
// Change notification should be sent to both session[0] and session[1]
SQLCall updateLargeProjectsCall = new SQLCall("UPDATE LPROJECT SET BUDGET = BUDGET + 10000");
numUpdated = getSession().executeNonSelectingCall(updateLargeProjectsCall);
for (int i = 0; i < n; i++) {
numMessagesExpected[i] += numUpdated;
}
// 3 ********************************************************
// session[0] updates Salaries
// Change notification should be sent only to session[1]
SQLCall updateSalariesCall = new SQLCall("UPDATE SALARY SET SALARY = SALARY + 10000");
numUpdated = session[0].executeNonSelectingCall(updateSalariesCall);
for (int i = 0; i < n; i++) {
if (i != 0) {
numMessagesExpected[i] += numUpdated;
}
}
// 4 ********************************************************
// session[1] updates PhoneNumbers
// Change notification should be sent only to session[0]
if (n > 1) {
// add 1 to area code: 110 -> 111
// Added 1000 to properly handled codes starting with 0: 095 -> 096 (not to 96)
SQLCall updatePhoneNumbersCall = new SQLCall("UPDATE PHONE SET AREA_CODE = SUBSTR(TO_CHAR(1001+TO_NUMBER(AREA_CODE)),2)");
numUpdated = session[1].executeNonSelectingCall(updatePhoneNumbersCall);
for (int i = 0; i < n; i++) {
if (i != 1) {
numMessagesExpected[i] += numUpdated;
}
}
}
for (int i = 0; i < n; i++) {
// stop either after expected number of messages has been received or
// after timeDead has past since receiving the last message
handler[i].askToStopAfter(numMessagesExpected[i], timeDead);
}
}
@Override
protected void verify() throws Exception {
Thread.sleep(1000);
// 1 ********************************************************
// Original session: Begin transaction, update SmallProjects, rollback transaction
// No change notification should be sent
for (int i = 0; i < n; i++) {
boolean shouldInvalidate = false;
int numInvalidated = 0;
for (int j = 0; j < smallProjects[i].size(); j++) {
if (!session[i].getIdentityMapAccessor().isValid(smallProjects[i].elementAt(j))) {
numInvalidated++;
}
}
verifyInvalidated(session[i].getName(), shouldInvalidate, "SmallProject", numInvalidated, smallProjects[i].size());
}
// 2 ********************************************************
// Original session updates LargeProjects
// Change notification should be sent to both session[0] and session[1]
for (int i = 0; i < n; i++) {
boolean shouldInvalidate = true;
int numInvalidated = 0;
for (int j = 0; j < largeProjects[i].size(); j++) {
if (!session[i].getIdentityMapAccessor().isValid(largeProjects[i].elementAt(j))) {
numInvalidated++;
}
}
verifyInvalidated(session[i].getName(), shouldInvalidate, "LargeProject", numInvalidated, largeProjects[i].size());
}
// 3 ********************************************************
// session[0] updates Salaries
// Change notification should be sent only to session[1]
for (int i = 0; i < n; i++) {
boolean shouldInvalidate = i == 1;
int numInvalidated = 0;
for (int j = 0; j < employees[i].size(); j++) {
if (!session[i].getIdentityMapAccessor().isValid(employees[i].elementAt(j))) {
numInvalidated++;
}
}
verifyInvalidated(session[i].getName(), shouldInvalidate, "Employee", numInvalidated, employees[i].size());
}
// 4 ********************************************************
// session[1] updates PhoneNumbers
// Change notification should be sent only to session[0]
if (n > 1) {
for (int i = 0; i < n; i++) {
boolean shouldInvalidate = i == 0;
int numInvalidated = 0;
for (int j = 0; j < phoneNumbers[i].size(); j++) {
if (!session[i].getIdentityMapAccessor().isValid(phoneNumbers[i].elementAt(j))) {
numInvalidated++;
}
}
verifyInvalidated(session[i].getName(), shouldInvalidate, "PhoneNumber", numInvalidated, phoneNumbers[i].size());
}
}
}
protected void verifyInvalidated(String sessionName, boolean shouldInvalidate, String className, int numInvalidated, int numTotal) {
if (shouldInvalidate) {
if (numInvalidated == 0) {
throw new TestErrorException(sessionName + " has not invalidated " + className);
} else if (numInvalidated < numTotal) {
throw new TestErrorException(sessionName + " has invalidated only " + numInvalidated + ' ' + className + ", should've invalidated " + numTotal);
}
} else {
if (numInvalidated > 0) {
throw new TestErrorException(sessionName + " has invalidated " + numInvalidated + ' ' + className + ", was not supposed to invalidate any");
}
}
}
@Override
public void reset() throws Exception {
// disable sending notification messages - set appId to null
getSession().executeNonSelectingCall(new SQLCall("BEGIN NOTIFY_SET_APPID; END;"));
// enableTriggers(false);
Thread.sleep(1000);
for (int i = 0; i < n; i++) {
((DatabaseSession)session[i]).logout();
}
// reset original values into the db
// 2 ********************************************************
SQLCall updateLargeProjectsCall = new SQLCall("UPDATE LPROJECT SET BUDGET = BUDGET - 10000");
getSession().executeNonSelectingCall(updateLargeProjectsCall);
// 3 ********************************************************
SQLCall updateSalariesCall = new SQLCall("UPDATE SALARY SET SALARY = SALARY - 10000");
getSession().executeNonSelectingCall(updateSalariesCall);
// 4 ********************************************************
if (n > 1) {
// subtract 1 from area code: 111 -> 110
// Added 1000 to properly handled codes starting with 0: 096 -> 095 (not to 95)
SQLCall updatePhoneNumbersCall = new SQLCall("UPDATE PHONE SET AREA_CODE = SUBSTR(TO_CHAR(999+TO_NUMBER(AREA_CODE)),2)");
getSession().executeNonSelectingCall(updatePhoneNumbersCall);
}
}
/*protected void enableTriggers(boolean enable) {
String[] tableNames = {"PROJECT", "LPROJECT", "SALARY", "PHONE"};
String action;
if(enable) {
action = "ENABLE";
} else {
action = "DISABLE";
}
getAbstractSession().beginTransaction();
for(int i=0; i<tableNames.length; i++) {
String triggerName = "NOTIFY_" + tableNames[i];
getSession().executeNonSelectingCall(new SQLCall("ALTER TRIGGER "+triggerName+" "+action));
}
getAbstractSession().commitTransaction();
}*/
}