Skip to content

Commit 2144209

Browse files
author
Thomas Darimont
committed
DATAMONGO-1165 - Add support for Streaming large result lists.
Initial support for CloteableIterable and Java 8 Stream as return type of custom Repository finder methods.
1 parent bcf8244 commit 2144209

File tree

11 files changed

+115
-82
lines changed

11 files changed

+115
-82
lines changed

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,14 @@
2020
import java.util.Set;
2121

2222
import org.springframework.data.geo.GeoResults;
23+
import org.springframework.data.mapping.context.MappingContext;
2324
import org.springframework.data.mongodb.core.aggregation.Aggregation;
2425
import org.springframework.data.mongodb.core.aggregation.AggregationResults;
2526
import org.springframework.data.mongodb.core.aggregation.TypedAggregation;
2627
import org.springframework.data.mongodb.core.convert.MongoConverter;
2728
import org.springframework.data.mongodb.core.convert.QueryMapper;
29+
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
30+
import org.springframework.data.mongodb.core.mapping.MongoPersistentProperty;
2831
import org.springframework.data.mongodb.core.mapreduce.GroupBy;
2932
import org.springframework.data.mongodb.core.mapreduce.GroupByResults;
3033
import org.springframework.data.mongodb.core.mapreduce.MapReduceOptions;
@@ -967,4 +970,12 @@ <T> T findAndModify(Query query, Update update, FindAndModifyOptions options, Cl
967970
* @return
968971
*/
969972
QueryMapper getQueryMapper();
973+
974+
/**
975+
* Returns the underlying {@link MappingContext}.
976+
*
977+
* @return
978+
* @since 1.7
979+
*/
980+
MappingContext<? extends MongoPersistentEntity<?>, MongoPersistentProperty> getMappingContext();
970981
}

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,14 @@ public QueryMapper getQueryMapper() {
320320
return queryMapper;
321321
}
322322

323+
/* (non-Javadoc)
324+
* @see org.springframework.data.mongodb.core.MongoOperations#getMappingContext()
325+
*/
326+
@Override
327+
public MappingContext<? extends MongoPersistentEntity<?>, MongoPersistentProperty> getMappingContext() {
328+
return mappingContext;
329+
}
330+
323331
public String getCollectionName(Class<?> entityClass) {
324332
return this.determineCollectionName(entityClass);
325333
}

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/QueryMapper.java

Lines changed: 3 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2011-2015 the original author or authors.
2+
* Copyright 2011-2014 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -81,23 +81,6 @@ public QueryMapper(MongoConverter converter) {
8181
this.mappingContext = converter.getMappingContext();
8282
}
8383

84-
/**
85-
* Replaces the property keys used in the given {@link DBObject} with the appropriate keys by using the
86-
* {@link PersistentEntity} metadata linked with the given {@code entityClass}.
87-
*
88-
* @see #getMappedObject(DBObject, MongoPersistentEntity)
89-
* @param query must not be {@literal null}.
90-
* @param entityClass must not be {@literal null}.
91-
* @return
92-
* @since 1.7
93-
*/
94-
public DBObject getMappedObject(DBObject query, Class<?> entityClass) {
95-
96-
Assert.notNull(entityClass, "Entity class must not be null!");
97-
98-
return getMappedObject(query, mappingContext.getPersistentEntity(entityClass));
99-
}
100-
10184
/**
10285
* Replaces the property keys used in the given {@link DBObject} with the appropriate keys by using the
10386
* {@link PersistentEntity} metadata.
@@ -161,26 +144,9 @@ public DBObject getMappedSort(DBObject sortObject, MongoPersistentEntity<?> enti
161144
return mappedSort;
162145
}
163146

164-
/**
165-
* Maps fields to retrieve to the {@link MongoPersistentEntity}s properties linked to the given {@code entityClass}.
166-
* <p>
167-
*
168-
* @see QueryMapper#getMappedFields(DBObject, MongoPersistentEntity)
169-
* @param fieldsObject
170-
* @param entityClass must not be {@literal null}
171-
* @return
172-
* @since 1.7
173-
*/
174-
public DBObject getMappedFields(DBObject fieldsObject, Class<?> entityClass) {
175-
176-
Assert.notNull(entityClass, "Entity class must not be null!");
177-
178-
return getMappedFields(fieldsObject, mappingContext.getPersistentEntity(entityClass));
179-
}
180-
181147
/**
182148
* Maps fields to retrieve to the {@link MongoPersistentEntity}s properties. <br />
183-
* Also converts and potentially adds missing property {@code $meta} representation.
149+
* Also onverts and potentially adds missing property {@code $meta} representation.
184150
*
185151
* @param fieldsObject
186152
* @param entity
@@ -317,7 +283,7 @@ protected Object getMappedValue(Field documentField, Object value) {
317283
} else if (valueDbo.containsField("$ne")) {
318284
resultDbo.put("$ne", convertId(valueDbo.get("$ne")));
319285
} else {
320-
return getMappedObject(resultDbo, (MongoPersistentEntity<?>) null);
286+
return getMappedObject(resultDbo, null);
321287
}
322288

323289
return resultDbo;

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsTemplate.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import org.springframework.data.mongodb.MongoDbFactory;
2727
import org.springframework.data.mongodb.core.convert.MongoConverter;
2828
import org.springframework.data.mongodb.core.convert.QueryMapper;
29-
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
3029
import org.springframework.data.mongodb.core.query.Query;
3130
import org.springframework.util.Assert;
3231
import org.springframework.util.StringUtils;
@@ -258,7 +257,7 @@ private DBObject getMappedQuery(Query query) {
258257
}
259258

260259
private DBObject getMappedQuery(DBObject query) {
261-
return query == null ? null : queryMapper.getMappedObject(query, (MongoPersistentEntity<?>) null);
260+
return query == null ? null : queryMapper.getMappedObject(query, null);
262261
}
263262

264263
private GridFS getGridFs() {

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractMongoQuery.java

Lines changed: 49 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
import java.util.Collections;
1919
import java.util.List;
20+
import java.util.Spliterator;
21+
import java.util.Spliterators;
22+
import java.util.stream.StreamSupport;
2023

2124
import org.springframework.dao.DataAccessException;
2225
import org.springframework.data.domain.PageImpl;
@@ -31,16 +34,19 @@
3134
import org.springframework.data.mongodb.core.CollectionCallback;
3235
import org.springframework.data.mongodb.core.MongoOperations;
3336
import org.springframework.data.mongodb.core.convert.QueryMapper;
37+
import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity;
3438
import org.springframework.data.mongodb.core.query.NearQuery;
3539
import org.springframework.data.mongodb.core.query.Query;
3640
import org.springframework.data.mongodb.util.CloseableIterableCusorAdapter;
3741
import org.springframework.data.repository.query.ParameterAccessor;
3842
import org.springframework.data.repository.query.RepositoryQuery;
3943
import org.springframework.data.util.CloseableIterator;
44+
import org.springframework.data.util.CloseableIteratorDisposingRunnable;
4045
import org.springframework.data.util.TypeInformation;
4146
import org.springframework.util.Assert;
4247

4348
import com.mongodb.DBCollection;
49+
import com.mongodb.DBCursor;
4450
import com.mongodb.DBObject;
4551
import com.mongodb.MongoException;
4652
import com.mongodb.WriteResult;
@@ -91,11 +97,11 @@ public Object execute(Object[] parameters) {
9197

9298
applyQueryMetaAttributesWhenPresent(query);
9399

94-
if (method.isStreamQuery()) {
100+
if (method.isCloseableIteratorQuery()) {
101+
return new CursorBackedExecution().execute(query);
102+
} else if (method.isStreamQuery()) {
95103
return new StreamExecution().execute(query);
96-
}
97-
98-
if (isDeleteQuery()) {
104+
} else if (isDeleteQuery()) {
99105
return new DeleteExecution().execute(query);
100106
} else if (method.isGeoNearQuery() && method.isPageQuery()) {
101107

@@ -424,30 +430,54 @@ private Object deleteAndConvertResult(Query query, MongoEntityMetadata<?> metada
424430
}
425431
}
426432

427-
final class StreamExecution extends Execution {
433+
/**
434+
* @author Thomas Darimont
435+
*/
436+
private class CursorBackedExecution extends Execution {
428437

429438
@Override
430439
Object execute(final Query query) {
431440

432-
final Class<?> entityType = getQueryMethod().getEntityInformation().getJavaType();
433-
final QueryMapper queryMapper = operations.getQueryMapper();
441+
Class<?> javaType = getQueryMethod().getEntityInformation().getJavaType();
442+
QueryMapper queryMapper = operations.getQueryMapper();
443+
444+
return createCursorAndWrapInCloseableIterator(query, javaType, queryMapper);
445+
}
446+
447+
@SuppressWarnings("unchecked")
448+
private CloseableIterator<Object> createCursorAndWrapInCloseableIterator(final Query query,
449+
final Class<?> entityType, final QueryMapper queryMapper) {
450+
451+
final MongoPersistentEntity<?> persistentEntity = operations.getMappingContext().getPersistentEntity(entityType);
452+
453+
return (CloseableIterator<Object>) operations.execute(entityType, new CollectionCallback<Object>() {
434454

435-
@SuppressWarnings("unchecked")
436-
CloseableIterator<Object> result = (CloseableIterator<Object>) operations.execute(entityType,
437-
new CollectionCallback<Object>() {
455+
@Override
456+
public Object doInCollection(DBCollection collection) throws MongoException, DataAccessException {
438457

439-
@Override
440-
public Object doInCollection(DBCollection collection) throws MongoException, DataAccessException {
458+
DBObject mappedFields = queryMapper.getMappedFields(query.getFieldsObject(), persistentEntity);
459+
DBObject mappedQuery = queryMapper.getMappedObject(query.getQueryObject(), persistentEntity);
441460

442-
DBObject mappedFields = queryMapper.getMappedFields(query.getFieldsObject(), entityType);
443-
DBObject mappedQuery = queryMapper.getMappedObject(query.getQueryObject(), entityType);
461+
DBCursor cursor = collection.find(mappedQuery, mappedFields);
462+
463+
return new CloseableIterableCusorAdapter<Object>(cursor, operations.getConverter(), entityType);
464+
}
465+
});
466+
}
467+
}
468+
469+
/**
470+
* @author Thomas Darimont
471+
*/
472+
final class StreamExecution extends CursorBackedExecution {
473+
474+
@Override
475+
Object execute(Query query) {
444476

445-
return new CloseableIterableCusorAdapter<Object>(collection.find(mappedQuery, mappedFields), operations
446-
.getConverter(), entityType);
447-
}
448-
});
477+
CloseableIterator<Object> result = (CloseableIterator<Object>) super.execute(query);
478+
Spliterator<Object> spliterator = Spliterators.spliteratorUnknownSize(result, Spliterator.NONNULL);
449479

450-
return result;
480+
return StreamSupport.stream(spliterator, false).onClose(new CloseableIteratorDisposingRunnable(result));
451481
}
452482
}
453483
}

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/MongoQueryMethod.java

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.springframework.data.repository.core.RepositoryMetadata;
3333
import org.springframework.data.repository.query.QueryMethod;
3434
import org.springframework.data.util.ClassTypeInformation;
35-
import org.springframework.data.util.CloseableIterator;
3635
import org.springframework.data.util.TypeInformation;
3736
import org.springframework.util.Assert;
3837
import org.springframework.util.StringUtils;
@@ -234,10 +233,4 @@ public org.springframework.data.mongodb.core.query.Meta getQueryMetaAttributes()
234233

235234
return metaAttributes;
236235
}
237-
238-
public boolean isStreamQuery() {
239-
240-
Class<?> returnType = method.getReturnType();
241-
return org.springframework.util.ClassUtils.isAssignable(CloseableIterator.class, returnType);
242-
}
243236
}

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SpringDataMongodbSerializer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ protected String getKeyForPath(Path<?> expr, PathMetadata<?> metadata) {
100100
protected DBObject asDBObject(String key, Object value) {
101101

102102
if (ID_KEY.equals(key)) {
103-
return mapper.getMappedObject(super.asDBObject(key, value), (MongoPersistentEntity<?>) null);
103+
return mapper.getMappedObject(super.asDBObject(key, value), null);
104104
}
105105

106106
return super.asDBObject(key, value instanceof Pattern ? value : converter.convertToMongoType(value));

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/CloseableIterableCusorAdapter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@
1515
*/
1616
package org.springframework.data.mongodb.util;
1717

18-
import java.io.IOException;
1918
import java.util.Iterator;
2019

2120
import org.springframework.data.mongodb.core.convert.MongoConverter;
21+
import org.springframework.data.util.CloseableIterator;
2222

2323
import com.mongodb.Cursor;
2424
import com.mongodb.DBObject;
@@ -72,7 +72,7 @@ public T next() {
7272
}
7373

7474
@Override
75-
public void close() throws IOException {
75+
public void close() {
7676

7777
Cursor c = cursor;
7878
try {

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/AbstractPersonRepositoryIntegrationTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2011-2014 the original author or authors.
2+
* Copyright 2011-2015 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -65,7 +65,7 @@ public abstract class AbstractPersonRepositoryIntegrationTests {
6565

6666
@Autowired MongoOperations operations;
6767

68-
Person dave, oliver, carter, boyd, stefan, leroi, alicia;
68+
protected Person dave, oliver, carter, boyd, stefan, leroi, alicia;
6969
QPerson person;
7070

7171
List<Person> all;

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepository.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.util.Collection;
1919
import java.util.Date;
2020
import java.util.List;
21+
import java.util.stream.Stream;
2122

2223
import org.springframework.data.domain.Page;
2324
import org.springframework.data.domain.Pageable;
@@ -31,8 +32,8 @@
3132
import org.springframework.data.geo.Point;
3233
import org.springframework.data.geo.Polygon;
3334
import org.springframework.data.mongodb.repository.Person.Sex;
34-
import org.springframework.data.mongodb.util.AutoCloseableIterator;
3535
import org.springframework.data.querydsl.QueryDslPredicateExecutor;
36+
import org.springframework.data.util.CloseableIterator;
3637

3738
/**
3839
* Sample repository managing {@link Person} entities.
@@ -323,5 +324,8 @@ public interface PersonRepository extends MongoRepository<Person, String>, Query
323324
List<Person> findByKeyValue(String key, String value);
324325

325326
@Query("{firstname:{$in:?0}}")
326-
AutoCloseableIterator<Person> findByCustomQueryWithCursorByFirstnames(List<String> firstnames);
327+
CloseableIterator<Person> findByCustomQueryWithCursorByFirstnames(List<String> firstnames);
328+
329+
@Query("{firstname:{$in:?0}}")
330+
Stream<Person> findByCustomQueryWithStreamingCursorByFirstnames(List<String> firstnames);
327331
}

0 commit comments

Comments
 (0)