blob: 032f3c6c38d3b2e6c0b46be253f2955db3faa7d2 [file] [log] [blame]
/*
* Copyright (c) 1998, 2021 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0,
* or the Eclipse Distribution License v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
// Contributors:
// Oracle - initial API and implementation from Oracle TopLink
package org.eclipse.persistence.testing.tests.distributedcache;
import java.rmi.registry.LocateRegistry;
import org.eclipse.persistence.testing.models.employee.domain.Employee;
import org.eclipse.persistence.testing.models.employee.relational.EmployeeProject;
import org.eclipse.persistence.internal.descriptors.OptimisticLockingPolicy;
import org.eclipse.persistence.internal.helper.ConcurrencyManager;
import org.eclipse.persistence.descriptors.ClassDescriptor;
import org.eclipse.persistence.sessions.Session;
import org.eclipse.persistence.queries.ReadObjectQuery;
import org.eclipse.persistence.sessions.DatabaseLogin;
import org.eclipse.persistence.sessions.Project;
import org.eclipse.persistence.sessions.SessionEvent;
import org.eclipse.persistence.sessions.SessionEventAdapter;
import org.eclipse.persistence.sessions.SessionEventListener;
import org.eclipse.persistence.sessions.UnitOfWork;
import org.eclipse.persistence.testing.framework.TestCase;
import org.eclipse.persistence.testing.framework.TestErrorException;
import org.eclipse.persistence.sessions.coordination.RemoteCommandManager;
import org.eclipse.persistence.sessions.coordination.TransportManager;
import org.eclipse.persistence.sessions.server.Server;
import org.eclipse.persistence.sessions.server.ServerSession;
public abstract class DistributedCacheMergeTest extends TestCase {
private OptimisticLockingPolicy policy1 = null;
private OptimisticLockingPolicy policy2 = null;
protected Server cluster1Session = null;
protected Server cluster2Session = null;
protected Session originalSession = null;
public static ConcurrencyManager semaphore = new ConcurrencyManager();
Object originalObject = null;
int initialNumProjs;
public DistributedCacheMergeTest() {
setDescription("Testing");
}
@Override
protected void setup() throws Exception {
originalSession = getExecutor().getSession();
originalSession.getIdentityMapAccessor().initializeAllIdentityMaps();
createObject();
try {
LocateRegistry.createRegistry(41099);
} catch (Exception e) {
//hopefully this exception is just because the registry already exists
}
cluster1Session = buildSession("cluster1");
cluster1Session.login();
cluster2Session = buildSession("cluster2");
cluster2Session.login();
Thread.sleep(5000);// Let the Cache-sync get configured.
policy1 = disableOptimisticLocking(cluster1Session);
policy2 = disableOptimisticLocking(cluster2Session);
}
protected void createObject() {
originalObject = getOriginalObject();
UnitOfWork uow = originalSession.acquireUnitOfWork();
Object newObj = uow.registerNewObject(originalObject);
uow.commit();
}
protected Server buildSession(String sessionName) throws Exception {
ServerSession session = null;
Project p = getNewProject();
DatabaseLogin theLogin = originalSession.getLogin();
p.setLogin(originalSession.getLogin());
session = (ServerSession)p.createServerSession();
session.setSessionLog(getSession().getSessionLog());
RemoteCommandManager cm = new RemoteCommandManager(session);
// set propagate command asynchronously for testing
cm.setShouldPropagateAsynchronously(true);
cm.getDiscoveryManager().setAnnouncementDelay(0);
// ovewrite default to use RMI registry naming service
cm.getTransportManager().setNamingServiceType(TransportManager.REGISTRY_NAMING_SERVICE);
// set full rmi URL of local host
cm.setUrl("rmi://localhost:41099");
// turn on cache sync with RCM
session.setShouldPropagateChanges(true);
cm.setServerPlatform(getSession().getServerPlatform());
cm.initialize();
// Sleep to allow RCM to startup and find each session.
try {
Thread.sleep(2000);
} catch (Exception ignore) {
}
return session;
}
@Override
public void reset() throws Exception {
int depth = semaphore.getDepth();
while (depth != 0) {
semaphore.release();
depth = semaphore.getDepth();
}
UnitOfWork uow = cluster1Session.acquireClientSession().acquireUnitOfWork();
uow.deleteObject(originalObject);
uow.commit();
enableOptimisticLocking(cluster1Session, policy1);
enableOptimisticLocking(cluster2Session, policy2);
cluster1Session.logout();
cluster1Session = null;
cluster2Session.logout();
cluster2Session = null;
}
private OptimisticLockingPolicy disableOptimisticLocking(Server server) {
ClassDescriptor descriptor = server.getDescriptor(getRootClass());
OptimisticLockingPolicy policy = descriptor.getOptimisticLockingPolicy();
descriptor.setOptimisticLockingPolicy(null);
return policy;
}
private void enableOptimisticLocking(Server server, OptimisticLockingPolicy policy) {
ClassDescriptor descriptor = server.getDescriptor(getRootClass());
descriptor.setOptimisticLockingPolicy(policy);
}
@Override
protected void test() {
Session clientSession1 = cluster1Session.acquireClientSession();
Session clientSession2 = cluster2Session.acquireClientSession();
cluster2Session.getIdentityMapAccessor().initializeAllIdentityMaps();
cluster2Session.getEventManager().addListener(buildCacheMergeBlockingListener());
Object object1 = findOriginalObject(clientSession1);
if (object1 == null) {
throw new TestErrorException("Employee on Server 1 exists");
}
initialNumProjs = getCollectionSize(object1);
UnitOfWork uow = clientSession1.acquireUnitOfWork();
Object newEmpWC = uow.registerObject(object1);
modifyCollection(uow, newEmpWC);
semaphore.acquire();
uow.commit();
if (getCollectionSize(object1) != (initialNumProjs + 1)) {
throw new TestErrorException("Employee has the wrong number of items in the collection; expected:" + (initialNumProjs + 1) + " was:" + getCollectionSize(object1));
}
try {
while (semaphore.getNumberOfWritersWaiting() == 0) {
Thread.sleep(10);
}
} catch (Exception e) {
throw new TestErrorException("Error while getting Thread to sleep", e);
}
// Make sure its not in server 2's cache
Object object2 = clientSession2.getIdentityMapAccessor().getFromIdentityMap(object1);
if (object2 != null) {
throw new TestErrorException("Employee already exists in Server 2's cache");
}
object2 = findOriginalObject(clientSession2);
if (object2 == null) {
throw new TestErrorException("Employee does not exist in Server 2's cache");
}
if (getCollectionSize(object2) != (initialNumProjs + 1)) {
throw new TestErrorException("Employee has the wrong number of items in the collection");
}
semaphore.release();
try {
Thread.sleep(1000);
} catch (Exception e) {
throw new TestErrorException("Error while getting Thread to sleep", e);
}
semaphore.acquire();
if (getCollectionSize(object2) != (initialNumProjs + 1)) {
if (getCollectionSize(object2) == (initialNumProjs + 2)) {
throw new TestErrorException("Employee has one too many items in the collection after merge");
}
throw new TestErrorException("Employee has wrong number of items in the collection after merge");
}
semaphore.release();
}
protected Class getRootClass() {
return Employee.class;
}
protected Project getNewProject() {
return new EmployeeProject();
}
protected abstract void modifyCollection(UnitOfWork uow, Object objectToModify);
protected abstract int getCollectionSize(Object rootObject);
protected abstract Object buildOriginalObject();
public Object findOriginalObject(Session session) {
ReadObjectQuery roq = new ReadObjectQuery(originalObject);
return session.executeQuery(roq);
}
protected SessionEventListener buildCacheMergeBlockingListener() {
return new SessionEventAdapter() {
@Override
public void preDistributedMergeUnitOfWorkChangeSet(SessionEvent event) {
try {
DistributedCacheMergeTest.semaphore.acquire();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Override
public void postDistributedMergeUnitOfWorkChangeSet(SessionEvent event) {
DistributedCacheMergeTest.semaphore.release();
}
};
}
public Object getOriginalObject() {
if (originalObject == null) {
originalObject = buildOriginalObject();
}
return originalObject;
}
}