@@ -116,6 +116,12 @@ def test_resource(cls, filename):
116
116
return path
117
117
return os .path .join (cls .project_root , "servers" , "resources" , "default" , filename )
118
118
119
+ @classmethod
120
+ def run_script (cls , script , * args ):
121
+ result = [os .path .join (cls .kafka_root , 'bin' , script )]
122
+ result .extend ([str (arg ) for arg in args ])
123
+ return result
124
+
119
125
@classmethod
120
126
def kafka_run_class_args (cls , * args ):
121
127
result = [os .path .join (cls .kafka_root , 'bin' , 'kafka-run-class.sh' )]
@@ -202,6 +208,7 @@ def open(self):
202
208
# Configure Zookeeper child process
203
209
template = self .test_resource ("zookeeper.properties" )
204
210
properties = self .tmp_dir .join ("zookeeper.properties" )
211
+ # Consider replacing w/ run_script('zookeper-server-start.sh', ...)
205
212
args = self .kafka_run_class_args ("org.apache.zookeeper.server.quorum.QuorumPeerMain" ,
206
213
properties .strpath )
207
214
env = self .kafka_run_class_env ()
@@ -348,8 +355,7 @@ def _jaas_config(self):
348
355
349
356
def _add_scram_user (self ):
350
357
self .out ("Adding SCRAM credentials for user {} to zookeeper." .format (self .broker_user ))
351
- args = self .kafka_run_class_args (
352
- "kafka.admin.ConfigCommand" ,
358
+ args = self .run_script ('kafka-configs.sh' ,
353
359
"--zookeeper" ,
354
360
"%s:%d/%s" % (self .zookeeper .host ,
355
361
self .zookeeper .port ,
@@ -390,8 +396,7 @@ def out(self, message):
390
396
391
397
def _create_zk_chroot (self ):
392
398
self .out ("Creating Zookeeper chroot node..." )
393
- args = self .kafka_run_class_args ("org.apache.zookeeper.ZooKeeperMain" ,
394
- "-server" ,
399
+ args = self .run_script ('zookeeper-shell.sh' ,
395
400
"%s:%d" % (self .zookeeper .host ,
396
401
self .zookeeper .port ),
397
402
"create" ,
@@ -416,6 +421,7 @@ def start(self):
416
421
properties_template = self .test_resource ("kafka.properties" )
417
422
jaas_conf_template = self .test_resource ("kafka_server_jaas.conf" )
418
423
424
+ # Consider replacing w/ run_script('kafka-server-start.sh', ...)
419
425
args = self .kafka_run_class_args ("kafka.Kafka" , properties .strpath )
420
426
env = self .kafka_run_class_env ()
421
427
if self .sasl_enabled :
@@ -590,7 +596,7 @@ def _create_topic_via_admin_api(self, topic_name, num_partitions, replication_fa
590
596
raise errors .for_code (error_code )
591
597
592
598
def _create_topic_via_cli (self , topic_name , num_partitions , replication_factor ):
593
- args = self .kafka_run_class_args ('kafka.admin.TopicCommand ' ,
599
+ args = self .run_script ('kafka-topics.sh ' ,
594
600
'--zookeeper' , '%s:%s/%s' % (self .zookeeper .host ,
595
601
self .zookeeper .port ,
596
602
self .zk_chroot ),
@@ -614,12 +620,12 @@ def _create_topic_via_cli(self, topic_name, num_partitions, replication_factor):
614
620
raise RuntimeError ("Failed to create topic %s" % (topic_name ,))
615
621
616
622
def get_topic_names (self ):
617
- args = self .kafka_run_class_args ('kafka.admin.TopicCommand ' ,
623
+ args = self .run_script ('kafka-topics.sh ' ,
618
624
'--zookeeper' , '%s:%s/%s' % (self .zookeeper .host ,
619
625
self .zookeeper .port ,
620
626
self .zk_chroot ),
621
627
'--list'
622
- )
628
+ )
623
629
env = self .kafka_run_class_env ()
624
630
env .pop ('KAFKA_LOG4J_OPTS' )
625
631
proc = subprocess .Popen (args , env = env , stdout = subprocess .PIPE , stderr = subprocess .PIPE )
0 commit comments