action);
+ /**
+ * Executes the given {@link Query} on the entity collection of the specified {@code entityType} backed by a Mongo DB
+ * {@link Cursor}.
+ *
+ * Returns a {@link CloseableIterator} that wraps the a Mongo DB {@link Cursor} that needs to be closed.
+ *
+ * @param element return type
+ * @param query
+ * @param entityType
+ * @return
+ * @since 1.7
+ */
+ CloseableIterator executeAsStream(Query query, Class entityType);
+
/**
* Create an uncapped collection with a name based on the provided entity class.
*
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java
index f8f391c3da..3a51e66f28 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2010-2014 the original author or authors.
+ * Copyright 2010-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -95,6 +95,7 @@
import org.springframework.data.mongodb.core.query.NearQuery;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.mongodb.core.query.Update;
+import org.springframework.data.util.CloseableIterator;
import org.springframework.jca.cci.core.ConnectionCallback;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
@@ -103,6 +104,7 @@
import com.mongodb.BasicDBObject;
import com.mongodb.CommandResult;
+import com.mongodb.Cursor;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
@@ -312,6 +314,31 @@ public MongoConverter getConverter() {
return this.mongoConverter;
}
+ /* (non-Javadoc)
+ * @see org.springframework.data.mongodb.core.MongoOperations#executeAsStream(org.springframework.data.mongodb.core.query.Query, java.lang.Class)
+ */
+ @Override
+ public CloseableIterator executeAsStream(final Query query, final Class entityType) {
+
+ return execute(entityType, new CollectionCallback>() {
+
+ @Override
+ public CloseableIterator doInCollection(DBCollection collection) throws MongoException, DataAccessException {
+
+ MongoPersistentEntity> persistentEntity = mappingContext.getPersistentEntity(entityType);
+
+ DBObject mappedFields = queryMapper.getMappedFields(query.getFieldsObject(), persistentEntity);
+ DBObject mappedQuery = queryMapper.getMappedObject(query.getQueryObject(), persistentEntity);
+
+ DBCursor cursor = collection.find(mappedQuery, mappedFields);
+ ReadDbObjectCallback readCallback = new ReadDbObjectCallback(mongoConverter, entityType);
+
+ return new CloseableIterableCusorAdapter(cursor, exceptionTranslator, readCallback);
+ }
+ });
+
+ }
+
public String getCollectionName(Class> entityClass) {
return this.determineCollectionName(entityClass);
}
@@ -2097,9 +2124,10 @@ public DBObject doInCollection(DBCollection collection) throws MongoException, D
* Simple internal callback to allow operations on a {@link DBObject}.
*
* @author Oliver Gierke
+ * @author Thomas Darimont
*/
- private interface DbObjectCallback {
+ static interface DbObjectCallback {
T doWith(DBObject object);
}
@@ -2262,4 +2290,76 @@ public GeoResult doWith(DBObject object) {
}
}
+ /**
+ * A {@link CloseableIterator} that is backed by a MongoDB {@link Cursor}.
+ *
+ * @since 1.7
+ * @author Thomas Darimont
+ */
+ static class CloseableIterableCusorAdapter implements CloseableIterator {
+
+ private volatile Cursor cursor;
+ private PersistenceExceptionTranslator exceptionTranslator;
+ private DbObjectCallback objectReadCallback;
+
+ /**
+ * Creates a new {@link CloseableIterableCusorAdapter} backed by the given {@link Cursor}.
+ *
+ * @param cursor
+ * @param exceptionTranslator
+ * @param objectReadCallback
+ */
+ public CloseableIterableCusorAdapter(Cursor cursor, PersistenceExceptionTranslator exceptionTranslator,
+ DbObjectCallback objectReadCallback) {
+
+ this.cursor = cursor;
+ this.exceptionTranslator = exceptionTranslator;
+ this.objectReadCallback = objectReadCallback;
+ }
+
+ @Override
+ public boolean hasNext() {
+
+ if (cursor == null) {
+ return false;
+ }
+
+ try {
+ return cursor.hasNext();
+ } catch (RuntimeException ex) {
+ throw exceptionTranslator.translateExceptionIfPossible(ex);
+ }
+ }
+
+ @Override
+ public T next() {
+
+ if (cursor == null) {
+ return null;
+ }
+
+ try {
+ DBObject item = cursor.next();
+ T converted = objectReadCallback.doWith(item);
+ return converted;
+ } catch (RuntimeException ex) {
+ throw exceptionTranslator.translateExceptionIfPossible(ex);
+ }
+ }
+
+ @Override
+ public void close() {
+
+ Cursor c = cursor;
+ try {
+ c.close();
+ } catch (RuntimeException ex) {
+ throw exceptionTranslator.translateExceptionIfPossible(ex);
+ } finally {
+ cursor = null;
+ exceptionTranslator = null;
+ objectReadCallback = null;
+ }
+ }
+ }
}
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractMongoQuery.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractMongoQuery.java
index f6676fecbc..c814560f6b 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractMongoQuery.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractMongoQuery.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2010-2014 the original author or authors.
+ * Copyright 2010-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,8 +18,6 @@
import java.util.Collections;
import java.util.List;
-import org.springframework.core.convert.ConversionService;
-import org.springframework.core.convert.support.DefaultConversionService;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
@@ -34,6 +32,7 @@
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.data.repository.query.ParameterAccessor;
import org.springframework.data.repository.query.RepositoryQuery;
+import org.springframework.data.util.Java8StreamUtils;
import org.springframework.data.util.TypeInformation;
import org.springframework.util.Assert;
@@ -48,8 +47,6 @@
*/
public abstract class AbstractMongoQuery implements RepositoryQuery {
- private static final ConversionService CONVERSION_SERVICE = new DefaultConversionService();
-
private final MongoQueryMethod method;
private final MongoOperations operations;
@@ -87,7 +84,9 @@ public Object execute(Object[] parameters) {
applyQueryMetaAttributesWhenPresent(query);
- if (isDeleteQuery()) {
+ if (method.isStreamQuery()) {
+ return new StreamExecution().execute(query);
+ } else if (isDeleteQuery()) {
return new DeleteExecution().execute(query);
} else if (method.isGeoNearQuery() && method.isPageQuery()) {
@@ -415,4 +414,22 @@ private Object deleteAndConvertResult(Query query, MongoEntityMetadata> metada
return writeResult != null ? writeResult.getN() : 0L;
}
}
+
+ /**
+ * @author Thomas Darimont
+ * @since 1.7
+ */
+ final class StreamExecution extends Execution {
+
+ /* (non-Javadoc)
+ * @see org.springframework.data.mongodb.repository.query.AbstractMongoQuery.Execution#execute(org.springframework.data.mongodb.core.query.Query)
+ */
+ @Override
+ Object execute(Query query) {
+
+ Class> entityType = getQueryMethod().getEntityInformation().getJavaType();
+
+ return Java8StreamUtils.createStreamFromIterator(operations.executeAsStream(query, entityType));
+ }
+ }
}
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/MongoQueryMethod.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/MongoQueryMethod.java
index ee50e0da21..50960e8a88 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/MongoQueryMethod.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/MongoQueryMethod.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2011-2014 the original author or authors.
+ * Copyright 2011-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/AbstractPersonRepositoryIntegrationTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/AbstractPersonRepositoryIntegrationTests.java
index 5423ad68ca..3f2c077237 100644
--- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/AbstractPersonRepositoryIntegrationTests.java
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/AbstractPersonRepositoryIntegrationTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2011-2014 the original author or authors.
+ * Copyright 2011-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -65,7 +65,7 @@ public abstract class AbstractPersonRepositoryIntegrationTests {
@Autowired MongoOperations operations;
- Person dave, oliver, carter, boyd, stefan, leroi, alicia;
+ protected Person dave, oliver, carter, boyd, stefan, leroi, alicia;
QPerson person;
List all;
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepository.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepository.java
index 9d95937428..2d5d0c12af 100644
--- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepository.java
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepository.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2010-2014 the original author or authors.
+ * Copyright 2010-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
import java.util.Collection;
import java.util.Date;
import java.util.List;
+import java.util.stream.Stream;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
@@ -320,4 +321,10 @@ public interface PersonRepository extends MongoRepository, Query
@Query("{ ?0 : ?1 }")
List findByKeyValue(String key, String value);
+
+ /**
+ * @see DATAMONGO-1165
+ */
+ @Query("{firstname:{$in:?0}}")
+ Stream findByCustomQueryWithStreamingCursorByFirstnames(List firstnames);
}
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepositoryIntegrationTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepositoryIntegrationTests.java
index 8dfde08b7f..98fdc43aef 100644
--- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepositoryIntegrationTests.java
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepositoryIntegrationTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2010-2013 the original author or authors.
+ * Copyright 2010-2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -15,6 +15,15 @@
*/
package org.springframework.data.mongodb.repository;
+import static org.hamcrest.Matchers.*;
+import static org.junit.Assert.*;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.junit.Test;
import org.springframework.test.context.ContextConfiguration;
/**
@@ -24,4 +33,21 @@
* @author Thomas Darimont
*/
@ContextConfiguration
-public class PersonRepositoryIntegrationTests extends AbstractPersonRepositoryIntegrationTests {}
+public class PersonRepositoryIntegrationTests extends AbstractPersonRepositoryIntegrationTests {
+
+ /**
+ * @see DATAMONGO-1165
+ */
+ @Test
+ public void shouldAllowReturningJava8StreamInCustomQuery() throws Exception {
+
+ Stream result = repository.findByCustomQueryWithStreamingCursorByFirstnames(Arrays.asList("Dave"));
+
+ try {
+ List readPersons = result.collect(Collectors. toList());
+ assertThat(readPersons, hasItems(dave));
+ } finally {
+ result.close();
+ }
+ }
+}