blob: 629c532c526e11b35f9dfc6c414b22d7cb842f7b [file] [log] [blame]
/*
* Copyright (c) 1998, 2021 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 1998, 2021 IBM Corporation. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0,
* or the Eclipse Distribution License v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
// Contributors:
// Oracle - initial API and implementation from Oracle TopLink
// 02/11/2009-1.1 Michael O'Brien
// - 259993: As part 2) During mergeClonesAfterCompletion()
// If the the acquire and release threads are different
// switch back to the stored acquire thread stored on the mergeManager.
// tware, David Mulligan - fix performance issue with releasing locks
// 11/07/2017 - Dalia Abo Sheasha
// - 526957 : Split the logging and trace messages
package org.eclipse.persistence.internal.helper;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import org.eclipse.persistence.descriptors.ClassDescriptor;
import org.eclipse.persistence.descriptors.FetchGroupManager;
import org.eclipse.persistence.exceptions.ConcurrencyException;
import org.eclipse.persistence.internal.helper.linkedlist.ExposedNodeLinkedList;
import org.eclipse.persistence.internal.identitymaps.CacheKey;
import org.eclipse.persistence.internal.localization.TraceLocalization;
import org.eclipse.persistence.internal.queries.ContainerPolicy;
import org.eclipse.persistence.internal.sessions.AbstractSession;
import org.eclipse.persistence.internal.sessions.MergeManager;
import org.eclipse.persistence.internal.sessions.ObjectChangeSet;
import org.eclipse.persistence.internal.sessions.UnitOfWorkChangeSet;
import org.eclipse.persistence.logging.SessionLog;
import org.eclipse.persistence.mappings.DatabaseMapping;
import static java.util.Collections.unmodifiableMap;
/**
* INTERNAL:
* <p>
* <b>Purpose</b>: Acquires all required locks for a particular merge process.
* Implements a deadlock avoidance algorithm to prevent concurrent merge conflicts.
*
* <p>
* <b>Responsibilities</b>:
* <ul>
* <li> Acquires locks for writing threads.
* <li> Provides deadlock avoidance behavior.
* <li> Releases locks for writing threads.
* </ul>
* @author Gordon Yorke
* @since 10.0.3
*/
public class WriteLockManager {
/**
* The code spots where we use this constant are code spots where we afraid the thread might be trying to run a
* commit. Blowing up the thread with an interrupted exception might be too dangerous. We are not certain the
* eclipselink code is able to cope with it and release all resources appropriately.
*
*/
private static final Boolean ALLOW_INTERRUPTED_EXCEPTION_TO_BE_FIRED_UP_FALSE = false;
/**
* This flag we use if the write lock manager is stuck building clones of objects. Because we are not in the code
* area of commit to the db anything.
*/
private static final Boolean ALLOW_INTERRUPTED_EXCEPTION_TO_BE_FIRED_UP_TRUE = true;
/**
* This a map from a thread to cache keys the thread is finding itself not being able to acquire. This map is
* important to explain why a thread might be stuck in a stack trace of the form: {@code
* at java.lang.Class.getEnclosingMethod0(Native Method)
at java.lang.Class.getEnclosingMethodInfo(Class.java:1072)
at java.lang.Class.getEnclosingClass(Class.java:1272)
at java.lang.Class.getSimpleBinaryName(Class.java:1443)
at java.lang.Class.getSimpleName(Class.java:1309)
at org.eclipse.persistence.internal.identitymaps.IdentityMapManager.acquireLockNoWait(IdentityMapManager.java:205)
at org.eclipse.persistence.internal.sessions.IdentityMapAccessor.acquireLockNoWait(IdentityMapAccessor.java:108)
at org.eclipse.persistence.internal.helper.WriteLockManager.attemptToAcquireLock(WriteLockManager.java:431)
at org.eclipse.persistence.internal.helper.WriteLockManager.acquireRequiredLocks(WriteLockManager.java:280)
* }
*
* We want to be able to trace these dead lock situations. To put them our on the massive log dump as to do the dead
* lock detection.
*
*/
private static final Map<Thread, Set<ConcurrencyManager>> THREAD_TO_FAIL_TO_ACQUIRE_CACHE_KEYS = new ConcurrentHashMap<>();
/**
* We want to have traceability of what objects where changed by thread that is in the middle of a commit. This
* information can be useful when a massive dump is performed to explain the situation of any thread that might
* eventually be stuck inside of the write lock manager to tells us what exactly are the objects it has changed and
* wants to commit or merge into the shared cache. Relates to the
* {@link #THREAD_TO_FAIL_TO_ACQUIRE_CACHE_KEYS} but this map does not tells us about any
* specific problem such as a cache key that could not be acquired just tells us what objects were modified.
*
*/
private static final Map<Thread, Set<Object>> MAP_WRITE_LOCK_MANAGER_THREAD_TO_OBJECT_IDS_WITH_CHANGE_SET = new ConcurrentHashMap<>();
/** Semaphore related properties */
private static final transient ThreadLocal<Boolean> SEMAPHORE_THREAD_LOCAL_VAR = new ThreadLocal<>();
private static final transient int SEMAPHORE_MAX_NUMBER_THREADS = ConcurrencyUtil.SINGLETON.getNoOfThreadsAllowedToDoWriteLockManagerAcquireRequiredLocksInParallel();
private static final transient Semaphore SEMAPHORE_LIMIT_MAX_NUMBER_OF_THREADS_WRITE_LOCK_MANAGER = new Semaphore(SEMAPHORE_MAX_NUMBER_THREADS);
private transient ConcurrencySemaphore writeLockManagerSemaphore = new ConcurrencySemaphore(SEMAPHORE_THREAD_LOCAL_VAR, SEMAPHORE_MAX_NUMBER_THREADS, SEMAPHORE_LIMIT_MAX_NUMBER_OF_THREADS_WRITE_LOCK_MANAGER, this,"write_lock_manager_semaphore_acquired_01");
// this will allow us to prevent a readlock thread from looping forever.
public static final int MAXTRIES = 10000;
public static final int MAX_WAIT = 600000; //10 mins
/* This attribute stores the list of threads that have had a problem acquiring locks */
/* the first element in this list will be the prevailing thread */
protected ExposedNodeLinkedList prevailingQueue;
public WriteLockManager() {
this.prevailingQueue = new ExposedNodeLinkedList();
}
/**
* INTERNAL:
* This method will return once the object is locked and all non-indirect
* related objects are also locked.
*/
public Map acquireLocksForClone(Object objectForClone, ClassDescriptor descriptor, CacheKey cacheKey, AbstractSession cloningSession) {
// determineIfReleaseDeferredLockAppearsToBeDeadLocked
final long whileStartTimeMillis = System.currentTimeMillis();
final Thread currentThread = Thread.currentThread();
DeferredLockManager lockManager = ConcurrencyManager.getDeferredLockManager(currentThread);
ReadLockManager readLockManager = ConcurrencyManager.getReadLockManager(currentThread);
boolean successful = false;
IdentityHashMap lockedObjects = new IdentityHashMap();
IdentityHashMap refreshedObjects = new IdentityHashMap();
CacheKey lastCacheKeyWeNeededToWaitToAcquire = null;
try {
// if the descriptor has indirection for all mappings then wait as there will be no deadlock risks
CacheKey toWaitOn = acquireLockAndRelatedLocks(objectForClone, lockedObjects, refreshedObjects, cacheKey, descriptor, cloningSession);
int tries = 0;
while (toWaitOn != null) {// loop until we've tried too many times.
for (Iterator lockedList = lockedObjects.values().iterator(); lockedList.hasNext();) {
((CacheKey)lockedList.next()).releaseReadLock();
lockedList.remove();
}
// of the concurrency manager that we use for creating the massive log dump
// to indicate that the current thread is now stuck trying to acquire some arbitrary
// cache key for writing
StackTraceElement stackTraceElement = Thread.currentThread().getStackTrace()[1];
lastCacheKeyWeNeededToWaitToAcquire = toWaitOn;
lastCacheKeyWeNeededToWaitToAcquire.putThreadAsWaitingToAcquireLockForWriting(currentThread, stackTraceElement.getClassName() + "." + stackTraceElement.getMethodName() + "(...)");
// Since we know this one of those methods that can appear in the dead locks
// we threads frozen here forever inside of the wait that used to have no timeout
// we will now always check for how long the current thread is stuck in this while loop going nowhere
// using the exact same approach we have been adding to the concurrency manager
ConcurrencyUtil.SINGLETON.determineIfReleaseDeferredLockAppearsToBeDeadLocked(toWaitOn, whileStartTimeMillis, lockManager, readLockManager, ALLOW_INTERRUPTED_EXCEPTION_TO_BE_FIRED_UP_TRUE);
synchronized (toWaitOn) {
try {
if (toWaitOn.isAcquired()) {//last minute check to insure it is still locked.
toWaitOn.wait(ConcurrencyUtil.SINGLETON.getAcquireWaitTime());// wait for lock on object to be released
}
} catch (InterruptedException ex) {
// Ignore exception thread should continue.
}
}
Object waitObject = toWaitOn.getObject();
// Object may be null for loss of identity.
if (waitObject != null) {
cloningSession.checkAndRefreshInvalidObject(waitObject, toWaitOn, cloningSession.getDescriptor(waitObject));
refreshedObjects.put(waitObject, waitObject);
}
toWaitOn = acquireLockAndRelatedLocks(objectForClone, lockedObjects, refreshedObjects, cacheKey, descriptor, cloningSession);
if ((toWaitOn != null) && ((++tries) > MAXTRIES)) {
// If we've tried too many times abort.
throw ConcurrencyException.maxTriesLockOnCloneExceded(objectForClone);
}
}
successful = true;//successfully acquired all locks
} catch (InterruptedException exception) {
throw ConcurrencyException.maxTriesLockOnCloneExceded(objectForClone);
} finally {
if (lastCacheKeyWeNeededToWaitToAcquire != null) {
lastCacheKeyWeNeededToWaitToAcquire.removeThreadNoLongerWaitingToAcquireLockForWriting(currentThread);
}
if (!successful) {//did not acquire locks but we are exiting
for (Iterator lockedList = lockedObjects.values().iterator(); lockedList.hasNext();) {
((CacheKey)lockedList.next()).releaseReadLock();
lockedList.remove();
}
}
}
return lockedObjects;
}
/**
* INTERNAL:
* This is a recursive method used to acquire read locks on all objects that
* will be cloned. These include all related objects for which there is no
* indirection.
* The returned object is the first object that the lock could not be acquired for.
* The caller must try for exceptions and release locked objects in the case
* of an exception.
*/
public CacheKey acquireLockAndRelatedLocks(Object objectForClone, Map lockedObjects, Map refreshedObjects, CacheKey cacheKey, ClassDescriptor descriptor, AbstractSession cloningSession) {
if (!refreshedObjects.containsKey(objectForClone) && cloningSession.isConsideredInvalid(objectForClone, cacheKey, descriptor)) {
return cacheKey;
}
// Attempt to get a read-lock, null is returned if cannot be read-locked.
if (cacheKey.acquireReadLockNoWait()) {
if (cacheKey.getObject() == null) {
// This will be the case for deleted objects, NoIdentityMap, and aggregates.
lockedObjects.put(objectForClone, cacheKey);
} else {
objectForClone = cacheKey.getObject();
if (lockedObjects.containsKey(objectForClone)) {
// This is a check for loss of identity, the original check in
// checkAndLockObject() will shortcircuit in the usual case.
cacheKey.releaseReadLock();
return null;
}
// Store locked cachekey for release later.
lockedObjects.put(objectForClone, cacheKey);
}
return traverseRelatedLocks(objectForClone, lockedObjects, refreshedObjects, descriptor, cloningSession);
} else {
// Return the cache key that could not be locked.
return cacheKey;
}
}
/**
* INTERNAL:
* This method will transition the previously acquired active
* locks to deferred locks in the case a readlock could not be acquired for
* a related object. Deferred locks must be employed to prevent deadlock
* when waiting for the readlock while still protecting readers from
* incomplete data.
*/
public void transitionToDeferredLocks(MergeManager mergeManager){
try{
if (mergeManager.isTransitionedToDeferredLocks()) {
return;
}
for (CacheKey cacheKey : mergeManager.getAcquiredLocks()){
cacheKey.transitionToDeferredLock();
}
mergeManager.transitionToDeferredLocks();
}catch (RuntimeException ex){
for (CacheKey cacheKey : mergeManager.getAcquiredLocks()){
cacheKey.release();
}
ConcurrencyManager.getDeferredLockManager(Thread.currentThread()).setIsThreadComplete(true);
ConcurrencyManager.removeDeferredLockManager(Thread.currentThread());
mergeManager.getAcquiredLocks().clear();
throw ex;
}
}
/**
* INTERNAL:
* Traverse the object and acquire locks on all related objects.
*/
public CacheKey traverseRelatedLocks(Object objectForClone, Map lockedObjects, Map refreshedObjects, ClassDescriptor descriptor, AbstractSession cloningSession) {
// If all mappings have indirection short-circuit.
if (descriptor.shouldAcquireCascadedLocks()) {
FetchGroupManager fetchGroupManager = descriptor.getFetchGroupManager();
boolean isPartialObject = (fetchGroupManager != null) && fetchGroupManager.isPartialObject(objectForClone);
for (Iterator<DatabaseMapping> mappings = descriptor.getLockableMappings().iterator();
mappings.hasNext();) {
DatabaseMapping mapping = mappings.next();
// Only cascade fetched mappings.
if (!isPartialObject || (fetchGroupManager.isAttributeFetched(objectForClone, mapping.getAttributeName()))) {
// any mapping in this list must not have indirection.
Object objectToLock = mapping.getAttributeValueFromObject(objectForClone);
if (mapping.isCollectionMapping()) {
// Ignore null, means empty.
if (objectToLock != null) {
ContainerPolicy cp = mapping.getContainerPolicy();
Object iterator = cp.iteratorFor(objectToLock);
while (cp.hasNext(iterator)) {
Object object = cp.next(iterator, cloningSession);
if (mapping.getReferenceDescriptor().hasWrapperPolicy()) {
object = mapping.getReferenceDescriptor().getWrapperPolicy().unwrapObject(object, cloningSession);
}
CacheKey toWaitOn = checkAndLockObject(object, lockedObjects, refreshedObjects, mapping, cloningSession);
if (toWaitOn != null) {
return toWaitOn;
}
}
}
} else {
if (mapping.getReferenceDescriptor().hasWrapperPolicy()) {
objectToLock = mapping.getReferenceDescriptor().getWrapperPolicy().unwrapObject(objectToLock, cloningSession);
}
CacheKey toWaitOn = checkAndLockObject(objectToLock, lockedObjects, refreshedObjects, mapping, cloningSession);
if (toWaitOn != null) {
return toWaitOn;
}
}
}
}
}
return null;
}
/**
* INTERNAL:
* This method will be the entry point for threads attempting to acquire locks for all objects that have
* a changeset. This method will hand off the processing of the deadlock algorithm to other member
* methods. The mergeManager must be the active mergemanager for the calling thread.
* Returns true if all required locks were acquired
* This is wrapper method with semaphore logic.
*/
public void acquireRequiredLocks(MergeManager mergeManager, UnitOfWorkChangeSet changeSet) {
boolean semaphoreWasAcquired = false;
boolean useSemaphore = ConcurrencyUtil.SINGLETON.isUseSemaphoreToLimitConcurrencyOnWriteLockManagerAcquireRequiredLocks();
try {
semaphoreWasAcquired = writeLockManagerSemaphore.acquireSemaphoreIfAppropriate(useSemaphore);
acquireRequiredLocksInternal(mergeManager, changeSet);
} finally {
writeLockManagerSemaphore.releaseSemaphoreAllowOtherThreadsToStartDoingObjectBuilding(semaphoreWasAcquired);
}
}
/**
* INTERNAL:
* This method will be the entry point for threads attempting to acquire locks for all objects that have
* a changeset. This method will hand off the processing of the deadlock algorithm to other member
* methods. The mergeManager must be the active mergemanager for the calling thread.
* Returns true if all required locks were acquired
*/
private void acquireRequiredLocksInternal(MergeManager mergeManager, UnitOfWorkChangeSet changeSet) {
if (!MergeManager.LOCK_ON_MERGE) {//lockOnMerge is a backdoor and not public
return;
}
boolean locksToAcquire = true;
final Thread currentThread = Thread.currentThread();
final long timeWhenLocksToAcquireLoopStarted = System.currentTimeMillis();
populateMapThreadToObjectIdsWithChagenSet(currentThread, changeSet.getAllChangeSets().values());
clearMapWriteLockManagerToCacheKeysThatCouldNotBeAcquired(currentThread);
//while that thread has locks to acquire continue to loop.
try {
// initialize the MergeManager during this commit or merge for insert/updates only
// this call is not required in acquireLocksForClone() or acquireLockAndRelatedLocks()
mergeManager.setLockThread(Thread.currentThread());
AbstractSession session = mergeManager.getSession();
// If the session in the mergemanager is not a unit of work then the
// merge is of a changeSet into a distributed session.
if (session.isUnitOfWork()) {
session = session.getParent();
}
while (locksToAcquire) {
//lets assume all locks will be acquired
locksToAcquire = false;
//first access the changeSet and begin to acquire locks
ClassDescriptor descriptor = null;
for (ObjectChangeSet objectChangeSet : changeSet.getAllChangeSets().values()) {
// No Need to acquire locks for invalidated objects.
if ((mergeManager.shouldMergeChangesIntoDistributedCache() && (objectChangeSet.getSynchronizationType() == ClassDescriptor.INVALIDATE_CHANGED_OBJECTS))
|| objectChangeSet.getId() == null) {
//skip this process as we will be unable to acquire the correct cachekey anyway
//this is a new object with identity after write sequencing, ? huh, all objects must have an id by merge?
continue;
}
descriptor = objectChangeSet.getDescriptor();
// Maybe null for distributed merge, initialize it.
if (descriptor == null) {
descriptor = session.getDescriptor(objectChangeSet.getClassType(session));
objectChangeSet.setDescriptor(descriptor);
}
// PERF: Do not merge nor lock into the session cache if descriptor set to unit of work isolated.
if (descriptor.getCachePolicy().shouldIsolateObjectsInUnitOfWork()) {
continue;
}
AbstractSession targetSession = session.getParentIdentityMapSession(descriptor, true, true);
CacheKey activeCacheKey = attemptToAcquireLock(descriptor, objectChangeSet.getId(), targetSession);
if (activeCacheKey == null) {
// if cacheKey is null then the lock was not available no need to synchronize this block,because if the
// check fails then this thread will just return to the queue until it gets woken up.
if (this.prevailingQueue.getFirst() == mergeManager) {
// wait on this object until it is free, or until wait time expires because
// this thread is the prevailing thread
// see bug 483478
activeCacheKey = waitOnObjectLock(descriptor, objectChangeSet.getId(),
targetSession, (int) Math.round(((0.001d + Math.random()) * 500)));
}
if (activeCacheKey == null) {
// failed to acquire lock, release all acquired
// locks and place thread on waiting list
releaseAllAcquiredLocks(mergeManager);
// get cacheKey
activeCacheKey = targetSession.getIdentityMapAccessorInstance().getCacheKeyForObjectForLock(objectChangeSet.getId(), descriptor.getJavaClass(), descriptor);
if (session.shouldLog(SessionLog.FINER, SessionLog.CACHE)) {
Object[] params = new Object[3];
params[0] = descriptor.getJavaClass();
params[1] = objectChangeSet.getId();
params[2] = Thread.currentThread().getName();
session.log(SessionLog.FINER, SessionLog.CACHE, "dead_lock_encountered_on_write_no_cachekey", params, null);
}
if (mergeManager.getWriteLockQueued() == null) {
// thread is entering the wait queue for the
// first time
// set the QueueNode to be the node from the
// linked list for quick removal upon
// acquiring all locks
synchronized (this.prevailingQueue) {
mergeManager.setQueueNode(this.prevailingQueue.addLast(mergeManager));
}
}
// set the cache key on the merge manager for
// the object that could not be acquired
mergeManager.setWriteLockQueued(objectChangeSet.getId());
try {
if (activeCacheKey != null){
//wait on the lock of the object that we couldn't get.
synchronized (activeCacheKey) {
// verify that the cache key is still locked before we wait on it, as
//it may have been released since we tried to acquire it.
if (activeCacheKey.isAcquired() && (activeCacheKey.getActiveThread() != Thread.currentThread())) {
Thread thread = activeCacheKey.getActiveThread();
if (thread.isAlive()){
long time = System.currentTimeMillis();
activeCacheKey.wait(MAX_WAIT);
if (System.currentTimeMillis() - time >= MAX_WAIT){
Object[] params = new Object[]{MAX_WAIT /1000, descriptor.getJavaClassName(), activeCacheKey.getKey(), thread.getName()};
StringBuilder buffer = new StringBuilder(TraceLocalization.buildMessage("max_time_exceeded_for_acquirerequiredlocks_wait", params));
StackTraceElement[] trace = thread.getStackTrace();
for (StackTraceElement element : trace){
buffer.append("\t\tat");
buffer.append(element.toString());
buffer.append("\n");
}
session.log(SessionLog.SEVERE, SessionLog.CACHE, buffer.toString());
session.getIdentityMapAccessor().printIdentityMapLocks();
}
}else{
session.log(SessionLog.SEVERE, SessionLog.CACHE, "releasing_invalid_lock", new Object[] { thread.getName(),descriptor.getJavaClass(), objectChangeSet.getId()});
//thread that held lock is no longer alive. Something bad has happened like
while (activeCacheKey.isAcquired()){
// could have a depth greater than one.
activeCacheKey.release();
}
}
}
}
}
} catch (InterruptedException exception) {
throw org.eclipse.persistence.exceptions.ConcurrencyException.waitWasInterrupted(exception.getMessage());
}
// we want to record this information so that we have traceability over this sort of problems
addCacheKeyToMapWriteLockManagerToCacheKeysThatCouldNotBeAcquired(currentThread, activeCacheKey, timeWhenLocksToAcquireLoopStarted);
// failed to acquire, exit this loop to restart all over again.
locksToAcquire = true;
break;
}else{
removeCacheKeyFromMapWriteLockManagerToCacheKeysThatCouldNotBeAcquired(currentThread, activeCacheKey);
objectChangeSet.setActiveCacheKey(activeCacheKey);
mergeManager.getAcquiredLocks().add(activeCacheKey);
}
} else {
removeCacheKeyFromMapWriteLockManagerToCacheKeysThatCouldNotBeAcquired(currentThread, activeCacheKey);
objectChangeSet.setActiveCacheKey(activeCacheKey);
mergeManager.getAcquiredLocks().add(activeCacheKey);
}
}
}
} catch (RuntimeException exception) {
// if there was an exception then release.
//must not release in a finally block as release only occurs in this method
// if there is a problem or all of the locks can not be acquired.
releaseAllAcquiredLocks(mergeManager);
throw exception;
} catch (InterruptedException exception) {
releaseAllAcquiredLocks(mergeManager);
throw ConcurrencyException.waitFailureOnClientSession(exception);
} catch (Error error){
releaseAllAcquiredLocks(mergeManager);
mergeManager.getSession().logThrowable(SessionLog.SEVERE, SessionLog.TRANSACTION, error);
throw error;
}finally {
if (mergeManager.getWriteLockQueued() != null) {
//the merge manager entered the wait queue and must be cleaned up
synchronized(this.prevailingQueue) {
this.prevailingQueue.remove(mergeManager.getQueueNode());
}
mergeManager.setWriteLockQueued(null);
}
clearMapWriteLockManagerToCacheKeysThatCouldNotBeAcquired(currentThread);
clearMapThreadToObjectIdsWithChagenSet(currentThread);
}
}
/**
* INTERNAL:
* This method will be called by a merging thread that is attempting to lock
* a new object that was not locked previously. Unlike the other methods
* within this class this method will lock only this object.
*/
public CacheKey appendLock(Object primaryKey, Object objectToLock, ClassDescriptor descriptor, MergeManager mergeManager, AbstractSession session) {
CacheKey lockedCacheKey = session.getIdentityMapAccessorInstance().acquireLockNoWait(primaryKey, descriptor.getJavaClass(), false, descriptor);
if (lockedCacheKey == null) {
session.getIdentityMapAccessorInstance().getWriteLockManager().transitionToDeferredLocks(mergeManager);
lockedCacheKey = session.getIdentityMapAccessorInstance().acquireDeferredLock(primaryKey, descriptor.getJavaClass(), descriptor, true);
Object cachedObject = lockedCacheKey.getObject();
if (cachedObject == null) {
if (lockedCacheKey.getActiveThread() == Thread.currentThread()) {
lockedCacheKey.setObject(objectToLock);
} else {
cachedObject = lockedCacheKey.waitForObject();
}
}
lockedCacheKey.releaseDeferredLock();
return lockedCacheKey;
} else {
if (lockedCacheKey.getObject() == null) {
lockedCacheKey.setObject(objectToLock); // set the object in the
// cachekey
// for others to find an prevent cycles
}
if (mergeManager.isTransitionedToDeferredLocks()){
ConcurrencyManager.getDeferredLockManager(Thread.currentThread()).getActiveLocks().add(lockedCacheKey);
}else{
mergeManager.getAcquiredLocks().add(lockedCacheKey);
}
return lockedCacheKey;
}
}
/**
* INTERNAL:
* This method performs the operations of finding the cacheKey and locking it if possible.
* Returns True if the lock was acquired, false otherwise
*/
protected CacheKey attemptToAcquireLock(ClassDescriptor descriptor, Object primaryKey, AbstractSession session) {
return session.getIdentityMapAccessorInstance().acquireLockNoWait(primaryKey, descriptor.getJavaClass(), true, descriptor);
}
/**
* INTERNAL:
* Simply check that the object is not already locked then pass it on to the locking method
*/
protected CacheKey checkAndLockObject(Object objectToLock, Map lockedObjects, Map refreshedObjects, DatabaseMapping mapping, AbstractSession cloningSession) {
//the cachekey should always reference an object otherwise what would we be cloning.
if ((objectToLock != null) && !lockedObjects.containsKey(objectToLock)) {
Object primaryKeyToLock = null;
ClassDescriptor referenceDescriptor = null;
if (mapping.getReferenceDescriptor().hasInheritance() || mapping.getReferenceDescriptor().isDescriptorForInterface()) {
referenceDescriptor = cloningSession.getDescriptor(objectToLock);
} else {
referenceDescriptor = mapping.getReferenceDescriptor();
}
// Need to traverse aggregates, but not lock aggregates directly.
if (referenceDescriptor.isDescriptorTypeAggregate()) {
traverseRelatedLocks(objectToLock, lockedObjects, refreshedObjects, referenceDescriptor, cloningSession);
} else {
primaryKeyToLock = referenceDescriptor.getObjectBuilder().extractPrimaryKeyFromObject(objectToLock, cloningSession);
CacheKey cacheKey = cloningSession.getIdentityMapAccessorInstance().getCacheKeyForObjectForLock(primaryKeyToLock, objectToLock.getClass(), referenceDescriptor);
if (cacheKey == null) {
// Cache key may be null for no-identity map, missing or deleted object, just create a new one to be locked.
cacheKey = new CacheKey(primaryKeyToLock);
cacheKey.setReadTime(System.currentTimeMillis());
}
CacheKey toWaitOn = acquireLockAndRelatedLocks(objectToLock, lockedObjects, refreshedObjects, cacheKey, referenceDescriptor, cloningSession);
if (toWaitOn != null) {
return toWaitOn;
}
}
}
return null;
}
/**
* INTERNAL:
* This method will release all acquired locks
*/
public void releaseAllAcquiredLocks(MergeManager mergeManager) {
if (!MergeManager.LOCK_ON_MERGE) {//lockOnMerge is a backdoor and not public
return;
}
List<CacheKey> acquiredLocks = mergeManager.getAcquiredLocks();
Iterator<CacheKey> locks = acquiredLocks.iterator();
RuntimeException exception = null;
while (locks.hasNext()) {
try {
CacheKey cacheKeyToRemove = locks.next();
if (cacheKeyToRemove.getObject() == null) {
cacheKeyToRemove.removeFromOwningMap();
}
if (mergeManager.isTransitionedToDeferredLocks()) {
cacheKeyToRemove.releaseDeferredLock();
} else {
cacheKeyToRemove.release();
}
} catch (RuntimeException e){
if (exception == null){
exception = e;
}
}
}
acquiredLocks.clear();
if (exception != null){
throw exception;
}
}
/**
* INTERNAL:
* This method performs the operations of finding the cacheKey and locking it if possible.
* Waits until the lock can be acquired
*/
protected CacheKey waitOnObjectLock(ClassDescriptor descriptor, Object primaryKey, AbstractSession session, int waitTime) {
return session.getIdentityMapAccessorInstance().acquireLockWithWait(primaryKey, descriptor.getJavaClass(), true, descriptor, waitTime);
}
// Helper data structures to have tracebility about object ids with change sets and cache keys we are sturggling to acquire
/** Getter for {@link #THREAD_TO_FAIL_TO_ACQUIRE_CACHE_KEYS} */
public static Map<Thread, Set<ConcurrencyManager>> getThreadToFailToAcquireCacheKeys() {
return unmodifiableMap(THREAD_TO_FAIL_TO_ACQUIRE_CACHE_KEYS);
}
/** Getter for {@link #MAP_WRITE_LOCK_MANAGER_THREAD_TO_OBJECT_IDS_WITH_CHANGE_SET} */
public static Map<Thread, Set<Object>> getMapWriteLockManagerThreadToObjectIdsWithChangeSet() {
return unmodifiableMap(MAP_WRITE_LOCK_MANAGER_THREAD_TO_OBJECT_IDS_WITH_CHANGE_SET);
}
/**
* Remove the current thread from the map of object ids with change sets that are about to bec ommited
*
* @param thread
* the thread that is clearing itself out of the map of change sets it needs to merge into the shared
* cache
*/
public static void clearMapThreadToObjectIdsWithChagenSet(Thread thread) {
MAP_WRITE_LOCK_MANAGER_THREAD_TO_OBJECT_IDS_WITH_CHANGE_SET.remove(thread);
}
/**
* Before a thread starts long wait loop to acquire write locks during a commit transaction the thread will record
* in this map the object ids it holds with chance sets. It will be useful information if a dead lock is taking
* place.
*
* @param thread
* the thread that is in the middle of merge to the shared cache trying to acquire write locks to do this
* merge
* @param objectChangeSets
* the object change sets it has in its hands and that it would like to merge into the cache
*/
public static void populateMapThreadToObjectIdsWithChagenSet(Thread thread,
Collection<ObjectChangeSet> objectChangeSets) {
// (a) make sure the map has an entry for the the thread
boolean hasKey = MAP_WRITE_LOCK_MANAGER_THREAD_TO_OBJECT_IDS_WITH_CHANGE_SET.containsKey(thread);
if (!hasKey) {
MAP_WRITE_LOCK_MANAGER_THREAD_TO_OBJECT_IDS_WITH_CHANGE_SET.putIfAbsent(thread,
ConcurrentHashMap.newKeySet());
}
// (b) The ids of the objects with change sets
Set<Object> primarykeys = MAP_WRITE_LOCK_MANAGER_THREAD_TO_OBJECT_IDS_WITH_CHANGE_SET.get(thread);
primarykeys.clear();
for (ObjectChangeSet objectChangeSet : objectChangeSets) {
Object primaryKey = objectChangeSet.getId();
primarykeys.add(primaryKey);
}
}
/**
* Before the problematic while loop starts we should always clear for this thread the set of cache keys it could
* not acquire.
*
* @param thread
* the thread that what clear his set of cache keys it is struggling to acquire.
*/
public static void clearMapWriteLockManagerToCacheKeysThatCouldNotBeAcquired(Thread thread) {
THREAD_TO_FAIL_TO_ACQUIRE_CACHE_KEYS.remove(thread);
}
/**
* The thread was doing its while loop to acquire all required locks to proceed with the commmit and it realized
* there was one cache key it is unable to acquire
*
* @param thread
* thread the thread working on updating the shared cache
* @param cacheKeyThatCouldNotBeAcquired
* the cache key it is not managing to acquire
* @throws InterruptedException
* Should be fired because we are passing a flag into the
* determineIfReleaseDeferredLockAppearsToBeDeadLocked to say we do not want the thread to be blown up
* (e.g. we are afraid of breaking threads in the middle of a commit process could be quite dangerous).
* See
* {@link #ALLOW_INTERRUPTED_EXCEPTION_TO_BE_FIRED_UP_FALSE}
*/
public static void addCacheKeyToMapWriteLockManagerToCacheKeysThatCouldNotBeAcquired(Thread thread, ConcurrencyManager cacheKeyThatCouldNotBeAcquired, long whileStartDate) throws InterruptedException {
// sanity check, make sure the cacheKeyThatCouldNotBeAcquired is not null
// should never happen because when the write lock manager fails to acquire the cache key both with acquire no
// wait and acquire with wait
// then the code will just grab the cache key fro loggging puprposes using the
// see the code getCacheKeyForObjectForLock
// this is why we believe this is never null. But the sanity check does not hurt us.
if (cacheKeyThatCouldNotBeAcquired == null) {
return;
}
// (b) add the cache key to the set if absent
Set<ConcurrencyManager> cacheKeysWeAreHavingDifficultyAcquiring = getCacheKeysThatCouldNotBeAcquiredByThread(
thread);
if(!cacheKeysWeAreHavingDifficultyAcquiring.contains(cacheKeyThatCouldNotBeAcquired)) {
cacheKeysWeAreHavingDifficultyAcquiring.add(cacheKeyThatCouldNotBeAcquired);
}
// (c) If a write lock fails to be acquired and goes into the basked of cache keys that could not be acquired
// it could be an indication this thread is stuck for a long while
// NOTE:
// it might be best to not even give the possibility for an exception to be fired
// for code that is in the lock manager
final Thread currentThread = Thread.currentThread();
DeferredLockManager lockManager = ConcurrencyManager.getDeferredLockManager(currentThread);
ReadLockManager readLockManager = ConcurrencyManager.getReadLockManager(currentThread);
ConcurrencyUtil.SINGLETON.determineIfReleaseDeferredLockAppearsToBeDeadLocked(
cacheKeyThatCouldNotBeAcquired, whileStartDate, lockManager, readLockManager,
ALLOW_INTERRUPTED_EXCEPTION_TO_BE_FIRED_UP_FALSE);
}
/**
* A cache keys was successfully acquired we want to make sure it is not recorded in the map of cache keys that
* could not be acquired. The situation theoretically can change. Failing to acquire a write lock can be a temporary
* situation. The lock might become available eventually. Otherwise there would be no point for the while loop that
* is trying to acquire these locks.
*
* @param thread
* the thread that just managed to grab a write lock
* @param cacheKeyThatCouldNotBeAcquired
* the cache key it managed to acquire for writing.
*/
public static void removeCacheKeyFromMapWriteLockManagerToCacheKeysThatCouldNotBeAcquired(Thread thread,
ConcurrencyManager cacheKeyThatCouldNotBeAcquired) {
Set<ConcurrencyManager> cacheKeysWeAreHavingDifficultyAcquiring = getCacheKeysThatCouldNotBeAcquiredByThread(
thread);
cacheKeysWeAreHavingDifficultyAcquiring.remove(cacheKeyThatCouldNotBeAcquired);
}
/**
* If the thread is not yet registered in the map it will get registered with an empty map.
*
* @param thread
* the thread that wants to get its set of cache keys it is not managing to acquire.
* @return the set of cache keys the thrad is struggling to acquire
*/
private static Set<ConcurrencyManager> getCacheKeysThatCouldNotBeAcquiredByThread(Thread thread) {
// (a) make sure the map has an entry for the the thread
boolean hasKey = THREAD_TO_FAIL_TO_ACQUIRE_CACHE_KEYS.containsKey(thread);
if (!hasKey) {
Set<ConcurrencyManager> cacheKeySet = ConcurrentHashMap.newKeySet();
THREAD_TO_FAIL_TO_ACQUIRE_CACHE_KEYS.putIfAbsent(thread, cacheKeySet);
}
// (b) We are certain the map is not empty anymore return the set
return THREAD_TO_FAIL_TO_ACQUIRE_CACHE_KEYS.get(thread);
}
}