Skip to content

Commit 27bef19

Browse files
defer: refactor to use Reactor instead of manual subscription
1 parent 6c71232 commit 27bef19

File tree

1 file changed

+13
-49
lines changed

1 file changed

+13
-49
lines changed

defer/server/src/main/java/com/graphqljava/defer/GraphQLController.java

Lines changed: 13 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -99,65 +99,29 @@ private Mono<Void> handleNormalResponse(ServerHttpResponse serverHttpResponse, E
9999
headers.setContentType(MediaType.APPLICATION_JSON_UTF8);
100100
String body = objectMapper.writeValueAsString(result);
101101
return serverHttpResponse.writeWith(strToDataBuffer(body));
102-
// PrintWriter writer = httpServletResponse.getWriter();
103-
// writer.write(body);
104-
// writer.close();
105-
106102
}
107103

108104
private Mono<Void> sendDeferResponse(ServerHttpResponse serverHttpResponse, ExecutionResult executionResult, Publisher<DeferredExecutionResult> deferredResults) {
105+
// this implements this apollo defer spec: https://github.com/apollographql/apollo-server/blob/defer-support/docs/source/defer-support.md
106+
// the spec says CRLF + "-----" + CRLF is needed at the end, but it works without it and with it we get client
107+
// side errors with it, so we skp it
109108
serverHttpResponse.setStatusCode(HttpStatus.OK);
110109
HttpHeaders headers = serverHttpResponse.getHeaders();
111110
headers.set("Content-Type", "multipart/mixed; boundary=\"-\"");
112111
headers.set("Connection", "keep-alive");
113112

113+
Flux<Mono<DataBuffer>> deferredDataBuffers = Flux.from(deferredResults).map(deferredExecutionResult -> {
114+
DeferPart deferPart = new DeferPart(deferredExecutionResult.toSpecification());
115+
StringBuilder builder = new StringBuilder();
116+
String body = deferPart.write();
117+
builder.append(CRLF).append("---").append(CRLF);
118+
builder.append(body);
119+
return strToDataBuffer(builder.toString());
120+
});
121+
Flux<Mono<DataBuffer>> firstResult = Flux.just(firstResult(executionResult));
114122

115-
Flux<Mono<DataBuffer>> dataBufferFlux = Flux.create(monoFluxSink -> {
116-
117-
Mono<DataBuffer> firstDataBuffer = firstResult(executionResult);
118-
monoFluxSink.next(firstDataBuffer);
119-
120-
deferredResults.subscribe(new Subscriber<DeferredExecutionResult>() {
121-
122-
Subscription subscription;
123-
124-
@Override
125-
public void onSubscribe(Subscription s) {
126-
subscription = s;
127-
subscription.request(10);
128-
}
129-
130-
@Override
131-
public void onNext(DeferredExecutionResult executionResult) {
132-
try {
133-
DeferPart deferPart = new DeferPart(executionResult.toSpecification());
134-
StringBuilder builder = new StringBuilder();
135-
String body = deferPart.write();
136-
builder.append(CRLF).append("---").append(CRLF);
137-
builder.append(body);
138-
Mono<DataBuffer> dataBuffer = strToDataBuffer(builder.toString());
139-
monoFluxSink.next(dataBuffer);
140-
} catch (Exception e) {
141-
monoFluxSink.error(e);
142-
}
143-
}
144-
145-
@Override
146-
public void onError(Throwable t) {
147-
monoFluxSink.error(t);
148-
}
149-
150-
@Override
151-
public void onComplete() {
152-
StringBuilder end = new StringBuilder();
153-
end.append(CRLF).append("-----").append(CRLF);
154-
Mono<DataBuffer> dataBuffer = strToDataBuffer(end.toString());
155-
monoFluxSink.next(dataBuffer);
156-
}
157-
});
158123

159-
});
160-
return serverHttpResponse.writeAndFlushWith(dataBufferFlux);
124+
return serverHttpResponse.writeAndFlushWith(Flux.mergeSequential(firstResult, deferredDataBuffers));
161125
}
162126

163127
private Mono<DataBuffer> firstResult(ExecutionResult executionResult) {

0 commit comments

Comments
 (0)