@@ -472,14 +472,48 @@ def delete_topics(self, topics, timeout_ms=None):
472
472
.format (version ))
473
473
return response
474
474
475
- # list topics functionality is in ClusterMetadata
476
- # Note: if implemented here, send the request to the least_loaded_node()
477
475
478
- # describe topics functionality is in ClusterMetadata
479
- # Note: if implemented here, send the request to the controller
476
+ def _get_cluster_metadata (self , topics = None , auto_topic_creation = False ):
477
+ """
478
+ topics == None means "get all topics"
479
+ """
480
+ version = self ._matching_api_version (MetadataRequest )
481
+ if version <= 3 :
482
+ if auto_topic_creation :
483
+ raise IncompatibleBrokerVersion (
484
+ "auto_topic_creation requires MetadataRequest >= v4, which"
485
+ " is not supported by Kafka {}"
486
+ .format (self .config ['api_version' ]))
480
487
481
- # describe cluster functionality is in ClusterMetadata
482
- # Note: if implemented here, send the request to the least_loaded_node()
488
+ request = MetadataRequest [version ](topics = topics )
489
+ elif version <= 5 :
490
+ request = MetadataRequest [version ](
491
+ topics = topics ,
492
+ allow_auto_topic_creation = auto_topic_creation
493
+ )
494
+
495
+ future = self ._send_request_to_node (
496
+ self ._client .least_loaded_node (),
497
+ request
498
+ )
499
+ self ._wait_for_futures ([future ])
500
+ return future .value
501
+
502
+ def list_topics (self ):
503
+ metadata = self ._get_cluster_metadata (topics = None )
504
+ obj = metadata .to_object ()
505
+ return [t ['topic' ] for t in obj ['topics' ]]
506
+
507
+ def describe_topics (self , topics = None ):
508
+ metadata = self ._get_cluster_metadata (topics = topics )
509
+ obj = metadata .to_object ()
510
+ return obj ['topics' ]
511
+
512
+ def describe_cluster (self ):
513
+ metadata = self ._get_cluster_metadata ()
514
+ obj = metadata .to_object ()
515
+ obj .pop ('topics' ) # We have 'describe_topics' for this
516
+ return obj
483
517
484
518
@staticmethod
485
519
def _convert_describe_acls_response_to_acls (describe_response ):
0 commit comments