Skip to content

Commit fbce145

Browse files
committed
Add support for ReadConcern.
1 parent b77a4e2 commit fbce145

File tree

7 files changed

+267
-97
lines changed

7 files changed

+267
-97
lines changed
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.mongodb.core;
17+
18+
import java.util.List;
19+
20+
import org.bson.Document;
21+
22+
import com.mongodb.ReadConcern;
23+
import com.mongodb.ReadPreference;
24+
import com.mongodb.client.MongoCollection;
25+
26+
/**
27+
* Delegate to apply {@link ReadConcern} and {@link ReadPreference} settings upon {@link CollectionPreparer preparing a
28+
* collection}.
29+
*
30+
* @author Mark Paluch
31+
* @since 4.1
32+
*/
33+
class CollectionPreparerDelegate implements ReadConcernAware, ReadPreferenceAware, CollectionPreparer {
34+
35+
List<Object> sources;
36+
37+
private CollectionPreparerDelegate(List<Object> sources) {
38+
this.sources = sources;
39+
}
40+
41+
public static CollectionPreparerDelegate of(ReadPreferenceAware... awares) {
42+
return of((Object[]) awares);
43+
}
44+
45+
public static CollectionPreparerDelegate of(Object... mixedAwares) {
46+
47+
if (mixedAwares.length == 1 && mixedAwares[0] instanceof CollectionPreparerDelegate) {
48+
return (CollectionPreparerDelegate) mixedAwares[0];
49+
}
50+
return new CollectionPreparerDelegate(List.of(mixedAwares));
51+
}
52+
53+
@Override
54+
public MongoCollection<Document> prepare(MongoCollection<Document> collection) {
55+
56+
MongoCollection<Document> collectionToUse = collection;
57+
58+
for (Object source : sources) {
59+
if (source instanceof ReadConcernAware rca && rca.hasReadConcern()) {
60+
61+
ReadConcern concern = rca.getReadConcern();
62+
if (collection.getReadConcern() != concern) {
63+
collectionToUse = collectionToUse.withReadConcern(concern);
64+
}
65+
break;
66+
}
67+
}
68+
69+
for (Object source : sources) {
70+
if (source instanceof ReadPreferenceAware rpa && rpa.hasReadPreference()) {
71+
72+
ReadPreference preference = rpa.getReadPreference();
73+
if (collection.getReadPreference() != preference) {
74+
collectionToUse = collectionToUse.withReadPreference(preference);
75+
}
76+
break;
77+
}
78+
}
79+
80+
return collectionToUse;
81+
}
82+
83+
@Override
84+
public boolean hasReadConcern() {
85+
86+
for (Object aware : sources) {
87+
if (aware instanceof ReadConcernAware rca && rca.hasReadConcern()) {
88+
return true;
89+
}
90+
}
91+
92+
return false;
93+
}
94+
95+
@Override
96+
public ReadConcern getReadConcern() {
97+
98+
for (Object aware : sources) {
99+
if (aware instanceof ReadConcernAware rca && rca.hasReadConcern()) {
100+
return rca.getReadConcern();
101+
}
102+
}
103+
104+
return null;
105+
}
106+
107+
@Override
108+
public boolean hasReadPreference() {
109+
110+
for (Object aware : sources) {
111+
if (aware instanceof ReadPreferenceAware rpa && rpa.hasReadPreference()) {
112+
return true;
113+
}
114+
}
115+
116+
return false;
117+
}
118+
119+
@Override
120+
public ReadPreference getReadPreference() {
121+
122+
for (Object aware : sources) {
123+
if (aware instanceof ReadPreferenceAware rpa && rpa.hasReadPreference()) {
124+
return rpa.getReadPreference();
125+
}
126+
}
127+
128+
return null;
129+
}
130+
131+
}

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/MongoTemplate.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -454,7 +454,7 @@ protected <T> Stream<T> doStream(Query query, Class<?> entityType, String collec
454454
Document mappedQuery = queryContext.getMappedQuery(persistentEntity);
455455
Document mappedFields = queryContext.getMappedFields(persistentEntity, projection);
456456

457-
ReadPreferenceDelegate readPreference = createDelegate(query);
457+
CollectionPreparerDelegate readPreference = createDelegate(query);
458458
FindIterable<Document> cursor = new QueryCursorPreparer(query, entityType).initiateFind(collection,
459459
col -> readPreference.prepare(col).find(mappedQuery, Document.class).projection(mappedFields));
460460

@@ -1025,7 +1025,7 @@ public <S, T> T findAndReplace(Query query, S replacement, FindAndReplaceOptions
10251025
QueryContext queryContext = queryOperations.createQueryContext(query);
10261026

10271027
EntityProjection<T, S> projection = operations.introspectProjection(resultType, entityType);
1028-
ReadPreferenceDelegate collectionPreparer = createDelegate(query);
1028+
CollectionPreparerDelegate collectionPreparer = createDelegate(query);
10291029
Document mappedQuery = queryContext.getMappedQuery(entity);
10301030
Document mappedFields = queryContext.getMappedFields(entity, projection);
10311031
Document mappedSort = queryContext.getMappedSort(entity);
@@ -1096,7 +1096,7 @@ public long count(Query query, @Nullable Class<?> entityClass, String collection
10961096
CountOptions options = countContext.getCountOptions(entityClass);
10971097
Document mappedQuery = countContext.getMappedQuery(entityClass, mappingContext::getPersistentEntity);
10981098

1099-
ReadPreferenceDelegate readPreference = createDelegate(query);
1099+
CollectionPreparerDelegate readPreference = createDelegate(query);
11001100
return doCount(readPreference, collectionName, mappedQuery, options);
11011101
}
11021102

@@ -1117,7 +1117,7 @@ protected long doCount(CollectionPreparer collectionPreparer, String collectionN
11171117
*/
11181118
@Override
11191119
public long estimatedCount(String collectionName) {
1120-
return doEstimatedCount(ReadPreferenceDelegate.of(this), collectionName, new EstimatedDocumentCountOptions());
1120+
return doEstimatedCount(CollectionPreparerDelegate.of(this), collectionName, new EstimatedDocumentCountOptions());
11211121
}
11221122

11231123
protected long doEstimatedCount(CollectionPreparer collectionPreparer, String collectionName,
@@ -1836,7 +1836,7 @@ public <T> List<T> mapReduce(Query query, Class<?> domainType, String inputColle
18361836

18371837
String mapFunc = replaceWithResourceIfNecessary(mapFunction);
18381838
String reduceFunc = replaceWithResourceIfNecessary(reduceFunction);
1839-
ReadPreferenceDelegate readPreference = createDelegate(query);
1839+
CollectionPreparerDelegate readPreference = createDelegate(query);
18401840
MongoCollection<Document> inputCollection = readPreference
18411841
.prepare(getAndPrepareCollection(doGetDatabase(), inputCollectionName));
18421842

@@ -2061,7 +2061,7 @@ protected <O> AggregationResults<O> doAggregate(Aggregation aggregation, String
20612061
return execute(collectionName, collection -> {
20622062

20632063
List<Document> rawResult = new ArrayList<>();
2064-
ReadPreferenceDelegate delegate = ReadPreferenceDelegate.of(options);
2064+
CollectionPreparerDelegate delegate = CollectionPreparerDelegate.of(options);
20652065
Class<?> domainType = aggregation instanceof TypedAggregation ? ((TypedAggregation<?>) aggregation).getInputType()
20662066
: null;
20672067

@@ -2132,7 +2132,8 @@ protected <O> Stream<O> aggregateStream(Aggregation aggregation, String collecti
21322132

21332133
return execute(collectionName, (CollectionCallback<Stream<O>>) collection -> {
21342134

2135-
ReadPreferenceDelegate delegate = ReadPreferenceDelegate.of(options);
2135+
CollectionPreparerDelegate delegate = CollectionPreparerDelegate.of(options);
2136+
21362137
AggregateIterable<Document> cursor = delegate.prepare(collection).aggregate(pipeline, Document.class) //
21372138
.allowDiskUse(options.isAllowDiskUse());
21382139

@@ -2649,8 +2650,8 @@ protected <T> T doFindAndReplace(CollectionPreparer collectionPreparer, String c
26492650
options, projection);
26502651
}
26512652

2652-
ReadPreferenceDelegate createDelegate(Query query) {
2653-
return ReadPreferenceDelegate.of(query);
2653+
CollectionPreparerDelegate createDelegate(Query query) {
2654+
return CollectionPreparerDelegate.of(query);
26542655
}
26552656

26562657
/**
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.mongodb.core;
17+
18+
import org.springframework.lang.Nullable;
19+
20+
import com.mongodb.ReadConcern;
21+
22+
/**
23+
* Interface to be implemented by any object that wishes to expose the {@link ReadConcern}.
24+
* <p>
25+
* Typically implemented by cursor or query preparer objects.
26+
*
27+
* @author Mark Paluch
28+
* @since 4.1
29+
*/
30+
public interface ReadConcernAware {
31+
32+
/**
33+
* @return {@literal true} if a {@link ReadConcern} is set.
34+
*/
35+
default boolean hasReadConcern() {
36+
return getReadConcern() != null;
37+
}
38+
39+
/**
40+
* @return the {@link ReadConcern} to apply or {@literal null} if none set.
41+
*/
42+
@Nullable
43+
ReadConcern getReadConcern();
44+
}

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/core/ReadPreferenceDelegate.java

Lines changed: 0 additions & 85 deletions
This file was deleted.

0 commit comments

Comments
 (0)