Skip to content

DATAMONGO-1165 - Add support for Streaming large result lists. #274

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>1.7.0.BUILD-SNAPSHOT</version>
<version>1.7.0.DATAMONGO-1165-SNAPSHOT</version>
<packaging>pom</packaging>

<name>Spring Data MongoDB</name>
Expand All @@ -29,7 +29,7 @@
<properties>
<project.type>multi</project.type>
<dist.id>spring-data-mongodb</dist.id>
<springdata.commons>1.10.0.BUILD-SNAPSHOT</springdata.commons>
<springdata.commons>1.10.0.DATACMNS-650-SNAPSHOT</springdata.commons>
<mongo>2.13.0</mongo>
<mongo.osgi>2.13.0</mongo.osgi>
</properties>
Expand Down
4 changes: 2 additions & 2 deletions spring-data-mongodb-cross-store/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>1.7.0.BUILD-SNAPSHOT</version>
<version>1.7.0.DATAMONGO-1165-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down Expand Up @@ -48,7 +48,7 @@
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb</artifactId>
<version>1.7.0.BUILD-SNAPSHOT</version>
<version>1.7.0.DATAMONGO-1165-SNAPSHOT</version>
</dependency>

<dependency>
Expand Down
2 changes: 1 addition & 1 deletion spring-data-mongodb-distribution/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>1.7.0.BUILD-SNAPSHOT</version>
<version>1.7.0.DATAMONGO-1165-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion spring-data-mongodb-log4j/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>1.7.0.BUILD-SNAPSHOT</version>
<version>1.7.0.DATAMONGO-1165-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion spring-data-mongodb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>1.7.0.BUILD-SNAPSHOT</version>
<version>1.7.0.DATAMONGO-1165-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@
import org.springframework.data.mongodb.core.query.NearQuery;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.data.util.CloseableIterator;

import com.mongodb.CommandResult;
import com.mongodb.Cursor;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.WriteResult;
Expand Down Expand Up @@ -146,6 +148,20 @@ public interface MongoOperations {
*/
<T> T executeInSession(DbCallback<T> action);

/**
* Executes the given {@link Query} on the entity collection of the specified {@code entityType} backed by a Mongo DB
* {@link Cursor}.
* <p>
* Returns a {@link CloseableIterator} that wraps the a Mongo DB {@link Cursor} that needs to be closed.
*
* @param <T> element return type
* @param query
* @param entityType
* @return
* @since 1.7
*/
<T> CloseableIterator<T> executeAsStream(Query query, Class<T> entityType);

/**
* Create an uncapped collection with a name based on the provided entity class.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2010-2014 the original author or authors.
* Copyright 2010-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -95,6 +95,7 @@
import org.springframework.data.mongodb.core.query.NearQuery;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
import org.springframework.data.util.CloseableIterator;
import org.springframework.jca.cci.core.ConnectionCallback;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
Expand All @@ -103,6 +104,7 @@

import com.mongodb.BasicDBObject;
import com.mongodb.CommandResult;
import com.mongodb.Cursor;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
Expand Down Expand Up @@ -312,6 +314,31 @@ public MongoConverter getConverter() {
return this.mongoConverter;
}

/* (non-Javadoc)
* @see org.springframework.data.mongodb.core.MongoOperations#executeAsStream(org.springframework.data.mongodb.core.query.Query, java.lang.Class)
*/
@Override
public <T> CloseableIterator<T> executeAsStream(final Query query, final Class<T> entityType) {

return execute(entityType, new CollectionCallback<CloseableIterator<T>>() {

@Override
public CloseableIterator<T> doInCollection(DBCollection collection) throws MongoException, DataAccessException {

MongoPersistentEntity<?> persistentEntity = mappingContext.getPersistentEntity(entityType);

DBObject mappedFields = queryMapper.getMappedFields(query.getFieldsObject(), persistentEntity);
DBObject mappedQuery = queryMapper.getMappedObject(query.getQueryObject(), persistentEntity);

DBCursor cursor = collection.find(mappedQuery, mappedFields);
ReadDbObjectCallback<T> readCallback = new ReadDbObjectCallback<T>(mongoConverter, entityType);

return new CloseableIterableCusorAdapter<T>(cursor, exceptionTranslator, readCallback);
}
});

}

public String getCollectionName(Class<?> entityClass) {
return this.determineCollectionName(entityClass);
}
Expand Down Expand Up @@ -2097,9 +2124,10 @@ public DBObject doInCollection(DBCollection collection) throws MongoException, D
* Simple internal callback to allow operations on a {@link DBObject}.
*
* @author Oliver Gierke
* @author Thomas Darimont
*/

private interface DbObjectCallback<T> {
static interface DbObjectCallback<T> {

T doWith(DBObject object);
}
Expand Down Expand Up @@ -2262,4 +2290,76 @@ public GeoResult<T> doWith(DBObject object) {
}
}

/**
* A {@link CloseableIterator} that is backed by a MongoDB {@link Cursor}.
*
* @since 1.7
* @author Thomas Darimont
*/
static class CloseableIterableCusorAdapter<T> implements CloseableIterator<T> {

private volatile Cursor cursor;
private PersistenceExceptionTranslator exceptionTranslator;
private DbObjectCallback<T> objectReadCallback;

/**
* Creates a new {@link CloseableIterableCusorAdapter} backed by the given {@link Cursor}.
*
* @param cursor
* @param exceptionTranslator
* @param objectReadCallback
*/
public CloseableIterableCusorAdapter(Cursor cursor, PersistenceExceptionTranslator exceptionTranslator,
DbObjectCallback<T> objectReadCallback) {

this.cursor = cursor;
this.exceptionTranslator = exceptionTranslator;
this.objectReadCallback = objectReadCallback;
}

@Override
public boolean hasNext() {

if (cursor == null) {
return false;
}

try {
return cursor.hasNext();
} catch (RuntimeException ex) {
throw exceptionTranslator.translateExceptionIfPossible(ex);
}
}

@Override
public T next() {

if (cursor == null) {
return null;
}

try {
DBObject item = cursor.next();
T converted = objectReadCallback.doWith(item);
return converted;
} catch (RuntimeException ex) {
throw exceptionTranslator.translateExceptionIfPossible(ex);
}
}

@Override
public void close() {

Cursor c = cursor;
try {
c.close();
} catch (RuntimeException ex) {
throw exceptionTranslator.translateExceptionIfPossible(ex);
} finally {
cursor = null;
exceptionTranslator = null;
objectReadCallback = null;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2010-2014 the original author or authors.
* Copyright 2010-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,8 +18,6 @@
import java.util.Collections;
import java.util.List;

import org.springframework.core.convert.ConversionService;
import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
Expand All @@ -34,6 +32,7 @@
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.repository.query.ParameterAccessor;
import org.springframework.data.repository.query.RepositoryQuery;
import org.springframework.data.util.Java8StreamUtils;
import org.springframework.data.util.TypeInformation;
import org.springframework.util.Assert;

Expand All @@ -48,8 +47,6 @@
*/
public abstract class AbstractMongoQuery implements RepositoryQuery {

private static final ConversionService CONVERSION_SERVICE = new DefaultConversionService();

private final MongoQueryMethod method;
private final MongoOperations operations;

Expand Down Expand Up @@ -87,7 +84,9 @@ public Object execute(Object[] parameters) {

applyQueryMetaAttributesWhenPresent(query);

if (isDeleteQuery()) {
if (method.isStreamQuery()) {
return new StreamExecution().execute(query);
} else if (isDeleteQuery()) {
return new DeleteExecution().execute(query);
} else if (method.isGeoNearQuery() && method.isPageQuery()) {

Expand Down Expand Up @@ -415,4 +414,22 @@ private Object deleteAndConvertResult(Query query, MongoEntityMetadata<?> metada
return writeResult != null ? writeResult.getN() : 0L;
}
}

/**
* @author Thomas Darimont
* @since 1.7
*/
final class StreamExecution extends Execution {

/* (non-Javadoc)
* @see org.springframework.data.mongodb.repository.query.AbstractMongoQuery.Execution#execute(org.springframework.data.mongodb.core.query.Query)
*/
@Override
Object execute(Query query) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think part of this code could be moved to SD Commons.


Class<?> entityType = getQueryMethod().getEntityInformation().getJavaType();

return Java8StreamUtils.createStreamFromIterator(operations.executeAsStream(query, entityType));
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2011-2014 the original author or authors.
* Copyright 2011-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2011-2014 the original author or authors.
* Copyright 2011-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -65,7 +65,7 @@ public abstract class AbstractPersonRepositoryIntegrationTests {

@Autowired MongoOperations operations;

Person dave, oliver, carter, boyd, stefan, leroi, alicia;
protected Person dave, oliver, carter, boyd, stefan, leroi, alicia;
QPerson person;

List<Person> all;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2010-2014 the original author or authors.
* Copyright 2010-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.stream.Stream;

import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
Expand Down Expand Up @@ -320,4 +321,10 @@ public interface PersonRepository extends MongoRepository<Person, String>, Query

@Query("{ ?0 : ?1 }")
List<Person> findByKeyValue(String key, String value);

/**
* @see DATAMONGO-1165
*/
@Query("{firstname:{$in:?0}}")
Stream<Person> findByCustomQueryWithStreamingCursorByFirstnames(List<String> firstnames);
}
Loading