From 3008bdd14cc1e23cdfa8e2ad87befb19cba58383 Mon Sep 17 00:00:00 2001 From: Thomas Darimont Date: Mon, 16 Feb 2015 15:43:59 +0100 Subject: [PATCH 1/8] DATAMONGO-1165 - Add support for Streaming large result lists. Prepare issue branch. --- pom.xml | 2 +- spring-data-mongodb-cross-store/pom.xml | 4 ++-- spring-data-mongodb-distribution/pom.xml | 2 +- spring-data-mongodb-log4j/pom.xml | 2 +- spring-data-mongodb/pom.xml | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pom.xml b/pom.xml index d5c7aaf0c4..c17209d782 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-mongodb-parent - 1.7.0.BUILD-SNAPSHOT + 1.7.0.DATAMONGO-1165-SNAPSHOT pom Spring Data MongoDB diff --git a/spring-data-mongodb-cross-store/pom.xml b/spring-data-mongodb-cross-store/pom.xml index c7610beee4..cfae107863 100644 --- a/spring-data-mongodb-cross-store/pom.xml +++ b/spring-data-mongodb-cross-store/pom.xml @@ -6,7 +6,7 @@ org.springframework.data spring-data-mongodb-parent - 1.7.0.BUILD-SNAPSHOT + 1.7.0.DATAMONGO-1165-SNAPSHOT ../pom.xml @@ -48,7 +48,7 @@ org.springframework.data spring-data-mongodb - 1.7.0.BUILD-SNAPSHOT + 1.7.0.DATAMONGO-1165-SNAPSHOT diff --git a/spring-data-mongodb-distribution/pom.xml b/spring-data-mongodb-distribution/pom.xml index 13110137b6..14b44bc516 100644 --- a/spring-data-mongodb-distribution/pom.xml +++ b/spring-data-mongodb-distribution/pom.xml @@ -13,7 +13,7 @@ org.springframework.data spring-data-mongodb-parent - 1.7.0.BUILD-SNAPSHOT + 1.7.0.DATAMONGO-1165-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb-log4j/pom.xml b/spring-data-mongodb-log4j/pom.xml index 6ff09e4577..840768b3d3 100644 --- a/spring-data-mongodb-log4j/pom.xml +++ b/spring-data-mongodb-log4j/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-mongodb-parent - 1.7.0.BUILD-SNAPSHOT + 1.7.0.DATAMONGO-1165-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb/pom.xml b/spring-data-mongodb/pom.xml index 7c2d818c42..7773b13a21 100644 --- a/spring-data-mongodb/pom.xml +++ b/spring-data-mongodb/pom.xml @@ -11,7 +11,7 @@ org.springframework.data spring-data-mongodb-parent - 1.7.0.BUILD-SNAPSHOT + 1.7.0.DATAMONGO-1165-SNAPSHOT ../pom.xml From a7533ea122b11e4cc6d877374478942128a3c264 Mon Sep 17 00:00:00 2001 From: Thomas Darimont Date: Mon, 16 Feb 2015 16:02:29 +0100 Subject: [PATCH 2/8] DATAMONGO-1165 - Add support for Streaming large result lists. Initial PoC for a MongoDB Cursor backed Iterator that allows fetching of large result sets. Original pull request: #274. --- .../data/mongodb/core/MongoOperations.java | 3 + .../data/mongodb/core/MongoTemplate.java | 81 ++++++++++++++++++- .../repository/query/AbstractMongoQuery.java | 30 +++++-- .../ForwardingAutoCloseableIterator.java | 57 +++++++++++++ .../repository/query/MongoQueryMethod.java | 9 ++- .../mongodb/util/AutoCloseableIterator.java | 22 +++++ .../data/mongodb/util/CloseableIterator.java | 27 +++++++ .../mongodb/repository/PersonRepository.java | 6 +- .../PersonRepositoryIntegrationTests.java | 30 ++++++- 9 files changed, 255 insertions(+), 10 deletions(-) create mode 100644 spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/ForwardingAutoCloseableIterator.java create mode 100644 spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/AutoCloseableIterator.java create mode 100644 spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/CloseableIterator.java diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java index e38912bfe9..03719dfdd6 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java @@ -33,6 +33,7 @@ import org.springframework.data.mongodb.core.query.NearQuery; import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Update; +import org.springframework.data.mongodb.util.CloseableIterator; import com.mongodb.CommandResult; import com.mongodb.DBCollection; @@ -959,4 +960,6 @@ T findAndModify(Query query, Update update, FindAndModifyOptions options, Cl * @return */ MongoConverter getConverter(); + + CloseableIterator getStreamingMappingCursor(Query query, Class entityType); } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java index f8f391c3da..bc80fb8755 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java @@ -1,5 +1,5 @@ /* - * Copyright 2010-2014 the original author or authors. + * Copyright 2010-2015 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -95,6 +95,7 @@ import org.springframework.data.mongodb.core.query.NearQuery; import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Update; +import org.springframework.data.mongodb.util.CloseableIterator; import org.springframework.jca.cci.core.ConnectionCallback; import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; @@ -103,6 +104,7 @@ import com.mongodb.BasicDBObject; import com.mongodb.CommandResult; +import com.mongodb.Cursor; import com.mongodb.DB; import com.mongodb.DBCollection; import com.mongodb.DBCursor; @@ -2262,4 +2264,81 @@ public GeoResult doWith(DBObject object) { } } + @Override + public CloseableIterator getStreamingMappingCursor(Query query, Class entityClass) { + + MongoPersistentEntity entity = mappingContext.getPersistentEntity(entityClass); + + DBCollection collection = getCollection(getCollectionName(entityClass)); + + DBObject mappedFields = queryMapper.getMappedFields(query.getFieldsObject(), entity); + DBObject mappedQuery = queryMapper.getMappedObject(query.getQueryObject(), entity); + + return new CloseableIterableCusorAdapter(collection.find(mappedQuery, mappedFields), mongoConverter, entityClass); + } + + static class CloseableIterableCusorAdapter implements CloseableIterator { + + private volatile Cursor cursor; + private MongoConverter converter; + private Class entityClass; + + public CloseableIterableCusorAdapter(Cursor cursor, MongoConverter converter, Class entityClass) { + + this.cursor = cursor; + this.converter = converter; + this.entityClass = entityClass; + } + + @Override + public boolean hasNext() { + + if (cursor == null) { + return false; + } + + try { + return cursor.hasNext(); + } catch (Exception ex) { + throw new RuntimeException("error on hasNext"); + } + } + + @Override + public Object next() { + + if (cursor == null) { + return null; + } + + try { + + DBObject item = cursor.next(); + Object converted = converter.read(entityClass, item); + + return converted; + } catch (Exception ex) { + throw new RuntimeException("error on next"); + } + } + + @Override + public void close() throws IOException { + + Cursor c = cursor; + try { + c.close(); + } finally { + + cursor = null; + converter = null; + entityClass = null; + } + } + + @Override + public Iterator iterator() { + return this; + } + } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractMongoQuery.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractMongoQuery.java index f6676fecbc..cde8b5e13d 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractMongoQuery.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractMongoQuery.java @@ -1,5 +1,5 @@ /* - * Copyright 2010-2014 the original author or authors. + * Copyright 2010-2015 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,8 +18,6 @@ import java.util.Collections; import java.util.List; -import org.springframework.core.convert.ConversionService; -import org.springframework.core.convert.support.DefaultConversionService; import org.springframework.data.domain.PageImpl; import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Slice; @@ -32,10 +30,12 @@ import org.springframework.data.mongodb.core.MongoOperations; import org.springframework.data.mongodb.core.query.NearQuery; import org.springframework.data.mongodb.core.query.Query; +import org.springframework.data.mongodb.util.CloseableIterator; import org.springframework.data.repository.query.ParameterAccessor; import org.springframework.data.repository.query.RepositoryQuery; import org.springframework.data.util.TypeInformation; import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; import com.mongodb.WriteResult; @@ -48,8 +48,6 @@ */ public abstract class AbstractMongoQuery implements RepositoryQuery { - private static final ConversionService CONVERSION_SERVICE = new DefaultConversionService(); - private final MongoQueryMethod method; private final MongoOperations operations; @@ -87,6 +85,10 @@ public Object execute(Object[] parameters) { applyQueryMetaAttributesWhenPresent(query); + if (method.isStreamQuery()) { + return new StreamExecution().execute(query); + } + if (isDeleteQuery()) { return new DeleteExecution().execute(query); } else if (method.isGeoNearQuery() && method.isPageQuery()) { @@ -415,4 +417,22 @@ private Object deleteAndConvertResult(Query query, MongoEntityMetadata metada return writeResult != null ? writeResult.getN() : 0L; } } + + final class StreamExecution extends Execution { + + private final boolean IS_JAVA_7 = ClassUtils.isPresent("java.lang.AutoCloseable", + StreamExecution.class.getClassLoader()); + + @Override + Object execute(Query query) { + + Class entityType = getQueryMethod().getEntityInformation().getJavaType(); + CloseableIterator result = operations.getStreamingMappingCursor(query, entityType); + return IS_JAVA_7 ? wrapInAutoCloseableIterator(result) : result; + } + + private CloseableIterator wrapInAutoCloseableIterator(CloseableIterator iter) { + return new ForwardingAutoCloseableIterator(iter); + } + } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/ForwardingAutoCloseableIterator.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/ForwardingAutoCloseableIterator.java new file mode 100644 index 0000000000..836894032a --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/ForwardingAutoCloseableIterator.java @@ -0,0 +1,57 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.repository.query; + +import java.io.IOException; +import java.util.Iterator; + +import org.springframework.data.mongodb.util.AutoCloseableIterator; +import org.springframework.data.mongodb.util.CloseableIterator; + +/** + * An {@link CloseableIterator} that can be used with TRW in Java 7. + * + * @author Thomas Darimont + * @param + */ +class ForwardingAutoCloseableIterator implements AutoCloseableIterator { + + private final CloseableIterator target; + + public ForwardingAutoCloseableIterator(CloseableIterator target) { + this.target = target; + } + + @Override + public boolean hasNext() { + return target.hasNext(); + } + + @Override + public T next() { + return target.next(); + } + + @Override + public void close() throws IOException { + this.target.close(); + } + + @Override + public Iterator iterator() { + return this; + } +} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/MongoQueryMethod.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/MongoQueryMethod.java index ee50e0da21..b4a59c0570 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/MongoQueryMethod.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/MongoQueryMethod.java @@ -1,5 +1,5 @@ /* - * Copyright 2011-2014 the original author or authors. + * Copyright 2011-2015 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,6 +29,7 @@ import org.springframework.data.mongodb.core.mapping.MongoPersistentProperty; import org.springframework.data.mongodb.repository.Meta; import org.springframework.data.mongodb.repository.Query; +import org.springframework.data.mongodb.util.CloseableIterator; import org.springframework.data.repository.core.RepositoryMetadata; import org.springframework.data.repository.query.QueryMethod; import org.springframework.data.util.ClassTypeInformation; @@ -233,4 +234,10 @@ public org.springframework.data.mongodb.core.query.Meta getQueryMetaAttributes() return metaAttributes; } + + public boolean isStreamQuery() { + + Class returnType = method.getReturnType(); + return org.springframework.util.ClassUtils.isAssignable(CloseableIterator.class, returnType); + } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/AutoCloseableIterator.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/AutoCloseableIterator.java new file mode 100644 index 0000000000..34dfac718a --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/AutoCloseableIterator.java @@ -0,0 +1,22 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.util; + +/** + * @author Thomas Darimont + * @param + */ +public interface AutoCloseableIterator extends CloseableIterator, AutoCloseable {} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/CloseableIterator.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/CloseableIterator.java new file mode 100644 index 0000000000..9885deb806 --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/CloseableIterator.java @@ -0,0 +1,27 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.util; + +import java.io.Closeable; +import java.util.Iterator; + +/** + * @author Thomas Darimont + * @param + */ +public interface CloseableIterator extends Iterator, Closeable, Iterable { + +} diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepository.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepository.java index 9d95937428..880222693e 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepository.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepository.java @@ -1,5 +1,5 @@ /* - * Copyright 2010-2014 the original author or authors. + * Copyright 2010-2015 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,6 +31,7 @@ import org.springframework.data.geo.Point; import org.springframework.data.geo.Polygon; import org.springframework.data.mongodb.repository.Person.Sex; +import org.springframework.data.mongodb.util.AutoCloseableIterator; import org.springframework.data.querydsl.QueryDslPredicateExecutor; /** @@ -320,4 +321,7 @@ public interface PersonRepository extends MongoRepository, Query @Query("{ ?0 : ?1 }") List findByKeyValue(String key, String value); + + @Query("{firstname:{$in:?0}}") + AutoCloseableIterator findByCustomQueryWithCursorByFirstnames(List firstnames); } diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepositoryIntegrationTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepositoryIntegrationTests.java index 8dfde08b7f..dd14e0ccfa 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepositoryIntegrationTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepositoryIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2010-2013 the original author or authors. + * Copyright 2010-2015 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,6 +15,11 @@ */ package org.springframework.data.mongodb.repository; +import java.util.Arrays; +import java.util.List; + +import org.junit.Test; +import org.springframework.data.mongodb.util.AutoCloseableIterator; import org.springframework.test.context.ContextConfiguration; /** @@ -24,4 +29,25 @@ * @author Thomas Darimont */ @ContextConfiguration -public class PersonRepositoryIntegrationTests extends AbstractPersonRepositoryIntegrationTests {} +public class PersonRepositoryIntegrationTests extends AbstractPersonRepositoryIntegrationTests { + + @Test + public void findsAllMusiciansWithCursor() throws Exception { + + repository.save(new Person("foo", "bar")); + repository.save(new Person("bar", "bar")); + repository.save(new Person("fuu", "bar")); + repository.save(new Person("notfound", "bar")); + + List firstNames = Arrays.asList("bar", "foo", "fuu"); + AutoCloseableIterator result = repository.findByCustomQueryWithCursorByFirstnames(firstNames); + + try { + for (Person person : result) { + System.out.printf("%s%n", person); + } + } finally { + result.close(); + } + } +} From bcf8244bb91e65b6d9259b4124e778c59083a173 Mon Sep 17 00:00:00 2001 From: Thomas Darimont Date: Tue, 24 Feb 2015 16:56:18 +0100 Subject: [PATCH 3/8] DATAMONGO-1165 - Add support for Streaming large result lists. WIP --- pom.xml | 2 +- .../data/mongodb/core/MongoOperations.java | 9 +- .../data/mongodb/core/MongoTemplate.java | 88 ++---------------- .../mongodb/core/convert/QueryMapper.java | 40 +++++++- .../data/mongodb/gridfs/GridFsTemplate.java | 3 +- .../repository/query/AbstractMongoQuery.java | 39 +++++--- .../ForwardingAutoCloseableIterator.java | 57 ------------ .../repository/query/MongoQueryMethod.java | 2 +- .../support/SpringDataMongodbSerializer.java | 2 +- .../mongodb/util/AutoCloseableIterator.java | 22 ----- .../util/CloseableIterableCusorAdapter.java | 92 +++++++++++++++++++ .../data/mongodb/util/CloseableIterator.java | 27 ------ 12 files changed, 176 insertions(+), 207 deletions(-) delete mode 100644 spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/ForwardingAutoCloseableIterator.java delete mode 100644 spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/AutoCloseableIterator.java create mode 100644 spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/CloseableIterableCusorAdapter.java delete mode 100644 spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/CloseableIterator.java diff --git a/pom.xml b/pom.xml index c17209d782..9e914bfb88 100644 --- a/pom.xml +++ b/pom.xml @@ -29,7 +29,7 @@ multi spring-data-mongodb - 1.10.0.BUILD-SNAPSHOT + 1.10.0.DATACMNS-650-SNAPSHOT 2.13.0 2.13.0 diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java index 03719dfdd6..aa416884de 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java @@ -24,6 +24,7 @@ import org.springframework.data.mongodb.core.aggregation.AggregationResults; import org.springframework.data.mongodb.core.aggregation.TypedAggregation; import org.springframework.data.mongodb.core.convert.MongoConverter; +import org.springframework.data.mongodb.core.convert.QueryMapper; import org.springframework.data.mongodb.core.mapreduce.GroupBy; import org.springframework.data.mongodb.core.mapreduce.GroupByResults; import org.springframework.data.mongodb.core.mapreduce.MapReduceOptions; @@ -33,7 +34,6 @@ import org.springframework.data.mongodb.core.query.NearQuery; import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Update; -import org.springframework.data.mongodb.util.CloseableIterator; import com.mongodb.CommandResult; import com.mongodb.DBCollection; @@ -961,5 +961,10 @@ T findAndModify(Query query, Update update, FindAndModifyOptions options, Cl */ MongoConverter getConverter(); - CloseableIterator getStreamingMappingCursor(Query query, Class entityType); + /** + * Returns the underlying {@link QueryMapper}. + * + * @return + */ + QueryMapper getQueryMapper(); } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java index bc80fb8755..e62902b85e 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java @@ -95,7 +95,6 @@ import org.springframework.data.mongodb.core.query.NearQuery; import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Update; -import org.springframework.data.mongodb.util.CloseableIterator; import org.springframework.jca.cci.core.ConnectionCallback; import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; @@ -104,7 +103,6 @@ import com.mongodb.BasicDBObject; import com.mongodb.CommandResult; -import com.mongodb.Cursor; import com.mongodb.DB; import com.mongodb.DBCollection; import com.mongodb.DBCursor; @@ -314,6 +312,14 @@ public MongoConverter getConverter() { return this.mongoConverter; } + /* (non-Javadoc) + * @see org.springframework.data.mongodb.core.MongoOperations#getQueryMapper() + */ + @Override + public QueryMapper getQueryMapper() { + return queryMapper; + } + public String getCollectionName(Class entityClass) { return this.determineCollectionName(entityClass); } @@ -2263,82 +2269,4 @@ public GeoResult doWith(DBObject object) { return new GeoResult(doWith, new Distance(distance, metric)); } } - - @Override - public CloseableIterator getStreamingMappingCursor(Query query, Class entityClass) { - - MongoPersistentEntity entity = mappingContext.getPersistentEntity(entityClass); - - DBCollection collection = getCollection(getCollectionName(entityClass)); - - DBObject mappedFields = queryMapper.getMappedFields(query.getFieldsObject(), entity); - DBObject mappedQuery = queryMapper.getMappedObject(query.getQueryObject(), entity); - - return new CloseableIterableCusorAdapter(collection.find(mappedQuery, mappedFields), mongoConverter, entityClass); - } - - static class CloseableIterableCusorAdapter implements CloseableIterator { - - private volatile Cursor cursor; - private MongoConverter converter; - private Class entityClass; - - public CloseableIterableCusorAdapter(Cursor cursor, MongoConverter converter, Class entityClass) { - - this.cursor = cursor; - this.converter = converter; - this.entityClass = entityClass; - } - - @Override - public boolean hasNext() { - - if (cursor == null) { - return false; - } - - try { - return cursor.hasNext(); - } catch (Exception ex) { - throw new RuntimeException("error on hasNext"); - } - } - - @Override - public Object next() { - - if (cursor == null) { - return null; - } - - try { - - DBObject item = cursor.next(); - Object converted = converter.read(entityClass, item); - - return converted; - } catch (Exception ex) { - throw new RuntimeException("error on next"); - } - } - - @Override - public void close() throws IOException { - - Cursor c = cursor; - try { - c.close(); - } finally { - - cursor = null; - converter = null; - entityClass = null; - } - } - - @Override - public Iterator iterator() { - return this; - } - } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/QueryMapper.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/QueryMapper.java index 661f6940bf..23ac9d5aa6 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/QueryMapper.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/QueryMapper.java @@ -1,5 +1,5 @@ /* - * Copyright 2011-2014 the original author or authors. + * Copyright 2011-2015 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -81,6 +81,23 @@ public QueryMapper(MongoConverter converter) { this.mappingContext = converter.getMappingContext(); } + /** + * Replaces the property keys used in the given {@link DBObject} with the appropriate keys by using the + * {@link PersistentEntity} metadata linked with the given {@code entityClass}. + * + * @see #getMappedObject(DBObject, MongoPersistentEntity) + * @param query must not be {@literal null}. + * @param entityClass must not be {@literal null}. + * @return + * @since 1.7 + */ + public DBObject getMappedObject(DBObject query, Class entityClass) { + + Assert.notNull(entityClass, "Entity class must not be null!"); + + return getMappedObject(query, mappingContext.getPersistentEntity(entityClass)); + } + /** * Replaces the property keys used in the given {@link DBObject} with the appropriate keys by using the * {@link PersistentEntity} metadata. @@ -144,9 +161,26 @@ public DBObject getMappedSort(DBObject sortObject, MongoPersistentEntity enti return mappedSort; } + /** + * Maps fields to retrieve to the {@link MongoPersistentEntity}s properties linked to the given {@code entityClass}. + *

+ * + * @see QueryMapper#getMappedFields(DBObject, MongoPersistentEntity) + * @param fieldsObject + * @param entityClass must not be {@literal null} + * @return + * @since 1.7 + */ + public DBObject getMappedFields(DBObject fieldsObject, Class entityClass) { + + Assert.notNull(entityClass, "Entity class must not be null!"); + + return getMappedFields(fieldsObject, mappingContext.getPersistentEntity(entityClass)); + } + /** * Maps fields to retrieve to the {@link MongoPersistentEntity}s properties.
- * Also onverts and potentially adds missing property {@code $meta} representation. + * Also converts and potentially adds missing property {@code $meta} representation. * * @param fieldsObject * @param entity @@ -283,7 +317,7 @@ protected Object getMappedValue(Field documentField, Object value) { } else if (valueDbo.containsField("$ne")) { resultDbo.put("$ne", convertId(valueDbo.get("$ne"))); } else { - return getMappedObject(resultDbo, null); + return getMappedObject(resultDbo, (MongoPersistentEntity) null); } return resultDbo; diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsTemplate.java index 20781134ec..d36bee6feb 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsTemplate.java @@ -26,6 +26,7 @@ import org.springframework.data.mongodb.MongoDbFactory; import org.springframework.data.mongodb.core.convert.MongoConverter; import org.springframework.data.mongodb.core.convert.QueryMapper; +import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity; import org.springframework.data.mongodb.core.query.Query; import org.springframework.util.Assert; import org.springframework.util.StringUtils; @@ -257,7 +258,7 @@ private DBObject getMappedQuery(Query query) { } private DBObject getMappedQuery(DBObject query) { - return query == null ? null : queryMapper.getMappedObject(query, null); + return query == null ? null : queryMapper.getMappedObject(query, (MongoPersistentEntity) null); } private GridFS getGridFs() { diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractMongoQuery.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractMongoQuery.java index cde8b5e13d..5044aee94b 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractMongoQuery.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractMongoQuery.java @@ -18,6 +18,7 @@ import java.util.Collections; import java.util.List; +import org.springframework.dao.DataAccessException; import org.springframework.data.domain.PageImpl; import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Slice; @@ -27,16 +28,21 @@ import org.springframework.data.geo.GeoResult; import org.springframework.data.geo.GeoResults; import org.springframework.data.geo.Point; +import org.springframework.data.mongodb.core.CollectionCallback; import org.springframework.data.mongodb.core.MongoOperations; +import org.springframework.data.mongodb.core.convert.QueryMapper; import org.springframework.data.mongodb.core.query.NearQuery; import org.springframework.data.mongodb.core.query.Query; -import org.springframework.data.mongodb.util.CloseableIterator; +import org.springframework.data.mongodb.util.CloseableIterableCusorAdapter; import org.springframework.data.repository.query.ParameterAccessor; import org.springframework.data.repository.query.RepositoryQuery; +import org.springframework.data.util.CloseableIterator; import org.springframework.data.util.TypeInformation; import org.springframework.util.Assert; -import org.springframework.util.ClassUtils; +import com.mongodb.DBCollection; +import com.mongodb.DBObject; +import com.mongodb.MongoException; import com.mongodb.WriteResult; /** @@ -420,19 +426,28 @@ private Object deleteAndConvertResult(Query query, MongoEntityMetadata metada final class StreamExecution extends Execution { - private final boolean IS_JAVA_7 = ClassUtils.isPresent("java.lang.AutoCloseable", - StreamExecution.class.getClassLoader()); - @Override - Object execute(Query query) { + Object execute(final Query query) { - Class entityType = getQueryMethod().getEntityInformation().getJavaType(); - CloseableIterator result = operations.getStreamingMappingCursor(query, entityType); - return IS_JAVA_7 ? wrapInAutoCloseableIterator(result) : result; - } + final Class entityType = getQueryMethod().getEntityInformation().getJavaType(); + final QueryMapper queryMapper = operations.getQueryMapper(); + + @SuppressWarnings("unchecked") + CloseableIterator result = (CloseableIterator) operations.execute(entityType, + new CollectionCallback() { + + @Override + public Object doInCollection(DBCollection collection) throws MongoException, DataAccessException { + + DBObject mappedFields = queryMapper.getMappedFields(query.getFieldsObject(), entityType); + DBObject mappedQuery = queryMapper.getMappedObject(query.getQueryObject(), entityType); + + return new CloseableIterableCusorAdapter(collection.find(mappedQuery, mappedFields), operations + .getConverter(), entityType); + } + }); - private CloseableIterator wrapInAutoCloseableIterator(CloseableIterator iter) { - return new ForwardingAutoCloseableIterator(iter); + return result; } } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/ForwardingAutoCloseableIterator.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/ForwardingAutoCloseableIterator.java deleted file mode 100644 index 836894032a..0000000000 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/ForwardingAutoCloseableIterator.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright 2015 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.data.mongodb.repository.query; - -import java.io.IOException; -import java.util.Iterator; - -import org.springframework.data.mongodb.util.AutoCloseableIterator; -import org.springframework.data.mongodb.util.CloseableIterator; - -/** - * An {@link CloseableIterator} that can be used with TRW in Java 7. - * - * @author Thomas Darimont - * @param - */ -class ForwardingAutoCloseableIterator implements AutoCloseableIterator { - - private final CloseableIterator target; - - public ForwardingAutoCloseableIterator(CloseableIterator target) { - this.target = target; - } - - @Override - public boolean hasNext() { - return target.hasNext(); - } - - @Override - public T next() { - return target.next(); - } - - @Override - public void close() throws IOException { - this.target.close(); - } - - @Override - public Iterator iterator() { - return this; - } -} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/MongoQueryMethod.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/MongoQueryMethod.java index b4a59c0570..15d99591e6 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/MongoQueryMethod.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/MongoQueryMethod.java @@ -29,10 +29,10 @@ import org.springframework.data.mongodb.core.mapping.MongoPersistentProperty; import org.springframework.data.mongodb.repository.Meta; import org.springframework.data.mongodb.repository.Query; -import org.springframework.data.mongodb.util.CloseableIterator; import org.springframework.data.repository.core.RepositoryMetadata; import org.springframework.data.repository.query.QueryMethod; import org.springframework.data.util.ClassTypeInformation; +import org.springframework.data.util.CloseableIterator; import org.springframework.data.util.TypeInformation; import org.springframework.util.Assert; import org.springframework.util.StringUtils; diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SpringDataMongodbSerializer.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SpringDataMongodbSerializer.java index 2808028894..7f48b5e51e 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SpringDataMongodbSerializer.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SpringDataMongodbSerializer.java @@ -100,7 +100,7 @@ protected String getKeyForPath(Path expr, PathMetadata metadata) { protected DBObject asDBObject(String key, Object value) { if (ID_KEY.equals(key)) { - return mapper.getMappedObject(super.asDBObject(key, value), null); + return mapper.getMappedObject(super.asDBObject(key, value), (MongoPersistentEntity) null); } return super.asDBObject(key, value instanceof Pattern ? value : converter.convertToMongoType(value)); diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/AutoCloseableIterator.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/AutoCloseableIterator.java deleted file mode 100644 index 34dfac718a..0000000000 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/AutoCloseableIterator.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Copyright 2015 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.data.mongodb.util; - -/** - * @author Thomas Darimont - * @param - */ -public interface AutoCloseableIterator extends CloseableIterator, AutoCloseable {} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/CloseableIterableCusorAdapter.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/CloseableIterableCusorAdapter.java new file mode 100644 index 0000000000..6a33546443 --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/CloseableIterableCusorAdapter.java @@ -0,0 +1,92 @@ +/* + * Copyright 2015 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.util; + +import java.io.IOException; +import java.util.Iterator; + +import org.springframework.data.mongodb.core.convert.MongoConverter; + +import com.mongodb.Cursor; +import com.mongodb.DBObject; + +/** + * @author Thomas Darimont + */ +public class CloseableIterableCusorAdapter implements CloseableIterator { + + private volatile Cursor cursor; + private MongoConverter converter; + private Class entityClass; + + public CloseableIterableCusorAdapter(Cursor cursor, MongoConverter converter, Class entityClass) { + + this.cursor = cursor; + this.converter = converter; + this.entityClass = entityClass; + } + + @Override + public boolean hasNext() { + + if (cursor == null) { + return false; + } + + try { + return cursor.hasNext(); + } catch (Exception ex) { + throw new RuntimeException("error on hasNext"); + } + } + + @Override + public T next() { + + if (cursor == null) { + return null; + } + + try { + + DBObject item = cursor.next(); + Object converted = converter.read(entityClass, item); + + return (T) converted; + } catch (Exception ex) { + throw new RuntimeException("error on next"); + } + } + + @Override + public void close() throws IOException { + + Cursor c = cursor; + try { + c.close(); + } finally { + + cursor = null; + converter = null; + entityClass = null; + } + } + + @Override + public Iterator iterator() { + return this; + } +} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/CloseableIterator.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/CloseableIterator.java deleted file mode 100644 index 9885deb806..0000000000 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/CloseableIterator.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Copyright 2015 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.data.mongodb.util; - -import java.io.Closeable; -import java.util.Iterator; - -/** - * @author Thomas Darimont - * @param - */ -public interface CloseableIterator extends Iterator, Closeable, Iterable { - -} From 214420986f6ee768fda0ef7ca608d4051b06611c Mon Sep 17 00:00:00 2001 From: Thomas Darimont Date: Tue, 24 Feb 2015 22:07:08 +0100 Subject: [PATCH 4/8] DATAMONGO-1165 - Add support for Streaming large result lists. Initial support for CloteableIterable and Java 8 Stream as return type of custom Repository finder methods. --- .../data/mongodb/core/MongoOperations.java | 11 +++ .../data/mongodb/core/MongoTemplate.java | 8 +++ .../mongodb/core/convert/QueryMapper.java | 40 +---------- .../data/mongodb/gridfs/GridFsTemplate.java | 3 +- .../repository/query/AbstractMongoQuery.java | 68 +++++++++++++------ .../repository/query/MongoQueryMethod.java | 7 -- .../support/SpringDataMongodbSerializer.java | 2 +- .../util/CloseableIterableCusorAdapter.java | 4 +- ...tractPersonRepositoryIntegrationTests.java | 4 +- .../mongodb/repository/PersonRepository.java | 8 ++- .../PersonRepositoryIntegrationTests.java | 42 +++++++++--- 11 files changed, 115 insertions(+), 82 deletions(-) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java index aa416884de..6843d36497 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java @@ -20,11 +20,14 @@ import java.util.Set; import org.springframework.data.geo.GeoResults; +import org.springframework.data.mapping.context.MappingContext; import org.springframework.data.mongodb.core.aggregation.Aggregation; import org.springframework.data.mongodb.core.aggregation.AggregationResults; import org.springframework.data.mongodb.core.aggregation.TypedAggregation; import org.springframework.data.mongodb.core.convert.MongoConverter; import org.springframework.data.mongodb.core.convert.QueryMapper; +import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity; +import org.springframework.data.mongodb.core.mapping.MongoPersistentProperty; import org.springframework.data.mongodb.core.mapreduce.GroupBy; import org.springframework.data.mongodb.core.mapreduce.GroupByResults; import org.springframework.data.mongodb.core.mapreduce.MapReduceOptions; @@ -967,4 +970,12 @@ T findAndModify(Query query, Update update, FindAndModifyOptions options, Cl * @return */ QueryMapper getQueryMapper(); + + /** + * Returns the underlying {@link MappingContext}. + * + * @return + * @since 1.7 + */ + MappingContext, MongoPersistentProperty> getMappingContext(); } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java index e62902b85e..fc8d6d1b50 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java @@ -320,6 +320,14 @@ public QueryMapper getQueryMapper() { return queryMapper; } + /* (non-Javadoc) + * @see org.springframework.data.mongodb.core.MongoOperations#getMappingContext() + */ + @Override + public MappingContext, MongoPersistentProperty> getMappingContext() { + return mappingContext; + } + public String getCollectionName(Class entityClass) { return this.determineCollectionName(entityClass); } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/QueryMapper.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/QueryMapper.java index 23ac9d5aa6..661f6940bf 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/QueryMapper.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/convert/QueryMapper.java @@ -1,5 +1,5 @@ /* - * Copyright 2011-2015 the original author or authors. + * Copyright 2011-2014 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -81,23 +81,6 @@ public QueryMapper(MongoConverter converter) { this.mappingContext = converter.getMappingContext(); } - /** - * Replaces the property keys used in the given {@link DBObject} with the appropriate keys by using the - * {@link PersistentEntity} metadata linked with the given {@code entityClass}. - * - * @see #getMappedObject(DBObject, MongoPersistentEntity) - * @param query must not be {@literal null}. - * @param entityClass must not be {@literal null}. - * @return - * @since 1.7 - */ - public DBObject getMappedObject(DBObject query, Class entityClass) { - - Assert.notNull(entityClass, "Entity class must not be null!"); - - return getMappedObject(query, mappingContext.getPersistentEntity(entityClass)); - } - /** * Replaces the property keys used in the given {@link DBObject} with the appropriate keys by using the * {@link PersistentEntity} metadata. @@ -161,26 +144,9 @@ public DBObject getMappedSort(DBObject sortObject, MongoPersistentEntity enti return mappedSort; } - /** - * Maps fields to retrieve to the {@link MongoPersistentEntity}s properties linked to the given {@code entityClass}. - *

- * - * @see QueryMapper#getMappedFields(DBObject, MongoPersistentEntity) - * @param fieldsObject - * @param entityClass must not be {@literal null} - * @return - * @since 1.7 - */ - public DBObject getMappedFields(DBObject fieldsObject, Class entityClass) { - - Assert.notNull(entityClass, "Entity class must not be null!"); - - return getMappedFields(fieldsObject, mappingContext.getPersistentEntity(entityClass)); - } - /** * Maps fields to retrieve to the {@link MongoPersistentEntity}s properties.
- * Also converts and potentially adds missing property {@code $meta} representation. + * Also onverts and potentially adds missing property {@code $meta} representation. * * @param fieldsObject * @param entity @@ -317,7 +283,7 @@ protected Object getMappedValue(Field documentField, Object value) { } else if (valueDbo.containsField("$ne")) { resultDbo.put("$ne", convertId(valueDbo.get("$ne"))); } else { - return getMappedObject(resultDbo, (MongoPersistentEntity) null); + return getMappedObject(resultDbo, null); } return resultDbo; diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsTemplate.java index d36bee6feb..20781134ec 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsTemplate.java @@ -26,7 +26,6 @@ import org.springframework.data.mongodb.MongoDbFactory; import org.springframework.data.mongodb.core.convert.MongoConverter; import org.springframework.data.mongodb.core.convert.QueryMapper; -import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity; import org.springframework.data.mongodb.core.query.Query; import org.springframework.util.Assert; import org.springframework.util.StringUtils; @@ -258,7 +257,7 @@ private DBObject getMappedQuery(Query query) { } private DBObject getMappedQuery(DBObject query) { - return query == null ? null : queryMapper.getMappedObject(query, (MongoPersistentEntity) null); + return query == null ? null : queryMapper.getMappedObject(query, null); } private GridFS getGridFs() { diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractMongoQuery.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractMongoQuery.java index 5044aee94b..35f8f7481e 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractMongoQuery.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractMongoQuery.java @@ -17,6 +17,9 @@ import java.util.Collections; import java.util.List; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.stream.StreamSupport; import org.springframework.dao.DataAccessException; import org.springframework.data.domain.PageImpl; @@ -31,16 +34,19 @@ import org.springframework.data.mongodb.core.CollectionCallback; import org.springframework.data.mongodb.core.MongoOperations; import org.springframework.data.mongodb.core.convert.QueryMapper; +import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity; import org.springframework.data.mongodb.core.query.NearQuery; import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.util.CloseableIterableCusorAdapter; import org.springframework.data.repository.query.ParameterAccessor; import org.springframework.data.repository.query.RepositoryQuery; import org.springframework.data.util.CloseableIterator; +import org.springframework.data.util.CloseableIteratorDisposingRunnable; import org.springframework.data.util.TypeInformation; import org.springframework.util.Assert; import com.mongodb.DBCollection; +import com.mongodb.DBCursor; import com.mongodb.DBObject; import com.mongodb.MongoException; import com.mongodb.WriteResult; @@ -91,11 +97,11 @@ public Object execute(Object[] parameters) { applyQueryMetaAttributesWhenPresent(query); - if (method.isStreamQuery()) { + if (method.isCloseableIteratorQuery()) { + return new CursorBackedExecution().execute(query); + } else if (method.isStreamQuery()) { return new StreamExecution().execute(query); - } - - if (isDeleteQuery()) { + } else if (isDeleteQuery()) { return new DeleteExecution().execute(query); } else if (method.isGeoNearQuery() && method.isPageQuery()) { @@ -424,30 +430,54 @@ private Object deleteAndConvertResult(Query query, MongoEntityMetadata metada } } - final class StreamExecution extends Execution { + /** + * @author Thomas Darimont + */ + private class CursorBackedExecution extends Execution { @Override Object execute(final Query query) { - final Class entityType = getQueryMethod().getEntityInformation().getJavaType(); - final QueryMapper queryMapper = operations.getQueryMapper(); + Class javaType = getQueryMethod().getEntityInformation().getJavaType(); + QueryMapper queryMapper = operations.getQueryMapper(); + + return createCursorAndWrapInCloseableIterator(query, javaType, queryMapper); + } + + @SuppressWarnings("unchecked") + private CloseableIterator createCursorAndWrapInCloseableIterator(final Query query, + final Class entityType, final QueryMapper queryMapper) { + + final MongoPersistentEntity persistentEntity = operations.getMappingContext().getPersistentEntity(entityType); + + return (CloseableIterator) operations.execute(entityType, new CollectionCallback() { - @SuppressWarnings("unchecked") - CloseableIterator result = (CloseableIterator) operations.execute(entityType, - new CollectionCallback() { + @Override + public Object doInCollection(DBCollection collection) throws MongoException, DataAccessException { - @Override - public Object doInCollection(DBCollection collection) throws MongoException, DataAccessException { + DBObject mappedFields = queryMapper.getMappedFields(query.getFieldsObject(), persistentEntity); + DBObject mappedQuery = queryMapper.getMappedObject(query.getQueryObject(), persistentEntity); - DBObject mappedFields = queryMapper.getMappedFields(query.getFieldsObject(), entityType); - DBObject mappedQuery = queryMapper.getMappedObject(query.getQueryObject(), entityType); + DBCursor cursor = collection.find(mappedQuery, mappedFields); + + return new CloseableIterableCusorAdapter(cursor, operations.getConverter(), entityType); + } + }); + } + } + + /** + * @author Thomas Darimont + */ + final class StreamExecution extends CursorBackedExecution { + + @Override + Object execute(Query query) { - return new CloseableIterableCusorAdapter(collection.find(mappedQuery, mappedFields), operations - .getConverter(), entityType); - } - }); + CloseableIterator result = (CloseableIterator) super.execute(query); + Spliterator spliterator = Spliterators.spliteratorUnknownSize(result, Spliterator.NONNULL); - return result; + return StreamSupport.stream(spliterator, false).onClose(new CloseableIteratorDisposingRunnable(result)); } } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/MongoQueryMethod.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/MongoQueryMethod.java index 15d99591e6..50960e8a88 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/MongoQueryMethod.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/MongoQueryMethod.java @@ -32,7 +32,6 @@ import org.springframework.data.repository.core.RepositoryMetadata; import org.springframework.data.repository.query.QueryMethod; import org.springframework.data.util.ClassTypeInformation; -import org.springframework.data.util.CloseableIterator; import org.springframework.data.util.TypeInformation; import org.springframework.util.Assert; import org.springframework.util.StringUtils; @@ -234,10 +233,4 @@ public org.springframework.data.mongodb.core.query.Meta getQueryMetaAttributes() return metaAttributes; } - - public boolean isStreamQuery() { - - Class returnType = method.getReturnType(); - return org.springframework.util.ClassUtils.isAssignable(CloseableIterator.class, returnType); - } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SpringDataMongodbSerializer.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SpringDataMongodbSerializer.java index 7f48b5e51e..2808028894 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SpringDataMongodbSerializer.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SpringDataMongodbSerializer.java @@ -100,7 +100,7 @@ protected String getKeyForPath(Path expr, PathMetadata metadata) { protected DBObject asDBObject(String key, Object value) { if (ID_KEY.equals(key)) { - return mapper.getMappedObject(super.asDBObject(key, value), (MongoPersistentEntity) null); + return mapper.getMappedObject(super.asDBObject(key, value), null); } return super.asDBObject(key, value instanceof Pattern ? value : converter.convertToMongoType(value)); diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/CloseableIterableCusorAdapter.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/CloseableIterableCusorAdapter.java index 6a33546443..47410e4c2d 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/CloseableIterableCusorAdapter.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/CloseableIterableCusorAdapter.java @@ -15,10 +15,10 @@ */ package org.springframework.data.mongodb.util; -import java.io.IOException; import java.util.Iterator; import org.springframework.data.mongodb.core.convert.MongoConverter; +import org.springframework.data.util.CloseableIterator; import com.mongodb.Cursor; import com.mongodb.DBObject; @@ -72,7 +72,7 @@ public T next() { } @Override - public void close() throws IOException { + public void close() { Cursor c = cursor; try { diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/AbstractPersonRepositoryIntegrationTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/AbstractPersonRepositoryIntegrationTests.java index 5423ad68ca..3f2c077237 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/AbstractPersonRepositoryIntegrationTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/AbstractPersonRepositoryIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2011-2014 the original author or authors. + * Copyright 2011-2015 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -65,7 +65,7 @@ public abstract class AbstractPersonRepositoryIntegrationTests { @Autowired MongoOperations operations; - Person dave, oliver, carter, boyd, stefan, leroi, alicia; + protected Person dave, oliver, carter, boyd, stefan, leroi, alicia; QPerson person; List all; diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepository.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepository.java index 880222693e..3a66f897aa 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepository.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepository.java @@ -18,6 +18,7 @@ import java.util.Collection; import java.util.Date; import java.util.List; +import java.util.stream.Stream; import org.springframework.data.domain.Page; import org.springframework.data.domain.Pageable; @@ -31,8 +32,8 @@ import org.springframework.data.geo.Point; import org.springframework.data.geo.Polygon; import org.springframework.data.mongodb.repository.Person.Sex; -import org.springframework.data.mongodb.util.AutoCloseableIterator; import org.springframework.data.querydsl.QueryDslPredicateExecutor; +import org.springframework.data.util.CloseableIterator; /** * Sample repository managing {@link Person} entities. @@ -323,5 +324,8 @@ public interface PersonRepository extends MongoRepository, Query List findByKeyValue(String key, String value); @Query("{firstname:{$in:?0}}") - AutoCloseableIterator findByCustomQueryWithCursorByFirstnames(List firstnames); + CloseableIterator findByCustomQueryWithCursorByFirstnames(List firstnames); + + @Query("{firstname:{$in:?0}}") + Stream findByCustomQueryWithStreamingCursorByFirstnames(List firstnames); } diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepositoryIntegrationTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepositoryIntegrationTests.java index dd14e0ccfa..9035822ed0 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepositoryIntegrationTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepositoryIntegrationTests.java @@ -15,11 +15,17 @@ */ package org.springframework.data.mongodb.repository; +import static org.hamcrest.Matchers.*; +import static org.junit.Assert.*; + +import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.junit.Test; -import org.springframework.data.mongodb.util.AutoCloseableIterator; +import org.springframework.data.util.CloseableIterator; import org.springframework.test.context.ContextConfiguration; /** @@ -31,23 +37,39 @@ @ContextConfiguration public class PersonRepositoryIntegrationTests extends AbstractPersonRepositoryIntegrationTests { + /** + * @see DATAMONGO-1165 + */ @Test - public void findsAllMusiciansWithCursor() throws Exception { - - repository.save(new Person("foo", "bar")); - repository.save(new Person("bar", "bar")); - repository.save(new Person("fuu", "bar")); - repository.save(new Person("notfound", "bar")); + public void shouldAllowReturningCloseableIteratorInCustomQuery() throws Exception { - List firstNames = Arrays.asList("bar", "foo", "fuu"); - AutoCloseableIterator result = repository.findByCustomQueryWithCursorByFirstnames(firstNames); + CloseableIterator result = repository.findByCustomQueryWithCursorByFirstnames(Arrays.asList("Dave")); + List readPersons = new ArrayList(); try { for (Person person : result) { - System.out.printf("%s%n", person); + readPersons.add(person); } } finally { result.close(); } + + assertThat(readPersons, hasItems(dave)); + } + + /** + * @see DATAMONGO-1165 + */ + @Test + public void shouldAllowReturningJava8StreamInCustomQuery() throws Exception { + + Stream result = repository.findByCustomQueryWithStreamingCursorByFirstnames(Arrays.asList("Dave")); + + try { + List readPersons = result.collect(Collectors. toList()); + assertThat(readPersons, hasItems(dave)); + } finally { + result.close(); + } } } From 6c00b1cf211163f2110351c9d61f6ec6c5aa7766 Mon Sep 17 00:00:00 2001 From: Thomas Darimont Date: Wed, 25 Feb 2015 17:39:43 +0100 Subject: [PATCH 5/8] DATAMONGO-1165 - Add support for Streaming large result lists. Added ExceptionTranslation to CloseableIterableCursorAdapter. --- .../data/mongodb/core/MongoOperations.java | 7 +++++++ .../data/mongodb/core/MongoTemplate.java | 8 ++++++++ .../repository/query/AbstractMongoQuery.java | 3 ++- .../util/CloseableIterableCusorAdapter.java | 19 ++++++++++++++----- 4 files changed, 31 insertions(+), 6 deletions(-) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java index 6843d36497..663eed9644 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java @@ -19,6 +19,7 @@ import java.util.List; import java.util.Set; +import org.springframework.dao.support.PersistenceExceptionTranslator; import org.springframework.data.geo.GeoResults; import org.springframework.data.mapping.context.MappingContext; import org.springframework.data.mongodb.core.aggregation.Aggregation; @@ -978,4 +979,10 @@ T findAndModify(Query query, Update update, FindAndModifyOptions options, Cl * @since 1.7 */ MappingContext, MongoPersistentProperty> getMappingContext(); + + /** + * @return + * @since 1.7 + */ + PersistenceExceptionTranslator getExceptionTranslator(); } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java index fc8d6d1b50..8e108764cc 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java @@ -328,6 +328,14 @@ public MappingContext, MongoPersistentPropert return mappingContext; } + /* (non-Javadoc) + * @see org.springframework.data.mongodb.core.MongoOperations#getExceptionTranslator() + */ + @Override + public PersistenceExceptionTranslator getExceptionTranslator() { + return exceptionTranslator; + } + public String getCollectionName(Class entityClass) { return this.determineCollectionName(entityClass); } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractMongoQuery.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractMongoQuery.java index 35f8f7481e..2c190089eb 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractMongoQuery.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractMongoQuery.java @@ -460,7 +460,8 @@ public Object doInCollection(DBCollection collection) throws MongoException, Dat DBCursor cursor = collection.find(mappedQuery, mappedFields); - return new CloseableIterableCusorAdapter(cursor, operations.getConverter(), entityType); + return new CloseableIterableCusorAdapter(cursor, operations.getConverter(), operations + .getExceptionTranslator(), entityType); } }); } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/CloseableIterableCusorAdapter.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/CloseableIterableCusorAdapter.java index 47410e4c2d..61557109d9 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/CloseableIterableCusorAdapter.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/CloseableIterableCusorAdapter.java @@ -17,6 +17,7 @@ import java.util.Iterator; +import org.springframework.dao.support.PersistenceExceptionTranslator; import org.springframework.data.mongodb.core.convert.MongoConverter; import org.springframework.data.util.CloseableIterator; @@ -31,12 +32,15 @@ public class CloseableIterableCusorAdapter implements CloseableIterator { private volatile Cursor cursor; private MongoConverter converter; private Class entityClass; + private PersistenceExceptionTranslator exceptionTranslator; - public CloseableIterableCusorAdapter(Cursor cursor, MongoConverter converter, Class entityClass) { + public CloseableIterableCusorAdapter(Cursor cursor, MongoConverter converter, + PersistenceExceptionTranslator exceptionTranslator, Class entityClass) { this.cursor = cursor; this.converter = converter; this.entityClass = entityClass; + this.exceptionTranslator = exceptionTranslator; } @Override @@ -48,8 +52,8 @@ public boolean hasNext() { try { return cursor.hasNext(); - } catch (Exception ex) { - throw new RuntimeException("error on hasNext"); + } catch (RuntimeException ex) { + throw exceptionTranslator.translateExceptionIfPossible(ex); } } @@ -66,8 +70,8 @@ public T next() { Object converted = converter.read(entityClass, item); return (T) converted; - } catch (Exception ex) { - throw new RuntimeException("error on next"); + } catch (RuntimeException ex) { + throw exceptionTranslator.translateExceptionIfPossible(ex); } } @@ -76,12 +80,17 @@ public void close() { Cursor c = cursor; try { + c.close(); + } catch (RuntimeException ex) { + + throw exceptionTranslator.translateExceptionIfPossible(ex); } finally { cursor = null; converter = null; entityClass = null; + exceptionTranslator = null; } } From 12c2ddff266d6889ad38c4922dec01fb9ebb836d Mon Sep 17 00:00:00 2001 From: Thomas Darimont Date: Wed, 25 Feb 2015 18:04:12 +0100 Subject: [PATCH 6/8] DATAMONGO-1165 - Add support for Streaming large result lists. Introduced new executeAsStream(..) method to MongoOperations in the context of pushing cursor creation and wrapping in CloseableIterator down to MongoTemplate. --- .../data/mongodb/core/MongoOperations.java | 42 +++++++------------ .../data/mongodb/core/MongoTemplate.java | 36 ++++++++-------- .../repository/query/AbstractMongoQuery.java | 37 +--------------- 3 files changed, 36 insertions(+), 79 deletions(-) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java index 663eed9644..5ca9294b56 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java @@ -19,16 +19,11 @@ import java.util.List; import java.util.Set; -import org.springframework.dao.support.PersistenceExceptionTranslator; import org.springframework.data.geo.GeoResults; -import org.springframework.data.mapping.context.MappingContext; import org.springframework.data.mongodb.core.aggregation.Aggregation; import org.springframework.data.mongodb.core.aggregation.AggregationResults; import org.springframework.data.mongodb.core.aggregation.TypedAggregation; import org.springframework.data.mongodb.core.convert.MongoConverter; -import org.springframework.data.mongodb.core.convert.QueryMapper; -import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity; -import org.springframework.data.mongodb.core.mapping.MongoPersistentProperty; import org.springframework.data.mongodb.core.mapreduce.GroupBy; import org.springframework.data.mongodb.core.mapreduce.GroupByResults; import org.springframework.data.mongodb.core.mapreduce.MapReduceOptions; @@ -38,8 +33,10 @@ import org.springframework.data.mongodb.core.query.NearQuery; import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Update; +import org.springframework.data.util.CloseableIterator; import com.mongodb.CommandResult; +import com.mongodb.Cursor; import com.mongodb.DBCollection; import com.mongodb.DBObject; import com.mongodb.WriteResult; @@ -151,6 +148,20 @@ public interface MongoOperations { */ T executeInSession(DbCallback action); + /** + * Executes the given {@link Query} on the entity collection of the specified {@code entityType} backed by a Mongo DB + * {@link Cursor}. + *

+ * Returns a {@link CloseableIterator} that wraps the a Mongo DB {@link Cursor} that needs to be closed. + * + * @param element return type + * @param query + * @param entity + * @return + * @since 1.7 + */ + CloseableIterator executeAsStream(Query query, Class entityType); + /** * Create an uncapped collection with a name based on the provided entity class. * @@ -964,25 +975,4 @@ T findAndModify(Query query, Update update, FindAndModifyOptions options, Cl * @return */ MongoConverter getConverter(); - - /** - * Returns the underlying {@link QueryMapper}. - * - * @return - */ - QueryMapper getQueryMapper(); - - /** - * Returns the underlying {@link MappingContext}. - * - * @return - * @since 1.7 - */ - MappingContext, MongoPersistentProperty> getMappingContext(); - - /** - * @return - * @since 1.7 - */ - PersistenceExceptionTranslator getExceptionTranslator(); } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java index 8e108764cc..172aab57db 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java @@ -95,6 +95,8 @@ import org.springframework.data.mongodb.core.query.NearQuery; import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Update; +import org.springframework.data.mongodb.util.CloseableIterableCusorAdapter; +import org.springframework.data.util.CloseableIterator; import org.springframework.jca.cci.core.ConnectionCallback; import org.springframework.util.Assert; import org.springframework.util.CollectionUtils; @@ -313,27 +315,27 @@ public MongoConverter getConverter() { } /* (non-Javadoc) - * @see org.springframework.data.mongodb.core.MongoOperations#getQueryMapper() + * @see org.springframework.data.mongodb.core.MongoOperations#executeAsStream(org.springframework.data.mongodb.core.query.Query, java.lang.Class) */ @Override - public QueryMapper getQueryMapper() { - return queryMapper; - } + public CloseableIterator executeAsStream(final Query query, final Class entityType) { - /* (non-Javadoc) - * @see org.springframework.data.mongodb.core.MongoOperations#getMappingContext() - */ - @Override - public MappingContext, MongoPersistentProperty> getMappingContext() { - return mappingContext; - } + return execute(entityType, new CollectionCallback>() { + + @Override + public CloseableIterator doInCollection(DBCollection collection) throws MongoException, DataAccessException { + + MongoPersistentEntity persistentEntity = mappingContext.getPersistentEntity(entityType); + + DBObject mappedFields = queryMapper.getMappedFields(query.getFieldsObject(), persistentEntity); + DBObject mappedQuery = queryMapper.getMappedObject(query.getQueryObject(), persistentEntity); + + DBCursor cursor = collection.find(mappedQuery, mappedFields); + + return new CloseableIterableCusorAdapter(cursor, mongoConverter, exceptionTranslator, entityType); + } + }); - /* (non-Javadoc) - * @see org.springframework.data.mongodb.core.MongoOperations#getExceptionTranslator() - */ - @Override - public PersistenceExceptionTranslator getExceptionTranslator() { - return exceptionTranslator; } public String getCollectionName(Class entityClass) { diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractMongoQuery.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractMongoQuery.java index 2c190089eb..912c3054ac 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractMongoQuery.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractMongoQuery.java @@ -21,7 +21,6 @@ import java.util.Spliterators; import java.util.stream.StreamSupport; -import org.springframework.dao.DataAccessException; import org.springframework.data.domain.PageImpl; import org.springframework.data.domain.Pageable; import org.springframework.data.domain.Slice; @@ -31,13 +30,9 @@ import org.springframework.data.geo.GeoResult; import org.springframework.data.geo.GeoResults; import org.springframework.data.geo.Point; -import org.springframework.data.mongodb.core.CollectionCallback; import org.springframework.data.mongodb.core.MongoOperations; -import org.springframework.data.mongodb.core.convert.QueryMapper; -import org.springframework.data.mongodb.core.mapping.MongoPersistentEntity; import org.springframework.data.mongodb.core.query.NearQuery; import org.springframework.data.mongodb.core.query.Query; -import org.springframework.data.mongodb.util.CloseableIterableCusorAdapter; import org.springframework.data.repository.query.ParameterAccessor; import org.springframework.data.repository.query.RepositoryQuery; import org.springframework.data.util.CloseableIterator; @@ -45,10 +40,6 @@ import org.springframework.data.util.TypeInformation; import org.springframework.util.Assert; -import com.mongodb.DBCollection; -import com.mongodb.DBCursor; -import com.mongodb.DBObject; -import com.mongodb.MongoException; import com.mongodb.WriteResult; /** @@ -437,33 +428,7 @@ private class CursorBackedExecution extends Execution { @Override Object execute(final Query query) { - - Class javaType = getQueryMethod().getEntityInformation().getJavaType(); - QueryMapper queryMapper = operations.getQueryMapper(); - - return createCursorAndWrapInCloseableIterator(query, javaType, queryMapper); - } - - @SuppressWarnings("unchecked") - private CloseableIterator createCursorAndWrapInCloseableIterator(final Query query, - final Class entityType, final QueryMapper queryMapper) { - - final MongoPersistentEntity persistentEntity = operations.getMappingContext().getPersistentEntity(entityType); - - return (CloseableIterator) operations.execute(entityType, new CollectionCallback() { - - @Override - public Object doInCollection(DBCollection collection) throws MongoException, DataAccessException { - - DBObject mappedFields = queryMapper.getMappedFields(query.getFieldsObject(), persistentEntity); - DBObject mappedQuery = queryMapper.getMappedObject(query.getQueryObject(), persistentEntity); - - DBCursor cursor = collection.find(mappedQuery, mappedFields); - - return new CloseableIterableCusorAdapter(cursor, operations.getConverter(), operations - .getExceptionTranslator(), entityType); - } - }); + return operations.executeAsStream(query, getQueryMethod().getEntityInformation().getJavaType()); } } From 2dee07ce53e54dad5cca53eecb021810d60c7aca Mon Sep 17 00:00:00 2001 From: Thomas Darimont Date: Mon, 2 Mar 2015 17:17:14 +0100 Subject: [PATCH 7/8] DATAMONGO-1165 - Add support for Streaming large result lists. Added support for translation of MongoDB Exceptions as well as potential event propagation via ReadDbObjectCallback in MongoTemplate.executeAsStream(..). --- .../data/mongodb/core/MongoOperations.java | 4 +- .../data/mongodb/core/MongoTemplate.java | 83 +++++++++++++- .../repository/query/AbstractMongoQuery.java | 26 ++--- .../util/CloseableIterableCusorAdapter.java | 101 ------------------ .../mongodb/repository/PersonRepository.java | 7 +- .../PersonRepositoryIntegrationTests.java | 22 ---- 6 files changed, 94 insertions(+), 149 deletions(-) delete mode 100644 spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/CloseableIterableCusorAdapter.java diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java index 5ca9294b56..b46d3d6e8b 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoOperations.java @@ -156,11 +156,11 @@ public interface MongoOperations { * * @param element return type * @param query - * @param entity + * @param entityType * @return * @since 1.7 */ - CloseableIterator executeAsStream(Query query, Class entityType); + CloseableIterator executeAsStream(Query query, Class entityType); /** * Create an uncapped collection with a name based on the provided entity class. diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java index 172aab57db..3a51e66f28 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java @@ -95,7 +95,6 @@ import org.springframework.data.mongodb.core.query.NearQuery; import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.mongodb.core.query.Update; -import org.springframework.data.mongodb.util.CloseableIterableCusorAdapter; import org.springframework.data.util.CloseableIterator; import org.springframework.jca.cci.core.ConnectionCallback; import org.springframework.util.Assert; @@ -105,6 +104,7 @@ import com.mongodb.BasicDBObject; import com.mongodb.CommandResult; +import com.mongodb.Cursor; import com.mongodb.DB; import com.mongodb.DBCollection; import com.mongodb.DBCursor; @@ -318,7 +318,7 @@ public MongoConverter getConverter() { * @see org.springframework.data.mongodb.core.MongoOperations#executeAsStream(org.springframework.data.mongodb.core.query.Query, java.lang.Class) */ @Override - public CloseableIterator executeAsStream(final Query query, final Class entityType) { + public CloseableIterator executeAsStream(final Query query, final Class entityType) { return execute(entityType, new CollectionCallback>() { @@ -331,8 +331,9 @@ public CloseableIterator doInCollection(DBCollection collection) throws Mongo DBObject mappedQuery = queryMapper.getMappedObject(query.getQueryObject(), persistentEntity); DBCursor cursor = collection.find(mappedQuery, mappedFields); + ReadDbObjectCallback readCallback = new ReadDbObjectCallback(mongoConverter, entityType); - return new CloseableIterableCusorAdapter(cursor, mongoConverter, exceptionTranslator, entityType); + return new CloseableIterableCusorAdapter(cursor, exceptionTranslator, readCallback); } }); @@ -2123,9 +2124,10 @@ public DBObject doInCollection(DBCollection collection) throws MongoException, D * Simple internal callback to allow operations on a {@link DBObject}. * * @author Oliver Gierke + * @author Thomas Darimont */ - private interface DbObjectCallback { + static interface DbObjectCallback { T doWith(DBObject object); } @@ -2287,4 +2289,77 @@ public GeoResult doWith(DBObject object) { return new GeoResult(doWith, new Distance(distance, metric)); } } + + /** + * A {@link CloseableIterator} that is backed by a MongoDB {@link Cursor}. + * + * @since 1.7 + * @author Thomas Darimont + */ + static class CloseableIterableCusorAdapter implements CloseableIterator { + + private volatile Cursor cursor; + private PersistenceExceptionTranslator exceptionTranslator; + private DbObjectCallback objectReadCallback; + + /** + * Creates a new {@link CloseableIterableCusorAdapter} backed by the given {@link Cursor}. + * + * @param cursor + * @param exceptionTranslator + * @param objectReadCallback + */ + public CloseableIterableCusorAdapter(Cursor cursor, PersistenceExceptionTranslator exceptionTranslator, + DbObjectCallback objectReadCallback) { + + this.cursor = cursor; + this.exceptionTranslator = exceptionTranslator; + this.objectReadCallback = objectReadCallback; + } + + @Override + public boolean hasNext() { + + if (cursor == null) { + return false; + } + + try { + return cursor.hasNext(); + } catch (RuntimeException ex) { + throw exceptionTranslator.translateExceptionIfPossible(ex); + } + } + + @Override + public T next() { + + if (cursor == null) { + return null; + } + + try { + DBObject item = cursor.next(); + T converted = objectReadCallback.doWith(item); + return converted; + } catch (RuntimeException ex) { + throw exceptionTranslator.translateExceptionIfPossible(ex); + } + } + + @Override + public void close() { + + Cursor c = cursor; + try { + c.close(); + } catch (RuntimeException ex) { + throw exceptionTranslator.translateExceptionIfPossible(ex); + } finally { + cursor = null; + exceptionTranslator = null; + objectReadCallback = null; + } + } + } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractMongoQuery.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractMongoQuery.java index 912c3054ac..dce6f110df 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractMongoQuery.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractMongoQuery.java @@ -88,9 +88,7 @@ public Object execute(Object[] parameters) { applyQueryMetaAttributesWhenPresent(query); - if (method.isCloseableIteratorQuery()) { - return new CursorBackedExecution().execute(query); - } else if (method.isStreamQuery()) { + if (method.isStreamQuery()) { return new StreamExecution().execute(query); } else if (isDeleteQuery()) { return new DeleteExecution().execute(query); @@ -423,24 +421,20 @@ private Object deleteAndConvertResult(Query query, MongoEntityMetadata metada /** * @author Thomas Darimont + * @since 1.7 */ - private class CursorBackedExecution extends Execution { - - @Override - Object execute(final Query query) { - return operations.executeAsStream(query, getQueryMethod().getEntityInformation().getJavaType()); - } - } - - /** - * @author Thomas Darimont - */ - final class StreamExecution extends CursorBackedExecution { + final class StreamExecution extends Execution { + /* (non-Javadoc) + * @see org.springframework.data.mongodb.repository.query.AbstractMongoQuery.Execution#execute(org.springframework.data.mongodb.core.query.Query) + */ @Override Object execute(Query query) { - CloseableIterator result = (CloseableIterator) super.execute(query); + Class entityType = getQueryMethod().getEntityInformation().getJavaType(); + + @SuppressWarnings("unchecked") + CloseableIterator result = (CloseableIterator) operations.executeAsStream(query, entityType); Spliterator spliterator = Spliterators.spliteratorUnknownSize(result, Spliterator.NONNULL); return StreamSupport.stream(spliterator, false).onClose(new CloseableIteratorDisposingRunnable(result)); diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/CloseableIterableCusorAdapter.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/CloseableIterableCusorAdapter.java deleted file mode 100644 index 61557109d9..0000000000 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/util/CloseableIterableCusorAdapter.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Copyright 2015 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.springframework.data.mongodb.util; - -import java.util.Iterator; - -import org.springframework.dao.support.PersistenceExceptionTranslator; -import org.springframework.data.mongodb.core.convert.MongoConverter; -import org.springframework.data.util.CloseableIterator; - -import com.mongodb.Cursor; -import com.mongodb.DBObject; - -/** - * @author Thomas Darimont - */ -public class CloseableIterableCusorAdapter implements CloseableIterator { - - private volatile Cursor cursor; - private MongoConverter converter; - private Class entityClass; - private PersistenceExceptionTranslator exceptionTranslator; - - public CloseableIterableCusorAdapter(Cursor cursor, MongoConverter converter, - PersistenceExceptionTranslator exceptionTranslator, Class entityClass) { - - this.cursor = cursor; - this.converter = converter; - this.entityClass = entityClass; - this.exceptionTranslator = exceptionTranslator; - } - - @Override - public boolean hasNext() { - - if (cursor == null) { - return false; - } - - try { - return cursor.hasNext(); - } catch (RuntimeException ex) { - throw exceptionTranslator.translateExceptionIfPossible(ex); - } - } - - @Override - public T next() { - - if (cursor == null) { - return null; - } - - try { - - DBObject item = cursor.next(); - Object converted = converter.read(entityClass, item); - - return (T) converted; - } catch (RuntimeException ex) { - throw exceptionTranslator.translateExceptionIfPossible(ex); - } - } - - @Override - public void close() { - - Cursor c = cursor; - try { - - c.close(); - } catch (RuntimeException ex) { - - throw exceptionTranslator.translateExceptionIfPossible(ex); - } finally { - - cursor = null; - converter = null; - entityClass = null; - exceptionTranslator = null; - } - } - - @Override - public Iterator iterator() { - return this; - } -} diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepository.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepository.java index 3a66f897aa..2d5d0c12af 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepository.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepository.java @@ -33,7 +33,6 @@ import org.springframework.data.geo.Polygon; import org.springframework.data.mongodb.repository.Person.Sex; import org.springframework.data.querydsl.QueryDslPredicateExecutor; -import org.springframework.data.util.CloseableIterator; /** * Sample repository managing {@link Person} entities. @@ -323,9 +322,9 @@ public interface PersonRepository extends MongoRepository, Query @Query("{ ?0 : ?1 }") List findByKeyValue(String key, String value); - @Query("{firstname:{$in:?0}}") - CloseableIterator findByCustomQueryWithCursorByFirstnames(List firstnames); - + /** + * @see DATAMONGO-1165 + */ @Query("{firstname:{$in:?0}}") Stream findByCustomQueryWithStreamingCursorByFirstnames(List firstnames); } diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepositoryIntegrationTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepositoryIntegrationTests.java index 9035822ed0..98fdc43aef 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepositoryIntegrationTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/PersonRepositoryIntegrationTests.java @@ -18,14 +18,12 @@ import static org.hamcrest.Matchers.*; import static org.junit.Assert.*; -import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; import java.util.stream.Stream; import org.junit.Test; -import org.springframework.data.util.CloseableIterator; import org.springframework.test.context.ContextConfiguration; /** @@ -37,26 +35,6 @@ @ContextConfiguration public class PersonRepositoryIntegrationTests extends AbstractPersonRepositoryIntegrationTests { - /** - * @see DATAMONGO-1165 - */ - @Test - public void shouldAllowReturningCloseableIteratorInCustomQuery() throws Exception { - - CloseableIterator result = repository.findByCustomQueryWithCursorByFirstnames(Arrays.asList("Dave")); - - List readPersons = new ArrayList(); - try { - for (Person person : result) { - readPersons.add(person); - } - } finally { - result.close(); - } - - assertThat(readPersons, hasItems(dave)); - } - /** * @see DATAMONGO-1165 */ From b5abeb7d2d1ae984fc7f6dd004e8efef9340394b Mon Sep 17 00:00:00 2001 From: Thomas Darimont Date: Tue, 3 Mar 2015 13:12:21 +0100 Subject: [PATCH 8/8] DATAMONGO-1165 - Add support for Streaming large result lists. Adapted Java 8 Stream creation to new Java8StreamUtils from SD Commons. --- .../mongodb/repository/query/AbstractMongoQuery.java | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractMongoQuery.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractMongoQuery.java index dce6f110df..c814560f6b 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractMongoQuery.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/query/AbstractMongoQuery.java @@ -17,9 +17,6 @@ import java.util.Collections; import java.util.List; -import java.util.Spliterator; -import java.util.Spliterators; -import java.util.stream.StreamSupport; import org.springframework.data.domain.PageImpl; import org.springframework.data.domain.Pageable; @@ -35,8 +32,7 @@ import org.springframework.data.mongodb.core.query.Query; import org.springframework.data.repository.query.ParameterAccessor; import org.springframework.data.repository.query.RepositoryQuery; -import org.springframework.data.util.CloseableIterator; -import org.springframework.data.util.CloseableIteratorDisposingRunnable; +import org.springframework.data.util.Java8StreamUtils; import org.springframework.data.util.TypeInformation; import org.springframework.util.Assert; @@ -433,11 +429,7 @@ Object execute(Query query) { Class entityType = getQueryMethod().getEntityInformation().getJavaType(); - @SuppressWarnings("unchecked") - CloseableIterator result = (CloseableIterator) operations.executeAsStream(query, entityType); - Spliterator spliterator = Spliterators.spliteratorUnknownSize(result, Spliterator.NONNULL); - - return StreamSupport.stream(spliterator, false).onClose(new CloseableIteratorDisposingRunnable(result)); + return Java8StreamUtils.createStreamFromIterator(operations.executeAsStream(query, entityType)); } } }