Skip to content

KafkaClient.maybe_connect and ClusterMetadata.update_metadata are not synchronized which leads to broken asserts #2188

Closed
@hackaugusto

Description

@hackaugusto

I hit this problem in production:

step thread1 thread2
1 sends MetadataRequest_v1
2 calls poll()
3 calls maybe_connect with node id n
4 process MetadataResponse_v1 which removes n from the metadata

When step 3 happens, the id n is added to _connecting. Once step 4 happens, the corresponding metadata for node n is removed from ClusterMetadata. The result is that all subsequent calls to poll will fail on this assertion:

assert broker, 'Broker id %s not in current metadata' % (node_id,)

Here is a somewhat hacked together script to reproduce the problem:

#!/usr/bin/env python
from contextlib import contextmanager
import subprocess
import threading
import logging
from time import sleep
from kafka.client_async import KafkaClient
from kafka.admin.client import KafkaAdminClient
from kafka.admin.new_topic import NewTopic

PORT_START = 2000
PORT_END = 2020
ZK_PORT = 2181


def create_pod():
    """Create a podman pod named ``kafka`` with the test port range published.

    All broker/zookeeper containers join this pod so they share a network
    namespace and can reach each other via ``localhost``.
    """
    port_range = f"{PORT_START}-{PORT_END}"
    cmd = ["podman", "pod", "create"]
    cmd += ["--publish", f"{port_range}:{port_range}"]
    cmd += ["--name", "kafka"]
    subprocess.run(cmd)
    print("kafka pod created")


def start_zk():
    """Launch a disposable ZooKeeper container (``zk``) inside the ``kafka`` pod."""
    cmd = ["podman", "run", "--detach", "--tty", "--rm"]
    cmd += ["--pod", "kafka"]
    cmd += ["--name", "zk"]
    cmd += ["--env", f"ZOOKEEPER_CLIENT_PORT={ZK_PORT}"]
    cmd.append("confluentinc/cp-zookeeper")
    subprocess.run(cmd)
    print("zk started")


def start_kafka(num: int):
    """Launch broker container ``kafka-{num}`` inside the ``kafka`` pod.

    Each broker gets two consecutive ports starting at ``PORT_START``:
    one for the in-pod PLAINTEXT listener and the next for the host listener.
    """
    listener_port = PORT_START + 2 * num
    host_listener_port = listener_port + 1

    # Environment passed to the confluent image; insertion order matches the
    # original --env ordering.
    env = {
        "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP": "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT",
        "KAFKA_ZOOKEEPER_CONNECT": f"localhost:{ZK_PORT}",
        "KAFKA_ADVERTISED_LISTENERS": (
            f"PLAINTEXT://localhost:{listener_port},"
            f"PLAINTEXT_HOST://localhost:{host_listener_port}"
        ),
        "KAFKA_AUTO_CREATE_TOPICS_ENABLE": "false",
    }

    cmd = ["podman", "run", "--detach", "--tty", "--rm"]
    cmd += ["--pod", "kafka"]
    cmd += ["--name", f"kafka-{num}"]
    for key, value in env.items():
        cmd += ["--env", f"{key}={value}"]
    cmd.append("confluentinc/cp-kafka")

    subprocess.run(cmd)
    print(f"kafka-{num} started")


def stop_kafka(num: int):
    """Stop (and, via ``--rm``, remove) the broker container ``kafka-{num}``."""
    subprocess.run(["podman", "stop", f"kafka-{num}"])
    print(f"kafka-{num} stopped")


def stop_zk():
    """Stop (and, via ``--rm``, remove) the ZooKeeper container."""
    subprocess.run(["podman", "stop", "zk"])
    print("zk stopped")


def destroy_pod():
    """Remove the ``kafka`` pod created by :func:`create_pod`."""
    subprocess.run(["podman", "pod", "rm", "kafka"])
    print("kafka pod removed")


@contextmanager
def setup_env():
    """Context manager that provisions a two-broker Kafka cluster via podman.

    On entry: creates the pod, starts ZooKeeper, then starts kafka-0 before
    kafka-1 (with a delay so kafka-0 registers first and gets the lowest
    broker id, which the repro relies on).  On exit: tears everything down,
    even if the body raised.

    Any exception from the body is logged with its traceback and suppressed
    so the cleanup messages stay readable.
    """
    create_pod()
    start_zk()
    start_kafka(0)
    sleep(2)  # make sure kafka-0 has the lowest id
    start_kafka(1)

    try:
        yield
    except Exception:
        # Was a bare ``except:``, which also swallowed KeyboardInterrupt and
        # SystemExit — Ctrl-C during the long repro would be logged and
        # ignored instead of aborting.  Catch Exception only.
        logging.getLogger().exception("An error occurred")
    finally:
        stop_kafka(1)
        stop_kafka(0)
        stop_zk()
        destroy_pod()


def test():
    """Reproduce kafka-python #2188.

    Drives a KafkaClient against a two-broker cluster, then forces the racy
    interleaving: maybe_connect() is called for a node id while a metadata
    response that removes that node is being processed, after which every
    subsequent poll() trips the 'Broker id %s not in current metadata'
    assertion.  The exact sequence and count of poll() calls below matters —
    do not reorder.
    """

    logging.basicConfig()
    logging.getLogger().setLevel("DEBUG")

    port = PORT_START
    bootstrap = f"localhost:{port}"

    with setup_env():
        # Wait for the servers to be ready and for the metadata to be available
        sleep(7)

        kc = KafkaClient(bootstrap_servers=[bootstrap])

        # Poll to update the metadata from the bootstrap server
        kc.poll()
        kc.poll()
        assert len(kc.cluster.brokers()) == 2, kc.cluster.brokers()

        # Force opening a connection to the first server
        first_id = min(broker.nodeId for broker in kc.cluster.brokers())
        kc.maybe_connect(first_id)
        kc.poll()
        kc.poll()

        print("=" * 50)
        print("=" * 50)

        # Kill the second broker so the next metadata refresh drops it.
        stop_kafka(1)
        sleep(2)  # give time for kafka-0 to notice

        original_update_metadata = kc.cluster.update_metadata

        def patched_update_metadata(*args, **kwargs):
            # Simulate "thread2": call maybe_connect() for the highest node id
            # *just before* the metadata update removes that node, adding it
            # to kc._connecting while its broker entry is deleted.
            latest_id = max(broker.nodeId for broker in kc.cluster.brokers())
            print(kc._connecting)
            kc.maybe_connect(latest_id)
            print(kc._connecting)
            print("*" * 50)
            print("*" * 50)
            original_update_metadata(*args, **kwargs)

        kc.cluster.update_metadata = patched_update_metadata

        kc.cluster.request_update()
        try:
            kc.poll()  # sends the metadata request
            kc.poll()
            kc.poll()
        except AssertionError:
            pass

        kc.cluster.update_metadata = original_update_metadata
        assert len(kc.cluster.brokers()) == 1, kc.cluster.brokers()

        # The client is now wedged: every poll() hits the broken assertion.
        # Prints 'x' per failing poll, '.' if one ever succeeds again.
        while True:
            try:
                kc.poll()
            except AssertionError:
                print("x", end="", flush=True)
            else:
                print(".", end="", flush=True)

            sleep(0.5)

if __name__ == "__main__":
    test()

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions