Skip to content

Commit d4f1ef8

Browse files
Thomas Darimontodrotbohm
authored andcommitted
DATAMONGO-1165 - Add support for Java 8 Stream as return type for repository methods.
Added support for a MongoDB Cursor backed Iterator that allows the usage of a Java 8 Stream at the repository level. Original pull request: #274.
1 parent a86d704 commit d4f1ef8

File tree

6 files changed

+187
-12
lines changed

6 files changed

+187
-12
lines changed

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,10 @@
3333
import org.springframework.data.mongodb.core.query.NearQuery;
3434
import org.springframework.data.mongodb.core.query.Query;
3535
import org.springframework.data.mongodb.core.query.Update;
36+
import org.springframework.data.util.CloseableIterator;
3637

3738
import com.mongodb.CommandResult;
39+
import com.mongodb.Cursor;
3840
import com.mongodb.DB;
3941
import com.mongodb.DBCollection;
4042
import com.mongodb.DBObject;
@@ -165,6 +167,20 @@ public interface MongoOperations {
165167
@Deprecated
166168
<T> T executeInSession(DbCallback<T> action);
167169

170+
/**
171+
* Executes the given {@link Query} on the entity collection of the specified {@code entityType} backed by a Mongo DB
172+
* {@link Cursor}.
173+
* <p>
174+
* Returns a {@link CloseableIterator} that wraps the a Mongo DB {@link Cursor} that needs to be closed.
175+
*
176+
* @param <T> element return type
177+
* @param query
178+
* @param entityType
179+
* @return
180+
* @since 1.7
181+
*/
182+
<T> CloseableIterator<T> executeAsStream(Query query, Class<T> entityType);
183+
168184
/**
169185
* Create an uncapped collection with a name based on the provided entity class.
170186
*

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java

Lines changed: 102 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@
9595
import org.springframework.data.mongodb.core.query.NearQuery;
9696
import org.springframework.data.mongodb.core.query.Query;
9797
import org.springframework.data.mongodb.core.query.Update;
98+
import org.springframework.data.util.CloseableIterator;
99+
import org.springframework.data.mongodb.util.CloseableIterator;
98100
import org.springframework.data.mongodb.util.MongoClientVersion;
99101
import org.springframework.jca.cci.core.ConnectionCallback;
100102
import org.springframework.util.Assert;
@@ -106,6 +108,7 @@
106108
import com.mongodb.BasicDBObject;
107109
import com.mongodb.Bytes;
108110
import com.mongodb.CommandResult;
111+
import com.mongodb.Cursor;
109112
import com.mongodb.DB;
110113
import com.mongodb.DBCollection;
111114
import com.mongodb.DBCursor;
@@ -315,6 +318,31 @@ public MongoConverter getConverter() {
315318
return this.mongoConverter;
316319
}
317320

321+
/* (non-Javadoc)
322+
* @see org.springframework.data.mongodb.core.MongoOperations#executeAsStream(org.springframework.data.mongodb.core.query.Query, java.lang.Class)
323+
*/
324+
@Override
325+
public <T> CloseableIterator<T> executeAsStream(final Query query, final Class<T> entityType) {
326+
327+
return execute(entityType, new CollectionCallback<CloseableIterator<T>>() {
328+
329+
@Override
330+
public CloseableIterator<T> doInCollection(DBCollection collection) throws MongoException, DataAccessException {
331+
332+
MongoPersistentEntity<?> persistentEntity = mappingContext.getPersistentEntity(entityType);
333+
334+
DBObject mappedFields = queryMapper.getMappedFields(query.getFieldsObject(), persistentEntity);
335+
DBObject mappedQuery = queryMapper.getMappedObject(query.getQueryObject(), persistentEntity);
336+
337+
DBCursor cursor = collection.find(mappedQuery, mappedFields);
338+
ReadDbObjectCallback<T> readCallback = new ReadDbObjectCallback<T>(mongoConverter, entityType);
339+
340+
return new CloseableIterableCusorAdapter<T>(cursor, exceptionTranslator, readCallback);
341+
}
342+
});
343+
344+
}
345+
318346
public String getCollectionName(Class<?> entityClass) {
319347
return this.determineCollectionName(entityClass);
320348
}
@@ -2122,9 +2150,10 @@ public DBObject doInCollection(DBCollection collection) throws MongoException, D
21222150
* Simple internal callback to allow operations on a {@link DBObject}.
21232151
*
21242152
* @author Oliver Gierke
2153+
* @author Thomas Darimont
21252154
*/
21262155

2127-
private interface DbObjectCallback<T> {
2156+
static interface DbObjectCallback<T> {
21282157

21292158
T doWith(DBObject object);
21302159
}
@@ -2287,4 +2316,76 @@ public GeoResult<T> doWith(DBObject object) {
22872316
}
22882317
}
22892318

2319+
/**
2320+
* A {@link CloseableIterator} that is backed by a MongoDB {@link Cursor}.
2321+
*
2322+
* @since 1.7
2323+
* @author Thomas Darimont
2324+
*/
2325+
static class CloseableIterableCusorAdapter<T> implements CloseableIterator<T> {
2326+
2327+
private volatile Cursor cursor;
2328+
private PersistenceExceptionTranslator exceptionTranslator;
2329+
private DbObjectCallback<T> objectReadCallback;
2330+
2331+
/**
2332+
* Creates a new {@link CloseableIterableCusorAdapter} backed by the given {@link Cursor}.
2333+
*
2334+
* @param cursor
2335+
* @param exceptionTranslator
2336+
* @param objectReadCallback
2337+
*/
2338+
public CloseableIterableCusorAdapter(Cursor cursor, PersistenceExceptionTranslator exceptionTranslator,
2339+
DbObjectCallback<T> objectReadCallback) {
2340+
2341+
this.cursor = cursor;
2342+
this.exceptionTranslator = exceptionTranslator;
2343+
this.objectReadCallback = objectReadCallback;
2344+
}
2345+
2346+
@Override
2347+
public boolean hasNext() {
2348+
2349+
if (cursor == null) {
2350+
return false;
2351+
}
2352+
2353+
try {
2354+
return cursor.hasNext();
2355+
} catch (RuntimeException ex) {
2356+
throw exceptionTranslator.translateExceptionIfPossible(ex);
2357+
}
2358+
}
2359+
2360+
@Override
2361+
public T next() {
2362+
2363+
if (cursor == null) {
2364+
return null;
2365+
}
2366+
2367+
try {
2368+
DBObject item = cursor.next();
2369+
T converted = objectReadCallback.doWith(item);
2370+
return converted;
2371+
} catch (RuntimeException ex) {
2372+
throw exceptionTranslator.translateExceptionIfPossible(ex);
2373+
}
2374+
}
2375+
2376+
@Override
2377+
public void close() {
2378+
2379+
Cursor c = cursor;
2380+
try {
2381+
c.close();
2382+
} catch (RuntimeException ex) {
2383+
throw exceptionTranslator.translateExceptionIfPossible(ex);
2384+
} finally {
2385+
cursor = null;
2386+
exceptionTranslator = null;
2387+
objectReadCallback = null;
2388+
}
2389+
}
2390+
}
22902391
}

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractMongoQuery.java

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2010-2014 the original author or authors.
2+
* Copyright 2010-2015 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,9 +17,10 @@
1717

1818
import java.util.Collections;
1919
import java.util.List;
20+
import java.util.Spliterator;
21+
import java.util.Spliterators;
22+
import java.util.stream.StreamSupport;
2023

21-
import org.springframework.core.convert.ConversionService;
22-
import org.springframework.core.convert.support.DefaultConversionService;
2324
import org.springframework.data.domain.PageImpl;
2425
import org.springframework.data.domain.Pageable;
2526
import org.springframework.data.domain.Slice;
@@ -34,6 +35,8 @@
3435
import org.springframework.data.mongodb.core.query.Query;
3536
import org.springframework.data.repository.query.ParameterAccessor;
3637
import org.springframework.data.repository.query.RepositoryQuery;
38+
import org.springframework.data.util.CloseableIterator;
39+
import org.springframework.data.util.CloseableIteratorDisposingRunnable;
3740
import org.springframework.data.util.TypeInformation;
3841
import org.springframework.util.Assert;
3942

@@ -48,8 +51,6 @@
4851
*/
4952
public abstract class AbstractMongoQuery implements RepositoryQuery {
5053

51-
private static final ConversionService CONVERSION_SERVICE = new DefaultConversionService();
52-
5354
private final MongoQueryMethod method;
5455
private final MongoOperations operations;
5556

@@ -87,7 +88,9 @@ public Object execute(Object[] parameters) {
8788

8889
applyQueryMetaAttributesWhenPresent(query);
8990

90-
if (isDeleteQuery()) {
91+
if (method.isStreamQuery()) {
92+
return new StreamExecution().execute(query);
93+
} else if (isDeleteQuery()) {
9194
return new DeleteExecution().execute(query);
9295
} else if (method.isGeoNearQuery() && method.isPageQuery()) {
9396

@@ -415,4 +418,26 @@ private Object deleteAndConvertResult(Query query, MongoEntityMetadata<?> metada
415418
return writeResult != null ? writeResult.getN() : 0L;
416419
}
417420
}
421+
422+
/**
423+
* @author Thomas Darimont
424+
* @since 1.7
425+
*/
426+
final class StreamExecution extends Execution {
427+
428+
/* (non-Javadoc)
429+
* @see org.springframework.data.mongodb.repository.query.AbstractMongoQuery.Execution#execute(org.springframework.data.mongodb.core.query.Query)
430+
*/
431+
@Override
432+
Object execute(Query query) {
433+
434+
Class<?> entityType = getQueryMethod().getEntityInformation().getJavaType();
435+
436+
@SuppressWarnings("unchecked")
437+
CloseableIterator<Object> result = (CloseableIterator<Object>) operations.executeAsStream(query, entityType);
438+
Spliterator<Object> spliterator = Spliterators.spliteratorUnknownSize(result, Spliterator.NONNULL);
439+
440+
return StreamSupport.stream(spliterator, false).onClose(new CloseableIteratorDisposingRunnable(result));
441+
}
442+
}
418443
}

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/AbstractPersonRepositoryIntegrationTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2011-2014 the original author or authors.
2+
* Copyright 2011-2015 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -65,7 +65,7 @@ public abstract class AbstractPersonRepositoryIntegrationTests {
6565

6666
@Autowired MongoOperations operations;
6767

68-
Person dave, oliver, carter, boyd, stefan, leroi, alicia;
68+
protected Person dave, oliver, carter, boyd, stefan, leroi, alicia;
6969
QPerson person;
7070

7171
List<Person> all;

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepository.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2010-2014 the original author or authors.
2+
* Copyright 2010-2015 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818
import java.util.Collection;
1919
import java.util.Date;
2020
import java.util.List;
21+
import java.util.stream.Stream;
2122

2223
import org.springframework.data.domain.Page;
2324
import org.springframework.data.domain.Pageable;
@@ -320,4 +321,10 @@ public interface PersonRepository extends MongoRepository<Person, String>, Query
320321

321322
@Query("{ ?0 : ?1 }")
322323
List<Person> findByKeyValue(String key, String value);
324+
325+
/**
326+
* @see DATAMONGO-1165
327+
*/
328+
@Query("{firstname:{$in:?0}}")
329+
Stream<Person> findByCustomQueryWithStreamingCursorByFirstnames(List<String> firstnames);
323330
}

spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepositoryIntegrationTests.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2010-2013 the original author or authors.
2+
* Copyright 2010-2015 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -15,6 +15,15 @@
1515
*/
1616
package org.springframework.data.mongodb.repository;
1717

18+
import static org.hamcrest.Matchers.*;
19+
import static org.junit.Assert.*;
20+
21+
import java.util.Arrays;
22+
import java.util.List;
23+
import java.util.stream.Collectors;
24+
import java.util.stream.Stream;
25+
26+
import org.junit.Test;
1827
import org.springframework.test.context.ContextConfiguration;
1928

2029
/**
@@ -24,4 +33,21 @@
2433
* @author Thomas Darimont
2534
*/
2635
@ContextConfiguration
27-
public class PersonRepositoryIntegrationTests extends AbstractPersonRepositoryIntegrationTests {}
36+
public class PersonRepositoryIntegrationTests extends AbstractPersonRepositoryIntegrationTests {
37+
38+
/**
39+
* @see DATAMONGO-1165
40+
*/
41+
@Test
42+
public void shouldAllowReturningJava8StreamInCustomQuery() throws Exception {
43+
44+
Stream<Person> result = repository.findByCustomQueryWithStreamingCursorByFirstnames(Arrays.asList("Dave"));
45+
46+
try {
47+
List<Person> readPersons = result.collect(Collectors.<Person> toList());
48+
assertThat(readPersons, hasItems(dave));
49+
} finally {
50+
result.close();
51+
}
52+
}
53+
}

0 commit comments

Comments
 (0)