blob: 1320878743bf52bae85ae534c74eee4cf32828e9 [file] [log] [blame]
/*
* Copyright (c) 1998, 2020 Oracle and/or its affiliates. All rights reserved.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0,
* or the Eclipse Distribution License v. 1.0 which is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
*/
// Contributors:
// Oracle - initial API and implementation from Oracle TopLink
package org.eclipse.persistence.internal.eis.adapters.aq;
import jakarta.resource.*;
import jakarta.resource.cci.*;
import oracle.AQ.*;
import org.eclipse.persistence.eis.EISException;
/**
* Interaction to Oracle AQ JCA adapter.
* Executes the interaction spec to enqueue or dequeue a message.
*
* @author James
* @since OracleAS TopLink 10<i>g</i> (10.0.3)
*/
public class AQInteraction implements Interaction {
/** Store the connection the interaction was created from. */
protected AQConnection connection;
/**
* Default constructor.
*/
public AQInteraction(AQConnection connection) {
this.connection = connection;
}
@Override
public void clearWarnings() {
}
@Override
public void close() {
}
/**
* Execute the interaction spec.
* The spec is either an enqueue or dequeue interaction.
*/
@Override
public jakarta.resource.cci.Record execute(InteractionSpec spec, jakarta.resource.cci.Record input) throws ResourceException {
AQRecord record = new AQRecord();
execute(spec, input, record);
return record;
}
/**
* Execute the interaction and set the output into the output record.
* The spec is either an enqueue or dequeue interaction.
* Only raw messages are supported.
*/
@Override
public boolean execute(InteractionSpec spec, jakarta.resource.cci.Record input, jakarta.resource.cci.Record output) throws ResourceException {
if (!(spec instanceof AQInteractionSpec)) {
throw EISException.invalidAQInteractionSpecType();
}
if ((!(input instanceof AQRecord)) || (!(output instanceof AQRecord))) {
throw EISException.invalidAQRecordType();
}
// Use auto-commit if not in a transaction
boolean autocommit = false;
if (!connection.getAQTransaction().isInTransaction()) {
autocommit = true;
connection.getAQTransaction().begin();
}
try {
if (spec instanceof AQEnqueueInteractionSpec) {
executeEnqueueInteraction((AQEnqueueInteractionSpec)spec, (AQRecord)input, (AQRecord)output);
} else if (spec instanceof AQDequeueInteractionSpec) {
executeDequeueInteraction((AQDequeueInteractionSpec)spec, (AQRecord)input, (AQRecord)output);
}
} catch (Exception exception) {
throw new ResourceException(exception.toString());
} finally {
if (autocommit) {
connection.getAQTransaction().commit();
}
}
return true;
}
/**
* Execute the enqueue interaction.
* Only raw messages are supported.
*/
protected void executeEnqueueInteraction(AQEnqueueInteractionSpec spec, AQRecord input, AQRecord output) throws ResourceException {
try {
AQQueue queue = this.connection.getSession().getQueue(spec.getSchema(), spec.getQueue());
AQMessage message = queue.createMessage();
AQRawPayload payload = message.getRawPayload();
if (input.size() != 1) {
throw EISException.invalidAQInput();
}
byte[] bytes = null;
if (input.get(0) instanceof String) {
bytes = ((String)input.get(0)).getBytes();
} else if (input.get(0) instanceof byte[]) {
bytes = (byte[])input.get(0);
} else {
throw EISException.invalidAQInput();
}
payload.setStream(bytes, bytes.length);
AQEnqueueOption enqueueOption = spec.getOptions();
if (enqueueOption == null) {
enqueueOption = new AQEnqueueOption();
}
queue.enqueue(enqueueOption, message);
} catch (Exception exception) {
throw new ResourceException(exception.toString());
}
}
/**
* Execute the dequeue interaction.
* Only raw messages are supported.
*/
protected void executeDequeueInteraction(AQDequeueInteractionSpec spec, AQRecord input, AQRecord output) throws ResourceException {
try {
AQQueue queue = this.connection.getSession().getQueue(spec.getSchema(), spec.getQueue());
AQDequeueOption dequeueOption = spec.getOptions();
if (dequeueOption == null) {
dequeueOption = new AQDequeueOption();
}
AQMessage message = queue.dequeue(dequeueOption);
output.add(message.getRawPayload().getBytes());
} catch (Exception exception) {
throw new ResourceException(exception.toString());
}
}
@Override
public Connection getConnection() {
return connection;
}
@Override
public ResourceWarning getWarnings() {
return null;
}
}