23
23
from kafka .serializer import Serializer
24
24
from kafka .structs import TopicPartition
25
25
26
-
27
26
log = logging .getLogger (__name__ )
28
27
PRODUCER_CLIENT_ID_SEQUENCE = AtomicInteger ()
29
28
@@ -376,13 +375,13 @@ def __init__(self, **configs):
376
375
reporters = [reporter () for reporter in self .config ['metric_reporters' ]]
377
376
self ._metrics = Metrics (metric_config , reporters )
378
377
379
- client = KafkaClient (metrics = self ._metrics , metric_group_prefix = 'producer' ,
380
- wakeup_timeout_ms = self .config ['max_block_ms' ],
381
- ** self .config )
378
+ self . _client = KafkaClient (metrics = self ._metrics , metric_group_prefix = 'producer' ,
379
+ wakeup_timeout_ms = self .config ['max_block_ms' ],
380
+ ** self .config )
382
381
383
382
# Get auto-discovered version from client if necessary
384
383
if self .config ['api_version' ] is None :
385
- self .config ['api_version' ] = client .config ['api_version' ]
384
+ self .config ['api_version' ] = self . _client .config ['api_version' ]
386
385
387
386
if self .config ['compression_type' ] == 'lz4' :
388
387
assert self .config ['api_version' ] >= (0 , 8 , 2 ), 'LZ4 Requires >= Kafka 0.8.2 Brokers'
@@ -398,9 +397,9 @@ def __init__(self, **configs):
398
397
399
398
message_version = self ._max_usable_produce_magic ()
400
399
self ._accumulator = RecordAccumulator (message_version = message_version , metrics = self ._metrics , ** self .config )
401
- self ._metadata = client .cluster
400
+ self ._metadata = self . _client .cluster
402
401
guarantee_message_order = bool (self .config ['max_in_flight_requests_per_connection' ] == 1 )
403
- self ._sender = Sender (client , self ._metadata ,
402
+ self ._sender = Sender (self . _client , self ._metadata ,
404
403
self ._accumulator , self ._metrics ,
405
404
guarantee_message_order = guarantee_message_order ,
406
405
** self .config )
@@ -412,14 +411,22 @@ def __init__(self, **configs):
412
411
atexit .register (self ._cleanup )
413
412
log .debug ("Kafka producer started" )
414
413
414
+ def bootstrap_connected (self ):
415
+ """Return True if the bootstrap is connected."""
416
+ if self ._client ._bootstrap_fails > 0 :
417
+ return False
418
+ return True
419
+
415
420
def _cleanup_factory (self ):
416
421
"""Build a cleanup clojure that doesn't increase our ref count"""
417
422
_self = weakref .proxy (self )
423
+
418
424
def wrapper ():
419
425
try :
420
426
_self .close (timeout = 0 )
421
427
except (ReferenceError , AttributeError ):
422
428
pass
429
+
423
430
return wrapper
424
431
425
432
def _unregister_cleanup (self ):
0 commit comments