Description
I am having a strange problem. I have a Django endpoint that is supposed to post to a Kafka topic.
It accepts a JSON body, encodes the JSON message as bytes, and then sends it to Kafka via a KafkaProducer.
However, the message never makes it to Kafka; the send fails silently. After some investigation, I found that the producer is swallowing the error, which is a struct.error. The error does not have a message. It is raised during message encoding.
Any help is appreciated.
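For reference, a minimal sketch of one way the swallowed exception could be surfaced. It assumes the producer behaves like kafka-python's `KafkaProducer`, where `send()` returns a future and calling `get()` on that future re-raises a stored failure. The helper name `send_and_wait` is hypothetical and not part of my code.

```python
def send_and_wait(producer, topic, value, timeout=10):
    """Send one record and block until it is acknowledged or fails.

    `producer` is assumed to behave like kafka-python's KafkaProducer:
    send() returns a future, and get() on that future raises the
    exception that caused the send to fail, if any.
    """
    future = producer.send(topic, value=value)
    # Without this call a failed send is only visible in the debug log.
    return future.get(timeout=timeout)
```

In the view this would replace the bare `self.producer.send(...)` call, so that a failure reaches the `except` block instead of being logged and dropped.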
Using Python version 3.6.0 (Anaconda package)
OS: Windows 7 64bit
Kafka: 0.11.0.1 (bitnami ovm)
Curl command:
curl -i -X POST -H "Content-Type: application/json" -d "{\"id\": \"1234\", \"message\": \"test message 1\"}" http://localhost:8000/api/v1/log/
Python Code

    # Imports inferred from the names used below
    import json
    import uuid
    from time import time

    from django.conf import settings
    from django.views import View
    from kafka import KafkaProducer

    KAFKA_BROKERLIST = '192.168.95.168:9092'
    KAFKA_TOPIC = 'test'
    KAFKA_ACKS = 1

    # post() is a method of loggingProducerView (shown below)
    def post(self, request):
        try:
            message = json.loads(request.body, encoding='utf-8')
            msg = None
            if 'id' not in message:  # we need to add an id to the message before we add to kafka
                id = uuid.uuid4()
                if isinstance(message, list): message.insert(0, id)
                elif isinstance(message, dict): message['id'] = id
                elif isinstance(message, str): message = "id: {} {}".format(id, message)
                else:  # we could not add id to the message for specified format
                    raise Exception("Id was not supplied, and could not generate id for message body. message format ={}".format(type(message)))
            value = bytes(json.dumps(message), 'utf-8')
            self.producer.send(topic=settings.KAFKA_TOPIC, value=value, timestamp_ms=time())
            self.producer.flush()
        # except KafkaError as e:
        #     raise Exception(str(e))
        except Exception as e:
            if hasattr(e, 'message'): raise Exception(e.message)
            else: raise Exception(str(e))
        return ""

    # Producer wrap: the KafkaProducer is created lazily on first access
    class loggingProducerView(View):
        _producer = None

        @property
        def producer(self):
            if self._producer is None:
                self._producer = KafkaProducer(
                    client_id='logging_service',
                    bootstrap_servers=KAFKA_BROKERLIST,
                    acks=KAFKA_ACKS
                )
            return self._producer
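Two details of the encoding step above may be worth checking, sketched here with only the standard library. `time()` returns float seconds, while kafka-python documents `timestamp_ms` as epoch milliseconds, and a `uuid.UUID` object is not JSON serializable unless it is converted to a string first. The helper names `encode_message` and `current_timestamp_ms` are hypothetical, and the sketch only covers the dict case.

```python
import json
import time
import uuid


def encode_message(message):
    """Serialize a dict to UTF-8 JSON bytes, adding a string id if missing."""
    if "id" not in message:
        # str() because uuid.UUID objects are not JSON serializable.
        message = dict(message, id=str(uuid.uuid4()))
    return json.dumps(message).encode("utf-8")


def current_timestamp_ms():
    """Return the current time as integer milliseconds since the epoch."""
    return int(time.time() * 1000)
```

With these, the send call would look like `self.producer.send(topic, value=encode_message(message), timestamp_ms=current_timestamp_ms())`. Whether this resolves the error is not confirmed here.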
Debug output:
Starting the Kafka producer
Added sensor with name connections-closed
Added sensor with name connections-created
Added sensor with name select-time
Added sensor with name io-time
Bootstrapping cluster metadata from [('192.168.95.168', 9092, <AddressFamily.AF_INET: 2>)]
Attempting to bootstrap via node at 192.168.95.168:9092
Added sensor with name bytes-sent-received
Added sensor with name bytes-sent
Added sensor with name bytes-received
Added sensor with name request-latency
Added sensor with name node-bootstrap.bytes-sent
Added sensor with name node-bootstrap.bytes-received
Added sensor with name node-bootstrap.latency
: creating new socket
: setting socket option (6, 1, 1)
: connecting to 192.168.95.168:9092
: established TCP connection
: Connection complete.
Node bootstrap connected
Request 1: MetadataRequest_v0(topics=[])
Response 1: MetadataResponse_v0(brokers=[(node_id=0, host='192.168.95.168', port=9092)], topics=[(error_code=0, topic='__consumer_offsets', partitions=[(error_code=0, partition=23, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=41, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=32, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=8, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=17, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=44, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=35, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=26, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=11, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=29, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=38, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=47, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=20, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=2, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=5, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=14, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=46, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=49, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=40, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=4, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=13, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=22, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=31, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=16, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=7, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=43, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=25, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=34, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=10, leader=0, replicas=[0], isr=[0]), (error_code=0, 
partition=37, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=1, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=19, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=28, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=45, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=36, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=27, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=9, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=18, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=21, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=48, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=12, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=3, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=30, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=39, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=15, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=42, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=24, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=33, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=6, leader=0, replicas=[0], isr=[0]), (error_code=0, partition=0, leader=0, replicas=[0], isr=[0])]), (error_code=0, topic='logging', partitions=[(error_code=0, partition=0, leader=0, replicas=[0], isr=[0])])])
Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 2, groups: 0)
Bootstrap succeeded: found 1 brokers and 2 topics.
: Closing connection.
: reconnect backoff 0.048303844089174645 after 1 failures
Initiating connection to node 0 at 192.168.95.168:9092
Added sensor with name node-0.bytes-sent
Added sensor with name node-0.bytes-received
Added sensor with name node-0.latency
: creating new socket
: setting socket option (6, 1, 1)
: connecting to 192.168.95.168:9092
Node 0 connected
Broker version identifed as 0.11.0
Set configuration api_version=(0, 11, 0) to skip auto check_version requests on startup
Added sensor with name bufferpool-wait-time
Added sensor with name batch-size
Added sensor with name compression-rate
Added sensor with name queue-time
Added sensor with name produce-throttle-time
Added sensor with name records-per-request
Added sensor with name bytes
Added sensor with name record-retries
Added sensor with name errors
Added sensor with name record-size-max
Starting Kafka producer I/O thread.
Kafka producer started
Sending (key=None value=b'{"id": "1234", "message": "test message 1"}') to TopicPartition(topic='logging', partition=0)
Sending metadata request MetadataRequest_v1(topics=['logging']) to node 0
Request 3: MetadataRequest_v1(topics=['logging'])
Response 3: MetadataResponse_v1(brokers=[(node_id=0, host='192.168.95.168', port=9092, rack=None)], controller_id=0, topics=[(error_code=0, topic='logging', is_internal=False, partitions=[(error_code=0, partition=0, leader=0, replicas=[0], isr=[0])])])
Updated cluster metadata to ClusterMetadata(brokers: 1, topics: 1, groups: 0)
Allocating a new 16384 byte message buffer for TopicPartition(topic='logging', partition=0)
Exception occurred during message send: <class 'struct.error'>
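The `struct.error` in the last log line is consistent with a non-integer value reaching `struct.pack`. The sketch below reproduces that with the standard library only. It assumes, without having confirmed it in the kafka-python source, that the message timestamp is packed as a big-endian signed 64-bit integer (`">q"`); the helper name `packs_as_int64` is hypothetical.

```python
import struct
import time


def packs_as_int64(value):
    """Return True if `value` can be packed as a big-endian signed 64-bit int."""
    try:
        struct.pack(">q", value)
        return True
    except struct.error:
        return False


float_seconds = time.time()             # float seconds, as passed in the view
int_millis = int(float_seconds * 1000)  # integer milliseconds

print(packs_as_int64(float_seconds))    # a float is rejected by struct.pack
print(packs_as_int64(int_millis))       # an in-range int is accepted
```

If the assumption holds, passing `timestamp_ms=time()` would fail in the same way during message encoding.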