Skip to content

DATAMONGO-1165 - Add support for Streaming large result lists. #274

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>1.7.0.BUILD-SNAPSHOT</version>
<version>1.7.0.DATAMONGO-1165-SNAPSHOT</version>
<packaging>pom</packaging>

<name>Spring Data MongoDB</name>
Expand All @@ -29,7 +29,7 @@
<properties>
<project.type>multi</project.type>
<dist.id>spring-data-mongodb</dist.id>
<springdata.commons>1.10.0.BUILD-SNAPSHOT</springdata.commons>
<springdata.commons>1.10.0.DATACMNS-650-SNAPSHOT</springdata.commons>
<mongo>2.13.0</mongo>
<mongo.osgi>2.13.0</mongo.osgi>
</properties>
Expand Down
4 changes: 2 additions & 2 deletions spring-data-mongodb-cross-store/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>1.7.0.BUILD-SNAPSHOT</version>
<version>1.7.0.DATAMONGO-1165-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down Expand Up @@ -48,7 +48,7 @@
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb</artifactId>
<version>1.7.0.BUILD-SNAPSHOT</version>
<version>1.7.0.DATAMONGO-1165-SNAPSHOT</version>
</dependency>

<dependency>
Expand Down
2 changes: 1 addition & 1 deletion spring-data-mongodb-distribution/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>1.7.0.BUILD-SNAPSHOT</version>
<version>1.7.0.DATAMONGO-1165-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion spring-data-mongodb-log4j/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>1.7.0.BUILD-SNAPSHOT</version>
<version>1.7.0.DATAMONGO-1165-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion spring-data-mongodb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>1.7.0.BUILD-SNAPSHOT</version>
<version>1.7.0.DATAMONGO-1165-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@
import java.util.Set;

import org.springframework.data.geo.GeoResults;
import org.springframework.data.mapping.context.MappingContext;
import org.springframework.data.mongodb.core.aggregation.Aggregation;
import org.springframework.data.mongodb.core.aggregation.AggregationResults;
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.convert.QueryMapper;
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
import org.springframework.data.mongodb.core.mapping.MongoPersistentProperty;
import org.springframework.data.mongodb.core.mapreduce.GroupBy;
import org.springframework.data.mongodb.core.mapreduce.GroupByResults;
import org.springframework.data.mongodb.core.mapreduce.MapReduceOptions;
Expand Down Expand Up @@ -959,4 +963,19 @@ <T> T findAndModify(Query query, Update update, FindAndModifyOptions options, Cl
* @return
*/
MongoConverter getConverter();

/**
* Returns the underlying {@link QueryMapper}.
*
* @return
*/
QueryMapper getQueryMapper();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any way we can avoid leaking MongoTemplate internals here?


/**
* Returns the underlying {@link MappingContext}.
*
* @return
* @since 1.7
*/
MappingContext<? extends MongoPersistentEntity<?>, MongoPersistentProperty> getMappingContext();
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2010-2014 the original author or authors.
* Copyright 2010-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -312,6 +312,22 @@ public MongoConverter getConverter() {
return this.mongoConverter;
}

/* (non-Javadoc)
* @see org.springframework.data.mongodb.core.MongoOperations#getQueryMapper()
*/
@Override
public QueryMapper getQueryMapper() {
return queryMapper;
}

/* (non-Javadoc)
* @see org.springframework.data.mongodb.core.MongoOperations#getMappingContext()
*/
@Override
public MappingContext<? extends MongoPersistentEntity<?>, MongoPersistentProperty> getMappingContext() {
return mappingContext;
}

public String getCollectionName(Class<?> entityClass) {
return this.determineCollectionName(entityClass);
}
Expand Down Expand Up @@ -2261,5 +2277,4 @@ public GeoResult<T> doWith(DBObject object) {
return new GeoResult<T>(doWith, new Distance(distance, metric));
}
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2010-2014 the original author or authors.
* Copyright 2010-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,9 +17,11 @@

import java.util.Collections;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.StreamSupport;

import org.springframework.core.convert.ConversionService;
import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.dao.DataAccessException;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
Expand All @@ -29,14 +31,24 @@
import org.springframework.data.geo.GeoResult;
import org.springframework.data.geo.GeoResults;
import org.springframework.data.geo.Point;
import org.springframework.data.mongodb.core.CollectionCallback;
import org.springframework.data.mongodb.core.MongoOperations;
import org.springframework.data.mongodb.core.convert.QueryMapper;
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
import org.springframework.data.mongodb.core.query.NearQuery;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.util.CloseableIterableCusorAdapter;
import org.springframework.data.repository.query.ParameterAccessor;
import org.springframework.data.repository.query.RepositoryQuery;
import org.springframework.data.util.CloseableIterator;
import org.springframework.data.util.CloseableIteratorDisposingRunnable;
import org.springframework.data.util.TypeInformation;
import org.springframework.util.Assert;

import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.MongoException;
import com.mongodb.WriteResult;

/**
Expand All @@ -48,8 +60,6 @@
*/
public abstract class AbstractMongoQuery implements RepositoryQuery {

private static final ConversionService CONVERSION_SERVICE = new DefaultConversionService();

private final MongoQueryMethod method;
private final MongoOperations operations;

Expand Down Expand Up @@ -87,7 +97,11 @@ public Object execute(Object[] parameters) {

applyQueryMetaAttributesWhenPresent(query);

if (isDeleteQuery()) {
if (method.isCloseableIteratorQuery()) {
return new CursorBackedExecution().execute(query);
} else if (method.isStreamQuery()) {
return new StreamExecution().execute(query);
} else if (isDeleteQuery()) {
return new DeleteExecution().execute(query);
} else if (method.isGeoNearQuery() && method.isPageQuery()) {

Expand Down Expand Up @@ -415,4 +429,55 @@ private Object deleteAndConvertResult(Query query, MongoEntityMetadata<?> metada
return writeResult != null ? writeResult.getN() : 0L;
}
}

/**
* @author Thomas Darimont
*/
private class CursorBackedExecution extends Execution {

@Override
Object execute(final Query query) {

Class<?> javaType = getQueryMethod().getEntityInformation().getJavaType();
QueryMapper queryMapper = operations.getQueryMapper();

return createCursorAndWrapInCloseableIterator(query, javaType, queryMapper);
}

@SuppressWarnings("unchecked")
private CloseableIterator<Object> createCursorAndWrapInCloseableIterator(final Query query,
final Class<?> entityType, final QueryMapper queryMapper) {

final MongoPersistentEntity<?> persistentEntity = operations.getMappingContext().getPersistentEntity(entityType);

return (CloseableIterator<Object>) operations.execute(entityType, new CollectionCallback<Object>() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure the current setup here works as we expect it. Returning the adapter will basically just port the DBCursor to the client in adapter shape. Exceptions thrown while iterating over the result and mapping it still wouldn't be subject to the exception translation implemented in MongoTemplate.


@Override
public Object doInCollection(DBCollection collection) throws MongoException, DataAccessException {

DBObject mappedFields = queryMapper.getMappedFields(query.getFieldsObject(), persistentEntity);
DBObject mappedQuery = queryMapper.getMappedObject(query.getQueryObject(), persistentEntity);

DBCursor cursor = collection.find(mappedQuery, mappedFields);

return new CloseableIterableCusorAdapter<Object>(cursor, operations.getConverter(), entityType);
}
});
}
}

/**
* @author Thomas Darimont
*/
final class StreamExecution extends CursorBackedExecution {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think it's possible to move that into Spring Data Commons and rather turn it into a post-processing step in case the execution returns a CloseableIterable but effectively needs to be a Stream (similarly to what we already do with Optionals. I think this would allow us to automatically support Stream as return type on all stores that implement the execution into a ClosableIterable.


@Override
Object execute(Query query) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think part of this code could be moved to SD Commons.


CloseableIterator<Object> result = (CloseableIterator<Object>) super.execute(query);
Spliterator<Object> spliterator = Spliterators.spliteratorUnknownSize(result, Spliterator.NONNULL);

return StreamSupport.stream(spliterator, false).onClose(new CloseableIteratorDisposingRunnable(result));
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2011-2014 the original author or authors.
* Copyright 2011-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.mongodb.util;

import java.util.Iterator;

import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.util.CloseableIterator;

import com.mongodb.Cursor;
import com.mongodb.DBObject;

/**
* @author Thomas Darimont
*/
public class CloseableIterableCusorAdapter<T> implements CloseableIterator<T> {

private volatile Cursor cursor;
private MongoConverter converter;
private Class<?> entityClass;

public CloseableIterableCusorAdapter(Cursor cursor, MongoConverter converter, Class<?> entityClass) {

this.cursor = cursor;
this.converter = converter;
this.entityClass = entityClass;
}

@Override
public boolean hasNext() {

if (cursor == null) {
return false;
}

try {
return cursor.hasNext();
} catch (Exception ex) {
throw new RuntimeException("error on hasNext");
}
}

@Override
public T next() {

if (cursor == null) {
return null;
}

try {

DBObject item = cursor.next();
Object converted = converter.read(entityClass, item);

return (T) converted;
} catch (Exception ex) {
throw new RuntimeException("error on next");
}
}

@Override
public void close() {

Cursor c = cursor;
try {
c.close();
} finally {

cursor = null;
converter = null;
entityClass = null;
}
}

@Override
public Iterator<T> iterator() {
return this;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2011-2014 the original author or authors.
* Copyright 2011-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -65,7 +65,7 @@ public abstract class AbstractPersonRepositoryIntegrationTests {

@Autowired MongoOperations operations;

Person dave, oliver, carter, boyd, stefan, leroi, alicia;
protected Person dave, oliver, carter, boyd, stefan, leroi, alicia;
QPerson person;

List<Person> all;
Expand Down
Loading