Skip to content

Commit 14a63db

Browse files
Integrate Apache http client with WebClient
1 parent 7411b2e commit 14a63db

File tree

15 files changed

+500
-10
lines changed

15 files changed

+500
-10
lines changed

build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ configure(allprojects) { project ->
155155
exclude group: "commons-logging", name: "commons-logging"
156156
}
157157
dependency "org.eclipse.jetty:jetty-reactive-httpclient:1.1.2"
158+
dependency 'org.apache.httpcomponents.client5:httpclient5:5.0'
159+
dependency 'org.apache.httpcomponents.core5:httpcore5-reactive:5.0'
158160

159161
dependency "org.jruby:jruby:9.2.11.0"
160162
dependency "org.python:jython-standalone:2.7.1"

spring-web/spring-web.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ dependencies {
3535
exclude group: "javax.servlet", module: "javax.servlet-api"
3636
}
3737
optional("org.eclipse.jetty:jetty-reactive-httpclient")
38+
optional('org.apache.httpcomponents.client5:httpclient5:5.0')
39+
optional('org.apache.httpcomponents.core5:httpcore5-reactive:5.0')
3840
optional("com.squareup.okhttp3:okhttp")
3941
optional("org.apache.httpcomponents:httpclient")
4042
optional("org.apache.httpcomponents:httpasyncclient")

spring-web/src/main/java/org/springframework/http/client/HttpComponentsAsyncClientHttpRequest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@
4848
* @author Arjen Poutsma
4949
* @since 4.0
5050
* @see HttpComponentsClientHttpRequestFactory#createRequest
51-
* @deprecated as of Spring 5.0, with no direct replacement
51+
* @deprecated as of Spring 5.0, in favor of
52+
* {@link org.springframework.http.client.reactive.ApacheClientHttpConnector}
5253
*/
5354
@Deprecated
5455
final class HttpComponentsAsyncClientHttpRequest extends AbstractBufferingAsyncClientHttpRequest {

spring-web/src/main/java/org/springframework/http/client/HttpComponentsAsyncClientHttpRequestFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@
4444
* @author Stephane Nicoll
4545
* @since 4.0
4646
* @see HttpAsyncClient
47-
* @deprecated as of Spring 5.0, with no direct replacement
47+
* @deprecated as of Spring 5.0, in favor of
48+
* {@link org.springframework.http.client.reactive.ApacheClientHttpConnector}
4849
*/
4950
@Deprecated
5051
public class HttpComponentsAsyncClientHttpRequestFactory extends HttpComponentsClientHttpRequestFactory

spring-web/src/main/java/org/springframework/http/client/HttpComponentsAsyncClientHttpResponse.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@
3737
* @author Arjen Poutsma
3838
* @since 4.0
3939
* @see HttpComponentsAsyncClientHttpRequest#executeAsync()
40-
* @deprecated as of Spring 5.0, with no direct replacement
40+
* @deprecated as of Spring 5.0, in favor of
41+
* {@link org.springframework.http.client.reactive.ApacheClientHttpConnector}
4142
*/
4243
@Deprecated
4344
final class HttpComponentsAsyncClientHttpResponse extends AbstractClientHttpResponse {
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright 2002-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.http.client.reactive;
18+
19+
import java.net.URI;
20+
import java.nio.ByteBuffer;
21+
import java.util.function.Function;
22+
23+
import org.apache.hc.client5.http.cookie.BasicCookieStore;
24+
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
25+
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
26+
import org.apache.hc.client5.http.protocol.HttpClientContext;
27+
import org.apache.hc.core5.http.HttpResponse;
28+
import org.apache.hc.core5.http.Message;
29+
import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
30+
import org.apache.hc.core5.reactive.ReactiveEntityProducer;
31+
import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
32+
import org.reactivestreams.Publisher;
33+
import reactor.core.publisher.Flux;
34+
import reactor.core.publisher.Mono;
35+
36+
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
37+
import org.springframework.http.HttpMethod;
38+
import org.springframework.lang.Nullable;
39+
40+
/**
41+
* {@link ClientHttpConnector} implementation for the Apache HttpComponents HttpClient 5.x.
42+
*
43+
* @author Martin Tarjányi
44+
* @since 5.3
45+
* @see <a href="https://hc.apache.org/index.html">Apache HttpComponents</a>
46+
*/
47+
public class ApacheClientHttpConnector implements ClientHttpConnector {
48+
private final CloseableHttpAsyncClient client;
49+
50+
private final DefaultDataBufferFactory dataBufferFactory;
51+
52+
/**
53+
* Default constructor that creates and starts a new instance of {@link CloseableHttpAsyncClient}.
54+
*/
55+
public ApacheClientHttpConnector() {
56+
this(HttpAsyncClients.createDefault());
57+
}
58+
59+
/**
60+
* Constructor with an initialized {@link CloseableHttpAsyncClient}.
61+
*/
62+
public ApacheClientHttpConnector(CloseableHttpAsyncClient client) {
63+
this.dataBufferFactory = new DefaultDataBufferFactory();
64+
this.client = client;
65+
this.client.start();
66+
}
67+
68+
@Override
69+
public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
70+
Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {
71+
72+
ApacheClientHttpRequest request = new ApacheClientHttpRequest(method, uri, this.dataBufferFactory);
73+
74+
return requestCallback.apply(request).then(Mono.defer(() -> execute(request)));
75+
}
76+
77+
private Mono<ClientHttpResponse> execute(ApacheClientHttpRequest request) {
78+
Flux<ByteBuffer> byteBufferFlux = request.getByteBufferFlux();
79+
80+
ReactiveEntityProducer reactiveEntityProducer = createReactiveEntityProducer(request, byteBufferFlux);
81+
82+
BasicRequestProducer basicRequestProducer = new BasicRequestProducer(request.getHttpRequest(),
83+
reactiveEntityProducer);
84+
85+
HttpClientContext context = HttpClientContext.create();
86+
context.setCookieStore(new BasicCookieStore());
87+
88+
return Mono.<Message<HttpResponse, Publisher<ByteBuffer>>>create(sink -> {
89+
ReactiveResponseConsumer reactiveResponseConsumer = new ReactiveResponseConsumer(new MonoFutureCallbackAdapter<>(sink));
90+
this.client.execute(basicRequestProducer, reactiveResponseConsumer, context, null);
91+
}).map(message -> new ApacheClientHttpResponse(this.dataBufferFactory, message, context));
92+
}
93+
94+
@Nullable
95+
private ReactiveEntityProducer createReactiveEntityProducer(ApacheClientHttpRequest request,
96+
@Nullable Flux<ByteBuffer> byteBufferFlux) {
97+
98+
if (byteBufferFlux == null) {
99+
return null;
100+
}
101+
102+
return new ReactiveEntityProducer(byteBufferFlux, request.getContentLength(), null, null);
103+
}
104+
}
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Copyright 2002-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.http.client.reactive;
18+
19+
import java.net.URI;
20+
import java.net.URISyntaxException;
21+
import java.nio.ByteBuffer;
22+
import java.util.Collection;
23+
24+
import org.apache.hc.core5.http.HttpRequest;
25+
import org.apache.hc.core5.http.message.BasicHttpRequest;
26+
import org.reactivestreams.Publisher;
27+
import reactor.core.publisher.Flux;
28+
import reactor.core.publisher.Mono;
29+
30+
import org.springframework.core.io.buffer.DataBuffer;
31+
import org.springframework.core.io.buffer.DataBufferFactory;
32+
import org.springframework.http.HttpHeaders;
33+
import org.springframework.http.HttpMethod;
34+
35+
import static org.springframework.http.MediaType.ALL_VALUE;
36+
37+
/**
38+
* {@link ClientHttpRequest} implementation for the Apache HttpComponents HttpClient 5.x.
39+
*
40+
* @author Martin Tarjányi
41+
* @since 5.3
42+
* @see <a href="https://hc.apache.org/index.html">Apache HttpComponents</a>
43+
*/
44+
class ApacheClientHttpRequest extends AbstractClientHttpRequest {
45+
private final HttpRequest httpRequest;
46+
47+
private final DataBufferFactory dataBufferFactory;
48+
49+
private Flux<ByteBuffer> byteBufferFlux;
50+
51+
private long contentLength = -1;
52+
53+
public ApacheClientHttpRequest(HttpMethod method, URI uri, DataBufferFactory dataBufferFactory) {
54+
this.httpRequest = new BasicHttpRequest(method.name(), uri);
55+
this.dataBufferFactory = dataBufferFactory;
56+
}
57+
58+
@Override
59+
public HttpMethod getMethod() {
60+
return HttpMethod.resolve(this.httpRequest.getMethod());
61+
}
62+
63+
@Override
64+
public URI getURI() {
65+
try {
66+
return this.httpRequest.getUri();
67+
}
68+
catch (URISyntaxException ex) {
69+
throw new IllegalArgumentException("Invalid URI syntax.", ex);
70+
}
71+
}
72+
73+
@Override
74+
public DataBufferFactory bufferFactory() {
75+
return this.dataBufferFactory;
76+
}
77+
78+
@Override
79+
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
80+
return doCommit(() -> {
81+
this.byteBufferFlux = Flux.from(body).map(DataBuffer::asByteBuffer);
82+
return Mono.empty();
83+
});
84+
}
85+
86+
@Override
87+
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
88+
return writeWith(Flux.from(body).flatMap(p -> p));
89+
}
90+
91+
@Override
92+
public Mono<Void> setComplete() {
93+
return doCommit();
94+
}
95+
96+
@Override
97+
protected void applyHeaders() {
98+
HttpHeaders headers = getHeaders();
99+
this.contentLength = headers.getContentLength();
100+
101+
headers.entrySet()
102+
.stream()
103+
.filter(entry -> !HttpHeaders.CONTENT_LENGTH.equals(entry.getKey()))
104+
.forEach(entry -> entry.getValue().forEach(v -> this.httpRequest.addHeader(entry.getKey(), v)));
105+
106+
if (!this.httpRequest.containsHeader(HttpHeaders.ACCEPT)) {
107+
this.httpRequest.addHeader(HttpHeaders.ACCEPT, ALL_VALUE);
108+
}
109+
}
110+
111+
@Override
112+
protected void applyCookies() {
113+
getCookies().values()
114+
.stream()
115+
.flatMap(Collection::stream)
116+
.forEach(httpCookie -> this.httpRequest.addHeader(HttpHeaders.COOKIE, httpCookie.toString()));
117+
}
118+
119+
public HttpRequest getHttpRequest() {
120+
return this.httpRequest;
121+
}
122+
123+
public Flux<ByteBuffer> getByteBufferFlux() {
124+
return this.byteBufferFlux;
125+
}
126+
127+
public long getContentLength() {
128+
return this.contentLength;
129+
}
130+
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Copyright 2002-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.http.client.reactive;
18+
19+
import java.nio.ByteBuffer;
20+
import java.util.Arrays;
21+
import java.util.Objects;
22+
import java.util.concurrent.atomic.AtomicBoolean;
23+
24+
import org.apache.hc.client5.http.protocol.HttpClientContext;
25+
import org.apache.hc.core5.http.Header;
26+
import org.apache.hc.core5.http.HttpResponse;
27+
import org.apache.hc.core5.http.Message;
28+
import org.reactivestreams.Publisher;
29+
import reactor.core.publisher.Flux;
30+
31+
import org.springframework.core.io.buffer.DataBuffer;
32+
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
33+
import org.springframework.http.HttpHeaders;
34+
import org.springframework.http.HttpStatus;
35+
import org.springframework.http.ResponseCookie;
36+
import org.springframework.util.LinkedMultiValueMap;
37+
import org.springframework.util.MultiValueMap;
38+
39+
import static org.apache.hc.client5.http.cookie.Cookie.MAX_AGE_ATTR;
40+
41+
/**
42+
* {@link ClientHttpResponse} implementation for the Apache HttpComponents HttpClient 5.x.
43+
*
44+
* @author Martin Tarjányi
45+
* @since 5.3
46+
* @see <a href="https://hc.apache.org/index.html">Apache HttpComponents</a>
47+
*/
48+
class ApacheClientHttpResponse implements ClientHttpResponse {
49+
private final Message<HttpResponse, Publisher<ByteBuffer>> message;
50+
51+
private final Flux<DataBuffer> dataBufferFlux;
52+
53+
private final HttpClientContext context;
54+
55+
private final AtomicBoolean rejectSubscribers = new AtomicBoolean();
56+
57+
public ApacheClientHttpResponse(DefaultDataBufferFactory dataBufferFactory,
58+
Message<HttpResponse, Publisher<ByteBuffer>> message,
59+
HttpClientContext context) {
60+
61+
this.message = message;
62+
this.context = context;
63+
64+
this.dataBufferFlux = Flux.from(this.message.getBody())
65+
.doOnSubscribe(s -> {
66+
if (!this.rejectSubscribers.compareAndSet(false, true)) {
67+
throw new IllegalStateException("The client response body can only be consumed once.");
68+
}
69+
})
70+
.map(dataBufferFactory::wrap);
71+
}
72+
73+
@Override
74+
public HttpStatus getStatusCode() {
75+
return HttpStatus.valueOf(this.message.getHead().getCode());
76+
}
77+
78+
@Override
79+
public int getRawStatusCode() {
80+
return this.message.getHead().getCode();
81+
}
82+
83+
@Override
84+
public MultiValueMap<String, ResponseCookie> getCookies() {
85+
LinkedMultiValueMap<String, ResponseCookie> result = new LinkedMultiValueMap<>();
86+
this.context.getCookieStore().getCookies().forEach(cookie ->
87+
result.add(cookie.getName(), ResponseCookie.from(cookie.getName(), cookie.getValue())
88+
.domain(cookie.getDomain())
89+
.path(cookie.getPath())
90+
.maxAge(Long.parseLong(Objects.toString(cookie.getAttribute(MAX_AGE_ATTR), "-1")))
91+
.secure(cookie.isSecure())
92+
.httpOnly(cookie.containsAttribute("httponly"))
93+
.build()));
94+
return result;
95+
}
96+
97+
@Override
98+
public Flux<DataBuffer> getBody() {
99+
return this.dataBufferFlux;
100+
}
101+
102+
@Override
103+
public HttpHeaders getHeaders() {
104+
return Arrays.stream(this.message.getHead().getHeaders())
105+
.collect(HttpHeaders::new, this::addHeader, HttpHeaders::putAll);
106+
}
107+
108+
private void addHeader(HttpHeaders httpHeaders, Header header) {
109+
httpHeaders.add(header.getName(), header.getValue());
110+
}
111+
}

0 commit comments

Comments
 (0)