-
Notifications
You must be signed in to change notification settings - Fork 1.1k
DATAMONGO-1165 - Add support for Streaming large result lists. #274
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 4 commits
3008bdd
a7533ea
bcf8244
2144209
6c00b1c
12c2ddf
2dee07c
b5abeb7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
/* | ||
* Copyright 2010-2014 the original author or authors. | ||
* Copyright 2010-2015 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
|
@@ -17,9 +17,11 @@ | |
|
||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Spliterator; | ||
import java.util.Spliterators; | ||
import java.util.stream.StreamSupport; | ||
|
||
import org.springframework.core.convert.ConversionService; | ||
import org.springframework.core.convert.support.DefaultConversionService; | ||
import org.springframework.dao.DataAccessException; | ||
import org.springframework.data.domain.PageImpl; | ||
import org.springframework.data.domain.Pageable; | ||
import org.springframework.data.domain.Slice; | ||
|
@@ -29,14 +31,24 @@ | |
import org.springframework.data.geo.GeoResult; | ||
import org.springframework.data.geo.GeoResults; | ||
import org.springframework.data.geo.Point; | ||
import org.springframework.data.mongodb.core.CollectionCallback; | ||
import org.springframework.data.mongodb.core.MongoOperations; | ||
import org.springframework.data.mongodb.core.convert.QueryMapper; | ||
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity; | ||
import org.springframework.data.mongodb.core.query.NearQuery; | ||
import org.springframework.data.mongodb.core.query.Query; | ||
import org.springframework.data.mongodb.util.CloseableIterableCusorAdapter; | ||
import org.springframework.data.repository.query.ParameterAccessor; | ||
import org.springframework.data.repository.query.RepositoryQuery; | ||
import org.springframework.data.util.CloseableIterator; | ||
import org.springframework.data.util.CloseableIteratorDisposingRunnable; | ||
import org.springframework.data.util.TypeInformation; | ||
import org.springframework.util.Assert; | ||
|
||
import com.mongodb.DBCollection; | ||
import com.mongodb.DBCursor; | ||
import com.mongodb.DBObject; | ||
import com.mongodb.MongoException; | ||
import com.mongodb.WriteResult; | ||
|
||
/** | ||
|
@@ -48,8 +60,6 @@ | |
*/ | ||
public abstract class AbstractMongoQuery implements RepositoryQuery { | ||
|
||
private static final ConversionService CONVERSION_SERVICE = new DefaultConversionService(); | ||
|
||
private final MongoQueryMethod method; | ||
private final MongoOperations operations; | ||
|
||
|
@@ -87,7 +97,11 @@ public Object execute(Object[] parameters) { | |
|
||
applyQueryMetaAttributesWhenPresent(query); | ||
|
||
if (isDeleteQuery()) { | ||
if (method.isCloseableIteratorQuery()) { | ||
return new CursorBackedExecution().execute(query); | ||
} else if (method.isStreamQuery()) { | ||
return new StreamExecution().execute(query); | ||
} else if (isDeleteQuery()) { | ||
return new DeleteExecution().execute(query); | ||
} else if (method.isGeoNearQuery() && method.isPageQuery()) { | ||
|
||
|
@@ -415,4 +429,55 @@ private Object deleteAndConvertResult(Query query, MongoEntityMetadata<?> metada | |
return writeResult != null ? writeResult.getN() : 0L; | ||
} | ||
} | ||
|
||
/** | ||
* @author Thomas Darimont | ||
*/ | ||
private class CursorBackedExecution extends Execution { | ||
|
||
@Override | ||
Object execute(final Query query) { | ||
|
||
Class<?> javaType = getQueryMethod().getEntityInformation().getJavaType(); | ||
QueryMapper queryMapper = operations.getQueryMapper(); | ||
|
||
return createCursorAndWrapInCloseableIterator(query, javaType, queryMapper); | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
private CloseableIterator<Object> createCursorAndWrapInCloseableIterator(final Query query, | ||
final Class<?> entityType, final QueryMapper queryMapper) { | ||
|
||
final MongoPersistentEntity<?> persistentEntity = operations.getMappingContext().getPersistentEntity(entityType); | ||
|
||
return (CloseableIterator<Object>) operations.execute(entityType, new CollectionCallback<Object>() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure the current setup here works as we expect it. Returning the adapter will basically just port the |
||
|
||
@Override | ||
public Object doInCollection(DBCollection collection) throws MongoException, DataAccessException { | ||
|
||
DBObject mappedFields = queryMapper.getMappedFields(query.getFieldsObject(), persistentEntity); | ||
DBObject mappedQuery = queryMapper.getMappedObject(query.getQueryObject(), persistentEntity); | ||
|
||
DBCursor cursor = collection.find(mappedQuery, mappedFields); | ||
|
||
return new CloseableIterableCusorAdapter<Object>(cursor, operations.getConverter(), entityType); | ||
} | ||
}); | ||
} | ||
} | ||
|
||
/** | ||
* @author Thomas Darimont | ||
*/ | ||
final class StreamExecution extends CursorBackedExecution { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you think it's possible to move that into Spring Data Commons and rather turn it into a post-processing step in case the execution returns a |
||
|
||
@Override | ||
Object execute(Query query) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think part of this code could be moved to SD Commons. |
||
|
||
CloseableIterator<Object> result = (CloseableIterator<Object>) super.execute(query); | ||
Spliterator<Object> spliterator = Spliterators.spliteratorUnknownSize(result, Spliterator.NONNULL); | ||
|
||
return StreamSupport.stream(spliterator, false).onClose(new CloseableIteratorDisposingRunnable(result)); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
/* | ||
* Copyright 2015 the original author or authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.springframework.data.mongodb.util; | ||
|
||
import java.util.Iterator; | ||
|
||
import org.springframework.data.mongodb.core.convert.MongoConverter; | ||
import org.springframework.data.util.CloseableIterator; | ||
|
||
import com.mongodb.Cursor; | ||
import com.mongodb.DBObject; | ||
|
||
/** | ||
* @author Thomas Darimont | ||
*/ | ||
public class CloseableIterableCusorAdapter<T> implements CloseableIterator<T> { | ||
|
||
private volatile Cursor cursor; | ||
private MongoConverter converter; | ||
private Class<?> entityClass; | ||
|
||
public CloseableIterableCusorAdapter(Cursor cursor, MongoConverter converter, Class<?> entityClass) { | ||
|
||
this.cursor = cursor; | ||
this.converter = converter; | ||
this.entityClass = entityClass; | ||
} | ||
|
||
@Override | ||
public boolean hasNext() { | ||
|
||
if (cursor == null) { | ||
return false; | ||
} | ||
|
||
try { | ||
return cursor.hasNext(); | ||
} catch (Exception ex) { | ||
throw new RuntimeException("error on hasNext"); | ||
} | ||
} | ||
|
||
@Override | ||
public T next() { | ||
|
||
if (cursor == null) { | ||
return null; | ||
} | ||
|
||
try { | ||
|
||
DBObject item = cursor.next(); | ||
Object converted = converter.read(entityClass, item); | ||
|
||
return (T) converted; | ||
} catch (Exception ex) { | ||
throw new RuntimeException("error on next"); | ||
} | ||
} | ||
|
||
@Override | ||
public void close() { | ||
|
||
Cursor c = cursor; | ||
try { | ||
c.close(); | ||
} finally { | ||
|
||
cursor = null; | ||
converter = null; | ||
entityClass = null; | ||
} | ||
} | ||
|
||
@Override | ||
public Iterator<T> iterator() { | ||
return this; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any way we can avoid leaking
MongoTemplate
internals here?