1
1
#!/usr/bin/env python
2
- import threading , logging , time
3
- import multiprocessing
2
+ import threading , time
4
3
5
- from kafka import KafkaConsumer , KafkaProducer
4
+ from kafka import KafkaAdminClient , KafkaConsumer , KafkaProducer
5
+ from kafka .admin import NewTopic
6
6
7
7
8
8
class Producer (threading .Thread ):
9
9
def __init__ (self ):
10
10
threading .Thread .__init__ (self )
11
11
self .stop_event = threading .Event ()
12
-
12
+
13
13
def stop (self ):
14
14
self .stop_event .set ()
15
15
@@ -23,14 +23,15 @@ def run(self):
23
23
24
24
producer .close ()
25
25
26
- class Consumer (multiprocessing .Process ):
26
+
27
+ class Consumer (threading .Thread ):
27
28
def __init__ (self ):
28
- multiprocessing . Process .__init__ (self )
29
- self .stop_event = multiprocessing .Event ()
30
-
29
+ threading . Thread .__init__ (self )
30
+ self .stop_event = threading .Event ()
31
+
31
32
def stop (self ):
32
33
self .stop_event .set ()
33
-
34
+
34
35
def run (self ):
35
36
consumer = KafkaConsumer (bootstrap_servers = 'localhost:9092' ,
36
37
auto_offset_reset = 'earliest' ,
@@ -44,29 +45,38 @@ def run(self):
44
45
break
45
46
46
47
consumer .close ()
47
-
48
-
48
+
49
+
49
50
def main ():
51
+ # Create 'my-topic' Kafka topic
52
+ try :
53
+ admin = KafkaAdminClient (bootstrap_servers = 'localhost:9092' )
54
+
55
+ topic = NewTopic (name = 'my-topic' ,
56
+ num_partitions = 1 ,
57
+ replication_factor = 1 )
58
+ admin .create_topics ([topic ])
59
+ except Exception :
60
+ pass
61
+
50
62
tasks = [
51
63
Producer (),
52
64
Consumer ()
53
65
]
54
66
67
+ # Start threads of a publisher/producer and a subscriber/consumer to 'my-topic' Kafka topic
55
68
for t in tasks :
56
69
t .start ()
57
70
58
71
time .sleep (10 )
59
-
72
+
73
+ # Stop threads
60
74
for task in tasks :
61
75
task .stop ()
62
76
63
77
for task in tasks :
64
78
task .join ()
65
-
66
-
79
+
80
+
67
81
if __name__ == "__main__" :
68
- logging .basicConfig (
69
- format = '%(asctime)s.%(msecs)s:%(name)s:%(thread)d:%(levelname)s:%(process)d:%(message)s' ,
70
- level = logging .INFO
71
- )
72
82
main ()
0 commit comments