@@ -116,6 +116,12 @@ def test_resource(cls, filename):
116
116
return path
117
117
return os .path .join (cls .project_root , "servers" , "resources" , "default" , filename )
118
118
119
+ @classmethod
120
+ def run_script (cls , script , * args ):
121
+ result = [os .path .join (cls .kafka_root , 'bin' , script )]
122
+ result .extend ([str (arg ) for arg in args ])
123
+ return result
124
+
119
125
@classmethod
120
126
def kafka_run_class_args (cls , * args ):
121
127
result = [os .path .join (cls .kafka_root , 'bin' , 'kafka-run-class.sh' )]
@@ -202,6 +208,7 @@ def open(self):
202
208
# Configure Zookeeper child process
203
209
template = self .test_resource ("zookeeper.properties" )
204
210
properties = self .tmp_dir .join ("zookeeper.properties" )
211
+ # Consider replacing w/ run_script('zookeper-server-start.sh', ...)
205
212
args = self .kafka_run_class_args ("org.apache.zookeeper.server.quorum.QuorumPeerMain" ,
206
213
properties .strpath )
207
214
env = self .kafka_run_class_env ()
@@ -348,17 +355,12 @@ def _jaas_config(self):
348
355
349
356
def _add_scram_user (self ):
350
357
self .out ("Adding SCRAM credentials for user {} to zookeeper." .format (self .broker_user ))
351
- args = self .kafka_run_class_args (
352
- "kafka.admin.ConfigCommand" ,
353
- "--zookeeper" ,
354
- "%s:%d/%s" % (self .zookeeper .host ,
355
- self .zookeeper .port ,
356
- self .zk_chroot ),
357
- "--alter" ,
358
- "--entity-type" , "users" ,
359
- "--entity-name" , self .broker_user ,
360
- "--add-config" ,
361
- "{}=[password={}]" .format (self .sasl_mechanism , self .broker_password ),
358
+ args = self .run_script ('kafka-configs.sh' ,
359
+ '--alter' ,
360
+ '--entity-type' , 'users' ,
361
+ '--entity-name' , self .broker_user ,
362
+ '--add-config' ,
363
+ '{}=[password={}]' .format (self .sasl_mechanism , self .broker_password ),
362
364
)
363
365
env = self .kafka_run_class_env ()
364
366
proc = subprocess .Popen (args , env = env , stdout = subprocess .PIPE , stderr = subprocess .PIPE )
@@ -390,13 +392,12 @@ def out(self, message):
390
392
391
393
def _create_zk_chroot (self ):
392
394
self .out ("Creating Zookeeper chroot node..." )
393
- args = self .kafka_run_class_args ("org.apache.zookeeper.ZooKeeperMain" ,
394
- "-server" ,
395
- "%s:%d" % (self .zookeeper .host ,
396
- self .zookeeper .port ),
397
- "create" ,
398
- "/%s" % (self .zk_chroot ,),
399
- "kafka-python" )
395
+ args = self .run_script ('zookeeper-shell.sh' ,
396
+ '%s:%d' % (self .zookeeper .host ,
397
+ self .zookeeper .port ),
398
+ 'create' ,
399
+ '/%s' % (self .zk_chroot ,),
400
+ 'kafka-python' )
400
401
env = self .kafka_run_class_env ()
401
402
proc = subprocess .Popen (args , env = env , stdout = subprocess .PIPE , stderr = subprocess .PIPE )
402
403
@@ -416,6 +417,7 @@ def start(self):
416
417
properties_template = self .test_resource ("kafka.properties" )
417
418
jaas_conf_template = self .test_resource ("kafka_server_jaas.conf" )
418
419
420
+ # Consider replacing w/ run_script('kafka-server-start.sh', ...)
419
421
args = self .kafka_run_class_args ("kafka.Kafka" , properties .strpath )
420
422
env = self .kafka_run_class_env ()
421
423
if self .sasl_enabled :
@@ -590,17 +592,17 @@ def _create_topic_via_admin_api(self, topic_name, num_partitions, replication_fa
590
592
raise errors .for_code (error_code )
591
593
592
594
def _create_topic_via_cli (self , topic_name , num_partitions , replication_factor ):
593
- args = self .kafka_run_class_args ('kafka.admin.TopicCommand ' ,
594
- '--zookeeper' , '%s:%s/%s' % (self .zookeeper .host ,
595
- self .zookeeper .port ,
596
- self .zk_chroot ),
597
- '--create' ,
598
- '--topic' , topic_name ,
599
- '--partitions' , self .partitions \
600
- if num_partitions is None else num_partitions ,
601
- '--replication-factor' , self .replicas \
602
- if replication_factor is None \
603
- else replication_factor )
595
+ args = self .run_script ('kafka-topics.sh ' ,
596
+ '--zookeeper' , '%s:%s/%s' % (self .zookeeper .host ,
597
+ self .zookeeper .port ,
598
+ self .zk_chroot ),
599
+ '--create' ,
600
+ '--topic' , topic_name ,
601
+ '--partitions' , self .partitions \
602
+ if num_partitions is None else num_partitions ,
603
+ '--replication-factor' , self .replicas \
604
+ if replication_factor is None \
605
+ else replication_factor )
604
606
if env_kafka_version () >= (0 , 10 ):
605
607
args .append ('--if-not-exists' )
606
608
env = self .kafka_run_class_env ()
@@ -614,12 +616,12 @@ def _create_topic_via_cli(self, topic_name, num_partitions, replication_factor):
614
616
raise RuntimeError ("Failed to create topic %s" % (topic_name ,))
615
617
616
618
def get_topic_names (self ):
617
- args = self .kafka_run_class_args ('kafka.admin.TopicCommand ' ,
619
+ args = self .run_script ('kafka-topics.sh ' ,
618
620
'--zookeeper' , '%s:%s/%s' % (self .zookeeper .host ,
619
621
self .zookeeper .port ,
620
622
self .zk_chroot ),
621
623
'--list'
622
- )
624
+ )
623
625
env = self .kafka_run_class_env ()
624
626
env .pop ('KAFKA_LOG4J_OPTS' )
625
627
proc = subprocess .Popen (args , env = env , stdout = subprocess .PIPE , stderr = subprocess .PIPE )
0 commit comments