blob: 92107eb0b535f0cc4afc1a9feaaa89a2ec4e1766 [file] [log] [blame]
/*******************************************************************************
* Copyright (c) 1998, 2013 Oracle and/or its affiliates. All rights reserved.
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v1.0 and Eclipse Distribution License v. 1.0
* which accompanies this distribution.
* The Eclipse Public License is available at http://www.eclipse.org/legal/epl-v10.html
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Oracle - initial API and implementation from Oracle TopLink
******************************************************************************/
package org.eclipse.persistence.queries;
import java.util.*;
import org.eclipse.persistence.descriptors.ClassDescriptor;
import org.eclipse.persistence.exceptions.*;
import org.eclipse.persistence.expressions.*;
import org.eclipse.persistence.internal.queries.*;
import org.eclipse.persistence.internal.sessions.AbstractRecord;
import org.eclipse.persistence.internal.expressions.*;
import org.eclipse.persistence.internal.databaseaccess.*;
import org.eclipse.persistence.internal.helper.*;
/**
* <p><b>Purpose</b>:
* Stream class which is used to deal with large collections returned
* from TOPLink queries more efficiently.
* <p>
* <p><b>Responsibilities</b>:
* Wraps a database result set cursor to provide a stream on the resulting selected objects.
*
* @author Yvon Lavoie
* @since TOPLink/Java 1.0
*/
public class CursoredStream extends Cursor {
/** Marker for backing up. */
protected int marker;
/**
* INTERNAL:
* Initialize the state of the stream
*/
public CursoredStream() {
super();
}
/**
* INTERNAL:
* Initialize the state of the stream
*/
public CursoredStream(DatabaseCall call, CursoredStreamPolicy policy) {
super(call, policy);
// Must close on exception as stream will not be returned to user.
try {
setLimits();
} catch (RuntimeException exception) {
try {
close();
} catch (RuntimeException ignore) {
}
throw exception;
}
}
/**
* PUBLIC:
* Return whether the cursored stream is at its end.
*/
public boolean atEnd() throws DatabaseException {
if ((this.position + 1) <= this.objectCollection.size()) {
return false;
}
if (this.nextRow != null) {
return false;
}
if (isClosed()) {
return true;
}
int oldSize = this.objectCollection.size();
retrieveNextPage();
return this.objectCollection.size() == oldSize;
}
/**
* PUBLIC:
* Returns the number of objects that can be read from this input without blocking.
*/
public int available() throws DatabaseException {
//For CR#2570/CR#2571.
return this.objectCollection.size() - this.position;
}
/**
* INTERNAL:
* Must build the count on the primary key fields, not * as * is not allowed if there was a distinct.
* This require a manually defined operator.
* added for CR 2900
*/
public Expression buildCountDistinctExpression(List includeFields, ExpressionBuilder builder) {
ExpressionOperator countOperator = new ExpressionOperator();
countOperator.setType(ExpressionOperator.AggregateOperator);
Vector databaseStrings = new Vector();
databaseStrings.add("COUNT(DISTINCT ");
databaseStrings.add(")");
countOperator.printsAs(databaseStrings);
countOperator.bePrefix();
countOperator.setNodeClass(ClassConstants.FunctionExpression_Class);
Expression firstFieldExpression = builder.getField(((DatabaseField)includeFields.get(0)).getQualifiedName());
return countOperator.expressionForArguments(firstFieldExpression, new ArrayList(0));
}
/**
* INTERNAL:
* Answer a list of the elements of the receiver's collection from startIndex to endIndex.
*/
protected List<Object> copy(int startIndex, int endIndex) throws QueryException {
while (this.objectCollection.size() < endIndex) {
if (retrieveNextObject() == null) {
throw QueryException.readBeyondStream(this.query);
}
}
return Helper.copyVector(this.objectCollection, startIndex, endIndex);
}
/**
* INTERNAL:
* Retrieve the size of the open cursor by executing a count on the same query as the cursor.
*/
protected int getCursorSize() throws DatabaseException, QueryException {
ValueReadQuery query;
if (!((CursoredStreamPolicy)this.policy).hasSizeQuery()) {
if (this.query.isCallQuery()) {
throw QueryException.additionalSizeQueryNotSpecified(this.query);
}
if (!this.query.isExpressionQuery()) {
throw QueryException.sizeOnlySupportedOnExpressionQueries(this.query);
}
// Construct the select statement
SQLSelectStatement selectStatement = new SQLSelectStatement();
// 2612538 - the default size of Map (32) is appropriate
Map clonedExpressions = new IdentityHashMap();
selectStatement.setWhereClause(((ExpressionQueryMechanism)this.query.getQueryMechanism()).buildBaseSelectionCriteria(false, clonedExpressions));
ClassDescriptor descriptor = this.query.getDescriptor();
// Case, normal read for branch inheritance class that reads subclasses all in its own table(s).
if (descriptor.hasInheritance() && (descriptor.getInheritancePolicy().getWithAllSubclassesExpression() != null)) {
Expression branchIndicator = descriptor.getInheritancePolicy().getWithAllSubclassesExpression();
if ((branchIndicator != null) && (selectStatement.getWhereClause() != null)) {
selectStatement.setWhereClause(selectStatement.getWhereClause().and(branchIndicator));
} else if (branchIndicator != null) {
selectStatement.setWhereClause((Expression)branchIndicator.clone());
}
}
selectStatement.setTables((Vector)descriptor.getTables().clone());
// Count * cannot be used with distinct.
// Count * cannot be used with distinct.
// CR 2900 if the original query used distinct only on one field then perform the count
// on that field.
if (((ReadAllQuery)this.query).shouldDistinctBeUsed() && (this.query.getCall().getFields().size() == 1)) {
selectStatement.addField(buildCountDistinctExpression(this.query.getCall().getFields(), ((ReadAllQuery)this.query).getExpressionBuilder()));
} else {
selectStatement.computeDistinct();
if (selectStatement.shouldDistinctBeUsed() && (descriptor.getPrimaryKeyFields().size() == 1)) {
// Can only do this with a singleton primary keys.
selectStatement.addField(buildCountDistinctExpression(descriptor.getPrimaryKeyFields(), ((ReadAllQuery)this.query).getExpressionBuilder()));
} else {
selectStatement.addField(((ReadAllQuery)this.query).getExpressionBuilder().count());
}
selectStatement.dontUseDistinct();
}
selectStatement.normalize(getSession(), descriptor, clonedExpressions);
// Construct the query
query = new ValueReadQuery();
query.setSQLStatement(selectStatement);
} else {
query = ((CursoredStreamPolicy)this.policy).getSizeQuery();
}
Number value = (Number)getSession().executeQuery(query, this.query.getTranslationRow());
if (value == null) {
throw QueryException.incorrectSizeQueryForCursorStream(this.query);
}
return value.intValue();
}
/**
* INTERNAL:
* Return the threshold for the stream.
*/
protected int getInitialReadSize() {
return ((CursoredStreamPolicy)this.policy).getInitialReadSize();
}
/**
* INTERNAL:
* Return the marker used for mark() & reset() operations.
*/
protected int getMarker() {
return marker;
}
/**
* INTERNAL:
* Return the page size for the stream.
*/
public int getPageSize() {
return ((CursoredStreamPolicy)this.policy).getPageSize();
}
/**
* INTERNAL:
* Return the position of the stream inside the object collection
*/
public int getPosition() {
return position;
}
/**
* PUBLIC:
* Return whether the cursored stream has any more elements.
*/
public boolean hasMoreElements() {
return !atEnd();
}
/**
* PUBLIC:
* Return whether the cursored stream has any more elements.
*/
public boolean hasNext() {
return !atEnd();
}
/**
* PUBLIC:
* Mark the present position in the stream.
* Subsequent calls to reset() will attempt to reposition the stream to this point.
*
* @param readAheadLimit Limit on the number of characters that may be
* read while still preserving the mark. Because
* the stream's input comes from the database, there
* is no actual limit, so this argument is ignored.
*/
public void mark(int readAheadLimit) {
this.marker = this.position;
}
/**
* PUBLIC:
* Tests if this input stream supports the <code>mark</code>
* and <code>reset</code> methods. The <code>markSupported</code>
* method of <code>InputStream</code> returns <code>false</code>.
*/
public boolean markSupported() {
return true;
}
/**
* PUBLIC:
* Return the next object from the collection, if beyond the read limit read from the cursor.
* @return the next object in stream
*/
public Object nextElement() {
return read();
}
/**
* PUBLIC:
* Return the next object from the collection, if beyond the read limit read from the cursor.
* @return the next object in stream
*/
public Object next() {
return read();
}
/**
* PUBLIC:
* Return a Vector of at most numberOfElements of the next objects from the collection. If there
* aren't that many objects left to read, just return what is available.
* @return the next objects in stream
*/
public Vector nextElements(int numberOfElements) {
Vector nextElements = new Vector(numberOfElements);
while (nextElements.size() < numberOfElements) {
if (atEnd()) {
return nextElements;
}
nextElements.add(nextElement());
}
return nextElements;
}
/**
* PUBLIC:
* Return a Vector of at most numberOfElements of the next objects from the collection. If there
* aren't that many objects left to read, just return what is available.
* @return the next objects in stream
*/
public List<Object> next(int numberOfElements) {
return nextElements(numberOfElements);
}
/**
* PUBLIC:
* Return the next object in the stream, without incrementing the stream's position.
*/
public Object peek() throws DatabaseException {
Object object = read();
this.position = this.position - 1;
return object;
}
/**
* PUBLIC:
* This method differs slightly from conventional read() operation on a Java stream. This
* method return the next object in the collection rather than specifying the number of
* bytes to be read in.
*
* Return the next object from the collection, if beyond the read limit read from the cursor
* @return - next object in stream
* @exception - throws exception if read pass end of stream
*/
public Object read() throws DatabaseException, QueryException {
// CR#2571. If no more objects in collection get next page.
if (this.objectCollection.size() == this.position) {
retrieveNextPage();
}
if (atEnd()) {
throw QueryException.readBeyondStream(this.query);
}
Object object = this.objectCollection.get(this.position);
this.position = this.position + 1;
return object;
}
/**
* PUBLIC:
* This method differs slightly from conventional read() operation on a Java stream. This
* method returns the next number of objects in the collection in a vector.
*
* Return the next object from the collection, if beyond the read limit read from the cursor
* @param number - number of objects to be returned
* @return - vector containing next number of objects
* @exception - throws exception if read pass end of stream
*/
public List<Object> read(int number) throws DatabaseException {
List<Object> result = copy(this.position, this.position + number);
this.position = this.position + result.size();
return result;
}
/**
* PUBLIC:
* Release all objects read in so far.
* This should be performed when reading in a large collection of
* objects in order to preserve memory.
*/
public void clear() {
super.clear();
if (this.position == 0) {
return;
}
this.objectCollection = Helper.copyVector(this.objectCollection, this.position, this.objectCollection.size());
this.position = 0;
}
/**
* PUBLIC:
* Release all objects read in so far.
* This should be performed when reading in a large collection of
* objects in order to preserve memory.
*/
public void releasePrevious() {
clear();
}
/**
* PUBLIC:
* Repositions this stream to the position at the time the
* mark method was last called on this stream.
*/
public void reset() {
this.position = this.marker;
}
protected Object retrieveNextObject() throws DatabaseException {
while (true) {
AbstractRecord row = null;
if (this.nextRow == null) {
if (isClosed()) {
return null;
}
row = getAccessor().cursorRetrieveNextRow(this.fields, this.resultSet, this.executionSession);
} else {
row = this.nextRow;
this.nextRow = null;
}
if (row == null) {
close();
return null;
}
// If using 1-m joining need to fetch 1-m rows as well.
if (this.query.isObjectLevelReadQuery() && ((ObjectLevelReadQuery)this.query).hasJoining()) {
if (!isClosed()) {
JoinedAttributeManager joinManager = ((ObjectLevelReadQuery)this.query).getJoinedAttributeManager();
if (joinManager.isToManyJoin()) {
this.nextRow = joinManager.processDataResults(row, this, true);
if (this.nextRow == null) {
close();
}
}
}
}
Object object = buildAndRegisterObject(row);
if (object == InvalidObject.instance) {
continue;
}
if (object != null) {
this.objectCollection.add(object);
}
return object;
}
}
/**
* INTERNAL:
* Retrieve and add the next page size of rows to the vector.
* Return the last object, or null if at end.
*/
protected Object retrieveNextPage() throws DatabaseException {
Object last = null;
int pageSize = getPageSize();
for (int index = 0; index < pageSize; index++) {
last = retrieveNextObject();
if (last == null) {
return null;
}
}
return last;
}
/**
* INTERNAL:
* Initialize the stream size and position
*/
protected void setLimits() {
this.position = 0;
this.marker = 0;
int readSize = getInitialReadSize();
for (int index = 0; index < readSize; index++) {
retrieveNextObject();
}
}
/**
* INTERNAL:
* Set the marker used for mark() & reset() operations
*/
protected void setMarker(int value) {
marker = value;
}
}