Skip to content

Integrate Apache Http client with WebClient #24700

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@ configure(allprojects) { project ->
exclude group: "commons-logging", name: "commons-logging"
}
dependency "org.eclipse.jetty:jetty-reactive-httpclient:1.1.2"
dependency 'org.apache.httpcomponents.client5:httpclient5:5.0'
dependency 'org.apache.httpcomponents.core5:httpcore5-reactive:5.0'

dependency "org.jruby:jruby:9.2.11.0"
dependency "org.python:jython-standalone:2.7.1"
Expand Down
2 changes: 2 additions & 0 deletions spring-web/spring-web.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ dependencies {
exclude group: "javax.servlet", module: "javax.servlet-api"
}
optional("org.eclipse.jetty:jetty-reactive-httpclient")
optional('org.apache.httpcomponents.client5:httpclient5:5.0')
optional('org.apache.httpcomponents.core5:httpcore5-reactive:5.0')
optional("com.squareup.okhttp3:okhttp")
optional("org.apache.httpcomponents:httpclient")
optional("org.apache.httpcomponents:httpasyncclient")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@
* @author Arjen Poutsma
* @since 4.0
* @see HttpComponentsClientHttpRequestFactory#createRequest
* @deprecated as of Spring 5.0, with no direct replacement
* @deprecated as of Spring 5.0, in favor of
* {@link org.springframework.http.client.reactive.ApacheClientHttpConnector}
*/
@Deprecated
final class HttpComponentsAsyncClientHttpRequest extends AbstractBufferingAsyncClientHttpRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@
* @author Stephane Nicoll
* @since 4.0
* @see HttpAsyncClient
* @deprecated as of Spring 5.0, with no direct replacement
* @deprecated as of Spring 5.0, in favor of
* {@link org.springframework.http.client.reactive.ApacheClientHttpConnector}
*/
@Deprecated
public class HttpComponentsAsyncClientHttpRequestFactory extends HttpComponentsClientHttpRequestFactory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
* @author Arjen Poutsma
* @since 4.0
* @see HttpComponentsAsyncClientHttpRequest#executeAsync()
* @deprecated as of Spring 5.0, with no direct replacement
* @deprecated as of Spring 5.0, in favor of
* {@link org.springframework.http.client.reactive.ApacheClientHttpConnector}
*/
@Deprecated
final class HttpComponentsAsyncClientHttpResponse extends AbstractClientHttpResponse {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.http.client.reactive;

import java.net.URI;
import java.nio.ByteBuffer;
import java.util.function.Function;

import org.apache.hc.client5.http.cookie.BasicCookieStore;
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.core5.http.HttpResponse;
import org.apache.hc.core5.http.Message;
import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
import org.apache.hc.core5.reactive.ReactiveEntityProducer;
import org.apache.hc.core5.reactive.ReactiveResponseConsumer;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpMethod;
import org.springframework.lang.Nullable;

/**
* {@link ClientHttpConnector} implementation for the Apache HttpComponents HttpClient 5.x.
*
* @author Martin Tarjányi
* @since 5.3
* @see <a href="https://hc.apache.org/index.html">Apache HttpComponents</a>
*/
public class ApacheClientHttpConnector implements ClientHttpConnector {
private final CloseableHttpAsyncClient client;

private final DataBufferFactory dataBufferFactory;

/**
* Default constructor that creates and starts a new instance of {@link CloseableHttpAsyncClient}.
*/
public ApacheClientHttpConnector() {
this(HttpAsyncClients.createDefault());
}

/**
* Constructor with an initialized {@link CloseableHttpAsyncClient}.
*/
public ApacheClientHttpConnector(CloseableHttpAsyncClient client) {
this.dataBufferFactory = new DefaultDataBufferFactory();
this.client = client;
this.client.start();
}

@Override
public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
Function<? super ClientHttpRequest, Mono<Void>> requestCallback) {

ApacheClientHttpRequest request = new ApacheClientHttpRequest(method, uri, this.dataBufferFactory);

return requestCallback.apply(request).then(Mono.defer(() -> execute(request)));
}

private Mono<ClientHttpResponse> execute(ApacheClientHttpRequest request) {
Flux<ByteBuffer> byteBufferFlux = request.getByteBufferFlux();

ReactiveEntityProducer reactiveEntityProducer = createReactiveEntityProducer(request, byteBufferFlux);

BasicRequestProducer basicRequestProducer = new BasicRequestProducer(request.getHttpRequest(),
reactiveEntityProducer);

HttpClientContext context = HttpClientContext.create();
context.setCookieStore(new BasicCookieStore());

return Mono.<Message<HttpResponse, Publisher<ByteBuffer>>>create(sink -> {
ReactiveResponseConsumer reactiveResponseConsumer = new ReactiveResponseConsumer(new MonoFutureCallbackAdapter<>(sink));
this.client.execute(basicRequestProducer, reactiveResponseConsumer, context, null);
}).map(message -> new ApacheClientHttpResponse(this.dataBufferFactory, message, context));
}

@Nullable
private ReactiveEntityProducer createReactiveEntityProducer(ApacheClientHttpRequest request,
@Nullable Flux<ByteBuffer> byteBufferFlux) {

if (byteBufferFlux == null) {
return null;
}

return new ReactiveEntityProducer(byteBufferFlux, request.getContentLength(), null, null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
/*
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.http.client.reactive;

import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.stream.Collectors;

import org.apache.hc.core5.http.HttpRequest;
import org.apache.hc.core5.http.message.BasicHttpRequest;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;

import static org.springframework.http.MediaType.ALL_VALUE;

/**
* {@link ClientHttpRequest} implementation for the Apache HttpComponents HttpClient 5.x.
*
* @author Martin Tarjányi
* @since 5.3
* @see <a href="https://hc.apache.org/index.html">Apache HttpComponents</a>
*/
class ApacheClientHttpRequest extends AbstractClientHttpRequest {
private final HttpRequest httpRequest;

private final DataBufferFactory dataBufferFactory;

private Flux<ByteBuffer> byteBufferFlux;

private long contentLength = -1;

public ApacheClientHttpRequest(HttpMethod method, URI uri, DataBufferFactory dataBufferFactory) {
this.httpRequest = new BasicHttpRequest(method.name(), uri);
this.dataBufferFactory = dataBufferFactory;
}

@Override
public HttpMethod getMethod() {
return HttpMethod.resolve(this.httpRequest.getMethod());
}

@Override
public URI getURI() {
try {
return this.httpRequest.getUri();
}
catch (URISyntaxException ex) {
throw new IllegalArgumentException("Invalid URI syntax.", ex);
}
}

@Override
public DataBufferFactory bufferFactory() {
return this.dataBufferFactory;
}

@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
return doCommit(() -> {
this.byteBufferFlux = Flux.from(body).map(DataBuffer::asByteBuffer);
return Mono.empty();
});
}

@Override
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
return writeWith(Flux.from(body).flatMap(p -> p));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is supposed to flush after each inner buffer flux is complete. Is this code doing that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likely not, but I'm not sure. Let me tag Apache contributors @ok2c and @rschmitt maybe they can help out here.
So the question is: Is it possible to control when the reactive data producer flushes parts of the request body?
Also, folks, you are more than welcome to review this pull request if you have time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@martin-tarjanyi I personally have little experience with reactive bindings but I hope @rschmitt can answer your question. What I could do though is to look at various ways of optimizing the implementation once it gets accepted.

}

@Override
public Mono<Void> setComplete() {
return doCommit();
}

@Override
protected void applyHeaders() {
HttpHeaders headers = getHeaders();
this.contentLength = headers.getContentLength();

headers.entrySet()
.stream()
.filter(entry -> !HttpHeaders.CONTENT_LENGTH.equals(entry.getKey()))
.forEach(entry -> entry.getValue().forEach(v -> this.httpRequest.addHeader(entry.getKey(), v)));

if (!this.httpRequest.containsHeader(HttpHeaders.ACCEPT)) {
this.httpRequest.addHeader(HttpHeaders.ACCEPT, ALL_VALUE);
}
}

@Override
protected void applyCookies() {
if (getCookies().isEmpty()) {
return;
}

String cookiesString = getCookies().values()
.stream()
.flatMap(Collection::stream)
.map(HttpCookie::toString)
.collect(Collectors.joining("; "));

this.httpRequest.addHeader(HttpHeaders.COOKIE, cookiesString);
}

public HttpRequest getHttpRequest() {
return this.httpRequest;
}

public Flux<ByteBuffer> getByteBufferFlux() {
return this.byteBufferFlux;
}

public long getContentLength() {
return this.contentLength;
}
}
Loading