| /* |
| * Copyright (c) 1998, 2019 Oracle and/or its affiliates. All rights reserved. |
| * |
| * This program and the accompanying materials are made available under the |
| * terms of the Eclipse Public License v. 2.0 which is available at |
| * http://www.eclipse.org/legal/epl-2.0, |
| * or the Eclipse Distribution License v. 1.0 which is available at |
| * http://www.eclipse.org/org/documents/edl-v10.php. |
| * |
| * SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause |
| */ |
| |
| // Contributors: |
| // Oracle - initial API and implementation from Oracle TopLink |
| package org.eclipse.persistence.queries; |
| |
| import java.util.*; |
| |
| import org.eclipse.persistence.descriptors.ClassDescriptor; |
| import org.eclipse.persistence.exceptions.*; |
| import org.eclipse.persistence.expressions.*; |
| import org.eclipse.persistence.internal.queries.*; |
| import org.eclipse.persistence.internal.sessions.AbstractRecord; |
| import org.eclipse.persistence.internal.expressions.*; |
| import org.eclipse.persistence.internal.databaseaccess.*; |
| import org.eclipse.persistence.internal.helper.*; |
| |
| /** |
| * <p><b>Purpose</b>: |
| * Stream class which is used to deal with large collections returned |
| * from TOPLink queries more efficiently. |
| * |
| * <p><b>Responsibilities</b>: |
| * Wraps a database result set cursor to provide a stream on the resulting selected objects. |
| * |
| * @author Yvon Lavoie |
| * @since TOPLink/Java 1.0 |
| */ |
| public class CursoredStream extends Cursor { |
| |
| /** Marker for backing up. */ |
| protected int marker; |
| |
| /** |
| * INTERNAL: |
| * Initialize the state of the stream |
| */ |
| public CursoredStream() { |
| super(); |
| } |
| |
| /** |
| * INTERNAL: |
| * Initialize the state of the stream |
| */ |
| public CursoredStream(DatabaseCall call, CursoredStreamPolicy policy) { |
| super(call, policy); |
| // Must close on exception as stream will not be returned to user. |
| try { |
| setLimits(); |
| } catch (RuntimeException exception) { |
| try { |
| close(); |
| } catch (RuntimeException ignore) { |
| } |
| throw exception; |
| } |
| } |
| |
| /** |
| * PUBLIC: |
| * Return whether the cursored stream is at its end. |
| */ |
| public boolean atEnd() throws DatabaseException { |
| if ((this.position + 1) <= this.objectCollection.size()) { |
| return false; |
| } |
| if (this.nextRow != null) { |
| return false; |
| } |
| if (isClosed()) { |
| return true; |
| } |
| int oldSize = this.objectCollection.size(); |
| retrieveNextPage(); |
| return this.objectCollection.size() == oldSize; |
| } |
| |
| /** |
| * PUBLIC: |
| * Returns the number of objects that can be read from this input without blocking. |
| */ |
| public int available() throws DatabaseException { |
| //For CR#2570/CR#2571. |
| return this.objectCollection.size() - this.position; |
| } |
| |
| /** |
| * INTERNAL: |
| * Must build the count on the primary key fields, not * as * is not allowed if there was a distinct. |
| * This require a manually defined operator. |
| * added for CR 2900 |
| */ |
| public Expression buildCountDistinctExpression(List includeFields, ExpressionBuilder builder) { |
| ExpressionOperator countOperator = new ExpressionOperator(); |
| countOperator.setType(ExpressionOperator.AggregateOperator); |
| Vector databaseStrings = new Vector(); |
| databaseStrings.add("COUNT(DISTINCT "); |
| databaseStrings.add(")"); |
| countOperator.printsAs(databaseStrings); |
| countOperator.bePrefix(); |
| countOperator.setNodeClass(ClassConstants.FunctionExpression_Class); |
| Expression firstFieldExpression = builder.getField(((DatabaseField)includeFields.get(0)).getQualifiedName()); |
| return countOperator.expressionForArguments(firstFieldExpression, new ArrayList(0)); |
| |
| } |
| |
| /** |
| * INTERNAL: |
| * Answer a list of the elements of the receiver's collection from startIndex to endIndex. |
| */ |
| protected List<Object> copy(int startIndex, int endIndex) throws QueryException { |
| while (this.objectCollection.size() < endIndex) { |
| if (retrieveNextObject() == null) { |
| throw QueryException.readBeyondStream(this.query); |
| } |
| } |
| return Helper.copyVector(this.objectCollection, startIndex, endIndex); |
| } |
| |
| /** |
| * INTERNAL: |
| * Retrieve the size of the open cursor by executing a count on the same query as the cursor. |
| */ |
| @Override |
| protected int getCursorSize() throws DatabaseException, QueryException { |
| ValueReadQuery query; |
| if (!((CursoredStreamPolicy)this.policy).hasSizeQuery()) { |
| if (this.query.isCallQuery()) { |
| throw QueryException.additionalSizeQueryNotSpecified(this.query); |
| } |
| |
| if (!this.query.isExpressionQuery()) { |
| throw QueryException.sizeOnlySupportedOnExpressionQueries(this.query); |
| } |
| |
| // Construct the select statement |
| SQLSelectStatement selectStatement = new SQLSelectStatement(); |
| |
| // 2612538 - the default size of Map (32) is appropriate |
| Map clonedExpressions = new IdentityHashMap(); |
| selectStatement.setWhereClause(((ExpressionQueryMechanism)this.query.getQueryMechanism()).buildBaseSelectionCriteria(false, clonedExpressions)); |
| |
| ClassDescriptor descriptor = this.query.getDescriptor(); |
| // Case, normal read for branch inheritance class that reads subclasses all in its own table(s). |
| if (descriptor.hasInheritance() && (descriptor.getInheritancePolicy().getWithAllSubclassesExpression() != null)) { |
| Expression branchIndicator = descriptor.getInheritancePolicy().getWithAllSubclassesExpression(); |
| if ((branchIndicator != null) && (selectStatement.getWhereClause() != null)) { |
| selectStatement.setWhereClause(selectStatement.getWhereClause().and(branchIndicator)); |
| } else if (branchIndicator != null) { |
| selectStatement.setWhereClause((Expression)branchIndicator.clone()); |
| } |
| } |
| |
| selectStatement.setTables((Vector)descriptor.getTables().clone()); |
| |
| // Count * cannot be used with distinct. |
| // Count * cannot be used with distinct. |
| // CR 2900 if the original query used distinct only on one field then perform the count |
| // on that field. |
| if (((ReadAllQuery)this.query).shouldDistinctBeUsed() && (this.query.getCall().getFields().size() == 1)) { |
| selectStatement.addField(buildCountDistinctExpression(this.query.getCall().getFields(), ((ReadAllQuery)this.query).getExpressionBuilder())); |
| } else { |
| selectStatement.computeDistinct(); |
| if (selectStatement.shouldDistinctBeUsed() && (descriptor.getPrimaryKeyFields().size() == 1)) { |
| // Can only do this with a singleton primary keys. |
| selectStatement.addField(buildCountDistinctExpression(descriptor.getPrimaryKeyFields(), ((ReadAllQuery)this.query).getExpressionBuilder())); |
| } else { |
| selectStatement.addField(((ReadAllQuery)this.query).getExpressionBuilder().count()); |
| } |
| selectStatement.dontUseDistinct(); |
| } |
| selectStatement.normalize(getSession(), descriptor, clonedExpressions); |
| |
| // Construct the query |
| query = new ValueReadQuery(); |
| query.setSQLStatement(selectStatement); |
| } else { |
| query = ((CursoredStreamPolicy)this.policy).getSizeQuery(); |
| } |
| |
| Number value = (Number)getSession().executeQuery(query, this.query.getTranslationRow()); |
| if (value == null) { |
| throw QueryException.incorrectSizeQueryForCursorStream(this.query); |
| } |
| |
| return value.intValue(); |
| } |
| |
| /** |
| * INTERNAL: |
| * Return the threshold for the stream. |
| */ |
| protected int getInitialReadSize() { |
| return ((CursoredStreamPolicy)this.policy).getInitialReadSize(); |
| } |
| |
| /** |
| * INTERNAL: |
| * Return the marker used for mark() {@literal &} reset() operations. |
| */ |
| protected int getMarker() { |
| return marker; |
| } |
| |
| /** |
| * INTERNAL: |
| * Return the page size for the stream. |
| */ |
| @Override |
| public int getPageSize() { |
| return this.policy.getPageSize(); |
| } |
| |
| /** |
| * INTERNAL: |
| * Return the position of the stream inside the object collection |
| */ |
| @Override |
| public int getPosition() { |
| return position; |
| } |
| |
| /** |
| * PUBLIC: |
| * Return whether the cursored stream has any more elements. |
| */ |
| @Override |
| public boolean hasMoreElements() { |
| return !atEnd(); |
| } |
| |
| /** |
| * PUBLIC: |
| * Return whether the cursored stream has any more elements. |
| */ |
| @Override |
| public boolean hasNext() { |
| return !atEnd(); |
| } |
| |
| /** |
| * PUBLIC: |
| * Mark the present position in the stream. |
| * Subsequent calls to reset() will attempt to reposition the stream to this point. |
| * |
| * @param readAheadLimit Limit on the number of characters that may be |
| * read while still preserving the mark. Because |
| * the stream's input comes from the database, there |
| * is no actual limit, so this argument is ignored. |
| */ |
| public void mark(int readAheadLimit) { |
| this.marker = this.position; |
| } |
| |
| /** |
| * PUBLIC: |
| * Tests if this input stream supports the <code>mark</code> |
| * and <code>reset</code> methods. The <code>markSupported</code> |
| * method of <code>InputStream</code> returns <code>false</code>. |
| */ |
| public boolean markSupported() { |
| return true; |
| } |
| |
| /** |
| * PUBLIC: |
| * Return the next object from the collection, if beyond the read limit read from the cursor. |
| * @return the next object in stream |
| */ |
| @Override |
| public Object nextElement() { |
| return read(); |
| } |
| |
| /** |
| * PUBLIC: |
| * Return the next object from the collection, if beyond the read limit read from the cursor. |
| * @return the next object in stream |
| */ |
| @Override |
| public Object next() { |
| return read(); |
| } |
| |
| /** |
| * PUBLIC: |
| * Return a Vector of at most numberOfElements of the next objects from the collection. If there |
| * aren't that many objects left to read, just return what is available. |
| * @return the next objects in stream |
| */ |
| public Vector nextElements(int numberOfElements) { |
| Vector nextElements = new Vector(numberOfElements); |
| while (nextElements.size() < numberOfElements) { |
| if (atEnd()) { |
| return nextElements; |
| } |
| nextElements.add(nextElement()); |
| } |
| return nextElements; |
| } |
| |
| /** |
| * PUBLIC: |
| * Return a Vector of at most numberOfElements of the next objects from the collection. If there |
| * aren't that many objects left to read, just return what is available. |
| * @return the next objects in stream |
| */ |
| public List<Object> next(int numberOfElements) { |
| return nextElements(numberOfElements); |
| } |
| |
| /** |
| * PUBLIC: |
| * Return the next object in the stream, without incrementing the stream's position. |
| */ |
| public Object peek() throws DatabaseException { |
| Object object = read(); |
| this.position = this.position - 1; |
| return object; |
| } |
| |
| /** |
| * PUBLIC: |
| * This method differs slightly from conventional read() operation on a Java stream. This |
| * method return the next object in the collection rather than specifying the number of |
| * bytes to be read in. |
| * |
| * Return the next object from the collection, if beyond the read limit read from the cursor |
| * @return - next object in stream |
| * @throws DatabaseException if read pass end of stream |
| */ |
| public Object read() throws DatabaseException, QueryException { |
| // CR#2571. If no more objects in collection get next page. |
| if (this.objectCollection.size() == this.position) { |
| retrieveNextPage(); |
| } |
| if (atEnd()) { |
| throw QueryException.readBeyondStream(this.query); |
| } |
| Object object = this.objectCollection.get(this.position); |
| this.position = this.position + 1; |
| return object; |
| } |
| |
| /** |
| * PUBLIC: |
| * This method differs slightly from conventional read() operation on a Java stream. This |
| * method returns the next number of objects in the collection in a vector. |
| * |
| * Return the next object from the collection, if beyond the read limit read from the cursor |
| * @param number - number of objects to be returned |
| * @return - vector containing next number of objects |
| * @throws DatabaseException if read pass end of stream |
| */ |
| public List<Object> read(int number) throws DatabaseException { |
| List<Object> result = copy(this.position, this.position + number); |
| this.position = this.position + result.size(); |
| return result; |
| } |
| |
| /** |
| * PUBLIC: |
| * Release all objects read in so far. |
| * This should be performed when reading in a large collection of |
| * objects in order to preserve memory. |
| */ |
| @Override |
| public void clear() { |
| super.clear(); |
| if (this.position == 0) { |
| return; |
| } |
| this.objectCollection = Helper.copyVector(this.objectCollection, this.position, this.objectCollection.size()); |
| this.position = 0; |
| } |
| |
| /** |
| * PUBLIC: |
| * Release all objects read in so far. |
| * This should be performed when reading in a large collection of |
| * objects in order to preserve memory. |
| */ |
| public void releasePrevious() { |
| clear(); |
| } |
| |
| /** |
| * PUBLIC: |
| * Repositions this stream to the position at the time the |
| * mark method was last called on this stream. |
| */ |
| public void reset() { |
| this.position = this.marker; |
| } |
| |
| @Override |
| protected Object retrieveNextObject() throws DatabaseException { |
| while (true) { |
| AbstractRecord row = null; |
| if (this.nextRow == null) { |
| if (isClosed()) { |
| return null; |
| } |
| row = getAccessor().cursorRetrieveNextRow(this.fields, this.resultSet, this.executionSession); |
| } else { |
| row = this.nextRow; |
| this.nextRow = null; |
| } |
| if (row == null) { |
| close(); |
| return null; |
| } |
| // If using 1-m joining need to fetch 1-m rows as well. |
| if (this.query.isObjectLevelReadQuery() && ((ObjectLevelReadQuery)this.query).hasJoining()) { |
| if (!isClosed()) { |
| JoinedAttributeManager joinManager = ((ObjectLevelReadQuery)this.query).getJoinedAttributeManager(); |
| if (joinManager.isToManyJoin()) { |
| this.nextRow = joinManager.processDataResults(row, this, true); |
| if (this.nextRow == null) { |
| close(); |
| } |
| } |
| } |
| } |
| Object object = buildAndRegisterObject(row); |
| if (object == InvalidObject.instance) { |
| continue; |
| } |
| if (object != null) { |
| this.objectCollection.add(object); |
| } |
| return object; |
| } |
| } |
| |
| /** |
| * INTERNAL: |
| * Retrieve and add the next page size of rows to the vector. |
| * Return the last object, or null if at end. |
| */ |
| protected Object retrieveNextPage() throws DatabaseException { |
| Object last = null; |
| int pageSize = getPageSize(); |
| for (int index = 0; index < pageSize; index++) { |
| last = retrieveNextObject(); |
| if (last == null) { |
| return null; |
| } |
| } |
| return last; |
| } |
| |
| /** |
| * INTERNAL: |
| * Initialize the stream size and position |
| */ |
| protected void setLimits() { |
| this.position = 0; |
| this.marker = 0; |
| int readSize = getInitialReadSize(); |
| for (int index = 0; index < readSize; index++) { |
| retrieveNextObject(); |
| } |
| } |
| |
| /** |
| * INTERNAL: |
| * Set the marker used for mark() {@literal &} reset() operations |
| */ |
| protected void setMarker(int value) { |
| marker = value; |
| } |
| } |