From b2b87e165358d50110275ebad3c6ea99c2992c5f Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 4 Jun 2025 14:49:02 -0700 Subject: [PATCH 1/4] Add test_aws_msk_iam_sasl_mechanism --- kafka/sasl/msk.py | 7 +++++-- test/sasl/test_msk.py | 16 +++++++++++++++- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/kafka/sasl/msk.py b/kafka/sasl/msk.py index db56b4801..0be10f332 100644 --- a/kafka/sasl/msk.py +++ b/kafka/sasl/msk.py @@ -27,16 +27,19 @@ def __init__(self, **config): self._is_done = False self._is_authenticated = False - def auth_bytes(self): + def _build_client(self): session = BotoSession() credentials = session.get_credentials().get_frozen_credentials() - client = AwsMskIamClient( + return AwsMskIamClient( host=self.host, access_key=credentials.access_key, secret_key=credentials.secret_key, region=session.get_config_variable('region'), token=credentials.token, ) + + def auth_bytes(self): + client = self._build_client() return client.first_message() def receive(self, auth_bytes): diff --git a/test/sasl/test_msk.py b/test/sasl/test_msk.py index e9f1325f3..f3cc46ce8 100644 --- a/test/sasl/test_msk.py +++ b/test/sasl/test_msk.py @@ -2,7 +2,7 @@ import json import sys -from kafka.sasl.msk import AwsMskIamClient +from kafka.sasl.msk import AwsMskIamClient, SaslMechanismAwsMskIam try: from unittest import mock @@ -69,3 +69,17 @@ def test_aws_msk_iam_client_temporary_credentials(): 'x-amz-security-token': 'XXXXX', } assert actual == expected + + +def test_aws_msk_iam_sasl_mechanism(): + with mock.patch('kafka.sasl.msk.BotoSession'): + sasl = SaslMechanismAwsMskIam(security_protocol='SASL_SSL', host='localhost') + with mock.patch.object(sasl, '_build_client', return_value=client_factory(token=None)): + assert sasl.auth_bytes() != b'' + assert not sasl.is_done() + assert not sasl.is_authenticated() + sasl.receive(b'foo') + assert sasl._auth == 'foo' + assert sasl.is_done() + assert sasl.is_authenticated() + assert sasl.auth_details() From 687cdfef3c46b6152f1b229eac21cd55a0806039 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 4 Jun 2025 13:47:26 -0700 Subject: [PATCH 2/4] typo --- kafka/sasl/msk.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/sasl/msk.py b/kafka/sasl/msk.py index 0be10f332..8cea31509 100644 --- a/kafka/sasl/msk.py +++ b/kafka/sasl/msk.py @@ -45,7 +45,7 @@ def auth_bytes(self): def receive(self, auth_bytes): self._is_done = True self._is_authenticated = auth_bytes != b'' - self._auth = auth_bytes.deode('utf-8') + self._auth = auth_bytes.decode('utf-8') def is_done(self): return self._is_done From f577e7d08ef6ed3de5f4348250c8f2c1ce3ed2c8 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 4 Jun 2025 13:47:43 -0700 Subject: [PATCH 3/4] log scope --- kafka/sasl/msk.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/kafka/sasl/msk.py b/kafka/sasl/msk.py index 8cea31509..c6311cc7e 100644 --- a/kafka/sasl/msk.py +++ b/kafka/sasl/msk.py @@ -4,6 +4,7 @@ import hashlib import hmac import json +import logging import string # needed for AWS_MSK_IAM authentication: @@ -17,6 +18,9 @@ from kafka.vendor.six.moves import urllib +log = logging.getLogger(__name__) + + class SaslMechanismAwsMskIam(SaslMechanism): def __init__(self, **config): assert BotoSession is not None, 'AWS_MSK_IAM requires the "botocore" package' @@ -40,6 +44,7 @@ def _build_client(self): def auth_bytes(self): client = self._build_client() + log.debug("Generating auth token for MSK scope: %s", client._scope) return client.first_message() def receive(self, auth_bytes): From 98cb219188952fd334aba9a017d9303f7e5f2d13 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 4 Jun 2025 14:49:18 -0700 Subject: [PATCH 4/4] Raise configuration error if region is none --- kafka/sasl/msk.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/kafka/sasl/msk.py b/kafka/sasl/msk.py index c6311cc7e..7ec03215d 100644 --- a/kafka/sasl/msk.py +++ b/kafka/sasl/msk.py @@ -14,6 +14,7 @@ # no botocore available, will disable AWS_MSK_IAM mechanism BotoSession = None +from kafka.errors import KafkaConfigurationError from kafka.sasl.abc import SaslMechanism from kafka.vendor.six.moves import urllib @@ -34,6 +35,8 @@ def __init__(self, **config): def _build_client(self): session = BotoSession() credentials = session.get_credentials().get_frozen_credentials() + if not session.get_config_variable('region'): + raise KafkaConfigurationError('Unable to determine region for AWS MSK cluster. Is AWS_DEFAULT_REGION set?') return AwsMskIamClient( host=self.host, access_key=credentials.access_key,