blob: 1b777ee033a19d75663444971ab74a03a19f479b [file] [log] [blame]
/*
* Copyright (c) 2011, 2019 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0,
* or the Eclipse Distribution License v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
// Contributors:
// James Sutherland - initial API and implementation
package org.eclipse.persistence.platform.database.oracle.dcn;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.eclipse.persistence.annotations.DatabaseChangeNotificationType;
import org.eclipse.persistence.descriptors.CacheIndex;
import org.eclipse.persistence.descriptors.ClassDescriptor;
import org.eclipse.persistence.descriptors.DescriptorEvent;
import org.eclipse.persistence.descriptors.DescriptorEventAdapter;
import org.eclipse.persistence.exceptions.DatabaseException;
import org.eclipse.persistence.internal.databaseaccess.Accessor;
import org.eclipse.persistence.internal.expressions.SQLSelectStatement;
import org.eclipse.persistence.internal.helper.DatabaseField;
import org.eclipse.persistence.internal.helper.DatabaseTable;
import org.eclipse.persistence.internal.identitymaps.CacheId;
import org.eclipse.persistence.internal.identitymaps.CacheKey;
import org.eclipse.persistence.internal.sessions.AbstractRecord;
import org.eclipse.persistence.internal.sessions.AbstractSession;
import org.eclipse.persistence.logging.SessionLog;
import org.eclipse.persistence.mappings.DatabaseMapping;
import org.eclipse.persistence.platform.database.events.DatabaseEventListener;
import org.eclipse.persistence.queries.ValueReadQuery;
import org.eclipse.persistence.sessions.Session;
import oracle.jdbc.OracleConnection;
import oracle.jdbc.OracleStatement;
import oracle.jdbc.dcn.DatabaseChangeEvent;
import oracle.jdbc.dcn.DatabaseChangeListener;
import oracle.jdbc.dcn.DatabaseChangeRegistration;
import oracle.jdbc.dcn.RowChangeDescription;
import oracle.jdbc.dcn.TableChangeDescription;
/**
* PUBLIC:
* Listener for Oracle Database Change event Notification (DCN).
* This allows the EclipseLink cache to be invalidated by database events.
*
* @author James Sutherland
* @since EclipseLink 2.4
*/
public class OracleChangeNotificationListener implements DatabaseEventListener {
public static String ORA_TRANSACTION_ID = "oracle.dcn.transaction-id";
public static String ROWID = "ROWID";
/** The Oracle JDBC registration object. */
protected DatabaseChangeRegistration register;
/** Map each table to the descriptor that needs to be invalidated. */
protected Map<DatabaseTable, ClassDescriptor> descriptorsByTable;
/** Cache query for transaction id. */
protected ValueReadQuery transactionIdQuery;
public OracleChangeNotificationListener() {
this.transactionIdQuery = new ValueReadQuery("SELECT DBMS_TRANSACTION.LOCAL_TRANSACTION_ID FROM DUAL");
transactionIdQuery.setName(ORA_TRANSACTION_ID);
}
/**
* INTERNAL:
* Register the event listener with the database.
*/
@Override
public void register(Session session) {
final AbstractSession databaseSession = (AbstractSession)session;
// Determine which tables should be tracked for change events.
this.descriptorsByTable = new HashMap<DatabaseTable, ClassDescriptor>();
for (ClassDescriptor descriptor : session.getDescriptors().values()) {
if (!descriptor.getTables().isEmpty()) {
if ((descriptor.getCachePolicy().getDatabaseChangeNotificationType() != null)
&& (descriptor.getCachePolicy().getDatabaseChangeNotificationType() != DatabaseChangeNotificationType.NONE)) {
this.descriptorsByTable.put(descriptor.getTables().get(0), descriptor);
}
}
}
Accessor accessor = databaseSession.getAccessor();
accessor.incrementCallCount(databaseSession);
try {
OracleConnection connection = (OracleConnection)databaseSession.getServerPlatform().unwrapConnection(accessor.getConnection());
databaseSession.log(SessionLog.FINEST, SessionLog.CONNECTION, "dcn_registering");
Properties properties = new Properties();
properties.setProperty(OracleConnection.DCN_NOTIFY_ROWIDS, "true");
properties.setProperty(OracleConnection.DCN_IGNORE_INSERTOP, "true");
try {
// Register with the database change notification, the connection is not relevant, the events occur after the connection is closed,
// and a different connection can be used to unregister the event listener.
this.register = connection.registerDatabaseChangeNotification(properties);
final List<DatabaseField> fields = new ArrayList<DatabaseField>();
fields.add(new DatabaseField(ROWID));
this.register.addListener(new DatabaseChangeListener() {
@Override
public void onDatabaseChangeNotification(DatabaseChangeEvent changeEvent) {
databaseSession.log(SessionLog.FINEST, SessionLog.CONNECTION, "dcn_change_event", changeEvent);
if (changeEvent.getTableChangeDescription() != null) {
for (TableChangeDescription tableChange : changeEvent.getTableChangeDescription()) {
ClassDescriptor descriptor = OracleChangeNotificationListener.this.descriptorsByTable.get(new DatabaseTable(tableChange.getTableName()));
if (descriptor != null) {
CacheIndex index = descriptor.getCachePolicy().getCacheIndex(fields);
for (RowChangeDescription rowChange : tableChange.getRowChangeDescription()) {
CacheId id = new CacheId(new Object[]{rowChange.getRowid().stringValue()});
CacheKey key = databaseSession.getIdentityMapAccessorInstance().getIdentityMapManager().getCacheKeyByIndex(
index, id, true, descriptor);
if (key != null) {
if ((key.getTransactionId() == null) || !key.getTransactionId().equals(changeEvent.getTransactionId(true))) {
databaseSession.log(SessionLog.FINEST, SessionLog.CONNECTION, "dcn_invalidate", key.getKey(), descriptor.getJavaClass().getName());
key.setInvalidationState(CacheKey.CACHE_KEY_INVALID);
}
}
}
}
}
}
}
});
// Register each table for database events, this is done by executing a select from the table.
for (DatabaseTable table : this.descriptorsByTable.keySet()) {
OracleStatement statement = (OracleStatement)connection.createStatement();
statement.setDatabaseChangeRegistration(this.register);
try {
statement.executeQuery("SELECT ROWID FROM " + table.getQualifiedName()).close();
databaseSession.log(SessionLog.FINEST, SessionLog.CONNECTION, "dcn_register_table", table.getQualifiedName());
} catch (Exception failed) {
// This will fail if the table does not exist,
// just log the error to allow table creation to work.
databaseSession.logThrowable(SessionLog.WARNING, SessionLog.SQL, failed);
} finally {
statement.close();
}
}
} catch (SQLException exception) {
throw DatabaseException.sqlException(exception, databaseSession.getAccessor(), databaseSession, false);
}
} finally {
accessor.decrementCallCount();
}
}
/**
* Initialize the descriptor to receive database change events.
* This is called when the descriptor is initialized.
*/
@Override
public void initialize(final ClassDescriptor descriptor, AbstractSession session) {
if (descriptor.getOptimisticLockingPolicy() == null) {
boolean requiresLocking = descriptor.hasMultipleTables();
for (DatabaseMapping mapping : descriptor.getMappings()) {
if (mapping.isCollectionMapping()) {
requiresLocking = true;
}
}
if (requiresLocking) {
session.log(SessionLog.WARNING, SessionLog.METADATA, "locking_required_for_database_change_notification", descriptor.getJavaClass());
}
}
final DatabaseField rowId = descriptor.buildField(new DatabaseField(ROWID));
final List<DatabaseField> fields = new ArrayList<DatabaseField>();
fields.add(rowId);
// May already have the index if has inheritance.
CacheIndex existingIndex = descriptor.getCachePolicy().getCacheIndex(fields);
if (existingIndex == null) {
if (descriptor.isChildDescriptor()) {
existingIndex = descriptor.getInheritancePolicy().getRootParentDescriptor().getCachePolicy().getCacheIndex(fields);
}
if (existingIndex == null) {
existingIndex = new CacheIndex(fields);
existingIndex.setIsUpdateable(false);
existingIndex.setIsInsertable(false);
}
descriptor.getCachePolicy().addCacheIndex(existingIndex);
}
final CacheIndex index = existingIndex;
rowId.setInsertable(false);
rowId.setUpdatable(false);
rowId.setCreatable(false);
descriptor.getFields().add(rowId);
descriptor.getAllFields().add(rowId);
final ValueReadQuery rowIdQuery = new ValueReadQuery();
rowIdQuery.setName(ROWID);
SQLSelectStatement sqlStatement = new SQLSelectStatement();
sqlStatement.setWhereClause(descriptor.getObjectBuilder().getPrimaryKeyExpression());
sqlStatement.addField(rowId);
sqlStatement.addTable(descriptor.getTables().get(0));
rowIdQuery.setSQLStatement(sqlStatement);
sqlStatement.normalize(session, null);
descriptor.getEventManager().addListener(new DescriptorEventAdapter() {
@Override
public void postMerge(DescriptorEvent event) {
if ((event.getChangeSet() != null) && event.getChangeSet().hasChanges()) {
Object id = event.getChangeSet().getId();
CacheKey cacheKey = event.getChangeSet().getActiveCacheKey();
if (cacheKey == null) {
cacheKey = event.getSession().getParent().getIdentityMapAccessorInstance().getIdentityMapManager().getCacheKeyForObject(id, descriptor.getJavaClass(), descriptor, false);
}
cacheKey.setTransactionId(event.getSession().getProperty(ORA_TRANSACTION_ID));
if (event.getChangeSet().isNew()) {
AbstractRecord row = descriptor.getObjectBuilder().buildRowFromPrimaryKeyValues(id, event.getSession());
Object rowid = event.getSession().executeQuery(rowIdQuery, row);
CacheId indexValue = new CacheId(new Object[]{rowid});
event.getSession().getParent().getIdentityMapAccessorInstance().getIdentityMapManager().putCacheKeyByIndex(index, indexValue, cacheKey, descriptor);
}
}
}
@Override
public void postUpdate(DescriptorEvent event) {
Object txId = event.getSession().getProperty(ORA_TRANSACTION_ID);
if (txId == null) {
txId = event.getSession().executeQuery(transactionIdQuery);
event.getSession().setProperty(ORA_TRANSACTION_ID, txId);
}
}
});
}
/**
* INTERNAL:
* Remove the event listener from the database.
*/
@Override
public void remove(Session session) {
if (this.register == null) {
return;
}
AbstractSession databaseSession = (AbstractSession)session;
Accessor accessor = databaseSession.getAccessor();
accessor.incrementCallCount(databaseSession);
try {
OracleConnection connection = (OracleConnection)databaseSession.getServerPlatform().unwrapConnection(accessor.getConnection());
databaseSession.log(SessionLog.FINEST, SessionLog.CONNECTION, "dcn_unregister");
try {
connection.unregisterDatabaseChangeNotification(this.register);
} catch (SQLException exception) {
throw DatabaseException.sqlException(exception, databaseSession.getAccessor(), databaseSession, false);
}
} finally {
accessor.decrementCallCount();
}
}
/**
* INTERNAL:
* Return the database register.
*/
public DatabaseChangeRegistration getRegister() {
return register;
}
/**
* INTERNAL:
* Set the database register.
*/
protected void setRegister(DatabaseChangeRegistration register) {
this.register = register;
}
/**
* INTERNAL:
* Return the mapping of tables to descriptors.
*/
public Map<DatabaseTable, ClassDescriptor> getDescriptorsByTable() {
return descriptorsByTable;
}
/**
* INTERNAL:
* Set the mapping of tables to descriptors.
*/
protected void setDescriptorsByTable(Map<DatabaseTable, ClassDescriptor> descriptorsByTable) {
this.descriptorsByTable = descriptorsByTable;
}
}