Skip to content

ZOOKEEPER-4921: Retry endlessly to establish a brand-new session #2265

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: branch-3.9.4
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1241,7 +1241,7 @@ public void run() {
to = connectTimeout - clientCnxnSocket.getIdleSend();
}

int expiration = expirationTimeout - clientCnxnSocket.getIdleRecv();
int expiration = sessionId == 0 ? Integer.MAX_VALUE : expirationTimeout - clientCnxnSocket.getIdleRecv();
if (expiration <= 0) {
String warnInfo = String.format(
"Client session timed out, have not heard from server in %dms for session id 0x%s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package org.apache.zookeeper.test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThan;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand All @@ -31,12 +34,15 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.TestableZooKeeper;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.common.Time;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
Expand All @@ -54,6 +60,21 @@ public void setUp() throws Exception {
zk = createClient();
}

private static class ExpiredWatcher implements Watcher {
public volatile CompletableFuture<Void> expired = new CompletableFuture<>();

synchronized void reset() {
expired = new CompletableFuture<>();
}

@Override
public synchronized void process(WatchedEvent event) {
if (event.getState() == Event.KeeperState.Expired) {
expired.complete(null);
}
}
}

private static class BusyServer implements AutoCloseable {
private final ServerSocket server;
private final Socket client;
Expand Down Expand Up @@ -143,17 +164,24 @@ public void testSessionExpirationAfterAllServerDown() throws Exception {
// stop client also to gain less distraction
zk.close();

// small connection timeout to gain quick ci feedback
int sessionTimeout = 3000;
CompletableFuture<Void> expired = new CompletableFuture<>();
// given: established session
int sessionTimeout = 3000; // small connection timeout to gain quick ci feedback
ExpiredWatcher watcher = new ExpiredWatcher();
zk = createClient(new CountdownWatcher(), hostPort, sessionTimeout);
zk.register(event -> {
if (event.getState() == Watcher.Event.KeeperState.Expired) {
expired.complete(null);
}
});
zk.register(watcher);

// when: all server down
long start = Time.currentElapsedTime();
zk.sync("/"); // touch timeout counts
stopServer();
expired.join();

// then: get Expired after session timeout
watcher.expired.join();
long elapsed = Time.currentElapsedTime() - start;
assertThat(elapsed, greaterThanOrEqualTo((long) zk.getSessionTimeout()));
assertThat(elapsed, lessThan(zk.getSessionTimeout() * 10L));

// then: future request will get SessionExpiredException
assertThrows(KeeperException.SessionExpiredException.class, () -> zk.exists("/", null));
}

Expand All @@ -162,18 +190,17 @@ public void testSessionExpirationWhenNoServerUp() throws Exception {
// stop client also to gain less distraction
zk.close();

// given: unavailable cluster
stopServer();

// small connection timeout to gain quick ci feedback
int sessionTimeout = 3000;
CompletableFuture<Void> expired = new CompletableFuture<>();
new TestableZooKeeper(hostPort, sessionTimeout, event -> {
if (event.getState() == Watcher.Event.KeeperState.Expired) {
expired.complete(null);
}
});
expired.join();
assertThrows(KeeperException.SessionExpiredException.class, () -> zk.exists("/", null));
// when: try to establish a brand-new session
int sessionTimeout = 300; // small connection timeout to gain quick ci feedback
ExpiredWatcher watcher = new ExpiredWatcher();
try (ZooKeeper zk = new ZooKeeper(hostPort, sessionTimeout, watcher)) {
// then: never Expired
assertThrows(TimeoutException.class, () -> watcher.expired.get(3 * sessionTimeout, TimeUnit.MILLISECONDS));
assertThrows(KeeperException.ConnectionLossException.class, () -> zk.exists("/", null));
}
}

@Test
Expand Down