From 3952b9a2cc5d134df171d97e53589d6c59f54167 Mon Sep 17 00:00:00 2001 From: Gabriel Tincu Date: Tue, 21 Apr 2020 11:09:51 +0200 Subject: [PATCH 1/2] Add logic for inferring newer broker versions - New Fetch / ListOffsets request / response objects - Add new test cases to infer code based on mentioned objects - Add unit test to check inferred version against whatever resides in KAFKA_VERSION - Add new kafka broker versions to integration list - Add more kafka broker versions to travis task list - Add support for broker version 2.5 id --- .travis.yml | 5 + build_integration.sh | 6 +- kafka/conn.py | 16 +- kafka/protocol/admin.py | 24 ++- kafka/protocol/fetch.py | 182 +++++++++++++++++- kafka/protocol/offset.py | 89 ++++++++- servers/2.5.0/resources/kafka.properties | 147 ++++++++++++++ .../2.5.0/resources/kafka_server_jaas.conf | 4 + servers/2.5.0/resources/log4j.properties | 25 +++ servers/2.5.0/resources/zookeeper.properties | 21 ++ test/test_consumer_integration.py | 15 +- 11 files changed, 522 insertions(+), 12 deletions(-) create mode 100644 servers/2.5.0/resources/kafka.properties create mode 100644 servers/2.5.0/resources/kafka_server_jaas.conf create mode 100644 servers/2.5.0/resources/log4j.properties create mode 100644 servers/2.5.0/resources/zookeeper.properties diff --git a/.travis.yml b/.travis.yml index 8e2fdfedf..cb4e0ad94 100644 --- a/.travis.yml +++ b/.travis.yml @@ -14,7 +14,12 @@ env: - KAFKA_VERSION=0.10.2.2 - KAFKA_VERSION=0.11.0.3 - KAFKA_VERSION=1.1.1 + - KAFKA_VERSION=2.0.1 + - KAFKA_VERSION=2.1.1 + - KAFKA_VERSION=2.2.0 + - KAFKA_VERSION=2.3.0 - KAFKA_VERSION=2.4.0 + - KAFKA_VERSION=2.5.0 addons: apt: diff --git a/build_integration.sh b/build_integration.sh index 98b9b2766..d91bf7acf 100755 --- a/build_integration.sh +++ b/build_integration.sh @@ -1,6 +1,6 @@ #!/bin/bash -: ${ALL_RELEASES:="0.8.2.2 0.9.0.1 0.10.1.1 0.10.2.2 0.11.0.3 1.0.2 1.1.1 2.0.1 2.1.1"} +: ${ALL_RELEASES:="0.8.2.2 0.9.0.1 0.10.1.1 0.10.2.2 0.11.0.3 1.0.2 1.1.1 2.0.1 
2.1.1 2.2.0 2.2.1 2.3.0 2.4.0 2.5.0"} : ${SCALA_VERSION:=2.11} : ${DIST_BASE_URL:=https://archive.apache.org/dist/kafka/} : ${KAFKA_SRC_GIT:=https://github.com/apache/kafka.git} @@ -33,12 +33,14 @@ pushd servers echo "-------------------------------------" echo "Checking kafka binaries for ${kafka}" echo - # kafka 0.8.0 is only available w/ scala 2.8.0 if [ "$kafka" == "0.8.0" ]; then KAFKA_ARTIFACT="kafka_2.8.0-${kafka}.tar.gz" + else if [ "$kafka" \> "2.4.0" ]; then + KAFKA_ARTIFACT="kafka_2.12-${kafka}.tgz" else KAFKA_ARTIFACT="kafka_${SCALA_VERSION}-${kafka}.tgz" fi + fi if [ ! -f "../$kafka/kafka-bin/bin/kafka-run-class.sh" ]; then if [ -f "${KAFKA_ARTIFACT}" ]; then echo "Using cached artifact: ${KAFKA_ARTIFACT}" diff --git a/kafka/conn.py b/kafka/conn.py index c383123ca..38ce57fa5 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -24,9 +24,12 @@ from kafka.future import Future from kafka.metrics.stats import Avg, Count, Max, Rate from kafka.oauth.abstract import AbstractTokenProvider -from kafka.protocol.admin import SaslHandShakeRequest +from kafka.protocol.admin import SaslHandShakeRequest, DescribeAclsRequest from kafka.protocol.commit import OffsetFetchRequest +from kafka.protocol.offset import OffsetRequest +from kafka.protocol.produce import ProduceRequest from kafka.protocol.metadata import MetadataRequest +from kafka.protocol.fetch import FetchRequest from kafka.protocol.parser import KafkaProtocol from kafka.protocol.types import Int32, Int8 from kafka.scram import ScramClient @@ -1166,10 +1169,21 @@ def _infer_broker_version_from_api_versions(self, api_versions): # in reverse order. 
As soon as we find one that works, return it test_cases = [ # format (, ) + ((2, 5, 0), DescribeAclsRequest[2]), + ((2, 4, 0), ProduceRequest[8]), + ((2, 3, 0), FetchRequest[11]), + ((2, 2, 0), OffsetRequest[5]), + ((2, 1, 0), FetchRequest[10]), + ((2, 0, 0), FetchRequest[8]), + ((1, 1, 0), FetchRequest[7]), ((1, 0, 0), MetadataRequest[5]), ((0, 11, 0), MetadataRequest[4]), ((0, 10, 2), OffsetFetchRequest[2]), ((0, 10, 1), MetadataRequest[2]), + # taken from https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_feature.c#L234 + ((0, 9, 0), ProduceRequest[1]), + ((0, 8, 2), OffsetFetchRequest[1]), + ((0, 8, 1), OffsetFetchRequest[0]), ] # Get the best match of test cases diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index b2694dc96..1d3900737 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -477,6 +477,13 @@ class DescribeAclsResponse_v1(Response): ('permission_type', Int8))))) ) + +class DescribeAclsResponse_v2(Response): + API_KEY = 29 + API_VERSION = 2 + SCHEMA = DescribeAclsResponse_v1.SCHEMA + + class DescribeAclsRequest_v0(Request): API_KEY = 29 API_VERSION = 0 @@ -490,6 +497,7 @@ class DescribeAclsRequest_v0(Request): ('permission_type', Int8) ) + class DescribeAclsRequest_v1(Request): API_KEY = 29 API_VERSION = 1 @@ -504,8 +512,19 @@ class DescribeAclsRequest_v1(Request): ('permission_type', Int8) ) -DescribeAclsRequest = [DescribeAclsRequest_v0, DescribeAclsRequest_v1] -DescribeAclsResponse = [DescribeAclsResponse_v0, DescribeAclsResponse_v1] + +class DescribeAclsRequest_v2(Request): + """ + Enable flexible version + """ + API_KEY = 29 + API_VERSION = 2 + RESPONSE_TYPE = DescribeAclsResponse_v2 + SCHEMA = DescribeAclsRequest_v1.SCHEMA + + +DescribeAclsRequest = [DescribeAclsRequest_v0, DescribeAclsRequest_v1, DescribeAclsRequest_v2] +DescribeAclsResponse = [DescribeAclsResponse_v0, DescribeAclsResponse_v1, DescribeAclsResponse_v2] class CreateAclsResponse_v0(Response): API_KEY = 30 @@ -862,3 +881,4 @@ class 
CreatePartitionsRequest_v1(Request): CreatePartitionsResponse = [ CreatePartitionsResponse_v0, CreatePartitionsResponse_v1, ] + diff --git a/kafka/protocol/fetch.py b/kafka/protocol/fetch.py index dd3f648cf..f367848ce 100644 --- a/kafka/protocol/fetch.py +++ b/kafka/protocol/fetch.py @@ -94,6 +94,72 @@ class FetchResponse_v6(Response): SCHEMA = FetchResponse_v5.SCHEMA +class FetchResponse_v7(Response): + """ + Add error_code and session_id to response + """ + API_KEY = 1 + API_VERSION = 7 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('error_code', Int16), + ('session_id', Int32), + ('topics', Array( + ('topics', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16), + ('highwater_offset', Int64), + ('last_stable_offset', Int64), + ('log_start_offset', Int64), + ('aborted_transactions', Array( + ('producer_id', Int64), + ('first_offset', Int64))), + ('message_set', Bytes))))) + ) + + +class FetchResponse_v8(Response): + API_KEY = 1 + API_VERSION = 8 + SCHEMA = FetchResponse_v7.SCHEMA + + +class FetchResponse_v9(Response): + API_KEY = 1 + API_VERSION = 9 + SCHEMA = FetchResponse_v7.SCHEMA + + +class FetchResponse_v10(Response): + API_KEY = 1 + API_VERSION = 10 + SCHEMA = FetchResponse_v7.SCHEMA + + +class FetchResponse_v11(Response): + API_KEY = 1 + API_VERSION = 11 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('error_code', Int16), + ('session_id', Int32), + ('topics', Array( + ('topics', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('error_code', Int16), + ('highwater_offset', Int64), + ('last_stable_offset', Int64), + ('log_start_offset', Int64), + ('aborted_transactions', Array( + ('producer_id', Int64), + ('first_offset', Int64))), + ('preferred_read_replica', Int32), + ('message_set', Bytes))))) + ) + + class FetchRequest_v0(Request): API_KEY = 1 API_VERSION = 0 @@ -196,13 +262,125 @@ class FetchRequest_v6(Request): SCHEMA = FetchRequest_v5.SCHEMA +class FetchRequest_v7(Request): + """ + 
Add incremental fetch requests + """ + API_KEY = 1 + API_VERSION = 7 + RESPONSE_TYPE = FetchResponse_v7 + SCHEMA = Schema( + ('replica_id', Int32), + ('max_wait_time', Int32), + ('min_bytes', Int32), + ('max_bytes', Int32), + ('isolation_level', Int8), + ('session_id', Int32), + ('session_epoch', Int32), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('fetch_offset', Int64), + ('log_start_offset', Int64), + ('max_bytes', Int32))))), + ('forgotten_topics_data', Array( + ('topic', String), + ('partitions', Array(Int32)) + )), + ) + + +class FetchRequest_v8(Request): + """ + bump used to indicate that on quota violation brokers send out responses before throttling. + """ + API_KEY = 1 + API_VERSION = 8 + RESPONSE_TYPE = FetchResponse_v8 + SCHEMA = FetchRequest_v7.SCHEMA + + +class FetchRequest_v9(Request): + """ + adds the current leader epoch (see KIP-320) + """ + API_KEY = 1 + API_VERSION = 9 + RESPONSE_TYPE = FetchResponse_v9 + SCHEMA = Schema( + ('replica_id', Int32), + ('max_wait_time', Int32), + ('min_bytes', Int32), + ('max_bytes', Int32), + ('isolation_level', Int8), + ('session_id', Int32), + ('session_epoch', Int32), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('current_leader_epoch', Int32), + ('fetch_offset', Int64), + ('log_start_offset', Int64), + ('max_bytes', Int32))))), + ('forgotten_topics_data', Array( + ('topic', String), + ('partitions', Array(Int32)), + )), + ) + + +class FetchRequest_v10(Request): + """ + bumped up to indicate ZStandard capability. 
(see KIP-110) + """ + API_KEY = 1 + API_VERSION = 10 + RESPONSE_TYPE = FetchResponse_v10 + SCHEMA = FetchRequest_v9.SCHEMA + + +class FetchRequest_v11(Request): + """ + added rack ID to support read from followers (KIP-392) + """ + API_KEY = 1 + API_VERSION = 11 + RESPONSE_TYPE = FetchResponse_v11 + SCHEMA = Schema( + ('replica_id', Int32), + ('max_wait_time', Int32), + ('min_bytes', Int32), + ('max_bytes', Int32), + ('isolation_level', Int8), + ('session_id', Int32), + ('session_epoch', Int32), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('current_leader_epoch', Int32), + ('fetch_offset', Int64), + ('log_start_offset', Int64), + ('max_bytes', Int32))))), + ('forgotten_topics_data', Array( + ('topic', String), + ('partitions', Array(Int32)) + )), + ('rack_id', String('utf-8')), + ) + + FetchRequest = [ FetchRequest_v0, FetchRequest_v1, FetchRequest_v2, FetchRequest_v3, FetchRequest_v4, FetchRequest_v5, - FetchRequest_v6 + FetchRequest_v6, FetchRequest_v7, FetchRequest_v8, + FetchRequest_v9, FetchRequest_v10, FetchRequest_v11, ] FetchResponse = [ FetchResponse_v0, FetchResponse_v1, FetchResponse_v2, FetchResponse_v3, FetchResponse_v4, FetchResponse_v5, - FetchResponse_v6 + FetchResponse_v6, FetchResponse_v7, FetchResponse_v8, + FetchResponse_v9, FetchResponse_v10, FetchResponse_v11, ] diff --git a/kafka/protocol/offset.py b/kafka/protocol/offset.py index 3c254de40..1ed382b0d 100644 --- a/kafka/protocol/offset.py +++ b/kafka/protocol/offset.py @@ -53,6 +53,43 @@ class OffsetResponse_v2(Response): ) +class OffsetResponse_v3(Response): + """ + on quota violation, brokers send out responses before throttling + """ + API_KEY = 2 + API_VERSION = 3 + SCHEMA = OffsetResponse_v2.SCHEMA + + +class OffsetResponse_v4(Response): + """ + Add leader_epoch to response + """ + API_KEY = 2 + API_VERSION = 4 + SCHEMA = Schema( + ('throttle_time_ms', Int32), + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( 
+ ('partition', Int32), + ('error_code', Int16), + ('timestamp', Int64), + ('offset', Int64), + ('leader_epoch', Int32))))) + ) + + +class OffsetResponse_v5(Response): + """ + adds a new error code, OFFSET_NOT_AVAILABLE + """ + API_KEY = 2 + API_VERSION = 5 + SCHEMA = OffsetResponse_v4.SCHEMA + + class OffsetRequest_v0(Request): API_KEY = 2 API_VERSION = 0 @@ -105,5 +142,53 @@ class OffsetRequest_v2(Request): } -OffsetRequest = [OffsetRequest_v0, OffsetRequest_v1, OffsetRequest_v2] -OffsetResponse = [OffsetResponse_v0, OffsetResponse_v1, OffsetResponse_v2] +class OffsetRequest_v3(Request): + API_KEY = 2 + API_VERSION = 3 + RESPONSE_TYPE = OffsetResponse_v3 + SCHEMA = OffsetRequest_v2.SCHEMA + DEFAULTS = { + 'replica_id': -1 + } + + +class OffsetRequest_v4(Request): + """ + Add current_leader_epoch to request + """ + API_KEY = 2 + API_VERSION = 4 + RESPONSE_TYPE = OffsetResponse_v4 + SCHEMA = Schema( + ('replica_id', Int32), + ('isolation_level', Int8), # <- added isolation_level + ('topics', Array( + ('topic', String('utf-8')), + ('partitions', Array( + ('partition', Int32), + ('current_leader_epoch', Int64), + ('timestamp', Int64))))) + ) + DEFAULTS = { + 'replica_id': -1 + } + + +class OffsetRequest_v5(Request): + API_KEY = 2 + API_VERSION = 5 + RESPONSE_TYPE = OffsetResponse_v5 + SCHEMA = OffsetRequest_v4.SCHEMA + DEFAULTS = { + 'replica_id': -1 + } + + +OffsetRequest = [ + OffsetRequest_v0, OffsetRequest_v1, OffsetRequest_v2, + OffsetRequest_v3, OffsetRequest_v4, OffsetRequest_v5, +] +OffsetResponse = [ + OffsetResponse_v0, OffsetResponse_v1, OffsetResponse_v2, + OffsetResponse_v3, OffsetResponse_v4, OffsetResponse_v5, +] diff --git a/servers/2.5.0/resources/kafka.properties b/servers/2.5.0/resources/kafka.properties new file mode 100644 index 000000000..5775cfdc4 --- /dev/null +++ b/servers/2.5.0/resources/kafka.properties @@ -0,0 +1,147 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# see kafka.server.KafkaConfig for additional details and defaults + +############################# Server Basics ############################# + +# The id of the broker. This must be set to a unique integer for each broker. +broker.id={broker_id} + +############################# Socket Server Settings ############################# + +listeners={transport}://{host}:{port} +security.inter.broker.protocol={transport} + +{sasl_config} + +ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks +ssl.keystore.password=foobar +ssl.key.password=foobar +ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks +ssl.truststore.password=foobar + +authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer +allow.everyone.if.no.acl.found=true + +# The port the socket server listens on +#port=9092 + +# Hostname the broker will bind to. If not set, the server will bind to all interfaces +#host.name=localhost + +# Hostname the broker will advertise to producers and consumers. If not set, it uses the +# value for "host.name" if configured. Otherwise, it will use the value returned from +# java.net.InetAddress.getCanonicalHostName(). +#advertised.host.name= + +# The port to publish to ZooKeeper for clients to use. If this is not set, +# it will publish the same port that the broker binds to. 
+#advertised.port= + +# The number of threads handling network requests +num.network.threads=3 + +# The number of threads doing disk I/O +num.io.threads=8 + +# The send buffer (SO_SNDBUF) used by the socket server +socket.send.buffer.bytes=102400 + +# The receive buffer (SO_RCVBUF) used by the socket server +socket.receive.buffer.bytes=102400 + +# The maximum size of a request that the socket server will accept (protection against OOM) +socket.request.max.bytes=104857600 + + +############################# Log Basics ############################# + +# A comma seperated list of directories under which to store log files +log.dirs={tmp_dir}/data + +# The default number of log partitions per topic. More partitions allow greater +# parallelism for consumption, but this will also result in more files across +# the brokers. +num.partitions={partitions} +default.replication.factor={replicas} + +## Short Replica Lag -- Drops failed brokers out of ISR +replica.lag.time.max.ms=1000 +replica.socket.timeout.ms=1000 + +############################# Log Flush Policy ############################# + +# Messages are immediately written to the filesystem but by default we only fsync() to sync +# the OS cache lazily. The following configurations control the flush of data to disk. +# There are a few important trade-offs here: +# 1. Durability: Unflushed data may be lost if you are not using replication. +# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. +# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks. +# The settings below allow one to configure the flush policy to flush data after a period of time or +# every N messages (or both). This can be done globally and overridden on a per-topic basis. 
+ +# The number of messages to accept before forcing a flush of data to disk +#log.flush.interval.messages=10000 + +# The maximum amount of time a message can sit in a log before we force a flush +#log.flush.interval.ms=1000 + +############################# Log Retention Policy ############################# + +# The following configurations control the disposal of log segments. The policy can +# be set to delete segments after a period of time, or after a given size has accumulated. +# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens +# from the end of the log. + +# The minimum age of a log file to be eligible for deletion +log.retention.hours=168 + +# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining +# segments don't drop below log.retention.bytes. +#log.retention.bytes=1073741824 + +# The maximum size of a log segment file. When this size is reached a new log segment will be created. +log.segment.bytes=1073741824 + +# The interval at which log segments are checked to see if they can be deleted according +# to the retention policies +log.retention.check.interval.ms=300000 + +# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires. +# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction. +log.cleaner.enable=false + +# tune down offset topics to reduce setup time in tests +offsets.commit.timeout.ms=500 +offsets.topic.num.partitions=2 +offsets.topic.replication.factor=1 + +# Allow shorter session timeouts for tests +group.min.session.timeout.ms=1000 + + +############################# Zookeeper ############################# + +# Zookeeper connection string (see zookeeper docs for details). +# This is a comma separated host:port pairs, each corresponding to a zk +# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". 
+# You can also append an optional chroot string to the urls to specify the +# root directory for all kafka znodes. +zookeeper.connect={zk_host}:{zk_port}/{zk_chroot} + +# Timeout in ms for connecting to zookeeper +zookeeper.connection.timeout.ms=30000 +# We want to expire kafka broker sessions quickly when brokers die b/c we restart them quickly +zookeeper.session.timeout.ms=500 diff --git a/servers/2.5.0/resources/kafka_server_jaas.conf b/servers/2.5.0/resources/kafka_server_jaas.conf new file mode 100644 index 000000000..18efe4369 --- /dev/null +++ b/servers/2.5.0/resources/kafka_server_jaas.conf @@ -0,0 +1,4 @@ +KafkaServer {{ + {jaas_config} +}}; +Client {{}}; \ No newline at end of file diff --git a/servers/2.5.0/resources/log4j.properties b/servers/2.5.0/resources/log4j.properties new file mode 100644 index 000000000..b0b76aa79 --- /dev/null +++ b/servers/2.5.0/resources/log4j.properties @@ -0,0 +1,25 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +log4j.rootLogger=INFO, stdout, logfile + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n + +log4j.appender.logfile=org.apache.log4j.FileAppender +log4j.appender.logfile.File=${kafka.logs.dir}/server.log +log4j.appender.logfile.layout=org.apache.log4j.PatternLayout +log4j.appender.logfile.layout.ConversionPattern=[%d] %p %m (%c)%n diff --git a/servers/2.5.0/resources/zookeeper.properties b/servers/2.5.0/resources/zookeeper.properties new file mode 100644 index 000000000..e3fd09742 --- /dev/null +++ b/servers/2.5.0/resources/zookeeper.properties @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# the directory where the snapshot is stored. 
+dataDir={tmp_dir} +# the port at which the clients will connect +clientPort={port} +clientPortAddress={host} +# disable the per-ip limit on the number of connections since this is a non-production config +maxClientCnxns=0 diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 6e6bc9455..90b7ed203 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -6,14 +6,23 @@ from kafka.vendor.six.moves import range import kafka.codec -from kafka.errors import ( - KafkaTimeoutError, UnsupportedCodecError, UnsupportedVersionError -) +from kafka.errors import UnsupportedCodecError, UnsupportedVersionError from kafka.structs import TopicPartition, OffsetAndTimestamp from test.testutil import Timer, assert_message_count, env_kafka_version, random_string +@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") +def test_kafka_version_infer(kafka_consumer_factory): + consumer = kafka_consumer_factory() + actual_ver_major_minor = env_kafka_version()[:2] + client = consumer._client + conn = list(client._conns.values())[0] + inferred_ver_major_minor = conn.check_version()[:2] + assert actual_ver_major_minor == inferred_ver_major_minor, \ + "Was expecting inferred broker version to be %s but was %s" % (actual_ver_major_minor, inferred_ver_major_minor) + + @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") def test_kafka_consumer(kafka_consumer_factory, send_messages): """Test KafkaConsumer""" From f1aa128874fca4dee664bd652dafaabe8c054637 Mon Sep 17 00:00:00 2001 From: Gabriel Tincu Date: Mon, 27 Apr 2020 21:16:56 +0200 Subject: [PATCH 2/2] Implement PR change requests: fewer versions for travis testing, remove unused older versions for inference code, remove one minor version from known server list Do not use newly created ACL request / responses in allowed version lists, due to flexible versions enabling in kafka actually requiring a serialization protocol header update 
Revert admin client file change --- .travis.yml | 4 - build_integration.sh | 2 +- kafka/conn.py | 8 +- kafka/protocol/admin.py | 4 +- servers/2.2.0/resources/kafka.properties | 147 ------------------ .../2.2.0/resources/kafka_server_jaas.conf | 4 - servers/2.2.0/resources/log4j.properties | 25 --- servers/2.2.0/resources/zookeeper.properties | 21 --- 8 files changed, 5 insertions(+), 210 deletions(-) delete mode 100644 servers/2.2.0/resources/kafka.properties delete mode 100644 servers/2.2.0/resources/kafka_server_jaas.conf delete mode 100644 servers/2.2.0/resources/log4j.properties delete mode 100644 servers/2.2.0/resources/zookeeper.properties diff --git a/.travis.yml b/.travis.yml index cb4e0ad94..b98aa16b1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -14,10 +14,6 @@ env: - KAFKA_VERSION=0.10.2.2 - KAFKA_VERSION=0.11.0.3 - KAFKA_VERSION=1.1.1 - - KAFKA_VERSION=2.0.1 - - KAFKA_VERSION=2.1.1 - - KAFKA_VERSION=2.2.0 - - KAFKA_VERSION=2.3.0 - KAFKA_VERSION=2.4.0 - KAFKA_VERSION=2.5.0 diff --git a/build_integration.sh b/build_integration.sh index d91bf7acf..c020b0fe2 100755 --- a/build_integration.sh +++ b/build_integration.sh @@ -1,6 +1,6 @@ #!/bin/bash -: ${ALL_RELEASES:="0.8.2.2 0.9.0.1 0.10.1.1 0.10.2.2 0.11.0.3 1.0.2 1.1.1 2.0.1 2.1.1 2.2.0 2.2.1 2.3.0 2.4.0 2.5.0"} +: ${ALL_RELEASES:="0.8.2.2 0.9.0.1 0.10.1.1 0.10.2.2 0.11.0.3 1.0.2 1.1.1 2.0.1 2.1.1 2.2.1 2.3.0 2.4.0 2.5.0"} : ${SCALA_VERSION:=2.11} : ${DIST_BASE_URL:=https://archive.apache.org/dist/kafka/} : ${KAFKA_SRC_GIT:=https://github.com/apache/kafka.git} diff --git a/kafka/conn.py b/kafka/conn.py index 38ce57fa5..5c7287568 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -24,7 +24,7 @@ from kafka.future import Future from kafka.metrics.stats import Avg, Count, Max, Rate from kafka.oauth.abstract import AbstractTokenProvider -from kafka.protocol.admin import SaslHandShakeRequest, DescribeAclsRequest +from kafka.protocol.admin import SaslHandShakeRequest, DescribeAclsRequest_v2 from 
kafka.protocol.commit import OffsetFetchRequest from kafka.protocol.offset import OffsetRequest from kafka.protocol.produce import ProduceRequest @@ -1169,7 +1169,7 @@ def _infer_broker_version_from_api_versions(self, api_versions): # in reverse order. As soon as we find one that works, return it test_cases = [ # format (, ) - ((2, 5, 0), DescribeAclsRequest[2]), + ((2, 5, 0), DescribeAclsRequest_v2), ((2, 4, 0), ProduceRequest[8]), ((2, 3, 0), FetchRequest[11]), ((2, 2, 0), OffsetRequest[5]), @@ -1180,10 +1180,6 @@ def _infer_broker_version_from_api_versions(self, api_versions): ((0, 11, 0), MetadataRequest[4]), ((0, 10, 2), OffsetFetchRequest[2]), ((0, 10, 1), MetadataRequest[2]), - # taken from https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_feature.c#L234 - ((0, 9, 0), ProduceRequest[1]), - ((0, 8, 2), OffsetFetchRequest[1]), - ((0, 8, 1), OffsetFetchRequest[0]), ] # Get the best match of test cases diff --git a/kafka/protocol/admin.py b/kafka/protocol/admin.py index 1d3900737..af88ea473 100644 --- a/kafka/protocol/admin.py +++ b/kafka/protocol/admin.py @@ -523,8 +523,8 @@ class DescribeAclsRequest_v2(Request): SCHEMA = DescribeAclsRequest_v1.SCHEMA -DescribeAclsRequest = [DescribeAclsRequest_v0, DescribeAclsRequest_v1, DescribeAclsRequest_v2] -DescribeAclsResponse = [DescribeAclsResponse_v0, DescribeAclsResponse_v1, DescribeAclsResponse_v2] +DescribeAclsRequest = [DescribeAclsRequest_v0, DescribeAclsRequest_v1] +DescribeAclsResponse = [DescribeAclsResponse_v0, DescribeAclsResponse_v1] class CreateAclsResponse_v0(Response): API_KEY = 30 diff --git a/servers/2.2.0/resources/kafka.properties b/servers/2.2.0/resources/kafka.properties deleted file mode 100644 index 5775cfdc4..000000000 --- a/servers/2.2.0/resources/kafka.properties +++ /dev/null @@ -1,147 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. 
See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# see kafka.server.KafkaConfig for additional details and defaults - -############################# Server Basics ############################# - -# The id of the broker. This must be set to a unique integer for each broker. -broker.id={broker_id} - -############################# Socket Server Settings ############################# - -listeners={transport}://{host}:{port} -security.inter.broker.protocol={transport} - -{sasl_config} - -ssl.keystore.location={ssl_dir}/kafka.server.keystore.jks -ssl.keystore.password=foobar -ssl.key.password=foobar -ssl.truststore.location={ssl_dir}/kafka.server.truststore.jks -ssl.truststore.password=foobar - -authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer -allow.everyone.if.no.acl.found=true - -# The port the socket server listens on -#port=9092 - -# Hostname the broker will bind to. If not set, the server will bind to all interfaces -#host.name=localhost - -# Hostname the broker will advertise to producers and consumers. If not set, it uses the -# value for "host.name" if configured. Otherwise, it will use the value returned from -# java.net.InetAddress.getCanonicalHostName(). -#advertised.host.name= - -# The port to publish to ZooKeeper for clients to use. If this is not set, -# it will publish the same port that the broker binds to. 
-#advertised.port= - -# The number of threads handling network requests -num.network.threads=3 - -# The number of threads doing disk I/O -num.io.threads=8 - -# The send buffer (SO_SNDBUF) used by the socket server -socket.send.buffer.bytes=102400 - -# The receive buffer (SO_RCVBUF) used by the socket server -socket.receive.buffer.bytes=102400 - -# The maximum size of a request that the socket server will accept (protection against OOM) -socket.request.max.bytes=104857600 - - -############################# Log Basics ############################# - -# A comma seperated list of directories under which to store log files -log.dirs={tmp_dir}/data - -# The default number of log partitions per topic. More partitions allow greater -# parallelism for consumption, but this will also result in more files across -# the brokers. -num.partitions={partitions} -default.replication.factor={replicas} - -## Short Replica Lag -- Drops failed brokers out of ISR -replica.lag.time.max.ms=1000 -replica.socket.timeout.ms=1000 - -############################# Log Flush Policy ############################# - -# Messages are immediately written to the filesystem but by default we only fsync() to sync -# the OS cache lazily. The following configurations control the flush of data to disk. -# There are a few important trade-offs here: -# 1. Durability: Unflushed data may be lost if you are not using replication. -# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. -# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks. -# The settings below allow one to configure the flush policy to flush data after a period of time or -# every N messages (or both). This can be done globally and overridden on a per-topic basis. 
- -# The number of messages to accept before forcing a flush of data to disk -#log.flush.interval.messages=10000 - -# The maximum amount of time a message can sit in a log before we force a flush -#log.flush.interval.ms=1000 - -############################# Log Retention Policy ############################# - -# The following configurations control the disposal of log segments. The policy can -# be set to delete segments after a period of time, or after a given size has accumulated. -# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens -# from the end of the log. - -# The minimum age of a log file to be eligible for deletion -log.retention.hours=168 - -# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining -# segments don't drop below log.retention.bytes. -#log.retention.bytes=1073741824 - -# The maximum size of a log segment file. When this size is reached a new log segment will be created. -log.segment.bytes=1073741824 - -# The interval at which log segments are checked to see if they can be deleted according -# to the retention policies -log.retention.check.interval.ms=300000 - -# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires. -# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction. -log.cleaner.enable=false - -# tune down offset topics to reduce setup time in tests -offsets.commit.timeout.ms=500 -offsets.topic.num.partitions=2 -offsets.topic.replication.factor=1 - -# Allow shorter session timeouts for tests -group.min.session.timeout.ms=1000 - - -############################# Zookeeper ############################# - -# Zookeeper connection string (see zookeeper docs for details). -# This is a comma separated host:port pairs, each corresponding to a zk -# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". 
-# You can also append an optional chroot string to the urls to specify the -# root directory for all kafka znodes. -zookeeper.connect={zk_host}:{zk_port}/{zk_chroot} - -# Timeout in ms for connecting to zookeeper -zookeeper.connection.timeout.ms=30000 -# We want to expire kafka broker sessions quickly when brokers die b/c we restart them quickly -zookeeper.session.timeout.ms=500 diff --git a/servers/2.2.0/resources/kafka_server_jaas.conf b/servers/2.2.0/resources/kafka_server_jaas.conf deleted file mode 100644 index 18efe4369..000000000 --- a/servers/2.2.0/resources/kafka_server_jaas.conf +++ /dev/null @@ -1,4 +0,0 @@ -KafkaServer {{ - {jaas_config} -}}; -Client {{}}; \ No newline at end of file diff --git a/servers/2.2.0/resources/log4j.properties b/servers/2.2.0/resources/log4j.properties deleted file mode 100644 index b0b76aa79..000000000 --- a/servers/2.2.0/resources/log4j.properties +++ /dev/null @@ -1,25 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -log4j.rootLogger=INFO, stdout, logfile - -log4j.appender.stdout=org.apache.log4j.ConsoleAppender -log4j.appender.stdout.layout=org.apache.log4j.PatternLayout -log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n - -log4j.appender.logfile=org.apache.log4j.FileAppender -log4j.appender.logfile.File=${kafka.logs.dir}/server.log -log4j.appender.logfile.layout=org.apache.log4j.PatternLayout -log4j.appender.logfile.layout.ConversionPattern=[%d] %p %m (%c)%n diff --git a/servers/2.2.0/resources/zookeeper.properties b/servers/2.2.0/resources/zookeeper.properties deleted file mode 100644 index e3fd09742..000000000 --- a/servers/2.2.0/resources/zookeeper.properties +++ /dev/null @@ -1,21 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# the directory where the snapshot is stored. -dataDir={tmp_dir} -# the port at which the clients will connect -clientPort={port} -clientPortAddress={host} -# disable the per-ip limit on the number of connections since this is a non-production config -maxClientCnxns=0