Description
I hit this problem in production:
| step | thread1 | thread2 |
| --- | --- | --- |
| 1 | sends MetadataRequest_v1 | |
| 2 | calls poll() | |
| 3 | | calls maybe_connect with node id n |
| 4 | processes MetadataResponse_v1, which removes n from the metadata | |
When step 3 happens, the id n is added to `_connecting`. Once step 4 happens, the corresponding metadata for node n is removed from `ClusterMetadata`. The result is that all subsequent calls to `poll` fail on this assertion:
`kafka-python/kafka/client_async.py`, line 372 at commit 9feeb79
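For context, the check in question is the broker-metadata assertion in `KafkaClient._maybe_connect`, which `poll` runs for every id left in `_connecting`. Paraphrased (the exact wording and line number vary between versions):

```python
# Paraphrased from KafkaClient._maybe_connect in kafka/client_async.py.
# Node n is still in self._connecting but no longer in the cluster metadata,
# so broker_metadata() returns None and the assert fires on every poll().
broker = self.cluster.broker_metadata(node_id)
assert broker, 'Broker id %s not in current metadata' % (node_id,)
```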
Here is a somewhat hacked-together script to reproduce the problem:
```python
#!/bin/env python
from contextlib import contextmanager
import subprocess
import threading
import logging
from time import sleep

from kafka.client_async import KafkaClient
from kafka.admin.client import KafkaAdminClient
from kafka.admin.new_topic import NewTopic

PORT_START = 2000
PORT_END = 2020
ZK_PORT = 2181


def create_pod():
    args = [
        "podman",
        "pod",
        "create",
        "--publish",
        f"{PORT_START}-{PORT_END}:{PORT_START}-{PORT_END}",
        "--name",
        "kafka",
    ]
    subprocess.run(args)
    print("kafka pod created")


def start_zk():
    args = [
        "podman",
        "run",
        "--detach",
        "--tty",
        "--rm",
        "--pod",
        "kafka",
        "--name",
        "zk",
        "--env",
        f"ZOOKEEPER_CLIENT_PORT={ZK_PORT}",
        "confluentinc/cp-zookeeper",
    ]
    subprocess.run(args)
    print("zk started")


def start_kafka(num: int):
    port = PORT_START + num * 2
    port_host = port + 1
    args = [
        "podman",
        "run",
        "--detach",
        "--tty",
        "--rm",
        "--pod",
        "kafka",
        "--name",
        f"kafka-{num}",
        "--env",
        "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT",
        "--env",
        f"KAFKA_ZOOKEEPER_CONNECT=localhost:{ZK_PORT}",
        "--env",
        f"KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:{port},PLAINTEXT_HOST://localhost:{port_host}",
        "--env",
        "KAFKA_AUTO_CREATE_TOPICS_ENABLE=false",
        "confluentinc/cp-kafka",
    ]
    subprocess.run(args)
    print(f"kafka-{num} started")


def stop_kafka(num: int):
    args = [
        "podman",
        "stop",
        f"kafka-{num}",
    ]
    subprocess.run(args)
    print(f"kafka-{num} stopped")


def stop_zk():
    args = [
        "podman",
        "stop",
        "zk",
    ]
    subprocess.run(args)
    print("zk stopped")


def destroy_pod():
    args = ["podman", "pod", "rm", "kafka"]
    subprocess.run(args)
    print("kafka pod removed")


@contextmanager
def setup_env():
    create_pod()
    start_zk()
    start_kafka(0)
    sleep(2)  # make sure kafka-0 has the lowest id
    start_kafka(1)
    try:
        yield
    except:
        logging.getLogger().exception("An error occurred")
    finally:
        stop_kafka(1)
        stop_kafka(0)
        stop_zk()
        destroy_pod()


def test():
    logging.basicConfig()
    logging.getLogger().setLevel("DEBUG")
    port = PORT_START
    bootstrap = f"localhost:{port}"
    with setup_env():
        # Wait for the servers to be ready and for the metadata to be available
        sleep(7)
        kc = KafkaClient(bootstrap_servers=[bootstrap])
        # Poll to update the metadata from the bootstrap server
        kc.poll()
        kc.poll()
        assert len(kc.cluster.brokers()) == 2, kc.cluster.brokers()
        # Force opening a connection to the first server
        first_id = min(broker.nodeId for broker in kc.cluster.brokers())
        kc.maybe_connect(first_id)
        kc.poll()
        kc.poll()
        print("=" * 50)
        print("=" * 50)
        stop_kafka(1)
        sleep(2)  # give time for kafka-0 to notice
        original_update_metadata = kc.cluster.update_metadata

        def patched_update_metadata(*args, **kwargs):
            # Simulate thread2 (step 3): call maybe_connect for the node that
            # the MetadataResponse about to be processed (step 4) will remove.
            latest_id = max(broker.nodeId for broker in kc.cluster.brokers())
            print(kc._connecting)
            kc.maybe_connect(latest_id)
            print(kc._connecting)
            print("*" * 50)
            print("*" * 50)
            original_update_metadata(*args, **kwargs)

        kc.cluster.update_metadata = patched_update_metadata
        kc.cluster.request_update()
        try:
            kc.poll()  # sends the metadata request
            kc.poll()
            kc.poll()
        except AssertionError:
            pass
        kc.cluster.update_metadata = original_update_metadata
        assert len(kc.cluster.brokers()) == 1, kc.cluster.brokers()
        # Every subsequent poll() now trips the assertion: prints "x" forever.
        while True:
            try:
                kc.poll()
            except AssertionError:
                print("x", end="", flush=True)
            else:
                print(".", end="", flush=True)
            sleep(0.5)


if __name__ == "__main__":
    test()
```
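Once the bug triggers, the final loop prints a stream of `x` instead of `.`, i.e. the client never recovers. The only user-side mitigation I can think of (it narrows the window but does not fix the race, since update_metadata can still run between the check and the connect) is to verify the node is still known before asking the client to connect. Sketch, where `node_id` stands for whichever broker id you are about to connect to:

```python
# Hypothetical user-side guard, not a fix in kafka-python itself: only request
# a connection if the node is still present in the current cluster metadata.
if kc.cluster.broker_metadata(node_id) is not None:
    kc.maybe_connect(node_id)
```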