Skip to content

Commit c9e497e

Browse files
author
Thomas Darimont
committed
DATAMONGO-1165 - Add support for Streaming large result lists.
Initial PoC for a MongoDB Cursor backed Iterator that allows fetching of large result sets.
1 parent 9270727 commit c9e497e

File tree

9 files changed

+255
-10
lines changed

9 files changed

+255
-10
lines changed

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.springframework.data.mongodb.core.query.NearQuery;
3434
import org.springframework.data.mongodb.core.query.Query;
3535
import org.springframework.data.mongodb.core.query.Update;
36+
import org.springframework.data.mongodb.util.CloseableIterator;
3637

3738
import com.mongodb.CommandResult;
3839
import com.mongodb.DBCollection;
@@ -959,4 +960,6 @@ <T> T findAndModify(Query query, Update update, FindAndModifyOptions options, Cl
959960
* @return
960961
*/
961962
MongoConverter getConverter();
963+
964+
CloseableIterator<Object> getStreamingMappingCursor(Query query, Class<?> entityType);
962965
}

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java

Lines changed: 80 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2010-2014 the original author or authors.
2+
* Copyright 2010-2015 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -95,6 +95,7 @@
9595
import org.springframework.data.mongodb.core.query.NearQuery;
9696
import org.springframework.data.mongodb.core.query.Query;
9797
import org.springframework.data.mongodb.core.query.Update;
98+
import org.springframework.data.mongodb.util.CloseableIterator;
9899
import org.springframework.jca.cci.core.ConnectionCallback;
99100
import org.springframework.util.Assert;
100101
import org.springframework.util.CollectionUtils;
@@ -103,6 +104,7 @@
103104

104105
import com.mongodb.BasicDBObject;
105106
import com.mongodb.CommandResult;
107+
import com.mongodb.Cursor;
106108
import com.mongodb.DB;
107109
import com.mongodb.DBCollection;
108110
import com.mongodb.DBCursor;
@@ -2262,4 +2264,81 @@ public GeoResult<T> doWith(DBObject object) {
22622264
}
22632265
}
22642266

2267+
@Override
2268+
public CloseableIterator<Object> getStreamingMappingCursor(Query query, Class<?> entityClass) {
2269+
2270+
MongoPersistentEntity<?> entity = mappingContext.getPersistentEntity(entityClass);
2271+
2272+
DBCollection collection = getCollection(getCollectionName(entityClass));
2273+
2274+
DBObject mappedFields = queryMapper.getMappedFields(query.getFieldsObject(), entity);
2275+
DBObject mappedQuery = queryMapper.getMappedObject(query.getQueryObject(), entity);
2276+
2277+
return new CloseableIterableCusorAdapter(collection.find(mappedQuery, mappedFields), mongoConverter, entityClass);
2278+
}
2279+
2280+
static class CloseableIterableCusorAdapter implements CloseableIterator<Object> {
2281+
2282+
private volatile Cursor cursor;
2283+
private MongoConverter converter;
2284+
private Class<?> entityClass;
2285+
2286+
public CloseableIterableCusorAdapter(Cursor cursor, MongoConverter converter, Class<?> entityClass) {
2287+
2288+
this.cursor = cursor;
2289+
this.converter = converter;
2290+
this.entityClass = entityClass;
2291+
}
2292+
2293+
@Override
2294+
public boolean hasNext() {
2295+
2296+
if (cursor == null) {
2297+
return false;
2298+
}
2299+
2300+
try {
2301+
return cursor.hasNext();
2302+
} catch (Exception ex) {
2303+
throw new RuntimeException("error on hasNext");
2304+
}
2305+
}
2306+
2307+
@Override
2308+
public Object next() {
2309+
2310+
if (cursor == null) {
2311+
return null;
2312+
}
2313+
2314+
try {
2315+
2316+
DBObject item = cursor.next();
2317+
Object converted = converter.read(entityClass, item);
2318+
2319+
return converted;
2320+
} catch (Exception ex) {
2321+
throw new RuntimeException("error on next");
2322+
}
2323+
}
2324+
2325+
@Override
2326+
public void close() throws IOException {
2327+
2328+
Cursor c = cursor;
2329+
try {
2330+
c.close();
2331+
} finally {
2332+
2333+
cursor = null;
2334+
converter = null;
2335+
entityClass = null;
2336+
}
2337+
}
2338+
2339+
@Override
2340+
public Iterator<Object> iterator() {
2341+
return this;
2342+
}
2343+
}
22652344
}

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractMongoQuery.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2010-2014 the original author or authors.
2+
* Copyright 2010-2015 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,8 +18,6 @@
1818
import java.util.Collections;
1919
import java.util.List;
2020

21-
import org.springframework.core.convert.ConversionService;
22-
import org.springframework.core.convert.support.DefaultConversionService;
2321
import org.springframework.data.domain.PageImpl;
2422
import org.springframework.data.domain.Pageable;
2523
import org.springframework.data.domain.Slice;
@@ -32,10 +30,12 @@
3230
import org.springframework.data.mongodb.core.MongoOperations;
3331
import org.springframework.data.mongodb.core.query.NearQuery;
3432
import org.springframework.data.mongodb.core.query.Query;
33+
import org.springframework.data.mongodb.util.CloseableIterator;
3534
import org.springframework.data.repository.query.ParameterAccessor;
3635
import org.springframework.data.repository.query.RepositoryQuery;
3736
import org.springframework.data.util.TypeInformation;
3837
import org.springframework.util.Assert;
38+
import org.springframework.util.ClassUtils;
3939

4040
import com.mongodb.WriteResult;
4141

@@ -48,8 +48,6 @@
4848
*/
4949
public abstract class AbstractMongoQuery implements RepositoryQuery {
5050

51-
private static final ConversionService CONVERSION_SERVICE = new DefaultConversionService();
52-
5351
private final MongoQueryMethod method;
5452
private final MongoOperations operations;
5553

@@ -87,6 +85,10 @@ public Object execute(Object[] parameters) {
8785

8886
applyQueryMetaAttributesWhenPresent(query);
8987

88+
if (method.isStreamQuery()) {
89+
return new StreamExecution().execute(query);
90+
}
91+
9092
if (isDeleteQuery()) {
9193
return new DeleteExecution().execute(query);
9294
} else if (method.isGeoNearQuery() && method.isPageQuery()) {
@@ -415,4 +417,22 @@ private Object deleteAndConvertResult(Query query, MongoEntityMetadata<?> metada
415417
return writeResult != null ? writeResult.getN() : 0L;
416418
}
417419
}
420+
421+
final class StreamExecution extends Execution {
422+
423+
private final boolean IS_JAVA_7 = ClassUtils.isPresent("java.lang.AutoCloseable",
424+
StreamExecution.class.getClassLoader());
425+
426+
@Override
427+
Object execute(Query query) {
428+
429+
Class<?> entityType = getQueryMethod().getEntityInformation().getJavaType();
430+
CloseableIterator<Object> result = operations.getStreamingMappingCursor(query, entityType);
431+
return IS_JAVA_7 ? wrapInAutoCloseableIterator(result) : result;
432+
}
433+
434+
private CloseableIterator<Object> wrapInAutoCloseableIterator(CloseableIterator<Object> iter) {
435+
return new ForwardingAutoCloseableIterator<Object>(iter);
436+
}
437+
}
418438
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2015 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.mongodb.repository.query;
17+
18+
import java.io.IOException;
19+
import java.util.Iterator;
20+
21+
import org.springframework.data.mongodb.util.AutoCloseableIterator;
22+
import org.springframework.data.mongodb.util.CloseableIterator;
23+
24+
/**
25+
* An {@link CloseableIterator} that can be used with TRW in Java 7.
26+
*
27+
* @author Thomas Darimont
28+
* @param <T>
29+
*/
30+
class ForwardingAutoCloseableIterator<T> implements AutoCloseableIterator<T> {
31+
32+
private final CloseableIterator<T> target;
33+
34+
public ForwardingAutoCloseableIterator(CloseableIterator<T> target) {
35+
this.target = target;
36+
}
37+
38+
@Override
39+
public boolean hasNext() {
40+
return target.hasNext();
41+
}
42+
43+
@Override
44+
public T next() {
45+
return target.next();
46+
}
47+
48+
@Override
49+
public void close() throws IOException {
50+
this.target.close();
51+
}
52+
53+
@Override
54+
public Iterator<T> iterator() {
55+
return this;
56+
}
57+
}

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/MongoQueryMethod.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2011-2014 the original author or authors.
2+
* Copyright 2011-2015 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -29,6 +29,7 @@
2929
import org.springframework.data.mongodb.core.mapping.MongoPersistentProperty;
3030
import org.springframework.data.mongodb.repository.Meta;
3131
import org.springframework.data.mongodb.repository.Query;
32+
import org.springframework.data.mongodb.util.CloseableIterator;
3233
import org.springframework.data.repository.core.RepositoryMetadata;
3334
import org.springframework.data.repository.query.QueryMethod;
3435
import org.springframework.data.util.ClassTypeInformation;
@@ -233,4 +234,10 @@ public org.springframework.data.mongodb.core.query.Meta getQueryMetaAttributes()
233234

234235
return metaAttributes;
235236
}
237+
238+
public boolean isStreamQuery() {
239+
240+
Class<?> returnType = method.getReturnType();
241+
return org.springframework.util.ClassUtils.isAssignable(CloseableIterator.class, returnType);
242+
}
236243
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Copyright 2015 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.mongodb.util;
17+
18+
/**
19+
* @author Thomas Darimont
20+
* @param <T>
21+
*/
22+
public interface AutoCloseableIterator<T> extends CloseableIterator<T>, AutoCloseable {}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2015 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.mongodb.util;
17+
18+
import java.io.Closeable;
19+
import java.util.Iterator;
20+
21+
/**
22+
* @author Thomas Darimont
23+
* @param <T>
24+
*/
25+
public interface CloseableIterator<T> extends Iterator<T>, Closeable, Iterable<T> {
26+
27+
}

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepository.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2010-2014 the original author or authors.
2+
* Copyright 2010-2015 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -31,6 +31,7 @@
3131
import org.springframework.data.geo.Point;
3232
import org.springframework.data.geo.Polygon;
3333
import org.springframework.data.mongodb.repository.Person.Sex;
34+
import org.springframework.data.mongodb.util.AutoCloseableIterator;
3435
import org.springframework.data.querydsl.QueryDslPredicateExecutor;
3536

3637
/**
@@ -320,4 +321,7 @@ public interface PersonRepository extends MongoRepository<Person, String>, Query
320321

321322
@Query("{ ?0 : ?1 }")
322323
List<Person> findByKeyValue(String key, String value);
324+
325+
@Query("{firstname:{$in:?0}}")
326+
AutoCloseableIterator<Person> findByCustomQueryWithCursorByFirstnames(List<String> firstnames);
323327
}

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepositoryIntegrationTests.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2010-2013 the original author or authors.
2+
* Copyright 2010-2015 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -15,6 +15,11 @@
1515
*/
1616
package org.springframework.data.mongodb.repository;
1717

18+
import java.util.Arrays;
19+
import java.util.List;
20+
21+
import org.junit.Test;
22+
import org.springframework.data.mongodb.util.AutoCloseableIterator;
1823
import org.springframework.test.context.ContextConfiguration;
1924

2025
/**
@@ -24,4 +29,25 @@
2429
* @author Thomas Darimont
2530
*/
2631
@ContextConfiguration
27-
public class PersonRepositoryIntegrationTests extends AbstractPersonRepositoryIntegrationTests {}
32+
public class PersonRepositoryIntegrationTests extends AbstractPersonRepositoryIntegrationTests {
33+
34+
@Test
35+
public void findsAllMusiciansWithCursor() throws Exception {
36+
37+
repository.save(new Person("foo", "bar"));
38+
repository.save(new Person("bar", "bar"));
39+
repository.save(new Person("fuu", "bar"));
40+
repository.save(new Person("notfound", "bar"));
41+
42+
List<String> firstNames = Arrays.asList("bar", "foo", "fuu");
43+
AutoCloseableIterator<Person> result = repository.findByCustomQueryWithCursorByFirstnames(firstNames);
44+
45+
try {
46+
for (Person person : result) {
47+
System.out.printf("%s%n", person);
48+
}
49+
} finally {
50+
result.close();
51+
}
52+
}
53+
}

0 commit comments

Comments
 (0)