Description
Bug Report
I have not been able to find any information about this particular error on the Internet.
Versions
- Driver: r2dbc-postgres
v1.0.7
and r2dbc-poolv1.0.2
- Database: Postgres 14
- Java: 17
- OS: Linux
Current Behavior
When using failover protocol with primary and standby Postgres instances we can set target type to "preferSecondary. In theory when standby instance is down, driver should connect to the primary one. And it's working fine until we add second parallel connection (for example using r2dbc-pool). Then it throws misleading error:
Table schema
NA
Steps to reproduce
Minimal, Reproducible Example: https://github.com/mateusz-baluch-xtb/R2dbcPostgresFailoverPoolExample
Explanation:
r2dbc-pool is calling MultiHostConnectionStrategy.connect()
only once - when we are creating new datasource. Then to create each connection it uses the same Mono which contains references to the same exceptionRef
instances.
Code that is important to understand the problem (copied from io.r2dbc.postgresql.MultiHostConnectionStrategy
class):
public Mono<Client> connect(TargetServerType targetServerType) {
AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
return attemptConnection(targetServerType)
.onErrorResume(e -> {
if (!exceptionRef.compareAndSet(null, e)) {
exceptionRef.get().addSuppressed(e);
}
return Mono.empty();
})
.switchIfEmpty(Mono.defer(() -> targetServerType == PREFER_SECONDARY ? attemptConnection(PRIMARY) : Mono.empty()))
.switchIfEmpty(Mono.error(() -> {
Throwable error = exceptionRef.get();
if (error == null) {
return new PostgresqlConnectionFactory.PostgresConnectionException(String.format("No server matches target type '%s'", targetServerType), null);
} else {
return new PostgresqlConnectionFactory.PostgresConnectionException(String.format("Cannot connect to a host of %s", this.addresses), error);
}
}));
}
private Mono<Client> attemptConnection(TargetServerType targetServerType) {
AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
return getCandidates(targetServerType).concatMap(candidate -> this.attemptConnection(targetServerType, candidate)
.onErrorResume(e -> {
if (!exceptionRef.compareAndSet(null, e)) {
exceptionRef.get().addSuppressed(e);
}
this.statusMap.put(candidate, HostConnectOutcome.fail(candidate));
return Mono.empty();
}))
.next()
.switchIfEmpty(Mono.defer(() -> exceptionRef.get() != null
? Mono.error(exceptionRef.get())
: Mono.empty()));
}
When we call MultiHostConnectionStrategy.connect()
method, it returns Mono<Connection>
with references to the same two instances of exceptionRef
(one from connect(TargetServerType)
and the second one from attemptConnection(TargetServerType)
). So when we are using target type PREFER_SECONDARY
and the first attemptConnection
is not successful, the exceptionRef
is filled with that particular exception. Then in connect(TargetServerType)
method, we store the same exception in its exceptionRef
. First connection is always successful (connected to the primary instance), because exceptionRef
in the connect
method is empty. All subsequent connections are failing, because the
attemtConnection
method is returning the same exception, and then in the connect
method there is line:
exceptionRef.get().addSuppressed(e);
This is causing the java.lang.IllegalArgumentException: Self-suppression not permitted
exception to be thrown, because e
is the same exception as stored in a exceptionRef
variable.
Expected behavior/code
When standby instance is not available the connections should always be able to connect to the primary server, even using r2dbc-pool.
Possible Solution
The solution to that problem is to create a new instance of AtomicReference<Throwable>
in the attemptConnection
and connect
methods for each connection attempt. We can do it by wrapping method calls inside defer function (Mono.defer(() -> call())
).
Example:
@Override
public Mono<Client> connect() {
return Mono.defer(() -> connect(this.multiHostConfiguration.getTargetServerType()));
}
public Mono<Client> connect(TargetServerType targetServerType) {
AtomicReference<Throwable> exceptionRef = new AtomicReference<>();
return Mono.defer(() -> attemptConnection(targetServerType))
...
Solution explanation:
The Mono.defer
method will call connect
and attemptConnection
for each connection (not only once), and it will create new instances of AtomicReference<Throwable>
. This will prevent the java.lang.IllegalArgumentException: Self-suppression not permitted
exception from being thrown.
Additional context
NA